Skip to main content

greentic_runner_host/
host.rs

1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use anyhow::{Context, Result, anyhow, bail};
6use serde_json::Value;
7
8use crate::activity::{Activity, WelcomeFlowHint};
9use crate::boot;
10use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
11use crate::config::{Fast2FlowRoutingConfig, HostConfig};
12use crate::engine::host::{SessionHost, StateHost};
13use crate::engine::runtime::{FlowResumeStore, IngressEnvelope};
14#[cfg(feature = "greentic-x-provider")]
15use crate::greentic_x_provider::RunnerPackFast2FlowRoutingProvider;
16use crate::http::health::HealthState;
17use crate::pack::{IdentifyOutcome, PackRuntime};
18use crate::runner::adapt_timer;
19use crate::runner::engine::FlowEngine;
20use crate::runtime::{ActivePacks, TenantRuntime};
21use crate::secrets::{DynSecretsManager, default_manager};
22use crate::storage::{
23    DynSessionStore, DynStateStore, new_session_store, new_state_store, session_host_from,
24    state_host_from,
25};
26use crate::wasi::RunnerWasiPolicy;
27use greentic_deploy_spec::ids::{BundleId, DeploymentId, RevisionId};
28#[cfg(feature = "greentic-x-provider")]
29use greentic_x_runtime::{
30    Fast2FlowDirective, Fast2FlowMessageEnvelope, Fast2FlowRouteRequest, Fast2FlowRoutingProvider,
31};
32
33#[derive(Clone, Debug)]
34pub struct TelemetryCfg {
35    pub config: greentic_telemetry::TelemetryConfig,
36    pub export: greentic_telemetry::export::ExportConfig,
37}
38
39/// Builder for composing multi-tenant host instances.
40pub struct HostBuilder {
41    configs: HashMap<String, HostConfig>,
42    telemetry: Option<TelemetryCfg>,
43    wasi_policy: RunnerWasiPolicy,
44    secrets: Option<DynSecretsManager>,
45}
46
47impl HostBuilder {
48    pub fn new() -> Self {
49        Self {
50            configs: HashMap::new(),
51            telemetry: None,
52            wasi_policy: RunnerWasiPolicy::default(),
53            secrets: None,
54        }
55    }
56
57    pub fn with_config(mut self, config: HostConfig) -> Self {
58        self.configs.insert(config.tenant.clone(), config);
59        self
60    }
61
62    pub fn with_telemetry(mut self, telemetry: TelemetryCfg) -> Self {
63        self.telemetry = Some(telemetry);
64        self
65    }
66
67    pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
68        self.wasi_policy = policy;
69        self
70    }
71
72    pub fn with_secrets_manager(mut self, manager: DynSecretsManager) -> Self {
73        self.secrets = Some(manager);
74        self
75    }
76
77    pub fn build(self) -> Result<RunnerHost> {
78        if self.configs.is_empty() {
79            bail!("at least one tenant configuration is required");
80        }
81        let wasi_policy = Arc::new(self.wasi_policy);
82        let configs = self
83            .configs
84            .into_iter()
85            .map(|(tenant, cfg)| (tenant, Arc::new(cfg)))
86            .collect();
87        let session_store = new_session_store();
88        let session_host = session_host_from(Arc::clone(&session_store));
89        let state_store = new_state_store();
90        let state_host = state_host_from(Arc::clone(&state_store));
91        let secrets = match self.secrets {
92            Some(manager) => manager,
93            None => default_manager().context("failed to initialise default secrets backend")?,
94        };
95        Ok(RunnerHost {
96            configs,
97            active: Arc::new(ActivePacks::new()),
98            health: Arc::new(HealthState::new()),
99            session_store,
100            state_store,
101            session_host,
102            state_host,
103            wasi_policy,
104            secrets_manager: secrets,
105            telemetry: self.telemetry,
106        })
107    }
108}
109
110impl Default for HostBuilder {
111    fn default() -> Self {
112        Self::new()
113    }
114}
115
116/// Runtime host that manages tenant-bound packs and flow execution.
117pub struct RunnerHost {
118    configs: HashMap<String, Arc<HostConfig>>,
119    active: Arc<ActivePacks>,
120    health: Arc<HealthState>,
121    session_store: DynSessionStore,
122    state_store: DynStateStore,
123    session_host: Arc<dyn SessionHost>,
124    state_host: Arc<dyn StateHost>,
125    wasi_policy: Arc<RunnerWasiPolicy>,
126    secrets_manager: DynSecretsManager,
127    telemetry: Option<TelemetryCfg>,
128}
129
130/// Handle exposing tenant internals for embedding hosts (e.g. CLI server).
131#[derive(Clone)]
132pub struct TenantHandle {
133    runtime: Arc<TenantRuntime>,
134}
135
136impl RunnerHost {
137    pub async fn start(&self) -> Result<()> {
138        boot::init(&self.health, self.telemetry.as_ref())?;
139        Ok(())
140    }
141
142    pub async fn stop(&self) -> Result<()> {
143        self.active.replace(HashMap::new());
144        Ok(())
145    }
146
147    pub async fn load_pack(&self, tenant: &str, pack_path: &Path) -> Result<()> {
148        let archive_source = if is_pack_archive(pack_path) {
149            Some(pack_path)
150        } else {
151            None
152        };
153        let runtime = self
154            .prepare_runtime(tenant, pack_path, archive_source)
155            .await
156            .with_context(|| format!("failed to load tenant {tenant}"))?;
157        self.active.insert_pack(tenant, runtime);
158        tracing::info!(tenant, pack = %pack_path.display(), "pack loaded");
159        Ok(())
160    }
161
162    pub async fn handle_activity(&self, tenant: &str, activity: Activity) -> Result<Vec<Activity>> {
163        let runtime = self
164            .active
165            .load_pack(tenant)
166            .with_context(|| format!("tenant {tenant} not loaded"))?;
167        self.dispatch_activity(&runtime, tenant, activity).await
168    }
169
170    /// Execute an activity against a specific deployment/bundle/revision runtime.
171    ///
172    /// Unlike [`handle_activity`](Self::handle_activity), which resolves the
173    /// tenant-only (legacy) runtime, this targets a fully-qualified revision
174    /// entry inserted by [`ActivePacks::insert_revision`]. A tenant can host
175    /// several concurrent revisions under a traffic split, so the legacy
176    /// tenant-only lookup cannot disambiguate them — the ingress revision
177    /// dispatcher selects the revision and calls this.
178    ///
179    /// # Session isolation contract
180    ///
181    /// This method runs the selected revision's runtime against **whatever
182    /// session/state stores that runtime was built with** (at
183    /// [`TenantRuntime::load_revision`] time). It does *not* add a revision
184    /// dimension to the session key: the session/resume/state backend keys on
185    /// `(env, tenant, user)` plus pack/flow, **not** on the revision. If two
186    /// live revisions of the same pack for one tenant share a single session
187    /// backend, a `wait`/resume snapshot created by revision A can be fetched —
188    /// or clobbered — by revision B during a traffic split, retry, or
189    /// rebalance, resuming a snapshot against a different flow graph.
190    ///
191    /// Callers that load more than one revision per tenant onto one host
192    /// (i.e. every traffic-split producer) **MUST give each revision an
193    /// isolated session and state store** (a per-revision store instance, or a
194    /// revision-namespaced backend) when calling `load_revision`. The shared
195    /// `RunnerHost` stores (`session_store()`/`state_store()`) are only safe to
196    /// reuse across revisions when at most one revision is ever live per
197    /// tenant. The greentic-start activation path enforces this.
198    pub async fn handle_activity_for_revision(
199        &self,
200        tenant: &str,
201        deployment_id: DeploymentId,
202        bundle_id: BundleId,
203        revision_id: RevisionId,
204        activity: Activity,
205    ) -> Result<Vec<Activity>> {
206        let runtime = self
207            .active
208            .load_revision(tenant, deployment_id, bundle_id, revision_id)
209            .with_context(|| {
210                format!(
211                    "revision runtime not loaded for tenant {tenant} \
212                     (deployment {deployment_id}, revision {revision_id})"
213                )
214            })?;
215        self.dispatch_activity(&runtime, tenant, activity).await
216    }
217
218    /// Resolve the per-revision tenant runtime, attaching a uniform "not
219    /// loaded" context to the error. The three per-revision identify
220    /// fan-out APIs all need this exact lookup; sharing it keeps the
221    /// error chain identical across them.
222    fn load_revision_runtime(
223        &self,
224        tenant: &str,
225        deployment_id: DeploymentId,
226        bundle_id: BundleId,
227        revision_id: RevisionId,
228    ) -> Result<Arc<crate::runtime::TenantRuntime>> {
229        self.active
230            .load_revision(tenant, deployment_id, bundle_id, revision_id)
231            .with_context(|| {
232                format!(
233                    "revision runtime not loaded for tenant {tenant} \
234                     (deployment {deployment_id}, revision {revision_id})"
235                )
236            })
237    }
238
239    /// Per-revision per-`provider_type` `identify-instance` probe (M1 IID.4).
240    ///
241    /// Given the candidate `provider_types` an env declares messaging
242    /// endpoints for, ask each pack loaded under this revision (main +
243    /// overlays) which `provider_id` the inbound `payload` claims to address.
244    /// The greentic-start resolver pairs the returned `provider_id` with the
245    /// `provider_type` and looks the `MessagingEndpointId` up in the env's
246    /// admit table; that's how a header-less webhook gets auto-routed to the
247    /// right endpoint.
248    ///
249    /// `payload` is forwarded opaque to every probed component. The M1
250    /// IID.4d wrapper convention from `greentic-start` is
251    /// `{headers: [{name,value}], body: <parsed-or-null>}`. See the WIT
252    /// docstring on `greentic:provider-instance-identity@0.1.0/identify-instance`
253    /// for the full contract.
254    ///
255    /// This is the unscoped legacy API; new callers should use
256    /// [`identify_messaging_endpoints_for_revision_scoped`] for per-provider
257    /// header allowlist scoping (Phase D). Merge lattice:
258    /// `Identified > NoMatch > Unsupported` — first pack to `Identified`
259    /// wins and that type drops out of remaining probing.
260    ///
261    /// The per-pack loop is inlined (rather than factored into a shared
262    /// `AsyncFnMut`-based helper) deliberately: routing the loop through an
263    /// `AsyncFnMut` closure destabilises HRTB `Send` inference for
264    /// downstream consumers spawning the returned future (greentic-start's
265    /// hyper `service_fn`). The `Send`-bound test
266    /// [`identify_futures_are_send`] guards against silent regression.
267    ///
268    /// [`identify_messaging_endpoints_for_revision_scoped`]:
269    ///     RunnerHost::identify_messaging_endpoints_for_revision_scoped
270    pub async fn identify_messaging_endpoints_for_revision(
271        &self,
272        tenant: &str,
273        deployment_id: DeploymentId,
274        bundle_id: BundleId,
275        revision_id: RevisionId,
276        provider_types: &[&str],
277        payload: &[u8],
278    ) -> Result<HashMap<String, IdentifyOutcome>> {
279        if provider_types.is_empty() {
280            return Ok(HashMap::new());
281        }
282        let runtime = self.load_revision_runtime(tenant, deployment_id, bundle_id, revision_id)?;
283        // Seed every type at Unsupported — the floor of the merge lattice
284        // (see `IdentifyOutcome::merge_in`).
285        let mut merged: HashMap<String, IdentifyOutcome> = provider_types
286            .iter()
287            .map(|ty| ((*ty).to_string(), IdentifyOutcome::Unsupported))
288            .collect();
289        for pack in runtime.all_packs() {
290            // Skip types already at the lattice top — no probe could improve them.
291            let remaining: Vec<&str> = provider_types
292                .iter()
293                .copied()
294                .filter(|ty| !matches!(merged.get(*ty), Some(IdentifyOutcome::Identified(_))))
295                .collect();
296            if remaining.is_empty() {
297                break;
298            }
299            let probe = pack
300                .identify_endpoints_by_provider_type(&remaining, payload)
301                .await?;
302            for (ty, outcome) in probe {
303                if let Some(existing) = merged.get_mut(&ty) {
304                    existing.merge_in(outcome);
305                }
306            }
307        }
308        Ok(merged)
309    }
310
311    /// Per-provider scoped variant of
312    /// [`identify_messaging_endpoints_for_revision`].
313    ///
314    /// The wrapper is built **per-provider** from the component's cached
315    /// `describe-identify-instance` hint (see
316    /// [`PackRuntime::resolve_identify_hint`]): hinted components receive
317    /// ONLY the headers their hint declares; unhinted components receive
318    /// every header the caller passed in (back-compat).
319    ///
320    /// Loop inlined for the same reason as
321    /// [`identify_messaging_endpoints_for_revision`].
322    ///
323    /// [`identify_messaging_endpoints_for_revision`]:
324    ///     RunnerHost::identify_messaging_endpoints_for_revision
325    #[allow(clippy::too_many_arguments)]
326    pub async fn identify_messaging_endpoints_for_revision_scoped(
327        &self,
328        tenant: &str,
329        deployment_id: DeploymentId,
330        bundle_id: BundleId,
331        revision_id: RevisionId,
332        provider_types: &[&str],
333        headers: &[(String, String)],
334        body: &Value,
335    ) -> Result<HashMap<String, IdentifyOutcome>> {
336        if provider_types.is_empty() {
337            return Ok(HashMap::new());
338        }
339        let runtime = self.load_revision_runtime(tenant, deployment_id, bundle_id, revision_id)?;
340        let mut merged: HashMap<String, IdentifyOutcome> = provider_types
341            .iter()
342            .map(|ty| ((*ty).to_string(), IdentifyOutcome::Unsupported))
343            .collect();
344        for pack in runtime.all_packs() {
345            let remaining: Vec<&str> = provider_types
346                .iter()
347                .copied()
348                .filter(|ty| !matches!(merged.get(*ty), Some(IdentifyOutcome::Identified(_))))
349                .collect();
350            if remaining.is_empty() {
351                break;
352            }
353            let probe = pack
354                .identify_endpoints_by_provider_type_scoped(&remaining, headers, body)
355                .await?;
356            for (ty, outcome) in probe {
357                if let Some(existing) = merged.get_mut(&ty) {
358                    existing.merge_in(outcome);
359                }
360            }
361        }
362        Ok(merged)
363    }
364
365    /// Per-revision describe-identify-instance hint discovery.
366    ///
367    /// Fans the cached describe probe out across main pack + overlays;
368    /// first non-`None` hint per `provider_type` wins. Lets callers inspect
369    /// the per-provider header allowlist without running the expensive
370    /// identify-instance probe. `None` value means no pack in this revision
371    /// exposes a usable hint for that `provider_type`.
372    ///
373    /// Loop inlined for the same reason as
374    /// [`identify_messaging_endpoints_for_revision`].
375    pub async fn describe_identify_instances_for_revision(
376        &self,
377        tenant: &str,
378        deployment_id: DeploymentId,
379        bundle_id: BundleId,
380        revision_id: RevisionId,
381        provider_types: &[&str],
382    ) -> Result<HashMap<String, Option<crate::identify_hint::IdentifyInstanceHint>>> {
383        if provider_types.is_empty() {
384            return Ok(HashMap::new());
385        }
386        let runtime = self.load_revision_runtime(tenant, deployment_id, bundle_id, revision_id)?;
387        let mut merged: HashMap<String, Option<crate::identify_hint::IdentifyInstanceHint>> =
388            provider_types
389                .iter()
390                .map(|ty| ((*ty).to_string(), None))
391                .collect();
392        for pack in runtime.all_packs() {
393            // First non-`None` hint per type wins — anything already populated
394            // is at the lattice top. Mirror the `matches!` shape the sibling
395            // identify fns use so the predicate is consistent across files.
396            let remaining: Vec<&str> = provider_types
397                .iter()
398                .copied()
399                .filter(|ty| !matches!(merged.get(*ty), Some(Some(_))))
400                .collect();
401            if remaining.is_empty() {
402                break;
403            }
404            let probe = pack
405                .describe_identify_hints_by_provider_type(&remaining)
406                .await?;
407            for (ty, hint) in probe {
408                if let Some(slot) = merged.get_mut(&ty)
409                    && slot.is_none()
410                {
411                    *slot = hint;
412                }
413            }
414        }
415        Ok(merged)
416    }
417
418    /// Per-revision provider invocation (Phase D).
419    ///
420    /// Locates the unique pack in `(deployment_id, bundle_id, revision_id)`
421    /// whose `greentic.provider-extension.v1` binds the requested
422    /// `provider_type`, verifies `op` is in that declaration's allowlist,
423    /// then calls `op` on it with `input_json`.
424    ///
425    /// Used by greentic-start's Phase D `ProviderRoute` admission arm to
426    /// run provider webhooks (e.g. `ingest_http`) without round-tripping
427    /// through the flow engine. The provider component returns the parsed
428    /// HTTP-out envelope verbatim; greentic-start dispatches the events
429    /// it carries back through the flow runtime separately.
430    ///
431    /// `correlation_id` is threaded into the `ComponentExecCtx` as both
432    /// `correlation_id` and `idempotency_key` (mirroring the operator-API
433    /// pattern in `build_exec_ctx`). `trace_id` rides through as-is.
434    ///
435    /// Fails closed when:
436    /// - the revision isn't loaded (error chain names deployment + revision)
437    /// - no pack in the revision binds `provider_type`
438    /// - **multiple packs in the revision bind `provider_type`** — the URL
439    ///   → provider routing the caller did at the route table is unable to
440    ///   disambiguate at invoke time. Mirrors the within-pack ambiguity
441    ///   check in [`ProviderRegistry::resolve`], lifted to the revision
442    ///   level so an overlay accidentally redeclaring a main-pack provider
443    ///   can't silently shadow the wrong runtime. A future revision of
444    ///   this API can accept an explicit `provider_id` to disambiguate
445    ///   when D.3 wires identification + invocation together.
446    /// - `op` is not in the resolved provider's declared `ops` allowlist
447    ///   (defense-in-depth: a caller bug or misconfigured `ProviderRoute`
448    ///   cannot smuggle an undeclared op past the schema-core boundary).
449    ///
450    /// Loop inlined for the same reason as
451    /// [`identify_messaging_endpoints_for_revision`].
452    ///
453    /// [`ProviderRegistry::resolve`]: crate::provider::ProviderRegistry::resolve
454    #[allow(clippy::too_many_arguments)]
455    pub async fn invoke_provider_for_revision(
456        &self,
457        tenant: &str,
458        deployment_id: DeploymentId,
459        bundle_id: BundleId,
460        revision_id: RevisionId,
461        provider_type: &str,
462        op: &str,
463        input_json: Vec<u8>,
464        correlation_id: Option<String>,
465        trace_id: Option<String>,
466    ) -> Result<Value> {
467        let runtime = self.load_revision_runtime(tenant, deployment_id, bundle_id, revision_id)?;
468        // Walk ALL packs to detect cross-pack ambiguity. First-match-wins
469        // would let main pack silently shadow an overlay that binds the
470        // same provider_type — the identify-side merge lattice can return
471        // outcomes from any pack, so the invoke side must refuse to guess.
472        let mut matched = None;
473        for pack in runtime.all_packs() {
474            let Some(registry) = pack.provider_registry_optional()? else {
475                continue;
476            };
477            let Some((binding, declared_ops)) =
478                registry.try_resolve_with_ops(None, Some(provider_type))?
479            else {
480                continue;
481            };
482            if matched.is_some() {
483                bail!(
484                    "ambiguous provider_type `{provider_type}` in revision \
485                     (deployment {deployment_id}, revision {revision_id}): \
486                     multiple packs bind the same type; pack manifests must \
487                     declare each provider_type at most once across main + overlays"
488                );
489            }
490            matched = Some((Arc::clone(pack), binding, declared_ops));
491        }
492        let Some((pack, binding, declared_ops)) = matched else {
493            bail!(
494                "no pack in revision binds provider_type `{provider_type}` \
495                 (deployment {deployment_id}, revision {revision_id})"
496            );
497        };
498        if !declared_ops.iter().any(|d| d == op) {
499            bail!(
500                "op `{op}` is not declared for provider_type `{provider_type}` \
501                 in revision (deployment {deployment_id}, revision {revision_id}); \
502                 declared ops: {declared_ops:?}"
503            );
504        }
505        let exec_ctx = ComponentExecCtx {
506            tenant: ComponentTenantCtx {
507                tenant: tenant.to_string(),
508                team: None,
509                user: None,
510                trace_id,
511                i18n_id: None,
512                correlation_id: correlation_id.clone(),
513                deadline_unix_ms: None,
514                attempt: 1,
515                idempotency_key: correlation_id,
516            },
517            i18n_id: None,
518            flow_id: format!("provider-webhook/{provider_type}"),
519            node_id: None,
520        };
521        pack.invoke_provider(&binding, exec_ctx, op, input_json)
522            .await
523    }
524
525    /// Shared activity-execution body: resolve the flow, build the canonical
526    /// ingress envelope, run the state machine, and normalize replies. Both the
527    /// legacy and revision entry points funnel through here so flow resolution
528    /// and reply shaping never drift between them.
529    async fn dispatch_activity(
530        &self,
531        runtime: &TenantRuntime,
532        tenant: &str,
533        activity: Activity,
534    ) -> Result<Vec<Activity>> {
535        let activity = apply_fast2flow_routing(runtime, tenant, activity)?;
536
537        // Fast2Flow Respond/Deny returns a pre-built response activity
538        // (kind = Custom { action: "response" }, no flow_id/pack_id).
539        // Short-circuit: return it directly — do NOT resolve a flow or
540        // enter the state machine, otherwise a Deny still executes the
541        // tenant entry flow with the denial payload.
542        if activity.action() == Some("response") && activity.flow_id().is_none() {
543            return Ok(vec![activity]);
544        }
545
546        let (pack_id, flow_id) = resolve_flow_id(runtime, &activity)?;
547        let action = activity.action().map(|value| value.to_string());
548        let session = activity.session_id().map(|value| value.to_string());
549        let provider = activity.provider_id().map(|value| value.to_string());
550        let messaging_endpoint_id = activity
551            .messaging_endpoint_id()
552            .map(|value| value.to_string());
553        let channel = activity.channel().map(|value| value.to_string());
554        let conversation = activity.conversation().map(|value| value.to_string());
555        let user = activity.user().map(|value| value.to_string());
556        let welcome_flow_hint = activity.welcome_flow_hint().cloned();
557        let resolved_flow_type =
558            activity
559                .flow_type()
560                .map(|value| value.to_string())
561                .or_else(|| {
562                    runtime
563                        .engine()
564                        .flow_by_key(&pack_id, &flow_id)
565                        .map(|desc| desc.flow_type.clone())
566                });
567        let payload = activity.into_payload();
568
569        let mut envelope = IngressEnvelope {
570            tenant: tenant.to_string(),
571            env: std::env::var("GREENTIC_ENV").ok(),
572            pack_id: Some(pack_id.clone()),
573            flow_id: flow_id.clone(),
574            flow_type: resolved_flow_type,
575            action,
576            session_hint: session,
577            provider,
578            messaging_endpoint_id,
579            channel,
580            conversation,
581            user,
582            activity_id: None,
583            timestamp: None,
584            payload,
585            metadata: None,
586            reply_scope: None,
587        }
588        .canonicalize();
589
590        let hint_flow_type = welcome_flow_hint.as_ref().and_then(|hint| {
591            runtime
592                .engine()
593                .flow_by_key(&hint.pack_id, &hint.flow_id)
594                .map(|desc| desc.flow_type.clone())
595        });
596        apply_welcome_flow_override(
597            runtime.session_store(),
598            &mut envelope,
599            welcome_flow_hint.as_ref(),
600            hint_flow_type,
601        )?;
602
603        let result = runtime.state_machine().handle(envelope).await?;
604        Ok(normalize_replies(result, tenant))
605    }
606
607    pub async fn tenant(&self, tenant: &str) -> Option<TenantHandle> {
608        self.active
609            .load_pack(tenant)
610            .map(|runtime| TenantHandle { runtime })
611    }
612
613    pub fn active_packs(&self) -> Arc<ActivePacks> {
614        Arc::clone(&self.active)
615    }
616
617    pub fn health_state(&self) -> Arc<HealthState> {
618        Arc::clone(&self.health)
619    }
620
621    pub fn wasi_policy(&self) -> Arc<RunnerWasiPolicy> {
622        Arc::clone(&self.wasi_policy)
623    }
624
625    pub fn session_store(&self) -> DynSessionStore {
626        Arc::clone(&self.session_store)
627    }
628
629    pub fn state_store(&self) -> DynStateStore {
630        Arc::clone(&self.state_store)
631    }
632
633    pub fn session_host(&self) -> Arc<dyn SessionHost> {
634        Arc::clone(&self.session_host)
635    }
636
637    pub fn state_host(&self) -> Arc<dyn StateHost> {
638        Arc::clone(&self.state_host)
639    }
640
641    pub fn secrets_manager(&self) -> DynSecretsManager {
642        Arc::clone(&self.secrets_manager)
643    }
644
645    pub fn tenant_configs(&self) -> HashMap<String, Arc<HostConfig>> {
646        self.configs.clone()
647    }
648
649    async fn prepare_runtime(
650        &self,
651        tenant: &str,
652        pack_path: &Path,
653        archive_source: Option<&Path>,
654    ) -> Result<Arc<TenantRuntime>> {
655        let config = self
656            .configs
657            .get(tenant)
658            .cloned()
659            .with_context(|| format!("tenant {tenant} not registered"))?;
660        if config.tenant != tenant {
661            bail!(
662                "tenant mismatch: config declares '{}' but '{tenant}' was requested",
663                config.tenant
664            );
665        }
666        let runtime = TenantRuntime::load(
667            pack_path,
668            Arc::clone(&config),
669            None,
670            archive_source,
671            None,
672            self.wasi_policy(),
673            self.session_host(),
674            self.session_store(),
675            self.state_store(),
676            self.state_host(),
677            self.secrets_manager(),
678        )
679        .await?;
680        let timers = adapt_timer::spawn_timers(Arc::clone(&runtime))?;
681        runtime.register_timers(timers);
682        Ok(runtime)
683    }
684}
685
686impl TenantHandle {
687    pub fn config(&self) -> Arc<HostConfig> {
688        Arc::clone(self.runtime.config())
689    }
690
691    pub fn pack(&self) -> Arc<PackRuntime> {
692        self.runtime.pack()
693    }
694
695    pub fn engine(&self) -> Arc<FlowEngine> {
696        Arc::clone(self.runtime.engine())
697    }
698
699    pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
700        self.runtime.overlays()
701    }
702
703    pub fn overlay_digests(&self) -> Vec<Option<String>> {
704        self.runtime.overlay_digests()
705    }
706}
707
708/// M1.5 welcome-flow override: swap the envelope's `(pack_id, flow_id,
709/// flow_type)` to the producer-supplied [`WelcomeFlowHint`] when ALL of:
710/// the hint is present, the envelope carries a `messaging_endpoint_id`,
711/// the welcome-seen marker is absent for this `(tenant, env, eid, user)`
712/// (set atomically on success), and `FlowResumeStore::fetch` finds no
713/// active wait snapshot. Any missing precondition is a silent no-op.
714///
715/// **The welcome-seen marker is the durable first-contact gate.** Without
716/// it, post-completion / no-wait / TTL-expired turns would re-fire welcome
717/// because the wait-snapshot check is only positive while a flow is paused.
718/// The marker lives in the shared session store under a synthetic scope
719/// (`welcome-seen::ep=<eid>`) distinct from the flow's own conversation, so
720/// flow-completion `clear_wait` does NOT drop it.
721///
722/// The wait-snapshot check is kept as a belt-and-braces safety net: the
723/// marker check + write is two operations against the store with a small
724/// race window (Phase D will add an atomic `register_wait_if_absent`).
725/// The safety net guarantees an in-flight flow is never overridden even if
726/// two concurrent first-ever turns both pass the marker probe.
727///
728/// `session_store` + `hint_flow_type` are passed as primitives so the logic
729/// is unit-testable without a `TenantRuntime`; the caller does the engine
730/// lookup that produces `hint_flow_type`.
731fn apply_welcome_flow_override(
732    session_store: &DynSessionStore,
733    envelope: &mut IngressEnvelope,
734    hint: Option<&WelcomeFlowHint>,
735    hint_flow_type: Option<String>,
736) -> Result<()> {
737    let Some(hint) = hint else {
738        return Ok(());
739    };
740    if envelope.messaging_endpoint_id.is_none() {
741        return Ok(());
742    }
743
744    if !try_mark_welcome_first_contact(session_store, envelope)? {
745        return Ok(());
746    }
747
748    let resume = FlowResumeStore::new(Arc::clone(session_store));
749    let snapshot = resume
750        .fetch(envelope)
751        .map_err(|err| anyhow!("welcome-flow first-contact probe failed: {err}"))?;
752    if snapshot.is_some() {
753        return Ok(());
754    }
755
756    envelope.pack_id = Some(hint.pack_id.clone());
757    envelope.flow_id = hint.flow_id.clone();
758    envelope.flow_type = hint_flow_type;
759    Ok(())
760}
761
762/// Persists a per-`(tenant, env, eid, user)` welcome-seen marker on first
763/// contact and returns `true` only when this turn observed no marker AND
764/// wrote one. Subsequent turns short-circuit to `false`.
765///
766/// Returns `false` (without writing) if the envelope lacks a
767/// `messaging_endpoint_id` — no marker bucket is derivable.
768///
769/// Race window: check + mark is two store calls, not one atomic CAS. Two
770/// concurrent first-ever turns can both observe "no marker" and both fire
771/// welcome once — bounded harm, mitigated by the wait-snapshot safety net
772/// in [`apply_welcome_flow_override`]. A real atomic primitive
773/// (`register_wait_if_absent`) is Phase D.
774fn try_mark_welcome_first_contact(
775    store: &DynSessionStore,
776    envelope: &IngressEnvelope,
777) -> Result<bool> {
778    let Some(scope) = welcome_marker_scope(envelope) else {
779        return Ok(false);
780    };
781    let (ctx, user) = FlowResumeStore::contact_identity(envelope)
782        .map_err(|e| anyhow!("welcome marker identity probe failed: {e}"))?;
783
784    if store
785        .find_wait_by_scope(&ctx, &user, &scope)
786        .map_err(|e| anyhow!("welcome marker probe failed: {e}"))?
787        .is_some()
788    {
789        return Ok(false);
790    }
791
792    let data = marker_session_data(&ctx, &user);
793    let session_key = marker_session_key(&ctx, &user, &scope);
794    store
795        .register_wait(&ctx, &user, &scope, &session_key, data, None)
796        .map_err(|e| anyhow!("welcome marker register failed: {e}"))?;
797    Ok(true)
798}
799
800/// Stable, identity-scoped session key for the welcome marker.
801///
802/// **The session key is the store's per-entry identity.** Both backends
803/// (in-memory + Redis) overwrite or reject an existing entry on a
804/// `register_wait` collision, so a scope-only `SessionKey` would collapse
805/// every `(tenant, env, user)` on the same endpoint onto one row:
806/// in-memory's `ensure_ctx_preserved` would reject User B's first turn
807/// outright; Redis's unconditional `SET` would overwrite User A's entry
808/// and dangle User A's scope index to User B's data.
809///
810/// Fix: SHA-256 over `(env, tenant, team, user, conversation)`. The `v1`
811/// prefix lets us bump the derivation without colliding on old markers.
812fn marker_session_key(
813    ctx: &greentic_types::TenantCtx,
814    user: &greentic_types::UserId,
815    scope: &greentic_types::ReplyScope,
816) -> greentic_session::SessionKey {
817    use sha2::{Digest, Sha256};
818    let team = match ctx.team_id.as_ref().or(ctx.team.as_ref()) {
819        Some(t) => t.as_str(),
820        None => "<none>",
821    };
822    let digest = Sha256::digest(
823        format!(
824            "welcome-marker:v1\0{}\0{}\0team={team}\0{}\0{}",
825            ctx.env.as_str(),
826            ctx.tenant_id.as_str(),
827            user.as_str(),
828            scope.conversation,
829        )
830        .as_bytes(),
831    );
832    greentic_session::SessionKey::new(format!("welcome-marker::{}", hex::encode(digest)))
833}
834
835/// Synthetic [`ReplyScope`] keyed on `messaging_endpoint_id` so the marker
836/// is partitioned per-endpoint AND disjoint from any real conversation
837/// scope. Returns `None` when the envelope lacks an eid — the marker has
838/// no meaningful bucket then, and the caller exits early.
839fn welcome_marker_scope(envelope: &IngressEnvelope) -> Option<greentic_types::ReplyScope> {
840    let eid = envelope.messaging_endpoint_id.as_deref()?;
841    Some(greentic_types::ReplyScope {
842        conversation: format!("welcome-seen::ep={eid}"),
843        thread: None,
844        reply_to: None,
845        correlation: None,
846    })
847}
848
849/// Minimal `SessionData` for the marker. The store accepts any record
850/// aligned with `(ctx, user)`; the marker carries no flow semantics, so
851/// the placeholder `flow_id`/`pack_id` is fixed and validates as an
852/// identifier (ascii + `.`/`-`/`_`).
853fn marker_session_data(
854    ctx: &greentic_types::TenantCtx,
855    user: &greentic_types::UserId,
856) -> greentic_session::SessionData {
857    use std::str::FromStr;
858    use std::sync::LazyLock;
859    static FLOW_ID: LazyLock<greentic_types::FlowId> =
860        LazyLock::new(|| greentic_types::FlowId::from_str("welcome-marker").expect("valid id"));
861    static PACK_ID: LazyLock<greentic_types::PackId> =
862        LazyLock::new(|| greentic_types::PackId::from_str("welcome-marker").expect("valid id"));
863    let cursor = greentic_types::SessionCursor::new("marker".to_string());
864    let ctx = ctx.clone().with_user(Some(user.clone()));
865    greentic_session::SessionData {
866        tenant_ctx: ctx,
867        flow_id: FLOW_ID.clone(),
868        pack_id: Some(PACK_ID.clone()),
869        cursor,
870        context_json: "{}".to_string(),
871    }
872}
873
874fn apply_fast2flow_routing(
875    runtime: &TenantRuntime,
876    tenant: &str,
877    activity: Activity,
878) -> Result<Activity> {
879    let config = &runtime.config().fast2flow;
880    if !config.enabled || activity.flow_id().is_some() {
881        return Ok(activity);
882    }
883    apply_fast2flow_routing_enabled(runtime, tenant, activity, config)
884}
885
/// Asks the pack-hosted fast2flow provider where a free-text message should
/// go, then applies the returned directive.
///
/// Activities without a non-blank string `text` payload field pass through
/// untouched. When the config sets no `scope`, the routing scope defaults to
/// the tenant name. Directive handling:
/// - `Continue` keeps the activity as-is;
/// - `Dispatch` re-targets it via [`apply_fast2flow_target`];
/// - `Respond` and `Deny` replace it with a tenant-scoped `response`
///   activity whose only message is the provider's text or denial reason.
///
/// # Errors
/// Returns an error when the provider cannot be built or cannot route (the
/// provider error is stringified), and when a dispatch target is invalid.
#[cfg(feature = "greentic-x-provider")]
fn apply_fast2flow_routing_enabled(
    runtime: &TenantRuntime,
    tenant: &str,
    activity: Activity,
    config: &Fast2FlowRoutingConfig,
) -> Result<Activity> {
    let Some(raw_text) = activity.payload().get("text").and_then(Value::as_str) else {
        return Ok(activity);
    };
    let text = raw_text.trim();
    if text.is_empty() {
        return Ok(activity);
    }

    let mut envelope = Fast2FlowMessageEnvelope::new(text.to_owned());
    if let Some(channel) = activity.channel() {
        envelope = envelope.with_channel(channel.to_owned());
    }
    if let Some(provider_id) = activity.provider_id() {
        envelope = envelope.with_provider(provider_id.to_owned());
    }

    let scope = config.scope.clone().unwrap_or_else(|| tenant.to_owned());
    // Clamp pre-epoch clocks to 0 so the cast to u64 cannot wrap.
    let now_unix_ms = chrono::Utc::now().timestamp_millis().max(0) as u64;
    let request = Fast2FlowRouteRequest {
        scope,
        envelope,
        session_active: activity.session_id().is_some(),
        // NOTE(review): the locale is hard-coded; the activity's own locale
        // is not consulted here.
        input_locale: "en".to_owned(),
        time_budget_ms: config.time_budget_ms,
        registry_path: config.registry_path.clone(),
        indexes_path: config.indexes_path.clone(),
        now_unix_ms,
        metadata: Default::default(),
    };

    let router = RunnerPackFast2FlowRoutingProvider::new(runtime.pack())
        .map_err(|err| anyhow!(err.to_string()))?
        .with_component_ref(config.component_ref.clone())
        .with_operation(config.operation.clone())
        .with_tenant(tenant.to_owned());
    let route = router
        .route_intent(request)
        .map_err(|err| anyhow!(err.to_string()))?;

    // Respond and Deny both short-circuit the turn with a plain-text reply.
    let text_reply = |body: Value| Activity::custom("response", body).ensure_tenant(tenant);
    match route.directive {
        Fast2FlowDirective::Continue => Ok(activity),
        Fast2FlowDirective::Dispatch {
            target, entities, ..
        } => apply_fast2flow_target(activity, &target, entities),
        Fast2FlowDirective::Respond { message } => Ok(text_reply(serde_json::json!({
            "messages": [{ "text": message }]
        }))),
        Fast2FlowDirective::Deny { reason } => Ok(text_reply(serde_json::json!({
            "messages": [{ "text": reason }]
        }))),
    }
}
944
945#[cfg(not(feature = "greentic-x-provider"))]
946fn apply_fast2flow_routing_enabled(
947    _runtime: &TenantRuntime,
948    _tenant: &str,
949    _activity: Activity,
950    _config: &Fast2FlowRoutingConfig,
951) -> Result<Activity> {
952    bail!("fast2flow routing requires the greentic-x-provider feature")
953}
954
/// Points `activity` at the flow named by a fast2flow dispatch `target`.
///
/// Surrounding whitespace in `target` is ignored. The target is either
/// `flow_id` or `pack_id/flow_id`, split at the first `/`, with each side
/// trimmed. Routing entities are then attached via
/// [`attach_fast2flow_entities`].
///
/// # Errors
/// Fails when the target is blank, or when either side of the
/// `pack_id/flow_id` form is empty.
#[cfg(feature = "greentic-x-provider")]
fn apply_fast2flow_target(
    activity: Activity,
    target: &str,
    entities: Vec<greentic_x_runtime::Fast2FlowRoutingEntity>,
) -> Result<Activity> {
    let target = target.trim();
    if target.is_empty() {
        bail!("fast2flow dispatch target is empty");
    }
    let routed = match target.split_once('/') {
        None => activity.with_flow(target),
        Some((pack_part, flow_part)) => {
            let (pack_id, flow_id) = (pack_part.trim(), flow_part.trim());
            if pack_id.is_empty() || flow_id.is_empty() {
                bail!(
                    "fast2flow dispatch target `{target}` must be `pack_id/flow_id` or `flow_id`"
                );
            }
            activity.with_pack(pack_id).with_flow(flow_id)
        }
    };
    Ok(attach_fast2flow_entities(routed, entities))
}
979
/// Attaches fast2flow routing entities to the activity.
///
/// Non-empty entity lists are set via `with_payload_field` under the
/// `fast2flow` key, shaped as `{ "entities": [...] }`. An empty list leaves
/// the activity untouched.
#[cfg(feature = "greentic-x-provider")]
fn attach_fast2flow_entities(
    activity: Activity,
    entities: Vec<greentic_x_runtime::Fast2FlowRoutingEntity>,
) -> Activity {
    if entities.is_empty() {
        activity
    } else {
        let fast2flow = serde_json::json!({ "entities": entities });
        activity.with_payload_field("fast2flow", fast2flow)
    }
}
995
996fn resolve_flow_id(runtime: &TenantRuntime, activity: &Activity) -> Result<(String, String)> {
997    let engine = runtime.engine();
998    if let Some(flow_id) = activity.flow_id() {
999        if let Some(pack_id) = activity.pack_id() {
1000            if engine.flow_by_key(pack_id, flow_id).is_none() {
1001                bail!("flow {flow_id} not registered for pack {pack_id}");
1002            }
1003            return Ok((pack_id.to_string(), flow_id.to_string()));
1004        }
1005        if let Some(flow) = engine.flow_by_id(flow_id) {
1006            return Ok((flow.pack_id.clone(), flow.id.clone()));
1007        }
1008        bail!("flow {flow_id} is ambiguous; pack_id is required");
1009    }
1010
1011    if let Some(flow_type) = activity.flow_type() {
1012        if let Some(pack_id) = activity.pack_id() {
1013            if let Some(flow) = engine
1014                .flows()
1015                .iter()
1016                .find(|flow| flow.pack_id == pack_id && flow.flow_type == flow_type)
1017            {
1018                return Ok((pack_id.to_string(), flow.id.clone()));
1019            }
1020            bail!("flow type {flow_type} not registered for pack {pack_id}");
1021        }
1022        if let Some(flow) = engine.flow_by_type(flow_type) {
1023            return Ok((flow.pack_id.clone(), flow.id.clone()));
1024        }
1025        bail!("flow type {flow_type} is ambiguous; pack_id is required");
1026    }
1027
1028    let pack = runtime.pack();
1029    let flow_id = pack
1030        .metadata()
1031        .entry_flows
1032        .first()
1033        .cloned()
1034        .ok_or_else(|| anyhow!("no entry flows registered for tenant {}", runtime.tenant()))?;
1035    Ok((pack.metadata().pack_id.clone(), flow_id))
1036}
1037
1038fn normalize_replies(result: Value, tenant: &str) -> Vec<Activity> {
1039    result
1040        .as_array()
1041        .cloned()
1042        .unwrap_or_else(|| vec![result])
1043        .into_iter()
1044        .map(|payload| Activity::from_output(payload, tenant))
1045        .collect()
1046}
1047
/// Reports whether `path` names a `.gtpack` archive.
///
/// The extension comparison ignores ASCII case. Paths with no extension,
/// or with an extension that is not valid UTF-8, are not archives.
fn is_pack_archive(path: &Path) -> bool {
    matches!(
        path.extension().and_then(std::ffi::OsStr::to_str),
        Some(ext) if ext.eq_ignore_ascii_case("gtpack")
    )
}
1054
#[cfg(test)]
mod welcome_flow_tests {
    use super::*;
    use crate::engine::runtime::IngressEnvelope;
    use crate::runner::engine::{ExecutionState, FlowSnapshot, FlowWait};
    use crate::storage::new_session_store;
    use greentic_types::ReplyScope;
    use serde_json::json;

