duroxide/futures.rs
1use std::cell::{Cell, RefCell};
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use crate::{Action, Event, EventKind, OrchestrationContext};
7
8/// Helper function to check if a completion event can be consumed according to FIFO ordering.
9///
10/// Returns `true` if all completion events in history that occurred before the given
11/// `completion_event_id` have already been consumed (or are cancelled select2 losers).
12///
13/// # Arguments
14/// * `history` - The full event history
15/// * `consumed_completions` - Set of already consumed completion event IDs
16/// * `cancelled_source_ids` - Set of source_event_ids whose completions should be auto-skipped (select2 losers)
17/// * `completion_event_id` - The event ID we want to consume
18fn can_consume_completion(
19 history: &[Event],
20 consumed_completions: &std::collections::HashSet<u64>,
21 cancelled_source_ids: &std::collections::HashSet<u64>,
22 completion_event_id: u64,
23) -> bool {
24 history.iter().all(|e| {
25 match &e.kind {
26 // Completions with source_event_id - check if cancelled
27 EventKind::ActivityCompleted { .. }
28 | EventKind::ActivityFailed { .. }
29 | EventKind::TimerFired { .. }
30 | EventKind::SubOrchestrationCompleted { .. }
31 | EventKind::SubOrchestrationFailed { .. } => {
32 // source_event_id is now on the Event struct
33 if let Some(source_event_id) = e.source_event_id {
34 // Cancelled completions (select2 losers) don't block - skip them
35 if cancelled_source_ids.contains(&source_event_id) {
36 return true;
37 }
38 }
39 // Otherwise: if before ours, must be consumed
40 e.event_id >= completion_event_id || consumed_completions.contains(&e.event_id)
41 }
42 // External events don't have source_event_id - can't be cancelled via select2
43 EventKind::ExternalEvent { .. } => {
44 e.event_id >= completion_event_id || consumed_completions.contains(&e.event_id)
45 }
46 _ => true, // Non-completions don't affect ordering
47 }
48 })
49}
50
/// Value a [`DurableFuture`] resolves to, tagged by the kind of operation.
#[derive(Clone, Debug)]
pub enum DurableOutput {
    /// Activity outcome: `Ok` holds the completion result, `Err` the failure message.
    /// System calls also resolve through this variant, always as `Ok(value)`.
    Activity(Result<String, String>),
    /// The durable timer fired.
    Timer,
    /// Payload of the external event that was waited on.
    External(String),
    /// Sub-orchestration outcome: `Ok` holds the result, `Err` the failure message.
    SubOrchestration(Result<String, String>),
}
58
59/// A durable future representing an asynchronous operation in an orchestration.
60///
61/// Common fields `claimed_event_id` and `ctx` are stored directly on the struct,
62/// while operation-specific data is stored in the `kind` variant.
63pub struct DurableFuture {
64 /// The event ID claimed by this future during scheduling (set on first poll)
65 pub(crate) claimed_event_id: Cell<Option<u64>>,
66 /// Reference to the orchestration context
67 pub(crate) ctx: OrchestrationContext,
68 /// Operation-specific data
69 pub(crate) kind: Kind,
70}
71
/// Per-operation data carried by a [`DurableFuture`].
pub(crate) enum Kind {
    /// Activity call identified by its `name` and serialized `input`.
    Activity { name: String, input: String },
    /// Timer whose fire time is `now + delay_ms` at the moment it is first recorded.
    Timer { delay_ms: u64 },
    /// Wait for the external event named `name`; `result` caches the payload once consumed.
    External {
        name: String,
        result: RefCell<Option<String>>,
    },
    /// Sub-orchestration call. `instance` holds the child instance id, which becomes
    /// known once the scheduling event id is claimed or recorded.
    SubOrch {
        name: String,
        version: Option<String>,
        instance: RefCell<String>,
        input: String,
    },
    /// System call `op`; `value` caches the recorded (or freshly computed) result.
    System {
        op: String,
        value: RefCell<Option<String>>,
    },
}
95
96// KindTag removed - no longer needed with cursor model
97
98impl Future for DurableFuture {
99 type Output = DurableOutput;
100 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
101 // Safety: We never move fields that are !Unpin; we only take &mut to mutate inner Cells and use ctx by reference.
102 let this = unsafe { self.get_unchecked_mut() };
103
104 // Common fields: this.claimed_event_id, this.ctx
105 match &mut this.kind {
106 Kind::Activity { name, input } => {
107 // Mutex lock should never fail in normal operation - if poisoned, it indicates a serious bug
108 let mut inner = this.ctx.inner.lock().expect("Mutex should not be poisoned");
109
110 // Step 1: Claim scheduling event_id if not already claimed
111 if this.claimed_event_id.get().is_none() {
112 // Find next unclaimed SCHEDULING event in history (global order enforcement)
113 let mut found_event_id = None;
114 for event in &inner.history {
115 let eid = event.event_id;
116 if inner.claimed_scheduling_events.contains(&eid) {
117 continue;
118 }
119 match &event.kind {
120 EventKind::ActivityScheduled { name: n, input: inp } => {
121 // MUST be our schedule next
122 if n != name || inp != input {
123 // Record nondeterminism gracefully
124 inner.nondeterminism_error = Some(format!(
125 "nondeterministic: schedule order mismatch: next is ActivityScheduled('{n}','{inp}') but expected ActivityScheduled('{name}','{input}')"
126 ));
127 return Poll::Pending;
128 }
129 found_event_id = Some(eid);
130 break;
131 }
132 EventKind::TimerCreated { .. } => {
133 inner.nondeterminism_error = Some(format!(
134 "nondeterministic: schedule order mismatch: next is TimerCreated but expected ActivityScheduled('{name}','{input}')"
135 ));
136 return Poll::Pending;
137 }
138 EventKind::ExternalSubscribed { name: en } => {
139 inner.nondeterminism_error = Some(format!(
140 "nondeterministic: schedule order mismatch: next is ExternalSubscribed('{en}') but expected ActivityScheduled('{name}','{input}')"
141 ));
142 return Poll::Pending;
143 }
144 EventKind::SubOrchestrationScheduled {
145 name: sn, input: sin, ..
146 } => {
147 inner.nondeterminism_error = Some(format!(
148 "nondeterministic: schedule order mismatch: next is SubOrchestrationScheduled('{sn}','{sin}') but expected ActivityScheduled('{name}','{input}')"
149 ));
150 return Poll::Pending;
151 }
152 _ => {}
153 }
154 }
155
156 let event_id = found_event_id.unwrap_or_else(|| {
157 // Not in history - create new (first execution)
158 let new_id = inner.next_event_id;
159 inner.next_event_id += 1;
160 let exec_id = inner.execution_id;
161 let inst_id = inner.instance_id.clone();
162
163 inner.history.push(Event::with_event_id(
164 new_id,
165 inst_id,
166 exec_id,
167 None,
168 EventKind::ActivityScheduled {
169 name: name.clone(),
170 input: input.clone(),
171 },
172 ));
173
174 inner.record_action(Action::CallActivity {
175 scheduling_event_id: new_id,
176 name: name.clone(),
177 input: input.clone(),
178 });
179
180 new_id
181 });
182
183 inner.claimed_scheduling_events.insert(event_id);
184 this.claimed_event_id.set(Some(event_id));
185 }
186
187 // claimed_event_id is guaranteed to be Some after the above block sets it
188 let our_event_id = this
189 .claimed_event_id
190 .get()
191 .expect("claimed_event_id should be set after claiming");
192
193 // Step 2: Look for our completion - FIFO enforcement
194 // Find our completion in history
195 let our_completion = inner.history.iter().find_map(|e| {
196 // source_event_id is on the Event struct
197 if e.source_event_id != Some(our_event_id) {
198 return None;
199 }
200 match &e.kind {
201 EventKind::ActivityCompleted { result } => Some((e.event_id, Ok(result.clone()))),
202 EventKind::ActivityFailed { details } => {
203 debug_assert!(
204 matches!(details, crate::ErrorDetails::Application { .. }),
205 "INVARIANT: Only Application errors should reach orchestration code, got: {details:?}"
206 );
207 Some((e.event_id, Err(details.display_message())))
208 }
209 _ => None,
210 }
211 });
212
213 if let Some((completion_event_id, result)) = our_completion {
214 // Check: Are all completion events BEFORE ours consumed?
215 if can_consume_completion(
216 &inner.history,
217 &inner.consumed_completions,
218 &inner.cancelled_source_ids,
219 completion_event_id,
220 ) {
221 inner.consumed_completions.insert(completion_event_id);
222 return Poll::Ready(DurableOutput::Activity(result));
223 }
224 }
225
226 Poll::Pending
227 }
228 Kind::Timer { delay_ms } => {
229 // Mutex lock should never fail in normal operation - if poisoned, it indicates a serious bug
230 let mut inner = this.ctx.inner.lock().expect("Mutex should not be poisoned");
231
232 // Step 1: Claim scheduling event_id
233 if this.claimed_event_id.get().is_none() {
234 // Enforce global scheduling order
235 let mut found_event_id = None;
236 for event in &inner.history {
237 let eid = event.event_id;
238 if inner.claimed_scheduling_events.contains(&eid) {
239 continue;
240 }
241 match &event.kind {
242 EventKind::TimerCreated { .. } => {
243 found_event_id = Some(eid);
244 break;
245 }
246 EventKind::ActivityScheduled { name: n, input: inp } => {
247 inner.nondeterminism_error = Some(format!(
248 "nondeterministic: schedule order mismatch: next is ActivityScheduled('{n}','{inp}') but expected TimerCreated"
249 ));
250 return Poll::Pending;
251 }
252 EventKind::ExternalSubscribed { name: en } => {
253 inner.nondeterminism_error = Some(format!(
254 "nondeterministic: schedule order mismatch: next is ExternalSubscribed('{en}') but expected TimerCreated"
255 ));
256 return Poll::Pending;
257 }
258 EventKind::SubOrchestrationScheduled {
259 name: sn, input: sin, ..
260 } => {
261 inner.nondeterminism_error = Some(format!(
262 "nondeterministic: schedule order mismatch: next is SubOrchestrationScheduled('{sn}','{sin}') but expected TimerCreated"
263 ));
264 return Poll::Pending;
265 }
266 _ => {}
267 }
268 }
269
270 let event_id = found_event_id.unwrap_or_else(|| {
271 // Not in history - create new (first execution)
272 let new_id = inner.next_event_id;
273 inner.next_event_id += 1;
274 let now = inner.now_ms();
275 let fire_at_ms = now.saturating_add(*delay_ms);
276 let exec_id = inner.execution_id;
277 let inst_id = inner.instance_id.clone();
278
279 inner.history.push(Event::with_event_id(
280 new_id,
281 inst_id,
282 exec_id,
283 None,
284 EventKind::TimerCreated { fire_at_ms },
285 ));
286
287 inner.record_action(Action::CreateTimer {
288 scheduling_event_id: new_id,
289 fire_at_ms,
290 });
291
292 new_id
293 });
294
295 inner.claimed_scheduling_events.insert(event_id);
296 this.claimed_event_id.set(Some(event_id));
297 }
298
299 // claimed_event_id is guaranteed to be Some after the above block sets it
300 let our_event_id = this
301 .claimed_event_id
302 .get()
303 .expect("claimed_event_id should be set after claiming");
304
305 // Step 2: Look for TimerFired - FIFO enforcement
306 let our_completion = inner.history.iter().find_map(|e| {
307 if matches!(&e.kind, EventKind::TimerFired { .. }) && e.source_event_id == Some(our_event_id) {
308 return Some(e.event_id);
309 }
310 None
311 });
312
313 if let Some(completion_event_id) = our_completion {
314 // Check: Are all completion events BEFORE ours consumed?
315 if can_consume_completion(
316 &inner.history,
317 &inner.consumed_completions,
318 &inner.cancelled_source_ids,
319 completion_event_id,
320 ) {
321 inner.consumed_completions.insert(completion_event_id);
322 return Poll::Ready(DurableOutput::Timer);
323 }
324 }
325
326 Poll::Pending
327 }
328 Kind::External { name, result } => {
329 // Check if we already have the result cached
330 if let Some(cached) = result.borrow().clone() {
331 return Poll::Ready(DurableOutput::External(cached));
332 }
333
334 // Mutex lock should never fail in normal operation - if poisoned, it indicates a serious bug
335 let mut inner = this.ctx.inner.lock().expect("Mutex should not be poisoned");
336
337 // Step 1: Claim ExternalSubscribed event_id
338 if this.claimed_event_id.get().is_none() {
339 // Enforce global scheduling order
340 let mut found_event_id = None;
341 for event in &inner.history {
342 let eid = event.event_id;
343 if inner.claimed_scheduling_events.contains(&eid) {
344 continue;
345 }
346 match &event.kind {
347 EventKind::ExternalSubscribed { name: n } => {
348 if n != name {
349 inner.nondeterminism_error = Some(format!(
350 "nondeterministic: schedule order mismatch: next is ExternalSubscribed('{n}') but expected ExternalSubscribed('{name}')"
351 ));
352 return Poll::Pending;
353 }
354 found_event_id = Some(eid);
355 break;
356 }
357 EventKind::ActivityScheduled { name: an, input: ainp } => {
358 inner.nondeterminism_error = Some(format!(
359 "nondeterministic: schedule order mismatch: next is ActivityScheduled('{an}','{ainp}') but expected ExternalSubscribed('{name}')"
360 ));
361 return Poll::Pending;
362 }
363 EventKind::TimerCreated { .. } => {
364 inner.nondeterminism_error = Some(format!(
365 "nondeterministic: schedule order mismatch: next is TimerCreated but expected ExternalSubscribed('{name}')"
366 ));
367 return Poll::Pending;
368 }
369 EventKind::SubOrchestrationScheduled {
370 name: sn, input: sin, ..
371 } => {
372 inner.nondeterminism_error = Some(format!(
373 "nondeterministic: schedule order mismatch: next is SubOrchestrationScheduled('{sn}','{sin}') but expected ExternalSubscribed('{name}')"
374 ));
375 return Poll::Pending;
376 }
377 _ => {}
378 }
379 }
380
381 let event_id = found_event_id.unwrap_or_else(|| {
382 // Not in history - create new
383 let new_id = inner.next_event_id;
384 inner.next_event_id += 1;
385 let exec_id = inner.execution_id;
386 let inst_id = inner.instance_id.clone();
387
388 inner.history.push(Event::with_event_id(
389 new_id,
390 inst_id,
391 exec_id,
392 None,
393 EventKind::ExternalSubscribed { name: name.clone() },
394 ));
395
396 inner.record_action(Action::WaitExternal {
397 scheduling_event_id: new_id,
398 name: name.clone(),
399 });
400
401 new_id
402 });
403
404 inner.claimed_scheduling_events.insert(event_id);
405 this.claimed_event_id.set(Some(event_id));
406 }
407
408 // claimed_event_id is guaranteed to be Some after the above block sets it
409 let _our_event_id = this
410 .claimed_event_id
411 .get()
412 .expect("claimed_event_id should be set after claiming");
413
414 // Step 2: Look for ExternalEvent (special case - search by name)
415 // External events can arrive in any order
416 if !inner.consumed_external_events.contains(name)
417 && let Some((event_id, data)) = inner.history.iter().find_map(|e| {
418 if let EventKind::ExternalEvent { name: ext_name, data } = &e.kind
419 && ext_name == name
420 {
421 return Some((e.event_id, data.clone()));
422 }
423 None
424 })
425 {
426 // Check: Are all completions BEFORE ours consumed?
427 if can_consume_completion(
428 &inner.history,
429 &inner.consumed_completions,
430 &inner.cancelled_source_ids,
431 event_id,
432 ) {
433 inner.consumed_completions.insert(event_id);
434 inner.consumed_external_events.insert(name.clone());
435 *result.borrow_mut() = Some(data.clone());
436 return Poll::Ready(DurableOutput::External(data));
437 }
438 }
439
440 Poll::Pending
441 }
442 Kind::SubOrch {
443 name,
444 version,
445 instance,
446 input,
447 } => {
448 // Mutex lock should never fail in normal operation - if poisoned, it indicates a serious bug
449 let mut inner = this.ctx.inner.lock().expect("Mutex should not be poisoned");
450
451 // Step 1: Claim SubOrchestrationScheduled event_id
452 if this.claimed_event_id.get().is_none() {
453 // Enforce global scheduling order
454 let mut found_event_id = None;
455 for event in &inner.history {
456 let eid = event.event_id;
457 if inner.claimed_scheduling_events.contains(&eid) {
458 continue;
459 }
460 match &event.kind {
461 EventKind::SubOrchestrationScheduled {
462 name: n,
463 input: inp,
464 instance: inst,
465 } => {
466 if n != name || inp != input {
467 inner.nondeterminism_error = Some(format!(
468 "nondeterministic: schedule order mismatch: next is SubOrchestrationScheduled('{n}','{inp}') but expected SubOrchestrationScheduled('{name}','{input}')"
469 ));
470 return Poll::Pending;
471 }
472 *instance.borrow_mut() = inst.clone();
473 found_event_id = Some(eid);
474 break;
475 }
476 EventKind::ActivityScheduled { name: an, input: ainp } => {
477 inner.nondeterminism_error = Some(format!(
478 "nondeterministic: schedule order mismatch: next is ActivityScheduled('{an}','{ainp}') but expected SubOrchestrationScheduled('{name}','{input}')"
479 ));
480 return Poll::Pending;
481 }
482 EventKind::TimerCreated { .. } => {
483 inner.nondeterminism_error = Some(format!(
484 "nondeterministic: schedule order mismatch: next is TimerCreated but expected SubOrchestrationScheduled('{name}','{input}')"
485 ));
486 return Poll::Pending;
487 }
488 EventKind::ExternalSubscribed { name: en } => {
489 inner.nondeterminism_error = Some(format!(
490 "nondeterministic: schedule order mismatch: next is ExternalSubscribed('{en}') but expected SubOrchestrationScheduled('{name}','{input}')"
491 ));
492 return Poll::Pending;
493 }
494 _ => {}
495 }
496 }
497
498 let event_id = found_event_id.unwrap_or_else(|| {
499 // Not in history - create new
500 let new_id = inner.next_event_id;
501 inner.next_event_id += 1;
502 let exec_id = inner.execution_id;
503 let inst_id = inner.instance_id.clone();
504 let child_instance = format!("sub::{new_id}");
505 *instance.borrow_mut() = child_instance.clone();
506
507 inner.history.push(Event::with_event_id(
508 new_id,
509 inst_id,
510 exec_id,
511 None,
512 EventKind::SubOrchestrationScheduled {
513 name: name.clone(),
514 instance: child_instance.clone(),
515 input: input.clone(),
516 },
517 ));
518
519 inner.record_action(Action::StartSubOrchestration {
520 scheduling_event_id: new_id,
521 name: name.clone(),
522 version: version.clone(),
523 instance: child_instance,
524 input: input.clone(),
525 });
526
527 new_id
528 });
529
530 inner.claimed_scheduling_events.insert(event_id);
531 this.claimed_event_id.set(Some(event_id));
532 }
533
534 // claimed_event_id is guaranteed to be Some after the above block sets it
535 let our_event_id = this
536 .claimed_event_id
537 .get()
538 .expect("claimed_event_id should be set after claiming");
539
540 // Step 2: Look for SubOrch completion - FIFO enforcement
541 let our_completion = inner.history.iter().find_map(|e| {
542 // source_event_id is on the Event struct
543 if e.source_event_id != Some(our_event_id) {
544 return None;
545 }
546 match &e.kind {
547 EventKind::SubOrchestrationCompleted { result } => Some((e.event_id, Ok(result.clone()))),
548 EventKind::SubOrchestrationFailed { details } => {
549 debug_assert!(
550 matches!(details, crate::ErrorDetails::Application { .. }),
551 "INVARIANT: Only Application errors should reach orchestration code, got: {details:?}"
552 );
553 Some((e.event_id, Err(details.display_message())))
554 }
555 _ => None,
556 }
557 });
558
559 if let Some((completion_event_id, result)) = our_completion {
560 // Check: Are all completions BEFORE ours consumed?
561 if can_consume_completion(
562 &inner.history,
563 &inner.consumed_completions,
564 &inner.cancelled_source_ids,
565 completion_event_id,
566 ) {
567 inner.consumed_completions.insert(completion_event_id);
568 return Poll::Ready(DurableOutput::SubOrchestration(result));
569 }
570 }
571
572 Poll::Pending
573 }
574 Kind::System { op, value } => {
575 // Check if we already computed the value
576 if let Some(v) = value.borrow().clone() {
577 return Poll::Ready(DurableOutput::Activity(Ok(v)));
578 }
579
580 // Mutex lock should never fail in normal operation - if poisoned, it indicates a serious bug
581 let mut inner = this.ctx.inner.lock().expect("Mutex should not be poisoned");
582
583 // Step 1: Try to adopt from history (replay)
584 if this.claimed_event_id.get().is_none() {
585 // Look for matching SystemCall event in history
586 let found = inner.history.iter().find_map(|e| {
587 if let EventKind::SystemCall {
588 op: hist_op,
589 value: hist_value,
590 } = &e.kind
591 && hist_op == op
592 && !inner.claimed_scheduling_events.contains(&e.event_id)
593 {
594 return Some((e.event_id, hist_value.clone()));
595 }
596 None
597 });
598
599 if let Some((found_event_id, found_value)) = found {
600 // Found our system call in history - adopt it
601 inner.claimed_scheduling_events.insert(found_event_id);
602 this.claimed_event_id.set(Some(found_event_id));
603 *value.borrow_mut() = Some(found_value.clone());
604 return Poll::Ready(DurableOutput::Activity(Ok(found_value)));
605 }
606 }
607
608 // Step 2: First execution - compute value synchronously
609 if this.claimed_event_id.get().is_none() {
610 let computed_value = match op.as_str() {
611 crate::SYSCALL_OP_GUID => generate_guid(),
612 crate::SYSCALL_OP_UTCNOW_MS => inner.now_ms().to_string(),
613 s if s.starts_with(crate::SYSCALL_OP_TRACE_PREFIX) => {
614 // Parse trace operation: "trace:{level}:{message}"
615 let parts: Vec<&str> = s.splitn(3, ':').collect();
616 if parts.len() == 3 {
617 let level = parts[1];
618 let message = parts[2];
619 // Extract context for structured logging
620 let instance_id = &inner.instance_id;
621 let execution_id = inner.execution_id;
622 let orch_name = inner.orchestration_name.as_deref().unwrap_or("unknown");
623 let orch_version = inner.orchestration_version.as_deref().unwrap_or("unknown");
624 let worker_id = inner.worker_id.as_deref().unwrap_or("unknown");
625
626 // Log to tracing only on first execution (not during replay)
627 // Include instance context for correlation
628 match level {
629 "ERROR" => tracing::error!(
630 target: "duroxide::orchestration",
631 instance_id = %instance_id,
632 execution_id = %execution_id,
633 orchestration_name = %orch_name,
634 orchestration_version = %orch_version,
635 worker_id = %worker_id,
636 "{}", message
637 ),
638 "WARN" => tracing::warn!(
639 target: "duroxide::orchestration",
640 instance_id = %instance_id,
641 execution_id = %execution_id,
642 orchestration_name = %orch_name,
643 orchestration_version = %orch_version,
644 worker_id = %worker_id,
645 "{}", message
646 ),
647 "DEBUG" => tracing::debug!(
648 target: "duroxide::orchestration",
649 instance_id = %instance_id,
650 execution_id = %execution_id,
651 orchestration_name = %orch_name,
652 orchestration_version = %orch_version,
653 worker_id = %worker_id,
654 "{}", message
655 ),
656 _ => tracing::info!(
657 target: "duroxide::orchestration",
658 instance_id = %instance_id,
659 execution_id = %execution_id,
660 orchestration_name = %orch_name,
661 orchestration_version = %orch_version,
662 worker_id = %worker_id,
663 "{}", message
664 ),
665 }
666 }
667 // trace operations don't return values, just empty string
668 String::new()
669 }
670 _ => {
671 inner.nondeterminism_error = Some(format!("unknown system operation: {op}"));
672 return Poll::Pending;
673 }
674 };
675
676 // Allocate event_id and record event
677 let event_id = inner.next_event_id;
678 inner.next_event_id += 1;
679 let exec_id = inner.execution_id;
680 let inst_id = inner.instance_id.clone();
681
682 inner.history.push(Event::with_event_id(
683 event_id,
684 inst_id,
685 exec_id,
686 None,
687 EventKind::SystemCall {
688 op: op.clone(),
689 value: computed_value.clone(),
690 },
691 ));
692
693 inner.record_action(Action::SystemCall {
694 scheduling_event_id: event_id,
695 op: op.clone(),
696 value: computed_value.clone(),
697 });
698
699 inner.claimed_scheduling_events.insert(event_id);
700 this.claimed_event_id.set(Some(event_id));
701 *value.borrow_mut() = Some(computed_value.clone());
702
703 return Poll::Ready(DurableOutput::Activity(Ok(computed_value)));
704 }
705
706 Poll::Pending
707 }
708 }
709 }
710}
711
712// Compile-time contract: DurableFuture must remain Unpin because poll() relies on freely
713// projecting &mut self into its internal Kind. This assertion fails if future changes.
714#[allow(dead_code)] // Used in const assertion below
715const fn assert_unpin<T: Unpin>() {}
716const _: () = {
717 assert_unpin::<DurableFuture>();
718};
719
/// Generates a fresh UUID-formatted identifier for the GUID system call.
///
/// The value itself is NOT deterministic. Replay determinism comes from the caller
/// recording it as a `SystemCall` event and adopting the recorded value on replay.
///
/// Layout (128 bits rendered as lowercase `8-4-4-4-12` hex):
/// * high 64 bits - low 64 bits of nanoseconds since the Unix epoch (covers dates
///   until the year 2554), so identifiers roughly follow creation time;
/// * low 64 bits - SipHash, keyed by a fresh `RandomState`, of the timestamp, a
///   thread-local counter, the process id and the current thread id, so identifiers
///   created in the same nanosecond by different threads or processes still differ.
///
/// Previously the timestamp was shifted by 64..96 bits although it only spans ~61
/// bits, so three of the five groups were always zero and the remaining entropy
/// (48 timestamp bits + a 16-bit counter) wrapped every ~3.26 days.
fn generate_guid() -> String {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hash, Hasher};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Thread-local counter for uniqueness within the same timestamp
    thread_local! {
        static COUNTER: std::cell::Cell<u32> = const { std::cell::Cell::new(0) };
    }

    let nanos: u128 = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    let counter = COUNTER.with(|c| {
        let val = c.get();
        c.set(val.wrapping_add(1));
        val
    });

    // Mix every distinguishing input under a per-call random key.
    let mut hasher = RandomState::new().build_hasher();
    nanos.hash(&mut hasher);
    counter.hash(&mut hasher);
    std::process::id().hash(&mut hasher);
    std::thread::current().id().hash(&mut hasher);
    let entropy = hasher.finish();

    // `<< 64` intentionally discards timestamp bits above 64 (zero until year 2554).
    let bits: u128 = (nanos << 64) | u128::from(entropy);

    // Format as UUID-like string; the `as` casts intentionally keep only the low bits.
    format!(
        "{:08x}-{:04x}-{:04x}-{:04x}-{:012x}",
        (bits >> 96) as u32,
        (bits >> 80) as u16,
        (bits >> 64) as u16,
        (bits >> 48) as u16,
        bits & 0xFFFF_FFFF_FFFF
    )
}
749
// Aggregate future machinery

