greentic_runner_host/
runtime.rs

1use std::collections::HashMap;
2use std::num::NonZeroUsize;
3use std::path::Path;
4use std::sync::Arc;
5use std::time::Instant;
6
7use anyhow::{Context, Result, bail};
8use arc_swap::ArcSwap;
9use axum::http::StatusCode;
10use lru::LruCache;
11use parking_lot::Mutex;
12use reqwest::Client;
13use serde_json::Value;
14use tokio::task::JoinHandle;
15
16use crate::config::HostConfig;
17use crate::engine::host::{SessionHost, StateHost};
18use crate::engine::runtime::StateMachineRuntime;
19use crate::pack::{ComponentResolution, PackRuntime};
20use crate::runner::engine::FlowEngine;
21use crate::runner::mocks::MockLayer;
22use crate::secrets::{DynSecretsManager, read_secret_blocking};
23use crate::storage::session::DynSessionStore;
24use crate::storage::state::DynStateStore;
25use crate::trace::PackTraceInfo;
26use crate::wasi::RunnerWasiPolicy;
27use greentic_types::SecretRequirement;
28
/// Maximum number of entries kept in each tenant's `telegram_cache` LRU.
/// Must be non-zero: it is converted to a `NonZeroUsize` when the cache is built.
const TELEGRAM_CACHE_CAPACITY: usize = 1024;
/// Maximum number of entries kept in each tenant's `webhook_cache` LRU.
/// Must be non-zero: it is converted to a `NonZeroUsize` when the cache is built.
const WEBHOOK_CACHE_CAPACITY: usize = 256;
31
/// Atomically swapped view of live tenant runtimes.
///
/// The whole tenant → runtime map lives behind an [`ArcSwap`]; readers take a
/// snapshot of the current map and writers install a complete replacement map
/// rather than mutating entries in place.
pub struct ActivePacks {
    /// Current map from tenant name to its runtime, swapped as a single unit.
    inner: ArcSwap<HashMap<String, Arc<TenantRuntime>>>,
}
36
37impl ActivePacks {
38    pub fn new() -> Self {
39        Self {
40            inner: ArcSwap::from_pointee(HashMap::new()),
41        }
42    }
43
44    pub fn load(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
45        self.inner.load().get(tenant).cloned()
46    }
47
48    pub fn snapshot(&self) -> Arc<HashMap<String, Arc<TenantRuntime>>> {
49        self.inner.load_full()
50    }
51
52    pub fn replace(&self, next: HashMap<String, Arc<TenantRuntime>>) {
53        self.inner.store(Arc::new(next));
54    }
55
56    pub fn len(&self) -> usize {
57        self.inner.load().len()
58    }
59
60    pub fn is_empty(&self) -> bool {
61        self.len() == 0
62    }
63}
64
65impl Default for ActivePacks {
66    fn default() -> Self {
67        Self::new()
68    }
69}
70
/// Runtime bundle for a tenant pack.
///
/// Groups everything needed to serve one tenant: the loaded packs, the flow
/// engine and state machine built on top of them, plus per-tenant caches, a
/// rate limiter, an HTTP client and the secrets manager.
pub struct TenantRuntime {
    /// Tenant identifier, copied from `config.tenant` at construction.
    tenant: String,
    /// Host configuration this runtime was built from.
    config: Arc<HostConfig>,
    /// Loaded packs; the first entry is the main pack, the rest are overlays.
    packs: Vec<Arc<PackRuntime>>,
    /// Optional digest per pack, index-aligned with `packs`.
    digests: Vec<Option<String>>,
    /// Flow engine created over all `packs`.
    engine: Arc<FlowEngine>,
    /// State machine runtime built from `engine`.
    state_machine: Arc<StateMachineRuntime>,
    /// HTTP client created once per runtime with the default builder settings.
    http_client: Client,
    /// LRU of `i64` keys to HTTP status codes, capped at `TELEGRAM_CACHE_CAPACITY`.
    // NOTE(review): presumably keyed by a Telegram update/message id — confirm with callers.
    telegram_cache: Mutex<LruCache<i64, StatusCode>>,
    /// LRU of string keys to JSON values, capped at `WEBHOOK_CACHE_CAPACITY`.
    // NOTE(review): key/value meaning is decided by the webhook handlers — not visible here.
    webhook_cache: Mutex<LruCache<String, Value>>,
    /// Rate limiter configured from `config.rate_limits` (messaging send QPS and burst).
    messaging_rate: Mutex<RateLimiter>,
    /// Optional mock layer, also handed to the state machine runtime.
    mocks: Option<Arc<MockLayer>>,
    /// Handles of registered timer tasks; aborted when the runtime is dropped.
    timer_handles: Mutex<Vec<JoinHandle<()>>>,
    /// Secrets manager used by `get_secret`.
    secrets: DynSecretsManager,
}
87
88impl TenantRuntime {
89    #[allow(clippy::too_many_arguments)]
90    pub async fn load(
91        pack_path: &Path,
92        config: Arc<HostConfig>,
93        mocks: Option<Arc<MockLayer>>,
94        archive_source: Option<&Path>,
95        digest: Option<String>,
96        wasi_policy: Arc<RunnerWasiPolicy>,
97        session_host: Arc<dyn SessionHost>,
98        session_store: DynSessionStore,
99        state_store: DynStateStore,
100        state_host: Arc<dyn StateHost>,
101        secrets_manager: DynSecretsManager,
102    ) -> Result<Arc<Self>> {
103        let oauth_config = config.oauth_broker_config();
104        let pack = Arc::new(
105            PackRuntime::load(
106                pack_path,
107                Arc::clone(&config),
108                mocks.clone(),
109                archive_source,
110                Some(Arc::clone(&session_store)),
111                Some(Arc::clone(&state_store)),
112                Arc::clone(&wasi_policy),
113                Arc::clone(&secrets_manager),
114                oauth_config.clone(),
115                true,
116                ComponentResolution::default(),
117            )
118            .await
119            .with_context(|| {
120                format!(
121                    "failed to load pack {} for tenant {}",
122                    pack_path.display(),
123                    config.tenant
124                )
125            })?,
126        );
127        Self::from_packs(
128            config,
129            vec![(pack, digest)],
130            mocks,
131            session_host,
132            session_store,
133            state_store,
134            state_host,
135            secrets_manager,
136        )
137        .await
138    }
139
140    #[allow(clippy::too_many_arguments)]
141    pub async fn from_packs(
142        config: Arc<HostConfig>,
143        packs: Vec<(Arc<PackRuntime>, Option<String>)>,
144        mocks: Option<Arc<MockLayer>>,
145        session_host: Arc<dyn SessionHost>,
146        session_store: DynSessionStore,
147        _state_store: DynStateStore,
148        state_host: Arc<dyn StateHost>,
149        secrets_manager: DynSecretsManager,
150    ) -> Result<Arc<Self>> {
151        let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
152            .expect("telegram cache capacity must be > 0");
153        let webhook_capacity =
154            NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
155        let pack_runtimes = packs
156            .iter()
157            .map(|(pack, _)| Arc::clone(pack))
158            .collect::<Vec<_>>();
159        let digests = packs
160            .iter()
161            .map(|(_, digest)| digest.clone())
162            .collect::<Vec<_>>();
163        let mut pack_trace = HashMap::new();
164        for (pack, digest) in &packs {
165            let pack_id = pack.metadata().pack_id.clone();
166            let pack_ref = config
167                .pack_bindings
168                .iter()
169                .find(|binding| binding.pack_id == pack_id)
170                .map(|binding| binding.pack_ref.clone())
171                .unwrap_or_else(|| pack_id.clone());
172            pack_trace.insert(
173                pack_id,
174                PackTraceInfo {
175                    pack_ref,
176                    resolved_digest: digest.clone(),
177                },
178            );
179        }
180        let engine = Arc::new(
181            FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
182                .await
183                .context("failed to prime flow engine")?,
184        );
185        let state_machine = Arc::new(
186            StateMachineRuntime::from_flow_engine(
187                Arc::clone(&config),
188                Arc::clone(&engine),
189                pack_trace,
190                session_host,
191                session_store,
192                state_host,
193                Arc::clone(&secrets_manager),
194                mocks.clone(),
195            )
196            .context("failed to initialise state machine runtime")?,
197        );
198        let http_client = Client::builder().build()?;
199        let rate_limits = config.rate_limits.clone();
200        Ok(Arc::new(Self {
201            tenant: config.tenant.clone(),
202            config,
203            packs: pack_runtimes,
204            digests,
205            engine,
206            state_machine,
207            http_client,
208            telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
209            webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
210            messaging_rate: Mutex::new(RateLimiter::new(
211                rate_limits.messaging_send_qps,
212                rate_limits.messaging_burst,
213            )),
214            mocks,
215            timer_handles: Mutex::new(Vec::new()),
216            secrets: secrets_manager,
217        }))
218    }
219
220    pub fn tenant(&self) -> &str {
221        &self.tenant
222    }
223
224    pub fn config(&self) -> &Arc<HostConfig> {
225        &self.config
226    }
227
228    pub fn main_pack(&self) -> &Arc<PackRuntime> {
229        self.packs
230            .first()
231            .expect("tenant runtime must contain at least one pack")
232    }
233
234    pub fn pack(&self) -> Arc<PackRuntime> {
235        Arc::clone(self.main_pack())
236    }
237
238    pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
239        self.packs.iter().skip(1).cloned().collect()
240    }
241
242    pub fn engine(&self) -> &Arc<FlowEngine> {
243        &self.engine
244    }
245
246    pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
247        &self.state_machine
248    }
249
250    pub fn http_client(&self) -> &Client {
251        &self.http_client
252    }
253
254    pub fn digest(&self) -> Option<&str> {
255        self.digests.first().and_then(|d| d.as_deref())
256    }
257
258    pub fn overlay_digests(&self) -> Vec<Option<String>> {
259        self.digests.iter().skip(1).cloned().collect()
260    }
261
262    pub fn required_secrets(&self) -> Vec<SecretRequirement> {
263        self.packs
264            .iter()
265            .flat_map(|pack| pack.required_secrets().iter().cloned())
266            .collect()
267    }
268
269    pub fn missing_secrets(&self) -> Vec<SecretRequirement> {
270        self.packs
271            .iter()
272            .flat_map(|pack| pack.missing_secrets(&self.config.tenant_ctx()))
273            .collect()
274    }
275
276    pub fn telegram_cache(&self) -> &Mutex<LruCache<i64, StatusCode>> {
277        &self.telegram_cache
278    }
279
280    pub fn webhook_cache(&self) -> &Mutex<LruCache<String, Value>> {
281        &self.webhook_cache
282    }
283
284    pub fn messaging_rate(&self) -> &Mutex<RateLimiter> {
285        &self.messaging_rate
286    }
287
288    pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
289        self.mocks.as_ref()
290    }
291
292    pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
293        self.timer_handles.lock().extend(handles);
294    }
295
296    pub fn get_secret(&self, key: &str) -> Result<String> {
297        if crate::provider_core_only::is_enabled() {
298            bail!(crate::provider_core_only::blocked_message("secrets"))
299        }
300        if !self.config.secrets_policy.is_allowed(key) {
301            bail!("secret {key} is not permitted by bindings policy");
302        }
303        let ctx = self.config.tenant_ctx();
304        let bytes = read_secret_blocking(&self.secrets, &ctx, key)
305            .context("failed to read secret from manager")?;
306        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
307        Ok(value)
308    }
309}
310
311impl Drop for TenantRuntime {
312    fn drop(&mut self) {
313        for handle in self.timer_handles.lock().drain(..) {
314            handle.abort();
315        }
316    }
317}
318
/// Token-bucket style rate limiter.
///
/// The bucket starts full at `burst` tokens, refills continuously at `rate`
/// tokens per second (never beyond `burst`), and each successful
/// [`RateLimiter::try_acquire`] consumes one token.
pub struct RateLimiter {
    /// Tokens currently available; may be fractional.
    allowance: f64,
    /// Refill speed in tokens per second.
    rate: f64,
    /// Upper bound for `allowance`.
    burst: f64,
    /// Instant of the most recent `try_acquire` call (or of construction).
    last_check: Instant,
}

impl RateLimiter {
    /// Creates a limiter refilling at `qps` tokens per second with room for `burst` tokens.
    ///
    /// Both values are raised to at least 1, and the bucket starts full.
    pub fn new(qps: u32, burst: u32) -> Self {
        let capacity = f64::from(burst.max(1));
        Self {
            allowance: capacity,
            rate: f64::from(qps.max(1)),
            burst: capacity,
            last_check: Instant::now(),
        }
    }

    /// Tries to take one token.
    ///
    /// Tokens accrued since the previous call are credited first, capped at the
    /// burst size. Returns `true` and consumes a token when at least one whole
    /// token is available, otherwise returns `false`; the refill is kept either way.
    pub fn try_acquire(&mut self) -> bool {
        let now = Instant::now();
        let seconds_passed = now.duration_since(self.last_check).as_secs_f64();
        self.last_check = now;

        let refilled = (self.allowance + seconds_passed * self.rate).min(self.burst);
        if refilled >= 1.0 {
            self.allowance = refilled - 1.0;
            return true;
        }
        self.allowance = refilled;
        false
    }
}