greentic_runner_host/
watcher.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, Result, anyhow};
6use runner_core::{Index, PackConfig, PackManager};
7use tokio::sync::mpsc;
8use tokio::task;
9
10use crate::HostConfig;
11use crate::engine::host::{SessionHost, StateHost};
12use crate::host::RunnerHost;
13use crate::http::health::HealthState;
14use crate::pack::PackRuntime;
15use crate::runner::adapt_timer;
16use crate::runtime::{ActivePacks, TenantRuntime};
17use crate::secrets::DynSecretsManager;
18use crate::storage::session::DynSessionStore;
19use crate::storage::state::DynStateStore;
20use crate::wasi::RunnerWasiPolicy;
21
22pub struct PackWatcher {
23    handle: tokio::task::JoinHandle<()>,
24}
25
26impl Drop for PackWatcher {
27    fn drop(&mut self) {
28        self.handle.abort();
29    }
30}
31
32#[derive(Clone)]
33pub struct PackReloadHandle {
34    trigger: mpsc::Sender<()>,
35}
36
37impl PackReloadHandle {
38    pub async fn trigger(&self) -> Result<()> {
39        self.trigger
40            .send(())
41            .await
42            .map_err(|_| anyhow!("pack watcher task stopped"))
43    }
44}
45
46pub async fn start_pack_watcher(
47    host: Arc<RunnerHost>,
48    cfg: PackConfig,
49    refresh: Duration,
50) -> Result<(PackWatcher, PackReloadHandle)> {
51    let cfg_clone = cfg.clone();
52    let manager = task::spawn_blocking(move || PackManager::new(cfg_clone))
53        .await
54        .context("pack manager init task failed")??;
55    let manager = Arc::new(manager);
56    let configs = Arc::new(host.tenant_configs());
57    let active = host.active_packs();
58    let health = host.health_state();
59    let session_host = host.session_host();
60    let session_store = host.session_store();
61    let state_store = host.state_store();
62    let state_host = host.state_host();
63    let wasi_policy = host.wasi_policy();
64    let secrets_manager = host.secrets_manager();
65
66    reload_once(
67        configs.as_ref(),
68        &manager,
69        &cfg,
70        &active,
71        &health,
72        session_host.clone(),
73        session_store.clone(),
74        state_store.clone(),
75        state_host.clone(),
76        Arc::clone(&wasi_policy),
77        secrets_manager.clone(),
78    )
79    .await?;
80
81    let (tx, mut rx) = mpsc::channel::<()>(4);
82    let index_cfg = cfg.clone();
83    let manager_clone = Arc::clone(&manager);
84    let health_clone = Arc::clone(&health);
85    let active_clone = Arc::clone(&active);
86    let configs_clone = Arc::clone(&configs);
87    let state_store_clone = Arc::clone(&state_store);
88    let wasi_policy_clone = Arc::clone(&wasi_policy);
89    let secrets_manager_clone = secrets_manager.clone();
90    let handle = tokio::spawn(async move {
91        let mut ticker = tokio::time::interval(refresh);
92        loop {
93            tokio::select! {
94                _ = ticker.tick() => {},
95                recv = rx.recv() => {
96                    if recv.is_none() {
97                        break;
98                    }
99                }
100            }
101            if let Err(err) = reload_once(
102                configs_clone.as_ref(),
103                &manager_clone,
104                &index_cfg,
105                &active_clone,
106                &health_clone,
107                session_host.clone(),
108                session_store.clone(),
109                state_store_clone.clone(),
110                state_host.clone(),
111                Arc::clone(&wasi_policy_clone),
112                secrets_manager_clone.clone(),
113            )
114            .await
115            {
116                tracing::error!(error = %err, "pack reload failed");
117                health_clone.record_reload_error(&err);
118            }
119        }
120    });
121
122    let watcher = PackWatcher { handle };
123    let handle = PackReloadHandle { trigger: tx };
124    Ok((watcher, handle))
125}
126
127#[allow(clippy::too_many_arguments)]
128async fn reload_once(
129    configs: &HashMap<String, Arc<HostConfig>>,
130    manager: &Arc<PackManager>,
131    cfg: &PackConfig,
132    active: &Arc<ActivePacks>,
133    health: &Arc<HealthState>,
134    session_host: Arc<dyn SessionHost>,
135    session_store: DynSessionStore,
136    state_store: DynStateStore,
137    state_host: Arc<dyn StateHost>,
138    wasi_policy: Arc<RunnerWasiPolicy>,
139    secrets_manager: DynSecretsManager,
140) -> Result<()> {
141    let index = Index::load(&cfg.index_location)?;
142    let resolved = manager.resolve_all_for_index(&index)?;
143    let mut next = HashMap::new();
144    for (tenant, record) in resolved.tenants() {
145        let config = configs
146            .get(tenant)
147            .cloned()
148            .with_context(|| format!("no host config registered for tenant {tenant}"))?;
149        let oauth_config = config.oauth_broker_config();
150        let mut packs = Vec::new();
151        let main_runtime = Arc::new(
152            PackRuntime::load(
153                &record.main.path,
154                Arc::clone(&config),
155                None,
156                Some(&record.main.path),
157                Some(Arc::clone(&session_store)),
158                Some(Arc::clone(&state_store)),
159                Arc::clone(&wasi_policy),
160                Arc::clone(&secrets_manager),
161                oauth_config.clone(),
162                true,
163            )
164            .await
165            .with_context(|| format!("failed to load pack for tenant {tenant}"))?,
166        );
167        packs.push((main_runtime, Some(record.main.digest.as_str().to_string())));
168
169        for overlay in &record.overlays {
170            let runtime = Arc::new(
171                PackRuntime::load(
172                    &overlay.path,
173                    Arc::clone(&config),
174                    None,
175                    Some(&overlay.path),
176                    Some(Arc::clone(&session_store)),
177                    Some(Arc::clone(&state_store)),
178                    Arc::clone(&wasi_policy),
179                    Arc::clone(&secrets_manager),
180                    oauth_config.clone(),
181                    true,
182                )
183                .await
184                .with_context(|| {
185                    format!(
186                        "failed to load overlay {} for tenant {tenant}",
187                        overlay.reference.name
188                    )
189                })?,
190            );
191            packs.push((runtime, Some(overlay.digest.as_str().to_string())));
192        }
193
194        let runtime = TenantRuntime::from_packs(
195            Arc::clone(&config),
196            packs,
197            None,
198            Arc::clone(&session_host),
199            Arc::clone(&session_store),
200            Arc::clone(&state_store),
201            Arc::clone(&state_host),
202            Arc::clone(&secrets_manager),
203        )
204        .await?;
205        let timers = adapt_timer::spawn_timers(Arc::clone(&runtime))?;
206        runtime.register_timers(timers);
207
208        next.insert(tenant.clone(), runtime);
209    }
210    active.replace(next);
211    health.record_reload_success();
212    tracing::info!("pack reload completed successfully");
213    Ok(())
214}