greentic_runner_host/engine/
runtime.rs

1use std::str::FromStr;
2use std::sync::Arc;
3
4use anyhow::{Context, Result, anyhow};
5use greentic_session::SessionData;
6use greentic_types::{
7    EnvId, FlowId, GreenticError, SessionCursor as TypesSessionCursor, TenantCtx, TenantId, UserId,
8};
9use serde::{Deserialize, Serialize};
10use serde_json::{Value, json};
11use sha2::{Digest, Sha256};
12
13use super::api::{RunFlowRequest, RunnerApi};
14use super::builder::{Runner, RunnerBuilder};
15use super::error::{GResult, RunnerError};
16use super::glue::{FnSecretsHost, FnTelemetryHost};
17use super::host::{HostBundle, SessionHost, StateHost};
18use super::policy::Policy;
19use super::registry::{Adapter, AdapterCall, AdapterRegistry};
20use super::shims::{InMemorySessionHost, InMemoryStateHost};
21use super::state_machine::{FlowDefinition, FlowStep, PAYLOAD_FROM_LAST_INPUT};
22
23use crate::config::HostConfig;
24use crate::pack::FlowDescriptor;
25use crate::runner::engine::{FlowContext, FlowEngine, FlowSnapshot, FlowStatus, FlowWait};
26use crate::runner::mocks::MockLayer;
27use crate::storage::session::DynSessionStore;
28
29const DEFAULT_ENV: &str = "local";
30const PACK_FLOW_ADAPTER: &str = "pack_flow";
31
32#[derive(Clone)]
33pub struct FlowResumeStore {
34    store: DynSessionStore,
35}
36
37impl FlowResumeStore {
38    pub fn new(store: DynSessionStore) -> Self {
39        Self { store }
40    }
41
42    fn fetch(&self, envelope: &IngressEnvelope) -> GResult<Option<FlowSnapshot>> {
43        let (mut ctx, user, _) = build_store_ctx(envelope)?;
44        ctx = ctx.with_user(Some(user.clone()));
45        if let Some((_key, data)) = self
46            .store
47            .find_by_user(&ctx, &user)
48            .map_err(map_store_error)?
49        {
50            let record: FlowResumeRecord =
51                serde_json::from_str(&data.context_json).map_err(|err| RunnerError::Session {
52                    reason: format!("failed to decode flow resume snapshot: {err}"),
53                })?;
54            if record.snapshot.flow_id == envelope.flow_id {
55                return Ok(Some(record.snapshot));
56            }
57        }
58        Ok(None)
59    }
60
61    fn save(&self, envelope: &IngressEnvelope, wait: &FlowWait) -> GResult<()> {
62        let (ctx, user, hint) = build_store_ctx(envelope)?;
63        let record = FlowResumeRecord {
64            snapshot: wait.snapshot.clone(),
65            reason: wait.reason.clone(),
66        };
67        let data = record_to_session_data(&record, ctx.clone(), &user, &hint)?;
68        let existing = self
69            .store
70            .find_by_user(&ctx, &user)
71            .map_err(map_store_error)?;
72        if let Some((key, _)) = existing {
73            self.store
74                .update_session(&key, data)
75                .map_err(map_store_error)?;
76        } else {
77            self.store
78                .create_session(&ctx, data)
79                .map_err(map_store_error)?;
80        }
81        Ok(())
82    }
83
84    fn clear(&self, envelope: &IngressEnvelope) -> GResult<()> {
85        let (ctx, user, _) = build_store_ctx(envelope)?;
86        if let Some((key, _)) = self
87            .store
88            .find_by_user(&ctx, &user)
89            .map_err(map_store_error)?
90        {
91            self.store.remove_session(&key).map_err(map_store_error)?;
92        }
93        Ok(())
94    }
95}
96
97#[derive(Serialize, Deserialize)]
98struct FlowResumeRecord {
99    snapshot: FlowSnapshot,
100    #[serde(default)]
101    reason: Option<String>,
102}
103
104fn build_store_ctx(envelope: &IngressEnvelope) -> GResult<(TenantCtx, UserId, String)> {
105    let hint = envelope
106        .session_hint
107        .clone()
108        .unwrap_or_else(|| envelope.canonical_session_hint());
109    let user = derive_user_id(&hint)?;
110    let mut ctx = envelope.tenant_ctx();
111    ctx = ctx.with_session(hint.clone());
112    Ok((ctx, user, hint))
113}
114
115fn record_to_session_data(
116    record: &FlowResumeRecord,
117    ctx: TenantCtx,
118    user: &UserId,
119    session_hint: &str,
120) -> GResult<SessionData> {
121    let flow = FlowId::from_str(record.snapshot.flow_id.as_str()).map_err(map_store_error)?;
122    let mut cursor = TypesSessionCursor::new(record.snapshot.next_node.clone());
123    if let Some(reason) = record.reason.clone() {
124        cursor = cursor.with_wait_reason(reason);
125    }
126    let context_json = serde_json::to_string(record).map_err(|err| RunnerError::Session {
127        reason: format!("failed to encode flow resume snapshot: {err}"),
128    })?;
129    let ctx = ctx
130        .with_user(Some(user.clone()))
131        .with_session(session_hint.to_string())
132        .with_flow(record.snapshot.flow_id.clone());
133    Ok(SessionData {
134        tenant_ctx: ctx,
135        flow_id: flow,
136        cursor,
137        context_json,
138    })
139}
140
141fn derive_user_id(hint: &str) -> GResult<UserId> {
142    let digest = Sha256::digest(hint.as_bytes());
143    let slug = format!("sess{}", hex::encode(&digest[..8]));
144    UserId::from_str(&slug).map_err(map_store_error)
145}
146
147fn map_store_error(err: GreenticError) -> RunnerError {
148    RunnerError::Session {
149        reason: err.to_string(),
150    }
151}
152
153#[cfg(test)]
154mod tests {
155    use super::*;
156    use crate::runner::engine::ExecutionState;
157    use crate::storage::session::new_session_store;
158    use serde_json::json;
159
160    fn sample_envelope() -> IngressEnvelope {
161        IngressEnvelope {
162            tenant: "demo".into(),
163            env: Some("local".into()),
164            flow_id: "flow.main".into(),
165            flow_type: None,
166            action: Some("messaging".into()),
167            session_hint: Some("demo:provider:chan:conv:user".into()),
168            provider: Some("provider".into()),
169            channel: Some("chan".into()),
170            conversation: Some("conv".into()),
171            user: Some("user".into()),
172            activity_id: Some("act-1".into()),
173            timestamp: None,
174            payload: json!({ "text": "hi" }),
175            metadata: None,
176        }
177    }
178
179    fn sample_wait() -> FlowWait {
180        let state: ExecutionState = serde_json::from_value(json!({
181            "input": { "text": "hi" },
182            "nodes": {},
183            "egress": []
184        }))
185        .expect("state");
186        FlowWait {
187            reason: Some("await-user".into()),
188            snapshot: FlowSnapshot {
189                flow_id: "flow.main".into(),
190                next_node: "node-2".into(),
191                state,
192            },
193        }
194    }
195
196    #[test]
197    fn derive_user_id_is_stable() {
198        let hint = "some-tenant::session-key";
199        let a = derive_user_id(hint).unwrap();
200        let b = derive_user_id(hint).unwrap();
201        assert_eq!(a, b);
202        assert!(a.as_str().starts_with("sess"));
203    }
204
205    #[test]
206    fn resume_store_roundtrip() -> GResult<()> {
207        let store = FlowResumeStore::new(new_session_store());
208        let envelope = sample_envelope();
209        assert!(store.fetch(&envelope)?.is_none());
210
211        let wait = sample_wait();
212        store.save(&envelope, &wait)?;
213        let snapshot = store.fetch(&envelope)?.expect("snapshot missing");
214        assert_eq!(snapshot.flow_id, wait.snapshot.flow_id);
215        assert_eq!(snapshot.next_node, wait.snapshot.next_node);
216
217        store.clear(&envelope)?;
218        assert!(store.fetch(&envelope)?.is_none());
219        Ok(())
220    }
221
222    #[test]
223    fn resume_store_overwrites_existing() -> GResult<()> {
224        let store = FlowResumeStore::new(new_session_store());
225        let envelope = sample_envelope();
226        let mut wait = sample_wait();
227        store.save(&envelope, &wait)?;
228
229        wait.snapshot.next_node = "node-3".into();
230        wait.reason = Some("retry".into());
231        store.save(&envelope, &wait)?;
232
233        let snapshot = store.fetch(&envelope)?.expect("snapshot missing");
234        assert_eq!(snapshot.next_node, "node-3");
235        store.clear(&envelope)?;
236        Ok(())
237    }
238
239    #[test]
240    fn canonicalize_populates_defaults() {
241        let envelope = IngressEnvelope {
242            tenant: "demo".into(),
243            env: None,
244            flow_id: "flow.main".into(),
245            flow_type: None,
246            action: None,
247            session_hint: None,
248            provider: None,
249            channel: None,
250            conversation: None,
251            user: None,
252            activity_id: Some("activity-1".into()),
253            timestamp: None,
254            payload: json!({}),
255            metadata: None,
256        }
257        .canonicalize();
258
259        assert_eq!(envelope.provider.as_deref(), Some("provider"));
260        assert_eq!(envelope.channel.as_deref(), Some("flow.main"));
261        assert_eq!(envelope.conversation.as_deref(), Some("flow.main"));
262        assert_eq!(envelope.user.as_deref(), Some("activity-1"));
263        assert!(envelope.session_hint.is_some());
264    }
265}
266
267pub struct StateMachineRuntime {
268    runner: Runner,
269}
270
271impl StateMachineRuntime {
272    /// Construct a runtime from explicit flow definitions (legacy entrypoint used by tests/examples).
273    pub fn new(flows: Vec<FlowDefinition>) -> GResult<Self> {
274        let secrets = Arc::new(FnSecretsHost::new(|name| {
275            Err(RunnerError::Secrets {
276                reason: format!("secret {name} unavailable (noop host)"),
277            })
278        }));
279        let telemetry = Arc::new(FnTelemetryHost::new(|_, _| Ok(())));
280        let session = Arc::new(InMemorySessionHost::new());
281        let state = Arc::new(InMemoryStateHost::new());
282        let host = HostBundle::new(secrets, telemetry, session, state);
283
284        let adapters = AdapterRegistry::default();
285        let policy = Policy::default();
286
287        let mut builder = RunnerBuilder::new()
288            .with_host(host)
289            .with_adapters(adapters)
290            .with_policy(policy);
291        for flow in flows {
292            builder = builder.with_flow(flow);
293        }
294        let runner = builder.build()?;
295        Ok(Self { runner })
296    }
297
298    /// Build a state-machine runtime that proxies pack flows through the legacy FlowEngine.
299    pub fn from_flow_engine(
300        config: Arc<HostConfig>,
301        engine: Arc<FlowEngine>,
302        session_host: Arc<dyn SessionHost>,
303        session_store: DynSessionStore,
304        state_host: Arc<dyn StateHost>,
305        mocks: Option<Arc<MockLayer>>,
306    ) -> Result<Self> {
307        let secrets_cfg = Arc::clone(&config);
308        let secrets = Arc::new(FnSecretsHost::new(move |name| {
309            if !secrets_cfg.secrets_policy.is_allowed(name) {
310                return Err(RunnerError::Secrets {
311                    reason: format!("secret {name} denied by policy"),
312                });
313            }
314            std::env::var(name).map_err(|_| RunnerError::Secrets {
315                reason: format!("secret {name} unavailable"),
316            })
317        }));
318        let telemetry = Arc::new(FnTelemetryHost::new(|span, fields| {
319            tracing::debug!(?span, ?fields, "telemetry emit");
320            Ok(())
321        }));
322        let host = HostBundle::new(secrets, telemetry, session_host, state_host);
323        let resume_store = FlowResumeStore::new(session_store);
324
325        let mut adapters = AdapterRegistry::default();
326        adapters.register(
327            PACK_FLOW_ADAPTER,
328            Box::new(PackFlowAdapter::new(
329                Arc::clone(&config),
330                Arc::clone(&engine),
331                resume_store,
332                mocks,
333            )),
334        );
335
336        let flows = build_flow_definitions(engine.flows());
337        let mut builder = RunnerBuilder::new()
338            .with_host(host)
339            .with_adapters(adapters)
340            .with_policy(Policy::default());
341        for flow in flows {
342            builder = builder.with_flow(flow);
343        }
344        let runner = builder
345            .build()
346            .map_err(|err| anyhow!("state machine init failed: {err}"))?;
347        Ok(Self { runner })
348    }
349
350    /// Execute the flow associated with the provided ingress event.
351    pub async fn handle(&self, envelope: IngressEnvelope) -> Result<Value> {
352        let tenant_ctx = envelope.tenant_ctx();
353        let session_hint = envelope
354            .session_hint
355            .clone()
356            .unwrap_or_else(|| envelope.canonical_session_hint());
357        let input =
358            serde_json::to_value(&envelope).context("failed to serialise ingress envelope")?;
359        let request = RunFlowRequest {
360            tenant: tenant_ctx,
361            flow_id: envelope.flow_id.clone(),
362            input,
363            session_hint: Some(session_hint),
364        };
365        let result: super::api::RunFlowResult = self
366            .runner
367            .run_flow(request)
368            .await
369            .map_err(|err| anyhow!("flow execution failed: {err}"))?;
370        let outcome = result.outcome;
371        Ok(outcome.get("response").cloned().unwrap_or(outcome))
372    }
373}
374
375fn build_flow_definitions(flows: &[FlowDescriptor]) -> Vec<FlowDefinition> {
376    flows
377        .iter()
378        .map(|descriptor| {
379            FlowDefinition::new(
380                super::api::FlowSummary {
381                    id: descriptor.id.clone(),
382                    name: descriptor
383                        .description
384                        .clone()
385                        .unwrap_or_else(|| descriptor.id.clone()),
386                    version: descriptor.version.clone(),
387                    description: descriptor.description.clone(),
388                },
389                serde_json::json!({
390                    "type": "object"
391                }),
392                vec![FlowStep::Adapter(AdapterCall {
393                    adapter: PACK_FLOW_ADAPTER.into(),
394                    operation: descriptor.id.clone(),
395                    payload: Value::String(PAYLOAD_FROM_LAST_INPUT.into()),
396                })],
397            )
398        })
399        .collect()
400}
401
402struct PackFlowAdapter {
403    tenant: String,
404    config: Arc<HostConfig>,
405    engine: Arc<FlowEngine>,
406    resume: FlowResumeStore,
407    mocks: Option<Arc<MockLayer>>,
408}
409
410impl PackFlowAdapter {
411    fn new(
412        config: Arc<HostConfig>,
413        engine: Arc<FlowEngine>,
414        resume: FlowResumeStore,
415        mocks: Option<Arc<MockLayer>>,
416    ) -> Self {
417        Self {
418            tenant: config.tenant.clone(),
419            config,
420            engine,
421            resume,
422            mocks,
423        }
424    }
425}
426
427#[async_trait::async_trait]
428impl Adapter for PackFlowAdapter {
429    async fn call(&self, call: &AdapterCall) -> GResult<Value> {
430        let envelope: IngressEnvelope =
431            serde_json::from_value(call.payload.clone()).map_err(|err| {
432                RunnerError::AdapterCall {
433                    reason: format!("invalid ingress payload: {err}"),
434                }
435            })?;
436        let flow_id = call.operation.clone();
437        let action_owned = envelope.action.clone();
438        let session_owned = envelope
439            .session_hint
440            .clone()
441            .unwrap_or_else(|| envelope.canonical_session_hint());
442        let provider_owned = envelope.provider.clone();
443        let payload = envelope.payload.clone();
444        let retry_config = self.config.mcp_retry_config().into();
445
446        let mocks = self.mocks.as_deref();
447        let ctx = FlowContext {
448            tenant: &self.tenant,
449            flow_id: &flow_id,
450            node_id: None,
451            tool: None,
452            action: action_owned.as_deref(),
453            session_id: Some(session_owned.as_str()),
454            provider_id: provider_owned.as_deref(),
455            retry_config,
456            observer: None,
457            mocks,
458        };
459
460        let execution = if let Some(snapshot) = self.resume.fetch(&envelope)? {
461            self.engine.resume(ctx, snapshot, payload).await
462        } else {
463            self.engine.execute(ctx, payload).await
464        }
465        .map_err(|err| RunnerError::AdapterCall {
466            reason: err.to_string(),
467        })?;
468
469        match execution.status {
470            FlowStatus::Completed => {
471                self.resume.clear(&envelope)?;
472                Ok(execution.output)
473            }
474            FlowStatus::Waiting(wait) => {
475                self.resume.save(&envelope, &wait)?;
476                Ok(json!({
477                    "status": "pending",
478                    "reason": wait.reason,
479                    "resume": wait.snapshot,
480                    "response": execution.output,
481                }))
482            }
483        }
484    }
485}
486
487#[derive(Clone, Debug, Serialize, Deserialize)]
488pub struct IngressEnvelope {
489    pub tenant: String,
490    #[serde(default, skip_serializing_if = "Option::is_none")]
491    pub env: Option<String>,
492    pub flow_id: String,
493    #[serde(default, skip_serializing_if = "Option::is_none")]
494    pub flow_type: Option<String>,
495    #[serde(default, skip_serializing_if = "Option::is_none")]
496    pub action: Option<String>,
497    #[serde(default, skip_serializing_if = "Option::is_none")]
498    pub session_hint: Option<String>,
499    #[serde(default, skip_serializing_if = "Option::is_none")]
500    pub provider: Option<String>,
501    #[serde(default, skip_serializing_if = "Option::is_none")]
502    pub channel: Option<String>,
503    #[serde(default, skip_serializing_if = "Option::is_none")]
504    pub conversation: Option<String>,
505    #[serde(default, skip_serializing_if = "Option::is_none")]
506    pub user: Option<String>,
507    #[serde(default, skip_serializing_if = "Option::is_none")]
508    pub activity_id: Option<String>,
509    #[serde(default, skip_serializing_if = "Option::is_none")]
510    pub timestamp: Option<String>,
511    #[serde(default)]
512    pub payload: Value,
513    #[serde(default, skip_serializing_if = "Option::is_none")]
514    pub metadata: Option<Value>,
515}
516
517impl IngressEnvelope {
518    pub fn canonicalize(mut self) -> Self {
519        if self.provider.is_none() {
520            self.provider = Some("provider".into());
521        }
522        if self.channel.is_none() {
523            self.channel = Some(self.flow_id.clone());
524        }
525        if self.conversation.is_none() {
526            self.conversation = self.channel.clone();
527        }
528        if self.user.is_none() {
529            if let Some(ref hint) = self.session_hint {
530                self.user = Some(hint.clone());
531            } else if let Some(ref activity) = self.activity_id {
532                self.user = Some(activity.clone());
533            } else {
534                self.user = Some("user".into());
535            }
536        }
537        if self.session_hint.is_none() {
538            self.session_hint = Some(self.canonical_session_hint());
539        }
540        self
541    }
542
543    pub fn canonical_session_hint(&self) -> String {
544        format!(
545            "{}:{}:{}:{}:{}",
546            self.tenant,
547            self.provider.as_deref().unwrap_or("provider"),
548            self.channel.as_deref().unwrap_or("channel"),
549            self.conversation.as_deref().unwrap_or("conversation"),
550            self.user.as_deref().unwrap_or("user")
551        )
552    }
553
554    pub fn tenant_ctx(&self) -> TenantCtx {
555        let env_raw = self.env.clone().unwrap_or_else(|| DEFAULT_ENV.into());
556        let env = EnvId::from_str(env_raw.as_str())
557            .unwrap_or_else(|_| EnvId::from_str(DEFAULT_ENV).expect("default env must be valid"));
558        let tenant_id = TenantId::from_str(self.tenant.as_str()).unwrap_or_else(|_| {
559            TenantId::from_str("tenant.default").expect("tenant fallback must be valid")
560        });
561        let mut ctx = TenantCtx::new(env, tenant_id).with_flow(self.flow_id.clone());
562        if let Some(provider) = &self.provider {
563            ctx = ctx.with_provider(provider.clone());
564        }
565        if let Some(session) = &self.session_hint {
566            ctx = ctx.with_session(session.clone());
567        }
568        ctx
569    }
570}