1use std::str::FromStr;
2use std::sync::Arc;
3
4use anyhow::{Context, Result, anyhow};
5use async_trait::async_trait;
6use greentic_session::SessionData;
7use greentic_types::{
8 EnvId, FlowId, GreenticError, SessionCursor as TypesSessionCursor, TenantCtx, TenantId, UserId,
9};
10use serde::{Deserialize, Serialize};
11use serde_json::{Value, json};
12use sha2::{Digest, Sha256};
13
14use super::api::{RunFlowRequest, RunnerApi};
15use super::builder::{Runner, RunnerBuilder};
16use super::error::{GResult, RunnerError};
17use super::glue::{FnSecretsHost, FnTelemetryHost};
18use super::host::{HostBundle, SecretsHost, SessionHost, StateHost};
19use super::policy::Policy;
20use super::registry::{Adapter, AdapterCall, AdapterRegistry};
21use super::shims::{InMemorySessionHost, InMemoryStateHost};
22use super::state_machine::{FlowDefinition, FlowStep, PAYLOAD_FROM_LAST_INPUT};
23
24use crate::config::{HostConfig, SecretsPolicy};
25use crate::pack::FlowDescriptor;
26use crate::runner::engine::{FlowContext, FlowEngine, FlowSnapshot, FlowStatus, FlowWait};
27use crate::runner::mocks::MockLayer;
28use crate::secrets::DynSecretsManager;
29use crate::storage::session::DynSessionStore;
30
31const DEFAULT_ENV: &str = "local";
32const PACK_FLOW_ADAPTER: &str = "pack_flow";
33
34#[derive(Clone)]
35pub struct FlowResumeStore {
36 store: DynSessionStore,
37}
38
39impl FlowResumeStore {
40 pub fn new(store: DynSessionStore) -> Self {
41 Self { store }
42 }
43
44 fn fetch(&self, envelope: &IngressEnvelope) -> GResult<Option<FlowSnapshot>> {
45 let (mut ctx, user, _) = build_store_ctx(envelope)?;
46 ctx = ctx.with_user(Some(user.clone()));
47 if let Some((_key, data)) = self
48 .store
49 .find_by_user(&ctx, &user)
50 .map_err(map_store_error)?
51 {
52 let record: FlowResumeRecord =
53 serde_json::from_str(&data.context_json).map_err(|err| RunnerError::Session {
54 reason: format!("failed to decode flow resume snapshot: {err}"),
55 })?;
56 if record.snapshot.flow_id == envelope.flow_id {
57 return Ok(Some(record.snapshot));
58 }
59 }
60 Ok(None)
61 }
62
63 fn save(&self, envelope: &IngressEnvelope, wait: &FlowWait) -> GResult<()> {
64 let (ctx, user, hint) = build_store_ctx(envelope)?;
65 let record = FlowResumeRecord {
66 snapshot: wait.snapshot.clone(),
67 reason: wait.reason.clone(),
68 };
69 let data = record_to_session_data(&record, ctx.clone(), &user, &hint)?;
70 let existing = self
71 .store
72 .find_by_user(&ctx, &user)
73 .map_err(map_store_error)?;
74 if let Some((key, _)) = existing {
75 self.store
76 .update_session(&key, data)
77 .map_err(map_store_error)?;
78 } else {
79 self.store
80 .create_session(&ctx, data)
81 .map_err(map_store_error)?;
82 }
83 Ok(())
84 }
85
86 fn clear(&self, envelope: &IngressEnvelope) -> GResult<()> {
87 let (ctx, user, _) = build_store_ctx(envelope)?;
88 if let Some((key, _)) = self
89 .store
90 .find_by_user(&ctx, &user)
91 .map_err(map_store_error)?
92 {
93 self.store.remove_session(&key).map_err(map_store_error)?;
94 }
95 Ok(())
96 }
97}
98
99#[derive(Serialize, Deserialize)]
100struct FlowResumeRecord {
101 snapshot: FlowSnapshot,
102 #[serde(default)]
103 reason: Option<String>,
104}
105
106fn build_store_ctx(envelope: &IngressEnvelope) -> GResult<(TenantCtx, UserId, String)> {
107 let hint = envelope
108 .session_hint
109 .clone()
110 .unwrap_or_else(|| envelope.canonical_session_hint());
111 let user = derive_user_id(&hint)?;
112 let mut ctx = envelope.tenant_ctx();
113 ctx = ctx.with_session(hint.clone());
114 ctx = ctx.with_user(Some(user.clone()));
115 Ok((ctx, user, hint))
116}
117
118fn record_to_session_data(
119 record: &FlowResumeRecord,
120 ctx: TenantCtx,
121 user: &UserId,
122 session_hint: &str,
123) -> GResult<SessionData> {
124 let flow = FlowId::from_str(record.snapshot.flow_id.as_str()).map_err(map_store_error)?;
125 let mut cursor = TypesSessionCursor::new(record.snapshot.next_node.clone());
126 if let Some(reason) = record.reason.clone() {
127 cursor = cursor.with_wait_reason(reason);
128 }
129 let context_json = serde_json::to_string(record).map_err(|err| RunnerError::Session {
130 reason: format!("failed to encode flow resume snapshot: {err}"),
131 })?;
132 let ctx = ctx
133 .with_user(Some(user.clone()))
134 .with_session(session_hint.to_string())
135 .with_flow(record.snapshot.flow_id.clone());
136 Ok(SessionData {
137 tenant_ctx: ctx,
138 flow_id: flow,
139 cursor,
140 context_json,
141 })
142}
143
144fn derive_user_id(hint: &str) -> GResult<UserId> {
145 let digest = Sha256::digest(hint.as_bytes());
146 let slug = format!("sess{}", hex::encode(&digest[..8]));
147 UserId::from_str(&slug).map_err(map_store_error)
148}
149
150fn map_store_error(err: GreenticError) -> RunnerError {
151 RunnerError::Session {
152 reason: err.to_string(),
153 }
154}
155
156#[cfg(test)]
157mod tests {
158 use super::*;
159 use crate::runner::engine::ExecutionState;
160 use crate::storage::session::new_session_store;
161 use serde_json::json;
162
163 fn sample_envelope() -> IngressEnvelope {
164 IngressEnvelope {
165 tenant: "demo".into(),
166 env: Some("local".into()),
167 flow_id: "flow.main".into(),
168 flow_type: None,
169 action: Some("messaging".into()),
170 session_hint: Some("demo:provider:chan:conv:user".into()),
171 provider: Some("provider".into()),
172 channel: Some("chan".into()),
173 conversation: Some("conv".into()),
174 user: Some("user".into()),
175 activity_id: Some("act-1".into()),
176 timestamp: None,
177 payload: json!({ "text": "hi" }),
178 metadata: None,
179 }
180 }
181
182 fn sample_wait() -> FlowWait {
183 let state: ExecutionState = serde_json::from_value(json!({
184 "input": { "text": "hi" },
185 "nodes": {},
186 "egress": []
187 }))
188 .expect("state");
189 FlowWait {
190 reason: Some("await-user".into()),
191 snapshot: FlowSnapshot {
192 flow_id: "flow.main".into(),
193 next_node: "node-2".into(),
194 state,
195 },
196 }
197 }
198
199 #[test]
200 fn derive_user_id_is_stable() {
201 let hint = "some-tenant::session-key";
202 let a = derive_user_id(hint).unwrap();
203 let b = derive_user_id(hint).unwrap();
204 assert_eq!(a, b);
205 assert!(a.as_str().starts_with("sess"));
206 }
207
208 #[test]
209 fn resume_store_roundtrip() -> GResult<()> {
210 let store = FlowResumeStore::new(new_session_store());
211 let envelope = sample_envelope();
212 assert!(store.fetch(&envelope)?.is_none());
213
214 let wait = sample_wait();
215 store.save(&envelope, &wait)?;
216 let snapshot = store.fetch(&envelope)?.expect("snapshot missing");
217 assert_eq!(snapshot.flow_id, wait.snapshot.flow_id);
218 assert_eq!(snapshot.next_node, wait.snapshot.next_node);
219
220 store.clear(&envelope)?;
221 assert!(store.fetch(&envelope)?.is_none());
222 Ok(())
223 }
224
225 #[test]
226 fn resume_store_overwrites_existing() -> GResult<()> {
227 let store = FlowResumeStore::new(new_session_store());
228 let envelope = sample_envelope();
229 let mut wait = sample_wait();
230 store.save(&envelope, &wait)?;
231
232 wait.snapshot.next_node = "node-3".into();
233 wait.reason = Some("retry".into());
234 store.save(&envelope, &wait)?;
235
236 let snapshot = store.fetch(&envelope)?.expect("snapshot missing");
237 assert_eq!(snapshot.next_node, "node-3");
238 store.clear(&envelope)?;
239 Ok(())
240 }
241
242 #[test]
243 fn canonicalize_populates_defaults() {
244 let envelope = IngressEnvelope {
245 tenant: "demo".into(),
246 env: None,
247 flow_id: "flow.main".into(),
248 flow_type: None,
249 action: None,
250 session_hint: None,
251 provider: None,
252 channel: None,
253 conversation: None,
254 user: None,
255 activity_id: Some("activity-1".into()),
256 timestamp: None,
257 payload: json!({}),
258 metadata: None,
259 }
260 .canonicalize();
261
262 assert_eq!(envelope.provider.as_deref(), Some("provider"));
263 assert_eq!(envelope.channel.as_deref(), Some("flow.main"));
264 assert_eq!(envelope.conversation.as_deref(), Some("flow.main"));
265 assert_eq!(envelope.user.as_deref(), Some("activity-1"));
266 assert!(envelope.session_hint.is_some());
267 }
268}
269
270pub struct StateMachineRuntime {
271 runner: Runner,
272}
273
274impl StateMachineRuntime {
275 pub fn new(flows: Vec<FlowDefinition>) -> GResult<Self> {
277 let secrets = Arc::new(FnSecretsHost::new(|name| {
278 Err(RunnerError::Secrets {
279 reason: format!("secret {name} unavailable (noop host)"),
280 })
281 }));
282 let telemetry = Arc::new(FnTelemetryHost::new(|_, _| Ok(())));
283 let session = Arc::new(InMemorySessionHost::new());
284 let state = Arc::new(InMemoryStateHost::new());
285 let host = HostBundle::new(secrets, telemetry, session, state);
286
287 let adapters = AdapterRegistry::default();
288 let policy = Policy::default();
289
290 let mut builder = RunnerBuilder::new()
291 .with_host(host)
292 .with_adapters(adapters)
293 .with_policy(policy);
294 for flow in flows {
295 builder = builder.with_flow(flow);
296 }
297 let runner = builder.build()?;
298 Ok(Self { runner })
299 }
300
301 pub fn from_flow_engine(
303 config: Arc<HostConfig>,
304 engine: Arc<FlowEngine>,
305 session_host: Arc<dyn SessionHost>,
306 session_store: DynSessionStore,
307 state_host: Arc<dyn StateHost>,
308 secrets_manager: DynSecretsManager,
309 mocks: Option<Arc<MockLayer>>,
310 ) -> Result<Self> {
311 let policy = Arc::new(config.secrets_policy.clone());
312 let secrets = Arc::new(PolicySecretsHost::new(policy, secrets_manager));
313 let telemetry = Arc::new(FnTelemetryHost::new(|span, fields| {
314 tracing::debug!(?span, ?fields, "telemetry emit");
315 Ok(())
316 }));
317 let host = HostBundle::new(secrets, telemetry, session_host, state_host);
318 let resume_store = FlowResumeStore::new(session_store);
319
320 let mut adapters = AdapterRegistry::default();
321 adapters.register(
322 PACK_FLOW_ADAPTER,
323 Box::new(PackFlowAdapter::new(
324 Arc::clone(&config),
325 Arc::clone(&engine),
326 resume_store,
327 mocks,
328 )),
329 );
330
331 let flows = build_flow_definitions(engine.flows());
332 let mut builder = RunnerBuilder::new()
333 .with_host(host)
334 .with_adapters(adapters)
335 .with_policy(Policy::default());
336 for flow in flows {
337 builder = builder.with_flow(flow);
338 }
339 let runner = builder
340 .build()
341 .map_err(|err| anyhow!("state machine init failed: {err}"))?;
342 Ok(Self { runner })
343 }
344
345 pub async fn handle(&self, envelope: IngressEnvelope) -> Result<Value> {
347 let tenant_ctx = envelope.tenant_ctx();
348 let session_hint = envelope
349 .session_hint
350 .clone()
351 .unwrap_or_else(|| envelope.canonical_session_hint());
352 let input =
353 serde_json::to_value(&envelope).context("failed to serialise ingress envelope")?;
354 let request = RunFlowRequest {
355 tenant: tenant_ctx,
356 flow_id: envelope.flow_id.clone(),
357 input,
358 session_hint: Some(session_hint),
359 };
360 let result: super::api::RunFlowResult = self
361 .runner
362 .run_flow(request)
363 .await
364 .map_err(|err| anyhow!("flow execution failed: {err}"))?;
365 let outcome = result.outcome;
366 Ok(outcome.get("response").cloned().unwrap_or(outcome))
367 }
368}
369
370struct PolicySecretsHost {
371 policy: Arc<SecretsPolicy>,
372 manager: DynSecretsManager,
373}
374
375impl PolicySecretsHost {
376 fn new(policy: Arc<SecretsPolicy>, manager: DynSecretsManager) -> Self {
377 Self { policy, manager }
378 }
379}
380
381#[async_trait]
382impl SecretsHost for PolicySecretsHost {
383 async fn get(&self, name: &str) -> GResult<String> {
384 if !self.policy.is_allowed(name) {
385 return Err(RunnerError::Secrets {
386 reason: format!("secret {name} denied by policy"),
387 });
388 }
389 let bytes = self
390 .manager
391 .read(name)
392 .await
393 .map_err(|err| RunnerError::Secrets {
394 reason: format!("secret {name} unavailable: {err}"),
395 })?;
396 String::from_utf8(bytes).map_err(|err| RunnerError::Secrets {
397 reason: format!("secret {name} not valid UTF-8: {err}"),
398 })
399 }
400}
401
402fn build_flow_definitions(flows: &[FlowDescriptor]) -> Vec<FlowDefinition> {
403 flows
404 .iter()
405 .map(|descriptor| {
406 FlowDefinition::new(
407 super::api::FlowSummary {
408 id: descriptor.id.clone(),
409 name: descriptor
410 .description
411 .clone()
412 .unwrap_or_else(|| descriptor.id.clone()),
413 version: descriptor.version.clone(),
414 description: descriptor.description.clone(),
415 },
416 serde_json::json!({
417 "type": "object"
418 }),
419 vec![FlowStep::Adapter(AdapterCall {
420 adapter: PACK_FLOW_ADAPTER.into(),
421 operation: descriptor.id.clone(),
422 payload: Value::String(PAYLOAD_FROM_LAST_INPUT.into()),
423 })],
424 )
425 })
426 .collect()
427}
428
429struct PackFlowAdapter {
430 tenant: String,
431 config: Arc<HostConfig>,
432 engine: Arc<FlowEngine>,
433 resume: FlowResumeStore,
434 mocks: Option<Arc<MockLayer>>,
435}
436
437impl PackFlowAdapter {
438 fn new(
439 config: Arc<HostConfig>,
440 engine: Arc<FlowEngine>,
441 resume: FlowResumeStore,
442 mocks: Option<Arc<MockLayer>>,
443 ) -> Self {
444 Self {
445 tenant: config.tenant.clone(),
446 config,
447 engine,
448 resume,
449 mocks,
450 }
451 }
452}
453
454#[async_trait::async_trait]
455impl Adapter for PackFlowAdapter {
456 async fn call(&self, call: &AdapterCall) -> GResult<Value> {
457 let envelope: IngressEnvelope =
458 serde_json::from_value(call.payload.clone()).map_err(|err| {
459 RunnerError::AdapterCall {
460 reason: format!("invalid ingress payload: {err}"),
461 }
462 })?;
463 let flow_id = call.operation.clone();
464 let action_owned = envelope.action.clone();
465 let session_owned = envelope
466 .session_hint
467 .clone()
468 .unwrap_or_else(|| envelope.canonical_session_hint());
469 let provider_owned = envelope.provider.clone();
470 let payload = envelope.payload.clone();
471 let retry_config = self.config.retry_config().into();
472
473 let mocks = self.mocks.as_deref();
474 let ctx = FlowContext {
475 tenant: &self.tenant,
476 flow_id: &flow_id,
477 node_id: None,
478 tool: None,
479 action: action_owned.as_deref(),
480 session_id: Some(session_owned.as_str()),
481 provider_id: provider_owned.as_deref(),
482 retry_config,
483 observer: None,
484 mocks,
485 };
486
487 let execution = if let Some(snapshot) = self.resume.fetch(&envelope)? {
488 self.engine.resume(ctx, snapshot, payload).await
489 } else {
490 self.engine.execute(ctx, payload).await
491 }
492 .map_err(|err| RunnerError::AdapterCall {
493 reason: err.to_string(),
494 })?;
495
496 match execution.status {
497 FlowStatus::Completed => {
498 self.resume.clear(&envelope)?;
499 Ok(execution.output)
500 }
501 FlowStatus::Waiting(wait) => {
502 self.resume.save(&envelope, &wait)?;
503 Ok(json!({
504 "status": "pending",
505 "reason": wait.reason,
506 "resume": wait.snapshot,
507 "response": execution.output,
508 }))
509 }
510 }
511 }
512}
513
514#[derive(Clone, Debug, Serialize, Deserialize)]
515pub struct IngressEnvelope {
516 pub tenant: String,
517 #[serde(default, skip_serializing_if = "Option::is_none")]
518 pub env: Option<String>,
519 pub flow_id: String,
520 #[serde(default, skip_serializing_if = "Option::is_none")]
521 pub flow_type: Option<String>,
522 #[serde(default, skip_serializing_if = "Option::is_none")]
523 pub action: Option<String>,
524 #[serde(default, skip_serializing_if = "Option::is_none")]
525 pub session_hint: Option<String>,
526 #[serde(default, skip_serializing_if = "Option::is_none")]
527 pub provider: Option<String>,
528 #[serde(default, skip_serializing_if = "Option::is_none")]
529 pub channel: Option<String>,
530 #[serde(default, skip_serializing_if = "Option::is_none")]
531 pub conversation: Option<String>,
532 #[serde(default, skip_serializing_if = "Option::is_none")]
533 pub user: Option<String>,
534 #[serde(default, skip_serializing_if = "Option::is_none")]
535 pub activity_id: Option<String>,
536 #[serde(default, skip_serializing_if = "Option::is_none")]
537 pub timestamp: Option<String>,
538 #[serde(default)]
539 pub payload: Value,
540 #[serde(default, skip_serializing_if = "Option::is_none")]
541 pub metadata: Option<Value>,
542}
543
544impl IngressEnvelope {
545 pub fn canonicalize(mut self) -> Self {
546 if self.provider.is_none() {
547 self.provider = Some("provider".into());
548 }
549 if self.channel.is_none() {
550 self.channel = Some(self.flow_id.clone());
551 }
552 if self.conversation.is_none() {
553 self.conversation = self.channel.clone();
554 }
555 if self.user.is_none() {
556 if let Some(ref hint) = self.session_hint {
557 self.user = Some(hint.clone());
558 } else if let Some(ref activity) = self.activity_id {
559 self.user = Some(activity.clone());
560 } else {
561 self.user = Some("user".into());
562 }
563 }
564 if self.session_hint.is_none() {
565 self.session_hint = Some(self.canonical_session_hint());
566 }
567 self
568 }
569
570 pub fn canonical_session_hint(&self) -> String {
571 format!(
572 "{}:{}:{}:{}:{}",
573 self.tenant,
574 self.provider.as_deref().unwrap_or("provider"),
575 self.channel.as_deref().unwrap_or("channel"),
576 self.conversation.as_deref().unwrap_or("conversation"),
577 self.user.as_deref().unwrap_or("user")
578 )
579 }
580
581 pub fn tenant_ctx(&self) -> TenantCtx {
582 let env_raw = self.env.clone().unwrap_or_else(|| DEFAULT_ENV.into());
583 let env = EnvId::from_str(env_raw.as_str())
584 .unwrap_or_else(|_| EnvId::from_str(DEFAULT_ENV).expect("default env must be valid"));
585 let tenant_id = TenantId::from_str(self.tenant.as_str()).unwrap_or_else(|_| {
586 TenantId::from_str("tenant.default").expect("tenant fallback must be valid")
587 });
588 let mut ctx = TenantCtx::new(env, tenant_id).with_flow(self.flow_id.clone());
589 if let Some(provider) = &self.provider {
590 ctx = ctx.with_provider(provider.clone());
591 }
592 if let Some(session) = &self.session_hint {
593 ctx = ctx.with_session(session.clone());
594 }
595 ctx
596 }
597}