Skip to main content

meerkat_workgraph/
store.rs

1use std::collections::BTreeMap;
2#[cfg(not(target_arch = "wasm32"))]
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8#[cfg(not(target_arch = "wasm32"))]
9use rusqlite::{
10    Connection, ErrorCode, OptionalExtension, Transaction, TransactionBehavior, params,
11};
12
13use crate::WorkGraphError;
14use crate::types::{
15    AttentionListRequest, WorkAttentionBinding, WorkAttentionBindingId, WorkAttentionStatus,
16    WorkEdge, WorkGraphEvent, WorkGraphEventKind, WorkItem, WorkItemFilter, WorkItemId,
17    WorkNamespace,
18};
19use crate::{WorkAttentionMachine, WorkGraphMachine};
20
21#[cfg(target_arch = "wasm32")]
22use crate::tokio::sync::RwLock;
23#[cfg(not(target_arch = "wasm32"))]
24use tokio::sync::RwLock;
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq)]
27pub enum WorkGraphStoreKind {
28    Disabled,
29    Memory,
30    Sqlite,
31    Custom,
32}
33
34impl WorkGraphStoreKind {
35    pub fn as_str(self) -> &'static str {
36        match self {
37            Self::Disabled => "disabled",
38            Self::Memory => "memory",
39            Self::Sqlite => "sqlite",
40            Self::Custom => "custom",
41        }
42    }
43}
44
45impl std::fmt::Display for WorkGraphStoreKind {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        f.write_str(self.as_str())
48    }
49}
50
51#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
52#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
53pub struct WorkGraphEventFilter {
54    pub realm_id: Option<String>,
55    pub namespace: Option<WorkNamespace>,
56    #[serde(default)]
57    pub all_namespaces: bool,
58    pub after_seq: Option<i64>,
59    pub limit: Option<usize>,
60}
61
62#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
63#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
64pub trait WorkGraphStore: Send + Sync {
65    fn kind(&self) -> WorkGraphStoreKind;
66
67    async fn get_store_time_utc(&self) -> Result<DateTime<Utc>, WorkGraphError>;
68
69    async fn insert_item(
70        &self,
71        item: WorkItem,
72        event: WorkGraphEvent,
73    ) -> Result<WorkItem, WorkGraphError>;
74
75    async fn update_item_cas(
76        &self,
77        item: WorkItem,
78        expected_previous_revision: u64,
79        event: WorkGraphEvent,
80    ) -> Result<WorkItem, WorkGraphError>;
81
82    async fn update_item_and_attention_cas(
83        &self,
84        item: WorkItem,
85        expected_previous_revision: u64,
86        item_event: WorkGraphEvent,
87        attention_updates: Vec<(WorkAttentionBinding, u64, WorkGraphEvent)>,
88    ) -> Result<WorkItem, WorkGraphError>;
89
90    async fn get_item(
91        &self,
92        realm_id: &str,
93        namespace: &WorkNamespace,
94        id: &WorkItemId,
95    ) -> Result<Option<WorkItem>, WorkGraphError>;
96
97    async fn list_items(&self, filter: WorkItemFilter) -> Result<Vec<WorkItem>, WorkGraphError>;
98
99    async fn insert_goal(
100        &self,
101        _item: WorkItem,
102        _item_event: WorkGraphEvent,
103        _attention: WorkAttentionBinding,
104        _attention_event: WorkGraphEvent,
105    ) -> Result<(WorkItem, WorkAttentionBinding), WorkGraphError> {
106        Err(unsupported(self.kind()))
107    }
108
109    async fn update_attention_cas(
110        &self,
111        _attention: WorkAttentionBinding,
112        _expected_previous_revision: u64,
113        _event: WorkGraphEvent,
114    ) -> Result<WorkAttentionBinding, WorkGraphError> {
115        Err(unsupported(self.kind()))
116    }
117
118    async fn get_attention(
119        &self,
120        _realm_id: &str,
121        _namespace: &WorkNamespace,
122        _binding_id: &WorkAttentionBindingId,
123    ) -> Result<Option<WorkAttentionBinding>, WorkGraphError> {
124        Err(unsupported(self.kind()))
125    }
126
127    async fn list_attention(
128        &self,
129        _filter: AttentionListRequest,
130    ) -> Result<Vec<WorkAttentionBinding>, WorkGraphError> {
131        Err(unsupported(self.kind()))
132    }
133
134    async fn insert_edge(
135        &self,
136        edge: WorkEdge,
137        event: WorkGraphEvent,
138    ) -> Result<WorkEdge, WorkGraphError>;
139
140    async fn insert_edge_validated(
141        &self,
142        _edge: WorkEdge,
143        _event: WorkGraphEvent,
144    ) -> Result<WorkEdge, WorkGraphError> {
145        Err(unsupported(self.kind()))
146    }
147
148    async fn list_edges(
149        &self,
150        realm_id: &str,
151        namespace: &WorkNamespace,
152    ) -> Result<Vec<WorkEdge>, WorkGraphError>;
153
154    async fn list_events(
155        &self,
156        filter: WorkGraphEventFilter,
157    ) -> Result<Vec<WorkGraphEvent>, WorkGraphError>;
158}
159
160#[derive(Default)]
161pub struct DisabledWorkGraphStore;
162
163#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
164#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
165impl WorkGraphStore for DisabledWorkGraphStore {
166    fn kind(&self) -> WorkGraphStoreKind {
167        WorkGraphStoreKind::Disabled
168    }
169
170    async fn get_store_time_utc(&self) -> Result<DateTime<Utc>, WorkGraphError> {
171        Err(unsupported(self.kind()))
172    }
173
174    async fn insert_item(
175        &self,
176        _item: WorkItem,
177        _event: WorkGraphEvent,
178    ) -> Result<WorkItem, WorkGraphError> {
179        Err(unsupported(self.kind()))
180    }
181
182    async fn update_item_cas(
183        &self,
184        _item: WorkItem,
185        _expected_previous_revision: u64,
186        _event: WorkGraphEvent,
187    ) -> Result<WorkItem, WorkGraphError> {
188        Err(unsupported(self.kind()))
189    }
190
191    async fn update_item_and_attention_cas(
192        &self,
193        _item: WorkItem,
194        _expected_previous_revision: u64,
195        _item_event: WorkGraphEvent,
196        _attention_updates: Vec<(WorkAttentionBinding, u64, WorkGraphEvent)>,
197    ) -> Result<WorkItem, WorkGraphError> {
198        Err(unsupported(self.kind()))
199    }
200
201    async fn get_item(
202        &self,
203        _realm_id: &str,
204        _namespace: &WorkNamespace,
205        _id: &WorkItemId,
206    ) -> Result<Option<WorkItem>, WorkGraphError> {
207        Err(unsupported(self.kind()))
208    }
209
210    async fn list_items(&self, _filter: WorkItemFilter) -> Result<Vec<WorkItem>, WorkGraphError> {
211        Err(unsupported(self.kind()))
212    }
213
214    async fn insert_goal(
215        &self,
216        _item: WorkItem,
217        _item_event: WorkGraphEvent,
218        _attention: WorkAttentionBinding,
219        _attention_event: WorkGraphEvent,
220    ) -> Result<(WorkItem, WorkAttentionBinding), WorkGraphError> {
221        Err(unsupported(self.kind()))
222    }
223
224    async fn update_attention_cas(
225        &self,
226        _attention: WorkAttentionBinding,
227        _expected_previous_revision: u64,
228        _event: WorkGraphEvent,
229    ) -> Result<WorkAttentionBinding, WorkGraphError> {
230        Err(unsupported(self.kind()))
231    }
232
233    async fn get_attention(
234        &self,
235        _realm_id: &str,
236        _namespace: &WorkNamespace,
237        _binding_id: &WorkAttentionBindingId,
238    ) -> Result<Option<WorkAttentionBinding>, WorkGraphError> {
239        Err(unsupported(self.kind()))
240    }
241
242    async fn list_attention(
243        &self,
244        _filter: AttentionListRequest,
245    ) -> Result<Vec<WorkAttentionBinding>, WorkGraphError> {
246        Err(unsupported(self.kind()))
247    }
248
249    async fn insert_edge(
250        &self,
251        _edge: WorkEdge,
252        _event: WorkGraphEvent,
253    ) -> Result<WorkEdge, WorkGraphError> {
254        Err(unsupported(self.kind()))
255    }
256
257    async fn insert_edge_validated(
258        &self,
259        _edge: WorkEdge,
260        _event: WorkGraphEvent,
261    ) -> Result<WorkEdge, WorkGraphError> {
262        Err(unsupported(self.kind()))
263    }
264
265    async fn list_edges(
266        &self,
267        _realm_id: &str,
268        _namespace: &WorkNamespace,
269    ) -> Result<Vec<WorkEdge>, WorkGraphError> {
270        Err(unsupported(self.kind()))
271    }
272
273    async fn list_events(
274        &self,
275        _filter: WorkGraphEventFilter,
276    ) -> Result<Vec<WorkGraphEvent>, WorkGraphError> {
277        Err(unsupported(self.kind()))
278    }
279}
280
281fn unsupported(kind: WorkGraphStoreKind) -> WorkGraphError {
282    WorkGraphError::UnsupportedBackend(kind.to_string())
283}
284
285#[derive(Default)]
286pub struct MemoryWorkGraphStore {
287    inner: Arc<RwLock<MemoryWorkGraphState>>,
288}
289
290#[derive(Default)]
291struct MemoryWorkGraphState {
292    items: BTreeMap<(String, WorkNamespace, WorkItemId), WorkItem>,
293    attention: BTreeMap<(String, WorkNamespace, WorkAttentionBindingId), WorkAttentionBinding>,
294    edges: Vec<WorkEdge>,
295    events: Vec<WorkGraphEvent>,
296    next_event_seq: i64,
297}
298
299impl MemoryWorkGraphStore {
300    pub fn new() -> Self {
301        Self::default()
302    }
303}
304
305#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
306#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
307impl WorkGraphStore for MemoryWorkGraphStore {
308    fn kind(&self) -> WorkGraphStoreKind {
309        WorkGraphStoreKind::Memory
310    }
311
312    async fn get_store_time_utc(&self) -> Result<DateTime<Utc>, WorkGraphError> {
313        Ok(Utc::now())
314    }
315
316    async fn insert_item(
317        &self,
318        item: WorkItem,
319        event: WorkGraphEvent,
320    ) -> Result<WorkItem, WorkGraphError> {
321        WorkGraphMachine::validate_item_projection(&item)?;
322        let mut guard = self.inner.write().await;
323        let key = item_key(&item.realm_id, &item.namespace, &item.id);
324        if guard.items.contains_key(&key) {
325            return Err(WorkGraphError::Conflict(format!(
326                "work item {} already exists",
327                item.id
328            )));
329        }
330        guard.items.insert(key, item.clone());
331        guard.append_event(event);
332        Ok(item)
333    }
334
335    async fn update_item_cas(
336        &self,
337        item: WorkItem,
338        expected_previous_revision: u64,
339        event: WorkGraphEvent,
340    ) -> Result<WorkItem, WorkGraphError> {
341        WorkGraphMachine::validate_item_projection(&item)?;
342        let mut guard = self.inner.write().await;
343        let key = item_key(&item.realm_id, &item.namespace, &item.id);
344        let Some(current) = guard.items.get(&key) else {
345            return Err(WorkGraphError::not_found(
346                item.realm_id.clone(),
347                item.namespace.clone(),
348                item.id.clone(),
349            ));
350        };
351        if current.revision != expected_previous_revision {
352            return Err(WorkGraphError::StaleRevision {
353                id: item.id.clone(),
354                expected: expected_previous_revision,
355                actual: current.revision,
356            });
357        }
358        guard.items.insert(key, item.clone());
359        guard.append_event(event);
360        Ok(item)
361    }
362
363    async fn get_item(
364        &self,
365        realm_id: &str,
366        namespace: &WorkNamespace,
367        id: &WorkItemId,
368    ) -> Result<Option<WorkItem>, WorkGraphError> {
369        let guard = self.inner.read().await;
370        Ok(guard.items.get(&item_key(realm_id, namespace, id)).cloned())
371    }
372
373    async fn list_items(&self, filter: WorkItemFilter) -> Result<Vec<WorkItem>, WorkGraphError> {
374        let guard = self.inner.read().await;
375        let mut items = guard
376            .items
377            .values()
378            .filter(|item| item_matches_filter(item, &filter))
379            .cloned()
380            .collect::<Vec<_>>();
381        items.sort_by(|left, right| {
382            left.updated_at
383                .cmp(&right.updated_at)
384                .then_with(|| left.id.cmp(&right.id))
385        });
386        if let Some(limit) = filter.limit {
387            items.truncate(limit);
388        }
389        Ok(items)
390    }
391
392    async fn insert_goal(
393        &self,
394        item: WorkItem,
395        item_event: WorkGraphEvent,
396        attention: WorkAttentionBinding,
397        attention_event: WorkGraphEvent,
398    ) -> Result<(WorkItem, WorkAttentionBinding), WorkGraphError> {
399        WorkGraphMachine::validate_item_projection(&item)?;
400        let mut guard = self.inner.write().await;
401        let item_key = item_key(&item.realm_id, &item.namespace, &item.id);
402        if guard.items.contains_key(&item_key) {
403            return Err(WorkGraphError::Conflict(format!(
404                "work item {} already exists",
405                item.id
406            )));
407        }
408        let attention_key = attention_key(
409            &attention.work_ref.realm_id,
410            &attention.work_ref.namespace,
411            &attention.binding_id,
412        );
413        if guard.attention.contains_key(&attention_key) {
414            return Err(WorkGraphError::Conflict(format!(
415                "work attention binding {} already exists",
416                attention.binding_id
417            )));
418        }
419        guard.items.insert(item_key, item.clone());
420        guard.attention.insert(attention_key, attention.clone());
421        guard.append_event(item_event);
422        guard.append_event(attention_event);
423        Ok((item, attention))
424    }
425
426    async fn update_attention_cas(
427        &self,
428        attention: WorkAttentionBinding,
429        expected_previous_revision: u64,
430        event: WorkGraphEvent,
431    ) -> Result<WorkAttentionBinding, WorkGraphError> {
432        let mut guard = self.inner.write().await;
433        let key = attention_key(
434            &attention.work_ref.realm_id,
435            &attention.work_ref.namespace,
436            &attention.binding_id,
437        );
438        let Some(current) = guard.attention.get(&key) else {
439            return Err(WorkGraphError::not_found(
440                attention.work_ref.realm_id.clone(),
441                attention.work_ref.namespace.clone(),
442                attention.work_ref.item_id.clone(),
443            ));
444        };
445        if current.machine_state.revision != expected_previous_revision {
446            return Err(WorkGraphError::StaleRevision {
447                id: attention.work_ref.item_id.clone(),
448                expected: expected_previous_revision,
449                actual: current.machine_state.revision,
450            });
451        }
452        guard.attention.insert(key, attention.clone());
453        guard.append_event(event);
454        Ok(attention)
455    }
456
457    async fn update_item_and_attention_cas(
458        &self,
459        item: WorkItem,
460        expected_previous_revision: u64,
461        item_event: WorkGraphEvent,
462        attention_updates: Vec<(WorkAttentionBinding, u64, WorkGraphEvent)>,
463    ) -> Result<WorkItem, WorkGraphError> {
464        WorkGraphMachine::validate_item_projection(&item)?;
465        let mut guard = self.inner.write().await;
466        let key = item_key(&item.realm_id, &item.namespace, &item.id);
467        let Some(current) = guard.items.get(&key) else {
468            return Err(WorkGraphError::not_found(
469                item.realm_id.clone(),
470                item.namespace.clone(),
471                item.id.clone(),
472            ));
473        };
474        if current.revision != expected_previous_revision {
475            return Err(WorkGraphError::StaleRevision {
476                id: item.id.clone(),
477                expected: expected_previous_revision,
478                actual: current.revision,
479            });
480        }
481        for (attention, expected_revision, _) in &attention_updates {
482            let key = attention_key(
483                &attention.work_ref.realm_id,
484                &attention.work_ref.namespace,
485                &attention.binding_id,
486            );
487            let Some(current) = guard.attention.get(&key) else {
488                return Err(WorkGraphError::not_found(
489                    attention.work_ref.realm_id.clone(),
490                    attention.work_ref.namespace.clone(),
491                    attention.work_ref.item_id.clone(),
492                ));
493            };
494            if current.machine_state.revision != *expected_revision {
495                return Err(WorkGraphError::StaleRevision {
496                    id: attention.work_ref.item_id.clone(),
497                    expected: *expected_revision,
498                    actual: current.machine_state.revision,
499                });
500            }
501        }
502        guard.items.insert(key, item.clone());
503        guard.append_event(item_event);
504        for (attention, _, event) in attention_updates {
505            let key = attention_key(
506                &attention.work_ref.realm_id,
507                &attention.work_ref.namespace,
508                &attention.binding_id,
509            );
510            guard.attention.insert(key, attention);
511            guard.append_event(event);
512        }
513        Ok(item)
514    }
515
516    async fn get_attention(
517        &self,
518        realm_id: &str,
519        namespace: &WorkNamespace,
520        binding_id: &WorkAttentionBindingId,
521    ) -> Result<Option<WorkAttentionBinding>, WorkGraphError> {
522        let guard = self.inner.read().await;
523        Ok(guard
524            .attention
525            .get(&attention_key(realm_id, namespace, binding_id))
526            .cloned())
527    }
528
529    async fn list_attention(
530        &self,
531        filter: AttentionListRequest,
532    ) -> Result<Vec<WorkAttentionBinding>, WorkGraphError> {
533        let guard = self.inner.read().await;
534        let mut bindings = guard
535            .attention
536            .values()
537            .filter(|binding| attention_matches_filter(binding, &filter))
538            .cloned()
539            .collect::<Vec<_>>();
540        bindings.sort_by(|left, right| {
541            left.updated_at
542                .cmp(&right.updated_at)
543                .then_with(|| left.binding_id.cmp(&right.binding_id))
544        });
545        Ok(bindings)
546    }
547
548    async fn insert_edge(
549        &self,
550        edge: WorkEdge,
551        event: WorkGraphEvent,
552    ) -> Result<WorkEdge, WorkGraphError> {
553        let mut guard = self.inner.write().await;
554        if guard.edges.iter().any(|existing| existing == &edge) {
555            return Err(duplicate_edge_error(&edge));
556        }
557        guard.edges.push(edge.clone());
558        guard.append_event(event);
559        Ok(edge)
560    }
561
562    async fn insert_edge_validated(
563        &self,
564        edge: WorkEdge,
565        event: WorkGraphEvent,
566    ) -> Result<WorkEdge, WorkGraphError> {
567        let mut guard = self.inner.write().await;
568        if guard.edges.iter().any(|existing| existing == &edge) {
569            return Err(duplicate_edge_error(&edge));
570        }
571        let existing_edges = guard
572            .edges
573            .iter()
574            .filter(|existing| {
575                existing.realm_id == edge.realm_id && existing.namespace == edge.namespace
576            })
577            .cloned()
578            .collect::<Vec<_>>();
579        let existing_items = guard
580            .items
581            .values()
582            .filter(|item| item.realm_id == edge.realm_id && item.namespace == edge.namespace)
583            .cloned()
584            .collect::<Vec<_>>();
585        WorkGraphMachine::validate_link(&edge, &existing_items, &existing_edges)?;
586        guard.edges.push(edge.clone());
587        guard.append_event(event);
588        Ok(edge)
589    }
590
591    async fn list_edges(
592        &self,
593        realm_id: &str,
594        namespace: &WorkNamespace,
595    ) -> Result<Vec<WorkEdge>, WorkGraphError> {
596        let guard = self.inner.read().await;
597        Ok(guard
598            .edges
599            .iter()
600            .filter(|edge| edge.realm_id == realm_id && edge.namespace == *namespace)
601            .cloned()
602            .collect())
603    }
604
605    async fn list_events(
606        &self,
607        filter: WorkGraphEventFilter,
608    ) -> Result<Vec<WorkGraphEvent>, WorkGraphError> {
609        let guard = self.inner.read().await;
610        let mut events = guard
611            .events
612            .iter()
613            .filter(|event| event_matches_filter(event, &filter))
614            .cloned()
615            .collect::<Vec<_>>();
616        events.sort_by_key(|event| event.seq.unwrap_or_default());
617        if let Some(limit) = filter.limit {
618            events.truncate(limit);
619        }
620        Ok(events)
621    }
622}
623
624impl MemoryWorkGraphState {
625    fn append_event(&mut self, mut event: WorkGraphEvent) {
626        self.next_event_seq += 1;
627        event.seq = Some(self.next_event_seq);
628        self.events.push(event);
629    }
630}
631
632fn item_key(
633    realm_id: &str,
634    namespace: &WorkNamespace,
635    id: &WorkItemId,
636) -> (String, WorkNamespace, WorkItemId) {
637    (realm_id.to_string(), namespace.clone(), id.clone())
638}
639
640fn attention_key(
641    realm_id: &str,
642    namespace: &WorkNamespace,
643    id: &WorkAttentionBindingId,
644) -> (String, WorkNamespace, WorkAttentionBindingId) {
645    (realm_id.to_string(), namespace.clone(), id.clone())
646}
647
648fn item_matches_filter(item: &WorkItem, filter: &WorkItemFilter) -> bool {
649    if let Some(realm_id) = &filter.realm_id
650        && &item.realm_id != realm_id
651    {
652        return false;
653    }
654    if !filter.all_namespaces
655        && let Some(namespace) = &filter.namespace
656        && &item.namespace != namespace
657    {
658        return false;
659    }
660    if !filter.statuses.is_empty() && !filter.statuses.contains(&item.status) {
661        return false;
662    }
663    // The terminality verdict (which lifecycle phases are terminal) is a machine
664    // fact owned by WorkGraphLifecycleMachine, not this filter. We drive the
665    // machine's ClassifyTerminality over the item's recovered state and mirror the
666    // verdict, failing closed: an item the machine cannot classify is treated as
667    // terminal so it is never surfaced as live work when terminals are excluded.
668    if !filter.include_terminal && WorkGraphMachine::classify_terminality(item).unwrap_or(true) {
669        return false;
670    }
671    filter
672        .labels
673        .iter()
674        .all(|label| item.labels.contains(label))
675}
676
677fn attention_matches_filter(binding: &WorkAttentionBinding, filter: &AttentionListRequest) -> bool {
678    if let Some(realm_id) = &filter.realm_id
679        && &binding.work_ref.realm_id != realm_id
680    {
681        return false;
682    }
683    if let Some(namespace) = &filter.namespace
684        && &binding.work_ref.namespace != namespace
685    {
686        return false;
687    }
688    if let Some(target) = &filter.target
689        && &binding.target != target
690    {
691        return false;
692    }
693    if let Some(status) = &filter.status
694        && !attention_status_matches_filter(&binding.status, status)
695    {
696        return false;
697    }
698    true
699}
700
701fn attention_status_matches_filter(
702    actual: &crate::types::WorkAttentionStatus,
703    filter: &crate::types::WorkAttentionStatus,
704) -> bool {
705    use crate::types::WorkAttentionStatus;
706
707    match (actual, filter) {
708        (WorkAttentionStatus::Active, WorkAttentionStatus::Active)
709        | (WorkAttentionStatus::Superseded, WorkAttentionStatus::Superseded)
710        | (WorkAttentionStatus::Stopped, WorkAttentionStatus::Stopped) => true,
711        (WorkAttentionStatus::Paused { .. }, WorkAttentionStatus::Paused { until: None }) => true,
712        (
713            WorkAttentionStatus::Paused {
714                until: Some(actual_until),
715            },
716            WorkAttentionStatus::Paused {
717                until: Some(filter_until),
718            },
719        ) => actual_until == filter_until,
720        _ => false,
721    }
722}
723
724fn event_matches_filter(event: &WorkGraphEvent, filter: &WorkGraphEventFilter) -> bool {
725    if let Some(after_seq) = filter.after_seq
726        && event.seq.unwrap_or_default() <= after_seq
727    {
728        return false;
729    }
730    if let Some(realm_id) = &filter.realm_id
731        && &event.realm_id != realm_id
732    {
733        return false;
734    }
735    if !filter.all_namespaces
736        && let Some(namespace) = &filter.namespace
737        && &event.namespace != namespace
738    {
739        return false;
740    }
741    true
742}
743
744#[cfg(not(target_arch = "wasm32"))]
745pub struct SqliteWorkGraphStore {
746    path: PathBuf,
747}
748
749#[cfg(not(target_arch = "wasm32"))]
750impl SqliteWorkGraphStore {
751    pub fn open(path: impl Into<PathBuf>) -> Result<Self, WorkGraphError> {
752        let store = Self { path: path.into() };
753        store.with_connection(|_conn| Ok(()))?;
754        Ok(store)
755    }
756
757    pub fn path(&self) -> &Path {
758        &self.path
759    }
760
761    pub fn rebuild_projection_from_events(&self) -> Result<(), WorkGraphError> {
762        self.with_connection(|conn| {
763            let tx = conn
764                .transaction()
765                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
766            tx.execute("DELETE FROM workgraph_items", [])
767                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
768            tx.execute("DELETE FROM workgraph_edges", [])
769                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
770            tx.execute("DELETE FROM workgraph_attention", [])
771                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
772
773            let events = {
774                let mut stmt = tx
775                    .prepare("SELECT event_json FROM workgraph_events ORDER BY seq ASC")
776                    .map_err(|err| WorkGraphError::Store(err.to_string()))?;
777                let rows = stmt
778                    .query_map([], |row| row_json::<WorkGraphEvent>(row, 0))
779                    .map_err(|err| WorkGraphError::Store(err.to_string()))?;
780                let mut events = Vec::new();
781                for row in rows {
782                    events.push(row.map_err(|err| WorkGraphError::Store(err.to_string()))?);
783                }
784                events
785            };
786
787            for event in events {
788                replay_event_tx(&tx, &event)?;
789            }
790            normalize_attention_for_terminal_items_tx(&tx)?;
791            tx.commit()
792                .map_err(|err| WorkGraphError::Store(err.to_string()))
793        })
794    }
795
796    fn with_connection<T>(
797        &self,
798        f: impl FnOnce(&mut Connection) -> Result<T, WorkGraphError>,
799    ) -> Result<T, WorkGraphError> {
800        if let Some(parent) = self.path.parent() {
801            std::fs::create_dir_all(parent)
802                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
803        }
804        let mut conn =
805            Connection::open(&self.path).map_err(|err| WorkGraphError::Store(err.to_string()))?;
806        conn.pragma_update(None, "journal_mode", "WAL")
807            .map_err(|err| WorkGraphError::Store(err.to_string()))?;
808        conn.pragma_update(None, "synchronous", "NORMAL")
809            .map_err(|err| WorkGraphError::Store(err.to_string()))?;
810        init_sqlite_schema(&conn)?;
811        f(&mut conn)
812    }
813}
814
815#[cfg(not(target_arch = "wasm32"))]
816#[async_trait]
817impl WorkGraphStore for SqliteWorkGraphStore {
818    fn kind(&self) -> WorkGraphStoreKind {
819        WorkGraphStoreKind::Sqlite
820    }
821
822    async fn get_store_time_utc(&self) -> Result<DateTime<Utc>, WorkGraphError> {
823        Ok(Utc::now())
824    }
825
826    async fn insert_item(
827        &self,
828        item: WorkItem,
829        event: WorkGraphEvent,
830    ) -> Result<WorkItem, WorkGraphError> {
831        WorkGraphMachine::validate_item_projection(&item)?;
832        self.with_connection(|conn| {
833            let tx = conn
834                .transaction()
835                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
836            insert_item_tx(&tx, &item)?;
837            insert_event_tx(&tx, &event)?;
838            tx.commit()
839                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
840            Ok(item)
841        })
842    }
843
844    async fn update_item_cas(
845        &self,
846        item: WorkItem,
847        expected_previous_revision: u64,
848        event: WorkGraphEvent,
849    ) -> Result<WorkItem, WorkGraphError> {
850        WorkGraphMachine::validate_item_projection(&item)?;
851        self.with_connection(|conn| {
852            let tx = conn
853                .transaction()
854                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
855            let changed = update_item_tx(&tx, &item, expected_previous_revision)?;
856            if changed == 0 {
857                let actual = current_revision_tx(&tx, &item.realm_id, &item.namespace, &item.id)?;
858                return match actual {
859                    Some(actual) => Err(WorkGraphError::StaleRevision {
860                        id: item.id,
861                        expected: expected_previous_revision,
862                        actual,
863                    }),
864                    None => Err(WorkGraphError::not_found(
865                        item.realm_id,
866                        item.namespace,
867                        item.id,
868                    )),
869                };
870            }
871            insert_event_tx(&tx, &event)?;
872            tx.commit()
873                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
874            Ok(item)
875        })
876    }
877
878    async fn get_item(
879        &self,
880        realm_id: &str,
881        namespace: &WorkNamespace,
882        id: &WorkItemId,
883    ) -> Result<Option<WorkItem>, WorkGraphError> {
884        self.with_connection(|conn| select_item(conn, realm_id, namespace, id))
885    }
886
887    async fn list_items(&self, filter: WorkItemFilter) -> Result<Vec<WorkItem>, WorkGraphError> {
888        self.with_connection(|conn| list_sqlite_items(conn, &filter))
889    }
890
891    async fn insert_goal(
892        &self,
893        item: WorkItem,
894        item_event: WorkGraphEvent,
895        attention: WorkAttentionBinding,
896        attention_event: WorkGraphEvent,
897    ) -> Result<(WorkItem, WorkAttentionBinding), WorkGraphError> {
898        WorkGraphMachine::validate_item_projection(&item)?;
899        self.with_connection(|conn| {
900            let tx = conn
901                .transaction()
902                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
903            insert_item_tx(&tx, &item)?;
904            insert_attention_tx(&tx, &attention)?;
905            insert_event_tx(&tx, &item_event)?;
906            insert_event_tx(&tx, &attention_event)?;
907            tx.commit()
908                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
909            Ok((item, attention))
910        })
911    }
912
913    async fn update_attention_cas(
914        &self,
915        attention: WorkAttentionBinding,
916        expected_previous_revision: u64,
917        event: WorkGraphEvent,
918    ) -> Result<WorkAttentionBinding, WorkGraphError> {
919        self.with_connection(|conn| {
920            let tx = conn
921                .transaction()
922                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
923            let changed = update_attention_tx(&tx, &attention, expected_previous_revision)?;
924            if changed == 0 {
925                let actual = current_attention_revision_tx(
926                    &tx,
927                    &attention.work_ref.realm_id,
928                    &attention.work_ref.namespace,
929                    &attention.binding_id,
930                )?;
931                return match actual {
932                    Some(actual) => Err(WorkGraphError::StaleRevision {
933                        id: attention.work_ref.item_id,
934                        expected: expected_previous_revision,
935                        actual,
936                    }),
937                    None => Err(WorkGraphError::not_found(
938                        attention.work_ref.realm_id,
939                        attention.work_ref.namespace,
940                        attention.work_ref.item_id,
941                    )),
942                };
943            }
944            insert_event_tx(&tx, &event)?;
945            tx.commit()
946                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
947            Ok(attention)
948        })
949    }
950
951    async fn update_item_and_attention_cas(
952        &self,
953        item: WorkItem,
954        expected_previous_revision: u64,
955        item_event: WorkGraphEvent,
956        attention_updates: Vec<(WorkAttentionBinding, u64, WorkGraphEvent)>,
957    ) -> Result<WorkItem, WorkGraphError> {
958        WorkGraphMachine::validate_item_projection(&item)?;
959        self.with_connection(|conn| {
960            let tx = conn
961                .transaction()
962                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
963            let changed = update_item_tx(&tx, &item, expected_previous_revision)?;
964            if changed == 0 {
965                let actual = current_revision_tx(&tx, &item.realm_id, &item.namespace, &item.id)?;
966                return match actual {
967                    Some(actual) => Err(WorkGraphError::StaleRevision {
968                        id: item.id,
969                        expected: expected_previous_revision,
970                        actual,
971                    }),
972                    None => Err(WorkGraphError::not_found(
973                        item.realm_id,
974                        item.namespace,
975                        item.id,
976                    )),
977                };
978            }
979            insert_event_tx(&tx, &item_event)?;
980            for (attention, expected_revision, event) in &attention_updates {
981                let changed = update_attention_tx(&tx, attention, *expected_revision)?;
982                if changed == 0 {
983                    let actual = current_attention_revision_tx(
984                        &tx,
985                        &attention.work_ref.realm_id,
986                        &attention.work_ref.namespace,
987                        &attention.binding_id,
988                    )?;
989                    return match actual {
990                        Some(actual) => Err(WorkGraphError::StaleRevision {
991                            id: attention.work_ref.item_id.clone(),
992                            expected: *expected_revision,
993                            actual,
994                        }),
995                        None => Err(WorkGraphError::not_found(
996                            attention.work_ref.realm_id.clone(),
997                            attention.work_ref.namespace.clone(),
998                            attention.work_ref.item_id.clone(),
999                        )),
1000                    };
1001                }
1002                insert_event_tx(&tx, event)?;
1003            }
1004            tx.commit()
1005                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1006            Ok(item)
1007        })
1008    }
1009
1010    async fn get_attention(
1011        &self,
1012        realm_id: &str,
1013        namespace: &WorkNamespace,
1014        binding_id: &WorkAttentionBindingId,
1015    ) -> Result<Option<WorkAttentionBinding>, WorkGraphError> {
1016        self.with_connection(|conn| select_attention(conn, realm_id, namespace, binding_id))
1017    }
1018
1019    async fn list_attention(
1020        &self,
1021        filter: AttentionListRequest,
1022    ) -> Result<Vec<WorkAttentionBinding>, WorkGraphError> {
1023        self.with_connection(|conn| list_sqlite_attention(conn, &filter))
1024    }
1025
1026    async fn insert_edge(
1027        &self,
1028        edge: WorkEdge,
1029        event: WorkGraphEvent,
1030    ) -> Result<WorkEdge, WorkGraphError> {
1031        self.with_connection(|conn| {
1032            let tx = conn
1033                .transaction()
1034                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1035            insert_edge_tx(&tx, &edge)?;
1036            insert_event_tx(&tx, &event)?;
1037            tx.commit()
1038                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1039            Ok(edge)
1040        })
1041    }
1042
1043    async fn insert_edge_validated(
1044        &self,
1045        edge: WorkEdge,
1046        event: WorkGraphEvent,
1047    ) -> Result<WorkEdge, WorkGraphError> {
1048        self.with_connection(|conn| {
1049            let tx = conn
1050                .transaction_with_behavior(TransactionBehavior::Immediate)
1051                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1052            let existing_edges = list_sqlite_edges(&tx, &edge.realm_id, &edge.namespace)?;
1053            let existing_items = list_sqlite_items(
1054                &tx,
1055                &WorkItemFilter {
1056                    realm_id: Some(edge.realm_id.clone()),
1057                    namespace: Some(edge.namespace.clone()),
1058                    include_terminal: true,
1059                    ..WorkItemFilter::default()
1060                },
1061            )?;
1062            WorkGraphMachine::validate_link(&edge, &existing_items, &existing_edges)?;
1063            insert_edge_tx(&tx, &edge)?;
1064            insert_event_tx(&tx, &event)?;
1065            tx.commit()
1066                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1067            Ok(edge)
1068        })
1069    }
1070
1071    async fn list_edges(
1072        &self,
1073        realm_id: &str,
1074        namespace: &WorkNamespace,
1075    ) -> Result<Vec<WorkEdge>, WorkGraphError> {
1076        self.with_connection(|conn| list_sqlite_edges(conn, realm_id, namespace))
1077    }
1078
1079    async fn list_events(
1080        &self,
1081        filter: WorkGraphEventFilter,
1082    ) -> Result<Vec<WorkGraphEvent>, WorkGraphError> {
1083        self.with_connection(|conn| list_sqlite_events(conn, &filter))
1084    }
1085}
1086
1087#[cfg(not(target_arch = "wasm32"))]
1088fn init_sqlite_schema(conn: &Connection) -> Result<(), WorkGraphError> {
1089    conn.execute_batch(
1090        r"
1091        CREATE TABLE IF NOT EXISTS workgraph_items (
1092            realm_id TEXT NOT NULL,
1093            namespace TEXT NOT NULL,
1094            item_id TEXT NOT NULL,
1095            revision INTEGER NOT NULL,
1096            updated_at_utc TEXT NOT NULL,
1097            item_json TEXT NOT NULL,
1098            PRIMARY KEY (realm_id, namespace, item_id)
1099        );
1100        CREATE INDEX IF NOT EXISTS idx_workgraph_items_realm_namespace_updated
1101            ON workgraph_items (realm_id, namespace, updated_at_utc);
1102
1103        CREATE TABLE IF NOT EXISTS workgraph_attention (
1104            realm_id TEXT NOT NULL,
1105            namespace TEXT NOT NULL,
1106            binding_id TEXT NOT NULL,
1107            revision INTEGER NOT NULL,
1108            updated_at_utc TEXT NOT NULL,
1109            attention_json TEXT NOT NULL,
1110            PRIMARY KEY (realm_id, namespace, binding_id)
1111        );
1112        CREATE INDEX IF NOT EXISTS idx_workgraph_attention_realm_namespace_updated
1113            ON workgraph_attention (realm_id, namespace, updated_at_utc);
1114
1115        CREATE TABLE IF NOT EXISTS workgraph_edges (
1116            realm_id TEXT NOT NULL,
1117            namespace TEXT NOT NULL,
1118            edge_kind TEXT NOT NULL,
1119            from_id TEXT NOT NULL,
1120            to_id TEXT NOT NULL,
1121            edge_json TEXT NOT NULL,
1122            PRIMARY KEY (realm_id, namespace, edge_kind, from_id, to_id)
1123        );
1124
1125        CREATE TABLE IF NOT EXISTS workgraph_events (
1126            seq INTEGER PRIMARY KEY AUTOINCREMENT,
1127            realm_id TEXT NOT NULL,
1128            namespace TEXT NOT NULL,
1129            item_id TEXT,
1130            event_kind TEXT NOT NULL,
1131            at_utc TEXT NOT NULL,
1132            event_json TEXT NOT NULL
1133        );
1134        CREATE INDEX IF NOT EXISTS idx_workgraph_events_realm_namespace_seq
1135            ON workgraph_events (realm_id, namespace, seq);
1136        ",
1137    )
1138    .map_err(|err| WorkGraphError::Store(err.to_string()))
1139}
1140
1141#[cfg(not(target_arch = "wasm32"))]
1142fn insert_item_tx(tx: &Transaction<'_>, item: &WorkItem) -> Result<(), WorkGraphError> {
1143    let json = serde_json::to_string(item).map_err(|err| WorkGraphError::Store(err.to_string()))?;
1144    tx.execute(
1145        "INSERT INTO workgraph_items (realm_id, namespace, item_id, revision, updated_at_utc, item_json)
1146         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1147        params![
1148            item.realm_id,
1149            item.namespace.as_str(),
1150            item.id.as_str(),
1151            item.revision,
1152            item.updated_at.to_rfc3339(),
1153            json,
1154        ],
1155    )
1156    .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1157    Ok(())
1158}
1159
1160#[cfg(not(target_arch = "wasm32"))]
1161fn update_item_tx(
1162    tx: &Transaction<'_>,
1163    item: &WorkItem,
1164    expected_previous_revision: u64,
1165) -> Result<usize, WorkGraphError> {
1166    let json = serde_json::to_string(item).map_err(|err| WorkGraphError::Store(err.to_string()))?;
1167    tx.execute(
1168        "UPDATE workgraph_items
1169            SET revision = ?4, updated_at_utc = ?5, item_json = ?6
1170          WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3 AND revision = ?7",
1171        params![
1172            item.realm_id,
1173            item.namespace.as_str(),
1174            item.id.as_str(),
1175            item.revision,
1176            item.updated_at.to_rfc3339(),
1177            json,
1178            expected_previous_revision,
1179        ],
1180    )
1181    .map_err(|err| WorkGraphError::Store(err.to_string()))
1182}
1183
1184#[cfg(not(target_arch = "wasm32"))]
1185fn upsert_item_tx(tx: &Transaction<'_>, item: &WorkItem) -> Result<(), WorkGraphError> {
1186    let json = serde_json::to_string(item).map_err(|err| WorkGraphError::Store(err.to_string()))?;
1187    tx.execute(
1188        "INSERT INTO workgraph_items
1189            (realm_id, namespace, item_id, revision, updated_at_utc, item_json)
1190         VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1191         ON CONFLICT(realm_id, namespace, item_id) DO UPDATE SET
1192            revision = excluded.revision,
1193            updated_at_utc = excluded.updated_at_utc,
1194            item_json = excluded.item_json",
1195        params![
1196            item.realm_id,
1197            item.namespace.as_str(),
1198            item.id.as_str(),
1199            item.revision,
1200            item.updated_at.to_rfc3339(),
1201            json,
1202        ],
1203    )
1204    .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1205    Ok(())
1206}
1207
1208#[cfg(not(target_arch = "wasm32"))]
1209fn current_revision_tx(
1210    tx: &Transaction<'_>,
1211    realm_id: &str,
1212    namespace: &WorkNamespace,
1213    id: &WorkItemId,
1214) -> Result<Option<u64>, WorkGraphError> {
1215    tx.query_row(
1216        "SELECT revision FROM workgraph_items WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3",
1217        params![realm_id, namespace.as_str(), id.as_str()],
1218        |row| row.get::<_, u64>(0),
1219    )
1220    .optional()
1221    .map_err(|err| WorkGraphError::Store(err.to_string()))
1222}
1223
1224#[cfg(not(target_arch = "wasm32"))]
1225fn insert_attention_tx(
1226    tx: &Transaction<'_>,
1227    attention: &WorkAttentionBinding,
1228) -> Result<(), WorkGraphError> {
1229    let json =
1230        serde_json::to_string(attention).map_err(|err| WorkGraphError::Store(err.to_string()))?;
1231    tx.execute(
1232        "INSERT INTO workgraph_attention
1233            (realm_id, namespace, binding_id, revision, updated_at_utc, attention_json)
1234         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1235        params![
1236            attention.work_ref.realm_id,
1237            attention.work_ref.namespace.as_str(),
1238            attention.binding_id.as_str(),
1239            attention.machine_state.revision,
1240            attention.updated_at.to_rfc3339(),
1241            json,
1242        ],
1243    )
1244    .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1245    Ok(())
1246}
1247
1248#[cfg(not(target_arch = "wasm32"))]
1249fn update_attention_tx(
1250    tx: &Transaction<'_>,
1251    attention: &WorkAttentionBinding,
1252    expected_previous_revision: u64,
1253) -> Result<usize, WorkGraphError> {
1254    let json =
1255        serde_json::to_string(attention).map_err(|err| WorkGraphError::Store(err.to_string()))?;
1256    tx.execute(
1257        "UPDATE workgraph_attention
1258            SET revision = ?4, updated_at_utc = ?5, attention_json = ?6
1259          WHERE realm_id = ?1 AND namespace = ?2 AND binding_id = ?3 AND revision = ?7",
1260        params![
1261            attention.work_ref.realm_id,
1262            attention.work_ref.namespace.as_str(),
1263            attention.binding_id.as_str(),
1264            attention.machine_state.revision,
1265            attention.updated_at.to_rfc3339(),
1266            json,
1267            expected_previous_revision,
1268        ],
1269    )
1270    .map_err(|err| WorkGraphError::Store(err.to_string()))
1271}
1272
1273#[cfg(not(target_arch = "wasm32"))]
1274fn upsert_attention_tx(
1275    tx: &Transaction<'_>,
1276    attention: &WorkAttentionBinding,
1277) -> Result<(), WorkGraphError> {
1278    let json =
1279        serde_json::to_string(attention).map_err(|err| WorkGraphError::Store(err.to_string()))?;
1280    tx.execute(
1281        "INSERT INTO workgraph_attention
1282            (realm_id, namespace, binding_id, revision, updated_at_utc, attention_json)
1283         VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1284         ON CONFLICT(realm_id, namespace, binding_id) DO UPDATE SET
1285            revision = excluded.revision,
1286            updated_at_utc = excluded.updated_at_utc,
1287            attention_json = excluded.attention_json",
1288        params![
1289            attention.work_ref.realm_id,
1290            attention.work_ref.namespace.as_str(),
1291            attention.binding_id.as_str(),
1292            attention.machine_state.revision,
1293            attention.updated_at.to_rfc3339(),
1294            json,
1295        ],
1296    )
1297    .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1298    Ok(())
1299}
1300
1301#[cfg(not(target_arch = "wasm32"))]
1302fn current_attention_revision_tx(
1303    tx: &Transaction<'_>,
1304    realm_id: &str,
1305    namespace: &WorkNamespace,
1306    binding_id: &WorkAttentionBindingId,
1307) -> Result<Option<u64>, WorkGraphError> {
1308    tx.query_row(
1309        "SELECT revision FROM workgraph_attention
1310         WHERE realm_id = ?1 AND namespace = ?2 AND binding_id = ?3",
1311        params![realm_id, namespace.as_str(), binding_id.as_str()],
1312        |row| row.get::<_, u64>(0),
1313    )
1314    .optional()
1315    .map_err(|err| WorkGraphError::Store(err.to_string()))
1316}
1317
1318#[cfg(not(target_arch = "wasm32"))]
1319fn insert_edge_tx(tx: &Transaction<'_>, edge: &WorkEdge) -> Result<(), WorkGraphError> {
1320    let json = serde_json::to_string(edge).map_err(|err| WorkGraphError::Store(err.to_string()))?;
1321    tx.execute(
1322        "INSERT INTO workgraph_edges
1323            (realm_id, namespace, edge_kind, from_id, to_id, edge_json)
1324         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1325        params![
1326            edge.realm_id,
1327            edge.namespace.as_str(),
1328            format!("{:?}", edge.kind),
1329            edge.from_id.as_str(),
1330            edge.to_id.as_str(),
1331            json,
1332        ],
1333    )
1334    .map_err(|err| map_sqlite_insert_edge_error(err, edge))?;
1335    Ok(())
1336}
1337
1338fn duplicate_edge_error(edge: &WorkEdge) -> WorkGraphError {
1339    WorkGraphError::Conflict(format!(
1340        "work edge {:?} {} -> {} already exists",
1341        edge.kind, edge.from_id, edge.to_id
1342    ))
1343}
1344
1345#[cfg(not(target_arch = "wasm32"))]
1346fn map_sqlite_insert_edge_error(err: rusqlite::Error, edge: &WorkEdge) -> WorkGraphError {
1347    match err {
1348        rusqlite::Error::SqliteFailure(failure, _)
1349            if failure.code == ErrorCode::ConstraintViolation =>
1350        {
1351            duplicate_edge_error(edge)
1352        }
1353        err => WorkGraphError::Store(err.to_string()),
1354    }
1355}
1356
1357#[cfg(not(target_arch = "wasm32"))]
1358fn insert_event_tx(tx: &Transaction<'_>, event: &WorkGraphEvent) -> Result<(), WorkGraphError> {
1359    let json =
1360        serde_json::to_string(event).map_err(|err| WorkGraphError::Store(err.to_string()))?;
1361    tx.execute(
1362        "INSERT INTO workgraph_events
1363            (realm_id, namespace, item_id, event_kind, at_utc, event_json)
1364         VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1365        params![
1366            event.realm_id,
1367            event.namespace.as_str(),
1368            event.item_id.as_ref().map(WorkItemId::as_str),
1369            format!("{:?}", event.kind),
1370            event.at.to_rfc3339(),
1371            json,
1372        ],
1373    )
1374    .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1375    Ok(())
1376}
1377
1378#[cfg(not(target_arch = "wasm32"))]
1379fn select_item(
1380    conn: &Connection,
1381    realm_id: &str,
1382    namespace: &WorkNamespace,
1383    id: &WorkItemId,
1384) -> Result<Option<WorkItem>, WorkGraphError> {
1385    conn.query_row(
1386        "SELECT item_json FROM workgraph_items WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3",
1387        params![realm_id, namespace.as_str(), id.as_str()],
1388        |row| row_json(row, 0),
1389    )
1390    .optional()
1391    .map_err(|err| WorkGraphError::Store(err.to_string()))
1392}
1393
1394#[cfg(not(target_arch = "wasm32"))]
1395fn list_sqlite_items(
1396    conn: &Connection,
1397    filter: &WorkItemFilter,
1398) -> Result<Vec<WorkItem>, WorkGraphError> {
1399    let mut stmt = conn
1400        .prepare("SELECT item_json FROM workgraph_items ORDER BY updated_at_utc ASC, item_id ASC")
1401        .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1402    let rows = stmt
1403        .query_map([], |row| row_json::<WorkItem>(row, 0))
1404        .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1405    let mut items = Vec::new();
1406    for row in rows {
1407        let item = row.map_err(|err| WorkGraphError::Store(err.to_string()))?;
1408        if item_matches_filter(&item, filter) {
1409            items.push(item);
1410            if filter.limit.is_some_and(|limit| items.len() >= limit) {
1411                break;
1412            }
1413        }
1414    }
1415    Ok(items)
1416}
1417
1418#[cfg(not(target_arch = "wasm32"))]
1419fn select_attention(
1420    conn: &Connection,
1421    realm_id: &str,
1422    namespace: &WorkNamespace,
1423    binding_id: &WorkAttentionBindingId,
1424) -> Result<Option<WorkAttentionBinding>, WorkGraphError> {
1425    conn.query_row(
1426        "SELECT attention_json FROM workgraph_attention
1427         WHERE realm_id = ?1 AND namespace = ?2 AND binding_id = ?3",
1428        params![realm_id, namespace.as_str(), binding_id.as_str()],
1429        |row| row_json(row, 0),
1430    )
1431    .optional()
1432    .map_err(|err| WorkGraphError::Store(err.to_string()))
1433}
1434
1435#[cfg(not(target_arch = "wasm32"))]
1436fn list_sqlite_attention(
1437    conn: &Connection,
1438    filter: &AttentionListRequest,
1439) -> Result<Vec<WorkAttentionBinding>, WorkGraphError> {
1440    let mut stmt = conn
1441        .prepare(
1442            "SELECT attention_json FROM workgraph_attention
1443             ORDER BY updated_at_utc ASC, binding_id ASC",
1444        )
1445        .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1446    let rows = stmt
1447        .query_map([], |row| row_json::<WorkAttentionBinding>(row, 0))
1448        .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1449    let mut bindings = Vec::new();
1450    for row in rows {
1451        let binding = row.map_err(|err| WorkGraphError::Store(err.to_string()))?;
1452        if attention_matches_filter(&binding, filter) {
1453            bindings.push(binding);
1454        }
1455    }
1456    Ok(bindings)
1457}
1458
1459#[cfg(not(target_arch = "wasm32"))]
1460fn list_sqlite_edges(
1461    conn: &Connection,
1462    realm_id: &str,
1463    namespace: &WorkNamespace,
1464) -> Result<Vec<WorkEdge>, WorkGraphError> {
1465    let mut stmt = conn
1466        .prepare(
1467            "SELECT edge_json FROM workgraph_edges
1468             WHERE realm_id = ?1 AND namespace = ?2
1469             ORDER BY edge_kind ASC, from_id ASC, to_id ASC",
1470        )
1471        .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1472    let rows = stmt
1473        .query_map(params![realm_id, namespace.as_str()], |row| {
1474            row_json::<WorkEdge>(row, 0)
1475        })
1476        .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1477    let mut edges = Vec::new();
1478    for row in rows {
1479        edges.push(row.map_err(|err| WorkGraphError::Store(err.to_string()))?);
1480    }
1481    Ok(edges)
1482}
1483
1484#[cfg(not(target_arch = "wasm32"))]
1485fn list_sqlite_events(
1486    conn: &Connection,
1487    filter: &WorkGraphEventFilter,
1488) -> Result<Vec<WorkGraphEvent>, WorkGraphError> {
1489    let mut stmt = conn
1490        .prepare("SELECT seq, event_json FROM workgraph_events ORDER BY seq ASC")
1491        .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1492    let rows = stmt
1493        .query_map([], |row| {
1494            let seq = row.get::<_, i64>(0)?;
1495            let mut event = row_json::<WorkGraphEvent>(row, 1)?;
1496            event.seq = Some(seq);
1497            Ok(event)
1498        })
1499        .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1500    let mut events = Vec::new();
1501    for row in rows {
1502        let event = row.map_err(|err| WorkGraphError::Store(err.to_string()))?;
1503        if event_matches_filter(&event, filter) {
1504            events.push(event);
1505            if filter.limit.is_some_and(|limit| events.len() >= limit) {
1506                break;
1507            }
1508        }
1509    }
1510    Ok(events)
1511}
1512
1513#[cfg(not(target_arch = "wasm32"))]
1514fn replay_event_tx(tx: &Transaction<'_>, event: &WorkGraphEvent) -> Result<(), WorkGraphError> {
1515    match event.kind {
1516        WorkGraphEventKind::Linked => {
1517            let edge = payload_field::<WorkEdge>(event, "edge")?;
1518            insert_edge_tx(tx, &edge)
1519        }
1520        WorkGraphEventKind::AttentionCreated | WorkGraphEventKind::AttentionUpdated => {
1521            let attention = payload_field::<WorkAttentionBinding>(event, "attention")?;
1522            upsert_attention_tx(tx, &attention)
1523        }
1524        WorkGraphEventKind::Created
1525        | WorkGraphEventKind::Updated
1526        | WorkGraphEventKind::Claimed
1527        | WorkGraphEventKind::Released
1528        | WorkGraphEventKind::Blocked
1529        | WorkGraphEventKind::Closed
1530        | WorkGraphEventKind::EvidenceAdded => {
1531            let item = payload_field::<WorkItem>(event, "item")?;
1532            upsert_item_tx(tx, &item)
1533        }
1534    }
1535}
1536
1537#[cfg(not(target_arch = "wasm32"))]
1538fn normalize_attention_for_terminal_items_tx(tx: &Transaction<'_>) -> Result<(), WorkGraphError> {
1539    let bindings = {
1540        let mut stmt = tx
1541            .prepare("SELECT attention_json FROM workgraph_attention")
1542            .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1543        let rows = stmt
1544            .query_map([], |row| row_json::<WorkAttentionBinding>(row, 0))
1545            .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1546        let mut bindings = Vec::new();
1547        for row in rows {
1548            bindings.push(row.map_err(|err| WorkGraphError::Store(err.to_string()))?);
1549        }
1550        bindings
1551    };
1552
1553    for binding in bindings {
1554        if matches!(
1555            binding.status,
1556            WorkAttentionStatus::Stopped | WorkAttentionStatus::Superseded
1557        ) {
1558            continue;
1559        }
1560        let item = tx
1561            .query_row(
1562                "SELECT item_json FROM workgraph_items
1563                 WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3",
1564                params![
1565                    binding.work_ref.realm_id,
1566                    binding.work_ref.namespace.as_str(),
1567                    binding.work_ref.item_id.as_str(),
1568                ],
1569                |row| row_json::<WorkItem>(row, 0),
1570            )
1571            .optional()
1572            .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1573        let Some(item) = item else {
1574            continue;
1575        };
1576        // Terminality is a WorkGraph machine fact: the shell mirrors the
1577        // canonical classify verdict rather than re-deciding `is_terminal()`.
1578        if WorkGraphMachine::classify_terminality(&item)? {
1579            let expected_revision = binding.machine_state.revision;
1580            let stopped = WorkAttentionMachine::stop(binding, expected_revision, item.updated_at)?;
1581            upsert_attention_tx(tx, &stopped)?;
1582        }
1583    }
1584    Ok(())
1585}
1586
1587#[cfg(not(target_arch = "wasm32"))]
1588fn payload_field<T: serde::de::DeserializeOwned>(
1589    event: &WorkGraphEvent,
1590    field: &str,
1591) -> Result<T, WorkGraphError> {
1592    let value = event.payload.get(field).ok_or_else(|| {
1593        WorkGraphError::Store(format!(
1594            "workgraph event {:?} missing payload field `{field}`",
1595            event.kind
1596        ))
1597    })?;
1598    serde_json::from_value(value.clone()).map_err(|err| WorkGraphError::Store(err.to_string()))
1599}
1600
1601#[cfg(not(target_arch = "wasm32"))]
1602fn row_json<T: serde::de::DeserializeOwned>(
1603    row: &rusqlite::Row<'_>,
1604    index: usize,
1605) -> rusqlite::Result<T> {
1606    let json = row.get::<_, String>(index)?;
1607    serde_json::from_str(&json).map_err(|err| {
1608        rusqlite::Error::FromSqlConversionFailure(index, rusqlite::types::Type::Text, Box::new(err))
1609    })
1610}
1611
1612#[cfg(test)]
1613#[allow(clippy::expect_used, clippy::unwrap_used)]
1614mod tests {
1615    use std::collections::BTreeSet;
1616
1617    use chrono::Utc;
1618    use serde_json::json;
1619
1620    use crate::types::WorkEdge;
1621    use crate::{
1622        AttentionDelegatedAuthority, AttentionProjectionPolicy, CreateWorkItemRequest,
1623        GoalAttentionTarget, GoalCreateRequest, GoalRequestCloseRequest, GoalTerminalStatus,
1624        LinkWorkItemsRequest, MemoryWorkGraphStore, WorkAttentionMode, WorkAttentionStatus,
1625        WorkCompletionPolicy, WorkEdgeKind, WorkGraphError, WorkGraphEvent, WorkGraphEventFilter,
1626        WorkGraphEventKind, WorkGraphService, WorkGraphStore, WorkItemFilter, WorkItemId,
1627        WorkNamespace,
1628    };
1629
1630    fn test_edge() -> WorkEdge {
1631        WorkEdge {
1632            realm_id: "realm".to_string(),
1633            namespace: WorkNamespace::default(),
1634            kind: WorkEdgeKind::Blocks,
1635            from_id: WorkItemId::generated(),
1636            to_id: WorkItemId::generated(),
1637            created_at: Utc::now(),
1638        }
1639    }
1640
1641    fn link_event(edge: &WorkEdge) -> WorkGraphEvent {
1642        WorkGraphEvent::graph(
1643            edge.realm_id.clone(),
1644            edge.namespace.clone(),
1645            WorkGraphEventKind::Linked,
1646            edge.created_at,
1647            json!({ "edge": edge }),
1648        )
1649    }
1650
1651    #[tokio::test]
1652    async fn memory_store_namespace_filters_do_not_leak() {
1653        let store = std::sync::Arc::new(MemoryWorkGraphStore::new());
1654        let default_service =
1655            WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
1656        let other_service = WorkGraphService::with_scope(
1657            store.clone(),
1658            "realm",
1659            WorkNamespace::new("other").expect("namespace"),
1660        );
1661        default_service
1662            .create(CreateWorkItemRequest {
1663                realm_id: None,
1664                namespace: None,
1665                title: "default".to_string(),
1666                description: None,
1667                priority: Default::default(),
1668                completion_policy: Default::default(),
1669                labels: BTreeSet::new(),
1670                due_at: None,
1671                not_before: None,
1672                snoozed_until: None,
1673                external_refs: Vec::new(),
1674                evidence_refs: Vec::new(),
1675                status: None,
1676            })
1677            .await
1678            .expect("create default");
1679        other_service
1680            .create(CreateWorkItemRequest {
1681                realm_id: None,
1682                namespace: None,
1683                title: "other".to_string(),
1684                description: None,
1685                priority: Default::default(),
1686                completion_policy: Default::default(),
1687                labels: BTreeSet::new(),
1688                due_at: None,
1689                not_before: None,
1690                snoozed_until: None,
1691                external_refs: Vec::new(),
1692                evidence_refs: Vec::new(),
1693                status: None,
1694            })
1695            .await
1696            .expect("create other");
1697
1698        let items = store
1699            .list_items(WorkItemFilter {
1700                realm_id: Some("realm".to_string()),
1701                namespace: Some(WorkNamespace::default()),
1702                ..WorkItemFilter::default()
1703            })
1704            .await
1705            .expect("list");
1706        assert_eq!(items.len(), 1);
1707        assert_eq!(items[0].title, "default");
1708    }
1709
1710    #[tokio::test]
1711    async fn memory_store_duplicate_edge_does_not_append_event() {
1712        let store = MemoryWorkGraphStore::new();
1713        let edge = test_edge();
1714        store
1715            .insert_edge(edge.clone(), link_event(&edge))
1716            .await
1717            .expect("insert edge");
1718
1719        let error = store
1720            .insert_edge(edge.clone(), link_event(&edge))
1721            .await
1722            .expect_err("duplicate edge should fail");
1723        assert!(matches!(error, WorkGraphError::Conflict(_)));
1724
1725        let events = store
1726            .list_events(WorkGraphEventFilter {
1727                realm_id: Some(edge.realm_id),
1728                namespace: Some(edge.namespace),
1729                all_namespaces: false,
1730                after_seq: None,
1731                limit: None,
1732            })
1733            .await
1734            .expect("events");
1735        assert_eq!(events.len(), 1);
1736    }
1737
1738    #[cfg(not(target_arch = "wasm32"))]
1739    #[tokio::test]
1740    async fn sqlite_persistence_survives_restart() {
1741        let dir = tempfile::tempdir().expect("tempdir");
1742        let path = dir.path().join("workgraph.sqlite3");
1743        let store = std::sync::Arc::new(crate::SqliteWorkGraphStore::open(&path).expect("open"));
1744        let service = WorkGraphService::with_scope(store, "realm", WorkNamespace::default());
1745        let item = service
1746            .create(CreateWorkItemRequest {
1747                realm_id: None,
1748                namespace: None,
1749                title: "persist me".to_string(),
1750                description: None,
1751                priority: Default::default(),
1752                completion_policy: Default::default(),
1753                labels: BTreeSet::new(),
1754                due_at: None,
1755                not_before: None,
1756                snoozed_until: None,
1757                external_refs: Vec::new(),
1758                evidence_refs: Vec::new(),
1759                status: None,
1760            })
1761            .await
1762            .expect("create");
1763
1764        let reopened = std::sync::Arc::new(crate::SqliteWorkGraphStore::open(&path).expect("open"));
1765        let service = WorkGraphService::with_scope(reopened, "realm", WorkNamespace::default());
1766        let fetched = service.get(None, None, item.id.clone()).await.expect("get");
1767        assert_eq!(fetched.title, "persist me");
1768    }
1769
1770    #[cfg(not(target_arch = "wasm32"))]
1771    #[tokio::test]
1772    async fn sqlite_item_without_machine_state_fails_closed_on_read() {
1773        let dir = tempfile::tempdir().expect("tempdir");
1774        let path = dir.path().join("workgraph.sqlite3");
1775        let store = std::sync::Arc::new(crate::SqliteWorkGraphStore::open(&path).expect("open"));
1776        let service =
1777            WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
1778        let item = service
1779            .create(CreateWorkItemRequest {
1780                realm_id: None,
1781                namespace: None,
1782                title: "legacy item".to_string(),
1783                description: None,
1784                priority: Default::default(),
1785                completion_policy: Default::default(),
1786                labels: BTreeSet::new(),
1787                due_at: None,
1788                not_before: None,
1789                snoozed_until: None,
1790                external_refs: Vec::new(),
1791                evidence_refs: Vec::new(),
1792                status: None,
1793            })
1794            .await
1795            .expect("create");
1796
1797        store
1798            .with_connection(|conn| {
1799                let json: String = conn
1800                    .query_row(
1801                        "SELECT item_json FROM workgraph_items
1802                         WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3",
1803                        rusqlite::params![
1804                            &item.realm_id,
1805                            item.namespace.as_str(),
1806                            item.id.as_str()
1807                        ],
1808                        |row| row.get(0),
1809                    )
1810                    .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1811                let mut value = serde_json::from_str::<serde_json::Value>(&json)
1812                    .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1813                value
1814                    .as_object_mut()
1815                    .expect("item json object")
1816                    .remove("machine_state");
1817                conn.execute(
1818                    "UPDATE workgraph_items
1819                        SET item_json = ?4
1820                      WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3",
1821                    rusqlite::params![
1822                        &item.realm_id,
1823                        item.namespace.as_str(),
1824                        item.id.as_str(),
1825                        serde_json::to_string(&value)
1826                            .map_err(|err| WorkGraphError::Store(err.to_string()))?
1827                    ],
1828                )
1829                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1830                Ok(())
1831            })
1832            .expect("strip machine state");
1833
1834        // machine_state is the sole machine-owned lifecycle/revision authority.
1835        // A persisted item missing it can no longer be backfilled from projected
1836        // fields (that fabrication path was deleted); reading it must FAIL CLOSED
1837        // with a typed error rather than reconstructing machine truth.
1838        let reopened = std::sync::Arc::new(crate::SqliteWorkGraphStore::open(&path).expect("open"));
1839        let service = WorkGraphService::with_scope(reopened, "realm", WorkNamespace::default());
1840        let err = service
1841            .get(None, None, item.id)
1842            .await
1843            .expect_err("reading an item with no machine_state must fail closed");
1844        assert!(
1845            matches!(err, WorkGraphError::Store(_)),
1846            "expected a typed Store deserialization error, got: {err:?}"
1847        );
1848    }
1849
1850    #[cfg(not(target_arch = "wasm32"))]
1851    #[tokio::test]
1852    async fn sqlite_event_replay_rebuilds_projection() {
1853        let dir = tempfile::tempdir().expect("tempdir");
1854        let path = dir.path().join("workgraph.sqlite3");
1855        let store = std::sync::Arc::new(crate::SqliteWorkGraphStore::open(&path).expect("open"));
1856        let service =
1857            WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
1858        let blocker = service
1859            .create(CreateWorkItemRequest {
1860                realm_id: None,
1861                namespace: None,
1862                title: "blocker".to_string(),
1863                description: None,
1864                priority: Default::default(),
1865                completion_policy: Default::default(),
1866                labels: BTreeSet::new(),
1867                due_at: None,
1868                not_before: None,
1869                snoozed_until: None,
1870                external_refs: Vec::new(),
1871                evidence_refs: Vec::new(),
1872                status: None,
1873            })
1874            .await
1875            .expect("create blocker");
1876        let blocked = service
1877            .create(CreateWorkItemRequest {
1878                realm_id: None,
1879                namespace: None,
1880                title: "blocked".to_string(),
1881                description: None,
1882                priority: Default::default(),
1883                completion_policy: Default::default(),
1884                labels: BTreeSet::new(),
1885                due_at: None,
1886                not_before: None,
1887                snoozed_until: None,
1888                external_refs: Vec::new(),
1889                evidence_refs: Vec::new(),
1890                status: None,
1891            })
1892            .await
1893            .expect("create blocked");
1894        service
1895            .link(LinkWorkItemsRequest {
1896                realm_id: None,
1897                namespace: None,
1898                kind: WorkEdgeKind::Blocks,
1899                from_id: blocker.id.clone(),
1900                to_id: blocked.id.clone(),
1901            })
1902            .await
1903            .expect("link");
1904
1905        store
1906            .with_connection(|conn| {
1907                conn.execute("DELETE FROM workgraph_items", [])
1908                    .map_err(|err| crate::WorkGraphError::Store(err.to_string()))?;
1909                conn.execute("DELETE FROM workgraph_edges", [])
1910                    .map_err(|err| crate::WorkGraphError::Store(err.to_string()))?;
1911                Ok(())
1912            })
1913            .expect("clear projection");
1914
1915        let empty_items = store
1916            .list_items(WorkItemFilter {
1917                realm_id: Some("realm".to_string()),
1918                namespace: Some(WorkNamespace::default()),
1919                ..WorkItemFilter::default()
1920            })
1921            .await
1922            .expect("empty list");
1923        assert!(empty_items.is_empty());
1924
1925        store
1926            .rebuild_projection_from_events()
1927            .expect("rebuild projection");
1928
1929        let rebuilt_items = store
1930            .list_items(WorkItemFilter {
1931                realm_id: Some("realm".to_string()),
1932                namespace: Some(WorkNamespace::default()),
1933                ..WorkItemFilter::default()
1934            })
1935            .await
1936            .expect("rebuilt list");
1937        assert_eq!(rebuilt_items.len(), 2);
1938        let rebuilt_edges = store
1939            .list_edges("realm", &WorkNamespace::default())
1940            .await
1941            .expect("rebuilt edges");
1942        assert_eq!(rebuilt_edges.len(), 1);
1943    }
1944
1945    #[cfg(not(target_arch = "wasm32"))]
1946    #[tokio::test]
1947    async fn sqlite_event_replay_stops_attention_for_terminal_goal_items() {
1948        let dir = tempfile::tempdir().expect("tempdir");
1949        let path = dir.path().join("workgraph.sqlite3");
1950        let store = std::sync::Arc::new(crate::SqliteWorkGraphStore::open(&path).expect("open"));
1951        let service =
1952            WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
1953        let session_id = meerkat_core::SessionId::parse("019e63c2-0000-7000-8000-000000000045")
1954            .expect("session id");
1955        let goal = service
1956            .create_goal(GoalCreateRequest {
1957                realm_id: None,
1958                namespace: None,
1959                title: "terminal goal".to_string(),
1960                description: None,
1961                target: GoalAttentionTarget::Session { session_id },
1962                mode: WorkAttentionMode::Pursue,
1963                completion_policy: WorkCompletionPolicy::SelfAttest,
1964                delegated_authority: AttentionDelegatedAuthority::CloseIfPolicyAllows,
1965                projection_policy: AttentionProjectionPolicy::default(),
1966            })
1967            .await
1968            .expect("create goal");
1969        service
1970            .goal_request_close(GoalRequestCloseRequest {
1971                binding_id: goal.attention.binding_id.clone(),
1972                realm_id: None,
1973                namespace: None,
1974                expected_revision: goal.item.revision,
1975                status: GoalTerminalStatus::Completed,
1976            })
1977            .await
1978            .expect("close goal");
1979
1980        store
1981            .with_connection(|conn| {
1982                conn.execute("DELETE FROM workgraph_items", [])
1983                    .map_err(|err| crate::WorkGraphError::Store(err.to_string()))?;
1984                conn.execute("DELETE FROM workgraph_attention", [])
1985                    .map_err(|err| crate::WorkGraphError::Store(err.to_string()))?;
1986                Ok(())
1987            })
1988            .expect("clear projection");
1989
1990        store
1991            .rebuild_projection_from_events()
1992            .expect("rebuild projection");
1993
1994        let binding = store
1995            .get_attention(
1996                "realm",
1997                &WorkNamespace::default(),
1998                &goal.attention.binding_id,
1999            )
2000            .await
2001            .expect("read binding")
2002            .expect("rebuilt binding");
2003        assert_eq!(binding.status, WorkAttentionStatus::Stopped);
2004    }
2005
2006    #[cfg(not(target_arch = "wasm32"))]
2007    #[tokio::test]
2008    async fn sqlite_store_duplicate_edge_does_not_append_event() {
2009        let dir = tempfile::tempdir().expect("tempdir");
2010        let path = dir.path().join("workgraph.sqlite3");
2011        let store = crate::SqliteWorkGraphStore::open(&path).expect("open");
2012        let edge = test_edge();
2013        store
2014            .insert_edge(edge.clone(), link_event(&edge))
2015            .await
2016            .expect("insert edge");
2017
2018        let error = store
2019            .insert_edge(edge.clone(), link_event(&edge))
2020            .await
2021            .expect_err("duplicate edge should fail");
2022        assert!(matches!(error, WorkGraphError::Conflict(_)));
2023
2024        let events = store
2025            .list_events(WorkGraphEventFilter {
2026                realm_id: Some(edge.realm_id),
2027                namespace: Some(edge.namespace),
2028                all_namespaces: false,
2029                after_seq: None,
2030                limit: None,
2031            })
2032            .await
2033            .expect("events");
2034        assert_eq!(events.len(), 1);
2035    }
2036}