Skip to main content

meerkat_workgraph/
machine.rs

1use std::collections::{BTreeMap, BTreeSet};
2
3use chrono::{DateTime, Duration, Utc};
4use serde_json::json;
5
6use crate::WorkGraphError;
7use crate::machines::workgraph_lifecycle as wg_dsl;
8use crate::types::{
9    AddEvidenceRequest, ClaimWorkItemRequest, CloseWorkItemRequest, CreateWorkItemRequest,
10    ReleaseWorkItemRequest, UpdateWorkItemRequest, WorkClaim, WorkEdge, WorkEdgeKind,
11    WorkGraphEvent, WorkGraphEventKind, WorkGraphMachineState, WorkItem, WorkItemId, WorkNamespace,
12    WorkStatus,
13};
14
15#[derive(Debug, Default, Clone, Copy)]
16pub struct WorkGraphMachine;
17
18impl WorkGraphMachine {
19    pub fn validate_item_projection(item: &WorkItem) -> Result<(), WorkGraphError> {
20        validate_item_machine_projection(item)
21    }
22
23    pub fn create_item(
24        request: CreateWorkItemRequest,
25        realm_id: String,
26        namespace: WorkNamespace,
27        now: DateTime<Utc>,
28    ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
29        let title = validate_title(request.title)?;
30        let status = request.status.unwrap_or_default();
31        if matches!(
32            status,
33            WorkStatus::InProgress
34                | WorkStatus::Completed
35                | WorkStatus::Cancelled
36                | WorkStatus::Failed
37        ) {
38            return Err(WorkGraphError::InvalidTransition(
39                "new work items may only start open or blocked".to_string(),
40            ));
41        }
42        let input = match status {
43            WorkStatus::Open => wg_dsl::WorkGraphLifecycleInput::CreateOpen {
44                due_at_utc_ms: request.due_at.map(datetime_to_millis),
45                not_before_utc_ms: request.not_before.map(datetime_to_millis),
46                snoozed_until_utc_ms: request.snoozed_until.map(datetime_to_millis),
47                unresolved_blocker_count: 0,
48            },
49            WorkStatus::Blocked => wg_dsl::WorkGraphLifecycleInput::CreateBlocked {
50                due_at_utc_ms: request.due_at.map(datetime_to_millis),
51                not_before_utc_ms: request.not_before.map(datetime_to_millis),
52                snoozed_until_utc_ms: request.snoozed_until.map(datetime_to_millis),
53                unresolved_blocker_count: 0,
54            },
55            WorkStatus::InProgress
56            | WorkStatus::Completed
57            | WorkStatus::Cancelled
58            | WorkStatus::Failed => unreachable!("invalid create status rejected above"),
59        };
60        let dsl_state = apply_new_item_dsl(input)?;
61        let mut item = WorkItem {
62            id: WorkItemId::generated(),
63            realm_id,
64            namespace,
65            title,
66            description: request.description,
67            status: work_status_from_dsl(dsl_state.lifecycle_phase)?,
68            priority: request.priority,
69            labels: normalize_labels(request.labels)?,
70            owner: None,
71            claim: None,
72            machine_state: dsl_state.clone(),
73            revision: dsl_state.revision,
74            due_at: dsl_state.due_at_utc_ms.and_then(millis_to_datetime),
75            not_before: dsl_state.not_before_utc_ms.and_then(millis_to_datetime),
76            snoozed_until: dsl_state.snoozed_until_utc_ms.and_then(millis_to_datetime),
77            created_at: now,
78            updated_at: now,
79            terminal_at: dsl_state.terminal_at_utc_ms.and_then(millis_to_datetime),
80            external_refs: request.external_refs,
81            evidence_refs: request.evidence_refs,
82        };
83        sync_item_from_machine_state(&mut item)?;
84        let event = item_event(&item, WorkGraphEventKind::Created, now)?;
85        Ok((item, event))
86    }
87
88    pub fn update_item(
89        mut item: WorkItem,
90        request: UpdateWorkItemRequest,
91        now: DateTime<Utc>,
92    ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
93        let due_at = request.due_at.or(item.due_at);
94        let not_before = request.not_before.or(item.not_before);
95        let snoozed_until = request.snoozed_until.or(item.snoozed_until);
96        let dsl_state = apply_item_dsl(
97            &item,
98            item.machine_state.unresolved_blocker_count,
99            wg_dsl::WorkGraphLifecycleInput::Update {
100                expected_revision: request.expected_revision,
101                due_at_utc_ms: due_at.map(datetime_to_millis),
102                not_before_utc_ms: not_before.map(datetime_to_millis),
103                snoozed_until_utc_ms: snoozed_until.map(datetime_to_millis),
104                unresolved_blocker_count: item.machine_state.unresolved_blocker_count,
105            },
106            Some(request.expected_revision),
107        )?;
108
109        if let Some(title) = request.title {
110            item.title = validate_title(title)?;
111        }
112        if let Some(description) = request.description {
113            item.description = Some(description);
114        }
115        if let Some(priority) = request.priority {
116            item.priority = priority;
117        }
118        if let Some(labels) = request.labels {
119            item.labels = normalize_labels(labels)?;
120        }
121        item.machine_state = dsl_state;
122        sync_item_from_machine_state(&mut item)?;
123        if !request.external_refs.is_empty() {
124            item.external_refs = request.external_refs;
125        }
126        item.updated_at = now;
127        let event = item_event(&item, WorkGraphEventKind::Updated, now)?;
128        Ok((item, event))
129    }
130
131    pub fn claim_item(
132        item: WorkItem,
133        request: ClaimWorkItemRequest,
134        now: DateTime<Utc>,
135    ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
136        Self::claim_ready_item(item, request, now)
137    }
138
139    pub fn claim_ready_item(
140        item: WorkItem,
141        request: ClaimWorkItemRequest,
142        now: DateTime<Utc>,
143    ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
144        Self::claim_item_with_unresolved_blockers(
145            item.clone(),
146            item.machine_state.unresolved_blocker_count,
147            request,
148            now,
149        )
150    }
151
152    pub fn refresh_eligibility(
153        mut item: WorkItem,
154        unresolved_blocker_count: u64,
155        now: DateTime<Utc>,
156    ) -> Result<Option<(WorkItem, WorkGraphEvent)>, WorkGraphError> {
157        if item.machine_state.unresolved_blocker_count == unresolved_blocker_count {
158            return Ok(None);
159        }
160        let dsl_state = apply_item_dsl(
161            &item,
162            unresolved_blocker_count,
163            wg_dsl::WorkGraphLifecycleInput::RefreshEligibility {
164                unresolved_blocker_count,
165            },
166            None,
167        )?;
168        item.machine_state = dsl_state;
169        sync_item_from_machine_state(&mut item)?;
170        item.updated_at = now;
171        let event = item_event(&item, WorkGraphEventKind::Updated, now)?;
172        Ok(Some((item, event)))
173    }
174
175    pub(crate) fn claim_item_with_unresolved_blockers(
176        mut item: WorkItem,
177        unresolved_blocker_count: u64,
178        request: ClaimWorkItemRequest,
179        now: DateTime<Utc>,
180    ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
181        let lease_expires_at = request.lease_expires_at.or_else(|| {
182            request
183                .lease_seconds
184                .map(|seconds| now + seconds_to_duration(seconds))
185        });
186        let owner_key = work_owner_key(&request.owner)?;
187        let dsl_state = apply_item_dsl(
188            &item,
189            unresolved_blocker_count,
190            wg_dsl::WorkGraphLifecycleInput::Claim {
191                expected_revision: request.expected_revision,
192                owner_key,
193                now_utc_ms: datetime_to_millis(now),
194                lease_expires_at_utc_ms: lease_expires_at.map(datetime_to_millis),
195            },
196            Some(request.expected_revision),
197        )?;
198        item.owner = Some(request.owner.clone());
199        item.claim = Some(WorkClaim {
200            owner: request.owner,
201            claimed_at: now,
202            lease_expires_at,
203        });
204        item.machine_state = dsl_state;
205        sync_item_from_machine_state(&mut item)?;
206        item.updated_at = now;
207        let event = item_event(&item, WorkGraphEventKind::Claimed, now)?;
208        Ok((item, event))
209    }
210
211    pub fn release_item(
212        mut item: WorkItem,
213        request: ReleaseWorkItemRequest,
214        now: DateTime<Utc>,
215    ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
216        let dsl_state = apply_item_dsl(
217            &item,
218            item.machine_state.unresolved_blocker_count,
219            wg_dsl::WorkGraphLifecycleInput::Release {
220                expected_revision: request.expected_revision,
221            },
222            Some(request.expected_revision),
223        )?;
224        item.claim = None;
225        item.owner = None;
226        item.machine_state = dsl_state;
227        sync_item_from_machine_state(&mut item)?;
228        item.updated_at = now;
229        let event = item_event(&item, WorkGraphEventKind::Released, now)?;
230        Ok((item, event))
231    }
232
233    pub fn block_item(
234        mut item: WorkItem,
235        expected_revision: u64,
236        now: DateTime<Utc>,
237    ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
238        let dsl_state = apply_item_dsl(
239            &item,
240            item.machine_state.unresolved_blocker_count,
241            wg_dsl::WorkGraphLifecycleInput::Block { expected_revision },
242            Some(expected_revision),
243        )?;
244        item.claim = None;
245        item.owner = None;
246        item.machine_state = dsl_state;
247        sync_item_from_machine_state(&mut item)?;
248        item.updated_at = now;
249        let event = item_event(&item, WorkGraphEventKind::Blocked, now)?;
250        Ok((item, event))
251    }
252
253    pub fn close_item(
254        mut item: WorkItem,
255        request: CloseWorkItemRequest,
256        now: DateTime<Utc>,
257    ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
258        let dsl_input = match request.status {
259            WorkStatus::Completed => wg_dsl::WorkGraphLifecycleInput::CloseCompleted {
260                expected_revision: request.expected_revision,
261                at_utc_ms: datetime_to_millis(now),
262            },
263            WorkStatus::Cancelled => wg_dsl::WorkGraphLifecycleInput::CloseCancelled {
264                expected_revision: request.expected_revision,
265                at_utc_ms: datetime_to_millis(now),
266            },
267            WorkStatus::Failed => wg_dsl::WorkGraphLifecycleInput::CloseFailed {
268                expected_revision: request.expected_revision,
269                at_utc_ms: datetime_to_millis(now),
270            },
271            WorkStatus::Open | WorkStatus::InProgress | WorkStatus::Blocked => {
272                return Err(WorkGraphError::InvalidTransition(
273                    "close requires a terminal status".to_string(),
274                ));
275            }
276        };
277        let dsl_state = apply_item_dsl(
278            &item,
279            item.machine_state.unresolved_blocker_count,
280            dsl_input,
281            Some(request.expected_revision),
282        )?;
283        item.claim = None;
284        item.owner = None;
285        item.machine_state = dsl_state;
286        sync_item_from_machine_state(&mut item)?;
287        item.updated_at = now;
288        let event = item_event(&item, WorkGraphEventKind::Closed, now)?;
289        Ok((item, event))
290    }
291
292    pub fn add_evidence(
293        mut item: WorkItem,
294        request: AddEvidenceRequest,
295        now: DateTime<Utc>,
296    ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
297        let dsl_state = apply_item_dsl(
298            &item,
299            item.machine_state.unresolved_blocker_count,
300            wg_dsl::WorkGraphLifecycleInput::AddEvidence {
301                expected_revision: request.expected_revision,
302            },
303            Some(request.expected_revision),
304        )?;
305        item.evidence_refs.push(request.evidence);
306        item.machine_state = dsl_state;
307        sync_item_from_machine_state(&mut item)?;
308        item.updated_at = now;
309        let event = item_event(&item, WorkGraphEventKind::EvidenceAdded, now)?;
310        Ok((item, event))
311    }
312
313    pub fn is_ready(item: &WorkItem, now: DateTime<Utc>) -> bool {
314        let owner_key = wg_dsl::WorkOwnerKey {
315            kind: wg_dsl::WorkOwnerKind::Label,
316            id: "__ready_probe__".to_string(),
317        };
318        apply_item_dsl(
319            item,
320            item.machine_state.unresolved_blocker_count,
321            wg_dsl::WorkGraphLifecycleInput::Claim {
322                expected_revision: item.revision,
323                owner_key,
324                now_utc_ms: datetime_to_millis(now),
325                lease_expires_at_utc_ms: None,
326            },
327            Some(item.revision),
328        )
329        .is_ok()
330    }
331
332    pub fn ready_items(items: Vec<WorkItem>, now: DateTime<Utc>) -> Vec<WorkItem> {
333        items
334            .into_iter()
335            .filter(|item| Self::is_ready(item, now))
336            .collect()
337    }
338
339    pub fn validate_link(
340        edge: &WorkEdge,
341        existing_items: &[WorkItem],
342        existing_edges: &[WorkEdge],
343    ) -> Result<(), WorkGraphError> {
344        let topology_state = topology_state(existing_items, existing_edges);
345        apply_link_validation_dsl(
346            topology_state,
347            wg_dsl::WorkGraphLifecycleInput::ValidateLink {
348                kind: dsl_edge_kind(edge.kind),
349                from_item_key: work_item_key(&edge.from_id),
350                to_item_key: work_item_key(&edge.to_id),
351                edge_key: work_edge_key(edge.kind, &edge.from_id, &edge.to_id),
352                reverse_path_key: dependency_path_key(edge.kind, &edge.to_id, &edge.from_id),
353            },
354        )?;
355        Ok(())
356    }
357}
358
359fn validate_title(title: String) -> Result<String, WorkGraphError> {
360    let title = title.trim();
361    if title.is_empty() {
362        return Err(WorkGraphError::InvalidInput(
363            "work item title must not be empty".to_string(),
364        ));
365    }
366    Ok(title.to_string())
367}
368
369fn normalize_labels(labels: BTreeSet<String>) -> Result<BTreeSet<String>, WorkGraphError> {
370    let mut normalized = BTreeSet::new();
371    for label in labels {
372        let label = label.trim();
373        if label.is_empty() {
374            return Err(WorkGraphError::InvalidInput(
375                "work item labels must not be empty".to_string(),
376            ));
377        }
378        normalized.insert(label.to_string());
379    }
380    Ok(normalized)
381}
382
383fn apply_new_item_dsl(
384    input: wg_dsl::WorkGraphLifecycleInput,
385) -> Result<wg_dsl::WorkGraphLifecycleMachineState, WorkGraphError> {
386    let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::new();
387    wg_dsl::WorkGraphLifecycleMachineMutator::apply(&mut dsl_auth, input)
388        .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
389    Ok(dsl_auth.state)
390}
391
392fn apply_link_validation_dsl(
393    state: WorkGraphMachineState,
394    input: wg_dsl::WorkGraphLifecycleInput,
395) -> Result<(), WorkGraphError> {
396    let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::from_state(state);
397    wg_dsl::WorkGraphLifecycleMachineMutator::apply(&mut dsl_auth, input)
398        .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
399    Ok(())
400}
401
402fn apply_item_dsl(
403    item: &WorkItem,
404    unresolved_blocker_count: u64,
405    input: wg_dsl::WorkGraphLifecycleInput,
406    expected_revision: Option<u64>,
407) -> Result<WorkGraphMachineState, WorkGraphError> {
408    validate_item_machine_projection(item)?;
409    let mut state = item.machine_state.clone();
410    state.unresolved_blocker_count = unresolved_blocker_count;
411    let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::from_state(state);
412    wg_dsl::WorkGraphLifecycleMachineMutator::apply(&mut dsl_auth, input).map_err(|error| {
413        if let Some(expected) = expected_revision
414            && item.revision != expected
415        {
416            return WorkGraphError::StaleRevision {
417                id: item.id.clone(),
418                expected,
419                actual: item.revision,
420            };
421        }
422        WorkGraphError::InvalidTransition(format!("{error:?}"))
423    })?;
424    Ok(dsl_auth.state)
425}
426
427fn work_status_from_dsl(status: wg_dsl::WorkLifecycleState) -> Result<WorkStatus, WorkGraphError> {
428    match status {
429        wg_dsl::WorkLifecycleState::Open => Ok(WorkStatus::Open),
430        wg_dsl::WorkLifecycleState::InProgress => Ok(WorkStatus::InProgress),
431        wg_dsl::WorkLifecycleState::Blocked => Ok(WorkStatus::Blocked),
432        wg_dsl::WorkLifecycleState::Completed => Ok(WorkStatus::Completed),
433        wg_dsl::WorkLifecycleState::Cancelled => Ok(WorkStatus::Cancelled),
434        wg_dsl::WorkLifecycleState::Failed => Ok(WorkStatus::Failed),
435        wg_dsl::WorkLifecycleState::Absent => Err(WorkGraphError::InvalidTransition(
436            "work item lifecycle state is absent".to_string(),
437        )),
438    }
439}
440
441fn sync_item_from_machine_state(item: &mut WorkItem) -> Result<(), WorkGraphError> {
442    item.status = work_status_from_dsl(item.machine_state.lifecycle_phase)?;
443    item.revision = item.machine_state.revision;
444    item.due_at = item
445        .machine_state
446        .due_at_utc_ms
447        .and_then(millis_to_datetime);
448    item.not_before = item
449        .machine_state
450        .not_before_utc_ms
451        .and_then(millis_to_datetime);
452    item.snoozed_until = item
453        .machine_state
454        .snoozed_until_utc_ms
455        .and_then(millis_to_datetime);
456    item.terminal_at = item
457        .machine_state
458        .terminal_at_utc_ms
459        .and_then(millis_to_datetime);
460    Ok(())
461}
462
463fn validate_item_machine_projection(item: &WorkItem) -> Result<(), WorkGraphError> {
464    let status = work_status_from_dsl(item.machine_state.lifecycle_phase)?;
465    if item.status != status {
466        return Err(WorkGraphError::Store(format!(
467            "work item {} status projection {:?} does not match machine state {:?}",
468            item.id, item.status, status
469        )));
470    }
471    if item.revision != item.machine_state.revision {
472        return Err(WorkGraphError::Store(format!(
473            "work item {} revision projection {} does not match machine state {}",
474            item.id, item.revision, item.machine_state.revision
475        )));
476    }
477    if item.due_at.map(datetime_to_millis) != item.machine_state.due_at_utc_ms {
478        return Err(WorkGraphError::Store(format!(
479            "work item {} due_at projection does not match machine state",
480            item.id
481        )));
482    }
483    if item.not_before.map(datetime_to_millis) != item.machine_state.not_before_utc_ms {
484        return Err(WorkGraphError::Store(format!(
485            "work item {} not_before projection does not match machine state",
486            item.id
487        )));
488    }
489    if item.snoozed_until.map(datetime_to_millis) != item.machine_state.snoozed_until_utc_ms {
490        return Err(WorkGraphError::Store(format!(
491            "work item {} snoozed_until projection does not match machine state",
492            item.id
493        )));
494    }
495    if item.terminal_at.map(datetime_to_millis) != item.machine_state.terminal_at_utc_ms {
496        return Err(WorkGraphError::Store(format!(
497            "work item {} terminal_at projection does not match machine state",
498            item.id
499        )));
500    }
501    if let Some(claim) = &item.claim {
502        let claim_owner_key = work_owner_key(&claim.owner)?;
503        if item.machine_state.claim_owner_key.as_ref() != Some(&claim_owner_key) {
504            return Err(WorkGraphError::Store(format!(
505                "work item {} claim owner projection does not match machine state",
506                item.id
507            )));
508        }
509        if item.machine_state.claimed_at_utc_ms != Some(datetime_to_millis(claim.claimed_at)) {
510            return Err(WorkGraphError::Store(format!(
511                "work item {} claim time projection does not match machine state",
512                item.id
513            )));
514        }
515        if item.machine_state.lease_expires_at_utc_ms
516            != claim.lease_expires_at.map(datetime_to_millis)
517        {
518            return Err(WorkGraphError::Store(format!(
519                "work item {} claim lease projection does not match machine state",
520                item.id
521            )));
522        }
523    } else if item.machine_state.claim_owner_key.is_some()
524        || item.machine_state.claimed_at_utc_ms.is_some()
525        || item.machine_state.lease_expires_at_utc_ms.is_some()
526    {
527        return Err(WorkGraphError::Store(format!(
528            "work item {} machine state has a claim without a claim projection",
529            item.id
530        )));
531    }
532    Ok(())
533}
534
535fn work_owner_key(owner: &crate::types::WorkOwner) -> Result<wg_dsl::WorkOwnerKey, WorkGraphError> {
536    let kind = match owner.key.kind {
537        crate::types::WorkOwnerKind::Principal => wg_dsl::WorkOwnerKind::Principal,
538        crate::types::WorkOwnerKind::Agent => wg_dsl::WorkOwnerKind::Agent,
539        crate::types::WorkOwnerKind::Session => wg_dsl::WorkOwnerKind::Session,
540        crate::types::WorkOwnerKind::Mob => wg_dsl::WorkOwnerKind::Mob,
541        crate::types::WorkOwnerKind::Label => wg_dsl::WorkOwnerKind::Label,
542    };
543    Ok(wg_dsl::WorkOwnerKey {
544        kind,
545        id: owner.key.id.clone(),
546    })
547}
548
549fn topology_state(
550    existing_items: &[WorkItem],
551    existing_edges: &[WorkEdge],
552) -> WorkGraphMachineState {
553    WorkGraphMachineState {
554        topology_item_keys: existing_items
555            .iter()
556            .map(|item| work_item_key(&item.id))
557            .collect(),
558        topology_edge_keys: existing_edges
559            .iter()
560            .map(|edge| work_edge_key(edge.kind, &edge.from_id, &edge.to_id))
561            .collect(),
562        blocks_reachability: dependency_reachability(existing_edges, WorkEdgeKind::Blocks),
563        parent_reachability: dependency_reachability(existing_edges, WorkEdgeKind::Parent),
564        ..Default::default()
565    }
566}
567
568fn dependency_reachability(
569    edges: &[WorkEdge],
570    kind: WorkEdgeKind,
571) -> BTreeSet<wg_dsl::WorkDependencyPathKey> {
572    let mut adjacency = BTreeMap::<WorkItemId, BTreeSet<WorkItemId>>::new();
573    for edge in edges.iter().filter(|edge| edge.kind == kind) {
574        adjacency
575            .entry(edge.from_id.clone())
576            .or_default()
577            .insert(edge.to_id.clone());
578    }
579
580    let mut reachability = BTreeSet::new();
581    for start in adjacency.keys() {
582        let mut stack = adjacency
583            .get(start)
584            .into_iter()
585            .flat_map(|targets| targets.iter().cloned())
586            .collect::<Vec<_>>();
587        let mut seen = BTreeSet::new();
588        while let Some(current) = stack.pop() {
589            if !seen.insert(current.clone()) {
590                continue;
591            }
592            reachability.insert(dependency_path_key(kind, start, &current));
593            if let Some(targets) = adjacency.get(&current) {
594                stack.extend(targets.iter().cloned());
595            }
596        }
597    }
598    reachability
599}
600
601fn work_item_key(id: &WorkItemId) -> wg_dsl::WorkItemKey {
602    wg_dsl::WorkItemKey(id.as_str().to_string())
603}
604
605fn work_edge_key(
606    kind: WorkEdgeKind,
607    from_id: &WorkItemId,
608    to_id: &WorkItemId,
609) -> wg_dsl::WorkEdgeKey {
610    wg_dsl::WorkEdgeKey(format!(
611        "{}:{}:{}",
612        edge_kind_key(kind),
613        from_id.as_str(),
614        to_id.as_str()
615    ))
616}
617
618fn dependency_path_key(
619    kind: WorkEdgeKind,
620    from_id: &WorkItemId,
621    to_id: &WorkItemId,
622) -> wg_dsl::WorkDependencyPathKey {
623    wg_dsl::WorkDependencyPathKey(format!(
624        "{}:{}:{}",
625        edge_kind_key(kind),
626        from_id.as_str(),
627        to_id.as_str()
628    ))
629}
630
631fn dsl_edge_kind(kind: WorkEdgeKind) -> wg_dsl::WorkEdgeKind {
632    match kind {
633        WorkEdgeKind::Blocks => wg_dsl::WorkEdgeKind::Blocks,
634        WorkEdgeKind::Parent => wg_dsl::WorkEdgeKind::Parent,
635        WorkEdgeKind::Related => wg_dsl::WorkEdgeKind::Related,
636        WorkEdgeKind::Supersedes => wg_dsl::WorkEdgeKind::Supersedes,
637        WorkEdgeKind::DerivedFrom => wg_dsl::WorkEdgeKind::DerivedFrom,
638    }
639}
640
641fn edge_kind_key(kind: WorkEdgeKind) -> &'static str {
642    match kind {
643        WorkEdgeKind::Blocks => "blocks",
644        WorkEdgeKind::Parent => "parent",
645        WorkEdgeKind::Related => "related",
646        WorkEdgeKind::Supersedes => "supersedes",
647        WorkEdgeKind::DerivedFrom => "derived_from",
648    }
649}
650
651fn datetime_to_millis(dt: DateTime<Utc>) -> u64 {
652    u64::try_from(dt.timestamp_millis()).unwrap_or(0)
653}
654
655fn millis_to_datetime(ms: u64) -> Option<DateTime<Utc>> {
656    DateTime::from_timestamp_millis(i64::try_from(ms).ok()?)
657}
658
659fn item_event(
660    item: &WorkItem,
661    kind: WorkGraphEventKind,
662    at: DateTime<Utc>,
663) -> Result<WorkGraphEvent, WorkGraphError> {
664    Ok(WorkGraphEvent::item(
665        item.realm_id.clone(),
666        item.namespace.clone(),
667        item.id.clone(),
668        kind,
669        at,
670        json!({ "item": item }),
671    ))
672}
673
674fn seconds_to_duration(seconds: u64) -> Duration {
675    let seconds = i64::try_from(seconds).unwrap_or(i64::MAX);
676    Duration::seconds(seconds)
677}
678
679#[cfg(test)]
680#[allow(clippy::expect_used, clippy::unwrap_used)]
681mod tests {
682    use super::*;
683    use crate::types::{
684        ClaimWorkItemRequest, CloseWorkItemRequest, UpdateWorkItemRequest, WorkOwner, WorkOwnerKey,
685    };
686
687    fn create(title: &str, now: DateTime<Utc>) -> WorkItem {
688        WorkGraphMachine::create_item(
689            CreateWorkItemRequest {
690                realm_id: None,
691                namespace: None,
692                title: title.to_string(),
693                description: None,
694                priority: Default::default(),
695                labels: BTreeSet::new(),
696                due_at: None,
697                not_before: None,
698                snoozed_until: None,
699                external_refs: Vec::new(),
700                evidence_refs: Vec::new(),
701                status: None,
702            },
703            "realm".to_string(),
704            WorkNamespace::default(),
705            now,
706        )
707        .expect("create")
708        .0
709    }
710
711    fn owner(id: &str) -> WorkOwner {
712        WorkOwner::new(WorkOwnerKey::label(id).expect("owner key"))
713    }
714
715    #[test]
716    fn blocked_items_are_never_ready() {
717        let now = Utc::now();
718        let item = create("blocked", now);
719        let (item, _) = WorkGraphMachine::block_item(item, 1, now).expect("block");
720        assert!(WorkGraphMachine::ready_items(vec![item], now).is_empty());
721    }
722
723    #[test]
724    fn future_due_items_are_not_ready() {
725        let now = Utc::now();
726        let item = create("future", now);
727        let (item, _) = WorkGraphMachine::update_item(
728            item,
729            UpdateWorkItemRequest {
730                id: WorkItemId::generated(),
731                realm_id: None,
732                namespace: None,
733                expected_revision: 1,
734                title: None,
735                description: None,
736                priority: None,
737                labels: None,
738                due_at: Some(now + Duration::hours(1)),
739                not_before: None,
740                snoozed_until: None,
741                external_refs: Vec::new(),
742            },
743            now,
744        )
745        .expect("update due");
746
747        assert!(WorkGraphMachine::ready_items(vec![item], now).is_empty());
748    }
749
750    #[test]
751    fn terminal_items_cannot_be_claimed() {
752        let now = Utc::now();
753        let item = create("done", now);
754        let (item, _) = WorkGraphMachine::close_item(
755            item,
756            CloseWorkItemRequest {
757                id: WorkItemId::generated(),
758                realm_id: None,
759                namespace: None,
760                expected_revision: 1,
761                status: WorkStatus::Completed,
762            },
763            now,
764        )
765        .expect("close");
766        let error = WorkGraphMachine::claim_item(
767            item,
768            ClaimWorkItemRequest {
769                id: WorkItemId::generated(),
770                realm_id: None,
771                namespace: None,
772                expected_revision: 2,
773                owner: owner("worker"),
774                lease_seconds: None,
775                lease_expires_at: None,
776            },
777            now,
778        )
779        .expect_err("terminal claim should fail");
780        assert!(matches!(error, WorkGraphError::InvalidTransition(_)));
781    }
782
783    #[test]
784    fn stale_revisions_fail() {
785        let now = Utc::now();
786        let item = create("stale", now);
787        let error =
788            WorkGraphMachine::block_item(item, 7, now).expect_err("stale transition should fail");
789        assert!(matches!(error, WorkGraphError::StaleRevision { .. }));
790    }
791
792    #[test]
793    fn only_one_active_claim_can_exist() {
794        let now = Utc::now();
795        let item = create("claim", now);
796        let (claimed, _) = WorkGraphMachine::claim_item(
797            item,
798            ClaimWorkItemRequest {
799                id: WorkItemId::generated(),
800                realm_id: None,
801                namespace: None,
802                expected_revision: 1,
803                owner: owner("worker"),
804                lease_seconds: Some(60),
805                lease_expires_at: None,
806            },
807            now,
808        )
809        .expect("claim");
810        let error = WorkGraphMachine::claim_item(
811            claimed,
812            ClaimWorkItemRequest {
813                id: WorkItemId::generated(),
814                realm_id: None,
815                namespace: None,
816                expected_revision: 2,
817                owner: owner("worker-2"),
818                lease_seconds: Some(60),
819                lease_expires_at: None,
820            },
821            now,
822        )
823        .expect_err("double claim should fail");
824        assert!(matches!(error, WorkGraphError::InvalidTransition(_)));
825    }
826}