1use std::str::FromStr;
2use std::sync::Arc;
3
4use anyhow::{Context, Result, anyhow};
5use greentic_session::SessionData;
6use greentic_types::{
7 EnvId, FlowId, GreenticError, SessionCursor as TypesSessionCursor, TenantCtx, TenantId, UserId,
8};
9use serde::{Deserialize, Serialize};
10use serde_json::{Value, json};
11use sha2::{Digest, Sha256};
12
13use super::api::{RunFlowRequest, RunnerApi};
14use super::builder::{Runner, RunnerBuilder};
15use super::error::{GResult, RunnerError};
16use super::glue::{FnSecretsHost, FnTelemetryHost};
17use super::host::{HostBundle, SessionHost, StateHost};
18use super::policy::Policy;
19use super::registry::{Adapter, AdapterCall, AdapterRegistry};
20use super::shims::{InMemorySessionHost, InMemoryStateHost};
21use super::state_machine::{FlowDefinition, FlowStep, PAYLOAD_FROM_LAST_INPUT};
22
23use crate::config::HostConfig;
24use crate::pack::FlowDescriptor;
25use crate::runner::engine::{FlowContext, FlowEngine, FlowSnapshot, FlowStatus, FlowWait};
26use crate::runner::mocks::MockLayer;
27use crate::storage::session::DynSessionStore;
28
/// Environment name used by `IngressEnvelope::tenant_ctx` when the envelope carries no
/// `env`, or carries one that cannot be parsed into an `EnvId`.
const DEFAULT_ENV: &str = "local";
/// Registry name of the adapter that forwards state-machine steps to the pack flow engine.
const PACK_FLOW_ADAPTER: &str = "pack_flow";
31
32#[derive(Clone)]
33pub struct FlowResumeStore {
34 store: DynSessionStore,
35}
36
37impl FlowResumeStore {
38 pub fn new(store: DynSessionStore) -> Self {
39 Self { store }
40 }
41
42 fn fetch(&self, envelope: &IngressEnvelope) -> GResult<Option<FlowSnapshot>> {
43 let (mut ctx, user, _) = build_store_ctx(envelope)?;
44 ctx = ctx.with_user(Some(user.clone()));
45 if let Some((_key, data)) = self
46 .store
47 .find_by_user(&ctx, &user)
48 .map_err(map_store_error)?
49 {
50 let record: FlowResumeRecord =
51 serde_json::from_str(&data.context_json).map_err(|err| RunnerError::Session {
52 reason: format!("failed to decode flow resume snapshot: {err}"),
53 })?;
54 if record.snapshot.flow_id == envelope.flow_id {
55 return Ok(Some(record.snapshot));
56 }
57 }
58 Ok(None)
59 }
60
61 fn save(&self, envelope: &IngressEnvelope, wait: &FlowWait) -> GResult<()> {
62 let (ctx, user, hint) = build_store_ctx(envelope)?;
63 let record = FlowResumeRecord {
64 snapshot: wait.snapshot.clone(),
65 reason: wait.reason.clone(),
66 };
67 let data = record_to_session_data(&record, ctx.clone(), &user, &hint)?;
68 let existing = self
69 .store
70 .find_by_user(&ctx, &user)
71 .map_err(map_store_error)?;
72 if let Some((key, _)) = existing {
73 self.store
74 .update_session(&key, data)
75 .map_err(map_store_error)?;
76 } else {
77 self.store
78 .create_session(&ctx, data)
79 .map_err(map_store_error)?;
80 }
81 Ok(())
82 }
83
84 fn clear(&self, envelope: &IngressEnvelope) -> GResult<()> {
85 let (ctx, user, _) = build_store_ctx(envelope)?;
86 if let Some((key, _)) = self
87 .store
88 .find_by_user(&ctx, &user)
89 .map_err(map_store_error)?
90 {
91 self.store.remove_session(&key).map_err(map_store_error)?;
92 }
93 Ok(())
94 }
95}
96
97#[derive(Serialize, Deserialize)]
98struct FlowResumeRecord {
99 snapshot: FlowSnapshot,
100 #[serde(default)]
101 reason: Option<String>,
102}
103
104fn build_store_ctx(envelope: &IngressEnvelope) -> GResult<(TenantCtx, UserId, String)> {
105 let hint = envelope
106 .session_hint
107 .clone()
108 .unwrap_or_else(|| envelope.canonical_session_hint());
109 let user = derive_user_id(&hint)?;
110 let mut ctx = envelope.tenant_ctx();
111 ctx = ctx.with_session(hint.clone());
112 Ok((ctx, user, hint))
113}
114
115fn record_to_session_data(
116 record: &FlowResumeRecord,
117 ctx: TenantCtx,
118 user: &UserId,
119 session_hint: &str,
120) -> GResult<SessionData> {
121 let flow = FlowId::from_str(record.snapshot.flow_id.as_str()).map_err(map_store_error)?;
122 let mut cursor = TypesSessionCursor::new(record.snapshot.next_node.clone());
123 if let Some(reason) = record.reason.clone() {
124 cursor = cursor.with_wait_reason(reason);
125 }
126 let context_json = serde_json::to_string(record).map_err(|err| RunnerError::Session {
127 reason: format!("failed to encode flow resume snapshot: {err}"),
128 })?;
129 let ctx = ctx
130 .with_user(Some(user.clone()))
131 .with_session(session_hint.to_string())
132 .with_flow(record.snapshot.flow_id.clone());
133 Ok(SessionData {
134 tenant_ctx: ctx,
135 flow_id: flow,
136 cursor,
137 context_json,
138 })
139}
140
141fn derive_user_id(hint: &str) -> GResult<UserId> {
142 let digest = Sha256::digest(hint.as_bytes());
143 let slug = format!("sess{}", hex::encode(&digest[..8]));
144 UserId::from_str(&slug).map_err(map_store_error)
145}
146
147fn map_store_error(err: GreenticError) -> RunnerError {
148 RunnerError::Session {
149 reason: err.to_string(),
150 }
151}
152
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runner::engine::ExecutionState;
    use crate::storage::session::new_session_store;
    use serde_json::json;

    /// Fully populated envelope targeting `flow.main`, shared by the resume-store tests.
    fn sample_envelope() -> IngressEnvelope {
        IngressEnvelope {
            tenant: String::from("demo"),
            env: Some(String::from("local")),
            flow_id: String::from("flow.main"),
            flow_type: None,
            action: Some(String::from("messaging")),
            session_hint: Some(String::from("demo:provider:chan:conv:user")),
            provider: Some(String::from("provider")),
            channel: Some(String::from("chan")),
            conversation: Some(String::from("conv")),
            user: Some(String::from("user")),
            activity_id: Some(String::from("act-1")),
            timestamp: None,
            payload: json!({ "text": "hi" }),
            metadata: None,
        }
    }

    /// Wait result for `flow.main` paused before `node-2` with reason `await-user`.
    fn sample_wait() -> FlowWait {
        let raw_state = json!({
            "input": { "text": "hi" },
            "nodes": {},
            "egress": []
        });
        let state: ExecutionState = serde_json::from_value(raw_state).expect("state");
        let snapshot = FlowSnapshot {
            flow_id: "flow.main".into(),
            next_node: "node-2".into(),
            state,
        };
        FlowWait {
            reason: Some("await-user".into()),
            snapshot,
        }
    }

    #[test]
    fn derive_user_id_is_stable() {
        let hint = "some-tenant::session-key";
        let first = derive_user_id(hint).unwrap();
        let second = derive_user_id(hint).unwrap();
        assert_eq!(first, second);
        assert!(first.as_str().starts_with("sess"));
    }

    #[test]
    fn resume_store_roundtrip() -> GResult<()> {
        let resume = FlowResumeStore::new(new_session_store());
        let envelope = sample_envelope();
        // Nothing has been saved yet, so the lookup must come back empty.
        assert!(resume.fetch(&envelope)?.is_none());

        let wait = sample_wait();
        resume.save(&envelope, &wait)?;
        let restored = resume.fetch(&envelope)?.expect("snapshot missing");
        assert_eq!(restored.flow_id, wait.snapshot.flow_id);
        assert_eq!(restored.next_node, wait.snapshot.next_node);

        resume.clear(&envelope)?;
        assert!(resume.fetch(&envelope)?.is_none());
        Ok(())
    }

    #[test]
    fn resume_store_overwrites_existing() -> GResult<()> {
        let resume = FlowResumeStore::new(new_session_store());
        let envelope = sample_envelope();
        let mut wait = sample_wait();
        resume.save(&envelope, &wait)?;

        // A second save for the same session must replace the first snapshot.
        wait.snapshot.next_node = "node-3".into();
        wait.reason = Some("retry".into());
        resume.save(&envelope, &wait)?;

        let restored = resume.fetch(&envelope)?.expect("snapshot missing");
        assert_eq!(restored.next_node, "node-3");
        resume.clear(&envelope)?;
        Ok(())
    }

    #[test]
    fn canonicalize_populates_defaults() {
        let sparse = IngressEnvelope {
            tenant: String::from("demo"),
            env: None,
            flow_id: String::from("flow.main"),
            flow_type: None,
            action: None,
            session_hint: None,
            provider: None,
            channel: None,
            conversation: None,
            user: None,
            activity_id: Some(String::from("activity-1")),
            timestamp: None,
            payload: json!({}),
            metadata: None,
        };
        let envelope = sparse.canonicalize();

        assert_eq!(envelope.provider.as_deref(), Some("provider"));
        assert_eq!(envelope.channel.as_deref(), Some("flow.main"));
        assert_eq!(envelope.conversation.as_deref(), Some("flow.main"));
        assert_eq!(envelope.user.as_deref(), Some("activity-1"));
        assert!(envelope.session_hint.is_some());
    }
}
266
267pub struct StateMachineRuntime {
268 runner: Runner,
269}
270
271impl StateMachineRuntime {
272 pub fn new(flows: Vec<FlowDefinition>) -> GResult<Self> {
274 let secrets = Arc::new(FnSecretsHost::new(|name| {
275 Err(RunnerError::Secrets {
276 reason: format!("secret {name} unavailable (noop host)"),
277 })
278 }));
279 let telemetry = Arc::new(FnTelemetryHost::new(|_, _| Ok(())));
280 let session = Arc::new(InMemorySessionHost::new());
281 let state = Arc::new(InMemoryStateHost::new());
282 let host = HostBundle::new(secrets, telemetry, session, state);
283
284 let adapters = AdapterRegistry::default();
285 let policy = Policy::default();
286
287 let mut builder = RunnerBuilder::new()
288 .with_host(host)
289 .with_adapters(adapters)
290 .with_policy(policy);
291 for flow in flows {
292 builder = builder.with_flow(flow);
293 }
294 let runner = builder.build()?;
295 Ok(Self { runner })
296 }
297
298 pub fn from_flow_engine(
300 config: Arc<HostConfig>,
301 engine: Arc<FlowEngine>,
302 session_host: Arc<dyn SessionHost>,
303 session_store: DynSessionStore,
304 state_host: Arc<dyn StateHost>,
305 mocks: Option<Arc<MockLayer>>,
306 ) -> Result<Self> {
307 let secrets_cfg = Arc::clone(&config);
308 let secrets = Arc::new(FnSecretsHost::new(move |name| {
309 if !secrets_cfg.secrets_policy.is_allowed(name) {
310 return Err(RunnerError::Secrets {
311 reason: format!("secret {name} denied by policy"),
312 });
313 }
314 std::env::var(name).map_err(|_| RunnerError::Secrets {
315 reason: format!("secret {name} unavailable"),
316 })
317 }));
318 let telemetry = Arc::new(FnTelemetryHost::new(|span, fields| {
319 tracing::debug!(?span, ?fields, "telemetry emit");
320 Ok(())
321 }));
322 let host = HostBundle::new(secrets, telemetry, session_host, state_host);
323 let resume_store = FlowResumeStore::new(session_store);
324
325 let mut adapters = AdapterRegistry::default();
326 adapters.register(
327 PACK_FLOW_ADAPTER,
328 Box::new(PackFlowAdapter::new(
329 Arc::clone(&config),
330 Arc::clone(&engine),
331 resume_store,
332 mocks,
333 )),
334 );
335
336 let flows = build_flow_definitions(engine.flows());
337 let mut builder = RunnerBuilder::new()
338 .with_host(host)
339 .with_adapters(adapters)
340 .with_policy(Policy::default());
341 for flow in flows {
342 builder = builder.with_flow(flow);
343 }
344 let runner = builder
345 .build()
346 .map_err(|err| anyhow!("state machine init failed: {err}"))?;
347 Ok(Self { runner })
348 }
349
350 pub async fn handle(&self, envelope: IngressEnvelope) -> Result<Value> {
352 let tenant_ctx = envelope.tenant_ctx();
353 let session_hint = envelope
354 .session_hint
355 .clone()
356 .unwrap_or_else(|| envelope.canonical_session_hint());
357 let input =
358 serde_json::to_value(&envelope).context("failed to serialise ingress envelope")?;
359 let request = RunFlowRequest {
360 tenant: tenant_ctx,
361 flow_id: envelope.flow_id.clone(),
362 input,
363 session_hint: Some(session_hint),
364 };
365 let result: super::api::RunFlowResult = self
366 .runner
367 .run_flow(request)
368 .await
369 .map_err(|err| anyhow!("flow execution failed: {err}"))?;
370 let outcome = result.outcome;
371 Ok(outcome.get("response").cloned().unwrap_or(outcome))
372 }
373}
374
375fn build_flow_definitions(flows: &[FlowDescriptor]) -> Vec<FlowDefinition> {
376 flows
377 .iter()
378 .map(|descriptor| {
379 FlowDefinition::new(
380 super::api::FlowSummary {
381 id: descriptor.id.clone(),
382 name: descriptor
383 .description
384 .clone()
385 .unwrap_or_else(|| descriptor.id.clone()),
386 version: descriptor.version.clone(),
387 description: descriptor.description.clone(),
388 },
389 serde_json::json!({
390 "type": "object"
391 }),
392 vec![FlowStep::Adapter(AdapterCall {
393 adapter: PACK_FLOW_ADAPTER.into(),
394 operation: descriptor.id.clone(),
395 payload: Value::String(PAYLOAD_FROM_LAST_INPUT.into()),
396 })],
397 )
398 })
399 .collect()
400}
401
402struct PackFlowAdapter {
403 tenant: String,
404 config: Arc<HostConfig>,
405 engine: Arc<FlowEngine>,
406 resume: FlowResumeStore,
407 mocks: Option<Arc<MockLayer>>,
408}
409
410impl PackFlowAdapter {
411 fn new(
412 config: Arc<HostConfig>,
413 engine: Arc<FlowEngine>,
414 resume: FlowResumeStore,
415 mocks: Option<Arc<MockLayer>>,
416 ) -> Self {
417 Self {
418 tenant: config.tenant.clone(),
419 config,
420 engine,
421 resume,
422 mocks,
423 }
424 }
425}
426
427#[async_trait::async_trait]
428impl Adapter for PackFlowAdapter {
429 async fn call(&self, call: &AdapterCall) -> GResult<Value> {
430 let envelope: IngressEnvelope =
431 serde_json::from_value(call.payload.clone()).map_err(|err| {
432 RunnerError::AdapterCall {
433 reason: format!("invalid ingress payload: {err}"),
434 }
435 })?;
436 let flow_id = call.operation.clone();
437 let action_owned = envelope.action.clone();
438 let session_owned = envelope
439 .session_hint
440 .clone()
441 .unwrap_or_else(|| envelope.canonical_session_hint());
442 let provider_owned = envelope.provider.clone();
443 let payload = envelope.payload.clone();
444 let retry_config = self.config.mcp_retry_config().into();
445
446 let mocks = self.mocks.as_deref();
447 let ctx = FlowContext {
448 tenant: &self.tenant,
449 flow_id: &flow_id,
450 node_id: None,
451 tool: None,
452 action: action_owned.as_deref(),
453 session_id: Some(session_owned.as_str()),
454 provider_id: provider_owned.as_deref(),
455 retry_config,
456 observer: None,
457 mocks,
458 };
459
460 let execution = if let Some(snapshot) = self.resume.fetch(&envelope)? {
461 self.engine.resume(ctx, snapshot, payload).await
462 } else {
463 self.engine.execute(ctx, payload).await
464 }
465 .map_err(|err| RunnerError::AdapterCall {
466 reason: err.to_string(),
467 })?;
468
469 match execution.status {
470 FlowStatus::Completed => {
471 self.resume.clear(&envelope)?;
472 Ok(execution.output)
473 }
474 FlowStatus::Waiting(wait) => {
475 self.resume.save(&envelope, &wait)?;
476 Ok(json!({
477 "status": "pending",
478 "reason": wait.reason,
479 "resume": wait.snapshot,
480 "response": execution.output,
481 }))
482 }
483 }
484 }
485}
486
487#[derive(Clone, Debug, Serialize, Deserialize)]
488pub struct IngressEnvelope {
489 pub tenant: String,
490 #[serde(default, skip_serializing_if = "Option::is_none")]
491 pub env: Option<String>,
492 pub flow_id: String,
493 #[serde(default, skip_serializing_if = "Option::is_none")]
494 pub flow_type: Option<String>,
495 #[serde(default, skip_serializing_if = "Option::is_none")]
496 pub action: Option<String>,
497 #[serde(default, skip_serializing_if = "Option::is_none")]
498 pub session_hint: Option<String>,
499 #[serde(default, skip_serializing_if = "Option::is_none")]
500 pub provider: Option<String>,
501 #[serde(default, skip_serializing_if = "Option::is_none")]
502 pub channel: Option<String>,
503 #[serde(default, skip_serializing_if = "Option::is_none")]
504 pub conversation: Option<String>,
505 #[serde(default, skip_serializing_if = "Option::is_none")]
506 pub user: Option<String>,
507 #[serde(default, skip_serializing_if = "Option::is_none")]
508 pub activity_id: Option<String>,
509 #[serde(default, skip_serializing_if = "Option::is_none")]
510 pub timestamp: Option<String>,
511 #[serde(default)]
512 pub payload: Value,
513 #[serde(default, skip_serializing_if = "Option::is_none")]
514 pub metadata: Option<Value>,
515}
516
517impl IngressEnvelope {
518 pub fn canonicalize(mut self) -> Self {
519 if self.provider.is_none() {
520 self.provider = Some("provider".into());
521 }
522 if self.channel.is_none() {
523 self.channel = Some(self.flow_id.clone());
524 }
525 if self.conversation.is_none() {
526 self.conversation = self.channel.clone();
527 }
528 if self.user.is_none() {
529 if let Some(ref hint) = self.session_hint {
530 self.user = Some(hint.clone());
531 } else if let Some(ref activity) = self.activity_id {
532 self.user = Some(activity.clone());
533 } else {
534 self.user = Some("user".into());
535 }
536 }
537 if self.session_hint.is_none() {
538 self.session_hint = Some(self.canonical_session_hint());
539 }
540 self
541 }
542
543 pub fn canonical_session_hint(&self) -> String {
544 format!(
545 "{}:{}:{}:{}:{}",
546 self.tenant,
547 self.provider.as_deref().unwrap_or("provider"),
548 self.channel.as_deref().unwrap_or("channel"),
549 self.conversation.as_deref().unwrap_or("conversation"),
550 self.user.as_deref().unwrap_or("user")
551 )
552 }
553
554 pub fn tenant_ctx(&self) -> TenantCtx {
555 let env_raw = self.env.clone().unwrap_or_else(|| DEFAULT_ENV.into());
556 let env = EnvId::from_str(env_raw.as_str())
557 .unwrap_or_else(|_| EnvId::from_str(DEFAULT_ENV).expect("default env must be valid"));
558 let tenant_id = TenantId::from_str(self.tenant.as_str()).unwrap_or_else(|_| {
559 TenantId::from_str("tenant.default").expect("tenant fallback must be valid")
560 });
561 let mut ctx = TenantCtx::new(env, tenant_id).with_flow(self.flow_id.clone());
562 if let Some(provider) = &self.provider {
563 ctx = ctx.with_provider(provider.clone());
564 }
565 if let Some(session) = &self.session_hint {
566 ctx = ctx.with_session(session.clone());
567 }
568 ctx
569 }
570}