Skip to main content

greentic_runner_host/engine/
runtime.rs

1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use anyhow::{Context, Result, anyhow};
6use async_trait::async_trait;
7use greentic_session::{SessionData, SessionKey as StoreSessionKey};
8use greentic_types::{
9    EnvId, FlowId, GreenticError, PackId, ReplyScope, SessionCursor as TypesSessionCursor,
10    TenantCtx, TenantId, UserId,
11};
12use rand::{Rng, rng};
13use serde::{Deserialize, Serialize};
14use serde_json::{Value, json};
15use sha2::{Digest, Sha256};
16
17use super::api::{RunFlowRequest, RunnerApi};
18use super::builder::{Runner, RunnerBuilder};
19use super::error::{GResult, RunnerError};
20use super::glue::{FnSecretsHost, FnTelemetryHost};
21use super::host::{HostBundle, SecretsHost, SessionHost, StateHost};
22use super::policy::Policy;
23use super::registry::{Adapter, AdapterCall, AdapterRegistry};
24use super::shims::{InMemorySessionHost, InMemoryStateHost};
25use super::state_machine::{FlowDefinition, FlowStep, PAYLOAD_FROM_LAST_INPUT};
26
27use crate::config::{HostConfig, SecretsPolicy};
28use crate::pack::FlowDescriptor;
29use crate::runner::engine::{FlowContext, FlowEngine, FlowSnapshot, FlowStatus, FlowWait};
30use crate::runner::mocks::MockLayer;
31use crate::secrets::{DynSecretsManager, scoped_secret_path};
32use crate::storage::session::DynSessionStore;
33use crate::trace::{PackTraceInfo, TraceContext, TraceMode, TraceRecorder};
34
/// Environment identifier used when an ingress envelope names no environment,
/// or names one that fails to parse.
const DEFAULT_ENV: &str = "local";
/// Registry name under which the pack-flow adapter is registered and by which
/// generated flow steps refer to it.
const PACK_FLOW_ADAPTER: &str = "pack_flow";
37
38#[derive(Clone)]
39pub struct FlowResumeStore {
40    store: DynSessionStore,
41}
42
43impl FlowResumeStore {
44    pub fn new(store: DynSessionStore) -> Self {
45        Self { store }
46    }
47
48    pub fn fetch(&self, envelope: &IngressEnvelope) -> GResult<Option<FlowSnapshot>> {
49        let (mut ctx, user, _, scope) = build_store_ctx(envelope)?;
50        ctx = ctx.with_user(Some(user.clone()));
51
52        let mut scopes = vec![scope.clone()];
53        if scope.correlation.is_some() {
54            let mut base = scope.clone();
55            base.correlation = None;
56            scopes.push(base);
57        }
58
59        for lookup in scopes {
60            if let Some(key) = self
61                .store
62                .find_wait_by_scope(&ctx, &user, &lookup)
63                .map_err(map_store_error)?
64            {
65                let Some(data) = self.store.get_session(&key).map_err(map_store_error)? else {
66                    continue;
67                };
68                let record: FlowResumeRecord =
69                    serde_json::from_str(&data.context_json).map_err(|err| {
70                        RunnerError::Session {
71                            reason: format!("failed to decode flow resume snapshot: {err}"),
72                        }
73                    })?;
74                if record.snapshot.flow_id == envelope.flow_id {
75                    if let Some(pack_id) = envelope.pack_id.as_deref()
76                        && record.snapshot.pack_id != pack_id
77                    {
78                        return Err(RunnerError::Session {
79                            reason: format!(
80                                "resume pack mismatch: expected {pack_id}, found {}",
81                                record.snapshot.pack_id
82                            ),
83                        });
84                    }
85                    return Ok(Some(record.snapshot));
86                }
87            }
88        }
89
90        Ok(None)
91    }
92
93    pub fn save(&self, envelope: &IngressEnvelope, wait: &FlowWait) -> GResult<ReplyScope> {
94        let (ctx, user, hint, scope) = build_store_ctx(envelope)?;
95        let record = FlowResumeRecord {
96            snapshot: wait.snapshot.clone(),
97            reason: wait.reason.clone(),
98        };
99        let data = record_to_session_data(&record, ctx.clone(), &user, &hint)?;
100        let mut reply_scope = scope.clone();
101        if reply_scope.correlation.is_none() {
102            reply_scope.correlation = Some(generate_correlation_id());
103        }
104        let mut store_scope = scope;
105        store_scope.correlation = None;
106        let session_key = StoreSessionKey::new(format!("{hint}::{}", store_scope.scope_hash()));
107        self.store
108            .register_wait(&ctx, &user, &store_scope, &session_key, data, None)
109            .map_err(map_store_error)?;
110        Ok(reply_scope)
111    }
112
113    pub fn clear(&self, envelope: &IngressEnvelope) -> GResult<()> {
114        let (ctx, user, _, scope) = build_store_ctx(envelope)?;
115        let mut scopes = vec![scope.clone()];
116        if scope.correlation.is_some() {
117            let mut base = scope;
118            base.correlation = None;
119            scopes.push(base);
120        }
121        for lookup in scopes {
122            self.store
123                .clear_wait(&ctx, &user, &lookup)
124                .map_err(map_store_error)?;
125        }
126        Ok(())
127    }
128}
129
130#[derive(Serialize, Deserialize)]
131struct FlowResumeRecord {
132    snapshot: FlowSnapshot,
133    #[serde(default)]
134    reason: Option<String>,
135}
136
137fn build_store_ctx(envelope: &IngressEnvelope) -> GResult<(TenantCtx, UserId, String, ReplyScope)> {
138    let base_hint = envelope
139        .session_hint
140        .clone()
141        .unwrap_or_else(|| envelope.canonical_session_hint());
142    let hint = if let Some(pack_id) = envelope.pack_id.as_deref() {
143        format!("{base_hint}::pack={pack_id}")
144    } else {
145        base_hint.clone()
146    };
147    let user = derive_user_id(&hint)?;
148    let scope = envelope
149        .reply_scope
150        .clone()
151        .ok_or_else(|| RunnerError::Session {
152            reason: "Cannot suspend: reply_scope missing; provider plugin must supply ReplyScope"
153                .to_string(),
154        })?;
155    let mut ctx = envelope.tenant_ctx();
156    ctx = ctx.with_session(hint.clone());
157    ctx = ctx.with_user(Some(user.clone()));
158    Ok((ctx, user, hint, scope))
159}
160
161fn record_to_session_data(
162    record: &FlowResumeRecord,
163    ctx: TenantCtx,
164    user: &UserId,
165    session_hint: &str,
166) -> GResult<SessionData> {
167    let flow = FlowId::from_str(record.snapshot.flow_id.as_str()).map_err(map_store_error)?;
168    let pack = PackId::from_str(record.snapshot.pack_id.as_str()).map_err(map_store_error)?;
169    let mut cursor = TypesSessionCursor::new(record.snapshot.next_node.clone());
170    if let Some(reason) = record.reason.clone() {
171        cursor = cursor.with_wait_reason(reason);
172    }
173    let context_json = serde_json::to_string(record).map_err(|err| RunnerError::Session {
174        reason: format!("failed to encode flow resume snapshot: {err}"),
175    })?;
176    let ctx = ctx
177        .with_user(Some(user.clone()))
178        .with_session(session_hint.to_string())
179        .with_flow(record.snapshot.flow_id.clone());
180    Ok(SessionData {
181        tenant_ctx: ctx,
182        flow_id: flow,
183        pack_id: Some(pack),
184        cursor,
185        context_json,
186    })
187}
188
189fn derive_user_id(hint: &str) -> GResult<UserId> {
190    let digest = Sha256::digest(hint.as_bytes());
191    let slug = format!("sess{}", hex::encode(&digest[..8]));
192    UserId::from_str(&slug).map_err(map_store_error)
193}
194
195fn map_store_error(err: GreenticError) -> RunnerError {
196    RunnerError::Session {
197        reason: err.to_string(),
198    }
199}
200
201fn generate_correlation_id() -> String {
202    let mut bytes = [0u8; 16];
203    rng().fill(&mut bytes);
204    hex::encode(bytes)
205}
206
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runner::engine::ExecutionState;
    use crate::storage::session::new_session_store;
    use serde_json::json;

    /// Fully populated messaging envelope for pack `pack.demo`, flow `flow.main`,
    /// with a correlation-free reply scope.
    fn sample_envelope() -> IngressEnvelope {
        let reply_scope = ReplyScope {
            conversation: "conv".into(),
            thread: None,
            reply_to: None,
            correlation: None,
        };
        IngressEnvelope {
            tenant: String::from("demo"),
            env: Some(String::from("local")),
            pack_id: Some(String::from("pack.demo")),
            flow_id: String::from("flow.main"),
            flow_type: None,
            action: Some(String::from("messaging")),
            session_hint: Some(String::from("demo:provider:chan:conv:user")),
            provider: Some(String::from("provider")),
            channel: Some(String::from("chan")),
            conversation: Some(String::from("conv")),
            user: Some(String::from("user")),
            activity_id: Some(String::from("act-1")),
            timestamp: None,
            payload: json!({ "text": "hi" }),
            metadata: None,
            reply_scope: Some(reply_scope),
        }
    }

    /// Wait on `node-2` of `flow.main` with reason `await-user`.
    fn sample_wait() -> FlowWait {
        let raw_state = json!({
            "input": { "text": "hi" },
            "nodes": {},
            "egress": []
        });
        let state: ExecutionState = serde_json::from_value(raw_state).expect("state");
        let snapshot = FlowSnapshot {
            pack_id: "pack.demo".into(),
            flow_id: "flow.main".into(),
            next_node: "node-2".into(),
            state,
        };
        FlowWait {
            reason: Some("await-user".into()),
            snapshot,
        }
    }

    /// The same hint must always map to the same `sess`-prefixed user id.
    #[test]
    fn derive_user_id_is_stable() {
        let hint = "some-tenant::session-key";
        let first = derive_user_id(hint).unwrap();
        let second = derive_user_id(hint).unwrap();
        assert_eq!(first, second);
        assert!(first.as_str().starts_with("sess"));
    }

    /// Save, fetch and clear behave as a round trip.
    #[test]
    fn resume_store_roundtrip() -> GResult<()> {
        let resume = FlowResumeStore::new(new_session_store());
        let envelope = sample_envelope();
        assert!(resume.fetch(&envelope)?.is_none());

        let wait = sample_wait();
        let _reply_scope = resume.save(&envelope, &wait)?;
        let restored = resume.fetch(&envelope)?.expect("snapshot missing");
        assert_eq!(restored.flow_id, wait.snapshot.flow_id);
        assert_eq!(restored.next_node, wait.snapshot.next_node);

        resume.clear(&envelope)?;
        assert!(resume.fetch(&envelope)?.is_none());
        Ok(())
    }

    /// A second save for the same envelope replaces the first snapshot.
    #[test]
    fn resume_store_overwrites_existing() -> GResult<()> {
        let resume = FlowResumeStore::new(new_session_store());
        let envelope = sample_envelope();
        let mut wait = sample_wait();
        let _first_scope = resume.save(&envelope, &wait)?;

        wait.snapshot.next_node = "node-3".into();
        wait.reason = Some("retry".into());
        let _second_scope = resume.save(&envelope, &wait)?;

        let restored = resume.fetch(&envelope)?.expect("snapshot missing");
        assert_eq!(restored.next_node, "node-3");
        resume.clear(&envelope)?;
        Ok(())
    }

    /// Canonicalisation fills every optional routing field of a bare envelope.
    #[test]
    fn canonicalize_populates_defaults() {
        let bare = IngressEnvelope {
            tenant: String::from("demo"),
            env: None,
            pack_id: None,
            flow_id: String::from("flow.main"),
            flow_type: None,
            action: None,
            session_hint: None,
            provider: None,
            channel: None,
            conversation: None,
            user: None,
            activity_id: Some(String::from("activity-1")),
            timestamp: None,
            payload: json!({}),
            metadata: None,
            reply_scope: None,
        };
        let envelope = bare.canonicalize();

        assert_eq!(envelope.provider.as_deref(), Some("provider"));
        assert_eq!(envelope.channel.as_deref(), Some("flow.main"));
        assert_eq!(envelope.conversation.as_deref(), Some("flow.main"));
        assert_eq!(envelope.user.as_deref(), Some("activity-1"));
        assert!(envelope.session_hint.is_some());
    }
}
330
331pub struct StateMachineRuntime {
332    runner: Runner,
333}
334
335impl StateMachineRuntime {
336    /// Construct a runtime from explicit flow definitions (legacy entrypoint used by tests/examples).
337    pub fn new(flows: Vec<FlowDefinition>) -> GResult<Self> {
338        let secrets = Arc::new(FnSecretsHost::new(|name| {
339            Err(RunnerError::Secrets {
340                reason: format!("secret {name} unavailable (noop host)"),
341            })
342        }));
343        let telemetry = Arc::new(FnTelemetryHost::new(|_, _| Ok(())));
344        let session = Arc::new(InMemorySessionHost::new());
345        let state = Arc::new(InMemoryStateHost::new());
346        let host = HostBundle::new(secrets, telemetry, session, state);
347
348        let adapters = AdapterRegistry::default();
349        let policy = Policy::default();
350
351        let mut builder = RunnerBuilder::new()
352            .with_host(host)
353            .with_adapters(adapters)
354            .with_policy(policy);
355        for flow in flows {
356            builder = builder.with_flow(flow);
357        }
358        let runner = builder.build()?;
359        Ok(Self { runner })
360    }
361
362    /// Build a state-machine runtime that proxies pack flows through the legacy FlowEngine.
363    #[allow(clippy::too_many_arguments)]
364    pub fn from_flow_engine(
365        config: Arc<HostConfig>,
366        engine: Arc<FlowEngine>,
367        pack_trace: HashMap<String, PackTraceInfo>,
368        session_host: Arc<dyn SessionHost>,
369        session_store: DynSessionStore,
370        state_host: Arc<dyn StateHost>,
371        secrets_manager: DynSecretsManager,
372        mocks: Option<Arc<MockLayer>>,
373    ) -> Result<Self> {
374        let policy = Arc::new(config.secrets_policy.clone());
375        let tenant_ctx = config.tenant_ctx();
376        let secrets = Arc::new(PolicySecretsHost::new(policy, secrets_manager, tenant_ctx));
377        let telemetry = Arc::new(FnTelemetryHost::new(|span, fields| {
378            tracing::debug!(?span, ?fields, "telemetry emit");
379            Ok(())
380        }));
381        let host = HostBundle::new(secrets, telemetry, session_host, state_host);
382        let resume_store = FlowResumeStore::new(session_store);
383
384        let mut adapters = AdapterRegistry::default();
385        adapters.register(
386            PACK_FLOW_ADAPTER,
387            Box::new(PackFlowAdapter::new(
388                Arc::clone(&config),
389                Arc::clone(&engine),
390                pack_trace,
391                resume_store,
392                mocks,
393            )),
394        );
395
396        let flows = build_flow_definitions(engine.flows());
397        let mut builder = RunnerBuilder::new()
398            .with_host(host)
399            .with_adapters(adapters)
400            .with_policy(Policy::default());
401        for flow in flows {
402            builder = builder.with_flow(flow);
403        }
404        let runner = builder
405            .build()
406            .map_err(|err| anyhow!("state machine init failed: {err}"))?;
407        Ok(Self { runner })
408    }
409
410    /// Execute the flow associated with the provided ingress event.
411    pub async fn handle(&self, envelope: IngressEnvelope) -> Result<Value> {
412        let tenant_ctx = envelope.tenant_ctx();
413        let session_hint = envelope
414            .session_hint
415            .clone()
416            .unwrap_or_else(|| envelope.canonical_session_hint());
417        let pack_id = envelope.pack_id.clone().ok_or_else(|| {
418            anyhow!("pack_id missing; ingress must specify pack_id for multi-pack flows")
419        })?;
420        let input =
421            serde_json::to_value(&envelope).context("failed to serialise ingress envelope")?;
422        let request = RunFlowRequest {
423            tenant: tenant_ctx,
424            pack_id,
425            flow_id: envelope.flow_id.clone(),
426            input,
427            session_hint: Some(session_hint),
428        };
429        let result: super::api::RunFlowResult = self
430            .runner
431            .run_flow(request)
432            .await
433            .map_err(|err| anyhow!("flow execution failed: {err}"))?;
434        let outcome = result.outcome;
435        Ok(outcome.get("response").cloned().unwrap_or(outcome))
436    }
437}
438
439struct PolicySecretsHost {
440    policy: Arc<SecretsPolicy>,
441    manager: DynSecretsManager,
442    tenant_ctx: TenantCtx,
443}
444
445impl PolicySecretsHost {
446    fn new(policy: Arc<SecretsPolicy>, manager: DynSecretsManager, tenant_ctx: TenantCtx) -> Self {
447        Self {
448            policy,
449            manager,
450            tenant_ctx,
451        }
452    }
453}
454
455#[async_trait]
456impl SecretsHost for PolicySecretsHost {
457    async fn get(&self, name: &str) -> GResult<String> {
458        if !self.policy.is_allowed(name) {
459            return Err(RunnerError::Secrets {
460                reason: format!("secret {name} denied by policy"),
461            });
462        }
463        let scoped_key =
464            scoped_secret_path(&self.tenant_ctx, name).map_err(|err| RunnerError::Secrets {
465                reason: format!("secret {name} scope invalid: {err}"),
466            })?;
467        let bytes = self
468            .manager
469            .read(scoped_key.as_str())
470            .await
471            .map_err(|err| RunnerError::Secrets {
472                reason: format!("secret {name} unavailable: {err}"),
473            })?;
474        String::from_utf8(bytes).map_err(|err| RunnerError::Secrets {
475            reason: format!("secret {name} not valid UTF-8: {err}"),
476        })
477    }
478}
479
480fn build_flow_definitions(flows: &[FlowDescriptor]) -> Vec<FlowDefinition> {
481    flows
482        .iter()
483        .map(|descriptor| {
484            FlowDefinition::new(
485                super::api::FlowSummary {
486                    pack_id: descriptor.pack_id.clone(),
487                    id: descriptor.id.clone(),
488                    name: descriptor
489                        .description
490                        .clone()
491                        .unwrap_or_else(|| descriptor.id.clone()),
492                    version: descriptor.version.clone(),
493                    description: descriptor.description.clone(),
494                },
495                serde_json::json!({
496                    "type": "object"
497                }),
498                vec![FlowStep::Adapter(AdapterCall {
499                    adapter: PACK_FLOW_ADAPTER.into(),
500                    operation: descriptor.id.clone(),
501                    payload: Value::String(PAYLOAD_FROM_LAST_INPUT.into()),
502                })],
503            )
504        })
505        .collect()
506}
507
508struct PackFlowAdapter {
509    tenant: String,
510    config: Arc<HostConfig>,
511    engine: Arc<FlowEngine>,
512    pack_trace: HashMap<String, PackTraceInfo>,
513    resume: FlowResumeStore,
514    mocks: Option<Arc<MockLayer>>,
515}
516
517impl PackFlowAdapter {
518    fn new(
519        config: Arc<HostConfig>,
520        engine: Arc<FlowEngine>,
521        pack_trace: HashMap<String, PackTraceInfo>,
522        resume: FlowResumeStore,
523        mocks: Option<Arc<MockLayer>>,
524    ) -> Self {
525        Self {
526            tenant: config.tenant.clone(),
527            config,
528            engine,
529            pack_trace,
530            resume,
531            mocks,
532        }
533    }
534}
535
536#[async_trait::async_trait]
537impl Adapter for PackFlowAdapter {
538    async fn call(&self, call: &AdapterCall) -> GResult<Value> {
539        let envelope: IngressEnvelope =
540            serde_json::from_value(call.payload.clone()).map_err(|err| {
541                RunnerError::AdapterCall {
542                    reason: format!("invalid ingress payload: {err}"),
543                }
544            })?;
545        let envelope = envelope.canonicalize();
546        let flow_id = call.operation.clone();
547        let action_owned = envelope.action.clone();
548        let session_owned = envelope
549            .session_hint
550            .clone()
551            .unwrap_or_else(|| envelope.canonical_session_hint());
552        let provider_owned = envelope.provider.clone();
553        let payload = envelope.payload.clone();
554        let retry_config = self.config.retry_config().into();
555
556        let pack_id = if let Some(pack_id) = envelope.pack_id.as_deref() {
557            let found = self.engine.flow_by_key(pack_id, &flow_id).is_some();
558            if !found {
559                return Err(RunnerError::AdapterCall {
560                    reason: format!("flow {flow_id} not registered for pack {pack_id}"),
561                });
562            }
563            pack_id
564        } else if let Some(flow) = self.engine.flow_by_id(&flow_id) {
565            flow.pack_id.as_str()
566        } else {
567            return Err(RunnerError::AdapterCall {
568                reason: format!("flow {flow_id} is ambiguous; pack_id is required"),
569            });
570        };
571
572        let trace_config = self.config.trace.clone();
573        let flow_version = self
574            .engine
575            .flow_by_key(pack_id, &flow_id)
576            .map(|desc| desc.version.clone())
577            .unwrap_or_else(|| "unknown".to_string());
578        let pack_trace = self
579            .pack_trace
580            .get(pack_id)
581            .cloned()
582            .unwrap_or_else(|| PackTraceInfo {
583                pack_ref: pack_id.to_string(),
584                resolved_digest: None,
585            });
586        let trace_ctx = TraceContext {
587            pack_ref: pack_trace.pack_ref,
588            resolved_digest: pack_trace.resolved_digest,
589            flow_id: flow_id.clone(),
590            flow_version,
591        };
592        let trace = if trace_config.mode == TraceMode::Off {
593            None
594        } else {
595            Some(TraceRecorder::new(trace_config, trace_ctx))
596        };
597
598        let mocks = self.mocks.as_deref();
599        let ctx = FlowContext {
600            tenant: &self.tenant,
601            pack_id,
602            flow_id: &flow_id,
603            node_id: None,
604            tool: None,
605            action: action_owned.as_deref(),
606            session_id: Some(session_owned.as_str()),
607            provider_id: provider_owned.as_deref(),
608            retry_config,
609            attempt: 1,
610            observer: trace
611                .as_ref()
612                .map(|recorder| recorder as &dyn crate::runner::engine::ExecutionObserver),
613            mocks,
614        };
615
616        let execution = if let Some(snapshot) = self.resume.fetch(&envelope)? {
617            let resume_pack_id = snapshot.pack_id.clone();
618            let resume_ctx = FlowContext {
619                pack_id: resume_pack_id.as_str(),
620                ..ctx
621            };
622            self.engine.resume(resume_ctx, snapshot, payload).await
623        } else {
624            self.engine.execute(ctx, payload).await
625        };
626        let execution = match execution {
627            Ok(execution) => {
628                if let Some(recorder) = trace.as_ref()
629                    && let Err(err) = recorder.flush_success()
630                {
631                    tracing::warn!(error = %err, "failed to write trace");
632                }
633                execution
634            }
635            Err(err) => {
636                if let Some(recorder) = trace.as_ref()
637                    && let Err(write_err) = recorder.flush_error(err.as_ref())
638                {
639                    tracing::warn!(error = %write_err, "failed to write trace");
640                }
641                return Err(RunnerError::AdapterCall {
642                    reason: err.to_string(),
643                });
644            }
645        };
646
647        match execution.status {
648            FlowStatus::Completed => {
649                self.resume.clear(&envelope)?;
650                Ok(execution.output)
651            }
652            FlowStatus::Waiting(wait) => {
653                let reply_scope = self.resume.save(&envelope, &wait)?;
654                Ok(json!({
655                    "status": "pending",
656                    "reason": wait.reason,
657                    "resume": wait.snapshot,
658                    "reply_scope": reply_scope,
659                    "response": execution.output,
660                }))
661            }
662        }
663    }
664}
665
666#[derive(Clone, Debug, Serialize, Deserialize)]
667pub struct IngressEnvelope {
668    pub tenant: String,
669    #[serde(default, skip_serializing_if = "Option::is_none")]
670    pub env: Option<String>,
671    #[serde(default, skip_serializing_if = "Option::is_none")]
672    pub pack_id: Option<String>,
673    pub flow_id: String,
674    #[serde(default, skip_serializing_if = "Option::is_none")]
675    pub flow_type: Option<String>,
676    #[serde(default, skip_serializing_if = "Option::is_none")]
677    pub action: Option<String>,
678    #[serde(default, skip_serializing_if = "Option::is_none")]
679    pub session_hint: Option<String>,
680    #[serde(default, skip_serializing_if = "Option::is_none")]
681    pub provider: Option<String>,
682    #[serde(default, skip_serializing_if = "Option::is_none")]
683    pub channel: Option<String>,
684    #[serde(default, skip_serializing_if = "Option::is_none")]
685    pub conversation: Option<String>,
686    #[serde(default, skip_serializing_if = "Option::is_none")]
687    pub user: Option<String>,
688    #[serde(default, skip_serializing_if = "Option::is_none")]
689    pub activity_id: Option<String>,
690    #[serde(default, skip_serializing_if = "Option::is_none")]
691    pub timestamp: Option<String>,
692    #[serde(default)]
693    pub payload: Value,
694    #[serde(default, skip_serializing_if = "Option::is_none")]
695    pub metadata: Option<Value>,
696    #[serde(default, skip_serializing_if = "Option::is_none")]
697    pub reply_scope: Option<ReplyScope>,
698}
699
700impl IngressEnvelope {
701    pub fn canonicalize(mut self) -> Self {
702        if self.provider.is_none() {
703            self.provider = Some("provider".into());
704        }
705        if self.channel.is_none() {
706            self.channel = Some(self.flow_id.clone());
707        }
708        if self.conversation.is_none() {
709            self.conversation = self.channel.clone();
710        }
711        if self.user.is_none() {
712            if let Some(ref hint) = self.session_hint {
713                self.user = Some(hint.clone());
714            } else if let Some(ref activity) = self.activity_id {
715                self.user = Some(activity.clone());
716            } else {
717                self.user = Some("user".into());
718            }
719        }
720        if self.session_hint.is_none() {
721            self.session_hint = Some(self.canonical_session_hint());
722        }
723        if self.reply_scope.is_none()
724            && let Some(conversation) = self.conversation.clone()
725        {
726            self.reply_scope = Some(ReplyScope {
727                conversation,
728                thread: None,
729                reply_to: None,
730                correlation: None,
731            });
732        }
733        self
734    }
735
736    pub fn canonical_session_hint(&self) -> String {
737        format!(
738            "{}:{}:{}:{}:{}",
739            self.tenant,
740            self.provider.as_deref().unwrap_or("provider"),
741            self.channel.as_deref().unwrap_or("channel"),
742            self.conversation.as_deref().unwrap_or("conversation"),
743            self.user.as_deref().unwrap_or("user")
744        )
745    }
746
747    pub fn tenant_ctx(&self) -> TenantCtx {
748        let env_raw = self.env.clone().unwrap_or_else(|| DEFAULT_ENV.into());
749        let env = EnvId::from_str(env_raw.as_str())
750            .unwrap_or_else(|_| EnvId::from_str(DEFAULT_ENV).expect("default env must be valid"));
751        let tenant_id = TenantId::from_str(self.tenant.as_str()).unwrap_or_else(|_| {
752            TenantId::from_str("tenant.default").expect("tenant fallback must be valid")
753        });
754        let mut ctx = TenantCtx::new(env, tenant_id).with_flow(self.flow_id.clone());
755        if let Some(provider) = &self.provider {
756            ctx = ctx.with_provider(provider.clone());
757        }
758        if let Some(session) = &self.session_hint {
759            ctx = ctx.with_session(session.clone());
760        }
761        ctx
762    }
763}