1use super::api::{FlowSchema, FlowSummary};
2use super::error::{GResult, RunnerError};
3use super::host::{
4 HostBundle, OutboxKey, SessionKey, SessionOutboxEntry, SessionSnapshot, SpanContext, WaitState,
5};
6use super::policy::retry_with_jitter;
7use super::policy::{Policy, policy_violation};
8use super::registry::{AdapterCall, AdapterRegistry};
9use greentic_types::TenantCtx;
10use parking_lot::RwLock;
11use rand::{Rng, rng};
12use serde::{Deserialize, Serialize};
13use serde_json::{Map, Value, json};
14use sha2::{Digest, Sha256};
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::SystemTime;
18
/// Provider identifier attached to the telemetry tenant context for every step.
const PROVIDER_ID: &str = "greentic-runner";

/// Sentinel adapter payload: a step whose payload is exactly this string is
/// sent the session's most recent input instead of a literal payload.
pub const PAYLOAD_FROM_LAST_INPUT: &str = "$ingress";
21
22#[derive(Clone, Debug, Serialize, Deserialize)]
23pub struct FlowDefinition {
24 pub summary: FlowSummary,
25 pub schema: Value,
26 pub steps: Vec<FlowStep>,
27}
28
29impl FlowDefinition {
30 pub fn new(summary: FlowSummary, schema: Value, steps: Vec<FlowStep>) -> Self {
31 Self {
32 summary,
33 schema,
34 steps,
35 }
36 }
37}
38
39#[derive(Clone, Debug, Serialize, Deserialize)]
40pub enum FlowStep {
41 Adapter(AdapterCall),
42 AwaitInput { reason: String },
43 Complete { outcome: Value },
44}
45
46pub struct StateMachine {
47 host: Arc<HostBundle>,
48 adapters: AdapterRegistry,
49 policy: Policy,
50 flows: Arc<RwLock<HashMap<String, FlowDefinition>>>,
51}
52
53impl StateMachine {
54 pub fn new(host: Arc<HostBundle>, adapters: AdapterRegistry, policy: Policy) -> Self {
55 Self {
56 host,
57 adapters,
58 policy,
59 flows: Arc::new(RwLock::new(HashMap::new())),
60 }
61 }
62
63 pub fn register_flow(&self, definition: FlowDefinition) {
64 let mut guard = self.flows.write();
65 guard.insert(definition.summary.id.clone(), definition);
66 }
67
68 pub fn list_flows(&self) -> Vec<FlowSummary> {
69 let guard = self.flows.read();
70 guard.values().map(|flow| flow.summary.clone()).collect()
71 }
72
73 pub fn get_flow_schema(&self, flow_id: &str) -> GResult<FlowSchema> {
74 let guard = self.flows.read();
75 guard
76 .get(flow_id)
77 .map(|flow| FlowSchema {
78 id: flow_id.to_string(),
79 schema_json: flow.schema.clone(),
80 })
81 .ok_or_else(|| RunnerError::FlowNotFound {
82 flow_id: flow_id.to_string(),
83 })
84 }
85
86 pub async fn step(
87 &self,
88 tenant: &TenantCtx,
89 flow_id: &str,
90 session_hint: Option<String>,
91 input: Value,
92 ) -> GResult<Value> {
93 let mut telemetry_ctx = tenant
94 .clone()
95 .with_provider(PROVIDER_ID.to_string())
96 .with_flow(flow_id.to_string());
97 if let Some(hint) = session_hint.as_ref() {
98 telemetry_ctx = telemetry_ctx.with_session(hint.clone());
99 }
100 greentic_types::telemetry::set_current_tenant_ctx(&telemetry_ctx);
101
102 let flow = {
103 let guard = self.flows.read();
104 guard
105 .get(flow_id)
106 .cloned()
107 .ok_or_else(|| RunnerError::FlowNotFound {
108 flow_id: flow_id.to_string(),
109 })?
110 };
111
112 self.ensure_policy_budget(&flow)?;
113
114 let key = SessionKey::new(tenant, flow_id, session_hint.clone());
115 let session_host = &self.host.session;
116 let mut session = match session_host.get(&key).await? {
117 Some(snapshot) => snapshot,
118 None => {
119 let session_id = key
120 .stable_session_id()
121 .unwrap_or_else(Self::generate_session_id);
122 SessionSnapshot::new(key.clone(), session_id)
123 }
124 };
125 let is_new = session.revision == 0 && session.outbox.is_empty();
126 let expected_revision = session.revision;
127
128 Self::update_state_input(&mut session, input.clone());
129
130 if session.waiting.is_some() {
131 if session.cursor.position < flow.steps.len()
132 && matches!(
133 flow.steps.get(session.cursor.position),
134 Some(FlowStep::AwaitInput { .. })
135 )
136 {
137 session.cursor.position = session.cursor.position.saturating_add(1);
138 }
139 session.waiting = None;
140 }
141
142 let outcome = loop {
143 if session.cursor.position >= flow.steps.len() {
144 let outcome = session
145 .last_outcome
146 .clone()
147 .unwrap_or_else(|| json!({"status": "done"}));
148 break outcome;
149 }
150
151 let step = flow
152 .steps
153 .get(session.cursor.position)
154 .cloned()
155 .ok_or_else(|| RunnerError::FlowNotFound {
156 flow_id: flow.summary.id.clone(),
157 })?;
158
159 match step {
160 FlowStep::Adapter(call) => {
161 let outcome = self
162 .execute_adapter_step(&flow, &mut session, call, tenant)
163 .await?;
164 session.last_outcome = Some(outcome.clone());
165 continue;
166 }
167 FlowStep::AwaitInput { reason } => {
168 let last_response = session
169 .last_outcome
170 .as_ref()
171 .and_then(|value| value.get("response"))
172 .cloned();
173 session.waiting = Some(WaitState {
174 reason: reason.clone(),
175 recorded_at: SystemTime::now(),
176 });
177 let mut pending = json!({
178 "status": "pending",
179 "reason": reason,
180 });
181 if let Some(response) = last_response
182 && let Some(obj) = pending.as_object_mut()
183 {
184 obj.insert("response".into(), response);
185 }
186 session.last_outcome = Some(pending.clone());
187 break pending;
188 }
189 FlowStep::Complete { outcome } => {
190 session.cursor.position = flow.steps.len();
191 session.last_outcome = Some(json!({
192 "status": "done",
193 "result": outcome,
194 }));
195 break session
196 .last_outcome
197 .clone()
198 .expect("complete step should set outcome");
199 }
200 }
201 };
202
203 self.host
204 .state
205 .set_json(&session.key, session.state.clone())
206 .await?;
207
208 if is_new {
209 session_host.put(session).await?;
210 } else if !session_host.update_cas(session, expected_revision).await? {
211 return Err(RunnerError::Session {
212 reason: "compare-and-swap failure".into(),
213 });
214 }
215
216 Ok(outcome)
217 }
218
219 fn ensure_policy_budget(&self, flow: &FlowDefinition) -> GResult<()> {
220 if flow.steps.len() > self.policy.max_egress_adapters {
221 return Err(policy_violation(format!(
222 "flow has {} steps exceeding budget {}",
223 flow.steps.len(),
224 self.policy.max_egress_adapters
225 )));
226 }
227 Ok(())
228 }
229
230 async fn execute_adapter_step(
231 &self,
232 flow: &FlowDefinition,
233 session: &mut SessionSnapshot,
234 call: AdapterCall,
235 tenant: &TenantCtx,
236 ) -> GResult<Value> {
237 let adapter =
238 self.adapters
239 .get(&call.adapter)
240 .ok_or_else(|| RunnerError::AdapterMissing {
241 adapter: call.adapter.clone(),
242 })?;
243
244 let resolved_payload = resolve_adapter_payload(&call.payload, session);
245 let payload_bytes =
246 serde_json::to_vec(&resolved_payload).map_err(|err| RunnerError::Serialization {
247 reason: err.to_string(),
248 })?;
249
250 if payload_bytes.len() > self.policy.max_payload_bytes {
251 return Err(policy_violation(format!(
252 "payload exceeds max size {} bytes",
253 self.policy.max_payload_bytes
254 )));
255 }
256
257 let seq = session.cursor.outbox_seq;
258 let payload_hash = Self::stable_hash(seq, &payload_bytes);
259 let key = OutboxKey::new(seq, payload_hash.clone());
260
261 let adapter_id = call.adapter.clone();
262 let operation_id = call.operation.clone();
263 let span = SpanContext {
264 trace_id: tenant.trace_id.clone(),
265 span_id: Some(format!("{}:{}", flow.summary.id, seq)),
266 };
267
268 if let Some(entry) = session.outbox.get(&key) {
269 self.host
270 .telemetry
271 .emit(
272 &span,
273 &[
274 ("adapter", adapter_id.as_str()),
275 ("operation", operation_id.as_str()),
276 ("dedup", "hit"),
277 ],
278 )
279 .await?;
280 session.cursor.position += 1;
281 session.cursor.outbox_seq = session.cursor.outbox_seq.saturating_add(1);
282 session.waiting = None;
283 session.last_outcome = Some(json!({
284 "status": "done",
285 "response": entry.response.clone(),
286 }));
287 return Ok(session.last_outcome.clone().unwrap());
288 }
289
290 self.host
291 .telemetry
292 .emit(
293 &span,
294 &[
295 ("adapter", adapter_id.as_str()),
296 ("operation", operation_id.as_str()),
297 ("phase", "start"),
298 ],
299 )
300 .await?;
301
302 let adapter_clone = adapter.clone();
303 let mut call_clone = call.clone();
304 call_clone.payload = resolved_payload.clone();
305 let response = retry_with_jitter(&self.policy.retry, || {
306 let adapter = adapter_clone.clone();
307 let call = call_clone.clone();
308 async move { adapter.call(&call).await }
309 })
310 .await?;
311
312 session.cursor.position += 1;
313 session.cursor.outbox_seq = session.cursor.outbox_seq.saturating_add(1);
314 session.waiting = None;
315 session.outbox.insert(
316 key,
317 SessionOutboxEntry {
318 seq,
319 hash: payload_hash,
320 response: response.clone(),
321 },
322 );
323 Self::update_state_adapter(session, &call, &response);
324 session.last_outcome = Some(json!({
325 "status": "done",
326 "response": response.clone(),
327 }));
328
329 self.host
330 .telemetry
331 .emit(
332 &span,
333 &[
334 ("adapter", adapter_id.as_str()),
335 ("operation", operation_id.as_str()),
336 ("phase", "finish"),
337 ],
338 )
339 .await?;
340
341 Ok(session.last_outcome.clone().unwrap())
342 }
343
344 fn update_state_input(session: &mut SessionSnapshot, input: Value) {
345 if !matches!(session.state, Value::Object(_)) {
346 session.state = Value::Object(Map::new());
347 }
348 if let Value::Object(map) = &mut session.state {
349 map.insert("last_input".to_string(), input);
350 }
351 }
352
353 fn update_state_adapter(session: &mut SessionSnapshot, call: &AdapterCall, response: &Value) {
354 if !matches!(session.state, Value::Object(_)) {
355 session.state = Value::Object(Map::new());
356 }
357 if let Value::Object(map) = &mut session.state {
358 map.insert(
359 "last_adapter".to_string(),
360 Value::String(call.adapter.clone()),
361 );
362 map.insert(
363 "last_operation".to_string(),
364 Value::String(call.operation.clone()),
365 );
366 map.insert("last_response".to_string(), response.clone());
367 }
368 }
369
370 fn stable_hash(seq: u64, payload: &[u8]) -> String {
371 let mut hasher = Sha256::new();
372 hasher.update(seq.to_be_bytes());
373 hasher.update(payload);
374 hex::encode(hasher.finalize())
375 }
376
377 fn generate_session_id() -> String {
378 let mut rng = rng();
379 let value: u128 = rng.random();
380 format!("sess-{value:032x}")
381 }
382}
383
384fn resolve_adapter_payload(call_payload: &Value, session: &SessionSnapshot) -> Value {
385 match call_payload {
386 Value::String(token) if token == PAYLOAD_FROM_LAST_INPUT => session
387 .state
388 .as_object()
389 .and_then(|map| map.get("last_input"))
390 .cloned()
391 .unwrap_or(Value::Null),
392 other => other.clone(),
393 }
394}
395
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::glue::{FnSecretsHost, FnTelemetryHost};
    use crate::engine::host::{SessionHost, StateHost};
    use crate::engine::registry::Adapter;
    use crate::engine::shims::{InMemorySessionHost, InMemoryStateHost};
    use async_trait::async_trait;
    use greentic_types::{EnvId, TenantId};
    use parking_lot::Mutex;
    use std::str::FromStr;

    /// Id of the flow built by `test_flow`.
    const FLOW_ID: &str = "support.flow";

    /// Tenant context used by the test: environment `local`, tenant `demo`.
    fn demo_tenant() -> TenantCtx {
        let env = EnvId::from_str("local").unwrap();
        let tenant_id = TenantId::from_str("demo").unwrap();
        TenantCtx::new(env, tenant_id)
    }

    /// Text of the first message in an outcome's adapter response.
    fn first_message_text(outcome: &Value) -> &Value {
        &outcome["response"]["messages"][0]["text"]
    }

    /// Runs the three-step support flow: the first call must stop at the wait
    /// step, the second must resume and echo the new input.
    #[tokio::test]
    async fn pauses_and_resumes_after_wait_step() {
        // Host bundle backed by in-memory stores and no-op secrets/telemetry.
        let session_store: Arc<InMemorySessionHost> = Arc::new(InMemorySessionHost::new());
        let state_store: Arc<InMemoryStateHost> = Arc::new(InMemoryStateHost::new());
        let secrets = Arc::new(FnSecretsHost::new(|_| Ok(String::new())));
        let telemetry = Arc::new(FnTelemetryHost::new(|_, _| Ok(())));
        let host = Arc::new(HostBundle::new(
            secrets,
            telemetry,
            Arc::clone(&session_store) as Arc<dyn SessionHost>,
            Arc::clone(&state_store) as Arc<dyn StateHost>,
        ));

        // The registry gets a clone, so `adapter` still sees the call history.
        let adapter = MockChatAdapter::default();
        let mut adapters = AdapterRegistry::default();
        adapters.register("mock", Box::new(adapter.clone()));

        let sm = StateMachine::new(host, adapters, Policy::default());
        sm.register_flow(test_flow());

        let tenant_ctx = demo_tenant();
        let session_hint = Some("demo:telegram:chat:user".to_string());
        let key = SessionKey::new(&tenant_ctx, FLOW_ID, session_hint.clone());

        // First input: welcome message is sent, then the flow parks.
        let first = sm
            .step(
                &tenant_ctx,
                FLOW_ID,
                session_hint.clone(),
                json!({ "text": "hi" }),
            )
            .await
            .expect("first step");
        assert_eq!(first["status"], json!("pending"));
        assert_eq!(first_message_text(&first), &json!("Welcome to support!"));

        let parked = session_store.get(&key).await.unwrap().unwrap();
        assert!(parked.waiting.is_some());
        assert_eq!(parked.cursor.position, 1);

        // Second input: the flow resumes and echoes it back.
        let second = sm
            .step(
                &tenant_ctx,
                FLOW_ID,
                session_hint.clone(),
                json!({ "text": "need help" }),
            )
            .await
            .expect("second step");
        assert_eq!(second["status"], json!("done"));
        assert_eq!(first_message_text(&second), &json!("echo: need help"));

        let finished = session_store.get(&key).await.unwrap().unwrap();
        assert!(finished.waiting.is_none());
        assert_eq!(finished.cursor.position, 3);

        let history = adapter.history();
        assert_eq!(history.len(), 2);
        assert_eq!(history[1]["text"], json!("need help"));
    }

    /// Flow under test: send a welcome, wait for input, echo the input.
    fn test_flow() -> FlowDefinition {
        let summary = FlowSummary {
            id: FLOW_ID.into(),
            name: "Support".into(),
            version: "1.0.0".into(),
            description: None,
        };
        let welcome = FlowStep::Adapter(AdapterCall {
            adapter: "mock".into(),
            operation: "send".into(),
            payload: json!({ "text": "Welcome to support!" }),
        });
        let wait = FlowStep::AwaitInput {
            reason: "await-user".into(),
        };
        let echo = FlowStep::Adapter(AdapterCall {
            adapter: "mock".into(),
            operation: "echo".into(),
            payload: Value::String(PAYLOAD_FROM_LAST_INPUT.into()),
        });
        FlowDefinition::new(summary, json!({ "type": "object" }), vec![welcome, wait, echo])
    }

    /// Adapter double that records every payload it receives.
    #[derive(Clone, Default)]
    struct MockChatAdapter {
        calls: Arc<Mutex<Vec<Value>>>,
    }

    impl MockChatAdapter {
        /// Snapshot of the payloads received so far, in call order.
        fn history(&self) -> Vec<Value> {
            self.calls.lock().clone()
        }
    }

    #[async_trait]
    impl Adapter for MockChatAdapter {
        /// `send` replies with the payload text (default `hello`), `echo`
        /// replies with `echo: <text>`, anything else with an empty object.
        async fn call(&self, call: &AdapterCall) -> GResult<Value> {
            self.calls.lock().push(call.payload.clone());

            let payload_text = call.payload.get("text").and_then(Value::as_str);
            let reply = match call.operation.as_str() {
                "send" => payload_text.unwrap_or("hello").to_string(),
                "echo" => format!("echo: {}", payload_text.unwrap_or("")),
                _ => return Ok(json!({})),
            };
            Ok(json!({
                "messages": [{ "text": reply }]
            }))
        }
    }
}