Skip to main content

meerkat_workgraph/
store.rs

1use std::collections::BTreeMap;
2#[cfg(not(target_arch = "wasm32"))]
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8#[cfg(not(target_arch = "wasm32"))]
9use rusqlite::{Connection, ErrorCode, OptionalExtension, Transaction, params};
10
11use crate::WorkGraphError;
12use crate::WorkGraphMachine;
13use crate::types::{
14    WorkEdge, WorkGraphEvent, WorkGraphEventKind, WorkItem, WorkItemFilter, WorkItemId,
15    WorkNamespace,
16};
17
18#[cfg(target_arch = "wasm32")]
19use crate::tokio::sync::RwLock;
20#[cfg(not(target_arch = "wasm32"))]
21use tokio::sync::RwLock;
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
24pub enum WorkGraphStoreKind {
25    Disabled,
26    Memory,
27    Sqlite,
28    Custom,
29}
30
31impl WorkGraphStoreKind {
32    pub fn as_str(self) -> &'static str {
33        match self {
34            Self::Disabled => "disabled",
35            Self::Memory => "memory",
36            Self::Sqlite => "sqlite",
37            Self::Custom => "custom",
38        }
39    }
40}
41
42impl std::fmt::Display for WorkGraphStoreKind {
43    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44        f.write_str(self.as_str())
45    }
46}
47
48#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
49pub struct WorkGraphEventFilter {
50    pub realm_id: Option<String>,
51    pub namespace: Option<WorkNamespace>,
52    #[serde(default)]
53    pub all_namespaces: bool,
54    pub after_seq: Option<i64>,
55    pub limit: Option<usize>,
56}
57
58#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
59#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
60pub trait WorkGraphStore: Send + Sync {
61    fn kind(&self) -> WorkGraphStoreKind;
62
63    async fn get_store_time_utc(&self) -> Result<DateTime<Utc>, WorkGraphError>;
64
65    async fn insert_item(
66        &self,
67        item: WorkItem,
68        event: WorkGraphEvent,
69    ) -> Result<WorkItem, WorkGraphError>;
70
71    async fn update_item_cas(
72        &self,
73        item: WorkItem,
74        expected_previous_revision: u64,
75        event: WorkGraphEvent,
76    ) -> Result<WorkItem, WorkGraphError>;
77
78    async fn get_item(
79        &self,
80        realm_id: &str,
81        namespace: &WorkNamespace,
82        id: &WorkItemId,
83    ) -> Result<Option<WorkItem>, WorkGraphError>;
84
85    async fn list_items(&self, filter: WorkItemFilter) -> Result<Vec<WorkItem>, WorkGraphError>;
86
87    async fn insert_edge(
88        &self,
89        edge: WorkEdge,
90        event: WorkGraphEvent,
91    ) -> Result<WorkEdge, WorkGraphError>;
92
93    async fn list_edges(
94        &self,
95        realm_id: &str,
96        namespace: &WorkNamespace,
97    ) -> Result<Vec<WorkEdge>, WorkGraphError>;
98
99    async fn list_events(
100        &self,
101        filter: WorkGraphEventFilter,
102    ) -> Result<Vec<WorkGraphEvent>, WorkGraphError>;
103}
104
105#[derive(Default)]
106pub struct DisabledWorkGraphStore;
107
108#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
109#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
110impl WorkGraphStore for DisabledWorkGraphStore {
111    fn kind(&self) -> WorkGraphStoreKind {
112        WorkGraphStoreKind::Disabled
113    }
114
115    async fn get_store_time_utc(&self) -> Result<DateTime<Utc>, WorkGraphError> {
116        Err(unsupported(self.kind()))
117    }
118
119    async fn insert_item(
120        &self,
121        _item: WorkItem,
122        _event: WorkGraphEvent,
123    ) -> Result<WorkItem, WorkGraphError> {
124        Err(unsupported(self.kind()))
125    }
126
127    async fn update_item_cas(
128        &self,
129        _item: WorkItem,
130        _expected_previous_revision: u64,
131        _event: WorkGraphEvent,
132    ) -> Result<WorkItem, WorkGraphError> {
133        Err(unsupported(self.kind()))
134    }
135
136    async fn get_item(
137        &self,
138        _realm_id: &str,
139        _namespace: &WorkNamespace,
140        _id: &WorkItemId,
141    ) -> Result<Option<WorkItem>, WorkGraphError> {
142        Err(unsupported(self.kind()))
143    }
144
145    async fn list_items(&self, _filter: WorkItemFilter) -> Result<Vec<WorkItem>, WorkGraphError> {
146        Err(unsupported(self.kind()))
147    }
148
149    async fn insert_edge(
150        &self,
151        _edge: WorkEdge,
152        _event: WorkGraphEvent,
153    ) -> Result<WorkEdge, WorkGraphError> {
154        Err(unsupported(self.kind()))
155    }
156
157    async fn list_edges(
158        &self,
159        _realm_id: &str,
160        _namespace: &WorkNamespace,
161    ) -> Result<Vec<WorkEdge>, WorkGraphError> {
162        Err(unsupported(self.kind()))
163    }
164
165    async fn list_events(
166        &self,
167        _filter: WorkGraphEventFilter,
168    ) -> Result<Vec<WorkGraphEvent>, WorkGraphError> {
169        Err(unsupported(self.kind()))
170    }
171}
172
173fn unsupported(kind: WorkGraphStoreKind) -> WorkGraphError {
174    WorkGraphError::UnsupportedBackend(kind.to_string())
175}
176
177#[derive(Default)]
178pub struct MemoryWorkGraphStore {
179    inner: Arc<RwLock<MemoryWorkGraphState>>,
180}
181
182#[derive(Default)]
183struct MemoryWorkGraphState {
184    items: BTreeMap<(String, WorkNamespace, WorkItemId), WorkItem>,
185    edges: Vec<WorkEdge>,
186    events: Vec<WorkGraphEvent>,
187    next_event_seq: i64,
188}
189
190impl MemoryWorkGraphStore {
191    pub fn new() -> Self {
192        Self::default()
193    }
194}
195
196#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
197#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
198impl WorkGraphStore for MemoryWorkGraphStore {
199    fn kind(&self) -> WorkGraphStoreKind {
200        WorkGraphStoreKind::Memory
201    }
202
203    async fn get_store_time_utc(&self) -> Result<DateTime<Utc>, WorkGraphError> {
204        Ok(Utc::now())
205    }
206
207    async fn insert_item(
208        &self,
209        item: WorkItem,
210        event: WorkGraphEvent,
211    ) -> Result<WorkItem, WorkGraphError> {
212        WorkGraphMachine::validate_item_projection(&item)?;
213        let mut guard = self.inner.write().await;
214        let key = item_key(&item.realm_id, &item.namespace, &item.id);
215        if guard.items.contains_key(&key) {
216            return Err(WorkGraphError::Conflict(format!(
217                "work item {} already exists",
218                item.id
219            )));
220        }
221        guard.items.insert(key, item.clone());
222        guard.append_event(event);
223        Ok(item)
224    }
225
226    async fn update_item_cas(
227        &self,
228        item: WorkItem,
229        expected_previous_revision: u64,
230        event: WorkGraphEvent,
231    ) -> Result<WorkItem, WorkGraphError> {
232        WorkGraphMachine::validate_item_projection(&item)?;
233        let mut guard = self.inner.write().await;
234        let key = item_key(&item.realm_id, &item.namespace, &item.id);
235        let Some(current) = guard.items.get(&key) else {
236            return Err(WorkGraphError::not_found(
237                item.realm_id.clone(),
238                item.namespace.clone(),
239                item.id.clone(),
240            ));
241        };
242        if current.revision != expected_previous_revision {
243            return Err(WorkGraphError::StaleRevision {
244                id: item.id.clone(),
245                expected: expected_previous_revision,
246                actual: current.revision,
247            });
248        }
249        guard.items.insert(key, item.clone());
250        guard.append_event(event);
251        Ok(item)
252    }
253
254    async fn get_item(
255        &self,
256        realm_id: &str,
257        namespace: &WorkNamespace,
258        id: &WorkItemId,
259    ) -> Result<Option<WorkItem>, WorkGraphError> {
260        let guard = self.inner.read().await;
261        Ok(guard.items.get(&item_key(realm_id, namespace, id)).cloned())
262    }
263
264    async fn list_items(&self, filter: WorkItemFilter) -> Result<Vec<WorkItem>, WorkGraphError> {
265        let guard = self.inner.read().await;
266        let mut items = guard
267            .items
268            .values()
269            .filter(|item| item_matches_filter(item, &filter))
270            .cloned()
271            .collect::<Vec<_>>();
272        items.sort_by(|left, right| {
273            left.updated_at
274                .cmp(&right.updated_at)
275                .then_with(|| left.id.cmp(&right.id))
276        });
277        if let Some(limit) = filter.limit {
278            items.truncate(limit);
279        }
280        Ok(items)
281    }
282
283    async fn insert_edge(
284        &self,
285        edge: WorkEdge,
286        event: WorkGraphEvent,
287    ) -> Result<WorkEdge, WorkGraphError> {
288        let mut guard = self.inner.write().await;
289        if guard.edges.iter().any(|existing| existing == &edge) {
290            return Err(duplicate_edge_error(&edge));
291        }
292        guard.edges.push(edge.clone());
293        guard.append_event(event);
294        Ok(edge)
295    }
296
297    async fn list_edges(
298        &self,
299        realm_id: &str,
300        namespace: &WorkNamespace,
301    ) -> Result<Vec<WorkEdge>, WorkGraphError> {
302        let guard = self.inner.read().await;
303        Ok(guard
304            .edges
305            .iter()
306            .filter(|edge| edge.realm_id == realm_id && edge.namespace == *namespace)
307            .cloned()
308            .collect())
309    }
310
311    async fn list_events(
312        &self,
313        filter: WorkGraphEventFilter,
314    ) -> Result<Vec<WorkGraphEvent>, WorkGraphError> {
315        let guard = self.inner.read().await;
316        let mut events = guard
317            .events
318            .iter()
319            .filter(|event| event_matches_filter(event, &filter))
320            .cloned()
321            .collect::<Vec<_>>();
322        events.sort_by_key(|event| event.seq.unwrap_or_default());
323        if let Some(limit) = filter.limit {
324            events.truncate(limit);
325        }
326        Ok(events)
327    }
328}
329
330impl MemoryWorkGraphState {
331    fn append_event(&mut self, mut event: WorkGraphEvent) {
332        self.next_event_seq += 1;
333        event.seq = Some(self.next_event_seq);
334        self.events.push(event);
335    }
336}
337
338fn item_key(
339    realm_id: &str,
340    namespace: &WorkNamespace,
341    id: &WorkItemId,
342) -> (String, WorkNamespace, WorkItemId) {
343    (realm_id.to_string(), namespace.clone(), id.clone())
344}
345
346fn item_matches_filter(item: &WorkItem, filter: &WorkItemFilter) -> bool {
347    if let Some(realm_id) = &filter.realm_id
348        && &item.realm_id != realm_id
349    {
350        return false;
351    }
352    if !filter.all_namespaces
353        && let Some(namespace) = &filter.namespace
354        && &item.namespace != namespace
355    {
356        return false;
357    }
358    if !filter.statuses.is_empty() && !filter.statuses.contains(&item.status) {
359        return false;
360    }
361    if !filter.include_terminal && item.status.is_terminal() {
362        return false;
363    }
364    filter
365        .labels
366        .iter()
367        .all(|label| item.labels.contains(label))
368}
369
370fn event_matches_filter(event: &WorkGraphEvent, filter: &WorkGraphEventFilter) -> bool {
371    if let Some(after_seq) = filter.after_seq
372        && event.seq.unwrap_or_default() <= after_seq
373    {
374        return false;
375    }
376    if let Some(realm_id) = &filter.realm_id
377        && &event.realm_id != realm_id
378    {
379        return false;
380    }
381    if !filter.all_namespaces
382        && let Some(namespace) = &filter.namespace
383        && &event.namespace != namespace
384    {
385        return false;
386    }
387    true
388}
389
390#[cfg(not(target_arch = "wasm32"))]
391pub struct SqliteWorkGraphStore {
392    path: PathBuf,
393}
394
395#[cfg(not(target_arch = "wasm32"))]
396impl SqliteWorkGraphStore {
397    pub fn open(path: impl Into<PathBuf>) -> Result<Self, WorkGraphError> {
398        let store = Self { path: path.into() };
399        store.with_connection(|_conn| Ok(()))?;
400        Ok(store)
401    }
402
403    pub fn path(&self) -> &Path {
404        &self.path
405    }
406
407    pub fn rebuild_projection_from_events(&self) -> Result<(), WorkGraphError> {
408        self.with_connection(|conn| {
409            let tx = conn
410                .transaction()
411                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
412            tx.execute("DELETE FROM workgraph_items", [])
413                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
414            tx.execute("DELETE FROM workgraph_edges", [])
415                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
416
417            let events = {
418                let mut stmt = tx
419                    .prepare("SELECT event_json FROM workgraph_events ORDER BY seq ASC")
420                    .map_err(|err| WorkGraphError::Store(err.to_string()))?;
421                let rows = stmt
422                    .query_map([], |row| row_json::<WorkGraphEvent>(row, 0))
423                    .map_err(|err| WorkGraphError::Store(err.to_string()))?;
424                let mut events = Vec::new();
425                for row in rows {
426                    events.push(row.map_err(|err| WorkGraphError::Store(err.to_string()))?);
427                }
428                events
429            };
430
431            for event in events {
432                replay_event_tx(&tx, &event)?;
433            }
434            tx.commit()
435                .map_err(|err| WorkGraphError::Store(err.to_string()))
436        })
437    }
438
439    fn with_connection<T>(
440        &self,
441        f: impl FnOnce(&mut Connection) -> Result<T, WorkGraphError>,
442    ) -> Result<T, WorkGraphError> {
443        if let Some(parent) = self.path.parent() {
444            std::fs::create_dir_all(parent)
445                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
446        }
447        let mut conn =
448            Connection::open(&self.path).map_err(|err| WorkGraphError::Store(err.to_string()))?;
449        conn.pragma_update(None, "journal_mode", "WAL")
450            .map_err(|err| WorkGraphError::Store(err.to_string()))?;
451        conn.pragma_update(None, "synchronous", "NORMAL")
452            .map_err(|err| WorkGraphError::Store(err.to_string()))?;
453        init_sqlite_schema(&conn)?;
454        f(&mut conn)
455    }
456}
457
458#[cfg(not(target_arch = "wasm32"))]
459#[async_trait]
460impl WorkGraphStore for SqliteWorkGraphStore {
461    fn kind(&self) -> WorkGraphStoreKind {
462        WorkGraphStoreKind::Sqlite
463    }
464
465    async fn get_store_time_utc(&self) -> Result<DateTime<Utc>, WorkGraphError> {
466        Ok(Utc::now())
467    }
468
469    async fn insert_item(
470        &self,
471        item: WorkItem,
472        event: WorkGraphEvent,
473    ) -> Result<WorkItem, WorkGraphError> {
474        WorkGraphMachine::validate_item_projection(&item)?;
475        self.with_connection(|conn| {
476            let tx = conn
477                .transaction()
478                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
479            insert_item_tx(&tx, &item)?;
480            insert_event_tx(&tx, &event)?;
481            tx.commit()
482                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
483            Ok(item)
484        })
485    }
486
487    async fn update_item_cas(
488        &self,
489        item: WorkItem,
490        expected_previous_revision: u64,
491        event: WorkGraphEvent,
492    ) -> Result<WorkItem, WorkGraphError> {
493        WorkGraphMachine::validate_item_projection(&item)?;
494        self.with_connection(|conn| {
495            let tx = conn
496                .transaction()
497                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
498            let changed = update_item_tx(&tx, &item, expected_previous_revision)?;
499            if changed == 0 {
500                let actual = current_revision_tx(&tx, &item.realm_id, &item.namespace, &item.id)?;
501                return match actual {
502                    Some(actual) => Err(WorkGraphError::StaleRevision {
503                        id: item.id,
504                        expected: expected_previous_revision,
505                        actual,
506                    }),
507                    None => Err(WorkGraphError::not_found(
508                        item.realm_id,
509                        item.namespace,
510                        item.id,
511                    )),
512                };
513            }
514            insert_event_tx(&tx, &event)?;
515            tx.commit()
516                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
517            Ok(item)
518        })
519    }
520
521    async fn get_item(
522        &self,
523        realm_id: &str,
524        namespace: &WorkNamespace,
525        id: &WorkItemId,
526    ) -> Result<Option<WorkItem>, WorkGraphError> {
527        self.with_connection(|conn| select_item(conn, realm_id, namespace, id))
528    }
529
530    async fn list_items(&self, filter: WorkItemFilter) -> Result<Vec<WorkItem>, WorkGraphError> {
531        self.with_connection(|conn| list_sqlite_items(conn, &filter))
532    }
533
534    async fn insert_edge(
535        &self,
536        edge: WorkEdge,
537        event: WorkGraphEvent,
538    ) -> Result<WorkEdge, WorkGraphError> {
539        self.with_connection(|conn| {
540            let tx = conn
541                .transaction()
542                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
543            insert_edge_tx(&tx, &edge)?;
544            insert_event_tx(&tx, &event)?;
545            tx.commit()
546                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
547            Ok(edge)
548        })
549    }
550
551    async fn list_edges(
552        &self,
553        realm_id: &str,
554        namespace: &WorkNamespace,
555    ) -> Result<Vec<WorkEdge>, WorkGraphError> {
556        self.with_connection(|conn| list_sqlite_edges(conn, realm_id, namespace))
557    }
558
559    async fn list_events(
560        &self,
561        filter: WorkGraphEventFilter,
562    ) -> Result<Vec<WorkGraphEvent>, WorkGraphError> {
563        self.with_connection(|conn| list_sqlite_events(conn, &filter))
564    }
565}
566
567#[cfg(not(target_arch = "wasm32"))]
568fn init_sqlite_schema(conn: &Connection) -> Result<(), WorkGraphError> {
569    conn.execute_batch(
570        r"
571        CREATE TABLE IF NOT EXISTS workgraph_items (
572            realm_id TEXT NOT NULL,
573            namespace TEXT NOT NULL,
574            item_id TEXT NOT NULL,
575            revision INTEGER NOT NULL,
576            updated_at_utc TEXT NOT NULL,
577            item_json TEXT NOT NULL,
578            PRIMARY KEY (realm_id, namespace, item_id)
579        );
580        CREATE INDEX IF NOT EXISTS idx_workgraph_items_realm_namespace_updated
581            ON workgraph_items (realm_id, namespace, updated_at_utc);
582
583        CREATE TABLE IF NOT EXISTS workgraph_edges (
584            realm_id TEXT NOT NULL,
585            namespace TEXT NOT NULL,
586            edge_kind TEXT NOT NULL,
587            from_id TEXT NOT NULL,
588            to_id TEXT NOT NULL,
589            edge_json TEXT NOT NULL,
590            PRIMARY KEY (realm_id, namespace, edge_kind, from_id, to_id)
591        );
592
593        CREATE TABLE IF NOT EXISTS workgraph_events (
594            seq INTEGER PRIMARY KEY AUTOINCREMENT,
595            realm_id TEXT NOT NULL,
596            namespace TEXT NOT NULL,
597            item_id TEXT,
598            event_kind TEXT NOT NULL,
599            at_utc TEXT NOT NULL,
600            event_json TEXT NOT NULL
601        );
602        CREATE INDEX IF NOT EXISTS idx_workgraph_events_realm_namespace_seq
603            ON workgraph_events (realm_id, namespace, seq);
604        ",
605    )
606    .map_err(|err| WorkGraphError::Store(err.to_string()))
607}
608
609#[cfg(not(target_arch = "wasm32"))]
610fn insert_item_tx(tx: &Transaction<'_>, item: &WorkItem) -> Result<(), WorkGraphError> {
611    let json = serde_json::to_string(item).map_err(|err| WorkGraphError::Store(err.to_string()))?;
612    tx.execute(
613        "INSERT INTO workgraph_items (realm_id, namespace, item_id, revision, updated_at_utc, item_json)
614         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
615        params![
616            item.realm_id,
617            item.namespace.as_str(),
618            item.id.as_str(),
619            item.revision,
620            item.updated_at.to_rfc3339(),
621            json,
622        ],
623    )
624    .map_err(|err| WorkGraphError::Store(err.to_string()))?;
625    Ok(())
626}
627
628#[cfg(not(target_arch = "wasm32"))]
629fn update_item_tx(
630    tx: &Transaction<'_>,
631    item: &WorkItem,
632    expected_previous_revision: u64,
633) -> Result<usize, WorkGraphError> {
634    let json = serde_json::to_string(item).map_err(|err| WorkGraphError::Store(err.to_string()))?;
635    tx.execute(
636        "UPDATE workgraph_items
637            SET revision = ?4, updated_at_utc = ?5, item_json = ?6
638          WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3 AND revision = ?7",
639        params![
640            item.realm_id,
641            item.namespace.as_str(),
642            item.id.as_str(),
643            item.revision,
644            item.updated_at.to_rfc3339(),
645            json,
646            expected_previous_revision,
647        ],
648    )
649    .map_err(|err| WorkGraphError::Store(err.to_string()))
650}
651
652#[cfg(not(target_arch = "wasm32"))]
653fn upsert_item_tx(tx: &Transaction<'_>, item: &WorkItem) -> Result<(), WorkGraphError> {
654    let json = serde_json::to_string(item).map_err(|err| WorkGraphError::Store(err.to_string()))?;
655    tx.execute(
656        "INSERT INTO workgraph_items
657            (realm_id, namespace, item_id, revision, updated_at_utc, item_json)
658         VALUES (?1, ?2, ?3, ?4, ?5, ?6)
659         ON CONFLICT(realm_id, namespace, item_id) DO UPDATE SET
660            revision = excluded.revision,
661            updated_at_utc = excluded.updated_at_utc,
662            item_json = excluded.item_json",
663        params![
664            item.realm_id,
665            item.namespace.as_str(),
666            item.id.as_str(),
667            item.revision,
668            item.updated_at.to_rfc3339(),
669            json,
670        ],
671    )
672    .map_err(|err| WorkGraphError::Store(err.to_string()))?;
673    Ok(())
674}
675
676#[cfg(not(target_arch = "wasm32"))]
677fn current_revision_tx(
678    tx: &Transaction<'_>,
679    realm_id: &str,
680    namespace: &WorkNamespace,
681    id: &WorkItemId,
682) -> Result<Option<u64>, WorkGraphError> {
683    tx.query_row(
684        "SELECT revision FROM workgraph_items WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3",
685        params![realm_id, namespace.as_str(), id.as_str()],
686        |row| row.get::<_, u64>(0),
687    )
688    .optional()
689    .map_err(|err| WorkGraphError::Store(err.to_string()))
690}
691
692#[cfg(not(target_arch = "wasm32"))]
693fn insert_edge_tx(tx: &Transaction<'_>, edge: &WorkEdge) -> Result<(), WorkGraphError> {
694    let json = serde_json::to_string(edge).map_err(|err| WorkGraphError::Store(err.to_string()))?;
695    tx.execute(
696        "INSERT INTO workgraph_edges
697            (realm_id, namespace, edge_kind, from_id, to_id, edge_json)
698         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
699        params![
700            edge.realm_id,
701            edge.namespace.as_str(),
702            format!("{:?}", edge.kind),
703            edge.from_id.as_str(),
704            edge.to_id.as_str(),
705            json,
706        ],
707    )
708    .map_err(|err| map_sqlite_insert_edge_error(err, edge))?;
709    Ok(())
710}
711
712fn duplicate_edge_error(edge: &WorkEdge) -> WorkGraphError {
713    WorkGraphError::Conflict(format!(
714        "work edge {:?} {} -> {} already exists",
715        edge.kind, edge.from_id, edge.to_id
716    ))
717}
718
719#[cfg(not(target_arch = "wasm32"))]
720fn map_sqlite_insert_edge_error(err: rusqlite::Error, edge: &WorkEdge) -> WorkGraphError {
721    match err {
722        rusqlite::Error::SqliteFailure(failure, _)
723            if failure.code == ErrorCode::ConstraintViolation =>
724        {
725            duplicate_edge_error(edge)
726        }
727        err => WorkGraphError::Store(err.to_string()),
728    }
729}
730
731#[cfg(not(target_arch = "wasm32"))]
732fn insert_event_tx(tx: &Transaction<'_>, event: &WorkGraphEvent) -> Result<(), WorkGraphError> {
733    let json =
734        serde_json::to_string(event).map_err(|err| WorkGraphError::Store(err.to_string()))?;
735    tx.execute(
736        "INSERT INTO workgraph_events
737            (realm_id, namespace, item_id, event_kind, at_utc, event_json)
738         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
739        params![
740            event.realm_id,
741            event.namespace.as_str(),
742            event.item_id.as_ref().map(WorkItemId::as_str),
743            format!("{:?}", event.kind),
744            event.at.to_rfc3339(),
745            json,
746        ],
747    )
748    .map_err(|err| WorkGraphError::Store(err.to_string()))?;
749    Ok(())
750}
751
752#[cfg(not(target_arch = "wasm32"))]
753fn select_item(
754    conn: &Connection,
755    realm_id: &str,
756    namespace: &WorkNamespace,
757    id: &WorkItemId,
758) -> Result<Option<WorkItem>, WorkGraphError> {
759    conn.query_row(
760        "SELECT item_json FROM workgraph_items WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3",
761        params![realm_id, namespace.as_str(), id.as_str()],
762        |row| row_json(row, 0),
763    )
764    .optional()
765    .map_err(|err| WorkGraphError::Store(err.to_string()))
766}
767
768#[cfg(not(target_arch = "wasm32"))]
769fn list_sqlite_items(
770    conn: &Connection,
771    filter: &WorkItemFilter,
772) -> Result<Vec<WorkItem>, WorkGraphError> {
773    let mut stmt = conn
774        .prepare("SELECT item_json FROM workgraph_items ORDER BY updated_at_utc ASC, item_id ASC")
775        .map_err(|err| WorkGraphError::Store(err.to_string()))?;
776    let rows = stmt
777        .query_map([], |row| row_json::<WorkItem>(row, 0))
778        .map_err(|err| WorkGraphError::Store(err.to_string()))?;
779    let mut items = Vec::new();
780    for row in rows {
781        let item = row.map_err(|err| WorkGraphError::Store(err.to_string()))?;
782        if item_matches_filter(&item, filter) {
783            items.push(item);
784            if filter.limit.is_some_and(|limit| items.len() >= limit) {
785                break;
786            }
787        }
788    }
789    Ok(items)
790}
791
792#[cfg(not(target_arch = "wasm32"))]
793fn list_sqlite_edges(
794    conn: &Connection,
795    realm_id: &str,
796    namespace: &WorkNamespace,
797) -> Result<Vec<WorkEdge>, WorkGraphError> {
798    let mut stmt = conn
799        .prepare(
800            "SELECT edge_json FROM workgraph_edges
801             WHERE realm_id = ?1 AND namespace = ?2
802             ORDER BY edge_kind ASC, from_id ASC, to_id ASC",
803        )
804        .map_err(|err| WorkGraphError::Store(err.to_string()))?;
805    let rows = stmt
806        .query_map(params![realm_id, namespace.as_str()], |row| {
807            row_json::<WorkEdge>(row, 0)
808        })
809        .map_err(|err| WorkGraphError::Store(err.to_string()))?;
810    let mut edges = Vec::new();
811    for row in rows {
812        edges.push(row.map_err(|err| WorkGraphError::Store(err.to_string()))?);
813    }
814    Ok(edges)
815}
816
817#[cfg(not(target_arch = "wasm32"))]
818fn list_sqlite_events(
819    conn: &Connection,
820    filter: &WorkGraphEventFilter,
821) -> Result<Vec<WorkGraphEvent>, WorkGraphError> {
822    let mut stmt = conn
823        .prepare("SELECT seq, event_json FROM workgraph_events ORDER BY seq ASC")
824        .map_err(|err| WorkGraphError::Store(err.to_string()))?;
825    let rows = stmt
826        .query_map([], |row| {
827            let seq = row.get::<_, i64>(0)?;
828            let mut event = row_json::<WorkGraphEvent>(row, 1)?;
829            event.seq = Some(seq);
830            Ok(event)
831        })
832        .map_err(|err| WorkGraphError::Store(err.to_string()))?;
833    let mut events = Vec::new();
834    for row in rows {
835        let event = row.map_err(|err| WorkGraphError::Store(err.to_string()))?;
836        if event_matches_filter(&event, filter) {
837            events.push(event);
838            if filter.limit.is_some_and(|limit| events.len() >= limit) {
839                break;
840            }
841        }
842    }
843    Ok(events)
844}
845
846#[cfg(not(target_arch = "wasm32"))]
847fn replay_event_tx(tx: &Transaction<'_>, event: &WorkGraphEvent) -> Result<(), WorkGraphError> {
848    match event.kind {
849        WorkGraphEventKind::Linked => {
850            let edge = payload_field::<WorkEdge>(event, "edge")?;
851            insert_edge_tx(tx, &edge)
852        }
853        WorkGraphEventKind::Created
854        | WorkGraphEventKind::Updated
855        | WorkGraphEventKind::Claimed
856        | WorkGraphEventKind::Released
857        | WorkGraphEventKind::Blocked
858        | WorkGraphEventKind::Closed
859        | WorkGraphEventKind::EvidenceAdded => {
860            let item = payload_field::<WorkItem>(event, "item")?;
861            upsert_item_tx(tx, &item)
862        }
863    }
864}
865
866#[cfg(not(target_arch = "wasm32"))]
867fn payload_field<T: serde::de::DeserializeOwned>(
868    event: &WorkGraphEvent,
869    field: &str,
870) -> Result<T, WorkGraphError> {
871    let value = event.payload.get(field).ok_or_else(|| {
872        WorkGraphError::Store(format!(
873            "workgraph event {:?} missing payload field `{field}`",
874            event.kind
875        ))
876    })?;
877    serde_json::from_value(value.clone()).map_err(|err| WorkGraphError::Store(err.to_string()))
878}
879
880#[cfg(not(target_arch = "wasm32"))]
881fn row_json<T: serde::de::DeserializeOwned>(
882    row: &rusqlite::Row<'_>,
883    index: usize,
884) -> rusqlite::Result<T> {
885    let json = row.get::<_, String>(index)?;
886    serde_json::from_str(&json).map_err(|err| {
887        rusqlite::Error::FromSqlConversionFailure(index, rusqlite::types::Type::Text, Box::new(err))
888    })
889}
890
891#[cfg(test)]
892#[allow(clippy::expect_used, clippy::unwrap_used)]
893mod tests {
894    use std::collections::BTreeSet;
895
896    use chrono::Utc;
897    use serde_json::json;
898
899    use crate::types::WorkEdge;
900    use crate::{
901        ClaimWorkItemRequest, CreateWorkItemRequest, LinkWorkItemsRequest, MemoryWorkGraphStore,
902        WorkEdgeKind, WorkGraphError, WorkGraphEvent, WorkGraphEventFilter, WorkGraphEventKind,
903        WorkGraphService, WorkGraphStore, WorkItemFilter, WorkItemId, WorkNamespace, WorkOwner,
904        WorkOwnerKey, WorkStatus,
905    };
906
907    fn test_edge() -> WorkEdge {
908        WorkEdge {
909            realm_id: "realm".to_string(),
910            namespace: WorkNamespace::default(),
911            kind: WorkEdgeKind::Blocks,
912            from_id: WorkItemId::generated(),
913            to_id: WorkItemId::generated(),
914            created_at: Utc::now(),
915        }
916    }
917
918    fn link_event(edge: &WorkEdge) -> WorkGraphEvent {
919        WorkGraphEvent::graph(
920            edge.realm_id.clone(),
921            edge.namespace.clone(),
922            WorkGraphEventKind::Linked,
923            edge.created_at,
924            json!({ "edge": edge }),
925        )
926    }
927
928    #[tokio::test]
929    async fn memory_store_namespace_filters_do_not_leak() {
930        let store = std::sync::Arc::new(MemoryWorkGraphStore::new());
931        let default_service =
932            WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
933        let other_service = WorkGraphService::with_scope(
934            store.clone(),
935            "realm",
936            WorkNamespace::new("other").expect("namespace"),
937        );
938        default_service
939            .create(CreateWorkItemRequest {
940                realm_id: None,
941                namespace: None,
942                title: "default".to_string(),
943                description: None,
944                priority: Default::default(),
945                labels: BTreeSet::new(),
946                due_at: None,
947                not_before: None,
948                snoozed_until: None,
949                external_refs: Vec::new(),
950                evidence_refs: Vec::new(),
951                status: None,
952            })
953            .await
954            .expect("create default");
955        other_service
956            .create(CreateWorkItemRequest {
957                realm_id: None,
958                namespace: None,
959                title: "other".to_string(),
960                description: None,
961                priority: Default::default(),
962                labels: BTreeSet::new(),
963                due_at: None,
964                not_before: None,
965                snoozed_until: None,
966                external_refs: Vec::new(),
967                evidence_refs: Vec::new(),
968                status: None,
969            })
970            .await
971            .expect("create other");
972
973        let items = store
974            .list_items(WorkItemFilter {
975                realm_id: Some("realm".to_string()),
976                namespace: Some(WorkNamespace::default()),
977                ..WorkItemFilter::default()
978            })
979            .await
980            .expect("list");
981        assert_eq!(items.len(), 1);
982        assert_eq!(items[0].title, "default");
983    }
984
985    #[tokio::test]
986    async fn memory_store_duplicate_edge_does_not_append_event() {
987        let store = MemoryWorkGraphStore::new();
988        let edge = test_edge();
989        store
990            .insert_edge(edge.clone(), link_event(&edge))
991            .await
992            .expect("insert edge");
993
994        let error = store
995            .insert_edge(edge.clone(), link_event(&edge))
996            .await
997            .expect_err("duplicate edge should fail");
998        assert!(matches!(error, WorkGraphError::Conflict(_)));
999
1000        let events = store
1001            .list_events(WorkGraphEventFilter {
1002                realm_id: Some(edge.realm_id),
1003                namespace: Some(edge.namespace),
1004                all_namespaces: false,
1005                after_seq: None,
1006                limit: None,
1007            })
1008            .await
1009            .expect("events");
1010        assert_eq!(events.len(), 1);
1011    }
1012
1013    #[cfg(not(target_arch = "wasm32"))]
1014    #[tokio::test]
1015    async fn sqlite_persistence_survives_restart() {
1016        let dir = tempfile::tempdir().expect("tempdir");
1017        let path = dir.path().join("workgraph.sqlite3");
1018        let store = std::sync::Arc::new(crate::SqliteWorkGraphStore::open(&path).expect("open"));
1019        let service = WorkGraphService::with_scope(store, "realm", WorkNamespace::default());
1020        let item = service
1021            .create(CreateWorkItemRequest {
1022                realm_id: None,
1023                namespace: None,
1024                title: "persist me".to_string(),
1025                description: None,
1026                priority: Default::default(),
1027                labels: BTreeSet::new(),
1028                due_at: None,
1029                not_before: None,
1030                snoozed_until: None,
1031                external_refs: Vec::new(),
1032                evidence_refs: Vec::new(),
1033                status: None,
1034            })
1035            .await
1036            .expect("create");
1037
1038        let reopened = std::sync::Arc::new(crate::SqliteWorkGraphStore::open(&path).expect("open"));
1039        let service = WorkGraphService::with_scope(reopened, "realm", WorkNamespace::default());
1040        let fetched = service.get(None, None, item.id.clone()).await.expect("get");
1041        assert_eq!(fetched.title, "persist me");
1042    }
1043
1044    #[cfg(not(target_arch = "wasm32"))]
1045    #[tokio::test]
1046    async fn sqlite_legacy_item_without_machine_state_backfills_on_write() {
1047        let dir = tempfile::tempdir().expect("tempdir");
1048        let path = dir.path().join("workgraph.sqlite3");
1049        let store = std::sync::Arc::new(crate::SqliteWorkGraphStore::open(&path).expect("open"));
1050        let service =
1051            WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
1052        let item = service
1053            .create(CreateWorkItemRequest {
1054                realm_id: None,
1055                namespace: None,
1056                title: "legacy item".to_string(),
1057                description: None,
1058                priority: Default::default(),
1059                labels: BTreeSet::new(),
1060                due_at: None,
1061                not_before: None,
1062                snoozed_until: None,
1063                external_refs: Vec::new(),
1064                evidence_refs: Vec::new(),
1065                status: None,
1066            })
1067            .await
1068            .expect("create");
1069
1070        store
1071            .with_connection(|conn| {
1072                let json: String = conn
1073                    .query_row(
1074                        "SELECT item_json FROM workgraph_items
1075                         WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3",
1076                        rusqlite::params![
1077                            &item.realm_id,
1078                            item.namespace.as_str(),
1079                            item.id.as_str()
1080                        ],
1081                        |row| row.get(0),
1082                    )
1083                    .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1084                let mut value = serde_json::from_str::<serde_json::Value>(&json)
1085                    .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1086                value
1087                    .as_object_mut()
1088                    .expect("item json object")
1089                    .remove("machine_state");
1090                conn.execute(
1091                    "UPDATE workgraph_items
1092                        SET item_json = ?4
1093                      WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3",
1094                    rusqlite::params![
1095                        &item.realm_id,
1096                        item.namespace.as_str(),
1097                        item.id.as_str(),
1098                        serde_json::to_string(&value)
1099                            .map_err(|err| WorkGraphError::Store(err.to_string()))?
1100                    ],
1101                )
1102                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1103                Ok(())
1104            })
1105            .expect("strip machine state");
1106
1107        let reopened = std::sync::Arc::new(crate::SqliteWorkGraphStore::open(&path).expect("open"));
1108        let service = WorkGraphService::with_scope(reopened, "realm", WorkNamespace::default());
1109        let legacy = service.get(None, None, item.id).await.expect("get legacy");
1110        assert_eq!(legacy.machine_state.revision, legacy.revision);
1111        assert!(matches!(
1112            legacy.machine_state.lifecycle_phase,
1113            crate::machines::workgraph_lifecycle::WorkLifecycleState::Open
1114        ));
1115
1116        let claimed = service
1117            .claim(ClaimWorkItemRequest {
1118                id: legacy.id,
1119                realm_id: None,
1120                namespace: None,
1121                expected_revision: legacy.revision,
1122                owner: WorkOwner::new(WorkOwnerKey::label("worker").expect("owner")),
1123                lease_seconds: Some(60),
1124                lease_expires_at: None,
1125            })
1126            .await
1127            .expect("claim legacy");
1128        assert_eq!(claimed.status, WorkStatus::InProgress);
1129    }
1130
1131    #[cfg(not(target_arch = "wasm32"))]
1132    #[tokio::test]
1133    async fn sqlite_event_replay_rebuilds_projection() {
1134        let dir = tempfile::tempdir().expect("tempdir");
1135        let path = dir.path().join("workgraph.sqlite3");
1136        let store = std::sync::Arc::new(crate::SqliteWorkGraphStore::open(&path).expect("open"));
1137        let service =
1138            WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
1139        let blocker = service
1140            .create(CreateWorkItemRequest {
1141                realm_id: None,
1142                namespace: None,
1143                title: "blocker".to_string(),
1144                description: None,
1145                priority: Default::default(),
1146                labels: BTreeSet::new(),
1147                due_at: None,
1148                not_before: None,
1149                snoozed_until: None,
1150                external_refs: Vec::new(),
1151                evidence_refs: Vec::new(),
1152                status: None,
1153            })
1154            .await
1155            .expect("create blocker");
1156        let blocked = service
1157            .create(CreateWorkItemRequest {
1158                realm_id: None,
1159                namespace: None,
1160                title: "blocked".to_string(),
1161                description: None,
1162                priority: Default::default(),
1163                labels: BTreeSet::new(),
1164                due_at: None,
1165                not_before: None,
1166                snoozed_until: None,
1167                external_refs: Vec::new(),
1168                evidence_refs: Vec::new(),
1169                status: None,
1170            })
1171            .await
1172            .expect("create blocked");
1173        service
1174            .link(LinkWorkItemsRequest {
1175                realm_id: None,
1176                namespace: None,
1177                kind: WorkEdgeKind::Blocks,
1178                from_id: blocker.id.clone(),
1179                to_id: blocked.id.clone(),
1180            })
1181            .await
1182            .expect("link");
1183
1184        store
1185            .with_connection(|conn| {
1186                conn.execute("DELETE FROM workgraph_items", [])
1187                    .map_err(|err| crate::WorkGraphError::Store(err.to_string()))?;
1188                conn.execute("DELETE FROM workgraph_edges", [])
1189                    .map_err(|err| crate::WorkGraphError::Store(err.to_string()))?;
1190                Ok(())
1191            })
1192            .expect("clear projection");
1193
1194        let empty_items = store
1195            .list_items(WorkItemFilter {
1196                realm_id: Some("realm".to_string()),
1197                namespace: Some(WorkNamespace::default()),
1198                ..WorkItemFilter::default()
1199            })
1200            .await
1201            .expect("empty list");
1202        assert!(empty_items.is_empty());
1203
1204        store
1205            .rebuild_projection_from_events()
1206            .expect("rebuild projection");
1207
1208        let rebuilt_items = store
1209            .list_items(WorkItemFilter {
1210                realm_id: Some("realm".to_string()),
1211                namespace: Some(WorkNamespace::default()),
1212                ..WorkItemFilter::default()
1213            })
1214            .await
1215            .expect("rebuilt list");
1216        assert_eq!(rebuilt_items.len(), 2);
1217        let rebuilt_edges = store
1218            .list_edges("realm", &WorkNamespace::default())
1219            .await
1220            .expect("rebuilt edges");
1221        assert_eq!(rebuilt_edges.len(), 1);
1222    }
1223
1224    #[cfg(not(target_arch = "wasm32"))]
1225    #[tokio::test]
1226    async fn sqlite_store_duplicate_edge_does_not_append_event() {
1227        let dir = tempfile::tempdir().expect("tempdir");
1228        let path = dir.path().join("workgraph.sqlite3");
1229        let store = crate::SqliteWorkGraphStore::open(&path).expect("open");
1230        let edge = test_edge();
1231        store
1232            .insert_edge(edge.clone(), link_event(&edge))
1233            .await
1234            .expect("insert edge");
1235
1236        let error = store
1237            .insert_edge(edge.clone(), link_event(&edge))
1238            .await
1239            .expect_err("duplicate edge should fail");
1240        assert!(matches!(error, WorkGraphError::Conflict(_)));
1241
1242        let events = store
1243            .list_events(WorkGraphEventFilter {
1244                realm_id: Some(edge.realm_id),
1245                namespace: Some(edge.namespace),
1246                all_namespaces: false,
1247                after_seq: None,
1248                limit: None,
1249            })
1250            .await
1251            .expect("events");
1252        assert_eq!(events.len(), 1);
1253    }
1254}