1use std::collections::HashMap;
2use std::future::Future;
3use std::num::NonZeroUsize;
4use std::path::Path;
5use std::sync::Arc;
6use std::time::Instant;
7
8use anyhow::{Context, Result, bail};
9use arc_swap::ArcSwap;
10use axum::http::StatusCode;
11use lru::LruCache;
12use parking_lot::Mutex;
13use reqwest::Client;
14use serde_json::Value;
15use tokio::runtime::{Handle, Runtime};
16use tokio::task::JoinHandle;
17
18use crate::config::HostConfig;
19use crate::engine::host::{SessionHost, StateHost};
20use crate::engine::runtime::StateMachineRuntime;
21use crate::operator_metrics::OperatorMetrics;
22use crate::operator_registry::OperatorRegistry;
23use crate::pack::{ComponentResolution, PackRuntime};
24use crate::runner::engine::FlowEngine;
25use crate::runner::mocks::MockLayer;
26use crate::secrets::{DynSecretsManager, read_secret_blocking};
27use crate::storage::session::DynSessionStore;
28use crate::storage::state::DynStateStore;
29use crate::trace::PackTraceInfo;
30use crate::wasi::RunnerWasiPolicy;
31use greentic_types::SecretRequirement;
32
33const TELEGRAM_CACHE_CAPACITY: usize = 1024;
34const WEBHOOK_CACHE_CAPACITY: usize = 256;
35
36pub struct ActivePacks {
38 inner: ArcSwap<HashMap<String, Arc<TenantRuntime>>>,
39}
40
41impl ActivePacks {
42 pub fn new() -> Self {
43 Self {
44 inner: ArcSwap::from_pointee(HashMap::new()),
45 }
46 }
47
48 pub fn load(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
49 self.inner.load().get(tenant).cloned()
50 }
51
52 pub fn snapshot(&self) -> Arc<HashMap<String, Arc<TenantRuntime>>> {
53 self.inner.load_full()
54 }
55
56 pub fn replace(&self, next: HashMap<String, Arc<TenantRuntime>>) {
57 self.inner.store(Arc::new(next));
58 }
59
60 pub fn len(&self) -> usize {
61 self.inner.load().len()
62 }
63
64 pub fn is_empty(&self) -> bool {
65 self.len() == 0
66 }
67}
68
69impl Default for ActivePacks {
70 fn default() -> Self {
71 Self::new()
72 }
73}
74
75pub struct TenantRuntime {
77 tenant: String,
78 config: Arc<HostConfig>,
79 packs: Vec<Arc<PackRuntime>>,
80 digests: Vec<Option<String>>,
81 engine: Arc<FlowEngine>,
82 state_machine: Arc<StateMachineRuntime>,
83 http_client: Client,
84 telegram_cache: Mutex<LruCache<i64, StatusCode>>,
85 webhook_cache: Mutex<LruCache<String, Value>>,
86 messaging_rate: Mutex<RateLimiter>,
87 mocks: Option<Arc<MockLayer>>,
88 timer_handles: Mutex<Vec<JoinHandle<()>>>,
89 secrets: DynSecretsManager,
90 operator_registry: OperatorRegistry,
91 operator_metrics: Arc<OperatorMetrics>,
92}
93
94pub fn block_on<F: Future<Output = R>, R>(future: F) -> R {
96 if let Ok(handle) = Handle::try_current() {
97 handle.block_on(future)
98 } else {
99 Runtime::new()
100 .expect("failed to create tokio runtime")
101 .block_on(future)
102 }
103}
104
105impl TenantRuntime {
106 #[allow(clippy::too_many_arguments)]
107 pub async fn load(
108 pack_path: &Path,
109 config: Arc<HostConfig>,
110 mocks: Option<Arc<MockLayer>>,
111 archive_source: Option<&Path>,
112 digest: Option<String>,
113 wasi_policy: Arc<RunnerWasiPolicy>,
114 session_host: Arc<dyn SessionHost>,
115 session_store: DynSessionStore,
116 state_store: DynStateStore,
117 state_host: Arc<dyn StateHost>,
118 secrets_manager: DynSecretsManager,
119 ) -> Result<Arc<Self>> {
120 let oauth_config = config.oauth_broker_config();
121 let pack = Arc::new(
122 PackRuntime::load(
123 pack_path,
124 Arc::clone(&config),
125 mocks.clone(),
126 archive_source,
127 Some(Arc::clone(&session_store)),
128 Some(Arc::clone(&state_store)),
129 Arc::clone(&wasi_policy),
130 Arc::clone(&secrets_manager),
131 oauth_config.clone(),
132 true,
133 ComponentResolution::default(),
134 )
135 .await
136 .with_context(|| {
137 format!(
138 "failed to load pack {} for tenant {}",
139 pack_path.display(),
140 config.tenant
141 )
142 })?,
143 );
144 Self::from_packs(
145 config,
146 vec![(pack, digest)],
147 mocks,
148 session_host,
149 session_store,
150 state_store,
151 state_host,
152 secrets_manager,
153 )
154 .await
155 }
156
157 #[allow(clippy::too_many_arguments)]
158 pub async fn from_packs(
159 config: Arc<HostConfig>,
160 packs: Vec<(Arc<PackRuntime>, Option<String>)>,
161 mocks: Option<Arc<MockLayer>>,
162 session_host: Arc<dyn SessionHost>,
163 session_store: DynSessionStore,
164 _state_store: DynStateStore,
165 state_host: Arc<dyn StateHost>,
166 secrets_manager: DynSecretsManager,
167 ) -> Result<Arc<Self>> {
168 let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
169 .expect("telegram cache capacity must be > 0");
170 let webhook_capacity =
171 NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
172 let operator_registry = OperatorRegistry::build(&packs)?;
173 let operator_metrics = Arc::new(OperatorMetrics::default());
174 let pack_runtimes = packs
175 .iter()
176 .map(|(pack, _)| Arc::clone(pack))
177 .collect::<Vec<_>>();
178 let digests = packs
179 .iter()
180 .map(|(_, digest)| digest.clone())
181 .collect::<Vec<_>>();
182 let mut pack_trace = HashMap::new();
183 for (pack, digest) in &packs {
184 let pack_id = pack.metadata().pack_id.clone();
185 let pack_ref = config
186 .pack_bindings
187 .iter()
188 .find(|binding| binding.pack_id == pack_id)
189 .map(|binding| binding.pack_ref.clone())
190 .unwrap_or_else(|| pack_id.clone());
191 pack_trace.insert(
192 pack_id,
193 PackTraceInfo {
194 pack_ref,
195 resolved_digest: digest.clone(),
196 },
197 );
198 }
199 let engine = Arc::new(
200 FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
201 .await
202 .context("failed to prime flow engine")?,
203 );
204 let state_machine = Arc::new(
205 StateMachineRuntime::from_flow_engine(
206 Arc::clone(&config),
207 Arc::clone(&engine),
208 pack_trace,
209 session_host,
210 session_store,
211 state_host,
212 Arc::clone(&secrets_manager),
213 mocks.clone(),
214 )
215 .context("failed to initialise state machine runtime")?,
216 );
217 let http_client = Client::builder().build()?;
218 let rate_limits = config.rate_limits.clone();
219 Ok(Arc::new(Self {
220 tenant: config.tenant.clone(),
221 config,
222 packs: pack_runtimes,
223 digests,
224 engine,
225 state_machine,
226 http_client,
227 telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
228 webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
229 messaging_rate: Mutex::new(RateLimiter::new(
230 rate_limits.messaging_send_qps,
231 rate_limits.messaging_burst,
232 )),
233 mocks,
234 timer_handles: Mutex::new(Vec::new()),
235 secrets: secrets_manager,
236 operator_registry,
237 operator_metrics,
238 }))
239 }
240
241 pub fn tenant(&self) -> &str {
242 &self.tenant
243 }
244
245 pub fn config(&self) -> &Arc<HostConfig> {
246 &self.config
247 }
248
249 pub fn operator_registry(&self) -> &OperatorRegistry {
250 &self.operator_registry
251 }
252
253 pub fn operator_metrics(&self) -> &OperatorMetrics {
254 &self.operator_metrics
255 }
256
257 pub fn main_pack(&self) -> &Arc<PackRuntime> {
258 self.packs
259 .first()
260 .expect("tenant runtime must contain at least one pack")
261 }
262
263 pub fn pack(&self) -> Arc<PackRuntime> {
264 Arc::clone(self.main_pack())
265 }
266
267 pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
268 self.packs.iter().skip(1).cloned().collect()
269 }
270
271 pub fn engine(&self) -> &Arc<FlowEngine> {
272 &self.engine
273 }
274
275 pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
276 &self.state_machine
277 }
278
279 pub fn http_client(&self) -> &Client {
280 &self.http_client
281 }
282
283 pub fn digest(&self) -> Option<&str> {
284 self.digests.first().and_then(|d| d.as_deref())
285 }
286
287 pub fn overlay_digests(&self) -> Vec<Option<String>> {
288 self.digests.iter().skip(1).cloned().collect()
289 }
290
291 pub fn required_secrets(&self) -> Vec<SecretRequirement> {
292 self.packs
293 .iter()
294 .flat_map(|pack| pack.required_secrets().iter().cloned())
295 .collect()
296 }
297
298 pub fn missing_secrets(&self) -> Vec<SecretRequirement> {
299 self.packs
300 .iter()
301 .flat_map(|pack| pack.missing_secrets(&self.config.tenant_ctx()))
302 .collect()
303 }
304
305 pub fn telegram_cache(&self) -> &Mutex<LruCache<i64, StatusCode>> {
306 &self.telegram_cache
307 }
308
309 pub fn webhook_cache(&self) -> &Mutex<LruCache<String, Value>> {
310 &self.webhook_cache
311 }
312
313 pub fn messaging_rate(&self) -> &Mutex<RateLimiter> {
314 &self.messaging_rate
315 }
316
317 pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
318 self.mocks.as_ref()
319 }
320
321 pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
322 self.timer_handles.lock().extend(handles);
323 }
324
325 pub fn get_secret(&self, key: &str) -> Result<String> {
326 if crate::provider_core_only::is_enabled() {
327 bail!(crate::provider_core_only::blocked_message("secrets"))
328 }
329 if !self.config.secrets_policy.is_allowed(key) {
330 bail!("secret {key} is not permitted by bindings policy");
331 }
332 let ctx = self.config.tenant_ctx();
333 let bytes = read_secret_blocking(&self.secrets, &ctx, key)
334 .context("failed to read secret from manager")?;
335 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
336 Ok(value)
337 }
338
339 pub fn pack_for_component(&self, component_ref: &str) -> Option<Arc<PackRuntime>> {
340 self.packs
341 .iter()
342 .find(|pack| pack.contains_component(component_ref))
343 .cloned()
344 }
345}
346
347impl Drop for TenantRuntime {
348 fn drop(&mut self) {
349 for handle in self.timer_handles.lock().drain(..) {
350 handle.abort();
351 }
352 }
353}
354
355pub struct RateLimiter {
356 allowance: f64,
357 rate: f64,
358 burst: f64,
359 last_check: Instant,
360}
361
362impl RateLimiter {
363 pub fn new(qps: u32, burst: u32) -> Self {
364 let rate = qps.max(1) as f64;
365 let burst = burst.max(1) as f64;
366 Self {
367 allowance: burst,
368 rate,
369 burst,
370 last_check: Instant::now(),
371 }
372 }
373
374 pub fn try_acquire(&mut self) -> bool {
375 let now = Instant::now();
376 let elapsed = now.duration_since(self.last_check).as_secs_f64();
377 self.last_check = now;
378 self.allowance += elapsed * self.rate;
379 if self.allowance > self.burst {
380 self.allowance = self.burst;
381 }
382 if self.allowance < 1.0 {
383 false
384 } else {
385 self.allowance -= 1.0;
386 true
387 }
388 }
389}