// greentic_runner_host/watcher.rs
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, Result, anyhow};
use runner_core::{Index, PackConfig, PackManager};
use tokio::sync::mpsc;
use tokio::task;

use crate::HostConfig;
use crate::engine::host::{SessionHost, StateHost};
use crate::host::RunnerHost;
use crate::http::health::HealthState;
use crate::pack::PackRuntime;
use crate::runner::adapt_timer;
use crate::runtime::{ActivePacks, TenantRuntime};
use crate::secrets::DynSecretsManager;
use crate::storage::session::DynSessionStore;
use crate::storage::state::DynStateStore;
use crate::wasi::RunnerWasiPolicy;
21
/// Guard for the background pack-reload task spawned by `start_pack_watcher`.
///
/// Owning this value keeps the reload loop alive; dropping it aborts the
/// underlying tokio task so periodic reloads stop with their owner.
pub struct PackWatcher {
    // Join handle of the spawned reload loop; aborted on drop.
    handle: tokio::task::JoinHandle<()>,
}

impl Drop for PackWatcher {
    fn drop(&mut self) {
        // Stop the background reload loop when the guard goes away.
        self.handle.abort();
    }
}
31
32#[derive(Clone)]
33pub struct PackReloadHandle {
34 trigger: mpsc::Sender<()>,
35}
36
37impl PackReloadHandle {
38 pub async fn trigger(&self) -> Result<()> {
39 self.trigger
40 .send(())
41 .await
42 .map_err(|_| anyhow!("pack watcher task stopped"))
43 }
44}
45
46pub async fn start_pack_watcher(
47 host: Arc<RunnerHost>,
48 cfg: PackConfig,
49 refresh: Duration,
50) -> Result<(PackWatcher, PackReloadHandle)> {
51 let cfg_clone = cfg.clone();
52 let manager = task::spawn_blocking(move || PackManager::new(cfg_clone))
53 .await
54 .context("pack manager init task failed")??;
55 let manager = Arc::new(manager);
56 let configs = Arc::new(host.tenant_configs());
57 let active = host.active_packs();
58 let health = host.health_state();
59 let session_host = host.session_host();
60 let session_store = host.session_store();
61 let state_store = host.state_store();
62 let state_host = host.state_host();
63 let wasi_policy = host.wasi_policy();
64 let secrets_manager = host.secrets_manager();
65
66 reload_once(
67 configs.as_ref(),
68 &manager,
69 &cfg,
70 &active,
71 &health,
72 session_host.clone(),
73 session_store.clone(),
74 state_store.clone(),
75 state_host.clone(),
76 Arc::clone(&wasi_policy),
77 secrets_manager.clone(),
78 )
79 .await?;
80
81 let (tx, mut rx) = mpsc::channel::<()>(4);
82 let index_cfg = cfg.clone();
83 let manager_clone = Arc::clone(&manager);
84 let health_clone = Arc::clone(&health);
85 let active_clone = Arc::clone(&active);
86 let configs_clone = Arc::clone(&configs);
87 let state_store_clone = Arc::clone(&state_store);
88 let wasi_policy_clone = Arc::clone(&wasi_policy);
89 let secrets_manager_clone = secrets_manager.clone();
90 let handle = tokio::spawn(async move {
91 let mut ticker = tokio::time::interval(refresh);
92 loop {
93 tokio::select! {
94 _ = ticker.tick() => {},
95 recv = rx.recv() => {
96 if recv.is_none() {
97 break;
98 }
99 }
100 }
101 if let Err(err) = reload_once(
102 configs_clone.as_ref(),
103 &manager_clone,
104 &index_cfg,
105 &active_clone,
106 &health_clone,
107 session_host.clone(),
108 session_store.clone(),
109 state_store_clone.clone(),
110 state_host.clone(),
111 Arc::clone(&wasi_policy_clone),
112 secrets_manager_clone.clone(),
113 )
114 .await
115 {
116 tracing::error!(error = %err, "pack reload failed");
117 health_clone.record_reload_error(&err);
118 }
119 }
120 });
121
122 let watcher = PackWatcher { handle };
123 let handle = PackReloadHandle { trigger: tx };
124 Ok((watcher, handle))
125}
126
/// Run one full pack-reload pass: resolve the index, rebuild a runtime for
/// every tenant, then atomically swap the new set into `active`.
///
/// The previous runtimes stay active until `active.replace(next)` at the end,
/// so a failure anywhere in the loop leaves the running system untouched
/// (the caller decides whether to log or propagate the error).
///
/// # Errors
/// Returns an error if the index cannot be loaded/resolved, a tenant in the
/// index has no registered host config, any pack or overlay fails to load,
/// or the tenant runtime / its timers cannot be constructed.
#[allow(clippy::too_many_arguments)]
async fn reload_once(
    configs: &HashMap<String, Arc<HostConfig>>,
    manager: &Arc<PackManager>,
    cfg: &PackConfig,
    active: &Arc<ActivePacks>,
    health: &Arc<HealthState>,
    session_host: Arc<dyn SessionHost>,
    session_store: DynSessionStore,
    state_store: DynStateStore,
    state_host: Arc<dyn StateHost>,
    wasi_policy: Arc<RunnerWasiPolicy>,
    secrets_manager: DynSecretsManager,
) -> Result<()> {
    // NOTE(review): Index::load / resolve_all_for_index look synchronous and
    // may block on I/O inside this async fn — confirm whether they belong on
    // spawn_blocking.
    let index = Index::load(&cfg.index_location)?;
    let resolved = manager.resolve_all_for_index(&index)?;
    let mut next = HashMap::new();
    for (tenant, record) in resolved.tenants() {
        // Every tenant in the index must have a pre-registered host config.
        let config = configs
            .get(tenant)
            .cloned()
            .with_context(|| format!("no host config registered for tenant {tenant}"))?;
        // Packs are collected as (runtime, digest) pairs: main pack first,
        // then each overlay in index order.
        let mut packs = Vec::new();
        let main_runtime = Arc::new(
            PackRuntime::load(
                &record.main.path,
                Arc::clone(&config),
                None,
                Some(&record.main.path),
                Some(Arc::clone(&session_store)),
                Some(Arc::clone(&state_store)),
                Arc::clone(&wasi_policy),
                Arc::clone(&secrets_manager),
                true,
            )
            .await
            .with_context(|| format!("failed to load pack for tenant {tenant}"))?,
        );
        packs.push((main_runtime, Some(record.main.digest.as_str().to_string())));

        // Overlay packs are loaded with the same stores/policy as the main
        // pack and appended after it.
        for overlay in &record.overlays {
            let runtime = Arc::new(
                PackRuntime::load(
                    &overlay.path,
                    Arc::clone(&config),
                    None,
                    Some(&overlay.path),
                    Some(Arc::clone(&session_store)),
                    Some(Arc::clone(&state_store)),
                    Arc::clone(&wasi_policy),
                    Arc::clone(&secrets_manager),
                    true,
                )
                .await
                .with_context(|| {
                    format!(
                        "failed to load overlay {} for tenant {tenant}",
                        overlay.reference.name
                    )
                })?,
            );
            packs.push((runtime, Some(overlay.digest.as_str().to_string())));
        }

        // Assemble the tenant runtime from the loaded packs and wire up its
        // timers before it becomes visible.
        let runtime = TenantRuntime::from_packs(
            Arc::clone(&config),
            packs,
            None,
            Arc::clone(&session_host),
            Arc::clone(&session_store),
            Arc::clone(&state_store),
            Arc::clone(&state_host),
            Arc::clone(&secrets_manager),
        )
        .await?;
        let timers = adapt_timer::spawn_timers(Arc::clone(&runtime))?;
        runtime.register_timers(timers);

        next.insert(tenant.clone(), runtime);
    }
    // Swap the whole tenant set at once so readers never observe a partial
    // reload, then record success for health reporting.
    active.replace(next);
    health.record_reload_success();
    tracing::info!("pack reload completed successfully");
    Ok(())
}