    fn sample_envelope(endpoint_id: Option<&str>) -> IngressEnvelope {
        sample_envelope_for_user(endpoint_id, "user-1")
    }

    fn sample_envelope_for_user(endpoint_id: Option<&str>, user: &str) -> IngressEnvelope {
        IngressEnvelope {
            tenant: "demo".into(),
            env: Some("local".into()),
            pack_id: Some("pack.default".into()),
            flow_id: "flow.default".into(),
            flow_type: Some("messaging".into()),
            action: Some("messaging".into()),
            session_hint: None,
            provider: Some("teams".into()),
            messaging_endpoint_id: endpoint_id.map(String::from),
            channel: Some("chan".into()),
            conversation: Some(format!("conv-{user}")),
            user: Some(user.to_string()),
            activity_id: None,
            timestamp: None,
            payload: json!({}),
            metadata: None,
            reply_scope: Some(ReplyScope {
                conversation: format!("conv-{user}"),
                thread: None,
                reply_to: None,
                correlation: None,
            }),
        }
        .canonicalize()
    }

    fn hint() -> WelcomeFlowHint {
        WelcomeFlowHint {
            pack_id: "pack.welcome".into(),
            flow_id: "flow.welcome".into(),
        }
    }

    fn seed_resume(store: &DynSessionStore, envelope: &IngressEnvelope) {
        // Plant a snapshot in the exact bucket `fetch` would query so the
        // next call resolves to a resume — proves the override skips when a
        // session already exists.
        let resume = FlowResumeStore::new(Arc::clone(store));
        let state: ExecutionState = serde_json::from_value(json!({
            "input": { "text": "hi" },
            "nodes": {},
            "egress": []
        }))
        .expect("state");
        let wait = FlowWait {
            reason: Some("await-user".into()),
            snapshot: FlowSnapshot {
                pack_id: envelope.pack_id.clone().expect("pack_id"),
                flow_id: envelope.flow_id.clone(),
                next_flow: None,
                next_node: "node-2".into(),
                state,
            },
        };
        resume.save(envelope, &wait).expect("seed save");
    }

