// Source: greentic_runner_host/engine/runtime.rs

1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use anyhow::{Context, Result, anyhow};
6use async_trait::async_trait;
7use greentic_session::{SessionData, SessionKey as StoreSessionKey};
8use greentic_types::{
9    EnvId, FlowId, GreenticError, PackId, ReplyScope, SessionCursor as TypesSessionCursor,
10    TenantCtx, TenantId, UserId,
11};
12use rand::{RngExt, rng};
13use serde::{Deserialize, Serialize};
14use serde_json::{Value, json};
15use sha2::{Digest, Sha256};
16
17use super::api::{RunFlowRequest, RunnerApi};
18use super::builder::{Runner, RunnerBuilder};
19use super::error::{GResult, RunnerError};
20use super::glue::{FnSecretsHost, FnTelemetryHost};
21use super::host::{HostBundle, SecretsHost, SessionHost, StateHost};
22use super::policy::Policy;
23use super::registry::{Adapter, AdapterCall, AdapterRegistry};
24use super::shims::{InMemorySessionHost, InMemoryStateHost};
25use super::state_machine::{FlowDefinition, FlowStep, PAYLOAD_FROM_LAST_INPUT};
26
27use crate::config::{HostConfig, SecretsPolicy};
28use crate::pack::FlowDescriptor;
29use crate::runner::engine::{FlowContext, FlowEngine, FlowSnapshot, FlowStatus, FlowWait};
30use crate::runner::mocks::MockLayer;
31use crate::secrets::{DynSecretsManager, read_secret_blocking};
32use crate::storage::session::DynSessionStore;
33use crate::trace::{PackTraceInfo, TraceContext, TraceMode, TraceRecorder};
34
/// Environment name used by `IngressEnvelope::tenant_ctx` when the envelope has no `env` value
/// or the supplied value fails to parse.
const DEFAULT_ENV: &str = "local";
/// Name under which the pack-flow adapter is registered; every generated flow definition
/// routes its single step to this adapter.
const PACK_FLOW_ADAPTER: &str = "pack_flow";
37
/// Stores and retrieves snapshots of suspended flows on top of a session store.
#[derive(Clone)]
pub struct FlowResumeStore {
    /// Backing store used to register, look up and clear waits.
    store: DynSessionStore,
}
42
43impl FlowResumeStore {
44    pub fn new(store: DynSessionStore) -> Self {
45        Self { store }
46    }
47
48    pub fn fetch(&self, envelope: &IngressEnvelope) -> GResult<Option<FlowSnapshot>> {
49        let (mut ctx, user, _, scope) = build_store_ctx(envelope)?;
50        ctx = ctx.with_user(Some(user.clone()));
51
52        let mut scopes = vec![scope.clone()];
53        if scope.correlation.is_some() {
54            let mut base = scope.clone();
55            base.correlation = None;
56            scopes.push(base);
57        }
58
59        for lookup in scopes {
60            if let Some(key) = self
61                .store
62                .find_wait_by_scope(&ctx, &user, &lookup)
63                .map_err(map_store_error)?
64            {
65                let Some(data) = self.store.get_session(&key).map_err(map_store_error)? else {
66                    continue;
67                };
68                let record: FlowResumeRecord =
69                    serde_json::from_str(&data.context_json).map_err(|err| {
70                        RunnerError::Session {
71                            reason: format!("failed to decode flow resume snapshot: {err}"),
72                        }
73                    })?;
74                if let Some(pack_id) = envelope.pack_id.as_deref()
75                    && record.snapshot.pack_id != pack_id
76                {
77                    return Err(RunnerError::Session {
78                        reason: format!(
79                            "resume pack mismatch: expected {pack_id}, found {}",
80                            record.snapshot.pack_id
81                        ),
82                    });
83                }
84                return Ok(Some(record.snapshot));
85            }
86        }
87
88        Ok(None)
89    }
90
91    pub fn save(&self, envelope: &IngressEnvelope, wait: &FlowWait) -> GResult<ReplyScope> {
92        let (ctx, user, hint, scope) = build_store_ctx(envelope)?;
93        let record = FlowResumeRecord {
94            snapshot: wait.snapshot.clone(),
95            reason: wait.reason.clone(),
96        };
97        let data = record_to_session_data(&record, ctx.clone(), &user, &hint)?;
98        let mut reply_scope = scope.clone();
99        if reply_scope.correlation.is_none() {
100            reply_scope.correlation = Some(generate_correlation_id());
101        }
102        let mut store_scope = scope;
103        store_scope.correlation = None;
104        let session_key = StoreSessionKey::new(format!("{hint}::{}", store_scope.scope_hash()));
105        self.store
106            .register_wait(&ctx, &user, &store_scope, &session_key, data, None)
107            .map_err(map_store_error)?;
108        Ok(reply_scope)
109    }
110
111    pub fn clear(&self, envelope: &IngressEnvelope) -> GResult<()> {
112        let (ctx, user, _, scope) = build_store_ctx(envelope)?;
113        let mut scopes = vec![scope.clone()];
114        if scope.correlation.is_some() {
115            let mut base = scope;
116            base.correlation = None;
117            scopes.push(base);
118        }
119        for lookup in scopes {
120            self.store
121                .clear_wait(&ctx, &user, &lookup)
122                .map_err(map_store_error)?;
123        }
124        Ok(())
125    }
126}
127
/// Serialized form of a suspended flow; stored as the session's `context_json`.
#[derive(Serialize, Deserialize)]
struct FlowResumeRecord {
    /// Engine snapshot to resume from.
    snapshot: FlowSnapshot,
    /// Optional wait reason copied from the `FlowWait`; defaults to `None` when absent from
    /// the stored JSON.
    #[serde(default)]
    reason: Option<String>,
}
134
135fn build_store_ctx(envelope: &IngressEnvelope) -> GResult<(TenantCtx, UserId, String, ReplyScope)> {
136    let base_hint = envelope
137        .session_hint
138        .clone()
139        .unwrap_or_else(|| envelope.canonical_session_hint());
140    let hint = if let Some(pack_id) = envelope.pack_id.as_deref() {
141        format!("{base_hint}::pack={pack_id}")
142    } else {
143        base_hint.clone()
144    };
145    let user = derive_user_id(&hint)?;
146    let scope = envelope
147        .reply_scope
148        .clone()
149        .ok_or_else(|| RunnerError::Session {
150            reason: "Cannot suspend: reply_scope missing; provider plugin must supply ReplyScope"
151                .to_string(),
152        })?;
153    let mut ctx = envelope.tenant_ctx();
154    ctx = ctx.with_session(hint.clone());
155    ctx = ctx.with_user(Some(user.clone()));
156    Ok((ctx, user, hint, scope))
157}
158
159fn record_to_session_data(
160    record: &FlowResumeRecord,
161    ctx: TenantCtx,
162    user: &UserId,
163    session_hint: &str,
164) -> GResult<SessionData> {
165    let flow = FlowId::from_str(record.snapshot.flow_id.as_str()).map_err(map_store_error)?;
166    let pack = PackId::from_str(record.snapshot.pack_id.as_str()).map_err(map_store_error)?;
167    let mut cursor = TypesSessionCursor::new(record.snapshot.next_node.clone());
168    if let Some(reason) = record.reason.clone() {
169        cursor = cursor.with_wait_reason(reason);
170    }
171    let context_json = serde_json::to_string(record).map_err(|err| RunnerError::Session {
172        reason: format!("failed to encode flow resume snapshot: {err}"),
173    })?;
174    let ctx = ctx
175        .with_user(Some(user.clone()))
176        .with_session(session_hint.to_string())
177        .with_flow(record.snapshot.flow_id.clone());
178    Ok(SessionData {
179        tenant_ctx: ctx,
180        flow_id: flow,
181        pack_id: Some(pack),
182        cursor,
183        context_json,
184    })
185}
186
187fn derive_user_id(hint: &str) -> GResult<UserId> {
188    let digest = Sha256::digest(hint.as_bytes());
189    let slug = format!("sess{}", hex::encode(&digest[..8]));
190    UserId::from_str(&slug).map_err(map_store_error)
191}
192
193fn map_store_error(err: GreenticError) -> RunnerError {
194    RunnerError::Session {
195        reason: err.to_string(),
196    }
197}
198
199fn generate_correlation_id() -> String {
200    let mut bytes = [0u8; 16];
201    rng().fill(&mut bytes);
202    hex::encode(bytes)
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runner::engine::ExecutionState;
    use crate::storage::session::new_session_store;
    use serde_json::json;

    // Fully populated envelope for the `demo` tenant with an uncorrelated reply scope.
    fn sample_envelope() -> IngressEnvelope {
        let reply_scope = ReplyScope {
            conversation: "conv".into(),
            thread: None,
            reply_to: None,
            correlation: None,
        };
        IngressEnvelope {
            tenant: String::from("demo"),
            env: Some(String::from("local")),
            pack_id: Some(String::from("pack.demo")),
            flow_id: String::from("flow.main"),
            flow_type: None,
            action: Some(String::from("messaging")),
            session_hint: Some(String::from("demo:provider:chan:conv:user")),
            provider: Some(String::from("provider")),
            channel: Some(String::from("chan")),
            conversation: Some(String::from("conv")),
            user: Some(String::from("user")),
            activity_id: Some(String::from("act-1")),
            timestamp: None,
            payload: json!({ "text": "hi" }),
            metadata: None,
            reply_scope: Some(reply_scope),
        }
    }

    // Wait record for `pack.demo` / `flow.main`, suspended before `node-2`.
    fn sample_wait() -> FlowWait {
        let raw_state = json!({
            "input": { "text": "hi" },
            "nodes": {},
            "egress": []
        });
        let state: ExecutionState = serde_json::from_value(raw_state).expect("state");
        let snapshot = FlowSnapshot {
            pack_id: "pack.demo".into(),
            flow_id: "flow.main".into(),
            next_flow: None,
            next_node: "node-2".into(),
            state,
        };
        FlowWait {
            reason: Some("await-user".into()),
            snapshot,
        }
    }

    // The same hint must always hash to the same `sess…` user id.
    #[test]
    fn derive_user_id_is_stable() {
        let hint = "some-tenant::session-key";
        let first = derive_user_id(hint).unwrap();
        let second = derive_user_id(hint).unwrap();
        assert_eq!(first, second);
        assert!(first.as_str().starts_with("sess"));
    }

    // save -> fetch returns the snapshot; clear -> fetch returns nothing.
    #[test]
    fn resume_store_roundtrip() -> GResult<()> {
        let resume = FlowResumeStore::new(new_session_store());
        let envelope = sample_envelope();
        assert!(resume.fetch(&envelope)?.is_none());

        let wait = sample_wait();
        let _reply_scope = resume.save(&envelope, &wait)?;
        let restored = resume.fetch(&envelope)?.expect("snapshot missing");
        assert_eq!(restored.flow_id, wait.snapshot.flow_id);
        assert_eq!(restored.next_node, wait.snapshot.next_node);

        resume.clear(&envelope)?;
        assert!(resume.fetch(&envelope)?.is_none());
        Ok(())
    }

    // A second save for the same envelope replaces the first snapshot.
    #[test]
    fn resume_store_overwrites_existing() -> GResult<()> {
        let resume = FlowResumeStore::new(new_session_store());
        let envelope = sample_envelope();
        let mut wait = sample_wait();
        let _first_scope = resume.save(&envelope, &wait)?;

        wait.snapshot.next_node = "node-3".into();
        wait.reason = Some("retry".into());
        let _second_scope = resume.save(&envelope, &wait)?;

        let restored = resume.fetch(&envelope)?.expect("snapshot missing");
        assert_eq!(restored.next_node, "node-3");
        resume.clear(&envelope)?;
        Ok(())
    }

    // The lookup does not depend on the envelope's flow id: the stored snapshot wins.
    #[test]
    fn resume_store_uses_snapshot_even_if_envelope_flow_differs() -> GResult<()> {
        let resume = FlowResumeStore::new(new_session_store());
        let envelope = sample_envelope();
        let wait = sample_wait();
        let _reply_scope = resume.save(&envelope, &wait)?;

        let mut redirected = envelope.clone();
        redirected.flow_id = "flow.other".into();
        let restored = resume.fetch(&redirected)?.expect("snapshot missing");
        assert_eq!(restored.flow_id, wait.snapshot.flow_id);

        resume.clear(&envelope)?;
        Ok(())
    }

    // A minimal envelope gets provider/channel/conversation/user/session_hint filled in.
    #[test]
    fn canonicalize_populates_defaults() {
        let sparse = IngressEnvelope {
            tenant: String::from("demo"),
            env: None,
            pack_id: None,
            flow_id: String::from("flow.main"),
            flow_type: None,
            action: None,
            session_hint: None,
            provider: None,
            channel: None,
            conversation: None,
            user: None,
            activity_id: Some(String::from("activity-1")),
            timestamp: None,
            payload: json!({}),
            metadata: None,
            reply_scope: None,
        };
        let envelope = sparse.canonicalize();

        assert_eq!(envelope.provider.as_deref(), Some("provider"));
        assert_eq!(envelope.channel.as_deref(), Some("flow.main"));
        assert_eq!(envelope.conversation.as_deref(), Some("flow.main"));
        assert_eq!(envelope.user.as_deref(), Some("activity-1"));
        assert!(envelope.session_hint.is_some());
    }
}
345
/// Runtime that executes flows for ingress envelopes through a configured `Runner`.
pub struct StateMachineRuntime {
    /// Runner built with the host bundle, adapters, policy and flow definitions.
    runner: Runner,
}
349
350impl StateMachineRuntime {
351    /// Construct a runtime from explicit flow definitions (legacy entrypoint used by tests/examples).
352    pub fn new(flows: Vec<FlowDefinition>) -> GResult<Self> {
353        let secrets = Arc::new(FnSecretsHost::new(|name| {
354            Err(RunnerError::Secrets {
355                reason: format!("secret {name} unavailable (noop host)"),
356            })
357        }));
358        let telemetry = Arc::new(FnTelemetryHost::new(|_, _| Ok(())));
359        let session = Arc::new(InMemorySessionHost::new());
360        let state = Arc::new(InMemoryStateHost::new());
361        let host = HostBundle::new(secrets, telemetry, session, state);
362
363        let adapters = AdapterRegistry::default();
364        let policy = Policy::default();
365
366        let mut builder = RunnerBuilder::new()
367            .with_host(host)
368            .with_adapters(adapters)
369            .with_policy(policy);
370        for flow in flows {
371            builder = builder.with_flow(flow);
372        }
373        let runner = builder.build()?;
374        Ok(Self { runner })
375    }
376
377    /// Build a state-machine runtime that proxies pack flows through the legacy FlowEngine.
378    #[allow(clippy::too_many_arguments)]
379    pub fn from_flow_engine(
380        config: Arc<HostConfig>,
381        engine: Arc<FlowEngine>,
382        pack_trace: HashMap<String, PackTraceInfo>,
383        session_host: Arc<dyn SessionHost>,
384        session_store: DynSessionStore,
385        state_host: Arc<dyn StateHost>,
386        secrets_manager: DynSecretsManager,
387        mocks: Option<Arc<MockLayer>>,
388    ) -> Result<Self> {
389        let policy = Arc::new(config.secrets_policy.clone());
390        let tenant_ctx = config.tenant_ctx();
391        let secrets = Arc::new(PolicySecretsHost::new(policy, secrets_manager, tenant_ctx));
392        let telemetry = Arc::new(FnTelemetryHost::new(|span, fields| {
393            tracing::debug!(?span, ?fields, "telemetry emit");
394            Ok(())
395        }));
396        let host = HostBundle::new(secrets, telemetry, session_host, state_host);
397        let resume_store = FlowResumeStore::new(session_store);
398
399        let mut adapters = AdapterRegistry::default();
400        adapters.register(
401            PACK_FLOW_ADAPTER,
402            Box::new(PackFlowAdapter::new(
403                Arc::clone(&config),
404                Arc::clone(&engine),
405                pack_trace,
406                resume_store,
407                mocks,
408            )),
409        );
410
411        let flows = build_flow_definitions(engine.flows());
412        let mut builder = RunnerBuilder::new()
413            .with_host(host)
414            .with_adapters(adapters)
415            .with_policy(Policy::default());
416        for flow in flows {
417            builder = builder.with_flow(flow);
418        }
419        let runner = builder
420            .build()
421            .map_err(|err| anyhow!("state machine init failed: {err}"))?;
422        Ok(Self { runner })
423    }
424
425    /// Execute the flow associated with the provided ingress event.
426    pub async fn handle(&self, envelope: IngressEnvelope) -> Result<Value> {
427        let tenant_ctx = envelope.tenant_ctx();
428        let session_hint = envelope
429            .session_hint
430            .clone()
431            .unwrap_or_else(|| envelope.canonical_session_hint());
432        let pack_id = envelope.pack_id.clone().ok_or_else(|| {
433            anyhow!("pack_id missing; ingress must specify pack_id for multi-pack flows")
434        })?;
435        let input =
436            serde_json::to_value(&envelope).context("failed to serialise ingress envelope")?;
437        let request = RunFlowRequest {
438            tenant: tenant_ctx,
439            pack_id,
440            flow_id: envelope.flow_id.clone(),
441            input,
442            session_hint: Some(session_hint),
443        };
444        let result: super::api::RunFlowResult = self
445            .runner
446            .run_flow(request)
447            .await
448            .map_err(|err| anyhow!("flow execution failed: {err}"))?;
449        let outcome = result.outcome;
450        Ok(outcome.get("response").cloned().unwrap_or(outcome))
451    }
452}
453
/// Secrets host that checks a `SecretsPolicy` before reading from a secrets manager.
struct PolicySecretsHost {
    /// Policy consulted for every requested secret name.
    policy: Arc<SecretsPolicy>,
    /// Manager the secret bytes are read from.
    manager: DynSecretsManager,
    /// Tenant context passed along with every read.
    tenant_ctx: TenantCtx,
}

