1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use anyhow::{Context, Result, anyhow, bail};
6use serde_json::Value;
7
8use crate::activity::{Activity, WelcomeFlowHint};
9use crate::boot;
10use crate::component_api::node::{ExecCtx as ComponentExecCtx, TenantCtx as ComponentTenantCtx};
11use crate::config::{Fast2FlowRoutingConfig, HostConfig};
12use crate::engine::host::{SessionHost, StateHost};
13use crate::engine::runtime::{FlowResumeStore, IngressEnvelope};
14#[cfg(feature = "greentic-x-provider")]
15use crate::greentic_x_provider::RunnerPackFast2FlowRoutingProvider;
16use crate::http::health::HealthState;
17use crate::pack::{IdentifyOutcome, PackRuntime};
18use crate::runner::adapt_timer;
19use crate::runner::engine::FlowEngine;
20use crate::runtime::{ActivePacks, TenantRuntime};
21use crate::secrets::{DynSecretsManager, default_manager};
22use crate::storage::{
23 DynSessionStore, DynStateStore, new_session_store, new_state_store, session_host_from,
24 state_host_from,
25};
26use crate::wasi::RunnerWasiPolicy;
27use greentic_deploy_spec::ids::{BundleId, DeploymentId, RevisionId};
28#[cfg(feature = "greentic-x-provider")]
29use greentic_x_runtime::{
30 Fast2FlowDirective, Fast2FlowMessageEnvelope, Fast2FlowRouteRequest, Fast2FlowRoutingProvider,
31};
32
/// Telemetry settings that can be attached to a [`HostBuilder`].
///
/// Bundles the `greentic_telemetry` base configuration with its export
/// configuration so both travel together.
#[derive(Clone, Debug)]
pub struct TelemetryCfg {
    /// Base telemetry configuration.
    pub config: greentic_telemetry::TelemetryConfig,
    /// Export configuration for the telemetry pipeline.
    pub export: greentic_telemetry::export::ExportConfig,
}
38
/// Builder that accumulates tenant configurations and optional overrides
/// before a [`RunnerHost`] is constructed.
pub struct HostBuilder {
    /// Tenant configurations, keyed by tenant name.
    configs: HashMap<String, HostConfig>,
    /// Optional telemetry settings handed over to the built host.
    telemetry: Option<TelemetryCfg>,
    /// WASI policy the built host will share with its runtimes.
    wasi_policy: RunnerWasiPolicy,
    /// Explicitly supplied secrets manager, if any.
    secrets: Option<DynSecretsManager>,
}
46
47impl HostBuilder {
48 pub fn new() -> Self {
49 Self {
50 configs: HashMap::new(),
51 telemetry: None,
52 wasi_policy: RunnerWasiPolicy::default(),
53 secrets: None,
54 }
55 }
56
57 pub fn with_config(mut self, config: HostConfig) -> Self {
58 self.configs.insert(config.tenant.clone(), config);
59 self
60 }
61
62 pub fn with_telemetry(mut self, telemetry: TelemetryCfg) -> Self {
63 self.telemetry = Some(telemetry);
64 self
65 }
66
67 pub fn with_wasi_policy(mut self, policy: RunnerWasiPolicy) -> Self {
68 self.wasi_policy = policy;
69 self
70 }
71
72 pub fn with_secrets_manager(mut self, manager: DynSecretsManager) -> Self {
73 self.secrets = Some(manager);
74 self
75 }
76
77 pub fn build(self) -> Result<RunnerHost> {
78 if self.configs.is_empty() {
79 bail!("at least one tenant configuration is required");
80 }
81 let wasi_policy = Arc::new(self.wasi_policy);
82 let configs = self
83 .configs
84 .into_iter()
85 .map(|(tenant, cfg)| (tenant, Arc::new(cfg)))
86 .collect();
87 let session_store = new_session_store();
88 let session_host = session_host_from(Arc::clone(&session_store));
89 let state_store = new_state_store();
90 let state_host = state_host_from(Arc::clone(&state_store));
91 let secrets = match self.secrets {
92 Some(manager) => manager,
93 None => default_manager().context("failed to initialise default secrets backend")?,
94 };
95 Ok(RunnerHost {
96 configs,
97 active: Arc::new(ActivePacks::new()),
98 health: Arc::new(HealthState::new()),
99 session_store,
100 state_store,
101 session_host,
102 state_host,
103 wasi_policy,
104 secrets_manager: secrets,
105 telemetry: self.telemetry,
106 })
107 }
108}
109
110impl Default for HostBuilder {
111 fn default() -> Self {
112 Self::new()
113 }
114}
115
/// Host that owns the shared stores and per-tenant configuration, and
/// dispatches activities to the currently loaded pack runtimes.
pub struct RunnerHost {
    /// Registered tenant configurations, keyed by tenant name.
    configs: HashMap<String, Arc<HostConfig>>,
    /// Registry of currently loaded runtimes.
    active: Arc<ActivePacks>,
    /// Shared health state.
    health: Arc<HealthState>,
    /// Session store shared with every runtime.
    session_store: DynSessionStore,
    /// State store shared with every runtime.
    state_store: DynStateStore,
    /// Session host adapter handed to runtimes.
    session_host: Arc<dyn SessionHost>,
    /// State host adapter handed to runtimes.
    state_host: Arc<dyn StateHost>,
    /// WASI policy shared with every runtime.
    wasi_policy: Arc<RunnerWasiPolicy>,
    /// Secrets manager shared with every runtime.
    secrets_manager: DynSecretsManager,
    /// Optional telemetry configuration.
    telemetry: Option<TelemetryCfg>,
}
129
/// Cheaply clonable handle onto one tenant's loaded runtime.
#[derive(Clone)]
pub struct TenantHandle {
    /// The runtime this handle exposes.
    runtime: Arc<TenantRuntime>,
}
135
136impl RunnerHost {
137 pub async fn start(&self) -> Result<()> {
138 boot::init(&self.health, self.telemetry.as_ref())?;
139 Ok(())
140 }
141
142 pub async fn stop(&self) -> Result<()> {
143 self.active.replace(HashMap::new());
144 Ok(())
145 }
146
147 pub async fn load_pack(&self, tenant: &str, pack_path: &Path) -> Result<()> {
148 let archive_source = if is_pack_archive(pack_path) {
149 Some(pack_path)
150 } else {
151 None
152 };
153 let runtime = self
154 .prepare_runtime(tenant, pack_path, archive_source)
155 .await
156 .with_context(|| format!("failed to load tenant {tenant}"))?;
157 self.active.insert_pack(tenant, runtime);
158 tracing::info!(tenant, pack = %pack_path.display(), "pack loaded");
159 Ok(())
160 }
161
162 pub async fn handle_activity(&self, tenant: &str, activity: Activity) -> Result<Vec<Activity>> {
163 let runtime = self
164 .active
165 .load_pack(tenant)
166 .with_context(|| format!("tenant {tenant} not loaded"))?;
167 self.dispatch_activity(&runtime, tenant, activity).await
168 }
169
170 pub async fn handle_activity_for_revision(
199 &self,
200 tenant: &str,
201 deployment_id: DeploymentId,
202 bundle_id: BundleId,
203 revision_id: RevisionId,
204 activity: Activity,
205 ) -> Result<Vec<Activity>> {
206 let runtime = self
207 .active
208 .load_revision(tenant, deployment_id, bundle_id, revision_id)
209 .with_context(|| {
210 format!(
211 "revision runtime not loaded for tenant {tenant} \
212 (deployment {deployment_id}, revision {revision_id})"
213 )
214 })?;
215 self.dispatch_activity(&runtime, tenant, activity).await
216 }
217
218 fn load_revision_runtime(
223 &self,
224 tenant: &str,
225 deployment_id: DeploymentId,
226 bundle_id: BundleId,
227 revision_id: RevisionId,
228 ) -> Result<Arc<crate::runtime::TenantRuntime>> {
229 self.active
230 .load_revision(tenant, deployment_id, bundle_id, revision_id)
231 .with_context(|| {
232 format!(
233 "revision runtime not loaded for tenant {tenant} \
234 (deployment {deployment_id}, revision {revision_id})"
235 )
236 })
237 }
238
239 pub async fn identify_messaging_endpoints_for_revision(
271 &self,
272 tenant: &str,
273 deployment_id: DeploymentId,
274 bundle_id: BundleId,
275 revision_id: RevisionId,
276 provider_types: &[&str],
277 payload: &[u8],
278 ) -> Result<HashMap<String, IdentifyOutcome>> {
279 if provider_types.is_empty() {
280 return Ok(HashMap::new());
281 }
282 let runtime = self.load_revision_runtime(tenant, deployment_id, bundle_id, revision_id)?;
283 let mut merged: HashMap<String, IdentifyOutcome> = provider_types
286 .iter()
287 .map(|ty| ((*ty).to_string(), IdentifyOutcome::Unsupported))
288 .collect();
289 for pack in runtime.all_packs() {
290 let remaining: Vec<&str> = provider_types
292 .iter()
293 .copied()
294 .filter(|ty| !matches!(merged.get(*ty), Some(IdentifyOutcome::Identified(_))))
295 .collect();
296 if remaining.is_empty() {
297 break;
298 }
299 let probe = pack
300 .identify_endpoints_by_provider_type(&remaining, payload)
301 .await?;
302 for (ty, outcome) in probe {
303 if let Some(existing) = merged.get_mut(&ty) {
304 existing.merge_in(outcome);
305 }
306 }
307 }
308 Ok(merged)
309 }
310
311 #[allow(clippy::too_many_arguments)]
326 pub async fn identify_messaging_endpoints_for_revision_scoped(
327 &self,
328 tenant: &str,
329 deployment_id: DeploymentId,
330 bundle_id: BundleId,
331 revision_id: RevisionId,
332 provider_types: &[&str],
333 headers: &[(String, String)],
334 body: &Value,
335 ) -> Result<HashMap<String, IdentifyOutcome>> {
336 if provider_types.is_empty() {
337 return Ok(HashMap::new());
338 }
339 let runtime = self.load_revision_runtime(tenant, deployment_id, bundle_id, revision_id)?;
340 let mut merged: HashMap<String, IdentifyOutcome> = provider_types
341 .iter()
342 .map(|ty| ((*ty).to_string(), IdentifyOutcome::Unsupported))
343 .collect();
344 for pack in runtime.all_packs() {
345 let remaining: Vec<&str> = provider_types
346 .iter()
347 .copied()
348 .filter(|ty| !matches!(merged.get(*ty), Some(IdentifyOutcome::Identified(_))))
349 .collect();
350 if remaining.is_empty() {
351 break;
352 }
353 let probe = pack
354 .identify_endpoints_by_provider_type_scoped(&remaining, headers, body)
355 .await?;
356 for (ty, outcome) in probe {
357 if let Some(existing) = merged.get_mut(&ty) {
358 existing.merge_in(outcome);
359 }
360 }
361 }
362 Ok(merged)
363 }
364
365 pub async fn describe_identify_instances_for_revision(
376 &self,
377 tenant: &str,
378 deployment_id: DeploymentId,
379 bundle_id: BundleId,
380 revision_id: RevisionId,
381 provider_types: &[&str],
382 ) -> Result<HashMap<String, Option<crate::identify_hint::IdentifyInstanceHint>>> {
383 if provider_types.is_empty() {
384 return Ok(HashMap::new());
385 }
386 let runtime = self.load_revision_runtime(tenant, deployment_id, bundle_id, revision_id)?;
387 let mut merged: HashMap<String, Option<crate::identify_hint::IdentifyInstanceHint>> =
388 provider_types
389 .iter()
390 .map(|ty| ((*ty).to_string(), None))
391 .collect();
392 for pack in runtime.all_packs() {
393 let remaining: Vec<&str> = provider_types
397 .iter()
398 .copied()
399 .filter(|ty| !matches!(merged.get(*ty), Some(Some(_))))
400 .collect();
401 if remaining.is_empty() {
402 break;
403 }
404 let probe = pack
405 .describe_identify_hints_by_provider_type(&remaining)
406 .await?;
407 for (ty, hint) in probe {
408 if let Some(slot) = merged.get_mut(&ty)
409 && slot.is_none()
410 {
411 *slot = hint;
412 }
413 }
414 }
415 Ok(merged)
416 }
417
418 #[allow(clippy::too_many_arguments)]
455 pub async fn invoke_provider_for_revision(
456 &self,
457 tenant: &str,
458 deployment_id: DeploymentId,
459 bundle_id: BundleId,
460 revision_id: RevisionId,
461 provider_type: &str,
462 op: &str,
463 input_json: Vec<u8>,
464 correlation_id: Option<String>,
465 trace_id: Option<String>,
466 ) -> Result<Value> {
467 let runtime = self.load_revision_runtime(tenant, deployment_id, bundle_id, revision_id)?;
468 let mut matched = None;
473 for pack in runtime.all_packs() {
474 let Some(registry) = pack.provider_registry_optional()? else {
475 continue;
476 };
477 let Some((binding, declared_ops)) =
478 registry.try_resolve_with_ops(None, Some(provider_type))?
479 else {
480 continue;
481 };
482 if matched.is_some() {
483 bail!(
484 "ambiguous provider_type `{provider_type}` in revision \
485 (deployment {deployment_id}, revision {revision_id}): \
486 multiple packs bind the same type; pack manifests must \
487 declare each provider_type at most once across main + overlays"
488 );
489 }
490 matched = Some((Arc::clone(pack), binding, declared_ops));
491 }
492 let Some((pack, binding, declared_ops)) = matched else {
493 bail!(
494 "no pack in revision binds provider_type `{provider_type}` \
495 (deployment {deployment_id}, revision {revision_id})"
496 );
497 };
498 if !declared_ops.iter().any(|d| d == op) {
499 bail!(
500 "op `{op}` is not declared for provider_type `{provider_type}` \
501 in revision (deployment {deployment_id}, revision {revision_id}); \
502 declared ops: {declared_ops:?}"
503 );
504 }
505 let exec_ctx = ComponentExecCtx {
506 tenant: ComponentTenantCtx {
507 tenant: tenant.to_string(),
508 team: None,
509 user: None,
510 trace_id,
511 i18n_id: None,
512 correlation_id: correlation_id.clone(),
513 deadline_unix_ms: None,
514 attempt: 1,
515 idempotency_key: correlation_id,
516 },
517 i18n_id: None,
518 flow_id: format!("provider-webhook/{provider_type}"),
519 node_id: None,
520 };
521 pack.invoke_provider(&binding, exec_ctx, op, input_json)
522 .await
523 }
524
525 async fn dispatch_activity(
530 &self,
531 runtime: &TenantRuntime,
532 tenant: &str,
533 activity: Activity,
534 ) -> Result<Vec<Activity>> {
535 let activity = apply_fast2flow_routing(runtime, tenant, activity)?;
536
537 if activity.action() == Some("response") && activity.flow_id().is_none() {
543 return Ok(vec![activity]);
544 }
545
546 let (pack_id, flow_id) = resolve_flow_id(runtime, &activity)?;
547 let action = activity.action().map(|value| value.to_string());
548 let session = activity.session_id().map(|value| value.to_string());
549 let provider = activity.provider_id().map(|value| value.to_string());
550 let messaging_endpoint_id = activity
551 .messaging_endpoint_id()
552 .map(|value| value.to_string());
553 let channel = activity.channel().map(|value| value.to_string());
554 let conversation = activity.conversation().map(|value| value.to_string());
555 let user = activity.user().map(|value| value.to_string());
556 let welcome_flow_hint = activity.welcome_flow_hint().cloned();
557 let resolved_flow_type =
558 activity
559 .flow_type()
560 .map(|value| value.to_string())
561 .or_else(|| {
562 runtime
563 .engine()
564 .flow_by_key(&pack_id, &flow_id)
565 .map(|desc| desc.flow_type.clone())
566 });
567 let payload = activity.into_payload();
568
569 let mut envelope = IngressEnvelope {
570 tenant: tenant.to_string(),
571 env: std::env::var("GREENTIC_ENV").ok(),
572 pack_id: Some(pack_id.clone()),
573 flow_id: flow_id.clone(),
574 flow_type: resolved_flow_type,
575 action,
576 session_hint: session,
577 provider,
578 messaging_endpoint_id,
579 channel,
580 conversation,
581 user,
582 activity_id: None,
583 timestamp: None,
584 payload,
585 metadata: None,
586 reply_scope: None,
587 }
588 .canonicalize();
589
590 let hint_flow_type = welcome_flow_hint.as_ref().and_then(|hint| {
591 runtime
592 .engine()
593 .flow_by_key(&hint.pack_id, &hint.flow_id)
594 .map(|desc| desc.flow_type.clone())
595 });
596 apply_welcome_flow_override(
597 runtime.session_store(),
598 &mut envelope,
599 welcome_flow_hint.as_ref(),
600 hint_flow_type,
601 )?;
602
603 let result = runtime.state_machine().handle(envelope).await?;
604 Ok(normalize_replies(result, tenant))
605 }
606
607 pub async fn tenant(&self, tenant: &str) -> Option<TenantHandle> {
608 self.active
609 .load_pack(tenant)
610 .map(|runtime| TenantHandle { runtime })
611 }
612
613 pub fn active_packs(&self) -> Arc<ActivePacks> {
614 Arc::clone(&self.active)
615 }
616
617 pub fn health_state(&self) -> Arc<HealthState> {
618 Arc::clone(&self.health)
619 }
620
621 pub fn wasi_policy(&self) -> Arc<RunnerWasiPolicy> {
622 Arc::clone(&self.wasi_policy)
623 }
624
625 pub fn session_store(&self) -> DynSessionStore {
626 Arc::clone(&self.session_store)
627 }
628
629 pub fn state_store(&self) -> DynStateStore {
630 Arc::clone(&self.state_store)
631 }
632
633 pub fn session_host(&self) -> Arc<dyn SessionHost> {
634 Arc::clone(&self.session_host)
635 }
636
637 pub fn state_host(&self) -> Arc<dyn StateHost> {
638 Arc::clone(&self.state_host)
639 }
640
641 pub fn secrets_manager(&self) -> DynSecretsManager {
642 Arc::clone(&self.secrets_manager)
643 }
644
645 pub fn tenant_configs(&self) -> HashMap<String, Arc<HostConfig>> {
646 self.configs.clone()
647 }
648
649 async fn prepare_runtime(
650 &self,
651 tenant: &str,
652 pack_path: &Path,
653 archive_source: Option<&Path>,
654 ) -> Result<Arc<TenantRuntime>> {
655 let config = self
656 .configs
657 .get(tenant)
658 .cloned()
659 .with_context(|| format!("tenant {tenant} not registered"))?;
660 if config.tenant != tenant {
661 bail!(
662 "tenant mismatch: config declares '{}' but '{tenant}' was requested",
663 config.tenant
664 );
665 }
666 let runtime = TenantRuntime::load(
667 pack_path,
668 Arc::clone(&config),
669 None,
670 archive_source,
671 None,
672 self.wasi_policy(),
673 self.session_host(),
674 self.session_store(),
675 self.state_store(),
676 self.state_host(),
677 self.secrets_manager(),
678 )
679 .await?;
680 let timers = adapt_timer::spawn_timers(Arc::clone(&runtime))?;
681 runtime.register_timers(timers);
682 Ok(runtime)
683 }
684}
685
686impl TenantHandle {
687 pub fn config(&self) -> Arc<HostConfig> {
688 Arc::clone(self.runtime.config())
689 }
690
691 pub fn pack(&self) -> Arc<PackRuntime> {
692 self.runtime.pack()
693 }
694
695 pub fn engine(&self) -> Arc<FlowEngine> {
696 Arc::clone(self.runtime.engine())
697 }
698
699 pub fn overlays(&self) -> Vec<Arc<PackRuntime>> {
700 self.runtime.overlays()
701 }
702
703 pub fn overlay_digests(&self) -> Vec<Option<String>> {
704 self.runtime.overlay_digests()
705 }
706}
707
708fn apply_welcome_flow_override(
732 session_store: &DynSessionStore,
733 envelope: &mut IngressEnvelope,
734 hint: Option<&WelcomeFlowHint>,
735 hint_flow_type: Option<String>,
736) -> Result<()> {
737 let Some(hint) = hint else {
738 return Ok(());
739 };
740 if envelope.messaging_endpoint_id.is_none() {
741 return Ok(());
742 }
743
744 if !try_mark_welcome_first_contact(session_store, envelope)? {
745 return Ok(());
746 }
747
748 let resume = FlowResumeStore::new(Arc::clone(session_store));
749 let snapshot = resume
750 .fetch(envelope)
751 .map_err(|err| anyhow!("welcome-flow first-contact probe failed: {err}"))?;
752 if snapshot.is_some() {
753 return Ok(());
754 }
755
756 envelope.pack_id = Some(hint.pack_id.clone());
757 envelope.flow_id = hint.flow_id.clone();
758 envelope.flow_type = hint_flow_type;
759 Ok(())
760}
761
762fn try_mark_welcome_first_contact(
775 store: &DynSessionStore,
776 envelope: &IngressEnvelope,
777) -> Result<bool> {
778 let Some(scope) = welcome_marker_scope(envelope) else {
779 return Ok(false);
780 };
781 let (ctx, user) = FlowResumeStore::contact_identity(envelope)
782 .map_err(|e| anyhow!("welcome marker identity probe failed: {e}"))?;
783
784 if store
785 .find_wait_by_scope(&ctx, &user, &scope)
786 .map_err(|e| anyhow!("welcome marker probe failed: {e}"))?
787 .is_some()
788 {
789 return Ok(false);
790 }
791
792 let data = marker_session_data(&ctx, &user);
793 let session_key = marker_session_key(&ctx, &user, &scope);
794 store
795 .register_wait(&ctx, &user, &scope, &session_key, data, None)
796 .map_err(|e| anyhow!("welcome marker register failed: {e}"))?;
797 Ok(true)
798}
799
800fn marker_session_key(
813 ctx: &greentic_types::TenantCtx,
814 user: &greentic_types::UserId,
815 scope: &greentic_types::ReplyScope,
816) -> greentic_session::SessionKey {
817 use sha2::{Digest, Sha256};
818 let team = match ctx.team_id.as_ref().or(ctx.team.as_ref()) {
819 Some(t) => t.as_str(),
820 None => "<none>",
821 };
822 let digest = Sha256::digest(
823 format!(
824 "welcome-marker:v1\0{}\0{}\0team={team}\0{}\0{}",
825 ctx.env.as_str(),
826 ctx.tenant_id.as_str(),
827 user.as_str(),
828 scope.conversation,
829 )
830 .as_bytes(),
831 );
832 greentic_session::SessionKey::new(format!("welcome-marker::{}", hex::encode(digest)))
833}
834
835fn welcome_marker_scope(envelope: &IngressEnvelope) -> Option<greentic_types::ReplyScope> {
840 let eid = envelope.messaging_endpoint_id.as_deref()?;
841 Some(greentic_types::ReplyScope {
842 conversation: format!("welcome-seen::ep={eid}"),
843 thread: None,
844 reply_to: None,
845 correlation: None,
846 })
847}
848
849fn marker_session_data(
854 ctx: &greentic_types::TenantCtx,
855 user: &greentic_types::UserId,
856) -> greentic_session::SessionData {
857 use std::str::FromStr;
858 use std::sync::LazyLock;
859 static FLOW_ID: LazyLock<greentic_types::FlowId> =
860 LazyLock::new(|| greentic_types::FlowId::from_str("welcome-marker").expect("valid id"));
861 static PACK_ID: LazyLock<greentic_types::PackId> =
862 LazyLock::new(|| greentic_types::PackId::from_str("welcome-marker").expect("valid id"));
863 let cursor = greentic_types::SessionCursor::new("marker".to_string());
864 let ctx = ctx.clone().with_user(Some(user.clone()));
865 greentic_session::SessionData {
866 tenant_ctx: ctx,
867 flow_id: FLOW_ID.clone(),
868 pack_id: Some(PACK_ID.clone()),
869 cursor,
870 context_json: "{}".to_string(),
871 }
872}
873
874fn apply_fast2flow_routing(
875 runtime: &TenantRuntime,
876 tenant: &str,
877 activity: Activity,
878) -> Result<Activity> {
879 let config = &runtime.config().fast2flow;
880 if !config.enabled || activity.flow_id().is_some() {
881 return Ok(activity);
882 }
883 apply_fast2flow_routing_enabled(runtime, tenant, activity, config)
884}
885
/// Asks the fast2flow routing provider where `activity` should go and
/// rewrites the activity accordingly.
///
/// Activities whose payload has no string `text` field, or whose text is
/// blank after trimming, are returned unchanged. Otherwise the trimmed text
/// (plus channel and provider, when present) is sent to the provider. The
/// request scope is the configured scope, falling back to the tenant name,
/// and the input locale is fixed to `en`.
///
/// Directive handling:
/// - `Continue`: the activity is returned unchanged.
/// - `Dispatch`: the activity is pointed at the target via
///   `apply_fast2flow_target`.
/// - `Respond` / `Deny`: a new `response` activity carrying the message or
///   reason as a single text message is returned.
///
/// # Errors
/// Fails when the provider cannot be constructed, when routing fails, or
/// when the dispatch target is malformed.
#[cfg(feature = "greentic-x-provider")]
fn apply_fast2flow_routing_enabled(
    runtime: &TenantRuntime,
    tenant: &str,
    activity: Activity,
    config: &Fast2FlowRoutingConfig,
) -> Result<Activity> {
    let text = match activity.payload().get("text").and_then(Value::as_str) {
        Some(raw) => raw.trim(),
        None => return Ok(activity),
    };
    if text.is_empty() {
        return Ok(activity);
    }

    let mut envelope = Fast2FlowMessageEnvelope::new(text.to_owned());
    if let Some(channel) = activity.channel() {
        envelope = envelope.with_channel(channel.to_owned());
    }
    if let Some(provider) = activity.provider_id() {
        envelope = envelope.with_provider(provider.to_owned());
    }

    let scope = config.scope.clone().unwrap_or_else(|| tenant.to_owned());
    let session_active = activity.session_id().is_some();
    // A negative clock reading is clamped to zero before the unsigned cast.
    let now_unix_ms = chrono::Utc::now().timestamp_millis().max(0) as u64;
    let request = Fast2FlowRouteRequest {
        scope,
        envelope,
        session_active,
        input_locale: "en".to_owned(),
        time_budget_ms: config.time_budget_ms,
        registry_path: config.registry_path.clone(),
        indexes_path: config.indexes_path.clone(),
        now_unix_ms,
        metadata: Default::default(),
    };

    let provider = RunnerPackFast2FlowRoutingProvider::new(runtime.pack())
        .map_err(|err| anyhow!(err.to_string()))?
        .with_component_ref(config.component_ref.clone())
        .with_operation(config.operation.clone())
        .with_tenant(tenant.to_owned());
    let route = provider
        .route_intent(request)
        .map_err(|err| anyhow!(err.to_string()))?;

    let routed = match route.directive {
        Fast2FlowDirective::Continue => activity,
        Fast2FlowDirective::Dispatch {
            target, entities, ..
        } => return apply_fast2flow_target(activity, &target, entities),
        Fast2FlowDirective::Respond { message } => Activity::custom(
            "response",
            serde_json::json!({ "messages": [{ "text": message }] }),
        )
        .ensure_tenant(tenant),
        Fast2FlowDirective::Deny { reason } => Activity::custom(
            "response",
            serde_json::json!({ "messages": [{ "text": reason }] }),
        )
        .ensure_tenant(tenant),
    };
    Ok(routed)
}
944
945#[cfg(not(feature = "greentic-x-provider"))]
946fn apply_fast2flow_routing_enabled(
947 _runtime: &TenantRuntime,
948 _tenant: &str,
949 _activity: Activity,
950 _config: &Fast2FlowRoutingConfig,
951) -> Result<Activity> {
952 bail!("fast2flow routing requires the greentic-x-provider feature")
953}
954
/// Points `activity` at a fast2flow dispatch target and attaches the routing
/// entities.
///
/// `target` is trimmed and is either `flow_id` or `pack_id/flow_id`. It is
/// split at the first `/`, and both parts are trimmed before use.
///
/// # Errors
/// Fails when the target is empty, or when it contains a `/` but either side
/// is blank.
#[cfg(feature = "greentic-x-provider")]
fn apply_fast2flow_target(
    activity: Activity,
    target: &str,
    entities: Vec<greentic_x_runtime::Fast2FlowRoutingEntity>,
) -> Result<Activity> {
    let target = target.trim();
    if target.is_empty() {
        bail!("fast2flow dispatch target is empty");
    }

    let routed = match target.split_once('/') {
        Some((pack_id, flow_id)) => {
            let (pack_id, flow_id) = (pack_id.trim(), flow_id.trim());
            if pack_id.is_empty() || flow_id.is_empty() {
                bail!("fast2flow dispatch target `{target}` must be `pack_id/flow_id` or `flow_id`");
            }
            activity.with_pack(pack_id).with_flow(flow_id)
        }
        None => activity.with_flow(target),
    };
    Ok(attach_fast2flow_entities(routed, entities))
}
979
/// Stores the routing entities in the activity payload under
/// `fast2flow.entities`; the activity is returned untouched when there are
/// no entities.
#[cfg(feature = "greentic-x-provider")]
fn attach_fast2flow_entities(
    activity: Activity,
    entities: Vec<greentic_x_runtime::Fast2FlowRoutingEntity>,
) -> Activity {
    if !entities.is_empty() {
        let field = serde_json::json!({
            "entities": entities,
        });
        activity.with_payload_field("fast2flow", field)
    } else {
        activity
    }
}
995
996fn resolve_flow_id(runtime: &TenantRuntime, activity: &Activity) -> Result<(String, String)> {
997 let engine = runtime.engine();
998 if let Some(flow_id) = activity.flow_id() {
999 if let Some(pack_id) = activity.pack_id() {
1000 if engine.flow_by_key(pack_id, flow_id).is_none() {
1001 bail!("flow {flow_id} not registered for pack {pack_id}");
1002 }
1003 return Ok((pack_id.to_string(), flow_id.to_string()));
1004 }
1005 if let Some(flow) = engine.flow_by_id(flow_id) {
1006 return Ok((flow.pack_id.clone(), flow.id.clone()));
1007 }
1008 bail!("flow {flow_id} is ambiguous; pack_id is required");
1009 }
1010
1011 if let Some(flow_type) = activity.flow_type() {
1012 if let Some(pack_id) = activity.pack_id() {
1013 if let Some(flow) = engine
1014 .flows()
1015 .iter()
1016 .find(|flow| flow.pack_id == pack_id && flow.flow_type == flow_type)
1017 {
1018 return Ok((pack_id.to_string(), flow.id.clone()));
1019 }
1020 bail!("flow type {flow_type} not registered for pack {pack_id}");
1021 }
1022 if let Some(flow) = engine.flow_by_type(flow_type) {
1023 return Ok((flow.pack_id.clone(), flow.id.clone()));
1024 }
1025 bail!("flow type {flow_type} is ambiguous; pack_id is required");
1026 }
1027
1028 let pack = runtime.pack();
1029 let flow_id = pack
1030 .metadata()
1031 .entry_flows
1032 .first()
1033 .cloned()
1034 .ok_or_else(|| anyhow!("no entry flows registered for tenant {}", runtime.tenant()))?;
1035 Ok((pack.metadata().pack_id.clone(), flow_id))
1036}
1037
1038fn normalize_replies(result: Value, tenant: &str) -> Vec<Activity> {
1039 result
1040 .as_array()
1041 .cloned()
1042 .unwrap_or_else(|| vec![result])
1043 .into_iter()
1044 .map(|payload| Activity::from_output(payload, tenant))
1045 .collect()
1046}
1047
/// Reports whether `path` has the `gtpack` extension, compared
/// ASCII-case-insensitively.
///
/// Paths without an extension, or whose extension is not valid UTF-8, are
/// not archives.
fn is_pack_archive(path: &Path) -> bool {
    match path.extension().and_then(|ext| ext.to_str()) {
        Some(ext) => ext.eq_ignore_ascii_case("gtpack"),
        None => false,
    }
}
1054
/// Unit tests for `apply_welcome_flow_override` and the first-contact
/// marker it relies on, run against a fresh session store per test.
#[cfg(test)]
mod welcome_flow_tests {
    use super::*;
    use crate::engine::runtime::IngressEnvelope;
    use crate::runner::engine::{ExecutionState, FlowSnapshot, FlowWait};
    use crate::storage::new_session_store;
    use greentic_types::ReplyScope;
    use serde_json::json;

