1use std::collections::BTreeMap;
2#[cfg(not(target_arch = "wasm32"))]
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8#[cfg(not(target_arch = "wasm32"))]
9use rusqlite::{Connection, ErrorCode, OptionalExtension, Transaction, params};
10
11use crate::WorkGraphError;
12use crate::WorkGraphMachine;
13use crate::types::{
14 WorkEdge, WorkGraphEvent, WorkGraphEventKind, WorkItem, WorkItemFilter, WorkItemId,
15 WorkNamespace,
16};
17
18#[cfg(target_arch = "wasm32")]
19use crate::tokio::sync::RwLock;
20#[cfg(not(target_arch = "wasm32"))]
21use tokio::sync::RwLock;
22
/// Identifies which backend a [`WorkGraphStore`] implementation represents.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkGraphStoreKind {
    /// Reported by `DisabledWorkGraphStore`, whose operations all fail.
    Disabled,
    /// Reported by `MemoryWorkGraphStore`.
    Memory,
    /// Reported by `SqliteWorkGraphStore`.
    Sqlite,
    /// Not reported by any store in this file; presumably reserved for
    /// implementations supplied from elsewhere (confirm with callers).
    Custom,
}

impl WorkGraphStoreKind {
    /// Returns the lowercase label for this kind.
    ///
    /// The same label is what [`std::fmt::Display`] prints.
    pub fn as_str(self) -> &'static str {
        use WorkGraphStoreKind::{Custom, Disabled, Memory, Sqlite};

        match self {
            Sqlite => "sqlite",
            Memory => "memory",
            Custom => "custom",
            Disabled => "disabled",
        }
    }
}

impl std::fmt::Display for WorkGraphStoreKind {
    /// Writes the label produced by [`WorkGraphStoreKind::as_str`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = WorkGraphStoreKind::as_str(*self);
        f.write_str(label)
    }
}
47
/// Criteria used to select events when listing a store's event log.
///
/// Every criterion is optional; the default filter matches all events.
#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct WorkGraphEventFilter {
    /// When set, only events whose realm id equals this value match.
    pub realm_id: Option<String>,
    /// When set (and `all_namespaces` is false), only events in this namespace match.
    pub namespace: Option<WorkNamespace>,
    /// When true, the `namespace` criterion is ignored.
    #[serde(default)]
    pub all_namespaces: bool,
    /// When set, only events with a sequence number strictly greater than this match.
    pub after_seq: Option<i64>,
    /// When set, caps the number of events returned.
    pub limit: Option<usize>,
}
57
/// Storage backend for work items, edges between them, and the event log
/// that records each mutation.
///
/// Each mutating method takes the event describing the change alongside the
/// new state. The in-file implementations store both together.
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
pub trait WorkGraphStore: Send + Sync {
    /// Reports which backend this store is.
    fn kind(&self) -> WorkGraphStoreKind;

    /// Returns the store's current UTC time.
    async fn get_store_time_utc(&self) -> Result<DateTime<Utc>, WorkGraphError>;

    /// Stores a new `item` and appends `event`, returning the stored item.
    async fn insert_item(
        &self,
        item: WorkItem,
        event: WorkGraphEvent,
    ) -> Result<WorkItem, WorkGraphError>;

    /// Replaces a stored item only if its current revision equals
    /// `expected_previous_revision` (compare-and-swap), then appends `event`.
    ///
    /// The in-file implementations report a stale-revision error on a
    /// revision mismatch and a not-found error when the item is absent.
    async fn update_item_cas(
        &self,
        item: WorkItem,
        expected_previous_revision: u64,
        event: WorkGraphEvent,
    ) -> Result<WorkItem, WorkGraphError>;

    /// Looks up one item by realm, namespace and id; `Ok(None)` when absent.
    async fn get_item(
        &self,
        realm_id: &str,
        namespace: &WorkNamespace,
        id: &WorkItemId,
    ) -> Result<Option<WorkItem>, WorkGraphError>;

    /// Lists the items matching `filter`.
    async fn list_items(&self, filter: WorkItemFilter) -> Result<Vec<WorkItem>, WorkGraphError>;

    /// Stores a new `edge` and appends `event`, returning the stored edge.
    async fn insert_edge(
        &self,
        edge: WorkEdge,
        event: WorkGraphEvent,
    ) -> Result<WorkEdge, WorkGraphError>;

    /// Lists the edges recorded for the given realm and namespace.
    async fn list_edges(
        &self,
        realm_id: &str,
        namespace: &WorkNamespace,
    ) -> Result<Vec<WorkEdge>, WorkGraphError>;

