1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3
4use serde_json::json;
5
6use crate::WorkGraphError;
7use crate::machine::WorkGraphMachine;
8use crate::store::{WorkGraphEventFilter, WorkGraphStore};
9use crate::types::{
10 AddEvidenceRequest, ClaimWorkItemRequest, CloseWorkItemRequest, CreateWorkItemRequest,
11 LinkWorkItemsRequest, ReadyWorkFilter, ReleaseWorkItemRequest, UpdateWorkItemRequest, WorkEdge,
12 WorkEdgeKind, WorkGraphEvent, WorkGraphEventKind, WorkGraphSnapshot, WorkGraphSnapshotFilter,
13 WorkItem, WorkItemFilter, WorkItemId, WorkNamespace,
14};
15
16const BEST_EFFORT_REFRESH_ATTEMPTS: usize = 3;
17
18#[derive(Clone)]
19pub struct WorkGraphService {
20 store: Arc<dyn WorkGraphStore>,
21 default_realm_id: Arc<str>,
22 default_namespace: WorkNamespace,
23}
24
25impl WorkGraphService {
26 pub fn new(store: Arc<dyn WorkGraphStore>) -> Self {
27 Self::with_scope(store, "default", WorkNamespace::default())
28 }
29
30 pub fn with_scope(
31 store: Arc<dyn WorkGraphStore>,
32 default_realm_id: impl Into<String>,
33 default_namespace: WorkNamespace,
34 ) -> Self {
35 Self {
36 store,
37 default_realm_id: Arc::<str>::from(default_realm_id.into()),
38 default_namespace,
39 }
40 }
41
42 pub fn store(&self) -> &Arc<dyn WorkGraphStore> {
43 &self.store
44 }
45
46 pub fn default_realm_id(&self) -> &str {
47 &self.default_realm_id
48 }
49
50 pub fn default_namespace(&self) -> &WorkNamespace {
51 &self.default_namespace
52 }
53
54 pub async fn create(&self, request: CreateWorkItemRequest) -> Result<WorkItem, WorkGraphError> {
55 let now = self.store.get_store_time_utc().await?;
56 let (realm_id, namespace) = self.scope(request.realm_id.clone(), request.namespace.clone());
57 let (item, event) = WorkGraphMachine::create_item(request, realm_id, namespace, now)?;
58 self.store.insert_item(item, event).await
59 }
60
61 pub async fn get(
62 &self,
63 realm_id: Option<String>,
64 namespace: Option<WorkNamespace>,
65 id: WorkItemId,
66 ) -> Result<WorkItem, WorkGraphError> {
67 let (realm_id, namespace) = self.scope(realm_id, namespace);
68 self.store
69 .get_item(&realm_id, &namespace, &id)
70 .await?
71 .ok_or_else(|| WorkGraphError::not_found(realm_id, namespace, id))
72 }
73
74 pub async fn list(&self, filter: WorkItemFilter) -> Result<Vec<WorkItem>, WorkGraphError> {
75 self.store
76 .list_items(self.normalize_item_filter(filter))
77 .await
78 }
79
80 pub async fn ready(&self, filter: ReadyWorkFilter) -> Result<Vec<WorkItem>, WorkGraphError> {
81 let now = self.store.get_store_time_utc().await?;
82 let (realm_id, namespace) = self.scope(filter.realm_id.clone(), filter.namespace.clone());
83 let all_items = self
84 .store
85 .list_items(WorkItemFilter {
86 realm_id: Some(realm_id.clone()),
87 namespace: Some(namespace.clone()),
88 include_terminal: true,
89 ..WorkItemFilter::default()
90 })
91 .await?;
92 let labels = filter.labels.clone();
93 let mut ready = WorkGraphMachine::ready_items(
94 all_items
95 .into_iter()
96 .filter(|item| labels.iter().all(|label| item.labels.contains(label)))
97 .collect(),
98 now,
99 );
100 if let Some(limit) = filter.limit {
101 ready.truncate(limit);
102 }
103 Ok(ready)
104 }
105
106 pub async fn snapshot(
107 &self,
108 filter: WorkGraphSnapshotFilter,
109 ) -> Result<WorkGraphSnapshot, WorkGraphError> {
110 let captured_at = self.store.get_store_time_utc().await?;
111 let filter = self.normalize_snapshot_filter(filter);
112 let realm_id = filter
113 .realm_id
114 .clone()
115 .unwrap_or_else(|| self.default_realm_id.to_string());
116 let items = self
117 .store
118 .list_items(WorkItemFilter {
119 realm_id: Some(realm_id.clone()),
120 namespace: filter.namespace.clone(),
121 all_namespaces: filter.all_namespaces,
122 statuses: filter.statuses.clone(),
123 labels: filter.labels.clone(),
124 include_terminal: filter.include_terminal,
125 limit: filter.limit,
126 })
127 .await?;
128
129 let namespaces = self.snapshot_namespaces(&realm_id, &filter, &items).await?;
130 let mut edges = Vec::new();
131 for namespace in &namespaces {
132 edges.extend(self.store.list_edges(&realm_id, namespace).await?);
133 }
134
135 let ready_item_ids = self
136 .ready_item_ids_in_namespaces(&realm_id, &namespaces, &filter.labels, captured_at)
137 .await?;
138 let event_high_water_mark = self
139 .store
140 .list_events(WorkGraphEventFilter {
141 realm_id: Some(realm_id.clone()),
142 namespace: if filter.all_namespaces {
143 None
144 } else {
145 filter.namespace.clone()
146 },
147 all_namespaces: filter.all_namespaces,
148 after_seq: None,
149 limit: None,
150 })
151 .await?
152 .into_iter()
153 .filter_map(|event| event.seq)
154 .max();
155
156 Ok(WorkGraphSnapshot {
157 realm_id,
158 namespace: if filter.all_namespaces {
159 None
160 } else {
161 filter.namespace
162 },
163 all_namespaces: filter.all_namespaces,
164 captured_at,
165 event_high_water_mark,
166 items,
167 edges,
168 ready_item_ids,
169 })
170 }
171
172 pub async fn claim(&self, request: ClaimWorkItemRequest) -> Result<WorkItem, WorkGraphError> {
173 let now = self.store.get_store_time_utc().await?;
174 let (realm_id, namespace) = self.scope(request.realm_id.clone(), request.namespace.clone());
175 let item = self
176 .store
177 .get_item(&realm_id, &namespace, &request.id)
178 .await?
179 .ok_or_else(|| {
180 WorkGraphError::not_found(realm_id.clone(), namespace.clone(), request.id.clone())
181 })?;
182 let expected_previous_revision = item.revision;
183 let unresolved_blockers = self
184 .unresolved_blocker_count_for_item(&realm_id, &namespace, &item)
185 .await?;
186 let (item, event) = WorkGraphMachine::claim_item_with_unresolved_blockers(
187 item,
188 unresolved_blockers,
189 request,
190 now,
191 )?;
192 self.store
193 .update_item_cas(item, expected_previous_revision, event)
194 .await
195 }
196
197 pub async fn release(
198 &self,
199 request: ReleaseWorkItemRequest,
200 ) -> Result<WorkItem, WorkGraphError> {
201 let now = self.store.get_store_time_utc().await?;
202 let item = self
203 .get(
204 request.realm_id.clone(),
205 request.namespace.clone(),
206 request.id.clone(),
207 )
208 .await?;
209 let expected_previous_revision = item.revision;
210 let (item, event) = WorkGraphMachine::release_item(item, request, now)?;
211 self.store
212 .update_item_cas(item, expected_previous_revision, event)
213 .await
214 }
215
216 pub async fn update(&self, request: UpdateWorkItemRequest) -> Result<WorkItem, WorkGraphError> {
217 let now = self.store.get_store_time_utc().await?;
218 let item = self
219 .get(
220 request.realm_id.clone(),
221 request.namespace.clone(),
222 request.id.clone(),
223 )
224 .await?;
225 let expected_previous_revision = item.revision;
226 let (item, event) = WorkGraphMachine::update_item(item, request, now)?;
227 self.store
228 .update_item_cas(item, expected_previous_revision, event)
229 .await
230 }
231
232 pub async fn block(
233 &self,
234 realm_id: Option<String>,
235 namespace: Option<WorkNamespace>,
236 id: WorkItemId,
237 expected_revision: u64,
238 ) -> Result<WorkItem, WorkGraphError> {
239 let now = self.store.get_store_time_utc().await?;
240 let item = self.get(realm_id, namespace, id).await?;
241 let expected_previous_revision = item.revision;
242 let (item, event) = WorkGraphMachine::block_item(item, expected_revision, now)?;
243 self.store
244 .update_item_cas(item, expected_previous_revision, event)
245 .await
246 }
247
248 pub async fn close(&self, request: CloseWorkItemRequest) -> Result<WorkItem, WorkGraphError> {
249 let now = self.store.get_store_time_utc().await?;
250 let item = self
251 .get(
252 request.realm_id.clone(),
253 request.namespace.clone(),
254 request.id.clone(),
255 )
256 .await?;
257 let expected_previous_revision = item.revision;
258 let (item, event) = WorkGraphMachine::close_item(item, request, now)?;
259 let closed = self
260 .store
261 .update_item_cas(item, expected_previous_revision, event)
262 .await?;
263 self.best_effort_refresh_dependents_after_blocker_change(&closed, now)
264 .await;
265 Ok(closed)
266 }
267
268 pub async fn link(&self, request: LinkWorkItemsRequest) -> Result<WorkEdge, WorkGraphError> {
269 let now = self.store.get_store_time_utc().await?;
270 let (realm_id, namespace) = self.scope(request.realm_id.clone(), request.namespace.clone());
271 let edge = WorkEdge {
272 realm_id,
273 namespace,
274 kind: request.kind,
275 from_id: request.from_id,
276 to_id: request.to_id,
277 created_at: now,
278 };
279 let existing_edges = self
280 .store
281 .list_edges(&edge.realm_id, &edge.namespace)
282 .await?;
283 let existing_items = self
284 .store
285 .list_items(WorkItemFilter {
286 realm_id: Some(edge.realm_id.clone()),
287 namespace: Some(edge.namespace.clone()),
288 include_terminal: true,
289 ..WorkItemFilter::default()
290 })
291 .await?;
292 WorkGraphMachine::validate_link(&edge, &existing_items, &existing_edges)?;
293 let event = WorkGraphEvent::graph(
294 edge.realm_id.clone(),
295 edge.namespace.clone(),
296 WorkGraphEventKind::Linked,
297 now,
298 json!({ "edge": edge }),
299 );
300 let inserted = self.store.insert_edge(edge, event).await?;
301 if inserted.kind == WorkEdgeKind::Blocks {
302 self.best_effort_refresh_item_eligibility(
303 &inserted.realm_id,
304 &inserted.namespace,
305 &inserted.to_id,
306 now,
307 )
308 .await;
309 }
310 Ok(inserted)
311 }
312
313 pub async fn add_evidence(
314 &self,
315 request: AddEvidenceRequest,
316 ) -> Result<WorkItem, WorkGraphError> {
317 let now = self.store.get_store_time_utc().await?;
318 let item = self
319 .get(
320 request.realm_id.clone(),
321 request.namespace.clone(),
322 request.id.clone(),
323 )
324 .await?;
325 let expected_previous_revision = item.revision;
326 let (item, event) = WorkGraphMachine::add_evidence(item, request, now)?;
327 self.store
328 .update_item_cas(item, expected_previous_revision, event)
329 .await
330 }
331
332 pub async fn events(
333 &self,
334 mut filter: WorkGraphEventFilter,
335 ) -> Result<Vec<WorkGraphEvent>, WorkGraphError> {
336 if filter.realm_id.is_none() {
337 filter.realm_id = Some(self.default_realm_id.to_string());
338 }
339 if !filter.all_namespaces && filter.namespace.is_none() {
340 filter.namespace = Some(self.default_namespace.clone());
341 }
342 self.store.list_events(filter).await
343 }
344
345 fn scope(
346 &self,
347 realm_id: Option<String>,
348 namespace: Option<WorkNamespace>,
349 ) -> (String, WorkNamespace) {
350 (
351 realm_id.unwrap_or_else(|| self.default_realm_id.to_string()),
352 namespace.unwrap_or_else(|| self.default_namespace.clone()),
353 )
354 }
355
356 fn normalize_item_filter(&self, mut filter: WorkItemFilter) -> WorkItemFilter {
357 if filter.realm_id.is_none() {
358 filter.realm_id = Some(self.default_realm_id.to_string());
359 }
360 if !filter.all_namespaces && filter.namespace.is_none() {
361 filter.namespace = Some(self.default_namespace.clone());
362 }
363 filter
364 }
365
366 fn normalize_snapshot_filter(
367 &self,
368 mut filter: WorkGraphSnapshotFilter,
369 ) -> WorkGraphSnapshotFilter {
370 if filter.realm_id.is_none() {
371 filter.realm_id = Some(self.default_realm_id.to_string());
372 }
373 if !filter.all_namespaces && filter.namespace.is_none() {
374 filter.namespace = Some(self.default_namespace.clone());
375 }
376 filter
377 }
378
379 async fn snapshot_namespaces(
380 &self,
381 realm_id: &str,
382 filter: &WorkGraphSnapshotFilter,
383 items: &[WorkItem],
384 ) -> Result<BTreeSet<WorkNamespace>, WorkGraphError> {
385 if !filter.all_namespaces {
386 return Ok(BTreeSet::from_iter([filter
387 .namespace
388 .clone()
389 .unwrap_or_else(|| self.default_namespace.clone())]));
390 }
391
392 let mut namespaces = items
393 .iter()
394 .map(|item| item.namespace.clone())
395 .collect::<BTreeSet<_>>();
396 if namespaces.is_empty() {
397 namespaces.extend(
398 self.store
399 .list_events(WorkGraphEventFilter {
400 realm_id: Some(realm_id.to_string()),
401 namespace: None,
402 all_namespaces: true,
403 after_seq: None,
404 limit: None,
405 })
406 .await?
407 .into_iter()
408 .map(|event| event.namespace),
409 );
410 }
411 Ok(namespaces)
412 }
413
414 async fn ready_item_ids_in_namespaces(
415 &self,
416 realm_id: &str,
417 namespaces: &BTreeSet<WorkNamespace>,
418 labels: &[String],
419 now: chrono::DateTime<chrono::Utc>,
420 ) -> Result<Vec<WorkItemId>, WorkGraphError> {
421 let mut ready_ids = Vec::new();
422 for namespace in namespaces {
423 let all_items = self
424 .store
425 .list_items(WorkItemFilter {
426 realm_id: Some(realm_id.to_string()),
427 namespace: Some(namespace.clone()),
428 include_terminal: true,
429 ..WorkItemFilter::default()
430 })
431 .await?;
432 let ready_items = WorkGraphMachine::ready_items(
433 all_items
434 .into_iter()
435 .filter(|item| labels.iter().all(|label| item.labels.contains(label)))
436 .collect(),
437 now,
438 );
439 ready_ids.extend(ready_items.into_iter().map(|item| item.id));
440 }
441 Ok(ready_ids)
442 }
443
444 async fn refresh_dependents_after_blocker_change(
445 &self,
446 blocker: &WorkItem,
447 now: chrono::DateTime<chrono::Utc>,
448 ) -> Result<(), WorkGraphError> {
449 let edges = self
450 .store
451 .list_edges(&blocker.realm_id, &blocker.namespace)
452 .await?;
453 for edge in edges
454 .iter()
455 .filter(|edge| edge.kind == WorkEdgeKind::Blocks && edge.from_id == blocker.id)
456 {
457 self.refresh_item_eligibility(&blocker.realm_id, &blocker.namespace, &edge.to_id, now)
458 .await?;
459 }
460 Ok(())
461 }
462
463 async fn best_effort_refresh_dependents_after_blocker_change(
464 &self,
465 blocker: &WorkItem,
466 now: chrono::DateTime<chrono::Utc>,
467 ) {
468 for _ in 0..BEST_EFFORT_REFRESH_ATTEMPTS {
469 match self
470 .refresh_dependents_after_blocker_change(blocker, now)
471 .await
472 {
473 Ok(()) => return,
474 Err(WorkGraphError::StaleRevision { .. }) => continue,
475 Err(_) => return,
476 }
477 }
478 }
479
480 async fn best_effort_refresh_item_eligibility(
481 &self,
482 realm_id: &str,
483 namespace: &WorkNamespace,
484 id: &WorkItemId,
485 now: chrono::DateTime<chrono::Utc>,
486 ) {
487 for _ in 0..BEST_EFFORT_REFRESH_ATTEMPTS {
488 match self
489 .refresh_item_eligibility(realm_id, namespace, id, now)
490 .await
491 {
492 Ok(()) => return,
493 Err(WorkGraphError::StaleRevision { .. }) => continue,
494 Err(_) => return,
495 }
496 }
497 }
498
499 async fn refresh_item_eligibility(
500 &self,
501 realm_id: &str,
502 namespace: &WorkNamespace,
503 id: &WorkItemId,
504 now: chrono::DateTime<chrono::Utc>,
505 ) -> Result<(), WorkGraphError> {
506 let Some(item) = self.store.get_item(realm_id, namespace, id).await? else {
507 return Ok(());
508 };
509 let all_items = self
510 .store
511 .list_items(WorkItemFilter {
512 realm_id: Some(realm_id.to_string()),
513 namespace: Some(namespace.clone()),
514 include_terminal: true,
515 ..WorkItemFilter::default()
516 })
517 .await?
518 .into_iter()
519 .map(|item| (item.id.clone(), item))
520 .collect::<BTreeMap<_, _>>();
521 let edges = self.store.list_edges(realm_id, namespace).await?;
522 let unresolved_blockers = unresolved_blocker_count(&item, &all_items, &edges);
523 if let Some((item, event)) =
524 WorkGraphMachine::refresh_eligibility(item, unresolved_blockers, now)?
525 {
526 let expected_previous_revision = item.revision;
527 self.store
528 .update_item_cas(item, expected_previous_revision, event)
529 .await?;
530 }
531 Ok(())
532 }
533
534 async fn unresolved_blocker_count_for_item(
535 &self,
536 realm_id: &str,
537 namespace: &WorkNamespace,
538 item: &WorkItem,
539 ) -> Result<u64, WorkGraphError> {
540 let all_items = self
541 .store
542 .list_items(WorkItemFilter {
543 realm_id: Some(realm_id.to_string()),
544 namespace: Some(namespace.clone()),
545 include_terminal: true,
546 ..WorkItemFilter::default()
547 })
548 .await?
549 .into_iter()
550 .map(|item| (item.id.clone(), item))
551 .collect::<BTreeMap<_, _>>();
552 let edges = self.store.list_edges(realm_id, namespace).await?;
553 Ok(unresolved_blocker_count(item, &all_items, &edges))
554 }
555}
556
557fn unresolved_blocker_count(
558 item: &WorkItem,
559 all_items: &BTreeMap<WorkItemId, WorkItem>,
560 edges: &[WorkEdge],
561) -> u64 {
562 edges
563 .iter()
564 .filter(|edge| edge.kind == WorkEdgeKind::Blocks && edge.to_id == item.id)
565 .filter(|edge| {
566 all_items
567 .get(&edge.from_id)
568 .is_none_or(|blocker| !blocker.status.is_terminal_success())
569 })
570 .count()
571 .try_into()
572 .unwrap_or(u64::MAX)
573}
574
575#[cfg(test)]
576#[allow(clippy::expect_used, clippy::unwrap_used)]
577mod tests {
578 use std::collections::BTreeSet;
579 use std::sync::Arc;
580 use std::sync::atomic::{AtomicUsize, Ordering};
581
582 use async_trait::async_trait;
583 use chrono::{DateTime, Utc};
584 use serde_json::json;
585
586 use crate::store::WorkGraphEventFilter;
587 use crate::types::{
588 ClaimWorkItemRequest, LinkWorkItemsRequest, WorkEdge, WorkEdgeKind, WorkGraphEvent,
589 WorkGraphEventKind, WorkItem, WorkItemFilter, WorkOwner, WorkOwnerKey,
590 };
591 use crate::{
592 CreateWorkItemRequest, MemoryWorkGraphStore, UpdateWorkItemRequest, WorkGraphService,
593 WorkGraphStore, WorkGraphStoreKind, WorkItemId, WorkNamespace,
594 };
595
596 fn create_req(title: &str) -> CreateWorkItemRequest {
597 CreateWorkItemRequest {
598 realm_id: None,
599 namespace: None,
600 title: title.to_string(),
601 description: None,
602 priority: Default::default(),
603 labels: BTreeSet::new(),
604 due_at: None,
605 not_before: None,
606 snoozed_until: None,
607 external_refs: Vec::new(),
608 evidence_refs: Vec::new(),
609 status: None,
610 }
611 }
612
613 struct RefreshConflictStore {
614 inner: MemoryWorkGraphStore,
615 fail_updated_events: AtomicUsize,
616 }
617
618 impl RefreshConflictStore {
619 fn new() -> Self {
620 Self {
621 inner: MemoryWorkGraphStore::new(),
622 fail_updated_events: AtomicUsize::new(0),
623 }
624 }
625
626 fn fail_next_refresh_update(&self) {
627 self.fail_updated_events.fetch_add(1, Ordering::SeqCst);
628 }
629 }
630
631 #[async_trait]
632 impl WorkGraphStore for RefreshConflictStore {
633 fn kind(&self) -> WorkGraphStoreKind {
634 WorkGraphStoreKind::Custom
635 }
636
637 async fn get_store_time_utc(&self) -> Result<DateTime<Utc>, crate::WorkGraphError> {
638 self.inner.get_store_time_utc().await
639 }
640
641 async fn insert_item(
642 &self,
643 item: WorkItem,
644 event: WorkGraphEvent,
645 ) -> Result<WorkItem, crate::WorkGraphError> {
646 self.inner.insert_item(item, event).await
647 }
648
649 async fn update_item_cas(
650 &self,
651 item: WorkItem,
652 expected_previous_revision: u64,
653 event: WorkGraphEvent,
654 ) -> Result<WorkItem, crate::WorkGraphError> {
655 if event.kind == WorkGraphEventKind::Updated
656 && self
657 .fail_updated_events
658 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |remaining| {
659 remaining.checked_sub(1)
660 })
661 .is_ok()
662 {
663 return Err(crate::WorkGraphError::StaleRevision {
664 id: item.id,
665 expected: expected_previous_revision,
666 actual: expected_previous_revision.saturating_add(1),
667 });
668 }
669 self.inner
670 .update_item_cas(item, expected_previous_revision, event)
671 .await
672 }
673
674 async fn get_item(
675 &self,
676 realm_id: &str,
677 namespace: &WorkNamespace,
678 id: &WorkItemId,
679 ) -> Result<Option<WorkItem>, crate::WorkGraphError> {
680 self.inner.get_item(realm_id, namespace, id).await
681 }
682
683 async fn list_items(
684 &self,
685 filter: WorkItemFilter,
686 ) -> Result<Vec<WorkItem>, crate::WorkGraphError> {
687 self.inner.list_items(filter).await
688 }
689
690 async fn insert_edge(
691 &self,
692 edge: WorkEdge,
693 event: WorkGraphEvent,
694 ) -> Result<WorkEdge, crate::WorkGraphError> {
695 self.inner.insert_edge(edge, event).await
696 }
697
698 async fn list_edges(
699 &self,
700 realm_id: &str,
701 namespace: &WorkNamespace,
702 ) -> Result<Vec<WorkEdge>, crate::WorkGraphError> {
703 self.inner.list_edges(realm_id, namespace).await
704 }
705
706 async fn list_events(
707 &self,
708 filter: WorkGraphEventFilter,
709 ) -> Result<Vec<WorkGraphEvent>, crate::WorkGraphError> {
710 self.inner.list_events(filter).await
711 }
712 }
713
714 #[tokio::test]
715 async fn blocked_dependencies_are_not_ready_until_completed() {
716 let service = WorkGraphService::with_scope(
717 Arc::new(MemoryWorkGraphStore::new()),
718 "realm",
719 WorkNamespace::default(),
720 );
721 let blocker = service
722 .create(create_req("blocker"))
723 .await
724 .expect("blocker");
725 let blocked = service
726 .create(create_req("blocked"))
727 .await
728 .expect("blocked");
729 service
730 .link(LinkWorkItemsRequest {
731 realm_id: None,
732 namespace: None,
733 kind: WorkEdgeKind::Blocks,
734 from_id: blocker.id.clone(),
735 to_id: blocked.id.clone(),
736 })
737 .await
738 .expect("link");
739
740 let ready = service.ready(Default::default()).await.expect("ready");
741 assert!(ready.iter().any(|item| item.id == blocker.id));
742 assert!(!ready.iter().any(|item| item.id == blocked.id));
743 service
744 .close(crate::CloseWorkItemRequest {
745 id: blocker.id,
746 realm_id: None,
747 namespace: None,
748 expected_revision: blocker.revision,
749 status: crate::WorkStatus::Completed,
750 })
751 .await
752 .expect("close blocker");
753 let ready = service.ready(Default::default()).await.expect("ready");
754 assert!(ready.iter().any(|item| item.id == blocked.id));
755 }
756
757 #[tokio::test]
758 async fn link_reports_success_when_post_insert_refresh_conflicts() {
759 let store = Arc::new(RefreshConflictStore::new());
760 let service =
761 WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
762 let blocker = service
763 .create(create_req("blocker"))
764 .await
765 .expect("blocker");
766 let blocked = service
767 .create(create_req("blocked"))
768 .await
769 .expect("blocked");
770
771 store.fail_next_refresh_update();
772 let edge = service
773 .link(LinkWorkItemsRequest {
774 realm_id: None,
775 namespace: None,
776 kind: WorkEdgeKind::Blocks,
777 from_id: blocker.id.clone(),
778 to_id: blocked.id.clone(),
779 })
780 .await
781 .expect("link should report inserted edge despite refresh conflict");
782
783 assert_eq!(edge.from_id, blocker.id);
784 assert_eq!(edge.to_id, blocked.id);
785 let edges = store
786 .list_edges("realm", &WorkNamespace::default())
787 .await
788 .expect("edges");
789 assert_eq!(edges.len(), 1);
790 let ready = service.ready(Default::default()).await.expect("ready");
791 assert!(!ready.iter().any(|item| item.id == blocked.id));
792 }
793
794 #[tokio::test]
795 async fn close_reports_success_when_dependent_refresh_conflicts() {
796 let store = Arc::new(RefreshConflictStore::new());
797 let service =
798 WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
799 let blocker = service
800 .create(create_req("blocker"))
801 .await
802 .expect("blocker");
803 let blocked = service
804 .create(create_req("blocked"))
805 .await
806 .expect("blocked");
807 service
808 .link(LinkWorkItemsRequest {
809 realm_id: None,
810 namespace: None,
811 kind: WorkEdgeKind::Blocks,
812 from_id: blocker.id.clone(),
813 to_id: blocked.id.clone(),
814 })
815 .await
816 .expect("link");
817
818 store.fail_next_refresh_update();
819 let closed = service
820 .close(crate::CloseWorkItemRequest {
821 id: blocker.id.clone(),
822 realm_id: None,
823 namespace: None,
824 expected_revision: blocker.revision,
825 status: crate::WorkStatus::Completed,
826 })
827 .await
828 .expect("close should report committed terminal item despite refresh conflict");
829
830 assert_eq!(closed.id, blocker.id);
831 assert_eq!(closed.status, crate::WorkStatus::Completed);
832 let fetched = service
833 .get(None, None, closed.id)
834 .await
835 .expect("closed item should be stored");
836 assert_eq!(fetched.status, crate::WorkStatus::Completed);
837 let ready = service.ready(Default::default()).await.expect("ready");
838 assert!(ready.iter().any(|item| item.id == blocked.id));
839 }
840
841 #[tokio::test]
842 async fn blocked_dependency_stays_unready_after_item_update() {
843 let service = WorkGraphService::with_scope(
844 Arc::new(MemoryWorkGraphStore::new()),
845 "realm",
846 WorkNamespace::default(),
847 );
848 let blocker = service
849 .create(create_req("blocker"))
850 .await
851 .expect("blocker");
852 let blocked = service
853 .create(create_req("blocked"))
854 .await
855 .expect("blocked");
856 service
857 .link(LinkWorkItemsRequest {
858 realm_id: None,
859 namespace: None,
860 kind: WorkEdgeKind::Blocks,
861 from_id: blocker.id,
862 to_id: blocked.id.clone(),
863 })
864 .await
865 .expect("link");
866 let blocked = service
867 .get(None, None, blocked.id.clone())
868 .await
869 .expect("blocked after link");
870
871 service
872 .update(UpdateWorkItemRequest {
873 id: blocked.id.clone(),
874 realm_id: None,
875 namespace: None,
876 expected_revision: blocked.revision,
877 title: Some("blocked, updated".to_string()),
878 description: None,
879 priority: None,
880 labels: None,
881 due_at: None,
882 not_before: None,
883 snoozed_until: None,
884 external_refs: Vec::new(),
885 })
886 .await
887 .expect("update blocked item");
888
889 let ready = service.ready(Default::default()).await.expect("ready");
890 assert!(!ready.iter().any(|item| item.id == blocked.id));
891 }
892
893 #[tokio::test]
894 async fn concurrent_claim_attempts_have_one_winner() {
895 let service = WorkGraphService::with_scope(
896 Arc::new(MemoryWorkGraphStore::new()),
897 "realm",
898 WorkNamespace::default(),
899 );
900 let item = service.create(create_req("claim")).await.expect("create");
901 let request = ClaimWorkItemRequest {
902 id: item.id,
903 realm_id: None,
904 namespace: None,
905 expected_revision: item.revision,
906 owner: WorkOwner::new(WorkOwnerKey::label("worker").expect("owner key")),
907 lease_seconds: Some(60),
908 lease_expires_at: None,
909 };
910 let first = service.claim(request.clone()).await;
911 let second = service.claim(request).await;
912 assert!(first.is_ok() ^ second.is_ok());
913 }
914
915 #[tokio::test]
916 async fn blocker_item_remains_claimable_after_linking_dependents() {
917 let service = WorkGraphService::with_scope(
918 Arc::new(MemoryWorkGraphStore::new()),
919 "realm",
920 WorkNamespace::default(),
921 );
922 let blocker = service
923 .create(create_req("blocker"))
924 .await
925 .expect("blocker");
926 let dependent = service
927 .create(create_req("dependent"))
928 .await
929 .expect("dependent");
930 service
931 .link(LinkWorkItemsRequest {
932 realm_id: None,
933 namespace: None,
934 kind: WorkEdgeKind::Blocks,
935 from_id: blocker.id.clone(),
936 to_id: dependent.id.clone(),
937 })
938 .await
939 .expect("link");
940
941 let claimed = service
942 .claim(ClaimWorkItemRequest {
943 id: blocker.id.clone(),
944 realm_id: None,
945 namespace: None,
946 expected_revision: blocker.revision,
947 owner: WorkOwner::new(WorkOwnerKey::label("worker").expect("owner key")),
948 lease_seconds: Some(60),
949 lease_expires_at: None,
950 })
951 .await
952 .expect("blocker with outgoing dependencies should remain claimable");
953
954 assert_eq!(claimed.id, blocker.id);
955 assert_eq!(claimed.status, crate::WorkStatus::InProgress);
956 }
957
958 #[tokio::test]
959 async fn claim_recomputes_dependency_projection_before_admission() {
960 let store = Arc::new(MemoryWorkGraphStore::new());
961 let service =
962 WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
963 let blocker = service
964 .create(create_req("blocker"))
965 .await
966 .expect("blocker");
967 let dependent = service
968 .create(create_req("dependent"))
969 .await
970 .expect("dependent");
971 let now = store.get_store_time_utc().await.expect("time");
972 store
973 .insert_edge(
974 WorkEdge {
975 realm_id: "realm".to_string(),
976 namespace: WorkNamespace::default(),
977 kind: WorkEdgeKind::Blocks,
978 from_id: blocker.id,
979 to_id: dependent.id.clone(),
980 created_at: now,
981 },
982 WorkGraphEvent::graph(
983 "realm".to_string(),
984 WorkNamespace::default(),
985 WorkGraphEventKind::Linked,
986 now,
987 json!({ "test": "stale-projection" }),
988 ),
989 )
990 .await
991 .expect("raw edge insert");
992
993 let error = service
994 .claim(ClaimWorkItemRequest {
995 id: dependent.id,
996 realm_id: None,
997 namespace: None,
998 expected_revision: dependent.revision,
999 owner: WorkOwner::new(WorkOwnerKey::label("worker").expect("owner key")),
1000 lease_seconds: Some(60),
1001 lease_expires_at: None,
1002 })
1003 .await
1004 .expect_err("fresh graph blockers should reject stale ready projection");
1005
1006 assert!(matches!(error, crate::WorkGraphError::InvalidTransition(_)));
1007 }
1008
1009 #[tokio::test]
1010 async fn dependency_cycles_are_rejected() {
1011 let service = WorkGraphService::with_scope(
1012 Arc::new(MemoryWorkGraphStore::new()),
1013 "realm",
1014 WorkNamespace::default(),
1015 );
1016 let first = service.create(create_req("first")).await.expect("first");
1017 let second = service.create(create_req("second")).await.expect("second");
1018 service
1019 .link(LinkWorkItemsRequest {
1020 realm_id: None,
1021 namespace: None,
1022 kind: WorkEdgeKind::Blocks,
1023 from_id: first.id.clone(),
1024 to_id: second.id.clone(),
1025 })
1026 .await
1027 .expect("first edge");
1028 let error = service
1029 .link(LinkWorkItemsRequest {
1030 realm_id: None,
1031 namespace: None,
1032 kind: WorkEdgeKind::Blocks,
1033 from_id: second.id,
1034 to_id: first.id,
1035 })
1036 .await
1037 .expect_err("cycle should fail");
1038 assert!(matches!(error, crate::WorkGraphError::InvalidTransition(_)));
1039 }
1040
1041 #[tokio::test]
1042 async fn topology_rejects_self_duplicate_and_missing_endpoint_edges() {
1043 let service = WorkGraphService::with_scope(
1044 Arc::new(MemoryWorkGraphStore::new()),
1045 "realm",
1046 WorkNamespace::default(),
1047 );
1048 let first = service.create(create_req("first")).await.expect("first");
1049 let second = service.create(create_req("second")).await.expect("second");
1050
1051 let self_edge = service
1052 .link(LinkWorkItemsRequest {
1053 realm_id: None,
1054 namespace: None,
1055 kind: WorkEdgeKind::Blocks,
1056 from_id: first.id.clone(),
1057 to_id: first.id.clone(),
1058 })
1059 .await
1060 .expect_err("self edge should fail");
1061 assert!(matches!(
1062 self_edge,
1063 crate::WorkGraphError::InvalidTransition(_)
1064 ));
1065
1066 let missing_endpoint = service
1067 .link(LinkWorkItemsRequest {
1068 realm_id: None,
1069 namespace: None,
1070 kind: WorkEdgeKind::Blocks,
1071 from_id: first.id.clone(),
1072 to_id: crate::WorkItemId::generated(),
1073 })
1074 .await
1075 .expect_err("missing endpoint should fail");
1076 assert!(matches!(
1077 missing_endpoint,
1078 crate::WorkGraphError::InvalidTransition(_)
1079 ));
1080
1081 service
1082 .link(LinkWorkItemsRequest {
1083 realm_id: None,
1084 namespace: None,
1085 kind: WorkEdgeKind::Blocks,
1086 from_id: first.id.clone(),
1087 to_id: second.id.clone(),
1088 })
1089 .await
1090 .expect("first edge");
1091
1092 let duplicate = service
1093 .link(LinkWorkItemsRequest {
1094 realm_id: None,
1095 namespace: None,
1096 kind: WorkEdgeKind::Blocks,
1097 from_id: first.id,
1098 to_id: second.id,
1099 })
1100 .await
1101 .expect_err("duplicate edge should fail");
1102 assert!(matches!(
1103 duplicate,
1104 crate::WorkGraphError::InvalidTransition(_)
1105 ));
1106 }
1107
1108 #[tokio::test]
1109 async fn snapshot_includes_items_edges_ready_ids_and_event_high_water_mark() {
1110 let service = WorkGraphService::with_scope(
1111 Arc::new(MemoryWorkGraphStore::new()),
1112 "realm",
1113 WorkNamespace::default(),
1114 );
1115 let blocker = service
1116 .create(create_req("blocker"))
1117 .await
1118 .expect("blocker");
1119 let blocked = service
1120 .create(create_req("blocked"))
1121 .await
1122 .expect("blocked");
1123 service
1124 .link(LinkWorkItemsRequest {
1125 realm_id: None,
1126 namespace: None,
1127 kind: WorkEdgeKind::Blocks,
1128 from_id: blocker.id.clone(),
1129 to_id: blocked.id.clone(),
1130 })
1131 .await
1132 .expect("link");
1133
1134 let snapshot = service
1135 .snapshot(crate::WorkGraphSnapshotFilter::default())
1136 .await
1137 .expect("snapshot");
1138 assert_eq!(snapshot.realm_id, "realm");
1139 assert_eq!(snapshot.items.len(), 2);
1140 assert_eq!(snapshot.edges.len(), 1);
1141 assert!(snapshot.ready_item_ids.iter().any(|id| id == &blocker.id));
1142 assert!(!snapshot.ready_item_ids.iter().any(|id| id == &blocked.id));
1143 assert!(snapshot.event_high_water_mark.is_some());
1144 }
1145
1146 #[tokio::test]
1147 async fn events_can_span_all_namespaces_when_requested() {
1148 let store = Arc::new(MemoryWorkGraphStore::new());
1149 let default_service =
1150 WorkGraphService::with_scope(store.clone(), "realm", WorkNamespace::default());
1151 let other_service = WorkGraphService::with_scope(
1152 store,
1153 "realm",
1154 WorkNamespace::new("other").expect("namespace"),
1155 );
1156
1157 default_service
1158 .create(create_req("default item"))
1159 .await
1160 .expect("default item");
1161 other_service
1162 .create(create_req("other item"))
1163 .await
1164 .expect("other item");
1165
1166 let default_events = default_service
1167 .events(WorkGraphEventFilter::default())
1168 .await
1169 .expect("default events");
1170 assert_eq!(default_events.len(), 1);
1171
1172 let all_events = default_service
1173 .events(WorkGraphEventFilter {
1174 all_namespaces: true,
1175 ..WorkGraphEventFilter::default()
1176 })
1177 .await
1178 .expect("all events");
1179 assert_eq!(all_events.len(), 2);
1180 }
1181}