1use std::str::FromStr;
2use std::sync::Arc;
3
4use anyhow::{Context, Result, anyhow};
5use async_trait::async_trait;
6use greentic_session::SessionData;
7use greentic_types::{
8 EnvId, FlowId, GreenticError, SessionCursor as TypesSessionCursor, TenantCtx, TenantId, UserId,
9};
10use serde::{Deserialize, Serialize};
11use serde_json::{Value, json};
12use sha2::{Digest, Sha256};
13
14use super::api::{RunFlowRequest, RunnerApi};
15use super::builder::{Runner, RunnerBuilder};
16use super::error::{GResult, RunnerError};
17use super::glue::{FnSecretsHost, FnTelemetryHost};
18use super::host::{HostBundle, SecretsHost, SessionHost, StateHost};
19use super::policy::Policy;
20use super::registry::{Adapter, AdapterCall, AdapterRegistry};
21use super::shims::{InMemorySessionHost, InMemoryStateHost};
22use super::state_machine::{FlowDefinition, FlowStep, PAYLOAD_FROM_LAST_INPUT};
23
24use crate::config::{HostConfig, SecretsPolicy};
25use crate::pack::FlowDescriptor;
26use crate::runner::engine::{FlowContext, FlowEngine, FlowSnapshot, FlowStatus, FlowWait};
27use crate::runner::mocks::MockLayer;
28use crate::secrets::DynSecretsManager;
29use crate::storage::session::DynSessionStore;
30
/// Environment id used by `IngressEnvelope::tenant_ctx` when the envelope has
/// no `env` value or the value fails to parse as an `EnvId`.
const DEFAULT_ENV: &str = "local";
/// Name under which `PackFlowAdapter` is registered in the adapter registry
/// and referenced by the single adapter step of each generated flow definition.
const PACK_FLOW_ADAPTER: &str = "pack_flow";
33
34#[derive(Clone)]
35pub struct FlowResumeStore {
36 store: DynSessionStore,
37}
38
39impl FlowResumeStore {
40 pub fn new(store: DynSessionStore) -> Self {
41 Self { store }
42 }
43
44 fn fetch(&self, envelope: &IngressEnvelope) -> GResult<Option<FlowSnapshot>> {
45 let (mut ctx, user, _) = build_store_ctx(envelope)?;
46 ctx = ctx.with_user(Some(user.clone()));
47 if let Some((_key, data)) = self
48 .store
49 .find_by_user(&ctx, &user)
50 .map_err(map_store_error)?
51 {
52 let record: FlowResumeRecord =
53 serde_json::from_str(&data.context_json).map_err(|err| RunnerError::Session {
54 reason: format!("failed to decode flow resume snapshot: {err}"),
55 })?;
56 if record.snapshot.flow_id == envelope.flow_id {
57 return Ok(Some(record.snapshot));
58 }
59 }
60 Ok(None)
61 }
62
63 fn save(&self, envelope: &IngressEnvelope, wait: &FlowWait) -> GResult<()> {
64 let (ctx, user, hint) = build_store_ctx(envelope)?;
65 let record = FlowResumeRecord {
66 snapshot: wait.snapshot.clone(),
67 reason: wait.reason.clone(),
68 };
69 let data = record_to_session_data(&record, ctx.clone(), &user, &hint)?;
70 let existing = self
71 .store
72 .find_by_user(&ctx, &user)
73 .map_err(map_store_error)?;
74 if let Some((key, _)) = existing {
75 self.store
76 .update_session(&key, data)
77 .map_err(map_store_error)?;
78 } else {
79 self.store
80 .create_session(&ctx, data)
81 .map_err(map_store_error)?;
82 }
83 Ok(())
84 }
85
86 fn clear(&self, envelope: &IngressEnvelope) -> GResult<()> {
87 let (ctx, user, _) = build_store_ctx(envelope)?;
88 if let Some((key, _)) = self
89 .store
90 .find_by_user(&ctx, &user)
91 .map_err(map_store_error)?
92 {
93 self.store.remove_session(&key).map_err(map_store_error)?;
94 }
95 Ok(())
96 }
97}
98
/// JSON payload written to `SessionData::context_json` by
/// `record_to_session_data` and decoded again by `FlowResumeStore::fetch`.
#[derive(Serialize, Deserialize)]
struct FlowResumeRecord {
    /// Engine snapshot to resume from.
    snapshot: FlowSnapshot,
    /// Optional wait reason copied from `FlowWait::reason`; defaults to `None`
    /// when the field is missing from the stored JSON.
    #[serde(default)]
    reason: Option<String>,
}
105
106fn build_store_ctx(envelope: &IngressEnvelope) -> GResult<(TenantCtx, UserId, String)> {
107 let hint = envelope
108 .session_hint
109 .clone()
110 .unwrap_or_else(|| envelope.canonical_session_hint());
111 let user = derive_user_id(&hint)?;
112 let mut ctx = envelope.tenant_ctx();
113 ctx = ctx.with_session(hint.clone());
114 Ok((ctx, user, hint))
115}
116
117fn record_to_session_data(
118 record: &FlowResumeRecord,
119 ctx: TenantCtx,
120 user: &UserId,
121 session_hint: &str,
122) -> GResult<SessionData> {
123 let flow = FlowId::from_str(record.snapshot.flow_id.as_str()).map_err(map_store_error)?;
124 let mut cursor = TypesSessionCursor::new(record.snapshot.next_node.clone());
125 if let Some(reason) = record.reason.clone() {
126 cursor = cursor.with_wait_reason(reason);
127 }
128 let context_json = serde_json::to_string(record).map_err(|err| RunnerError::Session {
129 reason: format!("failed to encode flow resume snapshot: {err}"),
130 })?;
131 let ctx = ctx
132 .with_user(Some(user.clone()))
133 .with_session(session_hint.to_string())
134 .with_flow(record.snapshot.flow_id.clone());
135 Ok(SessionData {
136 tenant_ctx: ctx,
137 flow_id: flow,
138 cursor,
139 context_json,
140 })
141}
142
143fn derive_user_id(hint: &str) -> GResult<UserId> {
144 let digest = Sha256::digest(hint.as_bytes());
145 let slug = format!("sess{}", hex::encode(&digest[..8]));
146 UserId::from_str(&slug).map_err(map_store_error)
147}
148
149fn map_store_error(err: GreenticError) -> RunnerError {
150 RunnerError::Session {
151 reason: err.to_string(),
152 }
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runner::engine::ExecutionState;
    use crate::storage::session::new_session_store;
    use serde_json::json;

    /// Resume store backed by a fresh session store.
    fn fresh_store() -> FlowResumeStore {
        FlowResumeStore::new(new_session_store())
    }

    /// Envelope with every routing field populated and an explicit session hint.
    fn sample_envelope() -> IngressEnvelope {
        IngressEnvelope {
            tenant: String::from("demo"),
            env: Some(String::from("local")),
            flow_id: String::from("flow.main"),
            flow_type: None,
            action: Some(String::from("messaging")),
            session_hint: Some(String::from("demo:provider:chan:conv:user")),
            provider: Some(String::from("provider")),
            channel: Some(String::from("chan")),
            conversation: Some(String::from("conv")),
            user: Some(String::from("user")),
            activity_id: Some(String::from("act-1")),
            timestamp: None,
            payload: json!({ "text": "hi" }),
            metadata: None,
        }
    }

    /// Wait state for `flow.main` paused before `node-2`.
    fn sample_wait() -> FlowWait {
        let raw_state = json!({
            "input": { "text": "hi" },
            "nodes": {},
            "egress": []
        });
        let state: ExecutionState = serde_json::from_value(raw_state).expect("state");
        let snapshot = FlowSnapshot {
            flow_id: "flow.main".into(),
            next_node: "node-2".into(),
            state,
        };
        FlowWait {
            reason: Some("await-user".into()),
            snapshot,
        }
    }

    #[test]
    fn derive_user_id_is_stable() {
        let hint = "some-tenant::session-key";
        let first = derive_user_id(hint).unwrap();
        let second = derive_user_id(hint).unwrap();
        assert_eq!(first, second);
        assert!(first.as_str().starts_with("sess"));
    }

    #[test]
    fn resume_store_roundtrip() -> GResult<()> {
        let store = fresh_store();
        let envelope = sample_envelope();
        // Nothing stored yet.
        assert!(store.fetch(&envelope)?.is_none());

        let wait = sample_wait();
        store.save(&envelope, &wait)?;
        let restored = store.fetch(&envelope)?.expect("snapshot missing");
        assert_eq!(restored.flow_id, wait.snapshot.flow_id);
        assert_eq!(restored.next_node, wait.snapshot.next_node);

        // Clearing removes the entry again.
        store.clear(&envelope)?;
        assert!(store.fetch(&envelope)?.is_none());
        Ok(())
    }

    #[test]
    fn resume_store_overwrites_existing() -> GResult<()> {
        let store = fresh_store();
        let envelope = sample_envelope();

        let mut wait = sample_wait();
        store.save(&envelope, &wait)?;

        // A second save for the same session replaces the first snapshot.
        wait.snapshot.next_node = "node-3".into();
        wait.reason = Some("retry".into());
        store.save(&envelope, &wait)?;

        let restored = store.fetch(&envelope)?.expect("snapshot missing");
        assert_eq!(restored.next_node, "node-3");
        store.clear(&envelope)?;
        Ok(())
    }

    #[test]
    fn canonicalize_populates_defaults() {
        let sparse = IngressEnvelope {
            tenant: String::from("demo"),
            env: None,
            flow_id: String::from("flow.main"),
            flow_type: None,
            action: None,
            session_hint: None,
            provider: None,
            channel: None,
            conversation: None,
            user: None,
            activity_id: Some(String::from("activity-1")),
            timestamp: None,
            payload: json!({}),
            metadata: None,
        };
        let envelope = sparse.canonicalize();

        assert_eq!(envelope.provider.as_deref(), Some("provider"));
        assert_eq!(envelope.channel.as_deref(), Some("flow.main"));
        assert_eq!(envelope.conversation.as_deref(), Some("flow.main"));
        assert_eq!(envelope.user.as_deref(), Some("activity-1"));
        assert!(envelope.session_hint.is_some());
    }
}
268
269pub struct StateMachineRuntime {
270 runner: Runner,
271}
272
273impl StateMachineRuntime {
274 pub fn new(flows: Vec<FlowDefinition>) -> GResult<Self> {
276 let secrets = Arc::new(FnSecretsHost::new(|name| {
277 Err(RunnerError::Secrets {
278 reason: format!("secret {name} unavailable (noop host)"),
279 })
280 }));
281 let telemetry = Arc::new(FnTelemetryHost::new(|_, _| Ok(())));
282 let session = Arc::new(InMemorySessionHost::new());
283 let state = Arc::new(InMemoryStateHost::new());
284 let host = HostBundle::new(secrets, telemetry, session, state);
285
286 let adapters = AdapterRegistry::default();
287 let policy = Policy::default();
288
289 let mut builder = RunnerBuilder::new()
290 .with_host(host)
291 .with_adapters(adapters)
292 .with_policy(policy);
293 for flow in flows {
294 builder = builder.with_flow(flow);
295 }
296 let runner = builder.build()?;
297 Ok(Self { runner })
298 }
299
300 pub fn from_flow_engine(
302 config: Arc<HostConfig>,
303 engine: Arc<FlowEngine>,
304 session_host: Arc<dyn SessionHost>,
305 session_store: DynSessionStore,
306 state_host: Arc<dyn StateHost>,
307 secrets_manager: DynSecretsManager,
308 mocks: Option<Arc<MockLayer>>,
309 ) -> Result<Self> {
310 let policy = Arc::new(config.secrets_policy.clone());
311 let secrets = Arc::new(PolicySecretsHost::new(policy, secrets_manager));
312 let telemetry = Arc::new(FnTelemetryHost::new(|span, fields| {
313 tracing::debug!(?span, ?fields, "telemetry emit");
314 Ok(())
315 }));
316 let host = HostBundle::new(secrets, telemetry, session_host, state_host);
317 let resume_store = FlowResumeStore::new(session_store);
318
319 let mut adapters = AdapterRegistry::default();
320 adapters.register(
321 PACK_FLOW_ADAPTER,
322 Box::new(PackFlowAdapter::new(
323 Arc::clone(&config),
324 Arc::clone(&engine),
325 resume_store,
326 mocks,
327 )),
328 );
329
330 let flows = build_flow_definitions(engine.flows());
331 let mut builder = RunnerBuilder::new()
332 .with_host(host)
333 .with_adapters(adapters)
334 .with_policy(Policy::default());
335 for flow in flows {
336 builder = builder.with_flow(flow);
337 }
338 let runner = builder
339 .build()
340 .map_err(|err| anyhow!("state machine init failed: {err}"))?;
341 Ok(Self { runner })
342 }
343
344 pub async fn handle(&self, envelope: IngressEnvelope) -> Result<Value> {
346 let tenant_ctx = envelope.tenant_ctx();
347 let session_hint = envelope
348 .session_hint
349 .clone()
350 .unwrap_or_else(|| envelope.canonical_session_hint());
351 let input =
352 serde_json::to_value(&envelope).context("failed to serialise ingress envelope")?;
353 let request = RunFlowRequest {
354 tenant: tenant_ctx,
355 flow_id: envelope.flow_id.clone(),
356 input,
357 session_hint: Some(session_hint),
358 };
359 let result: super::api::RunFlowResult = self
360 .runner
361 .run_flow(request)
362 .await
363 .map_err(|err| anyhow!("flow execution failed: {err}"))?;
364 let outcome = result.outcome;
365 Ok(outcome.get("response").cloned().unwrap_or(outcome))
366 }
367}
368
369struct PolicySecretsHost {
370 policy: Arc<SecretsPolicy>,
371 manager: DynSecretsManager,
372}
373
374impl PolicySecretsHost {
375 fn new(policy: Arc<SecretsPolicy>, manager: DynSecretsManager) -> Self {
376 Self { policy, manager }
377 }
378}
379
380#[async_trait]
381impl SecretsHost for PolicySecretsHost {
382 async fn get(&self, name: &str) -> GResult<String> {
383 if !self.policy.is_allowed(name) {
384 return Err(RunnerError::Secrets {
385 reason: format!("secret {name} denied by policy"),
386 });
387 }
388 let bytes = self
389 .manager
390 .read(name)
391 .await
392 .map_err(|err| RunnerError::Secrets {
393 reason: format!("secret {name} unavailable: {err}"),
394 })?;
395 String::from_utf8(bytes).map_err(|err| RunnerError::Secrets {
396 reason: format!("secret {name} not valid UTF-8: {err}"),
397 })
398 }
399}
400
401fn build_flow_definitions(flows: &[FlowDescriptor]) -> Vec<FlowDefinition> {
402 flows
403 .iter()
404 .map(|descriptor| {
405 FlowDefinition::new(
406 super::api::FlowSummary {
407 id: descriptor.id.clone(),
408 name: descriptor
409 .description
410 .clone()
411 .unwrap_or_else(|| descriptor.id.clone()),
412 version: descriptor.version.clone(),
413 description: descriptor.description.clone(),
414 },
415 serde_json::json!({
416 "type": "object"
417 }),
418 vec![FlowStep::Adapter(AdapterCall {
419 adapter: PACK_FLOW_ADAPTER.into(),
420 operation: descriptor.id.clone(),
421 payload: Value::String(PAYLOAD_FROM_LAST_INPUT.into()),
422 })],
423 )
424 })
425 .collect()
426}
427
428struct PackFlowAdapter {
429 tenant: String,
430 config: Arc<HostConfig>,
431 engine: Arc<FlowEngine>,
432 resume: FlowResumeStore,
433 mocks: Option<Arc<MockLayer>>,
434}
435
436impl PackFlowAdapter {
437 fn new(
438 config: Arc<HostConfig>,
439 engine: Arc<FlowEngine>,
440 resume: FlowResumeStore,
441 mocks: Option<Arc<MockLayer>>,
442 ) -> Self {
443 Self {
444 tenant: config.tenant.clone(),
445 config,
446 engine,
447 resume,
448 mocks,
449 }
450 }
451}
452
453#[async_trait::async_trait]
454impl Adapter for PackFlowAdapter {
455 async fn call(&self, call: &AdapterCall) -> GResult<Value> {
456 let envelope: IngressEnvelope =
457 serde_json::from_value(call.payload.clone()).map_err(|err| {
458 RunnerError::AdapterCall {
459 reason: format!("invalid ingress payload: {err}"),
460 }
461 })?;
462 let flow_id = call.operation.clone();
463 let action_owned = envelope.action.clone();
464 let session_owned = envelope
465 .session_hint
466 .clone()
467 .unwrap_or_else(|| envelope.canonical_session_hint());
468 let provider_owned = envelope.provider.clone();
469 let payload = envelope.payload.clone();
470 let retry_config = self.config.mcp_retry_config().into();
471
472 let mocks = self.mocks.as_deref();
473 let ctx = FlowContext {
474 tenant: &self.tenant,
475 flow_id: &flow_id,
476 node_id: None,
477 tool: None,
478 action: action_owned.as_deref(),
479 session_id: Some(session_owned.as_str()),
480 provider_id: provider_owned.as_deref(),
481 retry_config,
482 observer: None,
483 mocks,
484 };
485
486 let execution = if let Some(snapshot) = self.resume.fetch(&envelope)? {
487 self.engine.resume(ctx, snapshot, payload).await
488 } else {
489 self.engine.execute(ctx, payload).await
490 }
491 .map_err(|err| RunnerError::AdapterCall {
492 reason: err.to_string(),
493 })?;
494
495 match execution.status {
496 FlowStatus::Completed => {
497 self.resume.clear(&envelope)?;
498 Ok(execution.output)
499 }
500 FlowStatus::Waiting(wait) => {
501 self.resume.save(&envelope, &wait)?;
502 Ok(json!({
503 "status": "pending",
504 "reason": wait.reason,
505 "resume": wait.snapshot,
506 "response": execution.output,
507 }))
508 }
509 }
510 }
511}
512
/// Inbound event handed to `StateMachineRuntime::handle`.
///
/// The whole envelope is serialised to JSON and used as the flow input;
/// `PackFlowAdapter` deserialises it again from the adapter call payload.
/// Optional fields are omitted from the serialised form when `None`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IngressEnvelope {
    /// Tenant name; parsed into a `TenantId` by `tenant_ctx` and used as the
    /// first component of the canonical session hint.
    pub tenant: String,
    /// Environment name; `tenant_ctx` uses `DEFAULT_ENV` when this is absent
    /// or does not parse as an `EnvId`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub env: Option<String>,
    /// Id of the flow to run.
    pub flow_id: String,
    /// Not read by the code in this file.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub flow_type: Option<String>,
    /// Forwarded to the engine as `FlowContext::action`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub action: Option<String>,
    /// Explicit session key; when absent, `canonical_session_hint` is used instead.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub session_hint: Option<String>,
    /// Provider name; part of the canonical session hint and forwarded as
    /// `FlowContext::provider_id`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub provider: Option<String>,
    /// Channel name; part of the canonical session hint.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub channel: Option<String>,
    /// Conversation name; part of the canonical session hint.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub conversation: Option<String>,
    /// User name; part of the canonical session hint.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub user: Option<String>,
    /// Used by `canonicalize` as a fallback for `user` when neither `user`
    /// nor `session_hint` is set.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub activity_id: Option<String>,
    /// Not read by the code in this file.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timestamp: Option<String>,
    /// Payload handed to the flow engine as input; defaults to JSON `null`
    /// when missing on deserialisation.
    #[serde(default)]
    pub payload: Value,
    /// Not read by the code in this file.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub metadata: Option<Value>,
}
542
543impl IngressEnvelope {
544 pub fn canonicalize(mut self) -> Self {
545 if self.provider.is_none() {
546 self.provider = Some("provider".into());
547 }
548 if self.channel.is_none() {
549 self.channel = Some(self.flow_id.clone());
550 }
551 if self.conversation.is_none() {
552 self.conversation = self.channel.clone();
553 }
554 if self.user.is_none() {
555 if let Some(ref hint) = self.session_hint {
556 self.user = Some(hint.clone());
557 } else if let Some(ref activity) = self.activity_id {
558 self.user = Some(activity.clone());
559 } else {
560 self.user = Some("user".into());
561 }
562 }
563 if self.session_hint.is_none() {
564 self.session_hint = Some(self.canonical_session_hint());
565 }
566 self
567 }
568
569 pub fn canonical_session_hint(&self) -> String {
570 format!(
571 "{}:{}:{}:{}:{}",
572 self.tenant,
573 self.provider.as_deref().unwrap_or("provider"),
574 self.channel.as_deref().unwrap_or("channel"),
575 self.conversation.as_deref().unwrap_or("conversation"),
576 self.user.as_deref().unwrap_or("user")
577 )
578 }
579
580 pub fn tenant_ctx(&self) -> TenantCtx {
581 let env_raw = self.env.clone().unwrap_or_else(|| DEFAULT_ENV.into());
582 let env = EnvId::from_str(env_raw.as_str())
583 .unwrap_or_else(|_| EnvId::from_str(DEFAULT_ENV).expect("default env must be valid"));
584 let tenant_id = TenantId::from_str(self.tenant.as_str()).unwrap_or_else(|_| {
585 TenantId::from_str("tenant.default").expect("tenant fallback must be valid")
586 });
587 let mut ctx = TenantCtx::new(env, tenant_id).with_flow(self.flow_id.clone());
588 if let Some(provider) = &self.provider {
589 ctx = ctx.with_provider(provider.clone());
590 }
591 if let Some(session) = &self.session_hint {
592 ctx = ctx.with_session(session.clone());
593 }
594 ctx
595 }
596}