1use std::collections::HashMap;
2use std::future::Future;
3use std::num::NonZeroUsize;
4use std::path::Path;
5use std::sync::Arc;
6use std::time::Instant;
7
8use anyhow::{Context, Result, bail};
9use arc_swap::ArcSwap;
10use axum::http::StatusCode;
11use lru::LruCache;
12use parking_lot::Mutex;
13use reqwest::Client;
14use serde_json::Value;
15use tokio::runtime::{Handle, Runtime};
16use tokio::task::JoinHandle;
17
18use crate::config::HostConfig;
19use crate::engine::host::{SessionHost, StateHost};
20use crate::engine::runtime::StateMachineRuntime;
21use crate::operator_metrics::OperatorMetrics;
22use crate::operator_registry::OperatorRegistry;
23use crate::pack::{ComponentResolution, PackRuntime};
24use crate::runner::engine::FlowEngine;
25use crate::runner::mocks::MockLayer;
26use crate::secrets::{DynSecretsManager, read_secret_blocking};
27use crate::storage::session::DynSessionStore;
28use crate::storage::state::DynStateStore;
29use crate::trace::PackTraceInfo;
30use crate::wasi::RunnerWasiPolicy;
31use greentic_types::SecretRequirement;
32
33const TELEGRAM_CACHE_CAPACITY: usize = 1024;
34const WEBHOOK_CACHE_CAPACITY: usize = 256;
35const RUNTIME_SECRETS_PACK_ID: &str = "_runner";
36
37pub struct ActivePacks {
39 inner: ArcSwap<HashMap<String, Arc<TenantRuntime>>>,
40}
41
42impl ActivePacks {
43 pub fn new() -> Self {
44 Self {
45 inner: ArcSwap::from_pointee(HashMap::new()),
46 }
47 }
48
49 pub fn load(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
50 self.inner.load().get(tenant).cloned()
51 }
52
53 pub fn snapshot(&self) -> Arc<HashMap<String, Arc<TenantRuntime>>> {
54 self.inner.load_full()
55 }
56
57 pub fn replace(&self, next: HashMap<String, Arc<TenantRuntime>>) {
58 self.inner.store(Arc::new(next));
59 }
60
61 pub fn len(&self) -> usize {
62 self.inner.load().len()
63 }
64
65 pub fn is_empty(&self) -> bool {
66 self.len() == 0
67 }
68}
69
70impl Default for ActivePacks {
71 fn default() -> Self {
72 Self::new()
73 }
74}
75
76pub struct TenantRuntime {
78 tenant: String,
79 config: Arc<HostConfig>,
80 packs: Vec<Arc<PackRuntime>>,
81 digests: Vec<Option<String>>,
82 engine: Arc<FlowEngine>,
83 state_machine: Arc<StateMachineRuntime>,
84 http_client: Client,
85 telegram_cache: Mutex<LruCache<i64, StatusCode>>,
86 webhook_cache: Mutex<LruCache<String, Value>>,
87 messaging_rate: Mutex<RateLimiter>,
88 mocks: Option<Arc<MockLayer>>,
89 timer_handles: Mutex<Vec<JoinHandle<()>>>,
90 secrets: DynSecretsManager,
91 operator_registry: OperatorRegistry,
92 operator_metrics: Arc<OperatorMetrics>,
93}
94
95pub fn block_on<F: Future<Output = R>, R>(future: F) -> R {
97 if let Ok(handle) = Handle::try_current() {
98 handle.block_on(future)
99 } else {
100 Runtime::new()
101 .expect("failed to create tokio runtime")
102 .block_on(future)
103 }
104}
105
106impl TenantRuntime {
107 #[allow(clippy::too_many_arguments)]
108 pub async fn load(
109 pack_path: &Path,
110 config: Arc<HostConfig>,
111 mocks: Option<Arc<MockLayer>>,
112 archive_source: Option<&Path>,
113 digest: Option<String>,
114 wasi_policy: Arc<RunnerWasiPolicy>,
115 session_host: Arc<dyn SessionHost>,
116 session_store: DynSessionStore,
117 state_store: DynStateStore,
118 state_host: Arc<dyn StateHost>,
119 secrets_manager: DynSecretsManager,
120 ) -> Result<Arc<Self>> {
121 let oauth_config = config.oauth_broker_config();
122 let pack = Arc::new(
123 PackRuntime::load(
124 pack_path,
125 Arc::clone(&config),
126 mocks.clone(),
127 archive_source,
128 Some(Arc::clone(&session_store)),
129 Some(Arc::clone(&state_store)),
130 Arc::clone(&wasi_policy),
131 Arc::clone(&secrets_manager),
132 oauth_config.clone(),
133 true,
134 ComponentResolution::default(),
135 )
136 .await
137 .with_context(|| {
138 format!(
139 "failed to load pack {} for tenant {}",
140 pack_path.display(),
141 config.tenant
142 )
143 })?,
144 );
145 Self::from_packs(
146 config,
147 vec![(pack, digest)],
148 mocks,
149 session_host,
150 session_store,
151 state_store,
152 state_host,
153 secrets_manager,
154 )
155 .await
156 }
157
158 #[allow(clippy::too_many_arguments)]
159 pub async fn from_packs(
160 config: Arc<HostConfig>,
161 packs: Vec<(Arc<PackRuntime>, Option<String>)>,
162 mocks: Option<Arc<MockLayer>>,
163 session_host: Arc<dyn SessionHost>,
164 session_store: DynSessionStore,
165 _state_store: DynStateStore,
166 state_host: Arc<dyn StateHost>,
167 secrets_manager: DynSecretsManager,
168 ) -> Result<Arc<Self>> {
169 let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
170 .expect("telegram cache capacity must be > 0");
171 let webhook_capacity =
172 NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
173 let operator_registry = OperatorRegistry::build(&packs)?;
174 let operator_metrics = Arc::new(OperatorMetrics::default());
175 let pack_runtimes = packs
176 .iter()
177 .map(|(pack, _)| Arc::clone(pack))
178 .collect::<Vec<_>>();
179 let digests = packs
180 .iter()
181 .map(|(_, digest)| digest.clone())
182 .collect::<Vec<_>>();
183 let mut pack_trace = HashMap::new();
184 for (pack, digest) in &packs {
185 let pack_id = pack.metadata().pack_id.clone();
186 let pack_ref = config
187 .pack_bindings
188 .iter()
189 .find(|binding| binding.pack_id == pack_id)
190 .map(|binding| binding.pack_ref.clone())
191 .unwrap_or_else(|| pack_id.clone());
192 pack_trace.insert(
193 pack_id,
194 PackTraceInfo {
195 pack_ref,
196 resolved_digest: digest.clone(),
197 },
198 );
199 }
200 let engine = Arc::new(
201 FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
202 .await
203 .context("failed to prime flow engine")?,
204 );
205 let state_machine = Arc::new(
206 StateMachineRuntime::from_flow_engine(
207 Arc::clone(&config),
208 Arc::clone(&engine),
209 pack_trace,
210 session_host,
211 session_store,
212 state_host,
213 Arc::clone(&secrets_manager),
214 mocks.clone(),
215 )
216 .context("failed to initialise state machine runtime")?,
217 );
218 let http_client = Client::builder().build()?;
219 let rate_limits = config.rate_limits.clone();
220 Ok(Arc::new(Self {
221 tenant: config.tenant.clone(),
222 config,
223 packs: pack_runtimes,
224 digests,
225 engine,
226 state_machine,
227 http_client,
228 telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
229 webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
230 messaging_rate: Mutex::new(RateLimiter::new(
231 rate_limits.messaging_send_qps,
232 rate_limits.messaging_burst,
233 )),
234 mocks,
235 timer_handles: Mutex::new(Vec::new()),
236 secrets: secrets_manager,
237 operator_registry,
238 operator_metrics,
239 }))
240 }
241
242 pub fn tenant(&self) -> &str {
243 &self.tenant
244 }
245
246 pub fn config(&self) -> &Arc<HostConfig> {
247 &self.config
248 }
249
250 pub fn operator_registry(&self) -> &OperatorRegistry {
251 &self.operator_registry
252 }
253
254 pub fn operator_metrics(&self) -> &OperatorMetrics {
255 &self.operator_metrics
256 }
257
258 pub fn main_pack(&self) -> &Arc<PackRuntime> {
259 self.packs
260 .first()
261 .expect("tenant runtime must contain at least one pack")
262 }
263
264 pub fn pack(&self) -> Arc<PackRuntime> {
265 Arc::clone(self.main_pack())
266 }
267
268 pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
269 self.packs.iter().skip(1).cloned().collect()
270 }
271
272 pub fn engine(&self) -> &Arc<FlowEngine> {
273 &self.engine
274 }
275
276 pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
277 &self.state_machine
278 }
279
280 pub fn http_client(&self) -> &Client {
281 &self.http_client
282 }
283
284 pub fn digest(&self) -> Option<&str> {
285 self.digests.first().and_then(|d| d.as_deref())
286 }
287
288 pub fn overlay_digests(&self) -> Vec<Option<String>> {
289 self.digests.iter().skip(1).cloned().collect()
290 }
291
292 pub fn required_secrets(&self) -> Vec<SecretRequirement> {
293 self.packs
294 .iter()
295 .flat_map(|pack| pack.required_secrets().iter().cloned())
296 .collect()
297 }
298
299 pub fn missing_secrets(&self) -> Vec<SecretRequirement> {
300 self.packs
301 .iter()
302 .flat_map(|pack| pack.missing_secrets(&self.config.tenant_ctx()))
303 .collect()
304 }
305
306 pub fn telegram_cache(&self) -> &Mutex<LruCache<i64, StatusCode>> {
307 &self.telegram_cache
308 }
309
310 pub fn webhook_cache(&self) -> &Mutex<LruCache<String, Value>> {
311 &self.webhook_cache
312 }
313
314 pub fn messaging_rate(&self) -> &Mutex<RateLimiter> {
315 &self.messaging_rate
316 }
317
318 pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
319 self.mocks.as_ref()
320 }
321
322 pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
323 self.timer_handles.lock().extend(handles);
324 }
325
326 pub fn get_secret(&self, key: &str) -> Result<String> {
327 if crate::provider_core_only::is_enabled() {
328 bail!(crate::provider_core_only::blocked_message("secrets"))
329 }
330 if !self.config.secrets_policy.is_allowed(key) {
331 bail!("secret {key} is not permitted by bindings policy");
332 }
333 let ctx = self.config.tenant_ctx();
334 let bytes = read_secret_blocking(&self.secrets, &ctx, RUNTIME_SECRETS_PACK_ID, key)
335 .context("failed to read secret from manager")?;
336 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
337 Ok(value)
338 }
339
340 pub fn pack_for_component(&self, component_ref: &str) -> Option<Arc<PackRuntime>> {
341 self.packs
342 .iter()
343 .find(|pack| pack.contains_component(component_ref))
344 .cloned()
345 }
346}
347
348impl Drop for TenantRuntime {
349 fn drop(&mut self) {
350 for handle in self.timer_handles.lock().drain(..) {
351 handle.abort();
352 }
353 }
354}
355
356pub struct RateLimiter {
357 allowance: f64,
358 rate: f64,
359 burst: f64,
360 last_check: Instant,
361}
362
363impl RateLimiter {
364 pub fn new(qps: u32, burst: u32) -> Self {
365 let rate = qps.max(1) as f64;
366 let burst = burst.max(1) as f64;
367 Self {
368 allowance: burst,
369 rate,
370 burst,
371 last_check: Instant::now(),
372 }
373 }
374
375 pub fn try_acquire(&mut self) -> bool {
376 let now = Instant::now();
377 let elapsed = now.duration_since(self.last_check).as_secs_f64();
378 self.last_check = now;
379 self.allowance += elapsed * self.rate;
380 if self.allowance > self.burst {
381 self.allowance = self.burst;
382 }
383 if self.allowance < 1.0 {
384 false
385 } else {
386 self.allowance -= 1.0;
387 true
388 }
389 }
390}