/// How an `AggregateDurableFuture` combines its children.
enum AggregateMode {
    /// Resolve as soon as any child is ready; the lowest-index ready child wins.
    Select,
    /// Resolve only once every child is ready.
    Join,
}
755
756pub enum AggregateOutput {
757 Select { winner_index: usize, output: DurableOutput },
758 Join { outputs: Vec<DurableOutput> },
759}
760
761pub struct AggregateDurableFuture {
762 ctx: OrchestrationContext,
763 children: Vec<DurableFuture>,
764 mode: AggregateMode,
765}
766
767impl AggregateDurableFuture {
768 pub(crate) fn new_select(ctx: OrchestrationContext, children: Vec<DurableFuture>) -> Self {
769 Self {
770 ctx,
771 children,
772 mode: AggregateMode::Select,
773 }
774 }
775 pub(crate) fn new_join(ctx: OrchestrationContext, children: Vec<DurableFuture>) -> Self {
776 Self {
777 ctx,
778 children,
779 mode: AggregateMode::Join,
780 }
781 }
782
783 // Note: Unconsumed completion detection removed - the cursor model naturally
784 // handles this via strict sequential consumption. Any unconsumed completions
785 // will cause a panic when the next future tries to poll.
786}
787
788impl Future for AggregateDurableFuture {
789 type Output = AggregateOutput;
790 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
791 let this = unsafe { self.get_unchecked_mut() };
792
793 match this.mode {
794 AggregateMode::Select => {
795 // Two-phase polling to handle replay correctly:
796 // Phase 1: Poll ALL children to ensure they claim their scheduling events.
797 // During replay, the winner might return Ready immediately, but
798 // losers still need to claim their scheduling events to avoid
799 // nondeterminism when subsequent code schedules new operations.
800 // Phase 2: Check which child is ready and return the winner.
801 // Phase 3: Mark loser source_event_ids as cancelled so their completions
802 // don't block FIFO ordering for subsequent operations.
803
804 // Phase 1: Ensure all children claim their scheduling events
805 let mut ready_results: Vec<Option<DurableOutput>> = vec![None; this.children.len()];
806 for (i, child) in this.children.iter_mut().enumerate() {
807 if let Poll::Ready(output) = Pin::new(child).poll(cx) {
808 ready_results[i] = Some(output);
809 }
810 }
811
812 // Phase 2: Find the first ready child (winner)
813 let winner_index = ready_results.iter().position(|r| r.is_some());
814
815 if let Some(winner_idx) = winner_index {
816 // Phase 3: Mark all loser source_event_ids as cancelled
817 {
818 // Mutex lock should never fail in normal operation - if poisoned, it indicates a serious bug
819 let mut inner = this.ctx.inner.lock().expect("Mutex should not be poisoned");
820 for (i, child) in this.children.iter().enumerate() {
821 if i != winner_idx {
822 // Get the loser's claimed_event_id (source_event_id for its completion)
823 // Now accessed directly from the DurableFuture struct
824 if let Some(source_id) = child.claimed_event_id.get() {
825 inner.cancelled_source_ids.insert(source_id);
826 }
827 }
828 }
829 }
830
831 // Return the winner - safe to unwrap since winner_idx came from position() finding Some
832 let output = ready_results[winner_idx]
833 .take()
834 .expect("winner_idx points to Ready result");
835 return Poll::Ready(AggregateOutput::Select {
836 winner_index: winner_idx,
837 output,
838 });
839 }
840
841 Poll::Pending
842 }
843 AggregateMode::Join => {
844 // Fixed-point polling: keep polling children until no new results appear
845 // This allows cascading consumption respecting completion FIFO ordering.
846 let mut results: Vec<Option<DurableOutput>> = vec![None; this.children.len()];
847 loop {
848 let mut made_progress = false;
849 for (i, child) in this.children.iter_mut().enumerate() {
850 if results[i].is_some() {
851 continue;
852 }
853 if let Poll::Ready(output) = Pin::new(child).poll(cx) {
854 results[i] = Some(output);
855 made_progress = true;
856 }
857 }
858
859 if results.iter().all(|r| r.is_some()) {
860 // All outputs ready: return in persisted history order of completions
861 let mut items: Vec<(u64, usize, DurableOutput)> = Vec::with_capacity(results.len());
862 for (i, out_opt) in results.into_iter().enumerate() {
863 // All results are guaranteed to be Some due to the check above
864 let out = out_opt.expect("All results should be Some at this point");
865 // Determine completion event_id for child i
866 let eid = {
867 // Mutex lock should never fail in normal operation - if poisoned, it indicates a serious bug
868 let inner = this.ctx.inner.lock().expect("Mutex should not be poisoned");
869 let child = &this.children[i];
870 // All children must have claimed an event_id before reaching completion
871 let sid = child.claimed_event_id.get().expect("child must claim id");
872 match &child.kind {
873 Kind::Activity { .. } => inner
874 .history
875 .iter()
876 .find_map(|e| {
877 if e.source_event_id != Some(sid) {
878 return None;
879 }
880 match &e.kind {
881 EventKind::ActivityCompleted { .. }
882 | EventKind::ActivityFailed { .. } => Some(e.event_id),
883 _ => None,
884 }
885 })
886 .unwrap_or(u64::MAX),
887 Kind::Timer { .. } => inner
888 .history
889 .iter()
890 .find_map(|e| {
891 if e.source_event_id != Some(sid) {
892 return None;
893 }
894 match &e.kind {
895 EventKind::TimerFired { .. } => Some(e.event_id),
896 _ => None,
897 }
898 })
899 .unwrap_or(u64::MAX),
900 Kind::External { name, .. } => {
901 let n = name.clone();
902 inner
903 .history
904 .iter()
905 .find_map(|e| {
906 if let EventKind::ExternalEvent { name: en, .. } = &e.kind
907 && *en == n
908 {
909 return Some(e.event_id);
910 }
911 None
912 })
913 .unwrap_or(u64::MAX)
914 }
915 Kind::SubOrch { .. } => inner
916 .history
917 .iter()
918 .find_map(|e| {
919 if e.source_event_id != Some(sid) {
920 return None;
921 }
922 match &e.kind {
923 EventKind::SubOrchestrationCompleted { .. }
924 | EventKind::SubOrchestrationFailed { .. } => Some(e.event_id),
925 _ => None,
926 }
927 })
928 .unwrap_or(u64::MAX),
929 Kind::System { .. } => {
930 // For system calls, the event itself is the completion
931 sid
932 }
933 }
934 };
935 items.push((eid, i, out));
936 }
937 items.sort_by_key(|(eid, _i, _)| *eid);
938 let outputs: Vec<DurableOutput> = items.into_iter().map(|(_, _, o)| o).collect();
939 return Poll::Ready(AggregateOutput::Join { outputs });
940 }
941
942 if !made_progress {
943 return Poll::Pending;
944 }
945 // Otherwise, loop again: newly consumed completions may unblock others
946 }
947 }
948 }
949 }
950}
951
952pub struct SelectFuture(pub(crate) AggregateDurableFuture);
953impl Future for SelectFuture {
954 type Output = (usize, DurableOutput);
955 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
956 let inner = unsafe { self.map_unchecked_mut(|s| &mut s.0) };
957 match inner.poll(cx) {
958 Poll::Ready(AggregateOutput::Select { winner_index, output }) => Poll::Ready((winner_index, output)),
959 Poll::Ready(_) => unreachable!(),
960 Poll::Pending => Poll::Pending,
961 }
962 }
963}
964
965pub struct JoinFuture(pub(crate) AggregateDurableFuture);
966impl Future for JoinFuture {
967 type Output = Vec<DurableOutput>;
968 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
969 let inner = unsafe { self.map_unchecked_mut(|s| &mut s.0) };
970 match inner.poll(cx) {
971 Poll::Ready(AggregateOutput::Join { outputs }) => Poll::Ready(outputs),
972 Poll::Ready(_) => unreachable!(),
973 Poll::Pending => Poll::Pending,
974 }
975 }
976}