1use std::collections::HashMap;
2use std::future::Future;
3use std::num::NonZeroUsize;
4use std::path::Path;
5use std::sync::Arc;
6use std::time::Instant;
7
8use anyhow::{Context, Result, anyhow, bail};
9use arc_swap::ArcSwap;
10use axum::http::StatusCode;
11use lru::LruCache;
12use parking_lot::Mutex;
13use reqwest::Client;
14use serde_json::Value;
15use tokio::runtime::{Handle, Runtime};
16use tokio::task::JoinHandle;
17
18use crate::config::HostConfig;
19use crate::engine::host::{SessionHost, StateHost};
20use crate::engine::runtime::StateMachineRuntime;
21use crate::oauth::{OAuthBrokerConfig, request_resource_token};
22use crate::operator_metrics::OperatorMetrics;
23use crate::operator_registry::OperatorRegistry;
24use crate::pack::{ComponentResolution, PackRuntime};
25use crate::runner::adapt_events_email::{
26 EmailExecutionPlan, EmailSendRequest, build_email_execution_plan, execute_email_request,
27};
28use crate::runner::contract_cache::{ContractCache, ContractCacheStats};
29use crate::runner::engine::FlowEngine;
30use crate::runner::mocks::MockLayer;
31use crate::secrets::{DynSecretsManager, read_secret_blocking};
32use crate::storage::session::DynSessionStore;
33use crate::storage::state::DynStateStore;
34use crate::trace::PackTraceInfo;
35use crate::wasi::RunnerWasiPolicy;
36use greentic_types::SecretRequirement;
37
38const TELEGRAM_CACHE_CAPACITY: usize = 1024;
39const WEBHOOK_CACHE_CAPACITY: usize = 256;
40const RUNTIME_SECRETS_PACK_ID: &str = "_runner";
41
42pub struct ActivePacks {
44 inner: ArcSwap<HashMap<String, Arc<TenantRuntime>>>,
45}
46
47impl ActivePacks {
48 pub fn new() -> Self {
49 Self {
50 inner: ArcSwap::from_pointee(HashMap::new()),
51 }
52 }
53
54 pub fn load(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
55 self.inner.load().get(tenant).cloned()
56 }
57
58 pub fn snapshot(&self) -> Arc<HashMap<String, Arc<TenantRuntime>>> {
59 self.inner.load_full()
60 }
61
62 pub fn replace(&self, next: HashMap<String, Arc<TenantRuntime>>) {
63 self.inner.store(Arc::new(next));
64 }
65
66 pub fn len(&self) -> usize {
67 self.inner.load().len()
68 }
69
70 pub fn is_empty(&self) -> bool {
71 self.len() == 0
72 }
73}
74
75impl Default for ActivePacks {
76 fn default() -> Self {
77 Self::new()
78 }
79}
80
81pub struct TenantRuntime {
83 tenant: String,
84 config: Arc<HostConfig>,
85 packs: Vec<Arc<PackRuntime>>,
86 digests: Vec<Option<String>>,
87 engine: Arc<FlowEngine>,
88 state_machine: Arc<StateMachineRuntime>,
89 http_client: Client,
90 telegram_cache: Mutex<LruCache<i64, StatusCode>>,
91 webhook_cache: Mutex<LruCache<String, Value>>,
92 messaging_rate: Mutex<RateLimiter>,
93 mocks: Option<Arc<MockLayer>>,
94 timer_handles: Mutex<Vec<JoinHandle<()>>>,
95 secrets: DynSecretsManager,
96 operator_registry: OperatorRegistry,
97 operator_metrics: Arc<OperatorMetrics>,
98 contract_cache: ContractCache,
99}
100
101#[derive(Clone)]
102pub struct ResolvedComponent {
103 pub digest: String,
104 pub component_ref: String,
105 pub pack: Arc<PackRuntime>,
106}
107
108pub fn block_on<F: Future<Output = R>, R>(future: F) -> R {
110 if let Ok(handle) = Handle::try_current() {
111 handle.block_on(future)
112 } else {
113 Runtime::new()
114 .expect("failed to create tokio runtime")
115 .block_on(future)
116 }
117}
118
119impl TenantRuntime {
120 #[allow(clippy::too_many_arguments)]
121 pub async fn load(
122 pack_path: &Path,
123 config: Arc<HostConfig>,
124 mocks: Option<Arc<MockLayer>>,
125 archive_source: Option<&Path>,
126 digest: Option<String>,
127 wasi_policy: Arc<RunnerWasiPolicy>,
128 session_host: Arc<dyn SessionHost>,
129 session_store: DynSessionStore,
130 state_store: DynStateStore,
131 state_host: Arc<dyn StateHost>,
132 secrets_manager: DynSecretsManager,
133 ) -> Result<Arc<Self>> {
134 let oauth_config = config.oauth_broker_config();
135 let pack = Arc::new(
136 PackRuntime::load(
137 pack_path,
138 Arc::clone(&config),
139 mocks.clone(),
140 archive_source,
141 Some(Arc::clone(&session_store)),
142 Some(Arc::clone(&state_store)),
143 Arc::clone(&wasi_policy),
144 Arc::clone(&secrets_manager),
145 oauth_config.clone(),
146 true,
147 ComponentResolution::default(),
148 )
149 .await
150 .with_context(|| {
151 format!(
152 "failed to load pack {} for tenant {}",
153 pack_path.display(),
154 config.tenant
155 )
156 })?,
157 );
158 Self::from_packs(
159 config,
160 vec![(pack, digest)],
161 mocks,
162 session_host,
163 session_store,
164 state_store,
165 state_host,
166 secrets_manager,
167 )
168 .await
169 }
170
171 #[allow(clippy::too_many_arguments)]
172 pub async fn from_packs(
173 config: Arc<HostConfig>,
174 packs: Vec<(Arc<PackRuntime>, Option<String>)>,
175 mocks: Option<Arc<MockLayer>>,
176 session_host: Arc<dyn SessionHost>,
177 session_store: DynSessionStore,
178 _state_store: DynStateStore,
179 state_host: Arc<dyn StateHost>,
180 secrets_manager: DynSecretsManager,
181 ) -> Result<Arc<Self>> {
182 let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
183 .expect("telegram cache capacity must be > 0");
184 let webhook_capacity =
185 NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
186 let operator_registry = OperatorRegistry::build(&packs)?;
187 let operator_metrics = Arc::new(OperatorMetrics::default());
188 let pack_runtimes = packs
189 .iter()
190 .map(|(pack, _)| Arc::clone(pack))
191 .collect::<Vec<_>>();
192 let digests = packs
193 .iter()
194 .map(|(_, digest)| digest.clone())
195 .collect::<Vec<_>>();
196 let mut pack_trace = HashMap::new();
197 for (pack, digest) in &packs {
198 let pack_id = pack.metadata().pack_id.clone();
199 let pack_ref = config
200 .pack_bindings
201 .iter()
202 .find(|binding| binding.pack_id == pack_id)
203 .map(|binding| binding.pack_ref.clone())
204 .unwrap_or_else(|| pack_id.clone());
205 pack_trace.insert(
206 pack_id,
207 PackTraceInfo {
208 pack_ref,
209 resolved_digest: digest.clone(),
210 },
211 );
212 }
213 let engine = Arc::new(
214 FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
215 .await
216 .context("failed to prime flow engine")?,
217 );
218 let state_machine = Arc::new(
219 StateMachineRuntime::from_flow_engine(
220 Arc::clone(&config),
221 Arc::clone(&engine),
222 pack_trace,
223 session_host,
224 session_store,
225 state_host,
226 Arc::clone(&secrets_manager),
227 mocks.clone(),
228 )
229 .context("failed to initialise state machine runtime")?,
230 );
231 let http_client = Client::builder().build()?;
232 let rate_limits = config.rate_limits.clone();
233 Ok(Arc::new(Self {
234 tenant: config.tenant.clone(),
235 config,
236 packs: pack_runtimes,
237 digests,
238 engine,
239 state_machine,
240 http_client,
241 telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
242 webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
243 messaging_rate: Mutex::new(RateLimiter::new(
244 rate_limits.messaging_send_qps,
245 rate_limits.messaging_burst,
246 )),
247 mocks,
248 timer_handles: Mutex::new(Vec::new()),
249 secrets: secrets_manager,
250 operator_registry,
251 operator_metrics,
252 contract_cache: ContractCache::from_env(),
253 }))
254 }
255
256 pub fn tenant(&self) -> &str {
257 &self.tenant
258 }
259
260 pub fn config(&self) -> &Arc<HostConfig> {
261 &self.config
262 }
263
264 pub fn operator_registry(&self) -> &OperatorRegistry {
265 &self.operator_registry
266 }
267
268 pub fn operator_metrics(&self) -> &OperatorMetrics {
269 &self.operator_metrics
270 }
271
272 pub fn contract_cache(&self) -> &ContractCache {
273 &self.contract_cache
274 }
275
276 pub fn contract_cache_stats(&self) -> ContractCacheStats {
277 self.contract_cache.stats()
278 }
279
280 pub fn main_pack(&self) -> &Arc<PackRuntime> {
281 self.packs
282 .first()
283 .expect("tenant runtime must contain at least one pack")
284 }
285
286 pub fn pack(&self) -> Arc<PackRuntime> {
287 Arc::clone(self.main_pack())
288 }
289
290 pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
291 self.packs.iter().skip(1).cloned().collect()
292 }
293
294 pub fn engine(&self) -> &Arc<FlowEngine> {
295 &self.engine
296 }
297
298 pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
299 &self.state_machine
300 }
301
302 pub fn http_client(&self) -> &Client {
303 &self.http_client
304 }
305
306 pub fn oauth_config(&self) -> Option<OAuthBrokerConfig> {
307 self.config.oauth_broker_config()
308 }
309
310 pub fn digest(&self) -> Option<&str> {
311 self.digests.first().and_then(|d| d.as_deref())
312 }
313
314 pub fn overlay_digests(&self) -> Vec<Option<String>> {
315 self.digests.iter().skip(1).cloned().collect()
316 }
317
318 pub fn required_secrets(&self) -> Vec<SecretRequirement> {
319 self.packs
320 .iter()
321 .flat_map(|pack| pack.required_secrets().iter().cloned())
322 .collect()
323 }
324
325 pub fn missing_secrets(&self) -> Vec<SecretRequirement> {
326 self.packs
327 .iter()
328 .flat_map(|pack| pack.missing_secrets(&self.config.tenant_ctx()))
329 .collect()
330 }
331
332 pub fn telegram_cache(&self) -> &Mutex<LruCache<i64, StatusCode>> {
333 &self.telegram_cache
334 }
335
336 pub fn webhook_cache(&self) -> &Mutex<LruCache<String, Value>> {
337 &self.webhook_cache
338 }
339
340 pub fn messaging_rate(&self) -> &Mutex<RateLimiter> {
341 &self.messaging_rate
342 }
343
344 pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
345 self.mocks.as_ref()
346 }
347
348 pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
349 self.timer_handles.lock().extend(handles);
350 }
351
352 pub fn get_secret(&self, key: &str) -> Result<String> {
353 if crate::provider_core_only::is_enabled() {
354 bail!(crate::provider_core_only::blocked_message("secrets"))
355 }
356 if !self.config.secrets_policy.is_allowed(key) {
357 bail!("secret {key} is not permitted by bindings policy");
358 }
359 let ctx = self.config.tenant_ctx();
360 let bytes = read_secret_blocking(&self.secrets, &ctx, RUNTIME_SECRETS_PACK_ID, key)
361 .context("failed to read secret from manager")?;
362 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
363 Ok(value)
364 }
365
366 pub fn build_events_email_execution_plan(
367 &self,
368 tenant: &greentic_types::TenantCtx,
369 request: &EmailSendRequest,
370 ) -> Result<EmailExecutionPlan> {
371 let oauth = self
372 .oauth_config()
373 .ok_or_else(|| anyhow!("oauth broker config is not configured for tenant runtime"))?;
374 build_email_execution_plan(&oauth, tenant, request)
375 }
376
377 pub async fn execute_events_email_request(
378 &self,
379 access_token: &str,
380 request: &EmailSendRequest,
381 ) -> Result<()> {
382 execute_email_request(self.http_client(), access_token, request).await
383 }
384
385 pub async fn execute_events_email_with_oauth(
386 &self,
387 tenant: &greentic_types::TenantCtx,
388 request: &EmailSendRequest,
389 ) -> Result<()> {
390 let plan = self.build_events_email_execution_plan(tenant, request)?;
391 let token = request_resource_token(self.http_client(), &plan.token_request).await?;
392 self.execute_events_email_request(&token.access_token, request)
393 .await
394 }
395
396 pub fn pack_for_component(&self, component_ref: &str) -> Option<Arc<PackRuntime>> {
397 self.packs
398 .iter()
399 .find(|pack| pack.contains_component(component_ref))
400 .cloned()
401 }
402
403 pub fn pack_for_component_with_digest(
404 &self,
405 component_ref: &str,
406 ) -> Option<(Arc<PackRuntime>, Option<String>)> {
407 self.packs
408 .iter()
409 .zip(self.digests.iter())
410 .find(|(pack, _)| pack.contains_component(component_ref))
411 .map(|(pack, digest)| (Arc::clone(pack), digest.clone()))
412 }
413
414 pub fn resolve_component(&self, component_ref: &str) -> Option<ResolvedComponent> {
415 self.pack_for_component_with_digest(component_ref)
416 .map(|(pack, digest)| ResolvedComponent {
417 digest: digest
418 .or_else(|| self.digest().map(ToString::to_string))
419 .unwrap_or_else(|| "unknown".to_string()),
420 component_ref: component_ref.to_string(),
421 pack,
422 })
423 }
424}
425
426impl Drop for TenantRuntime {
427 fn drop(&mut self) {
428 for handle in self.timer_handles.lock().drain(..) {
429 handle.abort();
430 }
431 }
432}
433
434pub struct RateLimiter {
435 allowance: f64,
436 rate: f64,
437 burst: f64,
438 last_check: Instant,
439}
440
441impl RateLimiter {
442 pub fn new(qps: u32, burst: u32) -> Self {
443 let rate = qps.max(1) as f64;
444 let burst = burst.max(1) as f64;
445 Self {
446 allowance: burst,
447 rate,
448 burst,
449 last_check: Instant::now(),
450 }
451 }
452
453 pub fn try_acquire(&mut self) -> bool {
454 let now = Instant::now();
455 let elapsed = now.duration_since(self.last_check).as_secs_f64();
456 self.last_check = now;
457 self.allowance += elapsed * self.rate;
458 if self.allowance > self.burst {
459 self.allowance = self.burst;
460 }
461 if self.allowance < 1.0 {
462 false
463 } else {
464 self.allowance -= 1.0;
465 true
466 }
467 }
468}