Skip to main content

greentic_runner_host/
runtime.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::num::NonZeroUsize;
4use std::path::Path;
5use std::sync::Arc;
6use std::time::Instant;
7
8use anyhow::{Context, Result, bail};
9use arc_swap::ArcSwap;
10use axum::http::StatusCode;
11use lru::LruCache;
12use parking_lot::Mutex;
13use reqwest::Client;
14use serde_json::Value;
15use tokio::runtime::{Handle, Runtime};
16use tokio::task::JoinHandle;
17
18use crate::config::HostConfig;
19use crate::engine::host::{SessionHost, StateHost};
20use crate::engine::runtime::StateMachineRuntime;
21use crate::operator_metrics::OperatorMetrics;
22use crate::operator_registry::OperatorRegistry;
23use crate::pack::{ComponentResolution, PackRuntime};
24use crate::runner::engine::FlowEngine;
25use crate::runner::mocks::MockLayer;
26use crate::secrets::{DynSecretsManager, read_secret_blocking};
27use crate::storage::session::DynSessionStore;
28use crate::storage::state::DynStateStore;
29use crate::trace::PackTraceInfo;
30use crate::wasi::RunnerWasiPolicy;
31use greentic_types::SecretRequirement;
32
33const TELEGRAM_CACHE_CAPACITY: usize = 1024;
34const WEBHOOK_CACHE_CAPACITY: usize = 256;
35const RUNTIME_SECRETS_PACK_ID: &str = "_runner";
36
37/// Atomically swapped view of live tenant runtimes.
38pub struct ActivePacks {
39    inner: ArcSwap<HashMap<String, Arc<TenantRuntime>>>,
40}
41
42impl ActivePacks {
43    pub fn new() -> Self {
44        Self {
45            inner: ArcSwap::from_pointee(HashMap::new()),
46        }
47    }
48
49    pub fn load(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
50        self.inner.load().get(tenant).cloned()
51    }
52
53    pub fn snapshot(&self) -> Arc<HashMap<String, Arc<TenantRuntime>>> {
54        self.inner.load_full()
55    }
56
57    pub fn replace(&self, next: HashMap<String, Arc<TenantRuntime>>) {
58        self.inner.store(Arc::new(next));
59    }
60
61    pub fn len(&self) -> usize {
62        self.inner.load().len()
63    }
64
65    pub fn is_empty(&self) -> bool {
66        self.len() == 0
67    }
68}
69
70impl Default for ActivePacks {
71    fn default() -> Self {
72        Self::new()
73    }
74}
75
/// Runtime bundle for a tenant pack.
///
/// Holds everything built for one tenant by `TenantRuntime::from_packs`: the loaded packs, the
/// flow engine and state machine built over them, and per-tenant caches, limiter and handles.
pub struct TenantRuntime {
    /// Tenant identifier, copied from `config.tenant` at construction.
    tenant: String,
    /// Host configuration; also shared with the flow engine and the state machine runtime.
    config: Arc<HostConfig>,
    /// Loaded packs in the order they were supplied. The first entry is the main pack, any
    /// further entries are overlays.
    packs: Vec<Arc<PackRuntime>>,
    /// Digest supplied alongside each pack, index-aligned with `packs`.
    digests: Vec<Option<String>>,
    /// Flow engine built over `packs`.
    engine: Arc<FlowEngine>,
    /// State machine runtime built from `engine`.
    state_machine: Arc<StateMachineRuntime>,
    /// HTTP client created with the default `reqwest` builder settings.
    http_client: Client,
    /// LRU cache of status codes keyed by `i64`, bounded by `TELEGRAM_CACHE_CAPACITY`.
    // NOTE(review): what the `i64` key identifies is not visible in this file; confirm with callers.
    telegram_cache: Mutex<LruCache<i64, StatusCode>>,
    /// LRU cache of JSON values keyed by string, bounded by `WEBHOOK_CACHE_CAPACITY`.
    webhook_cache: Mutex<LruCache<String, Value>>,
    /// Rate limiter configured from `config.rate_limits` (`messaging_send_qps`, `messaging_burst`).
    messaging_rate: Mutex<RateLimiter>,
    /// Optional mock layer; the same layer is handed to the state machine runtime.
    mocks: Option<Arc<MockLayer>>,
    /// Task handles added through `register_timers`; each one is aborted when this value drops.
    timer_handles: Mutex<Vec<JoinHandle<()>>>,
    /// Secrets manager consulted by `get_secret`.
    secrets: DynSecretsManager,
    /// Operator registry built from the supplied pack list.
    operator_registry: OperatorRegistry,
    /// Operator metrics, default-initialised at construction.
    operator_metrics: Arc<OperatorMetrics>,
}
94
95/// Block on a future whether or not we're already inside a tokio runtime.
96pub fn block_on<F: Future<Output = R>, R>(future: F) -> R {
97    if let Ok(handle) = Handle::try_current() {
98        handle.block_on(future)
99    } else {
100        Runtime::new()
101            .expect("failed to create tokio runtime")
102            .block_on(future)
103    }
104}
105
106impl TenantRuntime {
107    #[allow(clippy::too_many_arguments)]
108    pub async fn load(
109        pack_path: &Path,
110        config: Arc<HostConfig>,
111        mocks: Option<Arc<MockLayer>>,
112        archive_source: Option<&Path>,
113        digest: Option<String>,
114        wasi_policy: Arc<RunnerWasiPolicy>,
115        session_host: Arc<dyn SessionHost>,
116        session_store: DynSessionStore,
117        state_store: DynStateStore,
118        state_host: Arc<dyn StateHost>,
119        secrets_manager: DynSecretsManager,
120    ) -> Result<Arc<Self>> {
121        let oauth_config = config.oauth_broker_config();
122        let pack = Arc::new(
123            PackRuntime::load(
124                pack_path,
125                Arc::clone(&config),
126                mocks.clone(),
127                archive_source,
128                Some(Arc::clone(&session_store)),
129                Some(Arc::clone(&state_store)),
130                Arc::clone(&wasi_policy),
131                Arc::clone(&secrets_manager),
132                oauth_config.clone(),
133                true,
134                ComponentResolution::default(),
135            )
136            .await
137            .with_context(|| {
138                format!(
139                    "failed to load pack {} for tenant {}",
140                    pack_path.display(),
141                    config.tenant
142                )
143            })?,
144        );
145        Self::from_packs(
146            config,
147            vec![(pack, digest)],
148            mocks,
149            session_host,
150            session_store,
151            state_store,
152            state_host,
153            secrets_manager,
154        )
155        .await
156    }
157
158    #[allow(clippy::too_many_arguments)]
159    pub async fn from_packs(
160        config: Arc<HostConfig>,
161        packs: Vec<(Arc<PackRuntime>, Option<String>)>,
162        mocks: Option<Arc<MockLayer>>,
163        session_host: Arc<dyn SessionHost>,
164        session_store: DynSessionStore,
165        _state_store: DynStateStore,
166        state_host: Arc<dyn StateHost>,
167        secrets_manager: DynSecretsManager,
168    ) -> Result<Arc<Self>> {
169        let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
170            .expect("telegram cache capacity must be > 0");
171        let webhook_capacity =
172            NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
173        let operator_registry = OperatorRegistry::build(&packs)?;
174        let operator_metrics = Arc::new(OperatorMetrics::default());
175        let pack_runtimes = packs
176            .iter()
177            .map(|(pack, _)| Arc::clone(pack))
178            .collect::<Vec<_>>();
179        let digests = packs
180            .iter()
181            .map(|(_, digest)| digest.clone())
182            .collect::<Vec<_>>();
183        let mut pack_trace = HashMap::new();
184        for (pack, digest) in &packs {
185            let pack_id = pack.metadata().pack_id.clone();
186            let pack_ref = config
187                .pack_bindings
188                .iter()
189                .find(|binding| binding.pack_id == pack_id)
190                .map(|binding| binding.pack_ref.clone())
191                .unwrap_or_else(|| pack_id.clone());
192            pack_trace.insert(
193                pack_id,
194                PackTraceInfo {
195                    pack_ref,
196                    resolved_digest: digest.clone(),
197                },
198            );
199        }
200        let engine = Arc::new(
201            FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
202                .await
203                .context("failed to prime flow engine")?,
204        );
205        let state_machine = Arc::new(
206            StateMachineRuntime::from_flow_engine(
207                Arc::clone(&config),
208                Arc::clone(&engine),
209                pack_trace,
210                session_host,
211                session_store,
212                state_host,
213                Arc::clone(&secrets_manager),
214                mocks.clone(),
215            )
216            .context("failed to initialise state machine runtime")?,
217        );
218        let http_client = Client::builder().build()?;
219        let rate_limits = config.rate_limits.clone();
220        Ok(Arc::new(Self {
221            tenant: config.tenant.clone(),
222            config,
223            packs: pack_runtimes,
224            digests,
225            engine,
226            state_machine,
227            http_client,
228            telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
229            webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
230            messaging_rate: Mutex::new(RateLimiter::new(
231                rate_limits.messaging_send_qps,
232                rate_limits.messaging_burst,
233            )),
234            mocks,
235            timer_handles: Mutex::new(Vec::new()),
236            secrets: secrets_manager,
237            operator_registry,
238            operator_metrics,
239        }))
240    }
241
242    pub fn tenant(&self) -> &str {
243        &self.tenant
244    }
245
    /// Returns the shared host configuration this runtime was built with.
    pub fn config(&self) -> &Arc<HostConfig> {
        &self.config
    }
249
    /// Returns the operator registry that was built from this runtime's packs.
    pub fn operator_registry(&self) -> &OperatorRegistry {
        &self.operator_registry
    }
253
254    pub fn operator_metrics(&self) -> &OperatorMetrics {
255        &self.operator_metrics
256    }
257
258    pub fn main_pack(&self) -> &Arc<PackRuntime> {
259        self.packs
260            .first()
261            .expect("tenant runtime must contain at least one pack")
262    }
263
264    pub fn pack(&self) -> Arc<PackRuntime> {
265        Arc::clone(self.main_pack())
266    }
267
268    pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
269        self.packs.iter().skip(1).cloned().collect()
270    }
271
    /// Returns the flow engine built over this runtime's packs.
    pub fn engine(&self) -> &Arc<FlowEngine> {
        &self.engine
    }
