Skip to main content

nexo_taskflow/
store.rs

1use async_trait::async_trait;
2use chrono::Utc;
3use serde_json::Value;
4use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
5use sqlx::{Row, SqlitePool};
6use std::str::FromStr;
7use uuid::Uuid;
8
9use crate::types::{Flow, FlowError, FlowEvent, FlowStatus, FlowStep, FlowStepStatus, StepRuntime};
10
/// DDL applied by `SqliteFlowStore::with_pool` every time a store is opened.
///
/// `with_pool` splits this text on `;` and executes each non-empty piece as
/// its own statement, so nothing in here — including the SQL comments — may
/// contain a literal semicolon other than the statement terminators. Every
/// statement is written with `IF NOT EXISTS`, which is what makes re-running
/// the whole script against an existing database harmless.
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS flows (
    id TEXT PRIMARY KEY,
    controller_id TEXT NOT NULL,
    goal TEXT NOT NULL,
    owner_session_key TEXT NOT NULL,
    requester_origin TEXT NOT NULL,
    current_step TEXT NOT NULL,
    state_json TEXT NOT NULL DEFAULT '{}',
    wait_json TEXT,
    status TEXT NOT NULL,
    cancel_requested INTEGER NOT NULL DEFAULT 0,
    revision INTEGER NOT NULL DEFAULT 0,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_flows_owner ON flows(owner_session_key);
CREATE INDEX IF NOT EXISTS idx_flows_status ON flows(status);

CREATE TABLE IF NOT EXISTS flow_steps (
    id TEXT PRIMARY KEY,
    flow_id TEXT NOT NULL REFERENCES flows(id) ON DELETE CASCADE,
    runtime TEXT NOT NULL,
    child_session_key TEXT,
    run_id TEXT NOT NULL,
    task TEXT NOT NULL,
    status TEXT NOT NULL,
    result_json TEXT,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_flow_steps_flow ON flow_steps(flow_id);

-- Phase-14 follow-up: the engine looks up steps by `(flow_id, run_id)`
-- on every observation event. Without the unique index, two
-- concurrent observations could both see "no row" in
-- `find_step_by_run_id` and each insert a fresh row — duplicate steps
-- with the same run_id, followed by non-deterministic lookups.
-- UNIQUE also fixes the perf issue (was O(n) per observation).
CREATE UNIQUE INDEX IF NOT EXISTS idx_flow_steps_run
    ON flow_steps(flow_id, run_id);

CREATE TABLE IF NOT EXISTS flow_events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    flow_id TEXT NOT NULL REFERENCES flows(id) ON DELETE CASCADE,
    kind TEXT NOT NULL,
    payload_json TEXT NOT NULL,
    at TEXT NOT NULL
);

CREATE INDEX IF NOT EXISTS idx_flow_events_flow ON flow_events(flow_id);
"#;
65
66/// Persistence layer for `Flow` records.
67///
68/// All mutating operations are revision-checked. Stale callers receive
69/// `FlowError::RevisionMismatch` and must re-fetch before retrying.
70#[async_trait]
71pub trait FlowStore: Send + Sync {
72    async fn insert(&self, flow: &Flow) -> Result<(), FlowError>;
73    async fn get(&self, id: Uuid) -> Result<Option<Flow>, FlowError>;
74    async fn list_by_owner(&self, owner_session_key: &str) -> Result<Vec<Flow>, FlowError>;
75    async fn list_by_status(&self, status: FlowStatus) -> Result<Vec<Flow>, FlowError>;
76    async fn update_with_revision(&self, flow: &Flow) -> Result<Flow, FlowError>;
77    async fn append_event(
78        &self,
79        flow_id: Uuid,
80        kind: &str,
81        payload: Value,
82    ) -> Result<FlowEvent, FlowError>;
83    async fn list_events(&self, flow_id: Uuid, limit: i64) -> Result<Vec<FlowEvent>, FlowError>;
84
85    // ---- flow_steps ----
86    async fn insert_step(&self, step: &FlowStep) -> Result<(), FlowError>;
87    async fn update_step(&self, step: &FlowStep) -> Result<FlowStep, FlowError>;
88    async fn get_step(&self, id: Uuid) -> Result<Option<FlowStep>, FlowError>;
89    async fn list_steps(&self, flow_id: Uuid) -> Result<Vec<FlowStep>, FlowError>;
90    async fn find_step_by_run_id(
91        &self,
92        flow_id: Uuid,
93        run_id: &str,
94    ) -> Result<Option<FlowStep>, FlowError>;
95
96    /// Update a flow's revision + append an audit event atomically.
97    /// Previously `FlowManager::with_retry` called `update_with_revision`
98    /// followed by `append_event` as two round-trips — a crash between
99    /// them left the flow updated but with no event row, silently
100    /// corrupting the audit trail. Implementers should run both in a
101    /// single transaction; the default impl here is safe for stores
102    /// where atomic multi-op isn't possible (falls back to the
103    /// non-atomic pair with a warn log).
104    async fn update_and_append(
105        &self,
106        flow: &Flow,
107        event_kind: &str,
108        event_payload: Value,
109    ) -> Result<(Flow, FlowEvent), FlowError> {
110        tracing::warn!(
111            flow_id = %flow.id,
112            "FlowStore::update_and_append using non-atomic fallback — implement a transaction-based override"
113        );
114        let updated = self.update_with_revision(flow).await?;
115        let event = self
116            .append_event(updated.id, event_kind, event_payload)
117            .await?;
118        Ok((updated, event))
119    }
120
121    /// Drop flows in terminal status (`Finished`, `Failed`, `Cancelled`)
122    /// whose `updated_at` is older than `retain_days`. Cascades through
123    /// `flow_steps` and `flow_events` via ON DELETE CASCADE in schema.
124    /// Intended for a daily heartbeat so `list_by_owner` / `list_by_
125    /// status` don't grow O(n) over all history. Default impl returns
126    /// an error — stores must override.
127    async fn prune_terminal_flows(&self, _retain_days: i64) -> Result<u64, FlowError> {
128        Err(FlowError::InvalidData(
129            "prune_terminal_flows not implemented by this store".into(),
130        ))
131    }
132}
133
134#[derive(Clone)]
135pub struct SqliteFlowStore {
136    pool: SqlitePool,
137}
138
139impl SqliteFlowStore {
140    /// Open or create a SQLite database at `path`. Use `:memory:` for tests.
141    pub async fn open(path: &str) -> Result<Self, FlowError> {
142        let opts = SqliteConnectOptions::from_str(&format!("sqlite://{path}"))
143            .map_err(|e| FlowError::InvalidData(format!("bad sqlite url: {e}")))?
144            .create_if_missing(true)
145            .foreign_keys(true);
146        let pool = SqlitePoolOptions::new()
147            .max_connections(5)
148            .connect_with(opts)
149            .await?;
150        Self::with_pool(pool).await
151    }
152
153    /// Open against a pool the caller already owns. Useful for tests that
154    /// share an in-memory DB across multiple stores.
155    pub async fn with_pool(pool: SqlitePool) -> Result<Self, FlowError> {
156        // Run schema once. SQL is idempotent (`IF NOT EXISTS`).
157        for stmt in SCHEMA.split(';') {
158            let trimmed = stmt.trim();
159            if trimmed.is_empty() {
160                continue;
161            }
162            sqlx::query(trimmed).execute(&pool).await?;
163        }
164        Ok(Self { pool })
165    }
166
167    pub fn pool(&self) -> &SqlitePool {
168        &self.pool
169    }
170}
171
172#[async_trait]
173impl FlowStore for SqliteFlowStore {
174    async fn insert(&self, flow: &Flow) -> Result<(), FlowError> {
175        let state_json = serde_json::to_string(&flow.state_json)
176            .map_err(|e| FlowError::InvalidData(e.to_string()))?;
177        let wait_json = match &flow.wait_json {
178            Some(v) => {
179                Some(serde_json::to_string(v).map_err(|e| FlowError::InvalidData(e.to_string()))?)
180            }
181            None => None,
182        };
183        sqlx::query(
184            "INSERT INTO flows (id, controller_id, goal, owner_session_key, requester_origin, \
185             current_step, state_json, wait_json, status, cancel_requested, revision, created_at, updated_at) \
186             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
187        )
188        .bind(flow.id.to_string())
189        .bind(&flow.controller_id)
190        .bind(&flow.goal)
191        .bind(&flow.owner_session_key)
192        .bind(&flow.requester_origin)
193        .bind(&flow.current_step)
194        .bind(state_json)
195        .bind(wait_json)
196        .bind(flow.status.as_str())
197        .bind(flow.cancel_requested as i64)
198        .bind(flow.revision)
199        .bind(flow.created_at.to_rfc3339())
200        .bind(flow.updated_at.to_rfc3339())
201        .execute(&self.pool)
202        .await?;
203        Ok(())
204    }
205
206    async fn get(&self, id: Uuid) -> Result<Option<Flow>, FlowError> {
207        let row = sqlx::query("SELECT * FROM flows WHERE id = ?")
208            .bind(id.to_string())
209            .fetch_optional(&self.pool)
210            .await?;
211        row.map(row_to_flow).transpose()
212    }
213
214    async fn list_by_owner(&self, owner_session_key: &str) -> Result<Vec<Flow>, FlowError> {
215        let rows =
216            sqlx::query("SELECT * FROM flows WHERE owner_session_key = ? ORDER BY created_at DESC")
217                .bind(owner_session_key)
218                .fetch_all(&self.pool)
219                .await?;
220        rows.into_iter().map(row_to_flow).collect()
221    }
222
223    async fn list_by_status(&self, status: FlowStatus) -> Result<Vec<Flow>, FlowError> {
224        let rows = sqlx::query("SELECT * FROM flows WHERE status = ? ORDER BY updated_at ASC")
225            .bind(status.as_str())
226            .fetch_all(&self.pool)
227            .await?;
228        rows.into_iter().map(row_to_flow).collect()
229    }
230
231    async fn update_with_revision(&self, flow: &Flow) -> Result<Flow, FlowError> {
232        let state_json = serde_json::to_string(&flow.state_json)
233            .map_err(|e| FlowError::InvalidData(e.to_string()))?;
234        let wait_json = match &flow.wait_json {
235            Some(v) => {
236                Some(serde_json::to_string(v).map_err(|e| FlowError::InvalidData(e.to_string()))?)
237            }
238            None => None,
239        };
240        let new_revision = flow.revision + 1;
241        let now = Utc::now();
242        let result = sqlx::query(
243            "UPDATE flows SET controller_id = ?, goal = ?, owner_session_key = ?, \
244             requester_origin = ?, current_step = ?, state_json = ?, wait_json = ?, \
245             status = ?, cancel_requested = ?, revision = ?, updated_at = ? \
246             WHERE id = ? AND revision = ?",
247        )
248        .bind(&flow.controller_id)
249        .bind(&flow.goal)
250        .bind(&flow.owner_session_key)
251        .bind(&flow.requester_origin)
252        .bind(&flow.current_step)
253        .bind(state_json)
254        .bind(wait_json)
255        .bind(flow.status.as_str())
256        .bind(flow.cancel_requested as i64)
257        .bind(new_revision)
258        .bind(now.to_rfc3339())
259        .bind(flow.id.to_string())
260        .bind(flow.revision)
261        .execute(&self.pool)
262        .await?;
263
264        if result.rows_affected() == 0 {
265            // Either gone, or stale revision. Disambiguate.
266            let actual = self.get(flow.id).await?;
267            return match actual {
268                None => Err(FlowError::NotFound(flow.id)),
269                Some(found) => Err(FlowError::RevisionMismatch {
270                    expected: flow.revision,
271                    actual: found.revision,
272                }),
273            };
274        }
275        // Refetch to return the canonical post-update state.
276        self.get(flow.id).await?.ok_or(FlowError::NotFound(flow.id))
277    }
278
279    async fn append_event(
280        &self,
281        flow_id: Uuid,
282        kind: &str,
283        payload: Value,
284    ) -> Result<FlowEvent, FlowError> {
285        let payload_json =
286            serde_json::to_string(&payload).map_err(|e| FlowError::InvalidData(e.to_string()))?;
287        let now = Utc::now();
288        let result = sqlx::query(
289            "INSERT INTO flow_events (flow_id, kind, payload_json, at) VALUES (?, ?, ?, ?)",
290        )
291        .bind(flow_id.to_string())
292        .bind(kind)
293        .bind(payload_json)
294        .bind(now.to_rfc3339())
295        .execute(&self.pool)
296        .await?;
297        Ok(FlowEvent {
298            id: result.last_insert_rowid(),
299            flow_id,
300            kind: kind.to_string(),
301            payload_json: payload,
302            at: now,
303        })
304    }
305
306    async fn list_events(&self, flow_id: Uuid, limit: i64) -> Result<Vec<FlowEvent>, FlowError> {
307        // Guard against negative limit — SQLite treats `LIMIT -1` as
308        // unbounded which would scan the entire flow_events table for
309        // a long-lived flow. Callers that want "all" should say so
310        // explicitly via `i64::MAX`.
311        let limit = limit.max(0);
312        let rows = sqlx::query(
313            "SELECT id, flow_id, kind, payload_json, at FROM flow_events \
314             WHERE flow_id = ? ORDER BY id DESC LIMIT ?",
315        )
316        .bind(flow_id.to_string())
317        .bind(limit)
318        .fetch_all(&self.pool)
319        .await?;
320        rows.into_iter()
321            .map(|row| {
322                let id: i64 = row.try_get("id")?;
323                let flow_id_s: String = row.try_get("flow_id")?;
324                let kind: String = row.try_get("kind")?;
325                let payload_s: String = row.try_get("payload_json")?;
326                let at_s: String = row.try_get("at")?;
327                let flow_id = Uuid::parse_str(&flow_id_s)
328                    .map_err(|e| FlowError::InvalidData(format!("bad flow_id: {e}")))?;
329                let payload_json = serde_json::from_str(&payload_s)
330                    .map_err(|e| FlowError::InvalidData(format!("bad event payload: {e}")))?;
331                let at = chrono::DateTime::parse_from_rfc3339(&at_s)
332                    .map_err(|e| FlowError::InvalidData(format!("bad event ts: {e}")))?
333                    .with_timezone(&Utc);
334                Ok(FlowEvent {
335                    id,
336                    flow_id,
337                    kind,
338                    payload_json,
339                    at,
340                })
341            })
342            .collect()
343    }
344
345    async fn insert_step(&self, step: &FlowStep) -> Result<(), FlowError> {
346        let result_s = match &step.result_json {
347            Some(v) => {
348                Some(serde_json::to_string(v).map_err(|e| FlowError::InvalidData(e.to_string()))?)
349            }
350            None => None,
351        };
352        sqlx::query(
353            "INSERT INTO flow_steps (id, flow_id, runtime, child_session_key, run_id, task, status, result_json, created_at, updated_at) \
354             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
355        )
356        .bind(step.id.to_string())
357        .bind(step.flow_id.to_string())
358        .bind(step.runtime.as_str())
359        .bind(step.child_session_key.as_deref())
360        .bind(&step.run_id)
361        .bind(&step.task)
362        .bind(step.status.as_str())
363        .bind(result_s)
364        .bind(step.created_at.to_rfc3339())
365        .bind(step.updated_at.to_rfc3339())
366        .execute(&self.pool)
367        .await?;
368        Ok(())
369    }
370
371    async fn update_step(&self, step: &FlowStep) -> Result<FlowStep, FlowError> {
372        let result_s = match &step.result_json {
373            Some(v) => {
374                Some(serde_json::to_string(v).map_err(|e| FlowError::InvalidData(e.to_string()))?)
375            }
376            None => None,
377        };
378        let now = Utc::now();
379        let rows = sqlx::query(
380            "UPDATE flow_steps SET runtime = ?, child_session_key = ?, run_id = ?, task = ?, \
381             status = ?, result_json = ?, updated_at = ? WHERE id = ?",
382        )
383        .bind(step.runtime.as_str())
384        .bind(step.child_session_key.as_deref())
385        .bind(&step.run_id)
386        .bind(&step.task)
387        .bind(step.status.as_str())
388        .bind(result_s)
389        .bind(now.to_rfc3339())
390        .bind(step.id.to_string())
391        .execute(&self.pool)
392        .await?;
393        if rows.rows_affected() == 0 {
394            return Err(FlowError::NotFound(step.id));
395        }
396        self.get_step(step.id)
397            .await?
398            .ok_or(FlowError::NotFound(step.id))
399    }
400
401    async fn get_step(&self, id: Uuid) -> Result<Option<FlowStep>, FlowError> {
402        let row = sqlx::query("SELECT * FROM flow_steps WHERE id = ?")
403            .bind(id.to_string())
404            .fetch_optional(&self.pool)
405            .await?;
406        row.map(row_to_step).transpose()
407    }
408
409    async fn list_steps(&self, flow_id: Uuid) -> Result<Vec<FlowStep>, FlowError> {
410        let rows =
411            sqlx::query("SELECT * FROM flow_steps WHERE flow_id = ? ORDER BY created_at ASC")
412                .bind(flow_id.to_string())
413                .fetch_all(&self.pool)
414                .await?;
415        rows.into_iter().map(row_to_step).collect()
416    }
417
418    async fn find_step_by_run_id(
419        &self,
420        flow_id: Uuid,
421        run_id: &str,
422    ) -> Result<Option<FlowStep>, FlowError> {
423        let row = sqlx::query("SELECT * FROM flow_steps WHERE flow_id = ? AND run_id = ?")
424            .bind(flow_id.to_string())
425            .bind(run_id)
426            .fetch_optional(&self.pool)
427            .await?;
428        row.map(row_to_step).transpose()
429    }
430
431    async fn prune_terminal_flows(&self, retain_days: i64) -> Result<u64, FlowError> {
432        // Cutoff as RFC3339 so the comparison matches the stored
433        // `updated_at` text format (SQLite string compare works for
434        // ISO8601 UTC strings because the encoding is sort-preserving).
435        let cutoff = (Utc::now() - chrono::Duration::days(retain_days.max(0))).to_rfc3339();
436        let result = sqlx::query(
437            "DELETE FROM flows \
438             WHERE status IN ('finished','failed','cancelled') \
439               AND updated_at < ?",
440        )
441        .bind(&cutoff)
442        .execute(&self.pool)
443        .await?;
444        Ok(result.rows_affected())
445    }
446
447    /// Atomic override: runs the revision check, UPDATE, event INSERT,
448    /// and final SELECT inside one SQLite transaction. A crash at any
449    /// point rolls back and the caller sees either the full old state
450    /// or the full new state + event — no torn audit trail.
451    async fn update_and_append(
452        &self,
453        flow: &Flow,
454        event_kind: &str,
455        event_payload: Value,
456    ) -> Result<(Flow, FlowEvent), FlowError> {
457        let state_json = serde_json::to_string(&flow.state_json)
458            .map_err(|e| FlowError::InvalidData(e.to_string()))?;
459        let wait_json = match &flow.wait_json {
460            Some(v) => {
461                Some(serde_json::to_string(v).map_err(|e| FlowError::InvalidData(e.to_string()))?)
462            }
463            None => None,
464        };
465        let payload_json = serde_json::to_string(&event_payload)
466            .map_err(|e| FlowError::InvalidData(e.to_string()))?;
467        let new_revision = flow.revision + 1;
468        let now = Utc::now();
469
470        let mut tx = self.pool.begin().await?;
471        let update_result = sqlx::query(
472            "UPDATE flows SET controller_id = ?, goal = ?, owner_session_key = ?, \
473             requester_origin = ?, current_step = ?, state_json = ?, wait_json = ?, \
474             status = ?, cancel_requested = ?, revision = ?, updated_at = ? \
475             WHERE id = ? AND revision = ?",
476        )
477        .bind(&flow.controller_id)
478        .bind(&flow.goal)
479        .bind(&flow.owner_session_key)
480        .bind(&flow.requester_origin)
481        .bind(&flow.current_step)
482        .bind(state_json)
483        .bind(wait_json)
484        .bind(flow.status.as_str())
485        .bind(flow.cancel_requested as i64)
486        .bind(new_revision)
487        .bind(now.to_rfc3339())
488        .bind(flow.id.to_string())
489        .bind(flow.revision)
490        .execute(&mut *tx)
491        .await?;
492
493        if update_result.rows_affected() == 0 {
494            // Roll back — the caller re-fetches via update_with_revision
495            // path to distinguish NotFound vs RevisionMismatch.
496            tx.rollback().await?;
497            let actual = self.get(flow.id).await?;
498            return match actual {
499                None => Err(FlowError::NotFound(flow.id)),
500                Some(found) => Err(FlowError::RevisionMismatch {
501                    expected: flow.revision,
502                    actual: found.revision,
503                }),
504            };
505        }
506
507        let event_result = sqlx::query(
508            "INSERT INTO flow_events (flow_id, kind, payload_json, at) VALUES (?, ?, ?, ?)",
509        )
510        .bind(flow.id.to_string())
511        .bind(event_kind)
512        .bind(payload_json)
513        .bind(now.to_rfc3339())
514        .execute(&mut *tx)
515        .await?;
516        let event_id = event_result.last_insert_rowid();
517
518        // Read the canonical post-commit flow state inside the tx.
519        let row = sqlx::query("SELECT * FROM flows WHERE id = ?")
520            .bind(flow.id.to_string())
521            .fetch_one(&mut *tx)
522            .await?;
523        let updated = row_to_flow(row)?;
524
525        tx.commit().await?;
526
527        let event = FlowEvent {
528            id: event_id,
529            flow_id: flow.id,
530            kind: event_kind.to_string(),
531            payload_json: event_payload,
532            at: now,
533        };
534        Ok((updated, event))
535    }
536}
537
538fn row_to_step(row: sqlx::sqlite::SqliteRow) -> Result<FlowStep, FlowError> {
539    let id_s: String = row.try_get("id")?;
540    let id = Uuid::parse_str(&id_s).map_err(|e| FlowError::InvalidData(format!("bad id: {e}")))?;
541    let flow_id_s: String = row.try_get("flow_id")?;
542    let flow_id = Uuid::parse_str(&flow_id_s)
543        .map_err(|e| FlowError::InvalidData(format!("bad flow_id: {e}")))?;
544    let runtime_s: String = row.try_get("runtime")?;
545    let runtime = StepRuntime::from_str(&runtime_s)
546        .ok_or_else(|| FlowError::InvalidData(format!("unknown runtime: {runtime_s}")))?;
547    let child_session_key: Option<String> = row.try_get("child_session_key")?;
548    let run_id: String = row.try_get("run_id")?;
549    let task: String = row.try_get("task")?;
550    let status_s: String = row.try_get("status")?;
551    let status = FlowStepStatus::from_str(&status_s)
552        .ok_or_else(|| FlowError::InvalidData(format!("unknown step status: {status_s}")))?;
553    let result_s: Option<String> = row.try_get("result_json")?;
554    let result_json = match result_s {
555        Some(s) => Some(
556            serde_json::from_str::<Value>(&s)
557                .map_err(|e| FlowError::InvalidData(format!("bad result_json: {e}")))?,
558        ),
559        None => None,
560    };
561    let created_at_s: String = row.try_get("created_at")?;
562    let updated_at_s: String = row.try_get("updated_at")?;
563    let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_s)
564        .map_err(|e| FlowError::InvalidData(format!("bad created_at: {e}")))?
565        .with_timezone(&Utc);
566    let updated_at = chrono::DateTime::parse_from_rfc3339(&updated_at_s)
567        .map_err(|e| FlowError::InvalidData(format!("bad updated_at: {e}")))?
568        .with_timezone(&Utc);
569
570    Ok(FlowStep {
571        id,
572        flow_id,
573        runtime,
574        child_session_key,
575        run_id,
576        task,
577        status,
578        result_json,
579        created_at,
580        updated_at,
581    })
582}
583
584fn row_to_flow(row: sqlx::sqlite::SqliteRow) -> Result<Flow, FlowError> {
585    let id_s: String = row.try_get("id")?;
586    let id = Uuid::parse_str(&id_s).map_err(|e| FlowError::InvalidData(format!("bad id: {e}")))?;
587    let controller_id: String = row.try_get("controller_id")?;
588    let goal: String = row.try_get("goal")?;
589    let owner_session_key: String = row.try_get("owner_session_key")?;
590    let requester_origin: String = row.try_get("requester_origin")?;
591    let current_step: String = row.try_get("current_step")?;
592    let state_json_s: String = row.try_get("state_json")?;
593    let wait_json_s: Option<String> = row.try_get("wait_json")?;
594    let status_s: String = row.try_get("status")?;
595    let cancel_requested_i: i64 = row.try_get("cancel_requested")?;
596    let revision: i64 = row.try_get("revision")?;
597    let created_at_s: String = row.try_get("created_at")?;
598    let updated_at_s: String = row.try_get("updated_at")?;
599
600    let state_json: Value = serde_json::from_str(&state_json_s)
601        .map_err(|e| FlowError::InvalidData(format!("bad state_json: {e}")))?;
602    let wait_json = match wait_json_s {
603        Some(s) => Some(
604            serde_json::from_str::<Value>(&s)
605                .map_err(|e| FlowError::InvalidData(format!("bad wait_json: {e}")))?,
606        ),
607        None => None,
608    };
609    let status = FlowStatus::from_str(&status_s)
610        .ok_or_else(|| FlowError::InvalidData(format!("unknown status: {status_s}")))?;
611    let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_s)
612        .map_err(|e| FlowError::InvalidData(format!("bad created_at: {e}")))?
613        .with_timezone(&Utc);
614    let updated_at = chrono::DateTime::parse_from_rfc3339(&updated_at_s)
615        .map_err(|e| FlowError::InvalidData(format!("bad updated_at: {e}")))?
616        .with_timezone(&Utc);
617
618    Ok(Flow {
619        id,
620        controller_id,
621        goal,
622        owner_session_key,
623        requester_origin,
624        current_step,
625        state_json,
626        wait_json,
627        status,
628        cancel_requested: cancel_requested_i != 0,
629        revision,
630        created_at,
631        updated_at,
632    })
633}
634
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Fresh in-memory store with the schema applied.
    async fn store() -> SqliteFlowStore {
        SqliteFlowStore::open(":memory:").await.expect("open")
    }

    /// A `Created` flow at revision 0 with a random id.
    fn sample_flow() -> Flow {
        let stamp = Utc::now();
        Flow {
            id: Uuid::new_v4(),
            controller_id: "kate/inbox-triage".into(),
            goal: "triage inbox".into(),
            owner_session_key: "agent:kate:session:abc".into(),
            requester_origin: "user-1".into(),
            current_step: "classify".into(),
            state_json: json!({"messages": 10, "processed": 0}),
            wait_json: None,
            status: FlowStatus::Created,
            cancel_requested: false,
            revision: 0,
            created_at: stamp,
            updated_at: stamp,
        }
    }

    /// A `Pending` managed step for `flow_id` with a random id.
    fn sample_step(flow_id: Uuid, run_id: &str) -> FlowStep {
        let stamp = Utc::now();
        FlowStep {
            id: Uuid::new_v4(),
            flow_id,
            runtime: StepRuntime::Managed,
            child_session_key: Some("child:session:1".into()),
            run_id: run_id.into(),
            task: "classify messages".into(),
            status: FlowStepStatus::Pending,
            result_json: None,
            created_at: stamp,
            updated_at: stamp,
        }
    }

    // ---- flows ----

    #[tokio::test]
    async fn insert_then_get_round_trip() {
        let db = store().await;
        let original = sample_flow();
        db.insert(&original).await.expect("insert");

        let loaded = db.get(original.id).await.expect("get").expect("found");
        assert_eq!(loaded.id, original.id);
        assert_eq!(loaded.controller_id, "kate/inbox-triage");
        assert_eq!(loaded.state_json, original.state_json);
        assert_eq!(loaded.status, FlowStatus::Created);
        assert_eq!(loaded.revision, 0);
        assert!(!loaded.cancel_requested);
    }

    #[tokio::test]
    async fn list_by_owner_returns_only_matching() {
        let db = store().await;
        let owned_by = |owner: &str| {
            let mut flow = sample_flow();
            flow.owner_session_key = owner.into();
            flow
        };
        let first_a = owned_by("owner-A");
        let only_b = owned_by("owner-B");
        let second_a = owned_by("owner-A");
        db.insert(&first_a).await.unwrap();
        db.insert(&only_b).await.unwrap();
        db.insert(&second_a).await.unwrap();

        let listed = db.list_by_owner("owner-A").await.unwrap();
        assert_eq!(listed.len(), 2);
        assert!(listed.iter().all(|f| f.owner_session_key == "owner-A"));
    }

    #[tokio::test]
    async fn update_with_correct_revision_succeeds_and_bumps() {
        let db = store().await;
        let original = sample_flow();
        db.insert(&original).await.unwrap();

        let mut changed = original.clone();
        changed.status = FlowStatus::Running;
        changed.current_step = "fetch".into();

        let stored = db.update_with_revision(&changed).await.expect("update");
        assert_eq!(stored.revision, 1);
        assert_eq!(stored.status, FlowStatus::Running);
        assert_eq!(stored.current_step, "fetch");
    }

    #[tokio::test]
    async fn update_with_stale_revision_returns_mismatch() {
        let db = store().await;
        let original = sample_flow();
        db.insert(&original).await.unwrap();

        // A first writer moves the row from revision 0 to 1.
        let mut winner = original.clone();
        winner.status = FlowStatus::Running;
        db.update_with_revision(&winner).await.unwrap();

        // A second writer still holding revision 0 must be rejected.
        let mut loser = original.clone();
        loser.status = FlowStatus::Waiting;
        let err = db.update_with_revision(&loser).await.expect_err("err");
        match err {
            FlowError::RevisionMismatch { expected, actual } => {
                assert_eq!(expected, 0);
                assert_eq!(actual, 1);
            }
            other => panic!("expected RevisionMismatch, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn list_by_status_filters() {
        let db = store().await;
        let mut waiting_flow = sample_flow();
        waiting_flow.status = FlowStatus::Waiting;
        let mut running_flow = sample_flow();
        running_flow.status = FlowStatus::Running;
        db.insert(&waiting_flow).await.unwrap();
        db.insert(&running_flow).await.unwrap();

        let listed = db.list_by_status(FlowStatus::Waiting).await.unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].id, waiting_flow.id);
    }

    // ---- flow_steps ----

    #[tokio::test]
    async fn insert_and_get_step() {
        let db = store().await;
        let parent = sample_flow();
        db.insert(&parent).await.unwrap();
        let step = sample_step(parent.id, "run-1");
        db.insert_step(&step).await.unwrap();

        let loaded = db.get_step(step.id).await.unwrap().expect("found");
        assert_eq!(loaded.flow_id, parent.id);
        assert_eq!(loaded.run_id, "run-1");
        assert_eq!(loaded.runtime, StepRuntime::Managed);
        assert_eq!(loaded.status, FlowStepStatus::Pending);
    }

    #[tokio::test]
    async fn update_step_changes_status_and_result() {
        let db = store().await;
        let parent = sample_flow();
        db.insert(&parent).await.unwrap();
        let mut step = sample_step(parent.id, "run-1");
        db.insert_step(&step).await.unwrap();

        step.status = FlowStepStatus::Succeeded;
        step.result_json = Some(json!({"count": 5}));
        let stored = db.update_step(&step).await.unwrap();
        assert_eq!(stored.status, FlowStepStatus::Succeeded);
        assert_eq!(stored.result_json.unwrap()["count"], 5);
    }

    #[tokio::test]
    async fn list_steps_filters_by_flow_id_and_orders_ascending() {
        let db = store().await;
        let flow_a = sample_flow();
        let flow_b = sample_flow();
        db.insert(&flow_a).await.unwrap();
        db.insert(&flow_b).await.unwrap();
        db.insert_step(&sample_step(flow_a.id, "a-1")).await.unwrap();
        db.insert_step(&sample_step(flow_a.id, "a-2")).await.unwrap();
        db.insert_step(&sample_step(flow_b.id, "b-1")).await.unwrap();

        let steps_a = db.list_steps(flow_a.id).await.unwrap();
        assert_eq!(steps_a.len(), 2);
        // Oldest first, i.e. the order they were inserted in.
        assert_eq!(steps_a[0].run_id, "a-1");
        assert_eq!(steps_a[1].run_id, "a-2");

        let steps_b = db.list_steps(flow_b.id).await.unwrap();
        assert_eq!(steps_b.len(), 1);
        assert_eq!(steps_b[0].run_id, "b-1");
    }

    #[tokio::test]
    async fn find_step_by_run_id_scopes_to_flow() {
        let db = store().await;
        let parent = sample_flow();
        db.insert(&parent).await.unwrap();
        db.insert_step(&sample_step(parent.id, "run-same"))
            .await
            .unwrap();

        let hit = db
            .find_step_by_run_id(parent.id, "run-same")
            .await
            .unwrap()
            .expect("found");
        assert_eq!(hit.flow_id, parent.id);

        let miss = db
            .find_step_by_run_id(parent.id, "run-other")
            .await
            .unwrap();
        assert!(miss.is_none());
    }

    #[tokio::test]
    async fn update_unknown_step_returns_not_found() {
        let db = store().await;
        let parent = sample_flow();
        db.insert(&parent).await.unwrap();

        // Built but never inserted, so the update has nothing to hit.
        let ghost = sample_step(parent.id, "ghost");
        let err = db.update_step(&ghost).await.expect_err("err");
        assert!(matches!(err, FlowError::NotFound(_)));
    }

    // ---- flow_events ----

    #[tokio::test]
    async fn append_and_list_events() {
        let db = store().await;
        let parent = sample_flow();
        db.insert(&parent).await.unwrap();
        db.append_event(parent.id, "created", json!({"goal": parent.goal}))
            .await
            .unwrap();
        db.append_event(parent.id, "advanced", json!({"step": "fetch"}))
            .await
            .unwrap();

        let listed = db.list_events(parent.id, 10).await.unwrap();
        assert_eq!(listed.len(), 2);
        // Newest first: events come back ordered by id, descending.
        assert_eq!(listed[0].kind, "advanced");
        assert_eq!(listed[1].kind, "created");
    }
}