Skip to main content

meerkat_workgraph/
machine.rs

1use std::collections::{BTreeMap, BTreeSet};
2
3use chrono::{DateTime, Duration, Utc};
4use serde_json::json;
5
6use crate::WorkGraphError;
7use crate::machines::{work_attention_lifecycle as attention_dsl, workgraph_lifecycle as wg_dsl};
8use crate::types::{
9    AddEvidenceRequest, AttentionDelegatedAuthority, ClaimWorkItemRequest, CloseWorkItemRequest,
10    CreateWorkItemRequest, ProjectedAttentionAuthority, ReleaseWorkItemRequest,
11    UpdateWorkItemRequest, WorkAttentionBinding, WorkAttentionMode, WorkAttentionStatus, WorkClaim,
12    WorkCompletionPolicy, WorkEdge, WorkEdgeKind, WorkGraphEvent, WorkGraphEventKind,
13    WorkGraphMachineState, WorkItem, WorkItemId, WorkNamespace, WorkStatus,
14};
15
16/// Machine-owned public error classification surfaced to REST/RPC callers.
17///
18/// Re-exported from the canonical `WorkGraphLifecycleMachine` DSL: the machine
19/// is the sole authority for the variant->class POLICY (see
20/// `WorkGraphMachine::public_error_class`). Surfaces mirror the emitted class.
21pub use wg_dsl::WorkGraphPublicErrorClass;
22
23/// Machine-owned public-confirmation admission verdict surfaced to the
24/// public-confirm surface.
25///
26/// Re-exported from the canonical `WorkGraphLifecycleMachine` DSL: the machine
27/// is the sole authority for the trust-scoped eligibility (see
28/// `WorkGraphMachine::classify_public_confirmation_admission`). The surface
29/// mirrors the emitted verdict.
30pub use wg_dsl::WorkPublicConfirmationAdmissionKind;
31
32/// Machine-owned admission verdict for a requested mutation of a work item's
33/// `completion_policy`.
34///
35/// Re-exported from the canonical `WorkGraphLifecycleMachine` DSL: the machine is
36/// the sole authority for the immutability invariant "a work item's completion
37/// policy is fixed at creation and cannot be changed by an update" (see
38/// `WorkGraphMachine::classify_completion_policy_mutation_admission`). The shell
39/// mirrors the emitted verdict.
40pub use wg_dsl::WorkCompletionPolicyMutationAdmissionKind;
41
42#[derive(Debug, Default, Clone, Copy)]
43pub struct WorkAttentionMachine;
44
45impl WorkAttentionMachine {
46    pub fn pause(
47        mut binding: WorkAttentionBinding,
48        expected_revision: u64,
49        until: Option<DateTime<Utc>>,
50        now: DateTime<Utc>,
51    ) -> Result<WorkAttentionBinding, WorkGraphError> {
52        let input = attention_dsl::WorkAttentionLifecycleInput::Pause {
53            expected_revision,
54            until_utc_ms: until.map(datetime_to_millis),
55        };
56        binding.machine_state = apply_attention_dsl(&binding, input, Some(expected_revision))?;
57        sync_attention_from_machine_state(&mut binding);
58        binding.updated_at = now;
59        Ok(binding)
60    }
61
62    pub fn resume(
63        mut binding: WorkAttentionBinding,
64        expected_revision: u64,
65        now: DateTime<Utc>,
66    ) -> Result<WorkAttentionBinding, WorkGraphError> {
67        let input = attention_dsl::WorkAttentionLifecycleInput::Resume { expected_revision };
68        binding.machine_state = apply_attention_dsl(&binding, input, Some(expected_revision))?;
69        sync_attention_from_machine_state(&mut binding);
70        binding.updated_at = now;
71        Ok(binding)
72    }
73
74    pub fn stop(
75        mut binding: WorkAttentionBinding,
76        expected_revision: u64,
77        now: DateTime<Utc>,
78    ) -> Result<WorkAttentionBinding, WorkGraphError> {
79        let input = attention_dsl::WorkAttentionLifecycleInput::Stop {
80            expected_revision,
81            at_utc_ms: datetime_to_millis(now),
82        };
83        binding.machine_state = apply_attention_dsl(&binding, input, Some(expected_revision))?;
84        sync_attention_from_machine_state(&mut binding);
85        binding.updated_at = now;
86        Ok(binding)
87    }
88
89    /// Resolve attention-projection eligibility for the binding at `now`.
90    ///
91    /// The shell extracts only the raw wall-clock `now` (a pure observation) and
92    /// drives the canonical `WorkAttentionLifecycleMachine`'s
93    /// `ClassifyAttentionEligibility` input over the recovered binding state. The
94    /// machine owns the eligibility POLICY — including the Paused deadline-elapsed
95    /// rule (`paused_until <= now`) — and emits the verdict; this function only
96    /// mirrors the emitted `AttentionEligibilityClassified.eligible`. It fails
97    /// closed (returns `Err`) if the machine refuses to classify or emits no
98    /// verdict; it decides nothing.
99    pub fn classify_eligibility_at(
100        binding: &WorkAttentionBinding,
101        now: DateTime<Utc>,
102    ) -> Result<bool, WorkGraphError> {
103        let mut dsl_auth =
104            attention_dsl::WorkAttentionLifecycleMachineAuthority::recover_from_state(
105                binding.machine_state.clone(),
106            )
107            .map_err(|error| {
108                WorkGraphError::InvalidTransition(format!(
109                    "attention binding {} refused eligibility recovery: {error:?}",
110                    binding.binding_id
111                ))
112            })?;
113        let transition = attention_dsl::WorkAttentionLifecycleMachineMutator::apply(
114            &mut dsl_auth,
115            attention_dsl::WorkAttentionLifecycleInput::ClassifyAttentionEligibility {
116                now_utc_ms: datetime_to_millis(now),
117            },
118        )
119        .map_err(|error| {
120            WorkGraphError::InvalidTransition(format!(
121                "attention binding {} refused eligibility classification: {error:?}",
122                binding.binding_id
123            ))
124        })?;
125
126        let mut classified = None;
127        for effect in transition.effects() {
128            if let attention_dsl::WorkAttentionLifecycleEffect::AttentionEligibilityClassified {
129                eligible,
130            } = effect
131                && classified.replace(*eligible).is_some()
132            {
133                return Err(WorkGraphError::Store(format!(
134                    "attention binding {} emitted multiple eligibility verdicts",
135                    binding.binding_id
136                )));
137            }
138        }
139
140        classified.ok_or_else(|| {
141            WorkGraphError::Store(format!(
142                "attention binding {} emitted no eligibility verdict",
143                binding.binding_id
144            ))
145        })
146    }
147
148    /// Resolve the projected attention authority for the binding.
149    ///
150    /// The shell extracts only the raw binding facts (`mode`,
151    /// `delegated_authority`) and drives the canonical
152    /// `WorkAttentionLifecycleMachine`'s `ClassifyAttentionAuthority` input over
153    /// the recovered binding state. The machine owns the COMPLETE per-stance
154    /// tool-admission POLICY (which stances may read, add evidence, release,
155    /// update, block, create, link, close their own review item, or
156    /// close-if-policy-allows) and emits the capability verdict; this function
157    /// mirrors the emitted `AttentionAuthorityClassified` capability bits. Fails
158    /// closed.
159    pub fn classify_authority(
160        binding: &WorkAttentionBinding,
161    ) -> Result<ProjectedAttentionAuthority, WorkGraphError> {
162        let mode = attention_mode_to_dsl(binding.mode);
163        let delegated_authority = attention_delegated_authority_to_dsl(binding.delegated_authority);
164        let mut dsl_auth =
165            attention_dsl::WorkAttentionLifecycleMachineAuthority::recover_from_state(
166                binding.machine_state.clone(),
167            )
168            .map_err(|error| {
169                WorkGraphError::InvalidTransition(format!(
170                    "attention binding {} refused authority recovery: {error:?}",
171                    binding.binding_id
172                ))
173            })?;
174        let transition = attention_dsl::WorkAttentionLifecycleMachineMutator::apply(
175            &mut dsl_auth,
176            attention_dsl::WorkAttentionLifecycleInput::ClassifyAttentionAuthority {
177                mode,
178                delegated_authority,
179            },
180        )
181        .map_err(|error| {
182            WorkGraphError::InvalidTransition(format!(
183                "attention binding {} refused authority classification: {error:?}",
184                binding.binding_id
185            ))
186        })?;
187
188        let mut classified = None;
189        for effect in transition.effects() {
190            if let attention_dsl::WorkAttentionLifecycleEffect::AttentionAuthorityClassified {
191                can_get,
192                can_add_evidence,
193                can_release,
194                can_update,
195                can_block,
196                can_create,
197                can_link,
198                can_link_parent,
199                can_link_related,
200                can_link_derived_from,
201                can_close_own_review_item,
202                can_close_if_policy_allows,
203            } = effect
204            {
205                let verdict = ProjectedAttentionAuthority {
206                    can_get: *can_get,
207                    can_add_evidence: *can_add_evidence,
208                    can_release: *can_release,
209                    can_update: *can_update,
210                    can_block: *can_block,
211                    can_create: *can_create,
212                    can_link: *can_link,
213                    can_link_parent: *can_link_parent,
214                    can_link_related: *can_link_related,
215                    can_link_derived_from: *can_link_derived_from,
216                    can_close_own_review_item: *can_close_own_review_item,
217                    can_close_if_policy_allows: *can_close_if_policy_allows,
218                };
219                if classified.replace(verdict).is_some() {
220                    return Err(WorkGraphError::Store(format!(
221                        "attention binding {} emitted multiple authority verdicts",
222                        binding.binding_id
223                    )));
224                }
225            }
226        }
227
228        classified.ok_or_else(|| {
229            WorkGraphError::Store(format!(
230                "attention binding {} emitted no authority verdict",
231                binding.binding_id
232            ))
233        })
234    }
235}
236
237fn attention_mode_to_dsl(mode: WorkAttentionMode) -> attention_dsl::WorkAttentionMode {
238    match mode {
239        WorkAttentionMode::Pursue => attention_dsl::WorkAttentionMode::Pursue,
240        WorkAttentionMode::Coordinate => attention_dsl::WorkAttentionMode::Coordinate,
241        WorkAttentionMode::Review => attention_dsl::WorkAttentionMode::Review,
242        WorkAttentionMode::Falsify => attention_dsl::WorkAttentionMode::Falsify,
243        WorkAttentionMode::Judge => attention_dsl::WorkAttentionMode::Judge,
244        WorkAttentionMode::Observe => attention_dsl::WorkAttentionMode::Observe,
245    }
246}
247
248fn attention_delegated_authority_to_dsl(
249    authority: AttentionDelegatedAuthority,
250) -> attention_dsl::AttentionDelegatedAuthority {
251    match authority {
252        AttentionDelegatedAuthority::AddEvidence => {
253            attention_dsl::AttentionDelegatedAuthority::AddEvidence
254        }
255        AttentionDelegatedAuthority::CloseOwnReviewItem => {
256            attention_dsl::AttentionDelegatedAuthority::CloseOwnReviewItem
257        }
258        AttentionDelegatedAuthority::RequestClosure => {
259            attention_dsl::AttentionDelegatedAuthority::RequestClosure
260        }
261        AttentionDelegatedAuthority::CloseIfPolicyAllows => {
262            attention_dsl::AttentionDelegatedAuthority::CloseIfPolicyAllows
263        }
264    }
265}
266
/// Stateless shell over the generated `WorkGraphLifecycleMachine` DSL.
///
/// The type carries no data; its associated functions forward typed inputs to
/// the DSL machine and mirror the emitted effects back to callers.
#[derive(Clone, Copy, Debug, Default)]
pub struct WorkGraphMachine;
269
270impl WorkGraphMachine {
271    /// Mechanically assert that a `WorkItem`'s projected lifecycle fields agree
272    /// with its machine-owned `machine_state` authority.
273    ///
274    /// This is a *pure structural check*: it borrows `item` immutably and can
275    /// only return `Ok(())` (every projection matches the machine state) or an
276    /// `Err` rejecting the drift. It never synthesizes, repairs, or derives any
277    /// lifecycle/revision field — the canonical truth lives in
278    /// `WorkGraphMachineState`, and this guard merely rejects projections that
279    /// disagree with it.
280    pub fn validate_item_projection(item: &WorkItem) -> Result<(), WorkGraphError> {
281        validate_item_machine_projection(item)
282    }
283
284    /// Resolve the public error class for a `WorkGraphError`.
285    ///
286    /// The shell performs only a pure typed extraction of the error variant
287    /// into a `WorkGraphErrorKind` discriminant (one kind per variant, no
288    /// grouping). The variant->class POLICY is owned by the canonical
289    /// `WorkGraphLifecycleMachine`: this drives the machine's
290    /// `ClassifyWorkGraphPublicError` input and mirrors the emitted
291    /// `WorkGraphPublicErrorClassified` effect. The shell decides nothing.
292    pub fn public_error_class(
293        error: &WorkGraphError,
294    ) -> Result<WorkGraphPublicErrorClass, WorkGraphError> {
295        let kind = work_graph_error_kind(error);
296        let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::new();
297        let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
298            &mut dsl_auth,
299            wg_dsl::WorkGraphLifecycleInput::ClassifyWorkGraphPublicError { kind },
300        )
301        .map_err(|transition_error| {
302            WorkGraphError::Store(format!(
303                "generated WorkGraph public error classification refused kind {kind:?}: {transition_error:?}"
304            ))
305        })?;
306
307        let mut classified = None;
308        for effect in transition.effects() {
309            if let wg_dsl::WorkGraphLifecycleEffect::WorkGraphPublicErrorClassified {
310                kind: emitted_kind,
311                public_class,
312            } = effect
313            {
314                if *emitted_kind != kind {
315                    return Err(WorkGraphError::Store(format!(
316                        "generated WorkGraph public error classification emitted kind {emitted_kind:?} while classifying {kind:?}"
317                    )));
318                }
319                if classified.replace(*public_class).is_some() {
320                    return Err(WorkGraphError::Store(format!(
321                        "generated WorkGraph public error classification emitted multiple classes for kind {kind:?}"
322                    )));
323                }
324            }
325        }
326
327        classified.ok_or_else(|| {
328            WorkGraphError::Store(format!(
329                "generated WorkGraph public error classification did not emit a class for kind {kind:?}"
330            ))
331        })
332    }
333
334    /// Resolve whether an untrusted PUBLIC caller may confirm an item with the
335    /// given machine-owned completion policy.
336    ///
337    /// The trust-scoped eligibility "only a self-attested completion policy may
338    /// be confirmed by a public caller; every other policy requires trusted
339    /// in-process host authority" is owned by the canonical
340    /// `WorkGraphLifecycleMachine`, not the public-confirm surface. The shell
341    /// performs only a pure typed extraction of the machine-owned
342    /// `completion_policy` into the DSL observation, drives the machine's
343    /// `ClassifyPublicConfirmationAdmission` input, and mirrors the emitted
344    /// `PublicConfirmationAdmissionClassified` verdict. The shell decides
345    /// nothing and fails closed if the machine refuses or emits no verdict.
346    pub fn classify_public_confirmation_admission(
347        completion_policy: &crate::types::WorkCompletionPolicy,
348    ) -> Result<wg_dsl::WorkPublicConfirmationAdmissionKind, WorkGraphError> {
349        let policy = completion_policy.to_machine();
350        let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::new();
351        let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
352            &mut dsl_auth,
353            wg_dsl::WorkGraphLifecycleInput::ClassifyPublicConfirmationAdmission {
354                completion_policy: policy,
355            },
356        )
357        .map_err(|error| {
358            WorkGraphError::InvalidInput(format!(
359                "WorkGraphLifecycle refused public-confirmation admission for {policy:?}: {error:?}"
360            ))
361        })?;
362
363        let mut admission = None;
364        for effect in transition.effects() {
365            if let wg_dsl::WorkGraphLifecycleEffect::PublicConfirmationAdmissionClassified {
366                admission: emitted,
367            } = effect
368                && admission.replace(*emitted).is_some()
369            {
370                return Err(WorkGraphError::Store(format!(
371                    "WorkGraphLifecycle public-confirmation admission emitted multiple verdicts for {policy:?}"
372                )));
373            }
374        }
375
376        admission.ok_or_else(|| {
377            WorkGraphError::Store(format!(
378                "WorkGraphLifecycle public-confirmation admission emitted no verdict for {policy:?}"
379            ))
380        })
381    }
382
383    /// Resolve whether a requested completion policy is admissible at CREATE for
384    /// a non-goal work item.
385    ///
386    /// The creation policy "non-goal work items must use the self-attest
387    /// completion policy" is owned by the canonical `WorkGraphLifecycleMachine`,
388    /// not the create shell. The shell performs only a pure typed extraction of
389    /// the requested completion policy into the DSL observation, drives the
390    /// machine's `ClassifyCreateCompletionPolicyAdmission` input over a fresh
391    /// authority, and mirrors the emitted
392    /// `CreateCompletionPolicyAdmissionClassified` verdict. The shell decides
393    /// nothing and fails closed if the machine refuses or emits no verdict.
394    pub fn classify_create_completion_policy_admission(
395        completion_policy: &crate::types::WorkCompletionPolicy,
396    ) -> Result<wg_dsl::WorkCreateCompletionPolicyAdmissionKind, WorkGraphError> {
397        let policy = completion_policy.to_machine();
398        let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::new();
399        let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
400            &mut dsl_auth,
401            wg_dsl::WorkGraphLifecycleInput::ClassifyCreateCompletionPolicyAdmission {
402                completion_policy: policy,
403            },
404        )
405        .map_err(|error| {
406            WorkGraphError::InvalidInput(format!(
407                "WorkGraphLifecycle refused create completion-policy admission for {policy:?}: {error:?}"
408            ))
409        })?;
410
411        let mut admission = None;
412        for effect in transition.effects() {
413            if let wg_dsl::WorkGraphLifecycleEffect::CreateCompletionPolicyAdmissionClassified {
414                admission: emitted,
415            } = effect
416                && admission.replace(*emitted).is_some()
417            {
418                return Err(WorkGraphError::Store(format!(
419                    "WorkGraphLifecycle create completion-policy admission emitted multiple verdicts for {policy:?}"
420                )));
421            }
422        }
423
424        admission.ok_or_else(|| {
425            WorkGraphError::Store(format!(
426                "WorkGraphLifecycle create completion-policy admission emitted no verdict for {policy:?}"
427            ))
428        })
429    }
430
431    /// Resolve whether a requested completion-policy mutation is admissible.
432    ///
433    /// The immutability invariant "a work item's completion policy is fixed at
434    /// creation and cannot be changed by an update" is owned by the canonical
435    /// `WorkGraphLifecycleMachine`, not the shell. The shell performs only a pure
436    /// typed extraction of the requested completion policy into the DSL
437    /// observation (variant plus supervisor owner key plus reviewer quorum
438    /// threshold), drives the machine's `ClassifyCompletionPolicyMutationAdmission`
439    /// input over the item's recovered machine state, and mirrors the emitted
440    /// `CompletionPolicyMutationAdmissionClassified` verdict. The machine compares
441    /// the requested policy — in full — against its own machine-owned completion
442    /// policy; this function decides nothing and fails closed if the machine
443    /// refuses or emits no verdict.
444    pub fn classify_completion_policy_mutation_admission(
445        item: &WorkItem,
446        requested: &crate::types::WorkCompletionPolicy,
447    ) -> Result<wg_dsl::WorkCompletionPolicyMutationAdmissionKind, WorkGraphError> {
448        validate_item_machine_projection(item)?;
449        let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::recover_from_state(
450            item.machine_state.clone(),
451        )
452        .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
453        let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
454            &mut dsl_auth,
455            wg_dsl::WorkGraphLifecycleInput::ClassifyCompletionPolicyMutationAdmission {
456                requested_completion_policy: requested.to_machine(),
457                requested_completion_supervisor_owner_key: requested.supervisor_owner_key(),
458                requested_completion_reviewer_quorum_threshold: requested
459                    .reviewer_quorum_threshold(),
460            },
461        )
462        .map_err(|error| {
463            WorkGraphError::InvalidTransition(format!(
464                "work item {} refused completion-policy mutation classification: {error:?}",
465                item.id
466            ))
467        })?;
468
469        let mut admission = None;
470        for effect in transition.effects() {
471            if let wg_dsl::WorkGraphLifecycleEffect::CompletionPolicyMutationAdmissionClassified {
472                admission: emitted,
473            } = effect
474                && admission.replace(*emitted).is_some()
475            {
476                return Err(WorkGraphError::Store(format!(
477                    "work item {} emitted multiple completion-policy mutation verdicts",
478                    item.id
479                )));
480            }
481        }
482
483        admission.ok_or_else(|| {
484            WorkGraphError::Store(format!(
485                "work item {} emitted no completion-policy mutation verdict",
486                item.id
487            ))
488        })
489    }
490
491    /// Resolve whether a trusted-path goal confirmation is admissible for a work
492    /// item's machine-owned completion policy.
493    ///
494    /// The eligibility "is this confirming principal + supplied evidence kind
495    /// admissible for this completion policy" is owned by the canonical
496    /// `WorkGraphLifecycleMachine`, not the goal-confirm shell. The shell
497    /// performs only pure typed extraction of the observations (the machine-owned
498    /// completion policy + its supervisor owner key, the requested confirming
499    /// principal owner key + kind, and the typed evidence-kind observation
500    /// projected from the evidence's typed confirmation classification), drives
501    /// the machine's
502    /// `ClassifyConfirmationAdmission` input, and mirrors the emitted
503    /// `ConfirmationAdmissionClassified` verdict. This function decides nothing
504    /// and fails closed if the machine refuses or emits no verdict.
505    pub fn classify_confirmation_admission(
506        completion_policy: &crate::types::WorkCompletionPolicy,
507        requested_principal: Option<&crate::types::WorkOwnerKey>,
508        supplied_evidence_kind: wg_dsl::WorkConfirmationEvidenceObservation,
509    ) -> Result<wg_dsl::WorkConfirmationAdmissionKind, WorkGraphError> {
510        let policy = completion_policy.to_machine();
511        let requested_principal_owner_key =
512            requested_principal.map(crate::types::work_owner_key_to_machine);
513        let requested_principal_kind = requested_principal
514            .map(|principal| crate::types::work_owner_kind_to_machine(principal.kind));
515        let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::new();
516        let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
517            &mut dsl_auth,
518            wg_dsl::WorkGraphLifecycleInput::ClassifyConfirmationAdmission {
519                completion_policy: policy,
520                completion_supervisor_owner_key: completion_policy.supervisor_owner_key(),
521                requested_principal_owner_key,
522                requested_principal_kind,
523                supplied_evidence_kind,
524            },
525        )
526        .map_err(|error| {
527            WorkGraphError::Store(format!(
528                "WorkGraphLifecycle refused confirmation admission for {policy:?}: {error:?}"
529            ))
530        })?;
531
532        let mut admission = None;
533        for effect in transition.effects() {
534            if let wg_dsl::WorkGraphLifecycleEffect::ConfirmationAdmissionClassified {
535                admission: emitted,
536            } = effect
537                && admission.replace(*emitted).is_some()
538            {
539                return Err(WorkGraphError::Store(format!(
540                    "WorkGraphLifecycle confirmation admission emitted multiple verdicts for {policy:?}"
541                )));
542            }
543        }
544
545        admission.ok_or_else(|| {
546            WorkGraphError::Store(format!(
547                "WorkGraphLifecycle confirmation admission emitted no verdict for {policy:?}"
548            ))
549        })
550    }
551
552    /// Resolve whether a work item is terminal.
553    ///
554    /// The shell extracts no fact: it drives the canonical
555    /// `WorkGraphLifecycleMachine`'s `ClassifyTerminality` input over the item's
556    /// recovered machine state. The machine owns the lifecycle_phase and the
557    /// terminality verdict (which phases are terminal); this function mirrors the
558    /// emitted `WorkItemTerminalityClassified.terminal`, failing closed if the
559    /// machine refuses or emits no verdict. It decides nothing.
560    pub fn classify_terminality(item: &WorkItem) -> Result<bool, WorkGraphError> {
561        validate_item_machine_projection(item)?;
562        let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::recover_from_state(
563            item.machine_state.clone(),
564        )
565        .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
566        let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
567            &mut dsl_auth,
568            wg_dsl::WorkGraphLifecycleInput::ClassifyTerminality {},
569        )
570        .map_err(|error| {
571            WorkGraphError::InvalidTransition(format!(
572                "work item {} refused terminality classification: {error:?}",
573                item.id
574            ))
575        })?;
576
577        let mut classified = None;
578        for effect in transition.effects() {
579            if let wg_dsl::WorkGraphLifecycleEffect::WorkItemTerminalityClassified { terminal } =
580                effect
581                && classified.replace(*terminal).is_some()
582            {
583                return Err(WorkGraphError::Store(format!(
584                    "work item {} emitted multiple terminality verdicts",
585                    item.id
586                )));
587            }
588        }
589
590        classified.ok_or_else(|| {
591            WorkGraphError::Store(format!(
592                "work item {} emitted no terminality verdict",
593                item.id
594            ))
595        })
596    }
597
598    /// Resolve whether a single blocking edge is satisfied.
599    ///
600    /// The shell extracts only the raw blocker lifecycle phase (a pure
601    /// observation projected from the blocker's own machine state) and whether
602    /// the blocker was resolvable at all, then drives the canonical
603    /// `WorkGraphLifecycleMachine`'s `ClassifyBlockerSatisfied` input over the
604    /// gated item's recovered state. The machine owns the satisfaction POLICY (a
605    /// blocking edge is satisfied iff its blocker reached terminal SUCCESS,
606    /// `Completed`) and emits the verdict; this function mirrors it. The caller
607    /// mechanically fans-in (counts) the unsatisfied edges. Fails closed.
608    pub fn classify_blocker_satisfied(
609        gated_item: &WorkItem,
610        blocker: Option<&WorkItem>,
611    ) -> Result<bool, WorkGraphError> {
612        validate_item_machine_projection(gated_item)?;
613        let blocker_lifecycle_phase = match blocker {
614            Some(blocker) => blocker.machine_state.lifecycle_phase,
615            None => wg_dsl::WorkLifecycleState::Absent,
616        };
617        let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::recover_from_state(
618            gated_item.machine_state.clone(),
619        )
620        .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
621        let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
622            &mut dsl_auth,
623            wg_dsl::WorkGraphLifecycleInput::ClassifyBlockerSatisfied {
624                blocker_present: blocker.is_some(),
625                blocker_lifecycle_phase,
626            },
627        )
628        .map_err(|error| {
629            WorkGraphError::InvalidTransition(format!(
630                "work item {} refused blocker satisfaction classification: {error:?}",
631                gated_item.id
632            ))
633        })?;
634
635        let mut classified = None;
636        for effect in transition.effects() {
637            if let wg_dsl::WorkGraphLifecycleEffect::BlockerSatisfactionClassified { satisfied } =
638                effect
639                && classified.replace(*satisfied).is_some()
640            {
641                return Err(WorkGraphError::Store(format!(
642                    "work item {} emitted multiple blocker satisfaction verdicts",
643                    gated_item.id
644                )));
645            }
646        }
647
648        classified.ok_or_else(|| {
649            WorkGraphError::Store(format!(
650                "work item {} emitted no blocker satisfaction verdict",
651                gated_item.id
652            ))
653        })
654    }
655
656    pub fn create_item(
657        request: CreateWorkItemRequest,
658        realm_id: String,
659        namespace: WorkNamespace,
660        now: DateTime<Utc>,
661    ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
662        let title = validate_title(request.title)?;
663        let status = request.status.unwrap_or_default();
664        // The creation policy "a new work item may only start open or blocked"
665        // is owned by WorkGraphLifecycleMachine, not this shell. We extract the
666        // requested status as a pure typed observation, drive the machine's
667        // admission classifier, and mirror the verdict: AdmittedOpen ->
668        // CreateOpen, AdmittedBlocked -> CreateBlocked, Denied -> the same
669        // InvalidTransition rejection. Fails closed.
670        let input = match classify_create_status_admission(status)? {
671            wg_dsl::WorkCreateStatusAdmissionKind::AdmittedOpen => {
672                wg_dsl::WorkGraphLifecycleInput::CreateOpen {
673                    due_at_utc_ms: request.due_at.map(datetime_to_millis),
674                    not_before_utc_ms: request.not_before.map(datetime_to_millis),
675                    snoozed_until_utc_ms: request.snoozed_until.map(datetime_to_millis),
676                    completion_policy: request.completion_policy.clone().to_machine(),
677                    completion_supervisor_owner_key: request
678                        .completion_policy
679                        .supervisor_owner_key(),
680                    completion_reviewer_quorum_threshold: request
681                        .completion_policy
682                        .reviewer_quorum_threshold(),
683                    unresolved_blocker_count: 0,
684                }
685            }
686            wg_dsl::WorkCreateStatusAdmissionKind::AdmittedBlocked => {
687                wg_dsl::WorkGraphLifecycleInput::CreateBlocked {
688                    due_at_utc_ms: request.due_at.map(datetime_to_millis),
689                    not_before_utc_ms: request.not_before.map(datetime_to_millis),
690                    snoozed_until_utc_ms: request.snoozed_until.map(datetime_to_millis),
691                    completion_policy: request.completion_policy.clone().to_machine(),
692                    completion_supervisor_owner_key: request
693                        .completion_policy
694                        .supervisor_owner_key(),
695                    completion_reviewer_quorum_threshold: request
696                        .completion_policy
697                        .reviewer_quorum_threshold(),
698                    unresolved_blocker_count: 0,
699                }
700            }
701            wg_dsl::WorkCreateStatusAdmissionKind::Denied => {
702                return Err(WorkGraphError::InvalidTransition(
703                    "new work items may only start open or blocked".to_string(),
704                ));
705            }
706        };
707        let dsl_state = apply_new_item_dsl(input)?;
708        let mut item = WorkItem {
709            id: WorkItemId::generated(),
710            realm_id,
711            namespace,
712            title,
713            description: request.description,
714            status: work_status_from_dsl(dsl_state.lifecycle_phase)?,
715            completion_policy: request.completion_policy,
716            priority: request.priority,
717            labels: normalize_labels(request.labels)?,
718            owner: None,
719            claim: None,
720            machine_state: dsl_state.clone(),
721            revision: dsl_state.revision,
722            due_at: dsl_state.due_at_utc_ms.and_then(millis_to_datetime),
723            not_before: dsl_state.not_before_utc_ms.and_then(millis_to_datetime),
724            snoozed_until: dsl_state.snoozed_until_utc_ms.and_then(millis_to_datetime),
725            created_at: now,
726            updated_at: now,
727            terminal_at: dsl_state.terminal_at_utc_ms.and_then(millis_to_datetime),
728            external_refs: request.external_refs,
729            evidence_refs: request.evidence_refs,
730        };
731        sync_item_from_machine_state(&mut item)?;
732        let event = item_event(&item, WorkGraphEventKind::Created, now)?;
733        Ok((item, event))
734    }
735
736    pub fn update_item(
737        mut item: WorkItem,
738        request: UpdateWorkItemRequest,
739        now: DateTime<Utc>,
740    ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
741        let due_at = request.due_at.or(item.due_at);
742        let not_before = request.not_before.or(item.not_before);
743        let snoozed_until = request.snoozed_until.or(item.snoozed_until);
744        let completion_policy = request
745            .completion_policy
746            .unwrap_or_else(|| item.completion_policy.clone());
747        let dsl_state = apply_item_dsl(
748            &item,
749            item.machine_state.unresolved_blocker_count,
750            wg_dsl::WorkGraphLifecycleInput::Update {
751                expected_revision: request.expected_revision,
752                due_at_utc_ms: due_at.map(datetime_to_millis),
753                not_before_utc_ms: not_before.map(datetime_to_millis),
754                snoozed_until_utc_ms: snoozed_until.map(datetime_to_millis),
755                completion_policy: completion_policy.clone().to_machine(),
756                completion_supervisor_owner_key: completion_policy.supervisor_owner_key(),
757                completion_reviewer_quorum_threshold: completion_policy.reviewer_quorum_threshold(),
758                unresolved_blocker_count: item.machine_state.unresolved_blocker_count,
759            },
760            Some(request.expected_revision),
761        )?;
762
763        if let Some(title) = request.title {
764            item.title = validate_title(title)?;
765        }
766        if let Some(description) = request.description {
767            item.description = Some(description);
768        }
769        if let Some(priority) = request.priority {
770            item.priority = priority;
771        }
772        if let Some(labels) = request.labels {
773            item.labels = normalize_labels(labels)?;
774        }
775        item.machine_state = dsl_state;
776        sync_item_from_machine_state(&mut item)?;
777        if !request.external_refs.is_empty() {
778            item.external_refs = request.external_refs;
779        }
780        item.updated_at = now;
781        let event = item_event(&item, WorkGraphEventKind::Updated, now)?;
782        Ok((item, event))
783    }
784
    /// Claim `item` on behalf of the owner named in `request`.
    ///
    /// Thin alias: forwards its arguments unchanged to
    /// `Self::claim_ready_item` and returns that result as-is.
    pub fn claim_item(
        item: WorkItem,
        request: ClaimWorkItemRequest,
        now: DateTime<Utc>,
    ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
        Self::claim_ready_item(item, request, now)
    }
