greentic_runner_host/engine/
runtime.rs

1use std::str::FromStr;
2use std::sync::Arc;
3
4use anyhow::{Context, Result, anyhow};
5use async_trait::async_trait;
6use greentic_session::SessionData;
7use greentic_types::{
8    EnvId, FlowId, GreenticError, SessionCursor as TypesSessionCursor, TenantCtx, TenantId, UserId,
9};
10use serde::{Deserialize, Serialize};
11use serde_json::{Value, json};
12use sha2::{Digest, Sha256};
13
14use super::api::{RunFlowRequest, RunnerApi};
15use super::builder::{Runner, RunnerBuilder};
16use super::error::{GResult, RunnerError};
17use super::glue::{FnSecretsHost, FnTelemetryHost};
18use super::host::{HostBundle, SecretsHost, SessionHost, StateHost};
19use super::policy::Policy;
20use super::registry::{Adapter, AdapterCall, AdapterRegistry};
21use super::shims::{InMemorySessionHost, InMemoryStateHost};
22use super::state_machine::{FlowDefinition, FlowStep, PAYLOAD_FROM_LAST_INPUT};
23
24use crate::config::{HostConfig, SecretsPolicy};
25use crate::pack::FlowDescriptor;
26use crate::runner::engine::{FlowContext, FlowEngine, FlowSnapshot, FlowStatus, FlowWait};
27use crate::runner::mocks::MockLayer;
28use crate::secrets::DynSecretsManager;
29use crate::storage::session::DynSessionStore;
30
31const DEFAULT_ENV: &str = "local";
32const PACK_FLOW_ADAPTER: &str = "pack_flow";
33
34#[derive(Clone)]
35pub struct FlowResumeStore {
36    store: DynSessionStore,
37}
38
39impl FlowResumeStore {
40    pub fn new(store: DynSessionStore) -> Self {
41        Self { store }
42    }
43
44    fn fetch(&self, envelope: &IngressEnvelope) -> GResult<Option<FlowSnapshot>> {
45        let (mut ctx, user, _) = build_store_ctx(envelope)?;
46        ctx = ctx.with_user(Some(user.clone()));
47        if let Some((_key, data)) = self
48            .store
49            .find_by_user(&ctx, &user)
50            .map_err(map_store_error)?
51        {
52            let record: FlowResumeRecord =
53                serde_json::from_str(&data.context_json).map_err(|err| RunnerError::Session {
54                    reason: format!("failed to decode flow resume snapshot: {err}"),
55                })?;
56            if record.snapshot.flow_id == envelope.flow_id {
57                return Ok(Some(record.snapshot));
58            }
59        }
60        Ok(None)
61    }
62
63    fn save(&self, envelope: &IngressEnvelope, wait: &FlowWait) -> GResult<()> {
64        let (ctx, user, hint) = build_store_ctx(envelope)?;
65        let record = FlowResumeRecord {
66            snapshot: wait.snapshot.clone(),
67            reason: wait.reason.clone(),
68        };
69        let data = record_to_session_data(&record, ctx.clone(), &user, &hint)?;
70        let existing = self
71            .store
72            .find_by_user(&ctx, &user)
73            .map_err(map_store_error)?;
74        if let Some((key, _)) = existing {
75            self.store
76                .update_session(&key, data)
77                .map_err(map_store_error)?;
78        } else {
79            self.store
80                .create_session(&ctx, data)
81                .map_err(map_store_error)?;
82        }
83        Ok(())
84    }
85
86    fn clear(&self, envelope: &IngressEnvelope) -> GResult<()> {
87        let (ctx, user, _) = build_store_ctx(envelope)?;
88        if let Some((key, _)) = self
89            .store
90            .find_by_user(&ctx, &user)
91            .map_err(map_store_error)?
92        {
93            self.store.remove_session(&key).map_err(map_store_error)?;
94        }
95        Ok(())
96    }
97}
98
99#[derive(Serialize, Deserialize)]
100struct FlowResumeRecord {
101    snapshot: FlowSnapshot,
102    #[serde(default)]
103    reason: Option<String>,
104}
105
106fn build_store_ctx(envelope: &IngressEnvelope) -> GResult<(TenantCtx, UserId, String)> {
107    let hint = envelope
108        .session_hint
109        .clone()
110        .unwrap_or_else(|| envelope.canonical_session_hint());
111    let user = derive_user_id(&hint)?;
112    let mut ctx = envelope.tenant_ctx();
113    ctx = ctx.with_session(hint.clone());
114    ctx = ctx.with_user(Some(user.clone()));
115    Ok((ctx, user, hint))
116}
117
118fn record_to_session_data(
119    record: &FlowResumeRecord,
120    ctx: TenantCtx,
121    user: &UserId,
122    session_hint: &str,
123) -> GResult<SessionData> {
124    let flow = FlowId::from_str(record.snapshot.flow_id.as_str()).map_err(map_store_error)?;
125    let mut cursor = TypesSessionCursor::new(record.snapshot.next_node.clone());
126    if let Some(reason) = record.reason.clone() {
127        cursor = cursor.with_wait_reason(reason);
128    }
129    let context_json = serde_json::to_string(record).map_err(|err| RunnerError::Session {
130        reason: format!("failed to encode flow resume snapshot: {err}"),
131    })?;
132    let ctx = ctx
133        .with_user(Some(user.clone()))
134        .with_session(session_hint.to_string())
135        .with_flow(record.snapshot.flow_id.clone());
136    Ok(SessionData {
137        tenant_ctx: ctx,
138        flow_id: flow,
139        cursor,
140        context_json,
141    })
142}
143
144fn derive_user_id(hint: &str) -> GResult<UserId> {
145    let digest = Sha256::digest(hint.as_bytes());
146    let slug = format!("sess{}", hex::encode(&digest[..8]));
147    UserId::from_str(&slug).map_err(map_store_error)
148}
149
150fn map_store_error(err: GreenticError) -> RunnerError {
151    RunnerError::Session {
152        reason: err.to_string(),
153    }
154}
155
156#[cfg(test)]
157mod tests {
158    use super::*;
159    use crate::runner::engine::ExecutionState;
160    use crate::storage::session::new_session_store;
161    use serde_json::json;
162
163    fn sample_envelope() -> IngressEnvelope {
164        IngressEnvelope {
165            tenant: "demo".into(),
166            env: Some("local".into()),
167            flow_id: "flow.main".into(),
168            flow_type: None,
169            action: Some("messaging".into()),
170            session_hint: Some("demo:provider:chan:conv:user".into()),
171            provider: Some("provider".into()),
172            channel: Some("chan".into()),
173            conversation: Some("conv".into()),
174            user: Some("user".into()),
175            activity_id: Some("act-1".into()),
176            timestamp: None,
177            payload: json!({ "text": "hi" }),
178            metadata: None,
179        }
180    }
181
182    fn sample_wait() -> FlowWait {
183        let state: ExecutionState = serde_json::from_value(json!({
184            "input": { "text": "hi" },
185            "nodes": {},
186            "egress": []
187        }))
188        .expect("state");
189        FlowWait {
190            reason: Some("await-user".into()),
191            snapshot: FlowSnapshot {
192                flow_id: "flow.main".into(),
193                next_node: "node-2".into(),
194                state,
195            },
196        }
197    }
198
199    #[test]
200    fn derive_user_id_is_stable() {
201        let hint = "some-tenant::session-key";
202        let a = derive_user_id(hint).unwrap();
203        let b = derive_user_id(hint).unwrap();
204        assert_eq!(a, b);
205        assert!(a.as_str().starts_with("sess"));
206    }
207
208    #[test]
209    fn resume_store_roundtrip() -> GResult<()> {
210        let store = FlowResumeStore::new(new_session_store());
211        let envelope = sample_envelope();
212        assert!(store.fetch(&envelope)?.is_none());
213
214        let wait = sample_wait();
215        store.save(&envelope, &wait)?;
216        let snapshot = store.fetch(&envelope)?.expect("snapshot missing");
217        assert_eq!(snapshot.flow_id, wait.snapshot.flow_id);
218        assert_eq!(snapshot.next_node, wait.snapshot.next_node);
219
220        store.clear(&envelope)?;
221        assert!(store.fetch(&envelope)?.is_none());
222        Ok(())
223    }
224
225    #[test]
226    fn resume_store_overwrites_existing() -> GResult<()> {
227        let store = FlowResumeStore::new(new_session_store());
228        let envelope = sample_envelope();
229        let mut wait = sample_wait();
230        store.save(&envelope, &wait)?;
231
232        wait.snapshot.next_node = "node-3".into();
233        wait.reason = Some("retry".into());
234        store.save(&envelope, &wait)?;
235
236        let snapshot = store.fetch(&envelope)?.expect("snapshot missing");
237        assert_eq!(snapshot.next_node, "node-3");
238        store.clear(&envelope)?;
239        Ok(())
240    }
241
242    #[test]
243    fn canonicalize_populates_defaults() {
244        let envelope = IngressEnvelope {
245            tenant: "demo".into(),
246            env: None,
247            flow_id: "flow.main".into(),
248            flow_type: None,
249            action: None,
250            session_hint: None,
251            provider: None,
252            channel: None,
253            conversation: None,
254            user: None,
255            activity_id: Some("activity-1".into()),
256            timestamp: None,
257            payload: json!({}),
258            metadata: None,
259        }
260        .canonicalize();
261
262        assert_eq!(envelope.provider.as_deref(), Some("provider"));
263        assert_eq!(envelope.channel.as_deref(), Some("flow.main"));
264        assert_eq!(envelope.conversation.as_deref(), Some("flow.main"));
265        assert_eq!(envelope.user.as_deref(), Some("activity-1"));
266        assert!(envelope.session_hint.is_some());
267    }
268}
269
270pub struct StateMachineRuntime {
271    runner: Runner,
272}
273
274impl StateMachineRuntime {
275    /// Construct a runtime from explicit flow definitions (legacy entrypoint used by tests/examples).
276    pub fn new(flows: Vec<FlowDefinition>) -> GResult<Self> {
277        let secrets = Arc::new(FnSecretsHost::new(|name| {
278            Err(RunnerError::Secrets {
279                reason: format!("secret {name} unavailable (noop host)"),
280            })
281        }));
282        let telemetry = Arc::new(FnTelemetryHost::new(|_, _| Ok(())));
283        let session = Arc::new(InMemorySessionHost::new());
284        let state = Arc::new(InMemoryStateHost::new());
285        let host = HostBundle::new(secrets, telemetry, session, state);
286
287        let adapters = AdapterRegistry::default();
288        let policy = Policy::default();
289
290        let mut builder = RunnerBuilder::new()
291            .with_host(host)
292            .with_adapters(adapters)
293            .with_policy(policy);
294        for flow in flows {
295            builder = builder.with_flow(flow);
296        }
297        let runner = builder.build()?;
298        Ok(Self { runner })
299    }
300
301    /// Build a state-machine runtime that proxies pack flows through the legacy FlowEngine.
302    pub fn from_flow_engine(
303        config: Arc<HostConfig>,
304        engine: Arc<FlowEngine>,
305        session_host: Arc<dyn SessionHost>,
306        session_store: DynSessionStore,
307        state_host: Arc<dyn StateHost>,
308        secrets_manager: DynSecretsManager,
309        mocks: Option<Arc<MockLayer>>,
310    ) -> Result<Self> {
311        let policy = Arc::new(config.secrets_policy.clone());
312        let secrets = Arc::new(PolicySecretsHost::new(policy, secrets_manager));
313        let telemetry = Arc::new(FnTelemetryHost::new(|span, fields| {
314            tracing::debug!(?span, ?fields, "telemetry emit");
315            Ok(())
316        }));
317        let host = HostBundle::new(secrets, telemetry, session_host, state_host);
318        let resume_store = FlowResumeStore::new(session_store);
319
320        let mut adapters = AdapterRegistry::default();
321        adapters.register(
322            PACK_FLOW_ADAPTER,
323            Box::new(PackFlowAdapter::new(
324                Arc::clone(&config),
325                Arc::clone(&engine),
326                resume_store,
327                mocks,
328            )),
329        );
330
331        let flows = build_flow_definitions(engine.flows());
332        let mut builder = RunnerBuilder::new()
333            .with_host(host)
334            .with_adapters(adapters)
335            .with_policy(Policy::default());
336        for flow in flows {
337            builder = builder.with_flow(flow);
338        }
339        let runner = builder
340            .build()
341            .map_err(|err| anyhow!("state machine init failed: {err}"))?;
342        Ok(Self { runner })
343    }
344
345    /// Execute the flow associated with the provided ingress event.
346    pub async fn handle(&self, envelope: IngressEnvelope) -> Result<Value> {
347        let tenant_ctx = envelope.tenant_ctx();
348        let session_hint = envelope
349            .session_hint
350            .clone()
351            .unwrap_or_else(|| envelope.canonical_session_hint());
352        let input =
353            serde_json::to_value(&envelope).context("failed to serialise ingress envelope")?;
354        let request = RunFlowRequest {
355            tenant: tenant_ctx,
356            flow_id: envelope.flow_id.clone(),
357            input,
358            session_hint: Some(session_hint),
359        };
360        let result: super::api::RunFlowResult = self
361            .runner
362            .run_flow(request)
363            .await
364            .map_err(|err| anyhow!("flow execution failed: {err}"))?;
365        let outcome = result.outcome;
366        Ok(outcome.get("response").cloned().unwrap_or(outcome))
367    }
368}
369
370struct PolicySecretsHost {
371    policy: Arc<SecretsPolicy>,
372    manager: DynSecretsManager,
373}
374
375impl PolicySecretsHost {
376    fn new(policy: Arc<SecretsPolicy>, manager: DynSecretsManager) -> Self {
377        Self { policy, manager }
378    }
379}
380
381#[async_trait]
382impl SecretsHost for PolicySecretsHost {
383    async fn get(&self, name: &str) -> GResult<String> {
384        if !self.policy.is_allowed(name) {
385            return Err(RunnerError::Secrets {
386                reason: format!("secret {name} denied by policy"),
387            });
388        }
389        let bytes = self
390            .manager
391            .read(name)
392            .await
393            .map_err(|err| RunnerError::Secrets {
394                reason: format!("secret {name} unavailable: {err}"),
395            })?;
396        String::from_utf8(bytes).map_err(|err| RunnerError::Secrets {
397            reason: format!("secret {name} not valid UTF-8: {err}"),
398        })
399    }
400}
401
402fn build_flow_definitions(flows: &[FlowDescriptor]) -> Vec<FlowDefinition> {
403    flows
404        .iter()
405        .map(|descriptor| {
406            FlowDefinition::new(
407                super::api::FlowSummary {
408                    id: descriptor.id.clone(),
409                    name: descriptor
410                        .description
411                        .clone()
412                        .unwrap_or_else(|| descriptor.id.clone()),
413                    version: descriptor.version.clone(),
414                    description: descriptor.description.clone(),
415                },
416                serde_json::json!({
417                    "type": "object"
418                }),
419                vec![FlowStep::Adapter(AdapterCall {
420                    adapter: PACK_FLOW_ADAPTER.into(),
421                    operation: descriptor.id.clone(),
422                    payload: Value::String(PAYLOAD_FROM_LAST_INPUT.into()),
423                })],
424            )
425        })
426        .collect()
427}
428
429struct PackFlowAdapter {
430    tenant: String,
431    config: Arc<HostConfig>,
432    engine: Arc<FlowEngine>,
433    resume: FlowResumeStore,
434    mocks: Option<Arc<MockLayer>>,
435}
436
437impl PackFlowAdapter {
438    fn new(
439        config: Arc<HostConfig>,
440        engine: Arc<FlowEngine>,
441        resume: FlowResumeStore,
442        mocks: Option<Arc<MockLayer>>,
443    ) -> Self {
444        Self {
445            tenant: config.tenant.clone(),
446            config,
447            engine,
448            resume,
449            mocks,
450        }
451    }
452}
453
454#[async_trait::async_trait]
455impl Adapter for PackFlowAdapter {
456    async fn call(&self, call: &AdapterCall) -> GResult<Value> {
457        let envelope: IngressEnvelope =
458            serde_json::from_value(call.payload.clone()).map_err(|err| {
459                RunnerError::AdapterCall {
460                    reason: format!("invalid ingress payload: {err}"),
461                }
462            })?;
463        let flow_id = call.operation.clone();
464        let action_owned = envelope.action.clone();
465        let session_owned = envelope
466            .session_hint
467            .clone()
468            .unwrap_or_else(|| envelope.canonical_session_hint());
469        let provider_owned = envelope.provider.clone();
470        let payload = envelope.payload.clone();
471        let retry_config = self.config.retry_config().into();
472
473        let mocks = self.mocks.as_deref();
474        let ctx = FlowContext {
475            tenant: &self.tenant,
476            flow_id: &flow_id,
477            node_id: None,
478            tool: None,
479            action: action_owned.as_deref(),
480            session_id: Some(session_owned.as_str()),
481            provider_id: provider_owned.as_deref(),
482            retry_config,
483            observer: None,
484            mocks,
485        };
486
487        let execution = if let Some(snapshot) = self.resume.fetch(&envelope)? {
488            self.engine.resume(ctx, snapshot, payload).await
489        } else {
490            self.engine.execute(ctx, payload).await
491        }
492        .map_err(|err| RunnerError::AdapterCall {
493            reason: err.to_string(),
494        })?;
495
496        match execution.status {
497            FlowStatus::Completed => {
498                self.resume.clear(&envelope)?;
499                Ok(execution.output)
500            }
501            FlowStatus::Waiting(wait) => {
502                self.resume.save(&envelope, &wait)?;
503                Ok(json!({
504                    "status": "pending",
505                    "reason": wait.reason,
506                    "resume": wait.snapshot,
507                    "response": execution.output,
508                }))
509            }
510        }
511    }
512}
513
514#[derive(Clone, Debug, Serialize, Deserialize)]
515pub struct IngressEnvelope {
516    pub tenant: String,
517    #[serde(default, skip_serializing_if = "Option::is_none")]
518    pub env: Option<String>,
519    pub flow_id: String,
520    #[serde(default, skip_serializing_if = "Option::is_none")]
521    pub flow_type: Option<String>,
522    #[serde(default, skip_serializing_if = "Option::is_none")]
523    pub action: Option<String>,
524    #[serde(default, skip_serializing_if = "Option::is_none")]
525    pub session_hint: Option<String>,
526    #[serde(default, skip_serializing_if = "Option::is_none")]
527    pub provider: Option<String>,
528    #[serde(default, skip_serializing_if = "Option::is_none")]
529    pub channel: Option<String>,
530    #[serde(default, skip_serializing_if = "Option::is_none")]
531    pub conversation: Option<String>,
532    #[serde(default, skip_serializing_if = "Option::is_none")]
533    pub user: Option<String>,
534    #[serde(default, skip_serializing_if = "Option::is_none")]
535    pub activity_id: Option<String>,
536    #[serde(default, skip_serializing_if = "Option::is_none")]
537    pub timestamp: Option<String>,
538    #[serde(default)]
539    pub payload: Value,
540    #[serde(default, skip_serializing_if = "Option::is_none")]
541    pub metadata: Option<Value>,
542}
543
544impl IngressEnvelope {
545    pub fn canonicalize(mut self) -> Self {
546        if self.provider.is_none() {
547            self.provider = Some("provider".into());
548        }
549        if self.channel.is_none() {
550            self.channel = Some(self.flow_id.clone());
551        }
552        if self.conversation.is_none() {
553            self.conversation = self.channel.clone();
554        }
555        if self.user.is_none() {
556            if let Some(ref hint) = self.session_hint {
557                self.user = Some(hint.clone());
558            } else if let Some(ref activity) = self.activity_id {
559                self.user = Some(activity.clone());
560            } else {
561                self.user = Some("user".into());
562            }
563        }
564        if self.session_hint.is_none() {
565            self.session_hint = Some(self.canonical_session_hint());
566        }
567        self
568    }
569
570    pub fn canonical_session_hint(&self) -> String {
571        format!(
572            "{}:{}:{}:{}:{}",
573            self.tenant,
574            self.provider.as_deref().unwrap_or("provider"),
575            self.channel.as_deref().unwrap_or("channel"),
576            self.conversation.as_deref().unwrap_or("conversation"),
577            self.user.as_deref().unwrap_or("user")
578        )
579    }
580
581    pub fn tenant_ctx(&self) -> TenantCtx {
582        let env_raw = self.env.clone().unwrap_or_else(|| DEFAULT_ENV.into());
583        let env = EnvId::from_str(env_raw.as_str())
584            .unwrap_or_else(|_| EnvId::from_str(DEFAULT_ENV).expect("default env must be valid"));
585        let tenant_id = TenantId::from_str(self.tenant.as_str()).unwrap_or_else(|_| {
586            TenantId::from_str("tenant.default").expect("tenant fallback must be valid")
587        });
588        let mut ctx = TenantCtx::new(env, tenant_id).with_flow(self.flow_id.clone());
589        if let Some(provider) = &self.provider {
590            ctx = ctx.with_provider(provider.clone());
591        }
592        if let Some(session) = &self.session_hint {
593            ctx = ctx.with_session(session.clone());
594        }
595        ctx
596    }
597}