// greentic_runner_host/runtime.rs

1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Instant;
6
7use anyhow::{Context, Result, bail};
8use arc_swap::ArcSwap;
9use axum::http::StatusCode;
10use lru::LruCache;
11use parking_lot::Mutex;
12use reqwest::Client;
13use serde_json::Value;
14use tokio::task::JoinHandle;
15
16use crate::config::HostConfig;
17use crate::engine::host::{SessionHost, StateHost};
18use crate::engine::runtime::StateMachineRuntime;
19use crate::pack::PackRuntime;
20use crate::runner::engine::FlowEngine;
21use crate::runner::mocks::MockLayer;
22use crate::secrets::{DynSecretsManager, read_secret_blocking};
23use crate::storage::session::DynSessionStore;
24use crate::storage::state::DynStateStore;
25use crate::wasi::RunnerWasiPolicy;
26
/// Capacity (entry count) of the per-tenant Telegram LRU cache built in
/// `TenantRuntime::from_packs`. Must be non-zero.
const TELEGRAM_CACHE_CAPACITY: usize = 1024;
/// Capacity (entry count) of the per-tenant webhook LRU cache built in
/// `TenantRuntime::from_packs`. Must be non-zero.
const WEBHOOK_CACHE_CAPACITY: usize = 256;
29
30/// Atomically swapped view of live tenant runtimes.
31pub struct ActivePacks {
32    inner: ArcSwap<HashMap<String, Arc<TenantRuntime>>>,
33}
34
35impl ActivePacks {
36    pub fn new() -> Self {
37        Self {
38            inner: ArcSwap::from_pointee(HashMap::new()),
39        }
40    }
41
42    pub fn load(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
43        self.inner.load().get(tenant).cloned()
44    }
45
46    pub fn snapshot(&self) -> Arc<HashMap<String, Arc<TenantRuntime>>> {
47        self.inner.load_full()
48    }
49
50    pub fn replace(&self, next: HashMap<String, Arc<TenantRuntime>>) {
51        self.inner.store(Arc::new(next));
52    }
53
54    pub fn len(&self) -> usize {
55        self.inner.load().len()
56    }
57
58    pub fn is_empty(&self) -> bool {
59        self.len() == 0
60    }
61}
62
63impl Default for ActivePacks {
64    fn default() -> Self {
65        Self::new()
66    }
67}
68
/// Runtime bundle for a tenant pack.
///
/// Groups everything the host keeps per tenant: the loaded packs, the flow
/// engine and state machine built from them, an HTTP client, two LRU caches,
/// a messaging rate limiter, and the secrets manager.
pub struct TenantRuntime {
    /// Tenant identifier, copied from `config.tenant` at construction.
    tenant: String,
    /// Shared host configuration this runtime was built from.
    config: Arc<HostConfig>,
    /// Loaded packs; the first entry is the main pack, the rest are overlays.
    packs: Vec<Arc<PackRuntime>>,
    /// Optional digest per pack, in the same order as `packs`.
    digests: Vec<Option<String>>,
    /// Flow engine built from `packs`.
    engine: Arc<FlowEngine>,
    /// State machine runtime built on top of `engine`.
    state_machine: Arc<StateMachineRuntime>,
    /// HTTP client owned by this tenant runtime.
    http_client: Client,
    /// LRU cache of status codes keyed by an `i64`, bounded by
    /// `TELEGRAM_CACHE_CAPACITY`.
    // NOTE(review): the key is presumably a Telegram update/message id — confirm against callers.
    telegram_cache: Mutex<LruCache<i64, StatusCode>>,
    /// LRU cache of JSON values keyed by a string, bounded by
    /// `WEBHOOK_CACHE_CAPACITY`.
    webhook_cache: Mutex<LruCache<String, Value>>,
    /// Rate limiter configured from `config.rate_limits` (messaging QPS and burst).
    messaging_rate: Mutex<RateLimiter>,
    /// Optional mock layer supplied at construction.
    mocks: Option<Arc<MockLayer>>,
    /// Task handles added through `register_timers`; aborted when the runtime is dropped.
    timer_handles: Mutex<Vec<JoinHandle<()>>>,
    /// Secrets manager consulted by `get_secret`.
    secrets: DynSecretsManager,
}
85
86impl TenantRuntime {
87    #[allow(clippy::too_many_arguments)]
88    pub async fn load(
89        pack_path: &Path,
90        config: Arc<HostConfig>,
91        mocks: Option<Arc<MockLayer>>,
92        archive_source: Option<&Path>,
93        digest: Option<String>,
94        wasi_policy: Arc<RunnerWasiPolicy>,
95        session_host: Arc<dyn SessionHost>,
96        session_store: DynSessionStore,
97        state_store: DynStateStore,
98        state_host: Arc<dyn StateHost>,
99        secrets_manager: DynSecretsManager,
100    ) -> Result<Arc<Self>> {
101        let oauth_config = config.oauth_broker_config();
102        let pack = Arc::new(
103            PackRuntime::load(
104                pack_path,
105                Arc::clone(&config),
106                mocks.clone(),
107                archive_source,
108                Some(Arc::clone(&session_store)),
109                Some(Arc::clone(&state_store)),
110                Arc::clone(&wasi_policy),
111                Arc::clone(&secrets_manager),
112                oauth_config.clone(),
113                true,
114            )
115            .await
116            .with_context(|| {
117                format!(
118                    "failed to load pack {} for tenant {}",
119                    pack_path.display(),
120                    config.tenant
121                )
122            })?,
123        );
124        Self::from_packs(
125            config,
126            vec![(pack, digest)],
127            mocks,
128            session_host,
129            session_store,
130            state_store,
131            state_host,
132            secrets_manager,
133        )
134        .await
135    }
136
137    #[allow(clippy::too_many_arguments)]
138    pub async fn from_packs(
139        config: Arc<HostConfig>,
140        packs: Vec<(Arc<PackRuntime>, Option<String>)>,
141        mocks: Option<Arc<MockLayer>>,
142        session_host: Arc<dyn SessionHost>,
143        session_store: DynSessionStore,
144        _state_store: DynStateStore,
145        state_host: Arc<dyn StateHost>,
146        secrets_manager: DynSecretsManager,
147    ) -> Result<Arc<Self>> {
148        let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
149            .expect("telegram cache capacity must be > 0");
150        let webhook_capacity =
151            NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
152        let pack_runtimes = packs
153            .iter()
154            .map(|(pack, _)| Arc::clone(pack))
155            .collect::<Vec<_>>();
156        let digests = packs
157            .iter()
158            .map(|(_, digest)| digest.clone())
159            .collect::<Vec<_>>();
160        let engine = Arc::new(
161            FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
162                .await
163                .context("failed to prime flow engine")?,
164        );
165        let state_machine = Arc::new(
166            StateMachineRuntime::from_flow_engine(
167                Arc::clone(&config),
168                Arc::clone(&engine),
169                session_host,
170                session_store,
171                state_host,
172                Arc::clone(&secrets_manager),
173                mocks.clone(),
174            )
175            .context("failed to initialise state machine runtime")?,
176        );
177        let http_client = Client::builder().build()?;
178        let rate_limits = config.rate_limits.clone();
179        Ok(Arc::new(Self {
180            tenant: config.tenant.clone(),
181            config,
182            packs: pack_runtimes,
183            digests,
184            engine,
185            state_machine,
186            http_client,
187            telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
188            webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
189            messaging_rate: Mutex::new(RateLimiter::new(
190                rate_limits.messaging_send_qps,
191                rate_limits.messaging_burst,
192            )),
193            mocks,
194            timer_handles: Mutex::new(Vec::new()),
195            secrets: secrets_manager,
196        }))
197    }
198
199    pub fn tenant(&self) -> &str {
200        &self.tenant
201    }
202
203    pub fn config(&self) -> &Arc<HostConfig> {
204        &self.config
205    }
206
207    pub fn main_pack(&self) -> &Arc<PackRuntime> {
208        self.packs
209            .first()
210            .expect("tenant runtime must contain at least one pack")
211    }
212
213    pub fn pack(&self) -> Arc<PackRuntime> {
214        Arc::clone(self.main_pack())
215    }
216
217    pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
218        self.packs.iter().skip(1).cloned().collect()
219    }
220
221    pub fn engine(&self) -> &Arc<FlowEngine> {
222        &self.engine
223    }
224
225    pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
226        &self.state_machine
227    }
228
229    pub fn http_client(&self) -> &Client {
230        &self.http_client
231    }
232
233    pub fn digest(&self) -> Option<&str> {
234        self.digests.first().and_then(|d| d.as_deref())
235    }
236
237    pub fn overlay_digests(&self) -> Vec<Option<String>> {
238        self.digests.iter().skip(1).cloned().collect()
239    }
240
241    pub fn telegram_cache(&self) -> &Mutex<LruCache<i64, StatusCode>> {
242        &self.telegram_cache
243    }
244
245    pub fn webhook_cache(&self) -> &Mutex<LruCache<String, Value>> {
246        &self.webhook_cache
247    }
248
249    pub fn messaging_rate(&self) -> &Mutex<RateLimiter> {
250        &self.messaging_rate
251    }
252
253    pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
254        self.mocks.as_ref()
255    }
256
257    pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
258        self.timer_handles.lock().extend(handles);
259    }
260
261    pub fn get_secret(&self, key: &str) -> Result<String> {
262        if !self.config.secrets_policy.is_allowed(key) {
263            bail!("secret {key} is not permitted by bindings policy");
264        }
265        let bytes = read_secret_blocking(&self.secrets, key)
266            .context("failed to read secret from manager")?;
267        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
268        Ok(value)
269    }
270}
271
272impl Drop for TenantRuntime {
273    fn drop(&mut self) {
274        for handle in self.timer_handles.lock().drain(..) {
275            handle.abort();
276        }
277    }
278}
279
/// Token-bucket rate limiter.
///
/// The bucket starts full, refills continuously at `rate` tokens per second up
/// to `burst`, and each successful acquisition removes one token.
pub struct RateLimiter {
    /// Tokens currently available (fractional while refilling).
    allowance: f64,
    /// Refill speed in tokens per second.
    rate: f64,
    /// Upper bound on `allowance`.
    burst: f64,
    /// Time of the most recent `try_acquire` call (or of construction).
    last_check: Instant,
}

impl RateLimiter {
    /// Creates a limiter that starts with a full bucket.
    ///
    /// Both `qps` and `burst` are raised to at least 1.
    pub fn new(qps: u32, burst: u32) -> Self {
        let capacity = f64::from(burst.max(1));
        let refill_per_sec = f64::from(qps.max(1));
        Self {
            allowance: capacity,
            rate: refill_per_sec,
            burst: capacity,
            last_check: Instant::now(),
        }
    }

    /// Tries to take one token.
    ///
    /// First credits the tokens earned since the previous call (capped at
    /// `burst`). Returns `true` and consumes a token if at least one whole
    /// token is available, `false` otherwise.
    pub fn try_acquire(&mut self) -> bool {
        let now = Instant::now();
        let elapsed_secs = now.duration_since(self.last_check).as_secs_f64();
        self.last_check = now;

        let refilled = self.allowance + elapsed_secs * self.rate;
        self.allowance = if refilled > self.burst {
            self.burst
        } else {
            refilled
        };

        let granted = self.allowance >= 1.0;
        if granted {
            self.allowance -= 1.0;
        }
        granted
    }
}