    /// Canonicalized envelope for `user-1` with an optional endpoint id.
    fn sample_envelope(endpoint_id: Option<&str>) -> IngressEnvelope {
        sample_envelope_for_user(endpoint_id, "user-1")
    }

    /// Canonicalized envelope targeting `pack.default` / `flow.default` for
    /// the given user; conversation and reply scope are derived from the
    /// user name.
    fn sample_envelope_for_user(endpoint_id: Option<&str>, user: &str) -> IngressEnvelope {
        IngressEnvelope {
            tenant: "demo".into(),
            env: Some("local".into()),
            pack_id: Some("pack.default".into()),
            flow_id: "flow.default".into(),
            flow_type: Some("messaging".into()),
            action: Some("messaging".into()),
            session_hint: None,
            provider: Some("teams".into()),
            messaging_endpoint_id: endpoint_id.map(String::from),
            channel: Some("chan".into()),
            conversation: Some(format!("conv-{user}")),
            user: Some(user.to_string()),
            activity_id: None,
            timestamp: None,
            payload: json!({}),
            metadata: None,
            reply_scope: Some(ReplyScope {
                conversation: format!("conv-{user}"),
                thread: None,
                reply_to: None,
                correlation: None,
            }),
        }
        .canonicalize()
    }

    /// Welcome hint pointing at `pack.welcome` / `flow.welcome`.
    fn hint() -> WelcomeFlowHint {
        WelcomeFlowHint {
            pack_id: "pack.welcome".into(),
            flow_id: "flow.welcome".into(),
        }
    }

