Skip to main content

meerkat_workgraph/
service.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3
4use serde_json::json;
5
6use crate::WorkGraphError;
7use crate::machine::WorkGraphMachine;
8use crate::store::{WorkGraphEventFilter, WorkGraphStore};
9use crate::types::{
10    AddEvidenceRequest, ClaimWorkItemRequest, CloseWorkItemRequest, CreateWorkItemRequest,
11    LinkWorkItemsRequest, ReadyWorkFilter, ReleaseWorkItemRequest, UpdateWorkItemRequest, WorkEdge,
12    WorkEdgeKind, WorkGraphEvent, WorkGraphEventKind, WorkGraphSnapshot, WorkGraphSnapshotFilter,
13    WorkItem, WorkItemFilter, WorkItemId, WorkNamespace,
14};
15
16const BEST_EFFORT_REFRESH_ATTEMPTS: usize = 3;
17
18#[derive(Clone)]
19pub struct WorkGraphService {
20    store: Arc<dyn WorkGraphStore>,
21    default_realm_id: Arc<str>,
22    default_namespace: WorkNamespace,
23}
24
25impl WorkGraphService {
26    pub fn new(store: Arc<dyn WorkGraphStore>) -> Self {
27        Self::with_scope(store, "default", WorkNamespace::default())
28    }
29
30    pub fn with_scope(
31        store: Arc<dyn WorkGraphStore>,
32        default_realm_id: impl Into<String>,
33        default_namespace: WorkNamespace,
34    ) -> Self {
35        Self {
36            store,
37            default_realm_id: Arc::<str>::from(default_realm_id.into()),
38            default_namespace,
39        }
40    }
41
42    pub fn store(&self) -> &Arc<dyn WorkGraphStore> {
43        &self.store
44    }
45
46    pub fn default_realm_id(&self) -> &str {
47        &self.default_realm_id
48    }
49
50    pub fn default_namespace(&self) -> &WorkNamespace {
51        &self.default_namespace
52    }
53
54    pub async fn create(&self, request: CreateWorkItemRequest) -> Result<WorkItem, WorkGraphError> {
55        let now = self.store.get_store_time_utc().await?;
56        let (realm_id, namespace) = self.scope(request.realm_id.clone(), request.namespace.clone());
57        let (item, event) = WorkGraphMachine::create_item(request, realm_id, namespace, now)?;
58        self.store.insert_item(item, event).await
59    }
60
61    pub async fn get(
62        &self,
63        realm_id: Option<String>,
64        namespace: Option<WorkNamespace>,
65        id: WorkItemId,
66    ) -> Result<WorkItem, WorkGraphError> {
67        let (realm_id, namespace) = self.scope(realm_id, namespace);
68        self.store
69            .get_item(&realm_id, &namespace, &id)
70            .await?
71            .ok_or_else(|| WorkGraphError::not_found(realm_id, namespace, id))
72    }
73
74    pub async fn list(&self, filter: WorkItemFilter) -> Result<Vec<WorkItem>, WorkGraphError> {
75        self.store
76            .list_items(self.normalize_item_filter(filter))
77            .await
78    }
79
80    pub async fn ready(&self, filter: ReadyWorkFilter) -> Result<Vec<WorkItem>, WorkGraphError> {
81        let now = self.store.get_store_time_utc().await?;
82        let (realm_id, namespace) = self.scope(filter.realm_id.clone(), filter.namespace.clone());
83        let all_items = self
84            .store
85            .list_items(WorkItemFilter {
86                realm_id: Some(realm_id.clone()),
87                namespace: Some(namespace.clone()),
88                include_terminal: true,
89                ..WorkItemFilter::default()
90            })
91            .await?;
92        let labels = filter.labels.clone();
93        let mut ready = WorkGraphMachine::ready_items(
94            all_items
95                .into_iter()
96                .filter(|item| labels.iter().all(|label| item.labels.contains(label)))
97                .collect(),
98            now,
99        );
100        if let Some(limit) = filter.limit {
101            ready.truncate(limit);
102        }
103        Ok(ready)
104    }
105
106    pub async fn snapshot(
107        &self,
108        filter: WorkGraphSnapshotFilter,
109    ) -> Result<WorkGraphSnapshot, WorkGraphError> {
110        let captured_at = self.store.get_store_time_utc().await?;
111        let filter = self.normalize_snapshot_filter(filter);
112        let realm_id = filter
113            .realm_id
114            .clone()
115            .unwrap_or_else(|| self.default_realm_id.to_string());
116        let items = self
117            .store
118            .list_items(WorkItemFilter {
119                realm_id: Some(realm_id.clone()),
120                namespace: filter.namespace.clone(),
121                all_namespaces: filter.all_namespaces,
122                statuses: filter.statuses.clone(),
123                labels: filter.labels.clone(),
124                include_terminal: filter.include_terminal,
125                limit: filter.limit,
126            })
127            .await?;
128
129        let namespaces = self.snapshot_namespaces(&realm_id, &filter, &items).await?;
130        let mut edges = Vec::new();
131        for namespace in &namespaces {
132            edges.extend(self.store.list_edges(&realm_id, namespace).await?);
133        }
134
135        let ready_item_ids = self
136            .ready_item_ids_in_namespaces(&realm_id, &namespaces, &filter.labels, captured_at)
137            .await?;
138        let event_high_water_mark = self
139            .store
140            .list_events(WorkGraphEventFilter {
141                realm_id: Some(realm_id.clone()),
142                namespace: if filter.all_namespaces {
143                    None
144                } else {
145                    filter.namespace.clone()
146                },
147                all_namespaces: filter.all_namespaces,
148                after_seq: None,
149                limit: None,
150            })
151            .await?
152            .into_iter()
153            .filter_map(|event| event.seq)
154            .max();
155
156        Ok(WorkGraphSnapshot {
157            realm_id,
158            namespace: if filter.all_namespaces {
159                None
160            } else {
161                filter.namespace
162            },
163            all_namespaces: filter.all_namespaces,
164            captured_at,
165            event_high_water_mark,
166            items,
167            edges,
168            ready_item_ids,
169        })
170    }
171
172    pub async fn claim(&self, request: ClaimWorkItemRequest) -> Result<WorkItem, WorkGraphError> {
173        let now = self.store.get_store_time_utc().await?;
174        let (realm_id, namespace) = self.scope(request.realm_id.clone(), request.namespace.clone());
175        let item = self
176            .store
177            .get_item(&realm_id, &namespace, &request.id)
178            .await?
179            .ok_or_else(|| {
180                WorkGraphError::not_found(realm_id.clone(), namespace.clone(), request.id.clone())
181            })?;
182        let expected_previous_revision = item.revision;
183        let unresolved_blockers = self
184            .unresolved_blocker_count_for_item(&realm_id, &namespace, &item)
185            .await?;
186        let (item, event) = WorkGraphMachine::claim_item_with_unresolved_blockers(
187            item,
188            unresolved_blockers,
189            request,
190            now,
191        )?;
192        self.store
193            .update_item_cas(item, expected_previous_revision, event)
194            .await
195    }
196
197    pub async fn release(
198        &self,
199        request: ReleaseWorkItemRequest,
200    ) -> Result<WorkItem, WorkGraphError> {
201        let now = self.store.get_store_time_utc().await?;
202        let item = self
203            .get(
204                request.realm_id.clone(),
205                request.namespace.clone(),
206                request.id.clone(),
207            )
208            .await?;
209        let expected_previous_revision = item.revision;
210        let (item, event) = WorkGraphMachine::release_item(item, request, now)?;
211        self.store
212            .update_item_cas(item, expected_previous_revision, event)
213            .await
214    }
215
216    pub async fn update(&self, request: UpdateWorkItemRequest) -> Result<WorkItem, WorkGraphError> {
217        let now = self.store.get_store_time_utc().await?;
218        let item = self
219            .get(
220                request.realm_id.clone(),
221                request.namespace.clone(),
222                request.id.clone(),
223            )
224            .await?;
225        let expected_previous_revision = item.revision;
226        let (item, event) = WorkGraphMachine::update_item(item, request, now)?;
227        self.store
228            .update_item_cas(item, expected_previous_revision, event)
229            .await
230    }
231
232    pub async fn block(
233        &self,
234        realm_id: Option<String>,
235        namespace: Option<WorkNamespace>,
236        id: WorkItemId,
237        expected_revision: u64,
238    ) -> Result<WorkItem, WorkGraphError> {
239        let now = self.store.get_store_time_utc().await?;
240        let item = self.get(realm_id, namespace, id).await?;
241        let expected_previous_revision = item.revision;
242        let (item, event) = WorkGraphMachine::block_item(item, expected_revision, now)?;
243        self.store
244            .update_item_cas(item, expected_previous_revision, event)
245            .await
246    }
247
248    pub async fn close(&self, request: CloseWorkItemRequest) -> Result<WorkItem, WorkGraphError> {
249        let now = self.store.get_store_time_utc().await?;
250        let item = self
251            .get(
252                request.realm_id.clone(),
253                request.namespace.clone(),
254                request.id.clone(),
255            )
256            .await?;
257        let expected_previous_revision = item.revision;
258        let (item, event) = WorkGraphMachine::close_item(item, request, now)?;
259        let closed = self
260            .store
261            .update_item_cas(item, expected_previous_revision, event)
262            .await?;
263        self.best_effort_refresh_dependents_after_blocker_change(&closed, now)
264            .await;
265        Ok(closed)
266    }
267
268    pub async fn link(&self, request: LinkWorkItemsRequest) -> Result<WorkEdge, WorkGraphError> {
269        let now = self.store.get_store_time_utc().await?;
270        let (realm_id, namespace) = self.scope(request.realm_id.clone(), request.namespace.clone());
271        let edge = WorkEdge {
272            realm_id,
273            namespace,
274            kind: request.kind,
275            from_id: request.from_id,
276            to_id: request.to_id,
277            created_at: now,
278        };
279        let existing_edges = self
280            .store
281            .list_edges(&edge.realm_id, &edge.namespace)
282            .await?;
283        let existing_items = self
284            .store
285            .list_items(WorkItemFilter {
286                realm_id: Some(edge.realm_id.clone()),
287                namespace: Some(edge.namespace.clone()),
288                include_terminal: true,
289                ..WorkItemFilter::default()
290            })
291            .await?;
292        WorkGraphMachine::validate_link(&edge, &existing_items, &existing_edges)?;
293        let event = WorkGraphEvent::graph(
294            edge.realm_id.clone(),
295            edge.namespace.clone(),
296            WorkGraphEventKind::Linked,
297            now,
298            json!({ "edge": edge }),
299        );
300        let inserted = self.store.insert_edge(edge, event).await?;
301        if inserted.kind == WorkEdgeKind::Blocks {
302            self.best_effort_refresh_item_eligibility(
303                &inserted.realm_id,
304                &inserted.namespace,
305                &inserted.to_id,
306                now,
307            )
308            .await;
309        }
310        Ok(inserted)
311    }
312
313    pub async fn add_evidence(
314        &self,
315        request: AddEvidenceRequest,
316    ) -> Result<WorkItem, WorkGraphError> {
317        let now = self.store.get_store_time_utc().await?;
318        let item = self
319            .get(
320                request.realm_id.clone(),
321                request.namespace.clone(),
322                request.id.clone(),
323            )
324            .await?;
325        let expected_previous_revision = item.revision;
326        let (item, event) = WorkGraphMachine::add_evidence(item, request, now)?;
327        self.store
328            .update_item_cas(item, expected_previous_revision, event)
329            .await
330    }
331
332    pub async fn events(
333        &self,
334        mut filter: WorkGraphEventFilter,
335    ) -> Result<Vec<WorkGraphEvent>, WorkGraphError> {
336        if filter.realm_id.is_none() {
337            filter.realm_id = Some(self.default_realm_id.to_string());
338        }
339        if !filter.all_namespaces && filter.namespace.is_none() {
340            filter.namespace = Some(self.default_namespace.clone());
341        }
342        self.store.list_events(filter).await
343    }
344
345    fn scope(
346        &self,
347        realm_id: Option<String>,
348        namespace: Option<WorkNamespace>,
349    ) -> (String, WorkNamespace) {
350        (
351            realm_id.unwrap_or_else(|| self.default_realm_id.to_string()),
352            namespace.unwrap_or_else(|| self.default_namespace.clone()),
353        )
354    }
355
356    fn normalize_item_filter(&self, mut filter: WorkItemFilter) -> WorkItemFilter {
357        if filter.realm_id.is_none() {
358            filter.realm_id = Some(self.default_realm_id.to_string());
359        }
360        if !filter.all_namespaces && filter.namespace.is_none() {
361            filter.namespace = Some(self.default_namespace.clone());
362        }
363        filter
364    }
365
366    fn normalize_snapshot_filter(
367        &self,
368        mut filter: WorkGraphSnapshotFilter,
369    ) -> WorkGraphSnapshotFilter {
370        if filter.realm_id.is_none() {
371            filter.realm_id = Some(self.default_realm_id.to_string());
372        }
373        if !filter.all_namespaces && filter.namespace.is_none() {
374            filter.namespace = Some(self.default_namespace.clone());
375        }
376        filter
377    }
378
379    async fn snapshot_namespaces(
380        &self,
381        realm_id: &str,
382        filter: &WorkGraphSnapshotFilter,
383        items: &[WorkItem],
384    ) -> Result<BTreeSet<WorkNamespace>, WorkGraphError> {
385        if !filter.all_namespaces {
386            return Ok(BTreeSet::from_iter([filter
387                .namespace
388                .clone()
389                .unwrap_or_else(|| self.default_namespace.clone())]));
390        }
391
392        let mut namespaces = items
393            .iter()
394            .map(|item| item.namespace.clone())
395            .collect::<BTreeSet<_>>();
396        if namespaces.is_empty() {
397            namespaces.extend(
398                self.store
399                    .list_events(WorkGraphEventFilter {
400                        realm_id: Some(realm_id.to_string()),
401                        namespace: None,
402                        all_namespaces: true,
403                        after_seq: None,
404                        limit: None,
405                    })
406                    .await?
407                    .into_iter()
408                    .map(|event| event.namespace),
409            );
410        }
411        Ok(namespaces)
412    }
413
414    async fn ready_item_ids_in_namespaces(
415        &self,
416        realm_id: &str,
417        namespaces: &BTreeSet<WorkNamespace>,
418        labels: &[String],
419        now: chrono::DateTime<chrono::Utc>,
420    ) -> Result<Vec<WorkItemId>, WorkGraphError> {
421        let mut ready_ids = Vec::new();
422        for namespace in namespaces {
423            let all_items = self
424                .store
425                .list_items(WorkItemFilter {
426                    realm_id: Some(realm_id.to_string()),
427                    namespace: Some(namespace.clone()),
428                    include_terminal: true,
429                    ..WorkItemFilter::default()
430                })
431                .await?;
432            let ready_items = WorkGraphMachine::ready_items(
433                all_items
434                    .into_iter()
435                    .filter(|item| labels.iter().all(|label| item.labels.contains(label)))
436                    .collect(),
437                now,
438            );
439            ready_ids.extend(ready_items.into_iter().map(|item| item.id));
440        }
441        Ok(ready_ids)
442    }
443
444    async fn refresh_dependents_after_blocker_change(
445        &self,
446        blocker: &WorkItem,
447        now: chrono::DateTime<chrono::Utc>,
448    ) -> Result<(), WorkGraphError> {
449        let edges = self
450            .store
451            .list_edges(&blocker.realm_id, &blocker.namespace)
452            .await?;
453        for edge in edges
454            .iter()
455            .filter(|edge| edge.kind == WorkEdgeKind::Blocks && edge.from_id == blocker.id)
456        {
457            self.refresh_item_eligibility(&blocker.realm_id, &blocker.namespace, &edge.to_id, now)
458                .await?;
459        }
460        Ok(())
461    }
462
463    async fn best_effort_refresh_dependents_after_blocker_change(
464        &self,
465        blocker: &WorkItem,
466        now: chrono::DateTime<chrono::Utc>,
467    ) {
468        for _ in 0..BEST_EFFORT_REFRESH_ATTEMPTS {
469            match self
470                .refresh_dependents_after_blocker_change(blocker, now)
471                .await
472            {
473                Ok(()) => return,
474                Err(WorkGraphError::StaleRevision { .. }) => continue,
475                Err(_) => return,
476            }
477        }
478    }
479
480    async fn best_effort_refresh_item_eligibility(
481        &self,
482        realm_id: &str,
483        namespace: &WorkNamespace,
484        id: &WorkItemId,
485        now: chrono::DateTime<chrono::Utc>,
486    ) {
487        for _ in 0..BEST_EFFORT_REFRESH_ATTEMPTS {
488            match self
489                .refresh_item_eligibility(realm_id, namespace, id, now)
490                .await
491            {
492                Ok(()) => return,
493                Err(WorkGraphError::StaleRevision { .. }) => continue,
494                Err(_) => return,
495            }
496        }
497    }
498
499    async fn refresh_item_eligibility(
500        &self,
501        realm_id: &str,
502        namespace: &WorkNamespace,
503        id: &WorkItemId,
504        now: chrono::DateTime<chrono::Utc>,
505    ) -> Result<(), WorkGraphError> {
506        let Some(item) = self.store.get_item(realm_id, namespace, id).await? else {
507            return Ok(());
508        };
509        let all_items = self
510            .store
511            .list_items(WorkItemFilter {
512                realm_id: Some(realm_id.to_string()),
513                namespace: Some(namespace.clone()),
514                include_terminal: true,
515                ..WorkItemFilter::default()
516            })
517            .await?
518            .into_iter()
519            .map(|item| (item.id.clone(), item))
520            .collect::<BTreeMap<_, _>>();
521        let edges = self.store.list_edges(realm_id, namespace).await?;
522        let unresolved_blockers = unresolved_blocker_count(&item, &all_items, &edges);
523        if let Some((item, event)) =
524            WorkGraphMachine::refresh_eligibility(item, unresolved_blockers, now)?
525        {
526            let expected_previous_revision = item.revision;
527            self.store
528                .update_item_cas(item, expected_previous_revision, event)
529                .await?;
530        }
531        Ok(())
532    }
533
534    async fn unresolved_blocker_count_for_item(
535        &self,
536        realm_id: &str,
537        namespace: &WorkNamespace,
538        item: &WorkItem,
539    ) -> Result<u64, WorkGraphError> {
540        let all_items = self
541            .store
542            .list_items(WorkItemFilter {
543                realm_id: Some(realm_id.to_string()),
544                namespace: Some(namespace.clone()),
545                include_terminal: true,
546                ..WorkItemFilter::default()
547            })
548            .await?
549            .into_iter()
550            .map(|item| (item.id.clone(), item))
551            .collect::<BTreeMap<_, _>>();
552        let edges = self.store.list_edges(realm_id, namespace).await?;
553        Ok(unresolved_blocker_count(item, &all_items, &edges))
554    }
555}
556
557fn unresolved_blocker_count(
558    item: &WorkItem,
559    all_items: &BTreeMap<WorkItemId, WorkItem>,
560    edges: &[WorkEdge],
561) -> u64 {
562    edges
563        .iter()
564        .filter(|edge| edge.kind == WorkEdgeKind::Blocks && edge.to_id == item.id)
565        .filter(|edge| {
566            all_items
567                .get(&edge.from_id)
568                .is_none_or(|blocker| !blocker.status.is_terminal_success())
569        })
570        .count()
571        .try_into()
572        .unwrap_or(u64::MAX)
573}
574
575#[cfg(test)]
576#[allow(clippy::expect_used, clippy::unwrap_used)]
577mod tests {
578    use std::collections::BTreeSet;
579    use std::sync::Arc;
580    use std::sync::atomic::{AtomicUsize, Ordering};
581
582    use async_trait::async_trait;
583    use chrono::{DateTime, Utc};
584    use serde_json::json;
585
586    use crate::store::WorkGraphEventFilter;
587    use crate::types::{
588        ClaimWorkItemRequest, LinkWorkItemsRequest, WorkEdge, WorkEdgeKind, WorkGraphEvent,
589        WorkGraphEventKind, WorkItem, WorkItemFilter, WorkOwner, WorkOwnerKey,
590    };
591    use crate::{
592        CreateWorkItemRequest, MemoryWorkGraphStore, UpdateWorkItemRequest, WorkGraphService,
593        WorkGraphStore, WorkGraphStoreKind, WorkItemId, WorkNamespace,
594    };
595
596    fn create_req(title: &str) -> CreateWorkItemRequest {
597        CreateWorkItemRequest {
598            realm_id: None,
599            namespace: None,
600            title: title.to_string(),
601            description: None,
602            priority: Default::default(),
603            labels: BTreeSet::new(),
604            due_at: None,
605            not_before: None,
606            snoozed_until: None,
607            external_refs: Vec::new(),
608            evidence_refs: Vec::new(),
609            status: None,
610        }
611    }
612
613    struct RefreshConflictStore {
614        inner: MemoryWorkGraphStore,
615        fail_updated_events: AtomicUsize,
616    }
617
618    impl RefreshConflictStore {
619        fn new() -> Self {
620            Self {
621                inner: MemoryWorkGraphStore::new(),
622                fail_updated_events: AtomicUsize::new(0),
623            }
624        }
625
626        fn fail_next_refresh_update(&self) {
627            self.fail_updated_events.fetch_add(1, Ordering::SeqCst);
628        }
629    }
630
631    #[async_trait]
632    impl WorkGraphStore for RefreshConflictStore {
633        fn kind(&self) -> WorkGraphStoreKind {
634            WorkGraphStoreKind::Custom
635        }
636
637        async fn get_store_time_utc(&self) -> Result<DateTime<Utc>, crate::WorkGraphError> {
638            self.inner.get_store_time_utc().await
639        }
640
641        async fn insert_item(
642            &self,
643            item: WorkItem,
644            event: WorkGraphEvent,
645        ) -> Result<WorkItem, crate::WorkGraphError> {
646            self.inner.insert_item(item, event).await
647        }
648
649        async fn update_item_cas(
650            &self,
651            item: WorkItem,
652            expected_previous_revision: u64,
653            event: WorkGraphEvent,
654        ) -> Result<WorkItem, crate::WorkGraphError> {
655            if event.kind == WorkGraphEventKind::Updated
656                && self
657                    .fail_updated_events
658                    .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |remaining| {
659                        remaining.checked_sub(1)
660                    })
661                    .is_ok()
662            {
663                return Err(crate::WorkGraphError::StaleRevision {
664                    id: item.id,
665                    expected: expected_previous_revision,
666                    actual: expected_previous_revision.saturating_add(1),
667                });
668            }
669            self.inner
670                .update_item_cas(item, expected_previous_revision, event)
671                .await
672        }
673
674        async fn get_item(
675            &self,
676            realm_id: &str,
677            namespace: &WorkNamespace,
678            id: &WorkItemId,
679        ) -> Result<Option<WorkItem>, crate::WorkGraphError> {
680            self.inner.get_item(realm_id, namespace, id).await
681        }
682
683        async fn list_items(
684            &self,
685            filter: WorkItemFilter,
686        ) -> Result<Vec<WorkItem>, crate::WorkGraphError> {
687            self.inner.list_items(filter).await
688        }
689
690        async fn insert_edge(
691            &self,
692            edge: WorkEdge,
693            event: WorkGraphEvent,
694        ) -> Result<WorkEdge, crate::WorkGraphError> {
695            self.inner.insert_edge(edge, event).await
696        }
697
698        async fn list_edges(
699            &self,
700            realm_id: &str,
701            namespace: &WorkNamespace,
702        ) -> Result<Vec<WorkEdge>, crate::WorkGraphError> {
703            self.inner.list_edges(realm_id, namespace).await
704        }
705
706        async fn list_events(
707            &self,
708            filter: WorkGraphEventFilter,
709        ) -> Result<Vec<WorkGraphEvent>, crate::WorkGraphError> {
710            self.inner.list_events(filter).await
711        }
712    }
713
714    #[tokio::test]
715    async fn blocked_dependencies_are_not_ready_until_completed() {
716        let service = WorkGraphService::with_scope(
717            Arc::new(MemoryWorkGraphStore::new()),
718            "realm",
719            WorkNamespace::default(),
720        );
721        let blocker = service
722            .create(create_req("blocker"))
723            .await
724            .expect("blocker");
725        let blocked = service
726            .create(create_req("blocked"))
727            .await
728            .expect("blocked");
729        service
730            .link(LinkWorkItemsRequest {
731                realm_id: None,
732                namespace: None,
733                kind: WorkEdgeKind::Blocks,
734                from_id: blocker.id.clone(),
735                to_id: blocked.id.clone(),
736            })
737            .await
738            .expect("link");
739
740        let ready = service.ready(Default::default()).await.expect("ready");
741        assert!(ready.iter().any(|item| item.id == blocker.id));
742        assert!(!ready.iter().any(|item| item.id == blocked.id));
743        service
744            .close(crate::CloseWorkItemRequest {
745                id: blocker.id,
746                realm_id: None,
747                namespace: None,
748                expected_revision: blocker.revision,
749                status: crate::WorkStatus::Completed,
750            })
751            .await
752            .expect("close blocker");
753        let ready = service.ready(Default::default()).await.expect("ready");
754        assert!(ready.iter().any(|item| item.id == blocked.id));
755    }
756
757    #[tokio::test]
758    async fn link_reports_success_when_post_insert_refresh_conflicts() {
759        let store = Arc::new(RefreshConflictStore::new());
760        let service =
761            WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
762        let blocker = service
763            .create(create_req("blocker"))
764            .await
765            .expect("blocker");
766        let blocked = service
767            .create(create_req("blocked"))
768            .await
769            .expect("blocked");
770
771        store.fail_next_refresh_update();
772        let edge = service
773            .link(LinkWorkItemsRequest {
774                realm_id: None,
775                namespace: None,
776                kind: WorkEdgeKind::Blocks,
777                from_id: blocker.id.clone(),
778                to_id: blocked.id.clone(),
779            })
780            .await
781            .expect("link should report inserted edge despite refresh conflict");
782
783        assert_eq!(edge.from_id, blocker.id);
784        assert_eq!(edge.to_id, blocked.id);
785        let edges = store
786            .list_edges("realm", &WorkNamespace::default())
787            .await
788            .expect("edges");
789        assert_eq!(edges.len(), 1);
790        let ready = service.ready(Default::default()).await.expect("ready");
791        assert!(!ready.iter().any(|item| item.id == blocked.id));
792    }
793
794    #[tokio::test]
795    async fn close_reports_success_when_dependent_refresh_conflicts() {
796        let store = Arc::new(RefreshConflictStore::new());
797        let service =
798            WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
799        let blocker = service
800            .create(create_req("blocker"))
801            .await
802            .expect("blocker");
803        let blocked = service
804            .create(create_req("blocked"))
805            .await
806            .expect("blocked");
807        service
808            .link(LinkWorkItemsRequest {
809                realm_id: None,
810                namespace: None,
811                kind: WorkEdgeKind::Blocks,
812                from_id: blocker.id.clone(),
813                to_id: blocked.id.clone(),
814            })
815            .await
816            .expect("link");
817
818        store.fail_next_refresh_update();
819        let closed = service
820            .close(crate::CloseWorkItemRequest {
821                id: blocker.id.clone(),
822                realm_id: None,
823                namespace: None,
824                expected_revision: blocker.revision,
825                status: crate::WorkStatus::Completed,
826            })
827            .await
828            .expect("close should report committed terminal item despite refresh conflict");
829
830        assert_eq!(closed.id, blocker.id);
831        assert_eq!(closed.status, crate::WorkStatus::Completed);
832        let fetched = service
833            .get(None, None, closed.id)
834            .await
835            .expect("closed item should be stored");
836        assert_eq!(fetched.status, crate::WorkStatus::Completed);
837        let ready = service.ready(Default::default()).await.expect("ready");
838        assert!(ready.iter().any(|item| item.id == blocked.id));
839    }
840
841    #[tokio::test]
842    async fn blocked_dependency_stays_unready_after_item_update() {
843        let service = WorkGraphService::with_scope(
844            Arc::new(MemoryWorkGraphStore::new()),
845            "realm",
846            WorkNamespace::default(),
847        );
848        let blocker = service
849            .create(create_req("blocker"))
850            .await
851            .expect("blocker");
852        let blocked = service
853            .create(create_req("blocked"))
854            .await
855            .expect("blocked");
856        service
857            .link(LinkWorkItemsRequest {
858                realm_id: None,
859                namespace: None,
860                kind: WorkEdgeKind::Blocks,
861                from_id: blocker.id,
862                to_id: blocked.id.clone(),
863            })
864            .await
865            .expect("link");
866        let blocked = service
867            .get(None, None, blocked.id.clone())
868            .await
869            .expect("blocked after link");
870
871        service
872            .update(UpdateWorkItemRequest {
873                id: blocked.id.clone(),
874                realm_id: None,
875                namespace: None,
876                expected_revision: blocked.revision,
877                title: Some("blocked, updated".to_string()),
878                description: None,
879                priority: None,
880                labels: None,
881                due_at: None,
882                not_before: None,
883                snoozed_until: None,
884                external_refs: Vec::new(),
885            })
886            .await
887            .expect("update blocked item");
888
889        let ready = service.ready(Default::default()).await.expect("ready");
890        assert!(!ready.iter().any(|item| item.id == blocked.id));
891    }
892
893    #[tokio::test]
894    async fn concurrent_claim_attempts_have_one_winner() {
895        let service = WorkGraphService::with_scope(
896            Arc::new(MemoryWorkGraphStore::new()),
897            "realm",
898            WorkNamespace::default(),
899        );
900        let item = service.create(create_req("claim")).await.expect("create");
901        let request = ClaimWorkItemRequest {
902            id: item.id,
903            realm_id: None,
904            namespace: None,
905            expected_revision: item.revision,
906            owner: WorkOwner::new(WorkOwnerKey::label("worker").expect("owner key")),
907            lease_seconds: Some(60),
908            lease_expires_at: None,
909        };
910        let first = service.claim(request.clone()).await;
911        let second = service.claim(request).await;
912        assert!(first.is_ok() ^ second.is_ok());
913    }
914
915    #[tokio::test]
916    async fn blocker_item_remains_claimable_after_linking_dependents() {
917        let service = WorkGraphService::with_scope(
918            Arc::new(MemoryWorkGraphStore::new()),
919            "realm",
920            WorkNamespace::default(),
921        );
922        let blocker = service
923            .create(create_req("blocker"))
924            .await
925            .expect("blocker");
926        let dependent = service
927            .create(create_req("dependent"))
928            .await
929            .expect("dependent");
930        service
931            .link(LinkWorkItemsRequest {
932                realm_id: None,
933                namespace: None,
934                kind: WorkEdgeKind::Blocks,
935                from_id: blocker.id.clone(),
936                to_id: dependent.id.clone(),
937            })
938            .await
939            .expect("link");
940
941        let claimed = service
942            .claim(ClaimWorkItemRequest {
943                id: blocker.id.clone(),
944                realm_id: None,
945                namespace: None,
946                expected_revision: blocker.revision,
947                owner: WorkOwner::new(WorkOwnerKey::label("worker").expect("owner key")),
948                lease_seconds: Some(60),
949                lease_expires_at: None,
950            })
951            .await
952            .expect("blocker with outgoing dependencies should remain claimable");
953
954        assert_eq!(claimed.id, blocker.id);
955        assert_eq!(claimed.status, crate::WorkStatus::InProgress);
956    }
957
958    #[tokio::test]
959    async fn claim_recomputes_dependency_projection_before_admission() {
960        let store = Arc::new(MemoryWorkGraphStore::new());
961        let service =
962            WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
963        let blocker = service
964            .create(create_req("blocker"))
965            .await
966            .expect("blocker");
967        let dependent = service
968            .create(create_req("dependent"))
969            .await
970            .expect("dependent");
971        let now = store.get_store_time_utc().await.expect("time");
972        store
973            .insert_edge(
974                WorkEdge {
975                    realm_id: "realm".to_string(),
976                    namespace: WorkNamespace::default(),
977                    kind: WorkEdgeKind::Blocks,
978                    from_id: blocker.id,
979                    to_id: dependent.id.clone(),
980                    created_at: now,
981                },
982                WorkGraphEvent::graph(
983                    "realm".to_string(),
984                    WorkNamespace::default(),
985                    WorkGraphEventKind::Linked,
986                    now,
987                    json!({ "test": "stale-projection" }),
988                ),
989            )
990            .await
991            .expect("raw edge insert");
992
993        let error = service
994            .claim(ClaimWorkItemRequest {
995                id: dependent.id,
996                realm_id: None,
997                namespace: None,
998                expected_revision: dependent.revision,
999                owner: WorkOwner::new(WorkOwnerKey::label("worker").expect("owner key")),
1000                lease_seconds: Some(60),
1001                lease_expires_at: None,
1002            })
1003            .await
1004            .expect_err("fresh graph blockers should reject stale ready projection");
1005
1006        assert!(matches!(error, crate::WorkGraphError::InvalidTransition(_)));
1007    }
1008
1009    #[tokio::test]
1010    async fn dependency_cycles_are_rejected() {
1011        let service = WorkGraphService::with_scope(
1012            Arc::new(MemoryWorkGraphStore::new()),
1013            "realm",
1014            WorkNamespace::default(),
1015        );
1016        let first = service.create(create_req("first")).await.expect("first");
1017        let second = service.create(create_req("second")).await.expect("second");
1018        service
1019            .link(LinkWorkItemsRequest {
1020                realm_id: None,
1021                namespace: None,
1022                kind: WorkEdgeKind::Blocks,
1023                from_id: first.id.clone(),
1024                to_id: second.id.clone(),
1025            })
1026            .await
1027            .expect("first edge");
1028        let error = service
1029            .link(LinkWorkItemsRequest {
1030                realm_id: None,
1031                namespace: None,
1032                kind: WorkEdgeKind::Blocks,
1033                from_id: second.id,
1034                to_id: first.id,
1035            })
1036            .await
1037            .expect_err("cycle should fail");
1038        assert!(matches!(error, crate::WorkGraphError::InvalidTransition(_)));
1039    }
1040
1041    #[tokio::test]
1042    async fn topology_rejects_self_duplicate_and_missing_endpoint_edges() {
1043        let service = WorkGraphService::with_scope(
1044            Arc::new(MemoryWorkGraphStore::new()),
1045            "realm",
1046            WorkNamespace::default(),
1047        );
1048        let first = service.create(create_req("first")).await.expect("first");
1049        let second = service.create(create_req("second")).await.expect("second");
1050
1051        let self_edge = service
1052            .link(LinkWorkItemsRequest {
1053                realm_id: None,
1054                namespace: None,
1055                kind: WorkEdgeKind::Blocks,
1056                from_id: first.id.clone(),
1057                to_id: first.id.clone(),
1058            })
1059            .await
1060            .expect_err("self edge should fail");
1061        assert!(matches!(
1062            self_edge,
1063            crate::WorkGraphError::InvalidTransition(_)
1064        ));
1065
1066        let missing_endpoint = service
1067            .link(LinkWorkItemsRequest {
1068                realm_id: None,
1069                namespace: None,
1070                kind: WorkEdgeKind::Blocks,
1071                from_id: first.id.clone(),
1072                to_id: crate::WorkItemId::generated(),
1073            })
1074            .await
1075            .expect_err("missing endpoint should fail");
1076        assert!(matches!(
1077            missing_endpoint,
1078            crate::WorkGraphError::InvalidTransition(_)
1079        ));
1080
1081        service
1082            .link(LinkWorkItemsRequest {
1083                realm_id: None,
1084                namespace: None,
1085                kind: WorkEdgeKind::Blocks,
1086                from_id: first.id.clone(),
1087                to_id: second.id.clone(),
1088            })
1089            .await
1090            .expect("first edge");
1091
1092        let duplicate = service
1093            .link(LinkWorkItemsRequest {
1094                realm_id: None,
1095                namespace: None,
1096                kind: WorkEdgeKind::Blocks,
1097                from_id: first.id,
1098                to_id: second.id,
1099            })
1100            .await
1101            .expect_err("duplicate edge should fail");
1102        assert!(matches!(
1103            duplicate,
1104            crate::WorkGraphError::InvalidTransition(_)
1105        ));
1106    }
1107
1108    #[tokio::test]
1109    async fn snapshot_includes_items_edges_ready_ids_and_event_high_water_mark() {
1110        let service = WorkGraphService::with_scope(
1111            Arc::new(MemoryWorkGraphStore::new()),
1112            "realm",
1113            WorkNamespace::default(),
1114        );
1115        let blocker = service
1116            .create(create_req("blocker"))
1117            .await
1118            .expect("blocker");
1119        let blocked = service
1120            .create(create_req("blocked"))
1121            .await
1122            .expect("blocked");
1123        service
1124            .link(LinkWorkItemsRequest {
1125                realm_id: None,
1126                namespace: None,
1127                kind: WorkEdgeKind::Blocks,
1128                from_id: blocker.id.clone(),
1129                to_id: blocked.id.clone(),
1130            })
1131            .await
1132            .expect("link");
1133
1134        let snapshot = service
1135            .snapshot(crate::WorkGraphSnapshotFilter::default())
1136            .await
1137            .expect("snapshot");
1138        assert_eq!(snapshot.realm_id, "realm");
1139        assert_eq!(snapshot.items.len(), 2);
1140        assert_eq!(snapshot.edges.len(), 1);
1141        assert!(snapshot.ready_item_ids.iter().any(|id| id == &blocker.id));
1142        assert!(!snapshot.ready_item_ids.iter().any(|id| id == &blocked.id));
1143        assert!(snapshot.event_high_water_mark.is_some());
1144    }
1145
1146    #[tokio::test]
1147    async fn events_can_span_all_namespaces_when_requested() {
1148        let store = Arc::new(MemoryWorkGraphStore::new());
1149        let default_service =
1150            WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
1151        let other_service = WorkGraphService::with_scope(
1152            store,
1153            "realm",
1154            WorkNamespace::new("other").expect("namespace"),
1155        );
1156
1157        default_service
1158            .create(create_req("default item"))
1159            .await
1160            .expect("default item");
1161        other_service
1162            .create(create_req("other item"))
1163            .await
1164            .expect("other item");
1165
1166        let default_events = default_service
1167            .events(WorkGraphEventFilter::default())
1168            .await
1169            .expect("default events");
1170        assert_eq!(default_events.len(), 1);
1171
1172        let all_events = default_service
1173            .events(WorkGraphEventFilter {
1174                all_namespaces: true,
1175                ..WorkGraphEventFilter::default()
1176            })
1177            .await
1178            .expect("all events");
1179        assert_eq!(all_events.len(), 2);
1180    }
1181}