1use crate::channel::oneshot;
22use crate::cx::Cx;
23use crate::trace::distributed::LogicalTime;
24use crate::types::{Budget, CancelReason, ObligationId, RegionId, TaskId, Time};
25use std::fmt;
26use std::marker::PhantomData;
27use std::sync::atomic::{AtomicU64, Ordering};
28use std::sync::Arc;
29use std::time::Duration;
30
/// Process-wide counter that backs `RemoteTaskId::next`; the first id handed out is 1.
static REMOTE_TASK_COUNTER: AtomicU64 = AtomicU64::new(1);
36
/// Identifier of a node, stored as an owned string.
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct NodeId(String);

impl NodeId {
    /// Builds a node identifier from anything convertible into a `String`.
    #[must_use]
    pub fn new(id: impl Into<String>) -> Self {
        let name: String = id.into();
        NodeId(name)
    }

    /// Borrows the identifier as a string slice.
    #[must_use]
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl fmt::Display for NodeId {
    /// Renders the identifier as `Node(<id>)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Node(")?;
        f.write_str(self.as_str())?;
        f.write_str(")")
    }
}
64
65#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
71pub struct RemoteTaskId(u64);
72
73impl RemoteTaskId {
74 #[must_use]
76 pub fn next() -> Self {
77 Self(REMOTE_TASK_COUNTER.fetch_add(1, Ordering::Relaxed))
78 }
79
80 #[must_use]
82 pub const fn from_raw(value: u64) -> Self {
83 Self(value)
84 }
85
86 #[must_use]
88 pub const fn raw(self) -> u64 {
89 self.0
90 }
91}
92
93impl fmt::Display for RemoteTaskId {
94 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95 write!(f, "RT{}", self.0)
96 }
97}
98
/// Name of a computation, stored as an owned string.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct ComputationName(String);

impl ComputationName {
    /// Builds a computation name from anything convertible into a `String`.
    #[must_use]
    pub fn new(name: impl Into<String>) -> Self {
        let owned: String = name.into();
        ComputationName(owned)
    }

    /// Borrows the name as a string slice.
    #[must_use]
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl fmt::Display for ComputationName {
    /// Writes the bare name with no decoration.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
141
/// Opaque byte payload handed to a remote computation.
#[derive(Debug, Clone)]
pub struct RemoteInput {
    data: Vec<u8>,
}

impl RemoteInput {
    /// Wraps the given bytes.
    #[must_use]
    pub fn new(data: Vec<u8>) -> Self {
        RemoteInput { data }
    }

    /// Creates an input that carries no bytes.
    #[must_use]
    pub fn empty() -> Self {
        Self::new(Vec::new())
    }

    /// Borrows the payload bytes.
    #[must_use]
    pub fn data(&self) -> &[u8] {
        self.data.as_slice()
    }

    /// Consumes the input and returns the owned payload.
    #[must_use]
    pub fn into_data(self) -> Vec<u8> {
        let RemoteInput { data } = self;
        data
    }

    /// Number of bytes in the payload.
    #[must_use]
    pub fn len(&self) -> usize {
        self.data().len()
    }

    /// Whether the payload has zero bytes.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
193
/// Runtime hook used by `spawn_remote` to talk to other nodes.
///
/// Implementors must be `Send + Sync + Debug` so they can be shared behind an
/// `Arc` inside a `RemoteCap`.
pub trait RemoteRuntime: Send + Sync + fmt::Debug {
    /// Sends `envelope` to `destination`.
    ///
    /// # Errors
    ///
    /// Returns a [`RemoteError`] when the message could not be sent.
    fn send_message(
        &self,
        destination: &NodeId,
        envelope: MessageEnvelope<RemoteMessage>,
    ) -> Result<(), RemoteError>;

    /// Associates `tx` with `task_id`.
    ///
    /// `spawn_remote` calls this before sending the spawn request; the
    /// matching receiver is stored in the returned `RemoteHandle`.
    /// NOTE(review): presumably the runtime completes `tx` when the result
    /// for `task_id` arrives — confirm against the implementations.
    fn register_task(
        &self,
        task_id: RemoteTaskId,
        tx: oneshot::Sender<Result<RemoteOutcome, RemoteError>>,
    );

    /// Removes a registration made with [`RemoteRuntime::register_task`].
    ///
    /// The default implementation does nothing. `spawn_remote` calls this
    /// when sending the spawn request fails.
    fn unregister_task(&self, _task_id: RemoteTaskId) {}
}
223
224#[derive(Clone, Debug)]
260pub struct RemoteCap {
261 default_lease: Duration,
263 remote_budget: Option<Budget>,
265 local_node: NodeId,
267 runtime: Option<Arc<dyn RemoteRuntime>>,
269}
270
271impl RemoteCap {
272 #[must_use]
274 pub fn new() -> Self {
275 Self {
276 default_lease: Duration::from_secs(30),
277 remote_budget: None,
278 local_node: NodeId::new("local"),
279 runtime: None,
280 }
281 }
282
283 #[must_use]
285 pub fn with_default_lease(mut self, lease: Duration) -> Self {
286 self.default_lease = lease;
287 self
288 }
289
290 #[must_use]
292 pub fn with_remote_budget(mut self, budget: Budget) -> Self {
293 self.remote_budget = Some(budget);
294 self
295 }
296
297 #[must_use]
299 pub fn with_local_node(mut self, node: NodeId) -> Self {
300 self.local_node = node;
301 self
302 }
303
304 #[must_use]
306 pub fn with_runtime(mut self, runtime: Arc<dyn RemoteRuntime>) -> Self {
307 self.runtime = Some(runtime);
308 self
309 }
310
311 #[must_use]
313 pub fn default_lease(&self) -> Duration {
314 self.default_lease
315 }
316
317 #[must_use]
319 pub fn remote_budget(&self) -> Option<&Budget> {
320 self.remote_budget.as_ref()
321 }
322
323 #[must_use]
325 pub fn local_node(&self) -> &NodeId {
326 &self.local_node
327 }
328
329 #[must_use]
331 pub fn runtime(&self) -> Option<&Arc<dyn RemoteRuntime>> {
332 self.runtime.as_ref()
333 }
334}
335
336impl Default for RemoteCap {
337 fn default() -> Self {
338 Self::new()
339 }
340}
341
/// Lifecycle state of a remote task.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RemoteTaskState {
    /// Not yet running.
    Pending,
    /// Currently running.
    Running,
    /// Finished.
    Completed,
    /// Ended in failure.
    Failed,
    /// Cancelled.
    Cancelled,
    /// The lease expired.
    LeaseExpired,
}

impl fmt::Display for RemoteTaskState {
    /// Writes the variant name verbatim.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            RemoteTaskState::Pending => "Pending",
            RemoteTaskState::Running => "Running",
            RemoteTaskState::Completed => "Completed",
            RemoteTaskState::Failed => "Failed",
            RemoteTaskState::Cancelled => "Cancelled",
            RemoteTaskState::LeaseExpired => "LeaseExpired",
        };
        f.write_str(label)
    }
}
375
376#[derive(Debug, Clone, PartialEq, Eq)]
382pub enum RemoteError {
383 NoCapability,
385 NodeUnreachable(String),
387 UnknownComputation(String),
389 LeaseExpired,
391 Cancelled(CancelReason),
393 RemotePanic(String),
395 SerializationError(String),
397 TransportError(String),
399}
400
401impl fmt::Display for RemoteError {
402 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
403 match self {
404 Self::NoCapability => write!(f, "remote capability not available"),
405 Self::NodeUnreachable(node) => write!(f, "node unreachable: {node}"),
406 Self::UnknownComputation(name) => {
407 write!(f, "unknown computation: {name}")
408 }
409 Self::LeaseExpired => write!(f, "remote task lease expired"),
410 Self::Cancelled(reason) => write!(f, "remote task cancelled: {reason}"),
411 Self::RemotePanic(msg) => write!(f, "remote task panicked: {msg}"),
412 Self::SerializationError(msg) => write!(f, "serialization error: {msg}"),
413 Self::TransportError(msg) => write!(f, "transport error: {msg}"),
414 }
415 }
416}
417
418impl std::error::Error for RemoteError {}
419
420pub struct RemoteHandle {
443 remote_task_id: RemoteTaskId,
445 local_task_id: Option<TaskId>,
447 node: NodeId,
449 computation: ComputationName,
451 owner_region: RegionId,
453 receiver: oneshot::Receiver<Result<RemoteOutcome, RemoteError>>,
455 lease: Duration,
457 state: RemoteTaskState,
459}
460
461impl fmt::Debug for RemoteHandle {
462 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
463 f.debug_struct("RemoteHandle")
464 .field("remote_task_id", &self.remote_task_id)
465 .field("local_task_id", &self.local_task_id)
466 .field("node", &self.node)
467 .field("computation", &self.computation)
468 .field("owner_region", &self.owner_region)
469 .field("lease", &self.lease)
470 .field("state", &self.state)
471 .finish_non_exhaustive()
472 }
473}
474
475impl RemoteHandle {
476 #[must_use]
478 pub fn remote_task_id(&self) -> RemoteTaskId {
479 self.remote_task_id
480 }
481
482 #[must_use]
484 pub fn local_task_id(&self) -> Option<TaskId> {
485 self.local_task_id
486 }
487
488 #[must_use]
490 pub fn node(&self) -> &NodeId {
491 &self.node
492 }
493
494 #[must_use]
496 pub fn computation(&self) -> &ComputationName {
497 &self.computation
498 }
499
500 #[must_use]
502 pub fn owner_region(&self) -> RegionId {
503 self.owner_region
504 }
505
506 #[must_use]
508 pub fn lease(&self) -> Duration {
509 self.lease
510 }
511
512 #[must_use]
514 pub fn state(&self) -> &RemoteTaskState {
515 &self.state
516 }
517
518 #[must_use]
520 pub fn is_finished(&self) -> bool {
521 self.receiver.is_ready()
522 }
523
524 pub async fn join(&self, cx: &Cx) -> Result<RemoteOutcome, RemoteError> {
533 self.receiver.recv(cx).await.unwrap_or_else(|_| {
534 Err(RemoteError::Cancelled(CancelReason::user(
535 "remote handle channel closed",
536 )))
537 })
538 }
539
540 pub fn try_join(&self) -> Result<Option<RemoteOutcome>, RemoteError> {
548 match self.receiver.try_recv() {
549 Ok(result) => Ok(Some(result?)),
550 Err(oneshot::TryRecvError::Empty) => Ok(None),
551 Err(oneshot::TryRecvError::Closed) => Err(RemoteError::Cancelled(CancelReason::user(
552 "remote handle channel closed",
553 ))),
554 }
555 }
556
557 pub fn abort(&self) {
564 }
567}
568
569pub fn spawn_remote(
619 cx: &Cx,
620 node: NodeId,
621 computation: ComputationName,
622 input: RemoteInput,
623) -> Result<RemoteHandle, RemoteError> {
624 let cap = cx.remote().ok_or(RemoteError::NoCapability)?;
626
627 let remote_task_id = RemoteTaskId::next();
628 let region = cx.region_id();
629 let lease = cap.default_lease();
630
631 cx.trace("spawn_remote");
632
633 let (tx, rx) = oneshot::channel::<Result<RemoteOutcome, RemoteError>>();
635
636 if let Some(runtime) = cap.runtime() {
638 runtime.register_task(remote_task_id, tx);
639
640 let req = SpawnRequest {
641 remote_task_id,
642 computation: computation.clone(),
643 input,
644 lease,
645 idempotency_key: IdempotencyKey::generate(cx),
646 budget: cap.remote_budget,
647 origin_node: cap.local_node().clone(),
648 origin_region: region,
649 origin_task: cx.task_id(),
650 };
651
652 let sender_time = cx.logical_now();
654
655 let envelope = MessageEnvelope::new(
656 req.origin_node.clone(),
657 sender_time,
658 RemoteMessage::SpawnRequest(req),
659 );
660 if let Err(err) = runtime.send_message(&node, envelope) {
661 runtime.unregister_task(remote_task_id);
662 return Err(err);
663 }
664 } else {
665 }
670
671 Ok(RemoteHandle {
672 remote_task_id,
673 local_task_id: None,
674 node,
675 computation,
676 owner_region: region,
677 receiver: rx,
678 lease,
679 state: RemoteTaskState::Pending,
680 })
681}
682
/// Errors returned by lease operations.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum LeaseError {
    /// The lease has expired.
    Expired,
    /// The lease was already released.
    Released,
    /// The lease could not be created; carries a description.
    CreationFailed(String),
}

