1use std::collections::{BTreeMap, BTreeSet};
2
3use chrono::{DateTime, Duration, Utc};
4use serde_json::json;
5
6use crate::WorkGraphError;
7use crate::machines::{work_attention_lifecycle as attention_dsl, workgraph_lifecycle as wg_dsl};
8use crate::types::{
9 AddEvidenceRequest, AttentionDelegatedAuthority, ClaimWorkItemRequest, CloseWorkItemRequest,
10 CreateWorkItemRequest, ProjectedAttentionAuthority, ReleaseWorkItemRequest,
11 UpdateWorkItemRequest, WorkAttentionBinding, WorkAttentionMode, WorkAttentionStatus, WorkClaim,
12 WorkCompletionPolicy, WorkEdge, WorkEdgeKind, WorkGraphEvent, WorkGraphEventKind,
13 WorkGraphMachineState, WorkItem, WorkItemId, WorkNamespace, WorkStatus,
14};
15
16pub use wg_dsl::WorkGraphPublicErrorClass;
22
23pub use wg_dsl::WorkPublicConfirmationAdmissionKind;
31
32pub use wg_dsl::WorkCompletionPolicyMutationAdmissionKind;
41
42#[derive(Debug, Default, Clone, Copy)]
43pub struct WorkAttentionMachine;
44
45impl WorkAttentionMachine {
46 pub fn pause(
47 mut binding: WorkAttentionBinding,
48 expected_revision: u64,
49 until: Option<DateTime<Utc>>,
50 now: DateTime<Utc>,
51 ) -> Result<WorkAttentionBinding, WorkGraphError> {
52 let input = attention_dsl::WorkAttentionLifecycleInput::Pause {
53 expected_revision,
54 until_utc_ms: until.map(datetime_to_millis),
55 };
56 binding.machine_state = apply_attention_dsl(&binding, input, Some(expected_revision))?;
57 sync_attention_from_machine_state(&mut binding);
58 binding.updated_at = now;
59 Ok(binding)
60 }
61
62 pub fn resume(
63 mut binding: WorkAttentionBinding,
64 expected_revision: u64,
65 now: DateTime<Utc>,
66 ) -> Result<WorkAttentionBinding, WorkGraphError> {
67 let input = attention_dsl::WorkAttentionLifecycleInput::Resume { expected_revision };
68 binding.machine_state = apply_attention_dsl(&binding, input, Some(expected_revision))?;
69 sync_attention_from_machine_state(&mut binding);
70 binding.updated_at = now;
71 Ok(binding)
72 }
73
74 pub fn stop(
75 mut binding: WorkAttentionBinding,
76 expected_revision: u64,
77 now: DateTime<Utc>,
78 ) -> Result<WorkAttentionBinding, WorkGraphError> {
79 let input = attention_dsl::WorkAttentionLifecycleInput::Stop {
80 expected_revision,
81 at_utc_ms: datetime_to_millis(now),
82 };
83 binding.machine_state = apply_attention_dsl(&binding, input, Some(expected_revision))?;
84 sync_attention_from_machine_state(&mut binding);
85 binding.updated_at = now;
86 Ok(binding)
87 }
88
89 pub fn classify_eligibility_at(
100 binding: &WorkAttentionBinding,
101 now: DateTime<Utc>,
102 ) -> Result<bool, WorkGraphError> {
103 let mut dsl_auth =
104 attention_dsl::WorkAttentionLifecycleMachineAuthority::recover_from_state(
105 binding.machine_state.clone(),
106 )
107 .map_err(|error| {
108 WorkGraphError::InvalidTransition(format!(
109 "attention binding {} refused eligibility recovery: {error:?}",
110 binding.binding_id
111 ))
112 })?;
113 let transition = attention_dsl::WorkAttentionLifecycleMachineMutator::apply(
114 &mut dsl_auth,
115 attention_dsl::WorkAttentionLifecycleInput::ClassifyAttentionEligibility {
116 now_utc_ms: datetime_to_millis(now),
117 },
118 )
119 .map_err(|error| {
120 WorkGraphError::InvalidTransition(format!(
121 "attention binding {} refused eligibility classification: {error:?}",
122 binding.binding_id
123 ))
124 })?;
125
126 let mut classified = None;
127 for effect in transition.effects() {
128 if let attention_dsl::WorkAttentionLifecycleEffect::AttentionEligibilityClassified {
129 eligible,
130 } = effect
131 && classified.replace(*eligible).is_some()
132 {
133 return Err(WorkGraphError::Store(format!(
134 "attention binding {} emitted multiple eligibility verdicts",
135 binding.binding_id
136 )));
137 }
138 }
139
140 classified.ok_or_else(|| {
141 WorkGraphError::Store(format!(
142 "attention binding {} emitted no eligibility verdict",
143 binding.binding_id
144 ))
145 })
146 }
147
148 pub fn classify_authority(
160 binding: &WorkAttentionBinding,
161 ) -> Result<ProjectedAttentionAuthority, WorkGraphError> {
162 let mode = attention_mode_to_dsl(binding.mode);
163 let delegated_authority = attention_delegated_authority_to_dsl(binding.delegated_authority);
164 let mut dsl_auth =
165 attention_dsl::WorkAttentionLifecycleMachineAuthority::recover_from_state(
166 binding.machine_state.clone(),
167 )
168 .map_err(|error| {
169 WorkGraphError::InvalidTransition(format!(
170 "attention binding {} refused authority recovery: {error:?}",
171 binding.binding_id
172 ))
173 })?;
174 let transition = attention_dsl::WorkAttentionLifecycleMachineMutator::apply(
175 &mut dsl_auth,
176 attention_dsl::WorkAttentionLifecycleInput::ClassifyAttentionAuthority {
177 mode,
178 delegated_authority,
179 },
180 )
181 .map_err(|error| {
182 WorkGraphError::InvalidTransition(format!(
183 "attention binding {} refused authority classification: {error:?}",
184 binding.binding_id
185 ))
186 })?;
187
188 let mut classified = None;
189 for effect in transition.effects() {
190 if let attention_dsl::WorkAttentionLifecycleEffect::AttentionAuthorityClassified {
191 can_get,
192 can_add_evidence,
193 can_release,
194 can_update,
195 can_block,
196 can_create,
197 can_link,
198 can_link_parent,
199 can_link_related,
200 can_link_derived_from,
201 can_close_own_review_item,
202 can_close_if_policy_allows,
203 } = effect
204 {
205 let verdict = ProjectedAttentionAuthority {
206 can_get: *can_get,
207 can_add_evidence: *can_add_evidence,
208 can_release: *can_release,
209 can_update: *can_update,
210 can_block: *can_block,
211 can_create: *can_create,
212 can_link: *can_link,
213 can_link_parent: *can_link_parent,
214 can_link_related: *can_link_related,
215 can_link_derived_from: *can_link_derived_from,
216 can_close_own_review_item: *can_close_own_review_item,
217 can_close_if_policy_allows: *can_close_if_policy_allows,
218 };
219 if classified.replace(verdict).is_some() {
220 return Err(WorkGraphError::Store(format!(
221 "attention binding {} emitted multiple authority verdicts",
222 binding.binding_id
223 )));
224 }
225 }
226 }
227
228 classified.ok_or_else(|| {
229 WorkGraphError::Store(format!(
230 "attention binding {} emitted no authority verdict",
231 binding.binding_id
232 ))
233 })
234 }
235}
236
237fn attention_mode_to_dsl(mode: WorkAttentionMode) -> attention_dsl::WorkAttentionMode {
238 match mode {
239 WorkAttentionMode::Pursue => attention_dsl::WorkAttentionMode::Pursue,
240 WorkAttentionMode::Coordinate => attention_dsl::WorkAttentionMode::Coordinate,
241 WorkAttentionMode::Review => attention_dsl::WorkAttentionMode::Review,
242 WorkAttentionMode::Falsify => attention_dsl::WorkAttentionMode::Falsify,
243 WorkAttentionMode::Judge => attention_dsl::WorkAttentionMode::Judge,
244 WorkAttentionMode::Observe => attention_dsl::WorkAttentionMode::Observe,
245 }
246}
247
248fn attention_delegated_authority_to_dsl(
249 authority: AttentionDelegatedAuthority,
250) -> attention_dsl::AttentionDelegatedAuthority {
251 match authority {
252 AttentionDelegatedAuthority::AddEvidence => {
253 attention_dsl::AttentionDelegatedAuthority::AddEvidence
254 }
255 AttentionDelegatedAuthority::CloseOwnReviewItem => {
256 attention_dsl::AttentionDelegatedAuthority::CloseOwnReviewItem
257 }
258 AttentionDelegatedAuthority::RequestClosure => {
259 attention_dsl::AttentionDelegatedAuthority::RequestClosure
260 }
261 AttentionDelegatedAuthority::CloseIfPolicyAllows => {
262 attention_dsl::AttentionDelegatedAuthority::CloseIfPolicyAllows
263 }
264 }
265}
266
267#[derive(Debug, Default, Clone, Copy)]
268pub struct WorkGraphMachine;
269
270impl WorkGraphMachine {
271 pub fn validate_item_projection(item: &WorkItem) -> Result<(), WorkGraphError> {
281 validate_item_machine_projection(item)
282 }
283
284 pub fn public_error_class(
293 error: &WorkGraphError,
294 ) -> Result<WorkGraphPublicErrorClass, WorkGraphError> {
295 let kind = work_graph_error_kind(error);
296 let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::new();
297 let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
298 &mut dsl_auth,
299 wg_dsl::WorkGraphLifecycleInput::ClassifyWorkGraphPublicError { kind },
300 )
301 .map_err(|transition_error| {
302 WorkGraphError::Store(format!(
303 "generated WorkGraph public error classification refused kind {kind:?}: {transition_error:?}"
304 ))
305 })?;
306
307 let mut classified = None;
308 for effect in transition.effects() {
309 if let wg_dsl::WorkGraphLifecycleEffect::WorkGraphPublicErrorClassified {
310 kind: emitted_kind,
311 public_class,
312 } = effect
313 {
314 if *emitted_kind != kind {
315 return Err(WorkGraphError::Store(format!(
316 "generated WorkGraph public error classification emitted kind {emitted_kind:?} while classifying {kind:?}"
317 )));
318 }
319 if classified.replace(*public_class).is_some() {
320 return Err(WorkGraphError::Store(format!(
321 "generated WorkGraph public error classification emitted multiple classes for kind {kind:?}"
322 )));
323 }
324 }
325 }
326
327 classified.ok_or_else(|| {
328 WorkGraphError::Store(format!(
329 "generated WorkGraph public error classification did not emit a class for kind {kind:?}"
330 ))
331 })
332 }
333
334 pub fn classify_public_confirmation_admission(
347 completion_policy: &crate::types::WorkCompletionPolicy,
348 ) -> Result<wg_dsl::WorkPublicConfirmationAdmissionKind, WorkGraphError> {
349 let policy = completion_policy.to_machine();
350 let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::new();
351 let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
352 &mut dsl_auth,
353 wg_dsl::WorkGraphLifecycleInput::ClassifyPublicConfirmationAdmission {
354 completion_policy: policy,
355 },
356 )
357 .map_err(|error| {
358 WorkGraphError::InvalidInput(format!(
359 "WorkGraphLifecycle refused public-confirmation admission for {policy:?}: {error:?}"
360 ))
361 })?;
362
363 let mut admission = None;
364 for effect in transition.effects() {
365 if let wg_dsl::WorkGraphLifecycleEffect::PublicConfirmationAdmissionClassified {
366 admission: emitted,
367 } = effect
368 && admission.replace(*emitted).is_some()
369 {
370 return Err(WorkGraphError::Store(format!(
371 "WorkGraphLifecycle public-confirmation admission emitted multiple verdicts for {policy:?}"
372 )));
373 }
374 }
375
376 admission.ok_or_else(|| {
377 WorkGraphError::Store(format!(
378 "WorkGraphLifecycle public-confirmation admission emitted no verdict for {policy:?}"
379 ))
380 })
381 }
382
383 pub fn classify_create_completion_policy_admission(
395 completion_policy: &crate::types::WorkCompletionPolicy,
396 ) -> Result<wg_dsl::WorkCreateCompletionPolicyAdmissionKind, WorkGraphError> {
397 let policy = completion_policy.to_machine();
398 let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::new();
399 let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
400 &mut dsl_auth,
401 wg_dsl::WorkGraphLifecycleInput::ClassifyCreateCompletionPolicyAdmission {
402 completion_policy: policy,
403 },
404 )
405 .map_err(|error| {
406 WorkGraphError::InvalidInput(format!(
407 "WorkGraphLifecycle refused create completion-policy admission for {policy:?}: {error:?}"
408 ))
409 })?;
410
411 let mut admission = None;
412 for effect in transition.effects() {
413 if let wg_dsl::WorkGraphLifecycleEffect::CreateCompletionPolicyAdmissionClassified {
414 admission: emitted,
415 } = effect
416 && admission.replace(*emitted).is_some()
417 {
418 return Err(WorkGraphError::Store(format!(
419 "WorkGraphLifecycle create completion-policy admission emitted multiple verdicts for {policy:?}"
420 )));
421 }
422 }
423
424 admission.ok_or_else(|| {
425 WorkGraphError::Store(format!(
426 "WorkGraphLifecycle create completion-policy admission emitted no verdict for {policy:?}"
427 ))
428 })
429 }
430
431 pub fn classify_completion_policy_mutation_admission(
445 item: &WorkItem,
446 requested: &crate::types::WorkCompletionPolicy,
447 ) -> Result<wg_dsl::WorkCompletionPolicyMutationAdmissionKind, WorkGraphError> {
448 validate_item_machine_projection(item)?;
449 let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::recover_from_state(
450 item.machine_state.clone(),
451 )
452 .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
453 let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
454 &mut dsl_auth,
455 wg_dsl::WorkGraphLifecycleInput::ClassifyCompletionPolicyMutationAdmission {
456 requested_completion_policy: requested.to_machine(),
457 requested_completion_supervisor_owner_key: requested.supervisor_owner_key(),
458 requested_completion_reviewer_quorum_threshold: requested
459 .reviewer_quorum_threshold(),
460 },
461 )
462 .map_err(|error| {
463 WorkGraphError::InvalidTransition(format!(
464 "work item {} refused completion-policy mutation classification: {error:?}",
465 item.id
466 ))
467 })?;
468
469 let mut admission = None;
470 for effect in transition.effects() {
471 if let wg_dsl::WorkGraphLifecycleEffect::CompletionPolicyMutationAdmissionClassified {
472 admission: emitted,
473 } = effect
474 && admission.replace(*emitted).is_some()
475 {
476 return Err(WorkGraphError::Store(format!(
477 "work item {} emitted multiple completion-policy mutation verdicts",
478 item.id
479 )));
480 }
481 }
482
483 admission.ok_or_else(|| {
484 WorkGraphError::Store(format!(
485 "work item {} emitted no completion-policy mutation verdict",
486 item.id
487 ))
488 })
489 }
490
491 pub fn classify_confirmation_admission(
506 completion_policy: &crate::types::WorkCompletionPolicy,
507 requested_principal: Option<&crate::types::WorkOwnerKey>,
508 supplied_evidence_kind: wg_dsl::WorkConfirmationEvidenceObservation,
509 ) -> Result<wg_dsl::WorkConfirmationAdmissionKind, WorkGraphError> {
510 let policy = completion_policy.to_machine();
511 let requested_principal_owner_key =
512 requested_principal.map(crate::types::work_owner_key_to_machine);
513 let requested_principal_kind = requested_principal
514 .map(|principal| crate::types::work_owner_kind_to_machine(principal.kind));
515 let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::new();
516 let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
517 &mut dsl_auth,
518 wg_dsl::WorkGraphLifecycleInput::ClassifyConfirmationAdmission {
519 completion_policy: policy,
520 completion_supervisor_owner_key: completion_policy.supervisor_owner_key(),
521 requested_principal_owner_key,
522 requested_principal_kind,
523 supplied_evidence_kind,
524 },
525 )
526 .map_err(|error| {
527 WorkGraphError::Store(format!(
528 "WorkGraphLifecycle refused confirmation admission for {policy:?}: {error:?}"
529 ))
530 })?;
531
532 let mut admission = None;
533 for effect in transition.effects() {
534 if let wg_dsl::WorkGraphLifecycleEffect::ConfirmationAdmissionClassified {
535 admission: emitted,
536 } = effect
537 && admission.replace(*emitted).is_some()
538 {
539 return Err(WorkGraphError::Store(format!(
540 "WorkGraphLifecycle confirmation admission emitted multiple verdicts for {policy:?}"
541 )));
542 }
543 }
544
545 admission.ok_or_else(|| {
546 WorkGraphError::Store(format!(
547 "WorkGraphLifecycle confirmation admission emitted no verdict for {policy:?}"
548 ))
549 })
550 }
551
552 pub fn classify_terminality(item: &WorkItem) -> Result<bool, WorkGraphError> {
561 validate_item_machine_projection(item)?;
562 let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::recover_from_state(
563 item.machine_state.clone(),
564 )
565 .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
566 let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
567 &mut dsl_auth,
568 wg_dsl::WorkGraphLifecycleInput::ClassifyTerminality {},
569 )
570 .map_err(|error| {
571 WorkGraphError::InvalidTransition(format!(
572 "work item {} refused terminality classification: {error:?}",
573 item.id
574 ))
575 })?;
576
577 let mut classified = None;
578 for effect in transition.effects() {
579 if let wg_dsl::WorkGraphLifecycleEffect::WorkItemTerminalityClassified { terminal } =
580 effect
581 && classified.replace(*terminal).is_some()
582 {
583 return Err(WorkGraphError::Store(format!(
584 "work item {} emitted multiple terminality verdicts",
585 item.id
586 )));
587 }
588 }
589
590 classified.ok_or_else(|| {
591 WorkGraphError::Store(format!(
592 "work item {} emitted no terminality verdict",
593 item.id
594 ))
595 })
596 }
597
598 pub fn classify_blocker_satisfied(
609 gated_item: &WorkItem,
610 blocker: Option<&WorkItem>,
611 ) -> Result<bool, WorkGraphError> {
612 validate_item_machine_projection(gated_item)?;
613 let blocker_lifecycle_phase = match blocker {
614 Some(blocker) => blocker.machine_state.lifecycle_phase,
615 None => wg_dsl::WorkLifecycleState::Absent,
616 };
617 let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::recover_from_state(
618 gated_item.machine_state.clone(),
619 )
620 .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
621 let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
622 &mut dsl_auth,
623 wg_dsl::WorkGraphLifecycleInput::ClassifyBlockerSatisfied {
624 blocker_present: blocker.is_some(),
625 blocker_lifecycle_phase,
626 },
627 )
628 .map_err(|error| {
629 WorkGraphError::InvalidTransition(format!(
630 "work item {} refused blocker satisfaction classification: {error:?}",
631 gated_item.id
632 ))
633 })?;
634
635 let mut classified = None;
636 for effect in transition.effects() {
637 if let wg_dsl::WorkGraphLifecycleEffect::BlockerSatisfactionClassified { satisfied } =
638 effect
639 && classified.replace(*satisfied).is_some()
640 {
641 return Err(WorkGraphError::Store(format!(
642 "work item {} emitted multiple blocker satisfaction verdicts",
643 gated_item.id
644 )));
645 }
646 }
647
648 classified.ok_or_else(|| {
649 WorkGraphError::Store(format!(
650 "work item {} emitted no blocker satisfaction verdict",
651 gated_item.id
652 ))
653 })
654 }
655
656 pub fn create_item(
657 request: CreateWorkItemRequest,
658 realm_id: String,
659 namespace: WorkNamespace,
660 now: DateTime<Utc>,
661 ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
662 let title = validate_title(request.title)?;
663 let status = request.status.unwrap_or_default();
664 let input = match classify_create_status_admission(status)? {
671 wg_dsl::WorkCreateStatusAdmissionKind::AdmittedOpen => {
672 wg_dsl::WorkGraphLifecycleInput::CreateOpen {
673 due_at_utc_ms: request.due_at.map(datetime_to_millis),
674 not_before_utc_ms: request.not_before.map(datetime_to_millis),
675 snoozed_until_utc_ms: request.snoozed_until.map(datetime_to_millis),
676 completion_policy: request.completion_policy.clone().to_machine(),
677 completion_supervisor_owner_key: request
678 .completion_policy
679 .supervisor_owner_key(),
680 completion_reviewer_quorum_threshold: request
681 .completion_policy
682 .reviewer_quorum_threshold(),
683 unresolved_blocker_count: 0,
684 }
685 }
686 wg_dsl::WorkCreateStatusAdmissionKind::AdmittedBlocked => {
687 wg_dsl::WorkGraphLifecycleInput::CreateBlocked {
688 due_at_utc_ms: request.due_at.map(datetime_to_millis),
689 not_before_utc_ms: request.not_before.map(datetime_to_millis),
690 snoozed_until_utc_ms: request.snoozed_until.map(datetime_to_millis),
691 completion_policy: request.completion_policy.clone().to_machine(),
692 completion_supervisor_owner_key: request
693 .completion_policy
694 .supervisor_owner_key(),
695 completion_reviewer_quorum_threshold: request
696 .completion_policy
697 .reviewer_quorum_threshold(),
698 unresolved_blocker_count: 0,
699 }
700 }
701 wg_dsl::WorkCreateStatusAdmissionKind::Denied => {
702 return Err(WorkGraphError::InvalidTransition(
703 "new work items may only start open or blocked".to_string(),
704 ));
705 }
706 };
707 let dsl_state = apply_new_item_dsl(input)?;
708 let mut item = WorkItem {
709 id: WorkItemId::generated(),
710 realm_id,
711 namespace,
712 title,
713 description: request.description,
714 status: work_status_from_dsl(dsl_state.lifecycle_phase)?,
715 completion_policy: request.completion_policy,
716 priority: request.priority,
717 labels: normalize_labels(request.labels)?,
718 owner: None,
719 claim: None,
720 machine_state: dsl_state.clone(),
721 revision: dsl_state.revision,
722 due_at: dsl_state.due_at_utc_ms.and_then(millis_to_datetime),
723 not_before: dsl_state.not_before_utc_ms.and_then(millis_to_datetime),
724 snoozed_until: dsl_state.snoozed_until_utc_ms.and_then(millis_to_datetime),
725 created_at: now,
726 updated_at: now,
727 terminal_at: dsl_state.terminal_at_utc_ms.and_then(millis_to_datetime),
728 external_refs: request.external_refs,
729 evidence_refs: request.evidence_refs,
730 };
731 sync_item_from_machine_state(&mut item)?;
732 let event = item_event(&item, WorkGraphEventKind::Created, now)?;
733 Ok((item, event))
734 }
735
736 pub fn update_item(
737 mut item: WorkItem,
738 request: UpdateWorkItemRequest,
739 now: DateTime<Utc>,
740 ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
741 let due_at = request.due_at.or(item.due_at);
742 let not_before = request.not_before.or(item.not_before);
743 let snoozed_until = request.snoozed_until.or(item.snoozed_until);
744 let completion_policy = request
745 .completion_policy
746 .unwrap_or_else(|| item.completion_policy.clone());
747 let dsl_state = apply_item_dsl(
748 &item,
749 item.machine_state.unresolved_blocker_count,
750 wg_dsl::WorkGraphLifecycleInput::Update {
751 expected_revision: request.expected_revision,
752 due_at_utc_ms: due_at.map(datetime_to_millis),
753 not_before_utc_ms: not_before.map(datetime_to_millis),
754 snoozed_until_utc_ms: snoozed_until.map(datetime_to_millis),
755 completion_policy: completion_policy.clone().to_machine(),
756 completion_supervisor_owner_key: completion_policy.supervisor_owner_key(),
757 completion_reviewer_quorum_threshold: completion_policy.reviewer_quorum_threshold(),
758 unresolved_blocker_count: item.machine_state.unresolved_blocker_count,
759 },
760 Some(request.expected_revision),
761 )?;
762
763 if let Some(title) = request.title {
764 item.title = validate_title(title)?;
765 }
766 if let Some(description) = request.description {
767 item.description = Some(description);
768 }
769 if let Some(priority) = request.priority {
770 item.priority = priority;
771 }
772 if let Some(labels) = request.labels {
773 item.labels = normalize_labels(labels)?;
774 }
775 item.machine_state = dsl_state;
776 sync_item_from_machine_state(&mut item)?;
777 if !request.external_refs.is_empty() {
778 item.external_refs = request.external_refs;
779 }
780 item.updated_at = now;
781 let event = item_event(&item, WorkGraphEventKind::Updated, now)?;
782 Ok((item, event))
783 }
784
785 pub fn claim_item(
786 item: WorkItem,
787 request: ClaimWorkItemRequest,
788 now: DateTime<Utc>,
789 ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
790 Self::claim_ready_item(item, request, now)
791 }
792
793 pub fn claim_ready_item(
794 item: WorkItem,
795 request: ClaimWorkItemRequest,
796 now: DateTime<Utc>,
797 ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
798 Self::claim_item_with_unresolved_blockers(
799 item.clone(),
800 item.machine_state.unresolved_blocker_count,
801 request,
802 now,
803 )
804 }
805
806 pub fn refresh_eligibility(
807 mut item: WorkItem,
808 unresolved_blocker_count: u64,
809 now: DateTime<Utc>,
810 ) -> Result<Option<(WorkItem, WorkGraphEvent)>, WorkGraphError> {
811 if item.machine_state.unresolved_blocker_count == unresolved_blocker_count {
812 return Ok(None);
813 }
814 let dsl_state = apply_item_dsl(
815 &item,
816 unresolved_blocker_count,
817 wg_dsl::WorkGraphLifecycleInput::RefreshEligibility {
818 unresolved_blocker_count,
819 },
820 None,
821 )?;
822 item.machine_state = dsl_state;
823 sync_item_from_machine_state(&mut item)?;
824 item.updated_at = now;
825 let event = item_event(&item, WorkGraphEventKind::Updated, now)?;
826 Ok(Some((item, event)))
827 }
828
829 pub(crate) fn claim_item_with_unresolved_blockers(
830 mut item: WorkItem,
831 unresolved_blocker_count: u64,
832 request: ClaimWorkItemRequest,
833 now: DateTime<Utc>,
834 ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
835 let lease_expires_at = request.lease_expires_at.or_else(|| {
836 request
837 .lease_seconds
838 .map(|seconds| now + seconds_to_duration(seconds))
839 });
840 let owner_key = work_owner_key(&request.owner)?;
841 let dsl_state = apply_item_dsl(
842 &item,
843 unresolved_blocker_count,
844 wg_dsl::WorkGraphLifecycleInput::Claim {
845 expected_revision: request.expected_revision,
846 owner_key,
847 now_utc_ms: datetime_to_millis(now),
848 lease_expires_at_utc_ms: lease_expires_at.map(datetime_to_millis),
849 },
850 Some(request.expected_revision),
851 )?;
852 item.owner = Some(request.owner.clone());
853 item.claim = Some(WorkClaim {
854 owner: request.owner,
855 claimed_at: now,
856 lease_expires_at,
857 });
858 item.machine_state = dsl_state;
859 sync_item_from_machine_state(&mut item)?;
860 item.updated_at = now;
861 let event = item_event(&item, WorkGraphEventKind::Claimed, now)?;
862 Ok((item, event))
863 }
864
865 pub fn release_item(
866 mut item: WorkItem,
867 request: ReleaseWorkItemRequest,
868 now: DateTime<Utc>,
869 ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
870 let dsl_state = apply_item_dsl(
871 &item,
872 item.machine_state.unresolved_blocker_count,
873 wg_dsl::WorkGraphLifecycleInput::Release {
874 expected_revision: request.expected_revision,
875 },
876 Some(request.expected_revision),
877 )?;
878 item.claim = None;
879 item.owner = None;
880 item.machine_state = dsl_state;
881 sync_item_from_machine_state(&mut item)?;
882 item.updated_at = now;
883 let event = item_event(&item, WorkGraphEventKind::Released, now)?;
884 Ok((item, event))
885 }
886
887 pub fn block_item(
888 mut item: WorkItem,
889 expected_revision: u64,
890 now: DateTime<Utc>,
891 ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
892 let dsl_state = apply_item_dsl(
893 &item,
894 item.machine_state.unresolved_blocker_count,
895 wg_dsl::WorkGraphLifecycleInput::Block { expected_revision },
896 Some(expected_revision),
897 )?;
898 item.claim = None;
899 item.owner = None;
900 item.machine_state = dsl_state;
901 sync_item_from_machine_state(&mut item)?;
902 item.updated_at = now;
903 let event = item_event(&item, WorkGraphEventKind::Blocked, now)?;
904 Ok((item, event))
905 }
906
907 pub fn close_item(
908 mut item: WorkItem,
909 request: CloseWorkItemRequest,
910 now: DateTime<Utc>,
911 ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
912 let dsl_input = match classify_close_status_admission(request.status)? {
920 wg_dsl::WorkCloseStatusAdmissionKind::AdmittedCompleted => {
921 wg_dsl::WorkGraphLifecycleInput::CloseCompleted {
922 expected_revision: request.expected_revision,
923 at_utc_ms: datetime_to_millis(now),
924 }
925 }
926 wg_dsl::WorkCloseStatusAdmissionKind::AdmittedCancelled => {
927 wg_dsl::WorkGraphLifecycleInput::CloseCancelled {
928 expected_revision: request.expected_revision,
929 at_utc_ms: datetime_to_millis(now),
930 }
931 }
932 wg_dsl::WorkCloseStatusAdmissionKind::AdmittedFailed => {
933 wg_dsl::WorkGraphLifecycleInput::CloseFailed {
934 expected_revision: request.expected_revision,
935 at_utc_ms: datetime_to_millis(now),
936 }
937 }
938 wg_dsl::WorkCloseStatusAdmissionKind::DeniedNonTerminal => {
939 return Err(WorkGraphError::InvalidTransition(
940 "close requires a terminal status".to_string(),
941 ));
942 }
943 };
944 let dsl_state = apply_item_dsl(
945 &item,
946 item.machine_state.unresolved_blocker_count,
947 dsl_input,
948 Some(request.expected_revision),
949 )?;
950 item.claim = None;
951 item.owner = None;
952 item.machine_state = dsl_state;
953 sync_item_from_machine_state(&mut item)?;
954 item.updated_at = now;
955 let event = item_event(&item, WorkGraphEventKind::Closed, now)?;
956 Ok((item, event))
957 }
958
959 pub fn add_evidence(
960 mut item: WorkItem,
961 request: AddEvidenceRequest,
962 now: DateTime<Utc>,
963 ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
964 let evidence_kind = request
965 .evidence
966 .confirmation_kind
967 .unwrap_or(crate::types::WorkEvidenceKind::SelfAttest)
968 .to_machine();
969 let confirming_owner_key = request
970 .evidence
971 .confirming_owner_key
972 .as_ref()
973 .map(crate::types::work_owner_key_to_machine);
974 let dsl_state = apply_item_dsl(
975 &item,
976 item.machine_state.unresolved_blocker_count,
977 wg_dsl::WorkGraphLifecycleInput::AddEvidence {
978 expected_revision: request.expected_revision,
979 evidence_kind,
980 confirming_owner_key,
981 },
982 Some(request.expected_revision),
983 )?;
984 item.evidence_refs.push(request.evidence);
985 item.machine_state = dsl_state;
986 sync_item_from_machine_state(&mut item)?;
987 item.updated_at = now;
988 let event = item_event(&item, WorkGraphEventKind::EvidenceAdded, now)?;
989 Ok((item, event))
990 }
991
992 pub fn classify_readiness(item: &WorkItem, now: DateTime<Utc>) -> Result<bool, WorkGraphError> {
1002 validate_item_machine_projection(item)?;
1003 let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::recover_from_state(
1004 item.machine_state.clone(),
1005 )
1006 .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
1007 let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
1008 &mut dsl_auth,
1009 wg_dsl::WorkGraphLifecycleInput::ClassifyReadiness {
1010 now_utc_ms: datetime_to_millis(now),
1011 },
1012 )
1013 .map_err(|error| {
1014 WorkGraphError::InvalidTransition(format!(
1015 "work item {} refused readiness classification: {error:?}",
1016 item.id
1017 ))
1018 })?;
1019
1020 let mut classified = None;
1021 for effect in transition.effects() {
1022 if let wg_dsl::WorkGraphLifecycleEffect::WorkItemReadinessClassified { ready } = effect
1023 && classified.replace(*ready).is_some()
1024 {
1025 return Err(WorkGraphError::Store(format!(
1026 "work item {} emitted multiple readiness verdicts",
1027 item.id
1028 )));
1029 }
1030 }
1031
1032 classified.ok_or_else(|| {
1033 WorkGraphError::Store(format!(
1034 "work item {} emitted no readiness verdict",
1035 item.id
1036 ))
1037 })
1038 }
1039
1040 pub fn is_ready(item: &WorkItem, now: DateTime<Utc>) -> bool {
1049 Self::classify_readiness(item, now).unwrap_or(false)
1050 }
1051
1052 pub fn ready_items(items: Vec<WorkItem>, now: DateTime<Utc>) -> Vec<WorkItem> {
1053 items
1054 .into_iter()
1055 .filter(|item| Self::is_ready(item, now))
1056 .collect()
1057 }
1058
1059 pub fn validate_link(
1060 edge: &WorkEdge,
1061 existing_items: &[WorkItem],
1062 existing_edges: &[WorkEdge],
1063 ) -> Result<(), WorkGraphError> {
1064 let topology_state = topology_state(existing_items, existing_edges);
1065 apply_link_validation_dsl(
1066 topology_state,
1067 wg_dsl::WorkGraphLifecycleInput::ValidateLink {
1068 kind: dsl_edge_kind(edge.kind),
1069 from_item_key: work_item_key(&edge.from_id),
1070 to_item_key: work_item_key(&edge.to_id),
1071 edge_key: work_edge_key(edge.kind, &edge.from_id, &edge.to_id),
1072 reverse_path_key: dependency_path_key(edge.kind, &edge.to_id, &edge.from_id),
1073 },
1074 )?;
1075 Ok(())
1076 }
1077}
1078
1079fn work_graph_error_kind(error: &WorkGraphError) -> wg_dsl::WorkGraphErrorKind {
1084 match error {
1085 WorkGraphError::NotFound { .. } => wg_dsl::WorkGraphErrorKind::NotFound,
1086 WorkGraphError::AttentionNotFound { .. } => wg_dsl::WorkGraphErrorKind::AttentionNotFound,
1087 WorkGraphError::StaleRevision { .. } => wg_dsl::WorkGraphErrorKind::StaleRevision,
1088 WorkGraphError::Conflict(_) => wg_dsl::WorkGraphErrorKind::Conflict,
1089 WorkGraphError::InvalidTransition(_) => wg_dsl::WorkGraphErrorKind::InvalidTransition,
1090 WorkGraphError::InvalidInput(_) => wg_dsl::WorkGraphErrorKind::InvalidInput,
1091 WorkGraphError::InvalidTimestampMillis { .. } => {
1092 wg_dsl::WorkGraphErrorKind::InvalidTimestampMillis
1093 }
1094 WorkGraphError::Store(_) => wg_dsl::WorkGraphErrorKind::Store,
1095 WorkGraphError::UnsupportedBackend(_) => wg_dsl::WorkGraphErrorKind::UnsupportedBackend,
1096 }
1097}
1098
1099pub(crate) fn completion_policy_name(policy: &WorkCompletionPolicy) -> &'static str {
1100 match policy {
1101 WorkCompletionPolicy::SelfAttest => "self_attest",
1102 WorkCompletionPolicy::HostConfirmed => "host_confirmed",
1103 WorkCompletionPolicy::PrincipalConfirmed => "principal_confirmed",
1104 WorkCompletionPolicy::Supervisor { .. } => "supervisor",
1105 WorkCompletionPolicy::ReviewerQuorum { .. } => "reviewer_quorum",
1106 }
1107}
1108
1109fn validate_title(title: String) -> Result<String, WorkGraphError> {
1110 let title = title.trim();
1111 if title.is_empty() {
1112 return Err(WorkGraphError::InvalidInput(
1113 "work item title must not be empty".to_string(),
1114 ));
1115 }
1116 Ok(title.to_string())
1117}
1118
1119fn normalize_labels(labels: BTreeSet<String>) -> Result<BTreeSet<String>, WorkGraphError> {
1120 let mut normalized = BTreeSet::new();
1121 for label in labels {
1122 let label = label.trim();
1123 if label.is_empty() {
1124 return Err(WorkGraphError::InvalidInput(
1125 "work item labels must not be empty".to_string(),
1126 ));
1127 }
1128 normalized.insert(label.to_string());
1129 }
1130 Ok(normalized)
1131}
1132
1133fn apply_new_item_dsl(
1134 input: wg_dsl::WorkGraphLifecycleInput,
1135) -> Result<wg_dsl::WorkGraphLifecycleMachineState, WorkGraphError> {
1136 let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::new();
1137 wg_dsl::WorkGraphLifecycleMachineMutator::apply(&mut dsl_auth, input)
1138 .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
1139 Ok(dsl_auth.state().clone())
1140}
1141
1142fn classify_create_status_admission(
1153 status: WorkStatus,
1154) -> Result<wg_dsl::WorkCreateStatusAdmissionKind, WorkGraphError> {
1155 let requested_status = crate::types::work_lifecycle_state_from_status(status);
1156 let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::new();
1157 let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
1158 &mut dsl_auth,
1159 wg_dsl::WorkGraphLifecycleInput::ClassifyCreateStatusAdmission { requested_status },
1160 )
1161 .map_err(|error| {
1162 WorkGraphError::InvalidTransition(format!(
1163 "WorkGraphLifecycle refused create-status admission for {requested_status:?}: {error:?}"
1164 ))
1165 })?;
1166
1167 let mut admission = None;
1168 for effect in transition.effects() {
1169 if let wg_dsl::WorkGraphLifecycleEffect::CreateStatusAdmissionClassified {
1170 admission: emitted,
1171 } = effect
1172 && admission.replace(*emitted).is_some()
1173 {
1174 return Err(WorkGraphError::Store(format!(
1175 "WorkGraphLifecycle create-status admission emitted multiple verdicts for {requested_status:?}"
1176 )));
1177 }
1178 }
1179
1180 admission.ok_or_else(|| {
1181 WorkGraphError::Store(format!(
1182 "WorkGraphLifecycle create-status admission emitted no verdict for {requested_status:?}"
1183 ))
1184 })
1185}
1186
1187fn classify_close_status_admission(
1198 status: WorkStatus,
1199) -> Result<wg_dsl::WorkCloseStatusAdmissionKind, WorkGraphError> {
1200 let requested_status = crate::types::work_lifecycle_state_from_status(status);
1201 let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::new();
1202 let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
1203 &mut dsl_auth,
1204 wg_dsl::WorkGraphLifecycleInput::ClassifyCloseStatusAdmission { requested_status },
1205 )
1206 .map_err(|error| {
1207 WorkGraphError::InvalidTransition(format!(
1208 "WorkGraphLifecycle refused close-status admission for {requested_status:?}: {error:?}"
1209 ))
1210 })?;
1211
1212 let mut admission = None;
1213 for effect in transition.effects() {
1214 if let wg_dsl::WorkGraphLifecycleEffect::CloseStatusAdmissionClassified {
1215 admission: emitted,
1216 } = effect
1217 && admission.replace(*emitted).is_some()
1218 {
1219 return Err(WorkGraphError::Store(format!(
1220 "WorkGraphLifecycle close-status admission emitted multiple verdicts for {requested_status:?}"
1221 )));
1222 }
1223 }
1224
1225 admission.ok_or_else(|| {
1226 WorkGraphError::Store(format!(
1227 "WorkGraphLifecycle close-status admission emitted no verdict for {requested_status:?}"
1228 ))
1229 })
1230}
1231
1232fn apply_link_validation_dsl(
1233 state: WorkGraphMachineState,
1234 input: wg_dsl::WorkGraphLifecycleInput,
1235) -> Result<(), WorkGraphError> {
1236 let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::recover_from_state(state)
1237 .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
1238 wg_dsl::WorkGraphLifecycleMachineMutator::apply(&mut dsl_auth, input)
1239 .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
1240 Ok(())
1241}
1242
1243fn apply_attention_dsl(
1244 binding: &WorkAttentionBinding,
1245 input: attention_dsl::WorkAttentionLifecycleInput,
1246 expected_revision: Option<u64>,
1247) -> Result<attention_dsl::WorkAttentionLifecycleMachineState, WorkGraphError> {
1248 let mut dsl_auth = attention_dsl::WorkAttentionLifecycleMachineAuthority::recover_from_state(
1249 binding.machine_state.clone(),
1250 )
1251 .map_err(|error| {
1252 WorkGraphError::InvalidTransition(format!(
1253 "attention binding {} refused recovery: {error:?}",
1254 binding.binding_id
1255 ))
1256 })?;
1257 attention_dsl::WorkAttentionLifecycleMachineMutator::apply(&mut dsl_auth, input).map_err(
1258 |error| {
1259 if let Some(expected) = expected_revision
1260 && binding.machine_state.revision != expected
1261 {
1262 return WorkGraphError::StaleRevision {
1263 id: binding.work_ref.item_id.clone(),
1264 expected,
1265 actual: binding.machine_state.revision,
1266 };
1267 }
1268 WorkGraphError::InvalidTransition(format!(
1269 "attention binding {} refused transition: {error:?}",
1270 binding.binding_id
1271 ))
1272 },
1273 )?;
1274 Ok(dsl_auth.state().clone())
1275}
1276
1277fn apply_item_dsl(
1278 item: &WorkItem,
1279 unresolved_blocker_count: u64,
1280 input: wg_dsl::WorkGraphLifecycleInput,
1281 expected_revision: Option<u64>,
1282) -> Result<WorkGraphMachineState, WorkGraphError> {
1283 validate_item_machine_projection(item)?;
1284 let mut state = item.machine_state.clone();
1285 state.unresolved_blocker_count = unresolved_blocker_count;
1286 let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::recover_from_state(state)
1287 .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
1288 wg_dsl::WorkGraphLifecycleMachineMutator::apply(&mut dsl_auth, input).map_err(|error| {
1289 if let Some(expected) = expected_revision
1290 && item.revision != expected
1291 {
1292 return WorkGraphError::StaleRevision {
1293 id: item.id.clone(),
1294 expected,
1295 actual: item.revision,
1296 };
1297 }
1298 WorkGraphError::InvalidTransition(format!("{error:?}"))
1299 })?;
1300 Ok(dsl_auth.state().clone())
1301}
1302
1303fn sync_attention_from_machine_state(binding: &mut WorkAttentionBinding) {
1304 binding.status = match binding.machine_state.lifecycle_phase {
1305 attention_dsl::WorkAttentionLifecycleState::Active => WorkAttentionStatus::Active,
1306 attention_dsl::WorkAttentionLifecycleState::Paused => WorkAttentionStatus::Paused {
1307 until: binding
1308 .machine_state
1309 .paused_until_utc_ms
1310 .and_then(millis_to_datetime),
1311 },
1312 attention_dsl::WorkAttentionLifecycleState::Superseded => WorkAttentionStatus::Superseded,
1313 attention_dsl::WorkAttentionLifecycleState::Stopped => WorkAttentionStatus::Stopped,
1314 };
1315}
1316
1317fn work_status_from_dsl(status: wg_dsl::WorkLifecycleState) -> Result<WorkStatus, WorkGraphError> {
1318 match status {
1319 wg_dsl::WorkLifecycleState::Open => Ok(WorkStatus::Open),
1320 wg_dsl::WorkLifecycleState::InProgress => Ok(WorkStatus::InProgress),
1321 wg_dsl::WorkLifecycleState::Blocked => Ok(WorkStatus::Blocked),
1322 wg_dsl::WorkLifecycleState::Completed => Ok(WorkStatus::Completed),
1323 wg_dsl::WorkLifecycleState::Cancelled => Ok(WorkStatus::Cancelled),
1324 wg_dsl::WorkLifecycleState::Failed => Ok(WorkStatus::Failed),
1325 wg_dsl::WorkLifecycleState::Absent => Err(WorkGraphError::InvalidTransition(
1326 "work item lifecycle state is absent".to_string(),
1327 )),
1328 }
1329}
1330
1331fn sync_item_from_machine_state(item: &mut WorkItem) -> Result<(), WorkGraphError> {
1332 item.status = work_status_from_dsl(item.machine_state.lifecycle_phase)?;
1333 item.revision = item.machine_state.revision;
1334 item.due_at = item
1335 .machine_state
1336 .due_at_utc_ms
1337 .and_then(millis_to_datetime);
1338 item.not_before = item
1339 .machine_state
1340 .not_before_utc_ms
1341 .and_then(millis_to_datetime);
1342 item.snoozed_until = item
1343 .machine_state
1344 .snoozed_until_utc_ms
1345 .and_then(millis_to_datetime);
1346 item.completion_policy = crate::types::WorkCompletionPolicy::from_machine(
1347 item.machine_state.completion_policy,
1348 item.machine_state.completion_supervisor_owner_key.clone(),
1349 item.machine_state.completion_reviewer_quorum_threshold,
1350 );
1351 item.terminal_at = item
1352 .machine_state
1353 .terminal_at_utc_ms
1354 .and_then(millis_to_datetime);
1355 Ok(())
1356}
1357
1358fn validate_item_machine_projection(item: &WorkItem) -> Result<(), WorkGraphError> {
1359 let status = work_status_from_dsl(item.machine_state.lifecycle_phase)?;
1360 if item.status != status {
1361 return Err(WorkGraphError::Store(format!(
1362 "work item {} status projection {:?} does not match machine state {:?}",
1363 item.id, item.status, status
1364 )));
1365 }
1366 if item.revision != item.machine_state.revision {
1367 return Err(WorkGraphError::Store(format!(
1368 "work item {} revision projection {} does not match machine state {}",
1369 item.id, item.revision, item.machine_state.revision
1370 )));
1371 }
1372 if item.due_at.map(datetime_to_millis) != item.machine_state.due_at_utc_ms {
1373 return Err(WorkGraphError::Store(format!(
1374 "work item {} due_at projection does not match machine state",
1375 item.id
1376 )));
1377 }
1378 if item.not_before.map(datetime_to_millis) != item.machine_state.not_before_utc_ms {
1379 return Err(WorkGraphError::Store(format!(
1380 "work item {} not_before projection does not match machine state",
1381 item.id
1382 )));
1383 }
1384 if item.snoozed_until.map(datetime_to_millis) != item.machine_state.snoozed_until_utc_ms {
1385 return Err(WorkGraphError::Store(format!(
1386 "work item {} snoozed_until projection does not match machine state",
1387 item.id
1388 )));
1389 }
1390 if item.completion_policy
1391 != crate::types::WorkCompletionPolicy::from_machine(
1392 item.machine_state.completion_policy,
1393 item.machine_state.completion_supervisor_owner_key.clone(),
1394 item.machine_state.completion_reviewer_quorum_threshold,
1395 )
1396 {
1397 return Err(WorkGraphError::Store(format!(
1398 "work item {} completion_policy projection does not match machine state",
1399 item.id
1400 )));
1401 }
1402 if item.terminal_at.map(datetime_to_millis) != item.machine_state.terminal_at_utc_ms {
1403 return Err(WorkGraphError::Store(format!(
1404 "work item {} terminal_at projection does not match machine state",
1405 item.id
1406 )));
1407 }
1408 if let Some(claim) = &item.claim {
1409 let claim_owner_key = work_owner_key(&claim.owner)?;
1410 if item.machine_state.claim_owner_key.as_ref() != Some(&claim_owner_key) {
1411 return Err(WorkGraphError::Store(format!(
1412 "work item {} claim owner projection does not match machine state",
1413 item.id
1414 )));
1415 }
1416 if item.machine_state.claimed_at_utc_ms != Some(datetime_to_millis(claim.claimed_at)) {
1417 return Err(WorkGraphError::Store(format!(
1418 "work item {} claim time projection does not match machine state",
1419 item.id
1420 )));
1421 }
1422 if item.machine_state.lease_expires_at_utc_ms
1423 != claim.lease_expires_at.map(datetime_to_millis)
1424 {
1425 return Err(WorkGraphError::Store(format!(
1426 "work item {} claim lease projection does not match machine state",
1427 item.id
1428 )));
1429 }
1430 } else if item.machine_state.claim_owner_key.is_some()
1431 || item.machine_state.claimed_at_utc_ms.is_some()
1432 || item.machine_state.lease_expires_at_utc_ms.is_some()
1433 {
1434 return Err(WorkGraphError::Store(format!(
1435 "work item {} machine state has a claim without a claim projection",
1436 item.id
1437 )));
1438 }
1439 Ok(())
1440}
1441
1442fn work_owner_key(owner: &crate::types::WorkOwner) -> Result<wg_dsl::WorkOwnerKey, WorkGraphError> {
1443 let kind = match owner.key.kind {
1444 crate::types::WorkOwnerKind::Principal => wg_dsl::WorkOwnerKind::Principal,
1445 crate::types::WorkOwnerKind::Agent => wg_dsl::WorkOwnerKind::Agent,
1446 crate::types::WorkOwnerKind::Session => wg_dsl::WorkOwnerKind::Session,
1447 crate::types::WorkOwnerKind::Mob => wg_dsl::WorkOwnerKind::Mob,
1448 crate::types::WorkOwnerKind::Label => wg_dsl::WorkOwnerKind::Label,
1449 };
1450 Ok(wg_dsl::WorkOwnerKey {
1451 kind,
1452 id: owner.key.id.clone(),
1453 })
1454}
1455
1456fn topology_state(
1457 existing_items: &[WorkItem],
1458 existing_edges: &[WorkEdge],
1459) -> WorkGraphMachineState {
1460 WorkGraphMachineState {
1461 topology_item_keys: existing_items
1462 .iter()
1463 .map(|item| work_item_key(&item.id))
1464 .collect(),
1465 topology_edge_keys: existing_edges
1466 .iter()
1467 .map(|edge| work_edge_key(edge.kind, &edge.from_id, &edge.to_id))
1468 .collect(),
1469 blocks_reachability: dependency_reachability(existing_edges, WorkEdgeKind::Blocks),
1470 parent_reachability: dependency_reachability(existing_edges, WorkEdgeKind::Parent),
1471 ..Default::default()
1472 }
1473}
1474
1475fn dependency_reachability(
1476 edges: &[WorkEdge],
1477 kind: WorkEdgeKind,
1478) -> BTreeSet<wg_dsl::WorkDependencyPathKey> {
1479 let mut adjacency = BTreeMap::<WorkItemId, BTreeSet<WorkItemId>>::new();
1480 for edge in edges.iter().filter(|edge| edge.kind == kind) {
1481 adjacency
1482 .entry(edge.from_id.clone())
1483 .or_default()
1484 .insert(edge.to_id.clone());
1485 }
1486
1487 let mut reachability = BTreeSet::new();
1488 for start in adjacency.keys() {
1489 let mut stack = adjacency
1490 .get(start)
1491 .into_iter()
1492 .flat_map(|targets| targets.iter().cloned())
1493 .collect::<Vec<_>>();
1494 let mut seen = BTreeSet::new();
1495 while let Some(current) = stack.pop() {
1496 if !seen.insert(current.clone()) {
1497 continue;
1498 }
1499 reachability.insert(dependency_path_key(kind, start, ¤t));
1500 if let Some(targets) = adjacency.get(¤t) {
1501 stack.extend(targets.iter().cloned());
1502 }
1503 }
1504 }
1505 reachability
1506}
1507
1508fn work_item_key(id: &WorkItemId) -> wg_dsl::WorkItemKey {
1509 wg_dsl::WorkItemKey(id.as_str().to_string())
1510}
1511
1512fn work_edge_key(
1513 kind: WorkEdgeKind,
1514 from_id: &WorkItemId,
1515 to_id: &WorkItemId,
1516) -> wg_dsl::WorkEdgeKey {
1517 wg_dsl::WorkEdgeKey(format!(
1518 "{}:{}:{}",
1519 edge_kind_key(kind),
1520 from_id.as_str(),
1521 to_id.as_str()
1522 ))
1523}
1524
1525fn dependency_path_key(
1526 kind: WorkEdgeKind,
1527 from_id: &WorkItemId,
1528 to_id: &WorkItemId,
1529) -> wg_dsl::WorkDependencyPathKey {
1530 wg_dsl::WorkDependencyPathKey(format!(
1531 "{}:{}:{}",
1532 edge_kind_key(kind),
1533 from_id.as_str(),
1534 to_id.as_str()
1535 ))
1536}
1537
1538fn dsl_edge_kind(kind: WorkEdgeKind) -> wg_dsl::WorkEdgeKind {
1539 match kind {
1540 WorkEdgeKind::Blocks => wg_dsl::WorkEdgeKind::Blocks,
1541 WorkEdgeKind::Parent => wg_dsl::WorkEdgeKind::Parent,
1542 WorkEdgeKind::Related => wg_dsl::WorkEdgeKind::Related,
1543 WorkEdgeKind::Supersedes => wg_dsl::WorkEdgeKind::Supersedes,
1544 WorkEdgeKind::DerivedFrom => wg_dsl::WorkEdgeKind::DerivedFrom,
1545 }
1546}
1547
1548fn edge_kind_key(kind: WorkEdgeKind) -> &'static str {
1549 match kind {
1550 WorkEdgeKind::Blocks => "blocks",
1551 WorkEdgeKind::Parent => "parent",
1552 WorkEdgeKind::Related => "related",
1553 WorkEdgeKind::Supersedes => "supersedes",
1554 WorkEdgeKind::DerivedFrom => "derived_from",
1555 }
1556}
1557
1558fn datetime_to_millis(dt: DateTime<Utc>) -> u64 {
1559 u64::try_from(dt.timestamp_millis()).unwrap_or(0)
1560}
1561
1562fn millis_to_datetime(ms: u64) -> Option<DateTime<Utc>> {
1563 DateTime::from_timestamp_millis(i64::try_from(ms).ok()?)
1564}
1565
1566fn item_event(
1567 item: &WorkItem,
1568 kind: WorkGraphEventKind,
1569 at: DateTime<Utc>,
1570) -> Result<WorkGraphEvent, WorkGraphError> {
1571 Ok(WorkGraphEvent::item(
1572 item.realm_id.clone(),
1573 item.namespace.clone(),
1574 item.id.clone(),
1575 kind,
1576 at,
1577 json!({ "item": item }),
1578 ))
1579}
1580
1581fn seconds_to_duration(seconds: u64) -> Duration {
1582 let seconds = i64::try_from(seconds).unwrap_or(i64::MAX);
1583 Duration::seconds(seconds)
1584}
1585
1586#[cfg(test)]
1587#[allow(
1588 clippy::expect_used,
1589 clippy::unwrap_used,
1590 clippy::panic,
1591 clippy::redundant_clone
1592)]
1593mod tests {
1594 use super::*;
1595 use crate::types::{
1596 AddEvidenceRequest, ClaimWorkItemRequest, CloseWorkItemRequest, UpdateWorkItemRequest,
1597 WorkEvidenceKind, WorkEvidenceRef, WorkOwner, WorkOwnerKey,
1598 };
1599
1600 fn create(title: &str, now: DateTime<Utc>) -> WorkItem {
1601 create_with_policy(title, WorkCompletionPolicy::SelfAttest, now)
1602 }
1603
1604 fn create_with_policy(
1605 title: &str,
1606 completion_policy: WorkCompletionPolicy,
1607 now: DateTime<Utc>,
1608 ) -> WorkItem {
1609 WorkGraphMachine::create_item(
1610 CreateWorkItemRequest {
1611 realm_id: None,
1612 namespace: None,
1613 title: title.to_string(),
1614 description: None,
1615 priority: Default::default(),
1616 completion_policy,
1617 labels: BTreeSet::new(),
1618 due_at: None,
1619 not_before: None,
1620 snoozed_until: None,
1621 external_refs: Vec::new(),
1622 evidence_refs: Vec::new(),
1623 status: None,
1624 },
1625 "realm".to_string(),
1626 WorkNamespace::default(),
1627 now,
1628 )
1629 .expect("create")
1630 .0
1631 }
1632
1633 fn owner(id: &str) -> WorkOwner {
1634 WorkOwner::new(WorkOwnerKey::label(id).expect("owner key"))
1635 }
1636
1637 fn create_with_status(
1638 status: Option<WorkStatus>,
1639 now: DateTime<Utc>,
1640 ) -> Result<WorkItem, WorkGraphError> {
1641 WorkGraphMachine::create_item(
1642 CreateWorkItemRequest {
1643 realm_id: None,
1644 namespace: None,
1645 title: "status-create".to_string(),
1646 description: None,
1647 priority: Default::default(),
1648 completion_policy: WorkCompletionPolicy::SelfAttest,
1649 labels: BTreeSet::new(),
1650 due_at: None,
1651 not_before: None,
1652 snoozed_until: None,
1653 external_refs: Vec::new(),
1654 evidence_refs: Vec::new(),
1655 status,
1656 },
1657 "realm".to_string(),
1658 WorkNamespace::default(),
1659 now,
1660 )
1661 .map(|(item, _)| item)
1662 }
1663
1664 #[test]
1665 fn create_status_admission_is_decided_by_machine() {
1666 use wg_dsl::WorkCreateStatusAdmissionKind as Admission;
1667 assert_eq!(
1669 classify_create_status_admission(WorkStatus::Open).expect("classify open"),
1670 Admission::AdmittedOpen
1671 );
1672 assert_eq!(
1673 classify_create_status_admission(WorkStatus::Blocked).expect("classify blocked"),
1674 Admission::AdmittedBlocked
1675 );
1676 for status in [
1677 WorkStatus::InProgress,
1678 WorkStatus::Completed,
1679 WorkStatus::Cancelled,
1680 WorkStatus::Failed,
1681 ] {
1682 assert_eq!(
1683 classify_create_status_admission(status)
1684 .unwrap_or_else(|error| panic!("classify {status:?}: {error:?}")),
1685 Admission::Denied,
1686 "requested status {status:?} must be denied as a creation state"
1687 );
1688 }
1689 }
1690
1691 #[test]
1692 fn create_item_admits_open_and_blocked_and_rejects_the_rest() {
1693 let now = Utc::now();
1694 assert_eq!(
1695 create_with_status(Some(WorkStatus::Open), now)
1696 .expect("open admitted")
1697 .status,
1698 WorkStatus::Open
1699 );
1700 assert_eq!(
1701 create_with_status(Some(WorkStatus::Blocked), now)
1702 .expect("blocked admitted")
1703 .status,
1704 WorkStatus::Blocked
1705 );
1706 assert_eq!(
1708 create_with_status(None, now)
1709 .expect("default admitted")
1710 .status,
1711 WorkStatus::Open
1712 );
1713 for status in [
1714 WorkStatus::InProgress,
1715 WorkStatus::Completed,
1716 WorkStatus::Cancelled,
1717 WorkStatus::Failed,
1718 ] {
1719 let error = create_with_status(Some(status), now)
1720 .expect_err("non-open/blocked create status must be rejected");
1721 match error {
1722 WorkGraphError::InvalidTransition(message) => assert_eq!(
1723 message, "new work items may only start open or blocked",
1724 "rejection message preserved for {status:?}"
1725 ),
1726 other => panic!("expected InvalidTransition for {status:?}, got {other:?}"),
1727 }
1728 }
1729 }
1730
1731 #[test]
1732 fn public_confirmation_admission_is_decided_by_machine() {
1733 use wg_dsl::WorkPublicConfirmationAdmissionKind as Admission;
1734 assert_eq!(
1737 WorkGraphMachine::classify_public_confirmation_admission(
1738 &WorkCompletionPolicy::SelfAttest
1739 )
1740 .expect("self-attest admission"),
1741 Admission::Admitted
1742 );
1743 let owner_key = WorkOwnerKey::label("supervisor").expect("owner key");
1744 let denied = [
1745 WorkCompletionPolicy::HostConfirmed,
1746 WorkCompletionPolicy::PrincipalConfirmed,
1747 WorkCompletionPolicy::Supervisor {
1748 owner_key: owner_key.clone(),
1749 },
1750 WorkCompletionPolicy::ReviewerQuorum { threshold: 2 },
1751 ];
1752 for policy in denied {
1753 assert_eq!(
1754 WorkGraphMachine::classify_public_confirmation_admission(&policy)
1755 .unwrap_or_else(|error| panic!("classify {policy:?}: {error:?}")),
1756 Admission::DeniedRequiresTrustedHost,
1757 "policy {policy:?} must require trusted host for public confirmation"
1758 );
1759 }
1760 }
1761
1762 #[test]
1763 fn create_completion_policy_admission_is_decided_by_machine() {
1764 use wg_dsl::WorkCreateCompletionPolicyAdmissionKind as Admission;
1765 assert_eq!(
1768 WorkGraphMachine::classify_create_completion_policy_admission(
1769 &WorkCompletionPolicy::SelfAttest
1770 )
1771 .expect("self-attest admission"),
1772 Admission::Admitted
1773 );
1774 let owner_key = WorkOwnerKey::label("supervisor").expect("owner key");
1775 let denied = [
1776 WorkCompletionPolicy::HostConfirmed,
1777 WorkCompletionPolicy::PrincipalConfirmed,
1778 WorkCompletionPolicy::Supervisor { owner_key },
1779 WorkCompletionPolicy::ReviewerQuorum { threshold: 2 },
1780 ];
1781 for policy in denied {
1782 assert_eq!(
1783 WorkGraphMachine::classify_create_completion_policy_admission(&policy)
1784 .unwrap_or_else(|error| panic!("classify {policy:?}: {error:?}")),
1785 Admission::DeniedNonSelfAttest,
1786 "policy {policy:?} must be denied at create for a non-goal work item"
1787 );
1788 }
1789 }
1790
1791 #[test]
1792 fn close_status_admission_is_decided_by_machine() {
1793 use wg_dsl::WorkCloseStatusAdmissionKind as Admission;
1794 assert_eq!(
1797 classify_close_status_admission(WorkStatus::Completed).expect("classify completed"),
1798 Admission::AdmittedCompleted
1799 );
1800 assert_eq!(
1801 classify_close_status_admission(WorkStatus::Cancelled).expect("classify cancelled"),
1802 Admission::AdmittedCancelled
1803 );
1804 assert_eq!(
1805 classify_close_status_admission(WorkStatus::Failed).expect("classify failed"),
1806 Admission::AdmittedFailed
1807 );
1808 for status in [
1809 WorkStatus::Open,
1810 WorkStatus::InProgress,
1811 WorkStatus::Blocked,
1812 ] {
1813 assert_eq!(
1814 classify_close_status_admission(status)
1815 .unwrap_or_else(|error| panic!("classify {status:?}: {error:?}")),
1816 Admission::DeniedNonTerminal,
1817 "requested close status {status:?} must be denied as a non-terminal target"
1818 );
1819 }
1820 }
1821
1822 #[test]
1823 fn close_item_rejects_non_terminal_status_with_preserved_message() {
1824 let now = Utc::now();
1825 for status in [
1826 WorkStatus::Open,
1827 WorkStatus::InProgress,
1828 WorkStatus::Blocked,
1829 ] {
1830 let item = create("close-target", now);
1831 let error = WorkGraphMachine::close_item(
1832 item.clone(),
1833 CloseWorkItemRequest {
1834 id: item.id.clone(),
1835 realm_id: None,
1836 namespace: None,
1837 expected_revision: item.revision,
1838 status,
1839 },
1840 now,
1841 )
1842 .expect_err("non-terminal close status must be rejected");
1843 match error {
1844 WorkGraphError::InvalidTransition(message) => assert_eq!(
1845 message, "close requires a terminal status",
1846 "rejection message preserved for {status:?}"
1847 ),
1848 other => panic!("expected InvalidTransition for {status:?}, got {other:?}"),
1849 }
1850 }
1851 }
1852
1853 #[test]
1854 fn blocked_items_are_never_ready() {
1855 let now = Utc::now();
1856 let item = create("blocked", now);
1857 let (item, _) = WorkGraphMachine::block_item(item, 1, now).expect("block");
1858 assert!(WorkGraphMachine::ready_items(vec![item], now).is_empty());
1859 }
1860
1861 #[test]
1862 fn future_due_items_are_not_ready() {
1863 let now = Utc::now();
1864 let item = create("future", now);
1865 let (item, _) = WorkGraphMachine::update_item(
1866 item,
1867 UpdateWorkItemRequest {
1868 id: WorkItemId::generated(),
1869 realm_id: None,
1870 namespace: None,
1871 expected_revision: 1,
1872 title: None,
1873 description: None,
1874 priority: None,
1875 completion_policy: None,
1876 labels: None,
1877 due_at: Some(now + Duration::hours(1)),
1878 not_before: None,
1879 snoozed_until: None,
1880 external_refs: Vec::new(),
1881 },
1882 now,
1883 )
1884 .expect("update due");
1885
1886 assert!(WorkGraphMachine::ready_items(vec![item], now).is_empty());
1887 }
1888
1889 #[test]
1890 fn readiness_is_decided_by_machine_matching_claim_guards() {
1891 let now = Utc::now();
1892
1893 let open = create("ready-open", now);
1896 assert_eq!(open.status, WorkStatus::Open);
1897 assert!(
1898 WorkGraphMachine::classify_readiness(&open, now).expect("classify open"),
1899 "an unblocked, due-eligible open item must be machine-classified ready"
1900 );
1901 assert!(WorkGraphMachine::is_ready(&open, now));
1902
1903 let (future, _) = WorkGraphMachine::update_item(
1905 open,
1906 UpdateWorkItemRequest {
1907 id: WorkItemId::generated(),
1908 realm_id: None,
1909 namespace: None,
1910 expected_revision: 1,
1911 title: None,
1912 description: None,
1913 priority: None,
1914 completion_policy: None,
1915 labels: None,
1916 due_at: Some(now + Duration::hours(1)),
1917 not_before: None,
1918 snoozed_until: None,
1919 external_refs: Vec::new(),
1920 },
1921 now,
1922 )
1923 .expect("update future due");
1924 assert!(
1925 !WorkGraphMachine::classify_readiness(&future, now).expect("classify future"),
1926 "a future-due open item must be machine-classified not ready"
1927 );
1928 assert!(!WorkGraphMachine::is_ready(&future, now));
1929
1930 let claimable = create("reclaim", now);
1933 let (claimed, _) = WorkGraphMachine::claim_item(
1934 claimable,
1935 ClaimWorkItemRequest {
1936 id: WorkItemId::generated(),
1937 realm_id: None,
1938 namespace: None,
1939 expected_revision: 1,
1940 owner: owner("worker"),
1941 lease_seconds: Some(30),
1942 lease_expires_at: None,
1943 },
1944 now,
1945 )
1946 .expect("claim");
1947 assert_eq!(claimed.status, WorkStatus::InProgress);
1948 assert!(
1949 !WorkGraphMachine::classify_readiness(&claimed, now).expect("classify live lease"),
1950 "an in-progress item with a live lease must not be machine-classified ready"
1951 );
1952 let after_lease = now + Duration::seconds(31);
1953 assert!(
1954 WorkGraphMachine::classify_readiness(&claimed, after_lease)
1955 .expect("classify expired lease"),
1956 "an in-progress item with an expired lease must be machine-classified ready"
1957 );
1958 assert!(WorkGraphMachine::is_ready(&claimed, after_lease));
1959 }
1960
1961 #[test]
1962 fn terminal_items_cannot_be_claimed() {
1963 let now = Utc::now();
1964 let item = create("done", now);
1965 let (item, _) = WorkGraphMachine::close_item(
1966 item,
1967 CloseWorkItemRequest {
1968 id: WorkItemId::generated(),
1969 realm_id: None,
1970 namespace: None,
1971 expected_revision: 1,
1972 status: WorkStatus::Completed,
1973 },
1974 now,
1975 )
1976 .expect("close");
1977 let error = WorkGraphMachine::claim_item(
1978 item,
1979 ClaimWorkItemRequest {
1980 id: WorkItemId::generated(),
1981 realm_id: None,
1982 namespace: None,
1983 expected_revision: 2,
1984 owner: owner("worker"),
1985 lease_seconds: None,
1986 lease_expires_at: None,
1987 },
1988 now,
1989 )
1990 .expect_err("terminal claim should fail");
1991 assert!(matches!(error, WorkGraphError::InvalidTransition(_)));
1992 }
1993
1994 #[test]
1995 fn completed_close_is_completion_policy_gated_by_machine() {
1996 let now = Utc::now();
1997 let item = create_with_policy(
1998 "needs host confirmation",
1999 WorkCompletionPolicy::HostConfirmed,
2000 now,
2001 );
2002
2003 let error = WorkGraphMachine::close_item(
2007 item.clone(),
2008 CloseWorkItemRequest {
2009 id: item.id.clone(),
2010 realm_id: None,
2011 namespace: None,
2012 expected_revision: 1,
2013 status: WorkStatus::Completed,
2014 },
2015 now,
2016 )
2017 .expect_err("machine must reject completed close without policy evidence");
2018 assert!(matches!(error, WorkGraphError::InvalidTransition(_)));
2019
2020 let (item, _) = WorkGraphMachine::add_evidence(
2024 item,
2025 AddEvidenceRequest {
2026 id: WorkItemId::generated(),
2027 realm_id: None,
2028 namespace: None,
2029 expected_revision: 1,
2030 evidence: WorkEvidenceRef {
2031 kind: "host_confirmation".to_string(),
2032 id: "acceptance".to_string(),
2033 label: None,
2034 summary: None,
2035 confirmation_kind: Some(WorkEvidenceKind::HostConfirmation),
2036 confirming_owner_key: None,
2037 },
2038 },
2039 now,
2040 )
2041 .expect("record host confirmation evidence");
2042
2043 let (closed, _) = WorkGraphMachine::close_item(
2044 item.clone(),
2045 CloseWorkItemRequest {
2046 id: item.id,
2047 realm_id: None,
2048 namespace: None,
2049 expected_revision: item.revision,
2050 status: WorkStatus::Completed,
2051 },
2052 now,
2053 )
2054 .expect("machine admits close once host confirmation is satisfied");
2055 assert_eq!(closed.status, WorkStatus::Completed);
2056 }
2057
2058 #[test]
2059 fn reviewer_quorum_close_requires_distinct_reviewers() {
2060 let now = Utc::now();
2061 let item = create_with_policy(
2062 "needs two reviewers",
2063 WorkCompletionPolicy::ReviewerQuorum { threshold: 2 },
2064 now,
2065 );
2066
2067 let reviewer_evidence = |reviewer: &str| WorkEvidenceRef {
2068 kind: "reviewer_confirmation".to_string(),
2069 id: reviewer.to_string(),
2070 label: Some(reviewer.to_string()),
2071 summary: None,
2072 confirmation_kind: Some(WorkEvidenceKind::ReviewerConfirmation),
2073 confirming_owner_key: Some(
2074 WorkOwnerKey::principal(reviewer).expect("reviewer principal"),
2075 ),
2076 };
2077
2078 let (item, _) = WorkGraphMachine::add_evidence(
2080 item,
2081 AddEvidenceRequest {
2082 id: WorkItemId::generated(),
2083 realm_id: None,
2084 namespace: None,
2085 expected_revision: 1,
2086 evidence: reviewer_evidence("alice"),
2087 },
2088 now,
2089 )
2090 .expect("record first reviewer");
2091
2092 let error = WorkGraphMachine::close_item(
2093 item.clone(),
2094 CloseWorkItemRequest {
2095 id: item.id.clone(),
2096 realm_id: None,
2097 namespace: None,
2098 expected_revision: item.revision,
2099 status: WorkStatus::Completed,
2100 },
2101 now,
2102 )
2103 .expect_err("single reviewer must not satisfy a quorum of two");
2104 assert!(matches!(error, WorkGraphError::InvalidTransition(_)));
2105
2106 let expected_revision = item.revision;
2109 let (item, _) = WorkGraphMachine::add_evidence(
2110 item,
2111 AddEvidenceRequest {
2112 id: WorkItemId::generated(),
2113 realm_id: None,
2114 namespace: None,
2115 expected_revision,
2116 evidence: reviewer_evidence("alice"),
2117 },
2118 now,
2119 )
2120 .expect("record duplicate reviewer");
2121
2122 let error = WorkGraphMachine::close_item(
2123 item.clone(),
2124 CloseWorkItemRequest {
2125 id: item.id.clone(),
2126 realm_id: None,
2127 namespace: None,
2128 expected_revision: item.revision,
2129 status: WorkStatus::Completed,
2130 },
2131 now,
2132 )
2133 .expect_err("duplicate reviewer must not satisfy a quorum of two");
2134 assert!(matches!(error, WorkGraphError::InvalidTransition(_)));
2135
2136 let expected_revision = item.revision;
2138 let (item, _) = WorkGraphMachine::add_evidence(
2139 item,
2140 AddEvidenceRequest {
2141 id: WorkItemId::generated(),
2142 realm_id: None,
2143 namespace: None,
2144 expected_revision,
2145 evidence: reviewer_evidence("bob"),
2146 },
2147 now,
2148 )
2149 .expect("record second reviewer");
2150
2151 let (closed, _) = WorkGraphMachine::close_item(
2152 item.clone(),
2153 CloseWorkItemRequest {
2154 id: item.id,
2155 realm_id: None,
2156 namespace: None,
2157 expected_revision: item.revision,
2158 status: WorkStatus::Completed,
2159 },
2160 now,
2161 )
2162 .expect("two distinct reviewers satisfy the quorum");
2163 assert_eq!(closed.status, WorkStatus::Completed);
2164 }
2165
2166 #[test]
2167 fn stale_revisions_fail() {
2168 let now = Utc::now();
2169 let item = create("stale", now);
2170 let error =
2171 WorkGraphMachine::block_item(item, 7, now).expect_err("stale transition should fail");
2172 assert!(matches!(error, WorkGraphError::StaleRevision { .. }));
2173 }
2174
2175 #[test]
2176 fn public_error_class_is_machine_owned() {
2177 use crate::types::{WorkAttentionBindingId, WorkNamespace};
2178
2179 let cases: &[(WorkGraphError, WorkGraphPublicErrorClass)] = &[
2180 (
2181 WorkGraphError::not_found(
2182 "realm".to_string(),
2183 WorkNamespace::default(),
2184 WorkItemId::generated(),
2185 ),
2186 WorkGraphPublicErrorClass::NotFound,
2187 ),
2188 (
2189 WorkGraphError::attention_not_found(
2190 "realm".to_string(),
2191 WorkNamespace::default(),
2192 WorkAttentionBindingId::generated(),
2193 ),
2194 WorkGraphPublicErrorClass::NotFound,
2195 ),
2196 (
2197 WorkGraphError::StaleRevision {
2198 id: WorkItemId::generated(),
2199 expected: 1,
2200 actual: 2,
2201 },
2202 WorkGraphPublicErrorClass::Conflict,
2203 ),
2204 (
2205 WorkGraphError::Conflict("conflict".to_string()),
2206 WorkGraphPublicErrorClass::Conflict,
2207 ),
2208 (
2209 WorkGraphError::InvalidTransition("bad".to_string()),
2210 WorkGraphPublicErrorClass::InvalidTransition,
2211 ),
2212 (
2213 WorkGraphError::InvalidInput("bad".to_string()),
2214 WorkGraphPublicErrorClass::InvalidArguments,
2215 ),
2216 (
2217 WorkGraphError::InvalidTimestampMillis {
2218 field: "due_at",
2219 millis: -1,
2220 },
2221 WorkGraphPublicErrorClass::InvalidArguments,
2222 ),
2223 (
2224 WorkGraphError::Store("store".to_string()),
2225 WorkGraphPublicErrorClass::StoreError,
2226 ),
2227 (
2228 WorkGraphError::UnsupportedBackend("backend".to_string()),
2229 WorkGraphPublicErrorClass::CapabilityUnavailable,
2230 ),
2231 ];
2232
2233 for (error, expected) in cases {
2234 let class = WorkGraphMachine::public_error_class(error)
2235 .expect("machine must classify every WorkGraphError variant");
2236 assert_eq!(class, *expected, "unexpected public class for {error:?}");
2237 }
2238 }
2239
2240 #[test]
2241 fn only_one_active_claim_can_exist() {
2242 let now = Utc::now();
2243 let item = create("claim", now);
2244 let (claimed, _) = WorkGraphMachine::claim_item(
2245 item,
2246 ClaimWorkItemRequest {
2247 id: WorkItemId::generated(),
2248 realm_id: None,
2249 namespace: None,
2250 expected_revision: 1,
2251 owner: owner("worker"),
2252 lease_seconds: Some(60),
2253 lease_expires_at: None,
2254 },
2255 now,
2256 )
2257 .expect("claim");
2258 let error = WorkGraphMachine::claim_item(
2259 claimed,
2260 ClaimWorkItemRequest {
2261 id: WorkItemId::generated(),
2262 realm_id: None,
2263 namespace: None,
2264 expected_revision: 2,
2265 owner: owner("worker-2"),
2266 lease_seconds: Some(60),
2267 lease_expires_at: None,
2268 },
2269 now,
2270 )
2271 .expect_err("double claim should fail");
2272 assert!(matches!(error, WorkGraphError::InvalidTransition(_)));
2273 }
2274
2275 #[test]
2276 fn validate_item_projection_only_rejects_never_derives() {
2277 let now = Utc::now();
2278
2279 let clean = create("projection-guard", now);
2281 WorkGraphMachine::validate_item_projection(&clean)
2282 .expect("a machine-built projection must agree with its machine state");
2283
2284 let mut status_drift = clean.clone();
2288 status_drift.status = WorkStatus::Completed;
2289 let err = WorkGraphMachine::validate_item_projection(&status_drift)
2290 .expect_err("status projection drift must be rejected, never repaired");
2291 assert!(
2292 matches!(&err, WorkGraphError::Store(message) if message.contains("status projection")),
2293 "rejection must cite the status projection drift, got: {err:?}"
2294 );
2295
2296 let mut revision_drift = clean.clone();
2300 revision_drift.revision = revision_drift.revision.wrapping_add(1);
2301 let err = WorkGraphMachine::validate_item_projection(&revision_drift)
2302 .expect_err("revision projection drift must be rejected, never repaired");
2303 assert!(
2304 matches!(&err, WorkGraphError::Store(message) if message.contains("revision projection")),
2305 "rejection must cite the revision projection drift, got: {err:?}"
2306 );
2307 }
2308}