    /// Lists the events matching `filter`.
    async fn list_events(
        &self,
        filter: WorkGraphEventFilter,
    ) -> Result<Vec<WorkGraphEvent>, WorkGraphError>;
}
104
105#[derive(Default)]
106pub struct DisabledWorkGraphStore;
107
108#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
109#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
110impl WorkGraphStore for DisabledWorkGraphStore {
111 fn kind(&self) -> WorkGraphStoreKind {
112 WorkGraphStoreKind::Disabled
113 }
114
115 async fn get_store_time_utc(&self) -> Result<DateTime<Utc>, WorkGraphError> {
116 Err(unsupported(self.kind()))
117 }
118
119 async fn insert_item(
120 &self,
121 _item: WorkItem,
122 _event: WorkGraphEvent,
123 ) -> Result<WorkItem, WorkGraphError> {
124 Err(unsupported(self.kind()))
125 }
126
127 async fn update_item_cas(
128 &self,
129 _item: WorkItem,
130 _expected_previous_revision: u64,
131 _event: WorkGraphEvent,
132 ) -> Result<WorkItem, WorkGraphError> {
133 Err(unsupported(self.kind()))
134 }
135
136 async fn get_item(
137 &self,
138 _realm_id: &str,
139 _namespace: &WorkNamespace,
140 _id: &WorkItemId,
141 ) -> Result<Option<WorkItem>, WorkGraphError> {
142 Err(unsupported(self.kind()))
143 }
144
145 async fn list_items(&self, _filter: WorkItemFilter) -> Result<Vec<WorkItem>, WorkGraphError> {
146 Err(unsupported(self.kind()))
147 }
148
149 async fn insert_edge(
150 &self,
151 _edge: WorkEdge,
152 _event: WorkGraphEvent,
153 ) -> Result<WorkEdge, WorkGraphError> {
154 Err(unsupported(self.kind()))
155 }
156
157 async fn list_edges(
158 &self,
159 _realm_id: &str,
160 _namespace: &WorkNamespace,
161 ) -> Result<Vec<WorkEdge>, WorkGraphError> {
162 Err(unsupported(self.kind()))
163 }
164
165 async fn list_events(
166 &self,
167 _filter: WorkGraphEventFilter,
168 ) -> Result<Vec<WorkGraphEvent>, WorkGraphError> {
169 Err(unsupported(self.kind()))
170 }
171}
172
173fn unsupported(kind: WorkGraphStoreKind) -> WorkGraphError {
174 WorkGraphError::UnsupportedBackend(kind.to_string())
175}
176
/// Work graph store that keeps all items, edges and events in process memory.
#[derive(Default)]
pub struct MemoryWorkGraphStore {
    // All state sits behind one async read/write lock.
    inner: Arc<RwLock<MemoryWorkGraphState>>,
}
181
/// The data guarded by [`MemoryWorkGraphStore`]'s lock.
#[derive(Default)]
struct MemoryWorkGraphState {
    // Items keyed by (realm id, namespace, item id).
    items: BTreeMap<(String, WorkNamespace, WorkItemId), WorkItem>,
    // Edges in insertion order.
    edges: Vec<WorkEdge>,
    // Event log in append order.
    events: Vec<WorkGraphEvent>,
    // Sequence number given to the most recently appended event (0 before any append).
    next_event_seq: i64,
}
189
190impl MemoryWorkGraphStore {
191 pub fn new() -> Self {
192 Self::default()
193 }
194}
195
196#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
197#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
198impl WorkGraphStore for MemoryWorkGraphStore {
199 fn kind(&self) -> WorkGraphStoreKind {
200 WorkGraphStoreKind::Memory
201 }
202
203 async fn get_store_time_utc(&self) -> Result<DateTime<Utc>, WorkGraphError> {
204 Ok(Utc::now())
205 }
206
207 async fn insert_item(
208 &self,
209 item: WorkItem,
210 event: WorkGraphEvent,
211 ) -> Result<WorkItem, WorkGraphError> {
212 WorkGraphMachine::validate_item_projection(&item)?;
213 let mut guard = self.inner.write().await;
214 let key = item_key(&item.realm_id, &item.namespace, &item.id);
215 if guard.items.contains_key(&key) {
216 return Err(WorkGraphError::Conflict(format!(
217 "work item {} already exists",
218 item.id
219 )));
220 }
221 guard.items.insert(key, item.clone());
222 guard.append_event(event);
223 Ok(item)
224 }
225
226 async fn update_item_cas(
227 &self,
228 item: WorkItem,
229 expected_previous_revision: u64,
230 event: WorkGraphEvent,
231 ) -> Result<WorkItem, WorkGraphError> {
232 WorkGraphMachine::validate_item_projection(&item)?;
233 let mut guard = self.inner.write().await;
234 let key = item_key(&item.realm_id, &item.namespace, &item.id);
235 let Some(current) = guard.items.get(&key) else {
236 return Err(WorkGraphError::not_found(
237 item.realm_id.clone(),
238 item.namespace.clone(),
239 item.id.clone(),
240 ));
241 };
242 if current.revision != expected_previous_revision {
243 return Err(WorkGraphError::StaleRevision {
244 id: item.id.clone(),
245 expected: expected_previous_revision,
246 actual: current.revision,
247 });
248 }
249 guard.items.insert(key, item.clone());
250 guard.append_event(event);
251 Ok(item)
252 }
253
254 async fn get_item(
255 &self,
256 realm_id: &str,
257 namespace: &WorkNamespace,
258 id: &WorkItemId,
259 ) -> Result<Option<WorkItem>, WorkGraphError> {
260 let guard = self.inner.read().await;
261 Ok(guard.items.get(&item_key(realm_id, namespace, id)).cloned())
262 }
263
264 async fn list_items(&self, filter: WorkItemFilter) -> Result<Vec<WorkItem>, WorkGraphError> {
265 let guard = self.inner.read().await;
266 let mut items = guard
267 .items
268 .values()
269 .filter(|item| item_matches_filter(item, &filter))
270 .cloned()
271 .collect::<Vec<_>>();
272 items.sort_by(|left, right| {
273 left.updated_at
274 .cmp(&right.updated_at)
275 .then_with(|| left.id.cmp(&right.id))
276 });
277 if let Some(limit) = filter.limit {
278 items.truncate(limit);
279 }
280 Ok(items)
281 }
282
283 async fn insert_edge(
284 &self,
285 edge: WorkEdge,
286 event: WorkGraphEvent,
287 ) -> Result<WorkEdge, WorkGraphError> {
288 let mut guard = self.inner.write().await;
289 if guard.edges.iter().any(|existing| existing == &edge) {
290 return Err(duplicate_edge_error(&edge));
291 }
292 guard.edges.push(edge.clone());
293 guard.append_event(event);
294 Ok(edge)
295 }
296
297 async fn list_edges(
298 &self,
299 realm_id: &str,
300 namespace: &WorkNamespace,
301 ) -> Result<Vec<WorkEdge>, WorkGraphError> {
302 let guard = self.inner.read().await;
303 Ok(guard
304 .edges
305 .iter()
306 .filter(|edge| edge.realm_id == realm_id && edge.namespace == *namespace)
307 .cloned()
308 .collect())
309 }
310
311 async fn list_events(
312 &self,
313 filter: WorkGraphEventFilter,
314 ) -> Result<Vec<WorkGraphEvent>, WorkGraphError> {
315 let guard = self.inner.read().await;
316 let mut events = guard
317 .events
318 .iter()
319 .filter(|event| event_matches_filter(event, &filter))
320 .cloned()
321 .collect::<Vec<_>>();
322 events.sort_by_key(|event| event.seq.unwrap_or_default());
323 if let Some(limit) = filter.limit {
324 events.truncate(limit);
325 }
326 Ok(events)
327 }
328}
329
330impl MemoryWorkGraphState {
331 fn append_event(&mut self, mut event: WorkGraphEvent) {
332 self.next_event_seq += 1;
333 event.seq = Some(self.next_event_seq);
334 self.events.push(event);
335 }
336}
337
338fn item_key(
339 realm_id: &str,
340 namespace: &WorkNamespace,
341 id: &WorkItemId,
342) -> (String, WorkNamespace, WorkItemId) {
343 (realm_id.to_string(), namespace.clone(), id.clone())
344}
345
346fn item_matches_filter(item: &WorkItem, filter: &WorkItemFilter) -> bool {
347 if let Some(realm_id) = &filter.realm_id
348 && &item.realm_id != realm_id
349 {
350 return false;
351 }
352 if !filter.all_namespaces
353 && let Some(namespace) = &filter.namespace
354 && &item.namespace != namespace
355 {
356 return false;
357 }
358 if !filter.statuses.is_empty() && !filter.statuses.contains(&item.status) {
359 return false;
360 }
361 if !filter.include_terminal && item.status.is_terminal() {
362 return false;
363 }
364 filter
365 .labels
366 .iter()
367 .all(|label| item.labels.contains(label))
368}
369
370fn event_matches_filter(event: &WorkGraphEvent, filter: &WorkGraphEventFilter) -> bool {
371 if let Some(after_seq) = filter.after_seq
372 && event.seq.unwrap_or_default() <= after_seq
373 {
374 return false;
375 }
376 if let Some(realm_id) = &filter.realm_id
377 && &event.realm_id != realm_id
378 {
379 return false;
380 }
381 if !filter.all_namespaces
382 && let Some(namespace) = &filter.namespace
383 && &event.namespace != namespace
384 {
385 return false;
386 }
387 true
388}
389
/// Work graph store backed by a SQLite database file.
///
/// Only the database path is held; a connection is opened for each operation.
#[cfg(not(target_arch = "wasm32"))]
pub struct SqliteWorkGraphStore {
    // Location of the SQLite database file.
    path: PathBuf,
}
394
395#[cfg(not(target_arch = "wasm32"))]
396impl SqliteWorkGraphStore {
397 pub fn open(path: impl Into<PathBuf>) -> Result<Self, WorkGraphError> {
398 let store = Self { path: path.into() };
399 store.with_connection(|_conn| Ok(()))?;
400 Ok(store)
401 }
402
403 pub fn path(&self) -> &Path {
404 &self.path
405 }
406
407 pub fn rebuild_projection_from_events(&self) -> Result<(), WorkGraphError> {
408 self.with_connection(|conn| {
409 let tx = conn
410 .transaction()
411 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
412 tx.execute("DELETE FROM workgraph_items", [])
413 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
414 tx.execute("DELETE FROM workgraph_edges", [])
415 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
416
417 let events = {
418 let mut stmt = tx
419 .prepare("SELECT event_json FROM workgraph_events ORDER BY seq ASC")
420 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
421 let rows = stmt
422 .query_map([], |row| row_json::<WorkGraphEvent>(row, 0))
423 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
424 let mut events = Vec::new();
425 for row in rows {
426 events.push(row.map_err(|err| WorkGraphError::Store(err.to_string()))?);
427 }
428 events
429 };
430
431 for event in events {
432 replay_event_tx(&tx, &event)?;
433 }
434 tx.commit()
435 .map_err(|err| WorkGraphError::Store(err.to_string()))
436 })
437 }
438
439 fn with_connection<T>(
440 &self,
441 f: impl FnOnce(&mut Connection) -> Result<T, WorkGraphError>,
442 ) -> Result<T, WorkGraphError> {
443 if let Some(parent) = self.path.parent() {
444 std::fs::create_dir_all(parent)
445 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
446 }
447 let mut conn =
448 Connection::open(&self.path).map_err(|err| WorkGraphError::Store(err.to_string()))?;
449 conn.pragma_update(None, "journal_mode", "WAL")
450 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
451 conn.pragma_update(None, "synchronous", "NORMAL")
452 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
453 init_sqlite_schema(&conn)?;
454 f(&mut conn)
455 }
456}
457
458#[cfg(not(target_arch = "wasm32"))]
459#[async_trait]
460impl WorkGraphStore for SqliteWorkGraphStore {
461 fn kind(&self) -> WorkGraphStoreKind {
462 WorkGraphStoreKind::Sqlite
463 }
464
465 async fn get_store_time_utc(&self) -> Result<DateTime<Utc>, WorkGraphError> {
466 Ok(Utc::now())
467 }
468
469 async fn insert_item(
470 &self,
471 item: WorkItem,
472 event: WorkGraphEvent,
473 ) -> Result<WorkItem, WorkGraphError> {
474 WorkGraphMachine::validate_item_projection(&item)?;
475 self.with_connection(|conn| {
476 let tx = conn
477 .transaction()
478 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
479 insert_item_tx(&tx, &item)?;
480 insert_event_tx(&tx, &event)?;
481 tx.commit()
482 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
483 Ok(item)
484 })
485 }
486
487 async fn update_item_cas(
488 &self,
489 item: WorkItem,
490 expected_previous_revision: u64,
491 event: WorkGraphEvent,
492 ) -> Result<WorkItem, WorkGraphError> {
493 WorkGraphMachine::validate_item_projection(&item)?;
494 self.with_connection(|conn| {
495 let tx = conn
496 .transaction()
497 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
498 let changed = update_item_tx(&tx, &item, expected_previous_revision)?;
499 if changed == 0 {
500 let actual = current_revision_tx(&tx, &item.realm_id, &item.namespace, &item.id)?;
501 return match actual {
502 Some(actual) => Err(WorkGraphError::StaleRevision {
503 id: item.id,
504 expected: expected_previous_revision,
505 actual,
506 }),
507 None => Err(WorkGraphError::not_found(
508 item.realm_id,
509 item.namespace,
510 item.id,
511 )),
512 };
513 }
514 insert_event_tx(&tx, &event)?;
515 tx.commit()
516 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
517 Ok(item)
518 })
519 }
520
521 async fn get_item(
522 &self,
523 realm_id: &str,
524 namespace: &WorkNamespace,
525 id: &WorkItemId,
526 ) -> Result<Option<WorkItem>, WorkGraphError> {
527 self.with_connection(|conn| select_item(conn, realm_id, namespace, id))
528 }
529
530 async fn list_items(&self, filter: WorkItemFilter) -> Result<Vec<WorkItem>, WorkGraphError> {
531 self.with_connection(|conn| list_sqlite_items(conn, &filter))
532 }
533
534 async fn insert_edge(
535 &self,
536 edge: WorkEdge,
537 event: WorkGraphEvent,
538 ) -> Result<WorkEdge, WorkGraphError> {
539 self.with_connection(|conn| {
540 let tx = conn
541 .transaction()
542 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
543 insert_edge_tx(&tx, &edge)?;
544 insert_event_tx(&tx, &event)?;
545 tx.commit()
546 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
547 Ok(edge)
548 })
549 }
550
551 async fn list_edges(
552 &self,
553 realm_id: &str,
554 namespace: &WorkNamespace,
555 ) -> Result<Vec<WorkEdge>, WorkGraphError> {
556 self.with_connection(|conn| list_sqlite_edges(conn, realm_id, namespace))
557 }
558
559 async fn list_events(
560 &self,
561 filter: WorkGraphEventFilter,
562 ) -> Result<Vec<WorkGraphEvent>, WorkGraphError> {
563 self.with_connection(|conn| list_sqlite_events(conn, &filter))
564 }
565}
566
567#[cfg(not(target_arch = "wasm32"))]
568fn init_sqlite_schema(conn: &Connection) -> Result<(), WorkGraphError> {
569 conn.execute_batch(
570 r"
571 CREATE TABLE IF NOT EXISTS workgraph_items (
572 realm_id TEXT NOT NULL,
573 namespace TEXT NOT NULL,
574 item_id TEXT NOT NULL,
575 revision INTEGER NOT NULL,
576 updated_at_utc TEXT NOT NULL,
577 item_json TEXT NOT NULL,
578 PRIMARY KEY (realm_id, namespace, item_id)
579 );
580 CREATE INDEX IF NOT EXISTS idx_workgraph_items_realm_namespace_updated
581 ON workgraph_items (realm_id, namespace, updated_at_utc);
582
583 CREATE TABLE IF NOT EXISTS workgraph_edges (
584 realm_id TEXT NOT NULL,
585 namespace TEXT NOT NULL,
586 edge_kind TEXT NOT NULL,
587 from_id TEXT NOT NULL,
588 to_id TEXT NOT NULL,
589 edge_json TEXT NOT NULL,
590 PRIMARY KEY (realm_id, namespace, edge_kind, from_id, to_id)
591 );
592
593 CREATE TABLE IF NOT EXISTS workgraph_events (
594 seq INTEGER PRIMARY KEY AUTOINCREMENT,
595 realm_id TEXT NOT NULL,
596 namespace TEXT NOT NULL,
597 item_id TEXT,
598 event_kind TEXT NOT NULL,
599 at_utc TEXT NOT NULL,
600 event_json TEXT NOT NULL
601 );
602 CREATE INDEX IF NOT EXISTS idx_workgraph_events_realm_namespace_seq
603 ON workgraph_events (realm_id, namespace, seq);
604 ",
605 )
606 .map_err(|err| WorkGraphError::Store(err.to_string()))
607}
608
609#[cfg(not(target_arch = "wasm32"))]
610fn insert_item_tx(tx: &Transaction<'_>, item: &WorkItem) -> Result<(), WorkGraphError> {
611 let json = serde_json::to_string(item).map_err(|err| WorkGraphError::Store(err.to_string()))?;
612 tx.execute(
613 "INSERT INTO workgraph_items (realm_id, namespace, item_id, revision, updated_at_utc, item_json)
614 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
615 params![
616 item.realm_id,
617 item.namespace.as_str(),
618 item.id.as_str(),
619 item.revision,
620 item.updated_at.to_rfc3339(),
621 json,
622 ],
623 )
624 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
625 Ok(())
626}
627
628#[cfg(not(target_arch = "wasm32"))]
629fn update_item_tx(
630 tx: &Transaction<'_>,
631 item: &WorkItem,
632 expected_previous_revision: u64,
633) -> Result<usize, WorkGraphError> {
634 let json = serde_json::to_string(item).map_err(|err| WorkGraphError::Store(err.to_string()))?;
635 tx.execute(
636 "UPDATE workgraph_items
637 SET revision = ?4, updated_at_utc = ?5, item_json = ?6
638 WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3 AND revision = ?7",
639 params![
640 item.realm_id,
641 item.namespace.as_str(),
642 item.id.as_str(),
643 item.revision,
644 item.updated_at.to_rfc3339(),
645 json,
646 expected_previous_revision,
647 ],
648 )
649 .map_err(|err| WorkGraphError::Store(err.to_string()))
650}
651
652#[cfg(not(target_arch = "wasm32"))]
653fn upsert_item_tx(tx: &Transaction<'_>, item: &WorkItem) -> Result<(), WorkGraphError> {
654 let json = serde_json::to_string(item).map_err(|err| WorkGraphError::Store(err.to_string()))?;
655 tx.execute(
656 "INSERT INTO workgraph_items
657 (realm_id, namespace, item_id, revision, updated_at_utc, item_json)
658 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
659 ON CONFLICT(realm_id, namespace, item_id) DO UPDATE SET
660 revision = excluded.revision,
661 updated_at_utc = excluded.updated_at_utc,
662 item_json = excluded.item_json",
663 params![
664 item.realm_id,
665 item.namespace.as_str(),
666 item.id.as_str(),
667 item.revision,
668 item.updated_at.to_rfc3339(),
669 json,
670 ],
671 )
672 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
673 Ok(())
674}
675
676#[cfg(not(target_arch = "wasm32"))]
677fn current_revision_tx(
678 tx: &Transaction<'_>,
679 realm_id: &str,
680 namespace: &WorkNamespace,
681 id: &WorkItemId,
682) -> Result<Option<u64>, WorkGraphError> {
683 tx.query_row(
684 "SELECT revision FROM workgraph_items WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3",
685 params![realm_id, namespace.as_str(), id.as_str()],
686 |row| row.get::<_, u64>(0),
687 )
688 .optional()
689 .map_err(|err| WorkGraphError::Store(err.to_string()))
690}
691
692#[cfg(not(target_arch = "wasm32"))]
693fn insert_edge_tx(tx: &Transaction<'_>, edge: &WorkEdge) -> Result<(), WorkGraphError> {
694 let json = serde_json::to_string(edge).map_err(|err| WorkGraphError::Store(err.to_string()))?;
695 tx.execute(
696 "INSERT INTO workgraph_edges
697 (realm_id, namespace, edge_kind, from_id, to_id, edge_json)
698 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
699 params![
700 edge.realm_id,
701 edge.namespace.as_str(),
702 format!("{:?}", edge.kind),
703 edge.from_id.as_str(),
704 edge.to_id.as_str(),
705 json,
706 ],
707 )
708 .map_err(|err| map_sqlite_insert_edge_error(err, edge))?;
709 Ok(())
710}
711
712fn duplicate_edge_error(edge: &WorkEdge) -> WorkGraphError {
713 WorkGraphError::Conflict(format!(
714 "work edge {:?} {} -> {} already exists",
715 edge.kind, edge.from_id, edge.to_id
716 ))
717}
718
719#[cfg(not(target_arch = "wasm32"))]
720fn map_sqlite_insert_edge_error(err: rusqlite::Error, edge: &WorkEdge) -> WorkGraphError {
721 match err {
722 rusqlite::Error::SqliteFailure(failure, _)
723 if failure.code == ErrorCode::ConstraintViolation =>
724 {
725 duplicate_edge_error(edge)
726 }
727 err => WorkGraphError::Store(err.to_string()),
728 }
729}
730
731#[cfg(not(target_arch = "wasm32"))]
732fn insert_event_tx(tx: &Transaction<'_>, event: &WorkGraphEvent) -> Result<(), WorkGraphError> {
733 let json =
734 serde_json::to_string(event).map_err(|err| WorkGraphError::Store(err.to_string()))?;
735 tx.execute(
736 "INSERT INTO workgraph_events
737 (realm_id, namespace, item_id, event_kind, at_utc, event_json)
738 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
739 params![
740 event.realm_id,
741 event.namespace.as_str(),
742 event.item_id.as_ref().map(WorkItemId::as_str),
743 format!("{:?}", event.kind),
744 event.at.to_rfc3339(),
745 json,
746 ],
747 )
748 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
749 Ok(())
750}
751
752#[cfg(not(target_arch = "wasm32"))]
753fn select_item(
754 conn: &Connection,
755 realm_id: &str,
756 namespace: &WorkNamespace,
757 id: &WorkItemId,
758) -> Result<Option<WorkItem>, WorkGraphError> {
759 conn.query_row(
760 "SELECT item_json FROM workgraph_items WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3",
761 params![realm_id, namespace.as_str(), id.as_str()],
762 |row| row_json(row, 0),
763 )
764 .optional()
765 .map_err(|err| WorkGraphError::Store(err.to_string()))
766}
767
768#[cfg(not(target_arch = "wasm32"))]
769fn list_sqlite_items(
770 conn: &Connection,
771 filter: &WorkItemFilter,
772) -> Result<Vec<WorkItem>, WorkGraphError> {
773 let mut stmt = conn
774 .prepare("SELECT item_json FROM workgraph_items ORDER BY updated_at_utc ASC, item_id ASC")
775 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
776 let rows = stmt
777 .query_map([], |row| row_json::<WorkItem>(row, 0))
778 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
779 let mut items = Vec::new();
780 for row in rows {
781 let item = row.map_err(|err| WorkGraphError::Store(err.to_string()))?;
782 if item_matches_filter(&item, filter) {
783 items.push(item);
784 if filter.limit.is_some_and(|limit| items.len() >= limit) {
785 break;
786 }
787 }
788 }
789 Ok(items)
790}
791
792#[cfg(not(target_arch = "wasm32"))]
793fn list_sqlite_edges(
794 conn: &Connection,
795 realm_id: &str,
796 namespace: &WorkNamespace,
797) -> Result<Vec<WorkEdge>, WorkGraphError> {
798 let mut stmt = conn
799 .prepare(
800 "SELECT edge_json FROM workgraph_edges
801 WHERE realm_id = ?1 AND namespace = ?2
802 ORDER BY edge_kind ASC, from_id ASC, to_id ASC",
803 )
804 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
805 let rows = stmt
806 .query_map(params![realm_id, namespace.as_str()], |row| {
807 row_json::<WorkEdge>(row, 0)
808 })
809 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
810 let mut edges = Vec::new();
811 for row in rows {
812 edges.push(row.map_err(|err| WorkGraphError::Store(err.to_string()))?);
813 }
814 Ok(edges)
815}
816
817#[cfg(not(target_arch = "wasm32"))]
818fn list_sqlite_events(
819 conn: &Connection,
820 filter: &WorkGraphEventFilter,
821) -> Result<Vec<WorkGraphEvent>, WorkGraphError> {
822 let mut stmt = conn
823 .prepare("SELECT seq, event_json FROM workgraph_events ORDER BY seq ASC")
824 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
825 let rows = stmt
826 .query_map([], |row| {
827 let seq = row.get::<_, i64>(0)?;
828 let mut event = row_json::<WorkGraphEvent>(row, 1)?;
829 event.seq = Some(seq);
830 Ok(event)
831 })
832 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
833 let mut events = Vec::new();
834 for row in rows {
835 let event = row.map_err(|err| WorkGraphError::Store(err.to_string()))?;
836 if event_matches_filter(&event, filter) {
837 events.push(event);
838 if filter.limit.is_some_and(|limit| events.len() >= limit) {
839 break;
840 }
841 }
842 }
843 Ok(events)
844}
845
846#[cfg(not(target_arch = "wasm32"))]
847fn replay_event_tx(tx: &Transaction<'_>, event: &WorkGraphEvent) -> Result<(), WorkGraphError> {
848 match event.kind {
849 WorkGraphEventKind::Linked => {
850 let edge = payload_field::<WorkEdge>(event, "edge")?;
851 insert_edge_tx(tx, &edge)
852 }
853 WorkGraphEventKind::Created
854 | WorkGraphEventKind::Updated
855 | WorkGraphEventKind::Claimed
856 | WorkGraphEventKind::Released
857 | WorkGraphEventKind::Blocked
858 | WorkGraphEventKind::Closed
859 | WorkGraphEventKind::EvidenceAdded => {
860 let item = payload_field::<WorkItem>(event, "item")?;
861 upsert_item_tx(tx, &item)
862 }
863 }
864}
865
866#[cfg(not(target_arch = "wasm32"))]
867fn payload_field<T: serde::de::DeserializeOwned>(
868 event: &WorkGraphEvent,
869 field: &str,
870) -> Result<T, WorkGraphError> {
871 let value = event.payload.get(field).ok_or_else(|| {
872 WorkGraphError::Store(format!(
873 "workgraph event {:?} missing payload field `{field}`",
874 event.kind
875 ))
876 })?;
877 serde_json::from_value(value.clone()).map_err(|err| WorkGraphError::Store(err.to_string()))
878}
879
880#[cfg(not(target_arch = "wasm32"))]
881fn row_json<T: serde::de::DeserializeOwned>(
882 row: &rusqlite::Row<'_>,
883 index: usize,
884) -> rusqlite::Result<T> {
885 let json = row.get::<_, String>(index)?;
886 serde_json::from_str(&json).map_err(|err| {
887 rusqlite::Error::FromSqlConversionFailure(index, rusqlite::types::Type::Text, Box::new(err))
888 })
889}
890
891#[cfg(test)]
892#[allow(clippy::expect_used, clippy::unwrap_used)]
893mod tests {
894 use std::collections::BTreeSet;
895
896 use chrono::Utc;
897 use serde_json::json;
898
899 use crate::types::WorkEdge;
900 use crate::{
901 ClaimWorkItemRequest, CreateWorkItemRequest, LinkWorkItemsRequest, MemoryWorkGraphStore,
902 WorkEdgeKind, WorkGraphError, WorkGraphEvent, WorkGraphEventFilter, WorkGraphEventKind,
903 WorkGraphService, WorkGraphStore, WorkItemFilter, WorkItemId, WorkNamespace, WorkOwner,
904 WorkOwnerKey, WorkStatus,
905 };
906
907 fn test_edge() -> WorkEdge {
908 WorkEdge {
909 realm_id: "realm".to_string(),
910 namespace: WorkNamespace::default(),
911 kind: WorkEdgeKind::Blocks,
912 from_id: WorkItemId::generated(),
913 to_id: WorkItemId::generated(),
914 created_at: Utc::now(),
915 }
916 }
917
918 fn link_event(edge: &WorkEdge) -> WorkGraphEvent {
919 WorkGraphEvent::graph(
920 edge.realm_id.clone(),
921 edge.namespace.clone(),
922 WorkGraphEventKind::Linked,
923 edge.created_at,
924 json!({ "edge": edge }),
925 )
926 }
927
928 #[tokio::test]
929 async fn memory_store_namespace_filters_do_not_leak() {
930 let store = std::sync::Arc::new(MemoryWorkGraphStore::new());
931 let default_service =
932 WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
933 let other_service = WorkGraphService::with_scope(
934 store.clone(),
935 "realm",
936 WorkNamespace::new("other").expect("namespace"),
937 );
938 default_service
939 .create(CreateWorkItemRequest {
940 realm_id: None,
941 namespace: None,
942 title: "default".to_string(),
943 description: None,
944 priority: Default::default(),
945 labels: BTreeSet::new(),
946 due_at: None,
947 not_before: None,
948 snoozed_until: None,
949 external_refs: Vec::new(),
950 evidence_refs: Vec::new(),
951 status: None,
952 })
953 .await
954 .expect("create default");
955 other_service
956 .create(CreateWorkItemRequest {
957 realm_id: None,
958 namespace: None,
959 title: "other".to_string(),
960 description: None,
961 priority: Default::default(),
962 labels: BTreeSet::new(),
963 due_at: None,
964 not_before: None,
965 snoozed_until: None,
966 external_refs: Vec::new(),
967 evidence_refs: Vec::new(),
968 status: None,
969 })
970 .await
971 .expect("create other");
972
973 let items = store
974 .list_items(WorkItemFilter {
975 realm_id: Some("realm".to_string()),
976 namespace: Some(WorkNamespace::default()),
977 ..WorkItemFilter::default()
978 })
979 .await
980 .expect("list");
981 assert_eq!(items.len(), 1);
982 assert_eq!(items[0].title, "default");
983 }
984
985 #[tokio::test]
986 async fn memory_store_duplicate_edge_does_not_append_event() {
987 let store = MemoryWorkGraphStore::new();
988 let edge = test_edge();
989 store
990 .insert_edge(edge.clone(), link_event(&edge))
991 .await
992 .expect("insert edge");
993
994 let error = store
995 .insert_edge(edge.clone(), link_event(&edge))
996 .await
997 .expect_err("duplicate edge should fail");
998 assert!(matches!(error, WorkGraphError::Conflict(_)));
999
1000 let events = store
1001 .list_events(WorkGraphEventFilter {
1002 realm_id: Some(edge.realm_id),
1003 namespace: Some(edge.namespace),
1004 all_namespaces: false,
1005 after_seq: None,
1006 limit: None,
1007 })
1008 .await
1009 .expect("events");
1010 assert_eq!(events.len(), 1);
1011 }
1012
1013 #[cfg(not(target_arch = "wasm32"))]
1014 #[tokio::test]
1015 async fn sqlite_persistence_survives_restart() {
1016 let dir = tempfile::tempdir().expect("tempdir");
1017 let path = dir.path().join("workgraph.sqlite3");
1018 let store = std::sync::Arc::new(crate::SqliteWorkGraphStore::open(&path).expect("open"));
1019 let service = WorkGraphService::with_scope(store, "realm", WorkNamespace::default());
1020 let item = service
1021 .create(CreateWorkItemRequest {
1022 realm_id: None,
1023 namespace: None,
1024 title: "persist me".to_string(),
1025 description: None,
1026 priority: Default::default(),
1027 labels: BTreeSet::new(),
1028 due_at: None,
1029 not_before: None,
1030 snoozed_until: None,
1031 external_refs: Vec::new(),
1032 evidence_refs: Vec::new(),
1033 status: None,
1034 })
1035 .await
1036 .expect("create");
1037
1038 let reopened = std::sync::Arc::new(crate::SqliteWorkGraphStore::open(&path).expect("open"));
1039 let service = WorkGraphService::with_scope(reopened, "realm", WorkNamespace::default());
1040 let fetched = service.get(None, None, item.id.clone()).await.expect("get");
1041 assert_eq!(fetched.title, "persist me");
1042 }
1043
1044 #[cfg(not(target_arch = "wasm32"))]
1045 #[tokio::test]
1046 async fn sqlite_legacy_item_without_machine_state_backfills_on_write() {
1047 let dir = tempfile::tempdir().expect("tempdir");
1048 let path = dir.path().join("workgraph.sqlite3");
1049 let store = std::sync::Arc::new(crate::SqliteWorkGraphStore::open(&path).expect("open"));
1050 let service =
1051 WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
1052 let item = service
1053 .create(CreateWorkItemRequest {
1054 realm_id: None,
1055 namespace: None,
1056 title: "legacy item".to_string(),
1057 description: None,
1058 priority: Default::default(),
1059 labels: BTreeSet::new(),
1060 due_at: None,
1061 not_before: None,
1062 snoozed_until: None,
1063 external_refs: Vec::new(),
1064 evidence_refs: Vec::new(),
1065 status: None,
1066 })
1067 .await
1068 .expect("create");
1069
1070 store
1071 .with_connection(|conn| {
1072 let json: String = conn
1073 .query_row(
1074 "SELECT item_json FROM workgraph_items
1075 WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3",
1076 rusqlite::params![
1077 &item.realm_id,
1078 item.namespace.as_str(),
1079 item.id.as_str()
1080 ],
1081 |row| row.get(0),
1082 )
1083 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1084 let mut value = serde_json::from_str::<serde_json::Value>(&json)
1085 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1086 value
1087 .as_object_mut()
1088 .expect("item json object")
1089 .remove("machine_state");
1090 conn.execute(
1091 "UPDATE workgraph_items
1092 SET item_json = ?4
1093 WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3",
1094 rusqlite::params![
1095 &item.realm_id,
1096 item.namespace.as_str(),
1097 item.id.as_str(),
1098 serde_json::to_string(&value)
1099 .map_err(|err| WorkGraphError::Store(err.to_string()))?
1100 ],
1101 )
1102 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1103 Ok(())
1104 })
1105 .expect("strip machine state");
1106
1107 let reopened = std::sync::Arc::new(crate::SqliteWorkGraphStore::open(&path).expect("open"));
1108 let service = WorkGraphService::with_scope(reopened, "realm", WorkNamespace::default());
1109 let legacy = service.get(None, None, item.id).await.expect("get legacy");
1110 assert_eq!(legacy.machine_state.revision, legacy.revision);
1111 assert!(matches!(
1112 legacy.machine_state.lifecycle_phase,
1113 crate::machines::workgraph_lifecycle::WorkLifecycleState::Open
1114 ));
1115
1116 let claimed = service
1117 .claim(ClaimWorkItemRequest {
1118 id: legacy.id,
1119 realm_id: None,
1120 namespace: None,
1121 expected_revision: legacy.revision,
1122 owner: WorkOwner::new(WorkOwnerKey::label("worker").expect("owner")),
1123 lease_seconds: Some(60),
1124 lease_expires_at: None,
1125 })
1126 .await
1127 .expect("claim legacy");
1128 assert_eq!(claimed.status, WorkStatus::InProgress);
1129 }
1130
1131 #[cfg(not(target_arch = "wasm32"))]
1132 #[tokio::test]
1133 async fn sqlite_event_replay_rebuilds_projection() {
1134 let dir = tempfile::tempdir().expect("tempdir");
1135 let path = dir.path().join("workgraph.sqlite3");
1136 let store = std::sync::Arc::new(crate::SqliteWorkGraphStore::open(&path).expect("open"));
1137 let service =
1138 WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
1139 let blocker = service
1140 .create(CreateWorkItemRequest {
1141 realm_id: None,
1142 namespace: None,
1143 title: "blocker".to_string(),
1144 description: None,
1145 priority: Default::default(),
1146 labels: BTreeSet::new(),
1147 due_at: None,
1148 not_before: None,
1149 snoozed_until: None,
1150 external_refs: Vec::new(),
1151 evidence_refs: Vec::new(),
1152 status: None,
1153 })
1154 .await
1155 .expect("create blocker");
1156 let blocked = service
1157 .create(CreateWorkItemRequest {
1158 realm_id: None,
1159 namespace: None,
1160 title: "blocked".to_string(),
1161 description: None,
1162 priority: Default::default(),
1163 labels: BTreeSet::new(),
1164 due_at: None,
1165 not_before: None,
1166 snoozed_until: None,
1167 external_refs: Vec::new(),
1168 evidence_refs: Vec::new(),
1169 status: None,
1170 })
1171 .await
1172 .expect("create blocked");
1173 service
1174 .link(LinkWorkItemsRequest {
1175 realm_id: None,
1176 namespace: None,
1177 kind: WorkEdgeKind::Blocks,
1178 from_id: blocker.id.clone(),
1179 to_id: blocked.id.clone(),
1180 })
1181 .await
1182 .expect("link");
1183
1184 store
1185 .with_connection(|conn| {
1186 conn.execute("DELETE FROM workgraph_items", [])
1187 .map_err(|err| crate::WorkGraphError::Store(err.to_string()))?;
1188 conn.execute("DELETE FROM workgraph_edges", [])
1189 .map_err(|err| crate::WorkGraphError::Store(err.to_string()))?;
1190 Ok(())
1191 })
1192 .expect("clear projection");
1193
1194 let empty_items = store
1195 .list_items(WorkItemFilter {
1196 realm_id: Some("realm".to_string()),
1197 namespace: Some(WorkNamespace::default()),
1198 ..WorkItemFilter::default()
1199 })
1200 .await
1201 .expect("empty list");
1202 assert!(empty_items.is_empty());
1203
1204 store
1205 .rebuild_projection_from_events()
1206 .expect("rebuild projection");
1207
1208 let rebuilt_items = store
1209 .list_items(WorkItemFilter {
1210 realm_id: Some("realm".to_string()),
1211 namespace: Some(WorkNamespace::default()),
1212 ..WorkItemFilter::default()
1213 })
1214 .await
1215 .expect("rebuilt list");
1216 assert_eq!(rebuilt_items.len(), 2);
1217 let rebuilt_edges = store
1218 .list_edges("realm", &WorkNamespace::default())
1219 .await
1220 .expect("rebuilt edges");
1221 assert_eq!(rebuilt_edges.len(), 1);
1222 }
1223
1224 #[cfg(not(target_arch = "wasm32"))]
1225 #[tokio::test]
1226 async fn sqlite_store_duplicate_edge_does_not_append_event() {
1227 let dir = tempfile::tempdir().expect("tempdir");
1228 let path = dir.path().join("workgraph.sqlite3");
1229 let store = crate::SqliteWorkGraphStore::open(&path).expect("open");
1230 let edge = test_edge();
1231 store
1232 .insert_edge(edge.clone(), link_event(&edge))
1233 .await
1234 .expect("insert edge");
1235
1236 let error = store
1237 .insert_edge(edge.clone(), link_event(&edge))
1238 .await
1239 .expect_err("duplicate edge should fail");
1240 assert!(matches!(error, WorkGraphError::Conflict(_)));
1241
1242 let events = store
1243 .list_events(WorkGraphEventFilter {
1244 realm_id: Some(edge.realm_id),
1245 namespace: Some(edge.namespace),
1246 all_namespaces: false,
1247 after_seq: None,
1248 limit: None,
1249 })
1250 .await
1251 .expect("events");
1252 assert_eq!(events.len(), 1);
1253 }
1254}