1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Instant;
6
7use anyhow::{Context, Result, bail};
8use arc_swap::ArcSwap;
9use axum::http::StatusCode;
10use lru::LruCache;
11use parking_lot::Mutex;
12use reqwest::Client;
13use serde_json::Value;
14use tokio::task::JoinHandle;
15
16use crate::config::HostConfig;
17use crate::engine::host::{SessionHost, StateHost};
18use crate::engine::runtime::StateMachineRuntime;
19use crate::pack::{ComponentResolution, PackRuntime};
20use crate::runner::engine::FlowEngine;
21use crate::runner::mocks::MockLayer;
22use crate::secrets::{DynSecretsManager, read_secret_blocking};
23use crate::storage::session::DynSessionStore;
24use crate::storage::state::DynStateStore;
25use crate::wasi::RunnerWasiPolicy;
26use greentic_types::SecretRequirement;
27
/// Maximum number of entries held by each tenant's Telegram LRU cache
/// (`TenantRuntime::telegram_cache`).
const TELEGRAM_CACHE_CAPACITY: usize = 1024;
/// Maximum number of entries held by each tenant's webhook LRU cache
/// (`TenantRuntime::webhook_cache`).
const WEBHOOK_CACHE_CAPACITY: usize = 256;
30
/// Registry of the tenant runtimes that are currently active.
///
/// The whole tenant-to-runtime map lives behind an [`ArcSwap`], so readers
/// obtain a snapshot of the current map and `replace` installs a complete new
/// map in a single store.
pub struct ActivePacks {
    /// Current map from tenant name to that tenant's runtime.
    inner: ArcSwap<HashMap<String, Arc<TenantRuntime>>>,
}
35
36impl ActivePacks {
37 pub fn new() -> Self {
38 Self {
39 inner: ArcSwap::from_pointee(HashMap::new()),
40 }
41 }
42
43 pub fn load(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
44 self.inner.load().get(tenant).cloned()
45 }
46
47 pub fn snapshot(&self) -> Arc<HashMap<String, Arc<TenantRuntime>>> {
48 self.inner.load_full()
49 }
50
51 pub fn replace(&self, next: HashMap<String, Arc<TenantRuntime>>) {
52 self.inner.store(Arc::new(next));
53 }
54
55 pub fn len(&self) -> usize {
56 self.inner.load().len()
57 }
58
59 pub fn is_empty(&self) -> bool {
60 self.len() == 0
61 }
62}
63
64impl Default for ActivePacks {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
/// Everything needed to serve one tenant: its loaded packs, the flow engine
/// and state machine built on top of them, and per-tenant caches, rate
/// limiting, timers and secrets access.
pub struct TenantRuntime {
    /// Tenant name, copied from `config.tenant` when the runtime is built.
    tenant: String,
    /// Shared host configuration this runtime was built from.
    config: Arc<HostConfig>,
    /// Loaded packs; the first entry is the main pack, the rest are overlays.
    packs: Vec<Arc<PackRuntime>>,
    /// Digest supplied for each pack, index-aligned with `packs`
    /// (`None` where no digest was given).
    digests: Vec<Option<String>>,
    /// Flow engine constructed over all packs.
    engine: Arc<FlowEngine>,
    /// State machine runtime derived from `engine`.
    state_machine: Arc<StateMachineRuntime>,
    /// HTTP client built with the default `reqwest` builder settings.
    http_client: Client,
    /// LRU cache of `StatusCode`s keyed by `i64`, bounded by
    /// `TELEGRAM_CACHE_CAPACITY`.
    telegram_cache: Mutex<LruCache<i64, StatusCode>>,
    /// LRU cache of JSON values keyed by `String`, bounded by
    /// `WEBHOOK_CACHE_CAPACITY`.
    webhook_cache: Mutex<LruCache<String, Value>>,
    /// Rate limiter configured from `config.rate_limits`
    /// (`messaging_send_qps` / `messaging_burst`).
    messaging_rate: Mutex<RateLimiter>,
    /// Optional mock layer passed in at construction.
    mocks: Option<Arc<MockLayer>>,
    /// Task handles added through `register_timers`; aborted when the runtime
    /// is dropped.
    timer_handles: Mutex<Vec<JoinHandle<()>>>,
    /// Secrets manager consulted by `get_secret`.
    secrets: DynSecretsManager,
}
86
87impl TenantRuntime {
88 #[allow(clippy::too_many_arguments)]
89 pub async fn load(
90 pack_path: &Path,
91 config: Arc<HostConfig>,
92 mocks: Option<Arc<MockLayer>>,
93 archive_source: Option<&Path>,
94 digest: Option<String>,
95 wasi_policy: Arc<RunnerWasiPolicy>,
96 session_host: Arc<dyn SessionHost>,
97 session_store: DynSessionStore,
98 state_store: DynStateStore,
99 state_host: Arc<dyn StateHost>,
100 secrets_manager: DynSecretsManager,
101 ) -> Result<Arc<Self>> {
102 let oauth_config = config.oauth_broker_config();
103 let pack = Arc::new(
104 PackRuntime::load(
105 pack_path,
106 Arc::clone(&config),
107 mocks.clone(),
108 archive_source,
109 Some(Arc::clone(&session_store)),
110 Some(Arc::clone(&state_store)),
111 Arc::clone(&wasi_policy),
112 Arc::clone(&secrets_manager),
113 oauth_config.clone(),
114 true,
115 ComponentResolution::default(),
116 )
117 .await
118 .with_context(|| {
119 format!(
120 "failed to load pack {} for tenant {}",
121 pack_path.display(),
122 config.tenant
123 )
124 })?,
125 );
126 Self::from_packs(
127 config,
128 vec![(pack, digest)],
129 mocks,
130 session_host,
131 session_store,
132 state_store,
133 state_host,
134 secrets_manager,
135 )
136 .await
137 }
138
139 #[allow(clippy::too_many_arguments)]
140 pub async fn from_packs(
141 config: Arc<HostConfig>,
142 packs: Vec<(Arc<PackRuntime>, Option<String>)>,
143 mocks: Option<Arc<MockLayer>>,
144 session_host: Arc<dyn SessionHost>,
145 session_store: DynSessionStore,
146 _state_store: DynStateStore,
147 state_host: Arc<dyn StateHost>,
148 secrets_manager: DynSecretsManager,
149 ) -> Result<Arc<Self>> {
150 let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
151 .expect("telegram cache capacity must be > 0");
152 let webhook_capacity =
153 NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
154 let pack_runtimes = packs
155 .iter()
156 .map(|(pack, _)| Arc::clone(pack))
157 .collect::<Vec<_>>();
158 let digests = packs
159 .iter()
160 .map(|(_, digest)| digest.clone())
161 .collect::<Vec<_>>();
162 let engine = Arc::new(
163 FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
164 .await
165 .context("failed to prime flow engine")?,
166 );
167 let state_machine = Arc::new(
168 StateMachineRuntime::from_flow_engine(
169 Arc::clone(&config),
170 Arc::clone(&engine),
171 session_host,
172 session_store,
173 state_host,
174 Arc::clone(&secrets_manager),
175 mocks.clone(),
176 )
177 .context("failed to initialise state machine runtime")?,
178 );
179 let http_client = Client::builder().build()?;
180 let rate_limits = config.rate_limits.clone();
181 Ok(Arc::new(Self {
182 tenant: config.tenant.clone(),
183 config,
184 packs: pack_runtimes,
185 digests,
186 engine,
187 state_machine,
188 http_client,
189 telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
190 webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
191 messaging_rate: Mutex::new(RateLimiter::new(
192 rate_limits.messaging_send_qps,
193 rate_limits.messaging_burst,
194 )),
195 mocks,
196 timer_handles: Mutex::new(Vec::new()),
197 secrets: secrets_manager,
198 }))
199 }
200
201 pub fn tenant(&self) -> &str {
202 &self.tenant
203 }
204
205 pub fn config(&self) -> &Arc<HostConfig> {
206 &self.config
207 }
208
209 pub fn main_pack(&self) -> &Arc<PackRuntime> {
210 self.packs
211 .first()
212 .expect("tenant runtime must contain at least one pack")
213 }
214
215 pub fn pack(&self) -> Arc<PackRuntime> {
216 Arc::clone(self.main_pack())
217 }
218
219 pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
220 self.packs.iter().skip(1).cloned().collect()
221 }
222
223 pub fn engine(&self) -> &Arc<FlowEngine> {
224 &self.engine
225 }
226
227 pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
228 &self.state_machine
229 }
230
231 pub fn http_client(&self) -> &Client {
232 &self.http_client
233 }
234
235 pub fn digest(&self) -> Option<&str> {
236 self.digests.first().and_then(|d| d.as_deref())
237 }
238
239 pub fn overlay_digests(&self) -> Vec<Option<String>> {
240 self.digests.iter().skip(1).cloned().collect()
241 }
242
243 pub fn required_secrets(&self) -> Vec<SecretRequirement> {
244 self.packs
245 .iter()
246 .flat_map(|pack| pack.required_secrets().iter().cloned())
247 .collect()
248 }
249
250 pub fn missing_secrets(&self) -> Vec<SecretRequirement> {
251 self.packs
252 .iter()
253 .flat_map(|pack| pack.missing_secrets(&self.config.tenant_ctx()))
254 .collect()
255 }
256
257 pub fn telegram_cache(&self) -> &Mutex<LruCache<i64, StatusCode>> {
258 &self.telegram_cache
259 }
260
261 pub fn webhook_cache(&self) -> &Mutex<LruCache<String, Value>> {
262 &self.webhook_cache
263 }
264
265 pub fn messaging_rate(&self) -> &Mutex<RateLimiter> {
266 &self.messaging_rate
267 }
268
269 pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
270 self.mocks.as_ref()
271 }
272
273 pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
274 self.timer_handles.lock().extend(handles);
275 }
276
277 pub fn get_secret(&self, key: &str) -> Result<String> {
278 if crate::provider_core_only::is_enabled() {
279 bail!(crate::provider_core_only::blocked_message("secrets"))
280 }
281 if !self.config.secrets_policy.is_allowed(key) {
282 bail!("secret {key} is not permitted by bindings policy");
283 }
284 let bytes = read_secret_blocking(&self.secrets, key)
285 .context("failed to read secret from manager")?;
286 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
287 Ok(value)
288 }
289}
290
291impl Drop for TenantRuntime {
292 fn drop(&mut self) {
293 for handle in self.timer_handles.lock().drain(..) {
294 handle.abort();
295 }
296 }
297}
298
/// Token-bucket style limiter: permits refill continuously at `rate` per
/// second up to `burst`, and each successful acquire spends one permit.
pub struct RateLimiter {
    /// Permits currently available (fractional while refilling).
    allowance: f64,
    /// Permits added per second.
    rate: f64,
    /// Upper bound on `allowance`.
    burst: f64,
    /// Instant of the most recent `try_acquire` call (or of construction).
    last_check: Instant,
}

impl RateLimiter {
    /// Creates a limiter that starts with a full bucket.
    ///
    /// Both `qps` and `burst` are raised to at least 1.
    pub fn new(qps: u32, burst: u32) -> Self {
        let capacity = burst.max(1) as f64;
        let refill_per_sec = qps.max(1) as f64;
        Self {
            allowance: capacity,
            rate: refill_per_sec,
            burst: capacity,
            last_check: Instant::now(),
        }
    }

    /// Attempts to take one permit.
    ///
    /// The bucket is first topped up for the time elapsed since the previous
    /// call (capped at `burst`). Returns `true` and spends a permit when at
    /// least one whole permit is available, otherwise returns `false`.
    pub fn try_acquire(&mut self) -> bool {
        let now = Instant::now();
        let seconds_idle = now.duration_since(self.last_check).as_secs_f64();
        self.last_check = now;

        let refilled = self.allowance + seconds_idle * self.rate;
        self.allowance = if refilled > self.burst {
            self.burst
        } else {
            refilled
        };

        if self.allowance < 1.0 {
            return false;
        }
        self.allowance -= 1.0;
        true
    }
}