Skip to main content

sayiir_core/
task_claim.rs

1//! Task claiming mechanism for multi-node collaboration.
2//!
3//! This module provides structures for claiming and tracking task execution
4//! across multiple nodes, preventing duplicate execution.
5
6use chrono::{Duration, Utc};
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9
10/// A claim on a task by a worker node.
11///
12/// When a worker claims a task, it has exclusive rights to execute it
13/// until the claim expires or is released.
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct TaskClaim {
16    /// The workflow instance ID (not to be confused with the workflow ID)
17    pub instance_id: Arc<str>,
18    /// The task ID being claimed.
19    pub task_id: crate::TaskId,
20    /// The worker node ID that claimed this task.
21    pub worker_id: String,
22    /// When the claim was created (Unix timestamp).
23    pub claimed_at: u64,
24    /// When the claim expires (Unix timestamp).
25    /// If None, the claim never expires.
26    pub expires_at: Option<u64>,
27}
28
29#[allow(clippy::cast_sign_loss)] // Timestamps are always positive
30impl TaskClaim {
31    /// Create a new task claim.
32    #[must_use]
33    pub fn new(
34        instance_id: &str,
35        task_id: crate::TaskId,
36        worker_id: String,
37        ttl: Option<Duration>,
38    ) -> Self {
39        let instance_id: Arc<str> = Arc::from(instance_id);
40        let now = Utc::now();
41        let claimed_at = now.timestamp() as u64;
42        let expires_at = ttl.and_then(|duration| {
43            now.checked_add_signed(duration)
44                .map(|expiry| expiry.timestamp() as u64)
45        });
46
47        Self {
48            instance_id,
49            task_id,
50            worker_id,
51            claimed_at,
52            expires_at,
53        }
54    }
55
56    /// Check if this claim has expired.
57    #[must_use]
58    pub fn is_expired(&self) -> bool {
59        if let Some(expires_at) = self.expires_at {
60            let now = Utc::now().timestamp() as u64;
61            now >= expires_at
62        } else {
63            false
64        }
65    }
66
67    /// Check if this claim belongs to the given worker.
68    #[must_use]
69    pub fn is_owned_by(&self, worker_id: &str) -> bool {
70        self.worker_id == worker_id
71    }
72}
73
74#[cfg(test)]
75#[allow(
76    clippy::unwrap_used,
77    clippy::expect_used,
78    clippy::panic,
79    clippy::indexing_slicing,
80    clippy::cast_sign_loss
81)]
82mod tests {
83    use super::*;
84
85    fn claim(worker: &str, expires_at: Option<u64>) -> TaskClaim {
86        TaskClaim {
87            instance_id: "inst-1".into(),
88            task_id: crate::TaskId::from("task-1"),
89            worker_id: worker.into(),
90            claimed_at: 1_000_000,
91            expires_at,
92        }
93    }
94
95    #[test]
96    fn no_ttl_never_expires() {
97        assert!(!claim("w1", None).is_expired());
98    }
99
100    #[test]
101    fn future_expiry_is_not_expired() {
102        let far_future = Utc::now().timestamp() as u64 + 3600;
103        assert!(!claim("w1", Some(far_future)).is_expired());
104    }
105
106    #[test]
107    fn past_expiry_is_expired() {
108        assert!(claim("w1", Some(0)).is_expired());
109    }
110
111    #[test]
112    fn boundary_expiry_is_expired() {
113        // expires_at == now should be expired (now >= expires_at)
114        let now = Utc::now().timestamp() as u64;
115        assert!(claim("w1", Some(now)).is_expired());
116    }
117
118    #[test]
119    fn is_owned_by_matching_worker() {
120        assert!(claim("worker-a", None).is_owned_by("worker-a"));
121    }
122
123    #[test]
124    fn is_not_owned_by_different_worker() {
125        assert!(!claim("worker-a", None).is_owned_by("worker-b"));
126    }
127
128    #[test]
129    fn new_with_ttl_sets_expiry() {
130        let c = TaskClaim::new(
131            "i",
132            crate::TaskId::from("t"),
133            "w".into(),
134            Some(Duration::seconds(60)),
135        );
136        assert!(c.expires_at.is_some());
137        assert!(c.expires_at.unwrap() > c.claimed_at);
138    }
139
140    #[test]
141    fn new_without_ttl_has_no_expiry() {
142        let c = TaskClaim::new("i", crate::TaskId::from("t"), "w".into(), None);
143        assert!(c.expires_at.is_none());
144    }
145}
146
147/// Information about an available task ready for execution.
148#[derive(Debug, Clone)]
149pub struct AvailableTask {
150    /// The workflow instance ID.
151    pub instance_id: Arc<str>,
152    /// The task ID.
153    pub task_id: crate::TaskId,
154    /// The input data for the task (serialized).
155    pub input: bytes::Bytes,
156    /// The workflow definition hash.
157    pub workflow_definition_hash: crate::DefinitionHash,
158    /// W3C `traceparent` header for distributed trace context propagation.
159    pub trace_parent: Option<String>,
160    /// Workflow state at dispatch time. Backends decode this from the
161    /// history JOIN they already issue for the dispatch SELECT, so
162    /// passing it through lets the worker skip a separate `load_snapshot`
163    /// round-trip. Safe to use post-claim: nothing mutates the snapshot
164    /// blob between dispatch and execution other than the worker
165    /// itself, and signals (which don't touch the blob) are re-checked
166    /// in the post-claim guard.
167    ///
168    /// Wrapped in `Arc` so the dispatch loop can move the owned
169    /// snapshot in once, and so workers that lose the claim race drop
170    /// their copy cheaply (refcount decrement). Only the worker that
171    /// actually executes the task pays a deep `clone()` — and only
172    /// when it needs a mutable working copy.
173    pub snapshot: Arc<crate::snapshot::WorkflowSnapshot>,
174}