275
    /// Returns the state machine runtime built from this runtime's flow engine.
    pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
        &self.state_machine
    }
279
    /// Returns the HTTP client owned by this runtime.
    pub fn http_client(&self) -> &Client {
        &self.http_client
    }
283
284    pub fn digest(&self) -> Option<&str> {
285        self.digests.first().and_then(|d| d.as_deref())
286    }
287
288    pub fn overlay_digests(&self) -> Vec<Option<String>> {
289        self.digests.iter().skip(1).cloned().collect()
290    }
291
292    pub fn required_secrets(&self) -> Vec<SecretRequirement> {
293        self.packs
294            .iter()
295            .flat_map(|pack| pack.required_secrets().iter().cloned())
296            .collect()
297    }
298
299    pub fn missing_secrets(&self) -> Vec<SecretRequirement> {
300        self.packs
301            .iter()
302            .flat_map(|pack| pack.missing_secrets(&self.config.tenant_ctx()))
303            .collect()
304    }
305
    /// Returns the mutex-guarded LRU cache mapping `i64` keys to status codes.
    ///
    /// The caller is responsible for locking the mutex.
    pub fn telegram_cache(&self) -> &Mutex<LruCache<i64, StatusCode>> {
        &self.telegram_cache
    }
309
    /// Returns the mutex-guarded LRU cache mapping string keys to JSON values.
    ///
    /// The caller is responsible for locking the mutex.
    pub fn webhook_cache(&self) -> &Mutex<LruCache<String, Value>> {
        &self.webhook_cache
    }
