1use std::collections::HashMap;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use anyhow::{Context, Result, anyhow};
6use async_trait::async_trait;
7use greentic_session::{SessionData, SessionKey as StoreSessionKey};
8use greentic_types::{
9 EnvId, FlowId, GreenticError, PackId, ReplyScope, SessionCursor as TypesSessionCursor,
10 TenantCtx, TenantId, UserId,
11};
12use rand::{RngExt, rng};
13use serde::{Deserialize, Serialize};
14use serde_json::{Value, json};
15use sha2::{Digest, Sha256};
16
17use super::api::{RunFlowRequest, RunnerApi};
18use super::builder::{Runner, RunnerBuilder};
19use super::error::{GResult, RunnerError};
20use super::glue::{FnSecretsHost, FnTelemetryHost};
21use super::host::{HostBundle, SecretsHost, SessionHost, StateHost};
22use super::policy::Policy;
23use super::registry::{Adapter, AdapterCall, AdapterRegistry};
24use super::shims::{InMemorySessionHost, InMemoryStateHost};
25use super::state_machine::{FlowDefinition, FlowStep, PAYLOAD_FROM_LAST_INPUT};
26
27use crate::config::{HostConfig, SecretsPolicy};
28use crate::pack::FlowDescriptor;
29use crate::runner::engine::{FlowContext, FlowEngine, FlowSnapshot, FlowStatus, FlowWait};
30use crate::runner::mocks::MockLayer;
31use crate::secrets::{DynSecretsManager, read_secret_blocking};
32use crate::storage::session::DynSessionStore;
33use crate::trace::{PackTraceInfo, TraceContext, TraceMode, TraceRecorder};
34
/// Environment name used by `IngressEnvelope::tenant_ctx` when the envelope
/// carries no `env` value or the value fails to parse as an `EnvId`.
const DEFAULT_ENV: &str = "local";
/// Name under which `PackFlowAdapter` is registered in the adapter registry
/// and by which generated flow steps refer to it.
const PACK_FLOW_ADAPTER: &str = "pack_flow";
37
38#[derive(Clone)]
39pub struct FlowResumeStore {
40 store: DynSessionStore,
41}
42
43impl FlowResumeStore {
44 pub fn new(store: DynSessionStore) -> Self {
45 Self { store }
46 }
47
48 pub fn fetch(&self, envelope: &IngressEnvelope) -> GResult<Option<FlowSnapshot>> {
49 let (mut ctx, user, _, scope) = build_store_ctx(envelope)?;
50 ctx = ctx.with_user(Some(user.clone()));
51
52 let mut scopes = vec![scope.clone()];
53 if scope.correlation.is_some() {
54 let mut base = scope.clone();
55 base.correlation = None;
56 scopes.push(base);
57 }
58
59 for lookup in scopes {
60 if let Some(key) = self
61 .store
62 .find_wait_by_scope(&ctx, &user, &lookup)
63 .map_err(map_store_error)?
64 {
65 let Some(data) = self.store.get_session(&key).map_err(map_store_error)? else {
66 continue;
67 };
68 let record: FlowResumeRecord =
69 serde_json::from_str(&data.context_json).map_err(|err| {
70 RunnerError::Session {
71 reason: format!("failed to decode flow resume snapshot: {err}"),
72 }
73 })?;
74 if let Some(pack_id) = envelope.pack_id.as_deref()
75 && record.snapshot.pack_id != pack_id
76 {
77 return Err(RunnerError::Session {
78 reason: format!(
79 "resume pack mismatch: expected {pack_id}, found {}",
80 record.snapshot.pack_id
81 ),
82 });
83 }
84 return Ok(Some(record.snapshot));
85 }
86 }
87
88 Ok(None)
89 }
90
91 pub fn save(&self, envelope: &IngressEnvelope, wait: &FlowWait) -> GResult<ReplyScope> {
92 let (ctx, user, hint, scope) = build_store_ctx(envelope)?;
93 let record = FlowResumeRecord {
94 snapshot: wait.snapshot.clone(),
95 reason: wait.reason.clone(),
96 };
97 let data = record_to_session_data(&record, ctx.clone(), &user, &hint)?;
98 let mut reply_scope = scope.clone();
99 if reply_scope.correlation.is_none() {
100 reply_scope.correlation = Some(generate_correlation_id());
101 }
102 let mut store_scope = scope;
103 store_scope.correlation = None;
104 let session_key = StoreSessionKey::new(format!("{hint}::{}", store_scope.scope_hash()));
105 self.store
106 .register_wait(&ctx, &user, &store_scope, &session_key, data, None)
107 .map_err(map_store_error)?;
108 Ok(reply_scope)
109 }
110
111 pub fn clear(&self, envelope: &IngressEnvelope) -> GResult<()> {
112 let (ctx, user, _, scope) = build_store_ctx(envelope)?;
113 let mut scopes = vec![scope.clone()];
114 if scope.correlation.is_some() {
115 let mut base = scope;
116 base.correlation = None;
117 scopes.push(base);
118 }
119 for lookup in scopes {
120 self.store
121 .clear_wait(&ctx, &user, &lookup)
122 .map_err(map_store_error)?;
123 }
124 Ok(())
125 }
126}
127
128#[derive(Serialize, Deserialize)]
129struct FlowResumeRecord {
130 snapshot: FlowSnapshot,
131 #[serde(default)]
132 reason: Option<String>,
133}
134
135fn build_store_ctx(envelope: &IngressEnvelope) -> GResult<(TenantCtx, UserId, String, ReplyScope)> {
136 let base_hint = envelope
137 .session_hint
138 .clone()
139 .unwrap_or_else(|| envelope.canonical_session_hint());
140 let hint = if let Some(pack_id) = envelope.pack_id.as_deref() {
141 format!("{base_hint}::pack={pack_id}")
142 } else {
143 base_hint.clone()
144 };
145 let user = derive_user_id(&hint)?;
146 let scope = envelope
147 .reply_scope
148 .clone()
149 .ok_or_else(|| RunnerError::Session {
150 reason: "Cannot suspend: reply_scope missing; provider plugin must supply ReplyScope"
151 .to_string(),
152 })?;
153 let mut ctx = envelope.tenant_ctx();
154 ctx = ctx.with_session(hint.clone());
155 ctx = ctx.with_user(Some(user.clone()));
156 Ok((ctx, user, hint, scope))
157}
158
159fn record_to_session_data(
160 record: &FlowResumeRecord,
161 ctx: TenantCtx,
162 user: &UserId,
163 session_hint: &str,
164) -> GResult<SessionData> {
165 let flow = FlowId::from_str(record.snapshot.flow_id.as_str()).map_err(map_store_error)?;
166 let pack = PackId::from_str(record.snapshot.pack_id.as_str()).map_err(map_store_error)?;
167 let mut cursor = TypesSessionCursor::new(record.snapshot.next_node.clone());
168 if let Some(reason) = record.reason.clone() {
169 cursor = cursor.with_wait_reason(reason);
170 }
171 let context_json = serde_json::to_string(record).map_err(|err| RunnerError::Session {
172 reason: format!("failed to encode flow resume snapshot: {err}"),
173 })?;
174 let ctx = ctx
175 .with_user(Some(user.clone()))
176 .with_session(session_hint.to_string())
177 .with_flow(record.snapshot.flow_id.clone());
178 Ok(SessionData {
179 tenant_ctx: ctx,
180 flow_id: flow,
181 pack_id: Some(pack),
182 cursor,
183 context_json,
184 })
185}
186
187fn derive_user_id(hint: &str) -> GResult<UserId> {
188 let digest = Sha256::digest(hint.as_bytes());
189 let slug = format!("sess{}", hex::encode(&digest[..8]));
190 UserId::from_str(&slug).map_err(map_store_error)
191}
192
193fn map_store_error(err: GreenticError) -> RunnerError {
194 RunnerError::Session {
195 reason: err.to_string(),
196 }
197}
198
199fn generate_correlation_id() -> String {
200 let mut bytes = [0u8; 16];
201 rng().fill(&mut bytes);
202 hex::encode(bytes)
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runner::engine::ExecutionState;
    use crate::storage::session::new_session_store;
    use serde_json::json;

    /// Resume store backed by a fresh session store.
    fn fresh_store() -> FlowResumeStore {
        FlowResumeStore::new(new_session_store())
    }

    /// Fully populated messaging envelope with an uncorrelated reply scope.
    fn sample_envelope() -> IngressEnvelope {
        let reply_scope = ReplyScope {
            conversation: "conv".into(),
            thread: None,
            reply_to: None,
            correlation: None,
        };
        IngressEnvelope {
            tenant: "demo".to_string(),
            env: Some("local".to_string()),
            pack_id: Some("pack.demo".to_string()),
            flow_id: "flow.main".to_string(),
            flow_type: None,
            action: Some("messaging".to_string()),
            session_hint: Some("demo:provider:chan:conv:user".to_string()),
            provider: Some("provider".to_string()),
            channel: Some("chan".to_string()),
            conversation: Some("conv".to_string()),
            user: Some("user".to_string()),
            activity_id: Some("act-1".to_string()),
            timestamp: None,
            payload: json!({ "text": "hi" }),
            metadata: None,
            reply_scope: Some(reply_scope),
        }
    }

    /// Wait whose snapshot points at `node-2` of `flow.main` in `pack.demo`.
    fn sample_wait() -> FlowWait {
        let raw_state = json!({
            "input": { "text": "hi" },
            "nodes": {},
            "egress": []
        });
        let state: ExecutionState = serde_json::from_value(raw_state).expect("state");
        let snapshot = FlowSnapshot {
            pack_id: "pack.demo".into(),
            flow_id: "flow.main".into(),
            next_flow: None,
            next_node: "node-2".into(),
            state,
        };
        FlowWait {
            reason: Some("await-user".into()),
            snapshot,
        }
    }

    #[test]
    fn derive_user_id_is_stable() {
        let hint = "some-tenant::session-key";
        let first = derive_user_id(hint).unwrap();
        let second = derive_user_id(hint).unwrap();
        assert_eq!(first, second);
        assert!(first.as_str().starts_with("sess"));
    }

    #[test]
    fn resume_store_roundtrip() -> GResult<()> {
        let store = fresh_store();
        let envelope = sample_envelope();
        assert!(store.fetch(&envelope)?.is_none());

        let wait = sample_wait();
        let _ = store.save(&envelope, &wait)?;
        let restored = store.fetch(&envelope)?.expect("snapshot missing");
        assert_eq!(restored.flow_id, wait.snapshot.flow_id);
        assert_eq!(restored.next_node, wait.snapshot.next_node);

        store.clear(&envelope)?;
        assert!(store.fetch(&envelope)?.is_none());
        Ok(())
    }

    #[test]
    fn resume_store_overwrites_existing() -> GResult<()> {
        let store = fresh_store();
        let envelope = sample_envelope();
        let mut wait = sample_wait();
        let _ = store.save(&envelope, &wait)?;

        // Saving again for the same scope must replace the earlier snapshot.
        wait.snapshot.next_node = "node-3".into();
        wait.reason = Some("retry".into());
        let _ = store.save(&envelope, &wait)?;

        let restored = store.fetch(&envelope)?.expect("snapshot missing");
        assert_eq!(restored.next_node, "node-3");
        store.clear(&envelope)?;
        Ok(())
    }

    #[test]
    fn resume_store_uses_snapshot_even_if_envelope_flow_differs() -> GResult<()> {
        let store = fresh_store();
        let envelope = sample_envelope();
        let wait = sample_wait();
        let _ = store.save(&envelope, &wait)?;

        let redirected = IngressEnvelope {
            flow_id: "flow.other".to_string(),
            ..envelope.clone()
        };
        let restored = store.fetch(&redirected)?.expect("snapshot missing");
        assert_eq!(restored.flow_id, wait.snapshot.flow_id);

        store.clear(&envelope)?;
        Ok(())
    }

    #[test]
    fn canonicalize_populates_defaults() {
        let sparse = IngressEnvelope {
            tenant: "demo".to_string(),
            env: None,
            pack_id: None,
            flow_id: "flow.main".to_string(),
            flow_type: None,
            action: None,
            session_hint: None,
            provider: None,
            channel: None,
            conversation: None,
            user: None,
            activity_id: Some("activity-1".to_string()),
            timestamp: None,
            payload: json!({}),
            metadata: None,
            reply_scope: None,
        };
        let envelope = sparse.canonicalize();

        assert_eq!(envelope.provider.as_deref(), Some("provider"));
        assert_eq!(envelope.channel.as_deref(), Some("flow.main"));
        assert_eq!(envelope.conversation.as_deref(), Some("flow.main"));
        assert_eq!(envelope.user.as_deref(), Some("activity-1"));
        assert!(envelope.session_hint.is_some());
    }
}
345
346pub struct StateMachineRuntime {
347 runner: Runner,
348}
349
350impl StateMachineRuntime {
351 pub fn new(flows: Vec<FlowDefinition>) -> GResult<Self> {
353 let secrets = Arc::new(FnSecretsHost::new(|name| {
354 Err(RunnerError::Secrets {
355 reason: format!("secret {name} unavailable (noop host)"),
356 })
357 }));
358 let telemetry = Arc::new(FnTelemetryHost::new(|_, _| Ok(())));
359 let session = Arc::new(InMemorySessionHost::new());
360 let state = Arc::new(InMemoryStateHost::new());
361 let host = HostBundle::new(secrets, telemetry, session, state);
362
363 let adapters = AdapterRegistry::default();
364 let policy = Policy::default();
365
366 let mut builder = RunnerBuilder::new()
367 .with_host(host)
368 .with_adapters(adapters)
369 .with_policy(policy);
370 for flow in flows {
371 builder = builder.with_flow(flow);
372 }
373 let runner = builder.build()?;
374 Ok(Self { runner })
375 }
376
377 #[allow(clippy::too_many_arguments)]
379 pub fn from_flow_engine(
380 config: Arc<HostConfig>,
381 engine: Arc<FlowEngine>,
382 pack_trace: HashMap<String, PackTraceInfo>,
383 session_host: Arc<dyn SessionHost>,
384 session_store: DynSessionStore,
385 state_host: Arc<dyn StateHost>,
386 secrets_manager: DynSecretsManager,
387 mocks: Option<Arc<MockLayer>>,
388 ) -> Result<Self> {
389 let policy = Arc::new(config.secrets_policy.clone());
390 let tenant_ctx = config.tenant_ctx();
391 let secrets = Arc::new(PolicySecretsHost::new(policy, secrets_manager, tenant_ctx));
392 let telemetry = Arc::new(FnTelemetryHost::new(|span, fields| {
393 tracing::debug!(?span, ?fields, "telemetry emit");
394 Ok(())
395 }));
396 let host = HostBundle::new(secrets, telemetry, session_host, state_host);
397 let resume_store = FlowResumeStore::new(session_store);
398
399 let mut adapters = AdapterRegistry::default();
400 adapters.register(
401 PACK_FLOW_ADAPTER,
402 Box::new(PackFlowAdapter::new(
403 Arc::clone(&config),
404 Arc::clone(&engine),
405 pack_trace,
406 resume_store,
407 mocks,
408 )),
409 );
410
411 let flows = build_flow_definitions(engine.flows());
412 let mut builder = RunnerBuilder::new()
413 .with_host(host)
414 .with_adapters(adapters)
415 .with_policy(Policy::default());
416 for flow in flows {
417 builder = builder.with_flow(flow);
418 }
419 let runner = builder
420 .build()
421 .map_err(|err| anyhow!("state machine init failed: {err}"))?;
422 Ok(Self { runner })
423 }
424
425 pub async fn handle(&self, envelope: IngressEnvelope) -> Result<Value> {
427 let tenant_ctx = envelope.tenant_ctx();
428 let session_hint = envelope
429 .session_hint
430 .clone()
431 .unwrap_or_else(|| envelope.canonical_session_hint());
432 let pack_id = envelope.pack_id.clone().ok_or_else(|| {
433 anyhow!("pack_id missing; ingress must specify pack_id for multi-pack flows")
434 })?;
435 let input =
436 serde_json::to_value(&envelope).context("failed to serialise ingress envelope")?;
437 let request = RunFlowRequest {
438 tenant: tenant_ctx,
439 pack_id,
440 flow_id: envelope.flow_id.clone(),
441 input,
442 session_hint: Some(session_hint),
443 };
444 let result: super::api::RunFlowResult = self
445 .runner
446 .run_flow(request)
447 .await
448 .map_err(|err| anyhow!("flow execution failed: {err}"))?;
449 let outcome = result.outcome;
450 Ok(outcome.get("response").cloned().unwrap_or(outcome))
451 }
452}
453
454struct PolicySecretsHost {
455 policy: Arc<SecretsPolicy>,
456 manager: DynSecretsManager,
457 tenant_ctx: TenantCtx,
458}
459
460impl PolicySecretsHost {
461 fn new(policy: Arc<SecretsPolicy>, manager: DynSecretsManager, tenant_ctx: TenantCtx) -> Self {
462 Self {
463 policy,
464 manager,
465 tenant_ctx,
466 }
467 }
468}
469
470const POLICY_SECRETS_PACK_ID: &str = "_runner";
471
472#[async_trait]
473impl SecretsHost for PolicySecretsHost {
474 async fn get(&self, name: &str) -> GResult<String> {
475 if !self.policy.is_allowed(name) {
476 return Err(RunnerError::Secrets {
477 reason: format!("secret {name} denied by policy"),
478 });
479 }
480 let bytes = read_secret_blocking(
481 &self.manager,
482 &self.tenant_ctx,
483 POLICY_SECRETS_PACK_ID,
484 name,
485 )
486 .map_err(|err| RunnerError::Secrets {
487 reason: format!("secret {name} unavailable: {err}"),
488 })?;
489 String::from_utf8(bytes).map_err(|err| RunnerError::Secrets {
490 reason: format!("secret {name} not valid UTF-8: {err}"),
491 })
492 }
493}
494
495fn build_flow_definitions(flows: &[FlowDescriptor]) -> Vec<FlowDefinition> {
496 flows
497 .iter()
498 .map(|descriptor| {
499 FlowDefinition::new(
500 super::api::FlowSummary {
501 pack_id: descriptor.pack_id.clone(),
502 id: descriptor.id.clone(),
503 name: descriptor
504 .description
505 .clone()
506 .unwrap_or_else(|| descriptor.id.clone()),
507 version: descriptor.version.clone(),
508 description: descriptor.description.clone(),
509 },
510 serde_json::json!({
511 "type": "object"
512 }),
513 vec![FlowStep::Adapter(AdapterCall {
514 adapter: PACK_FLOW_ADAPTER.into(),
515 operation: descriptor.id.clone(),
516 payload: Value::String(PAYLOAD_FROM_LAST_INPUT.into()),
517 })],
518 )
519 })
520 .collect()
521}
522
523struct PackFlowAdapter {
524 tenant: String,
525 config: Arc<HostConfig>,
526 engine: Arc<FlowEngine>,
527 pack_trace: HashMap<String, PackTraceInfo>,
528 resume: FlowResumeStore,
529 mocks: Option<Arc<MockLayer>>,
530}
531
532impl PackFlowAdapter {
533 fn new(
534 config: Arc<HostConfig>,
535 engine: Arc<FlowEngine>,
536 pack_trace: HashMap<String, PackTraceInfo>,
537 resume: FlowResumeStore,
538 mocks: Option<Arc<MockLayer>>,
539 ) -> Self {
540 Self {
541 tenant: config.tenant.clone(),
542 config,
543 engine,
544 pack_trace,
545 resume,
546 mocks,
547 }
548 }
549}
550
551#[async_trait::async_trait]
552impl Adapter for PackFlowAdapter {
553 async fn call(&self, call: &AdapterCall) -> GResult<Value> {
554 let envelope: IngressEnvelope =
555 serde_json::from_value(call.payload.clone()).map_err(|err| {
556 RunnerError::AdapterCall {
557 reason: format!("invalid ingress payload: {err}"),
558 }
559 })?;
560 let envelope = envelope.canonicalize();
561 let flow_id = call.operation.clone();
562 let action_owned = envelope.action.clone();
563 let session_owned = envelope
564 .session_hint
565 .clone()
566 .unwrap_or_else(|| envelope.canonical_session_hint());
567 let provider_owned = envelope.provider.clone();
568 let payload = envelope.payload.clone();
569 let retry_config = self.config.retry_config().into();
570 let resume_snapshot = self.resume.fetch(&envelope)?;
571 let resume_flow_id = resume_snapshot
572 .as_ref()
573 .and_then(|snapshot| snapshot.next_flow.clone())
574 .or_else(|| {
575 resume_snapshot
576 .as_ref()
577 .map(|snapshot| snapshot.flow_id.clone())
578 });
579 let effective_flow_id = resume_flow_id.clone().unwrap_or_else(|| flow_id.clone());
580 let effective_pack_id = if let Some(snapshot) = resume_snapshot.as_ref() {
581 snapshot.pack_id.clone()
582 } else if let Some(pack_id) = envelope.pack_id.as_deref() {
583 let found = self
584 .engine
585 .flow_by_key(pack_id, effective_flow_id.as_str())
586 .is_some();
587 if !found {
588 return Err(RunnerError::AdapterCall {
589 reason: format!(
590 "flow {} not registered for pack {pack_id}",
591 effective_flow_id
592 ),
593 });
594 }
595 pack_id.to_string()
596 } else if let Some(flow) = self.engine.flow_by_id(effective_flow_id.as_str()) {
597 flow.pack_id.clone()
598 } else {
599 return Err(RunnerError::AdapterCall {
600 reason: format!(
601 "flow {} is ambiguous; pack_id is required",
602 effective_flow_id
603 ),
604 });
605 };
606
607 let trace_config = self.config.trace.clone();
608 let flow_version = self
609 .engine
610 .flow_by_key(effective_pack_id.as_str(), effective_flow_id.as_str())
611 .map(|desc| desc.version.clone())
612 .unwrap_or_else(|| "unknown".to_string());
613 let pack_trace = self
614 .pack_trace
615 .get(effective_pack_id.as_str())
616 .cloned()
617 .unwrap_or_else(|| PackTraceInfo {
618 pack_ref: effective_pack_id.clone(),
619 resolved_digest: None,
620 });
621 let trace_ctx = TraceContext {
622 pack_ref: pack_trace.pack_ref,
623 resolved_digest: pack_trace.resolved_digest,
624 flow_id: effective_flow_id.clone(),
625 flow_version,
626 };
627 let trace = if trace_config.mode == TraceMode::Off {
628 None
629 } else {
630 Some(TraceRecorder::new(trace_config, trace_ctx))
631 };
632
633 let mocks = self.mocks.as_deref();
634 let ctx = FlowContext {
635 tenant: &self.tenant,
636 pack_id: effective_pack_id.as_str(),
637 flow_id: effective_flow_id.as_str(),
638 node_id: None,
639 tool: None,
640 action: action_owned.as_deref(),
641 session_id: Some(session_owned.as_str()),
642 provider_id: provider_owned.as_deref(),
643 retry_config,
644 attempt: 1,
645 observer: trace
646 .as_ref()
647 .map(|recorder| recorder as &dyn crate::runner::engine::ExecutionObserver),
648 mocks,
649 };
650
651 let execution = if let Some(snapshot) = resume_snapshot {
652 let resume_pack_id = snapshot.pack_id.clone();
653 let resume_flow_id = snapshot
654 .next_flow
655 .clone()
656 .unwrap_or_else(|| snapshot.flow_id.clone());
657 let resume_ctx = FlowContext {
658 pack_id: resume_pack_id.as_str(),
659 flow_id: resume_flow_id.as_str(),
660 ..ctx
661 };
662 self.engine.resume(resume_ctx, snapshot, payload).await
663 } else {
664 self.engine.execute(ctx, payload).await
665 };
666 let execution = match execution {
667 Ok(execution) => {
668 if let Some(recorder) = trace.as_ref()
669 && let Err(err) = recorder.flush_success()
670 {
671 tracing::warn!(error = %err, "failed to write trace");
672 }
673 execution
674 }
675 Err(err) => {
676 if let Some(recorder) = trace.as_ref()
677 && let Err(write_err) = recorder.flush_error(err.as_ref())
678 {
679 tracing::warn!(error = %write_err, "failed to write trace");
680 }
681 return Err(RunnerError::AdapterCall {
682 reason: err.to_string(),
683 });
684 }
685 };
686
687 match execution.status {
688 FlowStatus::Completed => {
689 self.resume.clear(&envelope)?;
690 Ok(execution.output)
691 }
692 FlowStatus::Waiting(wait) => {
693 let reply_scope = self.resume.save(&envelope, &wait)?;
694 Ok(json!({
695 "status": "pending",
696 "reason": wait.reason,
697 "resume": wait.snapshot,
698 "reply_scope": reply_scope,
699 "response": execution.output,
700 }))
701 }
702 }
703 }
704}
705
706#[derive(Clone, Debug, Serialize, Deserialize)]
707pub struct IngressEnvelope {
708 pub tenant: String,
709 #[serde(default, skip_serializing_if = "Option::is_none")]
710 pub env: Option<String>,
711 #[serde(default, skip_serializing_if = "Option::is_none")]
712 pub pack_id: Option<String>,
713 pub flow_id: String,
714 #[serde(default, skip_serializing_if = "Option::is_none")]
715 pub flow_type: Option<String>,
716 #[serde(default, skip_serializing_if = "Option::is_none")]
717 pub action: Option<String>,
718 #[serde(default, skip_serializing_if = "Option::is_none")]
719 pub session_hint: Option<String>,
720 #[serde(default, skip_serializing_if = "Option::is_none")]
721 pub provider: Option<String>,
722 #[serde(default, skip_serializing_if = "Option::is_none")]
723 pub channel: Option<String>,
724 #[serde(default, skip_serializing_if = "Option::is_none")]
725 pub conversation: Option<String>,
726 #[serde(default, skip_serializing_if = "Option::is_none")]
727 pub user: Option<String>,
728 #[serde(default, skip_serializing_if = "Option::is_none")]
729 pub activity_id: Option<String>,
730 #[serde(default, skip_serializing_if = "Option::is_none")]
731 pub timestamp: Option<String>,
732 #[serde(default)]
733 pub payload: Value,
734 #[serde(default, skip_serializing_if = "Option::is_none")]
735 pub metadata: Option<Value>,
736 #[serde(default, skip_serializing_if = "Option::is_none")]
737 pub reply_scope: Option<ReplyScope>,
738}
739
740impl IngressEnvelope {
741 pub fn canonicalize(mut self) -> Self {
742 if self.provider.is_none() {
743 self.provider = Some("provider".into());
744 }
745 if self.channel.is_none() {
746 self.channel = Some(self.flow_id.clone());
747 }
748 if self.conversation.is_none() {
749 self.conversation = self.channel.clone();
750 }
751 if self.user.is_none() {
752 if let Some(ref hint) = self.session_hint {
753 self.user = Some(hint.clone());
754 } else if let Some(ref activity) = self.activity_id {
755 self.user = Some(activity.clone());
756 } else {
757 self.user = Some("user".into());
758 }
759 }
760 if self.session_hint.is_none() {
761 self.session_hint = Some(self.canonical_session_hint());
762 }
763 if self.reply_scope.is_none()
764 && let Some(conversation) = self.conversation.clone()
765 {
766 self.reply_scope = Some(ReplyScope {
767 conversation,
768 thread: None,
769 reply_to: None,
770 correlation: None,
771 });
772 }
773 self
774 }
775
776 pub fn canonical_session_hint(&self) -> String {
777 format!(
778 "{}:{}:{}:{}:{}",
779 self.tenant,
780 self.provider.as_deref().unwrap_or("provider"),
781 self.channel.as_deref().unwrap_or("channel"),
782 self.conversation.as_deref().unwrap_or("conversation"),
783 self.user.as_deref().unwrap_or("user")
784 )
785 }
786
787 pub fn tenant_ctx(&self) -> TenantCtx {
788 let env_raw = self.env.clone().unwrap_or_else(|| DEFAULT_ENV.into());
789 let env = EnvId::from_str(env_raw.as_str())
790 .unwrap_or_else(|_| EnvId::from_str(DEFAULT_ENV).expect("default env must be valid"));
791 let tenant_id = TenantId::from_str(self.tenant.as_str()).unwrap_or_else(|_| {
792 TenantId::from_str("tenant.default").expect("tenant fallback must be valid")
793 });
794 let mut ctx = TenantCtx::new(env, tenant_id).with_flow(self.flow_id.clone());
795 if let Some(provider) = &self.provider {
796 ctx = ctx.with_provider(provider.clone());
797 }
798 if let Some(session) = &self.session_hint {
799 ctx = ctx.with_session(session.clone());
800 }
801 ctx
802 }
803}