greentic_runner_host/engine/
state_machine.rs

1use super::api::{FlowSchema, FlowSummary};
2use super::error::{GResult, RunnerError};
3use super::host::{
4    HostBundle, OutboxKey, SessionKey, SessionOutboxEntry, SessionSnapshot, SpanContext, WaitState,
5};
6use super::policy::retry_with_jitter;
7use super::policy::{Policy, policy_violation};
8use super::registry::{AdapterCall, AdapterRegistry};
9use greentic_types::TenantCtx;
10use parking_lot::RwLock;
11use rand::{Rng, rng};
12use serde::{Deserialize, Serialize};
13use serde_json::{Map, Value, json};
14use sha2::{Digest, Sha256};
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::SystemTime;
18
/// Provider name attached to the tenant telemetry context at the start of
/// every `StateMachine::step` call.
const PROVIDER_ID: &str = "greentic-runner";
/// Sentinel adapter payload: when an adapter call's payload is exactly this
/// string, the session's stored `last_input` value is sent in its place.
pub const PAYLOAD_FROM_LAST_INPUT: &str = "$ingress";
21
22#[derive(Clone, Debug, Serialize, Deserialize)]
23pub struct FlowDefinition {
24    pub summary: FlowSummary,
25    pub schema: Value,
26    pub steps: Vec<FlowStep>,
27}
28
29impl FlowDefinition {
30    pub fn new(summary: FlowSummary, schema: Value, steps: Vec<FlowStep>) -> Self {
31        Self {
32            summary,
33            schema,
34            steps,
35        }
36    }
37}
38
/// One unit of work inside a [`FlowDefinition`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum FlowStep {
    /// Invoke the adapter operation described by the call, then move to the next step.
    Adapter(AdapterCall),
    /// Pause the flow and report `reason` to the caller; the next `step` call resumes it.
    AwaitInput { reason: String },
    /// Finish the flow, returning `outcome` as the `result` of a `done` status.
    Complete { outcome: Value },
}
45
/// Executes registered flows step by step on behalf of tenants.
pub struct StateMachine {
    /// Host services used while stepping: the session store, the state store and telemetry.
    host: Arc<HostBundle>,
    /// Adapters available to `FlowStep::Adapter` steps, looked up by adapter name.
    adapters: AdapterRegistry,
    /// Limits (step budget, payload size) and retry settings applied while stepping.
    policy: Policy,
    /// Registered flows keyed by `summary.id`.
    flows: Arc<RwLock<HashMap<String, FlowDefinition>>>,
}
52
53impl StateMachine {
54    pub fn new(host: Arc<HostBundle>, adapters: AdapterRegistry, policy: Policy) -> Self {
55        Self {
56            host,
57            adapters,
58            policy,
59            flows: Arc::new(RwLock::new(HashMap::new())),
60        }
61    }
62
63    pub fn register_flow(&self, definition: FlowDefinition) {
64        let mut guard = self.flows.write();
65        guard.insert(definition.summary.id.clone(), definition);
66    }
67
68    pub fn list_flows(&self) -> Vec<FlowSummary> {
69        let guard = self.flows.read();
70        guard.values().map(|flow| flow.summary.clone()).collect()
71    }
72
73    pub fn get_flow_schema(&self, flow_id: &str) -> GResult<FlowSchema> {
74        let guard = self.flows.read();
75        guard
76            .get(flow_id)
77            .map(|flow| FlowSchema {
78                id: flow_id.to_string(),
79                schema_json: flow.schema.clone(),
80            })
81            .ok_or_else(|| RunnerError::FlowNotFound {
82                flow_id: flow_id.to_string(),
83            })
84    }
85
86    pub async fn step(
87        &self,
88        tenant: &TenantCtx,
89        flow_id: &str,
90        session_hint: Option<String>,
91        input: Value,
92    ) -> GResult<Value> {
93        let mut telemetry_ctx = tenant
94            .clone()
95            .with_provider(PROVIDER_ID.to_string())
96            .with_flow(flow_id.to_string());
97        if let Some(hint) = session_hint.as_ref() {
98            telemetry_ctx = telemetry_ctx.with_session(hint.clone());
99        }
100        greentic_types::telemetry::set_current_tenant_ctx(&telemetry_ctx);
101
102        let flow = {
103            let guard = self.flows.read();
104            guard
105                .get(flow_id)
106                .cloned()
107                .ok_or_else(|| RunnerError::FlowNotFound {
108                    flow_id: flow_id.to_string(),
109                })?
110        };
111
112        self.ensure_policy_budget(&flow)?;
113
114        let key = SessionKey::new(tenant, flow_id, session_hint.clone());
115        let session_host = &self.host.session;
116        let mut session = match session_host.get(&key).await? {
117            Some(snapshot) => snapshot,
118            None => {
119                let session_id = key
120                    .stable_session_id()
121                    .unwrap_or_else(Self::generate_session_id);
122                SessionSnapshot::new(key.clone(), session_id)
123            }
124        };
125        let is_new = session.revision == 0 && session.outbox.is_empty();
126        let expected_revision = session.revision;
127
128        Self::update_state_input(&mut session, input.clone());
129
130        if session.waiting.is_some() {
131            if session.cursor.position < flow.steps.len()
132                && matches!(
133                    flow.steps.get(session.cursor.position),
134                    Some(FlowStep::AwaitInput { .. })
135                )
136            {
137                session.cursor.position = session.cursor.position.saturating_add(1);
138            }
139            session.waiting = None;
140        }
141
142        let outcome = loop {
143            if session.cursor.position >= flow.steps.len() {
144                let outcome = session
145                    .last_outcome
146                    .clone()
147                    .unwrap_or_else(|| json!({"status": "done"}));
148                break outcome;
149            }
150
151            let step = flow
152                .steps
153                .get(session.cursor.position)
154                .cloned()
155                .ok_or_else(|| RunnerError::FlowNotFound {
156                    flow_id: flow.summary.id.clone(),
157                })?;
158
159            match step {
160                FlowStep::Adapter(call) => {
161                    let outcome = self
162                        .execute_adapter_step(&flow, &mut session, call, tenant)
163                        .await?;
164                    session.last_outcome = Some(outcome.clone());
165                    continue;
166                }
167                FlowStep::AwaitInput { reason } => {
168                    let last_response = session
169                        .last_outcome
170                        .as_ref()
171                        .and_then(|value| value.get("response"))
172                        .cloned();
173                    session.waiting = Some(WaitState {
174                        reason: reason.clone(),
175                        recorded_at: SystemTime::now(),
176                    });
177                    let mut pending = json!({
178                        "status": "pending",
179                        "reason": reason,
180                    });
181                    if let Some(response) = last_response
182                        && let Some(obj) = pending.as_object_mut()
183                    {
184                        obj.insert("response".into(), response);
185                    }
186                    session.last_outcome = Some(pending.clone());
187                    break pending;
188                }
189                FlowStep::Complete { outcome } => {
190                    session.cursor.position = flow.steps.len();
191                    session.last_outcome = Some(json!({
192                        "status": "done",
193                        "result": outcome,
194                    }));
195                    break session
196                        .last_outcome
197                        .clone()
198                        .expect("complete step should set outcome");
199                }
200            }
201        };
202
203        self.host
204            .state
205            .set_json(&session.key, session.state.clone())
206            .await?;
207
208        if is_new {
209            session_host.put(session).await?;
210        } else if !session_host.update_cas(session, expected_revision).await? {
211            return Err(RunnerError::Session {
212                reason: "compare-and-swap failure".into(),
213            });
214        }
215
216        Ok(outcome)
217    }
218
219    fn ensure_policy_budget(&self, flow: &FlowDefinition) -> GResult<()> {
220        if flow.steps.len() > self.policy.max_egress_adapters {
221            return Err(policy_violation(format!(
222                "flow has {} steps exceeding budget {}",
223                flow.steps.len(),
224                self.policy.max_egress_adapters
225            )));
226        }
227        Ok(())
228    }
229
230    async fn execute_adapter_step(
231        &self,
232        flow: &FlowDefinition,
233        session: &mut SessionSnapshot,
234        call: AdapterCall,
235        tenant: &TenantCtx,
236    ) -> GResult<Value> {
237        let adapter =
238            self.adapters
239                .get(&call.adapter)
240                .ok_or_else(|| RunnerError::AdapterMissing {
241                    adapter: call.adapter.clone(),
242                })?;
243
244        let resolved_payload = resolve_adapter_payload(&call.payload, session);
245        let payload_bytes =
246            serde_json::to_vec(&resolved_payload).map_err(|err| RunnerError::Serialization {
247                reason: err.to_string(),
248            })?;
249
250        if payload_bytes.len() > self.policy.max_payload_bytes {
251            return Err(policy_violation(format!(
252                "payload exceeds max size {} bytes",
253                self.policy.max_payload_bytes
254            )));
255        }
256
257        let seq = session.cursor.outbox_seq;
258        let payload_hash = Self::stable_hash(seq, &payload_bytes);
259        let key = OutboxKey::new(seq, payload_hash.clone());
260
261        let adapter_id = call.adapter.clone();
262        let operation_id = call.operation.clone();
263        let span = SpanContext {
264            trace_id: tenant.trace_id.clone(),
265            span_id: Some(format!("{}:{}", flow.summary.id, seq)),
266        };
267
268        if let Some(entry) = session.outbox.get(&key) {
269            self.host
270                .telemetry
271                .emit(
272                    &span,
273                    &[
274                        ("adapter", adapter_id.as_str()),
275                        ("operation", operation_id.as_str()),
276                        ("dedup", "hit"),
277                    ],
278                )
279                .await?;
280            session.cursor.position += 1;
281            session.cursor.outbox_seq = session.cursor.outbox_seq.saturating_add(1);
282            session.waiting = None;
283            session.last_outcome = Some(json!({
284                "status": "done",
285                "response": entry.response.clone(),
286            }));
287            return Ok(session.last_outcome.clone().unwrap());
288        }
289
290        self.host
291            .telemetry
292            .emit(
293                &span,
294                &[
295                    ("adapter", adapter_id.as_str()),
296                    ("operation", operation_id.as_str()),
297                    ("phase", "start"),
298                ],
299            )
300            .await?;
301
302        let adapter_clone = adapter.clone();
303        let mut call_clone = call.clone();
304        call_clone.payload = resolved_payload.clone();
305        let response = retry_with_jitter(&self.policy.retry, || {
306            let adapter = adapter_clone.clone();
307            let call = call_clone.clone();
308            async move { adapter.call(&call).await }
309        })
310        .await?;
311
312        session.cursor.position += 1;
313        session.cursor.outbox_seq = session.cursor.outbox_seq.saturating_add(1);
314        session.waiting = None;
315        session.outbox.insert(
316            key,
317            SessionOutboxEntry {
318                seq,
319                hash: payload_hash,
320                response: response.clone(),
321            },
322        );
323        Self::update_state_adapter(session, &call, &response);
324        session.last_outcome = Some(json!({
325            "status": "done",
326            "response": response.clone(),
327        }));
328
329        self.host
330            .telemetry
331            .emit(
332                &span,
333                &[
334                    ("adapter", adapter_id.as_str()),
335                    ("operation", operation_id.as_str()),
336                    ("phase", "finish"),
337                ],
338            )
339            .await?;
340
341        Ok(session.last_outcome.clone().unwrap())
342    }
343
344    fn update_state_input(session: &mut SessionSnapshot, input: Value) {
345        if !matches!(session.state, Value::Object(_)) {
346            session.state = Value::Object(Map::new());
347        }
348        if let Value::Object(map) = &mut session.state {
349            map.insert("last_input".to_string(), input);
350        }
351    }
352
353    fn update_state_adapter(session: &mut SessionSnapshot, call: &AdapterCall, response: &Value) {
354        if !matches!(session.state, Value::Object(_)) {
355            session.state = Value::Object(Map::new());
356        }
357        if let Value::Object(map) = &mut session.state {
358            map.insert(
359                "last_adapter".to_string(),
360                Value::String(call.adapter.clone()),
361            );
362            map.insert(
363                "last_operation".to_string(),
364                Value::String(call.operation.clone()),
365            );
366            map.insert("last_response".to_string(), response.clone());
367        }
368    }
369
370    fn stable_hash(seq: u64, payload: &[u8]) -> String {
371        let mut hasher = Sha256::new();
372        hasher.update(seq.to_be_bytes());
373        hasher.update(payload);
374        hex::encode(hasher.finalize())
375    }
376
377    fn generate_session_id() -> String {
378        let mut rng = rng();
379        let value: u128 = rng.random();
380        format!("sess-{value:032x}")
381    }
382}
383
384fn resolve_adapter_payload(call_payload: &Value, session: &SessionSnapshot) -> Value {
385    match call_payload {
386        Value::String(token) if token == PAYLOAD_FROM_LAST_INPUT => session
387            .state
388            .as_object()
389            .and_then(|map| map.get("last_input"))
390            .cloned()
391            .unwrap_or(Value::Null),
392        other => other.clone(),
393    }
394}
395
396#[cfg(test)]
397mod tests {
398    use super::*;
399    use crate::engine::glue::{FnSecretsHost, FnTelemetryHost};
400    use crate::engine::host::{SessionHost, StateHost};
401    use crate::engine::registry::Adapter;
402    use crate::engine::shims::{InMemorySessionHost, InMemoryStateHost};
403    use async_trait::async_trait;
404    use greentic_types::{EnvId, TenantId};
405    use parking_lot::Mutex;
406    use std::str::FromStr;
407
408    #[tokio::test]
409    async fn pauses_and_resumes_after_wait_step() {
410        let secrets = Arc::new(FnSecretsHost::new(|_| Ok(String::new())));
411        let telemetry = Arc::new(FnTelemetryHost::new(|_, _| Ok(())));
412        let session_store: Arc<InMemorySessionHost> = Arc::new(InMemorySessionHost::new());
413        let state_store: Arc<InMemoryStateHost> = Arc::new(InMemoryStateHost::new());
414        let host = Arc::new(HostBundle::new(
415            secrets,
416            telemetry,
417            Arc::clone(&session_store) as Arc<dyn SessionHost>,
418            Arc::clone(&state_store) as Arc<dyn StateHost>,
419        ));
420
421        let adapter = MockChatAdapter::default();
422        let mut adapters = AdapterRegistry::default();
423        adapters.register("mock", Box::new(adapter.clone()));
424        let policy = Policy::default();
425
426        let sm = StateMachine::new(host, adapters, policy);
427        sm.register_flow(test_flow());
428
429        let env = EnvId::from_str("local").unwrap();
430        let tenant_id = TenantId::from_str("demo").unwrap();
431        let tenant_ctx = TenantCtx::new(env, tenant_id);
432        let session_hint = Some("demo:telegram:chat:user".to_string());
433
434        let first = sm
435            .step(
436                &tenant_ctx,
437                "support.flow",
438                session_hint.clone(),
439                json!({ "text": "hi" }),
440            )
441            .await
442            .expect("first step");
443        assert_eq!(first["status"], json!("pending"));
444        assert_eq!(
445            first["response"]["messages"][0]["text"],
446            json!("Welcome to support!")
447        );
448
449        let key = SessionKey::new(&tenant_ctx, "support.flow", session_hint.clone());
450        let snapshot = session_store.get(&key).await.unwrap().unwrap();
451        assert!(snapshot.waiting.is_some());
452        assert_eq!(snapshot.cursor.position, 1);
453
454        let second = sm
455            .step(
456                &tenant_ctx,
457                "support.flow",
458                session_hint.clone(),
459                json!({ "text": "need help" }),
460            )
461            .await
462            .expect("second step");
463        assert_eq!(second["status"], json!("done"));
464        assert_eq!(
465            second["response"]["messages"][0]["text"],
466            json!("echo: need help")
467        );
468
469        let snapshot = session_store.get(&key).await.unwrap().unwrap();
470        assert!(snapshot.waiting.is_none());
471        assert_eq!(snapshot.cursor.position, 3);
472
473        let history = adapter.history();
474        assert_eq!(history.len(), 2);
475        assert_eq!(history[1]["text"], json!("need help"));
476    }
477
478    fn test_flow() -> FlowDefinition {
479        FlowDefinition::new(
480            FlowSummary {
481                id: "support.flow".into(),
482                name: "Support".into(),
483                version: "1.0.0".into(),
484                description: None,
485            },
486            json!({ "type": "object" }),
487            vec![
488                FlowStep::Adapter(AdapterCall {
489                    adapter: "mock".into(),
490                    operation: "send".into(),
491                    payload: json!({ "text": "Welcome to support!" }),
492                }),
493                FlowStep::AwaitInput {
494                    reason: "await-user".into(),
495                },
496                FlowStep::Adapter(AdapterCall {
497                    adapter: "mock".into(),
498                    operation: "echo".into(),
499                    payload: Value::String(PAYLOAD_FROM_LAST_INPUT.into()),
500                }),
501            ],
502        )
503    }
504
505    #[derive(Clone, Default)]
506    struct MockChatAdapter {
507        calls: Arc<Mutex<Vec<Value>>>,
508    }
509
510    impl MockChatAdapter {
511        fn history(&self) -> Vec<Value> {
512            self.calls.lock().clone()
513        }
514    }
515
516    #[async_trait]
517    impl Adapter for MockChatAdapter {
518        async fn call(&self, call: &AdapterCall) -> GResult<Value> {
519            self.calls.lock().push(call.payload.clone());
520            match call.operation.as_str() {
521                "send" => Ok(json!({
522                    "messages": [{ "text": call.payload.get("text").and_then(Value::as_str).unwrap_or("hello").to_string() }]
523                })),
524                "echo" => {
525                    let text = call
526                        .payload
527                        .get("text")
528                        .and_then(Value::as_str)
529                        .unwrap_or("")
530                        .to_string();
531                    Ok(json!({
532                        "messages": [{ "text": format!("echo: {text}") }]
533                    }))
534                }
535                _ => Ok(json!({})),
536            }
537        }
538    }
539}