greentic_runner_host/engine/
runtime.rs

1use std::str::FromStr;
2use std::sync::Arc;
3
4use anyhow::{Context, Result, anyhow};
5use async_trait::async_trait;
6use greentic_session::SessionData;
7use greentic_types::{
8    EnvId, FlowId, GreenticError, SessionCursor as TypesSessionCursor, TenantCtx, TenantId, UserId,
9};
10use serde::{Deserialize, Serialize};
11use serde_json::{Value, json};
12use sha2::{Digest, Sha256};
13
14use super::api::{RunFlowRequest, RunnerApi};
15use super::builder::{Runner, RunnerBuilder};
16use super::error::{GResult, RunnerError};
17use super::glue::{FnSecretsHost, FnTelemetryHost};
18use super::host::{HostBundle, SecretsHost, SessionHost, StateHost};
19use super::policy::Policy;
20use super::registry::{Adapter, AdapterCall, AdapterRegistry};
21use super::shims::{InMemorySessionHost, InMemoryStateHost};
22use super::state_machine::{FlowDefinition, FlowStep, PAYLOAD_FROM_LAST_INPUT};
23
24use crate::config::{HostConfig, SecretsPolicy};
25use crate::pack::FlowDescriptor;
26use crate::runner::engine::{FlowContext, FlowEngine, FlowSnapshot, FlowStatus, FlowWait};
27use crate::runner::mocks::MockLayer;
28use crate::secrets::DynSecretsManager;
29use crate::storage::session::DynSessionStore;
30
/// Environment name used by `IngressEnvelope::tenant_ctx` when the envelope
/// carries no `env` value or carries one that fails to parse.
const DEFAULT_ENV: &str = "local";
/// Name under which the pack-flow adapter is registered and by which the
/// generated flow definitions address it.
const PACK_FLOW_ADAPTER: &str = "pack_flow";
33
34#[derive(Clone)]
35pub struct FlowResumeStore {
36    store: DynSessionStore,
37}
38
39impl FlowResumeStore {
40    pub fn new(store: DynSessionStore) -> Self {
41        Self { store }
42    }
43
44    fn fetch(&self, envelope: &IngressEnvelope) -> GResult<Option<FlowSnapshot>> {
45        let (mut ctx, user, _) = build_store_ctx(envelope)?;
46        ctx = ctx.with_user(Some(user.clone()));
47        if let Some((_key, data)) = self
48            .store
49            .find_by_user(&ctx, &user)
50            .map_err(map_store_error)?
51        {
52            let record: FlowResumeRecord =
53                serde_json::from_str(&data.context_json).map_err(|err| RunnerError::Session {
54                    reason: format!("failed to decode flow resume snapshot: {err}"),
55                })?;
56            if record.snapshot.flow_id == envelope.flow_id {
57                return Ok(Some(record.snapshot));
58            }
59        }
60        Ok(None)
61    }
62
63    fn save(&self, envelope: &IngressEnvelope, wait: &FlowWait) -> GResult<()> {
64        let (ctx, user, hint) = build_store_ctx(envelope)?;
65        let record = FlowResumeRecord {
66            snapshot: wait.snapshot.clone(),
67            reason: wait.reason.clone(),
68        };
69        let data = record_to_session_data(&record, ctx.clone(), &user, &hint)?;
70        let existing = self
71            .store
72            .find_by_user(&ctx, &user)
73            .map_err(map_store_error)?;
74        if let Some((key, _)) = existing {
75            self.store
76                .update_session(&key, data)
77                .map_err(map_store_error)?;
78        } else {
79            self.store
80                .create_session(&ctx, data)
81                .map_err(map_store_error)?;
82        }
83        Ok(())
84    }
85
86    fn clear(&self, envelope: &IngressEnvelope) -> GResult<()> {
87        let (ctx, user, _) = build_store_ctx(envelope)?;
88        if let Some((key, _)) = self
89            .store
90            .find_by_user(&ctx, &user)
91            .map_err(map_store_error)?
92        {
93            self.store.remove_session(&key).map_err(map_store_error)?;
94        }
95        Ok(())
96    }
97}
98
99#[derive(Serialize, Deserialize)]
100struct FlowResumeRecord {
101    snapshot: FlowSnapshot,
102    #[serde(default)]
103    reason: Option<String>,
104}
105
106fn build_store_ctx(envelope: &IngressEnvelope) -> GResult<(TenantCtx, UserId, String)> {
107    let hint = envelope
108        .session_hint
109        .clone()
110        .unwrap_or_else(|| envelope.canonical_session_hint());
111    let user = derive_user_id(&hint)?;
112    let mut ctx = envelope.tenant_ctx();
113    ctx = ctx.with_session(hint.clone());
114    Ok((ctx, user, hint))
115}
116
117fn record_to_session_data(
118    record: &FlowResumeRecord,
119    ctx: TenantCtx,
120    user: &UserId,
121    session_hint: &str,
122) -> GResult<SessionData> {
123    let flow = FlowId::from_str(record.snapshot.flow_id.as_str()).map_err(map_store_error)?;
124    let mut cursor = TypesSessionCursor::new(record.snapshot.next_node.clone());
125    if let Some(reason) = record.reason.clone() {
126        cursor = cursor.with_wait_reason(reason);
127    }
128    let context_json = serde_json::to_string(record).map_err(|err| RunnerError::Session {
129        reason: format!("failed to encode flow resume snapshot: {err}"),
130    })?;
131    let ctx = ctx
132        .with_user(Some(user.clone()))
133        .with_session(session_hint.to_string())
134        .with_flow(record.snapshot.flow_id.clone());
135    Ok(SessionData {
136        tenant_ctx: ctx,
137        flow_id: flow,
138        cursor,
139        context_json,
140    })
141}
142
143fn derive_user_id(hint: &str) -> GResult<UserId> {
144    let digest = Sha256::digest(hint.as_bytes());
145    let slug = format!("sess{}", hex::encode(&digest[..8]));
146    UserId::from_str(&slug).map_err(map_store_error)
147}
148
149fn map_store_error(err: GreenticError) -> RunnerError {
150    RunnerError::Session {
151        reason: err.to_string(),
152    }
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use crate::runner::engine::ExecutionState;
    use crate::storage::session::new_session_store;
    use serde_json::json;

    /// Fully populated envelope shared by the resume-store tests.
    fn sample_envelope() -> IngressEnvelope {
        let text = |value: &str| Some(String::from(value));
        IngressEnvelope {
            tenant: String::from("demo"),
            env: text("local"),
            flow_id: String::from("flow.main"),
            flow_type: None,
            action: text("messaging"),
            session_hint: text("demo:provider:chan:conv:user"),
            provider: text("provider"),
            channel: text("chan"),
            conversation: text("conv"),
            user: text("user"),
            activity_id: text("act-1"),
            timestamp: None,
            payload: json!({ "text": "hi" }),
            metadata: None,
        }
    }

    /// Wait state for `flow.main`, paused before `node-2`.
    fn sample_wait() -> FlowWait {
        let raw_state = json!({
            "input": { "text": "hi" },
            "nodes": {},
            "egress": []
        });
        let state: ExecutionState = serde_json::from_value(raw_state).expect("state");
        let snapshot = FlowSnapshot {
            flow_id: "flow.main".into(),
            next_node: "node-2".into(),
            state,
        };
        FlowWait {
            snapshot,
            reason: Some("await-user".into()),
        }
    }

    #[test]
    fn derive_user_id_is_stable() {
        let hint = "some-tenant::session-key";
        let first = derive_user_id(hint).unwrap();
        let second = derive_user_id(hint).unwrap();
        assert_eq!(first, second);
        assert!(first.as_str().starts_with("sess"));
    }

    #[test]
    fn resume_store_roundtrip() -> GResult<()> {
        let resume = FlowResumeStore::new(new_session_store());
        let envelope = sample_envelope();
        let wait = sample_wait();

        // Nothing stored yet.
        assert!(resume.fetch(&envelope)?.is_none());

        resume.save(&envelope, &wait)?;
        let restored = resume.fetch(&envelope)?.expect("snapshot missing");
        assert_eq!(restored.flow_id, wait.snapshot.flow_id);
        assert_eq!(restored.next_node, wait.snapshot.next_node);

        resume.clear(&envelope)?;
        assert!(resume.fetch(&envelope)?.is_none());
        Ok(())
    }

    #[test]
    fn resume_store_overwrites_existing() -> GResult<()> {
        let resume = FlowResumeStore::new(new_session_store());
        let envelope = sample_envelope();

        let mut wait = sample_wait();
        resume.save(&envelope, &wait)?;

        // Saving again for the same session must replace the first entry.
        wait.snapshot.next_node = "node-3".into();
        wait.reason = Some("retry".into());
        resume.save(&envelope, &wait)?;

        let restored = resume.fetch(&envelope)?.expect("snapshot missing");
        assert_eq!(restored.next_node, "node-3");
        resume.clear(&envelope)?;
        Ok(())
    }

    #[test]
    fn canonicalize_populates_defaults() {
        let sparse = IngressEnvelope {
            tenant: "demo".into(),
            env: None,
            flow_id: "flow.main".into(),
            flow_type: None,
            action: None,
            session_hint: None,
            provider: None,
            channel: None,
            conversation: None,
            user: None,
            activity_id: Some("activity-1".into()),
            timestamp: None,
            payload: json!({}),
            metadata: None,
        };
        let envelope = sparse.canonicalize();

        assert_eq!(envelope.provider.as_deref(), Some("provider"));
        assert_eq!(envelope.channel.as_deref(), Some("flow.main"));
        assert_eq!(envelope.conversation.as_deref(), Some("flow.main"));
        assert_eq!(envelope.user.as_deref(), Some("activity-1"));
        assert!(envelope.session_hint.is_some());
    }
}
268
269pub struct StateMachineRuntime {
270    runner: Runner,
271}
272
273impl StateMachineRuntime {
274    /// Construct a runtime from explicit flow definitions (legacy entrypoint used by tests/examples).
275    pub fn new(flows: Vec<FlowDefinition>) -> GResult<Self> {
276        let secrets = Arc::new(FnSecretsHost::new(|name| {
277            Err(RunnerError::Secrets {
278                reason: format!("secret {name} unavailable (noop host)"),
279            })
280        }));
281        let telemetry = Arc::new(FnTelemetryHost::new(|_, _| Ok(())));
282        let session = Arc::new(InMemorySessionHost::new());
283        let state = Arc::new(InMemoryStateHost::new());
284        let host = HostBundle::new(secrets, telemetry, session, state);
285
286        let adapters = AdapterRegistry::default();
287        let policy = Policy::default();
288
289        let mut builder = RunnerBuilder::new()
290            .with_host(host)
291            .with_adapters(adapters)
292            .with_policy(policy);
293        for flow in flows {
294            builder = builder.with_flow(flow);
295        }
296        let runner = builder.build()?;
297        Ok(Self { runner })
298    }
299
300    /// Build a state-machine runtime that proxies pack flows through the legacy FlowEngine.
301    pub fn from_flow_engine(
302        config: Arc<HostConfig>,
303        engine: Arc<FlowEngine>,
304        session_host: Arc<dyn SessionHost>,
305        session_store: DynSessionStore,
306        state_host: Arc<dyn StateHost>,
307        secrets_manager: DynSecretsManager,
308        mocks: Option<Arc<MockLayer>>,
309    ) -> Result<Self> {
310        let policy = Arc::new(config.secrets_policy.clone());
311        let secrets = Arc::new(PolicySecretsHost::new(policy, secrets_manager));
312        let telemetry = Arc::new(FnTelemetryHost::new(|span, fields| {
313            tracing::debug!(?span, ?fields, "telemetry emit");
314            Ok(())
315        }));
316        let host = HostBundle::new(secrets, telemetry, session_host, state_host);
317        let resume_store = FlowResumeStore::new(session_store);
318
319        let mut adapters = AdapterRegistry::default();
320        adapters.register(
321            PACK_FLOW_ADAPTER,
322            Box::new(PackFlowAdapter::new(
323                Arc::clone(&config),
324                Arc::clone(&engine),
325                resume_store,
326                mocks,
327            )),
328        );
329
330        let flows = build_flow_definitions(engine.flows());
331        let mut builder = RunnerBuilder::new()
332            .with_host(host)
333            .with_adapters(adapters)
334            .with_policy(Policy::default());
335        for flow in flows {
336            builder = builder.with_flow(flow);
337        }
338        let runner = builder
339            .build()
340            .map_err(|err| anyhow!("state machine init failed: {err}"))?;
341        Ok(Self { runner })
342    }
343
344    /// Execute the flow associated with the provided ingress event.
345    pub async fn handle(&self, envelope: IngressEnvelope) -> Result<Value> {
346        let tenant_ctx = envelope.tenant_ctx();
347        let session_hint = envelope
348            .session_hint
349            .clone()
350            .unwrap_or_else(|| envelope.canonical_session_hint());
351        let input =
352            serde_json::to_value(&envelope).context("failed to serialise ingress envelope")?;
353        let request = RunFlowRequest {
354            tenant: tenant_ctx,
355            flow_id: envelope.flow_id.clone(),
356            input,
357            session_hint: Some(session_hint),
358        };
359        let result: super::api::RunFlowResult = self
360            .runner
361            .run_flow(request)
362            .await
363            .map_err(|err| anyhow!("flow execution failed: {err}"))?;
364        let outcome = result.outcome;
365        Ok(outcome.get("response").cloned().unwrap_or(outcome))
366    }
367}
368
369struct PolicySecretsHost {
370    policy: Arc<SecretsPolicy>,
371    manager: DynSecretsManager,
372}
373
374impl PolicySecretsHost {
375    fn new(policy: Arc<SecretsPolicy>, manager: DynSecretsManager) -> Self {
376        Self { policy, manager }
377    }
378}
379
380#[async_trait]
381impl SecretsHost for PolicySecretsHost {
382    async fn get(&self, name: &str) -> GResult<String> {
383        if !self.policy.is_allowed(name) {
384            return Err(RunnerError::Secrets {
385                reason: format!("secret {name} denied by policy"),
386            });
387        }
388        let bytes = self
389            .manager
390            .read(name)
391            .await
392            .map_err(|err| RunnerError::Secrets {
393                reason: format!("secret {name} unavailable: {err}"),
394            })?;
395        String::from_utf8(bytes).map_err(|err| RunnerError::Secrets {
396            reason: format!("secret {name} not valid UTF-8: {err}"),
397        })
398    }
399}
400
401fn build_flow_definitions(flows: &[FlowDescriptor]) -> Vec<FlowDefinition> {
402    flows
403        .iter()
404        .map(|descriptor| {
405            FlowDefinition::new(
406                super::api::FlowSummary {
407                    id: descriptor.id.clone(),
408                    name: descriptor
409                        .description
410                        .clone()
411                        .unwrap_or_else(|| descriptor.id.clone()),
412                    version: descriptor.version.clone(),
413                    description: descriptor.description.clone(),
414                },
415                serde_json::json!({
416                    "type": "object"
417                }),
418                vec![FlowStep::Adapter(AdapterCall {
419                    adapter: PACK_FLOW_ADAPTER.into(),
420                    operation: descriptor.id.clone(),
421                    payload: Value::String(PAYLOAD_FROM_LAST_INPUT.into()),
422                })],
423            )
424        })
425        .collect()
426}
427
428struct PackFlowAdapter {
429    tenant: String,
430    config: Arc<HostConfig>,
431    engine: Arc<FlowEngine>,
432    resume: FlowResumeStore,
433    mocks: Option<Arc<MockLayer>>,
434}
435
436impl PackFlowAdapter {
437    fn new(
438        config: Arc<HostConfig>,
439        engine: Arc<FlowEngine>,
440        resume: FlowResumeStore,
441        mocks: Option<Arc<MockLayer>>,
442    ) -> Self {
443        Self {
444            tenant: config.tenant.clone(),
445            config,
446            engine,
447            resume,
448            mocks,
449        }
450    }
451}
452
453#[async_trait::async_trait]
454impl Adapter for PackFlowAdapter {
455    async fn call(&self, call: &AdapterCall) -> GResult<Value> {
456        let envelope: IngressEnvelope =
457            serde_json::from_value(call.payload.clone()).map_err(|err| {
458                RunnerError::AdapterCall {
459                    reason: format!("invalid ingress payload: {err}"),
460                }
461            })?;
462        let flow_id = call.operation.clone();
463        let action_owned = envelope.action.clone();
464        let session_owned = envelope
465            .session_hint
466            .clone()
467            .unwrap_or_else(|| envelope.canonical_session_hint());
468        let provider_owned = envelope.provider.clone();
469        let payload = envelope.payload.clone();
470        let retry_config = self.config.mcp_retry_config().into();
471
472        let mocks = self.mocks.as_deref();
473        let ctx = FlowContext {
474            tenant: &self.tenant,
475            flow_id: &flow_id,
476            node_id: None,
477            tool: None,
478            action: action_owned.as_deref(),
479            session_id: Some(session_owned.as_str()),
480            provider_id: provider_owned.as_deref(),
481            retry_config,
482            observer: None,
483            mocks,
484        };
485
486        let execution = if let Some(snapshot) = self.resume.fetch(&envelope)? {
487            self.engine.resume(ctx, snapshot, payload).await
488        } else {
489            self.engine.execute(ctx, payload).await
490        }
491        .map_err(|err| RunnerError::AdapterCall {
492            reason: err.to_string(),
493        })?;
494
495        match execution.status {
496            FlowStatus::Completed => {
497                self.resume.clear(&envelope)?;
498                Ok(execution.output)
499            }
500            FlowStatus::Waiting(wait) => {
501                self.resume.save(&envelope, &wait)?;
502                Ok(json!({
503                    "status": "pending",
504                    "reason": wait.reason,
505                    "resume": wait.snapshot,
506                    "response": execution.output,
507                }))
508            }
509        }
510    }
511}
512
513#[derive(Clone, Debug, Serialize, Deserialize)]
514pub struct IngressEnvelope {
515    pub tenant: String,
516    #[serde(default, skip_serializing_if = "Option::is_none")]
517    pub env: Option<String>,
518    pub flow_id: String,
519    #[serde(default, skip_serializing_if = "Option::is_none")]
520    pub flow_type: Option<String>,
521    #[serde(default, skip_serializing_if = "Option::is_none")]
522    pub action: Option<String>,
523    #[serde(default, skip_serializing_if = "Option::is_none")]
524    pub session_hint: Option<String>,
525    #[serde(default, skip_serializing_if = "Option::is_none")]
526    pub provider: Option<String>,
527    #[serde(default, skip_serializing_if = "Option::is_none")]
528    pub channel: Option<String>,
529    #[serde(default, skip_serializing_if = "Option::is_none")]
530    pub conversation: Option<String>,
531    #[serde(default, skip_serializing_if = "Option::is_none")]
532    pub user: Option<String>,
533    #[serde(default, skip_serializing_if = "Option::is_none")]
534    pub activity_id: Option<String>,
535    #[serde(default, skip_serializing_if = "Option::is_none")]
536    pub timestamp: Option<String>,
537    #[serde(default)]
538    pub payload: Value,
539    #[serde(default, skip_serializing_if = "Option::is_none")]
540    pub metadata: Option<Value>,
541}
542
543impl IngressEnvelope {
544    pub fn canonicalize(mut self) -> Self {
545        if self.provider.is_none() {
546            self.provider = Some("provider".into());
547        }
548        if self.channel.is_none() {
549            self.channel = Some(self.flow_id.clone());
550        }
551        if self.conversation.is_none() {
552            self.conversation = self.channel.clone();
553        }
554        if self.user.is_none() {
555            if let Some(ref hint) = self.session_hint {
556                self.user = Some(hint.clone());
557            } else if let Some(ref activity) = self.activity_id {
558                self.user = Some(activity.clone());
559            } else {
560                self.user = Some("user".into());
561            }
562        }
563        if self.session_hint.is_none() {
564            self.session_hint = Some(self.canonical_session_hint());
565        }
566        self
567    }
568
569    pub fn canonical_session_hint(&self) -> String {
570        format!(
571            "{}:{}:{}:{}:{}",
572            self.tenant,
573            self.provider.as_deref().unwrap_or("provider"),
574            self.channel.as_deref().unwrap_or("channel"),
575            self.conversation.as_deref().unwrap_or("conversation"),
576            self.user.as_deref().unwrap_or("user")
577        )
578    }
579
580    pub fn tenant_ctx(&self) -> TenantCtx {
581        let env_raw = self.env.clone().unwrap_or_else(|| DEFAULT_ENV.into());
582        let env = EnvId::from_str(env_raw.as_str())
583            .unwrap_or_else(|_| EnvId::from_str(DEFAULT_ENV).expect("default env must be valid"));
584        let tenant_id = TenantId::from_str(self.tenant.as_str()).unwrap_or_else(|_| {
585            TenantId::from_str("tenant.default").expect("tenant fallback must be valid")
586        });
587        let mut ctx = TenantCtx::new(env, tenant_id).with_flow(self.flow_id.clone());
588        if let Some(provider) = &self.provider {
589            ctx = ctx.with_provider(provider.clone());
590        }
591        if let Some(session) = &self.session_hint {
592            ctx = ctx.with_session(session.clone());
593        }
594        ctx
595    }
596}