use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, Result, anyhow};
use runner_core::{Index, PackConfig, PackManager};
use tokio::sync::mpsc;
use tokio::task;

use crate::HostConfig;
use crate::engine::host::{SessionHost, StateHost};
use crate::host::RunnerHost;
use crate::http::health::HealthState;
use crate::pack::{ComponentResolution, PackRuntime};
use crate::runner::adapt_timer;
use crate::runtime::{ActivePacks, TenantRuntime};
use crate::secrets::DynSecretsManager;
use crate::storage::session::DynSessionStore;
use crate::storage::state::DynStateStore;
use crate::wasi::RunnerWasiPolicy;

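/// Handle to the background pack-reload task; dropping it aborts the task.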
pub struct PackWatcher {
    handle: tokio::task::JoinHandle<()>,
}

impl Drop for PackWatcher {
    fn drop(&mut self) {
        self.handle.abort();
    }
}

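/// Cloneable handle for requesting an immediate pack reload from the watcher task.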
#[derive(Clone)]
pub struct PackReloadHandle {
    trigger: mpsc::Sender<()>,
}

impl PackReloadHandle {
    pub async fn trigger(&self) -> Result<()> {
        self.trigger
            .send(())
            .await
            .map_err(|_| anyhow!("pack watcher task stopped"))
    }
}

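/// Runs one reload immediately, then spawns a background task that reloads the
/// pack index on every `refresh` tick or whenever the returned
/// [`PackReloadHandle`] is triggered.
///
/// A minimal usage sketch (assumes `host` and `cfg` were built elsewhere):
///
/// ```ignore
/// let (watcher, reload) = start_pack_watcher(host, cfg, Duration::from_secs(60)).await?;
/// reload.trigger().await?; // request an immediate reload
/// drop(watcher);           // aborts the background task
/// ```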
pub async fn start_pack_watcher(
    host: Arc<RunnerHost>,
    cfg: PackConfig,
    refresh: Duration,
) -> Result<(PackWatcher, PackReloadHandle)> {
    let cfg_clone = cfg.clone();
    let manager = task::spawn_blocking(move || PackManager::new(cfg_clone))
        .await
        .context("pack manager init task failed")??;
    let manager = Arc::new(manager);
    let configs = Arc::new(host.tenant_configs());
    let active = host.active_packs();
    let health = host.health_state();
    let session_host = host.session_host();
    let session_store = host.session_store();
    let state_store = host.state_store();
    let state_host = host.state_host();
    let wasi_policy = host.wasi_policy();
    let secrets_manager = host.secrets_manager();

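    // Load packs once before returning so startup fails fast if the index or
    // any pack cannot be resolved.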
    reload_once(
        configs.as_ref(),
        &manager,
        &cfg,
        &active,
        &health,
        session_host.clone(),
        session_store.clone(),
        state_store.clone(),
        state_host.clone(),
        Arc::clone(&wasi_policy),
        secrets_manager.clone(),
    )
    .await?;

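    // Manual reload requests arrive on this channel; the spawned task below
    // also reloads on every tick of the `refresh` interval.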
    let (tx, mut rx) = mpsc::channel::<()>(4);
    let index_cfg = cfg.clone();
    let manager_clone = Arc::clone(&manager);
    let health_clone = Arc::clone(&health);
    let active_clone = Arc::clone(&active);
    let configs_clone = Arc::clone(&configs);
    let state_store_clone = Arc::clone(&state_store);
    let wasi_policy_clone = Arc::clone(&wasi_policy);
    let secrets_manager_clone = secrets_manager.clone();
    let handle = tokio::spawn(async move {
        let mut ticker = tokio::time::interval(refresh);
        loop {
            tokio::select! {
                _ = ticker.tick() => {},
                recv = rx.recv() => {
                    if recv.is_none() {
                        break;
                    }
                }
            }
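            // A failed reload is logged and recorded for health reporting, but
            // the loop keeps running so the next tick or trigger can retry.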
            if let Err(err) = reload_once(
                configs_clone.as_ref(),
                &manager_clone,
                &index_cfg,
                &active_clone,
                &health_clone,
                session_host.clone(),
                session_store.clone(),
                state_store_clone.clone(),
                state_host.clone(),
                Arc::clone(&wasi_policy_clone),
                secrets_manager_clone.clone(),
            )
            .await
            {
                tracing::error!(error = %err, "pack reload failed");
                health_clone.record_reload_error(&err);
            }
        }
    });

    let watcher = PackWatcher { handle };
    let handle = PackReloadHandle { trigger: tx };
    Ok((watcher, handle))
}

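/// Resolves the pack index, rebuilds a [`TenantRuntime`] for every tenant it
/// contains, and swaps the result into the active pack set. Any failure
/// returns early and leaves the previously active runtimes in place.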
#[allow(clippy::too_many_arguments)]
async fn reload_once(
    configs: &HashMap<String, Arc<HostConfig>>,
    manager: &Arc<PackManager>,
    cfg: &PackConfig,
    active: &Arc<ActivePacks>,
    health: &Arc<HealthState>,
    session_host: Arc<dyn SessionHost>,
    session_store: DynSessionStore,
    state_store: DynStateStore,
    state_host: Arc<dyn StateHost>,
    wasi_policy: Arc<RunnerWasiPolicy>,
    secrets_manager: DynSecretsManager,
) -> Result<()> {
    let index = Index::load(&cfg.index_location)?;
    let resolved = manager.resolve_all_for_index(&index)?;
    let mut next = HashMap::new();
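    // Build a fresh runtime for each tenant in the resolved index; every tenant
    // must have a registered host config.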
    for (tenant, record) in resolved.tenants() {
        let config = configs
            .get(tenant)
            .cloned()
            .with_context(|| format!("no host config registered for tenant {tenant}"))?;
        let oauth_config = config.oauth_broker_config();
        let mut packs = Vec::new();
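        // Load the tenant's main pack first, keeping its digest alongside the
        // runtime.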
        let main_runtime = Arc::new(
            PackRuntime::load(
                &record.main.path,
                Arc::clone(&config),
                None,
                Some(&record.main.path),
                Some(Arc::clone(&session_store)),
                Some(Arc::clone(&state_store)),
                Arc::clone(&wasi_policy),
                Arc::clone(&secrets_manager),
                oauth_config.clone(),
                true,
                ComponentResolution::default(),
            )
            .await
            .with_context(|| format!("failed to load pack for tenant {tenant}"))?,
        );
        packs.push((main_runtime, Some(record.main.digest.as_str().to_string())));

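        // Overlay packs follow the main pack in the order the resolver returned them.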
        for overlay in &record.overlays {
            let runtime = Arc::new(
                PackRuntime::load(
                    &overlay.path,
                    Arc::clone(&config),
                    None,
                    Some(&overlay.path),
                    Some(Arc::clone(&session_store)),
                    Some(Arc::clone(&state_store)),
                    Arc::clone(&wasi_policy),
                    Arc::clone(&secrets_manager),
                    oauth_config.clone(),
                    true,
                    ComponentResolution::default(),
                )
                .await
                .with_context(|| {
                    format!(
                        "failed to load overlay {} for tenant {tenant}",
                        overlay.reference.name
                    )
                })?,
            );
            packs.push((runtime, Some(overlay.digest.as_str().to_string())));
        }

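        // Assemble the per-tenant runtime from the loaded packs and wire up its
        // timers before the runtime is published.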
        let runtime = TenantRuntime::from_packs(
            Arc::clone(&config),
            packs,
            None,
            Arc::clone(&session_host),
            Arc::clone(&session_store),
            Arc::clone(&state_store),
            Arc::clone(&state_host),
            Arc::clone(&secrets_manager),
        )
        .await?;
        let timers = adapt_timer::spawn_timers(Arc::clone(&runtime))?;
        runtime.register_timers(timers);

        next.insert(tenant.clone(), runtime);
    }
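    // Swap the freshly built runtimes in and record the successful reload.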
    active.replace(next);
    health.record_reload_success();
    tracing::info!("pack reload completed successfully");
    Ok(())
}