1use std::collections::HashMap;
2use std::future::Future;
3use std::num::NonZeroUsize;
4use std::path::Path;
5use std::sync::Arc;
6use std::time::Instant;
7
8use anyhow::{Context, Result, bail};
9use arc_swap::ArcSwap;
10use axum::http::StatusCode;
11use lru::LruCache;
12use parking_lot::Mutex;
13use reqwest::Client;
14use serde_json::Value;
15use tokio::runtime::{Handle, Runtime};
16use tokio::task::JoinHandle;
17
18use crate::config::HostConfig;
19use crate::engine::host::{SessionHost, StateHost};
20use crate::engine::runtime::StateMachineRuntime;
21use crate::operator_metrics::OperatorMetrics;
22use crate::operator_registry::OperatorRegistry;
23use crate::pack::{ComponentResolution, PackRuntime};
24use crate::runner::contract_cache::{ContractCache, ContractCacheStats};
25use crate::runner::engine::FlowEngine;
26use crate::runner::mocks::MockLayer;
27use crate::secrets::{DynSecretsManager, read_secret_blocking};
28use crate::storage::session::DynSessionStore;
29use crate::storage::state::DynStateStore;
30use crate::trace::PackTraceInfo;
31use crate::wasi::RunnerWasiPolicy;
32use greentic_types::SecretRequirement;
33
/// Maximum number of entries kept in each tenant's Telegram LRU cache.
const TELEGRAM_CACHE_CAPACITY: usize = 1024;
/// Maximum number of entries kept in each tenant's webhook LRU cache.
const WEBHOOK_CACHE_CAPACITY: usize = 256;
/// Pack id passed to the secrets manager when the runtime itself reads a secret.
const RUNTIME_SECRETS_PACK_ID: &str = "_runner";
37
38pub struct ActivePacks {
40 inner: ArcSwap<HashMap<String, Arc<TenantRuntime>>>,
41}
42
43impl ActivePacks {
44 pub fn new() -> Self {
45 Self {
46 inner: ArcSwap::from_pointee(HashMap::new()),
47 }
48 }
49
50 pub fn load(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
51 self.inner.load().get(tenant).cloned()
52 }
53
54 pub fn snapshot(&self) -> Arc<HashMap<String, Arc<TenantRuntime>>> {
55 self.inner.load_full()
56 }
57
58 pub fn replace(&self, next: HashMap<String, Arc<TenantRuntime>>) {
59 self.inner.store(Arc::new(next));
60 }
61
62 pub fn len(&self) -> usize {
63 self.inner.load().len()
64 }
65
66 pub fn is_empty(&self) -> bool {
67 self.len() == 0
68 }
69}
70
71impl Default for ActivePacks {
72 fn default() -> Self {
73 Self::new()
74 }
75}
76
77pub struct TenantRuntime {
79 tenant: String,
80 config: Arc<HostConfig>,
81 packs: Vec<Arc<PackRuntime>>,
82 digests: Vec<Option<String>>,
83 engine: Arc<FlowEngine>,
84 state_machine: Arc<StateMachineRuntime>,
85 http_client: Client,
86 telegram_cache: Mutex<LruCache<i64, StatusCode>>,
87 webhook_cache: Mutex<LruCache<String, Value>>,
88 messaging_rate: Mutex<RateLimiter>,
89 mocks: Option<Arc<MockLayer>>,
90 timer_handles: Mutex<Vec<JoinHandle<()>>>,
91 secrets: DynSecretsManager,
92 operator_registry: OperatorRegistry,
93 operator_metrics: Arc<OperatorMetrics>,
94 contract_cache: ContractCache,
95}
96
97#[derive(Clone)]
98pub struct ResolvedComponent {
99 pub digest: String,
100 pub component_ref: String,
101 pub pack: Arc<PackRuntime>,
102}
103
104pub fn block_on<F: Future<Output = R>, R>(future: F) -> R {
106 if let Ok(handle) = Handle::try_current() {
107 handle.block_on(future)
108 } else {
109 Runtime::new()
110 .expect("failed to create tokio runtime")
111 .block_on(future)
112 }
113}
114
115impl TenantRuntime {
116 #[allow(clippy::too_many_arguments)]
117 pub async fn load(
118 pack_path: &Path,
119 config: Arc<HostConfig>,
120 mocks: Option<Arc<MockLayer>>,
121 archive_source: Option<&Path>,
122 digest: Option<String>,
123 wasi_policy: Arc<RunnerWasiPolicy>,
124 session_host: Arc<dyn SessionHost>,
125 session_store: DynSessionStore,
126 state_store: DynStateStore,
127 state_host: Arc<dyn StateHost>,
128 secrets_manager: DynSecretsManager,
129 ) -> Result<Arc<Self>> {
130 let oauth_config = config.oauth_broker_config();
131 let pack = Arc::new(
132 PackRuntime::load(
133 pack_path,
134 Arc::clone(&config),
135 mocks.clone(),
136 archive_source,
137 Some(Arc::clone(&session_store)),
138 Some(Arc::clone(&state_store)),
139 Arc::clone(&wasi_policy),
140 Arc::clone(&secrets_manager),
141 oauth_config.clone(),
142 true,
143 ComponentResolution::default(),
144 )
145 .await
146 .with_context(|| {
147 format!(
148 "failed to load pack {} for tenant {}",
149 pack_path.display(),
150 config.tenant
151 )
152 })?,
153 );
154 Self::from_packs(
155 config,
156 vec![(pack, digest)],
157 mocks,
158 session_host,
159 session_store,
160 state_store,
161 state_host,
162 secrets_manager,
163 )
164 .await
165 }
166
167 #[allow(clippy::too_many_arguments)]
168 pub async fn from_packs(
169 config: Arc<HostConfig>,
170 packs: Vec<(Arc<PackRuntime>, Option<String>)>,
171 mocks: Option<Arc<MockLayer>>,
172 session_host: Arc<dyn SessionHost>,
173 session_store: DynSessionStore,
174 _state_store: DynStateStore,
175 state_host: Arc<dyn StateHost>,
176 secrets_manager: DynSecretsManager,
177 ) -> Result<Arc<Self>> {
178 let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
179 .expect("telegram cache capacity must be > 0");
180 let webhook_capacity =
181 NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
182 let operator_registry = OperatorRegistry::build(&packs)?;
183 let operator_metrics = Arc::new(OperatorMetrics::default());
184 let pack_runtimes = packs
185 .iter()
186 .map(|(pack, _)| Arc::clone(pack))
187 .collect::<Vec<_>>();
188 let digests = packs
189 .iter()
190 .map(|(_, digest)| digest.clone())
191 .collect::<Vec<_>>();
192 let mut pack_trace = HashMap::new();
193 for (pack, digest) in &packs {
194 let pack_id = pack.metadata().pack_id.clone();
195 let pack_ref = config
196 .pack_bindings
197 .iter()
198 .find(|binding| binding.pack_id == pack_id)
199 .map(|binding| binding.pack_ref.clone())
200 .unwrap_or_else(|| pack_id.clone());
201 pack_trace.insert(
202 pack_id,
203 PackTraceInfo {
204 pack_ref,
205 resolved_digest: digest.clone(),
206 },
207 );
208 }
209 let engine = Arc::new(
210 FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
211 .await
212 .context("failed to prime flow engine")?,
213 );
214 let state_machine = Arc::new(
215 StateMachineRuntime::from_flow_engine(
216 Arc::clone(&config),
217 Arc::clone(&engine),
218 pack_trace,
219 session_host,
220 session_store,
221 state_host,
222 Arc::clone(&secrets_manager),
223 mocks.clone(),
224 )
225 .context("failed to initialise state machine runtime")?,
226 );
227 let http_client = Client::builder().build()?;
228 let rate_limits = config.rate_limits.clone();
229 Ok(Arc::new(Self {
230 tenant: config.tenant.clone(),
231 config,
232 packs: pack_runtimes,
233 digests,
234 engine,
235 state_machine,
236 http_client,
237 telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
238 webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
239 messaging_rate: Mutex::new(RateLimiter::new(
240 rate_limits.messaging_send_qps,
241 rate_limits.messaging_burst,
242 )),
243 mocks,
244 timer_handles: Mutex::new(Vec::new()),
245 secrets: secrets_manager,
246 operator_registry,
247 operator_metrics,
248 contract_cache: ContractCache::from_env(),
249 }))
250 }
251
252 pub fn tenant(&self) -> &str {
253 &self.tenant
254 }
255
256 pub fn config(&self) -> &Arc<HostConfig> {
257 &self.config
258 }
259
260 pub fn operator_registry(&self) -> &OperatorRegistry {
261 &self.operator_registry
262 }
263
264 pub fn operator_metrics(&self) -> &OperatorMetrics {
265 &self.operator_metrics
266 }
267
268 pub fn contract_cache(&self) -> &ContractCache {
269 &self.contract_cache
270 }
271
272 pub fn contract_cache_stats(&self) -> ContractCacheStats {
273 self.contract_cache.stats()
274 }
275
276 pub fn main_pack(&self) -> &Arc<PackRuntime> {
277 self.packs
278 .first()
279 .expect("tenant runtime must contain at least one pack")
280 }
281
282 pub fn pack(&self) -> Arc<PackRuntime> {
283 Arc::clone(self.main_pack())
284 }
285
286 pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
287 self.packs.iter().skip(1).cloned().collect()
288 }
289
290 pub fn engine(&self) -> &Arc<FlowEngine> {
291 &self.engine
292 }
293
294 pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
295 &self.state_machine
296 }
297
298 pub fn http_client(&self) -> &Client {
299 &self.http_client
300 }
301
302 pub fn digest(&self) -> Option<&str> {
303 self.digests.first().and_then(|d| d.as_deref())
304 }
305
306 pub fn overlay_digests(&self) -> Vec<Option<String>> {
307 self.digests.iter().skip(1).cloned().collect()
308 }
309
310 pub fn required_secrets(&self) -> Vec<SecretRequirement> {
311 self.packs
312 .iter()
313 .flat_map(|pack| pack.required_secrets().iter().cloned())
314 .collect()
315 }
316
317 pub fn missing_secrets(&self) -> Vec<SecretRequirement> {
318 self.packs
319 .iter()
320 .flat_map(|pack| pack.missing_secrets(&self.config.tenant_ctx()))
321 .collect()
322 }
323
324 pub fn telegram_cache(&self) -> &Mutex<LruCache<i64, StatusCode>> {
325 &self.telegram_cache
326 }
327
328 pub fn webhook_cache(&self) -> &Mutex<LruCache<String, Value>> {
329 &self.webhook_cache
330 }
331
332 pub fn messaging_rate(&self) -> &Mutex<RateLimiter> {
333 &self.messaging_rate
334 }
335
336 pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
337 self.mocks.as_ref()
338 }
339
340 pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
341 self.timer_handles.lock().extend(handles);
342 }
343
344 pub fn get_secret(&self, key: &str) -> Result<String> {
345 if crate::provider_core_only::is_enabled() {
346 bail!(crate::provider_core_only::blocked_message("secrets"))
347 }
348 if !self.config.secrets_policy.is_allowed(key) {
349 bail!("secret {key} is not permitted by bindings policy");
350 }
351 let ctx = self.config.tenant_ctx();
352 let bytes = read_secret_blocking(&self.secrets, &ctx, RUNTIME_SECRETS_PACK_ID, key)
353 .context("failed to read secret from manager")?;
354 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
355 Ok(value)
356 }
357
358 pub fn pack_for_component(&self, component_ref: &str) -> Option<Arc<PackRuntime>> {
359 self.packs
360 .iter()
361 .find(|pack| pack.contains_component(component_ref))
362 .cloned()
363 }
364
365 pub fn pack_for_component_with_digest(
366 &self,
367 component_ref: &str,
368 ) -> Option<(Arc<PackRuntime>, Option<String>)> {
369 self.packs
370 .iter()
371 .zip(self.digests.iter())
372 .find(|(pack, _)| pack.contains_component(component_ref))
373 .map(|(pack, digest)| (Arc::clone(pack), digest.clone()))
374 }
375
376 pub fn resolve_component(&self, component_ref: &str) -> Option<ResolvedComponent> {
377 self.pack_for_component_with_digest(component_ref)
378 .map(|(pack, digest)| ResolvedComponent {
379 digest: digest
380 .or_else(|| self.digest().map(ToString::to_string))
381 .unwrap_or_else(|| "unknown".to_string()),
382 component_ref: component_ref.to_string(),
383 pack,
384 })
385 }
386}
387
388impl Drop for TenantRuntime {
389 fn drop(&mut self) {
390 for handle in self.timer_handles.lock().drain(..) {
391 handle.abort();
392 }
393 }
394}
395
/// Token-bucket rate limiter: tokens refill continuously at `rate` per second,
/// are capped at `burst`, and each successful acquisition costs one token.
pub struct RateLimiter {
    /// Tokens currently available (fractional while refilling).
    allowance: f64,
    /// Refill speed in tokens per second.
    rate: f64,
    /// Upper bound for `allowance`.
    burst: f64,
    /// Instant of the most recent `try_acquire` call (or of construction).
    last_check: Instant,
}

impl RateLimiter {
    /// Creates a limiter that starts with a full bucket.
    ///
    /// Both `qps` and `burst` are raised to at least 1, so a zero in either
    /// position behaves like 1.
    pub fn new(qps: u32, burst: u32) -> Self {
        let capacity = f64::from(burst.max(1));
        Self {
            allowance: capacity,
            rate: f64::from(qps.max(1)),
            burst: capacity,
            last_check: Instant::now(),
        }
    }

    /// Refills the bucket for the time elapsed since the previous call and
    /// tries to take one token.
    ///
    /// Returns `true` (consuming a token) when at least one whole token is
    /// available, `false` otherwise. The timestamp is advanced in both cases.
    pub fn try_acquire(&mut self) -> bool {
        let now = Instant::now();
        let refill = now.duration_since(self.last_check).as_secs_f64() * self.rate;
        self.last_check = now;

        // Never hold more than `burst` tokens.
        self.allowance = (self.allowance + refill).min(self.burst);

        let granted = self.allowance >= 1.0;
        if granted {
            self.allowance -= 1.0;
        }
        granted
    }
}