1use std::collections::BTreeMap;
2#[cfg(not(target_arch = "wasm32"))]
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8#[cfg(not(target_arch = "wasm32"))]
9use rusqlite::{
10 Connection, ErrorCode, OptionalExtension, Transaction, TransactionBehavior, params,
11};
12
13use crate::WorkGraphError;
14use crate::types::{
15 AttentionListRequest, WorkAttentionBinding, WorkAttentionBindingId, WorkAttentionStatus,
16 WorkEdge, WorkGraphEvent, WorkGraphEventKind, WorkItem, WorkItemFilter, WorkItemId,
17 WorkNamespace,
18};
19use crate::{WorkAttentionMachine, WorkGraphMachine};
20
21#[cfg(target_arch = "wasm32")]
22use crate::tokio::sync::RwLock;
23#[cfg(not(target_arch = "wasm32"))]
24use tokio::sync::RwLock;
25
/// Identifies which kind of backend a work-graph store is.
///
/// The value is reported by `WorkGraphStore::kind` and is embedded (via its
/// `Display` form) in "unsupported backend" errors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkGraphStoreKind {
    /// Backend that rejects every operation.
    Disabled,
    /// Process-local, in-memory backend.
    Memory,
    /// SQLite-file backend.
    Sqlite,
    /// Any other backend — presumably one implemented outside this file.
    Custom,
}

impl WorkGraphStoreKind {
    /// Returns the lowercase name of this backend kind
    /// (`"disabled"`, `"memory"`, `"sqlite"` or `"custom"`).
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Disabled => "disabled",
            Self::Memory => "memory",
            Self::Sqlite => "sqlite",
            Self::Custom => "custom",
        }
    }
}

