greentic_runner_host/engine/
runtime.rs

1use std::str::FromStr;
2use std::sync::Arc;
3
4use anyhow::{Context, Result, anyhow};
5use async_trait::async_trait;
6use greentic_session::{SessionData, SessionKey as StoreSessionKey};
7use greentic_types::{
8    EnvId, FlowId, GreenticError, PackId, ReplyScope, SessionCursor as TypesSessionCursor,
9    TenantCtx, TenantId, UserId,
10};
11use rand::{Rng, rng};
12use serde::{Deserialize, Serialize};
13use serde_json::{Value, json};
14use sha2::{Digest, Sha256};
15
16use super::api::{RunFlowRequest, RunnerApi};
17use super::builder::{Runner, RunnerBuilder};
18use super::error::{GResult, RunnerError};
19use super::glue::{FnSecretsHost, FnTelemetryHost};
20use super::host::{HostBundle, SecretsHost, SessionHost, StateHost};
21use super::policy::Policy;
22use super::registry::{Adapter, AdapterCall, AdapterRegistry};
23use super::shims::{InMemorySessionHost, InMemoryStateHost};
24use super::state_machine::{FlowDefinition, FlowStep, PAYLOAD_FROM_LAST_INPUT};
25
26use crate::config::{HostConfig, SecretsPolicy};
27use crate::pack::FlowDescriptor;
28use crate::runner::engine::{FlowContext, FlowEngine, FlowSnapshot, FlowStatus, FlowWait};
29use crate::runner::mocks::MockLayer;
30use crate::secrets::{DynSecretsManager, scoped_secret_path};
31use crate::storage::session::DynSessionStore;
32
33const DEFAULT_ENV: &str = "local";
34const PACK_FLOW_ADAPTER: &str = "pack_flow";
35
36#[derive(Clone)]
37pub struct FlowResumeStore {
38    store: DynSessionStore,
39}
40
41impl FlowResumeStore {
42    pub fn new(store: DynSessionStore) -> Self {
43        Self { store }
44    }
45
46    pub fn fetch(&self, envelope: &IngressEnvelope) -> GResult<Option<FlowSnapshot>> {
47        let (mut ctx, user, _, scope) = build_store_ctx(envelope)?;
48        ctx = ctx.with_user(Some(user.clone()));
49
50        let mut scopes = vec![scope.clone()];
51        if scope.correlation.is_some() {
52            let mut base = scope.clone();
53            base.correlation = None;
54            scopes.push(base);
55        }
56
57        for lookup in scopes {
58            if let Some(key) = self
59                .store
60                .find_wait_by_scope(&ctx, &user, &lookup)
61                .map_err(map_store_error)?
62            {
63                let Some(data) = self.store.get_session(&key).map_err(map_store_error)? else {
64                    continue;
65                };
66                let record: FlowResumeRecord =
67                    serde_json::from_str(&data.context_json).map_err(|err| {
68                        RunnerError::Session {
69                            reason: format!("failed to decode flow resume snapshot: {err}"),
70                        }
71                    })?;
72                if record.snapshot.flow_id == envelope.flow_id {
73                    if let Some(pack_id) = envelope.pack_id.as_deref()
74                        && record.snapshot.pack_id != pack_id
75                    {
76                        return Err(RunnerError::Session {
77                            reason: format!(
78                                "resume pack mismatch: expected {pack_id}, found {}",
79                                record.snapshot.pack_id
80                            ),
81                        });
82                    }
83                    return Ok(Some(record.snapshot));
84                }
85            }
86        }
87
88        Ok(None)
89    }
90
91    pub fn save(&self, envelope: &IngressEnvelope, wait: &FlowWait) -> GResult<ReplyScope> {
92        let (ctx, user, hint, scope) = build_store_ctx(envelope)?;
93        let record = FlowResumeRecord {
94            snapshot: wait.snapshot.clone(),
95            reason: wait.reason.clone(),
96        };
97        let data = record_to_session_data(&record, ctx.clone(), &user, &hint)?;
98        let mut reply_scope = scope.clone();
99        if reply_scope.correlation.is_none() {
100            reply_scope.correlation = Some(generate_correlation_id());
101        }
102        let mut store_scope = scope;
103        store_scope.correlation = None;
104        let session_key = StoreSessionKey::new(format!("{hint}::{}", store_scope.scope_hash()));
105        self.store
106            .register_wait(&ctx, &user, &store_scope, &session_key, data, None)
107            .map_err(map_store_error)?;
108        Ok(reply_scope)
109    }
110
111    pub fn clear(&self, envelope: &IngressEnvelope) -> GResult<()> {
112        let (ctx, user, _, scope) = build_store_ctx(envelope)?;
113        let mut scopes = vec![scope.clone()];
114        if scope.correlation.is_some() {
115            let mut base = scope;
116            base.correlation = None;
117            scopes.push(base);
118        }
119        for lookup in scopes {
120            self.store
121                .clear_wait(&ctx, &user, &lookup)
122                .map_err(map_store_error)?;
123        }
124        Ok(())
125    }
126}
127
128#[derive(Serialize, Deserialize)]
129struct FlowResumeRecord {
130    snapshot: FlowSnapshot,
131    #[serde(default)]
132    reason: Option<String>,
133}
134
135fn build_store_ctx(envelope: &IngressEnvelope) -> GResult<(TenantCtx, UserId, String, ReplyScope)> {
136    let base_hint = envelope
137        .session_hint
138        .clone()
139        .unwrap_or_else(|| envelope.canonical_session_hint());
140    let hint = if let Some(pack_id) = envelope.pack_id.as_deref() {
141        format!("{base_hint}::pack={pack_id}")
142    } else {
143        base_hint.clone()
144    };
145    let user = derive_user_id(&hint)?;
146    let scope = envelope
147        .reply_scope
148        .clone()
149        .ok_or_else(|| RunnerError::Session {
150            reason: "Cannot suspend: reply_scope missing; provider plugin must supply ReplyScope"
151                .to_string(),
152        })?;
153    let mut ctx = envelope.tenant_ctx();
154    ctx = ctx.with_session(hint.clone());
155    ctx = ctx.with_user(Some(user.clone()));
156    Ok((ctx, user, hint, scope))
157}
158
159fn record_to_session_data(
160    record: &FlowResumeRecord,
161    ctx: TenantCtx,
162    user: &UserId,
163    session_hint: &str,
164) -> GResult<SessionData> {
165    let flow = FlowId::from_str(record.snapshot.flow_id.as_str()).map_err(map_store_error)?;
166    let pack = PackId::from_str(record.snapshot.pack_id.as_str()).map_err(map_store_error)?;
167    let mut cursor = TypesSessionCursor::new(record.snapshot.next_node.clone());
168    if let Some(reason) = record.reason.clone() {
169        cursor = cursor.with_wait_reason(reason);
170    }
171    let context_json = serde_json::to_string(record).map_err(|err| RunnerError::Session {
172        reason: format!("failed to encode flow resume snapshot: {err}"),
173    })?;
174    let ctx = ctx
175        .with_user(Some(user.clone()))
176        .with_session(session_hint.to_string())
177        .with_flow(record.snapshot.flow_id.clone());
178    Ok(SessionData {
179        tenant_ctx: ctx,
180        flow_id: flow,
181        pack_id: Some(pack),
182        cursor,
183        context_json,
184    })
185}
186
187fn derive_user_id(hint: &str) -> GResult<UserId> {
188    let digest = Sha256::digest(hint.as_bytes());
189    let slug = format!("sess{}", hex::encode(&digest[..8]));
190    UserId::from_str(&slug).map_err(map_store_error)
191}
192
193fn map_store_error(err: GreenticError) -> RunnerError {
194    RunnerError::Session {
195        reason: err.to_string(),
196    }
197}
198
199fn generate_correlation_id() -> String {
200    let mut bytes = [0u8; 16];
201    rng().fill(&mut bytes);
202    hex::encode(bytes)
203}
204
205#[cfg(test)]
206mod tests {
207    use super::*;
208    use crate::runner::engine::ExecutionState;
209    use crate::storage::session::new_session_store;
210    use serde_json::json;
211
212    fn sample_envelope() -> IngressEnvelope {
213        IngressEnvelope {
214            tenant: "demo".into(),
215            env: Some("local".into()),
216            pack_id: Some("pack.demo".into()),
217            flow_id: "flow.main".into(),
218            flow_type: None,
219            action: Some("messaging".into()),
220            session_hint: Some("demo:provider:chan:conv:user".into()),
221            provider: Some("provider".into()),
222            channel: Some("chan".into()),
223            conversation: Some("conv".into()),
224            user: Some("user".into()),
225            activity_id: Some("act-1".into()),
226            timestamp: None,
227            payload: json!({ "text": "hi" }),
228            metadata: None,
229            reply_scope: Some(ReplyScope {
230                conversation: "conv".into(),
231                thread: None,
232                reply_to: None,
233                correlation: None,
234            }),
235        }
236    }
237
238    fn sample_wait() -> FlowWait {
239        let state: ExecutionState = serde_json::from_value(json!({
240            "input": { "text": "hi" },
241            "nodes": {},
242            "egress": []
243        }))
244        .expect("state");
245        FlowWait {
246            reason: Some("await-user".into()),
247            snapshot: FlowSnapshot {
248                pack_id: "pack.demo".into(),
249                flow_id: "flow.main".into(),
250                next_node: "node-2".into(),
251                state,
252            },
253        }
254    }
255
256    #[test]
257    fn derive_user_id_is_stable() {
258        let hint = "some-tenant::session-key";
259        let a = derive_user_id(hint).unwrap();
260        let b = derive_user_id(hint).unwrap();
261        assert_eq!(a, b);
262        assert!(a.as_str().starts_with("sess"));
263    }
264
265    #[test]
266    fn resume_store_roundtrip() -> GResult<()> {
267        let store = FlowResumeStore::new(new_session_store());
268        let envelope = sample_envelope();
269        assert!(store.fetch(&envelope)?.is_none());
270
271        let wait = sample_wait();
272        let _ = store.save(&envelope, &wait)?;
273        let snapshot = store.fetch(&envelope)?.expect("snapshot missing");
274        assert_eq!(snapshot.flow_id, wait.snapshot.flow_id);
275        assert_eq!(snapshot.next_node, wait.snapshot.next_node);
276
277        store.clear(&envelope)?;
278        assert!(store.fetch(&envelope)?.is_none());
279        Ok(())
280    }
281
282    #[test]
283    fn resume_store_overwrites_existing() -> GResult<()> {
284        let store = FlowResumeStore::new(new_session_store());
285        let envelope = sample_envelope();
286        let mut wait = sample_wait();
287        let _ = store.save(&envelope, &wait)?;
288
289        wait.snapshot.next_node = "node-3".into();
290        wait.reason = Some("retry".into());
291        let _ = store.save(&envelope, &wait)?;
292
293        let snapshot = store.fetch(&envelope)?.expect("snapshot missing");
294        assert_eq!(snapshot.next_node, "node-3");
295        store.clear(&envelope)?;
296        Ok(())
297    }
298
299    #[test]
300    fn canonicalize_populates_defaults() {
301        let envelope = IngressEnvelope {
302            tenant: "demo".into(),
303            env: None,
304            pack_id: None,
305            flow_id: "flow.main".into(),
306            flow_type: None,
307            action: None,
308            session_hint: None,
309            provider: None,
310            channel: None,
311            conversation: None,
312            user: None,
313            activity_id: Some("activity-1".into()),
314            timestamp: None,
315            payload: json!({}),
316            metadata: None,
317            reply_scope: None,
318        }
319        .canonicalize();
320
321        assert_eq!(envelope.provider.as_deref(), Some("provider"));
322        assert_eq!(envelope.channel.as_deref(), Some("flow.main"));
323        assert_eq!(envelope.conversation.as_deref(), Some("flow.main"));
324        assert_eq!(envelope.user.as_deref(), Some("activity-1"));
325        assert!(envelope.session_hint.is_some());
326    }
327}
328
329pub struct StateMachineRuntime {
330    runner: Runner,
331}
332
333impl StateMachineRuntime {
334    /// Construct a runtime from explicit flow definitions (legacy entrypoint used by tests/examples).
335    pub fn new(flows: Vec<FlowDefinition>) -> GResult<Self> {
336        let secrets = Arc::new(FnSecretsHost::new(|name| {
337            Err(RunnerError::Secrets {
338                reason: format!("secret {name} unavailable (noop host)"),
339            })
340        }));
341        let telemetry = Arc::new(FnTelemetryHost::new(|_, _| Ok(())));
342        let session = Arc::new(InMemorySessionHost::new());
343        let state = Arc::new(InMemoryStateHost::new());
344        let host = HostBundle::new(secrets, telemetry, session, state);
345
346        let adapters = AdapterRegistry::default();
347        let policy = Policy::default();
348
349        let mut builder = RunnerBuilder::new()
350            .with_host(host)
351            .with_adapters(adapters)
352            .with_policy(policy);
353        for flow in flows {
354            builder = builder.with_flow(flow);
355        }
356        let runner = builder.build()?;
357        Ok(Self { runner })
358    }
359
360    /// Build a state-machine runtime that proxies pack flows through the legacy FlowEngine.
361    pub fn from_flow_engine(
362        config: Arc<HostConfig>,
363        engine: Arc<FlowEngine>,
364        session_host: Arc<dyn SessionHost>,
365        session_store: DynSessionStore,
366        state_host: Arc<dyn StateHost>,
367        secrets_manager: DynSecretsManager,
368        mocks: Option<Arc<MockLayer>>,
369    ) -> Result<Self> {
370        let policy = Arc::new(config.secrets_policy.clone());
371        let tenant_ctx = config.tenant_ctx();
372        let secrets = Arc::new(PolicySecretsHost::new(policy, secrets_manager, tenant_ctx));
373        let telemetry = Arc::new(FnTelemetryHost::new(|span, fields| {
374            tracing::debug!(?span, ?fields, "telemetry emit");
375            Ok(())
376        }));
377        let host = HostBundle::new(secrets, telemetry, session_host, state_host);
378        let resume_store = FlowResumeStore::new(session_store);
379
380        let mut adapters = AdapterRegistry::default();
381        adapters.register(
382            PACK_FLOW_ADAPTER,
383            Box::new(PackFlowAdapter::new(
384                Arc::clone(&config),
385                Arc::clone(&engine),
386                resume_store,
387                mocks,
388            )),
389        );
390
391        let flows = build_flow_definitions(engine.flows());
392        let mut builder = RunnerBuilder::new()
393            .with_host(host)
394            .with_adapters(adapters)
395            .with_policy(Policy::default());
396        for flow in flows {
397            builder = builder.with_flow(flow);
398        }
399        let runner = builder
400            .build()
401            .map_err(|err| anyhow!("state machine init failed: {err}"))?;
402        Ok(Self { runner })
403    }
404
405    /// Execute the flow associated with the provided ingress event.
406    pub async fn handle(&self, envelope: IngressEnvelope) -> Result<Value> {
407        let tenant_ctx = envelope.tenant_ctx();
408        let session_hint = envelope
409            .session_hint
410            .clone()
411            .unwrap_or_else(|| envelope.canonical_session_hint());
412        let pack_id = envelope.pack_id.clone().ok_or_else(|| {
413            anyhow!("pack_id missing; ingress must specify pack_id for multi-pack flows")
414        })?;
415        let input =
416            serde_json::to_value(&envelope).context("failed to serialise ingress envelope")?;
417        let request = RunFlowRequest {
418            tenant: tenant_ctx,
419            pack_id,
420            flow_id: envelope.flow_id.clone(),
421            input,
422            session_hint: Some(session_hint),
423        };
424        let result: super::api::RunFlowResult = self
425            .runner
426            .run_flow(request)
427            .await
428            .map_err(|err| anyhow!("flow execution failed: {err}"))?;
429        let outcome = result.outcome;
430        Ok(outcome.get("response").cloned().unwrap_or(outcome))
431    }
432}
433
434struct PolicySecretsHost {
435    policy: Arc<SecretsPolicy>,
436    manager: DynSecretsManager,
437    tenant_ctx: TenantCtx,
438}
439
440impl PolicySecretsHost {
441    fn new(policy: Arc<SecretsPolicy>, manager: DynSecretsManager, tenant_ctx: TenantCtx) -> Self {
442        Self {
443            policy,
444            manager,
445            tenant_ctx,
446        }
447    }
448}
449
450#[async_trait]
451impl SecretsHost for PolicySecretsHost {
452    async fn get(&self, name: &str) -> GResult<String> {
453        if !self.policy.is_allowed(name) {
454            return Err(RunnerError::Secrets {
455                reason: format!("secret {name} denied by policy"),
456            });
457        }
458        let scoped_key =
459            scoped_secret_path(&self.tenant_ctx, name).map_err(|err| RunnerError::Secrets {
460                reason: format!("secret {name} scope invalid: {err}"),
461            })?;
462        let bytes = self
463            .manager
464            .read(scoped_key.as_str())
465            .await
466            .map_err(|err| RunnerError::Secrets {
467                reason: format!("secret {name} unavailable: {err}"),
468            })?;
469        String::from_utf8(bytes).map_err(|err| RunnerError::Secrets {
470            reason: format!("secret {name} not valid UTF-8: {err}"),
471        })
472    }
473}
474
475fn build_flow_definitions(flows: &[FlowDescriptor]) -> Vec<FlowDefinition> {
476    flows
477        .iter()
478        .map(|descriptor| {
479            FlowDefinition::new(
480                super::api::FlowSummary {
481                    pack_id: descriptor.pack_id.clone(),
482                    id: descriptor.id.clone(),
483                    name: descriptor
484                        .description
485                        .clone()
486                        .unwrap_or_else(|| descriptor.id.clone()),
487                    version: descriptor.version.clone(),
488                    description: descriptor.description.clone(),
489                },
490                serde_json::json!({
491                    "type": "object"
492                }),
493                vec![FlowStep::Adapter(AdapterCall {
494                    adapter: PACK_FLOW_ADAPTER.into(),
495                    operation: descriptor.id.clone(),
496                    payload: Value::String(PAYLOAD_FROM_LAST_INPUT.into()),
497                })],
498            )
499        })
500        .collect()
501}
502
503struct PackFlowAdapter {
504    tenant: String,
505    config: Arc<HostConfig>,
506    engine: Arc<FlowEngine>,
507    resume: FlowResumeStore,
508    mocks: Option<Arc<MockLayer>>,
509}
510
511impl PackFlowAdapter {
512    fn new(
513        config: Arc<HostConfig>,
514        engine: Arc<FlowEngine>,
515        resume: FlowResumeStore,
516        mocks: Option<Arc<MockLayer>>,
517    ) -> Self {
518        Self {
519            tenant: config.tenant.clone(),
520            config,
521            engine,
522            resume,
523            mocks,
524        }
525    }
526}
527
528#[async_trait::async_trait]
529impl Adapter for PackFlowAdapter {
530    async fn call(&self, call: &AdapterCall) -> GResult<Value> {
531        let envelope: IngressEnvelope =
532            serde_json::from_value(call.payload.clone()).map_err(|err| {
533                RunnerError::AdapterCall {
534                    reason: format!("invalid ingress payload: {err}"),
535                }
536            })?;
537        let envelope = envelope.canonicalize();
538        let flow_id = call.operation.clone();
539        let action_owned = envelope.action.clone();
540        let session_owned = envelope
541            .session_hint
542            .clone()
543            .unwrap_or_else(|| envelope.canonical_session_hint());
544        let provider_owned = envelope.provider.clone();
545        let payload = envelope.payload.clone();
546        let retry_config = self.config.retry_config().into();
547
548        let pack_id = if let Some(pack_id) = envelope.pack_id.as_deref() {
549            let found = self.engine.flow_by_key(pack_id, &flow_id).is_some();
550            if !found {
551                return Err(RunnerError::AdapterCall {
552                    reason: format!("flow {flow_id} not registered for pack {pack_id}"),
553                });
554            }
555            pack_id
556        } else if let Some(flow) = self.engine.flow_by_id(&flow_id) {
557            flow.pack_id.as_str()
558        } else {
559            return Err(RunnerError::AdapterCall {
560                reason: format!("flow {flow_id} is ambiguous; pack_id is required"),
561            });
562        };
563
564        let mocks = self.mocks.as_deref();
565        let ctx = FlowContext {
566            tenant: &self.tenant,
567            pack_id,
568            flow_id: &flow_id,
569            node_id: None,
570            tool: None,
571            action: action_owned.as_deref(),
572            session_id: Some(session_owned.as_str()),
573            provider_id: provider_owned.as_deref(),
574            retry_config,
575            observer: None,
576            mocks,
577        };
578
579        let execution = if let Some(snapshot) = self.resume.fetch(&envelope)? {
580            let resume_pack_id = snapshot.pack_id.clone();
581            let resume_ctx = FlowContext {
582                pack_id: resume_pack_id.as_str(),
583                ..ctx
584            };
585            self.engine.resume(resume_ctx, snapshot, payload).await
586        } else {
587            self.engine.execute(ctx, payload).await
588        }
589        .map_err(|err| RunnerError::AdapterCall {
590            reason: err.to_string(),
591        })?;
592
593        match execution.status {
594            FlowStatus::Completed => {
595                self.resume.clear(&envelope)?;
596                Ok(execution.output)
597            }
598            FlowStatus::Waiting(wait) => {
599                let reply_scope = self.resume.save(&envelope, &wait)?;
600                Ok(json!({
601                    "status": "pending",
602                    "reason": wait.reason,
603                    "resume": wait.snapshot,
604                    "reply_scope": reply_scope,
605                    "response": execution.output,
606                }))
607            }
608        }
609    }
610}
611
612#[derive(Clone, Debug, Serialize, Deserialize)]
613pub struct IngressEnvelope {
614    pub tenant: String,
615    #[serde(default, skip_serializing_if = "Option::is_none")]
616    pub env: Option<String>,
617    #[serde(default, skip_serializing_if = "Option::is_none")]
618    pub pack_id: Option<String>,
619    pub flow_id: String,
620    #[serde(default, skip_serializing_if = "Option::is_none")]
621    pub flow_type: Option<String>,
622    #[serde(default, skip_serializing_if = "Option::is_none")]
623    pub action: Option<String>,
624    #[serde(default, skip_serializing_if = "Option::is_none")]
625    pub session_hint: Option<String>,
626    #[serde(default, skip_serializing_if = "Option::is_none")]
627    pub provider: Option<String>,
628    #[serde(default, skip_serializing_if = "Option::is_none")]
629    pub channel: Option<String>,
630    #[serde(default, skip_serializing_if = "Option::is_none")]
631    pub conversation: Option<String>,
632    #[serde(default, skip_serializing_if = "Option::is_none")]
633    pub user: Option<String>,
634    #[serde(default, skip_serializing_if = "Option::is_none")]
635    pub activity_id: Option<String>,
636    #[serde(default, skip_serializing_if = "Option::is_none")]
637    pub timestamp: Option<String>,
638    #[serde(default)]
639    pub payload: Value,
640    #[serde(default, skip_serializing_if = "Option::is_none")]
641    pub metadata: Option<Value>,
642    #[serde(default, skip_serializing_if = "Option::is_none")]
643    pub reply_scope: Option<ReplyScope>,
644}
645
646impl IngressEnvelope {
647    pub fn canonicalize(mut self) -> Self {
648        if self.provider.is_none() {
649            self.provider = Some("provider".into());
650        }
651        if self.channel.is_none() {
652            self.channel = Some(self.flow_id.clone());
653        }
654        if self.conversation.is_none() {
655            self.conversation = self.channel.clone();
656        }
657        if self.user.is_none() {
658            if let Some(ref hint) = self.session_hint {
659                self.user = Some(hint.clone());
660            } else if let Some(ref activity) = self.activity_id {
661                self.user = Some(activity.clone());
662            } else {
663                self.user = Some("user".into());
664            }
665        }
666        if self.session_hint.is_none() {
667            self.session_hint = Some(self.canonical_session_hint());
668        }
669        if self.reply_scope.is_none()
670            && let Some(conversation) = self.conversation.clone()
671        {
672            self.reply_scope = Some(ReplyScope {
673                conversation,
674                thread: None,
675                reply_to: None,
676                correlation: None,
677            });
678        }
679        self
680    }
681
682    pub fn canonical_session_hint(&self) -> String {
683        format!(
684            "{}:{}:{}:{}:{}",
685            self.tenant,
686            self.provider.as_deref().unwrap_or("provider"),
687            self.channel.as_deref().unwrap_or("channel"),
688            self.conversation.as_deref().unwrap_or("conversation"),
689            self.user.as_deref().unwrap_or("user")
690        )
691    }
692
693    pub fn tenant_ctx(&self) -> TenantCtx {
694        let env_raw = self.env.clone().unwrap_or_else(|| DEFAULT_ENV.into());
695        let env = EnvId::from_str(env_raw.as_str())
696            .unwrap_or_else(|_| EnvId::from_str(DEFAULT_ENV).expect("default env must be valid"));
697        let tenant_id = TenantId::from_str(self.tenant.as_str()).unwrap_or_else(|_| {
698            TenantId::from_str("tenant.default").expect("tenant fallback must be valid")
699        });
700        let mut ctx = TenantCtx::new(env, tenant_id).with_flow(self.flow_id.clone());
701        if let Some(provider) = &self.provider {
702            ctx = ctx.with_provider(provider.clone());
703        }
704        if let Some(session) = &self.session_hint {
705            ctx = ctx.with_session(session.clone());
706        }
707        ctx
708    }
709}