1use std::collections::HashMap;
2use std::future::Future;
3use std::num::NonZeroUsize;
4use std::path::Path;
5use std::sync::Arc;
6use std::time::Instant;
7
8use anyhow::{Context, Result, bail};
9use arc_swap::ArcSwap;
10use axum::http::StatusCode;
11use lru::LruCache;
12use parking_lot::Mutex;
13use reqwest::Client;
14use serde_json::Value;
15use tokio::runtime::{Handle, Runtime};
16use tokio::task::JoinHandle;
17
18use crate::config::HostConfig;
19use crate::engine::host::{SessionHost, StateHost};
20use crate::engine::runtime::StateMachineRuntime;
21use crate::pack::{ComponentResolution, PackRuntime};
22use crate::runner::engine::FlowEngine;
23use crate::runner::mocks::MockLayer;
24use crate::secrets::{DynSecretsManager, read_secret_blocking};
25use crate::storage::session::DynSessionStore;
26use crate::storage::state::DynStateStore;
27use crate::trace::PackTraceInfo;
28use crate::wasi::RunnerWasiPolicy;
29use greentic_types::SecretRequirement;
30
/// Maximum number of entries kept in each tenant's Telegram LRU cache.
const TELEGRAM_CACHE_CAPACITY: usize = 1024;
/// Maximum number of entries kept in each tenant's webhook LRU cache.
const WEBHOOK_CACHE_CAPACITY: usize = 256;
33
34pub struct ActivePacks {
36 inner: ArcSwap<HashMap<String, Arc<TenantRuntime>>>,
37}
38
39impl ActivePacks {
40 pub fn new() -> Self {
41 Self {
42 inner: ArcSwap::from_pointee(HashMap::new()),
43 }
44 }
45
46 pub fn load(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
47 self.inner.load().get(tenant).cloned()
48 }
49
50 pub fn snapshot(&self) -> Arc<HashMap<String, Arc<TenantRuntime>>> {
51 self.inner.load_full()
52 }
53
54 pub fn replace(&self, next: HashMap<String, Arc<TenantRuntime>>) {
55 self.inner.store(Arc::new(next));
56 }
57
58 pub fn len(&self) -> usize {
59 self.inner.load().len()
60 }
61
62 pub fn is_empty(&self) -> bool {
63 self.len() == 0
64 }
65}
66
67impl Default for ActivePacks {
68 fn default() -> Self {
69 Self::new()
70 }
71}
72
73pub struct TenantRuntime {
75 tenant: String,
76 config: Arc<HostConfig>,
77 packs: Vec<Arc<PackRuntime>>,
78 digests: Vec<Option<String>>,
79 engine: Arc<FlowEngine>,
80 state_machine: Arc<StateMachineRuntime>,
81 http_client: Client,
82 telegram_cache: Mutex<LruCache<i64, StatusCode>>,
83 webhook_cache: Mutex<LruCache<String, Value>>,
84 messaging_rate: Mutex<RateLimiter>,
85 mocks: Option<Arc<MockLayer>>,
86 timer_handles: Mutex<Vec<JoinHandle<()>>>,
87 secrets: DynSecretsManager,
88}
89
90pub fn block_on<F: Future<Output = R>, R>(future: F) -> R {
92 if let Ok(handle) = Handle::try_current() {
93 handle.block_on(future)
94 } else {
95 Runtime::new()
96 .expect("failed to create tokio runtime")
97 .block_on(future)
98 }
99}
100
101impl TenantRuntime {
102 #[allow(clippy::too_many_arguments)]
103 pub async fn load(
104 pack_path: &Path,
105 config: Arc<HostConfig>,
106 mocks: Option<Arc<MockLayer>>,
107 archive_source: Option<&Path>,
108 digest: Option<String>,
109 wasi_policy: Arc<RunnerWasiPolicy>,
110 session_host: Arc<dyn SessionHost>,
111 session_store: DynSessionStore,
112 state_store: DynStateStore,
113 state_host: Arc<dyn StateHost>,
114 secrets_manager: DynSecretsManager,
115 ) -> Result<Arc<Self>> {
116 let oauth_config = config.oauth_broker_config();
117 let pack = Arc::new(
118 PackRuntime::load(
119 pack_path,
120 Arc::clone(&config),
121 mocks.clone(),
122 archive_source,
123 Some(Arc::clone(&session_store)),
124 Some(Arc::clone(&state_store)),
125 Arc::clone(&wasi_policy),
126 Arc::clone(&secrets_manager),
127 oauth_config.clone(),
128 true,
129 ComponentResolution::default(),
130 )
131 .await
132 .with_context(|| {
133 format!(
134 "failed to load pack {} for tenant {}",
135 pack_path.display(),
136 config.tenant
137 )
138 })?,
139 );
140 Self::from_packs(
141 config,
142 vec![(pack, digest)],
143 mocks,
144 session_host,
145 session_store,
146 state_store,
147 state_host,
148 secrets_manager,
149 )
150 .await
151 }
152
153 #[allow(clippy::too_many_arguments)]
154 pub async fn from_packs(
155 config: Arc<HostConfig>,
156 packs: Vec<(Arc<PackRuntime>, Option<String>)>,
157 mocks: Option<Arc<MockLayer>>,
158 session_host: Arc<dyn SessionHost>,
159 session_store: DynSessionStore,
160 _state_store: DynStateStore,
161 state_host: Arc<dyn StateHost>,
162 secrets_manager: DynSecretsManager,
163 ) -> Result<Arc<Self>> {
164 let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
165 .expect("telegram cache capacity must be > 0");
166 let webhook_capacity =
167 NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
168 let pack_runtimes = packs
169 .iter()
170 .map(|(pack, _)| Arc::clone(pack))
171 .collect::<Vec<_>>();
172 let digests = packs
173 .iter()
174 .map(|(_, digest)| digest.clone())
175 .collect::<Vec<_>>();
176 let mut pack_trace = HashMap::new();
177 for (pack, digest) in &packs {
178 let pack_id = pack.metadata().pack_id.clone();
179 let pack_ref = config
180 .pack_bindings
181 .iter()
182 .find(|binding| binding.pack_id == pack_id)
183 .map(|binding| binding.pack_ref.clone())
184 .unwrap_or_else(|| pack_id.clone());
185 pack_trace.insert(
186 pack_id,
187 PackTraceInfo {
188 pack_ref,
189 resolved_digest: digest.clone(),
190 },
191 );
192 }
193 let engine = Arc::new(
194 FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
195 .await
196 .context("failed to prime flow engine")?,
197 );
198 let state_machine = Arc::new(
199 StateMachineRuntime::from_flow_engine(
200 Arc::clone(&config),
201 Arc::clone(&engine),
202 pack_trace,
203 session_host,
204 session_store,
205 state_host,
206 Arc::clone(&secrets_manager),
207 mocks.clone(),
208 )
209 .context("failed to initialise state machine runtime")?,
210 );
211 let http_client = Client::builder().build()?;
212 let rate_limits = config.rate_limits.clone();
213 Ok(Arc::new(Self {
214 tenant: config.tenant.clone(),
215 config,
216 packs: pack_runtimes,
217 digests,
218 engine,
219 state_machine,
220 http_client,
221 telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
222 webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
223 messaging_rate: Mutex::new(RateLimiter::new(
224 rate_limits.messaging_send_qps,
225 rate_limits.messaging_burst,
226 )),
227 mocks,
228 timer_handles: Mutex::new(Vec::new()),
229 secrets: secrets_manager,
230 }))
231 }
232
233 pub fn tenant(&self) -> &str {
234 &self.tenant
235 }
236
237 pub fn config(&self) -> &Arc<HostConfig> {
238 &self.config
239 }
240
241 pub fn main_pack(&self) -> &Arc<PackRuntime> {
242 self.packs
243 .first()
244 .expect("tenant runtime must contain at least one pack")
245 }
246
247 pub fn pack(&self) -> Arc<PackRuntime> {
248 Arc::clone(self.main_pack())
249 }
250
251 pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
252 self.packs.iter().skip(1).cloned().collect()
253 }
254
255 pub fn engine(&self) -> &Arc<FlowEngine> {
256 &self.engine
257 }
258
259 pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
260 &self.state_machine
261 }
262
263 pub fn http_client(&self) -> &Client {
264 &self.http_client
265 }
266
267 pub fn digest(&self) -> Option<&str> {
268 self.digests.first().and_then(|d| d.as_deref())
269 }
270
271 pub fn overlay_digests(&self) -> Vec<Option<String>> {
272 self.digests.iter().skip(1).cloned().collect()
273 }
274
275 pub fn required_secrets(&self) -> Vec<SecretRequirement> {
276 self.packs
277 .iter()
278 .flat_map(|pack| pack.required_secrets().iter().cloned())
279 .collect()
280 }
281
282 pub fn missing_secrets(&self) -> Vec<SecretRequirement> {
283 self.packs
284 .iter()
285 .flat_map(|pack| pack.missing_secrets(&self.config.tenant_ctx()))
286 .collect()
287 }
288
289 pub fn telegram_cache(&self) -> &Mutex<LruCache<i64, StatusCode>> {
290 &self.telegram_cache
291 }
292
293 pub fn webhook_cache(&self) -> &Mutex<LruCache<String, Value>> {
294 &self.webhook_cache
295 }
296
297 pub fn messaging_rate(&self) -> &Mutex<RateLimiter> {
298 &self.messaging_rate
299 }
300
301 pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
302 self.mocks.as_ref()
303 }
304
305 pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
306 self.timer_handles.lock().extend(handles);
307 }
308
309 pub fn get_secret(&self, key: &str) -> Result<String> {
310 if crate::provider_core_only::is_enabled() {
311 bail!(crate::provider_core_only::blocked_message("secrets"))
312 }
313 if !self.config.secrets_policy.is_allowed(key) {
314 bail!("secret {key} is not permitted by bindings policy");
315 }
316 let ctx = self.config.tenant_ctx();
317 let bytes = read_secret_blocking(&self.secrets, &ctx, key)
318 .context("failed to read secret from manager")?;
319 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
320 Ok(value)
321 }
322}
323
324impl Drop for TenantRuntime {
325 fn drop(&mut self) {
326 for handle in self.timer_handles.lock().drain(..) {
327 handle.abort();
328 }
329 }
330}
331
/// Token-bucket rate limiter: tokens refill continuously at `rate` per second up to `burst`,
/// and every successful acquisition spends one token.
pub struct RateLimiter {
    /// Tokens currently available (fractional while refilling).
    allowance: f64,
    /// Tokens added per second.
    rate: f64,
    /// Upper bound on `allowance`.
    burst: f64,
    /// Moment of the most recent `try_acquire` call (or of construction).
    last_check: Instant,
}

impl RateLimiter {
    /// Creates a limiter that starts with a full bucket.
    ///
    /// Both `qps` and `burst` are raised to at least 1, so a zero configuration still lets
    /// one request per second through.
    pub fn new(qps: u32, burst: u32) -> Self {
        let capacity = f64::from(burst.max(1));
        let per_second = f64::from(qps.max(1));
        Self {
            allowance: capacity,
            rate: per_second,
            burst: capacity,
            last_check: Instant::now(),
        }
    }

    /// Tries to spend one token.
    ///
    /// The bucket is first topped up for the time elapsed since the previous call, capped at
    /// the burst size. Returns `true` and consumes a token when at least one whole token is
    /// available, otherwise returns `false` and leaves the balance untouched.
    pub fn try_acquire(&mut self) -> bool {
        let now = Instant::now();
        let refill = now.duration_since(self.last_check).as_secs_f64() * self.rate;
        self.last_check = now;
        self.allowance = (self.allowance + refill).min(self.burst);

        let granted = self.allowance >= 1.0;
        if granted {
            self.allowance -= 1.0;
        }
        granted
    }
}
366}