// greentic_runner_host/runtime.rs

1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Instant;
6
7use anyhow::{Context, Result, bail};
8use arc_swap::ArcSwap;
9use axum::http::StatusCode;
10use lru::LruCache;
11use parking_lot::Mutex;
12use reqwest::Client;
13use serde_json::Value;
14use tokio::task::JoinHandle;
15
16use crate::config::HostConfig;
17use crate::engine::host::{SessionHost, StateHost};
18use crate::engine::runtime::StateMachineRuntime;
19use crate::pack::{ComponentResolution, PackRuntime};
20use crate::runner::engine::FlowEngine;
21use crate::runner::mocks::MockLayer;
22use crate::secrets::{DynSecretsManager, read_secret_blocking};
23use crate::storage::session::DynSessionStore;
24use crate::storage::state::DynStateStore;
25use crate::wasi::RunnerWasiPolicy;
26use greentic_types::SecretRequirement;
27
/// Maximum number of entries kept in each tenant runtime's Telegram LRU cache.
const TELEGRAM_CACHE_CAPACITY: usize = 1024;
/// Maximum number of entries kept in each tenant runtime's webhook LRU cache.
const WEBHOOK_CACHE_CAPACITY: usize = 256;
30
31/// Atomically swapped view of live tenant runtimes.
32pub struct ActivePacks {
33    inner: ArcSwap<HashMap<String, Arc<TenantRuntime>>>,
34}
35
36impl ActivePacks {
37    pub fn new() -> Self {
38        Self {
39            inner: ArcSwap::from_pointee(HashMap::new()),
40        }
41    }
42
43    pub fn load(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
44        self.inner.load().get(tenant).cloned()
45    }
46
47    pub fn snapshot(&self) -> Arc<HashMap<String, Arc<TenantRuntime>>> {
48        self.inner.load_full()
49    }
50
51    pub fn replace(&self, next: HashMap<String, Arc<TenantRuntime>>) {
52        self.inner.store(Arc::new(next));
53    }
54
55    pub fn len(&self) -> usize {
56        self.inner.load().len()
57    }
58
59    pub fn is_empty(&self) -> bool {
60        self.len() == 0
61    }
62}
63
64impl Default for ActivePacks {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69
70/// Runtime bundle for a tenant pack.
71pub struct TenantRuntime {
72    tenant: String,
73    config: Arc<HostConfig>,
74    packs: Vec<Arc<PackRuntime>>,
75    digests: Vec<Option<String>>,
76    engine: Arc<FlowEngine>,
77    state_machine: Arc<StateMachineRuntime>,
78    http_client: Client,
79    telegram_cache: Mutex<LruCache<i64, StatusCode>>,
80    webhook_cache: Mutex<LruCache<String, Value>>,
81    messaging_rate: Mutex<RateLimiter>,
82    mocks: Option<Arc<MockLayer>>,
83    timer_handles: Mutex<Vec<JoinHandle<()>>>,
84    secrets: DynSecretsManager,
85}
86
87impl TenantRuntime {
88    #[allow(clippy::too_many_arguments)]
89    pub async fn load(
90        pack_path: &Path,
91        config: Arc<HostConfig>,
92        mocks: Option<Arc<MockLayer>>,
93        archive_source: Option<&Path>,
94        digest: Option<String>,
95        wasi_policy: Arc<RunnerWasiPolicy>,
96        session_host: Arc<dyn SessionHost>,
97        session_store: DynSessionStore,
98        state_store: DynStateStore,
99        state_host: Arc<dyn StateHost>,
100        secrets_manager: DynSecretsManager,
101    ) -> Result<Arc<Self>> {
102        let oauth_config = config.oauth_broker_config();
103        let pack = Arc::new(
104            PackRuntime::load(
105                pack_path,
106                Arc::clone(&config),
107                mocks.clone(),
108                archive_source,
109                Some(Arc::clone(&session_store)),
110                Some(Arc::clone(&state_store)),
111                Arc::clone(&wasi_policy),
112                Arc::clone(&secrets_manager),
113                oauth_config.clone(),
114                true,
115                ComponentResolution::default(),
116            )
117            .await
118            .with_context(|| {
119                format!(
120                    "failed to load pack {} for tenant {}",
121                    pack_path.display(),
122                    config.tenant
123                )
124            })?,
125        );
126        Self::from_packs(
127            config,
128            vec![(pack, digest)],
129            mocks,
130            session_host,
131            session_store,
132            state_store,
133            state_host,
134            secrets_manager,
135        )
136        .await
137    }
138
139    #[allow(clippy::too_many_arguments)]
140    pub async fn from_packs(
141        config: Arc<HostConfig>,
142        packs: Vec<(Arc<PackRuntime>, Option<String>)>,
143        mocks: Option<Arc<MockLayer>>,
144        session_host: Arc<dyn SessionHost>,
145        session_store: DynSessionStore,
146        _state_store: DynStateStore,
147        state_host: Arc<dyn StateHost>,
148        secrets_manager: DynSecretsManager,
149    ) -> Result<Arc<Self>> {
150        let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
151            .expect("telegram cache capacity must be > 0");
152        let webhook_capacity =
153            NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
154        let pack_runtimes = packs
155            .iter()
156            .map(|(pack, _)| Arc::clone(pack))
157            .collect::<Vec<_>>();
158        let digests = packs
159            .iter()
160            .map(|(_, digest)| digest.clone())
161            .collect::<Vec<_>>();
162        let engine = Arc::new(
163            FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
164                .await
165                .context("failed to prime flow engine")?,
166        );
167        let state_machine = Arc::new(
168            StateMachineRuntime::from_flow_engine(
169                Arc::clone(&config),
170                Arc::clone(&engine),
171                session_host,
172                session_store,
173                state_host,
174                Arc::clone(&secrets_manager),
175                mocks.clone(),
176            )
177            .context("failed to initialise state machine runtime")?,
178        );
179        let http_client = Client::builder().build()?;
180        let rate_limits = config.rate_limits.clone();
181        Ok(Arc::new(Self {
182            tenant: config.tenant.clone(),
183            config,
184            packs: pack_runtimes,
185            digests,
186            engine,
187            state_machine,
188            http_client,
189            telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
190            webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
191            messaging_rate: Mutex::new(RateLimiter::new(
192                rate_limits.messaging_send_qps,
193                rate_limits.messaging_burst,
194            )),
195            mocks,
196            timer_handles: Mutex::new(Vec::new()),
197            secrets: secrets_manager,
198        }))
199    }
200
201    pub fn tenant(&self) -> &str {
202        &self.tenant
203    }
204
205    pub fn config(&self) -> &Arc<HostConfig> {
206        &self.config
207    }
208
209    pub fn main_pack(&self) -> &Arc<PackRuntime> {
210        self.packs
211            .first()
212            .expect("tenant runtime must contain at least one pack")
213    }
214
215    pub fn pack(&self) -> Arc<PackRuntime> {
216        Arc::clone(self.main_pack())
217    }
218
219    pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
220        self.packs.iter().skip(1).cloned().collect()
221    }
222
223    pub fn engine(&self) -> &Arc<FlowEngine> {
224        &self.engine
225    }
226
227    pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
228        &self.state_machine
229    }
230
231    pub fn http_client(&self) -> &Client {
232        &self.http_client
233    }
234
235    pub fn digest(&self) -> Option<&str> {
236        self.digests.first().and_then(|d| d.as_deref())
237    }
238
239    pub fn overlay_digests(&self) -> Vec<Option<String>> {
240        self.digests.iter().skip(1).cloned().collect()
241    }
242
243    pub fn required_secrets(&self) -> Vec<SecretRequirement> {
244        self.packs
245            .iter()
246            .flat_map(|pack| pack.required_secrets().iter().cloned())
247            .collect()
248    }
249
250    pub fn missing_secrets(&self) -> Vec<SecretRequirement> {
251        self.packs
252            .iter()
253            .flat_map(|pack| pack.missing_secrets(&self.config.tenant_ctx()))
254            .collect()
255    }
256
257    pub fn telegram_cache(&self) -> &Mutex<LruCache<i64, StatusCode>> {
258        &self.telegram_cache
259    }
260
261    pub fn webhook_cache(&self) -> &Mutex<LruCache<String, Value>> {
262        &self.webhook_cache
263    }
264
265    pub fn messaging_rate(&self) -> &Mutex<RateLimiter> {
266        &self.messaging_rate
267    }
268
269    pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
270        self.mocks.as_ref()
271    }
272
273    pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
274        self.timer_handles.lock().extend(handles);
275    }
276
277    pub fn get_secret(&self, key: &str) -> Result<String> {
278        if crate::provider_core_only::is_enabled() {
279            bail!(crate::provider_core_only::blocked_message("secrets"))
280        }
281        if !self.config.secrets_policy.is_allowed(key) {
282            bail!("secret {key} is not permitted by bindings policy");
283        }
284        let bytes = read_secret_blocking(&self.secrets, key)
285            .context("failed to read secret from manager")?;
286        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
287        Ok(value)
288    }
289}
290
291impl Drop for TenantRuntime {
292    fn drop(&mut self) {
293        for handle in self.timer_handles.lock().drain(..) {
294            handle.abort();
295        }
296    }
297}
298
/// Token-bucket rate limiter.
///
/// The bucket starts full with `burst` tokens, refills at `rate` tokens per
/// second of elapsed time up to `burst`, and each successful
/// [`RateLimiter::try_acquire`] consumes one token.
#[derive(Debug)]
pub struct RateLimiter {
    /// Tokens currently available; never exceeds `burst`.
    allowance: f64,
    /// Refill speed in tokens per second.
    rate: f64,
    /// Bucket capacity in tokens.
    burst: f64,
    /// Instant of the most recent `try_acquire` call (or of construction).
    last_check: Instant,
}

impl RateLimiter {
    /// Creates a limiter that refills `qps` tokens per second and holds at most
    /// `burst` tokens.
    ///
    /// Either argument is raised to `1` when given as `0`, so the limiter can
    /// always grant at least one acquisition.
    pub fn new(qps: u32, burst: u32) -> Self {
        // `u32 -> f64` is lossless, so use the infallible conversion.
        let rate = f64::from(qps.max(1));
        let burst = f64::from(burst.max(1));
        Self {
            allowance: burst,
            rate,
            burst,
            last_check: Instant::now(),
        }
    }

    /// Tries to take one token, returning `true` on success.
    ///
    /// The bucket is first topped up for the time elapsed since the previous
    /// call, capped at `burst`. When less than one whole token is available the
    /// call returns `false` and consumes nothing.
    pub fn try_acquire(&mut self) -> bool {
        let now = Instant::now();
        let elapsed = now.duration_since(self.last_check).as_secs_f64();
        self.last_check = now;

        // Refill for the elapsed time, never beyond the bucket capacity.
        self.allowance = (self.allowance + elapsed * self.rate).min(self.burst);

        if self.allowance >= 1.0 {
            self.allowance -= 1.0;
            true
        } else {
            false
        }
    }
}
333}