    /// Saves a pending flow wait for `envelope` so a later fetch finds an
    /// existing snapshot.
    fn seed_resume(store: &DynSessionStore, envelope: &IngressEnvelope) {
        let resume = FlowResumeStore::new(Arc::clone(store));
        let state: ExecutionState = serde_json::from_value(json!({
            "input": { "text": "hi" },
            "nodes": {},
            "egress": []
        }))
        .expect("state");
        let wait = FlowWait {
            reason: Some("await-user".into()),
            snapshot: FlowSnapshot {
                pack_id: envelope.pack_id.clone().expect("pack_id"),
                flow_id: envelope.flow_id.clone(),
                next_flow: None,
                next_node: "node-2".into(),
                state,
            },
        };
        resume.save(envelope, &wait).expect("seed save");
    }

    /// Without a hint the envelope's pack, flow and flow type are untouched.
    #[test]
    fn override_is_no_op_when_hint_absent() {
        let store = new_session_store();
        let mut envelope = sample_envelope(Some("teams-legal"));
        let before = envelope.clone();
        apply_welcome_flow_override(&store, &mut envelope, None, None).expect("ok");
        assert_eq!(envelope.pack_id, before.pack_id);
        assert_eq!(envelope.flow_id, before.flow_id);
        assert_eq!(envelope.flow_type, before.flow_type);
    }