    #[test]
    fn override_is_no_op_when_hint_absent() {
        // Pre-M1.5 producers don't attach a hint — flow resolution must
        // stay exactly the same.
        let store = new_session_store();
        let mut envelope = sample_envelope(Some("teams-legal"));
        let before = envelope.clone();
        apply_welcome_flow_override(&store, &mut envelope, None, None).expect("ok");
        assert_eq!(envelope.pack_id, before.pack_id);
        assert_eq!(envelope.flow_id, before.flow_id);
        assert_eq!(envelope.flow_type, before.flow_type);
    }

    #[test]
    fn override_is_no_op_when_endpoint_id_absent() {
        // Non-messaging traffic carries no endpoint id and must never hit
        // the welcome-flow path even if the hint is somehow set.
        let store = new_session_store();
        let mut envelope = sample_envelope(None);
        let before = envelope.clone();
        apply_welcome_flow_override(&store, &mut envelope, Some(&hint()), Some("welcome".into()))
            .expect("ok");
        assert_eq!(envelope.pack_id, before.pack_id);
        assert_eq!(envelope.flow_id, before.flow_id);
    }

    #[test]
    fn override_swaps_pack_flow_and_threads_flow_type_through() {
        // Both axes covered: when the caller pre-resolved the welcome
        // flow's type, it lands on the envelope; when the resolver
        // returned None (unknown flow in engine), it lands as None and
        // downstream resolution defaults take over.
        for hint_flow_type in [Some("welcome".to_string()), None] {
            let store = new_session_store();
            let mut envelope = sample_envelope(Some("teams-legal"));
            apply_welcome_flow_override(
                &store,
                &mut envelope,
                Some(&hint()),
                hint_flow_type.clone(),
            )
            .expect("ok");
            assert_eq!(envelope.pack_id.as_deref(), Some("pack.welcome"));
            assert_eq!(envelope.flow_id, "flow.welcome");
            assert_eq!(envelope.flow_type, hint_flow_type);
        }
    }

