1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Instant;
6
7use anyhow::{Context, Result, bail};
8use arc_swap::ArcSwap;
9use axum::http::StatusCode;
10use lru::LruCache;
11use parking_lot::Mutex;
12use reqwest::Client;
13use serde_json::Value;
14use tokio::task::JoinHandle;
15
16use crate::config::HostConfig;
17use crate::engine::host::{SessionHost, StateHost};
18use crate::engine::runtime::StateMachineRuntime;
19use crate::pack::{ComponentResolution, PackRuntime};
20use crate::runner::engine::FlowEngine;
21use crate::runner::mocks::MockLayer;
22use crate::secrets::{DynSecretsManager, read_secret_blocking};
23use crate::storage::session::DynSessionStore;
24use crate::storage::state::DynStateStore;
25use crate::trace::PackTraceInfo;
26use crate::wasi::RunnerWasiPolicy;
27use greentic_types::SecretRequirement;
28
29const TELEGRAM_CACHE_CAPACITY: usize = 1024;
30const WEBHOOK_CACHE_CAPACITY: usize = 256;
31
32pub struct ActivePacks {
34 inner: ArcSwap<HashMap<String, Arc<TenantRuntime>>>,
35}
36
37impl ActivePacks {
38 pub fn new() -> Self {
39 Self {
40 inner: ArcSwap::from_pointee(HashMap::new()),
41 }
42 }
43
44 pub fn load(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
45 self.inner.load().get(tenant).cloned()
46 }
47
48 pub fn snapshot(&self) -> Arc<HashMap<String, Arc<TenantRuntime>>> {
49 self.inner.load_full()
50 }
51
52 pub fn replace(&self, next: HashMap<String, Arc<TenantRuntime>>) {
53 self.inner.store(Arc::new(next));
54 }
55
56 pub fn len(&self) -> usize {
57 self.inner.load().len()
58 }
59
60 pub fn is_empty(&self) -> bool {
61 self.len() == 0
62 }
63}
64
65impl Default for ActivePacks {
66 fn default() -> Self {
67 Self::new()
68 }
69}
70
71pub struct TenantRuntime {
73 tenant: String,
74 config: Arc<HostConfig>,
75 packs: Vec<Arc<PackRuntime>>,
76 digests: Vec<Option<String>>,
77 engine: Arc<FlowEngine>,
78 state_machine: Arc<StateMachineRuntime>,
79 http_client: Client,
80 telegram_cache: Mutex<LruCache<i64, StatusCode>>,
81 webhook_cache: Mutex<LruCache<String, Value>>,
82 messaging_rate: Mutex<RateLimiter>,
83 mocks: Option<Arc<MockLayer>>,
84 timer_handles: Mutex<Vec<JoinHandle<()>>>,
85 secrets: DynSecretsManager,
86}
87
88impl TenantRuntime {
89 #[allow(clippy::too_many_arguments)]
90 pub async fn load(
91 pack_path: &Path,
92 config: Arc<HostConfig>,
93 mocks: Option<Arc<MockLayer>>,
94 archive_source: Option<&Path>,
95 digest: Option<String>,
96 wasi_policy: Arc<RunnerWasiPolicy>,
97 session_host: Arc<dyn SessionHost>,
98 session_store: DynSessionStore,
99 state_store: DynStateStore,
100 state_host: Arc<dyn StateHost>,
101 secrets_manager: DynSecretsManager,
102 ) -> Result<Arc<Self>> {
103 let oauth_config = config.oauth_broker_config();
104 let pack = Arc::new(
105 PackRuntime::load(
106 pack_path,
107 Arc::clone(&config),
108 mocks.clone(),
109 archive_source,
110 Some(Arc::clone(&session_store)),
111 Some(Arc::clone(&state_store)),
112 Arc::clone(&wasi_policy),
113 Arc::clone(&secrets_manager),
114 oauth_config.clone(),
115 true,
116 ComponentResolution::default(),
117 )
118 .await
119 .with_context(|| {
120 format!(
121 "failed to load pack {} for tenant {}",
122 pack_path.display(),
123 config.tenant
124 )
125 })?,
126 );
127 Self::from_packs(
128 config,
129 vec![(pack, digest)],
130 mocks,
131 session_host,
132 session_store,
133 state_store,
134 state_host,
135 secrets_manager,
136 )
137 .await
138 }
139
140 #[allow(clippy::too_many_arguments)]
141 pub async fn from_packs(
142 config: Arc<HostConfig>,
143 packs: Vec<(Arc<PackRuntime>, Option<String>)>,
144 mocks: Option<Arc<MockLayer>>,
145 session_host: Arc<dyn SessionHost>,
146 session_store: DynSessionStore,
147 _state_store: DynStateStore,
148 state_host: Arc<dyn StateHost>,
149 secrets_manager: DynSecretsManager,
150 ) -> Result<Arc<Self>> {
151 let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
152 .expect("telegram cache capacity must be > 0");
153 let webhook_capacity =
154 NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
155 let pack_runtimes = packs
156 .iter()
157 .map(|(pack, _)| Arc::clone(pack))
158 .collect::<Vec<_>>();
159 let digests = packs
160 .iter()
161 .map(|(_, digest)| digest.clone())
162 .collect::<Vec<_>>();
163 let mut pack_trace = HashMap::new();
164 for (pack, digest) in &packs {
165 let pack_id = pack.metadata().pack_id.clone();
166 let pack_ref = config
167 .pack_bindings
168 .iter()
169 .find(|binding| binding.pack_id == pack_id)
170 .map(|binding| binding.pack_ref.clone())
171 .unwrap_or_else(|| pack_id.clone());
172 pack_trace.insert(
173 pack_id,
174 PackTraceInfo {
175 pack_ref,
176 resolved_digest: digest.clone(),
177 },
178 );
179 }
180 let engine = Arc::new(
181 FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
182 .await
183 .context("failed to prime flow engine")?,
184 );
185 let state_machine = Arc::new(
186 StateMachineRuntime::from_flow_engine(
187 Arc::clone(&config),
188 Arc::clone(&engine),
189 pack_trace,
190 session_host,
191 session_store,
192 state_host,
193 Arc::clone(&secrets_manager),
194 mocks.clone(),
195 )
196 .context("failed to initialise state machine runtime")?,
197 );
198 let http_client = Client::builder().build()?;
199 let rate_limits = config.rate_limits.clone();
200 Ok(Arc::new(Self {
201 tenant: config.tenant.clone(),
202 config,
203 packs: pack_runtimes,
204 digests,
205 engine,
206 state_machine,
207 http_client,
208 telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
209 webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
210 messaging_rate: Mutex::new(RateLimiter::new(
211 rate_limits.messaging_send_qps,
212 rate_limits.messaging_burst,
213 )),
214 mocks,
215 timer_handles: Mutex::new(Vec::new()),
216 secrets: secrets_manager,
217 }))
218 }
219
220 pub fn tenant(&self) -> &str {
221 &self.tenant
222 }
223
224 pub fn config(&self) -> &Arc<HostConfig> {
225 &self.config
226 }
227
228 pub fn main_pack(&self) -> &Arc<PackRuntime> {
229 self.packs
230 .first()
231 .expect("tenant runtime must contain at least one pack")
232 }
233
234 pub fn pack(&self) -> Arc<PackRuntime> {
235 Arc::clone(self.main_pack())
236 }
237
238 pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
239 self.packs.iter().skip(1).cloned().collect()
240 }
241
242 pub fn engine(&self) -> &Arc<FlowEngine> {
243 &self.engine
244 }
245
246 pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
247 &self.state_machine
248 }
249
250 pub fn http_client(&self) -> &Client {
251 &self.http_client
252 }
253
254 pub fn digest(&self) -> Option<&str> {
255 self.digests.first().and_then(|d| d.as_deref())
256 }
257
258 pub fn overlay_digests(&self) -> Vec<Option<String>> {
259 self.digests.iter().skip(1).cloned().collect()
260 }
261
262 pub fn required_secrets(&self) -> Vec<SecretRequirement> {
263 self.packs
264 .iter()
265 .flat_map(|pack| pack.required_secrets().iter().cloned())
266 .collect()
267 }
268
269 pub fn missing_secrets(&self) -> Vec<SecretRequirement> {
270 self.packs
271 .iter()
272 .flat_map(|pack| pack.missing_secrets(&self.config.tenant_ctx()))
273 .collect()
274 }
275
276 pub fn telegram_cache(&self) -> &Mutex<LruCache<i64, StatusCode>> {
277 &self.telegram_cache
278 }
279
280 pub fn webhook_cache(&self) -> &Mutex<LruCache<String, Value>> {
281 &self.webhook_cache
282 }
283
284 pub fn messaging_rate(&self) -> &Mutex<RateLimiter> {
285 &self.messaging_rate
286 }
287
288 pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
289 self.mocks.as_ref()
290 }
291
292 pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
293 self.timer_handles.lock().extend(handles);
294 }
295
296 pub fn get_secret(&self, key: &str) -> Result<String> {
297 if crate::provider_core_only::is_enabled() {
298 bail!(crate::provider_core_only::blocked_message("secrets"))
299 }
300 if !self.config.secrets_policy.is_allowed(key) {
301 bail!("secret {key} is not permitted by bindings policy");
302 }
303 let ctx = self.config.tenant_ctx();
304 let bytes = read_secret_blocking(&self.secrets, &ctx, key)
305 .context("failed to read secret from manager")?;
306 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
307 Ok(value)
308 }
309}
310
311impl Drop for TenantRuntime {
312 fn drop(&mut self) {
313 for handle in self.timer_handles.lock().drain(..) {
314 handle.abort();
315 }
316 }
317}
318
319pub struct RateLimiter {
320 allowance: f64,
321 rate: f64,
322 burst: f64,
323 last_check: Instant,
324}
325
326impl RateLimiter {
327 pub fn new(qps: u32, burst: u32) -> Self {
328 let rate = qps.max(1) as f64;
329 let burst = burst.max(1) as f64;
330 Self {
331 allowance: burst,
332 rate,
333 burst,
334 last_check: Instant::now(),
335 }
336 }
337
338 pub fn try_acquire(&mut self) -> bool {
339 let now = Instant::now();
340 let elapsed = now.duration_since(self.last_check).as_secs_f64();
341 self.last_check = now;
342 self.allowance += elapsed * self.rate;
343 if self.allowance > self.burst {
344 self.allowance = self.burst;
345 }
346 if self.allowance < 1.0 {
347 false
348 } else {
349 self.allowance -= 1.0;
350 true
351 }
352 }
353}