    /// Without a messaging endpoint id the hint is ignored.
    #[test]
    fn override_is_no_op_when_endpoint_id_absent() {
        let store = new_session_store();
        let mut envelope = sample_envelope(None);
        let before = envelope.clone();
        apply_welcome_flow_override(&store, &mut envelope, Some(&hint()), Some("welcome".into()))
            .expect("ok");
        assert_eq!(envelope.pack_id, before.pack_id);
        assert_eq!(envelope.flow_id, before.flow_id);
    }

    /// On first contact the hint's pack/flow replace the envelope's, and the
    /// supplied flow type (including `None`) is stored as given.
    #[test]
    fn override_swaps_pack_flow_and_threads_flow_type_through() {
        for hint_flow_type in [Some("welcome".to_string()), None] {
            // Fresh store per iteration so each run is a first contact.
            let store = new_session_store();
            let mut envelope = sample_envelope(Some("teams-legal"));
            apply_welcome_flow_override(
                &store,
                &mut envelope,
                Some(&hint()),
                hint_flow_type.clone(),
            )
            .expect("ok");
            assert_eq!(envelope.pack_id.as_deref(), Some("pack.welcome"));
            assert_eq!(envelope.flow_id, "flow.welcome");
            assert_eq!(envelope.flow_type, hint_flow_type);
        }
    }