    #[test]
    fn override_is_no_op_on_repeat_turn_with_existing_session() {
        // Resume path: an already-active session in the same bucket means
        // this isn't first contact. The user must continue on the resumed
        // flow, NOT be redirected to the welcome flow.
        let store = new_session_store();
        let envelope_template = sample_envelope(Some("teams-legal"));
        seed_resume(&store, &envelope_template);

        let mut envelope = envelope_template.clone();
        apply_welcome_flow_override(&store, &mut envelope, Some(&hint()), Some("welcome".into()))
            .expect("ok");
        assert_eq!(envelope.pack_id, envelope_template.pack_id);
        assert_eq!(envelope.flow_id, envelope_template.flow_id);
        assert_eq!(envelope.flow_type, envelope_template.flow_type);
    }

    #[test]
    fn override_is_no_op_post_completion_when_marker_present() {
        // POST-COMPLETION REGRESSION GUARD (Codex #201): the welcome-seen
        // marker is durable and survives flow completion. After welcome
        // fires once + the flow finishes (wait cleared), the next turn
        // must NOT re-fire welcome — the marker is the gate, not the
        // active-wait snapshot.
        let store = new_session_store();
        let mut first = sample_envelope(Some("teams-legal"));
        apply_welcome_flow_override(&store, &mut first, Some(&hint()), Some("welcome".into()))
            .expect("first turn ok");
        assert_eq!(
            first.pack_id.as_deref(),
            Some("pack.welcome"),
            "first turn fires welcome"
        );

        // Simulate the welcome flow completing: the engine clears the
        // wait at end-of-flow (mirror `FlowResumeStore::clear`). The
        // marker must NOT be in the wait scope, so this clear has no
        // effect on the marker.
        let resume = FlowResumeStore::new(Arc::clone(&store));
        resume.clear(&first).expect("clear post-completion wait");

        // Second turn arrives — producer still attaches the hint (it
        // does not know flow-completion happened). The marker keeps the
        // override off.
        let mut second = sample_envelope(Some("teams-legal"));
        apply_welcome_flow_override(&store, &mut second, Some(&hint()), Some("welcome".into()))
            .expect("second turn ok");
        assert_eq!(
            second.pack_id.as_deref(),
            Some("pack.default"),
            "second turn must NOT re-fire welcome"
        );
        assert_eq!(second.flow_id, "flow.default");
    }