impl fmt::Display for LeaseError {
    /// Writes a lowercase, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            LeaseError::Expired => f.write_str("lease expired"),
            LeaseError::Released => f.write_str("lease already released"),
            LeaseError::CreationFailed(msg) => write!(f, "lease creation failed: {msg}"),
        }
    }
}

impl std::error::Error for LeaseError {}
713
714#[derive(Debug)]
752pub struct Lease {
753 obligation_id: ObligationId,
755 region: RegionId,
757 holder: TaskId,
759 expires_at: Time,
761 initial_duration: Duration,
763 state: LeaseState,
765 renewal_count: u32,
767}
768
769#[derive(Debug, Clone, Copy, PartialEq, Eq)]
771pub enum LeaseState {
772 Active,
774 Released,
776 Expired,
778}
779
780impl fmt::Display for LeaseState {
781 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
782 match self {
783 Self::Active => write!(f, "Active"),
784 Self::Released => write!(f, "Released"),
785 Self::Expired => write!(f, "Expired"),
786 }
787 }
788}
789
790impl Lease {
791 #[must_use]
796 pub fn new(
797 obligation_id: ObligationId,
798 region: RegionId,
799 holder: TaskId,
800 duration: Duration,
801 now: Time,
802 ) -> Self {
803 let expires_at = now + duration;
804 Self {
805 obligation_id,
806 region,
807 holder,
808 expires_at,
809 initial_duration: duration,
810 state: LeaseState::Active,
811 renewal_count: 0,
812 }
813 }
814
815 #[must_use]
817 pub fn obligation_id(&self) -> ObligationId {
818 self.obligation_id
819 }
820
821 #[must_use]
823 pub fn region(&self) -> RegionId {
824 self.region
825 }
826
827 #[must_use]
829 pub fn holder(&self) -> TaskId {
830 self.holder
831 }
832
833 #[must_use]
835 pub fn expires_at(&self) -> Time {
836 self.expires_at
837 }
838
839 #[must_use]
841 pub fn initial_duration(&self) -> Duration {
842 self.initial_duration
843 }
844
845 #[must_use]
847 pub fn state(&self) -> LeaseState {
848 self.state
849 }
850
851 #[must_use]
853 pub fn renewal_count(&self) -> u32 {
854 self.renewal_count
855 }
856
857 #[must_use]
859 pub fn is_active(&self, now: Time) -> bool {
860 self.state == LeaseState::Active && now < self.expires_at
861 }
862
863 #[must_use]
865 pub fn is_expired(&self, now: Time) -> bool {
866 self.state == LeaseState::Expired
867 || (self.state == LeaseState::Active && now >= self.expires_at)
868 }
869
870 #[must_use]
872 pub fn is_released(&self) -> bool {
873 self.state == LeaseState::Released
874 }
875
876 #[must_use]
878 pub fn remaining(&self, now: Time) -> Duration {
879 if now >= self.expires_at {
880 Duration::ZERO
881 } else {
882 let nanos = self.expires_at.duration_since(now);
883 Duration::from_nanos(nanos)
884 }
885 }
886
887 pub fn renew(&mut self, duration: Duration, now: Time) -> Result<(), LeaseError> {
894 match self.state {
895 LeaseState::Released => return Err(LeaseError::Released),
896 LeaseState::Expired => return Err(LeaseError::Expired),
897 LeaseState::Active => {}
898 }
899 if now >= self.expires_at {
900 self.state = LeaseState::Expired;
901 return Err(LeaseError::Expired);
902 }
903 self.expires_at = now + duration;
904 self.renewal_count += 1;
905 Ok(())
906 }
907
908 pub fn release(&mut self, now: Time) -> Result<(), LeaseError> {
917 match self.state {
918 LeaseState::Released => return Err(LeaseError::Released),
919 LeaseState::Expired => return Err(LeaseError::Expired),
920 LeaseState::Active => {}
921 }
922 self.state = LeaseState::Released;
923 let _ = now; Ok(())
927 }
928
929 pub fn mark_expired(&mut self) -> Result<(), LeaseError> {
939 match self.state {
940 LeaseState::Released => return Err(LeaseError::Released),
941 LeaseState::Expired => return Ok(()), LeaseState::Active => {}
943 }
944 self.state = LeaseState::Expired;
945 Ok(())
946 }
947}
948
/// Entry kept by `IdempotencyStore` for one idempotency key.
#[derive(Clone, Debug)]
pub struct IdempotencyRecord {
    /// Key the record is stored under.
    pub key: IdempotencyKey,
    /// Remote task the key was recorded for.
    pub remote_task_id: RemoteTaskId,
    /// Computation the key was recorded for; used to detect conflicts.
    pub computation: ComputationName,
    /// Time at which the record was created.
    pub created_at: Time,
    /// Time from which the record is eligible for eviction.
    pub expires_at: Time,
    /// Outcome of the task; `None` until one has been stored.
    pub outcome: Option<RemoteOutcome>,
}
973
/// Result of looking up an idempotency key in `IdempotencyStore::check`.
#[derive(Clone, Debug)]
pub enum DedupDecision {
    /// No record exists for the key.
    New,
    /// A record exists for the same computation; carries a copy of it.
    Duplicate(IdempotencyRecord),
    /// A record exists for the key but names a different computation.
    Conflict,
}
984
985pub struct IdempotencyStore {
1001 entries: std::collections::HashMap<IdempotencyKey, IdempotencyRecord>,
1002 default_ttl: Duration,
1004}
1005
1006impl IdempotencyStore {
1007 #[must_use]
1009 pub fn new(default_ttl: Duration) -> Self {
1010 Self {
1011 entries: std::collections::HashMap::new(),
1012 default_ttl,
1013 }
1014 }
1015
1016 #[must_use]
1020 pub fn check(&self, key: &IdempotencyKey, computation: &ComputationName) -> DedupDecision {
1021 self.entries.get(key).map_or(DedupDecision::New, |record| {
1022 if record.computation == *computation {
1023 DedupDecision::Duplicate(record.clone())
1024 } else {
1025 DedupDecision::Conflict
1026 }
1027 })
1028 }
1029
1030 pub fn record(
1035 &mut self,
1036 key: IdempotencyKey,
1037 remote_task_id: RemoteTaskId,
1038 computation: ComputationName,
1039 now: Time,
1040 ) -> bool {
1041 use std::collections::hash_map::Entry;
1042 match self.entries.entry(key) {
1043 Entry::Vacant(e) => {
1044 let expires_at = now + self.default_ttl;
1045 e.insert(IdempotencyRecord {
1046 key,
1047 remote_task_id,
1048 computation,
1049 created_at: now,
1050 expires_at,
1051 outcome: None,
1052 });
1053 true
1054 }
1055 Entry::Occupied(_) => false,
1056 }
1057 }
1058
1059 pub fn complete(&mut self, key: &IdempotencyKey, outcome: RemoteOutcome) -> bool {
1063 match self.entries.get_mut(key) {
1064 Some(record) => {
1065 record.outcome = Some(outcome);
1066 true
1067 }
1068 None => false,
1069 }
1070 }
1071
1072 pub fn evict_expired(&mut self, now: Time) -> usize {
1076 let before = self.entries.len();
1077 self.entries.retain(|_, record| now < record.expires_at);
1078 before - self.entries.len()
1079 }
1080
1081 #[must_use]
1083 pub fn len(&self) -> usize {
1084 self.entries.len()
1085 }
1086
1087 #[must_use]
1089 pub fn is_empty(&self) -> bool {
1090 self.entries.is_empty()
1091 }
1092}
1093
1094impl fmt::Debug for IdempotencyStore {
1095 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1096 f.debug_struct("IdempotencyStore")
1097 .field("entries", &self.entries.len())
1098 .field("default_ttl", &self.default_ttl)
1099 .finish()
1100 }
1101}
1102
/// Zero-based position of a step within a saga.
pub type StepIndex = usize;
1113
1114struct CompensationEntry {
1122 step: StepIndex,
1124 description: String,
1126 compensate: Box<dyn FnOnce() -> String + Send>,
1128}
1129
1130impl fmt::Debug for CompensationEntry {
1131 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1132 f.debug_struct("CompensationEntry")
1133 .field("step", &self.step)
1134 .field("description", &self.description)
1135 .finish_non_exhaustive()
1136 }
1137}
1138
/// Lifecycle state of a saga.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SagaState {
    /// Steps may still be added.
    Running,
    /// The saga finished successfully.
    Completed,
    /// Compensations are being executed.
    Compensating,
    /// Compensations have finished running.
    Aborted,
}

