Skip to main content

nexo_taskflow/
manager.rs

1use std::sync::Arc;
2
3use chrono::Utc;
4use serde_json::{json, Value};
5use uuid::Uuid;
6
7use crate::store::FlowStore;
8use crate::types::{Flow, FlowError, FlowStatus, FlowStep, FlowStepStatus, StepRuntime};
9
/// Input record for `FlowManager::create_managed`. The runtime owns
/// `id`/`revision`/timestamps; everything else is supplied by the caller
/// (controller, agent tool, or skill author).
///
/// Every field is copied verbatim onto the new `Flow`; none of them is
/// validated by the manager itself.
#[derive(Debug, Clone)]
pub struct CreateManagedInput {
    /// Identifier of the controller driving the flow. Echoed into the
    /// `created` audit event.
    pub controller_id: String,
    /// Human-readable description of what the flow is trying to achieve.
    /// Echoed into the `created` audit event.
    pub goal: String,
    /// Session key of the flow's owner.
    // NOTE(review): presumably the key `list_by_owner` filters on — confirm
    // against the store implementation.
    pub owner_session_key: String,
    /// Origin of the request that caused the flow to be created.
    pub requester_origin: String,
    /// Name of the step the flow starts on. Echoed into the `created` audit
    /// event and later replaceable through `update_state`.
    pub current_step: String,
    /// Initial flow state. Later patches are merged into it shallowly, so an
    /// object is the natural shape; a non-object value is accepted as-is.
    pub state_json: Value,
}
22
/// Observation of a step belonging to an externally-driven task. Used in
/// mirrored mode: the host sees a task event on NATS (or another bus),
/// translates it to a `StepObservation`, and feeds it to the manager so the
/// flow's `flow_steps` view stays in sync without owning task creation.
#[derive(Debug, Clone)]
pub struct StepObservation {
    /// Flow the step belongs to. The flow must already exist; otherwise the
    /// observation is rejected with `FlowError::NotFound`.
    pub flow_id: Uuid,
    /// External run identifier. Together with `flow_id` it selects the step
    /// row to update; an unknown pair creates a new row.
    pub run_id: String,
    /// Task label. Overwrites the stored label on every observation.
    pub task: String,
    /// Observed step status. Overwrites the stored status.
    pub status: FlowStepStatus,
    /// Session key of the child executing the step. When `None`, an existing
    /// step keeps whatever key it already had.
    pub child_session_key: Option<String>,
    /// Result payload. Overwrites the stored result on every observation,
    /// including with `None`.
    pub result_json: Option<Value>,
}
36
/// Maximum number of attempts a mutating call makes before giving up on a
/// `RevisionMismatch`. Two attempts give one optimistic try plus one
/// re-fetch+retry — enough to cover heartbeat-vs-tool races without
/// livelocking under heavy contention.
///
/// With a value of zero the retry loop never runs and the mutation fails
/// with `FlowError::InvalidData("retry exhausted")`.
const RETRY_ATTEMPTS: u32 = 2;
41
/// High-level operational surface for managed flows.
///
/// Every status/state mutation follows the same template: read the current
/// flow, validate and apply the requested change in memory, then persist
/// the new revision together with its audit event through the store's
/// `update_and_append`. `RevisionMismatch` errors trigger a single
/// re-fetch + retry; persistent contention surfaces to the caller.
///
/// The manager holds only a shared handle to the store, so clones are cheap
/// and all clones operate on the same backend.
#[derive(Clone)]
pub struct FlowManager {
    /// Persistence backend shared by every clone of this manager.
    store: Arc<dyn FlowStore>,
}
53
54impl FlowManager {
55    pub fn new(store: Arc<dyn FlowStore>) -> Self {
56        Self { store }
57    }
58
59    /// Insert a fresh flow in `Created` status. Caller receives the canonical
60    /// record with assigned `id`/timestamps.
61    pub async fn create_managed(&self, input: CreateManagedInput) -> Result<Flow, FlowError> {
62        let now = Utc::now();
63        let flow = Flow {
64            id: Uuid::new_v4(),
65            controller_id: input.controller_id,
66            goal: input.goal,
67            owner_session_key: input.owner_session_key,
68            requester_origin: input.requester_origin,
69            current_step: input.current_step,
70            state_json: input.state_json,
71            wait_json: None,
72            status: FlowStatus::Created,
73            cancel_requested: false,
74            revision: 0,
75            created_at: now,
76            updated_at: now,
77        };
78        self.store.insert(&flow).await?;
79        self.store
80            .append_event(
81                flow.id,
82                "created",
83                json!({
84                    "controller_id": flow.controller_id,
85                    "goal": flow.goal,
86                    "current_step": flow.current_step,
87                }),
88            )
89            .await?;
90        Ok(flow)
91    }
92
93    pub async fn get(&self, id: Uuid) -> Result<Option<Flow>, FlowError> {
94        self.store.get(id).await
95    }
96
97    pub async fn list_by_owner(&self, owner: &str) -> Result<Vec<Flow>, FlowError> {
98        self.store.list_by_owner(owner).await
99    }
100
101    pub async fn list_by_status(&self, status: FlowStatus) -> Result<Vec<Flow>, FlowError> {
102        self.store.list_by_status(status).await
103    }
104
105    /// Created → Running.
106    pub async fn start_running(&self, id: Uuid) -> Result<Flow, FlowError> {
107        self.with_retry(id, "started", json!({}), |f| {
108            f.transition_to(FlowStatus::Running)
109        })
110        .await
111    }
112
113    /// Running → Waiting. `wait_json` describes what the flow is blocked on
114    /// (timer deadline, NATS subject, manual signal). Inspected by the
115    /// wait/resume engine in 14.4.
116    pub async fn set_waiting(&self, id: Uuid, wait_json: Value) -> Result<Flow, FlowError> {
117        self.with_retry(
118            id,
119            "waiting",
120            json!({ "wait": wait_json.clone() }),
121            move |f| {
122                f.transition_to(FlowStatus::Waiting)?;
123                f.wait_json = Some(wait_json.clone());
124                Ok(())
125            },
126        )
127        .await
128    }
129
130    /// Waiting → Running. Clears `wait_json`. Optional `state_patch` merges
131    /// shallowly into `state_json` so callers can record what unblocked them.
132    pub async fn resume(&self, id: Uuid, state_patch: Option<Value>) -> Result<Flow, FlowError> {
133        let payload = json!({ "state_patch": state_patch.clone() });
134        self.with_retry(id, "resumed", payload, move |f| {
135            f.transition_to(FlowStatus::Running)?;
136            f.wait_json = None;
137            if let Some(patch) = &state_patch {
138                merge_state(&mut f.state_json, patch.clone());
139            }
140            Ok(())
141        })
142        .await
143    }
144
145    /// Running → Finished. Optional final state patch is merged before transition.
146    pub async fn finish(&self, id: Uuid, final_state: Option<Value>) -> Result<Flow, FlowError> {
147        let payload = json!({ "final_state": final_state.clone() });
148        self.with_retry(id, "finished", payload, move |f| {
149            if let Some(patch) = &final_state {
150                merge_state(&mut f.state_json, patch.clone());
151            }
152            f.transition_to(FlowStatus::Finished)
153        })
154        .await
155    }
156
157    /// Running/Waiting → Failed. `reason` is recorded in the event log and
158    /// stamped under `state_json.failure`.
159    pub async fn fail(&self, id: Uuid, reason: impl Into<String>) -> Result<Flow, FlowError> {
160        let reason = reason.into();
161        let payload = json!({ "reason": reason });
162        self.with_retry(id, "failed", payload.clone(), move |f| {
163            merge_state(
164                &mut f.state_json,
165                json!({ "failure": { "reason": reason.clone(), "at": Utc::now().to_rfc3339() } }),
166            );
167            f.transition_to(FlowStatus::Failed)
168        })
169        .await
170    }
171
172    /// Set sticky cancel intent without changing status. Useful when an
173    /// in-flight step needs to drain before the flow can flip to `Cancelled`.
174    pub async fn request_cancel(&self, id: Uuid) -> Result<Flow, FlowError> {
175        self.with_retry(id, "cancel_requested", json!({}), |f| {
176            f.request_cancel();
177            Ok(())
178        })
179        .await
180    }
181
182    /// Force the flow to `Cancelled`. Allowed from any non-terminal status.
183    pub async fn cancel(&self, id: Uuid) -> Result<Flow, FlowError> {
184        self.with_retry(id, "cancelled", json!({}), |f| {
185            f.transition_to(FlowStatus::Cancelled)
186        })
187        .await
188    }
189
190    /// Mutate `state_json` without changing status. `current_step` is
191    /// optionally updated in the same revision.
192    pub async fn update_state(
193        &self,
194        id: Uuid,
195        patch: Value,
196        next_step: Option<String>,
197    ) -> Result<Flow, FlowError> {
198        let payload = json!({ "patch": patch.clone(), "next_step": next_step.clone() });
199        self.with_retry(id, "state_updated", payload, move |f| {
200            // Reject if cancel_requested or terminal — same policy as transition_to.
201            if f.status.is_terminal() {
202                return Err(FlowError::AlreadyTerminal {
203                    id: f.id,
204                    status: f.status,
205                });
206            }
207            if f.cancel_requested {
208                return Err(FlowError::CancelPending { id: f.id });
209            }
210            merge_state(&mut f.state_json, patch.clone());
211            if let Some(step) = &next_step {
212                f.current_step = step.clone();
213            }
214            f.updated_at = Utc::now();
215            Ok(())
216        })
217        .await
218    }
219
220    /// Create a mirrored flow. The flow is born in `Running` status because
221    /// the externally-observed work is typically already in flight. Use
222    /// `record_step_observation` to keep its steps in sync.
223    pub async fn create_mirrored(&self, input: CreateManagedInput) -> Result<Flow, FlowError> {
224        let created = self.create_managed(input).await?;
225        let running = self.start_running(created.id).await?;
226        Ok(running)
227    }
228
229    /// Upsert-style: if a step with the same `(flow_id, run_id)` exists,
230    /// update its status/result; otherwise insert a fresh step row. Designed
231    /// to be called from a NATS subscriber (or CLI/cron bridge).
232    pub async fn record_step_observation(
233        &self,
234        observation: StepObservation,
235    ) -> Result<FlowStep, FlowError> {
236        // The flow must exist; cross-reference prevents orphaned steps from
237        // polluting the store if the bus delivers late events.
238        let _flow = self
239            .store
240            .get(observation.flow_id)
241            .await?
242            .ok_or(FlowError::NotFound(observation.flow_id))?;
243
244        let existing = self
245            .store
246            .find_step_by_run_id(observation.flow_id, &observation.run_id)
247            .await?;
248        let now = chrono::Utc::now();
249        let step = match existing {
250            Some(mut s) => {
251                s.task = observation.task.clone();
252                s.status = observation.status;
253                s.result_json = observation.result_json.clone();
254                s.child_session_key = observation
255                    .child_session_key
256                    .clone()
257                    .or(s.child_session_key);
258                s.updated_at = now;
259                self.store.update_step(&s).await?
260            }
261            None => {
262                let fresh = FlowStep {
263                    id: Uuid::new_v4(),
264                    flow_id: observation.flow_id,
265                    runtime: StepRuntime::Mirrored,
266                    child_session_key: observation.child_session_key.clone(),
267                    run_id: observation.run_id.clone(),
268                    task: observation.task.clone(),
269                    status: observation.status,
270                    result_json: observation.result_json.clone(),
271                    created_at: now,
272                    updated_at: now,
273                };
274                self.store.insert_step(&fresh).await?;
275                fresh
276            }
277        };
278
279        // Audit trail on the flow itself.
280        self.store
281            .append_event(
282                observation.flow_id,
283                "step_observed",
284                json!({
285                    "run_id": observation.run_id,
286                    "status": step.status.as_str(),
287                    "runtime": step.runtime.as_str(),
288                }),
289            )
290            .await?;
291        Ok(step)
292    }
293
294    /// Read all steps linked to a flow, ordered oldest-first.
295    pub async fn list_steps(&self, flow_id: Uuid) -> Result<Vec<FlowStep>, FlowError> {
296        self.store.list_steps(flow_id).await
297    }
298
299    /// Read–modify–write loop with one retry on `RevisionMismatch`. The
300    /// mutation closure runs against a fresh copy each attempt. Uses
301    /// `update_and_append` so the revision-checked UPDATE and the
302    /// audit-log INSERT commit atomically — previously a crash between
303    /// the two left the flow advanced but the event log silently
304    /// incomplete.
305    async fn with_retry<F>(
306        &self,
307        id: Uuid,
308        event_kind: &str,
309        event_payload: Value,
310        mutate: F,
311    ) -> Result<Flow, FlowError>
312    where
313        F: Fn(&mut Flow) -> Result<(), FlowError> + Send + Sync,
314    {
315        let mut last_err: Option<FlowError> = None;
316        for _ in 0..RETRY_ATTEMPTS {
317            let mut current = self.store.get(id).await?.ok_or(FlowError::NotFound(id))?;
318            mutate(&mut current)?;
319            match self
320                .store
321                .update_and_append(&current, event_kind, event_payload.clone())
322                .await
323            {
324                Ok((updated, _event)) => return Ok(updated),
325                Err(FlowError::RevisionMismatch { .. }) => {
326                    last_err = Some(FlowError::RevisionMismatch {
327                        expected: current.revision,
328                        actual: -1,
329                    });
330                    continue;
331                }
332                Err(e) => return Err(e),
333            }
334        }
335        Err(last_err.unwrap_or_else(|| FlowError::InvalidData("retry exhausted".into())))
336    }
337}
338
339/// Shallow JSON merge: top-level keys in `patch` overwrite those in `target`.
340/// Non-object `patch` replaces `target` entirely. Adopted by every mutation
341/// that touches `state_json`.
342fn merge_state(target: &mut Value, patch: Value) {
343    match (target, patch) {
344        (Value::Object(t), Value::Object(p)) => {
345            for (k, v) in p {
346                t.insert(k, v);
347            }
348        }
349        (target_slot, other) => {
350            *target_slot = other;
351        }
352    }
353}
354
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::SqliteFlowStore;
    use serde_json::json;

    /// Manager backed by a private in-memory SQLite store.
    async fn manager() -> FlowManager {
        let store = SqliteFlowStore::open(":memory:").await.unwrap();
        FlowManager::new(Arc::new(store))
    }

    /// Canonical creation input shared by the tests below.
    fn input() -> CreateManagedInput {
        CreateManagedInput {
            controller_id: "kate/inbox".into(),
            goal: "triage inbox".into(),
            owner_session_key: "agent:kate:session:abc".into(),
            requester_origin: "user-1".into(),
            current_step: "classify".into(),
            state_json: json!({"messages": 10, "processed": 0}),
        }
    }

    #[tokio::test]
    async fn full_happy_path_create_run_wait_resume_finish() {
        let m = manager().await;
        let f = m.create_managed(input()).await.unwrap();
        assert_eq!(f.status, FlowStatus::Created);
        assert_eq!(f.revision, 0);

        let f = m.start_running(f.id).await.unwrap();
        assert_eq!(f.status, FlowStatus::Running);
        assert_eq!(f.revision, 1);

        let f = m
            .set_waiting(f.id, json!({"kind": "timer", "at": "2026-04-23T15:00:00Z"}))
            .await
            .unwrap();
        assert_eq!(f.status, FlowStatus::Waiting);
        assert!(f.wait_json.is_some());
        assert_eq!(f.revision, 2);

        let f = m.resume(f.id, Some(json!({"processed": 5}))).await.unwrap();
        assert_eq!(f.status, FlowStatus::Running);
        assert!(f.wait_json.is_none());
        assert_eq!(f.state_json["processed"], 5);
        assert_eq!(f.state_json["messages"], 10);

        let f = m
            .finish(f.id, Some(json!({"summary": "10 done"})))
            .await
            .unwrap();
        assert_eq!(f.status, FlowStatus::Finished);
        assert_eq!(f.state_json["summary"], "10 done");
    }

    #[tokio::test]
    async fn fail_records_reason_in_state_and_event() {
        // Share the store handle with the manager so the audit log written
        // by `fail` can be inspected afterwards.
        let store = Arc::new(SqliteFlowStore::open(":memory:").await.unwrap());
        let m = FlowManager::new(store.clone());
        let f = m.create_managed(input()).await.unwrap();
        let f = m.start_running(f.id).await.unwrap();
        let f = m.fail(f.id, "downstream 503").await.unwrap();
        assert_eq!(f.status, FlowStatus::Failed);
        assert_eq!(f.state_json["failure"]["reason"], "downstream 503");

        // The store kept a "failed" event. Event ordering is not assumed.
        let events = store.list_events(f.id, 10).await.unwrap();
        assert!(
            events.iter().any(|e| e.kind == "failed"),
            "expected a `failed` audit event"
        );
    }

    #[tokio::test]
    async fn cancel_from_running_succeeds() {
        let m = manager().await;
        let f = m.create_managed(input()).await.unwrap();
        let f = m.start_running(f.id).await.unwrap();
        let f = m.cancel(f.id).await.unwrap();
        assert_eq!(f.status, FlowStatus::Cancelled);
    }

    #[tokio::test]
    async fn request_cancel_blocks_finish() {
        let m = manager().await;
        let f = m.create_managed(input()).await.unwrap();
        let f = m.start_running(f.id).await.unwrap();
        let f = m.request_cancel(f.id).await.unwrap();
        assert!(f.cancel_requested);
        assert_eq!(f.status, FlowStatus::Running);

        let err = m.finish(f.id, None).await.expect_err("blocked");
        assert!(matches!(err, FlowError::CancelPending { .. }));

        // But cancel still works.
        let f = m.cancel(f.id).await.unwrap();
        assert_eq!(f.status, FlowStatus::Cancelled);
    }

    #[tokio::test]
    async fn update_state_preserves_status_and_merges_shallow() {
        let m = manager().await;
        let f = m.create_managed(input()).await.unwrap();
        let f = m.start_running(f.id).await.unwrap();

        let f = m
            .update_state(
                f.id,
                json!({"processed": 3, "errors": []}),
                Some("fetch".into()),
            )
            .await
            .unwrap();
        assert_eq!(f.status, FlowStatus::Running);
        assert_eq!(f.current_step, "fetch");
        assert_eq!(f.state_json["processed"], 3);
        assert_eq!(f.state_json["messages"], 10, "untouched key preserved");
        assert!(f.state_json["errors"].is_array());
    }

    #[tokio::test]
    async fn update_state_rejected_when_cancel_pending() {
        let m = manager().await;
        let f = m.create_managed(input()).await.unwrap();
        let f = m.start_running(f.id).await.unwrap();
        m.request_cancel(f.id).await.unwrap();
        let err = m
            .update_state(f.id, json!({"x": 1}), None)
            .await
            .expect_err("blocked");
        assert!(matches!(err, FlowError::CancelPending { .. }));
    }

    #[tokio::test]
    async fn create_appends_audit_event() {
        let store = Arc::new(SqliteFlowStore::open(":memory:").await.unwrap());
        let m = FlowManager::new(store.clone());
        let f = m.create_managed(input()).await.unwrap();
        let events = store.list_events(f.id, 10).await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].kind, "created");
    }

    #[tokio::test]
    async fn create_mirrored_starts_in_running() {
        let m = manager().await;
        let f = m.create_mirrored(input()).await.unwrap();
        assert_eq!(f.status, FlowStatus::Running);
    }

    #[tokio::test]
    async fn record_step_observation_inserts_then_updates() {
        let m = manager().await;
        let f = m.create_mirrored(input()).await.unwrap();

        // First observation — new step.
        let s1 = m
            .record_step_observation(StepObservation {
                flow_id: f.id,
                run_id: "cron-42".into(),
                task: "classify".into(),
                status: FlowStepStatus::Running,
                child_session_key: Some("cron:session".into()),
                result_json: None,
            })
            .await
            .unwrap();
        assert_eq!(s1.runtime, StepRuntime::Mirrored);
        assert_eq!(s1.status, FlowStepStatus::Running);

        // Second observation with same run_id — should update the existing
        // row in place, not insert a new one.
        let s2 = m
            .record_step_observation(StepObservation {
                flow_id: f.id,
                run_id: "cron-42".into(),
                task: "classify".into(),
                status: FlowStepStatus::Succeeded,
                child_session_key: None,
                result_json: Some(json!({"classified": 10})),
            })
            .await
            .unwrap();
        assert_eq!(s1.id, s2.id, "same step row should be reused");
        assert_eq!(s2.status, FlowStepStatus::Succeeded);
        assert_eq!(s2.result_json.unwrap()["classified"], 10);
        // child_session_key preserved from first observation when second is None.
        assert_eq!(s2.child_session_key.as_deref(), Some("cron:session"));

        // Only one step persisted.
        let steps = m.list_steps(f.id).await.unwrap();
        assert_eq!(steps.len(), 1);
    }

    #[tokio::test]
    async fn record_step_on_unknown_flow_errors() {
        let m = manager().await;
        let err = m
            .record_step_observation(StepObservation {
                flow_id: Uuid::new_v4(),
                run_id: "r".into(),
                task: "t".into(),
                status: FlowStepStatus::Pending,
                child_session_key: None,
                result_json: None,
            })
            .await
            .expect_err("err");
        assert!(matches!(err, FlowError::NotFound(_)));
    }

    #[tokio::test]
    async fn list_steps_returns_per_flow() {
        let m = manager().await;
        let f = m.create_mirrored(input()).await.unwrap();
        for i in 0..3 {
            m.record_step_observation(StepObservation {
                flow_id: f.id,
                run_id: format!("run-{i}"),
                task: format!("task-{i}"),
                status: FlowStepStatus::Pending,
                child_session_key: None,
                result_json: None,
            })
            .await
            .unwrap();
        }
        let steps = m.list_steps(f.id).await.unwrap();
        assert_eq!(steps.len(), 3);
    }

    #[tokio::test]
    async fn double_finish_returns_already_terminal() {
        let m = manager().await;
        let f = m.create_managed(input()).await.unwrap();
        let f = m.start_running(f.id).await.unwrap();
        let _ = m.finish(f.id, None).await.unwrap();
        let err = m.finish(f.id, None).await.expect_err("terminal");
        assert!(matches!(err, FlowError::AlreadyTerminal { .. }));
    }
}