1use std::collections::{BTreeMap, BTreeSet};
2
3use chrono::{DateTime, Duration, Utc};
4use serde_json::json;
5
6use crate::WorkGraphError;
7use crate::machines::workgraph_lifecycle as wg_dsl;
8use crate::types::{
9 AddEvidenceRequest, ClaimWorkItemRequest, CloseWorkItemRequest, CreateWorkItemRequest,
10 ReleaseWorkItemRequest, UpdateWorkItemRequest, WorkClaim, WorkEdge, WorkEdgeKind,
11 WorkGraphEvent, WorkGraphEventKind, WorkGraphMachineState, WorkItem, WorkItemId, WorkNamespace,
12 WorkStatus,
13};
14
/// Zero-sized handle whose associated functions perform work-item lifecycle
/// operations; it stores no data of its own.
#[derive(Clone, Copy, Debug, Default)]
pub struct WorkGraphMachine;
17
18impl WorkGraphMachine {
19 pub fn validate_item_projection(item: &WorkItem) -> Result<(), WorkGraphError> {
20 validate_item_machine_projection(item)
21 }
22
23 pub fn create_item(
24 request: CreateWorkItemRequest,
25 realm_id: String,
26 namespace: WorkNamespace,
27 now: DateTime<Utc>,
28 ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
29 let title = validate_title(request.title)?;
30 let status = request.status.unwrap_or_default();
31 if matches!(
32 status,
33 WorkStatus::InProgress
34 | WorkStatus::Completed
35 | WorkStatus::Cancelled
36 | WorkStatus::Failed
37 ) {
38 return Err(WorkGraphError::InvalidTransition(
39 "new work items may only start open or blocked".to_string(),
40 ));
41 }
42 let input = match status {
43 WorkStatus::Open => wg_dsl::WorkGraphLifecycleInput::CreateOpen {
44 due_at_utc_ms: request.due_at.map(datetime_to_millis),
45 not_before_utc_ms: request.not_before.map(datetime_to_millis),
46 snoozed_until_utc_ms: request.snoozed_until.map(datetime_to_millis),
47 unresolved_blocker_count: 0,
48 },
49 WorkStatus::Blocked => wg_dsl::WorkGraphLifecycleInput::CreateBlocked {
50 due_at_utc_ms: request.due_at.map(datetime_to_millis),
51 not_before_utc_ms: request.not_before.map(datetime_to_millis),
52 snoozed_until_utc_ms: request.snoozed_until.map(datetime_to_millis),
53 unresolved_blocker_count: 0,
54 },
55 WorkStatus::InProgress
56 | WorkStatus::Completed
57 | WorkStatus::Cancelled
58 | WorkStatus::Failed => unreachable!("invalid create status rejected above"),
59 };
60 let dsl_state = apply_new_item_dsl(input)?;
61 let mut item = WorkItem {
62 id: WorkItemId::generated(),
63 realm_id,
64 namespace,
65 title,
66 description: request.description,
67 status: work_status_from_dsl(dsl_state.lifecycle_phase)?,
68 priority: request.priority,
69 labels: normalize_labels(request.labels)?,
70 owner: None,
71 claim: None,
72 machine_state: dsl_state.clone(),
73 revision: dsl_state.revision,
74 due_at: dsl_state.due_at_utc_ms.and_then(millis_to_datetime),
75 not_before: dsl_state.not_before_utc_ms.and_then(millis_to_datetime),
76 snoozed_until: dsl_state.snoozed_until_utc_ms.and_then(millis_to_datetime),
77 created_at: now,
78 updated_at: now,
79 terminal_at: dsl_state.terminal_at_utc_ms.and_then(millis_to_datetime),
80 external_refs: request.external_refs,
81 evidence_refs: request.evidence_refs,
82 };
83 sync_item_from_machine_state(&mut item)?;
84 let event = item_event(&item, WorkGraphEventKind::Created, now)?;
85 Ok((item, event))
86 }
87
88 pub fn update_item(
89 mut item: WorkItem,
90 request: UpdateWorkItemRequest,
91 now: DateTime<Utc>,
92 ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
93 let due_at = request.due_at.or(item.due_at);
94 let not_before = request.not_before.or(item.not_before);
95 let snoozed_until = request.snoozed_until.or(item.snoozed_until);
96 let dsl_state = apply_item_dsl(
97 &item,
98 item.machine_state.unresolved_blocker_count,
99 wg_dsl::WorkGraphLifecycleInput::Update {
100 expected_revision: request.expected_revision,
101 due_at_utc_ms: due_at.map(datetime_to_millis),
102 not_before_utc_ms: not_before.map(datetime_to_millis),
103 snoozed_until_utc_ms: snoozed_until.map(datetime_to_millis),
104 unresolved_blocker_count: item.machine_state.unresolved_blocker_count,
105 },
106 Some(request.expected_revision),
107 )?;
108
109 if let Some(title) = request.title {
110 item.title = validate_title(title)?;
111 }
112 if let Some(description) = request.description {
113 item.description = Some(description);
114 }
115 if let Some(priority) = request.priority {
116 item.priority = priority;
117 }
118 if let Some(labels) = request.labels {
119 item.labels = normalize_labels(labels)?;
120 }
121 item.machine_state = dsl_state;
122 sync_item_from_machine_state(&mut item)?;
123 if !request.external_refs.is_empty() {
124 item.external_refs = request.external_refs;
125 }
126 item.updated_at = now;
127 let event = item_event(&item, WorkGraphEventKind::Updated, now)?;
128 Ok((item, event))
129 }
130
131 pub fn claim_item(
132 item: WorkItem,
133 request: ClaimWorkItemRequest,
134 now: DateTime<Utc>,
135 ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
136 Self::claim_ready_item(item, request, now)
137 }
138
139 pub fn claim_ready_item(
140 item: WorkItem,
141 request: ClaimWorkItemRequest,
142 now: DateTime<Utc>,
143 ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
144 Self::claim_item_with_unresolved_blockers(
145 item.clone(),
146 item.machine_state.unresolved_blocker_count,
147 request,
148 now,
149 )
150 }
151
152 pub fn refresh_eligibility(
153 mut item: WorkItem,
154 unresolved_blocker_count: u64,
155 now: DateTime<Utc>,
156 ) -> Result<Option<(WorkItem, WorkGraphEvent)>, WorkGraphError> {
157 if item.machine_state.unresolved_blocker_count == unresolved_blocker_count {
158 return Ok(None);
159 }
160 let dsl_state = apply_item_dsl(
161 &item,
162 unresolved_blocker_count,
163 wg_dsl::WorkGraphLifecycleInput::RefreshEligibility {
164 unresolved_blocker_count,
165 },
166 None,
167 )?;
168 item.machine_state = dsl_state;
169 sync_item_from_machine_state(&mut item)?;
170 item.updated_at = now;
171 let event = item_event(&item, WorkGraphEventKind::Updated, now)?;
172 Ok(Some((item, event)))
173 }
174
175 pub(crate) fn claim_item_with_unresolved_blockers(
176 mut item: WorkItem,
177 unresolved_blocker_count: u64,
178 request: ClaimWorkItemRequest,
179 now: DateTime<Utc>,
180 ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
181 let lease_expires_at = request.lease_expires_at.or_else(|| {
182 request
183 .lease_seconds
184 .map(|seconds| now + seconds_to_duration(seconds))
185 });
186 let owner_key = work_owner_key(&request.owner)?;
187 let dsl_state = apply_item_dsl(
188 &item,
189 unresolved_blocker_count,
190 wg_dsl::WorkGraphLifecycleInput::Claim {
191 expected_revision: request.expected_revision,
192 owner_key,
193 now_utc_ms: datetime_to_millis(now),
194 lease_expires_at_utc_ms: lease_expires_at.map(datetime_to_millis),
195 },
196 Some(request.expected_revision),
197 )?;
198 item.owner = Some(request.owner.clone());
199 item.claim = Some(WorkClaim {
200 owner: request.owner,
201 claimed_at: now,
202 lease_expires_at,
203 });
204 item.machine_state = dsl_state;
205 sync_item_from_machine_state(&mut item)?;
206 item.updated_at = now;
207 let event = item_event(&item, WorkGraphEventKind::Claimed, now)?;
208 Ok((item, event))
209 }
210
211 pub fn release_item(
212 mut item: WorkItem,
213 request: ReleaseWorkItemRequest,
214 now: DateTime<Utc>,
215 ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
216 let dsl_state = apply_item_dsl(
217 &item,
218 item.machine_state.unresolved_blocker_count,
219 wg_dsl::WorkGraphLifecycleInput::Release {
220 expected_revision: request.expected_revision,
221 },
222 Some(request.expected_revision),
223 )?;
224 item.claim = None;
225 item.owner = None;
226 item.machine_state = dsl_state;
227 sync_item_from_machine_state(&mut item)?;
228 item.updated_at = now;
229 let event = item_event(&item, WorkGraphEventKind::Released, now)?;
230 Ok((item, event))
231 }
232
233 pub fn block_item(
234 mut item: WorkItem,
235 expected_revision: u64,
236 now: DateTime<Utc>,
237 ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
238 let dsl_state = apply_item_dsl(
239 &item,
240 item.machine_state.unresolved_blocker_count,
241 wg_dsl::WorkGraphLifecycleInput::Block { expected_revision },
242 Some(expected_revision),
243 )?;
244 item.claim = None;
245 item.owner = None;
246 item.machine_state = dsl_state;
247 sync_item_from_machine_state(&mut item)?;
248 item.updated_at = now;
249 let event = item_event(&item, WorkGraphEventKind::Blocked, now)?;
250 Ok((item, event))
251 }
252
253 pub fn close_item(
254 mut item: WorkItem,
255 request: CloseWorkItemRequest,
256 now: DateTime<Utc>,
257 ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
258 let dsl_input = match request.status {
259 WorkStatus::Completed => wg_dsl::WorkGraphLifecycleInput::CloseCompleted {
260 expected_revision: request.expected_revision,
261 at_utc_ms: datetime_to_millis(now),
262 },
263 WorkStatus::Cancelled => wg_dsl::WorkGraphLifecycleInput::CloseCancelled {
264 expected_revision: request.expected_revision,
265 at_utc_ms: datetime_to_millis(now),
266 },
267 WorkStatus::Failed => wg_dsl::WorkGraphLifecycleInput::CloseFailed {
268 expected_revision: request.expected_revision,
269 at_utc_ms: datetime_to_millis(now),
270 },
271 WorkStatus::Open | WorkStatus::InProgress | WorkStatus::Blocked => {
272 return Err(WorkGraphError::InvalidTransition(
273 "close requires a terminal status".to_string(),
274 ));
275 }
276 };
277 let dsl_state = apply_item_dsl(
278 &item,
279 item.machine_state.unresolved_blocker_count,
280 dsl_input,
281 Some(request.expected_revision),
282 )?;
283 item.claim = None;
284 item.owner = None;
285 item.machine_state = dsl_state;
286 sync_item_from_machine_state(&mut item)?;
287 item.updated_at = now;
288 let event = item_event(&item, WorkGraphEventKind::Closed, now)?;
289 Ok((item, event))
290 }
291
292 pub fn add_evidence(
293 mut item: WorkItem,
294 request: AddEvidenceRequest,
295 now: DateTime<Utc>,
296 ) -> Result<(WorkItem, WorkGraphEvent), WorkGraphError> {
297 let dsl_state = apply_item_dsl(
298 &item,
299 item.machine_state.unresolved_blocker_count,
300 wg_dsl::WorkGraphLifecycleInput::AddEvidence {
301 expected_revision: request.expected_revision,
302 },
303 Some(request.expected_revision),
304 )?;
305 item.evidence_refs.push(request.evidence);
306 item.machine_state = dsl_state;
307 sync_item_from_machine_state(&mut item)?;
308 item.updated_at = now;
309 let event = item_event(&item, WorkGraphEventKind::EvidenceAdded, now)?;
310 Ok((item, event))
311 }
312
313 pub fn is_ready(item: &WorkItem, now: DateTime<Utc>) -> bool {
314 let owner_key = wg_dsl::WorkOwnerKey {
315 kind: wg_dsl::WorkOwnerKind::Label,
316 id: "__ready_probe__".to_string(),
317 };
318 apply_item_dsl(
319 item,
320 item.machine_state.unresolved_blocker_count,
321 wg_dsl::WorkGraphLifecycleInput::Claim {
322 expected_revision: item.revision,
323 owner_key,
324 now_utc_ms: datetime_to_millis(now),
325 lease_expires_at_utc_ms: None,
326 },
327 Some(item.revision),
328 )
329 .is_ok()
330 }
331
332 pub fn ready_items(items: Vec<WorkItem>, now: DateTime<Utc>) -> Vec<WorkItem> {
333 items
334 .into_iter()
335 .filter(|item| Self::is_ready(item, now))
336 .collect()
337 }
338
339 pub fn validate_link(
340 edge: &WorkEdge,
341 existing_items: &[WorkItem],
342 existing_edges: &[WorkEdge],
343 ) -> Result<(), WorkGraphError> {
344 let topology_state = topology_state(existing_items, existing_edges);
345 apply_link_validation_dsl(
346 topology_state,
347 wg_dsl::WorkGraphLifecycleInput::ValidateLink {
348 kind: dsl_edge_kind(edge.kind),
349 from_item_key: work_item_key(&edge.from_id),
350 to_item_key: work_item_key(&edge.to_id),
351 edge_key: work_edge_key(edge.kind, &edge.from_id, &edge.to_id),
352 reverse_path_key: dependency_path_key(edge.kind, &edge.to_id, &edge.from_id),
353 },
354 )?;
355 Ok(())
356 }
357}
358
359fn validate_title(title: String) -> Result<String, WorkGraphError> {
360 let title = title.trim();
361 if title.is_empty() {
362 return Err(WorkGraphError::InvalidInput(
363 "work item title must not be empty".to_string(),
364 ));
365 }
366 Ok(title.to_string())
367}
368
369fn normalize_labels(labels: BTreeSet<String>) -> Result<BTreeSet<String>, WorkGraphError> {
370 let mut normalized = BTreeSet::new();
371 for label in labels {
372 let label = label.trim();
373 if label.is_empty() {
374 return Err(WorkGraphError::InvalidInput(
375 "work item labels must not be empty".to_string(),
376 ));
377 }
378 normalized.insert(label.to_string());
379 }
380 Ok(normalized)
381}
382
383fn apply_new_item_dsl(
384 input: wg_dsl::WorkGraphLifecycleInput,
385) -> Result<wg_dsl::WorkGraphLifecycleMachineState, WorkGraphError> {
386 let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::new();
387 wg_dsl::WorkGraphLifecycleMachineMutator::apply(&mut dsl_auth, input)
388 .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
389 Ok(dsl_auth.state)
390}
391
392fn apply_link_validation_dsl(
393 state: WorkGraphMachineState,
394 input: wg_dsl::WorkGraphLifecycleInput,
395) -> Result<(), WorkGraphError> {
396 let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::from_state(state);
397 wg_dsl::WorkGraphLifecycleMachineMutator::apply(&mut dsl_auth, input)
398 .map_err(|error| WorkGraphError::InvalidTransition(format!("{error:?}")))?;
399 Ok(())
400}
401
402fn apply_item_dsl(
403 item: &WorkItem,
404 unresolved_blocker_count: u64,
405 input: wg_dsl::WorkGraphLifecycleInput,
406 expected_revision: Option<u64>,
407) -> Result<WorkGraphMachineState, WorkGraphError> {
408 validate_item_machine_projection(item)?;
409 let mut state = item.machine_state.clone();
410 state.unresolved_blocker_count = unresolved_blocker_count;
411 let mut dsl_auth = wg_dsl::WorkGraphLifecycleMachineAuthority::from_state(state);
412 wg_dsl::WorkGraphLifecycleMachineMutator::apply(&mut dsl_auth, input).map_err(|error| {
413 if let Some(expected) = expected_revision
414 && item.revision != expected
415 {
416 return WorkGraphError::StaleRevision {
417 id: item.id.clone(),
418 expected,
419 actual: item.revision,
420 };
421 }
422 WorkGraphError::InvalidTransition(format!("{error:?}"))
423 })?;
424 Ok(dsl_auth.state)
425}
426
427fn work_status_from_dsl(status: wg_dsl::WorkLifecycleState) -> Result<WorkStatus, WorkGraphError> {
428 match status {
429 wg_dsl::WorkLifecycleState::Open => Ok(WorkStatus::Open),
430 wg_dsl::WorkLifecycleState::InProgress => Ok(WorkStatus::InProgress),
431 wg_dsl::WorkLifecycleState::Blocked => Ok(WorkStatus::Blocked),
432 wg_dsl::WorkLifecycleState::Completed => Ok(WorkStatus::Completed),
433 wg_dsl::WorkLifecycleState::Cancelled => Ok(WorkStatus::Cancelled),
434 wg_dsl::WorkLifecycleState::Failed => Ok(WorkStatus::Failed),
435 wg_dsl::WorkLifecycleState::Absent => Err(WorkGraphError::InvalidTransition(
436 "work item lifecycle state is absent".to_string(),
437 )),
438 }
439}
440
441fn sync_item_from_machine_state(item: &mut WorkItem) -> Result<(), WorkGraphError> {
442 item.status = work_status_from_dsl(item.machine_state.lifecycle_phase)?;
443 item.revision = item.machine_state.revision;
444 item.due_at = item
445 .machine_state
446 .due_at_utc_ms
447 .and_then(millis_to_datetime);
448 item.not_before = item
449 .machine_state
450 .not_before_utc_ms
451 .and_then(millis_to_datetime);
452 item.snoozed_until = item
453 .machine_state
454 .snoozed_until_utc_ms
455 .and_then(millis_to_datetime);
456 item.terminal_at = item
457 .machine_state
458 .terminal_at_utc_ms
459 .and_then(millis_to_datetime);
460 Ok(())
461}
462
463fn validate_item_machine_projection(item: &WorkItem) -> Result<(), WorkGraphError> {
464 let status = work_status_from_dsl(item.machine_state.lifecycle_phase)?;
465 if item.status != status {
466 return Err(WorkGraphError::Store(format!(
467 "work item {} status projection {:?} does not match machine state {:?}",
468 item.id, item.status, status
469 )));
470 }
471 if item.revision != item.machine_state.revision {
472 return Err(WorkGraphError::Store(format!(
473 "work item {} revision projection {} does not match machine state {}",
474 item.id, item.revision, item.machine_state.revision
475 )));
476 }
477 if item.due_at.map(datetime_to_millis) != item.machine_state.due_at_utc_ms {
478 return Err(WorkGraphError::Store(format!(
479 "work item {} due_at projection does not match machine state",
480 item.id
481 )));
482 }
483 if item.not_before.map(datetime_to_millis) != item.machine_state.not_before_utc_ms {
484 return Err(WorkGraphError::Store(format!(
485 "work item {} not_before projection does not match machine state",
486 item.id
487 )));
488 }
489 if item.snoozed_until.map(datetime_to_millis) != item.machine_state.snoozed_until_utc_ms {
490 return Err(WorkGraphError::Store(format!(
491 "work item {} snoozed_until projection does not match machine state",
492 item.id
493 )));
494 }
495 if item.terminal_at.map(datetime_to_millis) != item.machine_state.terminal_at_utc_ms {
496 return Err(WorkGraphError::Store(format!(
497 "work item {} terminal_at projection does not match machine state",
498 item.id
499 )));
500 }
501 if let Some(claim) = &item.claim {
502 let claim_owner_key = work_owner_key(&claim.owner)?;
503 if item.machine_state.claim_owner_key.as_ref() != Some(&claim_owner_key) {
504 return Err(WorkGraphError::Store(format!(
505 "work item {} claim owner projection does not match machine state",
506 item.id
507 )));
508 }
509 if item.machine_state.claimed_at_utc_ms != Some(datetime_to_millis(claim.claimed_at)) {
510 return Err(WorkGraphError::Store(format!(
511 "work item {} claim time projection does not match machine state",
512 item.id
513 )));
514 }
515 if item.machine_state.lease_expires_at_utc_ms
516 != claim.lease_expires_at.map(datetime_to_millis)
517 {
518 return Err(WorkGraphError::Store(format!(
519 "work item {} claim lease projection does not match machine state",
520 item.id
521 )));
522 }
523 } else if item.machine_state.claim_owner_key.is_some()
524 || item.machine_state.claimed_at_utc_ms.is_some()
525 || item.machine_state.lease_expires_at_utc_ms.is_some()
526 {
527 return Err(WorkGraphError::Store(format!(
528 "work item {} machine state has a claim without a claim projection",
529 item.id
530 )));
531 }
532 Ok(())
533}
534
535fn work_owner_key(owner: &crate::types::WorkOwner) -> Result<wg_dsl::WorkOwnerKey, WorkGraphError> {
536 let kind = match owner.key.kind {
537 crate::types::WorkOwnerKind::Principal => wg_dsl::WorkOwnerKind::Principal,
538 crate::types::WorkOwnerKind::Agent => wg_dsl::WorkOwnerKind::Agent,
539 crate::types::WorkOwnerKind::Session => wg_dsl::WorkOwnerKind::Session,
540 crate::types::WorkOwnerKind::Mob => wg_dsl::WorkOwnerKind::Mob,
541 crate::types::WorkOwnerKind::Label => wg_dsl::WorkOwnerKind::Label,
542 };
543 Ok(wg_dsl::WorkOwnerKey {
544 kind,
545 id: owner.key.id.clone(),
546 })
547}
548
549fn topology_state(
550 existing_items: &[WorkItem],
551 existing_edges: &[WorkEdge],
552) -> WorkGraphMachineState {
553 WorkGraphMachineState {
554 topology_item_keys: existing_items
555 .iter()
556 .map(|item| work_item_key(&item.id))
557 .collect(),
558 topology_edge_keys: existing_edges
559 .iter()
560 .map(|edge| work_edge_key(edge.kind, &edge.from_id, &edge.to_id))
561 .collect(),
562 blocks_reachability: dependency_reachability(existing_edges, WorkEdgeKind::Blocks),
563 parent_reachability: dependency_reachability(existing_edges, WorkEdgeKind::Parent),
564 ..Default::default()
565 }
566}
567
568fn dependency_reachability(
569 edges: &[WorkEdge],
570 kind: WorkEdgeKind,
571) -> BTreeSet<wg_dsl::WorkDependencyPathKey> {
572 let mut adjacency = BTreeMap::<WorkItemId, BTreeSet<WorkItemId>>::new();
573 for edge in edges.iter().filter(|edge| edge.kind == kind) {
574 adjacency
575 .entry(edge.from_id.clone())
576 .or_default()
577 .insert(edge.to_id.clone());
578 }
579
580 let mut reachability = BTreeSet::new();
581 for start in adjacency.keys() {
582 let mut stack = adjacency
583 .get(start)
584 .into_iter()
585 .flat_map(|targets| targets.iter().cloned())
586 .collect::<Vec<_>>();
587 let mut seen = BTreeSet::new();
588 while let Some(current) = stack.pop() {
589 if !seen.insert(current.clone()) {
590 continue;
591 }
592 reachability.insert(dependency_path_key(kind, start, ¤t));
593 if let Some(targets) = adjacency.get(¤t) {
594 stack.extend(targets.iter().cloned());
595 }
596 }
597 }
598 reachability
599}
600
601fn work_item_key(id: &WorkItemId) -> wg_dsl::WorkItemKey {
602 wg_dsl::WorkItemKey(id.as_str().to_string())
603}
604
605fn work_edge_key(
606 kind: WorkEdgeKind,
607 from_id: &WorkItemId,
608 to_id: &WorkItemId,
609) -> wg_dsl::WorkEdgeKey {
610 wg_dsl::WorkEdgeKey(format!(
611 "{}:{}:{}",
612 edge_kind_key(kind),
613 from_id.as_str(),
614 to_id.as_str()
615 ))
616}
617
618fn dependency_path_key(
619 kind: WorkEdgeKind,
620 from_id: &WorkItemId,
621 to_id: &WorkItemId,
622) -> wg_dsl::WorkDependencyPathKey {
623 wg_dsl::WorkDependencyPathKey(format!(
624 "{}:{}:{}",
625 edge_kind_key(kind),
626 from_id.as_str(),
627 to_id.as_str()
628 ))
629}
630
631fn dsl_edge_kind(kind: WorkEdgeKind) -> wg_dsl::WorkEdgeKind {
632 match kind {
633 WorkEdgeKind::Blocks => wg_dsl::WorkEdgeKind::Blocks,
634 WorkEdgeKind::Parent => wg_dsl::WorkEdgeKind::Parent,
635 WorkEdgeKind::Related => wg_dsl::WorkEdgeKind::Related,
636 WorkEdgeKind::Supersedes => wg_dsl::WorkEdgeKind::Supersedes,
637 WorkEdgeKind::DerivedFrom => wg_dsl::WorkEdgeKind::DerivedFrom,
638 }
639}
640
641fn edge_kind_key(kind: WorkEdgeKind) -> &'static str {
642 match kind {
643 WorkEdgeKind::Blocks => "blocks",
644 WorkEdgeKind::Parent => "parent",
645 WorkEdgeKind::Related => "related",
646 WorkEdgeKind::Supersedes => "supersedes",
647 WorkEdgeKind::DerivedFrom => "derived_from",
648 }
649}
650
651fn datetime_to_millis(dt: DateTime<Utc>) -> u64 {
652 u64::try_from(dt.timestamp_millis()).unwrap_or(0)
653}
654
655fn millis_to_datetime(ms: u64) -> Option<DateTime<Utc>> {
656 DateTime::from_timestamp_millis(i64::try_from(ms).ok()?)
657}
658
659fn item_event(
660 item: &WorkItem,
661 kind: WorkGraphEventKind,
662 at: DateTime<Utc>,
663) -> Result<WorkGraphEvent, WorkGraphError> {
664 Ok(WorkGraphEvent::item(
665 item.realm_id.clone(),
666 item.namespace.clone(),
667 item.id.clone(),
668 kind,
669 at,
670 json!({ "item": item }),
671 ))
672}
673
674fn seconds_to_duration(seconds: u64) -> Duration {
675 let seconds = i64::try_from(seconds).unwrap_or(i64::MAX);
676 Duration::seconds(seconds)
677}
678
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::types::{
        ClaimWorkItemRequest, CloseWorkItemRequest, UpdateWorkItemRequest, WorkOwner, WorkOwnerKey,
    };

    /// Creates an item with the given title, no explicit status and otherwise
    /// empty optional fields.
    fn create(title: &str, now: DateTime<Utc>) -> WorkItem {
        let request = CreateWorkItemRequest {
            realm_id: None,
            namespace: None,
            title: title.to_string(),
            description: None,
            priority: Default::default(),
            labels: BTreeSet::new(),
            due_at: None,
            not_before: None,
            snoozed_until: None,
            external_refs: Vec::new(),
            evidence_refs: Vec::new(),
            status: None,
        };
        let (item, _event) = WorkGraphMachine::create_item(
            request,
            "realm".to_string(),
            WorkNamespace::default(),
            now,
        )
        .expect("create");
        item
    }

    /// Builds a label-kind owner with the given id.
    fn owner(id: &str) -> WorkOwner {
        let key = WorkOwnerKey::label(id).expect("owner key");
        WorkOwner::new(key)
    }

    /// Builds a claim request for a label owner with no explicit expiry.
    fn claim_request(
        expected_revision: u64,
        owner_id: &str,
        lease_seconds: Option<u64>,
    ) -> ClaimWorkItemRequest {
        ClaimWorkItemRequest {
            id: WorkItemId::generated(),
            realm_id: None,
            namespace: None,
            expected_revision,
            owner: owner(owner_id),
            lease_seconds,
            lease_expires_at: None,
        }
    }

    #[test]
    fn blocked_items_are_never_ready() {
        let now = Utc::now();
        let fresh = create("blocked", now);
        let (blocked, _) = WorkGraphMachine::block_item(fresh, 1, now).expect("block");

        let ready = WorkGraphMachine::ready_items(vec![blocked], now);
        assert!(ready.is_empty());
    }

    #[test]
    fn future_due_items_are_not_ready() {
        let now = Utc::now();
        let fresh = create("future", now);
        let request = UpdateWorkItemRequest {
            id: WorkItemId::generated(),
            realm_id: None,
            namespace: None,
            expected_revision: 1,
            title: None,
            description: None,
            priority: None,
            labels: None,
            due_at: Some(now + Duration::hours(1)),
            not_before: None,
            snoozed_until: None,
            external_refs: Vec::new(),
        };
        let (updated, _) =
            WorkGraphMachine::update_item(fresh, request, now).expect("update due");

        let ready = WorkGraphMachine::ready_items(vec![updated], now);
        assert!(ready.is_empty());
    }

    #[test]
    fn terminal_items_cannot_be_claimed() {
        let now = Utc::now();
        let fresh = create("done", now);
        let close = CloseWorkItemRequest {
            id: WorkItemId::generated(),
            realm_id: None,
            namespace: None,
            expected_revision: 1,
            status: WorkStatus::Completed,
        };
        let (closed, _) = WorkGraphMachine::close_item(fresh, close, now).expect("close");

        let error = WorkGraphMachine::claim_item(closed, claim_request(2, "worker", None), now)
            .expect_err("terminal claim should fail");
        assert!(matches!(error, WorkGraphError::InvalidTransition(_)));
    }

    #[test]
    fn stale_revisions_fail() {
        let now = Utc::now();
        let fresh = create("stale", now);

        // The fresh item is at revision 1 (see the other tests), so 7 is stale.
        let error =
            WorkGraphMachine::block_item(fresh, 7, now).expect_err("stale transition should fail");
        assert!(matches!(error, WorkGraphError::StaleRevision { .. }));
    }

    #[test]
    fn only_one_active_claim_can_exist() {
        let now = Utc::now();
        let fresh = create("claim", now);
        let (claimed, _) =
            WorkGraphMachine::claim_item(fresh, claim_request(1, "worker", Some(60)), now)
                .expect("claim");

        let error =
            WorkGraphMachine::claim_item(claimed, claim_request(2, "worker-2", Some(60)), now)
                .expect_err("double claim should fail");
        assert!(matches!(error, WorkGraphError::InvalidTransition(_)));
    }
}