greentic_runner_host/
runtime.rs

1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Instant;
6
7use anyhow::{Context, Result, bail};
8use arc_swap::ArcSwap;
9use axum::http::StatusCode;
10use lru::LruCache;
11use parking_lot::Mutex;
12use reqwest::Client;
13use serde_json::Value;
14use tokio::task::JoinHandle;
15
16use crate::config::HostConfig;
17use crate::engine::host::{SessionHost, StateHost};
18use crate::engine::runtime::StateMachineRuntime;
19use crate::pack::PackRuntime;
20use crate::runner::engine::FlowEngine;
21use crate::runner::mocks::MockLayer;
22use crate::storage::session::DynSessionStore;
23use crate::storage::state::DynStateStore;
24use crate::wasi::RunnerWasiPolicy;
25
/// Number of entries each tenant's Telegram LRU cache can hold.
///
/// Must stay non-zero: it is converted to a `NonZeroUsize` when a
/// `TenantRuntime` is built.
const TELEGRAM_CACHE_CAPACITY: usize = 1024;
/// Number of entries each tenant's webhook LRU cache can hold.
///
/// Must stay non-zero for the same reason as the Telegram capacity.
const WEBHOOK_CACHE_CAPACITY: usize = 256;
28
29/// Atomically swapped view of live tenant runtimes.
30pub struct ActivePacks {
31    inner: ArcSwap<HashMap<String, Arc<TenantRuntime>>>,
32}
33
34impl ActivePacks {
35    pub fn new() -> Self {
36        Self {
37            inner: ArcSwap::from_pointee(HashMap::new()),
38        }
39    }
40
41    pub fn load(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
42        self.inner.load().get(tenant).cloned()
43    }
44
45    pub fn snapshot(&self) -> Arc<HashMap<String, Arc<TenantRuntime>>> {
46        self.inner.load_full()
47    }
48
49    pub fn replace(&self, next: HashMap<String, Arc<TenantRuntime>>) {
50        self.inner.store(Arc::new(next));
51    }
52
53    pub fn len(&self) -> usize {
54        self.inner.load().len()
55    }
56
57    pub fn is_empty(&self) -> bool {
58        self.len() == 0
59    }
60}
61
62impl Default for ActivePacks {
63    fn default() -> Self {
64        Self::new()
65    }
66}
67
68/// Runtime bundle for a tenant pack.
69pub struct TenantRuntime {
70    tenant: String,
71    config: Arc<HostConfig>,
72    packs: Vec<Arc<PackRuntime>>,
73    digests: Vec<Option<String>>,
74    engine: Arc<FlowEngine>,
75    state_machine: Arc<StateMachineRuntime>,
76    http_client: Client,
77    telegram_cache: Mutex<LruCache<i64, StatusCode>>,
78    webhook_cache: Mutex<LruCache<String, Value>>,
79    messaging_rate: Mutex<RateLimiter>,
80    mocks: Option<Arc<MockLayer>>,
81    timer_handles: Mutex<Vec<JoinHandle<()>>>,
82}
83
84impl TenantRuntime {
85    #[allow(clippy::too_many_arguments)]
86    pub async fn load(
87        pack_path: &Path,
88        config: Arc<HostConfig>,
89        mocks: Option<Arc<MockLayer>>,
90        archive_source: Option<&Path>,
91        digest: Option<String>,
92        wasi_policy: Arc<RunnerWasiPolicy>,
93        session_host: Arc<dyn SessionHost>,
94        session_store: DynSessionStore,
95        state_store: DynStateStore,
96        state_host: Arc<dyn StateHost>,
97    ) -> Result<Arc<Self>> {
98        let pack = Arc::new(
99            PackRuntime::load(
100                pack_path,
101                Arc::clone(&config),
102                mocks.clone(),
103                archive_source,
104                Some(Arc::clone(&session_store)),
105                Some(Arc::clone(&state_store)),
106                Arc::clone(&wasi_policy),
107                true,
108            )
109            .await
110            .with_context(|| {
111                format!(
112                    "failed to load pack {} for tenant {}",
113                    pack_path.display(),
114                    config.tenant
115                )
116            })?,
117        );
118        Self::from_packs(
119            config,
120            vec![(pack, digest)],
121            mocks,
122            session_host,
123            session_store,
124            state_store,
125            state_host,
126        )
127        .await
128    }
129
130    pub async fn from_packs(
131        config: Arc<HostConfig>,
132        packs: Vec<(Arc<PackRuntime>, Option<String>)>,
133        mocks: Option<Arc<MockLayer>>,
134        session_host: Arc<dyn SessionHost>,
135        session_store: DynSessionStore,
136        _state_store: DynStateStore,
137        state_host: Arc<dyn StateHost>,
138    ) -> Result<Arc<Self>> {
139        let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
140            .expect("telegram cache capacity must be > 0");
141        let webhook_capacity =
142            NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
143        let pack_runtimes = packs
144            .iter()
145            .map(|(pack, _)| Arc::clone(pack))
146            .collect::<Vec<_>>();
147        let digests = packs
148            .iter()
149            .map(|(_, digest)| digest.clone())
150            .collect::<Vec<_>>();
151        let engine = Arc::new(
152            FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
153                .await
154                .context("failed to prime flow engine")?,
155        );
156        let state_machine = Arc::new(
157            StateMachineRuntime::from_flow_engine(
158                Arc::clone(&config),
159                Arc::clone(&engine),
160                session_host,
161                session_store,
162                state_host,
163                mocks.clone(),
164            )
165            .context("failed to initialise state machine runtime")?,
166        );
167        let http_client = Client::builder().build()?;
168        let rate_limits = config.rate_limits.clone();
169        Ok(Arc::new(Self {
170            tenant: config.tenant.clone(),
171            config,
172            packs: pack_runtimes,
173            digests,
174            engine,
175            state_machine,
176            http_client,
177            telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
178            webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
179            messaging_rate: Mutex::new(RateLimiter::new(
180                rate_limits.messaging_send_qps,
181                rate_limits.messaging_burst,
182            )),
183            mocks,
184            timer_handles: Mutex::new(Vec::new()),
185        }))
186    }
187
188    pub fn tenant(&self) -> &str {
189        &self.tenant
190    }
191
192    pub fn config(&self) -> &Arc<HostConfig> {
193        &self.config
194    }
195
196    pub fn main_pack(&self) -> &Arc<PackRuntime> {
197        self.packs
198            .first()
199            .expect("tenant runtime must contain at least one pack")
200    }
201
202    pub fn pack(&self) -> Arc<PackRuntime> {
203        Arc::clone(self.main_pack())
204    }
205
206    pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
207        self.packs.iter().skip(1).cloned().collect()
208    }
209
210    pub fn engine(&self) -> &Arc<FlowEngine> {
211        &self.engine
212    }
213
214    pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
215        &self.state_machine
216    }
217
218    pub fn http_client(&self) -> &Client {
219        &self.http_client
220    }
221
222    pub fn digest(&self) -> Option<&str> {
223        self.digests.first().and_then(|d| d.as_deref())
224    }
225
226    pub fn overlay_digests(&self) -> Vec<Option<String>> {
227        self.digests.iter().skip(1).cloned().collect()
228    }
229
230    pub fn telegram_cache(&self) -> &Mutex<LruCache<i64, StatusCode>> {
231        &self.telegram_cache
232    }
233
234    pub fn webhook_cache(&self) -> &Mutex<LruCache<String, Value>> {
235        &self.webhook_cache
236    }
237
238    pub fn messaging_rate(&self) -> &Mutex<RateLimiter> {
239        &self.messaging_rate
240    }
241
242    pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
243        self.mocks.as_ref()
244    }
245
246    pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
247        self.timer_handles.lock().extend(handles);
248    }
249
250    pub fn get_secret(&self, key: &str) -> Result<String> {
251        if !self.config.secrets_policy.is_allowed(key) {
252            bail!("secret {key} is not permitted by bindings policy");
253        }
254        if let Ok(value) = std::env::var(key) {
255            return Ok(value);
256        }
257        bail!("secret {key} not found in environment");
258    }
259}
260
261impl Drop for TenantRuntime {
262    fn drop(&mut self) {
263        for handle in self.timer_handles.lock().drain(..) {
264            handle.abort();
265        }
266    }
267}
268
/// Token-bucket rate limiter.
///
/// The bucket refills at `rate` tokens per second, never holds more than
/// `burst` tokens, and each successful acquisition costs one token.
pub struct RateLimiter {
    /// Tokens currently available (may be fractional).
    allowance: f64,
    /// Refill speed in tokens per second.
    rate: f64,
    /// Upper bound on `allowance`.
    burst: f64,
    /// When the bucket was last refilled.
    last_check: Instant,
}

impl RateLimiter {
    /// Creates a limiter that starts with a full bucket.
    ///
    /// Both `qps` and `burst` are raised to at least 1.
    pub fn new(qps: u32, burst: u32) -> Self {
        let refill_per_sec = f64::from(qps.max(1));
        let capacity = f64::from(burst.max(1));
        Self {
            allowance: capacity,
            rate: refill_per_sec,
            burst: capacity,
            last_check: Instant::now(),
        }
    }

    /// Refills the bucket for the time elapsed since the previous call, then
    /// tries to take one token.
    ///
    /// Returns `true` when a token was taken and `false` when fewer than one
    /// token is available.
    pub fn try_acquire(&mut self) -> bool {
        let now = Instant::now();
        let idle_secs = now.duration_since(self.last_check).as_secs_f64();
        self.last_check = now;

        let refilled = self.allowance + idle_secs * self.rate;
        self.allowance = if refilled > self.burst {
            self.burst
        } else {
            refilled
        };

        if self.allowance >= 1.0 {
            self.allowance -= 1.0;
            true
        } else {
            false
        }
    }
}
303}