1use super::api::{FlowSchema, FlowSummary};
2use super::error::{GResult, RunnerError};
3use super::host::{
4 HostBundle, OutboxKey, SessionKey, SessionOutboxEntry, SessionSnapshot, SpanContext, WaitState,
5};
6use super::policy::retry_with_jitter;
7use super::policy::{Policy, policy_violation};
8use super::registry::{AdapterCall, AdapterRegistry};
9use greentic_types::TenantCtx;
10use parking_lot::RwLock;
11use rand::{Rng, rng};
12use serde::{Deserialize, Serialize};
13use serde_json::{Map, Value, json};
14use sha2::{Digest, Sha256};
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::SystemTime;
18
/// Provider identifier stamped onto the tenant context that
/// `StateMachine::step` installs for telemetry.
const PROVIDER_ID: &str = "greentic-runner";

/// Payload placeholder: an adapter step whose payload is exactly this string
/// is invoked with the session's most recent input instead (see
/// `resolve_adapter_payload`).
pub const PAYLOAD_FROM_LAST_INPUT: &str = "$ingress";
21
/// Lookup key for the flow table: a flow is addressed by the pack that ships
/// it together with its id inside that pack.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct FlowKey {
    /// Identifier of the pack the flow belongs to.
    pack_id: String,
    /// Identifier of the flow within its pack.
    flow_id: String,
}
27
28impl FlowKey {
29 fn new(pack_id: &str, flow_id: &str) -> Self {
30 Self {
31 pack_id: pack_id.to_string(),
32 flow_id: flow_id.to_string(),
33 }
34 }
35}
36
37#[derive(Clone, Debug, Serialize, Deserialize)]
38pub struct FlowDefinition {
39 pub summary: FlowSummary,
40 pub schema: Value,
41 pub steps: Vec<FlowStep>,
42}
43
44impl FlowDefinition {
45 pub fn new(summary: FlowSummary, schema: Value, steps: Vec<FlowStep>) -> Self {
46 Self {
47 summary,
48 schema,
49 steps,
50 }
51 }
52}
53
54#[derive(Clone, Debug, Serialize, Deserialize)]
55pub enum FlowStep {
56 Adapter(AdapterCall),
57 AwaitInput { reason: String },
58 Complete { outcome: Value },
59}
60
61pub struct StateMachine {
62 host: Arc<HostBundle>,
63 adapters: AdapterRegistry,
64 policy: Policy,
65 flows: Arc<RwLock<HashMap<FlowKey, FlowDefinition>>>,
66}
67
68impl StateMachine {
69 pub fn new(host: Arc<HostBundle>, adapters: AdapterRegistry, policy: Policy) -> Self {
70 Self {
71 host,
72 adapters,
73 policy,
74 flows: Arc::new(RwLock::new(HashMap::new())),
75 }
76 }
77
78 pub fn register_flow(&self, definition: FlowDefinition) {
79 let mut guard = self.flows.write();
80 guard.insert(
81 FlowKey::new(&definition.summary.pack_id, &definition.summary.id),
82 definition,
83 );
84 }
85
86 pub fn list_flows(&self) -> Vec<FlowSummary> {
87 let guard = self.flows.read();
88 guard.values().map(|flow| flow.summary.clone()).collect()
89 }
90
91 pub fn get_flow_schema(&self, pack_id: &str, flow_id: &str) -> GResult<FlowSchema> {
92 let guard = self.flows.read();
93 guard
94 .get(&FlowKey::new(pack_id, flow_id))
95 .map(|flow| FlowSchema {
96 pack_id: pack_id.to_string(),
97 id: flow_id.to_string(),
98 schema_json: flow.schema.clone(),
99 })
100 .ok_or_else(|| RunnerError::FlowNotFound {
101 flow_id: flow_id.to_string(),
102 })
103 }
104
105 pub async fn step(
106 &self,
107 tenant: &TenantCtx,
108 pack_id: &str,
109 flow_id: &str,
110 session_hint: Option<String>,
111 input: Value,
112 ) -> GResult<Value> {
113 let mut telemetry_ctx = tenant
114 .clone()
115 .with_provider(PROVIDER_ID.to_string())
116 .with_flow(flow_id.to_string());
117 if let Some(hint) = session_hint.as_ref() {
118 telemetry_ctx = telemetry_ctx.with_session(hint.clone());
119 }
120 greentic_types::telemetry::set_current_tenant_ctx(&telemetry_ctx);
121
122 let flow = {
123 let guard = self.flows.read();
124 guard
125 .get(&FlowKey::new(pack_id, flow_id))
126 .cloned()
127 .ok_or_else(|| RunnerError::FlowNotFound {
128 flow_id: flow_id.to_string(),
129 })?
130 };
131
132 self.ensure_policy_budget(&flow)?;
133
134 let key = SessionKey::new(tenant, pack_id, flow_id, session_hint.clone());
135 let session_host = &self.host.session;
136 let mut session = match session_host.get(&key).await? {
137 Some(snapshot) => snapshot,
138 None => {
139 let session_id = key
140 .stable_session_id()
141 .unwrap_or_else(Self::generate_session_id);
142 SessionSnapshot::new(key.clone(), session_id)
143 }
144 };
145 let is_new = session.revision == 0 && session.outbox.is_empty();
146 let expected_revision = session.revision;
147
148 Self::update_state_input(&mut session, input.clone());
149
150 if session.waiting.is_some() {
151 if session.cursor.position < flow.steps.len()
152 && matches!(
153 flow.steps.get(session.cursor.position),
154 Some(FlowStep::AwaitInput { .. })
155 )
156 {
157 session.cursor.position = session.cursor.position.saturating_add(1);
158 }
159 session.waiting = None;
160 }
161
162 let outcome = loop {
163 if session.cursor.position >= flow.steps.len() {
164 let outcome = session
165 .last_outcome
166 .clone()
167 .unwrap_or_else(|| json!({"status": "done"}));
168 break outcome;
169 }
170
171 let step = flow
172 .steps
173 .get(session.cursor.position)
174 .cloned()
175 .ok_or_else(|| RunnerError::FlowNotFound {
176 flow_id: flow.summary.id.clone(),
177 })?;
178
179 match step {
180 FlowStep::Adapter(call) => {
181 let outcome = self
182 .execute_adapter_step(&flow, &mut session, call, tenant)
183 .await?;
184 session.last_outcome = Some(outcome.clone());
185 continue;
186 }
187 FlowStep::AwaitInput { reason } => {
188 let last_response = session
189 .last_outcome
190 .as_ref()
191 .and_then(|value| value.get("response"))
192 .cloned();
193 session.waiting = Some(WaitState {
194 reason: reason.clone(),
195 recorded_at: SystemTime::now(),
196 });
197 let mut pending = json!({
198 "status": "pending",
199 "reason": reason,
200 });
201 if let Some(response) = last_response
202 && let Some(obj) = pending.as_object_mut()
203 {
204 obj.insert("response".into(), response);
205 }
206 session.last_outcome = Some(pending.clone());
207 break pending;
208 }
209 FlowStep::Complete { outcome } => {
210 session.cursor.position = flow.steps.len();
211 session.last_outcome = Some(json!({
212 "status": "done",
213 "result": outcome,
214 }));
215 break session
216 .last_outcome
217 .clone()
218 .expect("complete step should set outcome");
219 }
220 }
221 };
222
223 self.host
224 .state
225 .set_json(&session.key, session.state.clone())
226 .await?;
227
228 if is_new {
229 session_host.put(session).await?;
230 } else if !session_host.update_cas(session, expected_revision).await? {
231 return Err(RunnerError::Session {
232 reason: "compare-and-swap failure".into(),
233 });
234 }
235
236 Ok(outcome)
237 }
238
239 fn ensure_policy_budget(&self, flow: &FlowDefinition) -> GResult<()> {
240 if flow.steps.len() > self.policy.max_egress_adapters {
241 return Err(policy_violation(format!(
242 "flow has {} steps exceeding budget {}",
243 flow.steps.len(),
244 self.policy.max_egress_adapters
245 )));
246 }
247 Ok(())
248 }
249
250 async fn execute_adapter_step(
251 &self,
252 flow: &FlowDefinition,
253 session: &mut SessionSnapshot,
254 call: AdapterCall,
255 tenant: &TenantCtx,
256 ) -> GResult<Value> {
257 let adapter =
258 self.adapters
259 .get(&call.adapter)
260 .ok_or_else(|| RunnerError::AdapterMissing {
261 adapter: call.adapter.clone(),
262 })?;
263
264 let resolved_payload = resolve_adapter_payload(&call.payload, session);
265 let payload_bytes =
266 serde_json::to_vec(&resolved_payload).map_err(|err| RunnerError::Serialization {
267 reason: err.to_string(),
268 })?;
269
270 if payload_bytes.len() > self.policy.max_payload_bytes {
271 return Err(policy_violation(format!(
272 "payload exceeds max size {} bytes",
273 self.policy.max_payload_bytes
274 )));
275 }
276
277 let seq = session.cursor.outbox_seq;
278 let payload_hash = Self::stable_hash(seq, &payload_bytes);
279 let key = OutboxKey::new(seq, payload_hash.clone());
280
281 let adapter_id = call.adapter.clone();
282 let operation_id = call.operation.clone();
283 let span = SpanContext {
284 trace_id: tenant.trace_id.clone(),
285 span_id: Some(format!("{}:{}", flow.summary.id, seq)),
286 };
287
288 if let Some(entry) = session.outbox.get(&key) {
289 self.host
290 .telemetry
291 .emit(
292 &span,
293 &[
294 ("adapter", adapter_id.as_str()),
295 ("operation", operation_id.as_str()),
296 ("dedup", "hit"),
297 ],
298 )
299 .await?;
300 session.cursor.position += 1;
301 session.cursor.outbox_seq = session.cursor.outbox_seq.saturating_add(1);
302 session.waiting = None;
303 session.last_outcome = Some(json!({
304 "status": "done",
305 "response": entry.response.clone(),
306 }));
307 return Ok(session.last_outcome.clone().unwrap());
308 }
309
310 self.host
311 .telemetry
312 .emit(
313 &span,
314 &[
315 ("adapter", adapter_id.as_str()),
316 ("operation", operation_id.as_str()),
317 ("phase", "start"),
318 ],
319 )
320 .await?;
321
322 let adapter_clone = adapter.clone();
323 let mut call_clone = call.clone();
324 call_clone.payload = resolved_payload.clone();
325 let response = retry_with_jitter(&self.policy.retry, || {
326 let adapter = adapter_clone.clone();
327 let call = call_clone.clone();
328 async move { adapter.call(&call).await }
329 })
330 .await?;
331
332 session.cursor.position += 1;
333 session.cursor.outbox_seq = session.cursor.outbox_seq.saturating_add(1);
334 session.waiting = None;
335 session.outbox.insert(
336 key,
337 SessionOutboxEntry {
338 seq,
339 hash: payload_hash,
340 response: response.clone(),
341 },
342 );
343 Self::update_state_adapter(session, &call, &response);
344 session.last_outcome = Some(json!({
345 "status": "done",
346 "response": response.clone(),
347 }));
348
349 self.host
350 .telemetry
351 .emit(
352 &span,
353 &[
354 ("adapter", adapter_id.as_str()),
355 ("operation", operation_id.as_str()),
356 ("phase", "finish"),
357 ],
358 )
359 .await?;
360
361 Ok(session.last_outcome.clone().unwrap())
362 }
363
364 fn update_state_input(session: &mut SessionSnapshot, input: Value) {
365 if !matches!(session.state, Value::Object(_)) {
366 session.state = Value::Object(Map::new());
367 }
368 if let Value::Object(map) = &mut session.state {
369 map.insert("last_input".to_string(), input);
370 }
371 }
372
373 fn update_state_adapter(session: &mut SessionSnapshot, call: &AdapterCall, response: &Value) {
374 if !matches!(session.state, Value::Object(_)) {
375 session.state = Value::Object(Map::new());
376 }
377 if let Value::Object(map) = &mut session.state {
378 map.insert(
379 "last_adapter".to_string(),
380 Value::String(call.adapter.clone()),
381 );
382 map.insert(
383 "last_operation".to_string(),
384 Value::String(call.operation.clone()),
385 );
386 map.insert("last_response".to_string(), response.clone());
387 }
388 }
389
390 fn stable_hash(seq: u64, payload: &[u8]) -> String {
391 let mut hasher = Sha256::new();
392 hasher.update(seq.to_be_bytes());
393 hasher.update(payload);
394 hex::encode(hasher.finalize())
395 }
396
397 fn generate_session_id() -> String {
398 let mut rng = rng();
399 let value: u128 = rng.random();
400 format!("sess-{value:032x}")
401 }
402}
403
404fn resolve_adapter_payload(call_payload: &Value, session: &SessionSnapshot) -> Value {
405 match call_payload {
406 Value::String(token) if token == PAYLOAD_FROM_LAST_INPUT => session
407 .state
408 .as_object()
409 .and_then(|map| map.get("last_input"))
410 .cloned()
411 .unwrap_or(Value::Null),
412 other => other.clone(),
413 }
414}
415
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::glue::{FnSecretsHost, FnTelemetryHost};
    use crate::engine::host::{SessionHost, StateHost};
    use crate::engine::registry::Adapter;
    use crate::engine::shims::{InMemorySessionHost, InMemoryStateHost};
    use async_trait::async_trait;
    use greentic_types::{EnvId, TenantId};
    use parking_lot::Mutex;
    use std::str::FromStr;

    /// Pack id of the flow under test.
    const PACK: &str = "test-pack";
    /// Flow id of the flow under test.
    const FLOW: &str = "support.flow";

    /// Runs the three-step support flow: the first call sends the welcome
    /// message and parks on the wait step, the second call resumes and echoes
    /// the new input.
    #[tokio::test]
    async fn pauses_and_resumes_after_wait_step() {
        let secrets = Arc::new(FnSecretsHost::new(|_| Ok(String::new())));
        let telemetry = Arc::new(FnTelemetryHost::new(|_, _| Ok(())));
        let sessions: Arc<InMemorySessionHost> = Arc::new(InMemorySessionHost::new());
        let states: Arc<InMemoryStateHost> = Arc::new(InMemoryStateHost::new());
        let host = Arc::new(HostBundle::new(
            secrets,
            telemetry,
            Arc::clone(&sessions) as Arc<dyn SessionHost>,
            Arc::clone(&states) as Arc<dyn StateHost>,
        ));

        // Keep one handle to the mock so the recorded calls can be inspected.
        let mock = MockChatAdapter::default();
        let mut registry = AdapterRegistry::default();
        registry.register("mock", Box::new(mock.clone()));

        let machine = StateMachine::new(host, registry, Policy::default());
        machine.register_flow(test_flow());

        let tenant_ctx = TenantCtx::new(
            EnvId::from_str("local").unwrap(),
            TenantId::from_str("demo").unwrap(),
        );
        let hint = Some("demo:telegram:chat:user".to_string());
        let session_key = SessionKey::new(&tenant_ctx, PACK, FLOW, hint.clone());

        // First call: welcome message is sent, then the flow waits for input.
        let first = machine
            .step(&tenant_ctx, PACK, FLOW, hint.clone(), json!({ "text": "hi" }))
            .await
            .expect("first step");
        assert_eq!(first["status"], json!("pending"));
        assert_eq!(
            first["response"]["messages"][0]["text"],
            json!("Welcome to support!")
        );

        let parked = sessions.get(&session_key).await.unwrap().unwrap();
        assert!(parked.waiting.is_some());
        assert_eq!(parked.cursor.position, 1);

        // Second call: the wait is released and the echo step runs.
        let second = machine
            .step(
                &tenant_ctx,
                PACK,
                FLOW,
                hint.clone(),
                json!({ "text": "need help" }),
            )
            .await
            .expect("second step");
        assert_eq!(second["status"], json!("done"));
        assert_eq!(
            second["response"]["messages"][0]["text"],
            json!("echo: need help")
        );

        let finished = sessions.get(&session_key).await.unwrap().unwrap();
        assert!(finished.waiting.is_none());
        assert_eq!(finished.cursor.position, 3);

        let recorded = mock.history();
        assert_eq!(recorded.len(), 2);
        assert_eq!(recorded[1]["text"], json!("need help"));
    }

    /// Flow used by the test: send a welcome, wait for the user, echo the
    /// user's latest input.
    fn test_flow() -> FlowDefinition {
        let summary = FlowSummary {
            pack_id: PACK.into(),
            id: FLOW.into(),
            name: "Support".into(),
            version: "1.0.0".into(),
            description: None,
        };
        let welcome = FlowStep::Adapter(AdapterCall {
            adapter: "mock".into(),
            operation: "send".into(),
            payload: json!({ "text": "Welcome to support!" }),
        });
        let wait_for_user = FlowStep::AwaitInput {
            reason: "await-user".into(),
        };
        let echo_input = FlowStep::Adapter(AdapterCall {
            adapter: "mock".into(),
            operation: "echo".into(),
            payload: Value::String(PAYLOAD_FROM_LAST_INPUT.into()),
        });
        FlowDefinition::new(
            summary,
            json!({ "type": "object" }),
            vec![welcome, wait_for_user, echo_input],
        )
    }

    /// Adapter double that records every payload it receives; clones share
    /// the same record.
    #[derive(Clone, Default)]
    struct MockChatAdapter {
        calls: Arc<Mutex<Vec<Value>>>,
    }

    impl MockChatAdapter {
        /// Snapshot of the payloads received so far, in call order.
        fn history(&self) -> Vec<Value> {
            self.calls.lock().clone()
        }
    }

    #[async_trait]
    impl Adapter for MockChatAdapter {
        /// Records the payload, then answers `send` with the payload text
        /// (default `hello`), `echo` with `echo: <text>`, and anything else
        /// with an empty object.
        async fn call(&self, call: &AdapterCall) -> GResult<Value> {
            self.calls.lock().push(call.payload.clone());
            let text = call.payload.get("text").and_then(Value::as_str);
            let reply = match call.operation.as_str() {
                "send" => json!({
                    "messages": [{ "text": text.unwrap_or("hello").to_string() }]
                }),
                "echo" => {
                    let echoed = text.unwrap_or("").to_string();
                    json!({
                        "messages": [{ "text": format!("echo: {echoed}") }]
                    })
                }
                _ => json!({}),
            };
            Ok(reply)
        }
    }
}