    /// An existing resumable snapshot suppresses the override.
    #[test]
    fn override_is_no_op_on_repeat_turn_with_existing_session() {
        let store = new_session_store();
        let envelope_template = sample_envelope(Some("teams-legal"));
        seed_resume(&store, &envelope_template);

        let mut envelope = envelope_template.clone();
        apply_welcome_flow_override(&store, &mut envelope, Some(&hint()), Some("welcome".into()))
            .expect("ok");
        assert_eq!(envelope.pack_id, envelope_template.pack_id);
        assert_eq!(envelope.flow_id, envelope_template.flow_id);
        assert_eq!(envelope.flow_type, envelope_template.flow_type);
    }

    /// After the first turn fired the welcome flow and the resume entry was
    /// cleared, a second turn still does not re-fire it.
    #[test]
    fn override_is_no_op_post_completion_when_marker_present() {
        let store = new_session_store();
        let mut first = sample_envelope(Some("teams-legal"));
        apply_welcome_flow_override(&store, &mut first, Some(&hint()), Some("welcome".into()))
            .expect("first turn ok");
        assert_eq!(
            first.pack_id.as_deref(),
            Some("pack.welcome"),
            "first turn fires welcome"
        );

        let resume = FlowResumeStore::new(Arc::clone(&store));
        resume.clear(&first).expect("clear post-completion wait");

        let mut second = sample_envelope(Some("teams-legal"));
        apply_welcome_flow_override(&store, &mut second, Some(&hint()), Some("welcome".into()))
            .expect("second turn ok");
        assert_eq!(
            second.pack_id.as_deref(),
            Some("pack.default"),
            "second turn must NOT re-fire welcome"
        );
        assert_eq!(second.flow_id, "flow.default");
    }