impl PolicySecretsHost {
    /// Bundles the policy, manager and tenant context into a host.
    fn new(policy: Arc<SecretsPolicy>, manager: DynSecretsManager, tenant_ctx: TenantCtx) -> Self {
        Self {
            policy,
            manager,
            tenant_ctx,
        }
    }
}
469
470const POLICY_SECRETS_PACK_ID: &str = "_runner";
471
472#[async_trait]
473impl SecretsHost for PolicySecretsHost {
474    async fn get(&self, name: &str) -> GResult<String> {
475        if !self.policy.is_allowed(name) {
476            return Err(RunnerError::Secrets {
477                reason: format!("secret {name} denied by policy"),
478            });
479        }
480        let bytes = read_secret_blocking(
481            &self.manager,
482            &self.tenant_ctx,
483            POLICY_SECRETS_PACK_ID,
484            name,
485        )
486        .map_err(|err| RunnerError::Secrets {
487            reason: format!("secret {name} unavailable: {err}"),
488        })?;
489        String::from_utf8(bytes).map_err(|err| RunnerError::Secrets {
490            reason: format!("secret {name} not valid UTF-8: {err}"),
491        })
492    }
493}
494
495fn build_flow_definitions(flows: &[FlowDescriptor]) -> Vec<FlowDefinition> {
496    flows
497        .iter()
498        .map(|descriptor| {
499            FlowDefinition::new(
500                super::api::FlowSummary {
501                    pack_id: descriptor.pack_id.clone(),
502                    id: descriptor.id.clone(),
503                    name: descriptor
504                        .description
505                        .clone()
506                        .unwrap_or_else(|| descriptor.id.clone()),
507                    version: descriptor.version.clone(),
508                    description: descriptor.description.clone(),
509                },
510                serde_json::json!({
511                    "type": "object"
512                }),
513                vec![FlowStep::Adapter(AdapterCall {
514                    adapter: PACK_FLOW_ADAPTER.into(),
515                    operation: descriptor.id.clone(),
516                    payload: Value::String(PAYLOAD_FROM_LAST_INPUT.into()),
517                })],
518            )
519        })
520        .collect()
521}
522
/// Adapter that forwards state-machine adapter calls to the legacy `FlowEngine`.
struct PackFlowAdapter {
    /// Tenant name copied from the host configuration at construction time.
    tenant: String,
    /// Host configuration; read for retry and trace settings on every call.
    config: Arc<HostConfig>,
    /// Engine that executes or resumes pack flows.
    engine: Arc<FlowEngine>,
    /// Trace metadata looked up by pack id.
    pack_trace: HashMap<String, PackTraceInfo>,
    /// Store for snapshots of suspended flows.
    resume: FlowResumeStore,
    /// Optional mock layer handed to the engine through the flow context.
    mocks: Option<Arc<MockLayer>>,
}

