greentic_runner_host/engine/
state_machine.rs

1use super::api::{FlowSchema, FlowSummary};
2use super::error::{GResult, RunnerError};
3use super::host::{
4    HostBundle, OutboxKey, SessionKey, SessionOutboxEntry, SessionSnapshot, SpanContext, WaitState,
5};
6use super::policy::retry_with_jitter;
7use super::policy::{Policy, policy_violation};
8use super::registry::{AdapterCall, AdapterRegistry};
9use greentic_types::TenantCtx;
10use parking_lot::RwLock;
11use rand::{Rng, rng};
12use serde::{Deserialize, Serialize};
13use serde_json::{Map, Value, json};
14use sha2::{Digest, Sha256};
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::SystemTime;
18
/// Provider identifier attached to the tenant context that `StateMachine::step`
/// publishes for telemetry.
const PROVIDER_ID: &str = "greentic-runner";
/// Sentinel adapter payload: a step whose payload is exactly this string is sent the
/// `last_input` value stored in the session state instead (`Null` when none is stored).
pub const PAYLOAD_FROM_LAST_INPUT: &str = "$ingress";
21
/// A flow as registered with the state machine: summary metadata, a schema document,
/// and the ordered list of steps to execute.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FlowDefinition {
    /// Summary metadata; `summary.id` is the key the flow is registered under.
    pub summary: FlowSummary,
    /// Schema value handed back unchanged by `StateMachine::get_flow_schema`.
    pub schema: Value,
    /// Steps in execution order; the session cursor position indexes into this list.
    pub steps: Vec<FlowStep>,
}

