1use std::sync::Arc;
2
3use chrono::Utc;
4use serde_json::{json, Value};
5use uuid::Uuid;
6
7use crate::store::FlowStore;
8use crate::types::{Flow, FlowError, FlowStatus, FlowStep, FlowStepStatus, StepRuntime};
9
/// Caller-supplied fields for a new flow.
///
/// `FlowManager::create_managed` moves every field verbatim into the
/// corresponding field of the new `Flow`; nothing is validated here.
#[derive(Debug, Clone)]
pub struct CreateManagedInput {
    /// Stored as `Flow::controller_id` and echoed in the `"created"` event payload.
    pub controller_id: String,
    /// Stored as `Flow::goal` and echoed in the `"created"` event payload.
    pub goal: String,
    /// Stored as `Flow::owner_session_key`.
    /// NOTE(review): presumably the key `list_by_owner` filters on — confirm against the store.
    pub owner_session_key: String,
    /// Stored as `Flow::requester_origin`.
    pub requester_origin: String,
    /// Initial value of `Flow::current_step`; echoed in the `"created"` event payload.
    pub current_step: String,
    /// Initial value of `Flow::state_json`.
    pub state_json: Value,
}
22
/// One observed status report for a step, consumed by
/// `FlowManager::record_step_observation`.
#[derive(Debug, Clone)]
pub struct StepObservation {
    /// Flow the step belongs to; recording fails with `FlowError::NotFound`
    /// when the store has no flow with this id.
    pub flow_id: Uuid,
    /// Together with `flow_id`, the key used to look up an already recorded step.
    pub run_id: String,
    /// Written to the step's `task`, replacing any previously stored value.
    pub task: String,
    /// Written to the step's `status`.
    pub status: FlowStepStatus,
    /// Written to the step's `child_session_key`; when `None` on an update,
    /// the value already stored on the step is kept.
    pub child_session_key: Option<String>,
    /// Written to the step's `result_json`, replacing any previous value
    /// (a `None` here clears it).
    pub result_json: Option<Value>,
}
36
/// Total number of read-mutate-write attempts `FlowManager::with_retry` makes
/// before giving up on a `FlowError::RevisionMismatch` (this counts attempts,
/// not extra retries after the first one).
const RETRY_ATTEMPTS: u32 = 2;
41
/// Handle for creating, querying and mutating flows and their steps.
///
/// Cloning is cheap: a clone shares the same store through the `Arc`.
#[derive(Clone)]
pub struct FlowManager {
    /// Backing store; every operation of the manager goes through it.
    store: Arc<dyn FlowStore>,
}
53
54impl FlowManager {
55 pub fn new(store: Arc<dyn FlowStore>) -> Self {
56 Self { store }
57 }
58
59 pub async fn create_managed(&self, input: CreateManagedInput) -> Result<Flow, FlowError> {
62 let now = Utc::now();
63 let flow = Flow {
64 id: Uuid::new_v4(),
65 controller_id: input.controller_id,
66 goal: input.goal,
67 owner_session_key: input.owner_session_key,
68 requester_origin: input.requester_origin,
69 current_step: input.current_step,
70 state_json: input.state_json,
71 wait_json: None,
72 status: FlowStatus::Created,
73 cancel_requested: false,
74 revision: 0,
75 created_at: now,
76 updated_at: now,
77 };
78 self.store.insert(&flow).await?;
79 self.store
80 .append_event(
81 flow.id,
82 "created",
83 json!({
84 "controller_id": flow.controller_id,
85 "goal": flow.goal,
86 "current_step": flow.current_step,
87 }),
88 )
89 .await?;
90 Ok(flow)
91 }
92
93 pub async fn get(&self, id: Uuid) -> Result<Option<Flow>, FlowError> {
94 self.store.get(id).await
95 }
96
97 pub async fn list_by_owner(&self, owner: &str) -> Result<Vec<Flow>, FlowError> {
98 self.store.list_by_owner(owner).await
99 }
100
101 pub async fn list_by_status(&self, status: FlowStatus) -> Result<Vec<Flow>, FlowError> {
102 self.store.list_by_status(status).await
103 }
104
105 pub async fn start_running(&self, id: Uuid) -> Result<Flow, FlowError> {
107 self.with_retry(id, "started", json!({}), |f| {
108 f.transition_to(FlowStatus::Running)
109 })
110 .await
111 }
112
113 pub async fn set_waiting(&self, id: Uuid, wait_json: Value) -> Result<Flow, FlowError> {
117 self.with_retry(
118 id,
119 "waiting",
120 json!({ "wait": wait_json.clone() }),
121 move |f| {
122 f.transition_to(FlowStatus::Waiting)?;
123 f.wait_json = Some(wait_json.clone());
124 Ok(())
125 },
126 )
127 .await
128 }
129
130 pub async fn resume(&self, id: Uuid, state_patch: Option<Value>) -> Result<Flow, FlowError> {
133 let payload = json!({ "state_patch": state_patch.clone() });
134 self.with_retry(id, "resumed", payload, move |f| {
135 f.transition_to(FlowStatus::Running)?;
136 f.wait_json = None;
137 if let Some(patch) = &state_patch {
138 merge_state(&mut f.state_json, patch.clone());
139 }
140 Ok(())
141 })
142 .await
143 }
144
145 pub async fn finish(&self, id: Uuid, final_state: Option<Value>) -> Result<Flow, FlowError> {
147 let payload = json!({ "final_state": final_state.clone() });
148 self.with_retry(id, "finished", payload, move |f| {
149 if let Some(patch) = &final_state {
150 merge_state(&mut f.state_json, patch.clone());
151 }
152 f.transition_to(FlowStatus::Finished)
153 })
154 .await
155 }
156
157 pub async fn fail(&self, id: Uuid, reason: impl Into<String>) -> Result<Flow, FlowError> {
160 let reason = reason.into();
161 let payload = json!({ "reason": reason });
162 self.with_retry(id, "failed", payload.clone(), move |f| {
163 merge_state(
164 &mut f.state_json,
165 json!({ "failure": { "reason": reason.clone(), "at": Utc::now().to_rfc3339() } }),
166 );
167 f.transition_to(FlowStatus::Failed)
168 })
169 .await
170 }
171
172 pub async fn request_cancel(&self, id: Uuid) -> Result<Flow, FlowError> {
175 self.with_retry(id, "cancel_requested", json!({}), |f| {
176 f.request_cancel();
177 Ok(())
178 })
179 .await
180 }
181
182 pub async fn cancel(&self, id: Uuid) -> Result<Flow, FlowError> {
184 self.with_retry(id, "cancelled", json!({}), |f| {
185 f.transition_to(FlowStatus::Cancelled)
186 })
187 .await
188 }
189
190 pub async fn update_state(
193 &self,
194 id: Uuid,
195 patch: Value,
196 next_step: Option<String>,
197 ) -> Result<Flow, FlowError> {
198 let payload = json!({ "patch": patch.clone(), "next_step": next_step.clone() });
199 self.with_retry(id, "state_updated", payload, move |f| {
200 if f.status.is_terminal() {
202 return Err(FlowError::AlreadyTerminal {
203 id: f.id,
204 status: f.status,
205 });
206 }
207 if f.cancel_requested {
208 return Err(FlowError::CancelPending { id: f.id });
209 }
210 merge_state(&mut f.state_json, patch.clone());
211 if let Some(step) = &next_step {
212 f.current_step = step.clone();
213 }
214 f.updated_at = Utc::now();
215 Ok(())
216 })
217 .await
218 }
219
220 pub async fn create_mirrored(&self, input: CreateManagedInput) -> Result<Flow, FlowError> {
224 let created = self.create_managed(input).await?;
225 let running = self.start_running(created.id).await?;
226 Ok(running)
227 }
228
229 pub async fn record_step_observation(
233 &self,
234 observation: StepObservation,
235 ) -> Result<FlowStep, FlowError> {
236 let _flow = self
239 .store
240 .get(observation.flow_id)
241 .await?
242 .ok_or(FlowError::NotFound(observation.flow_id))?;
243
244 let existing = self
245 .store
246 .find_step_by_run_id(observation.flow_id, &observation.run_id)
247 .await?;
248 let now = chrono::Utc::now();
249 let step = match existing {
250 Some(mut s) => {
251 s.task = observation.task.clone();
252 s.status = observation.status;
253 s.result_json = observation.result_json.clone();
254 s.child_session_key = observation
255 .child_session_key
256 .clone()
257 .or(s.child_session_key);
258 s.updated_at = now;
259 self.store.update_step(&s).await?
260 }
261 None => {
262 let fresh = FlowStep {
263 id: Uuid::new_v4(),
264 flow_id: observation.flow_id,
265 runtime: StepRuntime::Mirrored,
266 child_session_key: observation.child_session_key.clone(),
267 run_id: observation.run_id.clone(),
268 task: observation.task.clone(),
269 status: observation.status,
270 result_json: observation.result_json.clone(),
271 created_at: now,
272 updated_at: now,
273 };
274 self.store.insert_step(&fresh).await?;
275 fresh
276 }
277 };
278
279 self.store
281 .append_event(
282 observation.flow_id,
283 "step_observed",
284 json!({
285 "run_id": observation.run_id,
286 "status": step.status.as_str(),
287 "runtime": step.runtime.as_str(),
288 }),
289 )
290 .await?;
291 Ok(step)
292 }
293
294 pub async fn list_steps(&self, flow_id: Uuid) -> Result<Vec<FlowStep>, FlowError> {
296 self.store.list_steps(flow_id).await
297 }
298
299 async fn with_retry<F>(
306 &self,
307 id: Uuid,
308 event_kind: &str,
309 event_payload: Value,
310 mutate: F,
311 ) -> Result<Flow, FlowError>
312 where
313 F: Fn(&mut Flow) -> Result<(), FlowError> + Send + Sync,
314 {
315 let mut last_err: Option<FlowError> = None;
316 for _ in 0..RETRY_ATTEMPTS {
317 let mut current = self.store.get(id).await?.ok_or(FlowError::NotFound(id))?;
318 mutate(&mut current)?;
319 match self
320 .store
321 .update_and_append(¤t, event_kind, event_payload.clone())
322 .await
323 {
324 Ok((updated, _event)) => return Ok(updated),
325 Err(FlowError::RevisionMismatch { .. }) => {
326 last_err = Some(FlowError::RevisionMismatch {
327 expected: current.revision,
328 actual: -1,
329 });
330 continue;
331 }
332 Err(e) => return Err(e),
333 }
334 }
335 Err(last_err.unwrap_or_else(|| FlowError::InvalidData("retry exhausted".into())))
336 }
337}
338
339fn merge_state(target: &mut Value, patch: Value) {
343 match (target, patch) {
344 (Value::Object(t), Value::Object(p)) => {
345 for (k, v) in p {
346 t.insert(k, v);
347 }
348 }
349 (target_slot, other) => {
350 *target_slot = other;
351 }
352 }
353}
354
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::SqliteFlowStore;
    use serde_json::json;

    /// Manager backed by a fresh in-memory SQLite store.
    async fn manager() -> FlowManager {
        let store = SqliteFlowStore::open(":memory:").await.unwrap();
        FlowManager::new(Arc::new(store))
    }

    /// Representative creation input shared by the tests.
    fn input() -> CreateManagedInput {
        CreateManagedInput {
            controller_id: "kate/inbox".into(),
            goal: "triage inbox".into(),
            owner_session_key: "agent:kate:session:abc".into(),
            requester_origin: "user-1".into(),
            current_step: "classify".into(),
            state_json: json!({"messages": 10, "processed": 0}),
        }
    }

    #[tokio::test]
    async fn full_happy_path_create_run_wait_resume_finish() {
        let m = manager().await;
        let f = m.create_managed(input()).await.unwrap();
        assert_eq!(f.status, FlowStatus::Created);
        assert_eq!(f.revision, 0);

        let f = m.start_running(f.id).await.unwrap();
        assert_eq!(f.status, FlowStatus::Running);
        assert_eq!(f.revision, 1);

        let f = m
            .set_waiting(f.id, json!({"kind": "timer", "at": "2026-04-23T15:00:00Z"}))
            .await
            .unwrap();
        assert_eq!(f.status, FlowStatus::Waiting);
        assert!(f.wait_json.is_some());
        assert_eq!(f.revision, 2);

        let f = m.resume(f.id, Some(json!({"processed": 5}))).await.unwrap();
        assert_eq!(f.status, FlowStatus::Running);
        assert!(f.wait_json.is_none());
        assert_eq!(f.state_json["processed"], 5);
        assert_eq!(f.state_json["messages"], 10);

        let f = m
            .finish(f.id, Some(json!({"summary": "10 done"})))
            .await
            .unwrap();
        assert_eq!(f.status, FlowStatus::Finished);
        assert_eq!(f.state_json["summary"], "10 done");
    }

    #[tokio::test]
    async fn fail_records_reason_in_state_and_event() {
        // Share the store with the manager so the audit trail can be inspected.
        let store = Arc::new(SqliteFlowStore::open(":memory:").await.unwrap());
        let m = FlowManager::new(store.clone());
        let f = m.create_managed(input()).await.unwrap();
        let f = m.start_running(f.id).await.unwrap();
        let f = m.fail(f.id, "downstream 503").await.unwrap();
        assert_eq!(f.status, FlowStatus::Failed);
        assert_eq!(f.state_json["failure"]["reason"], "downstream 503");

        let events = store.list_events(f.id, 10).await.unwrap();
        assert!(
            events.iter().any(|e| e.kind == "failed"),
            "a \"failed\" event should be recorded"
        );
    }

    #[tokio::test]
    async fn cancel_from_running_succeeds() {
        let m = manager().await;
        let f = m.create_managed(input()).await.unwrap();
        let f = m.start_running(f.id).await.unwrap();
        let f = m.cancel(f.id).await.unwrap();
        assert_eq!(f.status, FlowStatus::Cancelled);
    }

    #[tokio::test]
    async fn request_cancel_blocks_finish() {
        let m = manager().await;
        let f = m.create_managed(input()).await.unwrap();
        let f = m.start_running(f.id).await.unwrap();
        let f = m.request_cancel(f.id).await.unwrap();
        assert!(f.cancel_requested);
        assert_eq!(f.status, FlowStatus::Running);

        let err = m.finish(f.id, None).await.expect_err("blocked");
        assert!(matches!(err, FlowError::CancelPending { .. }));

        let f = m.cancel(f.id).await.unwrap();
        assert_eq!(f.status, FlowStatus::Cancelled);
    }

    #[tokio::test]
    async fn update_state_preserves_status_and_merges_shallow() {
        let m = manager().await;
        let f = m.create_managed(input()).await.unwrap();
        let f = m.start_running(f.id).await.unwrap();

        let f = m
            .update_state(
                f.id,
                json!({"processed": 3, "errors": []}),
                Some("fetch".into()),
            )
            .await
            .unwrap();
        assert_eq!(f.status, FlowStatus::Running);
        assert_eq!(f.current_step, "fetch");
        assert_eq!(f.state_json["processed"], 3);
        assert_eq!(f.state_json["messages"], 10, "untouched key preserved");
        assert!(f.state_json["errors"].is_array());
    }

    #[tokio::test]
    async fn update_state_rejected_when_cancel_pending() {
        let m = manager().await;
        let f = m.create_managed(input()).await.unwrap();
        let f = m.start_running(f.id).await.unwrap();
        m.request_cancel(f.id).await.unwrap();
        let err = m
            .update_state(f.id, json!({"x": 1}), None)
            .await
            .expect_err("blocked");
        assert!(matches!(err, FlowError::CancelPending { .. }));
    }

    #[tokio::test]
    async fn create_appends_audit_event() {
        let store = Arc::new(SqliteFlowStore::open(":memory:").await.unwrap());
        let m = FlowManager::new(store.clone());
        let f = m.create_managed(input()).await.unwrap();
        let events = store.list_events(f.id, 10).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].kind, "created");
    }

    #[tokio::test]
    async fn create_mirrored_starts_in_running() {
        let m = manager().await;
        let f = m.create_mirrored(input()).await.unwrap();
        assert_eq!(f.status, FlowStatus::Running);
    }

    #[tokio::test]
    async fn record_step_observation_inserts_then_updates() {
        let m = manager().await;
        let f = m.create_mirrored(input()).await.unwrap();

        // First observation for this run id inserts a new step.
        let s1 = m
            .record_step_observation(StepObservation {
                flow_id: f.id,
                run_id: "cron-42".into(),
                task: "classify".into(),
                status: FlowStepStatus::Running,
                child_session_key: Some("cron:session".into()),
                result_json: None,
            })
            .await
            .unwrap();
        assert_eq!(s1.runtime, StepRuntime::Mirrored);
        assert_eq!(s1.status, FlowStepStatus::Running);

        // Second observation with the same run id updates that step in place.
        let s2 = m
            .record_step_observation(StepObservation {
                flow_id: f.id,
                run_id: "cron-42".into(),
                task: "classify".into(),
                status: FlowStepStatus::Succeeded,
                child_session_key: None,
                result_json: Some(json!({"classified": 10})),
            })
            .await
            .unwrap();
        assert_eq!(s1.id, s2.id, "same step row should be reused");
        assert_eq!(s2.status, FlowStepStatus::Succeeded);
        assert_eq!(s2.result_json.unwrap()["classified"], 10);
        assert_eq!(s2.child_session_key.as_deref(), Some("cron:session"));

        let steps = m.list_steps(f.id).await.unwrap();
        assert_eq!(steps.len(), 1);
    }

    #[tokio::test]
    async fn record_step_on_unknown_flow_errors() {
        let m = manager().await;
        let err = m
            .record_step_observation(StepObservation {
                flow_id: Uuid::new_v4(),
                run_id: "r".into(),
                task: "t".into(),
                status: FlowStepStatus::Pending,
                child_session_key: None,
                result_json: None,
            })
            .await
            .expect_err("err");
        assert!(matches!(err, FlowError::NotFound(_)));
    }

    #[tokio::test]
    async fn list_steps_returns_per_flow() {
        let m = manager().await;
        let f = m.create_mirrored(input()).await.unwrap();
        for i in 0..3 {
            m.record_step_observation(StepObservation {
                flow_id: f.id,
                run_id: format!("run-{i}"),
                task: format!("task-{i}"),
                status: FlowStepStatus::Pending,
                child_session_key: None,
                result_json: None,
            })
            .await
            .unwrap();
        }
        let steps = m.list_steps(f.id).await.unwrap();
        assert_eq!(steps.len(), 3);
    }

    #[tokio::test]
    async fn double_finish_returns_already_terminal() {
        let m = manager().await;
        let f = m.create_managed(input()).await.unwrap();
        let f = m.start_running(f.id).await.unwrap();
        let _ = m.finish(f.id, None).await.unwrap();
        let err = m.finish(f.id, None).await.expect_err("terminal");
        assert!(matches!(err, FlowError::AlreadyTerminal { .. }));
    }
}