    #[test]
    fn override_is_no_op_on_second_turn_after_marker_set() {
        // No-wait variant of the post-completion test: a welcome flow
        // without `session.wait` leaves no snapshot AT ALL. Marker is the
        // only thing standing between turn 2 and a welcome re-fire.
        let store = new_session_store();
        let mut first = sample_envelope(Some("teams-legal"));
        apply_welcome_flow_override(&store, &mut first, Some(&hint()), Some("welcome".into()))
            .expect("first turn ok");
        assert_eq!(first.pack_id.as_deref(), Some("pack.welcome"));

        let mut second = sample_envelope(Some("teams-legal"));
        apply_welcome_flow_override(&store, &mut second, Some(&hint()), Some("welcome".into()))
            .expect("second turn ok");
        assert_eq!(
            second.pack_id.as_deref(),
            Some("pack.default"),
            "second turn must NOT re-fire welcome"
        );
    }

    #[test]
    fn override_partitions_marker_per_endpoint() {
        // The marker is keyed by `(tenant, env, eid, user)` — a user
        // marked seen on `teams-legal` is still first contact on
        // `teams-accounting`. Welcome must fire independently on each
        // endpoint.
        let store = new_session_store();
        let mut legal = sample_envelope(Some("teams-legal"));
        apply_welcome_flow_override(&store, &mut legal, Some(&hint()), Some("welcome".into()))
            .expect("legal first turn ok");
        assert_eq!(legal.pack_id.as_deref(), Some("pack.welcome"));

        let mut accounting = sample_envelope(Some("teams-accounting"));
        apply_welcome_flow_override(
            &store,
            &mut accounting,
            Some(&hint()),
            Some("welcome".into()),
        )
        .expect("accounting first turn ok");
        assert_eq!(
            accounting.pack_id.as_deref(),
            Some("pack.welcome"),
            "different endpoint = independent first contact"
        );
    }