    /// A second turn for the same contact keeps its default pack.
    #[test]
    fn override_is_no_op_on_second_turn_after_marker_set() {
        let store = new_session_store();
        let mut first = sample_envelope(Some("teams-legal"));
        apply_welcome_flow_override(&store, &mut first, Some(&hint()), Some("welcome".into()))
            .expect("first turn ok");
        assert_eq!(first.pack_id.as_deref(), Some("pack.welcome"));

        let mut second = sample_envelope(Some("teams-legal"));
        apply_welcome_flow_override(&store, &mut second, Some(&hint()), Some("welcome".into()))
            .expect("second turn ok");
        assert_eq!(
            second.pack_id.as_deref(),
            Some("pack.default"),
            "second turn must NOT re-fire welcome"
        );
    }

    /// The same user on a different endpoint counts as a new first contact.
    #[test]
    fn override_partitions_marker_per_endpoint() {
        let store = new_session_store();
        let mut legal = sample_envelope(Some("teams-legal"));
        apply_welcome_flow_override(&store, &mut legal, Some(&hint()), Some("welcome".into()))
            .expect("legal first turn ok");
        assert_eq!(legal.pack_id.as_deref(), Some("pack.welcome"));

        let mut accounting = sample_envelope(Some("teams-accounting"));
        apply_welcome_flow_override(
            &store,
            &mut accounting,
            Some(&hint()),
            Some("welcome".into()),
        )
        .expect("accounting first turn ok");
        assert_eq!(
            accounting.pack_id.as_deref(),
            Some("pack.welcome"),
            "different endpoint = independent first contact"
        );
    }

