1use async_trait::async_trait;
2use chrono::Utc;
3use serde_json::Value;
4use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
5use sqlx::{Row, SqlitePool};
6use std::str::FromStr;
7use uuid::Uuid;
8
9use crate::types::{Flow, FlowError, FlowEvent, FlowStatus, FlowStep, FlowStepStatus, StepRuntime};
10
11const SCHEMA: &str = r#"
12CREATE TABLE IF NOT EXISTS flows (
13 id TEXT PRIMARY KEY,
14 controller_id TEXT NOT NULL,
15 goal TEXT NOT NULL,
16 owner_session_key TEXT NOT NULL,
17 requester_origin TEXT NOT NULL,
18 current_step TEXT NOT NULL,
19 state_json TEXT NOT NULL DEFAULT '{}',
20 wait_json TEXT,
21 status TEXT NOT NULL,
22 cancel_requested INTEGER NOT NULL DEFAULT 0,
23 revision INTEGER NOT NULL DEFAULT 0,
24 created_at TEXT NOT NULL,
25 updated_at TEXT NOT NULL
26);
27
28CREATE INDEX IF NOT EXISTS idx_flows_owner ON flows(owner_session_key);
29CREATE INDEX IF NOT EXISTS idx_flows_status ON flows(status);
30
31CREATE TABLE IF NOT EXISTS flow_steps (
32 id TEXT PRIMARY KEY,
33 flow_id TEXT NOT NULL REFERENCES flows(id) ON DELETE CASCADE,
34 runtime TEXT NOT NULL,
35 child_session_key TEXT,
36 run_id TEXT NOT NULL,
37 task TEXT NOT NULL,
38 status TEXT NOT NULL,
39 result_json TEXT,
40 created_at TEXT NOT NULL,
41 updated_at TEXT NOT NULL
42);
43
44CREATE INDEX IF NOT EXISTS idx_flow_steps_flow ON flow_steps(flow_id);
45
46-- Phase-14 follow-up: the engine looks up steps by `(flow_id, run_id)`
47-- on every observation event. Without the unique index, two
48-- concurrent observations could both see "no row" in
49-- `find_step_by_run_id` and each insert a fresh row — duplicate steps
50-- with the same run_id, followed by non-deterministic lookups.
51-- UNIQUE also fixes the perf issue (was O(n) per observation).
52CREATE UNIQUE INDEX IF NOT EXISTS idx_flow_steps_run
53 ON flow_steps(flow_id, run_id);
54
55CREATE TABLE IF NOT EXISTS flow_events (
56 id INTEGER PRIMARY KEY AUTOINCREMENT,
57 flow_id TEXT NOT NULL REFERENCES flows(id) ON DELETE CASCADE,
58 kind TEXT NOT NULL,
59 payload_json TEXT NOT NULL,
60 at TEXT NOT NULL
61);
62
63CREATE INDEX IF NOT EXISTS idx_flow_events_flow ON flow_events(flow_id);
64"#;
65
66#[async_trait]
71pub trait FlowStore: Send + Sync {
72 async fn insert(&self, flow: &Flow) -> Result<(), FlowError>;
73 async fn get(&self, id: Uuid) -> Result<Option<Flow>, FlowError>;
74 async fn list_by_owner(&self, owner_session_key: &str) -> Result<Vec<Flow>, FlowError>;
75 async fn list_by_status(&self, status: FlowStatus) -> Result<Vec<Flow>, FlowError>;
76 async fn update_with_revision(&self, flow: &Flow) -> Result<Flow, FlowError>;
77 async fn append_event(
78 &self,
79 flow_id: Uuid,
80 kind: &str,
81 payload: Value,
82 ) -> Result<FlowEvent, FlowError>;
83 async fn list_events(&self, flow_id: Uuid, limit: i64) -> Result<Vec<FlowEvent>, FlowError>;
84
85 async fn insert_step(&self, step: &FlowStep) -> Result<(), FlowError>;
87 async fn update_step(&self, step: &FlowStep) -> Result<FlowStep, FlowError>;
88 async fn get_step(&self, id: Uuid) -> Result<Option<FlowStep>, FlowError>;
89 async fn list_steps(&self, flow_id: Uuid) -> Result<Vec<FlowStep>, FlowError>;
90 async fn find_step_by_run_id(
91 &self,
92 flow_id: Uuid,
93 run_id: &str,
94 ) -> Result<Option<FlowStep>, FlowError>;
95
96 async fn update_and_append(
105 &self,
106 flow: &Flow,
107 event_kind: &str,
108 event_payload: Value,
109 ) -> Result<(Flow, FlowEvent), FlowError> {
110 tracing::warn!(
111 flow_id = %flow.id,
112 "FlowStore::update_and_append using non-atomic fallback — implement a transaction-based override"
113 );
114 let updated = self.update_with_revision(flow).await?;
115 let event = self
116 .append_event(updated.id, event_kind, event_payload)
117 .await?;
118 Ok((updated, event))
119 }
120
121 async fn prune_terminal_flows(&self, _retain_days: i64) -> Result<u64, FlowError> {
128 Err(FlowError::InvalidData(
129 "prune_terminal_flows not implemented by this store".into(),
130 ))
131 }
132}
133
134#[derive(Clone)]
135pub struct SqliteFlowStore {
136 pool: SqlitePool,
137}
138
139impl SqliteFlowStore {
140 pub async fn open(path: &str) -> Result<Self, FlowError> {
142 let opts = SqliteConnectOptions::from_str(&format!("sqlite://{path}"))
143 .map_err(|e| FlowError::InvalidData(format!("bad sqlite url: {e}")))?
144 .create_if_missing(true)
145 .foreign_keys(true);
146 let pool = SqlitePoolOptions::new()
147 .max_connections(5)
148 .connect_with(opts)
149 .await?;
150 Self::with_pool(pool).await
151 }
152
153 pub async fn with_pool(pool: SqlitePool) -> Result<Self, FlowError> {
156 for stmt in SCHEMA.split(';') {
158 let trimmed = stmt.trim();
159 if trimmed.is_empty() {
160 continue;
161 }
162 sqlx::query(trimmed).execute(&pool).await?;
163 }
164 Ok(Self { pool })
165 }
166
167 pub fn pool(&self) -> &SqlitePool {
168 &self.pool
169 }
170}
171
172#[async_trait]
173impl FlowStore for SqliteFlowStore {
174 async fn insert(&self, flow: &Flow) -> Result<(), FlowError> {
175 let state_json = serde_json::to_string(&flow.state_json)
176 .map_err(|e| FlowError::InvalidData(e.to_string()))?;
177 let wait_json = match &flow.wait_json {
178 Some(v) => {
179 Some(serde_json::to_string(v).map_err(|e| FlowError::InvalidData(e.to_string()))?)
180 }
181 None => None,
182 };
183 sqlx::query(
184 "INSERT INTO flows (id, controller_id, goal, owner_session_key, requester_origin, \
185 current_step, state_json, wait_json, status, cancel_requested, revision, created_at, updated_at) \
186 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
187 )
188 .bind(flow.id.to_string())
189 .bind(&flow.controller_id)
190 .bind(&flow.goal)
191 .bind(&flow.owner_session_key)
192 .bind(&flow.requester_origin)
193 .bind(&flow.current_step)
194 .bind(state_json)
195 .bind(wait_json)
196 .bind(flow.status.as_str())
197 .bind(flow.cancel_requested as i64)
198 .bind(flow.revision)
199 .bind(flow.created_at.to_rfc3339())
200 .bind(flow.updated_at.to_rfc3339())
201 .execute(&self.pool)
202 .await?;
203 Ok(())
204 }
205
206 async fn get(&self, id: Uuid) -> Result<Option<Flow>, FlowError> {
207 let row = sqlx::query("SELECT * FROM flows WHERE id = ?")
208 .bind(id.to_string())
209 .fetch_optional(&self.pool)
210 .await?;
211 row.map(row_to_flow).transpose()
212 }
213
214 async fn list_by_owner(&self, owner_session_key: &str) -> Result<Vec<Flow>, FlowError> {
215 let rows =
216 sqlx::query("SELECT * FROM flows WHERE owner_session_key = ? ORDER BY created_at DESC")
217 .bind(owner_session_key)
218 .fetch_all(&self.pool)
219 .await?;
220 rows.into_iter().map(row_to_flow).collect()
221 }
222
223 async fn list_by_status(&self, status: FlowStatus) -> Result<Vec<Flow>, FlowError> {
224 let rows = sqlx::query("SELECT * FROM flows WHERE status = ? ORDER BY updated_at ASC")
225 .bind(status.as_str())
226 .fetch_all(&self.pool)
227 .await?;
228 rows.into_iter().map(row_to_flow).collect()
229 }
230
231 async fn update_with_revision(&self, flow: &Flow) -> Result<Flow, FlowError> {
232 let state_json = serde_json::to_string(&flow.state_json)
233 .map_err(|e| FlowError::InvalidData(e.to_string()))?;
234 let wait_json = match &flow.wait_json {
235 Some(v) => {
236 Some(serde_json::to_string(v).map_err(|e| FlowError::InvalidData(e.to_string()))?)
237 }
238 None => None,
239 };
240 let new_revision = flow.revision + 1;
241 let now = Utc::now();
242 let result = sqlx::query(
243 "UPDATE flows SET controller_id = ?, goal = ?, owner_session_key = ?, \
244 requester_origin = ?, current_step = ?, state_json = ?, wait_json = ?, \
245 status = ?, cancel_requested = ?, revision = ?, updated_at = ? \
246 WHERE id = ? AND revision = ?",
247 )
248 .bind(&flow.controller_id)
249 .bind(&flow.goal)
250 .bind(&flow.owner_session_key)
251 .bind(&flow.requester_origin)
252 .bind(&flow.current_step)
253 .bind(state_json)
254 .bind(wait_json)
255 .bind(flow.status.as_str())
256 .bind(flow.cancel_requested as i64)
257 .bind(new_revision)
258 .bind(now.to_rfc3339())
259 .bind(flow.id.to_string())
260 .bind(flow.revision)
261 .execute(&self.pool)
262 .await?;
263
264 if result.rows_affected() == 0 {
265 let actual = self.get(flow.id).await?;
267 return match actual {
268 None => Err(FlowError::NotFound(flow.id)),
269 Some(found) => Err(FlowError::RevisionMismatch {
270 expected: flow.revision,
271 actual: found.revision,
272 }),
273 };
274 }
275 self.get(flow.id).await?.ok_or(FlowError::NotFound(flow.id))
277 }
278
279 async fn append_event(
280 &self,
281 flow_id: Uuid,
282 kind: &str,
283 payload: Value,
284 ) -> Result<FlowEvent, FlowError> {
285 let payload_json =
286 serde_json::to_string(&payload).map_err(|e| FlowError::InvalidData(e.to_string()))?;
287 let now = Utc::now();
288 let result = sqlx::query(
289 "INSERT INTO flow_events (flow_id, kind, payload_json, at) VALUES (?, ?, ?, ?)",
290 )
291 .bind(flow_id.to_string())
292 .bind(kind)
293 .bind(payload_json)
294 .bind(now.to_rfc3339())
295 .execute(&self.pool)
296 .await?;
297 Ok(FlowEvent {
298 id: result.last_insert_rowid(),
299 flow_id,
300 kind: kind.to_string(),
301 payload_json: payload,
302 at: now,
303 })
304 }
305
306 async fn list_events(&self, flow_id: Uuid, limit: i64) -> Result<Vec<FlowEvent>, FlowError> {
307 let limit = limit.max(0);
312 let rows = sqlx::query(
313 "SELECT id, flow_id, kind, payload_json, at FROM flow_events \
314 WHERE flow_id = ? ORDER BY id DESC LIMIT ?",
315 )
316 .bind(flow_id.to_string())
317 .bind(limit)
318 .fetch_all(&self.pool)
319 .await?;
320 rows.into_iter()
321 .map(|row| {
322 let id: i64 = row.try_get("id")?;
323 let flow_id_s: String = row.try_get("flow_id")?;
324 let kind: String = row.try_get("kind")?;
325 let payload_s: String = row.try_get("payload_json")?;
326 let at_s: String = row.try_get("at")?;
327 let flow_id = Uuid::parse_str(&flow_id_s)
328 .map_err(|e| FlowError::InvalidData(format!("bad flow_id: {e}")))?;
329 let payload_json = serde_json::from_str(&payload_s)
330 .map_err(|e| FlowError::InvalidData(format!("bad event payload: {e}")))?;
331 let at = chrono::DateTime::parse_from_rfc3339(&at_s)
332 .map_err(|e| FlowError::InvalidData(format!("bad event ts: {e}")))?
333 .with_timezone(&Utc);
334 Ok(FlowEvent {
335 id,
336 flow_id,
337 kind,
338 payload_json,
339 at,
340 })
341 })
342 .collect()
343 }
344
345 async fn insert_step(&self, step: &FlowStep) -> Result<(), FlowError> {
346 let result_s = match &step.result_json {
347 Some(v) => {
348 Some(serde_json::to_string(v).map_err(|e| FlowError::InvalidData(e.to_string()))?)
349 }
350 None => None,
351 };
352 sqlx::query(
353 "INSERT INTO flow_steps (id, flow_id, runtime, child_session_key, run_id, task, status, result_json, created_at, updated_at) \
354 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
355 )
356 .bind(step.id.to_string())
357 .bind(step.flow_id.to_string())
358 .bind(step.runtime.as_str())
359 .bind(step.child_session_key.as_deref())
360 .bind(&step.run_id)
361 .bind(&step.task)
362 .bind(step.status.as_str())
363 .bind(result_s)
364 .bind(step.created_at.to_rfc3339())
365 .bind(step.updated_at.to_rfc3339())
366 .execute(&self.pool)
367 .await?;
368 Ok(())
369 }
370
371 async fn update_step(&self, step: &FlowStep) -> Result<FlowStep, FlowError> {
372 let result_s = match &step.result_json {
373 Some(v) => {
374 Some(serde_json::to_string(v).map_err(|e| FlowError::InvalidData(e.to_string()))?)
375 }
376 None => None,
377 };
378 let now = Utc::now();
379 let rows = sqlx::query(
380 "UPDATE flow_steps SET runtime = ?, child_session_key = ?, run_id = ?, task = ?, \
381 status = ?, result_json = ?, updated_at = ? WHERE id = ?",
382 )
383 .bind(step.runtime.as_str())
384 .bind(step.child_session_key.as_deref())
385 .bind(&step.run_id)
386 .bind(&step.task)
387 .bind(step.status.as_str())
388 .bind(result_s)
389 .bind(now.to_rfc3339())
390 .bind(step.id.to_string())
391 .execute(&self.pool)
392 .await?;
393 if rows.rows_affected() == 0 {
394 return Err(FlowError::NotFound(step.id));
395 }
396 self.get_step(step.id)
397 .await?
398 .ok_or(FlowError::NotFound(step.id))
399 }
400
401 async fn get_step(&self, id: Uuid) -> Result<Option<FlowStep>, FlowError> {
402 let row = sqlx::query("SELECT * FROM flow_steps WHERE id = ?")
403 .bind(id.to_string())
404 .fetch_optional(&self.pool)
405 .await?;
406 row.map(row_to_step).transpose()
407 }
408
409 async fn list_steps(&self, flow_id: Uuid) -> Result<Vec<FlowStep>, FlowError> {
410 let rows =
411 sqlx::query("SELECT * FROM flow_steps WHERE flow_id = ? ORDER BY created_at ASC")
412 .bind(flow_id.to_string())
413 .fetch_all(&self.pool)
414 .await?;
415 rows.into_iter().map(row_to_step).collect()
416 }
417
418 async fn find_step_by_run_id(
419 &self,
420 flow_id: Uuid,
421 run_id: &str,
422 ) -> Result<Option<FlowStep>, FlowError> {
423 let row = sqlx::query("SELECT * FROM flow_steps WHERE flow_id = ? AND run_id = ?")
424 .bind(flow_id.to_string())
425 .bind(run_id)
426 .fetch_optional(&self.pool)
427 .await?;
428 row.map(row_to_step).transpose()
429 }
430
431 async fn prune_terminal_flows(&self, retain_days: i64) -> Result<u64, FlowError> {
432 let cutoff = (Utc::now() - chrono::Duration::days(retain_days.max(0))).to_rfc3339();
436 let result = sqlx::query(
437 "DELETE FROM flows \
438 WHERE status IN ('finished','failed','cancelled') \
439 AND updated_at < ?",
440 )
441 .bind(&cutoff)
442 .execute(&self.pool)
443 .await?;
444 Ok(result.rows_affected())
445 }
446
447 async fn update_and_append(
452 &self,
453 flow: &Flow,
454 event_kind: &str,
455 event_payload: Value,
456 ) -> Result<(Flow, FlowEvent), FlowError> {
457 let state_json = serde_json::to_string(&flow.state_json)
458 .map_err(|e| FlowError::InvalidData(e.to_string()))?;
459 let wait_json = match &flow.wait_json {
460 Some(v) => {
461 Some(serde_json::to_string(v).map_err(|e| FlowError::InvalidData(e.to_string()))?)
462 }
463 None => None,
464 };
465 let payload_json = serde_json::to_string(&event_payload)
466 .map_err(|e| FlowError::InvalidData(e.to_string()))?;
467 let new_revision = flow.revision + 1;
468 let now = Utc::now();
469
470 let mut tx = self.pool.begin().await?;
471 let update_result = sqlx::query(
472 "UPDATE flows SET controller_id = ?, goal = ?, owner_session_key = ?, \
473 requester_origin = ?, current_step = ?, state_json = ?, wait_json = ?, \
474 status = ?, cancel_requested = ?, revision = ?, updated_at = ? \
475 WHERE id = ? AND revision = ?",
476 )
477 .bind(&flow.controller_id)
478 .bind(&flow.goal)
479 .bind(&flow.owner_session_key)
480 .bind(&flow.requester_origin)
481 .bind(&flow.current_step)
482 .bind(state_json)
483 .bind(wait_json)
484 .bind(flow.status.as_str())
485 .bind(flow.cancel_requested as i64)
486 .bind(new_revision)
487 .bind(now.to_rfc3339())
488 .bind(flow.id.to_string())
489 .bind(flow.revision)
490 .execute(&mut *tx)
491 .await?;
492
493 if update_result.rows_affected() == 0 {
494 tx.rollback().await?;
497 let actual = self.get(flow.id).await?;
498 return match actual {
499 None => Err(FlowError::NotFound(flow.id)),
500 Some(found) => Err(FlowError::RevisionMismatch {
501 expected: flow.revision,
502 actual: found.revision,
503 }),
504 };
505 }
506
507 let event_result = sqlx::query(
508 "INSERT INTO flow_events (flow_id, kind, payload_json, at) VALUES (?, ?, ?, ?)",
509 )
510 .bind(flow.id.to_string())
511 .bind(event_kind)
512 .bind(payload_json)
513 .bind(now.to_rfc3339())
514 .execute(&mut *tx)
515 .await?;
516 let event_id = event_result.last_insert_rowid();
517
518 let row = sqlx::query("SELECT * FROM flows WHERE id = ?")
520 .bind(flow.id.to_string())
521 .fetch_one(&mut *tx)
522 .await?;
523 let updated = row_to_flow(row)?;
524
525 tx.commit().await?;
526
527 let event = FlowEvent {
528 id: event_id,
529 flow_id: flow.id,
530 kind: event_kind.to_string(),
531 payload_json: event_payload,
532 at: now,
533 };
534 Ok((updated, event))
535 }
536}
537
538fn row_to_step(row: sqlx::sqlite::SqliteRow) -> Result<FlowStep, FlowError> {
539 let id_s: String = row.try_get("id")?;
540 let id = Uuid::parse_str(&id_s).map_err(|e| FlowError::InvalidData(format!("bad id: {e}")))?;
541 let flow_id_s: String = row.try_get("flow_id")?;
542 let flow_id = Uuid::parse_str(&flow_id_s)
543 .map_err(|e| FlowError::InvalidData(format!("bad flow_id: {e}")))?;
544 let runtime_s: String = row.try_get("runtime")?;
545 let runtime = StepRuntime::from_str(&runtime_s)
546 .ok_or_else(|| FlowError::InvalidData(format!("unknown runtime: {runtime_s}")))?;
547 let child_session_key: Option<String> = row.try_get("child_session_key")?;
548 let run_id: String = row.try_get("run_id")?;
549 let task: String = row.try_get("task")?;
550 let status_s: String = row.try_get("status")?;
551 let status = FlowStepStatus::from_str(&status_s)
552 .ok_or_else(|| FlowError::InvalidData(format!("unknown step status: {status_s}")))?;
553 let result_s: Option<String> = row.try_get("result_json")?;
554 let result_json = match result_s {
555 Some(s) => Some(
556 serde_json::from_str::<Value>(&s)
557 .map_err(|e| FlowError::InvalidData(format!("bad result_json: {e}")))?,
558 ),
559 None => None,
560 };
561 let created_at_s: String = row.try_get("created_at")?;
562 let updated_at_s: String = row.try_get("updated_at")?;
563 let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_s)
564 .map_err(|e| FlowError::InvalidData(format!("bad created_at: {e}")))?
565 .with_timezone(&Utc);
566 let updated_at = chrono::DateTime::parse_from_rfc3339(&updated_at_s)
567 .map_err(|e| FlowError::InvalidData(format!("bad updated_at: {e}")))?
568 .with_timezone(&Utc);
569
570 Ok(FlowStep {
571 id,
572 flow_id,
573 runtime,
574 child_session_key,
575 run_id,
576 task,
577 status,
578 result_json,
579 created_at,
580 updated_at,
581 })
582}
583
584fn row_to_flow(row: sqlx::sqlite::SqliteRow) -> Result<Flow, FlowError> {
585 let id_s: String = row.try_get("id")?;
586 let id = Uuid::parse_str(&id_s).map_err(|e| FlowError::InvalidData(format!("bad id: {e}")))?;
587 let controller_id: String = row.try_get("controller_id")?;
588 let goal: String = row.try_get("goal")?;
589 let owner_session_key: String = row.try_get("owner_session_key")?;
590 let requester_origin: String = row.try_get("requester_origin")?;
591 let current_step: String = row.try_get("current_step")?;
592 let state_json_s: String = row.try_get("state_json")?;
593 let wait_json_s: Option<String> = row.try_get("wait_json")?;
594 let status_s: String = row.try_get("status")?;
595 let cancel_requested_i: i64 = row.try_get("cancel_requested")?;
596 let revision: i64 = row.try_get("revision")?;
597 let created_at_s: String = row.try_get("created_at")?;
598 let updated_at_s: String = row.try_get("updated_at")?;
599
600 let state_json: Value = serde_json::from_str(&state_json_s)
601 .map_err(|e| FlowError::InvalidData(format!("bad state_json: {e}")))?;
602 let wait_json = match wait_json_s {
603 Some(s) => Some(
604 serde_json::from_str::<Value>(&s)
605 .map_err(|e| FlowError::InvalidData(format!("bad wait_json: {e}")))?,
606 ),
607 None => None,
608 };
609 let status = FlowStatus::from_str(&status_s)
610 .ok_or_else(|| FlowError::InvalidData(format!("unknown status: {status_s}")))?;
611 let created_at = chrono::DateTime::parse_from_rfc3339(&created_at_s)
612 .map_err(|e| FlowError::InvalidData(format!("bad created_at: {e}")))?
613 .with_timezone(&Utc);
614 let updated_at = chrono::DateTime::parse_from_rfc3339(&updated_at_s)
615 .map_err(|e| FlowError::InvalidData(format!("bad updated_at: {e}")))?
616 .with_timezone(&Utc);
617
618 Ok(Flow {
619 id,
620 controller_id,
621 goal,
622 owner_session_key,
623 requester_origin,
624 current_step,
625 state_json,
626 wait_json,
627 status,
628 cancel_requested: cancel_requested_i != 0,
629 revision,
630 created_at,
631 updated_at,
632 })
633}
634
635#[cfg(test)]
636mod tests {
637 use super::*;
638 use serde_json::json;
639
640 fn sample_flow() -> Flow {
641 let now = Utc::now();
642 Flow {
643 id: Uuid::new_v4(),
644 controller_id: "kate/inbox-triage".into(),
645 goal: "triage inbox".into(),
646 owner_session_key: "agent:kate:session:abc".into(),
647 requester_origin: "user-1".into(),
648 current_step: "classify".into(),
649 state_json: json!({"messages": 10, "processed": 0}),
650 wait_json: None,
651 status: FlowStatus::Created,
652 cancel_requested: false,
653 revision: 0,
654 created_at: now,
655 updated_at: now,
656 }
657 }
658
659 async fn store() -> SqliteFlowStore {
660 SqliteFlowStore::open(":memory:").await.expect("open")
661 }
662
663 #[tokio::test]
664 async fn insert_then_get_round_trip() {
665 let s = store().await;
666 let flow = sample_flow();
667 s.insert(&flow).await.expect("insert");
668 let got = s.get(flow.id).await.expect("get").expect("found");
669 assert_eq!(got.id, flow.id);
670 assert_eq!(got.controller_id, "kate/inbox-triage");
671 assert_eq!(got.state_json, flow.state_json);
672 assert_eq!(got.status, FlowStatus::Created);
673 assert_eq!(got.revision, 0);
674 assert!(!got.cancel_requested);
675 }
676
677 #[tokio::test]
678 async fn list_by_owner_returns_only_matching() {
679 let s = store().await;
680 let mut a = sample_flow();
681 a.owner_session_key = "owner-A".into();
682 let mut b = sample_flow();
683 b.owner_session_key = "owner-B".into();
684 let mut a2 = sample_flow();
685 a2.owner_session_key = "owner-A".into();
686 s.insert(&a).await.unwrap();
687 s.insert(&b).await.unwrap();
688 s.insert(&a2).await.unwrap();
689 let owned = s.list_by_owner("owner-A").await.unwrap();
690 assert_eq!(owned.len(), 2);
691 assert!(owned.iter().all(|f| f.owner_session_key == "owner-A"));
692 }
693
694 #[tokio::test]
695 async fn update_with_correct_revision_succeeds_and_bumps() {
696 let s = store().await;
697 let flow = sample_flow();
698 s.insert(&flow).await.unwrap();
699
700 let mut updated = flow.clone();
701 updated.status = FlowStatus::Running;
702 updated.current_step = "fetch".into();
703 let result = s.update_with_revision(&updated).await.expect("update");
704 assert_eq!(result.revision, 1);
705 assert_eq!(result.status, FlowStatus::Running);
706 assert_eq!(result.current_step, "fetch");
707 }
708
709 #[tokio::test]
710 async fn update_with_stale_revision_returns_mismatch() {
711 let s = store().await;
712 let flow = sample_flow();
713 s.insert(&flow).await.unwrap();
714
715 let mut first = flow.clone();
717 first.status = FlowStatus::Running;
718 s.update_with_revision(&first).await.unwrap();
719
720 let mut stale = flow.clone();
722 stale.status = FlowStatus::Waiting;
723 let err = s.update_with_revision(&stale).await.expect_err("err");
724 match err {
725 FlowError::RevisionMismatch { expected, actual } => {
726 assert_eq!(expected, 0);
727 assert_eq!(actual, 1);
728 }
729 other => panic!("expected RevisionMismatch, got {other:?}"),
730 }
731 }
732
733 #[tokio::test]
734 async fn list_by_status_filters() {
735 let s = store().await;
736 let mut a = sample_flow();
737 a.status = FlowStatus::Waiting;
738 let mut b = sample_flow();
739 b.status = FlowStatus::Running;
740 s.insert(&a).await.unwrap();
741 s.insert(&b).await.unwrap();
742 let waiting = s.list_by_status(FlowStatus::Waiting).await.unwrap();
743 assert_eq!(waiting.len(), 1);
744 assert_eq!(waiting[0].id, a.id);
745 }
746
747 fn sample_step(flow_id: Uuid, run_id: &str) -> FlowStep {
748 let now = Utc::now();
749 FlowStep {
750 id: Uuid::new_v4(),
751 flow_id,
752 runtime: StepRuntime::Managed,
753 child_session_key: Some("child:session:1".into()),
754 run_id: run_id.into(),
755 task: "classify messages".into(),
756 status: FlowStepStatus::Pending,
757 result_json: None,
758 created_at: now,
759 updated_at: now,
760 }
761 }
762
763 #[tokio::test]
764 async fn insert_and_get_step() {
765 let s = store().await;
766 let flow = sample_flow();
767 s.insert(&flow).await.unwrap();
768 let step = sample_step(flow.id, "run-1");
769 s.insert_step(&step).await.unwrap();
770 let got = s.get_step(step.id).await.unwrap().expect("found");
771 assert_eq!(got.flow_id, flow.id);
772 assert_eq!(got.run_id, "run-1");
773 assert_eq!(got.runtime, StepRuntime::Managed);
774 assert_eq!(got.status, FlowStepStatus::Pending);
775 }
776
777 #[tokio::test]
778 async fn update_step_changes_status_and_result() {
779 let s = store().await;
780 let flow = sample_flow();
781 s.insert(&flow).await.unwrap();
782 let mut step = sample_step(flow.id, "run-1");
783 s.insert_step(&step).await.unwrap();
784 step.status = FlowStepStatus::Succeeded;
785 step.result_json = Some(json!({"count": 5}));
786 let updated = s.update_step(&step).await.unwrap();
787 assert_eq!(updated.status, FlowStepStatus::Succeeded);
788 assert_eq!(updated.result_json.unwrap()["count"], 5);
789 }
790
791 #[tokio::test]
792 async fn list_steps_filters_by_flow_id_and_orders_ascending() {
793 let s = store().await;
794 let a = sample_flow();
795 let b = sample_flow();
796 s.insert(&a).await.unwrap();
797 s.insert(&b).await.unwrap();
798 s.insert_step(&sample_step(a.id, "a-1")).await.unwrap();
799 s.insert_step(&sample_step(a.id, "a-2")).await.unwrap();
800 s.insert_step(&sample_step(b.id, "b-1")).await.unwrap();
801 let a_steps = s.list_steps(a.id).await.unwrap();
802 assert_eq!(a_steps.len(), 2);
803 assert_eq!(a_steps[0].run_id, "a-1"); assert_eq!(a_steps[1].run_id, "a-2");
805 let b_steps = s.list_steps(b.id).await.unwrap();
806 assert_eq!(b_steps.len(), 1);
807 assert_eq!(b_steps[0].run_id, "b-1");
808 }
809
810 #[tokio::test]
811 async fn find_step_by_run_id_scopes_to_flow() {
812 let s = store().await;
813 let a = sample_flow();
814 s.insert(&a).await.unwrap();
815 s.insert_step(&sample_step(a.id, "run-same")).await.unwrap();
816 let hit = s
817 .find_step_by_run_id(a.id, "run-same")
818 .await
819 .unwrap()
820 .expect("found");
821 assert_eq!(hit.flow_id, a.id);
822 let miss = s.find_step_by_run_id(a.id, "run-other").await.unwrap();
823 assert!(miss.is_none());
824 }
825
826 #[tokio::test]
827 async fn update_unknown_step_returns_not_found() {
828 let s = store().await;
829 let flow = sample_flow();
830 s.insert(&flow).await.unwrap();
831 let step = sample_step(flow.id, "ghost");
832 let err = s.update_step(&step).await.expect_err("err");
834 assert!(matches!(err, FlowError::NotFound(_)));
835 }
836
837 #[tokio::test]
838 async fn append_and_list_events() {
839 let s = store().await;
840 let flow = sample_flow();
841 s.insert(&flow).await.unwrap();
842 s.append_event(flow.id, "created", json!({"goal": flow.goal}))
843 .await
844 .unwrap();
845 s.append_event(flow.id, "advanced", json!({"step": "fetch"}))
846 .await
847 .unwrap();
848 let events = s.list_events(flow.id, 10).await.unwrap();
849 assert_eq!(events.len(), 2);
850 assert_eq!(events[0].kind, "advanced");
852 assert_eq!(events[1].kind, "created");
853 }
854}