1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Instant;
6
7use anyhow::{Context, Result, bail};
8use arc_swap::ArcSwap;
9use axum::http::StatusCode;
10use lru::LruCache;
11use parking_lot::Mutex;
12use reqwest::Client;
13use serde_json::Value;
14use tokio::task::JoinHandle;
15
16use crate::config::HostConfig;
17use crate::engine::host::{SessionHost, StateHost};
18use crate::engine::runtime::StateMachineRuntime;
19use crate::pack::PackRuntime;
20use crate::runner::engine::FlowEngine;
21use crate::runner::mocks::MockLayer;
22use crate::secrets::{DynSecretsManager, read_secret_blocking};
23use crate::storage::session::DynSessionStore;
24use crate::storage::state::DynStateStore;
25use crate::wasi::RunnerWasiPolicy;
26
/// Number of entries the per-tenant Telegram LRU cache is created with.
const TELEGRAM_CACHE_CAPACITY: usize = 1024;
/// Number of entries the per-tenant webhook LRU cache is created with.
const WEBHOOK_CACHE_CAPACITY: usize = 256;
29
30pub struct ActivePacks {
32 inner: ArcSwap<HashMap<String, Arc<TenantRuntime>>>,
33}
34
35impl ActivePacks {
36 pub fn new() -> Self {
37 Self {
38 inner: ArcSwap::from_pointee(HashMap::new()),
39 }
40 }
41
42 pub fn load(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
43 self.inner.load().get(tenant).cloned()
44 }
45
46 pub fn snapshot(&self) -> Arc<HashMap<String, Arc<TenantRuntime>>> {
47 self.inner.load_full()
48 }
49
50 pub fn replace(&self, next: HashMap<String, Arc<TenantRuntime>>) {
51 self.inner.store(Arc::new(next));
52 }
53
54 pub fn len(&self) -> usize {
55 self.inner.load().len()
56 }
57
58 pub fn is_empty(&self) -> bool {
59 self.len() == 0
60 }
61}
62
63impl Default for ActivePacks {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
69pub struct TenantRuntime {
71 tenant: String,
72 config: Arc<HostConfig>,
73 packs: Vec<Arc<PackRuntime>>,
74 digests: Vec<Option<String>>,
75 engine: Arc<FlowEngine>,
76 state_machine: Arc<StateMachineRuntime>,
77 http_client: Client,
78 telegram_cache: Mutex<LruCache<i64, StatusCode>>,
79 webhook_cache: Mutex<LruCache<String, Value>>,
80 messaging_rate: Mutex<RateLimiter>,
81 mocks: Option<Arc<MockLayer>>,
82 timer_handles: Mutex<Vec<JoinHandle<()>>>,
83 secrets: DynSecretsManager,
84}
85
86impl TenantRuntime {
87 #[allow(clippy::too_many_arguments)]
88 pub async fn load(
89 pack_path: &Path,
90 config: Arc<HostConfig>,
91 mocks: Option<Arc<MockLayer>>,
92 archive_source: Option<&Path>,
93 digest: Option<String>,
94 wasi_policy: Arc<RunnerWasiPolicy>,
95 session_host: Arc<dyn SessionHost>,
96 session_store: DynSessionStore,
97 state_store: DynStateStore,
98 state_host: Arc<dyn StateHost>,
99 secrets_manager: DynSecretsManager,
100 ) -> Result<Arc<Self>> {
101 let pack = Arc::new(
102 PackRuntime::load(
103 pack_path,
104 Arc::clone(&config),
105 mocks.clone(),
106 archive_source,
107 Some(Arc::clone(&session_store)),
108 Some(Arc::clone(&state_store)),
109 Arc::clone(&wasi_policy),
110 Arc::clone(&secrets_manager),
111 true,
112 )
113 .await
114 .with_context(|| {
115 format!(
116 "failed to load pack {} for tenant {}",
117 pack_path.display(),
118 config.tenant
119 )
120 })?,
121 );
122 Self::from_packs(
123 config,
124 vec![(pack, digest)],
125 mocks,
126 session_host,
127 session_store,
128 state_store,
129 state_host,
130 secrets_manager,
131 )
132 .await
133 }
134
135 #[allow(clippy::too_many_arguments)]
136 pub async fn from_packs(
137 config: Arc<HostConfig>,
138 packs: Vec<(Arc<PackRuntime>, Option<String>)>,
139 mocks: Option<Arc<MockLayer>>,
140 session_host: Arc<dyn SessionHost>,
141 session_store: DynSessionStore,
142 _state_store: DynStateStore,
143 state_host: Arc<dyn StateHost>,
144 secrets_manager: DynSecretsManager,
145 ) -> Result<Arc<Self>> {
146 let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
147 .expect("telegram cache capacity must be > 0");
148 let webhook_capacity =
149 NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
150 let pack_runtimes = packs
151 .iter()
152 .map(|(pack, _)| Arc::clone(pack))
153 .collect::<Vec<_>>();
154 let digests = packs
155 .iter()
156 .map(|(_, digest)| digest.clone())
157 .collect::<Vec<_>>();
158 let engine = Arc::new(
159 FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
160 .await
161 .context("failed to prime flow engine")?,
162 );
163 let state_machine = Arc::new(
164 StateMachineRuntime::from_flow_engine(
165 Arc::clone(&config),
166 Arc::clone(&engine),
167 session_host,
168 session_store,
169 state_host,
170 Arc::clone(&secrets_manager),
171 mocks.clone(),
172 )
173 .context("failed to initialise state machine runtime")?,
174 );
175 let http_client = Client::builder().build()?;
176 let rate_limits = config.rate_limits.clone();
177 Ok(Arc::new(Self {
178 tenant: config.tenant.clone(),
179 config,
180 packs: pack_runtimes,
181 digests,
182 engine,
183 state_machine,
184 http_client,
185 telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
186 webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
187 messaging_rate: Mutex::new(RateLimiter::new(
188 rate_limits.messaging_send_qps,
189 rate_limits.messaging_burst,
190 )),
191 mocks,
192 timer_handles: Mutex::new(Vec::new()),
193 secrets: secrets_manager,
194 }))
195 }
196
197 pub fn tenant(&self) -> &str {
198 &self.tenant
199 }
200
201 pub fn config(&self) -> &Arc<HostConfig> {
202 &self.config
203 }
204
205 pub fn main_pack(&self) -> &Arc<PackRuntime> {
206 self.packs
207 .first()
208 .expect("tenant runtime must contain at least one pack")
209 }
210
211 pub fn pack(&self) -> Arc<PackRuntime> {
212 Arc::clone(self.main_pack())
213 }
214
215 pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
216 self.packs.iter().skip(1).cloned().collect()
217 }
218
219 pub fn engine(&self) -> &Arc<FlowEngine> {
220 &self.engine
221 }
222
223 pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
224 &self.state_machine
225 }
226
227 pub fn http_client(&self) -> &Client {
228 &self.http_client
229 }
230
231 pub fn digest(&self) -> Option<&str> {
232 self.digests.first().and_then(|d| d.as_deref())
233 }
234
235 pub fn overlay_digests(&self) -> Vec<Option<String>> {
236 self.digests.iter().skip(1).cloned().collect()
237 }
238
239 pub fn telegram_cache(&self) -> &Mutex<LruCache<i64, StatusCode>> {
240 &self.telegram_cache
241 }
242
243 pub fn webhook_cache(&self) -> &Mutex<LruCache<String, Value>> {
244 &self.webhook_cache
245 }
246
247 pub fn messaging_rate(&self) -> &Mutex<RateLimiter> {
248 &self.messaging_rate
249 }
250
251 pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
252 self.mocks.as_ref()
253 }
254
255 pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
256 self.timer_handles.lock().extend(handles);
257 }
258
259 pub fn get_secret(&self, key: &str) -> Result<String> {
260 if !self.config.secrets_policy.is_allowed(key) {
261 bail!("secret {key} is not permitted by bindings policy");
262 }
263 let bytes = read_secret_blocking(&self.secrets, key)
264 .context("failed to read secret from manager")?;
265 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
266 Ok(value)
267 }
268}
269
270impl Drop for TenantRuntime {
271 fn drop(&mut self) {
272 for handle in self.timer_handles.lock().drain(..) {
273 handle.abort();
274 }
275 }
276}
277
/// Token-bucket rate limiter.
///
/// The bucket starts full at `burst` tokens, refills at `rate` tokens per second up to
/// `burst`, and each successful [`RateLimiter::try_acquire`] spends one token.
pub struct RateLimiter {
    /// Tokens currently available.
    allowance: f64,
    /// Refill speed in tokens per second.
    rate: f64,
    /// Upper bound on `allowance`.
    burst: f64,
    /// Instant of the most recent `try_acquire` call (or of construction).
    last_check: Instant,
}

impl RateLimiter {
    /// Creates a limiter that starts with a full bucket.
    ///
    /// Both `qps` and `burst` are raised to at least 1, so zero never disables the
    /// limiter or blocks it permanently.
    pub fn new(qps: u32, burst: u32) -> Self {
        let capacity = f64::from(burst.max(1));
        let refill_per_second = f64::from(qps.max(1));
        Self {
            allowance: capacity,
            rate: refill_per_second,
            burst: capacity,
            last_check: Instant::now(),
        }
    }

    /// Tries to spend one token, returning `true` when the call is allowed.
    ///
    /// Tokens accrued since the previous call are credited first (capped at `burst`).
    /// The refill timestamp is updated whether or not a token is granted.
    pub fn try_acquire(&mut self) -> bool {
        let now = Instant::now();
        let refill = now.duration_since(self.last_check).as_secs_f64() * self.rate;
        self.last_check = now;

        self.allowance = (self.allowance + refill).min(self.burst);

        let granted = self.allowance >= 1.0;
        if granted {
            self.allowance -= 1.0;
        }
        granted
    }
}