impl PackFlowAdapter {
    /// Builds the adapter, caching `config.tenant` alongside the shared configuration.
    fn new(
        config: Arc<HostConfig>,
        engine: Arc<FlowEngine>,
        pack_trace: HashMap<String, PackTraceInfo>,
        resume: FlowResumeStore,
        mocks: Option<Arc<MockLayer>>,
    ) -> Self {
        Self {
            tenant: config.tenant.clone(),
            config,
            engine,
            pack_trace,
            resume,
            mocks,
        }
    }
}
550
551#[async_trait::async_trait]
552impl Adapter for PackFlowAdapter {
553    async fn call(&self, call: &AdapterCall) -> GResult<Value> {
554        let envelope: IngressEnvelope =
555            serde_json::from_value(call.payload.clone()).map_err(|err| {
556                RunnerError::AdapterCall {
557                    reason: format!("invalid ingress payload: {err}"),
558                }
559            })?;
560        let envelope = envelope.canonicalize();
561        let flow_id = call.operation.clone();
562        let action_owned = envelope.action.clone();
563        let session_owned = envelope
564            .session_hint
565            .clone()
566            .unwrap_or_else(|| envelope.canonical_session_hint());
567        let provider_owned = envelope.provider.clone();
568        let payload = envelope.payload.clone();
569        let retry_config = self.config.retry_config().into();
570        let resume_snapshot = self.resume.fetch(&envelope)?;
571        let resume_flow_id = resume_snapshot
572            .as_ref()
573            .and_then(|snapshot| snapshot.next_flow.clone())
574            .or_else(|| {
575                resume_snapshot
576                    .as_ref()
577                    .map(|snapshot| snapshot.flow_id.clone())
578            });
579        let effective_flow_id = resume_flow_id.clone().unwrap_or_else(|| flow_id.clone());
580        let effective_pack_id = if let Some(snapshot) = resume_snapshot.as_ref() {
581            snapshot.pack_id.clone()
582        } else if let Some(pack_id) = envelope.pack_id.as_deref() {
583            let found = self
584                .engine
585                .flow_by_key(pack_id, effective_flow_id.as_str())
586                .is_some();
587            if !found {
588                return Err(RunnerError::AdapterCall {
589                    reason: format!(
590                        "flow {} not registered for pack {pack_id}",
591                        effective_flow_id
592                    ),
593                });
594            }
595            pack_id.to_string()
596        } else if let Some(flow) = self.engine.flow_by_id(effective_flow_id.as_str()) {
597            flow.pack_id.clone()
598        } else {
599            return Err(RunnerError::AdapterCall {
600                reason: format!(
601                    "flow {} is ambiguous; pack_id is required",
602                    effective_flow_id
603                ),
604            });
605        };
606
607        let trace_config = self.config.trace.clone();
608        let flow_version = self
609            .engine
610            .flow_by_key(effective_pack_id.as_str(), effective_flow_id.as_str())
611            .map(|desc| desc.version.clone())
612            .unwrap_or_else(|| "unknown".to_string());
613        let pack_trace = self
614            .pack_trace
615            .get(effective_pack_id.as_str())
616            .cloned()
617            .unwrap_or_else(|| PackTraceInfo {
618                pack_ref: effective_pack_id.clone(),
619                resolved_digest: None,
620            });
621        let trace_ctx = TraceContext {
622            pack_ref: pack_trace.pack_ref,
623            resolved_digest: pack_trace.resolved_digest,
624            flow_id: effective_flow_id.clone(),
625            flow_version,
626        };
627        let trace = if trace_config.mode == TraceMode::Off {
628            None
629        } else {
630            Some(TraceRecorder::new(trace_config, trace_ctx))
631        };
632
633        let mocks = self.mocks.as_deref();
634        let ctx = FlowContext {
635            tenant: &self.tenant,
636            pack_id: effective_pack_id.as_str(),
637            flow_id: effective_flow_id.as_str(),
638            node_id: None,
639            tool: None,
640            action: action_owned.as_deref(),
641            session_id: Some(session_owned.as_str()),
642            provider_id: provider_owned.as_deref(),
643            retry_config,
644            attempt: 1,
645            observer: trace
646                .as_ref()
647                .map(|recorder| recorder as &dyn crate::runner::engine::ExecutionObserver),
648            mocks,
649        };
650
651        let execution = if let Some(snapshot) = resume_snapshot {
652            let resume_pack_id = snapshot.pack_id.clone();
653            let resume_flow_id = snapshot
654                .next_flow
655                .clone()
656                .unwrap_or_else(|| snapshot.flow_id.clone());
657            let resume_ctx = FlowContext {
658                pack_id: resume_pack_id.as_str(),
659                flow_id: resume_flow_id.as_str(),
660                ..ctx
661            };
662            self.engine.resume(resume_ctx, snapshot, payload).await
663        } else {
664            self.engine.execute(ctx, payload).await
665        };
666        let execution = match execution {
667            Ok(execution) => {
668                if let Some(recorder) = trace.as_ref()
669                    && let Err(err) = recorder.flush_success()
670                {
671                    tracing::warn!(error = %err, "failed to write trace");
672                }
673                execution
674            }
675            Err(err) => {
676                if let Some(recorder) = trace.as_ref()
677                    && let Err(write_err) = recorder.flush_error(err.as_ref())
678                {
679                    tracing::warn!(error = %write_err, "failed to write trace");
680                }
681                return Err(RunnerError::AdapterCall {
682                    reason: err.to_string(),
683                });
684            }
685        };
686
687        match execution.status {
688            FlowStatus::Completed => {
689                self.resume.clear(&envelope)?;
690                Ok(execution.output)
691            }
692            FlowStatus::Waiting(wait) => {
693                let reply_scope = self.resume.save(&envelope, &wait)?;
694                Ok(json!({
695                    "status": "pending",
696                    "reason": wait.reason,
697                    "resume": wait.snapshot,
698                    "reply_scope": reply_scope,
699                    "response": execution.output,
700                }))
701            }
702        }
703    }
704}
705
/// Ingress event addressed to a flow.
///
/// Every optional field is omitted from the serialized form when it is `None` and defaults to
/// `None` when absent on input. `canonicalize` fills in the routing fields that are missing.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IngressEnvelope {
    /// Tenant name; parsed into a `TenantId` by `tenant_ctx`.
    pub tenant: String,
    /// Environment name; `tenant_ctx` falls back to the default environment when this is
    /// missing or invalid.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub env: Option<String>,
    /// Pack that owns the target flow.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pack_id: Option<String>,
    /// Identifier of the target flow.
    pub flow_id: String,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub flow_type: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub action: Option<String>,
    /// Session key; `canonicalize` derives one from tenant/provider/channel/conversation/user
    /// when it is missing.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_hint: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub channel: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub conversation: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub user: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub activity_id: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timestamp: Option<String>,
    /// Event payload; defaults to JSON `null` when absent on input.
    #[serde(default)]
    pub payload: Value,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
    /// Scope that replies are addressed to; required by the resume store.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reply_scope: Option<ReplyScope>,
}
739
740impl IngressEnvelope {
741    pub fn canonicalize(mut self) -> Self {
742        if self.provider.is_none() {
743            self.provider = Some("provider".into());
744        }
745        if self.channel.is_none() {
746            self.channel = Some(self.flow_id.clone());
747        }
748        if self.conversation.is_none() {
749            self.conversation = self.channel.clone();
750        }
751        if self.user.is_none() {
752            if let Some(ref hint) = self.session_hint {
753                self.user = Some(hint.clone());
754            } else if let Some(ref activity) = self.activity_id {
755                self.user = Some(activity.clone());
756            } else {
757                self.user = Some("user".into());
758            }
759        }
760        if self.session_hint.is_none() {
761            self.session_hint = Some(self.canonical_session_hint());
762        }
763        if self.reply_scope.is_none()
764            && let Some(conversation) = self.conversation.clone()
765        {
766            self.reply_scope = Some(ReplyScope {
767                conversation,
768                thread: None,
769                reply_to: None,
770                correlation: None,
771            });
772        }
773        self
774    }
775
776    pub fn canonical_session_hint(&self) -> String {
777        format!(
778            "{}:{}:{}:{}:{}",
779            self.tenant,
780            self.provider.as_deref().unwrap_or("provider"),
781            self.channel.as_deref().unwrap_or("channel"),
782            self.conversation.as_deref().unwrap_or("conversation"),
783            self.user.as_deref().unwrap_or("user")
784        )
785    }
786
787    pub fn tenant_ctx(&self) -> TenantCtx {
788        let env_raw = self.env.clone().unwrap_or_else(|| DEFAULT_ENV.into());
789        let env = EnvId::from_str(env_raw.as_str())
790            .unwrap_or_else(|_| EnvId::from_str(DEFAULT_ENV).expect("default env must be valid"));
791        let tenant_id = TenantId::from_str(self.tenant.as_str()).unwrap_or_else(|_| {
792            TenantId::from_str("tenant.default").expect("tenant fallback must be valid")
793        });
794        let mut ctx = TenantCtx::new(env, tenant_id).with_flow(self.flow_id.clone());
795        if let Some(provider) = &self.provider {
796            ctx = ctx.with_provider(provider.clone());
797        }
798        if let Some(session) = &self.session_hint {
799            ctx = ctx.with_session(session.clone());
800        }
801        ctx
802    }
803}