Skip to main content

greentic_runner_host/
runtime.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::num::NonZeroUsize;
4use std::path::Path;
5use std::sync::Arc;
6use std::time::Instant;
7
8use anyhow::{Context, Result, bail};
9use arc_swap::ArcSwap;
10use axum::http::StatusCode;
11use lru::LruCache;
12use parking_lot::Mutex;
13use reqwest::Client;
14use serde_json::Value;
15use tokio::runtime::{Handle, Runtime};
16use tokio::task::JoinHandle;
17
18use crate::config::HostConfig;
19use crate::engine::host::{SessionHost, StateHost};
20use crate::engine::runtime::StateMachineRuntime;
21use crate::operator_metrics::OperatorMetrics;
22use crate::operator_registry::OperatorRegistry;
23use crate::pack::{ComponentResolution, PackRuntime};
24use crate::runner::contract_cache::{ContractCache, ContractCacheStats};
25use crate::runner::engine::FlowEngine;
26use crate::runner::mocks::MockLayer;
27use crate::secrets::{DynSecretsManager, read_secret_blocking};
28use crate::storage::session::DynSessionStore;
29use crate::storage::state::DynStateStore;
30use crate::trace::PackTraceInfo;
31use crate::wasi::RunnerWasiPolicy;
32use greentic_types::SecretRequirement;
33
/// Maximum number of entries held by each tenant's Telegram LRU cache
/// (see `TenantRuntime::from_packs`).
const TELEGRAM_CACHE_CAPACITY: usize = 1024;
/// Maximum number of entries held by each tenant's webhook LRU cache
/// (see `TenantRuntime::from_packs`).
const WEBHOOK_CACHE_CAPACITY: usize = 256;
/// Pack id handed to the secrets manager when `TenantRuntime::get_secret`
/// reads a secret on behalf of the runner itself.
const RUNTIME_SECRETS_PACK_ID: &str = "_runner";
37
38/// Atomically swapped view of live tenant runtimes.
39pub struct ActivePacks {
40    inner: ArcSwap<HashMap<String, Arc<TenantRuntime>>>,
41}
42
43impl ActivePacks {
44    pub fn new() -> Self {
45        Self {
46            inner: ArcSwap::from_pointee(HashMap::new()),
47        }
48    }
49
50    pub fn load(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
51        self.inner.load().get(tenant).cloned()
52    }
53
54    pub fn snapshot(&self) -> Arc<HashMap<String, Arc<TenantRuntime>>> {
55        self.inner.load_full()
56    }
57
58    pub fn replace(&self, next: HashMap<String, Arc<TenantRuntime>>) {
59        self.inner.store(Arc::new(next));
60    }
61
62    pub fn len(&self) -> usize {
63        self.inner.load().len()
64    }
65
66    pub fn is_empty(&self) -> bool {
67        self.len() == 0
68    }
69}
70
71impl Default for ActivePacks {
72    fn default() -> Self {
73        Self::new()
74    }
75}
76
/// Runtime bundle for a tenant pack.
///
/// Built by `TenantRuntime::load` / `TenantRuntime::from_packs`; groups the
/// loaded packs with the engine, caches, rate limiter and secret access that
/// serve one tenant. Timer tasks registered on the runtime are aborted when
/// it is dropped.
pub struct TenantRuntime {
    /// Tenant identifier, copied from `config.tenant` at construction time.
    tenant: String,
    /// Host configuration this runtime was built from.
    config: Arc<HostConfig>,
    /// Loaded packs; the first entry is the main pack, the rest are overlays.
    packs: Vec<Arc<PackRuntime>>,
    /// Digests parallel to `packs` (same index, `None` when no digest was supplied).
    digests: Vec<Option<String>>,
    /// Flow engine created over `packs`.
    engine: Arc<FlowEngine>,
    /// State machine runtime derived from `engine`.
    state_machine: Arc<StateMachineRuntime>,
    /// HTTP client exposed through `http_client()`.
    http_client: Client,
    /// LRU cache bounded by `TELEGRAM_CACHE_CAPACITY`.
    telegram_cache: Mutex<LruCache<i64, StatusCode>>,
    /// LRU cache bounded by `WEBHOOK_CACHE_CAPACITY`.
    webhook_cache: Mutex<LruCache<String, Value>>,
    /// Rate limiter configured from `config.rate_limits.messaging_send_qps`
    /// and `config.rate_limits.messaging_burst`.
    messaging_rate: Mutex<RateLimiter>,
    /// Optional mock layer the runtime was constructed with.
    mocks: Option<Arc<MockLayer>>,
    /// Timer task handles added via `register_timers`; aborted in `Drop`.
    timer_handles: Mutex<Vec<JoinHandle<()>>>,
    /// Secrets manager used by `get_secret`.
    secrets: DynSecretsManager,
    /// Operator registry built from the pack list.
    operator_registry: OperatorRegistry,
    /// Operator metrics, created with `OperatorMetrics::default()`.
    operator_metrics: Arc<OperatorMetrics>,
    /// Contract cache, created with `ContractCache::from_env()`.
    contract_cache: ContractCache,
}
96
/// Outcome of `TenantRuntime::resolve_component`: the pack that provides a
/// component together with the digest reported for it.
#[derive(Clone)]
pub struct ResolvedComponent {
    /// Digest of the owning pack; falls back to the main pack's digest and
    /// finally to the literal `"unknown"` when neither is available.
    pub digest: String,
    /// The component reference that was looked up, copied verbatim.
    pub component_ref: String,
    /// The pack that contains the component.
    pub pack: Arc<PackRuntime>,
}
103
104/// Block on a future whether or not we're already inside a tokio runtime.
105pub fn block_on<F: Future<Output = R>, R>(future: F) -> R {
106    if let Ok(handle) = Handle::try_current() {
107        handle.block_on(future)
108    } else {
109        Runtime::new()
110            .expect("failed to create tokio runtime")
111            .block_on(future)
112    }
113}
114
115impl TenantRuntime {
116    #[allow(clippy::too_many_arguments)]
117    pub async fn load(
118        pack_path: &Path,
119        config: Arc<HostConfig>,
120        mocks: Option<Arc<MockLayer>>,
121        archive_source: Option<&Path>,
122        digest: Option<String>,
123        wasi_policy: Arc<RunnerWasiPolicy>,
124        session_host: Arc<dyn SessionHost>,
125        session_store: DynSessionStore,
126        state_store: DynStateStore,
127        state_host: Arc<dyn StateHost>,
128        secrets_manager: DynSecretsManager,
129    ) -> Result<Arc<Self>> {
130        let oauth_config = config.oauth_broker_config();
131        let pack = Arc::new(
132            PackRuntime::load(
133                pack_path,
134                Arc::clone(&config),
135                mocks.clone(),
136                archive_source,
137                Some(Arc::clone(&session_store)),
138                Some(Arc::clone(&state_store)),
139                Arc::clone(&wasi_policy),
140                Arc::clone(&secrets_manager),
141                oauth_config.clone(),
142                true,
143                ComponentResolution::default(),
144            )
145            .await
146            .with_context(|| {
147                format!(
148                    "failed to load pack {} for tenant {}",
149                    pack_path.display(),
150                    config.tenant
151                )
152            })?,
153        );
154        Self::from_packs(
155            config,
156            vec![(pack, digest)],
157            mocks,
158            session_host,
159            session_store,
160            state_store,
161            state_host,
162            secrets_manager,
163        )
164        .await
165    }
166
167    #[allow(clippy::too_many_arguments)]
168    pub async fn from_packs(
169        config: Arc<HostConfig>,
170        packs: Vec<(Arc<PackRuntime>, Option<String>)>,
171        mocks: Option<Arc<MockLayer>>,
172        session_host: Arc<dyn SessionHost>,
173        session_store: DynSessionStore,
174        _state_store: DynStateStore,
175        state_host: Arc<dyn StateHost>,
176        secrets_manager: DynSecretsManager,
177    ) -> Result<Arc<Self>> {
178        let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
179            .expect("telegram cache capacity must be > 0");
180        let webhook_capacity =
181            NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
182        let operator_registry = OperatorRegistry::build(&packs)?;
183        let operator_metrics = Arc::new(OperatorMetrics::default());
184        let pack_runtimes = packs
185            .iter()
186            .map(|(pack, _)| Arc::clone(pack))
187            .collect::<Vec<_>>();
188        let digests = packs
189            .iter()
190            .map(|(_, digest)| digest.clone())
191            .collect::<Vec<_>>();
192        let mut pack_trace = HashMap::new();
193        for (pack, digest) in &packs {
194            let pack_id = pack.metadata().pack_id.clone();
195            let pack_ref = config
196                .pack_bindings
197                .iter()
198                .find(|binding| binding.pack_id == pack_id)
199                .map(|binding| binding.pack_ref.clone())
200                .unwrap_or_else(|| pack_id.clone());
201            pack_trace.insert(
202                pack_id,
203                PackTraceInfo {
204                    pack_ref,
205                    resolved_digest: digest.clone(),
206                },
207            );
208        }
209        let engine = Arc::new(
210            FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
211                .await
212                .context("failed to prime flow engine")?,
213        );
214        let state_machine = Arc::new(
215            StateMachineRuntime::from_flow_engine(
216                Arc::clone(&config),
217                Arc::clone(&engine),
218                pack_trace,
219                session_host,
220                session_store,
221                state_host,
222                Arc::clone(&secrets_manager),
223                mocks.clone(),
224            )
225            .context("failed to initialise state machine runtime")?,
226        );
227        let http_client = Client::builder().build()?;
228        let rate_limits = config.rate_limits.clone();
229        Ok(Arc::new(Self {
230            tenant: config.tenant.clone(),
231            config,
232            packs: pack_runtimes,
233            digests,
234            engine,
235            state_machine,
236            http_client,
237            telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
238            webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
239            messaging_rate: Mutex::new(RateLimiter::new(
240                rate_limits.messaging_send_qps,
241                rate_limits.messaging_burst,
242            )),
243            mocks,
244            timer_handles: Mutex::new(Vec::new()),
245            secrets: secrets_manager,
246            operator_registry,
247            operator_metrics,
248            contract_cache: ContractCache::from_env(),
249        }))
250    }
251
252    pub fn tenant(&self) -> &str {
253        &self.tenant
254    }
255
256    pub fn config(&self) -> &Arc<HostConfig> {
257        &self.config
258    }
259
260    pub fn operator_registry(&self) -> &OperatorRegistry {
261        &self.operator_registry
262    }
263
264    pub fn operator_metrics(&self) -> &OperatorMetrics {
265        &self.operator_metrics
266    }
267
268    pub fn contract_cache(&self) -> &ContractCache {
269        &self.contract_cache
270    }
271
272    pub fn contract_cache_stats(&self) -> ContractCacheStats {
273        self.contract_cache.stats()
274    }
275
276    pub fn main_pack(&self) -> &Arc<PackRuntime> {
277        self.packs
278            .first()
279            .expect("tenant runtime must contain at least one pack")
280    }
281
282    pub fn pack(&self) -> Arc<PackRuntime> {
283        Arc::clone(self.main_pack())
284    }
285
286    pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
287        self.packs.iter().skip(1).cloned().collect()
288    }
289
290    pub fn engine(&self) -> &Arc<FlowEngine> {
291        &self.engine
292    }
293
294    pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
295        &self.state_machine
296    }
297
298    pub fn http_client(&self) -> &Client {
299        &self.http_client
300    }
301
302    pub fn digest(&self) -> Option<&str> {
303        self.digests.first().and_then(|d| d.as_deref())
304    }
305
306    pub fn overlay_digests(&self) -> Vec<Option<String>> {
307        self.digests.iter().skip(1).cloned().collect()
308    }
309
310    pub fn required_secrets(&self) -> Vec<SecretRequirement> {
311        self.packs
312            .iter()
313            .flat_map(|pack| pack.required_secrets().iter().cloned())
314            .collect()
315    }
316
317    pub fn missing_secrets(&self) -> Vec<SecretRequirement> {
318        self.packs
319            .iter()
320            .flat_map(|pack| pack.missing_secrets(&self.config.tenant_ctx()))
321            .collect()
322    }
323
324    pub fn telegram_cache(&self) -> &Mutex<LruCache<i64, StatusCode>> {
325        &self.telegram_cache
326    }
327
328    pub fn webhook_cache(&self) -> &Mutex<LruCache<String, Value>> {
329        &self.webhook_cache
330    }
331
332    pub fn messaging_rate(&self) -> &Mutex<RateLimiter> {
333        &self.messaging_rate
334    }
335
336    pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
337        self.mocks.as_ref()
338    }
339
340    pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
341        self.timer_handles.lock().extend(handles);
342    }
343
344    pub fn get_secret(&self, key: &str) -> Result<String> {
345        if crate::provider_core_only::is_enabled() {
346            bail!(crate::provider_core_only::blocked_message("secrets"))
347        }
348        if !self.config.secrets_policy.is_allowed(key) {
349            bail!("secret {key} is not permitted by bindings policy");
350        }
351        let ctx = self.config.tenant_ctx();
352        let bytes = read_secret_blocking(&self.secrets, &ctx, RUNTIME_SECRETS_PACK_ID, key)
353            .context("failed to read secret from manager")?;
354        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
355        Ok(value)
356    }
357
358    pub fn pack_for_component(&self, component_ref: &str) -> Option<Arc<PackRuntime>> {
359        self.packs
360            .iter()
361            .find(|pack| pack.contains_component(component_ref))
362            .cloned()
363    }
364
365    pub fn pack_for_component_with_digest(
366        &self,
367        component_ref: &str,
368    ) -> Option<(Arc<PackRuntime>, Option<String>)> {
369        self.packs
370            .iter()
371            .zip(self.digests.iter())
372            .find(|(pack, _)| pack.contains_component(component_ref))
373            .map(|(pack, digest)| (Arc::clone(pack), digest.clone()))
374    }
375
376    pub fn resolve_component(&self, component_ref: &str) -> Option<ResolvedComponent> {
377        self.pack_for_component_with_digest(component_ref)
378            .map(|(pack, digest)| ResolvedComponent {
379                digest: digest
380                    .or_else(|| self.digest().map(ToString::to_string))
381                    .unwrap_or_else(|| "unknown".to_string()),
382                component_ref: component_ref.to_string(),
383                pack,
384            })
385    }
386}
387
388impl Drop for TenantRuntime {
389    fn drop(&mut self) {
390        for handle in self.timer_handles.lock().drain(..) {
391            handle.abort();
392        }
393    }
394}
395
/// Token-bucket rate limiter.
///
/// Tokens accrue continuously at `rate` per second up to a ceiling of
/// `burst`; every successful [`RateLimiter::try_acquire`] spends one token.
pub struct RateLimiter {
    /// Tokens currently available (fractional while refilling).
    allowance: f64,
    /// Refill speed in tokens per second.
    rate: f64,
    /// Maximum number of tokens the bucket can hold.
    burst: f64,
    /// Moment of the previous `try_acquire` call (or of construction).
    last_check: Instant,
}

impl RateLimiter {
    /// Creates a limiter that starts with a full bucket.
    ///
    /// Both `qps` and `burst` are raised to at least 1, so a zero in either
    /// position behaves like 1.
    pub fn new(qps: u32, burst: u32) -> Self {
        let capacity = f64::from(burst.max(1));
        let refill_per_sec = f64::from(qps.max(1));
        Self {
            allowance: capacity,
            rate: refill_per_sec,
            burst: capacity,
            last_check: Instant::now(),
        }
    }

    /// Attempts to spend one token.
    ///
    /// The bucket is first topped up for the time elapsed since the previous
    /// call (never beyond `burst`). Returns `true` and deducts a token when
    /// at least one whole token is available, `false` otherwise.
    pub fn try_acquire(&mut self) -> bool {
        let now = Instant::now();
        let refill = now.duration_since(self.last_check).as_secs_f64() * self.rate;
        self.last_check = now;

        let topped_up = self.allowance + refill;
        self.allowance = if topped_up > self.burst {
            self.burst
        } else {
            topped_up
        };

        if self.allowance >= 1.0 {
            self.allowance -= 1.0;
            return true;
        }
        false
    }
}
430}