    #[test]
    fn override_partitions_marker_per_user_on_same_endpoint() {
        // Codex adversarial review of #382 (high): a session-key derived
        // only from the eid collapses every user on that endpoint onto one
        // store row — in-memory rejects User B's first turn with a hard
        // error, Redis silently overwrites and lets User A re-welcome on
        // their next turn.
        //
        // Regression guard: two users on the same eid each get welcome on
        // their own first turn; the second user's first contact does NOT
        // fail; both subsequent turns are no-ops.
        let store = new_session_store();

        // User A's first turn
        let mut a1 = sample_envelope_for_user(Some("teams-legal"), "user-a");
        apply_welcome_flow_override(&store, &mut a1, Some(&hint()), Some("welcome".into()))
            .expect("user-a first ok");
        assert_eq!(a1.pack_id.as_deref(), Some("pack.welcome"));

        // User B's first turn — must independently fire welcome, NOT error.
        let mut b1 = sample_envelope_for_user(Some("teams-legal"), "user-b");
        apply_welcome_flow_override(&store, &mut b1, Some(&hint()), Some("welcome".into()))
            .expect("user-b first must not collide with user-a marker");
        assert_eq!(
            b1.pack_id.as_deref(),
            Some("pack.welcome"),
            "user-b is independent first contact"
        );

        // User A's second turn — marker still intact, no re-fire.
        let mut a2 = sample_envelope_for_user(Some("teams-legal"), "user-a");
        apply_welcome_flow_override(&store, &mut a2, Some(&hint()), Some("welcome".into()))
            .expect("user-a second ok");
        assert_eq!(
            a2.pack_id.as_deref(),
            Some("pack.default"),
            "user-a must not be re-welcomed after user-b joined"
        );

        // User B's second turn — same.
        let mut b2 = sample_envelope_for_user(Some("teams-legal"), "user-b");
        apply_welcome_flow_override(&store, &mut b2, Some(&hint()), Some("welcome".into()))
            .expect("user-b second ok");
        assert_eq!(b2.pack_id.as_deref(), Some("pack.default"));
    }