impl fmt::Display for SagaState {
    /// Writes the variant name verbatim.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            SagaState::Running => "Running",
            SagaState::Completed => "Completed",
            SagaState::Compensating => "Compensating",
            SagaState::Aborted => "Aborted",
        };
        f.write_str(label)
    }
}
1162
1163#[derive(Debug, Clone)]
1165pub struct SagaStepError {
1166 pub step: StepIndex,
1168 pub description: String,
1170 pub message: String,
1172}
1173
1174impl fmt::Display for SagaStepError {
1175 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1176 write!(
1177 f,
1178 "saga step {} ({}) failed: {}",
1179 self.step, self.description, self.message
1180 )
1181 }
1182}
1183
1184impl std::error::Error for SagaStepError {}
1185
/// Record of one compensation that was executed while aborting a saga.
#[derive(Debug, Clone)]
pub struct CompensationResult {
    /// Index of the step that was compensated.
    pub step: StepIndex,
    /// Description of the step that was compensated.
    pub description: String,
    /// String returned by the compensation closure.
    pub result: String,
}
1196
1197pub struct Saga {
1239 state: SagaState,
1241 compensations: Vec<CompensationEntry>,
1243 completed_steps: StepIndex,
1245 compensation_results: Vec<CompensationResult>,
1247}
1248
1249impl fmt::Debug for Saga {
1250 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1251 f.debug_struct("Saga")
1252 .field("state", &self.state)
1253 .field("completed_steps", &self.completed_steps)
1254 .field("compensations", &self.compensations.len())
1255 .field("compensation_results", &self.compensation_results)
1256 .finish()
1257 }
1258}
1259
1260impl Saga {
1261 #[must_use]
1263 pub fn new() -> Self {
1264 Self {
1265 state: SagaState::Running,
1266 compensations: Vec::new(),
1267 completed_steps: 0,
1268 compensation_results: Vec::new(),
1269 }
1270 }
1271
1272 #[must_use]
1274 pub fn state(&self) -> SagaState {
1275 self.state
1276 }
1277
1278 #[must_use]
1280 pub fn completed_steps(&self) -> StepIndex {
1281 self.completed_steps
1282 }
1283
1284 #[must_use]
1286 pub fn compensation_results(&self) -> &[CompensationResult] {
1287 &self.compensation_results
1288 }
1289
1290 pub fn step<T>(
1306 &mut self,
1307 description: &str,
1308 action: impl FnOnce() -> Result<T, String>,
1309 compensate: impl FnOnce() -> String + Send + 'static,
1310 ) -> Result<T, SagaStepError> {
1311 assert_eq!(
1312 self.state,
1313 SagaState::Running,
1314 "cannot add steps to a saga that is not Running"
1315 );
1316
1317 let step_idx = self.completed_steps;
1318
1319 match action() {
1320 Ok(value) => {
1321 self.compensations.push(CompensationEntry {
1322 step: step_idx,
1323 description: description.to_string(),
1324 compensate: Box::new(compensate),
1325 });
1326 self.completed_steps += 1;
1327 Ok(value)
1328 }
1329 Err(msg) => {
1330 let err = SagaStepError {
1331 step: step_idx,
1332 description: description.to_string(),
1333 message: msg,
1334 };
1335 self.run_compensations();
1336 Err(err)
1337 }
1338 }
1339 }
1340
1341 pub fn complete(&mut self) {
1350 assert_eq!(
1351 self.state,
1352 SagaState::Running,
1353 "can only complete a Running saga"
1354 );
1355 self.state = SagaState::Completed;
1356 self.compensations.clear();
1357 }
1358
1359 pub fn abort(&mut self) {
1368 assert_eq!(
1369 self.state,
1370 SagaState::Running,
1371 "can only abort a Running saga"
1372 );
1373 self.run_compensations();
1374 }
1375
1376 fn run_compensations(&mut self) {
1378 self.state = SagaState::Compensating;
1379 let compensations: Vec<_> = self.compensations.drain(..).collect();
1380 for entry in compensations.into_iter().rev() {
1381 let result_desc = (entry.compensate)();
1382 self.compensation_results.push(CompensationResult {
1383 step: entry.step,
1384 description: entry.description,
1385 result: result_desc,
1386 });
1387 }
1388 self.state = SagaState::Aborted;
1389 }
1390}
1391
1392impl Default for Saga {
1393 fn default() -> Self {
1394 Self::new()
1395 }
1396}
1397
1398#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
1419pub struct IdempotencyKey(u128);
1420
1421impl IdempotencyKey {
1422 #[must_use]
1424 pub fn generate(cx: &Cx) -> Self {
1425 let high = cx.random_u64();
1426 let low = cx.random_u64();
1427 Self((u128::from(high) << 64) | u128::from(low))
1428 }
1429
1430 #[must_use]
1432 pub const fn from_raw(value: u128) -> Self {
1433 Self(value)
1434 }
1435
1436 #[must_use]
1438 pub const fn raw(self) -> u128 {
1439 self.0
1440 }
1441}
1442
1443impl fmt::Display for IdempotencyKey {
1444 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1445 write!(f, "IK-{:032x}", self.0)
1446 }
1447}
1448
/// Wrapper that tags a payload with its sender and the sender's logical time.
#[derive(Clone, Debug)]
pub struct MessageEnvelope<T> {
    /// Node that sent the message.
    pub sender: NodeId,
    /// Logical time at the sender when the envelope was built.
    pub sender_time: LogicalTime,
    /// The wrapped message.
    pub payload: T,
}

impl<T> MessageEnvelope<T> {
    /// Bundles `payload` with its `sender` and `sender_time`.
    #[must_use]
    pub fn new(sender: NodeId, sender_time: LogicalTime, payload: T) -> Self {
        Self {
            sender,
            sender_time,
            payload,
        }
    }
}
1478
/// Transport over which remote protocol messages are exchanged.
pub trait RemoteTransport {
    /// Sends `envelope` to the node `to`.
    ///
    /// # Errors
    ///
    /// Returns a [`RemoteError`] when the message could not be sent.
    fn send(
        &mut self,
        to: &NodeId,
        envelope: MessageEnvelope<RemoteMessage>,
    ) -> Result<(), RemoteError>;

    /// Returns a received envelope, or `None`.
    ///
    /// NOTE(review): by its name this is presumably non-blocking and yields
    /// `None` when nothing is pending — confirm against the implementations.
    fn try_recv(&mut self) -> Option<MessageEnvelope<RemoteMessage>>;
}
1499
1500#[derive(Clone, Debug)]
1505pub enum RemoteMessage {
1506 SpawnRequest(SpawnRequest),
1508 SpawnAck(SpawnAck),
1510 CancelRequest(CancelRequest),
1512 ResultDelivery(ResultDelivery),
1514 LeaseRenewal(LeaseRenewal),
1516}
1517
1518impl RemoteMessage {
1519 #[must_use]
1521 pub fn remote_task_id(&self) -> RemoteTaskId {
1522 match self {
1523 Self::SpawnRequest(m) => m.remote_task_id,
1524 Self::SpawnAck(m) => m.remote_task_id,
1525 Self::CancelRequest(m) => m.remote_task_id,
1526 Self::ResultDelivery(m) => m.remote_task_id,
1527 Self::LeaseRenewal(m) => m.remote_task_id,
1528 }
1529 }
1530}
1531
/// Request asking a node to run a computation.
#[derive(Clone, Debug)]
pub struct SpawnRequest {
    /// Identifier allocated for the task by the origin.
    pub remote_task_id: RemoteTaskId,
    /// Name of the computation to run.
    pub computation: ComputationName,
    /// Input bytes for the computation.
    pub input: RemoteInput,
    /// Lease duration requested for the task.
    pub lease: Duration,
    /// Key used to deduplicate this request.
    pub idempotency_key: IdempotencyKey,
    /// Optional budget for the task.
    pub budget: Option<Budget>,
    /// Node that issued the request.
    pub origin_node: NodeId,
    /// Region on the origin node that issued the request.
    pub origin_region: RegionId,
    /// Task on the origin node that issued the request.
    pub origin_task: TaskId,
}
1570
/// Acknowledgement of a [`SpawnRequest`].
#[derive(Clone, Debug)]
pub struct SpawnAck {
    /// Task the acknowledgement refers to.
    pub remote_task_id: RemoteTaskId,
    /// Whether the spawn was accepted or rejected.
    pub status: SpawnAckStatus,
    /// Node named in the acknowledgement as assigned to the task.
    pub assigned_node: NodeId,
}
1588
/// Outcome carried by a [`SpawnAck`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SpawnAckStatus {
    /// The spawn request was accepted.
    Accepted,
    /// The spawn request was rejected for the given reason.
    Rejected(SpawnRejectReason),
}
1597
/// Reason a spawn request was rejected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpawnRejectReason {
    /// The requested computation is not known.
    UnknownComputation,
    /// Capacity was exceeded.
    CapacityExceeded,
    /// The node is shutting down.
    NodeShuttingDown,
    /// The input was invalid; carries a description.
    InvalidInput(String),
    /// The idempotency key conflicts with an existing one.
    IdempotencyConflict,
}

impl fmt::Display for SpawnRejectReason {
    /// Writes a lowercase, human-readable description of the reason.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let fixed = match self {
            SpawnRejectReason::InvalidInput(msg) => return write!(f, "invalid input: {msg}"),
            SpawnRejectReason::UnknownComputation => "unknown computation",
            SpawnRejectReason::CapacityExceeded => "capacity exceeded",
            SpawnRejectReason::NodeShuttingDown => "node shutting down",
            SpawnRejectReason::IdempotencyConflict => "idempotency conflict",
        };
        f.write_str(fixed)
    }
}
1624
/// Request to cancel a remote task.
#[derive(Clone, Debug)]
pub struct CancelRequest {
    /// Task to cancel.
    pub remote_task_id: RemoteTaskId,
    /// Why the task is being cancelled.
    pub reason: CancelReason,
    /// Node that issued the cancellation.
    pub origin_node: NodeId,
}
1642
/// Message carrying the outcome of a remote task.
#[derive(Clone, Debug)]
pub struct ResultDelivery {
    /// Task the result belongs to.
    pub remote_task_id: RemoteTaskId,
    /// How the task ended.
    pub outcome: RemoteOutcome,
    /// Execution time reported alongside the outcome.
    pub execution_time: Duration,
}
1660
1661#[derive(Clone, Debug)]
1666pub enum RemoteOutcome {
1667 Success(Vec<u8>),
1669 Failed(String),
1671 Cancelled(CancelReason),
1673 Panicked(String),
1675}
1676
1677impl RemoteOutcome {
1678 #[must_use]
1680 pub fn severity(&self) -> crate::types::Severity {
1681 match self {
1682 Self::Success(_) => crate::types::Severity::Ok,
1683 Self::Failed(_) => crate::types::Severity::Err,
1684 Self::Cancelled(_) => crate::types::Severity::Cancelled,
1685 Self::Panicked(_) => crate::types::Severity::Panicked,
1686 }
1687 }
1688
1689 #[must_use]
1691 pub fn is_success(&self) -> bool {
1692 matches!(self, Self::Success(_))
1693 }
1694}
1695
1696impl fmt::Display for RemoteOutcome {
1697 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1698 match self {
1699 Self::Success(_) => write!(f, "Success"),
1700 Self::Failed(msg) => write!(f, "Failed: {msg}"),
1701 Self::Cancelled(reason) => write!(f, "Cancelled: {reason}"),
1702 Self::Panicked(msg) => write!(f, "Panicked: {msg}"),
1703 }
1704 }
1705}
1706
/// Message renewing the lease of a remote task.
#[derive(Clone, Debug)]
pub struct LeaseRenewal {
    /// Task whose lease is renewed.
    pub remote_task_id: RemoteTaskId,
    /// Lease duration carried by the renewal.
    pub new_lease: Duration,
    /// Task state reported with the renewal.
    pub current_state: RemoteTaskState,
    /// Node named in the renewal.
    pub node: NodeId,
}
1730
1731#[derive(Debug, Clone, PartialEq, Eq)]
1737pub enum RemoteProtocolError {
1738 RemoteTaskIdMismatch {
1740 expected: RemoteTaskId,
1742 got: RemoteTaskId,
1744 },
1745 UnexpectedAckStatus {
1747 expected: &'static str,
1749 got: SpawnAckStatus,
1751 },
1752}
1753
1754impl fmt::Display for RemoteProtocolError {
1755 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1756 match self {
1757 Self::RemoteTaskIdMismatch { expected, got } => {
1758 write!(f, "remote task id mismatch: expected {expected}, got {got}")
1759 }
1760 Self::UnexpectedAckStatus { expected, got } => write!(
1761 f,
1762 "unexpected spawn ack status: expected {expected}, got {got:?}"
1763 ),
1764 }
1765 }
1766}
1767
1768impl std::error::Error for RemoteProtocolError {}
1769
// Typestate markers for `OriginSession`. They carry no data; each one names
// the protocol state the origin side is in.

