Skip to main content

greentic_runner_host/
runtime.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::num::NonZeroUsize;
4use std::path::Path;
5use std::sync::Arc;
6use std::time::Instant;
7
8use anyhow::{Context, Result, bail};
9use arc_swap::ArcSwap;
10use axum::http::StatusCode;
11use lru::LruCache;
12use parking_lot::Mutex;
13use reqwest::Client;
14use serde_json::Value;
15use tokio::runtime::{Handle, Runtime};
16use tokio::task::JoinHandle;
17
18use crate::config::HostConfig;
19use crate::engine::host::{SessionHost, StateHost};
20use crate::engine::runtime::StateMachineRuntime;
21use crate::pack::{ComponentResolution, PackRuntime};
22use crate::runner::engine::FlowEngine;
23use crate::runner::mocks::MockLayer;
24use crate::secrets::{DynSecretsManager, read_secret_blocking};
25use crate::storage::session::DynSessionStore;
26use crate::storage::state::DynStateStore;
27use crate::trace::PackTraceInfo;
28use crate::wasi::RunnerWasiPolicy;
29use greentic_types::SecretRequirement;
30
31const TELEGRAM_CACHE_CAPACITY: usize = 1024;
32const WEBHOOK_CACHE_CAPACITY: usize = 256;
33
34/// Atomically swapped view of live tenant runtimes.
35pub struct ActivePacks {
36    inner: ArcSwap<HashMap<String, Arc<TenantRuntime>>>,
37}
38
39impl ActivePacks {
40    pub fn new() -> Self {
41        Self {
42            inner: ArcSwap::from_pointee(HashMap::new()),
43        }
44    }
45
46    pub fn load(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
47        self.inner.load().get(tenant).cloned()
48    }
49
50    pub fn snapshot(&self) -> Arc<HashMap<String, Arc<TenantRuntime>>> {
51        self.inner.load_full()
52    }
53
54    pub fn replace(&self, next: HashMap<String, Arc<TenantRuntime>>) {
55        self.inner.store(Arc::new(next));
56    }
57
58    pub fn len(&self) -> usize {
59        self.inner.load().len()
60    }
61
62    pub fn is_empty(&self) -> bool {
63        self.len() == 0
64    }
65}
66
67impl Default for ActivePacks {
68    fn default() -> Self {
69        Self::new()
70    }
71}
72
73/// Runtime bundle for a tenant pack.
74pub struct TenantRuntime {
75    tenant: String,
76    config: Arc<HostConfig>,
77    packs: Vec<Arc<PackRuntime>>,
78    digests: Vec<Option<String>>,
79    engine: Arc<FlowEngine>,
80    state_machine: Arc<StateMachineRuntime>,
81    http_client: Client,
82    telegram_cache: Mutex<LruCache<i64, StatusCode>>,
83    webhook_cache: Mutex<LruCache<String, Value>>,
84    messaging_rate: Mutex<RateLimiter>,
85    mocks: Option<Arc<MockLayer>>,
86    timer_handles: Mutex<Vec<JoinHandle<()>>>,
87    secrets: DynSecretsManager,
88}
89
90/// Block on a future whether or not we're already inside a tokio runtime.
91pub fn block_on<F: Future<Output = R>, R>(future: F) -> R {
92    if let Ok(handle) = Handle::try_current() {
93        handle.block_on(future)
94    } else {
95        Runtime::new()
96            .expect("failed to create tokio runtime")
97            .block_on(future)
98    }
99}
100
101impl TenantRuntime {
102    #[allow(clippy::too_many_arguments)]
103    pub async fn load(
104        pack_path: &Path,
105        config: Arc<HostConfig>,
106        mocks: Option<Arc<MockLayer>>,
107        archive_source: Option<&Path>,
108        digest: Option<String>,
109        wasi_policy: Arc<RunnerWasiPolicy>,
110        session_host: Arc<dyn SessionHost>,
111        session_store: DynSessionStore,
112        state_store: DynStateStore,
113        state_host: Arc<dyn StateHost>,
114        secrets_manager: DynSecretsManager,
115    ) -> Result<Arc<Self>> {
116        let oauth_config = config.oauth_broker_config();
117        let pack = Arc::new(
118            PackRuntime::load(
119                pack_path,
120                Arc::clone(&config),
121                mocks.clone(),
122                archive_source,
123                Some(Arc::clone(&session_store)),
124                Some(Arc::clone(&state_store)),
125                Arc::clone(&wasi_policy),
126                Arc::clone(&secrets_manager),
127                oauth_config.clone(),
128                true,
129                ComponentResolution::default(),
130            )
131            .await
132            .with_context(|| {
133                format!(
134                    "failed to load pack {} for tenant {}",
135                    pack_path.display(),
136                    config.tenant
137                )
138            })?,
139        );
140        Self::from_packs(
141            config,
142            vec![(pack, digest)],
143            mocks,
144            session_host,
145            session_store,
146            state_store,
147            state_host,
148            secrets_manager,
149        )
150        .await
151    }
152
153    #[allow(clippy::too_many_arguments)]
154    pub async fn from_packs(
155        config: Arc<HostConfig>,
156        packs: Vec<(Arc<PackRuntime>, Option<String>)>,
157        mocks: Option<Arc<MockLayer>>,
158        session_host: Arc<dyn SessionHost>,
159        session_store: DynSessionStore,
160        _state_store: DynStateStore,
161        state_host: Arc<dyn StateHost>,
162        secrets_manager: DynSecretsManager,
163    ) -> Result<Arc<Self>> {
164        let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
165            .expect("telegram cache capacity must be > 0");
166        let webhook_capacity =
167            NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
168        let pack_runtimes = packs
169            .iter()
170            .map(|(pack, _)| Arc::clone(pack))
171            .collect::<Vec<_>>();
172        let digests = packs
173            .iter()
174            .map(|(_, digest)| digest.clone())
175            .collect::<Vec<_>>();
176        let mut pack_trace = HashMap::new();
177        for (pack, digest) in &packs {
178            let pack_id = pack.metadata().pack_id.clone();
179            let pack_ref = config
180                .pack_bindings
181                .iter()
182                .find(|binding| binding.pack_id == pack_id)
183                .map(|binding| binding.pack_ref.clone())
184                .unwrap_or_else(|| pack_id.clone());
185            pack_trace.insert(
186                pack_id,
187                PackTraceInfo {
188                    pack_ref,
189                    resolved_digest: digest.clone(),
190                },
191            );
192        }
193        let engine = Arc::new(
194            FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
195                .await
196                .context("failed to prime flow engine")?,
197        );
198        let state_machine = Arc::new(
199            StateMachineRuntime::from_flow_engine(
200                Arc::clone(&config),
201                Arc::clone(&engine),
202                pack_trace,
203                session_host,
204                session_store,
205                state_host,
206                Arc::clone(&secrets_manager),
207                mocks.clone(),
208            )
209            .context("failed to initialise state machine runtime")?,
210        );
211        let http_client = Client::builder().build()?;
212        let rate_limits = config.rate_limits.clone();
213        Ok(Arc::new(Self {
214            tenant: config.tenant.clone(),
215            config,
216            packs: pack_runtimes,
217            digests,
218            engine,
219            state_machine,
220            http_client,
221            telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
222            webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
223            messaging_rate: Mutex::new(RateLimiter::new(
224                rate_limits.messaging_send_qps,
225                rate_limits.messaging_burst,
226            )),
227            mocks,
228            timer_handles: Mutex::new(Vec::new()),
229            secrets: secrets_manager,
230        }))
231    }
232
233    pub fn tenant(&self) -> &str {
234        &self.tenant
235    }
236
237    pub fn config(&self) -> &Arc<HostConfig> {
238        &self.config
239    }
240
241    pub fn main_pack(&self) -> &Arc<PackRuntime> {
242        self.packs
243            .first()
244            .expect("tenant runtime must contain at least one pack")
245    }
246
247    pub fn pack(&self) -> Arc<PackRuntime> {
248        Arc::clone(self.main_pack())
249    }
250
251    pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
252        self.packs.iter().skip(1).cloned().collect()
253    }
254
255    pub fn engine(&self) -> &Arc<FlowEngine> {
256        &self.engine
257    }
258
259    pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
260        &self.state_machine
261    }
262
263    pub fn http_client(&self) -> &Client {
264        &self.http_client
265    }
266
267    pub fn digest(&self) -> Option<&str> {
268        self.digests.first().and_then(|d| d.as_deref())
269    }
270
271    pub fn overlay_digests(&self) -> Vec<Option<String>> {
272        self.digests.iter().skip(1).cloned().collect()
273    }
274
275    pub fn required_secrets(&self) -> Vec<SecretRequirement> {
276        self.packs
277            .iter()
278            .flat_map(|pack| pack.required_secrets().iter().cloned())
279            .collect()
280    }
281
282    pub fn missing_secrets(&self) -> Vec<SecretRequirement> {
283        self.packs
284            .iter()
285            .flat_map(|pack| pack.missing_secrets(&self.config.tenant_ctx()))
286            .collect()
287    }
288
289    pub fn telegram_cache(&self) -> &Mutex<LruCache<i64, StatusCode>> {
290        &self.telegram_cache
291    }
292
293    pub fn webhook_cache(&self) -> &Mutex<LruCache<String, Value>> {
294        &self.webhook_cache
295    }
296
297    pub fn messaging_rate(&self) -> &Mutex<RateLimiter> {
298        &self.messaging_rate
299    }
300
301    pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
302        self.mocks.as_ref()
303    }
304
305    pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
306        self.timer_handles.lock().extend(handles);
307    }
308
309    pub fn get_secret(&self, key: &str) -> Result<String> {
310        if crate::provider_core_only::is_enabled() {
311            bail!(crate::provider_core_only::blocked_message("secrets"))
312        }
313        if !self.config.secrets_policy.is_allowed(key) {
314            bail!("secret {key} is not permitted by bindings policy");
315        }
316        let ctx = self.config.tenant_ctx();
317        let bytes = read_secret_blocking(&self.secrets, &ctx, key)
318            .context("failed to read secret from manager")?;
319        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
320        Ok(value)
321    }
322}
323
324impl Drop for TenantRuntime {
325    fn drop(&mut self) {
326        for handle in self.timer_handles.lock().drain(..) {
327            handle.abort();
328        }
329    }
330}
331
/// Allowance-based rate limiter.
///
/// The limiter holds a fractional `allowance` that grows by `rate` units per
/// elapsed second, never exceeds `burst`, and is reduced by one for every
/// successful acquisition.
pub struct RateLimiter {
    allowance: f64,
    rate: f64,
    burst: f64,
    last_check: Instant,
}

impl RateLimiter {
    /// Creates a limiter that refills at `qps` units per second and holds at
    /// most `burst` units. Both values are raised to at least 1, and the
    /// limiter starts with a full allowance.
    pub fn new(qps: u32, burst: u32) -> Self {
        let capacity = f64::from(burst.max(1));
        Self {
            allowance: capacity,
            rate: f64::from(qps.max(1)),
            burst: capacity,
            last_check: Instant::now(),
        }
    }

    /// Attempts to take one unit of allowance.
    ///
    /// The allowance is first topped up for the time elapsed since the
    /// previous call (capped at `burst`). Returns `true` and consumes one unit
    /// when at least one whole unit is available, otherwise returns `false`
    /// and leaves the allowance as refilled.
    pub fn try_acquire(&mut self) -> bool {
        let now = Instant::now();
        let refill = now.duration_since(self.last_check).as_secs_f64() * self.rate;
        self.last_check = now;
        self.allowance = (self.allowance + refill).min(self.burst);

        if self.allowance >= 1.0 {
            self.allowance -= 1.0;
            return true;
        }
        false
    }
}
366}