    #[test]
    fn marker_is_not_written_when_hint_absent() {
        // Marker writes are gated on the hint+eid preconditions — a
        // pre-M1.5 turn (no hint) MUST NOT leak a marker, otherwise a
        // producer that later enables welcome would treat that user as
        // already-contacted and never fire the override.
        //
        // Mid-conversation users (active-wait safety net path) DO get a
        // marker — that's deliberate: they don't get retroactive welcomes.
        // This test only guards the no-hint gate.
        let store = new_session_store();
        let mut envelope = sample_envelope(Some("teams-legal"));
        apply_welcome_flow_override(&store, &mut envelope, None, None).expect("ok");

        let mut next = sample_envelope(Some("teams-legal"));
        apply_welcome_flow_override(&store, &mut next, Some(&hint()), Some("welcome".into()))
            .expect("ok");
        assert_eq!(
            next.pack_id.as_deref(),
            Some("pack.welcome"),
            "no marker leaked from hint-absent path"
        );
    }

    #[test]
    fn active_wait_safety_net_still_writes_marker() {
        // Pins the deliberate behaviour of the active-wait safety net. A
        // mid-conversation user (paused flow, no marker yet) keeps their
        // resumed flow on the first hinted turn. The marker is still
        // written on that turn, so once the paused flow completes they do
        // NOT receive a retroactive welcome.
        let store = new_session_store();
        let template = sample_envelope(Some("teams-legal"));
        seed_resume(&store, &template);

        let mut first = template.clone();
        apply_welcome_flow_override(&store, &mut first, Some(&hint()), Some("welcome".into()))
            .expect("first hinted turn ok");
        assert_eq!(
            first.pack_id, template.pack_id,
            "active wait keeps the resumed flow"
        );
        assert_eq!(first.flow_id, template.flow_id);

        // The paused flow finishes and its wait is cleared.
        let resume = FlowResumeStore::new(Arc::clone(&store));
        resume.clear(&first).expect("clear completed wait");

        let mut next = sample_envelope(Some("teams-legal"));
        apply_welcome_flow_override(&store, &mut next, Some(&hint()), Some("welcome".into()))
            .expect("post-completion turn ok");
        assert_eq!(
            next.pack_id.as_deref(),
            Some("pack.default"),
            "mid-conversation user must not get a retroactive welcome"
        );
        assert_eq!(next.flow_id, "flow.default");
    }
}
1348
1349#[cfg(test)]
1350mod identify_endpoints_tests {
1351    use super::*;
1352
1353    fn dummy_runner_host() -> RunnerHost {
1354        let session_store = new_session_store();
1355        let state_store = new_state_store();
1356        RunnerHost {
1357            configs: HashMap::new(),
1358            active: Arc::new(ActivePacks::new()),
1359            health: Arc::new(HealthState::new()),
1360            session_host: session_host_from(session_store.clone()),
1361            state_host: state_host_from(state_store.clone()),
1362            session_store,
1363            state_store,
1364            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
1365            secrets_manager: default_manager().expect("default secrets manager"),
1366            telemetry: None,
1367        }
1368    }
1369
1370    #[tokio::test]
1371    async fn empty_provider_types_returns_empty_map_without_loading_revision() {
1372        // No revision is loaded; this proves the fast-path short-circuits
1373        // before `load_revision` so the caller can ask "any types?" cheaply
1374        // when an env declares zero messaging endpoints.
1375        let host = dummy_runner_host();
1376        let map = host
1377            .identify_messaging_endpoints_for_revision(
1378                "demo",
1379                DeploymentId::new(),
1380                BundleId::new("anything"),
1381                RevisionId::new(),
1382                &[],
1383                b"{}",
1384            )
1385            .await
1386            .expect("empty types is the cheap fast path");
1387        assert!(map.is_empty());
1388    }
1389
1390    #[tokio::test]
1391    async fn missing_revision_surfaces_clear_error() {
1392        // Non-empty types but the revision was never loaded — the error
1393        // chain must name the revision so operators can correlate it with
1394        // their dispatch log.
1395        let host = dummy_runner_host();
1396        let deployment = DeploymentId::new();
1397        let revision = RevisionId::new();
1398        let err = host
1399            .identify_messaging_endpoints_for_revision(
1400                "demo",
1401                deployment,
1402                BundleId::new("missing"),
1403                revision,
1404                &["teams"],
1405                b"{}",
1406            )
1407            .await
1408            .expect_err("missing revision must fail closed");
1409        let msg = format!("{err:#}");
1410        assert!(
1411            msg.contains("revision runtime not loaded"),
1412            "error chain should name the failure mode, got: {msg}"
1413        );
1414        assert!(
1415            msg.contains(&deployment.to_string()),
1416            "error chain should name the deployment id, got: {msg}"
1417        );
1418        assert!(
1419            msg.contains(&revision.to_string()),
1420            "error chain should name the revision id, got: {msg}"
1421        );
1422    }
1423
1424    #[tokio::test]
1425    async fn scoped_empty_provider_types_returns_empty_map() {
1426        let host = dummy_runner_host();
1427        let map = host
1428            .identify_messaging_endpoints_for_revision_scoped(
1429                "demo",
1430                DeploymentId::new(),
1431                BundleId::new("anything"),
1432                RevisionId::new(),
1433                &[],
1434                &[],
1435                &Value::Null,
1436            )
1437            .await
1438            .expect("empty types is the cheap fast path");
1439        assert!(map.is_empty());
1440    }
1441
1442    #[tokio::test]
1443    async fn scoped_missing_revision_surfaces_clear_error() {
1444        let host = dummy_runner_host();
1445        let deployment = DeploymentId::new();
1446        let revision = RevisionId::new();
1447        let err = host
1448            .identify_messaging_endpoints_for_revision_scoped(
1449                "demo",
1450                deployment,
1451                BundleId::new("missing"),
1452                revision,
1453                &["teams"],
1454                &[],
1455                &Value::Null,
1456            )
1457            .await
1458            .expect_err("missing revision must fail closed");
1459        let msg = format!("{err:#}");
1460        assert!(
1461            msg.contains("revision runtime not loaded"),
1462            "error chain should name the failure mode, got: {msg}"
1463        );
1464        assert!(
1465            msg.contains(&deployment.to_string()),
1466            "error chain should name the deployment id, got: {msg}"
1467        );
1468        assert!(
1469            msg.contains(&revision.to_string()),
1470            "error chain should name the revision id, got: {msg}"
1471        );
1472    }
1473
1474    /// Regression guard: the futures returned by the per-revision identify
1475    /// APIs MUST be `Send`. Downstream consumers (greentic-start's hyper
1476    /// `service_fn`) spawn them through tokio; a non-`Send` future at this
1477    /// boundary breaks every spawned-service consumer with a confusing
1478    /// "implementation of Send is not general enough" diagnostic that
1479    /// surfaces far from the offending change.
1480    ///
1481    /// Concrete history: PR #394 routed all three identify entry points
1482    /// through a shared `fan_out_across_packs` helper bounded on
1483    /// `AsyncFnMut`. The HRTB inference for the resulting future
1484    /// destabilised `Send` proof for all three APIs, even the legacy
1485    /// `identify_messaging_endpoints_for_revision` that itself hadn't
1486    /// changed shape — greentic-start failed to compile on the next
1487    /// dev-publish bump. This test would have caught it.
1488    #[test]
1489    fn identify_futures_are_send() {
1490        fn assert_send<F: Send>(_: F) {}
1491        let host = dummy_runner_host();
1492        // Each call has to be wrapped in its own scope so the borrows
1493        // don't outlive the host's reference per call — the point is to
1494        // assert each returned future type is Send-clean in isolation.
1495        assert_send(host.identify_messaging_endpoints_for_revision(
1496            "demo",
1497            DeploymentId::new(),
1498            BundleId::new("anything"),
1499            RevisionId::new(),
1500            &["teams"],
1501            b"{}",
1502        ));
1503        assert_send(host.identify_messaging_endpoints_for_revision_scoped(
1504            "demo",
1505            DeploymentId::new(),
1506            BundleId::new("anything"),
1507            RevisionId::new(),
1508            &["teams"],
1509            &[("x-telegram-bot-api-secret-token".into(), "tok".into())],
1510            &Value::Null,
1511        ));
1512        assert_send(host.describe_identify_instances_for_revision(
1513            "demo",
1514            DeploymentId::new(),
1515            BundleId::new("anything"),
1516            RevisionId::new(),
1517            &["teams"],
1518        ));
1519        assert_send(host.invoke_provider_for_revision(
1520            "demo",
1521            DeploymentId::new(),
1522            BundleId::new("anything"),
1523            RevisionId::new(),
1524            "messaging.telegram.bot",
1525            "ingest_http",
1526            b"{}".to_vec(),
1527            None,
1528            None,
1529        ));
1530    }
1531
1532    #[tokio::test]
1533    async fn invoke_provider_missing_revision_surfaces_clear_error() {
1534        // Non-empty types but the revision was never loaded — the error
1535        // chain must name the revision so operators can correlate it with
1536        // their dispatch log. Mirrors the identify-side sibling so a future
1537        // refactor can't quietly drop the lookup-context wrapper.
1538        let host = dummy_runner_host();
1539        let deployment = DeploymentId::new();
1540        let revision = RevisionId::new();
1541        let err = host
1542            .invoke_provider_for_revision(
1543                "demo",
1544                deployment,
1545                BundleId::new("missing"),
1546                revision,
1547                "messaging.telegram.bot",
1548                "ingest_http",
1549                b"{}".to_vec(),
1550                None,
1551                None,
1552            )
1553            .await
1554            .expect_err("missing revision must fail closed");
1555        let msg = format!("{err:#}");
1556        assert!(
1557            msg.contains("revision runtime not loaded"),
1558            "error chain should name the failure mode, got: {msg}"
1559        );
1560        assert!(
1561            msg.contains(&deployment.to_string()),
1562            "error chain should name the deployment id, got: {msg}"
1563        );
1564        assert!(
1565            msg.contains(&revision.to_string()),
1566            "error chain should name the revision id, got: {msg}"
1567        );
1568    }
1569}
1570
#[cfg(all(test, feature = "greentic-x-provider"))]
mod fast2flow_tests {
    use super::*;

    use greentic_x_runtime::Fast2FlowRoutingEntity;

    /// Routing to a `"<pack>/<flow>"` target must split it into pack and flow
    /// ids and expose the prefill entities under `payload.fast2flow.entities`.
    #[test]
    fn dispatch_target_attaches_prefill_entities_to_payload() {
        let activity = Activity::text("show traffic tomorrow");
        let date_entity =
            Fast2FlowRoutingEntity::new("date", "20260611").with_format("iso", "2026-06-11");

        let routed = apply_fast2flow_target(activity, "telco-x/prefix-traffic", vec![date_entity])
            .expect("target should route");

        assert_eq!(routed.pack_id(), Some("telco-x"));
        assert_eq!(routed.flow_id(), Some("prefix-traffic"));

        // The single entity we attached should be the first element of the list.
        let payload = routed.payload();
        let first_entity = &payload["fast2flow"]["entities"][0];
        assert_eq!(first_entity["normalized"], "20260611");
        assert_eq!(first_entity["formats"]["iso"], "2026-06-11");
    }
}