1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Instant;
6
7use anyhow::{Context, Result, bail};
8use arc_swap::ArcSwap;
9use axum::http::StatusCode;
10use lru::LruCache;
11use parking_lot::Mutex;
12use reqwest::Client;
13use serde_json::Value;
14use tokio::task::JoinHandle;
15
16use crate::config::HostConfig;
17use crate::engine::host::{SessionHost, StateHost};
18use crate::engine::runtime::StateMachineRuntime;
19use crate::pack::PackRuntime;
20use crate::runner::engine::FlowEngine;
21use crate::runner::mocks::MockLayer;
22use crate::storage::session::DynSessionStore;
23use crate::storage::state::DynStateStore;
24use crate::wasi::RunnerWasiPolicy;
25
/// Maximum number of entries held by each tenant's Telegram LRU cache.
const TELEGRAM_CACHE_CAPACITY: usize = 1024;

/// Maximum number of entries held by each tenant's webhook LRU cache.
const WEBHOOK_CACHE_CAPACITY: usize = 256;
28
29pub struct ActivePacks {
31 inner: ArcSwap<HashMap<String, Arc<TenantRuntime>>>,
32}
33
34impl ActivePacks {
35 pub fn new() -> Self {
36 Self {
37 inner: ArcSwap::from_pointee(HashMap::new()),
38 }
39 }
40
41 pub fn load(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
42 self.inner.load().get(tenant).cloned()
43 }
44
45 pub fn snapshot(&self) -> Arc<HashMap<String, Arc<TenantRuntime>>> {
46 self.inner.load_full()
47 }
48
49 pub fn replace(&self, next: HashMap<String, Arc<TenantRuntime>>) {
50 self.inner.store(Arc::new(next));
51 }
52
53 pub fn len(&self) -> usize {
54 self.inner.load().len()
55 }
56
57 pub fn is_empty(&self) -> bool {
58 self.len() == 0
59 }
60}
61
62impl Default for ActivePacks {
63 fn default() -> Self {
64 Self::new()
65 }
66}
67
68pub struct TenantRuntime {
70 tenant: String,
71 config: Arc<HostConfig>,
72 packs: Vec<Arc<PackRuntime>>,
73 digests: Vec<Option<String>>,
74 engine: Arc<FlowEngine>,
75 state_machine: Arc<StateMachineRuntime>,
76 http_client: Client,
77 telegram_cache: Mutex<LruCache<i64, StatusCode>>,
78 webhook_cache: Mutex<LruCache<String, Value>>,
79 messaging_rate: Mutex<RateLimiter>,
80 mocks: Option<Arc<MockLayer>>,
81 timer_handles: Mutex<Vec<JoinHandle<()>>>,
82}
83
84impl TenantRuntime {
85 #[allow(clippy::too_many_arguments)]
86 pub async fn load(
87 pack_path: &Path,
88 config: Arc<HostConfig>,
89 mocks: Option<Arc<MockLayer>>,
90 archive_source: Option<&Path>,
91 digest: Option<String>,
92 wasi_policy: Arc<RunnerWasiPolicy>,
93 session_host: Arc<dyn SessionHost>,
94 session_store: DynSessionStore,
95 state_store: DynStateStore,
96 state_host: Arc<dyn StateHost>,
97 ) -> Result<Arc<Self>> {
98 let pack = Arc::new(
99 PackRuntime::load(
100 pack_path,
101 Arc::clone(&config),
102 mocks.clone(),
103 archive_source,
104 Some(Arc::clone(&session_store)),
105 Some(Arc::clone(&state_store)),
106 Arc::clone(&wasi_policy),
107 true,
108 )
109 .await
110 .with_context(|| {
111 format!(
112 "failed to load pack {} for tenant {}",
113 pack_path.display(),
114 config.tenant
115 )
116 })?,
117 );
118 Self::from_packs(
119 config,
120 vec![(pack, digest)],
121 mocks,
122 session_host,
123 session_store,
124 state_store,
125 state_host,
126 )
127 .await
128 }
129
130 pub async fn from_packs(
131 config: Arc<HostConfig>,
132 packs: Vec<(Arc<PackRuntime>, Option<String>)>,
133 mocks: Option<Arc<MockLayer>>,
134 session_host: Arc<dyn SessionHost>,
135 session_store: DynSessionStore,
136 _state_store: DynStateStore,
137 state_host: Arc<dyn StateHost>,
138 ) -> Result<Arc<Self>> {
139 let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
140 .expect("telegram cache capacity must be > 0");
141 let webhook_capacity =
142 NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
143 let pack_runtimes = packs
144 .iter()
145 .map(|(pack, _)| Arc::clone(pack))
146 .collect::<Vec<_>>();
147 let digests = packs
148 .iter()
149 .map(|(_, digest)| digest.clone())
150 .collect::<Vec<_>>();
151 let engine = Arc::new(
152 FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
153 .await
154 .context("failed to prime flow engine")?,
155 );
156 let state_machine = Arc::new(
157 StateMachineRuntime::from_flow_engine(
158 Arc::clone(&config),
159 Arc::clone(&engine),
160 session_host,
161 session_store,
162 state_host,
163 mocks.clone(),
164 )
165 .context("failed to initialise state machine runtime")?,
166 );
167 let http_client = Client::builder().build()?;
168 let rate_limits = config.rate_limits.clone();
169 Ok(Arc::new(Self {
170 tenant: config.tenant.clone(),
171 config,
172 packs: pack_runtimes,
173 digests,
174 engine,
175 state_machine,
176 http_client,
177 telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
178 webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
179 messaging_rate: Mutex::new(RateLimiter::new(
180 rate_limits.messaging_send_qps,
181 rate_limits.messaging_burst,
182 )),
183 mocks,
184 timer_handles: Mutex::new(Vec::new()),
185 }))
186 }
187
188 pub fn tenant(&self) -> &str {
189 &self.tenant
190 }
191
192 pub fn config(&self) -> &Arc<HostConfig> {
193 &self.config
194 }
195
196 pub fn main_pack(&self) -> &Arc<PackRuntime> {
197 self.packs
198 .first()
199 .expect("tenant runtime must contain at least one pack")
200 }
201
202 pub fn pack(&self) -> Arc<PackRuntime> {
203 Arc::clone(self.main_pack())
204 }
205
206 pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
207 self.packs.iter().skip(1).cloned().collect()
208 }
209
210 pub fn engine(&self) -> &Arc<FlowEngine> {
211 &self.engine
212 }
213
214 pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
215 &self.state_machine
216 }
217
218 pub fn http_client(&self) -> &Client {
219 &self.http_client
220 }
221
222 pub fn digest(&self) -> Option<&str> {
223 self.digests.first().and_then(|d| d.as_deref())
224 }
225
226 pub fn overlay_digests(&self) -> Vec<Option<String>> {
227 self.digests.iter().skip(1).cloned().collect()
228 }
229
230 pub fn telegram_cache(&self) -> &Mutex<LruCache<i64, StatusCode>> {
231 &self.telegram_cache
232 }
233
234 pub fn webhook_cache(&self) -> &Mutex<LruCache<String, Value>> {
235 &self.webhook_cache
236 }
237
238 pub fn messaging_rate(&self) -> &Mutex<RateLimiter> {
239 &self.messaging_rate
240 }
241
242 pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
243 self.mocks.as_ref()
244 }
245
246 pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
247 self.timer_handles.lock().extend(handles);
248 }
249
250 pub fn get_secret(&self, key: &str) -> Result<String> {
251 if !self.config.secrets_policy.is_allowed(key) {
252 bail!("secret {key} is not permitted by bindings policy");
253 }
254 if let Ok(value) = std::env::var(key) {
255 return Ok(value);
256 }
257 bail!("secret {key} not found in environment");
258 }
259}
260
261impl Drop for TenantRuntime {
262 fn drop(&mut self) {
263 for handle in self.timer_handles.lock().drain(..) {
264 handle.abort();
265 }
266 }
267}
268
/// Token-bucket rate limiter: tokens refill continuously at `rate` per second up to `burst`,
/// and every successful acquisition spends one token.
pub struct RateLimiter {
    /// Tokens currently available (fractional while refilling).
    allowance: f64,
    /// Tokens added per second.
    rate: f64,
    /// Upper bound on `allowance`.
    burst: f64,
    /// Instant of the most recent `try_acquire` call (or of construction).
    last_check: Instant,
}

impl RateLimiter {
    /// Creates a limiter that starts with a full bucket.
    ///
    /// Both `qps` and `burst` are raised to at least 1.
    pub fn new(qps: u32, burst: u32) -> Self {
        let capacity = f64::from(burst.max(1));
        Self {
            allowance: capacity,
            rate: f64::from(qps.max(1)),
            burst: capacity,
            last_check: Instant::now(),
        }
    }

    /// Refills the bucket for the time elapsed since the previous call, then tries to spend
    /// one token. Returns `true` if a token was available and has been consumed.
    pub fn try_acquire(&mut self) -> bool {
        let now = Instant::now();
        let refill = now.duration_since(self.last_check).as_secs_f64() * self.rate;
        self.last_check = now;

        // Top up, but never beyond the configured burst size.
        self.allowance = (self.allowance + refill).min(self.burst);

        let granted = self.allowance >= 1.0;
        if granted {
            self.allowance -= 1.0;
        }
        granted
    }
}