313
    /// Returns the mutex-guarded rate limiter configured from the host's messaging rate limits.
    ///
    /// The caller is responsible for locking the mutex before calling `try_acquire`.
    pub fn messaging_rate(&self) -> &Mutex<RateLimiter> {
        &self.messaging_rate
    }
317
    /// Returns the mock layer, if one was supplied when the runtime was built.
    pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
        self.mocks.as_ref()
    }
321
322    pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
323        self.timer_handles.lock().extend(handles);
324    }
325
326    pub fn get_secret(&self, key: &str) -> Result<String> {
327        if crate::provider_core_only::is_enabled() {
328            bail!(crate::provider_core_only::blocked_message("secrets"))
329        }
330        if !self.config.secrets_policy.is_allowed(key) {
331            bail!("secret {key} is not permitted by bindings policy");
332        }
333        let ctx = self.config.tenant_ctx();
334        let bytes = read_secret_blocking(&self.secrets, &ctx, RUNTIME_SECRETS_PACK_ID, key)
335            .context("failed to read secret from manager")?;
336        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
337        Ok(value)
338    }
339
340    pub fn pack_for_component(&self, component_ref: &str) -> Option<Arc<PackRuntime>> {
341        self.packs
342            .iter()
343            .find(|pack| pack.contains_component(component_ref))
344            .cloned()
345    }
346}
347
348impl Drop for TenantRuntime {
349    fn drop(&mut self) {
350        for handle in self.timer_handles.lock().drain(..) {
351            handle.abort();
352        }
353    }
354}
355
/// Rate limiter that refills an allowance over time, up to a burst ceiling.
///
/// Each successful [`RateLimiter::try_acquire`] spends one unit of allowance. The allowance
/// grows by `rate` units per elapsed second and never exceeds `burst`.
pub struct RateLimiter {
    /// Units currently available to spend.
    allowance: f64,
    /// Units regained per second.
    rate: f64,
    /// Upper bound for `allowance`.
    burst: f64,
    /// Time of the most recent `try_acquire` call (or of construction).
    last_check: Instant,
}

impl RateLimiter {
    /// Creates a limiter that starts with a full allowance.
    ///
    /// Both `qps` and `burst` are raised to at least 1, so a zero value never yields a limiter
    /// that can make no progress.
    pub fn new(qps: u32, burst: u32) -> Self {
        let refill_per_sec = f64::from(qps.max(1));
        let ceiling = f64::from(burst.max(1));
        Self {
            allowance: ceiling,
            rate: refill_per_sec,
            burst: ceiling,
            last_check: Instant::now(),
        }
    }

    /// Tries to spend one unit of allowance.
    ///
    /// The allowance is first topped up for the time elapsed since the previous call. Returns
    /// `true` and deducts one unit when at least one unit is available, otherwise `false`.
    pub fn try_acquire(&mut self) -> bool {
        let now = Instant::now();
        let idle_secs = now.duration_since(self.last_check).as_secs_f64();
        self.last_check = now;

        // Refill for the idle period, capped at the burst ceiling.
        self.allowance = (self.allowance + idle_secs * self.rate).min(self.burst);

        if self.allowance >= 1.0 {
            self.allowance -= 1.0;
            true
        } else {
            false
        }
    }
}