impl FlowDefinition {
    /// Assembles a definition from its parts; no validation is performed here.
    pub fn new(summary: FlowSummary, schema: Value, steps: Vec<FlowStep>) -> Self {
        Self {
            summary,
            schema,
            steps,
        }
    }
}
38
/// One step of a flow, interpreted by `StateMachine::step`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum FlowStep {
    /// Invoke the adapter named in the call; execution then continues with the next step.
    Adapter(AdapterCall),
    /// Pause the flow and return a `"pending"` outcome carrying `reason`; the flow moves
    /// past this step on the next `step` call for the same session.
    AwaitInput { reason: String },
    /// Finish the flow with a `"done"` outcome whose `"result"` is `outcome`.
    Complete { outcome: Value },
}
45
/// Executes registered flows step by step, keeping per-session progress in the host's
/// session store.
pub struct StateMachine {
    /// Host services; this file uses its `session`, `state` and `telemetry` members.
    host: Arc<HostBundle>,
    /// Lookup table for the adapters that `FlowStep::Adapter` steps may call.
    adapters: AdapterRegistry,
    /// Limits applied while stepping: step budget, payload size and retry settings.
    policy: Policy,
    /// Registered flows keyed by `summary.id`.
    flows: Arc<RwLock<HashMap<String, FlowDefinition>>>,
}
52
53impl StateMachine {
54    pub fn new(host: Arc<HostBundle>, adapters: AdapterRegistry, policy: Policy) -> Self {
55        Self {
56            host,
57            adapters,
58            policy,
59            flows: Arc::new(RwLock::new(HashMap::new())),
60        }
61    }
62
63    pub fn register_flow(&self, definition: FlowDefinition) {
64        let mut guard = self.flows.write();
65        guard.insert(definition.summary.id.clone(), definition);
66    }
67
68    pub fn list_flows(&self) -> Vec<FlowSummary> {
69        let guard = self.flows.read();
70        guard.values().map(|flow| flow.summary.clone()).collect()
71    }
72
73    pub fn get_flow_schema(&self, flow_id: &str) -> GResult<FlowSchema> {
74        let guard = self.flows.read();
75        guard
76            .get(flow_id)
77            .map(|flow| FlowSchema {
78                id: flow_id.to_string(),
79                schema_json: flow.schema.clone(),
80            })
81            .ok_or_else(|| RunnerError::FlowNotFound {
82                flow_id: flow_id.to_string(),
83            })
84    }
85
86    pub async fn step(
87        &self,
88        tenant: &TenantCtx,
89        flow_id: &str,
90        session_hint: Option<String>,
91        input: Value,
92    ) -> GResult<Value> {
93        let mut telemetry_ctx = tenant
94            .clone()
95            .with_provider(PROVIDER_ID.to_string())
96            .with_flow(flow_id.to_string());
97        if let Some(hint) = session_hint.as_ref() {
98            telemetry_ctx = telemetry_ctx.with_session(hint.clone());
99        }
100        greentic_types::telemetry::set_current_tenant_ctx(&telemetry_ctx);
101
102        let flow = {
103            let guard = self.flows.read();
104            guard
105                .get(flow_id)
106                .cloned()
107                .ok_or_else(|| RunnerError::FlowNotFound {
108                    flow_id: flow_id.to_string(),
109                })?
110        };
111
112        self.ensure_policy_budget(&flow)?;
113
114        let key = SessionKey::new(tenant, flow_id, session_hint.clone());
115        let session_host = &self.host.session;
116        let mut session = match session_host.get(&key).await? {
117            Some(snapshot) => snapshot,
118            None => {
119                let session_id = key
120                    .stable_session_id()
121                    .unwrap_or_else(Self::generate_session_id);
122                SessionSnapshot::new(key.clone(), session_id)
123            }
124        };
125        let is_new = session.revision == 0 && session.outbox.is_empty();
126        let expected_revision = session.revision;
127
128        Self::update_state_input(&mut session, input.clone());
129
130        if session.waiting.is_some() {
131            if session.cursor.position < flow.steps.len()
132                && matches!(
133                    flow.steps.get(session.cursor.position),
134                    Some(FlowStep::AwaitInput { .. })
135                )
136            {
137                session.cursor.position = session.cursor.position.saturating_add(1);
138            }
139            session.waiting = None;
140        }
141
142        let mut final_outcome = None;
143        loop {
144            if session.cursor.position >= flow.steps.len() {
145                let outcome = session
146                    .last_outcome
147                    .clone()
148                    .unwrap_or_else(|| json!({"status": "done"}));
149                final_outcome = Some(outcome);
150                break;
151            }
152
153            let step = flow
154                .steps
155                .get(session.cursor.position)
156                .cloned()
157                .ok_or_else(|| RunnerError::FlowNotFound {
158                    flow_id: flow.summary.id.clone(),
159                })?;
160
161            match step {
162                FlowStep::Adapter(call) => {
163                    let outcome = self
164                        .execute_adapter_step(&flow, &mut session, call, tenant)
165                        .await?;
166                    final_outcome = Some(outcome);
167                    continue;
168                }
169                FlowStep::AwaitInput { reason } => {
170                    let last_response = session
171                        .last_outcome
172                        .as_ref()
173                        .and_then(|value| value.get("response"))
174                        .cloned();
175                    session.waiting = Some(WaitState {
176                        reason: reason.clone(),
177                        recorded_at: SystemTime::now(),
178                    });
179                    let mut pending = json!({
180                        "status": "pending",
181                        "reason": reason,
182                    });
183                    if let Some(response) = last_response
184                        && let Some(obj) = pending.as_object_mut()
185                    {
186                        obj.insert("response".into(), response);
187                    }
188                    session.last_outcome = Some(pending.clone());
189                    final_outcome = Some(pending);
190                    break;
191                }
192                FlowStep::Complete { outcome } => {
193                    session.cursor.position = flow.steps.len();
194                    session.last_outcome = Some(json!({
195                        "status": "done",
196                        "result": outcome,
197                    }));
198                    final_outcome = session.last_outcome.clone();
199                    break;
200                }
201            }
202        }
203
204        let outcome = final_outcome.expect("state machine produced no outcome");
205
206        self.host
207            .state
208            .set_json(&session.key, session.state.clone())
209            .await?;
210
211        if is_new {
212            session_host.put(session).await?;
213        } else if !session_host.update_cas(session, expected_revision).await? {
214            return Err(RunnerError::Session {
215                reason: "compare-and-swap failure".into(),
216            });
217        }
218
219        Ok(outcome)
220    }
221
222    fn ensure_policy_budget(&self, flow: &FlowDefinition) -> GResult<()> {
223        if flow.steps.len() > self.policy.max_egress_adapters {
224            return Err(policy_violation(format!(
225                "flow has {} steps exceeding budget {}",
226                flow.steps.len(),
227                self.policy.max_egress_adapters
228            )));
229        }
230        Ok(())
231    }
232
233    async fn execute_adapter_step(
234        &self,
235        flow: &FlowDefinition,
236        session: &mut SessionSnapshot,
237        call: AdapterCall,
238        tenant: &TenantCtx,
239    ) -> GResult<Value> {
240        let adapter =
241            self.adapters
242                .get(&call.adapter)
243                .ok_or_else(|| RunnerError::AdapterMissing {
244                    adapter: call.adapter.clone(),
245                })?;
246
247        let resolved_payload = resolve_adapter_payload(&call.payload, session);
248        let payload_bytes =
249            serde_json::to_vec(&resolved_payload).map_err(|err| RunnerError::Serialization {
250                reason: err.to_string(),
251            })?;
252
253        if payload_bytes.len() > self.policy.max_payload_bytes {
254            return Err(policy_violation(format!(
255                "payload exceeds max size {} bytes",
256                self.policy.max_payload_bytes
257            )));
258        }
259
260        let seq = session.cursor.outbox_seq;
261        let payload_hash = Self::stable_hash(seq, &payload_bytes);
262        let key = OutboxKey::new(seq, payload_hash.clone());
263
264        let adapter_id = call.adapter.clone();
265        let operation_id = call.operation.clone();
266        let span = SpanContext {
267            trace_id: tenant.trace_id.clone(),
268            span_id: Some(format!("{}:{}", flow.summary.id, seq)),
269        };
270
271        if let Some(entry) = session.outbox.get(&key) {
272            self.host
273                .telemetry
274                .emit(
275                    &span,
276                    &[
277                        ("adapter", adapter_id.as_str()),
278                        ("operation", operation_id.as_str()),
279                        ("dedup", "hit"),
280                    ],
281                )
282                .await?;
283            session.cursor.position += 1;
284            session.cursor.outbox_seq = session.cursor.outbox_seq.saturating_add(1);
285            session.waiting = None;
286            session.last_outcome = Some(json!({
287                "status": "done",
288                "response": entry.response.clone(),
289            }));
290            return Ok(session.last_outcome.clone().unwrap());
291        }
292
293        self.host
294            .telemetry
295            .emit(
296                &span,
297                &[
298                    ("adapter", adapter_id.as_str()),
299                    ("operation", operation_id.as_str()),
300                    ("phase", "start"),
301                ],
302            )
303            .await?;
304
305        let adapter_clone = adapter.clone();
306        let mut call_clone = call.clone();
307        call_clone.payload = resolved_payload.clone();
308        let response = retry_with_jitter(&self.policy.retry, || {
309            let adapter = adapter_clone.clone();
310            let call = call_clone.clone();
311            async move { adapter.call(&call).await }
312        })
313        .await?;
314
315        session.cursor.position += 1;
316        session.cursor.outbox_seq = session.cursor.outbox_seq.saturating_add(1);
317        session.waiting = None;
318        session.outbox.insert(
319            key,
320            SessionOutboxEntry {
321                seq,
322                hash: payload_hash,
323                response: response.clone(),
324            },
325        );
326        Self::update_state_adapter(session, &call, &response);
327        session.last_outcome = Some(json!({
328            "status": "done",
329            "response": response.clone(),
330        }));
331
332        self.host
333            .telemetry
334            .emit(
335                &span,
336                &[
337                    ("adapter", adapter_id.as_str()),
338                    ("operation", operation_id.as_str()),
339                    ("phase", "finish"),
340                ],
341            )
342            .await?;
343
344        Ok(session.last_outcome.clone().unwrap())
345    }
346
347    fn update_state_input(session: &mut SessionSnapshot, input: Value) {
348        if !matches!(session.state, Value::Object(_)) {
349            session.state = Value::Object(Map::new());
350        }
351        if let Value::Object(map) = &mut session.state {
352            map.insert("last_input".to_string(), input);
353        }
354    }
355
356    fn update_state_adapter(session: &mut SessionSnapshot, call: &AdapterCall, response: &Value) {
357        if !matches!(session.state, Value::Object(_)) {
358            session.state = Value::Object(Map::new());
359        }
360        if let Value::Object(map) = &mut session.state {
361            map.insert(
362                "last_adapter".to_string(),
363                Value::String(call.adapter.clone()),
364            );
365            map.insert(
366                "last_operation".to_string(),
367                Value::String(call.operation.clone()),
368            );
369            map.insert("last_response".to_string(), response.clone());
370        }
371    }
372
373    fn stable_hash(seq: u64, payload: &[u8]) -> String {
374        let mut hasher = Sha256::new();
375        hasher.update(seq.to_be_bytes());
376        hasher.update(payload);
377        hex::encode(hasher.finalize())
378    }
379
380    fn generate_session_id() -> String {
381        let mut rng = rng();
382        let value: u128 = rng.random();
383        format!("sess-{value:032x}")
384    }
385}
386
387fn resolve_adapter_payload(call_payload: &Value, session: &SessionSnapshot) -> Value {
388    match call_payload {
389        Value::String(token) if token == PAYLOAD_FROM_LAST_INPUT => session
390            .state
391            .as_object()
392            .and_then(|map| map.get("last_input"))
393            .cloned()
394            .unwrap_or(Value::Null),
395        other => other.clone(),
396    }
397}
398
399#[cfg(test)]
400mod tests {
401    use super::*;
402    use crate::engine::glue::{FnSecretsHost, FnTelemetryHost};
403    use crate::engine::host::{SessionHost, StateHost};
404    use crate::engine::registry::Adapter;
405    use crate::engine::shims::{InMemorySessionHost, InMemoryStateHost};
406    use async_trait::async_trait;
407    use greentic_types::{EnvId, TenantId};
408    use parking_lot::Mutex;
409    use std::str::FromStr;
410
    /// End-to-end check of the adapter -> await-input -> adapter flow from `test_flow`:
    /// the first call must pause on the wait step, the second must resume and finish.
    #[tokio::test]
    async fn pauses_and_resumes_after_wait_step() {
        // Host wiring: no-op secrets/telemetry plus in-memory session and state stores.
        let secrets = Arc::new(FnSecretsHost::new(|_| Ok(String::new())));
        let telemetry = Arc::new(FnTelemetryHost::new(|_, _| Ok(())));
        let session_store: Arc<InMemorySessionHost> = Arc::new(InMemorySessionHost::new());
        let state_store: Arc<InMemoryStateHost> = Arc::new(InMemoryStateHost::new());
        let host = Arc::new(HostBundle::new(
            secrets,
            telemetry,
            Arc::clone(&session_store) as Arc<dyn SessionHost>,
            Arc::clone(&state_store) as Arc<dyn StateHost>,
        ));

        // Keep a clone of the mock so the recorded payloads can be inspected at the end.
        let adapter = MockChatAdapter::default();
        let mut adapters = AdapterRegistry::default();
        adapters.register("mock", Box::new(adapter.clone()));
        let policy = Policy::default();

        let sm = StateMachine::new(host, adapters, policy);
        sm.register_flow(test_flow());

        let env = EnvId::from_str("local").unwrap();
        let tenant_id = TenantId::from_str("demo").unwrap();
        let tenant_ctx = TenantCtx::new(env, tenant_id);
        let session_hint = Some("demo:telegram:chat:user".to_string());

        // First call: the welcome adapter runs, then the flow parks on the wait step and
        // reports "pending" together with the welcome response.
        let first = sm
            .step(
                &tenant_ctx,
                "support.flow",
                session_hint.clone(),
                json!({ "text": "hi" }),
            )
            .await
            .expect("first step");
        assert_eq!(first["status"], json!("pending"));
        assert_eq!(
            first["response"]["messages"][0]["text"],
            json!("Welcome to support!")
        );

        // The stored snapshot is waiting with the cursor on the wait step (index 1).
        let key = SessionKey::new(&tenant_ctx, "support.flow", session_hint.clone());
        let snapshot = session_store.get(&key).await.unwrap().unwrap();
        assert!(snapshot.waiting.is_some());
        assert_eq!(snapshot.cursor.position, 1);

        // Second call: the wait is released and the echo adapter receives the new input.
        let second = sm
            .step(
                &tenant_ctx,
                "support.flow",
                session_hint.clone(),
                json!({ "text": "need help" }),
            )
            .await
            .expect("second step");
        assert_eq!(second["status"], json!("done"));
        assert_eq!(
            second["response"]["messages"][0]["text"],
            json!("echo: need help")
        );

        // The cursor is now past the last of the three steps and nothing is pending.
        let snapshot = session_store.get(&key).await.unwrap().unwrap();
        assert!(snapshot.waiting.is_none());
        assert_eq!(snapshot.cursor.position, 3);

        // Exactly two adapter calls happened; the second carried the resumed input.
        let history = adapter.history();
        assert_eq!(history.len(), 2);
        assert_eq!(history[1]["text"], json!("need help"));
    }
480
481    fn test_flow() -> FlowDefinition {
482        FlowDefinition::new(
483            FlowSummary {
484                id: "support.flow".into(),
485                name: "Support".into(),
486                version: "1.0.0".into(),
487                description: None,
488            },
489            json!({ "type": "object" }),
490            vec![
491                FlowStep::Adapter(AdapterCall {
492                    adapter: "mock".into(),
493                    operation: "send".into(),
494                    payload: json!({ "text": "Welcome to support!" }),
495                }),
496                FlowStep::AwaitInput {
497                    reason: "await-user".into(),
498                },
499                FlowStep::Adapter(AdapterCall {
500                    adapter: "mock".into(),
501                    operation: "echo".into(),
502                    payload: Value::String(PAYLOAD_FROM_LAST_INPUT.into()),
503                }),
504            ],
505        )
506    }
507
508    #[derive(Clone, Default)]
509    struct MockChatAdapter {
510        calls: Arc<Mutex<Vec<Value>>>,
511    }
512
513    impl MockChatAdapter {
514        fn history(&self) -> Vec<Value> {
515            self.calls.lock().clone()
516        }
517    }
518
519    #[async_trait]
520    impl Adapter for MockChatAdapter {
521        async fn call(&self, call: &AdapterCall) -> GResult<Value> {
522            self.calls.lock().push(call.payload.clone());
523            match call.operation.as_str() {
524                "send" => Ok(json!({
525                    "messages": [{ "text": call.payload.get("text").and_then(Value::as_str).unwrap_or("hello").to_string() }]
526                })),
527                "echo" => {
528                    let text = call
529                        .payload
530                        .get("text")
531                        .and_then(Value::as_str)
532                        .unwrap_or("")
533                        .to_string();
534                    Ok(json!({
535                        "messages": [{ "text": format!("echo: {text}") }]
536                    }))
537                }
538                _ => Ok(json!({})),
539            }
540        }
541    }
542}