greentic_runner_host/
watcher.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::{Context, Result, anyhow};
6use runner_core::{Index, PackConfig, PackManager};
7use tokio::sync::mpsc;
8use tokio::task;
9
10use crate::HostConfig;
11use crate::engine::host::{SessionHost, StateHost};
12use crate::host::RunnerHost;
13use crate::http::health::HealthState;
14use crate::pack::PackRuntime;
15use crate::runner::adapt_timer;
16use crate::runtime::{ActivePacks, TenantRuntime};
17use crate::storage::session::DynSessionStore;
18use crate::storage::state::DynStateStore;
19use crate::wasi::RunnerWasiPolicy;
20
21pub struct PackWatcher {
22    handle: tokio::task::JoinHandle<()>,
23}
24
25impl Drop for PackWatcher {
26    fn drop(&mut self) {
27        self.handle.abort();
28    }
29}
30
31#[derive(Clone)]
32pub struct PackReloadHandle {
33    trigger: mpsc::Sender<()>,
34}
35
36impl PackReloadHandle {
37    pub async fn trigger(&self) -> Result<()> {
38        self.trigger
39            .send(())
40            .await
41            .map_err(|_| anyhow!("pack watcher task stopped"))
42    }
43}
44
45pub async fn start_pack_watcher(
46    host: Arc<RunnerHost>,
47    cfg: PackConfig,
48    refresh: Duration,
49) -> Result<(PackWatcher, PackReloadHandle)> {
50    let cfg_clone = cfg.clone();
51    let manager = task::spawn_blocking(move || PackManager::new(cfg_clone))
52        .await
53        .context("pack manager init task failed")??;
54    let manager = Arc::new(manager);
55    let configs = Arc::new(host.tenant_configs());
56    let active = host.active_packs();
57    let health = host.health_state();
58    let session_host = host.session_host();
59    let session_store = host.session_store();
60    let state_store = host.state_store();
61    let state_host = host.state_host();
62    let wasi_policy = host.wasi_policy();
63
64    reload_once(
65        configs.as_ref(),
66        &manager,
67        &cfg,
68        &active,
69        &health,
70        session_host.clone(),
71        session_store.clone(),
72        state_store.clone(),
73        state_host.clone(),
74        Arc::clone(&wasi_policy),
75    )
76    .await?;
77
78    let (tx, mut rx) = mpsc::channel::<()>(4);
79    let index_cfg = cfg.clone();
80    let manager_clone = Arc::clone(&manager);
81    let health_clone = Arc::clone(&health);
82    let active_clone = Arc::clone(&active);
83    let configs_clone = Arc::clone(&configs);
84    let state_store_clone = Arc::clone(&state_store);
85    let wasi_policy_clone = Arc::clone(&wasi_policy);
86    let handle = tokio::spawn(async move {
87        let mut ticker = tokio::time::interval(refresh);
88        loop {
89            tokio::select! {
90                _ = ticker.tick() => {},
91                recv = rx.recv() => {
92                    if recv.is_none() {
93                        break;
94                    }
95                }
96            }
97            if let Err(err) = reload_once(
98                configs_clone.as_ref(),
99                &manager_clone,
100                &index_cfg,
101                &active_clone,
102                &health_clone,
103                session_host.clone(),
104                session_store.clone(),
105                state_store_clone.clone(),
106                state_host.clone(),
107                Arc::clone(&wasi_policy_clone),
108            )
109            .await
110            {
111                tracing::error!(error = %err, "pack reload failed");
112                health_clone.record_reload_error(&err);
113            }
114        }
115    });
116
117    let watcher = PackWatcher { handle };
118    let handle = PackReloadHandle { trigger: tx };
119    Ok((watcher, handle))
120}
121
122#[allow(clippy::too_many_arguments)]
123async fn reload_once(
124    configs: &HashMap<String, Arc<HostConfig>>,
125    manager: &Arc<PackManager>,
126    cfg: &PackConfig,
127    active: &Arc<ActivePacks>,
128    health: &Arc<HealthState>,
129    session_host: Arc<dyn SessionHost>,
130    session_store: DynSessionStore,
131    state_store: DynStateStore,
132    state_host: Arc<dyn StateHost>,
133    wasi_policy: Arc<RunnerWasiPolicy>,
134) -> Result<()> {
135    let index = Index::load(&cfg.index_location)?;
136    let resolved = manager.resolve_all_for_index(&index)?;
137    let mut next = HashMap::new();
138    for (tenant, record) in resolved.tenants() {
139        let config = configs
140            .get(tenant)
141            .cloned()
142            .with_context(|| format!("no host config registered for tenant {tenant}"))?;
143        let mut packs = Vec::new();
144        let main_runtime = Arc::new(
145            PackRuntime::load(
146                &record.main.path,
147                Arc::clone(&config),
148                None,
149                Some(&record.main.path),
150                Some(Arc::clone(&session_store)),
151                Some(Arc::clone(&state_store)),
152                Arc::clone(&wasi_policy),
153                true,
154            )
155            .await
156            .with_context(|| format!("failed to load pack for tenant {tenant}"))?,
157        );
158        packs.push((main_runtime, Some(record.main.digest.as_str().to_string())));
159
160        for overlay in &record.overlays {
161            let runtime = Arc::new(
162                PackRuntime::load(
163                    &overlay.path,
164                    Arc::clone(&config),
165                    None,
166                    Some(&overlay.path),
167                    Some(Arc::clone(&session_store)),
168                    Some(Arc::clone(&state_store)),
169                    Arc::clone(&wasi_policy),
170                    true,
171                )
172                .await
173                .with_context(|| {
174                    format!(
175                        "failed to load overlay {} for tenant {tenant}",
176                        overlay.reference.name
177                    )
178                })?,
179            );
180            packs.push((runtime, Some(overlay.digest.as_str().to_string())));
181        }
182
183        let runtime = TenantRuntime::from_packs(
184            Arc::clone(&config),
185            packs,
186            None,
187            Arc::clone(&session_host),
188            Arc::clone(&session_store),
189            Arc::clone(&state_store),
190            Arc::clone(&state_host),
191        )
192        .await?;
193        let timers = adapt_timer::spawn_timers(Arc::clone(&runtime))?;
194        runtime.register_timers(timers);
195
196        next.insert(tenant.clone(), runtime);
197    }
198    active.replace(next);
199    health.record_reload_success();
200    tracing::info!("pack reload completed successfully");
201    Ok(())
202}