Skip to main content

greentic_runner_host/
runtime.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::num::NonZeroUsize;
4use std::path::Path;
5use std::sync::Arc;
6use std::time::Instant;
7
8use anyhow::{Context, Result, bail};
9use arc_swap::ArcSwap;
10use axum::http::StatusCode;
11use lru::LruCache;
12use parking_lot::Mutex;
13use reqwest::Client;
14use serde_json::Value;
15use tokio::runtime::{Handle, Runtime};
16use tokio::task::JoinHandle;
17
18use crate::config::HostConfig;
19use crate::engine::host::{SessionHost, StateHost};
20use crate::engine::runtime::StateMachineRuntime;
21use crate::operator_metrics::OperatorMetrics;
22use crate::operator_registry::OperatorRegistry;
23use crate::pack::{ComponentResolution, PackRuntime};
24use crate::runner::engine::FlowEngine;
25use crate::runner::mocks::MockLayer;
26use crate::secrets::{DynSecretsManager, read_secret_blocking};
27use crate::storage::session::DynSessionStore;
28use crate::storage::state::DynStateStore;
29use crate::trace::PackTraceInfo;
30use crate::wasi::RunnerWasiPolicy;
31use greentic_types::SecretRequirement;
32
/// Entry limit of the per-tenant Telegram LRU cache; must be non-zero because
/// it is converted to a `NonZeroUsize` when the cache is built.
const TELEGRAM_CACHE_CAPACITY: usize = 1024;
/// Entry limit of the per-tenant webhook LRU cache; must be non-zero because
/// it is converted to a `NonZeroUsize` when the cache is built.
const WEBHOOK_CACHE_CAPACITY: usize = 256;
35
36/// Atomically swapped view of live tenant runtimes.
37pub struct ActivePacks {
38    inner: ArcSwap<HashMap<String, Arc<TenantRuntime>>>,
39}
40
41impl ActivePacks {
42    pub fn new() -> Self {
43        Self {
44            inner: ArcSwap::from_pointee(HashMap::new()),
45        }
46    }
47
48    pub fn load(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
49        self.inner.load().get(tenant).cloned()
50    }
51
52    pub fn snapshot(&self) -> Arc<HashMap<String, Arc<TenantRuntime>>> {
53        self.inner.load_full()
54    }
55
56    pub fn replace(&self, next: HashMap<String, Arc<TenantRuntime>>) {
57        self.inner.store(Arc::new(next));
58    }
59
60    pub fn len(&self) -> usize {
61        self.inner.load().len()
62    }
63
64    pub fn is_empty(&self) -> bool {
65        self.len() == 0
66    }
67}
68
69impl Default for ActivePacks {
70    fn default() -> Self {
71        Self::new()
72    }
73}
74
/// Runtime bundle for a tenant pack.
///
/// Groups everything needed to serve one tenant: the loaded packs, the flow
/// engine and state machine built on them, an HTTP client, small LRU caches, a
/// messaging rate limiter, and the secrets manager.
pub struct TenantRuntime {
    /// Tenant identifier, copied from `config.tenant` at construction.
    tenant: String,
    /// Host configuration shared with the engine and state machine.
    config: Arc<HostConfig>,
    /// Loaded packs; the first entry is the main pack, the rest are overlays.
    packs: Vec<Arc<PackRuntime>>,
    /// Digest of each pack, in the same order as `packs` (`None` if unknown).
    digests: Vec<Option<String>>,
    /// Flow engine built from `packs` and `config`.
    engine: Arc<FlowEngine>,
    /// State machine runtime built on top of `engine`.
    state_machine: Arc<StateMachineRuntime>,
    /// HTTP client created with the default `reqwest` builder settings.
    http_client: Client,
    /// LRU cache of status codes keyed by an `i64`
    /// (presumably a Telegram update or chat id — confirm against callers).
    telegram_cache: Mutex<LruCache<i64, StatusCode>>,
    /// LRU cache of JSON values keyed by a string
    /// (key semantics are defined by the webhook callers — not visible here).
    webhook_cache: Mutex<LruCache<String, Value>>,
    /// Rate limiter seeded from `config.rate_limits` (messaging QPS and burst).
    messaging_rate: Mutex<RateLimiter>,
    /// Optional mock layer, also handed to the state machine runtime.
    mocks: Option<Arc<MockLayer>>,
    /// Timer task handles added via `register_timers`; aborted on drop.
    timer_handles: Mutex<Vec<JoinHandle<()>>>,
    /// Secrets manager consulted by `get_secret`.
    secrets: DynSecretsManager,
    /// Operator registry built from the loaded packs.
    operator_registry: OperatorRegistry,
    /// Operator metrics, starting from their default value.
    operator_metrics: Arc<OperatorMetrics>,
}
93
94/// Block on a future whether or not we're already inside a tokio runtime.
95pub fn block_on<F: Future<Output = R>, R>(future: F) -> R {
96    if let Ok(handle) = Handle::try_current() {
97        handle.block_on(future)
98    } else {
99        Runtime::new()
100            .expect("failed to create tokio runtime")
101            .block_on(future)
102    }
103}
104
105impl TenantRuntime {
106    #[allow(clippy::too_many_arguments)]
107    pub async fn load(
108        pack_path: &Path,
109        config: Arc<HostConfig>,
110        mocks: Option<Arc<MockLayer>>,
111        archive_source: Option<&Path>,
112        digest: Option<String>,
113        wasi_policy: Arc<RunnerWasiPolicy>,
114        session_host: Arc<dyn SessionHost>,
115        session_store: DynSessionStore,
116        state_store: DynStateStore,
117        state_host: Arc<dyn StateHost>,
118        secrets_manager: DynSecretsManager,
119    ) -> Result<Arc<Self>> {
120        let oauth_config = config.oauth_broker_config();
121        let pack = Arc::new(
122            PackRuntime::load(
123                pack_path,
124                Arc::clone(&config),
125                mocks.clone(),
126                archive_source,
127                Some(Arc::clone(&session_store)),
128                Some(Arc::clone(&state_store)),
129                Arc::clone(&wasi_policy),
130                Arc::clone(&secrets_manager),
131                oauth_config.clone(),
132                true,
133                ComponentResolution::default(),
134            )
135            .await
136            .with_context(|| {
137                format!(
138                    "failed to load pack {} for tenant {}",
139                    pack_path.display(),
140                    config.tenant
141                )
142            })?,
143        );
144        Self::from_packs(
145            config,
146            vec![(pack, digest)],
147            mocks,
148            session_host,
149            session_store,
150            state_store,
151            state_host,
152            secrets_manager,
153        )
154        .await
155    }
156
157    #[allow(clippy::too_many_arguments)]
158    pub async fn from_packs(
159        config: Arc<HostConfig>,
160        packs: Vec<(Arc<PackRuntime>, Option<String>)>,
161        mocks: Option<Arc<MockLayer>>,
162        session_host: Arc<dyn SessionHost>,
163        session_store: DynSessionStore,
164        _state_store: DynStateStore,
165        state_host: Arc<dyn StateHost>,
166        secrets_manager: DynSecretsManager,
167    ) -> Result<Arc<Self>> {
168        let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
169            .expect("telegram cache capacity must be > 0");
170        let webhook_capacity =
171            NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
172        let operator_registry = OperatorRegistry::build(&packs)?;
173        let operator_metrics = Arc::new(OperatorMetrics::default());
174        let pack_runtimes = packs
175            .iter()
176            .map(|(pack, _)| Arc::clone(pack))
177            .collect::<Vec<_>>();
178        let digests = packs
179            .iter()
180            .map(|(_, digest)| digest.clone())
181            .collect::<Vec<_>>();
182        let mut pack_trace = HashMap::new();
183        for (pack, digest) in &packs {
184            let pack_id = pack.metadata().pack_id.clone();
185            let pack_ref = config
186                .pack_bindings
187                .iter()
188                .find(|binding| binding.pack_id == pack_id)
189                .map(|binding| binding.pack_ref.clone())
190                .unwrap_or_else(|| pack_id.clone());
191            pack_trace.insert(
192                pack_id,
193                PackTraceInfo {
194                    pack_ref,
195                    resolved_digest: digest.clone(),
196                },
197            );
198        }
199        let engine = Arc::new(
200            FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
201                .await
202                .context("failed to prime flow engine")?,
203        );
204        let state_machine = Arc::new(
205            StateMachineRuntime::from_flow_engine(
206                Arc::clone(&config),
207                Arc::clone(&engine),
208                pack_trace,
209                session_host,
210                session_store,
211                state_host,
212                Arc::clone(&secrets_manager),
213                mocks.clone(),
214            )
215            .context("failed to initialise state machine runtime")?,
216        );
217        let http_client = Client::builder().build()?;
218        let rate_limits = config.rate_limits.clone();
219        Ok(Arc::new(Self {
220            tenant: config.tenant.clone(),
221            config,
222            packs: pack_runtimes,
223            digests,
224            engine,
225            state_machine,
226            http_client,
227            telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
228            webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
229            messaging_rate: Mutex::new(RateLimiter::new(
230                rate_limits.messaging_send_qps,
231                rate_limits.messaging_burst,
232            )),
233            mocks,
234            timer_handles: Mutex::new(Vec::new()),
235            secrets: secrets_manager,
236            operator_registry,
237            operator_metrics,
238        }))
239    }
240
241    pub fn tenant(&self) -> &str {
242        &self.tenant
243    }
244
245    pub fn config(&self) -> &Arc<HostConfig> {
246        &self.config
247    }
248
249    pub fn operator_registry(&self) -> &OperatorRegistry {
250        &self.operator_registry
251    }
252
253    pub fn operator_metrics(&self) -> &OperatorMetrics {
254        &self.operator_metrics
255    }
256
257    pub fn main_pack(&self) -> &Arc<PackRuntime> {
258        self.packs
259            .first()
260            .expect("tenant runtime must contain at least one pack")
261    }
262
263    pub fn pack(&self) -> Arc<PackRuntime> {
264        Arc::clone(self.main_pack())
265    }
266
267    pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
268        self.packs.iter().skip(1).cloned().collect()
269    }
270
271    pub fn engine(&self) -> &Arc<FlowEngine> {
272        &self.engine
273    }
274
275    pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
276        &self.state_machine
277    }
278
279    pub fn http_client(&self) -> &Client {
280        &self.http_client
281    }
282
283    pub fn digest(&self) -> Option<&str> {
284        self.digests.first().and_then(|d| d.as_deref())
285    }
286
287    pub fn overlay_digests(&self) -> Vec<Option<String>> {
288        self.digests.iter().skip(1).cloned().collect()
289    }
290
291    pub fn required_secrets(&self) -> Vec<SecretRequirement> {
292        self.packs
293            .iter()
294            .flat_map(|pack| pack.required_secrets().iter().cloned())
295            .collect()
296    }
297
298    pub fn missing_secrets(&self) -> Vec<SecretRequirement> {
299        self.packs
300            .iter()
301            .flat_map(|pack| pack.missing_secrets(&self.config.tenant_ctx()))
302            .collect()
303    }
304
305    pub fn telegram_cache(&self) -> &Mutex<LruCache<i64, StatusCode>> {
306        &self.telegram_cache
307    }
308
309    pub fn webhook_cache(&self) -> &Mutex<LruCache<String, Value>> {
310        &self.webhook_cache
311    }
312
313    pub fn messaging_rate(&self) -> &Mutex<RateLimiter> {
314        &self.messaging_rate
315    }
316
317    pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
318        self.mocks.as_ref()
319    }
320
321    pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
322        self.timer_handles.lock().extend(handles);
323    }
324
325    pub fn get_secret(&self, key: &str) -> Result<String> {
326        if crate::provider_core_only::is_enabled() {
327            bail!(crate::provider_core_only::blocked_message("secrets"))
328        }
329        if !self.config.secrets_policy.is_allowed(key) {
330            bail!("secret {key} is not permitted by bindings policy");
331        }
332        let ctx = self.config.tenant_ctx();
333        let bytes = read_secret_blocking(&self.secrets, &ctx, key)
334            .context("failed to read secret from manager")?;
335        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
336        Ok(value)
337    }
338
339    pub fn pack_for_component(&self, component_ref: &str) -> Option<Arc<PackRuntime>> {
340        self.packs
341            .iter()
342            .find(|pack| pack.contains_component(component_ref))
343            .cloned()
344    }
345}
346
347impl Drop for TenantRuntime {
348    fn drop(&mut self) {
349        for handle in self.timer_handles.lock().drain(..) {
350            handle.abort();
351        }
352    }
353}
354
/// Token-bucket style limiter: permits accrue at `rate` per second, up to a
/// ceiling of `burst`, and each successful acquire spends one permit.
pub struct RateLimiter {
    /// Permits currently available (fractional while refilling).
    allowance: f64,
    /// Permits added per second.
    rate: f64,
    /// Maximum number of permits that can accumulate.
    burst: f64,
    /// Instant of the most recent `try_acquire` call (or of construction).
    last_check: Instant,
}

impl RateLimiter {
    /// Creates a limiter that starts with a full bucket.
    ///
    /// Both `qps` and `burst` are raised to at least 1, so a zero in either
    /// position behaves like 1.
    pub fn new(qps: u32, burst: u32) -> Self {
        let capacity = f64::from(burst.max(1));
        let per_second = f64::from(qps.max(1));
        Self {
            allowance: capacity,
            rate: per_second,
            burst: capacity,
            last_check: Instant::now(),
        }
    }

    /// Tries to take one permit.
    ///
    /// First credits the permits earned since the previous call (capped at
    /// `burst`), then returns `true` and spends one permit if at least one is
    /// available, or `false` otherwise.
    pub fn try_acquire(&mut self) -> bool {
        let now = Instant::now();
        let seconds_passed = now.duration_since(self.last_check).as_secs_f64();
        self.last_check = now;

        let refilled = self.allowance + seconds_passed * self.rate;
        self.allowance = refilled.min(self.burst);

        if self.allowance >= 1.0 {
            self.allowance -= 1.0;
            true
        } else {
            false
        }
    }
}