1use super::api::{FlowSchema, FlowSummary};
2use super::error::{GResult, RunnerError};
3use super::host::{
4 HostBundle, OutboxKey, SessionKey, SessionOutboxEntry, SessionSnapshot, SpanContext, WaitState,
5};
6use super::policy::retry_with_jitter;
7use super::policy::{Policy, policy_violation};
8use super::registry::{AdapterCall, AdapterRegistry};
9use greentic_types::TenantCtx;
10use parking_lot::RwLock;
11use rand::{Rng, rng};
12use serde::{Deserialize, Serialize};
13use serde_json::{Map, Value, json};
14use sha2::{Digest, Sha256};
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::SystemTime;
18
/// Sentinel adapter payload. When an adapter call's payload is exactly this string, the
/// session's `state["last_input"]` is sent to the adapter instead (or `null` if unset).
pub const PAYLOAD_FROM_LAST_INPUT: &str = "$ingress";

/// Provider name attached to the telemetry tenant context on every step.
const PROVIDER_ID: &str = "greentic-runner";
21
22#[derive(Clone, Debug, Serialize, Deserialize)]
23pub struct FlowDefinition {
24 pub summary: FlowSummary,
25 pub schema: Value,
26 pub steps: Vec<FlowStep>,
27}
28
29impl FlowDefinition {
30 pub fn new(summary: FlowSummary, schema: Value, steps: Vec<FlowStep>) -> Self {
31 Self {
32 summary,
33 schema,
34 steps,
35 }
36 }
37}
38
39#[derive(Clone, Debug, Serialize, Deserialize)]
40pub enum FlowStep {
41 Adapter(AdapterCall),
42 AwaitInput { reason: String },
43 Complete { outcome: Value },
44}
45
46pub struct StateMachine {
47 host: Arc<HostBundle>,
48 adapters: AdapterRegistry,
49 policy: Policy,
50 flows: Arc<RwLock<HashMap<String, FlowDefinition>>>,
51}
52
53impl StateMachine {
54 pub fn new(host: Arc<HostBundle>, adapters: AdapterRegistry, policy: Policy) -> Self {
55 Self {
56 host,
57 adapters,
58 policy,
59 flows: Arc::new(RwLock::new(HashMap::new())),
60 }
61 }
62
63 pub fn register_flow(&self, definition: FlowDefinition) {
64 let mut guard = self.flows.write();
65 guard.insert(definition.summary.id.clone(), definition);
66 }
67
68 pub fn list_flows(&self) -> Vec<FlowSummary> {
69 let guard = self.flows.read();
70 guard.values().map(|flow| flow.summary.clone()).collect()
71 }
72
73 pub fn get_flow_schema(&self, flow_id: &str) -> GResult<FlowSchema> {
74 let guard = self.flows.read();
75 guard
76 .get(flow_id)
77 .map(|flow| FlowSchema {
78 id: flow_id.to_string(),
79 schema_json: flow.schema.clone(),
80 })
81 .ok_or_else(|| RunnerError::FlowNotFound {
82 flow_id: flow_id.to_string(),
83 })
84 }
85
86 pub async fn step(
87 &self,
88 tenant: &TenantCtx,
89 flow_id: &str,
90 session_hint: Option<String>,
91 input: Value,
92 ) -> GResult<Value> {
93 let mut telemetry_ctx = tenant
94 .clone()
95 .with_provider(PROVIDER_ID.to_string())
96 .with_flow(flow_id.to_string());
97 if let Some(hint) = session_hint.as_ref() {
98 telemetry_ctx = telemetry_ctx.with_session(hint.clone());
99 }
100 greentic_types::telemetry::set_current_tenant_ctx(&telemetry_ctx);
101
102 let flow = {
103 let guard = self.flows.read();
104 guard
105 .get(flow_id)
106 .cloned()
107 .ok_or_else(|| RunnerError::FlowNotFound {
108 flow_id: flow_id.to_string(),
109 })?
110 };
111
112 self.ensure_policy_budget(&flow)?;
113
114 let key = SessionKey::new(tenant, flow_id, session_hint.clone());
115 let session_host = &self.host.session;
116 let mut session = match session_host.get(&key).await? {
117 Some(snapshot) => snapshot,
118 None => {
119 let session_id = key
120 .stable_session_id()
121 .unwrap_or_else(Self::generate_session_id);
122 SessionSnapshot::new(key.clone(), session_id)
123 }
124 };
125 let is_new = session.revision == 0 && session.outbox.is_empty();
126 let expected_revision = session.revision;
127
128 Self::update_state_input(&mut session, input.clone());
129
130 if session.waiting.is_some() {
131 if session.cursor.position < flow.steps.len()
132 && matches!(
133 flow.steps.get(session.cursor.position),
134 Some(FlowStep::AwaitInput { .. })
135 )
136 {
137 session.cursor.position = session.cursor.position.saturating_add(1);
138 }
139 session.waiting = None;
140 }
141
142 let mut final_outcome = None;
143 loop {
144 if session.cursor.position >= flow.steps.len() {
145 let outcome = session
146 .last_outcome
147 .clone()
148 .unwrap_or_else(|| json!({"status": "done"}));
149 final_outcome = Some(outcome);
150 break;
151 }
152
153 let step = flow
154 .steps
155 .get(session.cursor.position)
156 .cloned()
157 .ok_or_else(|| RunnerError::FlowNotFound {
158 flow_id: flow.summary.id.clone(),
159 })?;
160
161 match step {
162 FlowStep::Adapter(call) => {
163 let outcome = self
164 .execute_adapter_step(&flow, &mut session, call, tenant)
165 .await?;
166 final_outcome = Some(outcome);
167 continue;
168 }
169 FlowStep::AwaitInput { reason } => {
170 let last_response = session
171 .last_outcome
172 .as_ref()
173 .and_then(|value| value.get("response"))
174 .cloned();
175 session.waiting = Some(WaitState {
176 reason: reason.clone(),
177 recorded_at: SystemTime::now(),
178 });
179 let mut pending = json!({
180 "status": "pending",
181 "reason": reason,
182 });
183 if let Some(response) = last_response
184 && let Some(obj) = pending.as_object_mut()
185 {
186 obj.insert("response".into(), response);
187 }
188 session.last_outcome = Some(pending.clone());
189 final_outcome = Some(pending);
190 break;
191 }
192 FlowStep::Complete { outcome } => {
193 session.cursor.position = flow.steps.len();
194 session.last_outcome = Some(json!({
195 "status": "done",
196 "result": outcome,
197 }));
198 final_outcome = session.last_outcome.clone();
199 break;
200 }
201 }
202 }
203
204 let outcome = final_outcome.expect("state machine produced no outcome");
205
206 self.host
207 .state
208 .set_json(&session.key, session.state.clone())
209 .await?;
210
211 if is_new {
212 session_host.put(session).await?;
213 } else if !session_host.update_cas(session, expected_revision).await? {
214 return Err(RunnerError::Session {
215 reason: "compare-and-swap failure".into(),
216 });
217 }
218
219 Ok(outcome)
220 }
221
222 fn ensure_policy_budget(&self, flow: &FlowDefinition) -> GResult<()> {
223 if flow.steps.len() > self.policy.max_egress_adapters {
224 return Err(policy_violation(format!(
225 "flow has {} steps exceeding budget {}",
226 flow.steps.len(),
227 self.policy.max_egress_adapters
228 )));
229 }
230 Ok(())
231 }
232
233 async fn execute_adapter_step(
234 &self,
235 flow: &FlowDefinition,
236 session: &mut SessionSnapshot,
237 call: AdapterCall,
238 tenant: &TenantCtx,
239 ) -> GResult<Value> {
240 let adapter =
241 self.adapters
242 .get(&call.adapter)
243 .ok_or_else(|| RunnerError::AdapterMissing {
244 adapter: call.adapter.clone(),
245 })?;
246
247 let resolved_payload = resolve_adapter_payload(&call.payload, session);
248 let payload_bytes =
249 serde_json::to_vec(&resolved_payload).map_err(|err| RunnerError::Serialization {
250 reason: err.to_string(),
251 })?;
252
253 if payload_bytes.len() > self.policy.max_payload_bytes {
254 return Err(policy_violation(format!(
255 "payload exceeds max size {} bytes",
256 self.policy.max_payload_bytes
257 )));
258 }
259
260 let seq = session.cursor.outbox_seq;
261 let payload_hash = Self::stable_hash(seq, &payload_bytes);
262 let key = OutboxKey::new(seq, payload_hash.clone());
263
264 let adapter_id = call.adapter.clone();
265 let operation_id = call.operation.clone();
266 let span = SpanContext {
267 trace_id: tenant.trace_id.clone(),
268 span_id: Some(format!("{}:{}", flow.summary.id, seq)),
269 };
270
271 if let Some(entry) = session.outbox.get(&key) {
272 self.host
273 .telemetry
274 .emit(
275 &span,
276 &[
277 ("adapter", adapter_id.as_str()),
278 ("operation", operation_id.as_str()),
279 ("dedup", "hit"),
280 ],
281 )
282 .await?;
283 session.cursor.position += 1;
284 session.cursor.outbox_seq = session.cursor.outbox_seq.saturating_add(1);
285 session.waiting = None;
286 session.last_outcome = Some(json!({
287 "status": "done",
288 "response": entry.response.clone(),
289 }));
290 return Ok(session.last_outcome.clone().unwrap());
291 }
292
293 self.host
294 .telemetry
295 .emit(
296 &span,
297 &[
298 ("adapter", adapter_id.as_str()),
299 ("operation", operation_id.as_str()),
300 ("phase", "start"),
301 ],
302 )
303 .await?;
304
305 let adapter_clone = adapter.clone();
306 let mut call_clone = call.clone();
307 call_clone.payload = resolved_payload.clone();
308 let response = retry_with_jitter(&self.policy.retry, || {
309 let adapter = adapter_clone.clone();
310 let call = call_clone.clone();
311 async move { adapter.call(&call).await }
312 })
313 .await?;
314
315 session.cursor.position += 1;
316 session.cursor.outbox_seq = session.cursor.outbox_seq.saturating_add(1);
317 session.waiting = None;
318 session.outbox.insert(
319 key,
320 SessionOutboxEntry {
321 seq,
322 hash: payload_hash,
323 response: response.clone(),
324 },
325 );
326 Self::update_state_adapter(session, &call, &response);
327 session.last_outcome = Some(json!({
328 "status": "done",
329 "response": response.clone(),
330 }));
331
332 self.host
333 .telemetry
334 .emit(
335 &span,
336 &[
337 ("adapter", adapter_id.as_str()),
338 ("operation", operation_id.as_str()),
339 ("phase", "finish"),
340 ],
341 )
342 .await?;
343
344 Ok(session.last_outcome.clone().unwrap())
345 }
346
347 fn update_state_input(session: &mut SessionSnapshot, input: Value) {
348 if !matches!(session.state, Value::Object(_)) {
349 session.state = Value::Object(Map::new());
350 }
351 if let Value::Object(map) = &mut session.state {
352 map.insert("last_input".to_string(), input);
353 }
354 }
355
356 fn update_state_adapter(session: &mut SessionSnapshot, call: &AdapterCall, response: &Value) {
357 if !matches!(session.state, Value::Object(_)) {
358 session.state = Value::Object(Map::new());
359 }
360 if let Value::Object(map) = &mut session.state {
361 map.insert(
362 "last_adapter".to_string(),
363 Value::String(call.adapter.clone()),
364 );
365 map.insert(
366 "last_operation".to_string(),
367 Value::String(call.operation.clone()),
368 );
369 map.insert("last_response".to_string(), response.clone());
370 }
371 }
372
373 fn stable_hash(seq: u64, payload: &[u8]) -> String {
374 let mut hasher = Sha256::new();
375 hasher.update(seq.to_be_bytes());
376 hasher.update(payload);
377 hex::encode(hasher.finalize())
378 }
379
380 fn generate_session_id() -> String {
381 let mut rng = rng();
382 let value: u128 = rng.random();
383 format!("sess-{value:032x}")
384 }
385}
386
387fn resolve_adapter_payload(call_payload: &Value, session: &SessionSnapshot) -> Value {
388 match call_payload {
389 Value::String(token) if token == PAYLOAD_FROM_LAST_INPUT => session
390 .state
391 .as_object()
392 .and_then(|map| map.get("last_input"))
393 .cloned()
394 .unwrap_or(Value::Null),
395 other => other.clone(),
396 }
397}
398
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::glue::{FnSecretsHost, FnTelemetryHost};
    use crate::engine::host::{SessionHost, StateHost};
    use crate::engine::registry::Adapter;
    use crate::engine::shims::{InMemorySessionHost, InMemoryStateHost};
    use async_trait::async_trait;
    use greentic_types::{EnvId, TenantId};
    use parking_lot::Mutex;
    use std::str::FromStr;

    /// Id under which [`test_flow`] is registered.
    const FLOW_ID: &str = "support.flow";

    /// Builds a state machine with in-memory session/state hosts, no-op secrets and
    /// telemetry hosts, the default policy, a `mock` adapter and [`test_flow`].
    ///
    /// The session store and adapter are returned too, so the test can inspect persisted
    /// snapshots and the payloads the adapter received.
    fn harness() -> (StateMachine, Arc<InMemorySessionHost>, MockChatAdapter) {
        let secrets = Arc::new(FnSecretsHost::new(|_| Ok(String::new())));
        let telemetry = Arc::new(FnTelemetryHost::new(|_, _| Ok(())));
        let sessions: Arc<InMemorySessionHost> = Arc::new(InMemorySessionHost::new());
        let states: Arc<InMemoryStateHost> = Arc::new(InMemoryStateHost::new());
        let host = Arc::new(HostBundle::new(
            secrets,
            telemetry,
            Arc::clone(&sessions) as Arc<dyn SessionHost>,
            Arc::clone(&states) as Arc<dyn StateHost>,
        ));

        let mock = MockChatAdapter::default();
        let mut registry = AdapterRegistry::default();
        registry.register("mock", Box::new(mock.clone()));

        let machine = StateMachine::new(host, registry, Policy::default());
        machine.register_flow(test_flow());
        (machine, sessions, mock)
    }

    #[tokio::test]
    async fn pauses_and_resumes_after_wait_step() {
        let (machine, session_store, adapter) = harness();
        let tenant_ctx = TenantCtx::new(
            EnvId::from_str("local").unwrap(),
            TenantId::from_str("demo").unwrap(),
        );
        let session_hint = Some("demo:telegram:chat:user".to_string());

        // Turn 1: the welcome message goes out, then the flow parks on the await step.
        let first = machine
            .step(
                &tenant_ctx,
                FLOW_ID,
                session_hint.clone(),
                json!({ "text": "hi" }),
            )
            .await
            .expect("first step");
        assert_eq!(first["status"], json!("pending"));
        assert_eq!(
            first["response"]["messages"][0]["text"],
            json!("Welcome to support!")
        );

        let key = SessionKey::new(&tenant_ctx, FLOW_ID, session_hint.clone());
        let parked = session_store.get(&key).await.unwrap().unwrap();
        assert!(parked.waiting.is_some());
        assert_eq!(parked.cursor.position, 1);

        // Turn 2: the new input releases the wait and the adapter echoes it back.
        let second = machine
            .step(
                &tenant_ctx,
                FLOW_ID,
                session_hint,
                json!({ "text": "need help" }),
            )
            .await
            .expect("second step");
        assert_eq!(second["status"], json!("done"));
        assert_eq!(
            second["response"]["messages"][0]["text"],
            json!("echo: need help")
        );

        let finished = session_store.get(&key).await.unwrap().unwrap();
        assert!(finished.waiting.is_none());
        assert_eq!(finished.cursor.position, 3);

        let payloads = adapter.history();
        assert_eq!(payloads.len(), 2);
        assert_eq!(payloads[1]["text"], json!("need help"));
    }

    /// Three-step support flow: send a welcome, wait for the user, echo the user's input.
    fn test_flow() -> FlowDefinition {
        let mock_step = |operation: &str, payload: Value| {
            FlowStep::Adapter(AdapterCall {
                adapter: "mock".into(),
                operation: operation.into(),
                payload,
            })
        };
        let steps = vec![
            mock_step("send", json!({ "text": "Welcome to support!" })),
            FlowStep::AwaitInput {
                reason: "await-user".into(),
            },
            mock_step("echo", Value::String(PAYLOAD_FROM_LAST_INPUT.into())),
        ];
        let summary = FlowSummary {
            id: FLOW_ID.into(),
            name: "Support".into(),
            version: "1.0.0".into(),
            description: None,
        };
        FlowDefinition::new(summary, json!({ "type": "object" }), steps)
    }

    /// Test adapter that records every payload it receives and answers `send`/`echo`.
    #[derive(Clone, Default)]
    struct MockChatAdapter {
        calls: Arc<Mutex<Vec<Value>>>,
    }

    impl MockChatAdapter {
        /// Payloads received so far, in call order.
        fn history(&self) -> Vec<Value> {
            self.calls.lock().clone()
        }
    }

    #[async_trait]
    impl Adapter for MockChatAdapter {
        async fn call(&self, call: &AdapterCall) -> GResult<Value> {
            self.calls.lock().push(call.payload.clone());
            let text = call.payload.get("text").and_then(Value::as_str);
            let reply = match call.operation.as_str() {
                "send" => json!({ "messages": [{ "text": text.unwrap_or("hello") }] }),
                "echo" => {
                    let echoed = format!("echo: {}", text.unwrap_or(""));
                    json!({ "messages": [{ "text": echoed }] })
                }
                _ => json!({}),
            };
            Ok(reply)
        }
    }
}