Skip to main content

greentic_runner_host/
runtime.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::num::NonZeroUsize;
4use std::path::Path;
5use std::sync::Arc;
6use std::time::Instant;
7
8use anyhow::{Context, Result, anyhow, bail};
9use arc_swap::ArcSwap;
10use axum::http::StatusCode;
11use lru::LruCache;
12use parking_lot::Mutex;
13use reqwest::Client;
14use serde_json::Value;
15use tokio::runtime::{Handle, Runtime};
16use tokio::task::JoinHandle;
17
18use crate::config::HostConfig;
19use crate::engine::host::{SessionHost, StateHost};
20use crate::engine::runtime::StateMachineRuntime;
21use crate::oauth::{OAuthBrokerConfig, request_resource_token};
22use crate::operator_metrics::OperatorMetrics;
23use crate::operator_registry::OperatorRegistry;
24use crate::pack::{ComponentResolution, PackRuntime};
25use crate::runner::adapt_events_email::{
26    EmailExecutionPlan, EmailSendRequest, build_email_execution_plan, execute_email_request,
27};
28use crate::runner::contract_cache::{ContractCache, ContractCacheStats};
29use crate::runner::engine::FlowEngine;
30use crate::runner::mocks::MockLayer;
31use crate::secrets::{DynSecretsManager, read_secret_blocking};
32use crate::storage::session::DynSessionStore;
33use crate::storage::state::DynStateStore;
34use crate::trace::PackTraceInfo;
35use crate::wasi::RunnerWasiPolicy;
36use greentic_types::SecretRequirement;
37
38const TELEGRAM_CACHE_CAPACITY: usize = 1024;
39const WEBHOOK_CACHE_CAPACITY: usize = 256;
40const RUNTIME_SECRETS_PACK_ID: &str = "_runner";
41
42/// Atomically swapped view of live tenant runtimes.
43pub struct ActivePacks {
44    inner: ArcSwap<HashMap<String, Arc<TenantRuntime>>>,
45}
46
47impl ActivePacks {
48    pub fn new() -> Self {
49        Self {
50            inner: ArcSwap::from_pointee(HashMap::new()),
51        }
52    }
53
54    pub fn load(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
55        self.inner.load().get(tenant).cloned()
56    }
57
58    pub fn snapshot(&self) -> Arc<HashMap<String, Arc<TenantRuntime>>> {
59        self.inner.load_full()
60    }
61
62    pub fn replace(&self, next: HashMap<String, Arc<TenantRuntime>>) {
63        self.inner.store(Arc::new(next));
64    }
65
66    pub fn len(&self) -> usize {
67        self.inner.load().len()
68    }
69
70    pub fn is_empty(&self) -> bool {
71        self.len() == 0
72    }
73}
74
75impl Default for ActivePacks {
76    fn default() -> Self {
77        Self::new()
78    }
79}
80
81/// Runtime bundle for a tenant pack.
82pub struct TenantRuntime {
83    tenant: String,
84    config: Arc<HostConfig>,
85    packs: Vec<Arc<PackRuntime>>,
86    digests: Vec<Option<String>>,
87    engine: Arc<FlowEngine>,
88    state_machine: Arc<StateMachineRuntime>,
89    http_client: Client,
90    telegram_cache: Mutex<LruCache<i64, StatusCode>>,
91    webhook_cache: Mutex<LruCache<String, Value>>,
92    messaging_rate: Mutex<RateLimiter>,
93    mocks: Option<Arc<MockLayer>>,
94    timer_handles: Mutex<Vec<JoinHandle<()>>>,
95    secrets: DynSecretsManager,
96    operator_registry: OperatorRegistry,
97    operator_metrics: Arc<OperatorMetrics>,
98    contract_cache: ContractCache,
99}
100
101#[derive(Clone)]
102pub struct ResolvedComponent {
103    pub digest: String,
104    pub component_ref: String,
105    pub pack: Arc<PackRuntime>,
106}
107
108/// Block on a future whether or not we're already inside a tokio runtime.
109pub fn block_on<F: Future<Output = R>, R>(future: F) -> R {
110    if let Ok(handle) = Handle::try_current() {
111        handle.block_on(future)
112    } else {
113        Runtime::new()
114            .expect("failed to create tokio runtime")
115            .block_on(future)
116    }
117}
118
119impl TenantRuntime {
120    #[allow(clippy::too_many_arguments)]
121    pub async fn load(
122        pack_path: &Path,
123        config: Arc<HostConfig>,
124        mocks: Option<Arc<MockLayer>>,
125        archive_source: Option<&Path>,
126        digest: Option<String>,
127        wasi_policy: Arc<RunnerWasiPolicy>,
128        session_host: Arc<dyn SessionHost>,
129        session_store: DynSessionStore,
130        state_store: DynStateStore,
131        state_host: Arc<dyn StateHost>,
132        secrets_manager: DynSecretsManager,
133    ) -> Result<Arc<Self>> {
134        let oauth_config = config.oauth_broker_config();
135        let pack = Arc::new(
136            PackRuntime::load(
137                pack_path,
138                Arc::clone(&config),
139                mocks.clone(),
140                archive_source,
141                Some(Arc::clone(&session_store)),
142                Some(Arc::clone(&state_store)),
143                Arc::clone(&wasi_policy),
144                Arc::clone(&secrets_manager),
145                oauth_config.clone(),
146                true,
147                ComponentResolution::default(),
148            )
149            .await
150            .with_context(|| {
151                format!(
152                    "failed to load pack {} for tenant {}",
153                    pack_path.display(),
154                    config.tenant
155                )
156            })?,
157        );
158        Self::from_packs(
159            config,
160            vec![(pack, digest)],
161            mocks,
162            session_host,
163            session_store,
164            state_store,
165            state_host,
166            secrets_manager,
167        )
168        .await
169    }
170
171    #[allow(clippy::too_many_arguments)]
172    pub async fn from_packs(
173        config: Arc<HostConfig>,
174        packs: Vec<(Arc<PackRuntime>, Option<String>)>,
175        mocks: Option<Arc<MockLayer>>,
176        session_host: Arc<dyn SessionHost>,
177        session_store: DynSessionStore,
178        _state_store: DynStateStore,
179        state_host: Arc<dyn StateHost>,
180        secrets_manager: DynSecretsManager,
181    ) -> Result<Arc<Self>> {
182        let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
183            .expect("telegram cache capacity must be > 0");
184        let webhook_capacity =
185            NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
186        let operator_registry = OperatorRegistry::build(&packs)?;
187        let operator_metrics = Arc::new(OperatorMetrics::default());
188        let pack_runtimes = packs
189            .iter()
190            .map(|(pack, _)| Arc::clone(pack))
191            .collect::<Vec<_>>();
192        let digests = packs
193            .iter()
194            .map(|(_, digest)| digest.clone())
195            .collect::<Vec<_>>();
196        let mut pack_trace = HashMap::new();
197        for (pack, digest) in &packs {
198            let pack_id = pack.metadata().pack_id.clone();
199            let pack_ref = config
200                .pack_bindings
201                .iter()
202                .find(|binding| binding.pack_id == pack_id)
203                .map(|binding| binding.pack_ref.clone())
204                .unwrap_or_else(|| pack_id.clone());
205            pack_trace.insert(
206                pack_id,
207                PackTraceInfo {
208                    pack_ref,
209                    resolved_digest: digest.clone(),
210                },
211            );
212        }
213        let engine = Arc::new(
214            FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
215                .await
216                .context("failed to prime flow engine")?,
217        );
218        let state_machine = Arc::new(
219            StateMachineRuntime::from_flow_engine(
220                Arc::clone(&config),
221                Arc::clone(&engine),
222                pack_trace,
223                session_host,
224                session_store,
225                state_host,
226                Arc::clone(&secrets_manager),
227                mocks.clone(),
228            )
229            .context("failed to initialise state machine runtime")?,
230        );
231        let http_client = Client::builder().build()?;
232        let rate_limits = config.rate_limits.clone();
233        Ok(Arc::new(Self {
234            tenant: config.tenant.clone(),
235            config,
236            packs: pack_runtimes,
237            digests,
238            engine,
239            state_machine,
240            http_client,
241            telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
242            webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
243            messaging_rate: Mutex::new(RateLimiter::new(
244                rate_limits.messaging_send_qps,
245                rate_limits.messaging_burst,
246            )),
247            mocks,
248            timer_handles: Mutex::new(Vec::new()),
249            secrets: secrets_manager,
250            operator_registry,
251            operator_metrics,
252            contract_cache: ContractCache::from_env(),
253        }))
254    }
255
256    pub fn tenant(&self) -> &str {
257        &self.tenant
258    }
259
260    pub fn config(&self) -> &Arc<HostConfig> {
261        &self.config
262    }
263
264    pub fn operator_registry(&self) -> &OperatorRegistry {
265        &self.operator_registry
266    }
267
268    pub fn operator_metrics(&self) -> &OperatorMetrics {
269        &self.operator_metrics
270    }
271
272    pub fn contract_cache(&self) -> &ContractCache {
273        &self.contract_cache
274    }
275
276    pub fn contract_cache_stats(&self) -> ContractCacheStats {
277        self.contract_cache.stats()
278    }
279
280    pub fn main_pack(&self) -> &Arc<PackRuntime> {
281        self.packs
282            .first()
283            .expect("tenant runtime must contain at least one pack")
284    }
285
286    pub fn pack(&self) -> Arc<PackRuntime> {
287        Arc::clone(self.main_pack())
288    }
289
290    pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
291        self.packs.iter().skip(1).cloned().collect()
292    }
293
294    pub fn engine(&self) -> &Arc<FlowEngine> {
295        &self.engine
296    }
297
298    pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
299        &self.state_machine
300    }
301
302    pub fn http_client(&self) -> &Client {
303        &self.http_client
304    }
305
306    pub fn oauth_config(&self) -> Option<OAuthBrokerConfig> {
307        self.config.oauth_broker_config()
308    }
309
310    pub fn digest(&self) -> Option<&str> {
311        self.digests.first().and_then(|d| d.as_deref())
312    }
313
314    pub fn overlay_digests(&self) -> Vec<Option<String>> {
315        self.digests.iter().skip(1).cloned().collect()
316    }
317
318    pub fn required_secrets(&self) -> Vec<SecretRequirement> {
319        self.packs
320            .iter()
321            .flat_map(|pack| pack.required_secrets().iter().cloned())
322            .collect()
323    }
324
325    pub fn missing_secrets(&self) -> Vec<SecretRequirement> {
326        self.packs
327            .iter()
328            .flat_map(|pack| pack.missing_secrets(&self.config.tenant_ctx()))
329            .collect()
330    }
331
332    pub fn telegram_cache(&self) -> &Mutex<LruCache<i64, StatusCode>> {
333        &self.telegram_cache
334    }
335
336    pub fn webhook_cache(&self) -> &Mutex<LruCache<String, Value>> {
337        &self.webhook_cache
338    }
339
340    pub fn messaging_rate(&self) -> &Mutex<RateLimiter> {
341        &self.messaging_rate
342    }
343
344    pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
345        self.mocks.as_ref()
346    }
347
348    pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
349        self.timer_handles.lock().extend(handles);
350    }
351
352    pub fn get_secret(&self, key: &str) -> Result<String> {
353        if crate::provider_core_only::is_enabled() {
354            bail!(crate::provider_core_only::blocked_message("secrets"))
355        }
356        if !self.config.secrets_policy.is_allowed(key) {
357            bail!("secret {key} is not permitted by bindings policy");
358        }
359        let ctx = self.config.tenant_ctx();
360        let bytes = read_secret_blocking(&self.secrets, &ctx, RUNTIME_SECRETS_PACK_ID, key)
361            .context("failed to read secret from manager")?;
362        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
363        Ok(value)
364    }
365
366    pub fn build_events_email_execution_plan(
367        &self,
368        tenant: &greentic_types::TenantCtx,
369        request: &EmailSendRequest,
370    ) -> Result<EmailExecutionPlan> {
371        let oauth = self
372            .oauth_config()
373            .ok_or_else(|| anyhow!("oauth broker config is not configured for tenant runtime"))?;
374        build_email_execution_plan(&oauth, tenant, request)
375    }
376
377    pub async fn execute_events_email_request(
378        &self,
379        access_token: &str,
380        request: &EmailSendRequest,
381    ) -> Result<()> {
382        execute_email_request(self.http_client(), access_token, request).await
383    }
384
385    pub async fn execute_events_email_with_oauth(
386        &self,
387        tenant: &greentic_types::TenantCtx,
388        request: &EmailSendRequest,
389    ) -> Result<()> {
390        let plan = self.build_events_email_execution_plan(tenant, request)?;
391        let token = request_resource_token(self.http_client(), &plan.token_request).await?;
392        self.execute_events_email_request(&token.access_token, request)
393            .await
394    }
395
396    pub fn pack_for_component(&self, component_ref: &str) -> Option<Arc<PackRuntime>> {
397        self.packs
398            .iter()
399            .find(|pack| pack.contains_component(component_ref))
400            .cloned()
401    }
402
403    pub fn pack_for_component_with_digest(
404        &self,
405        component_ref: &str,
406    ) -> Option<(Arc<PackRuntime>, Option<String>)> {
407        self.packs
408            .iter()
409            .zip(self.digests.iter())
410            .find(|(pack, _)| pack.contains_component(component_ref))
411            .map(|(pack, digest)| (Arc::clone(pack), digest.clone()))
412    }
413
414    pub fn resolve_component(&self, component_ref: &str) -> Option<ResolvedComponent> {
415        self.pack_for_component_with_digest(component_ref)
416            .map(|(pack, digest)| ResolvedComponent {
417                digest: digest
418                    .or_else(|| self.digest().map(ToString::to_string))
419                    .unwrap_or_else(|| "unknown".to_string()),
420                component_ref: component_ref.to_string(),
421                pack,
422            })
423    }
424}
425
426impl Drop for TenantRuntime {
427    fn drop(&mut self) {
428        for handle in self.timer_handles.lock().drain(..) {
429            handle.abort();
430        }
431    }
432}
433
/// Token-bucket limiter: `allowance` refills at `rate` units per second, is capped at `burst`,
/// and each successful [`RateLimiter::try_acquire`] spends one unit.
pub struct RateLimiter {
    allowance: f64,
    rate: f64,
    burst: f64,
    last_check: Instant,
}

impl RateLimiter {
    /// Create a limiter that starts with a full bucket.
    ///
    /// Both `qps` and `burst` are raised to at least 1, so a zero value behaves like 1.
    pub fn new(qps: u32, burst: u32) -> Self {
        let capacity = f64::from(burst.max(1));
        Self {
            allowance: capacity,
            rate: f64::from(qps.max(1)),
            burst: capacity,
            last_check: Instant::now(),
        }
    }

    /// Refill the bucket for the time elapsed since the previous call, then try to spend one
    /// unit. Returns `true` when a unit was available and consumed, `false` otherwise.
    pub fn try_acquire(&mut self) -> bool {
        let now = Instant::now();
        let refill = now.duration_since(self.last_check).as_secs_f64() * self.rate;
        self.last_check = now;
        // Top up, never exceeding the configured burst size.
        self.allowance = (self.allowance + refill).min(self.burst);
        if self.allowance >= 1.0 {
            self.allowance -= 1.0;
            true
        } else {
            false
        }
    }
}