greentic_runner_host/engine/
state_machine.rs

1use super::api::{FlowSchema, FlowSummary};
2use super::error::{GResult, RunnerError};
3use super::host::{
4    HostBundle, OutboxKey, SessionKey, SessionOutboxEntry, SessionSnapshot, SpanContext, WaitState,
5};
6use super::policy::retry_with_jitter;
7use super::policy::{Policy, policy_violation};
8use super::registry::{AdapterCall, AdapterRegistry};
9use greentic_types::TenantCtx;
10use parking_lot::RwLock;
11use rand::{Rng, rng};
12use serde::{Deserialize, Serialize};
13use serde_json::{Map, Value, json};
14use sha2::{Digest, Sha256};
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::SystemTime;
18
/// Provider name attached to the telemetry tenant context by `StateMachine::step`.
const PROVIDER_ID: &str = "greentic-runner";

/// Adapter payload token: an adapter step whose payload is exactly this string is sent the
/// input most recently passed to `StateMachine::step` instead of a literal payload.
pub const PAYLOAD_FROM_LAST_INPUT: &str = "$ingress";

/// Registry key for a flow: the pack that ships it plus the flow's id within that pack.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct FlowKey {
    pack_id: String,
    flow_id: String,
}

impl FlowKey {
    /// Builds a key from borrowed ids, copying both into owned strings.
    fn new(pack_id: &str, flow_id: &str) -> Self {
        let (pack_id, flow_id) = (pack_id.to_owned(), flow_id.to_owned());
        Self { pack_id, flow_id }
    }
}
36
37#[derive(Clone, Debug, Serialize, Deserialize)]
38pub struct FlowDefinition {
39    pub summary: FlowSummary,
40    pub schema: Value,
41    pub steps: Vec<FlowStep>,
42}
43
44impl FlowDefinition {
45    pub fn new(summary: FlowSummary, schema: Value, steps: Vec<FlowStep>) -> Self {
46        Self {
47            summary,
48            schema,
49            steps,
50        }
51    }
52}
53
54#[derive(Clone, Debug, Serialize, Deserialize)]
55pub enum FlowStep {
56    Adapter(AdapterCall),
57    AwaitInput { reason: String },
58    Complete { outcome: Value },
59}
60
61pub struct StateMachine {
62    host: Arc<HostBundle>,
63    adapters: AdapterRegistry,
64    policy: Policy,
65    flows: Arc<RwLock<HashMap<FlowKey, FlowDefinition>>>,
66}
67
68impl StateMachine {
69    pub fn new(host: Arc<HostBundle>, adapters: AdapterRegistry, policy: Policy) -> Self {
70        Self {
71            host,
72            adapters,
73            policy,
74            flows: Arc::new(RwLock::new(HashMap::new())),
75        }
76    }
77
78    pub fn register_flow(&self, definition: FlowDefinition) {
79        let mut guard = self.flows.write();
80        guard.insert(
81            FlowKey::new(&definition.summary.pack_id, &definition.summary.id),
82            definition,
83        );
84    }
85
86    pub fn list_flows(&self) -> Vec<FlowSummary> {
87        let guard = self.flows.read();
88        guard.values().map(|flow| flow.summary.clone()).collect()
89    }
90
91    pub fn get_flow_schema(&self, pack_id: &str, flow_id: &str) -> GResult<FlowSchema> {
92        let guard = self.flows.read();
93        guard
94            .get(&FlowKey::new(pack_id, flow_id))
95            .map(|flow| FlowSchema {
96                pack_id: pack_id.to_string(),
97                id: flow_id.to_string(),
98                schema_json: flow.schema.clone(),
99            })
100            .ok_or_else(|| RunnerError::FlowNotFound {
101                flow_id: flow_id.to_string(),
102            })
103    }
104
105    pub async fn step(
106        &self,
107        tenant: &TenantCtx,
108        pack_id: &str,
109        flow_id: &str,
110        session_hint: Option<String>,
111        input: Value,
112    ) -> GResult<Value> {
113        let mut telemetry_ctx = tenant
114            .clone()
115            .with_provider(PROVIDER_ID.to_string())
116            .with_flow(flow_id.to_string());
117        if let Some(hint) = session_hint.as_ref() {
118            telemetry_ctx = telemetry_ctx.with_session(hint.clone());
119        }
120        greentic_types::telemetry::set_current_tenant_ctx(&telemetry_ctx);
121
122        let flow = {
123            let guard = self.flows.read();
124            guard
125                .get(&FlowKey::new(pack_id, flow_id))
126                .cloned()
127                .ok_or_else(|| RunnerError::FlowNotFound {
128                    flow_id: flow_id.to_string(),
129                })?
130        };
131
132        self.ensure_policy_budget(&flow)?;
133
134        let key = SessionKey::new(tenant, pack_id, flow_id, session_hint.clone());
135        let session_host = &self.host.session;
136        let mut session = match session_host.get(&key).await? {
137            Some(snapshot) => snapshot,
138            None => {
139                let session_id = key
140                    .stable_session_id()
141                    .unwrap_or_else(Self::generate_session_id);
142                SessionSnapshot::new(key.clone(), session_id)
143            }
144        };
145        let is_new = session.revision == 0 && session.outbox.is_empty();
146        let expected_revision = session.revision;
147
148        Self::update_state_input(&mut session, input.clone());
149
150        if session.waiting.is_some() {
151            if session.cursor.position < flow.steps.len()
152                && matches!(
153                    flow.steps.get(session.cursor.position),
154                    Some(FlowStep::AwaitInput { .. })
155                )
156            {
157                session.cursor.position = session.cursor.position.saturating_add(1);
158            }
159            session.waiting = None;
160        }
161
162        let outcome = loop {
163            if session.cursor.position >= flow.steps.len() {
164                let outcome = session
165                    .last_outcome
166                    .clone()
167                    .unwrap_or_else(|| json!({"status": "done"}));
168                break outcome;
169            }
170
171            let step = flow
172                .steps
173                .get(session.cursor.position)
174                .cloned()
175                .ok_or_else(|| RunnerError::FlowNotFound {
176                    flow_id: flow.summary.id.clone(),
177                })?;
178
179            match step {
180                FlowStep::Adapter(call) => {
181                    let outcome = self
182                        .execute_adapter_step(&flow, &mut session, call, tenant)
183                        .await?;
184                    session.last_outcome = Some(outcome.clone());
185                    continue;
186                }
187                FlowStep::AwaitInput { reason } => {
188                    let last_response = session
189                        .last_outcome
190                        .as_ref()
191                        .and_then(|value| value.get("response"))
192                        .cloned();
193                    session.waiting = Some(WaitState {
194                        reason: reason.clone(),
195                        recorded_at: SystemTime::now(),
196                    });
197                    let mut pending = json!({
198                        "status": "pending",
199                        "reason": reason,
200                    });
201                    if let Some(response) = last_response
202                        && let Some(obj) = pending.as_object_mut()
203                    {
204                        obj.insert("response".into(), response);
205                    }
206                    session.last_outcome = Some(pending.clone());
207                    break pending;
208                }
209                FlowStep::Complete { outcome } => {
210                    session.cursor.position = flow.steps.len();
211                    session.last_outcome = Some(json!({
212                        "status": "done",
213                        "result": outcome,
214                    }));
215                    break session
216                        .last_outcome
217                        .clone()
218                        .expect("complete step should set outcome");
219                }
220            }
221        };
222
223        self.host
224            .state
225            .set_json(&session.key, session.state.clone())
226            .await?;
227
228        if is_new {
229            session_host.put(session).await?;
230        } else if !session_host.update_cas(session, expected_revision).await? {
231            return Err(RunnerError::Session {
232                reason: "compare-and-swap failure".into(),
233            });
234        }
235
236        Ok(outcome)
237    }
238
239    fn ensure_policy_budget(&self, flow: &FlowDefinition) -> GResult<()> {
240        if flow.steps.len() > self.policy.max_egress_adapters {
241            return Err(policy_violation(format!(
242                "flow has {} steps exceeding budget {}",
243                flow.steps.len(),
244                self.policy.max_egress_adapters
245            )));
246        }
247        Ok(())
248    }
249
250    async fn execute_adapter_step(
251        &self,
252        flow: &FlowDefinition,
253        session: &mut SessionSnapshot,
254        call: AdapterCall,
255        tenant: &TenantCtx,
256    ) -> GResult<Value> {
257        let adapter =
258            self.adapters
259                .get(&call.adapter)
260                .ok_or_else(|| RunnerError::AdapterMissing {
261                    adapter: call.adapter.clone(),
262                })?;
263
264        let resolved_payload = resolve_adapter_payload(&call.payload, session);
265        let payload_bytes =
266            serde_json::to_vec(&resolved_payload).map_err(|err| RunnerError::Serialization {
267                reason: err.to_string(),
268            })?;
269
270        if payload_bytes.len() > self.policy.max_payload_bytes {
271            return Err(policy_violation(format!(
272                "payload exceeds max size {} bytes",
273                self.policy.max_payload_bytes
274            )));
275        }
276
277        let seq = session.cursor.outbox_seq;
278        let payload_hash = Self::stable_hash(seq, &payload_bytes);
279        let key = OutboxKey::new(seq, payload_hash.clone());
280
281        let adapter_id = call.adapter.clone();
282        let operation_id = call.operation.clone();
283        let span = SpanContext {
284            trace_id: tenant.trace_id.clone(),
285            span_id: Some(format!("{}:{}", flow.summary.id, seq)),
286        };
287
288        if let Some(entry) = session.outbox.get(&key) {
289            self.host
290                .telemetry
291                .emit(
292                    &span,
293                    &[
294                        ("adapter", adapter_id.as_str()),
295                        ("operation", operation_id.as_str()),
296                        ("dedup", "hit"),
297                    ],
298                )
299                .await?;
300            session.cursor.position += 1;
301            session.cursor.outbox_seq = session.cursor.outbox_seq.saturating_add(1);
302            session.waiting = None;
303            session.last_outcome = Some(json!({
304                "status": "done",
305                "response": entry.response.clone(),
306            }));
307            return Ok(session.last_outcome.clone().unwrap());
308        }
309
310        self.host
311            .telemetry
312            .emit(
313                &span,
314                &[
315                    ("adapter", adapter_id.as_str()),
316                    ("operation", operation_id.as_str()),
317                    ("phase", "start"),
318                ],
319            )
320            .await?;
321
322        let adapter_clone = adapter.clone();
323        let mut call_clone = call.clone();
324        call_clone.payload = resolved_payload.clone();
325        let response = retry_with_jitter(&self.policy.retry, || {
326            let adapter = adapter_clone.clone();
327            let call = call_clone.clone();
328            async move { adapter.call(&call).await }
329        })
330        .await?;
331
332        session.cursor.position += 1;
333        session.cursor.outbox_seq = session.cursor.outbox_seq.saturating_add(1);
334        session.waiting = None;
335        session.outbox.insert(
336            key,
337            SessionOutboxEntry {
338                seq,
339                hash: payload_hash,
340                response: response.clone(),
341            },
342        );
343        Self::update_state_adapter(session, &call, &response);
344        session.last_outcome = Some(json!({
345            "status": "done",
346            "response": response.clone(),
347        }));
348
349        self.host
350            .telemetry
351            .emit(
352                &span,
353                &[
354                    ("adapter", adapter_id.as_str()),
355                    ("operation", operation_id.as_str()),
356                    ("phase", "finish"),
357                ],
358            )
359            .await?;
360
361        Ok(session.last_outcome.clone().unwrap())
362    }
363
364    fn update_state_input(session: &mut SessionSnapshot, input: Value) {
365        if !matches!(session.state, Value::Object(_)) {
366            session.state = Value::Object(Map::new());
367        }
368        if let Value::Object(map) = &mut session.state {
369            map.insert("last_input".to_string(), input);
370        }
371    }
372
373    fn update_state_adapter(session: &mut SessionSnapshot, call: &AdapterCall, response: &Value) {
374        if !matches!(session.state, Value::Object(_)) {
375            session.state = Value::Object(Map::new());
376        }
377        if let Value::Object(map) = &mut session.state {
378            map.insert(
379                "last_adapter".to_string(),
380                Value::String(call.adapter.clone()),
381            );
382            map.insert(
383                "last_operation".to_string(),
384                Value::String(call.operation.clone()),
385            );
386            map.insert("last_response".to_string(), response.clone());
387        }
388    }
389
390    fn stable_hash(seq: u64, payload: &[u8]) -> String {
391        let mut hasher = Sha256::new();
392        hasher.update(seq.to_be_bytes());
393        hasher.update(payload);
394        hex::encode(hasher.finalize())
395    }
396
397    fn generate_session_id() -> String {
398        let mut rng = rng();
399        let value: u128 = rng.random();
400        format!("sess-{value:032x}")
401    }
402}
403
404fn resolve_adapter_payload(call_payload: &Value, session: &SessionSnapshot) -> Value {
405    match call_payload {
406        Value::String(token) if token == PAYLOAD_FROM_LAST_INPUT => session
407            .state
408            .as_object()
409            .and_then(|map| map.get("last_input"))
410            .cloned()
411            .unwrap_or(Value::Null),
412        other => other.clone(),
413    }
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::glue::{FnSecretsHost, FnTelemetryHost};
    use crate::engine::host::{SessionHost, StateHost};
    use crate::engine::registry::Adapter;
    use crate::engine::shims::{InMemorySessionHost, InMemoryStateHost};
    use async_trait::async_trait;
    use greentic_types::{EnvId, TenantId};
    use parking_lot::Mutex;
    use std::str::FromStr;

    const PACK_ID: &str = "test-pack";
    const FLOW_ID: &str = "support.flow";

    /// Builds a `StateMachine` over in-memory session/state stores, with `test_flow`
    /// registered and a `MockChatAdapter` registered as "mock".
    ///
    /// Returns the machine, a clone of the adapter (sharing its call history), and the
    /// session store so the test can inspect persisted snapshots.
    fn build_machine() -> (StateMachine, MockChatAdapter, Arc<InMemorySessionHost>) {
        let secrets = Arc::new(FnSecretsHost::new(|_| Ok(String::new())));
        let telemetry = Arc::new(FnTelemetryHost::new(|_, _| Ok(())));
        let session_store: Arc<InMemorySessionHost> = Arc::new(InMemorySessionHost::new());
        let state_store: Arc<InMemoryStateHost> = Arc::new(InMemoryStateHost::new());
        let host = Arc::new(HostBundle::new(
            secrets,
            telemetry,
            Arc::clone(&session_store) as Arc<dyn SessionHost>,
            state_store as Arc<dyn StateHost>,
        ));

        let adapter = MockChatAdapter::default();
        let mut adapters = AdapterRegistry::default();
        adapters.register("mock", Box::new(adapter.clone()));

        let machine = StateMachine::new(host, adapters, Policy::default());
        machine.register_flow(test_flow());
        (machine, adapter, session_store)
    }

    /// Tenant context for tenant "demo" in environment "local".
    fn demo_tenant() -> TenantCtx {
        let env = EnvId::from_str("local").unwrap();
        let tenant = TenantId::from_str("demo").unwrap();
        TenantCtx::new(env, tenant)
    }

    #[tokio::test]
    async fn pauses_and_resumes_after_wait_step() {
        let (machine, adapter, session_store) = build_machine();
        let tenant_ctx = demo_tenant();
        let session_hint = Some("demo:telegram:chat:user".to_string());

        // First call: the greeting adapter runs, then the flow parks on `AwaitInput`.
        let first = machine
            .step(
                &tenant_ctx,
                PACK_ID,
                FLOW_ID,
                session_hint.clone(),
                json!({ "text": "hi" }),
            )
            .await
            .expect("first step");
        assert_eq!(first["status"], json!("pending"));
        assert_eq!(
            first["response"]["messages"][0]["text"],
            json!("Welcome to support!")
        );

        let key = SessionKey::new(&tenant_ctx, PACK_ID, FLOW_ID, session_hint.clone());
        let parked = session_store.get(&key).await.unwrap().unwrap();
        assert!(parked.waiting.is_some());
        assert_eq!(parked.cursor.position, 1);

        // Second call: resumes past the wait step and echoes the new input.
        let second = machine
            .step(
                &tenant_ctx,
                PACK_ID,
                FLOW_ID,
                session_hint.clone(),
                json!({ "text": "need help" }),
            )
            .await
            .expect("second step");
        assert_eq!(second["status"], json!("done"));
        assert_eq!(
            second["response"]["messages"][0]["text"],
            json!("echo: need help")
        );

        let finished = session_store.get(&key).await.unwrap().unwrap();
        assert!(finished.waiting.is_none());
        assert_eq!(finished.cursor.position, 3);

        let calls = adapter.history();
        assert_eq!(calls.len(), 2);
        assert_eq!(calls[1]["text"], json!("need help"));
    }

    /// Three-step support flow:
    /// 1. greet through the mock adapter;
    /// 2. wait for user input;
    /// 3. echo that input (`PAYLOAD_FROM_LAST_INPUT`) back through the mock adapter.
    fn test_flow() -> FlowDefinition {
        let summary = FlowSummary {
            pack_id: PACK_ID.into(),
            id: FLOW_ID.into(),
            name: "Support".into(),
            version: "1.0.0".into(),
            description: None,
        };
        let greet = FlowStep::Adapter(AdapterCall {
            adapter: "mock".into(),
            operation: "send".into(),
            payload: json!({ "text": "Welcome to support!" }),
        });
        let wait = FlowStep::AwaitInput {
            reason: "await-user".into(),
        };
        let echo = FlowStep::Adapter(AdapterCall {
            adapter: "mock".into(),
            operation: "echo".into(),
            payload: Value::String(PAYLOAD_FROM_LAST_INPUT.into()),
        });
        FlowDefinition::new(
            summary,
            json!({ "type": "object" }),
            vec![greet, wait, echo],
        )
    }

    /// Adapter double that records every payload it receives.
    ///
    /// * `send` replies with the payload's `text` (default "hello") as a single message.
    /// * `echo` replies with a single message `"echo: <text>"` (default text "").
    /// * Any other operation replies with `{}`.
    #[derive(Clone, Default)]
    struct MockChatAdapter {
        calls: Arc<Mutex<Vec<Value>>>,
    }

    impl MockChatAdapter {
        /// Copy of the payloads received so far, in call order.
        fn history(&self) -> Vec<Value> {
            self.calls.lock().clone()
        }
    }

    #[async_trait]
    impl Adapter for MockChatAdapter {
        async fn call(&self, call: &AdapterCall) -> GResult<Value> {
            self.calls.lock().push(call.payload.clone());
            let text = call.payload.get("text").and_then(Value::as_str);
            let reply = match call.operation.as_str() {
                "send" => json!({ "messages": [{ "text": text.unwrap_or("hello") }] }),
                "echo" => {
                    let text = text.unwrap_or("");
                    json!({ "messages": [{ "text": format!("echo: {text}") }] })
                }
                _ => json!({}),
            };
            Ok(reply)
        }
    }
}