impl std::fmt::Display for WorkGraphStoreKind {
    /// Writes the same text as [`WorkGraphStoreKind::as_str`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `pad` (unlike `write_str`) honours width, fill and alignment flags, so
        // `format!("{:>10}", kind)` behaves the way it does for a plain `&str`.
        f.pad(self.as_str())
    }
}
50
51#[derive(Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
52#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
53pub struct WorkGraphEventFilter {
54 pub realm_id: Option<String>,
55 pub namespace: Option<WorkNamespace>,
56 #[serde(default)]
57 pub all_namespaces: bool,
58 pub after_seq: Option<i64>,
59 pub limit: Option<usize>,
60}
61
62#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
63#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
64pub trait WorkGraphStore: Send + Sync {
65 fn kind(&self) -> WorkGraphStoreKind;
66
67 async fn get_store_time_utc(&self) -> Result<DateTime<Utc>, WorkGraphError>;
68
69 async fn insert_item(
70 &self,
71 item: WorkItem,
72 event: WorkGraphEvent,
73 ) -> Result<WorkItem, WorkGraphError>;
74
75 async fn update_item_cas(
76 &self,
77 item: WorkItem,
78 expected_previous_revision: u64,
79 event: WorkGraphEvent,
80 ) -> Result<WorkItem, WorkGraphError>;
81
82 async fn update_item_and_attention_cas(
83 &self,
84 item: WorkItem,
85 expected_previous_revision: u64,
86 item_event: WorkGraphEvent,
87 attention_updates: Vec<(WorkAttentionBinding, u64, WorkGraphEvent)>,
88 ) -> Result<WorkItem, WorkGraphError>;
89
90 async fn get_item(
91 &self,
92 realm_id: &str,
93 namespace: &WorkNamespace,
94 id: &WorkItemId,
95 ) -> Result<Option<WorkItem>, WorkGraphError>;
96
97 async fn list_items(&self, filter: WorkItemFilter) -> Result<Vec<WorkItem>, WorkGraphError>;
98
99 async fn insert_goal(
100 &self,
101 _item: WorkItem,
102 _item_event: WorkGraphEvent,
103 _attention: WorkAttentionBinding,
104 _attention_event: WorkGraphEvent,
105 ) -> Result<(WorkItem, WorkAttentionBinding), WorkGraphError> {
106 Err(unsupported(self.kind()))
107 }
108
109 async fn update_attention_cas(
110 &self,
111 _attention: WorkAttentionBinding,
112 _expected_previous_revision: u64,
113 _event: WorkGraphEvent,
114 ) -> Result<WorkAttentionBinding, WorkGraphError> {
115 Err(unsupported(self.kind()))
116 }
117
118 async fn get_attention(
119 &self,
120 _realm_id: &str,
121 _namespace: &WorkNamespace,
122 _binding_id: &WorkAttentionBindingId,
123 ) -> Result<Option<WorkAttentionBinding>, WorkGraphError> {
124 Err(unsupported(self.kind()))
125 }
126
127 async fn list_attention(
128 &self,
129 _filter: AttentionListRequest,
130 ) -> Result<Vec<WorkAttentionBinding>, WorkGraphError> {
131 Err(unsupported(self.kind()))
132 }
133
134 async fn insert_edge(
135 &self,
136 edge: WorkEdge,
137 event: WorkGraphEvent,
138 ) -> Result<WorkEdge, WorkGraphError>;
139
140 async fn insert_edge_validated(
141 &self,
142 _edge: WorkEdge,
143 _event: WorkGraphEvent,
144 ) -> Result<WorkEdge, WorkGraphError> {
145 Err(unsupported(self.kind()))
146 }
147
148 async fn list_edges(
149 &self,
150 realm_id: &str,
151 namespace: &WorkNamespace,
152 ) -> Result<Vec<WorkEdge>, WorkGraphError>;
153
154 async fn list_events(
155 &self,
156 filter: WorkGraphEventFilter,
157 ) -> Result<Vec<WorkGraphEvent>, WorkGraphError>;
158}
159
/// Stateless store whose `WorkGraphStore` implementation rejects every
/// operation with an unsupported-backend error.
#[derive(Default)]
pub struct DisabledWorkGraphStore;
162
163#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
164#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
165impl WorkGraphStore for DisabledWorkGraphStore {
166 fn kind(&self) -> WorkGraphStoreKind {
167 WorkGraphStoreKind::Disabled
168 }
169
170 async fn get_store_time_utc(&self) -> Result<DateTime<Utc>, WorkGraphError> {
171 Err(unsupported(self.kind()))
172 }
173
174 async fn insert_item(
175 &self,
176 _item: WorkItem,
177 _event: WorkGraphEvent,
178 ) -> Result<WorkItem, WorkGraphError> {
179 Err(unsupported(self.kind()))
180 }
181
182 async fn update_item_cas(
183 &self,
184 _item: WorkItem,
185 _expected_previous_revision: u64,
186 _event: WorkGraphEvent,
187 ) -> Result<WorkItem, WorkGraphError> {
188 Err(unsupported(self.kind()))
189 }
190
191 async fn update_item_and_attention_cas(
192 &self,
193 _item: WorkItem,
194 _expected_previous_revision: u64,
195 _item_event: WorkGraphEvent,
196 _attention_updates: Vec<(WorkAttentionBinding, u64, WorkGraphEvent)>,
197 ) -> Result<WorkItem, WorkGraphError> {
198 Err(unsupported(self.kind()))
199 }
200
201 async fn get_item(
202 &self,
203 _realm_id: &str,
204 _namespace: &WorkNamespace,
205 _id: &WorkItemId,
206 ) -> Result<Option<WorkItem>, WorkGraphError> {
207 Err(unsupported(self.kind()))
208 }
209
210 async fn list_items(&self, _filter: WorkItemFilter) -> Result<Vec<WorkItem>, WorkGraphError> {
211 Err(unsupported(self.kind()))
212 }
213
214 async fn insert_goal(
215 &self,
216 _item: WorkItem,
217 _item_event: WorkGraphEvent,
218 _attention: WorkAttentionBinding,
219 _attention_event: WorkGraphEvent,
220 ) -> Result<(WorkItem, WorkAttentionBinding), WorkGraphError> {
221 Err(unsupported(self.kind()))
222 }
223
224 async fn update_attention_cas(
225 &self,
226 _attention: WorkAttentionBinding,
227 _expected_previous_revision: u64,
228 _event: WorkGraphEvent,
229 ) -> Result<WorkAttentionBinding, WorkGraphError> {
230 Err(unsupported(self.kind()))
231 }
232
233 async fn get_attention(
234 &self,
235 _realm_id: &str,
236 _namespace: &WorkNamespace,
237 _binding_id: &WorkAttentionBindingId,
238 ) -> Result<Option<WorkAttentionBinding>, WorkGraphError> {
239 Err(unsupported(self.kind()))
240 }
241
242 async fn list_attention(
243 &self,
244 _filter: AttentionListRequest,
245 ) -> Result<Vec<WorkAttentionBinding>, WorkGraphError> {
246 Err(unsupported(self.kind()))
247 }
248
249 async fn insert_edge(
250 &self,
251 _edge: WorkEdge,
252 _event: WorkGraphEvent,
253 ) -> Result<WorkEdge, WorkGraphError> {
254 Err(unsupported(self.kind()))
255 }
256
257 async fn insert_edge_validated(
258 &self,
259 _edge: WorkEdge,
260 _event: WorkGraphEvent,
261 ) -> Result<WorkEdge, WorkGraphError> {
262 Err(unsupported(self.kind()))
263 }
264
265 async fn list_edges(
266 &self,
267 _realm_id: &str,
268 _namespace: &WorkNamespace,
269 ) -> Result<Vec<WorkEdge>, WorkGraphError> {
270 Err(unsupported(self.kind()))
271 }
272
273 async fn list_events(
274 &self,
275 _filter: WorkGraphEventFilter,
276 ) -> Result<Vec<WorkGraphEvent>, WorkGraphError> {
277 Err(unsupported(self.kind()))
278 }
279}
280
281fn unsupported(kind: WorkGraphStoreKind) -> WorkGraphError {
282 WorkGraphError::UnsupportedBackend(kind.to_string())
283}
284
285#[derive(Default)]
286pub struct MemoryWorkGraphStore {
287 inner: Arc<RwLock<MemoryWorkGraphState>>,
288}
289
290#[derive(Default)]
291struct MemoryWorkGraphState {
292 items: BTreeMap<(String, WorkNamespace, WorkItemId), WorkItem>,
293 attention: BTreeMap<(String, WorkNamespace, WorkAttentionBindingId), WorkAttentionBinding>,
294 edges: Vec<WorkEdge>,
295 events: Vec<WorkGraphEvent>,
296 next_event_seq: i64,
297}
298
299impl MemoryWorkGraphStore {
300 pub fn new() -> Self {
301 Self::default()
302 }
303}
304
305#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
306#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
307impl WorkGraphStore for MemoryWorkGraphStore {
308 fn kind(&self) -> WorkGraphStoreKind {
309 WorkGraphStoreKind::Memory
310 }
311
312 async fn get_store_time_utc(&self) -> Result<DateTime<Utc>, WorkGraphError> {
313 Ok(Utc::now())
314 }
315
316 async fn insert_item(
317 &self,
318 item: WorkItem,
319 event: WorkGraphEvent,
320 ) -> Result<WorkItem, WorkGraphError> {
321 WorkGraphMachine::validate_item_projection(&item)?;
322 let mut guard = self.inner.write().await;
323 let key = item_key(&item.realm_id, &item.namespace, &item.id);
324 if guard.items.contains_key(&key) {
325 return Err(WorkGraphError::Conflict(format!(
326 "work item {} already exists",
327 item.id
328 )));
329 }
330 guard.items.insert(key, item.clone());
331 guard.append_event(event);
332 Ok(item)
333 }
334
335 async fn update_item_cas(
336 &self,
337 item: WorkItem,
338 expected_previous_revision: u64,
339 event: WorkGraphEvent,
340 ) -> Result<WorkItem, WorkGraphError> {
341 WorkGraphMachine::validate_item_projection(&item)?;
342 let mut guard = self.inner.write().await;
343 let key = item_key(&item.realm_id, &item.namespace, &item.id);
344 let Some(current) = guard.items.get(&key) else {
345 return Err(WorkGraphError::not_found(
346 item.realm_id.clone(),
347 item.namespace.clone(),
348 item.id.clone(),
349 ));
350 };
351 if current.revision != expected_previous_revision {
352 return Err(WorkGraphError::StaleRevision {
353 id: item.id.clone(),
354 expected: expected_previous_revision,
355 actual: current.revision,
356 });
357 }
358 guard.items.insert(key, item.clone());
359 guard.append_event(event);
360 Ok(item)
361 }
362
363 async fn get_item(
364 &self,
365 realm_id: &str,
366 namespace: &WorkNamespace,
367 id: &WorkItemId,
368 ) -> Result<Option<WorkItem>, WorkGraphError> {
369 let guard = self.inner.read().await;
370 Ok(guard.items.get(&item_key(realm_id, namespace, id)).cloned())
371 }
372
373 async fn list_items(&self, filter: WorkItemFilter) -> Result<Vec<WorkItem>, WorkGraphError> {
374 let guard = self.inner.read().await;
375 let mut items = guard
376 .items
377 .values()
378 .filter(|item| item_matches_filter(item, &filter))
379 .cloned()
380 .collect::<Vec<_>>();
381 items.sort_by(|left, right| {
382 left.updated_at
383 .cmp(&right.updated_at)
384 .then_with(|| left.id.cmp(&right.id))
385 });
386 if let Some(limit) = filter.limit {
387 items.truncate(limit);
388 }
389 Ok(items)
390 }
391
392 async fn insert_goal(
393 &self,
394 item: WorkItem,
395 item_event: WorkGraphEvent,
396 attention: WorkAttentionBinding,
397 attention_event: WorkGraphEvent,
398 ) -> Result<(WorkItem, WorkAttentionBinding), WorkGraphError> {
399 WorkGraphMachine::validate_item_projection(&item)?;
400 let mut guard = self.inner.write().await;
401 let item_key = item_key(&item.realm_id, &item.namespace, &item.id);
402 if guard.items.contains_key(&item_key) {
403 return Err(WorkGraphError::Conflict(format!(
404 "work item {} already exists",
405 item.id
406 )));
407 }
408 let attention_key = attention_key(
409 &attention.work_ref.realm_id,
410 &attention.work_ref.namespace,
411 &attention.binding_id,
412 );
413 if guard.attention.contains_key(&attention_key) {
414 return Err(WorkGraphError::Conflict(format!(
415 "work attention binding {} already exists",
416 attention.binding_id
417 )));
418 }
419 guard.items.insert(item_key, item.clone());
420 guard.attention.insert(attention_key, attention.clone());
421 guard.append_event(item_event);
422 guard.append_event(attention_event);
423 Ok((item, attention))
424 }
425
426 async fn update_attention_cas(
427 &self,
428 attention: WorkAttentionBinding,
429 expected_previous_revision: u64,
430 event: WorkGraphEvent,
431 ) -> Result<WorkAttentionBinding, WorkGraphError> {
432 let mut guard = self.inner.write().await;
433 let key = attention_key(
434 &attention.work_ref.realm_id,
435 &attention.work_ref.namespace,
436 &attention.binding_id,
437 );
438 let Some(current) = guard.attention.get(&key) else {
439 return Err(WorkGraphError::not_found(
440 attention.work_ref.realm_id.clone(),
441 attention.work_ref.namespace.clone(),
442 attention.work_ref.item_id.clone(),
443 ));
444 };
445 if current.machine_state.revision != expected_previous_revision {
446 return Err(WorkGraphError::StaleRevision {
447 id: attention.work_ref.item_id.clone(),
448 expected: expected_previous_revision,
449 actual: current.machine_state.revision,
450 });
451 }
452 guard.attention.insert(key, attention.clone());
453 guard.append_event(event);
454 Ok(attention)
455 }
456
457 async fn update_item_and_attention_cas(
458 &self,
459 item: WorkItem,
460 expected_previous_revision: u64,
461 item_event: WorkGraphEvent,
462 attention_updates: Vec<(WorkAttentionBinding, u64, WorkGraphEvent)>,
463 ) -> Result<WorkItem, WorkGraphError> {
464 WorkGraphMachine::validate_item_projection(&item)?;
465 let mut guard = self.inner.write().await;
466 let key = item_key(&item.realm_id, &item.namespace, &item.id);
467 let Some(current) = guard.items.get(&key) else {
468 return Err(WorkGraphError::not_found(
469 item.realm_id.clone(),
470 item.namespace.clone(),
471 item.id.clone(),
472 ));
473 };
474 if current.revision != expected_previous_revision {
475 return Err(WorkGraphError::StaleRevision {
476 id: item.id.clone(),
477 expected: expected_previous_revision,
478 actual: current.revision,
479 });
480 }
481 for (attention, expected_revision, _) in &attention_updates {
482 let key = attention_key(
483 &attention.work_ref.realm_id,
484 &attention.work_ref.namespace,
485 &attention.binding_id,
486 );
487 let Some(current) = guard.attention.get(&key) else {
488 return Err(WorkGraphError::not_found(
489 attention.work_ref.realm_id.clone(),
490 attention.work_ref.namespace.clone(),
491 attention.work_ref.item_id.clone(),
492 ));
493 };
494 if current.machine_state.revision != *expected_revision {
495 return Err(WorkGraphError::StaleRevision {
496 id: attention.work_ref.item_id.clone(),
497 expected: *expected_revision,
498 actual: current.machine_state.revision,
499 });
500 }
501 }
502 guard.items.insert(key, item.clone());
503 guard.append_event(item_event);
504 for (attention, _, event) in attention_updates {
505 let key = attention_key(
506 &attention.work_ref.realm_id,
507 &attention.work_ref.namespace,
508 &attention.binding_id,
509 );
510 guard.attention.insert(key, attention);
511 guard.append_event(event);
512 }
513 Ok(item)
514 }
515
516 async fn get_attention(
517 &self,
518 realm_id: &str,
519 namespace: &WorkNamespace,
520 binding_id: &WorkAttentionBindingId,
521 ) -> Result<Option<WorkAttentionBinding>, WorkGraphError> {
522 let guard = self.inner.read().await;
523 Ok(guard
524 .attention
525 .get(&attention_key(realm_id, namespace, binding_id))
526 .cloned())
527 }
528
529 async fn list_attention(
530 &self,
531 filter: AttentionListRequest,
532 ) -> Result<Vec<WorkAttentionBinding>, WorkGraphError> {
533 let guard = self.inner.read().await;
534 let mut bindings = guard
535 .attention
536 .values()
537 .filter(|binding| attention_matches_filter(binding, &filter))
538 .cloned()
539 .collect::<Vec<_>>();
540 bindings.sort_by(|left, right| {
541 left.updated_at
542 .cmp(&right.updated_at)
543 .then_with(|| left.binding_id.cmp(&right.binding_id))
544 });
545 Ok(bindings)
546 }
547
548 async fn insert_edge(
549 &self,
550 edge: WorkEdge,
551 event: WorkGraphEvent,
552 ) -> Result<WorkEdge, WorkGraphError> {
553 let mut guard = self.inner.write().await;
554 if guard.edges.iter().any(|existing| existing == &edge) {
555 return Err(duplicate_edge_error(&edge));
556 }
557 guard.edges.push(edge.clone());
558 guard.append_event(event);
559 Ok(edge)
560 }
561
562 async fn insert_edge_validated(
563 &self,
564 edge: WorkEdge,
565 event: WorkGraphEvent,
566 ) -> Result<WorkEdge, WorkGraphError> {
567 let mut guard = self.inner.write().await;
568 if guard.edges.iter().any(|existing| existing == &edge) {
569 return Err(duplicate_edge_error(&edge));
570 }
571 let existing_edges = guard
572 .edges
573 .iter()
574 .filter(|existing| {
575 existing.realm_id == edge.realm_id && existing.namespace == edge.namespace
576 })
577 .cloned()
578 .collect::<Vec<_>>();
579 let existing_items = guard
580 .items
581 .values()
582 .filter(|item| item.realm_id == edge.realm_id && item.namespace == edge.namespace)
583 .cloned()
584 .collect::<Vec<_>>();
585 WorkGraphMachine::validate_link(&edge, &existing_items, &existing_edges)?;
586 guard.edges.push(edge.clone());
587 guard.append_event(event);
588 Ok(edge)
589 }
590
591 async fn list_edges(
592 &self,
593 realm_id: &str,
594 namespace: &WorkNamespace,
595 ) -> Result<Vec<WorkEdge>, WorkGraphError> {
596 let guard = self.inner.read().await;
597 Ok(guard
598 .edges
599 .iter()
600 .filter(|edge| edge.realm_id == realm_id && edge.namespace == *namespace)
601 .cloned()
602 .collect())
603 }
604
605 async fn list_events(
606 &self,
607 filter: WorkGraphEventFilter,
608 ) -> Result<Vec<WorkGraphEvent>, WorkGraphError> {
609 let guard = self.inner.read().await;
610 let mut events = guard
611 .events
612 .iter()
613 .filter(|event| event_matches_filter(event, &filter))
614 .cloned()
615 .collect::<Vec<_>>();
616 events.sort_by_key(|event| event.seq.unwrap_or_default());
617 if let Some(limit) = filter.limit {
618 events.truncate(limit);
619 }
620 Ok(events)
621 }
622}
623
624impl MemoryWorkGraphState {
625 fn append_event(&mut self, mut event: WorkGraphEvent) {
626 self.next_event_seq += 1;
627 event.seq = Some(self.next_event_seq);
628 self.events.push(event);
629 }
630}
631
632fn item_key(
633 realm_id: &str,
634 namespace: &WorkNamespace,
635 id: &WorkItemId,
636) -> (String, WorkNamespace, WorkItemId) {
637 (realm_id.to_string(), namespace.clone(), id.clone())
638}
639
640fn attention_key(
641 realm_id: &str,
642 namespace: &WorkNamespace,
643 id: &WorkAttentionBindingId,
644) -> (String, WorkNamespace, WorkAttentionBindingId) {
645 (realm_id.to_string(), namespace.clone(), id.clone())
646}
647
648fn item_matches_filter(item: &WorkItem, filter: &WorkItemFilter) -> bool {
649 if let Some(realm_id) = &filter.realm_id
650 && &item.realm_id != realm_id
651 {
652 return false;
653 }
654 if !filter.all_namespaces
655 && let Some(namespace) = &filter.namespace
656 && &item.namespace != namespace
657 {
658 return false;
659 }
660 if !filter.statuses.is_empty() && !filter.statuses.contains(&item.status) {
661 return false;
662 }
663 if !filter.include_terminal && WorkGraphMachine::classify_terminality(item).unwrap_or(true) {
669 return false;
670 }
671 filter
672 .labels
673 .iter()
674 .all(|label| item.labels.contains(label))
675}
676
677fn attention_matches_filter(binding: &WorkAttentionBinding, filter: &AttentionListRequest) -> bool {
678 if let Some(realm_id) = &filter.realm_id
679 && &binding.work_ref.realm_id != realm_id
680 {
681 return false;
682 }
683 if let Some(namespace) = &filter.namespace
684 && &binding.work_ref.namespace != namespace
685 {
686 return false;
687 }
688 if let Some(target) = &filter.target
689 && &binding.target != target
690 {
691 return false;
692 }
693 if let Some(status) = &filter.status
694 && !attention_status_matches_filter(&binding.status, status)
695 {
696 return false;
697 }
698 true
699}
700
701fn attention_status_matches_filter(
702 actual: &crate::types::WorkAttentionStatus,
703 filter: &crate::types::WorkAttentionStatus,
704) -> bool {
705 use crate::types::WorkAttentionStatus;
706
707 match (actual, filter) {
708 (WorkAttentionStatus::Active, WorkAttentionStatus::Active)
709 | (WorkAttentionStatus::Superseded, WorkAttentionStatus::Superseded)
710 | (WorkAttentionStatus::Stopped, WorkAttentionStatus::Stopped) => true,
711 (WorkAttentionStatus::Paused { .. }, WorkAttentionStatus::Paused { until: None }) => true,
712 (
713 WorkAttentionStatus::Paused {
714 until: Some(actual_until),
715 },
716 WorkAttentionStatus::Paused {
717 until: Some(filter_until),
718 },
719 ) => actual_until == filter_until,
720 _ => false,
721 }
722}
723
724fn event_matches_filter(event: &WorkGraphEvent, filter: &WorkGraphEventFilter) -> bool {
725 if let Some(after_seq) = filter.after_seq
726 && event.seq.unwrap_or_default() <= after_seq
727 {
728 return false;
729 }
730 if let Some(realm_id) = &filter.realm_id
731 && &event.realm_id != realm_id
732 {
733 return false;
734 }
735 if !filter.all_namespaces
736 && let Some(namespace) = &filter.namespace
737 && &event.namespace != namespace
738 {
739 return false;
740 }
741 true
742}
743
/// Work-graph store backed by a SQLite database file.
///
/// Only the file path is kept; a connection is opened for each operation.
#[cfg(not(target_arch = "wasm32"))]
pub struct SqliteWorkGraphStore {
    /// Location of the database file.
    path: PathBuf,
}
748
749#[cfg(not(target_arch = "wasm32"))]
750impl SqliteWorkGraphStore {
751 pub fn open(path: impl Into<PathBuf>) -> Result<Self, WorkGraphError> {
752 let store = Self { path: path.into() };
753 store.with_connection(|_conn| Ok(()))?;
754 Ok(store)
755 }
756
757 pub fn path(&self) -> &Path {
758 &self.path
759 }
760
761 pub fn rebuild_projection_from_events(&self) -> Result<(), WorkGraphError> {
762 self.with_connection(|conn| {
763 let tx = conn
764 .transaction()
765 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
766 tx.execute("DELETE FROM workgraph_items", [])
767 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
768 tx.execute("DELETE FROM workgraph_edges", [])
769 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
770 tx.execute("DELETE FROM workgraph_attention", [])
771 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
772
773 let events = {
774 let mut stmt = tx
775 .prepare("SELECT event_json FROM workgraph_events ORDER BY seq ASC")
776 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
777 let rows = stmt
778 .query_map([], |row| row_json::<WorkGraphEvent>(row, 0))
779 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
780 let mut events = Vec::new();
781 for row in rows {
782 events.push(row.map_err(|err| WorkGraphError::Store(err.to_string()))?);
783 }
784 events
785 };
786
787 for event in events {
788 replay_event_tx(&tx, &event)?;
789 }
790 normalize_attention_for_terminal_items_tx(&tx)?;
791 tx.commit()
792 .map_err(|err| WorkGraphError::Store(err.to_string()))
793 })
794 }
795
796 fn with_connection<T>(
797 &self,
798 f: impl FnOnce(&mut Connection) -> Result<T, WorkGraphError>,
799 ) -> Result<T, WorkGraphError> {
800 if let Some(parent) = self.path.parent() {
801 std::fs::create_dir_all(parent)
802 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
803 }
804 let mut conn =
805 Connection::open(&self.path).map_err(|err| WorkGraphError::Store(err.to_string()))?;
806 conn.pragma_update(None, "journal_mode", "WAL")
807 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
808 conn.pragma_update(None, "synchronous", "NORMAL")
809 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
810 init_sqlite_schema(&conn)?;
811 f(&mut conn)
812 }
813}
814
815#[cfg(not(target_arch = "wasm32"))]
816#[async_trait]
817impl WorkGraphStore for SqliteWorkGraphStore {
818 fn kind(&self) -> WorkGraphStoreKind {
819 WorkGraphStoreKind::Sqlite
820 }
821
822 async fn get_store_time_utc(&self) -> Result<DateTime<Utc>, WorkGraphError> {
823 Ok(Utc::now())
824 }
825
826 async fn insert_item(
827 &self,
828 item: WorkItem,
829 event: WorkGraphEvent,
830 ) -> Result<WorkItem, WorkGraphError> {
831 WorkGraphMachine::validate_item_projection(&item)?;
832 self.with_connection(|conn| {
833 let tx = conn
834 .transaction()
835 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
836 insert_item_tx(&tx, &item)?;
837 insert_event_tx(&tx, &event)?;
838 tx.commit()
839 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
840 Ok(item)
841 })
842 }
843
844 async fn update_item_cas(
845 &self,
846 item: WorkItem,
847 expected_previous_revision: u64,
848 event: WorkGraphEvent,
849 ) -> Result<WorkItem, WorkGraphError> {
850 WorkGraphMachine::validate_item_projection(&item)?;
851 self.with_connection(|conn| {
852 let tx = conn
853 .transaction()
854 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
855 let changed = update_item_tx(&tx, &item, expected_previous_revision)?;
856 if changed == 0 {
857 let actual = current_revision_tx(&tx, &item.realm_id, &item.namespace, &item.id)?;
858 return match actual {
859 Some(actual) => Err(WorkGraphError::StaleRevision {
860 id: item.id,
861 expected: expected_previous_revision,
862 actual,
863 }),
864 None => Err(WorkGraphError::not_found(
865 item.realm_id,
866 item.namespace,
867 item.id,
868 )),
869 };
870 }
871 insert_event_tx(&tx, &event)?;
872 tx.commit()
873 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
874 Ok(item)
875 })
876 }
877
878 async fn get_item(
879 &self,
880 realm_id: &str,
881 namespace: &WorkNamespace,
882 id: &WorkItemId,
883 ) -> Result<Option<WorkItem>, WorkGraphError> {
884 self.with_connection(|conn| select_item(conn, realm_id, namespace, id))
885 }
886
887 async fn list_items(&self, filter: WorkItemFilter) -> Result<Vec<WorkItem>, WorkGraphError> {
888 self.with_connection(|conn| list_sqlite_items(conn, &filter))
889 }
890
891 async fn insert_goal(
892 &self,
893 item: WorkItem,
894 item_event: WorkGraphEvent,
895 attention: WorkAttentionBinding,
896 attention_event: WorkGraphEvent,
897 ) -> Result<(WorkItem, WorkAttentionBinding), WorkGraphError> {
898 WorkGraphMachine::validate_item_projection(&item)?;
899 self.with_connection(|conn| {
900 let tx = conn
901 .transaction()
902 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
903 insert_item_tx(&tx, &item)?;
904 insert_attention_tx(&tx, &attention)?;
905 insert_event_tx(&tx, &item_event)?;
906 insert_event_tx(&tx, &attention_event)?;
907 tx.commit()
908 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
909 Ok((item, attention))
910 })
911 }
912
913 async fn update_attention_cas(
914 &self,
915 attention: WorkAttentionBinding,
916 expected_previous_revision: u64,
917 event: WorkGraphEvent,
918 ) -> Result<WorkAttentionBinding, WorkGraphError> {
919 self.with_connection(|conn| {
920 let tx = conn
921 .transaction()
922 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
923 let changed = update_attention_tx(&tx, &attention, expected_previous_revision)?;
924 if changed == 0 {
925 let actual = current_attention_revision_tx(
926 &tx,
927 &attention.work_ref.realm_id,
928 &attention.work_ref.namespace,
929 &attention.binding_id,
930 )?;
931 return match actual {
932 Some(actual) => Err(WorkGraphError::StaleRevision {
933 id: attention.work_ref.item_id,
934 expected: expected_previous_revision,
935 actual,
936 }),
937 None => Err(WorkGraphError::not_found(
938 attention.work_ref.realm_id,
939 attention.work_ref.namespace,
940 attention.work_ref.item_id,
941 )),
942 };
943 }
944 insert_event_tx(&tx, &event)?;
945 tx.commit()
946 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
947 Ok(attention)
948 })
949 }
950
951 async fn update_item_and_attention_cas(
952 &self,
953 item: WorkItem,
954 expected_previous_revision: u64,
955 item_event: WorkGraphEvent,
956 attention_updates: Vec<(WorkAttentionBinding, u64, WorkGraphEvent)>,
957 ) -> Result<WorkItem, WorkGraphError> {
958 WorkGraphMachine::validate_item_projection(&item)?;
959 self.with_connection(|conn| {
960 let tx = conn
961 .transaction()
962 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
963 let changed = update_item_tx(&tx, &item, expected_previous_revision)?;
964 if changed == 0 {
965 let actual = current_revision_tx(&tx, &item.realm_id, &item.namespace, &item.id)?;
966 return match actual {
967 Some(actual) => Err(WorkGraphError::StaleRevision {
968 id: item.id,
969 expected: expected_previous_revision,
970 actual,
971 }),
972 None => Err(WorkGraphError::not_found(
973 item.realm_id,
974 item.namespace,
975 item.id,
976 )),
977 };
978 }
979 insert_event_tx(&tx, &item_event)?;
980 for (attention, expected_revision, event) in &attention_updates {
981 let changed = update_attention_tx(&tx, attention, *expected_revision)?;
982 if changed == 0 {
983 let actual = current_attention_revision_tx(
984 &tx,
985 &attention.work_ref.realm_id,
986 &attention.work_ref.namespace,
987 &attention.binding_id,
988 )?;
989 return match actual {
990 Some(actual) => Err(WorkGraphError::StaleRevision {
991 id: attention.work_ref.item_id.clone(),
992 expected: *expected_revision,
993 actual,
994 }),
995 None => Err(WorkGraphError::not_found(
996 attention.work_ref.realm_id.clone(),
997 attention.work_ref.namespace.clone(),
998 attention.work_ref.item_id.clone(),
999 )),
1000 };
1001 }
1002 insert_event_tx(&tx, event)?;
1003 }
1004 tx.commit()
1005 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1006 Ok(item)
1007 })
1008 }
1009
1010 async fn get_attention(
1011 &self,
1012 realm_id: &str,
1013 namespace: &WorkNamespace,
1014 binding_id: &WorkAttentionBindingId,
1015 ) -> Result<Option<WorkAttentionBinding>, WorkGraphError> {
1016 self.with_connection(|conn| select_attention(conn, realm_id, namespace, binding_id))
1017 }
1018
1019 async fn list_attention(
1020 &self,
1021 filter: AttentionListRequest,
1022 ) -> Result<Vec<WorkAttentionBinding>, WorkGraphError> {
1023 self.with_connection(|conn| list_sqlite_attention(conn, &filter))
1024 }
1025
1026 async fn insert_edge(
1027 &self,
1028 edge: WorkEdge,
1029 event: WorkGraphEvent,
1030 ) -> Result<WorkEdge, WorkGraphError> {
1031 self.with_connection(|conn| {
1032 let tx = conn
1033 .transaction()
1034 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1035 insert_edge_tx(&tx, &edge)?;
1036 insert_event_tx(&tx, &event)?;
1037 tx.commit()
1038 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1039 Ok(edge)
1040 })
1041 }
1042
1043 async fn insert_edge_validated(
1044 &self,
1045 edge: WorkEdge,
1046 event: WorkGraphEvent,
1047 ) -> Result<WorkEdge, WorkGraphError> {
1048 self.with_connection(|conn| {
1049 let tx = conn
1050 .transaction_with_behavior(TransactionBehavior::Immediate)
1051 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1052 let existing_edges = list_sqlite_edges(&tx, &edge.realm_id, &edge.namespace)?;
1053 let existing_items = list_sqlite_items(
1054 &tx,
1055 &WorkItemFilter {
1056 realm_id: Some(edge.realm_id.clone()),
1057 namespace: Some(edge.namespace.clone()),
1058 include_terminal: true,
1059 ..WorkItemFilter::default()
1060 },
1061 )?;
1062 WorkGraphMachine::validate_link(&edge, &existing_items, &existing_edges)?;
1063 insert_edge_tx(&tx, &edge)?;
1064 insert_event_tx(&tx, &event)?;
1065 tx.commit()
1066 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1067 Ok(edge)
1068 })
1069 }
1070
1071 async fn list_edges(
1072 &self,
1073 realm_id: &str,
1074 namespace: &WorkNamespace,
1075 ) -> Result<Vec<WorkEdge>, WorkGraphError> {
1076 self.with_connection(|conn| list_sqlite_edges(conn, realm_id, namespace))
1077 }
1078
1079 async fn list_events(
1080 &self,
1081 filter: WorkGraphEventFilter,
1082 ) -> Result<Vec<WorkGraphEvent>, WorkGraphError> {
1083 self.with_connection(|conn| list_sqlite_events(conn, &filter))
1084 }
1085}
1086
1087#[cfg(not(target_arch = "wasm32"))]
1088fn init_sqlite_schema(conn: &Connection) -> Result<(), WorkGraphError> {
1089 conn.execute_batch(
1090 r"
1091 CREATE TABLE IF NOT EXISTS workgraph_items (
1092 realm_id TEXT NOT NULL,
1093 namespace TEXT NOT NULL,
1094 item_id TEXT NOT NULL,
1095 revision INTEGER NOT NULL,
1096 updated_at_utc TEXT NOT NULL,
1097 item_json TEXT NOT NULL,
1098 PRIMARY KEY (realm_id, namespace, item_id)
1099 );
1100 CREATE INDEX IF NOT EXISTS idx_workgraph_items_realm_namespace_updated
1101 ON workgraph_items (realm_id, namespace, updated_at_utc);
1102
1103 CREATE TABLE IF NOT EXISTS workgraph_attention (
1104 realm_id TEXT NOT NULL,
1105 namespace TEXT NOT NULL,
1106 binding_id TEXT NOT NULL,
1107 revision INTEGER NOT NULL,
1108 updated_at_utc TEXT NOT NULL,
1109 attention_json TEXT NOT NULL,
1110 PRIMARY KEY (realm_id, namespace, binding_id)
1111 );
1112 CREATE INDEX IF NOT EXISTS idx_workgraph_attention_realm_namespace_updated
1113 ON workgraph_attention (realm_id, namespace, updated_at_utc);
1114
1115 CREATE TABLE IF NOT EXISTS workgraph_edges (
1116 realm_id TEXT NOT NULL,
1117 namespace TEXT NOT NULL,
1118 edge_kind TEXT NOT NULL,
1119 from_id TEXT NOT NULL,
1120 to_id TEXT NOT NULL,
1121 edge_json TEXT NOT NULL,
1122 PRIMARY KEY (realm_id, namespace, edge_kind, from_id, to_id)
1123 );
1124
1125 CREATE TABLE IF NOT EXISTS workgraph_events (
1126 seq INTEGER PRIMARY KEY AUTOINCREMENT,
1127 realm_id TEXT NOT NULL,
1128 namespace TEXT NOT NULL,
1129 item_id TEXT,
1130 event_kind TEXT NOT NULL,
1131 at_utc TEXT NOT NULL,
1132 event_json TEXT NOT NULL
1133 );
1134 CREATE INDEX IF NOT EXISTS idx_workgraph_events_realm_namespace_seq
1135 ON workgraph_events (realm_id, namespace, seq);
1136 ",
1137 )
1138 .map_err(|err| WorkGraphError::Store(err.to_string()))
1139}
1140
1141#[cfg(not(target_arch = "wasm32"))]
1142fn insert_item_tx(tx: &Transaction<'_>, item: &WorkItem) -> Result<(), WorkGraphError> {
1143 let json = serde_json::to_string(item).map_err(|err| WorkGraphError::Store(err.to_string()))?;
1144 tx.execute(
1145 "INSERT INTO workgraph_items (realm_id, namespace, item_id, revision, updated_at_utc, item_json)
1146 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1147 params![
1148 item.realm_id,
1149 item.namespace.as_str(),
1150 item.id.as_str(),
1151 item.revision,
1152 item.updated_at.to_rfc3339(),
1153 json,
1154 ],
1155 )
1156 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1157 Ok(())
1158}
1159
1160#[cfg(not(target_arch = "wasm32"))]
1161fn update_item_tx(
1162 tx: &Transaction<'_>,
1163 item: &WorkItem,
1164 expected_previous_revision: u64,
1165) -> Result<usize, WorkGraphError> {
1166 let json = serde_json::to_string(item).map_err(|err| WorkGraphError::Store(err.to_string()))?;
1167 tx.execute(
1168 "UPDATE workgraph_items
1169 SET revision = ?4, updated_at_utc = ?5, item_json = ?6
1170 WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3 AND revision = ?7",
1171 params![
1172 item.realm_id,
1173 item.namespace.as_str(),
1174 item.id.as_str(),
1175 item.revision,
1176 item.updated_at.to_rfc3339(),
1177 json,
1178 expected_previous_revision,
1179 ],
1180 )
1181 .map_err(|err| WorkGraphError::Store(err.to_string()))
1182}
1183
1184#[cfg(not(target_arch = "wasm32"))]
1185fn upsert_item_tx(tx: &Transaction<'_>, item: &WorkItem) -> Result<(), WorkGraphError> {
1186 let json = serde_json::to_string(item).map_err(|err| WorkGraphError::Store(err.to_string()))?;
1187 tx.execute(
1188 "INSERT INTO workgraph_items
1189 (realm_id, namespace, item_id, revision, updated_at_utc, item_json)
1190 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1191 ON CONFLICT(realm_id, namespace, item_id) DO UPDATE SET
1192 revision = excluded.revision,
1193 updated_at_utc = excluded.updated_at_utc,
1194 item_json = excluded.item_json",
1195 params![
1196 item.realm_id,
1197 item.namespace.as_str(),
1198 item.id.as_str(),
1199 item.revision,
1200 item.updated_at.to_rfc3339(),
1201 json,
1202 ],
1203 )
1204 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1205 Ok(())
1206}
1207
1208#[cfg(not(target_arch = "wasm32"))]
1209fn current_revision_tx(
1210 tx: &Transaction<'_>,
1211 realm_id: &str,
1212 namespace: &WorkNamespace,
1213 id: &WorkItemId,
1214) -> Result<Option<u64>, WorkGraphError> {
1215 tx.query_row(
1216 "SELECT revision FROM workgraph_items WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3",
1217 params![realm_id, namespace.as_str(), id.as_str()],
1218 |row| row.get::<_, u64>(0),
1219 )
1220 .optional()
1221 .map_err(|err| WorkGraphError::Store(err.to_string()))
1222}
1223
1224#[cfg(not(target_arch = "wasm32"))]
1225fn insert_attention_tx(
1226 tx: &Transaction<'_>,
1227 attention: &WorkAttentionBinding,
1228) -> Result<(), WorkGraphError> {
1229 let json =
1230 serde_json::to_string(attention).map_err(|err| WorkGraphError::Store(err.to_string()))?;
1231 tx.execute(
1232 "INSERT INTO workgraph_attention
1233 (realm_id, namespace, binding_id, revision, updated_at_utc, attention_json)
1234 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1235 params![
1236 attention.work_ref.realm_id,
1237 attention.work_ref.namespace.as_str(),
1238 attention.binding_id.as_str(),
1239 attention.machine_state.revision,
1240 attention.updated_at.to_rfc3339(),
1241 json,
1242 ],
1243 )
1244 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1245 Ok(())
1246}
1247
1248#[cfg(not(target_arch = "wasm32"))]
1249fn update_attention_tx(
1250 tx: &Transaction<'_>,
1251 attention: &WorkAttentionBinding,
1252 expected_previous_revision: u64,
1253) -> Result<usize, WorkGraphError> {
1254 let json =
1255 serde_json::to_string(attention).map_err(|err| WorkGraphError::Store(err.to_string()))?;
1256 tx.execute(
1257 "UPDATE workgraph_attention
1258 SET revision = ?4, updated_at_utc = ?5, attention_json = ?6
1259 WHERE realm_id = ?1 AND namespace = ?2 AND binding_id = ?3 AND revision = ?7",
1260 params![
1261 attention.work_ref.realm_id,
1262 attention.work_ref.namespace.as_str(),
1263 attention.binding_id.as_str(),
1264 attention.machine_state.revision,
1265 attention.updated_at.to_rfc3339(),
1266 json,
1267 expected_previous_revision,
1268 ],
1269 )
1270 .map_err(|err| WorkGraphError::Store(err.to_string()))
1271}
1272
1273#[cfg(not(target_arch = "wasm32"))]
1274fn upsert_attention_tx(
1275 tx: &Transaction<'_>,
1276 attention: &WorkAttentionBinding,
1277) -> Result<(), WorkGraphError> {
1278 let json =
1279 serde_json::to_string(attention).map_err(|err| WorkGraphError::Store(err.to_string()))?;
1280 tx.execute(
1281 "INSERT INTO workgraph_attention
1282 (realm_id, namespace, binding_id, revision, updated_at_utc, attention_json)
1283 VALUES (?1, ?2, ?3, ?4, ?5, ?6)
1284 ON CONFLICT(realm_id, namespace, binding_id) DO UPDATE SET
1285 revision = excluded.revision,
1286 updated_at_utc = excluded.updated_at_utc,
1287 attention_json = excluded.attention_json",
1288 params![
1289 attention.work_ref.realm_id,
1290 attention.work_ref.namespace.as_str(),
1291 attention.binding_id.as_str(),
1292 attention.machine_state.revision,
1293 attention.updated_at.to_rfc3339(),
1294 json,
1295 ],
1296 )
1297 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1298 Ok(())
1299}
1300
1301#[cfg(not(target_arch = "wasm32"))]
1302fn current_attention_revision_tx(
1303 tx: &Transaction<'_>,
1304 realm_id: &str,
1305 namespace: &WorkNamespace,
1306 binding_id: &WorkAttentionBindingId,
1307) -> Result<Option<u64>, WorkGraphError> {
1308 tx.query_row(
1309 "SELECT revision FROM workgraph_attention
1310 WHERE realm_id = ?1 AND namespace = ?2 AND binding_id = ?3",
1311 params![realm_id, namespace.as_str(), binding_id.as_str()],
1312 |row| row.get::<_, u64>(0),
1313 )
1314 .optional()
1315 .map_err(|err| WorkGraphError::Store(err.to_string()))
1316}
1317
1318#[cfg(not(target_arch = "wasm32"))]
1319fn insert_edge_tx(tx: &Transaction<'_>, edge: &WorkEdge) -> Result<(), WorkGraphError> {
1320 let json = serde_json::to_string(edge).map_err(|err| WorkGraphError::Store(err.to_string()))?;
1321 tx.execute(
1322 "INSERT INTO workgraph_edges
1323 (realm_id, namespace, edge_kind, from_id, to_id, edge_json)
1324 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1325 params![
1326 edge.realm_id,
1327 edge.namespace.as_str(),
1328 format!("{:?}", edge.kind),
1329 edge.from_id.as_str(),
1330 edge.to_id.as_str(),
1331 json,
1332 ],
1333 )
1334 .map_err(|err| map_sqlite_insert_edge_error(err, edge))?;
1335 Ok(())
1336}
1337
1338fn duplicate_edge_error(edge: &WorkEdge) -> WorkGraphError {
1339 WorkGraphError::Conflict(format!(
1340 "work edge {:?} {} -> {} already exists",
1341 edge.kind, edge.from_id, edge.to_id
1342 ))
1343}
1344
1345#[cfg(not(target_arch = "wasm32"))]
1346fn map_sqlite_insert_edge_error(err: rusqlite::Error, edge: &WorkEdge) -> WorkGraphError {
1347 match err {
1348 rusqlite::Error::SqliteFailure(failure, _)
1349 if failure.code == ErrorCode::ConstraintViolation =>
1350 {
1351 duplicate_edge_error(edge)
1352 }
1353 err => WorkGraphError::Store(err.to_string()),
1354 }
1355}
1356
1357#[cfg(not(target_arch = "wasm32"))]
1358fn insert_event_tx(tx: &Transaction<'_>, event: &WorkGraphEvent) -> Result<(), WorkGraphError> {
1359 let json =
1360 serde_json::to_string(event).map_err(|err| WorkGraphError::Store(err.to_string()))?;
1361 tx.execute(
1362 "INSERT INTO workgraph_events
1363 (realm_id, namespace, item_id, event_kind, at_utc, event_json)
1364 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
1365 params![
1366 event.realm_id,
1367 event.namespace.as_str(),
1368 event.item_id.as_ref().map(WorkItemId::as_str),
1369 format!("{:?}", event.kind),
1370 event.at.to_rfc3339(),
1371 json,
1372 ],
1373 )
1374 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1375 Ok(())
1376}
1377
1378#[cfg(not(target_arch = "wasm32"))]
1379fn select_item(
1380 conn: &Connection,
1381 realm_id: &str,
1382 namespace: &WorkNamespace,
1383 id: &WorkItemId,
1384) -> Result<Option<WorkItem>, WorkGraphError> {
1385 conn.query_row(
1386 "SELECT item_json FROM workgraph_items WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3",
1387 params![realm_id, namespace.as_str(), id.as_str()],
1388 |row| row_json(row, 0),
1389 )
1390 .optional()
1391 .map_err(|err| WorkGraphError::Store(err.to_string()))
1392}
1393
1394#[cfg(not(target_arch = "wasm32"))]
1395fn list_sqlite_items(
1396 conn: &Connection,
1397 filter: &WorkItemFilter,
1398) -> Result<Vec<WorkItem>, WorkGraphError> {
1399 let mut stmt = conn
1400 .prepare("SELECT item_json FROM workgraph_items ORDER BY updated_at_utc ASC, item_id ASC")
1401 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1402 let rows = stmt
1403 .query_map([], |row| row_json::<WorkItem>(row, 0))
1404 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1405 let mut items = Vec::new();
1406 for row in rows {
1407 let item = row.map_err(|err| WorkGraphError::Store(err.to_string()))?;
1408 if item_matches_filter(&item, filter) {
1409 items.push(item);
1410 if filter.limit.is_some_and(|limit| items.len() >= limit) {
1411 break;
1412 }
1413 }
1414 }
1415 Ok(items)
1416}
1417
1418#[cfg(not(target_arch = "wasm32"))]
1419fn select_attention(
1420 conn: &Connection,
1421 realm_id: &str,
1422 namespace: &WorkNamespace,
1423 binding_id: &WorkAttentionBindingId,
1424) -> Result<Option<WorkAttentionBinding>, WorkGraphError> {
1425 conn.query_row(
1426 "SELECT attention_json FROM workgraph_attention
1427 WHERE realm_id = ?1 AND namespace = ?2 AND binding_id = ?3",
1428 params![realm_id, namespace.as_str(), binding_id.as_str()],
1429 |row| row_json(row, 0),
1430 )
1431 .optional()
1432 .map_err(|err| WorkGraphError::Store(err.to_string()))
1433}
1434
1435#[cfg(not(target_arch = "wasm32"))]
1436fn list_sqlite_attention(
1437 conn: &Connection,
1438 filter: &AttentionListRequest,
1439) -> Result<Vec<WorkAttentionBinding>, WorkGraphError> {
1440 let mut stmt = conn
1441 .prepare(
1442 "SELECT attention_json FROM workgraph_attention
1443 ORDER BY updated_at_utc ASC, binding_id ASC",
1444 )
1445 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1446 let rows = stmt
1447 .query_map([], |row| row_json::<WorkAttentionBinding>(row, 0))
1448 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1449 let mut bindings = Vec::new();
1450 for row in rows {
1451 let binding = row.map_err(|err| WorkGraphError::Store(err.to_string()))?;
1452 if attention_matches_filter(&binding, filter) {
1453 bindings.push(binding);
1454 }
1455 }
1456 Ok(bindings)
1457}
1458
1459#[cfg(not(target_arch = "wasm32"))]
1460fn list_sqlite_edges(
1461 conn: &Connection,
1462 realm_id: &str,
1463 namespace: &WorkNamespace,
1464) -> Result<Vec<WorkEdge>, WorkGraphError> {
1465 let mut stmt = conn
1466 .prepare(
1467 "SELECT edge_json FROM workgraph_edges
1468 WHERE realm_id = ?1 AND namespace = ?2
1469 ORDER BY edge_kind ASC, from_id ASC, to_id ASC",
1470 )
1471 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1472 let rows = stmt
1473 .query_map(params![realm_id, namespace.as_str()], |row| {
1474 row_json::<WorkEdge>(row, 0)
1475 })
1476 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1477 let mut edges = Vec::new();
1478 for row in rows {
1479 edges.push(row.map_err(|err| WorkGraphError::Store(err.to_string()))?);
1480 }
1481 Ok(edges)
1482}
1483
1484#[cfg(not(target_arch = "wasm32"))]
1485fn list_sqlite_events(
1486 conn: &Connection,
1487 filter: &WorkGraphEventFilter,
1488) -> Result<Vec<WorkGraphEvent>, WorkGraphError> {
1489 let mut stmt = conn
1490 .prepare("SELECT seq, event_json FROM workgraph_events ORDER BY seq ASC")
1491 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1492 let rows = stmt
1493 .query_map([], |row| {
1494 let seq = row.get::<_, i64>(0)?;
1495 let mut event = row_json::<WorkGraphEvent>(row, 1)?;
1496 event.seq = Some(seq);
1497 Ok(event)
1498 })
1499 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1500 let mut events = Vec::new();
1501 for row in rows {
1502 let event = row.map_err(|err| WorkGraphError::Store(err.to_string()))?;
1503 if event_matches_filter(&event, filter) {
1504 events.push(event);
1505 if filter.limit.is_some_and(|limit| events.len() >= limit) {
1506 break;
1507 }
1508 }
1509 }
1510 Ok(events)
1511}
1512
1513#[cfg(not(target_arch = "wasm32"))]
1514fn replay_event_tx(tx: &Transaction<'_>, event: &WorkGraphEvent) -> Result<(), WorkGraphError> {
1515 match event.kind {
1516 WorkGraphEventKind::Linked => {
1517 let edge = payload_field::<WorkEdge>(event, "edge")?;
1518 insert_edge_tx(tx, &edge)
1519 }
1520 WorkGraphEventKind::AttentionCreated | WorkGraphEventKind::AttentionUpdated => {
1521 let attention = payload_field::<WorkAttentionBinding>(event, "attention")?;
1522 upsert_attention_tx(tx, &attention)
1523 }
1524 WorkGraphEventKind::Created
1525 | WorkGraphEventKind::Updated
1526 | WorkGraphEventKind::Claimed
1527 | WorkGraphEventKind::Released
1528 | WorkGraphEventKind::Blocked
1529 | WorkGraphEventKind::Closed
1530 | WorkGraphEventKind::EvidenceAdded => {
1531 let item = payload_field::<WorkItem>(event, "item")?;
1532 upsert_item_tx(tx, &item)
1533 }
1534 }
1535}
1536
1537#[cfg(not(target_arch = "wasm32"))]
1538fn normalize_attention_for_terminal_items_tx(tx: &Transaction<'_>) -> Result<(), WorkGraphError> {
1539 let bindings = {
1540 let mut stmt = tx
1541 .prepare("SELECT attention_json FROM workgraph_attention")
1542 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1543 let rows = stmt
1544 .query_map([], |row| row_json::<WorkAttentionBinding>(row, 0))
1545 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1546 let mut bindings = Vec::new();
1547 for row in rows {
1548 bindings.push(row.map_err(|err| WorkGraphError::Store(err.to_string()))?);
1549 }
1550 bindings
1551 };
1552
1553 for binding in bindings {
1554 if matches!(
1555 binding.status,
1556 WorkAttentionStatus::Stopped | WorkAttentionStatus::Superseded
1557 ) {
1558 continue;
1559 }
1560 let item = tx
1561 .query_row(
1562 "SELECT item_json FROM workgraph_items
1563 WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3",
1564 params![
1565 binding.work_ref.realm_id,
1566 binding.work_ref.namespace.as_str(),
1567 binding.work_ref.item_id.as_str(),
1568 ],
1569 |row| row_json::<WorkItem>(row, 0),
1570 )
1571 .optional()
1572 .map_err(|err| WorkGraphError::Store(err.to_string()))?;
1573 let Some(item) = item else {
1574 continue;
1575 };
1576 if WorkGraphMachine::classify_terminality(&item)? {
1579 let expected_revision = binding.machine_state.revision;
1580 let stopped = WorkAttentionMachine::stop(binding, expected_revision, item.updated_at)?;
1581 upsert_attention_tx(tx, &stopped)?;
1582 }
1583 }
1584 Ok(())
1585}
1586
1587#[cfg(not(target_arch = "wasm32"))]
1588fn payload_field<T: serde::de::DeserializeOwned>(
1589 event: &WorkGraphEvent,
1590 field: &str,
1591) -> Result<T, WorkGraphError> {
1592 let value = event.payload.get(field).ok_or_else(|| {
1593 WorkGraphError::Store(format!(
1594 "workgraph event {:?} missing payload field `{field}`",
1595 event.kind
1596 ))
1597 })?;
1598 serde_json::from_value(value.clone()).map_err(|err| WorkGraphError::Store(err.to_string()))
1599}
1600
1601#[cfg(not(target_arch = "wasm32"))]
1602fn row_json<T: serde::de::DeserializeOwned>(
1603 row: &rusqlite::Row<'_>,
1604 index: usize,
1605) -> rusqlite::Result<T> {
1606 let json = row.get::<_, String>(index)?;
1607 serde_json::from_str(&json).map_err(|err| {
1608 rusqlite::Error::FromSqlConversionFailure(index, rusqlite::types::Type::Text, Box::new(err))
1609 })
1610}
1611
#[cfg(test)]
// Test code: panicking through `expect`/`unwrap` is the intended way to fail.
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use std::collections::BTreeSet;
    use std::sync::Arc;

    use chrono::Utc;
    use serde_json::json;

    use crate::types::WorkEdge;
    use crate::{
        AttentionDelegatedAuthority, AttentionProjectionPolicy, CreateWorkItemRequest,
        GoalAttentionTarget, GoalCreateRequest, GoalRequestCloseRequest, GoalTerminalStatus,
        LinkWorkItemsRequest, MemoryWorkGraphStore, WorkAttentionMode, WorkAttentionStatus,
        WorkCompletionPolicy, WorkEdgeKind, WorkGraphError, WorkGraphEvent, WorkGraphEventFilter,
        WorkGraphEventKind, WorkGraphService, WorkGraphStore, WorkItemFilter, WorkItemId,
        WorkNamespace,
    };

    /// A `Blocks` edge between two freshly generated item ids in realm
    /// `"realm"`, default namespace.
    fn test_edge() -> WorkEdge {
        let from_id = WorkItemId::generated();
        let to_id = WorkItemId::generated();
        WorkEdge {
            realm_id: "realm".to_string(),
            namespace: WorkNamespace::default(),
            kind: WorkEdgeKind::Blocks,
            from_id,
            to_id,
            created_at: Utc::now(),
        }
    }

    /// The `Linked` event that records `edge`, carrying it under the `edge` key.
    fn link_event(edge: &WorkEdge) -> WorkGraphEvent {
        let payload = json!({ "edge": edge });
        WorkGraphEvent::graph(
            edge.realm_id.clone(),
            edge.namespace.clone(),
            WorkGraphEventKind::Linked,
            edge.created_at,
            payload,
        )
    }

    /// A create request with the given title and every other field left at
    /// its empty or default value.
    fn item_request(title: &str) -> CreateWorkItemRequest {
        CreateWorkItemRequest {
            realm_id: None,
            namespace: None,
            title: title.to_string(),
            description: None,
            priority: Default::default(),
            completion_policy: Default::default(),
            labels: BTreeSet::new(),
            due_at: None,
            not_before: None,
            snoozed_until: None,
            external_refs: Vec::new(),
            evidence_refs: Vec::new(),
            status: None,
        }
    }

    /// Item filter scoped to realm `"realm"` and the default namespace.
    fn default_scope_items() -> WorkItemFilter {
        WorkItemFilter {
            realm_id: Some("realm".to_string()),
            namespace: Some(WorkNamespace::default()),
            ..WorkItemFilter::default()
        }
    }

    /// Event filter scoped to the realm and namespace of `edge`, unbounded.
    fn edge_scope_events(edge: &WorkEdge) -> WorkGraphEventFilter {
        WorkGraphEventFilter {
            realm_id: Some(edge.realm_id.clone()),
            namespace: Some(edge.namespace.clone()),
            all_namespaces: false,
            after_seq: None,
            limit: None,
        }
    }

    /// Opens (or reopens) the SQLite store at `workgraph.sqlite3` inside `dir`.
    #[cfg(not(target_arch = "wasm32"))]
    fn open_sqlite(dir: &tempfile::TempDir) -> crate::SqliteWorkGraphStore {
        let path = dir.path().join("workgraph.sqlite3");
        crate::SqliteWorkGraphStore::open(&path).expect("open")
    }

    #[tokio::test]
    async fn memory_store_namespace_filters_do_not_leak() {
        let store = Arc::new(MemoryWorkGraphStore::new());
        let default_service =
            WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
        let other_namespace = WorkNamespace::new("other").expect("namespace");
        let other_service = WorkGraphService::with_scope(store.clone(), "realm", other_namespace);

        default_service
            .create(item_request("default"))
            .await
            .expect("create default");
        other_service
            .create(item_request("other"))
            .await
            .expect("create other");

        let items = store
            .list_items(default_scope_items())
            .await
            .expect("list");
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].title, "default");
    }

    #[tokio::test]
    async fn memory_store_duplicate_edge_does_not_append_event() {
        let store = MemoryWorkGraphStore::new();
        let edge = test_edge();

        store
            .insert_edge(edge.clone(), link_event(&edge))
            .await
            .expect("insert edge");
        let error = store
            .insert_edge(edge.clone(), link_event(&edge))
            .await
            .expect_err("duplicate edge should fail");
        assert!(matches!(error, WorkGraphError::Conflict(_)));

        let events = store
            .list_events(edge_scope_events(&edge))
            .await
            .expect("events");
        assert_eq!(events.len(), 1);
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[tokio::test]
    async fn sqlite_persistence_survives_restart() {
        let dir = tempfile::tempdir().expect("tempdir");
        let store = Arc::new(open_sqlite(&dir));
        let service = WorkGraphService::with_scope(store, "realm", WorkNamespace::default());
        let item = service
            .create(item_request("persist me"))
            .await
            .expect("create");

        let reopened = Arc::new(open_sqlite(&dir));
        let service = WorkGraphService::with_scope(reopened, "realm", WorkNamespace::default());
        let fetched = service.get(None, None, item.id.clone()).await.expect("get");
        assert_eq!(fetched.title, "persist me");
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[tokio::test]
    async fn sqlite_item_without_machine_state_fails_closed_on_read() {
        let dir = tempfile::tempdir().expect("tempdir");
        let store = Arc::new(open_sqlite(&dir));
        let service =
            WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
        let item = service
            .create(item_request("legacy item"))
            .await
            .expect("create");

        // Rewrite the stored JSON without its `machine_state` key.
        store
            .with_connection(|conn| {
                let stored: String = conn
                    .query_row(
                        "SELECT item_json FROM workgraph_items
                         WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3",
                        rusqlite::params![
                            &item.realm_id,
                            item.namespace.as_str(),
                            item.id.as_str()
                        ],
                        |row| row.get(0),
                    )
                    .map_err(|err| WorkGraphError::Store(err.to_string()))?;
                let mut value = serde_json::from_str::<serde_json::Value>(&stored)
                    .map_err(|err| WorkGraphError::Store(err.to_string()))?;
                value
                    .as_object_mut()
                    .expect("item json object")
                    .remove("machine_state");
                let stripped = serde_json::to_string(&value)
                    .map_err(|err| WorkGraphError::Store(err.to_string()))?;
                conn.execute(
                    "UPDATE workgraph_items
                     SET item_json = ?4
                     WHERE realm_id = ?1 AND namespace = ?2 AND item_id = ?3",
                    rusqlite::params![
                        &item.realm_id,
                        item.namespace.as_str(),
                        item.id.as_str(),
                        stripped
                    ],
                )
                .map_err(|err| WorkGraphError::Store(err.to_string()))?;
                Ok(())
            })
            .expect("strip machine state");

        let reopened = Arc::new(open_sqlite(&dir));
        let service = WorkGraphService::with_scope(reopened, "realm", WorkNamespace::default());
        let err = service
            .get(None, None, item.id)
            .await
            .expect_err("reading an item with no machine_state must fail closed");
        assert!(
            matches!(err, WorkGraphError::Store(_)),
            "expected a typed Store deserialization error, got: {err:?}"
        );
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[tokio::test]
    async fn sqlite_event_replay_rebuilds_projection() {
        let dir = tempfile::tempdir().expect("tempdir");
        let store = Arc::new(open_sqlite(&dir));
        let service =
            WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
        let blocker = service
            .create(item_request("blocker"))
            .await
            .expect("create blocker");
        let blocked = service
            .create(item_request("blocked"))
            .await
            .expect("create blocked");
        service
            .link(LinkWorkItemsRequest {
                realm_id: None,
                namespace: None,
                kind: WorkEdgeKind::Blocks,
                from_id: blocker.id.clone(),
                to_id: blocked.id.clone(),
            })
            .await
            .expect("link");

        // Wipe the projection tables; the event log stays untouched.
        store
            .with_connection(|conn| {
                conn.execute("DELETE FROM workgraph_items", [])
                    .map_err(|err| crate::WorkGraphError::Store(err.to_string()))?;
                conn.execute("DELETE FROM workgraph_edges", [])
                    .map_err(|err| crate::WorkGraphError::Store(err.to_string()))?;
                Ok(())
            })
            .expect("clear projection");

        let empty_items = store
            .list_items(default_scope_items())
            .await
            .expect("empty list");
        assert!(empty_items.is_empty());

        store
            .rebuild_projection_from_events()
            .expect("rebuild projection");

        let rebuilt_items = store
            .list_items(default_scope_items())
            .await
            .expect("rebuilt list");
        assert_eq!(rebuilt_items.len(), 2);
        let rebuilt_edges = store
            .list_edges("realm", &WorkNamespace::default())
            .await
            .expect("rebuilt edges");
        assert_eq!(rebuilt_edges.len(), 1);
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[tokio::test]
    async fn sqlite_event_replay_stops_attention_for_terminal_goal_items() {
        let dir = tempfile::tempdir().expect("tempdir");
        let store = Arc::new(open_sqlite(&dir));
        let service =
            WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
        let session_id = meerkat_core::SessionId::parse("019e63c2-0000-7000-8000-000000000045")
            .expect("session id");
        let goal = service
            .create_goal(GoalCreateRequest {
                realm_id: None,
                namespace: None,
                title: "terminal goal".to_string(),
                description: None,
                target: GoalAttentionTarget::Session { session_id },
                mode: WorkAttentionMode::Pursue,
                completion_policy: WorkCompletionPolicy::SelfAttest,
                delegated_authority: AttentionDelegatedAuthority::CloseIfPolicyAllows,
                projection_policy: AttentionProjectionPolicy::default(),
            })
            .await
            .expect("create goal");
        service
            .goal_request_close(GoalRequestCloseRequest {
                binding_id: goal.attention.binding_id.clone(),
                realm_id: None,
                namespace: None,
                expected_revision: goal.item.revision,
                status: GoalTerminalStatus::Completed,
            })
            .await
            .expect("close goal");

        // Wipe the projection tables; the event log stays untouched.
        store
            .with_connection(|conn| {
                conn.execute("DELETE FROM workgraph_items", [])
                    .map_err(|err| crate::WorkGraphError::Store(err.to_string()))?;
                conn.execute("DELETE FROM workgraph_attention", [])
                    .map_err(|err| crate::WorkGraphError::Store(err.to_string()))?;
                Ok(())
            })
            .expect("clear projection");

        store
            .rebuild_projection_from_events()
            .expect("rebuild projection");

        let binding = store
            .get_attention(
                "realm",
                &WorkNamespace::default(),
                &goal.attention.binding_id,
            )
            .await
            .expect("read binding")
            .expect("rebuilt binding");
        assert_eq!(binding.status, WorkAttentionStatus::Stopped);
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[tokio::test]
    async fn sqlite_store_duplicate_edge_does_not_append_event() {
        let dir = tempfile::tempdir().expect("tempdir");
        let store = open_sqlite(&dir);
        let edge = test_edge();

        store
            .insert_edge(edge.clone(), link_event(&edge))
            .await
            .expect("insert edge");
        let error = store
            .insert_edge(edge.clone(), link_event(&edge))
            .await
            .expect_err("duplicate edge should fail");
        assert!(matches!(error, WorkGraphError::Conflict(_)));

        let events = store
            .list_events(edge_scope_events(&edge))
            .await
            .expect("events");
        assert_eq!(events.len(), 1);
    }
}