sayiir_core/
task_claim.rs1use chrono::{Duration, Utc};
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct TaskClaim {
16 pub instance_id: Arc<str>,
18 pub task_id: crate::TaskId,
20 pub worker_id: String,
22 pub claimed_at: u64,
24 pub expires_at: Option<u64>,
27}
28
29#[allow(clippy::cast_sign_loss)] impl TaskClaim {
31 #[must_use]
33 pub fn new(
34 instance_id: &str,
35 task_id: crate::TaskId,
36 worker_id: String,
37 ttl: Option<Duration>,
38 ) -> Self {
39 let instance_id: Arc<str> = Arc::from(instance_id);
40 let now = Utc::now();
41 let claimed_at = now.timestamp() as u64;
42 let expires_at = ttl.and_then(|duration| {
43 now.checked_add_signed(duration)
44 .map(|expiry| expiry.timestamp() as u64)
45 });
46
47 Self {
48 instance_id,
49 task_id,
50 worker_id,
51 claimed_at,
52 expires_at,
53 }
54 }
55
56 #[must_use]
58 pub fn is_expired(&self) -> bool {
59 if let Some(expires_at) = self.expires_at {
60 let now = Utc::now().timestamp() as u64;
61 now >= expires_at
62 } else {
63 false
64 }
65 }
66
67 #[must_use]
69 pub fn is_owned_by(&self, worker_id: &str) -> bool {
70 self.worker_id == worker_id
71 }
72}
73
74#[cfg(test)]
75#[allow(
76 clippy::unwrap_used,
77 clippy::expect_used,
78 clippy::panic,
79 clippy::indexing_slicing,
80 clippy::cast_sign_loss
81)]
82mod tests {
83 use super::*;
84
85 fn claim(worker: &str, expires_at: Option<u64>) -> TaskClaim {
86 TaskClaim {
87 instance_id: "inst-1".into(),
88 task_id: crate::TaskId::from("task-1"),
89 worker_id: worker.into(),
90 claimed_at: 1_000_000,
91 expires_at,
92 }
93 }
94
95 #[test]
96 fn no_ttl_never_expires() {
97 assert!(!claim("w1", None).is_expired());
98 }
99
100 #[test]
101 fn future_expiry_is_not_expired() {
102 let far_future = Utc::now().timestamp() as u64 + 3600;
103 assert!(!claim("w1", Some(far_future)).is_expired());
104 }
105
106 #[test]
107 fn past_expiry_is_expired() {
108 assert!(claim("w1", Some(0)).is_expired());
109 }
110
111 #[test]
112 fn boundary_expiry_is_expired() {
113 let now = Utc::now().timestamp() as u64;
115 assert!(claim("w1", Some(now)).is_expired());
116 }
117
118 #[test]
119 fn is_owned_by_matching_worker() {
120 assert!(claim("worker-a", None).is_owned_by("worker-a"));
121 }
122
123 #[test]
124 fn is_not_owned_by_different_worker() {
125 assert!(!claim("worker-a", None).is_owned_by("worker-b"));
126 }
127
128 #[test]
129 fn new_with_ttl_sets_expiry() {
130 let c = TaskClaim::new(
131 "i",
132 crate::TaskId::from("t"),
133 "w".into(),
134 Some(Duration::seconds(60)),
135 );
136 assert!(c.expires_at.is_some());
137 assert!(c.expires_at.unwrap() > c.claimed_at);
138 }
139
140 #[test]
141 fn new_without_ttl_has_no_expiry() {
142 let c = TaskClaim::new("i", crate::TaskId::from("t"), "w".into(), None);
143 assert!(c.expires_at.is_none());
144 }
145}
146
147#[derive(Debug, Clone)]
149pub struct AvailableTask {
150 pub instance_id: Arc<str>,
152 pub task_id: crate::TaskId,
154 pub input: bytes::Bytes,
156 pub workflow_definition_hash: crate::DefinitionHash,
158 pub trace_parent: Option<String>,
160 pub snapshot: Arc<crate::snapshot::WorkflowSnapshot>,
174}