// greentic_runner_host/watcher.rs

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, Result, anyhow};
use runner_core::{Index, PackConfig, PackManager};
use tokio::sync::mpsc;
use tokio::task;

use crate::HostConfig;
use crate::engine::host::{SessionHost, StateHost};
use crate::host::RunnerHost;
use crate::http::health::HealthState;
use crate::pack::PackRuntime;
use crate::runner::adapt_timer;
use crate::runtime::{ActivePacks, TenantRuntime};
use crate::storage::session::DynSessionStore;
use crate::storage::state::DynStateStore;
use crate::wasi::RunnerWasiPolicy;

/// Owner handle for the background pack-reload task.
///
/// Holds the `JoinHandle` of the loop spawned by `start_pack_watcher`;
/// dropping this struct aborts that task (see the `Drop` impl below).
pub struct PackWatcher {
    // JoinHandle of the spawned reload loop; aborted on drop.
    handle: tokio::task::JoinHandle<()>,
}
24
impl Drop for PackWatcher {
    fn drop(&mut self) {
        // Cancel the background reload loop so it does not outlive the
        // watcher. `abort` is fire-and-forget; we do not await completion.
        self.handle.abort();
    }
}
30
/// Cloneable handle for requesting an out-of-band pack reload.
///
/// Wraps the sender side of the channel the watcher loop listens on; the
/// loop also reloads on its own timer, so triggering is optional.
#[derive(Clone)]
pub struct PackReloadHandle {
    // Sender paired with the receiver owned by the watcher task.
    trigger: mpsc::Sender<()>,
}
35
36impl PackReloadHandle {
37 pub async fn trigger(&self) -> Result<()> {
38 self.trigger
39 .send(())
40 .await
41 .map_err(|_| anyhow!("pack watcher task stopped"))
42 }
43}
44
45pub async fn start_pack_watcher(
46 host: Arc<RunnerHost>,
47 cfg: PackConfig,
48 refresh: Duration,
49) -> Result<(PackWatcher, PackReloadHandle)> {
50 let cfg_clone = cfg.clone();
51 let manager = task::spawn_blocking(move || PackManager::new(cfg_clone))
52 .await
53 .context("pack manager init task failed")??;
54 let manager = Arc::new(manager);
55 let configs = Arc::new(host.tenant_configs());
56 let active = host.active_packs();
57 let health = host.health_state();
58 let session_host = host.session_host();
59 let session_store = host.session_store();
60 let state_store = host.state_store();
61 let state_host = host.state_host();
62 let wasi_policy = host.wasi_policy();
63
64 reload_once(
65 configs.as_ref(),
66 &manager,
67 &cfg,
68 &active,
69 &health,
70 session_host.clone(),
71 session_store.clone(),
72 state_store.clone(),
73 state_host.clone(),
74 Arc::clone(&wasi_policy),
75 )
76 .await?;
77
78 let (tx, mut rx) = mpsc::channel::<()>(4);
79 let index_cfg = cfg.clone();
80 let manager_clone = Arc::clone(&manager);
81 let health_clone = Arc::clone(&health);
82 let active_clone = Arc::clone(&active);
83 let configs_clone = Arc::clone(&configs);
84 let state_store_clone = Arc::clone(&state_store);
85 let wasi_policy_clone = Arc::clone(&wasi_policy);
86 let handle = tokio::spawn(async move {
87 let mut ticker = tokio::time::interval(refresh);
88 loop {
89 tokio::select! {
90 _ = ticker.tick() => {},
91 recv = rx.recv() => {
92 if recv.is_none() {
93 break;
94 }
95 }
96 }
97 if let Err(err) = reload_once(
98 configs_clone.as_ref(),
99 &manager_clone,
100 &index_cfg,
101 &active_clone,
102 &health_clone,
103 session_host.clone(),
104 session_store.clone(),
105 state_store_clone.clone(),
106 state_host.clone(),
107 Arc::clone(&wasi_policy_clone),
108 )
109 .await
110 {
111 tracing::error!(error = %err, "pack reload failed");
112 health_clone.record_reload_error(&err);
113 }
114 }
115 });
116
117 let watcher = PackWatcher { handle };
118 let handle = PackReloadHandle { trigger: tx };
119 Ok((watcher, handle))
120}
121
/// Resolve the pack index and rebuild the per-tenant runtimes, then atomically
/// swap them into `active`.
///
/// The replacement map is built in full before `replace` is called, so if any
/// tenant fails to load, the previously active runtimes remain untouched and
/// the error is returned to the caller.
///
/// # Errors
///
/// Fails if the index cannot be loaded/resolved, if a tenant in the index has
/// no registered host config, or if any pack/overlay fails to load.
#[allow(clippy::too_many_arguments)]
async fn reload_once(
    configs: &HashMap<String, Arc<HostConfig>>,
    manager: &Arc<PackManager>,
    cfg: &PackConfig,
    active: &Arc<ActivePacks>,
    health: &Arc<HealthState>,
    session_host: Arc<dyn SessionHost>,
    session_store: DynSessionStore,
    state_store: DynStateStore,
    state_host: Arc<dyn StateHost>,
    wasi_policy: Arc<RunnerWasiPolicy>,
) -> Result<()> {
    // NOTE(review): Index::load appears to be synchronous I/O running on the
    // async runtime — confirm whether it should move to spawn_blocking.
    let index = Index::load(&cfg.index_location)?;
    let resolved = manager.resolve_all_for_index(&index)?;
    let mut next = HashMap::new();
    for (tenant, record) in resolved.tenants() {
        // Every tenant present in the index must have a host config
        // registered up front; an unknown tenant aborts the whole reload.
        let config = configs
            .get(tenant)
            .cloned()
            .with_context(|| format!("no host config registered for tenant {tenant}"))?;
        let mut packs = Vec::new();
        // Load the tenant's main pack first; its digest is kept alongside the
        // runtime (presumably for change detection — verify against ActivePacks).
        let main_runtime = Arc::new(
            PackRuntime::load(
                &record.main.path,
                Arc::clone(&config),
                None,
                Some(&record.main.path),
                Some(Arc::clone(&session_store)),
                Some(Arc::clone(&state_store)),
                Arc::clone(&wasi_policy),
                true,
            )
            .await
            .with_context(|| format!("failed to load pack for tenant {tenant}"))?,
        );
        packs.push((main_runtime, Some(record.main.digest.as_str().to_string())));

        // Overlays load after the main pack, in index order, with the same
        // store/policy wiring.
        for overlay in &record.overlays {
            let runtime = Arc::new(
                PackRuntime::load(
                    &overlay.path,
                    Arc::clone(&config),
                    None,
                    Some(&overlay.path),
                    Some(Arc::clone(&session_store)),
                    Some(Arc::clone(&state_store)),
                    Arc::clone(&wasi_policy),
                    true,
                )
                .await
                .with_context(|| {
                    format!(
                        "failed to load overlay {} for tenant {tenant}",
                        overlay.reference.name
                    )
                })?,
            );
            packs.push((runtime, Some(overlay.digest.as_str().to_string())));
        }

        // Assemble the tenant runtime from main pack + overlays and start its
        // timer tasks before it becomes visible.
        let runtime = TenantRuntime::from_packs(
            Arc::clone(&config),
            packs,
            None,
            Arc::clone(&session_host),
            Arc::clone(&session_store),
            Arc::clone(&state_store),
            Arc::clone(&state_host),
        )
        .await?;
        let timers = adapt_timer::spawn_timers(Arc::clone(&runtime))?;
        runtime.register_timers(timers);

        next.insert(tenant.clone(), runtime);
    }
    // Swap the fully built map in one step; old runtimes are dropped here.
    // NOTE(review): confirm that dropping a replaced TenantRuntime stops its
    // registered timer tasks — not visible from this file.
    active.replace(next);
    health.record_reload_success();
    tracing::info!("pack reload completed successfully");
    Ok(())
}