/// Origin state: session created, nothing sent yet.
#[derive(Debug)]
pub struct OriginInit;
/// Origin state: spawn request sent, acknowledgement not yet received.
#[derive(Debug)]
pub struct OriginSpawned;
/// Origin state: spawn was accepted.
#[derive(Debug)]
pub struct OriginRunning;
/// Origin state: a cancel request has been sent.
#[derive(Debug)]
pub struct OriginCancelSent;
/// Origin state: the lease expired while the task was running.
#[derive(Debug)]
pub struct OriginLeaseExpired;
/// Origin state: a result was received.
#[derive(Debug)]
pub struct OriginCompleted;
/// Origin state: the spawn was rejected.
#[derive(Debug)]
pub struct OriginRejected;
1791
// Typestate markers for `RemoteSession`. They carry no data; each one names
// the protocol state the remote side is in.

/// Remote state: session created, nothing received yet.
#[derive(Debug)]
pub struct RemoteInit;
/// Remote state: spawn request received, acknowledgement not yet sent.
#[derive(Debug)]
pub struct RemoteSpawnReceived;
/// Remote state: a cancel arrived before the spawn was acknowledged.
#[derive(Debug)]
pub struct RemoteCancelPending;
/// Remote state: the spawn was accepted and no cancel has been received.
#[derive(Debug)]
pub struct RemoteRunning;
/// Remote state: the spawn was accepted and a cancel has been received.
#[derive(Debug)]
pub struct RemoteCancelReceived;
/// Remote state: a result was sent.
#[derive(Debug)]
pub struct RemoteCompleted;
/// Remote state: the spawn was rejected.
#[derive(Debug)]
pub struct RemoteRejected;
1813
1814#[must_use = "OriginSession must be advanced to completion or rejected"]
1816#[derive(Debug)]
1817pub struct OriginSession<S> {
1818 remote_task_id: RemoteTaskId,
1819 _state: PhantomData<S>,
1820}
1821
1822impl OriginSession<OriginInit> {
1823 pub fn new(remote_task_id: RemoteTaskId) -> Self {
1825 Self {
1826 remote_task_id,
1827 _state: PhantomData,
1828 }
1829 }
1830
1831 pub fn send_spawn(
1833 self,
1834 req: &SpawnRequest,
1835 ) -> Result<OriginSession<OriginSpawned>, RemoteProtocolError> {
1836 self.ensure_id(req.remote_task_id)?;
1837 Ok(self.transition())
1838 }
1839}
1840
1841impl<S> OriginSession<S> {
1842 #[must_use]
1844 pub fn remote_task_id(&self) -> RemoteTaskId {
1845 self.remote_task_id
1846 }
1847
1848 fn ensure_id(&self, got: RemoteTaskId) -> Result<(), RemoteProtocolError> {
1849 if self.remote_task_id == got {
1850 Ok(())
1851 } else {
1852 Err(RemoteProtocolError::RemoteTaskIdMismatch {
1853 expected: self.remote_task_id,
1854 got,
1855 })
1856 }
1857 }
1858
1859 fn transition<T>(self) -> OriginSession<T> {
1860 OriginSession {
1861 remote_task_id: self.remote_task_id,
1862 _state: PhantomData,
1863 }
1864 }
1865}
1866
/// Result of receiving a spawn acknowledgement on the origin side: the
/// session continues in one of two states depending on the ack status.
pub enum OriginAckOutcome {
    /// The spawn was accepted; the session is now running.
    Accepted(OriginSession<OriginRunning>),
    /// The spawn was rejected; the session is now in its rejected state.
    Rejected(OriginSession<OriginRejected>),
}
1874
1875impl OriginSession<OriginSpawned> {
1876 pub fn recv_spawn_ack(self, ack: &SpawnAck) -> Result<OriginAckOutcome, RemoteProtocolError> {
1878 self.ensure_id(ack.remote_task_id)?;
1879 match ack.status {
1880 SpawnAckStatus::Accepted => Ok(OriginAckOutcome::Accepted(self.transition())),
1881 SpawnAckStatus::Rejected(_) => Ok(OriginAckOutcome::Rejected(self.transition())),
1882 }
1883 }
1884
1885 pub fn send_cancel(
1887 self,
1888 cancel: &CancelRequest,
1889 ) -> Result<OriginSession<OriginCancelSent>, RemoteProtocolError> {
1890 self.ensure_id(cancel.remote_task_id)?;
1891 Ok(self.transition())
1892 }
1893}
1894
1895impl OriginSession<OriginRunning> {
1896 pub fn recv_lease_renewal(self, renewal: &LeaseRenewal) -> Result<Self, RemoteProtocolError> {
1898 self.ensure_id(renewal.remote_task_id)?;
1899 Ok(self)
1900 }
1901
1902 pub fn send_cancel(
1904 self,
1905 cancel: &CancelRequest,
1906 ) -> Result<OriginSession<OriginCancelSent>, RemoteProtocolError> {
1907 self.ensure_id(cancel.remote_task_id)?;
1908 Ok(self.transition())
1909 }
1910
1911 pub fn recv_result(
1913 self,
1914 result: &ResultDelivery,
1915 ) -> Result<OriginSession<OriginCompleted>, RemoteProtocolError> {
1916 self.ensure_id(result.remote_task_id)?;
1917 Ok(self.transition())
1918 }
1919
1920 pub fn lease_expired(self) -> OriginSession<OriginLeaseExpired> {
1922 self.transition()
1923 }
1924}
1925
1926impl OriginSession<OriginCancelSent> {
1927 pub fn recv_result(
1929 self,
1930 result: &ResultDelivery,
1931 ) -> Result<OriginSession<OriginCompleted>, RemoteProtocolError> {
1932 self.ensure_id(result.remote_task_id)?;
1933 Ok(self.transition())
1934 }
1935
1936 pub fn recv_lease_renewal(self, renewal: &LeaseRenewal) -> Result<Self, RemoteProtocolError> {
1938 self.ensure_id(renewal.remote_task_id)?;
1939 Ok(self)
1940 }
1941}
1942
1943impl OriginSession<OriginLeaseExpired> {
1944 pub fn send_cancel(
1946 self,
1947 cancel: &CancelRequest,
1948 ) -> Result<OriginSession<OriginCancelSent>, RemoteProtocolError> {
1949 self.ensure_id(cancel.remote_task_id)?;
1950 Ok(self.transition())
1951 }
1952
1953 pub fn recv_result(
1955 self,
1956 result: &ResultDelivery,
1957 ) -> Result<OriginSession<OriginCompleted>, RemoteProtocolError> {
1958 self.ensure_id(result.remote_task_id)?;
1959 Ok(self.transition())
1960 }
1961}
1962
1963#[must_use = "RemoteSession must be advanced to completion or rejected"]
1965#[derive(Debug)]
1966pub struct RemoteSession<S> {
1967 remote_task_id: RemoteTaskId,
1968 _state: PhantomData<S>,
1969}
1970
1971impl RemoteSession<RemoteInit> {
1972 pub fn new(remote_task_id: RemoteTaskId) -> Self {
1974 Self {
1975 remote_task_id,
1976 _state: PhantomData,
1977 }
1978 }
1979
1980 pub fn recv_spawn(
1982 self,
1983 req: &SpawnRequest,
1984 ) -> Result<RemoteSession<RemoteSpawnReceived>, RemoteProtocolError> {
1985 self.ensure_id(req.remote_task_id)?;
1986 Ok(self.transition())
1987 }
1988}
1989
1990impl<S> RemoteSession<S> {
1991 #[must_use]
1993 pub fn remote_task_id(&self) -> RemoteTaskId {
1994 self.remote_task_id
1995 }
1996
1997 fn ensure_id(&self, got: RemoteTaskId) -> Result<(), RemoteProtocolError> {
1998 if self.remote_task_id == got {
1999 Ok(())
2000 } else {
2001 Err(RemoteProtocolError::RemoteTaskIdMismatch {
2002 expected: self.remote_task_id,
2003 got,
2004 })
2005 }
2006 }
2007
2008 fn transition<T>(self) -> RemoteSession<T> {
2009 RemoteSession {
2010 remote_task_id: self.remote_task_id,
2011 _state: PhantomData,
2012 }
2013 }
2014}
2015
2016impl RemoteSession<RemoteSpawnReceived> {
2017 pub fn send_ack_accepted(
2019 self,
2020 ack: &SpawnAck,
2021 ) -> Result<RemoteSession<RemoteRunning>, RemoteProtocolError> {
2022 self.ensure_id(ack.remote_task_id)?;
2023 match ack.status {
2024 SpawnAckStatus::Accepted => Ok(self.transition()),
2025 SpawnAckStatus::Rejected(_) => Err(RemoteProtocolError::UnexpectedAckStatus {
2026 expected: "Accepted",
2027 got: ack.status.clone(),
2028 }),
2029 }
2030 }
2031
2032 pub fn send_ack_rejected(
2034 self,
2035 ack: &SpawnAck,
2036 ) -> Result<RemoteSession<RemoteRejected>, RemoteProtocolError> {
2037 self.ensure_id(ack.remote_task_id)?;
2038 match ack.status {
2039 SpawnAckStatus::Rejected(_) => Ok(self.transition()),
2040 SpawnAckStatus::Accepted => Err(RemoteProtocolError::UnexpectedAckStatus {
2041 expected: "Rejected",
2042 got: ack.status.clone(),
2043 }),
2044 }
2045 }
2046
2047 pub fn recv_cancel(
2049 self,
2050 cancel: &CancelRequest,
2051 ) -> Result<RemoteSession<RemoteCancelPending>, RemoteProtocolError> {
2052 self.ensure_id(cancel.remote_task_id)?;
2053 Ok(self.transition())
2054 }
2055}
2056
2057impl RemoteSession<RemoteCancelPending> {
2058 pub fn send_ack_accepted(
2060 self,
2061 ack: &SpawnAck,
2062 ) -> Result<RemoteSession<RemoteCancelReceived>, RemoteProtocolError> {
2063 self.ensure_id(ack.remote_task_id)?;
2064 match ack.status {
2065 SpawnAckStatus::Accepted => Ok(self.transition()),
2066 SpawnAckStatus::Rejected(_) => Err(RemoteProtocolError::UnexpectedAckStatus {
2067 expected: "Accepted",
2068 got: ack.status.clone(),
2069 }),
2070 }
2071 }
2072
2073 pub fn send_ack_rejected(
2075 self,
2076 ack: &SpawnAck,
2077 ) -> Result<RemoteSession<RemoteRejected>, RemoteProtocolError> {
2078 self.ensure_id(ack.remote_task_id)?;
2079 match ack.status {
2080 SpawnAckStatus::Rejected(_) => Ok(self.transition()),
2081 SpawnAckStatus::Accepted => Err(RemoteProtocolError::UnexpectedAckStatus {
2082 expected: "Rejected",
2083 got: ack.status.clone(),
2084 }),
2085 }
2086 }
2087}
2088
2089impl RemoteSession<RemoteRunning> {
2090 pub fn recv_cancel(
2092 self,
2093 cancel: &CancelRequest,
2094 ) -> Result<RemoteSession<RemoteCancelReceived>, RemoteProtocolError> {
2095 self.ensure_id(cancel.remote_task_id)?;
2096 Ok(self.transition())
2097 }
2098
2099 pub fn send_lease_renewal(self, renewal: &LeaseRenewal) -> Result<Self, RemoteProtocolError> {
2101 self.ensure_id(renewal.remote_task_id)?;
2102 Ok(self)
2103 }
2104
2105 pub fn send_result(
2107 self,
2108 result: &ResultDelivery,
2109 ) -> Result<RemoteSession<RemoteCompleted>, RemoteProtocolError> {
2110 self.ensure_id(result.remote_task_id)?;
2111 Ok(self.transition())
2112 }
2113}
2114
2115impl RemoteSession<RemoteCancelReceived> {
2116 pub fn send_result(
2118 self,
2119 result: &ResultDelivery,
2120 ) -> Result<RemoteSession<RemoteCompleted>, RemoteProtocolError> {
2121 self.ensure_id(result.remote_task_id)?;
2122 Ok(self.transition())
2123 }
2124}
2125
/// String names for remote-protocol trace events.
///
/// Every constant in this module starts with the `remote::` prefix.
/// NOTE(review): the sites that emit these events are not visible from this
/// module; the per-constant summaries below are read off the names only.
pub mod trace_events {
    /// Event name for a spawn request being created.
    pub const SPAWN_REQUEST_CREATED: &str = "remote::spawn_request_created";
    /// Event name for a spawn request being sent.
    pub const SPAWN_REQUEST_SENT: &str = "remote::spawn_request_sent";
    /// Event name for a spawn ack being received.
    pub const SPAWN_ACK_RECEIVED: &str = "remote::spawn_ack_received";
    /// Event name for a spawn being rejected.
    pub const SPAWN_REJECTED: &str = "remote::spawn_rejected";
    /// Event name for a cancel being sent.
    pub const CANCEL_SENT: &str = "remote::cancel_sent";
    /// Event name for a cancel being received.
    pub const CANCEL_RECEIVED: &str = "remote::cancel_received";
    /// Event name for a result being delivered.
    pub const RESULT_DELIVERED: &str = "remote::result_delivered";
    /// Event name for a lease renewal being sent.
    pub const LEASE_RENEWAL_SENT: &str = "remote::lease_renewal_sent";
    /// Event name for a lease renewal being received.
    pub const LEASE_RENEWAL_RECEIVED: &str = "remote::lease_renewal_received";
    /// Event name for a lease expiring.
    pub const LEASE_EXPIRED: &str = "remote::lease_expired";
}
2157
2158#[cfg(test)]
2163mod tests {
2164 use super::*;
2165 use std::sync::Mutex;
2166
2167 #[test]
2168 fn node_id_basics() {
2169 let node = NodeId::new("worker-1");
2170 assert_eq!(node.as_str(), "worker-1");
2171 assert_eq!(format!("{node}"), "Node(worker-1)");
2172
2173 let node2 = NodeId::new("worker-1");
2174 assert_eq!(node, node2);
2175
2176 let node3 = NodeId::new("worker-2");
2177 assert_ne!(node, node3);
2178 }
2179
2180 #[test]
2181 fn computation_name_basics() {
2182 let name = ComputationName::new("encode_block");
2183 assert_eq!(name.as_str(), "encode_block");
2184 assert_eq!(format!("{name}"), "encode_block");
2185
2186 let name2 = ComputationName::new("encode_block");
2187 assert_eq!(name, name2);
2188 }
2189
2190 #[test]
2191 fn remote_input_basics() {
2192 let input = RemoteInput::new(vec![1, 2, 3]);
2193 assert_eq!(input.data(), &[1, 2, 3]);
2194 assert_eq!(input.len(), 3);
2195 assert!(!input.is_empty());
2196
2197 let empty = RemoteInput::empty();
2198 assert!(empty.is_empty());
2199 assert_eq!(empty.len(), 0);
2200
2201 let owned = input.into_data();
2202 assert_eq!(owned, vec![1, 2, 3]);
2203 }
2204
2205 #[test]
2206 fn remote_cap_defaults() {
2207 let cap = RemoteCap::new();
2208 assert_eq!(cap.default_lease(), Duration::from_secs(30));
2209 assert!(cap.remote_budget().is_none());
2210 assert_eq!(cap.local_node().as_str(), "local");
2211 }
2212
2213 #[test]
2214 fn remote_cap_builder() {
2215 let cap = RemoteCap::new()
2216 .with_default_lease(Duration::from_mins(1))
2217 .with_remote_budget(Budget::INFINITE)
2218 .with_local_node(NodeId::new("origin-a"));
2219 assert_eq!(cap.default_lease(), Duration::from_mins(1));
2220 assert!(cap.remote_budget().is_some());
2221 assert_eq!(cap.local_node().as_str(), "origin-a");
2222 }
2223
2224 #[derive(Debug, Default)]
2225 struct CaptureRuntime {
2226 sent: Mutex<Vec<(NodeId, MessageEnvelope<RemoteMessage>)>>,
2227 }
2228
2229 impl RemoteRuntime for CaptureRuntime {
2230 fn send_message(
2231 &self,
2232 destination: &NodeId,
2233 envelope: MessageEnvelope<RemoteMessage>,
2234 ) -> Result<(), RemoteError> {
2235 self.sent
2236 .lock()
2237 .expect("capture runtime lock poisoned")
2238 .push((destination.clone(), envelope));
2239 Ok(())
2240 }
2241
2242 fn register_task(
2243 &self,
2244 _task_id: RemoteTaskId,
2245 _tx: oneshot::Sender<Result<RemoteOutcome, RemoteError>>,
2246 ) {
2247 }
2249 }
2250
2251 #[derive(Debug, Default)]
2252 struct FailingSendRuntime {
2253 registered: Mutex<Vec<RemoteTaskId>>,
2254 unregistered: Mutex<Vec<RemoteTaskId>>,
2255 }
2256
2257 impl RemoteRuntime for FailingSendRuntime {
2258 fn send_message(
2259 &self,
2260 _destination: &NodeId,
2261 _envelope: MessageEnvelope<RemoteMessage>,
2262 ) -> Result<(), RemoteError> {
2263 Err(RemoteError::TransportError("simulated send failure".into()))
2264 }
2265
2266 fn register_task(
2267 &self,
2268 task_id: RemoteTaskId,
2269 _tx: oneshot::Sender<Result<RemoteOutcome, RemoteError>>,
2270 ) {
2271 self.registered
2272 .lock()
2273 .expect("failing runtime lock poisoned")
2274 .push(task_id);
2275 }
2276
2277 fn unregister_task(&self, task_id: RemoteTaskId) {
2278 self.unregistered
2279 .lock()
2280 .expect("failing runtime lock poisoned")
2281 .push(task_id);
2282 }
2283 }
2284
2285 #[test]
2286 fn spawn_remote_uses_cap_local_node_for_origin() {
2287 let runtime = Arc::new(CaptureRuntime::default());
2288 let cap = RemoteCap::new()
2289 .with_local_node(NodeId::new("origin-a"))
2290 .with_runtime(runtime.clone());
2291 let cx: Cx = Cx::for_testing_with_remote(cap);
2292
2293 let _ = spawn_remote(
2294 &cx,
2295 NodeId::new("worker-1"),
2296 ComputationName::new("encode_block"),
2297 RemoteInput::new(vec![1, 2, 3]),
2298 )
2299 .expect("spawn_remote should succeed");
2300
2301 let (destination, envelope) = {
2302 let sent = runtime.sent.lock().expect("capture runtime lock poisoned");
2303 assert_eq!(sent.len(), 1);
2304 sent[0].clone()
2305 };
2306 assert_eq!(destination.as_str(), "worker-1");
2307 assert_eq!(envelope.sender.as_str(), "origin-a");
2308 match &envelope.payload {
2309 RemoteMessage::SpawnRequest(req) => {
2310 assert_eq!(req.origin_node.as_str(), "origin-a");
2311 }
2312 other => unreachable!("expected SpawnRequest, got {other:?}"),
2313 }
2314 }
2315
2316 #[test]
2317 fn spawn_remote_send_failure_unregisters_pending_task() {
2318 let runtime = Arc::new(FailingSendRuntime::default());
2319 let cap = RemoteCap::new().with_runtime(runtime.clone());
2320 let cx: Cx = Cx::for_testing_with_remote(cap);
2321
2322 let err = spawn_remote(
2323 &cx,
2324 NodeId::new("worker-1"),
2325 ComputationName::new("encode_block"),
2326 RemoteInput::new(vec![1, 2, 3]),
2327 )
2328 .expect_err("spawn_remote should fail when send_message fails");
2329 match err {
2330 RemoteError::TransportError(msg) => {
2331 assert!(msg.contains("simulated send failure"));
2332 }
2333 other => unreachable!("expected TransportError, got {other:?}"),
2334 }
2335
2336 let registered = runtime
2337 .registered
2338 .lock()
2339 .expect("failing runtime lock poisoned")
2340 .clone();
2341 let unregistered = runtime
2342 .unregistered
2343 .lock()
2344 .expect("failing runtime lock poisoned")
2345 .clone();
2346
2347 assert_eq!(registered.len(), 1);
2348 assert_eq!(unregistered, registered);
2349 }
2350
2351 #[test]
2352 fn remote_task_id_uniqueness() {
2353 let id1 = RemoteTaskId::next();
2354 let id2 = RemoteTaskId::next();
2355 assert_ne!(id1, id2);
2356 assert!(id2.raw() > id1.raw());
2357 }
2358
2359 #[test]
2360 fn remote_task_state_display() {
2361 assert_eq!(format!("{}", RemoteTaskState::Pending), "Pending");
2362 assert_eq!(format!("{}", RemoteTaskState::Running), "Running");
2363 assert_eq!(format!("{}", RemoteTaskState::Completed), "Completed");
2364 assert_eq!(format!("{}", RemoteTaskState::LeaseExpired), "LeaseExpired");
2365 }
2366
2367 #[test]
2368 fn remote_error_display() {
2369 let err = RemoteError::NoCapability;
2370 assert_eq!(format!("{err}"), "remote capability not available");
2371
2372 let err = RemoteError::NodeUnreachable("worker-9".into());
2373 assert!(format!("{err}").contains("worker-9"));
2374
2375 let err = RemoteError::UnknownComputation("bad_fn".into());
2376 assert!(format!("{err}").contains("bad_fn"));
2377 }
2378
2379 #[test]
2380 fn spawn_remote_without_cap_fails() {
2381 let cx: Cx = Cx::for_testing();
2382 assert!(!cx.has_remote());
2383
2384 let result = spawn_remote(
2385 &cx,
2386 NodeId::new("worker-1"),
2387 ComputationName::new("encode"),
2388 RemoteInput::empty(),
2389 );
2390 assert!(result.is_err());
2391 assert_eq!(result.unwrap_err(), RemoteError::NoCapability);
2392 }
2393
2394 #[test]
2395 fn spawn_remote_with_cap_succeeds() {
2396 let cx: Cx = Cx::for_testing_with_remote(RemoteCap::new());
2397 assert!(cx.has_remote());
2398
2399 let result = spawn_remote(
2400 &cx,
2401 NodeId::new("worker-1"),
2402 ComputationName::new("encode_block"),
2403 RemoteInput::new(vec![42]),
2404 );
2405 assert!(result.is_ok());
2406
2407 let handle = result.unwrap();
2408 assert_eq!(handle.node().as_str(), "worker-1");
2409 assert_eq!(handle.computation().as_str(), "encode_block");
2410 assert_eq!(*handle.state(), RemoteTaskState::Pending);
2411 assert_eq!(handle.lease(), Duration::from_secs(30));
2412 assert!(handle.local_task_id().is_none());
2413 }
2414
2415 #[test]
2416 fn remote_handle_debug() {
2417 let cx: Cx = Cx::for_testing_with_remote(RemoteCap::new());
2418 let handle = spawn_remote(
2419 &cx,
2420 NodeId::new("n1"),
2421 ComputationName::new("compute"),
2422 RemoteInput::empty(),
2423 )
2424 .unwrap();
2425
2426 let debug = format!("{handle:?}");
2427 assert!(debug.contains("RemoteHandle"));
2428 assert!(debug.contains("n1"));
2429 assert!(debug.contains("compute"));
2430 }
2431
2432 #[test]
2433 fn remote_handle_not_finished_initially() {
2434 let cx: Cx = Cx::for_testing_with_remote(RemoteCap::new());
2435 let handle = spawn_remote(
2436 &cx,
2437 NodeId::new("n1"),
2438 ComputationName::new("add"),
2439 RemoteInput::empty(),
2440 )
2441 .unwrap();
2442
2443 let _ = handle.is_finished();
2448 }
2449
2450 #[test]
2451 fn remote_handle_try_join_pending() {
2452 let cx: Cx = Cx::for_testing_with_remote(RemoteCap::new());
2453 let handle = spawn_remote(
2454 &cx,
2455 NodeId::new("n1"),
2456 ComputationName::new("work"),
2457 RemoteInput::empty(),
2458 )
2459 .unwrap();
2460
2461 let result = handle.try_join();
2464 assert!(result.is_ok() || matches!(result, Err(RemoteError::Cancelled(_))));
2466 }
2467
2468 #[test]
2469 fn remote_handle_abort_no_panic() {
2470 let cx: Cx = Cx::for_testing_with_remote(RemoteCap::new());
2471 let handle = spawn_remote(
2472 &cx,
2473 NodeId::new("n1"),
2474 ComputationName::new("long_task"),
2475 RemoteInput::empty(),
2476 )
2477 .unwrap();
2478
2479 handle.abort();
2481 }
2482
2483 #[test]
2484 fn remote_cap_custom_lease_propagates() {
2485 let cap = RemoteCap::new().with_default_lease(Duration::from_mins(2));
2486 let cx: Cx = Cx::for_testing_with_remote(cap);
2487
2488 let handle = spawn_remote(
2489 &cx,
2490 NodeId::new("n1"),
2491 ComputationName::new("slow"),
2492 RemoteInput::empty(),
2493 )
2494 .unwrap();
2495
2496 assert_eq!(handle.lease(), Duration::from_mins(2));
2497 }
2498
2499 #[test]
2504 fn idempotency_key_generate() {
2505 let cx: Cx = Cx::for_testing();
2506 let k1 = IdempotencyKey::generate(&cx);
2507 let k2 = IdempotencyKey::generate(&cx);
2508 assert_ne!(k1, k2);
2510 assert_ne!(k1.raw(), 0);
2511 }
2512
2513 #[test]
2514 fn idempotency_key_from_raw() {
2515 let key = IdempotencyKey::from_raw(0xDEAD_BEEF);
2516 assert_eq!(key.raw(), 0xDEAD_BEEF);
2517 let display = format!("{key}");
2518 assert!(display.starts_with("IK-"));
2519 assert!(display.contains("deadbeef"));
2520 }
2521
2522 #[test]
2523 fn spawn_request_construction() {
2524 let cx: Cx = Cx::for_testing();
2525 let req = SpawnRequest {
2526 remote_task_id: RemoteTaskId::next(),
2527 computation: ComputationName::new("encode_block"),
2528 input: RemoteInput::new(vec![1, 2, 3]),
2529 lease: Duration::from_mins(1),
2530 idempotency_key: IdempotencyKey::generate(&cx),
2531 budget: None,
2532 origin_node: NodeId::new("origin-1"),
2533 origin_region: cx.region_id(),
2534 origin_task: cx.task_id(),
2535 };
2536
2537 assert_eq!(req.computation.as_str(), "encode_block");
2538 assert_eq!(req.input.len(), 3);
2539 assert_eq!(req.lease, Duration::from_mins(1));
2540 assert_eq!(req.origin_node.as_str(), "origin-1");
2541 }
2542
2543 #[test]
2544 fn spawn_ack_accepted() {
2545 let ack = SpawnAck {
2546 remote_task_id: RemoteTaskId::next(),
2547 status: SpawnAckStatus::Accepted,
2548 assigned_node: NodeId::new("worker-3"),
2549 };
2550 assert_eq!(ack.status, SpawnAckStatus::Accepted);
2551 assert_eq!(ack.assigned_node.as_str(), "worker-3");
2552 }
2553
2554 #[test]
2555 fn spawn_ack_rejected() {
2556 let ack = SpawnAck {
2557 remote_task_id: RemoteTaskId::next(),
2558 status: SpawnAckStatus::Rejected(SpawnRejectReason::CapacityExceeded),
2559 assigned_node: NodeId::new("worker-1"),
2560 };
2561 assert_eq!(
2562 ack.status,
2563 SpawnAckStatus::Rejected(SpawnRejectReason::CapacityExceeded)
2564 );
2565 }
2566
2567 #[test]
2568 fn spawn_reject_reason_display() {
2569 assert_eq!(
2570 format!("{}", SpawnRejectReason::UnknownComputation),
2571 "unknown computation"
2572 );
2573 assert_eq!(
2574 format!("{}", SpawnRejectReason::CapacityExceeded),
2575 "capacity exceeded"
2576 );
2577 assert_eq!(
2578 format!("{}", SpawnRejectReason::NodeShuttingDown),
2579 "node shutting down"
2580 );
2581 assert!(
2582 format!("{}", SpawnRejectReason::InvalidInput("bad data".into())).contains("bad data")
2583 );
2584 assert_eq!(
2585 format!("{}", SpawnRejectReason::IdempotencyConflict),
2586 "idempotency conflict"
2587 );
2588 }
2589
2590 #[test]
2591 fn cancel_request_construction() {
2592 let req = CancelRequest {
2593 remote_task_id: RemoteTaskId::next(),
2594 reason: CancelReason::user("user abort"),
2595 origin_node: NodeId::new("origin-1"),
2596 };
2597 assert_eq!(req.origin_node.as_str(), "origin-1");
2598 }
2599
2600 #[test]
2601 fn result_delivery_success() {
2602 let delivery = ResultDelivery {
2603 remote_task_id: RemoteTaskId::next(),
2604 outcome: RemoteOutcome::Success(vec![42]),
2605 execution_time: Duration::from_millis(150),
2606 };
2607 assert!(delivery.outcome.is_success());
2608 assert_eq!(delivery.outcome.severity(), crate::types::Severity::Ok);
2609 assert_eq!(delivery.execution_time, Duration::from_millis(150));
2610 }
2611
2612 #[test]
2613 fn result_delivery_failure() {
2614 let delivery = ResultDelivery {
2615 remote_task_id: RemoteTaskId::next(),
2616 outcome: RemoteOutcome::Failed("out of memory".into()),
2617 execution_time: Duration::from_secs(5),
2618 };
2619 assert!(!delivery.outcome.is_success());
2620 assert_eq!(delivery.outcome.severity(), crate::types::Severity::Err);
2621 }
2622
2623 #[test]
2624 fn remote_outcome_display() {
2625 assert_eq!(format!("{}", RemoteOutcome::Success(vec![])), "Success");
2626 assert!(format!("{}", RemoteOutcome::Failed("oops".into())).contains("oops"));
2627 assert!(
2628 format!("{}", RemoteOutcome::Cancelled(CancelReason::user("done")))
2629 .contains("Cancelled")
2630 );
2631 assert!(format!("{}", RemoteOutcome::Panicked("boom".into())).contains("boom"));
2632 }
2633
2634 #[test]
2635 fn lease_renewal_construction() {
2636 let renewal = LeaseRenewal {
2637 remote_task_id: RemoteTaskId::next(),
2638 new_lease: Duration::from_secs(30),
2639 current_state: RemoteTaskState::Running,
2640 node: NodeId::new("worker-1"),
2641 };
2642 assert_eq!(renewal.new_lease, Duration::from_secs(30));
2643 assert_eq!(renewal.current_state, RemoteTaskState::Running);
2644 }
2645
2646 #[test]
2647 fn remote_message_task_id_dispatch() {
2648 let rtid = RemoteTaskId::next();
2649 let cx: Cx = Cx::for_testing();
2650
2651 let spawn_msg = RemoteMessage::SpawnRequest(SpawnRequest {
2652 remote_task_id: rtid,
2653 computation: ComputationName::new("test"),
2654 input: RemoteInput::empty(),
2655 lease: Duration::from_secs(30),
2656 idempotency_key: IdempotencyKey::generate(&cx),
2657 budget: None,
2658 origin_node: NodeId::new("n1"),
2659 origin_region: cx.region_id(),
2660 origin_task: cx.task_id(),
2661 });
2662 assert_eq!(spawn_msg.remote_task_id(), rtid);
2663
2664 let ack_msg = RemoteMessage::SpawnAck(SpawnAck {
2665 remote_task_id: rtid,
2666 status: SpawnAckStatus::Accepted,
2667 assigned_node: NodeId::new("n2"),
2668 });
2669 assert_eq!(ack_msg.remote_task_id(), rtid);
2670
2671 let cancel_msg = RemoteMessage::CancelRequest(CancelRequest {
2672 remote_task_id: rtid,
2673 reason: CancelReason::user("test"),
2674 origin_node: NodeId::new("n1"),
2675 });
2676 assert_eq!(cancel_msg.remote_task_id(), rtid);
2677
2678 let result_msg = RemoteMessage::ResultDelivery(ResultDelivery {
2679 remote_task_id: rtid,
2680 outcome: RemoteOutcome::Success(vec![]),
2681 execution_time: Duration::ZERO,
2682 });
2683 assert_eq!(result_msg.remote_task_id(), rtid);
2684
2685 let renewal_msg = RemoteMessage::LeaseRenewal(LeaseRenewal {
2686 remote_task_id: rtid,
2687 new_lease: Duration::from_secs(30),
2688 current_state: RemoteTaskState::Running,
2689 node: NodeId::new("n2"),
2690 });
2691 assert_eq!(renewal_msg.remote_task_id(), rtid);
2692 }
2693
2694 fn test_spawn_request(cx: &Cx, remote_task_id: RemoteTaskId) -> SpawnRequest {
2695 SpawnRequest {
2696 remote_task_id,
2697 computation: ComputationName::new("compute"),
2698 input: RemoteInput::empty(),
2699 lease: Duration::from_secs(30),
2700 idempotency_key: IdempotencyKey::generate(cx),
2701 budget: None,
2702 origin_node: NodeId::new("origin-1"),
2703 origin_region: cx.region_id(),
2704 origin_task: cx.task_id(),
2705 }
2706 }
2707
2708 fn test_ack_accepted(remote_task_id: RemoteTaskId) -> SpawnAck {
2709 SpawnAck {
2710 remote_task_id,
2711 status: SpawnAckStatus::Accepted,
2712 assigned_node: NodeId::new("worker-1"),
2713 }
2714 }
2715
2716 fn test_ack_rejected(remote_task_id: RemoteTaskId) -> SpawnAck {
2717 SpawnAck {
2718 remote_task_id,
2719 status: SpawnAckStatus::Rejected(SpawnRejectReason::CapacityExceeded),
2720 assigned_node: NodeId::new("worker-1"),
2721 }
2722 }
2723
2724 fn test_cancel(remote_task_id: RemoteTaskId) -> CancelRequest {
2725 CancelRequest {
2726 remote_task_id,
2727 reason: CancelReason::user("cancel"),
2728 origin_node: NodeId::new("origin-1"),
2729 }
2730 }
2731
2732 fn test_result(remote_task_id: RemoteTaskId, outcome: RemoteOutcome) -> ResultDelivery {
2733 ResultDelivery {
2734 remote_task_id,
2735 outcome,
2736 execution_time: Duration::ZERO,
2737 }
2738 }
2739
2740 fn test_renewal(remote_task_id: RemoteTaskId) -> LeaseRenewal {
2741 LeaseRenewal {
2742 remote_task_id,
2743 new_lease: Duration::from_secs(10),
2744 current_state: RemoteTaskState::Running,
2745 node: NodeId::new("worker-1"),
2746 }
2747 }
2748
2749 #[test]
2750 fn origin_session_cancel_flow() {
2751 let cx: Cx = Cx::for_testing();
2752 let rtid = RemoteTaskId::next();
2753 let origin = OriginSession::<OriginInit>::new(rtid);
2754 let req = test_spawn_request(&cx, rtid);
2755 let origin = origin.send_spawn(&req).unwrap();
2756 let ack = test_ack_accepted(rtid);
2757 let outcome = origin.recv_spawn_ack(&ack).unwrap();
2758 assert!(matches!(outcome, OriginAckOutcome::Accepted(_)));
2759 let origin = match outcome {
2760 OriginAckOutcome::Accepted(session) => session,
2761 OriginAckOutcome::Rejected(_) => return,
2762 };
2763 let origin = origin.recv_lease_renewal(&test_renewal(rtid)).unwrap();
2764 let origin = origin.send_cancel(&test_cancel(rtid)).unwrap();
2765 let result = test_result(
2766 rtid,
2767 RemoteOutcome::Cancelled(CancelReason::user("cancelled")),
2768 );
2769 let origin = origin.recv_result(&result).unwrap();
2770 assert_eq!(origin.remote_task_id(), rtid);
2771 }
2772
2773 #[test]
2774 fn origin_session_reject_flow() {
2775 let cx: Cx = Cx::for_testing();
2776 let rtid = RemoteTaskId::next();
2777 let origin = OriginSession::<OriginInit>::new(rtid);
2778 let req = test_spawn_request(&cx, rtid);
2779 let origin = origin.send_spawn(&req).unwrap();
2780 let ack = test_ack_rejected(rtid);
2781 let outcome = origin.recv_spawn_ack(&ack).unwrap();
2782 assert!(matches!(outcome, OriginAckOutcome::Rejected(_)));
2783 if let OriginAckOutcome::Rejected(session) = outcome {
2784 assert_eq!(session.remote_task_id(), rtid);
2785 }
2786 }
2787
2788 #[test]
2789 fn remote_session_cancel_before_ack_flow() {
2790 let cx: Cx = Cx::for_testing();
2791 let rtid = RemoteTaskId::next();
2792 let remote = RemoteSession::<RemoteInit>::new(rtid);
2793 let req = test_spawn_request(&cx, rtid);
2794 let remote = remote.recv_spawn(&req).unwrap();
2795 let remote = remote.recv_cancel(&test_cancel(rtid)).unwrap();
2796 let remote = remote.send_ack_accepted(&test_ack_accepted(rtid)).unwrap();
2797 let result = test_result(rtid, RemoteOutcome::Cancelled(CancelReason::user("done")));
2798 let remote = remote.send_result(&result).unwrap();
2799 assert_eq!(remote.remote_task_id(), rtid);
2800 }
2801
2802 #[test]
2803 fn protocol_id_mismatch_is_error() {
2804 let cx: Cx = Cx::for_testing();
2805 let rtid = RemoteTaskId::next();
2806 let origin = OriginSession::<OriginInit>::new(rtid);
2807 let req = test_spawn_request(&cx, RemoteTaskId::next());
2808 let err = origin.send_spawn(&req).unwrap_err();
2809 assert!(matches!(
2810 err,
2811 RemoteProtocolError::RemoteTaskIdMismatch { .. }
2812 ));
2813 }
2814
2815 #[test]
2816 fn protocol_ack_status_mismatch_is_error() {
2817 let cx: Cx = Cx::for_testing();
2818 let rtid = RemoteTaskId::next();
2819 let remote = RemoteSession::<RemoteInit>::new(rtid);
2820 let req = test_spawn_request(&cx, rtid);
2821 let remote = remote.recv_spawn(&req).unwrap();
2822 let ack = test_ack_rejected(rtid);
2823 let err = remote.send_ack_accepted(&ack).unwrap_err();
2824 assert!(matches!(
2825 err,
2826 RemoteProtocolError::UnexpectedAckStatus { .. }
2827 ));
2828 }
2829
2830 #[test]
2831 fn trace_event_names_are_namespaced() {
2832 assert!(trace_events::SPAWN_REQUEST_CREATED.starts_with("remote::"));
2834 assert!(trace_events::SPAWN_REQUEST_SENT.starts_with("remote::"));
2835 assert!(trace_events::SPAWN_ACK_RECEIVED.starts_with("remote::"));
2836 assert!(trace_events::SPAWN_REJECTED.starts_with("remote::"));
2837 assert!(trace_events::CANCEL_SENT.starts_with("remote::"));
2838 assert!(trace_events::CANCEL_RECEIVED.starts_with("remote::"));
2839 assert!(trace_events::RESULT_DELIVERED.starts_with("remote::"));
2840 assert!(trace_events::LEASE_RENEWAL_SENT.starts_with("remote::"));
2841 assert!(trace_events::LEASE_RENEWAL_RECEIVED.starts_with("remote::"));
2842 assert!(trace_events::LEASE_EXPIRED.starts_with("remote::"));
2843 }
2844
    /// Fixed `ObligationId` shared by the lease tests.
    ///
    /// The `(0, 0)` arguments are passed straight to `new_for_test`; their
    /// meaning is defined there.
    fn test_obligation_id() -> ObligationId {
        ObligationId::new_for_test(0, 0)
    }
2852
    /// Fixed `RegionId` shared by the lease tests.
    ///
    /// The `(0, 0)` arguments are passed straight to `new_for_test`; their
    /// meaning is defined there.
    fn test_region_id() -> RegionId {
        RegionId::new_for_test(0, 0)
    }
2856
    /// Fixed `TaskId` shared by the lease tests.
    ///
    /// The `(0, 0)` arguments are passed straight to `new_for_test`; their
    /// meaning is defined there.
    fn test_task_id() -> TaskId {
        TaskId::new_for_test(0, 0)
    }
2860
2861 #[test]
2862 fn lease_creation() {
2863 let now = Time::from_secs(10);
2864 let lease = Lease::new(
2865 test_obligation_id(),
2866 test_region_id(),
2867 test_task_id(),
2868 Duration::from_secs(30),
2869 now,
2870 );
2871 assert!(lease.is_active(now));
2872 assert!(!lease.is_expired(now));
2873 assert!(!lease.is_released());
2874 assert_eq!(lease.renewal_count(), 0);
2875 assert_eq!(lease.initial_duration(), Duration::from_secs(30));
2876 assert_eq!(lease.expires_at(), Time::from_secs(40));
2877 }
2878
2879 #[test]
2880 fn lease_remaining_time() {
2881 let now = Time::from_secs(10);
2882 let lease = Lease::new(
2883 test_obligation_id(),
2884 test_region_id(),
2885 test_task_id(),
2886 Duration::from_secs(30),
2887 now,
2888 );
2889 let remaining = lease.remaining(Time::from_secs(20));
2890 assert_eq!(remaining, Duration::from_secs(20));
2891
2892 let remaining = lease.remaining(Time::from_secs(40));
2894 assert_eq!(remaining, Duration::ZERO);
2895
2896 let remaining = lease.remaining(Time::from_secs(50));
2898 assert_eq!(remaining, Duration::ZERO);
2899 }
2900
2901 #[test]
2902 fn lease_expiry_detection() {
2903 let now = Time::from_secs(10);
2904 let lease = Lease::new(
2905 test_obligation_id(),
2906 test_region_id(),
2907 test_task_id(),
2908 Duration::from_secs(30),
2909 now,
2910 );
2911
2912 assert!(!lease.is_expired(Time::from_secs(39)));
2914 assert!(lease.is_active(Time::from_secs(39)));
2915
2916 assert!(lease.is_expired(Time::from_secs(40)));
2918 assert!(!lease.is_active(Time::from_secs(40)));
2919
2920 assert!(lease.is_expired(Time::from_secs(50)));
2922 }
2923
2924 #[test]
2925 fn lease_renew_extends_expiry() {
2926 let now = Time::from_secs(10);
2927 let mut lease = Lease::new(
2928 test_obligation_id(),
2929 test_region_id(),
2930 test_task_id(),
2931 Duration::from_secs(30),
2932 now,
2933 );
2934
2935 let result = lease.renew(Duration::from_secs(30), Time::from_secs(25));
2937 assert!(result.is_ok());
2938 assert_eq!(lease.expires_at(), Time::from_secs(55));
2939 assert_eq!(lease.renewal_count(), 1);
2940
2941 let result = lease.renew(Duration::from_secs(30), Time::from_secs(50));
2943 assert!(result.is_ok());
2944 assert_eq!(lease.expires_at(), Time::from_secs(80));
2945 assert_eq!(lease.renewal_count(), 2);
2946 }
2947
2948 #[test]
2949 fn lease_renew_after_expiry_fails() {
2950 let now = Time::from_secs(10);
2951 let mut lease = Lease::new(
2952 test_obligation_id(),
2953 test_region_id(),
2954 test_task_id(),
2955 Duration::from_secs(30),
2956 now,
2957 );
2958
2959 let result = lease.renew(Duration::from_secs(30), Time::from_secs(50));
2961 assert_eq!(result, Err(LeaseError::Expired));
2962 assert_eq!(lease.state(), LeaseState::Expired);
2963 }
2964
2965 #[test]
2966 fn lease_release() {
2967 let now = Time::from_secs(10);
2968 let mut lease = Lease::new(
2969 test_obligation_id(),
2970 test_region_id(),
2971 test_task_id(),
2972 Duration::from_secs(30),
2973 now,
2974 );
2975
2976 let result = lease.release(Time::from_secs(20));
2977 assert!(result.is_ok());
2978 assert!(lease.is_released());
2979 assert_eq!(lease.state(), LeaseState::Released);
2980 }
2981
2982 #[test]
2983 fn lease_double_release_fails() {
2984 let now = Time::from_secs(10);
2985 let mut lease = Lease::new(
2986 test_obligation_id(),
2987 test_region_id(),
2988 test_task_id(),
2989 Duration::from_secs(30),
2990 now,
2991 );
2992
2993 lease.release(Time::from_secs(20)).unwrap();
2994 let result = lease.release(Time::from_secs(25));
2995 assert_eq!(result, Err(LeaseError::Released));
2996 }
2997
2998 #[test]
2999 fn lease_renew_after_release_fails() {
3000 let now = Time::from_secs(10);
3001 let mut lease = Lease::new(
3002 test_obligation_id(),
3003 test_region_id(),
3004 test_task_id(),
3005 Duration::from_secs(30),
3006 now,
3007 );
3008
3009 lease.release(Time::from_secs(20)).unwrap();
3010 let result = lease.renew(Duration::from_secs(30), Time::from_secs(25));
3011 assert_eq!(result, Err(LeaseError::Released));
3012 }
3013
3014 #[test]
3015 fn lease_mark_expired() {
3016 let now = Time::from_secs(10);
3017 let mut lease = Lease::new(
3018 test_obligation_id(),
3019 test_region_id(),
3020 test_task_id(),
3021 Duration::from_secs(30),
3022 now,
3023 );
3024
3025 let result = lease.mark_expired();
3026 assert!(result.is_ok());
3027 assert_eq!(lease.state(), LeaseState::Expired);
3028
3029 let result = lease.mark_expired();
3031 assert!(result.is_ok());
3032 }
3033
3034 #[test]
3035 fn lease_mark_expired_after_release_fails() {
3036 let now = Time::from_secs(10);
3037 let mut lease = Lease::new(
3038 test_obligation_id(),
3039 test_region_id(),
3040 test_task_id(),
3041 Duration::from_secs(30),
3042 now,
3043 );
3044
3045 lease.release(Time::from_secs(20)).unwrap();
3046 let result = lease.mark_expired();
3047 assert_eq!(result, Err(LeaseError::Released));
3048 }
3049
3050 #[test]
3051 fn lease_state_display() {
3052 assert_eq!(format!("{}", LeaseState::Active), "Active");
3053 assert_eq!(format!("{}", LeaseState::Released), "Released");
3054 assert_eq!(format!("{}", LeaseState::Expired), "Expired");
3055 }
3056
3057 #[test]
3058 fn lease_error_display() {
3059 assert_eq!(format!("{}", LeaseError::Expired), "lease expired");
3060 assert_eq!(
3061 format!("{}", LeaseError::Released),
3062 "lease already released"
3063 );
3064 assert!(format!("{}", LeaseError::CreationFailed("full".into())).contains("full"));
3065 }
3066
3067 #[test]
3072 fn idempotency_store_new_request() {
3073 let mut store = IdempotencyStore::new(Duration::from_mins(5));
3074 assert!(store.is_empty());
3075
3076 let key = IdempotencyKey::from_raw(1);
3077 let decision = store.check(&key, &ComputationName::new("encode"));
3078 assert!(matches!(decision, DedupDecision::New));
3079
3080 let inserted = store.record(
3081 key,
3082 RemoteTaskId::next(),
3083 ComputationName::new("encode"),
3084 Time::from_secs(10),
3085 );
3086 assert!(inserted);
3087 assert_eq!(store.len(), 1);
3088 }
3089
3090 #[test]
3091 fn idempotency_store_duplicate_detection() {
3092 let mut store = IdempotencyStore::new(Duration::from_mins(5));
3093 let key = IdempotencyKey::from_raw(42);
3094 let comp = ComputationName::new("encode");
3095
3096 store.record(key, RemoteTaskId::next(), comp.clone(), Time::from_secs(10));
3097
3098 let decision = store.check(&key, &comp);
3100 assert!(matches!(decision, DedupDecision::Duplicate(_)));
3101
3102 let inserted = store.record(key, RemoteTaskId::next(), comp, Time::from_secs(20));
3104 assert!(!inserted);
3105 assert_eq!(store.len(), 1);
3106 }
3107
3108 #[test]
3109 fn idempotency_store_conflict_detection() {
3110 let mut store = IdempotencyStore::new(Duration::from_mins(5));
3111 let key = IdempotencyKey::from_raw(42);
3112
3113 store.record(
3114 key,
3115 RemoteTaskId::next(),
3116 ComputationName::new("encode"),
3117 Time::from_secs(10),
3118 );
3119
3120 let decision = store.check(&key, &ComputationName::new("decode"));
3122 assert!(matches!(decision, DedupDecision::Conflict));
3123 }
3124
3125 #[test]
3126 fn idempotency_store_complete_outcome() {
3127 let mut store = IdempotencyStore::new(Duration::from_mins(5));
3128 let key = IdempotencyKey::from_raw(99);
3129
3130 store.record(
3131 key,
3132 RemoteTaskId::next(),
3133 ComputationName::new("work"),
3134 Time::from_secs(10),
3135 );
3136
3137 let updated = store.complete(&key, RemoteOutcome::Success(vec![1, 2, 3]));
3139 assert!(updated);
3140
3141 let decision = store.check(&key, &ComputationName::new("work"));
3143 assert!(matches!(decision, DedupDecision::Duplicate(_)));
3144 if let DedupDecision::Duplicate(record) = decision {
3145 assert!(record.outcome.is_some());
3146 assert!(record.outcome.unwrap().is_success());
3147 }
3148 }
3149
#[test]
fn idempotency_store_complete_unknown_key() {
    // Nothing was recorded, so `complete` is expected to report no update.
    let mut store = IdempotencyStore::new(Duration::from_mins(5));
    let missing = IdempotencyKey::from_raw(999);
    let failure = RemoteOutcome::Failed("oops".into());

    let was_updated = store.complete(&missing, failure);
    assert!(!was_updated);
}
3159
#[test]
fn idempotency_store_eviction() {
    let mut store = IdempotencyStore::new(Duration::from_mins(1));

    // (raw key, computation name, recording time in seconds)
    for (raw, name, at_secs) in [(1, "a", 10), (2, "b", 50)] {
        store.record(
            IdempotencyKey::from_raw(raw),
            RemoteTaskId::next(),
            ComputationName::new(name),
            Time::from_secs(at_secs),
        );
    }
    assert_eq!(store.len(), 2);

    // At t=80 the first entry is 70s old and the second 30s old; with the
    // one-minute TTL the assertions expect only the first to be evicted.
    let removed = store.evict_expired(Time::from_secs(80));
    assert_eq!(removed, 1);
    assert_eq!(store.len(), 1);

    let survivor = store.check(&IdempotencyKey::from_raw(2), &ComputationName::new("b"));
    assert!(matches!(survivor, DedupDecision::Duplicate(_)));

    let expired = store.check(&IdempotencyKey::from_raw(1), &ComputationName::new("a"));
    assert!(matches!(expired, DedupDecision::New));
}
3194
#[test]
fn idempotency_store_debug() {
    // The `Debug` rendering should mention the type name and an `entries` field.
    let rendered = {
        let store = IdempotencyStore::new(Duration::from_mins(1));
        format!("{store:?}")
    };
    assert!(rendered.contains("IdempotencyStore"));
    assert!(rendered.contains("entries"));
}
3202
#[test]
fn saga_successful_completion() {
    // A saga whose steps all succeed finishes as `Completed` and never runs
    // any compensation.
    let mut saga = Saga::new();
    assert_eq!(saga.state(), SagaState::Running);
    assert_eq!(saga.completed_steps(), 0);

    let created: Result<String, _> = saga.step(
        "create resource",
        || Ok("resource-1".to_string()),
        || "deleted resource-1".to_string(),
    );
    assert!(created.is_ok());
    assert_eq!(created.unwrap(), "resource-1");
    assert_eq!(saga.completed_steps(), 1);

    let configured: Result<(), _> =
        saga.step("configure", || Ok(()), || "reset config".to_string());
    assert!(configured.is_ok());
    assert_eq!(saga.completed_steps(), 2);

    saga.complete();
    assert_eq!(saga.state(), SagaState::Completed);
    assert!(saga.compensation_results().is_empty());
}
3230
#[test]
fn saga_step_failure_runs_compensations_reverse() {
    use std::sync::{Arc, Mutex};

    // Shared log of the indices whose compensation actually ran, in run order.
    let order = Arc::new(Mutex::new(Vec::new()));

    // Builds a compensation that appends `index` to the log and returns `label`.
    let compensation = |index: i32, label: &'static str| {
        let log = Arc::clone(&order);
        move || {
            log.lock().unwrap().push(index);
            label.to_string()
        }
    };

    let mut saga = Saga::new();
    saga.step("step-0", || Ok(()), compensation(0, "comp-0"))
        .unwrap();
    saga.step("step-1", || Ok(()), compensation(1, "comp-1"))
        .unwrap();

    // The third step fails; its own compensation is expected never to run.
    let failed: Result<(), SagaStepError> = saga.step(
        "step-2",
        || Err("boom".to_string()),
        compensation(2, "comp-2"),
    );

    assert!(failed.is_err());
    let error = failed.unwrap_err();
    assert_eq!(error.step, 2);
    assert!(error.message.contains("boom"));

    assert_eq!(saga.state(), SagaState::Aborted);

    // Only the two completed steps are compensated, most recent first.
    let results = saga.compensation_results();
    assert_eq!(results.len(), 2);
    assert_eq!(results[0].step, 1);
    assert_eq!(results[1].step, 0);

    let ran = order.lock().unwrap().clone();
    assert_eq!(ran, vec![1, 0]);
}
3291
#[test]
fn saga_explicit_abort() {
    use std::sync::{Arc, Mutex};

    // Names of the steps whose compensation ran, in run order.
    let compensated = Arc::new(Mutex::new(Vec::new()));
    let log_for_first = Arc::clone(&compensated);
    let log_for_second = Arc::clone(&compensated);

    let mut saga = Saga::new();

    saga.step(
        "step-0",
        || Ok(()),
        move || {
            log_for_first.lock().unwrap().push("step-0");
            "undid step-0".to_string()
        },
    )
    .unwrap();

    saga.step(
        "step-1",
        || Ok(()),
        move || {
            log_for_second.lock().unwrap().push("step-1");
            "undid step-1".to_string()
        },
    )
    .unwrap();

    // Aborting by hand should unwind both completed steps, newest first.
    saga.abort();
    assert_eq!(saga.state(), SagaState::Aborted);

    let results = saga.compensation_results();
    assert_eq!(results.len(), 2);
    assert_eq!(results[0].description, "step-1");
    assert_eq!(results[1].description, "step-0");

    let ran = compensated.lock().unwrap().clone();
    assert_eq!(ran, vec!["step-1", "step-0"]);
}
3333
#[test]
fn saga_first_step_failure_no_compensations() {
    // With no previously completed step there is nothing to compensate.
    let mut saga = Saga::new();

    let outcome: Result<(), _> = saga.step(
        "fail-step",
        || Err("bad".to_string()),
        String::new,
    );

    assert!(outcome.is_err());
    assert_eq!(saga.state(), SagaState::Aborted);
    assert!(saga.compensation_results().is_empty());
}
3344
#[test]
fn saga_state_display() {
    // Each state's `Display` text is expected to equal its variant name.
    let expectations = [
        (SagaState::Running, "Running"),
        (SagaState::Completed, "Completed"),
        (SagaState::Compensating, "Compensating"),
        (SagaState::Aborted, "Aborted"),
    ];
    for (state, text) in expectations {
        assert_eq!(format!("{state}"), text);
    }
}
3352
#[test]
fn saga_step_error_display() {
    // The rendered error should mention the step index, the step
    // description and the failure message.
    let rendered = {
        let error = SagaStepError {
            step: 3,
            description: "deploy".to_string(),
            message: "timeout".to_string(),
        };
        format!("{error}")
    };
    assert!(rendered.contains('3'));
    assert!(rendered.contains("deploy"));
    assert!(rendered.contains("timeout"));
}
3365
#[test]
fn saga_debug() {
    // A fresh saga's `Debug` output names the type and its initial state.
    let rendered = {
        let saga = Saga::new();
        format!("{saga:?}")
    };
    assert!(rendered.contains("Saga"));
    assert!(rendered.contains("Running"));
}
3373
#[test]
fn saga_default_trait() {
    // `Default` should yield a saga in the `Running` state.
    let fresh = Saga::default();
    assert_eq!(fresh.state(), SagaState::Running);
}
3379}