1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Instant;
6
7use anyhow::{Context, Result, bail};
8use arc_swap::ArcSwap;
9use axum::http::StatusCode;
10use lru::LruCache;
11use parking_lot::Mutex;
12use reqwest::Client;
13use serde_json::Value;
14use tokio::task::JoinHandle;
15
16use crate::config::HostConfig;
17use crate::engine::host::{SessionHost, StateHost};
18use crate::engine::runtime::StateMachineRuntime;
19use crate::pack::PackRuntime;
20use crate::runner::engine::FlowEngine;
21use crate::runner::mocks::MockLayer;
22use crate::secrets::{DynSecretsManager, read_secret_blocking};
23use crate::storage::session::DynSessionStore;
24use crate::storage::state::DynStateStore;
25use crate::wasi::RunnerWasiPolicy;
26
27const TELEGRAM_CACHE_CAPACITY: usize = 1024;
28const WEBHOOK_CACHE_CAPACITY: usize = 256;
29
30pub struct ActivePacks {
32 inner: ArcSwap<HashMap<String, Arc<TenantRuntime>>>,
33}
34
35impl ActivePacks {
36 pub fn new() -> Self {
37 Self {
38 inner: ArcSwap::from_pointee(HashMap::new()),
39 }
40 }
41
42 pub fn load(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
43 self.inner.load().get(tenant).cloned()
44 }
45
46 pub fn snapshot(&self) -> Arc<HashMap<String, Arc<TenantRuntime>>> {
47 self.inner.load_full()
48 }
49
50 pub fn replace(&self, next: HashMap<String, Arc<TenantRuntime>>) {
51 self.inner.store(Arc::new(next));
52 }
53
54 pub fn len(&self) -> usize {
55 self.inner.load().len()
56 }
57
58 pub fn is_empty(&self) -> bool {
59 self.len() == 0
60 }
61}
62
63impl Default for ActivePacks {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
69pub struct TenantRuntime {
71 tenant: String,
72 config: Arc<HostConfig>,
73 packs: Vec<Arc<PackRuntime>>,
74 digests: Vec<Option<String>>,
75 engine: Arc<FlowEngine>,
76 state_machine: Arc<StateMachineRuntime>,
77 http_client: Client,
78 telegram_cache: Mutex<LruCache<i64, StatusCode>>,
79 webhook_cache: Mutex<LruCache<String, Value>>,
80 messaging_rate: Mutex<RateLimiter>,
81 mocks: Option<Arc<MockLayer>>,
82 timer_handles: Mutex<Vec<JoinHandle<()>>>,
83 secrets: DynSecretsManager,
84}
85
86impl TenantRuntime {
87 #[allow(clippy::too_many_arguments)]
88 pub async fn load(
89 pack_path: &Path,
90 config: Arc<HostConfig>,
91 mocks: Option<Arc<MockLayer>>,
92 archive_source: Option<&Path>,
93 digest: Option<String>,
94 wasi_policy: Arc<RunnerWasiPolicy>,
95 session_host: Arc<dyn SessionHost>,
96 session_store: DynSessionStore,
97 state_store: DynStateStore,
98 state_host: Arc<dyn StateHost>,
99 secrets_manager: DynSecretsManager,
100 ) -> Result<Arc<Self>> {
101 let oauth_config = config.oauth_broker_config();
102 let pack = Arc::new(
103 PackRuntime::load(
104 pack_path,
105 Arc::clone(&config),
106 mocks.clone(),
107 archive_source,
108 Some(Arc::clone(&session_store)),
109 Some(Arc::clone(&state_store)),
110 Arc::clone(&wasi_policy),
111 Arc::clone(&secrets_manager),
112 oauth_config.clone(),
113 true,
114 )
115 .await
116 .with_context(|| {
117 format!(
118 "failed to load pack {} for tenant {}",
119 pack_path.display(),
120 config.tenant
121 )
122 })?,
123 );
124 Self::from_packs(
125 config,
126 vec![(pack, digest)],
127 mocks,
128 session_host,
129 session_store,
130 state_store,
131 state_host,
132 secrets_manager,
133 )
134 .await
135 }
136
137 #[allow(clippy::too_many_arguments)]
138 pub async fn from_packs(
139 config: Arc<HostConfig>,
140 packs: Vec<(Arc<PackRuntime>, Option<String>)>,
141 mocks: Option<Arc<MockLayer>>,
142 session_host: Arc<dyn SessionHost>,
143 session_store: DynSessionStore,
144 _state_store: DynStateStore,
145 state_host: Arc<dyn StateHost>,
146 secrets_manager: DynSecretsManager,
147 ) -> Result<Arc<Self>> {
148 let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
149 .expect("telegram cache capacity must be > 0");
150 let webhook_capacity =
151 NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
152 let pack_runtimes = packs
153 .iter()
154 .map(|(pack, _)| Arc::clone(pack))
155 .collect::<Vec<_>>();
156 let digests = packs
157 .iter()
158 .map(|(_, digest)| digest.clone())
159 .collect::<Vec<_>>();
160 let engine = Arc::new(
161 FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
162 .await
163 .context("failed to prime flow engine")?,
164 );
165 let state_machine = Arc::new(
166 StateMachineRuntime::from_flow_engine(
167 Arc::clone(&config),
168 Arc::clone(&engine),
169 session_host,
170 session_store,
171 state_host,
172 Arc::clone(&secrets_manager),
173 mocks.clone(),
174 )
175 .context("failed to initialise state machine runtime")?,
176 );
177 let http_client = Client::builder().build()?;
178 let rate_limits = config.rate_limits.clone();
179 Ok(Arc::new(Self {
180 tenant: config.tenant.clone(),
181 config,
182 packs: pack_runtimes,
183 digests,
184 engine,
185 state_machine,
186 http_client,
187 telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
188 webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
189 messaging_rate: Mutex::new(RateLimiter::new(
190 rate_limits.messaging_send_qps,
191 rate_limits.messaging_burst,
192 )),
193 mocks,
194 timer_handles: Mutex::new(Vec::new()),
195 secrets: secrets_manager,
196 }))
197 }
198
199 pub fn tenant(&self) -> &str {
200 &self.tenant
201 }
202
203 pub fn config(&self) -> &Arc<HostConfig> {
204 &self.config
205 }
206
207 pub fn main_pack(&self) -> &Arc<PackRuntime> {
208 self.packs
209 .first()
210 .expect("tenant runtime must contain at least one pack")
211 }
212
213 pub fn pack(&self) -> Arc<PackRuntime> {
214 Arc::clone(self.main_pack())
215 }
216
217 pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
218 self.packs.iter().skip(1).cloned().collect()
219 }
220
221 pub fn engine(&self) -> &Arc<FlowEngine> {
222 &self.engine
223 }
224
225 pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
226 &self.state_machine
227 }
228
229 pub fn http_client(&self) -> &Client {
230 &self.http_client
231 }
232
233 pub fn digest(&self) -> Option<&str> {
234 self.digests.first().and_then(|d| d.as_deref())
235 }
236
237 pub fn overlay_digests(&self) -> Vec<Option<String>> {
238 self.digests.iter().skip(1).cloned().collect()
239 }
240
241 pub fn telegram_cache(&self) -> &Mutex<LruCache<i64, StatusCode>> {
242 &self.telegram_cache
243 }
244
245 pub fn webhook_cache(&self) -> &Mutex<LruCache<String, Value>> {
246 &self.webhook_cache
247 }
248
249 pub fn messaging_rate(&self) -> &Mutex<RateLimiter> {
250 &self.messaging_rate
251 }
252
253 pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
254 self.mocks.as_ref()
255 }
256
257 pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
258 self.timer_handles.lock().extend(handles);
259 }
260
261 pub fn get_secret(&self, key: &str) -> Result<String> {
262 if !self.config.secrets_policy.is_allowed(key) {
263 bail!("secret {key} is not permitted by bindings policy");
264 }
265 let bytes = read_secret_blocking(&self.secrets, key)
266 .context("failed to read secret from manager")?;
267 let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
268 Ok(value)
269 }
270}
271
272impl Drop for TenantRuntime {
273 fn drop(&mut self) {
274 for handle in self.timer_handles.lock().drain(..) {
275 handle.abort();
276 }
277 }
278}
279
280pub struct RateLimiter {
281 allowance: f64,
282 rate: f64,
283 burst: f64,
284 last_check: Instant,
285}
286
287impl RateLimiter {
288 pub fn new(qps: u32, burst: u32) -> Self {
289 let rate = qps.max(1) as f64;
290 let burst = burst.max(1) as f64;
291 Self {
292 allowance: burst,
293 rate,
294 burst,
295 last_check: Instant::now(),
296 }
297 }
298
299 pub fn try_acquire(&mut self) -> bool {
300 let now = Instant::now();
301 let elapsed = now.duration_since(self.last_check).as_secs_f64();
302 self.last_check = now;
303 self.allowance += elapsed * self.rate;
304 if self.allowance > self.burst {
305 self.allowance = self.burst;
306 }
307 if self.allowance < 1.0 {
308 false
309 } else {
310 self.allowance -= 1.0;
311 true
312 }
313 }
314}