792
793    pub fn claim_ready_item(
794        item: WorkItem,
795        request: ClaimWorkItemRequest,
796        now: DateTime<Utc>,
797    ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
798        Self::claim_item_with_unresolved_blockers(
799            item.clone(),
800            item.machine_state.unresolved_blocker_count,
801            request,
802            now,
803        )
804    }
805
806    pub fn refresh_eligibility(
807        mut item: WorkItem,
808        unresolved_blocker_count: u64,
809        now: DateTime<Utc>,
810    ) -> Result<Option<(WorkItem, WorkGraphEvent)>, WorkGraphError> {
811        if item.machine_state.unresolved_blocker_count == unresolved_blocker_count {
812            return Ok(None);
813        }
814        let dsl_state = apply_item_dsl(
815            &item,
816            unresolved_blocker_count,
817            wg_dsl::WorkGraphLifecycleInput::RefreshEligibility {
818                unresolved_blocker_count,
819            },
820            None,
821        )?;
822        item.machine_state = dsl_state;
823        sync_item_from_machine_state(&mut item)?;
824        item.updated_at = now;
825        let event = item_event(&item, WorkGraphEventKind::Updated, now)?;
826        Ok(Some((item, event)))
827    }
828
829    pub(crate) fn claim_item_with_unresolved_blockers(
830        mut item: WorkItem,
831        unresolved_blocker_count: u64,
832        request: ClaimWorkItemRequest,
833        now: DateTime<Utc>,
834    ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
835        let lease_expires_at = request.lease_expires_at.or_else(|| {
836            request
837                .lease_seconds
838                .map(|seconds| now + seconds_to_duration(seconds))
839        });
840        let owner_key = work_owner_key(&request.owner)?;
841        let dsl_state = apply_item_dsl(
842            &item,
843            unresolved_blocker_count,
844            wg_dsl::WorkGraphLifecycleInput::Claim {
845                expected_revision: request.expected_revision,
846                owner_key,
847                now_utc_ms: datetime_to_millis(now),
848                lease_expires_at_utc_ms: lease_expires_at.map(datetime_to_millis),
849            },
850            Some(request.expected_revision),
851        )?;
852        item.owner = Some(request.owner.clone());
853        item.claim = Some(WorkClaim {
854            owner: request.owner,
855            claimed_at: now,
856            lease_expires_at,
857        });
858        item.machine_state = dsl_state;
859        sync_item_from_machine_state(&mut item)?;
860        item.updated_at = now;
861        let event = item_event(&item, WorkGraphEventKind::Claimed, now)?;
862        Ok((item, event))
863    }
864
865    pub fn release_item(
866        mut item: WorkItem,
867        request: ReleaseWorkItemRequest,
868        now: DateTime<Utc>,
869    ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
870        let dsl_state = apply_item_dsl(
871            &item,
872            item.machine_state.unresolved_blocker_count,
873            wg_dsl::WorkGraphLifecycleInput::Release {
874                expected_revision: request.expected_revision,
875            },
876            Some(request.expected_revision),
877        )?;
878        item.claim = None;
879        item.owner = None;
880        item.machine_state = dsl_state;
881        sync_item_from_machine_state(&mut item)?;
882        item.updated_at = now;
883        let event = item_event(&item, WorkGraphEventKind::Released, now)?;
884        Ok((item, event))
885    }
886
887    pub fn block_item(
888        mut item: WorkItem,
889        expected_revision: u64,
890        now: DateTime<Utc>,
891    ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
892        let dsl_state = apply_item_dsl(
893            &item,
894            item.machine_state.unresolved_blocker_count,
895            wg_dsl::WorkGraphLifecycleInput::Block { expected_revision },
896            Some(expected_revision),
897        )?;
898        item.claim = None;
899        item.owner = None;
900        item.machine_state = dsl_state;
901        sync_item_from_machine_state(&mut item)?;
902        item.updated_at = now;
903        let event = item_event(&item, WorkGraphEventKind::Blocked, now)?;
904        Ok((item, event))
905    }
906
907    pub fn close_item(
908        mut item: WorkItem,
909        request: CloseWorkItemRequest,
910        now: DateTime<Utc>,
911    ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
912        // The lifecycle-class fact "close requires a terminal status" is owned
913        // by WorkGraphLifecycleMachine, not this shell. We extract the requested
914        // target status as a pure typed observation, drive the machine's
915        // admission classifier, and mirror the verdict: AdmittedCompleted ->
916        // CloseCompleted, AdmittedCancelled -> CloseCancelled, AdmittedFailed ->
917        // CloseFailed, DeniedNonTerminal -> the exact same InvalidTransition
918        // rejection. Fails closed.
919        let dsl_input = match classify_close_status_admission(request.status)? {
920            wg_dsl::WorkCloseStatusAdmissionKind::AdmittedCompleted => {
921                wg_dsl::WorkGraphLifecycleInput::CloseCompleted {
922                    expected_revision: request.expected_revision,
923                    at_utc_ms: datetime_to_millis(now),
924                }
925            }
926            wg_dsl::WorkCloseStatusAdmissionKind::AdmittedCancelled => {
927                wg_dsl::WorkGraphLifecycleInput::CloseCancelled {
928                    expected_revision: request.expected_revision,
929                    at_utc_ms: datetime_to_millis(now),
930                }
931            }
932            wg_dsl::WorkCloseStatusAdmissionKind::AdmittedFailed => {
933                wg_dsl::WorkGraphLifecycleInput::CloseFailed {
934                    expected_revision: request.expected_revision,
935                    at_utc_ms: datetime_to_millis(now),
936                }
937            }
938            wg_dsl::WorkCloseStatusAdmissionKind::DeniedNonTerminal => {
939                return Err(WorkGraphError::InvalidTransition(
940                    "close requires a terminal status".to_string(),
941                ));
942            }
943        };
944        let dsl_state = apply_item_dsl(
945            &item,
946            item.machine_state.unresolved_blocker_count,
947            dsl_input,
948            Some(request.expected_revision),
949        )?;
950        item.claim = None;
951        item.owner = None;
952        item.machine_state = dsl_state;
953        sync_item_from_machine_state(&mut item)?;
954        item.updated_at = now;
955        let event = item_event(&item, WorkGraphEventKind::Closed, now)?;
956        Ok((item, event))
957    }
958
959    pub fn add_evidence(
960        mut item: WorkItem,
961        request: AddEvidenceRequest,
962        now: DateTime<Utc>,
963    ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
964        let evidence_kind = request
965            .evidence
966            .confirmation_kind
967            .unwrap_or(crate::types::WorkEvidenceKind::SelfAttest)
968            .to_machine();
969        let confirming_owner_key = request
970            .evidence
971            .confirming_owner_key
972            .as_ref()
973            .map(crate::types::work_owner_key_to_machine);
974        let dsl_state = apply_item_dsl(
975            &item,
976            item.machine_state.unresolved_blocker_count,
977            wg_dsl::WorkGraphLifecycleInput::AddEvidence {
978                expected_revision: request.expected_revision,
979                evidence_kind,
980                confirming_owner_key,
981            },
982            Some(request.expected_revision),
983        )?;
984        item.evidence_refs.push(request.evidence);
985        item.machine_state = dsl_state;
986        sync_item_from_machine_state(&mut item)?;
987        item.updated_at = now;
988        let event = item_event(&item, WorkGraphEventKind::EvidenceAdded, now)?;
989        Ok((item, event))
990    }
991
992    /// Classify whether a work item is ready to claim at `now`.
993    ///
994    /// The shell extracts only the raw wall-clock `now` (a pure observation) and
995    /// drives the canonical `WorkGraphLifecycleMachine`'s `ClassifyReadiness`
996    /// input over the item's recovered machine state. The machine owns the
997    /// readiness POLICY — reproducing exactly the `Claim` transition guards
998    /// (`ClaimOpen` / `ClaimExpiredInProgress`) — and emits the verdict; this
999    /// function mirrors the emitted `WorkItemReadinessClassified.ready`, failing
1000    /// closed if the machine refuses or emits no verdict. It decides nothing.
1001    pub fn classify_readiness(item: &WorkItem, now: DateTime<Utc>) -> Result<bool, WorkGraphError> {
1002        validate_item_machine_projection(item)?;
1003        let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::recover_from_state(
1004            item.machine_state.clone(),
1005        )
1006        .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
1007        let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
1008            &mut dsl_auth,
1009            wg_dsl::WorkGraphLifecycleInput::ClassifyReadiness {
1010                now_utc_ms: datetime_to_millis(now),
1011            },
1012        )
1013        .map_err(|error| {
1014            WorkGraphError::InvalidTransition(format!(
1015                "work item {} refused readiness classification: {error:?}",
1016                item.id
1017            ))
1018        })?;
1019
1020        let mut classified = None;
1021        for effect in transition.effects() {
1022            if let wg_dsl::WorkGraphLifecycleEffect::WorkItemReadinessClassified { ready } = effect
1023                && classified.replace(*ready).is_some()
1024            {
1025                return Err(WorkGraphError::Store(format!(
1026                    "work item {} emitted multiple readiness verdicts",
1027                    item.id
1028                )));
1029            }
1030        }
1031
1032        classified.ok_or_else(|| {
1033            WorkGraphError::Store(format!(
1034                "work item {} emitted no readiness verdict",
1035                item.id
1036            ))
1037        })
1038    }
1039
1040    /// Mirror the machine-owned readiness verdict as a `bool`, failing closed.
1041    ///
1042    /// The readiness verdict belongs to the canonical
1043    /// `WorkGraphLifecycleMachine` (see
1044    /// [`WorkGraphMachine::classify_readiness`]). This filter-facing projection
1045    /// mirrors the machine bool; if the machine refuses to classify, it fails
1046    /// closed by treating the item as NOT ready so an unclassifiable item is
1047    /// never offered as claimable work.
1048    pub fn is_ready(item: &WorkItem, now: DateTime<Utc>) -> bool {
1049        Self::classify_readiness(item, now).unwrap_or(false)
1050    }
1051
1052    pub fn ready_items(items: Vec<WorkItem>, now: DateTime<Utc>) -> Vec<WorkItem> {
1053        items
1054            .into_iter()
1055            .filter(|item| Self::is_ready(item, now))
1056            .collect()
1057    }
1058
1059    pub fn validate_link(
1060        edge: &WorkEdge,
1061        existing_items: &[WorkItem],
1062        existing_edges: &[WorkEdge],
1063    ) -> Result<(), WorkGraphError> {
1064        let topology_state = topology_state(existing_items, existing_edges);
1065        apply_link_validation_dsl(
1066            topology_state,
1067            wg_dsl::WorkGraphLifecycleInput::ValidateLink {
1068                kind: dsl_edge_kind(edge.kind),
1069                from_item_key: work_item_key(&edge.from_id),
1070                to_item_key: work_item_key(&edge.to_id),
1071                edge_key: work_edge_key(edge.kind, &edge.from_id, &edge.to_id),
1072                reverse_path_key: dependency_path_key(edge.kind, &edge.to_id, &edge.from_id),
1073            },
1074        )?;
1075        Ok(())
1076    }
1077}
1078
1079/// Pure typed extraction of a `WorkGraphError` into the machine's typed
1080/// error-kind discriminant. This is a 1:1 variant->kind map with NO grouping;
1081/// the many-to-one variant->public-class POLICY lives in the canonical
1082/// `WorkGraphLifecycleMachine`, not here.
1083fn work_graph_error_kind(error: &WorkGraphError) -> wg_dsl::WorkGraphErrorKind {
1084    match error {
1085        WorkGraphError::NotFound { .. } => wg_dsl::WorkGraphErrorKind::NotFound,
1086        WorkGraphError::AttentionNotFound { .. } => wg_dsl::WorkGraphErrorKind::AttentionNotFound,
1087        WorkGraphError::StaleRevision { .. } => wg_dsl::WorkGraphErrorKind::StaleRevision,
1088        WorkGraphError::Conflict(_) => wg_dsl::WorkGraphErrorKind::Conflict,
1089        WorkGraphError::InvalidTransition(_) => wg_dsl::WorkGraphErrorKind::InvalidTransition,
1090        WorkGraphError::InvalidInput(_) => wg_dsl::WorkGraphErrorKind::InvalidInput,
1091        WorkGraphError::InvalidTimestampMillis { .. } => {
1092            wg_dsl::WorkGraphErrorKind::InvalidTimestampMillis
1093        }
1094        WorkGraphError::Store(_) => wg_dsl::WorkGraphErrorKind::Store,
1095        WorkGraphError::UnsupportedBackend(_) => wg_dsl::WorkGraphErrorKind::UnsupportedBackend,
1096    }
1097}
1098
1099pub(crate) fn completion_policy_name(policy: &WorkCompletionPolicy) -> &'static str {
1100    match policy {
1101        WorkCompletionPolicy::SelfAttest => "self_attest",
1102        WorkCompletionPolicy::HostConfirmed => "host_confirmed",
1103        WorkCompletionPolicy::PrincipalConfirmed => "principal_confirmed",
1104        WorkCompletionPolicy::Supervisor { .. } => "supervisor",
1105        WorkCompletionPolicy::ReviewerQuorum { .. } => "reviewer_quorum",
1106    }
1107}
1108
1109fn validate_title(title: String) -> Result<String, WorkGraphError> {
1110    let title = title.trim();
1111    if title.is_empty() {
1112        return Err(WorkGraphError::InvalidInput(
1113            "work item title must not be empty".to_string(),
1114        ));
1115    }
1116    Ok(title.to_string())
1117}
1118
1119fn normalize_labels(labels: BTreeSet<String>) -> Result<BTreeSet<String>, WorkGraphError> {
1120    let mut normalized = BTreeSet::new();
1121    for label in labels {
1122        let label = label.trim();
1123        if label.is_empty() {
1124            return Err(WorkGraphError::InvalidInput(
1125                "work item labels must not be empty".to_string(),
1126            ));
1127        }
1128        normalized.insert(label.to_string());
1129    }
1130    Ok(normalized)
1131}
1132
1133fn apply_new_item_dsl(
1134    input: wg_dsl::WorkGraphLifecycleInput,
1135) -> Result<wg_dsl::WorkGraphLifecycleMachineState, WorkGraphError> {
1136    let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::new();
1137    wg_dsl::WorkGraphLifecycleMachineMutator::apply(&mut dsl_auth, input)
1138        .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
1139    Ok(dsl_auth.state().clone())
1140}
1141
1142/// Resolve whether a requested INITIAL work status is an admissible creation
1143/// state.
1144///
1145/// The shell performs only a pure typed extraction of the requested
1146/// `WorkStatus` into the machine's `WorkLifecycleState` observation. The
1147/// creation POLICY "a new work item may only start open or blocked" is owned by
1148/// the canonical `WorkGraphLifecycleMachine`: this drives its
1149/// `ClassifyCreateStatusAdmission` input over a fresh authority and mirrors the
1150/// emitted `CreateStatusAdmissionClassified` verdict. The shell decides nothing
1151/// and fails closed if the machine refuses or emits no verdict.
1152fn classify_create_status_admission(
1153    status: WorkStatus,
1154) -> Result<wg_dsl::WorkCreateStatusAdmissionKind, WorkGraphError> {
1155    let requested_status = crate::types::work_lifecycle_state_from_status(status);
1156    let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::new();
1157    let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
1158        &mut dsl_auth,
1159        wg_dsl::WorkGraphLifecycleInput::ClassifyCreateStatusAdmission { requested_status },
1160    )
1161    .map_err(|error| {
1162        WorkGraphError::InvalidTransition(format!(
1163            "WorkGraphLifecycle refused create-status admission for {requested_status:?}: {error:?}"
1164        ))
1165    })?;
1166
1167    let mut admission = None;
1168    for effect in transition.effects() {
1169        if let wg_dsl::WorkGraphLifecycleEffect::CreateStatusAdmissionClassified {
1170            admission: emitted,
1171        } = effect
1172            && admission.replace(*emitted).is_some()
1173        {
1174            return Err(WorkGraphError::Store(format!(
1175                "WorkGraphLifecycle create-status admission emitted multiple verdicts for {requested_status:?}"
1176            )));
1177        }
1178    }
1179
1180    admission.ok_or_else(|| {
1181        WorkGraphError::Store(format!(
1182            "WorkGraphLifecycle create-status admission emitted no verdict for {requested_status:?}"
1183        ))
1184    })
1185}
1186
1187/// Resolve whether a requested target lifecycle status is an admissible CLOSE
1188/// target.
1189///
1190/// The shell performs only a pure typed extraction of the requested
1191/// `WorkStatus` into the machine's `WorkLifecycleState` observation. The
1192/// lifecycle-class fact "close requires a terminal status" is owned by the
1193/// canonical `WorkGraphLifecycleMachine`: this drives its
1194/// `ClassifyCloseStatusAdmission` input over a fresh authority and mirrors the
1195/// emitted `CloseStatusAdmissionClassified` verdict. The shell decides nothing
1196/// and fails closed if the machine refuses or emits no verdict.
1197fn classify_close_status_admission(
1198    status: WorkStatus,
1199) -> Result<wg_dsl::WorkCloseStatusAdmissionKind, WorkGraphError> {
1200    let requested_status = crate::types::work_lifecycle_state_from_status(status);
1201    let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::new();
1202    let transition = wg_dsl::WorkGraphLifecycleMachineMutator::apply(
1203        &mut dsl_auth,
1204        wg_dsl::WorkGraphLifecycleInput::ClassifyCloseStatusAdmission { requested_status },
1205    )
1206    .map_err(|error| {
1207        WorkGraphError::InvalidTransition(format!(
1208            "WorkGraphLifecycle refused close-status admission for {requested_status:?}: {error:?}"
1209        ))
1210    })?;
1211
1212    let mut admission = None;
1213    for effect in transition.effects() {
1214        if let wg_dsl::WorkGraphLifecycleEffect::CloseStatusAdmissionClassified {
1215            admission: emitted,
1216        } = effect
1217            && admission.replace(*emitted).is_some()
1218        {
1219            return Err(WorkGraphError::Store(format!(
1220                "WorkGraphLifecycle close-status admission emitted multiple verdicts for {requested_status:?}"
1221            )));
1222        }
1223    }
1224
1225    admission.ok_or_else(|| {
1226        WorkGraphError::Store(format!(
1227            "WorkGraphLifecycle close-status admission emitted no verdict for {requested_status:?}"
1228        ))
1229    })
1230}
1231
1232fn apply_link_validation_dsl(
1233    state: WorkGraphMachineState,
1234    input: wg_dsl::WorkGraphLifecycleInput,
1235) -> Result<(), WorkGraphError> {
1236    let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::recover_from_state(state)
1237        .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
1238    wg_dsl::WorkGraphLifecycleMachineMutator::apply(&mut dsl_auth, input)
1239        .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
1240    Ok(())
1241}
1242
1243fn apply_attention_dsl(
1244    binding: &WorkAttentionBinding,
1245    input: attention_dsl::WorkAttentionLifecycleInput,
1246    expected_revision: Option<u64>,
1247) -> Result<attention_dsl::WorkAttentionLifecycleMachineState, WorkGraphError> {
1248    let mut dsl_auth = attention_dsl::WorkAttentionLifecycleMachineAuthority::recover_from_state(
1249        binding.machine_state.clone(),
1250    )
1251    .map_err(|error| {
1252        WorkGraphError::InvalidTransition(format!(
1253            "attention binding {} refused recovery: {error:?}",
1254            binding.binding_id
1255        ))
1256    })?;
1257    attention_dsl::WorkAttentionLifecycleMachineMutator::apply(&mut dsl_auth, input).map_err(
1258        |error| {
1259            if let Some(expected) = expected_revision
1260                && binding.machine_state.revision != expected
1261            {
1262                return WorkGraphError::StaleRevision {
1263                    id: binding.work_ref.item_id.clone(),
1264                    expected,
1265                    actual: binding.machine_state.revision,
1266                };
1267            }
1268            WorkGraphError::InvalidTransition(format!(
1269                "attention binding {} refused transition: {error:?}",
1270                binding.binding_id
1271            ))
1272        },
1273    )?;
1274    Ok(dsl_auth.state().clone())
1275}
1276
1277fn apply_item_dsl(
1278    item: &WorkItem,
1279    unresolved_blocker_count: u64,
1280    input: wg_dsl::WorkGraphLifecycleInput,
1281    expected_revision: Option<u64>,
1282) -> Result<WorkGraphMachineState, WorkGraphError> {
1283    validate_item_machine_projection(item)?;
1284    let mut state = item.machine_state.clone();
1285    state.unresolved_blocker_count = unresolved_blocker_count;
1286    let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::recover_from_state(state)
1287        .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
1288    wg_dsl::WorkGraphLifecycleMachineMutator::apply(&mut dsl_auth, input).map_err(|error| {
1289        if let Some(expected) = expected_revision
1290            && item.revision != expected
1291        {
1292            return WorkGraphError::StaleRevision {
1293                id: item.id.clone(),
1294                expected,
1295                actual: item.revision,
1296            };
1297        }
1298        WorkGraphError::InvalidTransition(format!("{error:?}"))
1299    })?;
1300    Ok(dsl_auth.state().clone())
1301}
1302
1303fn sync_attention_from_machine_state(binding: &mut WorkAttentionBinding) {
1304    binding.status = match binding.machine_state.lifecycle_phase {
1305        attention_dsl::WorkAttentionLifecycleState::Active => WorkAttentionStatus::Active,
1306        attention_dsl::WorkAttentionLifecycleState::Paused => WorkAttentionStatus::Paused {
1307            until: binding
1308                .machine_state
1309                .paused_until_utc_ms
1310                .and_then(millis_to_datetime),
1311        },
1312        attention_dsl::WorkAttentionLifecycleState::Superseded => WorkAttentionStatus::Superseded,
1313        attention_dsl::WorkAttentionLifecycleState::Stopped => WorkAttentionStatus::Stopped,
1314    };
1315}
1316
1317fn work_status_from_dsl(status: wg_dsl::WorkLifecycleState) -> Result<WorkStatus, WorkGraphError> {
1318    match status {
1319        wg_dsl::WorkLifecycleState::Open => Ok(WorkStatus::Open),
1320        wg_dsl::WorkLifecycleState::InProgress => Ok(WorkStatus::InProgress),
1321        wg_dsl::WorkLifecycleState::Blocked => Ok(WorkStatus::Blocked),
1322        wg_dsl::WorkLifecycleState::Completed => Ok(WorkStatus::Completed),
1323        wg_dsl::WorkLifecycleState::Cancelled => Ok(WorkStatus::Cancelled),
1324        wg_dsl::WorkLifecycleState::Failed => Ok(WorkStatus::Failed),
1325        wg_dsl::WorkLifecycleState::Absent => Err(WorkGraphError::InvalidTransition(
1326            "work item lifecycle state is absent".to_string(),
1327        )),
1328    }
1329}
1330
1331fn sync_item_from_machine_state(item: &mut WorkItem) -> Result<(), WorkGraphError> {
1332    item.status = work_status_from_dsl(item.machine_state.lifecycle_phase)?;
1333    item.revision = item.machine_state.revision;
1334    item.due_at = item
1335        .machine_state
1336        .due_at_utc_ms
1337        .and_then(millis_to_datetime);
1338    item.not_before = item
1339        .machine_state
1340        .not_before_utc_ms
1341        .and_then(millis_to_datetime);
1342    item.snoozed_until = item
1343        .machine_state
1344        .snoozed_until_utc_ms
1345        .and_then(millis_to_datetime);
1346    item.completion_policy = crate::types::WorkCompletionPolicy::from_machine(
1347        item.machine_state.completion_policy,
1348        item.machine_state.completion_supervisor_owner_key.clone(),
1349        item.machine_state.completion_reviewer_quorum_threshold,
1350    );
1351    item.terminal_at = item
1352        .machine_state
1353        .terminal_at_utc_ms
1354        .and_then(millis_to_datetime);
1355    Ok(())
1356}
1357
1358fn validate_item_machine_projection(item: &WorkItem) -> Result<(), WorkGraphError> {
1359    let status = work_status_from_dsl(item.machine_state.lifecycle_phase)?;
1360    if item.status != status {
1361        return Err(WorkGraphError::Store(format!(
1362            "work item {} status projection {:?} does not match machine state {:?}",
1363            item.id, item.status, status
1364        )));
1365    }
1366    if item.revision != item.machine_state.revision {
1367        return Err(WorkGraphError::Store(format!(
1368            "work item {} revision projection {} does not match machine state {}",
1369            item.id, item.revision, item.machine_state.revision
1370        )));
1371    }
1372    if item.due_at.map(datetime_to_millis) != item.machine_state.due_at_utc_ms {
1373        return Err(WorkGraphError::Store(format!(
1374            "work item {} due_at projection does not match machine state",
1375            item.id
1376        )));
1377    }
1378    if item.not_before.map(datetime_to_millis) != item.machine_state.not_before_utc_ms {
1379        return Err(WorkGraphError::Store(format!(
1380            "work item {} not_before projection does not match machine state",
1381            item.id
1382        )));
1383    }
1384    if item.snoozed_until.map(datetime_to_millis) != item.machine_state.snoozed_until_utc_ms {
1385        return Err(WorkGraphError::Store(format!(
1386            "work item {} snoozed_until projection does not match machine state",
1387            item.id
1388        )));
1389    }
1390    if item.completion_policy
1391        != crate::types::WorkCompletionPolicy::from_machine(
1392            item.machine_state.completion_policy,
1393            item.machine_state.completion_supervisor_owner_key.clone(),
1394            item.machine_state.completion_reviewer_quorum_threshold,
1395        )
1396    {
1397        return Err(WorkGraphError::Store(format!(
1398            "work item {} completion_policy projection does not match machine state",
1399            item.id
1400        )));
1401    }
1402    if item.terminal_at.map(datetime_to_millis) != item.machine_state.terminal_at_utc_ms {
1403        return Err(WorkGraphError::Store(format!(
1404            "work item {} terminal_at projection does not match machine state",
1405            item.id
1406        )));
1407    }
1408    if let Some(claim) = &item.claim {
1409        let claim_owner_key = work_owner_key(&claim.owner)?;
1410        if item.machine_state.claim_owner_key.as_ref() != Some(&claim_owner_key) {
1411            return Err(WorkGraphError::Store(format!(
1412                "work item {} claim owner projection does not match machine state",
1413                item.id
1414            )));
1415        }
1416        if item.machine_state.claimed_at_utc_ms != Some(datetime_to_millis(claim.claimed_at)) {
1417            return Err(WorkGraphError::Store(format!(
1418                "work item {} claim time projection does not match machine state",
1419                item.id
1420            )));
1421        }
1422        if item.machine_state.lease_expires_at_utc_ms
1423            != claim.lease_expires_at.map(datetime_to_millis)
1424        {
1425            return Err(WorkGraphError::Store(format!(
1426                "work item {} claim lease projection does not match machine state",
1427                item.id
1428            )));
1429        }
1430    } else if item.machine_state.claim_owner_key.is_some()
1431        || item.machine_state.claimed_at_utc_ms.is_some()
1432        || item.machine_state.lease_expires_at_utc_ms.is_some()
1433    {
1434        return Err(WorkGraphError::Store(format!(
1435            "work item {} machine state has a claim without a claim projection",
1436            item.id
1437        )));
1438    }
1439    Ok(())
1440}
1441
1442fn work_owner_key(owner: &crate::types::WorkOwner) -> Result<wg_dsl::WorkOwnerKey, WorkGraphError> {
1443    let kind = match owner.key.kind {
1444        crate::types::WorkOwnerKind::Principal => wg_dsl::WorkOwnerKind::Principal,
1445        crate::types::WorkOwnerKind::Agent => wg_dsl::WorkOwnerKind::Agent,
1446        crate::types::WorkOwnerKind::Session => wg_dsl::WorkOwnerKind::Session,
1447        crate::types::WorkOwnerKind::Mob => wg_dsl::WorkOwnerKind::Mob,
1448        crate::types::WorkOwnerKind::Label => wg_dsl::WorkOwnerKind::Label,
1449    };
1450    Ok(wg_dsl::WorkOwnerKey {
1451        kind,
1452        id: owner.key.id.clone(),
1453    })
1454}
1455
1456fn topology_state(
1457    existing_items: &[WorkItem],
1458    existing_edges: &[WorkEdge],
1459) -> WorkGraphMachineState {
1460    WorkGraphMachineState {
1461        topology_item_keys: existing_items
1462            .iter()
1463            .map(|item| work_item_key(&item.id))
1464            .collect(),
1465        topology_edge_keys: existing_edges
1466            .iter()
1467            .map(|edge| work_edge_key(edge.kind, &edge.from_id, &edge.to_id))
1468            .collect(),
1469        blocks_reachability: dependency_reachability(existing_edges, WorkEdgeKind::Blocks),
1470        parent_reachability: dependency_reachability(existing_edges, WorkEdgeKind::Parent),
1471        ..Default::default()
1472    }
1473}
1474
1475fn dependency_reachability(
1476    edges: &[WorkEdge],
1477    kind: WorkEdgeKind,
1478) -> BTreeSet<wg_dsl::WorkDependencyPathKey> {
1479    let mut adjacency = BTreeMap::<WorkItemId, BTreeSet<WorkItemId>>::new();
1480    for edge in edges.iter().filter(|edge| edge.kind == kind) {
1481        adjacency
1482            .entry(edge.from_id.clone())
1483            .or_default()
1484            .insert(edge.to_id.clone());
1485    }
1486
1487    let mut reachability = BTreeSet::new();
1488    for start in adjacency.keys() {
1489        let mut stack = adjacency
1490            .get(start)
1491            .into_iter()
1492            .flat_map(|targets| targets.iter().cloned())
1493            .collect::<Vec<_>>();
1494        let mut seen = BTreeSet::new();
1495        while let Some(current) = stack.pop() {
1496            if !seen.insert(current.clone()) {
1497                continue;
1498            }
1499            reachability.insert(dependency_path_key(kind, start, &current));
1500            if let Some(targets) = adjacency.get(&current) {
1501                stack.extend(targets.iter().cloned());
1502            }
1503        }
1504    }
1505    reachability
1506}
1507
1508fn work_item_key(id: &WorkItemId) -> wg_dsl::WorkItemKey {
1509    wg_dsl::WorkItemKey(id.as_str().to_string())
1510}
1511
1512fn work_edge_key(
1513    kind: WorkEdgeKind,
1514    from_id: &WorkItemId,
1515    to_id: &WorkItemId,
1516) -> wg_dsl::WorkEdgeKey {
1517    wg_dsl::WorkEdgeKey(format!(
1518        "{}:{}:{}",
1519        edge_kind_key(kind),
1520        from_id.as_str(),
1521        to_id.as_str()
1522    ))
1523}
1524
1525fn dependency_path_key(
1526    kind: WorkEdgeKind,
1527    from_id: &WorkItemId,
1528    to_id: &WorkItemId,
1529) -> wg_dsl::WorkDependencyPathKey {
1530    wg_dsl::WorkDependencyPathKey(format!(
1531        "{}:{}:{}",
1532        edge_kind_key(kind),
1533        from_id.as_str(),
1534        to_id.as_str()
1535    ))
1536}
1537
1538fn dsl_edge_kind(kind: WorkEdgeKind) -> wg_dsl::WorkEdgeKind {
1539    match kind {
1540        WorkEdgeKind::Blocks => wg_dsl::WorkEdgeKind::Blocks,
1541        WorkEdgeKind::Parent => wg_dsl::WorkEdgeKind::Parent,
1542        WorkEdgeKind::Related => wg_dsl::WorkEdgeKind::Related,
1543        WorkEdgeKind::Supersedes => wg_dsl::WorkEdgeKind::Supersedes,
1544        WorkEdgeKind::DerivedFrom => wg_dsl::WorkEdgeKind::DerivedFrom,
1545    }
1546}
1547
1548fn edge_kind_key(kind: WorkEdgeKind) -> &'static str {
1549    match kind {
1550        WorkEdgeKind::Blocks => "blocks",
1551        WorkEdgeKind::Parent => "parent",
1552        WorkEdgeKind::Related => "related",
1553        WorkEdgeKind::Supersedes => "supersedes",
1554        WorkEdgeKind::DerivedFrom => "derived_from",
1555    }
1556}
1557
1558fn datetime_to_millis(dt: DateTime<Utc>) -> u64 {
1559    u64::try_from(dt.timestamp_millis()).unwrap_or(0)
1560}
1561
1562fn millis_to_datetime(ms: u64) -> Option<DateTime<Utc>> {
1563    DateTime::from_timestamp_millis(i64::try_from(ms).ok()?)
1564}
1565
1566fn item_event(
1567    item: &WorkItem,
1568    kind: WorkGraphEventKind,
1569    at: DateTime<Utc>,
1570) -> Result<WorkGraphEvent, WorkGraphError> {
1571    Ok(WorkGraphEvent::item(
1572        item.realm_id.clone(),
1573        item.namespace.clone(),
1574        item.id.clone(),
1575        kind,
1576        at,
1577        json!({ "item": item }),
1578    ))
1579}
1580
1581fn seconds_to_duration(seconds: u64) -> Duration {
1582    let seconds = i64::try_from(seconds).unwrap_or(i64::MAX);
1583    Duration::seconds(seconds)
1584}
1585
1586#[cfg(test)]
1587#[allow(
1588    clippy::expect_used,
1589    clippy::unwrap_used,
1590    clippy::panic,
1591    clippy::redundant_clone
1592)]
1593mod tests {
1594    use super::*;
1595    use crate::types::{
1596        AddEvidenceRequest, ClaimWorkItemRequest, CloseWorkItemRequest, UpdateWorkItemRequest,
1597        WorkEvidenceKind, WorkEvidenceRef, WorkOwner, WorkOwnerKey,
1598    };
1599
1600    fn create(title: &str, now: DateTime<Utc>) -> WorkItem {
1601        create_with_policy(title, WorkCompletionPolicy::SelfAttest, now)
1602    }
1603
1604    fn create_with_policy(
1605        title: &str,
1606        completion_policy: WorkCompletionPolicy,
1607        now: DateTime<Utc>,
1608    ) -> WorkItem {
1609        WorkGraphMachine::create_item(
1610            CreateWorkItemRequest {
1611                realm_id: None,
1612                namespace: None,
1613                title: title.to_string(),
1614                description: None,
1615                priority: Default::default(),
1616                completion_policy,
1617                labels: BTreeSet::new(),
1618                due_at: None,
1619                not_before: None,
1620                snoozed_until: None,
1621                external_refs: Vec::new(),
1622                evidence_refs: Vec::new(),
1623                status: None,
1624            },
1625            "realm".to_string(),
1626            WorkNamespace::default(),
1627            now,
1628        )
1629        .expect("create")
1630        .0
1631    }
1632
1633    fn owner(id: &str) -> WorkOwner {
1634        WorkOwner::new(WorkOwnerKey::label(id).expect("owner key"))
1635    }
1636
1637    fn create_with_status(
1638        status: Option<WorkStatus>,
1639        now: DateTime<Utc>,
1640    ) -> Result<WorkItem, WorkGraphError> {
1641        WorkGraphMachine::create_item(
1642            CreateWorkItemRequest {
1643                realm_id: None,
1644                namespace: None,
1645                title: "status-create".to_string(),
1646                description: None,
1647                priority: Default::default(),
1648                completion_policy: WorkCompletionPolicy::SelfAttest,
1649                labels: BTreeSet::new(),
1650                due_at: None,
1651                not_before: None,
1652                snoozed_until: None,
1653                external_refs: Vec::new(),
1654                evidence_refs: Vec::new(),
1655                status,
1656            },
1657            "realm".to_string(),
1658            WorkNamespace::default(),
1659            now,
1660        )
1661        .map(|(item, _)| item)
1662    }
1663
1664    #[test]
1665    fn create_status_admission_is_decided_by_machine() {
1666        use wg_dsl::WorkCreateStatusAdmissionKind as Admission;
1667        // The machine owns the "only open or blocked" creation policy.
1668        assert_eq!(
1669            classify_create_status_admission(WorkStatus::Open).expect("classify open"),
1670            Admission::AdmittedOpen
1671        );
1672        assert_eq!(
1673            classify_create_status_admission(WorkStatus::Blocked).expect("classify blocked"),
1674            Admission::AdmittedBlocked
1675        );
1676        for status in [
1677            WorkStatus::InProgress,
1678            WorkStatus::Completed,
1679            WorkStatus::Cancelled,
1680            WorkStatus::Failed,
1681        ] {
1682            assert_eq!(
1683                classify_create_status_admission(status)
1684                    .unwrap_or_else(|error| panic!("classify {status:?}: {error:?}")),
1685                Admission::Denied,
1686                "requested status {status:?} must be denied as a creation state"
1687            );
1688        }
1689    }
1690
1691    #[test]
1692    fn create_item_admits_open_and_blocked_and_rejects_the_rest() {
1693        let now = Utc::now();
1694        assert_eq!(
1695            create_with_status(Some(WorkStatus::Open), now)
1696                .expect("open admitted")
1697                .status,
1698            WorkStatus::Open
1699        );
1700        assert_eq!(
1701            create_with_status(Some(WorkStatus::Blocked), now)
1702                .expect("blocked admitted")
1703                .status,
1704            WorkStatus::Blocked
1705        );
1706        // Default (None) resolves to Open and must be admitted.
1707        assert_eq!(
1708            create_with_status(None, now)
1709                .expect("default admitted")
1710                .status,
1711            WorkStatus::Open
1712        );
1713        for status in [
1714            WorkStatus::InProgress,
1715            WorkStatus::Completed,
1716            WorkStatus::Cancelled,
1717            WorkStatus::Failed,
1718        ] {
1719            let error = create_with_status(Some(status), now)
1720                .expect_err("non-open/blocked create status must be rejected");
1721            match error {
1722                WorkGraphError::InvalidTransition(message) => assert_eq!(
1723                    message, "new work items may only start open or blocked",
1724                    "rejection message preserved for {status:?}"
1725                ),
1726                other => panic!("expected InvalidTransition for {status:?}, got {other:?}"),
1727            }
1728        }
1729    }
1730
1731    #[test]
1732    fn public_confirmation_admission_is_decided_by_machine() {
1733        use wg_dsl::WorkPublicConfirmationAdmissionKind as Admission;
1734        // The machine owns the trust-scoped eligibility: only SelfAttest is
1735        // publicly confirmable; every other policy requires trusted host.
1736        assert_eq!(
1737            WorkGraphMachine::classify_public_confirmation_admission(
1738                &WorkCompletionPolicy::SelfAttest
1739            )
1740            .expect("self-attest admission"),
1741            Admission::Admitted
1742        );
1743        let owner_key = WorkOwnerKey::label("supervisor").expect("owner key");
1744        let denied = [
1745            WorkCompletionPolicy::HostConfirmed,
1746            WorkCompletionPolicy::PrincipalConfirmed,
1747            WorkCompletionPolicy::Supervisor {
1748                owner_key: owner_key.clone(),
1749            },
1750            WorkCompletionPolicy::ReviewerQuorum { threshold: 2 },
1751        ];
1752        for policy in denied {
1753            assert_eq!(
1754                WorkGraphMachine::classify_public_confirmation_admission(&policy)
1755                    .unwrap_or_else(|error| panic!("classify {policy:?}: {error:?}")),
1756                Admission::DeniedRequiresTrustedHost,
1757                "policy {policy:?} must require trusted host for public confirmation"
1758            );
1759        }
1760    }
1761
1762    #[test]
1763    fn create_completion_policy_admission_is_decided_by_machine() {
1764        use wg_dsl::WorkCreateCompletionPolicyAdmissionKind as Admission;
1765        // The machine owns the "non-goal work items must use self_attest"
1766        // creation policy.
1767        assert_eq!(
1768            WorkGraphMachine::classify_create_completion_policy_admission(
1769                &WorkCompletionPolicy::SelfAttest
1770            )
1771            .expect("self-attest admission"),
1772            Admission::Admitted
1773        );
1774        let owner_key = WorkOwnerKey::label("supervisor").expect("owner key");
1775        let denied = [
1776            WorkCompletionPolicy::HostConfirmed,
1777            WorkCompletionPolicy::PrincipalConfirmed,
1778            WorkCompletionPolicy::Supervisor { owner_key },
1779            WorkCompletionPolicy::ReviewerQuorum { threshold: 2 },
1780        ];
1781        for policy in denied {
1782            assert_eq!(
1783                WorkGraphMachine::classify_create_completion_policy_admission(&policy)
1784                    .unwrap_or_else(|error| panic!("classify {policy:?}: {error:?}")),
1785                Admission::DeniedNonSelfAttest,
1786                "policy {policy:?} must be denied at create for a non-goal work item"
1787            );
1788        }
1789    }
1790
1791    #[test]
1792    fn close_status_admission_is_decided_by_machine() {
1793        use wg_dsl::WorkCloseStatusAdmissionKind as Admission;
1794        // The machine owns the "close requires a terminal status" lifecycle
1795        // class fact.
1796        assert_eq!(
1797            classify_close_status_admission(WorkStatus::Completed).expect("classify completed"),
1798            Admission::AdmittedCompleted
1799        );
1800        assert_eq!(
1801            classify_close_status_admission(WorkStatus::Cancelled).expect("classify cancelled"),
1802            Admission::AdmittedCancelled
1803        );
1804        assert_eq!(
1805            classify_close_status_admission(WorkStatus::Failed).expect("classify failed"),
1806            Admission::AdmittedFailed
1807        );
1808        for status in [
1809            WorkStatus::Open,
1810            WorkStatus::InProgress,
1811            WorkStatus::Blocked,
1812        ] {
1813            assert_eq!(
1814                classify_close_status_admission(status)
1815                    .unwrap_or_else(|error| panic!("classify {status:?}: {error:?}")),
1816                Admission::DeniedNonTerminal,
1817                "requested close status {status:?} must be denied as a non-terminal target"
1818            );
1819        }
1820    }
1821
1822    #[test]
1823    fn close_item_rejects_non_terminal_status_with_preserved_message() {
1824        let now = Utc::now();
1825        for status in [
1826            WorkStatus::Open,
1827            WorkStatus::InProgress,
1828            WorkStatus::Blocked,
1829        ] {
1830            let item = create("close-target", now);
1831            let error = WorkGraphMachine::close_item(
1832                item.clone(),
1833                CloseWorkItemRequest {
1834                    id: item.id.clone(),
1835                    realm_id: None,
1836                    namespace: None,
1837                    expected_revision: item.revision,
1838                    status,
1839                },
1840                now,
1841            )
1842            .expect_err("non-terminal close status must be rejected");
1843            match error {
1844                WorkGraphError::InvalidTransition(message) => assert_eq!(
1845                    message, "close requires a terminal status",
1846                    "rejection message preserved for {status:?}"
1847                ),
1848                other => panic!("expected InvalidTransition for {status:?}, got {other:?}"),
1849            }
1850        }
1851    }
1852
1853    #[test]
1854    fn blocked_items_are_never_ready() {
1855        let now = Utc::now();
1856        let item = create("blocked", now);
1857        let (item, _) = WorkGraphMachine::block_item(item, 1, now).expect("block");
1858        assert!(WorkGraphMachine::ready_items(vec![item], now).is_empty());
1859    }
1860
1861    #[test]
1862    fn future_due_items_are_not_ready() {
1863        let now = Utc::now();
1864        let item = create("future", now);
1865        let (item, _) = WorkGraphMachine::update_item(
1866            item,
1867            UpdateWorkItemRequest {
1868                id: WorkItemId::generated(),
1869                realm_id: None,
1870                namespace: None,
1871                expected_revision: 1,
1872                title: None,
1873                description: None,
1874                priority: None,
1875                completion_policy: None,
1876                labels: None,
1877                due_at: Some(now + Duration::hours(1)),
1878                not_before: None,
1879                snoozed_until: None,
1880                external_refs: Vec::new(),
1881            },
1882            now,
1883        )
1884        .expect("update due");
1885
1886        assert!(WorkGraphMachine::ready_items(vec![item], now).is_empty());
1887    }
1888
1889    #[test]
1890    fn readiness_is_decided_by_machine_matching_claim_guards() {
1891        let now = Utc::now();
1892
1893        // A fresh open item with no blockers and no time windows is ready —
1894        // exactly what the `ClaimOpen` guard accepts.
1895        let open = create("ready-open", now);
1896        assert_eq!(open.status, WorkStatus::Open);
1897        assert!(
1898            WorkGraphMachine::classify_readiness(&open, now).expect("classify open"),
1899            "an unblocked, due-eligible open item must be machine-classified ready"
1900        );
1901        assert!(WorkGraphMachine::is_ready(&open, now));
1902
1903        // A future-due open item is not ready (mirrors `due_eligible` guard).
1904        let (future, _) = WorkGraphMachine::update_item(
1905            open,
1906            UpdateWorkItemRequest {
1907                id: WorkItemId::generated(),
1908                realm_id: None,
1909                namespace: None,
1910                expected_revision: 1,
1911                title: None,
1912                description: None,
1913                priority: None,
1914                completion_policy: None,
1915                labels: None,
1916                due_at: Some(now + Duration::hours(1)),
1917                not_before: None,
1918                snoozed_until: None,
1919                external_refs: Vec::new(),
1920            },
1921            now,
1922        )
1923        .expect("update future due");
1924        assert!(
1925            !WorkGraphMachine::classify_readiness(&future, now).expect("classify future"),
1926            "a future-due open item must be machine-classified not ready"
1927        );
1928        assert!(!WorkGraphMachine::is_ready(&future, now));
1929
1930        // A claimed (InProgress) item with a live lease is NOT ready; once the
1931        // lease expires it becomes ready (mirrors `ClaimExpiredInProgress`).
1932        let claimable = create("reclaim", now);
1933        let (claimed, _) = WorkGraphMachine::claim_item(
1934            claimable,
1935            ClaimWorkItemRequest {
1936                id: WorkItemId::generated(),
1937                realm_id: None,
1938                namespace: None,
1939                expected_revision: 1,
1940                owner: owner("worker"),
1941                lease_seconds: Some(30),
1942                lease_expires_at: None,
1943            },
1944            now,
1945        )
1946        .expect("claim");
1947        assert_eq!(claimed.status, WorkStatus::InProgress);
1948        assert!(
1949            !WorkGraphMachine::classify_readiness(&claimed, now).expect("classify live lease"),
1950            "an in-progress item with a live lease must not be machine-classified ready"
1951        );
1952        let after_lease = now + Duration::seconds(31);
1953        assert!(
1954            WorkGraphMachine::classify_readiness(&claimed, after_lease)
1955                .expect("classify expired lease"),
1956            "an in-progress item with an expired lease must be machine-classified ready"
1957        );
1958        assert!(WorkGraphMachine::is_ready(&claimed, after_lease));
1959    }
1960
1961    #[test]
1962    fn terminal_items_cannot_be_claimed() {
1963        let now = Utc::now();
1964        let item = create("done", now);
1965        let (item, _) = WorkGraphMachine::close_item(
1966            item,
1967            CloseWorkItemRequest {
1968                id: WorkItemId::generated(),
1969                realm_id: None,
1970                namespace: None,
1971                expected_revision: 1,
1972                status: WorkStatus::Completed,
1973            },
1974            now,
1975        )
1976        .expect("close");
1977        let error = WorkGraphMachine::claim_item(
1978            item,
1979            ClaimWorkItemRequest {
1980                id: WorkItemId::generated(),
1981                realm_id: None,
1982                namespace: None,
1983                expected_revision: 2,
1984                owner: owner("worker"),
1985                lease_seconds: None,
1986                lease_expires_at: None,
1987            },
1988            now,
1989        )
1990        .expect_err("terminal claim should fail");
1991        assert!(matches!(error, WorkGraphError::InvalidTransition(_)));
1992    }
1993
1994    #[test]
1995    fn completed_close_is_completion_policy_gated_by_machine() {
1996        let now = Utc::now();
1997        let item = create_with_policy(
1998            "needs host confirmation",
1999            WorkCompletionPolicy::HostConfirmed,
2000            now,
2001        );
2002
2003        // The machine's completion_policy_satisfied guard refuses the
2004        // CloseCompleted transition while no host confirmation evidence has
2005        // been recorded.
2006        let error = WorkGraphMachine::close_item(
2007            item.clone(),
2008            CloseWorkItemRequest {
2009                id: item.id.clone(),
2010                realm_id: None,
2011                namespace: None,
2012                expected_revision: 1,
2013                status: WorkStatus::Completed,
2014            },
2015            now,
2016        )
2017        .expect_err("machine must reject completed close without policy evidence");
2018        assert!(matches!(error, WorkGraphError::InvalidTransition(_)));
2019
2020        // Once typed host-confirmation evidence is recorded, the machine's
2021        // owned host_confirmation_count satisfies the policy and the close
2022        // transition is admitted.
2023        let (item, _) = WorkGraphMachine::add_evidence(
2024            item,
2025            AddEvidenceRequest {
2026                id: WorkItemId::generated(),
2027                realm_id: None,
2028                namespace: None,
2029                expected_revision: 1,
2030                evidence: WorkEvidenceRef {
2031                    kind: "host_confirmation".to_string(),
2032                    id: "acceptance".to_string(),
2033                    label: None,
2034                    summary: None,
2035                    confirmation_kind: Some(WorkEvidenceKind::HostConfirmation),
2036                    confirming_owner_key: None,
2037                },
2038            },
2039            now,
2040        )
2041        .expect("record host confirmation evidence");
2042
2043        let (closed, _) = WorkGraphMachine::close_item(
2044            item.clone(),
2045            CloseWorkItemRequest {
2046                id: item.id,
2047                realm_id: None,
2048                namespace: None,
2049                expected_revision: item.revision,
2050                status: WorkStatus::Completed,
2051            },
2052            now,
2053        )
2054        .expect("machine admits close once host confirmation is satisfied");
2055        assert_eq!(closed.status, WorkStatus::Completed);
2056    }
2057
2058    #[test]
2059    fn reviewer_quorum_close_requires_distinct_reviewers() {
2060        let now = Utc::now();
2061        let item = create_with_policy(
2062            "needs two reviewers",
2063            WorkCompletionPolicy::ReviewerQuorum { threshold: 2 },
2064            now,
2065        );
2066
2067        let reviewer_evidence = |reviewer: &str| WorkEvidenceRef {
2068            kind: "reviewer_confirmation".to_string(),
2069            id: reviewer.to_string(),
2070            label: Some(reviewer.to_string()),
2071            summary: None,
2072            confirmation_kind: Some(WorkEvidenceKind::ReviewerConfirmation),
2073            confirming_owner_key: Some(
2074                WorkOwnerKey::principal(reviewer).expect("reviewer principal"),
2075            ),
2076        };
2077
2078        // One distinct reviewer is short of the quorum: machine refuses close.
2079        let (item, _) = WorkGraphMachine::add_evidence(
2080            item,
2081            AddEvidenceRequest {
2082                id: WorkItemId::generated(),
2083                realm_id: None,
2084                namespace: None,
2085                expected_revision: 1,
2086                evidence: reviewer_evidence("alice"),
2087            },
2088            now,
2089        )
2090        .expect("record first reviewer");
2091
2092        let error = WorkGraphMachine::close_item(
2093            item.clone(),
2094            CloseWorkItemRequest {
2095                id: item.id.clone(),
2096                realm_id: None,
2097                namespace: None,
2098                expected_revision: item.revision,
2099                status: WorkStatus::Completed,
2100            },
2101            now,
2102        )
2103        .expect_err("single reviewer must not satisfy a quorum of two");
2104        assert!(matches!(error, WorkGraphError::InvalidTransition(_)));
2105
2106        // A duplicate confirmation from the same reviewer does not advance the
2107        // distinct-reviewer count; the machine still refuses.
2108        let expected_revision = item.revision;
2109        let (item, _) = WorkGraphMachine::add_evidence(
2110            item,
2111            AddEvidenceRequest {
2112                id: WorkItemId::generated(),
2113                realm_id: None,
2114                namespace: None,
2115                expected_revision,
2116                evidence: reviewer_evidence("alice"),
2117            },
2118            now,
2119        )
2120        .expect("record duplicate reviewer");
2121
2122        let error = WorkGraphMachine::close_item(
2123            item.clone(),
2124            CloseWorkItemRequest {
2125                id: item.id.clone(),
2126                realm_id: None,
2127                namespace: None,
2128                expected_revision: item.revision,
2129                status: WorkStatus::Completed,
2130            },
2131            now,
2132        )
2133        .expect_err("duplicate reviewer must not satisfy a quorum of two");
2134        assert!(matches!(error, WorkGraphError::InvalidTransition(_)));
2135
2136        // A second distinct reviewer reaches the quorum; machine admits close.
2137        let expected_revision = item.revision;
2138        let (item, _) = WorkGraphMachine::add_evidence(
2139            item,
2140            AddEvidenceRequest {
2141                id: WorkItemId::generated(),
2142                realm_id: None,
2143                namespace: None,
2144                expected_revision,
2145                evidence: reviewer_evidence("bob"),
2146            },
2147            now,
2148        )
2149        .expect("record second reviewer");
2150
2151        let (closed, _) = WorkGraphMachine::close_item(
2152            item.clone(),
2153            CloseWorkItemRequest {
2154                id: item.id,
2155                realm_id: None,
2156                namespace: None,
2157                expected_revision: item.revision,
2158                status: WorkStatus::Completed,
2159            },
2160            now,
2161        )
2162        .expect("two distinct reviewers satisfy the quorum");
2163        assert_eq!(closed.status, WorkStatus::Completed);
2164    }
2165
2166    #[test]
2167    fn stale_revisions_fail() {
2168        let now = Utc::now();
2169        let item = create("stale", now);
2170        let error =
2171            WorkGraphMachine::block_item(item, 7, now).expect_err("stale transition should fail");
2172        assert!(matches!(error, WorkGraphError::StaleRevision { .. }));
2173    }
2174
2175    #[test]
2176    fn public_error_class_is_machine_owned() {
2177        use crate::types::{WorkAttentionBindingId, WorkNamespace};
2178
2179        let cases: &[(WorkGraphError, WorkGraphPublicErrorClass)] = &[
2180            (
2181                WorkGraphError::not_found(
2182                    "realm".to_string(),
2183                    WorkNamespace::default(),
2184                    WorkItemId::generated(),
2185                ),
2186                WorkGraphPublicErrorClass::NotFound,
2187            ),
2188            (
2189                WorkGraphError::attention_not_found(
2190                    "realm".to_string(),
2191                    WorkNamespace::default(),
2192                    WorkAttentionBindingId::generated(),
2193                ),
2194                WorkGraphPublicErrorClass::NotFound,
2195            ),
2196            (
2197                WorkGraphError::StaleRevision {
2198                    id: WorkItemId::generated(),
2199                    expected: 1,
2200                    actual: 2,
2201                },
2202                WorkGraphPublicErrorClass::Conflict,
2203            ),
2204            (
2205                WorkGraphError::Conflict("conflict".to_string()),
2206                WorkGraphPublicErrorClass::Conflict,
2207            ),
2208            (
2209                WorkGraphError::InvalidTransition("bad".to_string()),
2210                WorkGraphPublicErrorClass::InvalidTransition,
2211            ),
2212            (
2213                WorkGraphError::InvalidInput("bad".to_string()),
2214                WorkGraphPublicErrorClass::InvalidArguments,
2215            ),
2216            (
2217                WorkGraphError::InvalidTimestampMillis {
2218                    field: "due_at",
2219                    millis: -1,
2220                },
2221                WorkGraphPublicErrorClass::InvalidArguments,
2222            ),
2223            (
2224                WorkGraphError::Store("store".to_string()),
2225                WorkGraphPublicErrorClass::StoreError,
2226            ),
2227            (
2228                WorkGraphError::UnsupportedBackend("backend".to_string()),
2229                WorkGraphPublicErrorClass::CapabilityUnavailable,
2230            ),
2231        ];
2232
2233        for (error, expected) in cases {
2234            let class = WorkGraphMachine::public_error_class(error)
2235                .expect("machine must classify every WorkGraphError variant");
2236            assert_eq!(class, *expected, "unexpected public class for {error:?}");
2237        }
2238    }
2239
2240    #[test]
2241    fn only_one_active_claim_can_exist() {
2242        let now = Utc::now();
2243        let item = create("claim", now);
2244        let (claimed, _) = WorkGraphMachine::claim_item(
2245            item,
2246            ClaimWorkItemRequest {
2247                id: WorkItemId::generated(),
2248                realm_id: None,
2249                namespace: None,
2250                expected_revision: 1,
2251                owner: owner("worker"),
2252                lease_seconds: Some(60),
2253                lease_expires_at: None,
2254            },
2255            now,
2256        )
2257        .expect("claim");
2258        let error = WorkGraphMachine::claim_item(
2259            claimed,
2260            ClaimWorkItemRequest {
2261                id: WorkItemId::generated(),
2262                realm_id: None,
2263                namespace: None,
2264                expected_revision: 2,
2265                owner: owner("worker-2"),
2266                lease_seconds: Some(60),
2267                lease_expires_at: None,
2268            },
2269            now,
2270        )
2271        .expect_err("double claim should fail");
2272        assert!(matches!(error, WorkGraphError::InvalidTransition(_)));
2273    }
2274
2275    #[test]
2276    fn validate_item_projection_only_rejects_never_derives() {
2277        let now = Utc::now();
2278
2279        // A freshly machine-built item agrees with its machine_state authority.
2280        let clean = create("projection-guard", now);
2281        WorkGraphMachine::validate_item_projection(&clean)
2282            .expect("a machine-built projection must agree with its machine state");
2283
2284        // Tampering the projected status away from the machine-owned lifecycle
2285        // phase is rejected — the guard does not silently re-derive `status`
2286        // from the machine state, it fails closed on the drift.
2287        let mut status_drift = clean.clone();
2288        status_drift.status = WorkStatus::Completed;
2289        let err = WorkGraphMachine::validate_item_projection(&status_drift)
2290            .expect_err("status projection drift must be rejected, never repaired");
2291        assert!(
2292            matches!(&err, WorkGraphError::Store(message) if message.contains("status projection")),
2293            "rejection must cite the status projection drift, got: {err:?}"
2294        );
2295
2296        // Likewise for the revision projection: the machine_state.revision is
2297        // canonical, and a divergent projected revision is rejected rather than
2298        // synthesized back into agreement.
2299        let mut revision_drift = clean.clone();
2300        revision_drift.revision = revision_drift.revision.wrapping_add(1);
2301        let err = WorkGraphMachine::validate_item_projection(&revision_drift)
2302            .expect_err("revision projection drift must be rejected, never repaired");
2303        assert!(
2304            matches!(&err, WorkGraphError::Store(message) if message.contains("revision projection")),
2305            "rejection must cite the revision projection drift, got: {err:?}"
2306        );
2307    }
2308}