    /// Two users on one endpoint are each welcomed once, and neither is
    /// welcomed again afterwards.
    #[test]
    fn override_partitions_marker_per_user_on_same_endpoint() {
        let store = new_session_store();

        let mut a1 = sample_envelope_for_user(Some("teams-legal"), "user-a");
        apply_welcome_flow_override(&store, &mut a1, Some(&hint()), Some("welcome".into()))
            .expect("user-a first ok");
        assert_eq!(a1.pack_id.as_deref(), Some("pack.welcome"));

        let mut b1 = sample_envelope_for_user(Some("teams-legal"), "user-b");
        apply_welcome_flow_override(&store, &mut b1, Some(&hint()), Some("welcome".into()))
            .expect("user-b first must not collide with user-a marker");
        assert_eq!(
            b1.pack_id.as_deref(),
            Some("pack.welcome"),
            "user-b is independent first contact"
        );

        let mut a2 = sample_envelope_for_user(Some("teams-legal"), "user-a");
        apply_welcome_flow_override(&store, &mut a2, Some(&hint()), Some("welcome".into()))
            .expect("user-a second ok");
        assert_eq!(
            a2.pack_id.as_deref(),
            Some("pack.default"),
            "user-a must not be re-welcomed after user-b joined"
        );

        let mut b2 = sample_envelope_for_user(Some("teams-legal"), "user-b");
        apply_welcome_flow_override(&store, &mut b2, Some(&hint()), Some("welcome".into()))
            .expect("user-b second ok");
        assert_eq!(b2.pack_id.as_deref(), Some("pack.default"));
    }

    /// A hint-less call must not write a marker: the next call with a hint
    /// still counts as first contact.
    #[test]
    fn marker_is_not_written_when_hint_absent() {
        let store = new_session_store();
        let mut envelope = sample_envelope(Some("teams-legal"));
        apply_welcome_flow_override(&store, &mut envelope, None, None).expect("ok");

        let mut next = sample_envelope(Some("teams-legal"));
        apply_welcome_flow_override(&store, &mut next, Some(&hint()), Some("welcome".into()))
            .expect("ok");
        assert_eq!(
            next.pack_id.as_deref(),
            Some("pack.welcome"),
            "no marker leaked from hint-absent path"
        );
    }
}
1348
#[cfg(test)]
mod identify_endpoints_tests {
    use super::*;

    /// Builds a `RunnerHost` with empty configs, freshly created session and
    /// state stores, the default secrets manager and no telemetry.
    fn dummy_runner_host() -> RunnerHost {
        let session_store = new_session_store();
        let state_store = new_state_store();
        // The hosts get their own clones; the originals are stored on the struct.
        let session_host = session_host_from(session_store.clone());
        let state_host = state_host_from(state_store.clone());
        let secrets_manager = default_manager().expect("default secrets manager");

        RunnerHost {
            configs: HashMap::new(),
            active: Arc::new(ActivePacks::new()),
            health: Arc::new(HealthState::new()),
            session_host,
            state_host,
            session_store,
            state_store,
            wasi_policy: Arc::new(RunnerWasiPolicy::new()),
            secrets_manager,
            telemetry: None,
        }
    }

