1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use anyhow::{Context, Result, anyhow};
6use async_trait::async_trait;
7use greentic_session::{SessionData, SessionKey as StoreSessionKey};
8use greentic_types::{
9 EnvId, FlowId, GreenticError, PackId, ReplyScope, SessionCursor as TypesSessionCursor,
10 TenantCtx, TenantId, UserId,
11};
12use rand::{Rng, rng};
13use serde::{Deserialize, Serialize};
14use serde_json::{Value, json};
15use sha2::{Digest, Sha256};
16
17use super::api::{RunFlowRequest, RunnerApi};
18use super::builder::{Runner, RunnerBuilder};
19use super::error::{GResult, RunnerError};
20use super::glue::{FnSecretsHost, FnTelemetryHost};
21use super::host::{HostBundle, SecretsHost, SessionHost, StateHost};
22use super::policy::Policy;
23use super::registry::{Adapter, AdapterCall, AdapterRegistry};
24use super::shims::{InMemorySessionHost, InMemoryStateHost};
25use super::state_machine::{FlowDefinition, FlowStep, PAYLOAD_FROM_LAST_INPUT};
26
27use crate::config::{HostConfig, SecretsPolicy};
28use crate::pack::FlowDescriptor;
29use crate::runner::engine::{FlowContext, FlowEngine, FlowSnapshot, FlowStatus, FlowWait};
30use crate::runner::mocks::MockLayer;
31use crate::secrets::{DynSecretsManager, scoped_secret_path};
32use crate::storage::session::DynSessionStore;
33use crate::trace::{PackTraceInfo, TraceContext, TraceMode, TraceRecorder};
34
35const DEFAULT_ENV: &str = "local";
36const PACK_FLOW_ADAPTER: &str = "pack_flow";
37
38#[derive(Clone)]
39pub struct FlowResumeStore {
40 store: DynSessionStore,
41}
42
43impl FlowResumeStore {
44 pub fn new(store: DynSessionStore) -> Self {
45 Self { store }
46 }
47
48 pub fn fetch(&self, envelope: &IngressEnvelope) -> GResult<Option<FlowSnapshot>> {
49 let (mut ctx, user, _, scope) = build_store_ctx(envelope)?;
50 ctx = ctx.with_user(Some(user.clone()));
51
52 let mut scopes = vec![scope.clone()];
53 if scope.correlation.is_some() {
54 let mut base = scope.clone();
55 base.correlation = None;
56 scopes.push(base);
57 }
58
59 for lookup in scopes {
60 if let Some(key) = self
61 .store
62 .find_wait_by_scope(&ctx, &user, &lookup)
63 .map_err(map_store_error)?
64 {
65 let Some(data) = self.store.get_session(&key).map_err(map_store_error)? else {
66 continue;
67 };
68 let record: FlowResumeRecord =
69 serde_json::from_str(&data.context_json).map_err(|err| {
70 RunnerError::Session {
71 reason: format!("failed to decode flow resume snapshot: {err}"),
72 }
73 })?;
74 if record.snapshot.flow_id == envelope.flow_id {
75 if let Some(pack_id) = envelope.pack_id.as_deref()
76 && record.snapshot.pack_id != pack_id
77 {
78 return Err(RunnerError::Session {
79 reason: format!(
80 "resume pack mismatch: expected {pack_id}, found {}",
81 record.snapshot.pack_id
82 ),
83 });
84 }
85 return Ok(Some(record.snapshot));
86 }
87 }
88 }
89
90 Ok(None)
91 }
92
93 pub fn save(&self, envelope: &IngressEnvelope, wait: &FlowWait) -> GResult<ReplyScope> {
94 let (ctx, user, hint, scope) = build_store_ctx(envelope)?;
95 let record = FlowResumeRecord {
96 snapshot: wait.snapshot.clone(),
97 reason: wait.reason.clone(),
98 };
99 let data = record_to_session_data(&record, ctx.clone(), &user, &hint)?;
100 let mut reply_scope = scope.clone();
101 if reply_scope.correlation.is_none() {
102 reply_scope.correlation = Some(generate_correlation_id());
103 }
104 let mut store_scope = scope;
105 store_scope.correlation = None;
106 let session_key = StoreSessionKey::new(format!("{hint}::{}", store_scope.scope_hash()));
107 self.store
108 .register_wait(&ctx, &user, &store_scope, &session_key, data, None)
109 .map_err(map_store_error)?;
110 Ok(reply_scope)
111 }
112
113 pub fn clear(&self, envelope: &IngressEnvelope) -> GResult<()> {
114 let (ctx, user, _, scope) = build_store_ctx(envelope)?;
115 let mut scopes = vec![scope.clone()];
116 if scope.correlation.is_some() {
117 let mut base = scope;
118 base.correlation = None;
119 scopes.push(base);
120 }
121 for lookup in scopes {
122 self.store
123 .clear_wait(&ctx, &user, &lookup)
124 .map_err(map_store_error)?;
125 }
126 Ok(())
127 }
128}
129
130#[derive(Serialize, Deserialize)]
131struct FlowResumeRecord {
132 snapshot: FlowSnapshot,
133 #[serde(default)]
134 reason: Option<String>,
135}
136
137fn build_store_ctx(envelope: &IngressEnvelope) -> GResult<(TenantCtx, UserId, String, ReplyScope)> {
138 let base_hint = envelope
139 .session_hint
140 .clone()
141 .unwrap_or_else(|| envelope.canonical_session_hint());
142 let hint = if let Some(pack_id) = envelope.pack_id.as_deref() {
143 format!("{base_hint}::pack={pack_id}")
144 } else {
145 base_hint.clone()
146 };
147 let user = derive_user_id(&hint)?;
148 let scope = envelope
149 .reply_scope
150 .clone()
151 .ok_or_else(|| RunnerError::Session {
152 reason: "Cannot suspend: reply_scope missing; provider plugin must supply ReplyScope"
153 .to_string(),
154 })?;
155 let mut ctx = envelope.tenant_ctx();
156 ctx = ctx.with_session(hint.clone());
157 ctx = ctx.with_user(Some(user.clone()));
158 Ok((ctx, user, hint, scope))
159}
160
161fn record_to_session_data(
162 record: &FlowResumeRecord,
163 ctx: TenantCtx,
164 user: &UserId,
165 session_hint: &str,
166) -> GResult<SessionData> {
167 let flow = FlowId::from_str(record.snapshot.flow_id.as_str()).map_err(map_store_error)?;
168 let pack = PackId::from_str(record.snapshot.pack_id.as_str()).map_err(map_store_error)?;
169 let mut cursor = TypesSessionCursor::new(record.snapshot.next_node.clone());
170 if let Some(reason) = record.reason.clone() {
171 cursor = cursor.with_wait_reason(reason);
172 }
173 let context_json = serde_json::to_string(record).map_err(|err| RunnerError::Session {
174 reason: format!("failed to encode flow resume snapshot: {err}"),
175 })?;
176 let ctx = ctx
177 .with_user(Some(user.clone()))
178 .with_session(session_hint.to_string())
179 .with_flow(record.snapshot.flow_id.clone());
180 Ok(SessionData {
181 tenant_ctx: ctx,
182 flow_id: flow,
183 pack_id: Some(pack),
184 cursor,
185 context_json,
186 })
187}
188
189fn derive_user_id(hint: &str) -> GResult<UserId> {
190 let digest = Sha256::digest(hint.as_bytes());
191 let slug = format!("sess{}", hex::encode(&digest[..8]));
192 UserId::from_str(&slug).map_err(map_store_error)
193}
194
195fn map_store_error(err: GreenticError) -> RunnerError {
196 RunnerError::Session {
197 reason: err.to_string(),
198 }
199}
200
201fn generate_correlation_id() -> String {
202 let mut bytes = [0u8; 16];
203 rng().fill(&mut bytes);
204 hex::encode(bytes)
205}
206
207#[cfg(test)]
208mod tests {
209 use super::*;
210 use crate::runner::engine::ExecutionState;
211 use crate::storage::session::new_session_store;
212 use serde_json::json;
213
214 fn sample_envelope() -> IngressEnvelope {
215 IngressEnvelope {
216 tenant: "demo".into(),
217 env: Some("local".into()),
218 pack_id: Some("pack.demo".into()),
219 flow_id: "flow.main".into(),
220 flow_type: None,
221 action: Some("messaging".into()),
222 session_hint: Some("demo:provider:chan:conv:user".into()),
223 provider: Some("provider".into()),
224 channel: Some("chan".into()),
225 conversation: Some("conv".into()),
226 user: Some("user".into()),
227 activity_id: Some("act-1".into()),
228 timestamp: None,
229 payload: json!({ "text": "hi" }),
230 metadata: None,
231 reply_scope: Some(ReplyScope {
232 conversation: "conv".into(),
233 thread: None,
234 reply_to: None,
235 correlation: None,
236 }),
237 }
238 }
239
240 fn sample_wait() -> FlowWait {
241 let state: ExecutionState = serde_json::from_value(json!({
242 "input": { "text": "hi" },
243 "nodes": {},
244 "egress": []
245 }))
246 .expect("state");
247 FlowWait {
248 reason: Some("await-user".into()),
249 snapshot: FlowSnapshot {
250 pack_id: "pack.demo".into(),
251 flow_id: "flow.main".into(),
252 next_node: "node-2".into(),
253 state,
254 },
255 }
256 }
257
258 #[test]
259 fn derive_user_id_is_stable() {
260 let hint = "some-tenant::session-key";
261 let a = derive_user_id(hint).unwrap();
262 let b = derive_user_id(hint).unwrap();
263 assert_eq!(a, b);
264 assert!(a.as_str().starts_with("sess"));
265 }
266
267 #[test]
268 fn resume_store_roundtrip() -> GResult<()> {
269 let store = FlowResumeStore::new(new_session_store());
270 let envelope = sample_envelope();
271 assert!(store.fetch(&envelope)?.is_none());
272
273 let wait = sample_wait();
274 let _ = store.save(&envelope, &wait)?;
275 let snapshot = store.fetch(&envelope)?.expect("snapshot missing");
276 assert_eq!(snapshot.flow_id, wait.snapshot.flow_id);
277 assert_eq!(snapshot.next_node, wait.snapshot.next_node);
278
279 store.clear(&envelope)?;
280 assert!(store.fetch(&envelope)?.is_none());
281 Ok(())
282 }
283
284 #[test]
285 fn resume_store_overwrites_existing() -> GResult<()> {
286 let store = FlowResumeStore::new(new_session_store());
287 let envelope = sample_envelope();
288 let mut wait = sample_wait();
289 let _ = store.save(&envelope, &wait)?;
290
291 wait.snapshot.next_node = "node-3".into();
292 wait.reason = Some("retry".into());
293 let _ = store.save(&envelope, &wait)?;
294
295 let snapshot = store.fetch(&envelope)?.expect("snapshot missing");
296 assert_eq!(snapshot.next_node, "node-3");
297 store.clear(&envelope)?;
298 Ok(())
299 }
300
301 #[test]
302 fn canonicalize_populates_defaults() {
303 let envelope = IngressEnvelope {
304 tenant: "demo".into(),
305 env: None,
306 pack_id: None,
307 flow_id: "flow.main".into(),
308 flow_type: None,
309 action: None,
310 session_hint: None,
311 provider: None,
312 channel: None,
313 conversation: None,
314 user: None,
315 activity_id: Some("activity-1".into()),
316 timestamp: None,
317 payload: json!({}),
318 metadata: None,
319 reply_scope: None,
320 }
321 .canonicalize();
322
323 assert_eq!(envelope.provider.as_deref(), Some("provider"));
324 assert_eq!(envelope.channel.as_deref(), Some("flow.main"));
325 assert_eq!(envelope.conversation.as_deref(), Some("flow.main"));
326 assert_eq!(envelope.user.as_deref(), Some("activity-1"));
327 assert!(envelope.session_hint.is_some());
328 }
329}
330
331pub struct StateMachineRuntime {
332 runner: Runner,
333}
334
335impl StateMachineRuntime {
336 pub fn new(flows: Vec<FlowDefinition>) -> GResult<Self> {
338 let secrets = Arc::new(FnSecretsHost::new(|name| {
339 Err(RunnerError::Secrets {
340 reason: format!("secret {name} unavailable (noop host)"),
341 })
342 }));
343 let telemetry = Arc::new(FnTelemetryHost::new(|_, _| Ok(())));
344 let session = Arc::new(InMemorySessionHost::new());
345 let state = Arc::new(InMemoryStateHost::new());
346 let host = HostBundle::new(secrets, telemetry, session, state);
347
348 let adapters = AdapterRegistry::default();
349 let policy = Policy::default();
350
351 let mut builder = RunnerBuilder::new()
352 .with_host(host)
353 .with_adapters(adapters)
354 .with_policy(policy);
355 for flow in flows {
356 builder = builder.with_flow(flow);
357 }
358 let runner = builder.build()?;
359 Ok(Self { runner })
360 }
361
362 #[allow(clippy::too_many_arguments)]
364 pub fn from_flow_engine(
365 config: Arc<HostConfig>,
366 engine: Arc<FlowEngine>,
367 pack_trace: HashMap<String, PackTraceInfo>,
368 session_host: Arc<dyn SessionHost>,
369 session_store: DynSessionStore,
370 state_host: Arc<dyn StateHost>,
371 secrets_manager: DynSecretsManager,
372 mocks: Option<Arc<MockLayer>>,
373 ) -> Result<Self> {
374 let policy = Arc::new(config.secrets_policy.clone());
375 let tenant_ctx = config.tenant_ctx();
376 let secrets = Arc::new(PolicySecretsHost::new(policy, secrets_manager, tenant_ctx));
377 let telemetry = Arc::new(FnTelemetryHost::new(|span, fields| {
378 tracing::debug!(?span, ?fields, "telemetry emit");
379 Ok(())
380 }));
381 let host = HostBundle::new(secrets, telemetry, session_host, state_host);
382 let resume_store = FlowResumeStore::new(session_store);
383
384 let mut adapters = AdapterRegistry::default();
385 adapters.register(
386 PACK_FLOW_ADAPTER,
387 Box::new(PackFlowAdapter::new(
388 Arc::clone(&config),
389 Arc::clone(&engine),
390 pack_trace,
391 resume_store,
392 mocks,
393 )),
394 );
395
396 let flows = build_flow_definitions(engine.flows());
397 let mut builder = RunnerBuilder::new()
398 .with_host(host)
399 .with_adapters(adapters)
400 .with_policy(Policy::default());
401 for flow in flows {
402 builder = builder.with_flow(flow);
403 }
404 let runner = builder
405 .build()
406 .map_err(|err| anyhow!("state machine init failed: {err}"))?;
407 Ok(Self { runner })
408 }
409
410 pub async fn handle(&self, envelope: IngressEnvelope) -> Result<Value> {
412 let tenant_ctx = envelope.tenant_ctx();
413 let session_hint = envelope
414 .session_hint
415 .clone()
416 .unwrap_or_else(|| envelope.canonical_session_hint());
417 let pack_id = envelope.pack_id.clone().ok_or_else(|| {
418 anyhow!("pack_id missing; ingress must specify pack_id for multi-pack flows")
419 })?;
420 let input =
421 serde_json::to_value(&envelope).context("failed to serialise ingress envelope")?;
422 let request = RunFlowRequest {
423 tenant: tenant_ctx,
424 pack_id,
425 flow_id: envelope.flow_id.clone(),
426 input,
427 session_hint: Some(session_hint),
428 };
429 let result: super::api::RunFlowResult = self
430 .runner
431 .run_flow(request)
432 .await
433 .map_err(|err| anyhow!("flow execution failed: {err}"))?;
434 let outcome = result.outcome;
435 Ok(outcome.get("response").cloned().unwrap_or(outcome))
436 }
437}
438
439struct PolicySecretsHost {
440 policy: Arc<SecretsPolicy>,
441 manager: DynSecretsManager,
442 tenant_ctx: TenantCtx,
443}
444
445impl PolicySecretsHost {
446 fn new(policy: Arc<SecretsPolicy>, manager: DynSecretsManager, tenant_ctx: TenantCtx) -> Self {
447 Self {
448 policy,
449 manager,
450 tenant_ctx,
451 }
452 }
453}
454
455#[async_trait]
456impl SecretsHost for PolicySecretsHost {
457 async fn get(&self, name: &str) -> GResult<String> {
458 if !self.policy.is_allowed(name) {
459 return Err(RunnerError::Secrets {
460 reason: format!("secret {name} denied by policy"),
461 });
462 }
463 let scoped_key =
464 scoped_secret_path(&self.tenant_ctx, name).map_err(|err| RunnerError::Secrets {
465 reason: format!("secret {name} scope invalid: {err}"),
466 })?;
467 let bytes = self
468 .manager
469 .read(scoped_key.as_str())
470 .await
471 .map_err(|err| RunnerError::Secrets {
472 reason: format!("secret {name} unavailable: {err}"),
473 })?;
474 String::from_utf8(bytes).map_err(|err| RunnerError::Secrets {
475 reason: format!("secret {name} not valid UTF-8: {err}"),
476 })
477 }
478}
479
480fn build_flow_definitions(flows: &[FlowDescriptor]) -> Vec<FlowDefinition> {
481 flows
482 .iter()
483 .map(|descriptor| {
484 FlowDefinition::new(
485 super::api::FlowSummary {
486 pack_id: descriptor.pack_id.clone(),
487 id: descriptor.id.clone(),
488 name: descriptor
489 .description
490 .clone()
491 .unwrap_or_else(|| descriptor.id.clone()),
492 version: descriptor.version.clone(),
493 description: descriptor.description.clone(),
494 },
495 serde_json::json!({
496 "type": "object"
497 }),
498 vec![FlowStep::Adapter(AdapterCall {
499 adapter: PACK_FLOW_ADAPTER.into(),
500 operation: descriptor.id.clone(),
501 payload: Value::String(PAYLOAD_FROM_LAST_INPUT.into()),
502 })],
503 )
504 })
505 .collect()
506}
507
508struct PackFlowAdapter {
509 tenant: String,
510 config: Arc<HostConfig>,
511 engine: Arc<FlowEngine>,
512 pack_trace: HashMap<String, PackTraceInfo>,
513 resume: FlowResumeStore,
514 mocks: Option<Arc<MockLayer>>,
515}
516
517impl PackFlowAdapter {
518 fn new(
519 config: Arc<HostConfig>,
520 engine: Arc<FlowEngine>,
521 pack_trace: HashMap<String, PackTraceInfo>,
522 resume: FlowResumeStore,
523 mocks: Option<Arc<MockLayer>>,
524 ) -> Self {
525 Self {
526 tenant: config.tenant.clone(),
527 config,
528 engine,
529 pack_trace,
530 resume,
531 mocks,
532 }
533 }
534}
535
536#[async_trait::async_trait]
537impl Adapter for PackFlowAdapter {
538 async fn call(&self, call: &AdapterCall) -> GResult<Value> {
539 let envelope: IngressEnvelope =
540 serde_json::from_value(call.payload.clone()).map_err(|err| {
541 RunnerError::AdapterCall {
542 reason: format!("invalid ingress payload: {err}"),
543 }
544 })?;
545 let envelope = envelope.canonicalize();
546 let flow_id = call.operation.clone();
547 let action_owned = envelope.action.clone();
548 let session_owned = envelope
549 .session_hint
550 .clone()
551 .unwrap_or_else(|| envelope.canonical_session_hint());
552 let provider_owned = envelope.provider.clone();
553 let payload = envelope.payload.clone();
554 let retry_config = self.config.retry_config().into();
555
556 let pack_id = if let Some(pack_id) = envelope.pack_id.as_deref() {
557 let found = self.engine.flow_by_key(pack_id, &flow_id).is_some();
558 if !found {
559 return Err(RunnerError::AdapterCall {
560 reason: format!("flow {flow_id} not registered for pack {pack_id}"),
561 });
562 }
563 pack_id
564 } else if let Some(flow) = self.engine.flow_by_id(&flow_id) {
565 flow.pack_id.as_str()
566 } else {
567 return Err(RunnerError::AdapterCall {
568 reason: format!("flow {flow_id} is ambiguous; pack_id is required"),
569 });
570 };
571
572 let trace_config = self.config.trace.clone();
573 let flow_version = self
574 .engine
575 .flow_by_key(pack_id, &flow_id)
576 .map(|desc| desc.version.clone())
577 .unwrap_or_else(|| "unknown".to_string());
578 let pack_trace = self
579 .pack_trace
580 .get(pack_id)
581 .cloned()
582 .unwrap_or_else(|| PackTraceInfo {
583 pack_ref: pack_id.to_string(),
584 resolved_digest: None,
585 });
586 let trace_ctx = TraceContext {
587 pack_ref: pack_trace.pack_ref,
588 resolved_digest: pack_trace.resolved_digest,
589 flow_id: flow_id.clone(),
590 flow_version,
591 };
592 let trace = if trace_config.mode == TraceMode::Off {
593 None
594 } else {
595 Some(TraceRecorder::new(trace_config, trace_ctx))
596 };
597
598 let mocks = self.mocks.as_deref();
599 let ctx = FlowContext {
600 tenant: &self.tenant,
601 pack_id,
602 flow_id: &flow_id,
603 node_id: None,
604 tool: None,
605 action: action_owned.as_deref(),
606 session_id: Some(session_owned.as_str()),
607 provider_id: provider_owned.as_deref(),
608 retry_config,
609 attempt: 1,
610 observer: trace
611 .as_ref()
612 .map(|recorder| recorder as &dyn crate::runner::engine::ExecutionObserver),
613 mocks,
614 };
615
616 let execution = if let Some(snapshot) = self.resume.fetch(&envelope)? {
617 let resume_pack_id = snapshot.pack_id.clone();
618 let resume_ctx = FlowContext {
619 pack_id: resume_pack_id.as_str(),
620 ..ctx
621 };
622 self.engine.resume(resume_ctx, snapshot, payload).await
623 } else {
624 self.engine.execute(ctx, payload).await
625 };
626 let execution = match execution {
627 Ok(execution) => {
628 if let Some(recorder) = trace.as_ref()
629 && let Err(err) = recorder.flush_success()
630 {
631 tracing::warn!(error = %err, "failed to write trace");
632 }
633 execution
634 }
635 Err(err) => {
636 if let Some(recorder) = trace.as_ref()
637 && let Err(write_err) = recorder.flush_error(err.as_ref())
638 {
639 tracing::warn!(error = %write_err, "failed to write trace");
640 }
641 return Err(RunnerError::AdapterCall {
642 reason: err.to_string(),
643 });
644 }
645 };
646
647 match execution.status {
648 FlowStatus::Completed => {
649 self.resume.clear(&envelope)?;
650 Ok(execution.output)
651 }
652 FlowStatus::Waiting(wait) => {
653 let reply_scope = self.resume.save(&envelope, &wait)?;
654 Ok(json!({
655 "status": "pending",
656 "reason": wait.reason,
657 "resume": wait.snapshot,
658 "reply_scope": reply_scope,
659 "response": execution.output,
660 }))
661 }
662 }
663 }
664}
665
666#[derive(Clone, Debug, Serialize, Deserialize)]
667pub struct IngressEnvelope {
668 pub tenant: String,
669 #[serde(default, skip_serializing_if = "Option::is_none")]
670 pub env: Option<String>,
671 #[serde(default, skip_serializing_if = "Option::is_none")]
672 pub pack_id: Option<String>,
673 pub flow_id: String,
674 #[serde(default, skip_serializing_if = "Option::is_none")]
675 pub flow_type: Option<String>,
676 #[serde(default, skip_serializing_if = "Option::is_none")]
677 pub action: Option<String>,
678 #[serde(default, skip_serializing_if = "Option::is_none")]
679 pub session_hint: Option<String>,
680 #[serde(default, skip_serializing_if = "Option::is_none")]
681 pub provider: Option<String>,
682 #[serde(default, skip_serializing_if = "Option::is_none")]
683 pub channel: Option<String>,
684 #[serde(default, skip_serializing_if = "Option::is_none")]
685 pub conversation: Option<String>,
686 #[serde(default, skip_serializing_if = "Option::is_none")]
687 pub user: Option<String>,
688 #[serde(default, skip_serializing_if = "Option::is_none")]
689 pub activity_id: Option<String>,
690 #[serde(default, skip_serializing_if = "Option::is_none")]
691 pub timestamp: Option<String>,
692 #[serde(default)]
693 pub payload: Value,
694 #[serde(default, skip_serializing_if = "Option::is_none")]
695 pub metadata: Option<Value>,
696 #[serde(default, skip_serializing_if = "Option::is_none")]
697 pub reply_scope: Option<ReplyScope>,
698}
699
700impl IngressEnvelope {
701 pub fn canonicalize(mut self) -> Self {
702 if self.provider.is_none() {
703 self.provider = Some("provider".into());
704 }
705 if self.channel.is_none() {
706 self.channel = Some(self.flow_id.clone());
707 }
708 if self.conversation.is_none() {
709 self.conversation = self.channel.clone();
710 }
711 if self.user.is_none() {
712 if let Some(ref hint) = self.session_hint {
713 self.user = Some(hint.clone());
714 } else if let Some(ref activity) = self.activity_id {
715 self.user = Some(activity.clone());
716 } else {
717 self.user = Some("user".into());
718 }
719 }
720 if self.session_hint.is_none() {
721 self.session_hint = Some(self.canonical_session_hint());
722 }
723 if self.reply_scope.is_none()
724 && let Some(conversation) = self.conversation.clone()
725 {
726 self.reply_scope = Some(ReplyScope {
727 conversation,
728 thread: None,
729 reply_to: None,
730 correlation: None,
731 });
732 }
733 self
734 }
735
736 pub fn canonical_session_hint(&self) -> String {
737 format!(
738 "{}:{}:{}:{}:{}",
739 self.tenant,
740 self.provider.as_deref().unwrap_or("provider"),
741 self.channel.as_deref().unwrap_or("channel"),
742 self.conversation.as_deref().unwrap_or("conversation"),
743 self.user.as_deref().unwrap_or("user")
744 )
745 }
746
747 pub fn tenant_ctx(&self) -> TenantCtx {
748 let env_raw = self.env.clone().unwrap_or_else(|| DEFAULT_ENV.into());
749 let env = EnvId::from_str(env_raw.as_str())
750 .unwrap_or_else(|_| EnvId::from_str(DEFAULT_ENV).expect("default env must be valid"));
751 let tenant_id = TenantId::from_str(self.tenant.as_str()).unwrap_or_else(|_| {
752 TenantId::from_str("tenant.default").expect("tenant fallback must be valid")
753 });
754 let mut ctx = TenantCtx::new(env, tenant_id).with_flow(self.flow_id.clone());
755 if let Some(provider) = &self.provider {
756 ctx = ctx.with_provider(provider.clone());
757 }
758 if let Some(session) = &self.session_hint {
759 ctx = ctx.with_session(session.clone());
760 }
761 ctx
762 }
763}