Skip to main content

greentic_runner_host/
runtime.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::future::Future;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use anyhow::{Context, Result, anyhow, bail};
7use arc_swap::ArcSwap;
8use parking_lot::Mutex;
9use reqwest::Client;
10use serde_json::Value;
11use tokio::runtime::{Handle, Runtime};
12use tokio::task::JoinHandle;
13
14use crate::config::HostConfig;
15use crate::engine::host::{SessionHost, StateHost};
16use crate::engine::runtime::StateMachineRuntime;
17use crate::oauth::{OAuthBrokerConfig, request_resource_token};
18use crate::operator_metrics::OperatorMetrics;
19use crate::operator_registry::OperatorRegistry;
20use crate::pack::{ComponentResolution, PackRuntime};
21use crate::runner::adapt_events_email::{
22    EmailExecutionPlan, EmailSendRequest, build_email_execution_plan, execute_email_request,
23};
24use crate::runner::contract_cache::{ContractCache, ContractCacheStats};
25use crate::runner::engine::FlowEngine;
26use crate::runner::mocks::MockLayer;
27use crate::secrets::{DynSecretsManager, canonicalize_secret_key, read_secret_blocking};
28use crate::storage::session::DynSessionStore;
29use crate::storage::state::DynStateStore;
30use crate::telemetry::RolloutIds;
31use crate::trace::PackTraceInfo;
32use crate::wasi::RunnerWasiPolicy;
33use greentic_deploy_spec::ids::{BundleId, DeploymentId, RevisionId};
34use greentic_types::SecretRequirement;
35use runner_core::packs::PackDigest;
36
37const RUNTIME_SECRETS_PACK_ID: &str = "_runner";
38
39/// Key identifying a live runtime in [`ActivePacks`].
40///
41/// Tenant-only entries use [`RuntimeKey::legacy`] (all id fields `None`);
42/// fully-qualified entries use [`RuntimeKey::revision`] (all `Some`). The two
43/// forms never collide, so both can coexist in the same map.
44#[derive(Clone, Debug, PartialEq, Eq, Hash)]
45pub struct RuntimeKey {
46    pub tenant: String,
47    pub deployment_id: Option<DeploymentId>,
48    pub bundle_id: Option<BundleId>,
49    pub revision_id: Option<RevisionId>,
50}
51
52impl RuntimeKey {
53    /// Tenant-only key for the pre-revision-routing path.
54    pub fn legacy(tenant: impl Into<String>) -> Self {
55        Self {
56            tenant: tenant.into(),
57            deployment_id: None,
58            bundle_id: None,
59            revision_id: None,
60        }
61    }
62
63    /// Fully-qualified key for a specific deployment/bundle/revision.
64    pub fn revision(
65        tenant: impl Into<String>,
66        deployment_id: DeploymentId,
67        bundle_id: BundleId,
68        revision_id: RevisionId,
69    ) -> Self {
70        Self {
71            tenant: tenant.into(),
72            deployment_id: Some(deployment_id),
73            bundle_id: Some(bundle_id),
74            revision_id: Some(revision_id),
75        }
76    }
77
78    /// `true` for the tenant-only key produced by [`legacy`](Self::legacy).
79    pub fn is_legacy(&self) -> bool {
80        self.deployment_id.is_none() && self.bundle_id.is_none() && self.revision_id.is_none()
81    }
82}
83
84/// Build the next runtime map for a legacy (tenant-only) reload: install the
85/// freshly-resolved `legacy` entries and carry over every revision-keyed entry
86/// untouched. Revision runtimes are owned by the deployment lifecycle, not the
87/// pack watcher, so a tenant-pack reload must not evict them.
88fn merge_legacy_reload<V: Clone>(
89    prev: &HashMap<RuntimeKey, V>,
90    mut legacy: HashMap<RuntimeKey, V>,
91) -> HashMap<RuntimeKey, V> {
92    for (key, value) in prev {
93        if !key.is_legacy() {
94            legacy.insert(key.clone(), value.clone());
95        }
96    }
97    legacy
98}
99
100/// Pure swap helper for [`ActivePacks::remove_revision`]: clone the prev map
101/// minus `key`, return it alongside the removed value. `None` when the key was
102/// absent so the caller can skip the `ArcSwap` store. Generic over `V` so the
103/// swap logic is testable without standing up a real `TenantRuntime`.
104fn remove_keyed_entry<V: Clone>(
105    prev: &HashMap<RuntimeKey, V>,
106    key: &RuntimeKey,
107) -> Option<(HashMap<RuntimeKey, V>, V)> {
108    let removed = prev.get(key)?.clone();
109    let mut next = prev.clone();
110    next.remove(key);
111    Some((next, removed))
112}
113
114/// Pure swap helper for [`ActivePacks::insert_revision`]: clone the prev map
115/// and insert `value` under `key`, returning the next map. Generic over `V` so
116/// the swap logic is testable without standing up a real `TenantRuntime`.
117fn insert_keyed_entry<V: Clone>(
118    prev: &HashMap<RuntimeKey, V>,
119    key: RuntimeKey,
120    value: V,
121) -> HashMap<RuntimeKey, V> {
122    let mut next = prev.clone();
123    next.insert(key, value);
124    next
125}
126
127/// Atomically swapped view of live tenant runtimes.
128///
129/// Reads are lock-free via `ArcSwap`. Mutations serialize on `write_lock` so a
130/// read-modify-write swap (e.g. a watcher reload preserving revision entries)
131/// cannot interleave with a concurrent insert and clobber the other's update.
132pub struct ActivePacks {
133    inner: ArcSwap<HashMap<RuntimeKey, Arc<TenantRuntime>>>,
134    write_lock: Mutex<()>,
135}
136
137impl ActivePacks {
138    pub fn new() -> Self {
139        Self {
140            inner: ArcSwap::from_pointee(HashMap::new()),
141            write_lock: Mutex::new(()),
142        }
143    }
144
145    /// Look up the tenant-only (legacy) runtime. Compatibility helper for the
146    /// pre-revision-routing path; see [`load_revision`](Self::load_revision).
147    pub fn load_pack(&self, tenant: &str) -> Option<Arc<TenantRuntime>> {
148        self.inner.load().get(&RuntimeKey::legacy(tenant)).cloned()
149    }
150
151    /// Look up the runtime for a specific deployment/bundle/revision.
152    pub fn load_revision(
153        &self,
154        tenant: &str,
155        deployment_id: DeploymentId,
156        bundle_id: BundleId,
157        revision_id: RevisionId,
158    ) -> Option<Arc<TenantRuntime>> {
159        self.inner
160            .load()
161            .get(&RuntimeKey::revision(
162                tenant,
163                deployment_id,
164                bundle_id,
165                revision_id,
166            ))
167            .cloned()
168    }
169
170    pub fn snapshot(&self) -> Arc<HashMap<RuntimeKey, Arc<TenantRuntime>>> {
171        self.inner.load_full()
172    }
173
174    /// Insert (or replace) a single tenant-only runtime, preserving all other
175    /// entries — including revision-keyed ones.
176    pub fn insert_pack(&self, tenant: &str, runtime: Arc<TenantRuntime>) {
177        let _guard = self.write_lock.lock();
178        let mut next = (*self.inner.load_full()).clone();
179        next.insert(RuntimeKey::legacy(tenant), runtime);
180        self.inner.store(Arc::new(next));
181    }
182
183    /// Insert (or replace) a single revision-keyed runtime, preserving every
184    /// other entry — the tenant-only legacy entry and sibling revisions alike.
185    /// This is the producer the deployment warm path calls once a revision's
186    /// packs are loaded; the pack watcher's [`replace_legacy`](Self::replace_legacy)
187    /// then carries the entry across tenant-pack reloads untouched.
188    ///
189    /// Fails closed when the runtime's identity does not match the key it would
190    /// be stored under: a wiring bug that files one tenant's runtime under
191    /// another tenant's revision (breaking isolation) or stores a runtime whose
192    /// telemetry reports a different revision than it routes (breaking rollout
193    /// attribution) is rejected here rather than silently serving traffic under
194    /// the wrong identity. Pairs with [`TenantRuntime::load_revision`], which
195    /// derives the runtime's rollout identity from these same ids.
196    pub fn insert_revision(
197        &self,
198        tenant: &str,
199        deployment_id: DeploymentId,
200        bundle_id: BundleId,
201        revision_id: RevisionId,
202        runtime: Arc<TenantRuntime>,
203    ) -> Result<()> {
204        if runtime.tenant() != tenant {
205            bail!(
206                "revision runtime tenant `{}` does not match key tenant `{tenant}`",
207                runtime.tenant()
208            );
209        }
210        let ids = runtime.engine().rollout_ids();
211        let key_deployment = deployment_id.to_string();
212        let key_bundle = bundle_id.as_str();
213        let key_revision = revision_id.to_string();
214        if ids.deployment_id.as_deref() != Some(key_deployment.as_str())
215            || ids.bundle_id.as_deref() != Some(key_bundle)
216            || ids.revision_id.as_deref() != Some(key_revision.as_str())
217        {
218            bail!(
219                "revision runtime rollout identity (deployment={:?}, bundle={:?}, revision={:?}) \
220                 does not match key (deployment=`{key_deployment}`, bundle=`{key_bundle}`, \
221                 revision=`{key_revision}`)",
222                ids.deployment_id,
223                ids.bundle_id,
224                ids.revision_id
225            );
226        }
227        let _guard = self.write_lock.lock();
228        let key = RuntimeKey::revision(tenant, deployment_id, bundle_id, revision_id);
229        let next = insert_keyed_entry(&self.inner.load_full(), key, runtime);
230        self.inner.store(Arc::new(next));
231        Ok(())
232    }
233
234    /// Swap in a freshly-resolved set of tenant-only (legacy) runtimes while
235    /// carrying over every revision-keyed entry. Used by the pack watcher, whose
236    /// index is authoritative for tenant packs but not for deployment revisions.
237    pub fn replace_legacy(&self, legacy: HashMap<RuntimeKey, Arc<TenantRuntime>>) {
238        let _guard = self.write_lock.lock();
239        let prev = self.inner.load_full();
240        let next = merge_legacy_reload(&prev, legacy);
241        self.inner.store(Arc::new(next));
242    }
243
244    /// Remove and return the runtime for a single revision-keyed entry,
245    /// preserving every other entry (legacy and revision alike). Used by the
246    /// drain coordinator (`gtc op revisions drain`) after the drain window
247    /// closes to tear down exactly the one revision being retired. `None` if
248    /// no such entry was present — idempotent for safe re-runs.
249    pub fn remove_revision(
250        &self,
251        tenant: &str,
252        deployment_id: DeploymentId,
253        bundle_id: BundleId,
254        revision_id: RevisionId,
255    ) -> Option<Arc<TenantRuntime>> {
256        let _guard = self.write_lock.lock();
257        let prev = self.inner.load_full();
258        let key = RuntimeKey::revision(tenant, deployment_id, bundle_id, revision_id);
259        let (next, removed) = remove_keyed_entry(&prev, &key)?;
260        self.inner.store(Arc::new(next));
261        Some(removed)
262    }
263
264    /// Replace the entire map, dropping every entry (legacy and revision alike).
265    /// Used for full host stop.
266    pub fn replace(&self, next: HashMap<RuntimeKey, Arc<TenantRuntime>>) {
267        let _guard = self.write_lock.lock();
268        self.inner.store(Arc::new(next));
269    }
270
271    pub fn len(&self) -> usize {
272        self.inner.load().len()
273    }
274
275    pub fn is_empty(&self) -> bool {
276        self.len() == 0
277    }
278}
279
280impl Default for ActivePacks {
281    fn default() -> Self {
282        Self::new()
283    }
284}
285
286/// Runtime bundle for a tenant pack.
287pub struct TenantRuntime {
288    tenant: String,
289    config: Arc<HostConfig>,
290    packs: Vec<Arc<PackRuntime>>,
291    digests: Vec<Option<String>>,
292    engine: Arc<FlowEngine>,
293    state_machine: Arc<StateMachineRuntime>,
294    /// The session store this runtime was loaded with. Shared with the inner
295    /// state machine — re-exposed here so callers outside the flow run loop
296    /// (M1.5 welcome-flow first-contact probe) can query the SAME bucket the
297    /// state machine will read/write. For revision mode each revision passes
298    /// its own store into `load_revision`; querying the RunnerHost-level
299    /// store would key the wrong bucket.
300    session_store: DynSessionStore,
301    http_client: Client,
302    mocks: Option<Arc<MockLayer>>,
303    timer_handles: Mutex<Vec<JoinHandle<()>>>,
304    secrets: DynSecretsManager,
305    operator_registry: OperatorRegistry,
306    operator_metrics: Arc<OperatorMetrics>,
307    contract_cache: ContractCache,
308}
309
310#[derive(Clone)]
311pub struct ResolvedComponent {
312    pub digest: String,
313    pub component_ref: String,
314    pub pack: Arc<PackRuntime>,
315}
316
317/// One pinned pack of a deployment revision: the on-disk path plus the
318/// `algo:value` content digest the deployment staged it under.
319/// [`TenantRuntime::load_revision`] fails closed when the file no longer
320/// matches the digest, defending the stage→warm window against a swapped or
321/// stale cache path.
322#[derive(Clone, Debug)]
323pub struct RevisionPackRef {
324    pub path: PathBuf,
325    pub digest: String,
326}
327
328/// Block on a future whether or not we're already inside a tokio runtime.
329pub fn block_on<F: Future<Output = R>, R>(future: F) -> R {
330    if let Ok(handle) = Handle::try_current() {
331        handle.block_on(future)
332    } else {
333        Runtime::new()
334            .expect("failed to create tokio runtime")
335            .block_on(future)
336    }
337}
338
339impl TenantRuntime {
340    #[allow(clippy::too_many_arguments)]
341    pub async fn load(
342        pack_path: &Path,
343        config: Arc<HostConfig>,
344        mocks: Option<Arc<MockLayer>>,
345        archive_source: Option<&Path>,
346        digest: Option<String>,
347        wasi_policy: Arc<RunnerWasiPolicy>,
348        session_host: Arc<dyn SessionHost>,
349        session_store: DynSessionStore,
350        state_store: DynStateStore,
351        state_host: Arc<dyn StateHost>,
352        secrets_manager: DynSecretsManager,
353    ) -> Result<Arc<Self>> {
354        let pack = Self::load_pack_runtime(
355            pack_path,
356            &config,
357            mocks.clone(),
358            archive_source,
359            &wasi_policy,
360            &session_store,
361            &state_store,
362            &secrets_manager,
363            &BTreeMap::new(),
364            &BTreeMap::new(),
365            None,
366        )
367        .await?;
368        Self::from_packs(
369            config,
370            vec![(pack, digest)],
371            mocks,
372            session_host,
373            session_store,
374            state_store,
375            state_host,
376            secrets_manager,
377        )
378        .await
379    }
380
381    /// Build a revision-keyed runtime from its pinned pack list (the resolved
382    /// `pack_list` of a deployment revision). The first entry is the main pack;
383    /// the rest are overlays.
384    ///
385    /// Fails closed if any pack file no longer matches the digest the
386    /// deployment pinned it under — defending the stage→warm window against a
387    /// swapped or stale cache path. The verified digests are threaded into the
388    /// runtime so admin status, traces, and contract hashes report the real
389    /// content (parity with the legacy index path). The rollout telemetry
390    /// identity is **derived from** `deployment_id` / `bundle_id` /
391    /// `revision_id` / `customer_id`, so the engine's attribution cannot drift
392    /// from the key the runtime is later inserted under.
393    #[allow(clippy::too_many_arguments)]
394    pub async fn load_revision(
395        pack_refs: &[RevisionPackRef],
396        config: Arc<HostConfig>,
397        mocks: Option<Arc<MockLayer>>,
398        wasi_policy: Arc<RunnerWasiPolicy>,
399        session_host: Arc<dyn SessionHost>,
400        session_store: DynSessionStore,
401        state_store: DynStateStore,
402        state_host: Arc<dyn StateHost>,
403        secrets_manager: DynSecretsManager,
404        deployment_id: DeploymentId,
405        bundle_id: BundleId,
406        revision_id: RevisionId,
407        customer_id: Option<String>,
408        runtime_configs_by_pack_id: &BTreeMap<String, Arc<BTreeMap<String, Value>>>,
409        runtime_refs_by_pack_id: &BTreeMap<String, Arc<BTreeMap<String, String>>>,
410        runtime_ref_resolver: Option<Arc<dyn crate::runtime_refs::RuntimeRefResolver>>,
411    ) -> Result<Arc<Self>> {
412        if pack_refs.is_empty() {
413            bail!(
414                "revision runtime for tenant {} requires at least one pack",
415                config.tenant
416            );
417        }
418        let mut packs = Vec::with_capacity(pack_refs.len());
419        let mut seen_pack_ids = HashSet::with_capacity(pack_refs.len());
420        for pack_ref in pack_refs {
421            let expected = PackDigest::parse(&pack_ref.digest).with_context(|| {
422                format!(
423                    "revision pack `{}` has an invalid digest `{}`",
424                    pack_ref.path.display(),
425                    pack_ref.digest
426                )
427            })?;
428            // `matches_file` always hashes with SHA-256, so a digest pinned under
429            // any other algorithm could never match — reject it with a clear
430            // message rather than the misleading "does not match" below.
431            if expected.algorithm() != "sha256" {
432                bail!(
433                    "revision pack `{}` pins unsupported digest algorithm `{}`; only sha256 is supported",
434                    pack_ref.path.display(),
435                    expected.algorithm()
436                );
437            }
438            if !expected.matches_file(&pack_ref.path).with_context(|| {
439                format!(
440                    "hashing revision pack `{}` for digest verification",
441                    pack_ref.path.display()
442                )
443            })? {
444                bail!(
445                    "revision pack `{}` does not match pinned digest `{}`",
446                    pack_ref.path.display(),
447                    pack_ref.digest
448                );
449            }
450            let pack = Self::load_pack_runtime(
451                &pack_ref.path,
452                &config,
453                mocks.clone(),
454                None,
455                &wasi_policy,
456                &session_store,
457                &state_store,
458                &secrets_manager,
459                runtime_configs_by_pack_id,
460                runtime_refs_by_pack_id,
461                runtime_ref_resolver.as_ref(),
462            )
463            .await?;
464            // Reject duplicate pack_id within a single revision — two refs
465            // resolving to the same pack_id would silently share config/routing
466            // entries and produce an ambiguous runtime.
467            let pack_id = pack.metadata().pack_id.clone();
468            if !seen_pack_ids.insert(pack_id.clone()) {
469                bail!(
470                    "revision for tenant {} contains duplicate pack_id `{}` (path `{}`)",
471                    config.tenant,
472                    pack_id,
473                    pack_ref.path.display(),
474                );
475            }
476            packs.push((pack, Some(expected.raw_string())));
477        }
478        let rollout = RolloutIds {
479            customer_id,
480            deployment_id: Some(deployment_id.to_string()),
481            bundle_id: Some(bundle_id.as_str().to_string()),
482            revision_id: Some(revision_id.to_string()),
483        };
484        Self::from_packs_with_rollout(
485            config,
486            packs,
487            mocks,
488            session_host,
489            session_store,
490            state_store,
491            state_host,
492            secrets_manager,
493            rollout,
494        )
495        .await
496    }
497
498    /// Load a single [`PackRuntime`] from a path, sharing the tenant's session /
499    /// state / secrets backends. Shared by [`load`](Self::load) (one pack) and
500    /// [`load_revision`](Self::load_revision) (the revision's pack list).
501    ///
502    /// `runtime_configs_by_pack_id` is consulted AFTER the pack loads (the
503    /// `pack_id` is only known after the manifest read) and BEFORE the
504    /// `Arc<PackRuntime>` is created. A matching entry is injected via
505    /// [`PackRuntime::set_runtime_config_non_secret`] so the C4.3 producer
506    /// plumbing requires no post-hoc `Arc::get_mut` dance — the single-pack
507    /// [`load`](Self::load) path just passes an empty map.
508    ///
509    /// `runtime_refs_by_pack_id` mirrors the same shape for the C5
510    /// `pack-config.v1.runtime_refs` channel; a matching entry is injected
511    /// via [`PackRuntime::set_runtime_refs`] alongside `runtime_ref_resolver`.
512    #[allow(clippy::too_many_arguments)]
513    async fn load_pack_runtime(
514        pack_path: &Path,
515        config: &Arc<HostConfig>,
516        mocks: Option<Arc<MockLayer>>,
517        archive_source: Option<&Path>,
518        wasi_policy: &Arc<RunnerWasiPolicy>,
519        session_store: &DynSessionStore,
520        state_store: &DynStateStore,
521        secrets_manager: &DynSecretsManager,
522        runtime_configs_by_pack_id: &BTreeMap<String, Arc<BTreeMap<String, Value>>>,
523        runtime_refs_by_pack_id: &BTreeMap<String, Arc<BTreeMap<String, String>>>,
524        runtime_ref_resolver: Option<&Arc<dyn crate::runtime_refs::RuntimeRefResolver>>,
525    ) -> Result<Arc<PackRuntime>> {
526        let oauth_config = config.oauth_broker_config();
527        let mut pack = PackRuntime::load(
528            pack_path,
529            Arc::clone(config),
530            mocks,
531            archive_source,
532            Some(Arc::clone(session_store)),
533            Some(Arc::clone(state_store)),
534            Arc::clone(wasi_policy),
535            Arc::clone(secrets_manager),
536            oauth_config,
537            true,
538            ComponentResolution::default(),
539        )
540        .await
541        .with_context(|| {
542            format!(
543                "failed to load pack {} for tenant {}",
544                pack_path.display(),
545                config.tenant
546            )
547        })?;
548        let pack_id = pack.metadata().pack_id.clone();
549        if let Some(non_secret) = runtime_configs_by_pack_id.get(pack_id.as_str()) {
550            pack.set_runtime_config_non_secret(Some(Arc::clone(non_secret)));
551        }
552        if let Some(refs) = runtime_refs_by_pack_id.get(pack_id.as_str()) {
553            let resolver = runtime_ref_resolver.ok_or_else(|| {
554                anyhow!(
555                    "pack `{}` has runtime_refs bound but no RuntimeRefResolver was provided",
556                    pack_id,
557                )
558            })?;
559            pack.set_runtime_refs(Some(crate::runtime_refs::RuntimeRefsInjection {
560                refs: Arc::clone(refs),
561                resolver: Arc::clone(resolver),
562            }));
563        }
564        Ok(Arc::new(pack))
565    }
566
567    #[allow(clippy::too_many_arguments)]
568    pub async fn from_packs(
569        config: Arc<HostConfig>,
570        packs: Vec<(Arc<PackRuntime>, Option<String>)>,
571        mocks: Option<Arc<MockLayer>>,
572        session_host: Arc<dyn SessionHost>,
573        session_store: DynSessionStore,
574        state_store: DynStateStore,
575        state_host: Arc<dyn StateHost>,
576        secrets_manager: DynSecretsManager,
577    ) -> Result<Arc<Self>> {
578        Self::from_packs_with_rollout(
579            config,
580            packs,
581            mocks,
582            session_host,
583            session_store,
584            state_store,
585            state_host,
586            secrets_manager,
587            RolloutIds::default(),
588        )
589        .await
590    }
591
592    /// Like [`from_packs`](Self::from_packs) but stamps `rollout` onto the flow
593    /// engine so every span this runtime emits carries the deployment / bundle /
594    /// revision / customer identity. [`from_packs`](Self::from_packs) is the
595    /// legacy (tenant-only) path and passes [`RolloutIds::default`].
596    #[allow(clippy::too_many_arguments)]
597    pub(crate) async fn from_packs_with_rollout(
598        config: Arc<HostConfig>,
599        packs: Vec<(Arc<PackRuntime>, Option<String>)>,
600        mocks: Option<Arc<MockLayer>>,
601        session_host: Arc<dyn SessionHost>,
602        session_store: DynSessionStore,
603        _state_store: DynStateStore,
604        state_host: Arc<dyn StateHost>,
605        secrets_manager: DynSecretsManager,
606        rollout: RolloutIds,
607    ) -> Result<Arc<Self>> {
608        let operator_registry = OperatorRegistry::build(&packs)?;
609        let operator_metrics = Arc::new(OperatorMetrics::default());
610        let pack_runtimes = packs
611            .iter()
612            .map(|(pack, _)| Arc::clone(pack))
613            .collect::<Vec<_>>();
614        let digests = packs
615            .iter()
616            .map(|(_, digest)| digest.clone())
617            .collect::<Vec<_>>();
618        let mut pack_trace = HashMap::new();
619        for (pack, digest) in &packs {
620            let pack_id = pack.metadata().pack_id.clone();
621            let pack_ref = config
622                .pack_bindings
623                .iter()
624                .find(|binding| binding.pack_id == pack_id)
625                .map(|binding| binding.pack_ref.clone())
626                .unwrap_or_else(|| pack_id.clone());
627            pack_trace.insert(
628                pack_id,
629                PackTraceInfo {
630                    pack_ref,
631                    resolved_digest: digest.clone(),
632                },
633            );
634        }
635        let engine = Arc::new(
636            FlowEngine::new(pack_runtimes.clone(), Arc::clone(&config))
637                .await
638                .context("failed to prime flow engine")?
639                .with_rollout_ids(rollout),
640        );
641        let state_machine = Arc::new(
642            StateMachineRuntime::from_flow_engine(
643                Arc::clone(&config),
644                Arc::clone(&engine),
645                pack_trace,
646                session_host,
647                Arc::clone(&session_store),
648                state_host,
649                Arc::clone(&secrets_manager),
650                mocks.clone(),
651            )
652            .context("failed to initialise state machine runtime")?,
653        );
654        let http_client = Client::builder().build()?;
655        Ok(Arc::new(Self {
656            tenant: config.tenant.clone(),
657            config,
658            packs: pack_runtimes,
659            digests,
660            engine,
661            state_machine,
662            session_store,
663            http_client,
664            mocks,
665            timer_handles: Mutex::new(Vec::new()),
666            secrets: secrets_manager,
667            operator_registry,
668            operator_metrics,
669            contract_cache: ContractCache::from_env(),
670        }))
671    }
672
673    pub fn tenant(&self) -> &str {
674        &self.tenant
675    }
676
677    pub fn config(&self) -> &Arc<HostConfig> {
678        &self.config
679    }
680
681    pub fn operator_registry(&self) -> &OperatorRegistry {
682        &self.operator_registry
683    }
684
685    pub fn operator_metrics(&self) -> &OperatorMetrics {
686        &self.operator_metrics
687    }
688
689    pub fn contract_cache(&self) -> &ContractCache {
690        &self.contract_cache
691    }
692
693    pub fn contract_cache_stats(&self) -> ContractCacheStats {
694        self.contract_cache.stats()
695    }
696
697    pub fn main_pack(&self) -> &Arc<PackRuntime> {
698        self.packs
699            .first()
700            .expect("tenant runtime must contain at least one pack")
701    }
702
703    pub fn pack(&self) -> Arc<PackRuntime> {
704        Arc::clone(self.main_pack())
705    }
706
707    pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
708        self.packs.iter().skip(1).cloned().collect()
709    }
710
711    /// All packs in declaration order: main pack at index 0, overlays after.
712    /// Borrowed slice — no `Arc` clones, no allocation. Use this when you only
713    /// need to iterate the full pack list; prefer [`pack`]/[`overlays`] when
714    /// you need to hand out owned `Arc`s downstream.
715    ///
716    /// [`pack`]: TenantRuntime::pack
717    /// [`overlays`]: TenantRuntime::overlays
718    pub fn all_packs(&self) -> &[Arc<PackRuntime>] {
719        &self.packs
720    }
721
722    /// Resolved content digest of each loaded pack, index-aligned with the pack
723    /// list. `Some` for revision runtimes (verified at load) and the legacy
724    /// index path; `None` only when a digest was unavailable.
725    pub fn pack_digests(&self) -> &[Option<String>] {
726        &self.digests
727    }
728
729    pub fn engine(&self) -> &Arc<FlowEngine> {
730        &self.engine
731    }
732
733    pub fn state_machine(&self) -> &Arc<StateMachineRuntime> {
734        &self.state_machine
735    }
736
737    /// Shared session store. M1.5: lets `apply_welcome_flow_override`
738    /// build a `FlowResumeStore` against the SAME bucket the state machine
739    /// will read/write — important under revision mode where each revision
740    /// is given its own store via `load_revision`.
741    pub fn session_store(&self) -> &DynSessionStore {
742        &self.session_store
743    }
744
745    pub fn http_client(&self) -> &Client {
746        &self.http_client
747    }
748
749    pub fn oauth_config(&self) -> Option<OAuthBrokerConfig> {
750        self.config.oauth_broker_config()
751    }
752
753    pub fn digest(&self) -> Option<&str> {
754        self.digests.first().and_then(|d| d.as_deref())
755    }
756
757    pub fn overlay_digests(&self) -> Vec<Option<String>> {
758        self.digests.iter().skip(1).cloned().collect()
759    }
760
761    pub fn required_secrets(&self) -> Vec<SecretRequirement> {
762        self.packs
763            .iter()
764            .flat_map(|pack| pack.required_secrets().iter().cloned())
765            .collect()
766    }
767
768    pub fn missing_secrets(&self) -> Vec<SecretRequirement> {
769        self.packs
770            .iter()
771            .flat_map(|pack| pack.missing_secrets(&self.config.tenant_ctx()))
772            .collect()
773    }
774
775    pub fn mocks(&self) -> Option<&Arc<MockLayer>> {
776        self.mocks.as_ref()
777    }
778
779    pub fn register_timers(&self, handles: Vec<JoinHandle<()>>) {
780        self.timer_handles.lock().extend(handles);
781    }
782
783    pub fn get_secret(&self, key: &str) -> Result<String> {
784        if crate::provider_core_only::is_enabled() {
785            bail!(crate::provider_core_only::blocked_message("secrets"))
786        }
787        if !self.config.secrets_policy.is_allowed(key) {
788            bail!("secret {key} is not permitted by bindings policy");
789        }
790        let ctx = self.config.tenant_ctx();
791        let canonical_key = canonicalize_secret_key(key);
792        let bytes =
793            read_secret_blocking(&self.secrets, &ctx, RUNTIME_SECRETS_PACK_ID, &canonical_key)
794                .context("failed to read secret from manager")?;
795        let value = String::from_utf8(bytes).context("secret value is not valid UTF-8")?;
796        Ok(value)
797    }
798
799    pub fn build_events_email_execution_plan(
800        &self,
801        tenant: &greentic_types::TenantCtx,
802        request: &EmailSendRequest,
803    ) -> Result<EmailExecutionPlan> {
804        let oauth = self
805            .oauth_config()
806            .ok_or_else(|| anyhow!("oauth broker config is not configured for tenant runtime"))?;
807        build_email_execution_plan(&oauth, tenant, request)
808    }
809
810    pub async fn execute_events_email_request(
811        &self,
812        access_token: &str,
813        request: &EmailSendRequest,
814    ) -> Result<()> {
815        execute_email_request(self.http_client(), access_token, request).await
816    }
817
818    pub async fn execute_events_email_with_oauth(
819        &self,
820        tenant: &greentic_types::TenantCtx,
821        request: &EmailSendRequest,
822    ) -> Result<()> {
823        let plan = self.build_events_email_execution_plan(tenant, request)?;
824        let token = request_resource_token(self.http_client(), &plan.token_request).await?;
825        self.execute_events_email_request(&token.access_token, request)
826            .await
827    }
828
829    pub fn pack_for_component(&self, component_ref: &str) -> Option<Arc<PackRuntime>> {
830        self.packs
831            .iter()
832            .find(|pack| pack.contains_component(component_ref))
833            .cloned()
834    }
835
836    pub fn pack_for_component_with_digest(
837        &self,
838        component_ref: &str,
839    ) -> Option<(Arc<PackRuntime>, Option<String>)> {
840        self.packs
841            .iter()
842            .zip(self.digests.iter())
843            .find(|(pack, _)| pack.contains_component(component_ref))
844            .map(|(pack, digest)| (Arc::clone(pack), digest.clone()))
845    }
846
847    pub fn resolve_component(&self, component_ref: &str) -> Option<ResolvedComponent> {
848        self.pack_for_component_with_digest(component_ref)
849            .map(|(pack, digest)| ResolvedComponent {
850                digest: digest
851                    .or_else(|| self.digest().map(ToString::to_string))
852                    .unwrap_or_else(|| "unknown".to_string()),
853                component_ref: component_ref.to_string(),
854                pack,
855            })
856    }
857}
858
859impl Drop for TenantRuntime {
860    fn drop(&mut self) {
861        for handle in self.timer_handles.lock().drain(..) {
862            handle.abort();
863        }
864    }
865}
866
867#[cfg(test)]
868mod runtime_key_tests {
869    use super::*;
870
871    #[test]
872    fn legacy_keys_match_only_on_tenant() {
873        assert_eq!(RuntimeKey::legacy("acme"), RuntimeKey::legacy("acme"));
874        assert_ne!(RuntimeKey::legacy("acme"), RuntimeKey::legacy("other"));
875    }
876
877    #[test]
878    fn legacy_and_revision_keys_never_collide() {
879        let key = RuntimeKey::revision(
880            "acme",
881            DeploymentId::new(),
882            BundleId::from("bundle-a"),
883            RevisionId::new(),
884        );
885        assert_ne!(RuntimeKey::legacy("acme"), key);
886    }
887
888    #[test]
889    fn revision_keys_distinguish_revision_id() {
890        let deployment = DeploymentId::new();
891        let bundle = BundleId::from("bundle-a");
892        let rev_a = RevisionId::new();
893        let rev_b = RevisionId::new();
894        let key_a = RuntimeKey::revision("acme", deployment, bundle.clone(), rev_a);
895        let key_b = RuntimeKey::revision("acme", deployment, bundle.clone(), rev_b);
896        let same = RuntimeKey::revision("acme", deployment, bundle, rev_a);
897        assert_ne!(key_a, key_b);
898        assert_eq!(key_a, same);
899    }
900
901    #[test]
902    fn legacy_reload_preserves_revision_entries() {
903        let revision_key = RuntimeKey::revision(
904            "acme",
905            DeploymentId::new(),
906            BundleId::from("bundle-a"),
907            RevisionId::new(),
908        );
909        let mut prev: HashMap<RuntimeKey, u32> = HashMap::new();
910        prev.insert(RuntimeKey::legacy("acme"), 1); // stale legacy, refreshed below
911        prev.insert(RuntimeKey::legacy("retired"), 2); // dropped: not in new index
912        prev.insert(revision_key.clone(), 99); // revision runtime: must survive
913
914        let mut legacy: HashMap<RuntimeKey, u32> = HashMap::new();
915        legacy.insert(RuntimeKey::legacy("acme"), 10);
916        legacy.insert(RuntimeKey::legacy("newcomer"), 20);
917
918        let next = merge_legacy_reload(&prev, legacy);
919
920        assert_eq!(next.get(&RuntimeKey::legacy("acme")), Some(&10));
921        assert_eq!(next.get(&RuntimeKey::legacy("newcomer")), Some(&20));
922        assert_eq!(next.get(&RuntimeKey::legacy("retired")), None);
923        assert_eq!(next.get(&revision_key), Some(&99));
924    }
925
926    #[test]
927    fn remove_keyed_entry_pops_only_the_targeted_key() {
928        let deployment = DeploymentId::new();
929        let bundle = BundleId::from("bundle-a");
930        let rev_a = RevisionId::new();
931        let rev_b = RevisionId::new();
932        let key_a = RuntimeKey::revision("acme", deployment, bundle.clone(), rev_a);
933        let key_b = RuntimeKey::revision("acme", deployment, bundle, rev_b);
934
935        let mut prev: HashMap<RuntimeKey, u32> = HashMap::new();
936        prev.insert(RuntimeKey::legacy("acme"), 1);
937        prev.insert(key_a.clone(), 10);
938        prev.insert(key_b.clone(), 20);
939
940        let (next, removed) = remove_keyed_entry(&prev, &key_a).expect("present");
941        assert_eq!(removed, 10);
942        assert_eq!(next.get(&key_a), None);
943        assert_eq!(next.get(&key_b), Some(&20));
944        assert_eq!(next.get(&RuntimeKey::legacy("acme")), Some(&1));
945    }
946
947    #[test]
948    fn remove_keyed_entry_returns_none_for_missing_key() {
949        let prev: HashMap<RuntimeKey, u32> = HashMap::new();
950        let ghost = RuntimeKey::revision(
951            "acme",
952            DeploymentId::new(),
953            BundleId::from("bundle-a"),
954            RevisionId::new(),
955        );
956        assert!(remove_keyed_entry(&prev, &ghost).is_none());
957    }
958
959    #[test]
960    fn remove_keyed_entry_leaves_other_deployments_alone() {
961        let bundle = BundleId::from("bundle-a");
962        let dep_a = DeploymentId::new();
963        let dep_b = DeploymentId::new();
964        let rev = RevisionId::new();
965        let key_a = RuntimeKey::revision("acme", dep_a, bundle.clone(), rev);
966        let key_b = RuntimeKey::revision("acme", dep_b, bundle, rev);
967
968        let mut prev: HashMap<RuntimeKey, u32> = HashMap::new();
969        prev.insert(key_a.clone(), 100);
970        prev.insert(key_b.clone(), 200);
971
972        let (next, removed) = remove_keyed_entry(&prev, &key_a).expect("present");
973        assert_eq!(removed, 100);
974        assert_eq!(next.get(&key_b), Some(&200));
975        assert_eq!(next.len(), 1);
976    }
977
978    #[test]
979    fn map_lookup_separates_legacy_from_revision() {
980        let deployment = DeploymentId::new();
981        let bundle = BundleId::from("bundle-a");
982        let revision = RevisionId::new();
983
984        let mut map: HashMap<RuntimeKey, u32> = HashMap::new();
985        map.insert(RuntimeKey::legacy("acme"), 1);
986        map.insert(
987            RuntimeKey::revision("acme", deployment, bundle.clone(), revision),
988            2,
989        );
990
991        assert_eq!(map.get(&RuntimeKey::legacy("acme")), Some(&1));
992        assert_eq!(
993            map.get(&RuntimeKey::revision("acme", deployment, bundle, revision)),
994            Some(&2)
995        );
996        assert_eq!(map.get(&RuntimeKey::legacy("ghost")), None);
997    }
998
999    #[test]
1000    fn insert_keyed_entry_adds_revision_preserving_legacy_and_siblings() {
1001        let deployment = DeploymentId::new();
1002        let bundle = BundleId::from("bundle-a");
1003        let rev_a = RevisionId::new();
1004        let rev_b = RevisionId::new();
1005        let key_a = RuntimeKey::revision("acme", deployment, bundle.clone(), rev_a);
1006        let key_b = RuntimeKey::revision("acme", deployment, bundle, rev_b);
1007
1008        let mut prev: HashMap<RuntimeKey, u32> = HashMap::new();
1009        prev.insert(RuntimeKey::legacy("acme"), 1);
1010        prev.insert(key_a.clone(), 10);
1011
1012        let next = insert_keyed_entry(&prev, key_b.clone(), 20);
1013
1014        // New revision lands; legacy entry and the sibling revision survive.
1015        assert_eq!(next.get(&key_b), Some(&20));
1016        assert_eq!(next.get(&key_a), Some(&10));
1017        assert_eq!(next.get(&RuntimeKey::legacy("acme")), Some(&1));
1018        assert_eq!(next.len(), 3);
1019    }
1020
1021    #[test]
1022    fn insert_keyed_entry_replaces_existing_revision() {
1023        let key = RuntimeKey::revision(
1024            "acme",
1025            DeploymentId::new(),
1026            BundleId::from("bundle-a"),
1027            RevisionId::new(),
1028        );
1029        let mut prev: HashMap<RuntimeKey, u32> = HashMap::new();
1030        prev.insert(key.clone(), 10);
1031
1032        let next = insert_keyed_entry(&prev, key.clone(), 99);
1033
1034        assert_eq!(next.get(&key), Some(&99));
1035        assert_eq!(next.len(), 1);
1036    }
1037}