    /// Checks that a rendered error chain reports the "revision runtime not
    /// loaded" failure and mentions both the deployment and the revision id.
    #[track_caller]
    fn assert_names_missing_revision(msg: &str, deployment: &str, revision: &str) {
        assert!(
            msg.contains("revision runtime not loaded"),
            "error chain should name the failure mode, got: {msg}"
        );
        assert!(
            msg.contains(deployment),
            "error chain should name the deployment id, got: {msg}"
        );
        assert!(
            msg.contains(revision),
            "error chain should name the revision id, got: {msg}"
        );
    }

    /// With an empty type list the call succeeds on a host that has nothing
    /// loaded, and the returned map is empty.
    #[tokio::test]
    async fn empty_provider_types_returns_empty_map_without_loading_revision() {
        let runner = dummy_runner_host();
        let endpoints = runner
            .identify_messaging_endpoints_for_revision(
                "demo",
                DeploymentId::new(),
                BundleId::new("anything"),
                RevisionId::new(),
                &[],
                b"{}",
            )
            .await
            .expect("empty types is the cheap fast path");
        assert!(endpoints.is_empty());
    }

    /// A non-empty type list against a host with nothing loaded must fail, and
    /// the error chain must identify the deployment and revision.
    #[tokio::test]
    async fn missing_revision_surfaces_clear_error() {
        let runner = dummy_runner_host();
        let deployment = DeploymentId::new();
        let revision = RevisionId::new();
        let failure = runner
            .identify_messaging_endpoints_for_revision(
                "demo",
                deployment,
                BundleId::new("missing"),
                revision,
                &["teams"],
                b"{}",
            )
            .await
            .expect_err("missing revision must fail closed");

        let rendered = format!("{failure:#}");
        assert_names_missing_revision(&rendered, &deployment.to_string(), &revision.to_string());
    }

    /// Scoped variant of the empty-type-list fast path.
    #[tokio::test]
    async fn scoped_empty_provider_types_returns_empty_map() {
        let runner = dummy_runner_host();
        let endpoints = runner
            .identify_messaging_endpoints_for_revision_scoped(
                "demo",
                DeploymentId::new(),
                BundleId::new("anything"),
                RevisionId::new(),
                &[],
                &[],
                &Value::Null,
            )
            .await
            .expect("empty types is the cheap fast path");
        assert!(endpoints.is_empty());
    }

    /// Scoped variant of the missing-revision failure check.
    #[tokio::test]
    async fn scoped_missing_revision_surfaces_clear_error() {
        let runner = dummy_runner_host();
        let deployment = DeploymentId::new();
        let revision = RevisionId::new();
        let failure = runner
            .identify_messaging_endpoints_for_revision_scoped(
                "demo",
                deployment,
                BundleId::new("missing"),
                revision,
                &["teams"],
                &[],
                &Value::Null,
            )
            .await
            .expect_err("missing revision must fail closed");

        let rendered = format!("{failure:#}");
        assert_names_missing_revision(&rendered, &deployment.to_string(), &revision.to_string());
    }

    /// Compile-time check: the futures returned by the revision-scoped host
    /// methods satisfy `Send`. The futures are built and dropped, never polled.
    #[test]
    fn identify_futures_are_send() {
        fn require_send<Fut: Send>(_future: Fut) {}

        let runner = dummy_runner_host();

        // Borrowed arguments stay inline so temporaries live for the whole call.
        require_send(runner.identify_messaging_endpoints_for_revision(
            "demo",
            DeploymentId::new(),
            BundleId::new("anything"),
            RevisionId::new(),
            &["teams"],
            b"{}",
        ));
        require_send(runner.identify_messaging_endpoints_for_revision_scoped(
            "demo",
            DeploymentId::new(),
            BundleId::new("anything"),
            RevisionId::new(),
            &["teams"],
            &[("x-telegram-bot-api-secret-token".into(), "tok".into())],
            &Value::Null,
        ));
        require_send(runner.describe_identify_instances_for_revision(
            "demo",
            DeploymentId::new(),
            BundleId::new("anything"),
            RevisionId::new(),
            &["teams"],
        ));
        require_send(runner.invoke_provider_for_revision(
            "demo",
            DeploymentId::new(),
            BundleId::new("anything"),
            RevisionId::new(),
            "messaging.telegram.bot",
            "ingest_http",
            b"{}".to_vec(),
            None,
            None,
        ));
    }

    /// Invoking a provider against a host with nothing loaded must fail, and
    /// the error chain must identify the deployment and revision.
    #[tokio::test]
    async fn invoke_provider_missing_revision_surfaces_clear_error() {
        let runner = dummy_runner_host();
        let deployment = DeploymentId::new();
        let revision = RevisionId::new();
        let failure = runner
            .invoke_provider_for_revision(
                "demo",
                deployment,
                BundleId::new("missing"),
                revision,
                "messaging.telegram.bot",
                "ingest_http",
                b"{}".to_vec(),
                None,
                None,
            )
            .await
            .expect_err("missing revision must fail closed");

        let rendered = format!("{failure:#}");
        assert_names_missing_revision(&rendered, &deployment.to_string(), &revision.to_string());
    }
}
1570
#[cfg(all(test, feature = "greentic-x-provider"))]
mod fast2flow_tests {
    use super::*;
    use greentic_x_runtime::Fast2FlowRoutingEntity;

    /// Routing a text activity to the `telco-x/prefix-traffic` target must set
    /// the pack and flow ids and expose the supplied entity (normalized value
    /// and `iso` format) under `payload["fast2flow"]["entities"]`.
    #[test]
    fn dispatch_target_attaches_prefill_entities_to_payload() {
        let date_entity =
            Fast2FlowRoutingEntity::new("date", "20260611").with_format("iso", "2026-06-11");

        let routed = apply_fast2flow_target(
            Activity::text("show traffic tomorrow"),
            "telco-x/prefix-traffic",
            vec![date_entity],
        )
        .expect("target should route");

        // The target string is expected to split into pack id and flow id.
        assert_eq!(routed.pack_id(), Some("telco-x"));
        assert_eq!(routed.flow_id(), Some("prefix-traffic"));

        // The entity must be carried through into the payload.
        assert_eq!(
            routed.payload()["fast2flow"]["entities"][0]["normalized"],
            "20260611"
        );
        assert_eq!(
            routed.payload()["fast2flow"]["entities"][0]["formats"]["iso"],
            "2026-06-11"
        );
    }
}