duroxide/
futures.rs

1use std::cell::{Cell, RefCell};
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5
6use crate::{Action, Event, EventKind, OrchestrationContext};
7
8/// Helper function to check if a completion event can be consumed according to FIFO ordering.
9///
10/// Returns `true` if all completion events in history that occurred before the given
11/// `completion_event_id` have already been consumed (or are cancelled select2 losers).
12///
13/// # Arguments
14/// * `history` - The full event history
15/// * `consumed_completions` - Set of already consumed completion event IDs
16/// * `cancelled_source_ids` - Set of source_event_ids whose completions should be auto-skipped (select2 losers)
17/// * `completion_event_id` - The event ID we want to consume
18fn can_consume_completion(
19    history: &[Event],
20    consumed_completions: &std::collections::HashSet<u64>,
21    cancelled_source_ids: &std::collections::HashSet<u64>,
22    completion_event_id: u64,
23) -> bool {
24    history.iter().all(|e| {
25        match &e.kind {
26            // Completions with source_event_id - check if cancelled
27            EventKind::ActivityCompleted { .. }
28            | EventKind::ActivityFailed { .. }
29            | EventKind::TimerFired { .. }
30            | EventKind::SubOrchestrationCompleted { .. }
31            | EventKind::SubOrchestrationFailed { .. } => {
32                // source_event_id is now on the Event struct
33                if let Some(source_event_id) = e.source_event_id {
34                    // Cancelled completions (select2 losers) don't block - skip them
35                    if cancelled_source_ids.contains(&source_event_id) {
36                        return true;
37                    }
38                }
39                // Otherwise: if before ours, must be consumed
40                e.event_id >= completion_event_id || consumed_completions.contains(&e.event_id)
41            }
42            // External events don't have source_event_id - can't be cancelled via select2
43            EventKind::ExternalEvent { .. } => {
44                e.event_id >= completion_event_id || consumed_completions.contains(&e.event_id)
45            }
46            _ => true, // Non-completions don't affect ordering
47        }
48    })
49}
50
/// Value a [`DurableFuture`] resolves to once its completion has been consumed.
#[derive(Clone, Debug)]
pub enum DurableOutput {
    /// Outcome of an activity: `Ok` holds the result payload, `Err` the failure message.
    ///
    /// System-call futures also resolve to this variant, always with an `Ok` value.
    Activity(Result<String, String>),
    /// A timer fired; there is no payload.
    Timer,
    /// Data carried by the external event that was matched by name.
    External(String),
    /// Outcome of a sub-orchestration: `Ok` holds the result payload, `Err` the failure message.
    SubOrchestration(Result<String, String>),
}
58
59/// A durable future representing an asynchronous operation in an orchestration.
60///
61/// Common fields `claimed_event_id` and `ctx` are stored directly on the struct,
62/// while operation-specific data is stored in the `kind` variant.
63pub struct DurableFuture {
64    /// The event ID claimed by this future during scheduling (set on first poll)
65    pub(crate) claimed_event_id: Cell<Option<u64>>,
66    /// Reference to the orchestration context
67    pub(crate) ctx: OrchestrationContext,
68    /// Operation-specific data
69    pub(crate) kind: Kind,
70}
71
/// Operation-specific state of a `DurableFuture`, one variant per operation type.
pub(crate) enum Kind {
    /// An activity invocation.
    Activity {
        /// Activity name; compared against the recorded `ActivityScheduled` event on replay.
        name: String,
        /// Input handed to the activity; also compared on replay.
        input: String,
    },
    /// A timer.
    Timer {
        /// Delay in milliseconds, added to the current time when the timer is first created.
        delay_ms: u64,
    },
    /// A subscription to an external event.
    External {
        /// Name of the external event being waited for.
        name: String,
        /// Event payload, cached once found so later polls can return it directly.
        result: RefCell<Option<String>>,
    },
    /// A sub-orchestration invocation.
    SubOrch {
        /// Name of the sub-orchestration; compared on replay.
        name: String,
        /// Optional version forwarded with the start action.
        version: Option<String>,
        /// Child instance ID; updated once the scheduling event is known (taken from
        /// history on replay, or generated as `sub::{event_id}` on first execution).
        instance: RefCell<String>,
        /// Input handed to the sub-orchestration; also compared on replay.
        input: String,
    },
    /// A system call whose value is recorded in history.
    System {
        /// Operation identifier, matched against recorded `SystemCall` events.
        op: String,
        /// Value of the call, cached once known so later polls can return it directly.
        value: RefCell<Option<String>>,
    },
}
95
96// KindTag removed - no longer needed with cursor model
97
98impl Future for DurableFuture {
99    type Output = DurableOutput;
100    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
101        // Safety: We never move fields that are !Unpin; we only take &mut to mutate inner Cells and use ctx by reference.
102        let this = unsafe { self.get_unchecked_mut() };
103
104        // Common fields: this.claimed_event_id, this.ctx
105        match &mut this.kind {
106            Kind::Activity { name, input } => {
107                // Mutex lock should never fail in normal operation - if poisoned, it indicates a serious bug
108                let mut inner = this.ctx.inner.lock().expect("Mutex should not be poisoned");
109
110                // Step 1: Claim scheduling event_id if not already claimed
111                if this.claimed_event_id.get().is_none() {
112                    // Find next unclaimed SCHEDULING event in history (global order enforcement)
113                    let mut found_event_id = None;
114                    for event in &inner.history {
115                        let eid = event.event_id;
116                        if inner.claimed_scheduling_events.contains(&eid) {
117                            continue;
118                        }
119                        match &event.kind {
120                            EventKind::ActivityScheduled { name: n, input: inp } => {
121                                // MUST be our schedule next
122                                if n != name || inp != input {
123                                    // Record nondeterminism gracefully
124                                    inner.nondeterminism_error = Some(format!(
125                                        "nondeterministic: schedule order mismatch: next is ActivityScheduled('{n}','{inp}') but expected ActivityScheduled('{name}','{input}')"
126                                    ));
127                                    return Poll::Pending;
128                                }
129                                found_event_id = Some(eid);
130                                break;
131                            }
132                            EventKind::TimerCreated { .. } => {
133                                inner.nondeterminism_error = Some(format!(
134                                    "nondeterministic: schedule order mismatch: next is TimerCreated but expected ActivityScheduled('{name}','{input}')"
135                                ));
136                                return Poll::Pending;
137                            }
138                            EventKind::ExternalSubscribed { name: en } => {
139                                inner.nondeterminism_error = Some(format!(
140                                    "nondeterministic: schedule order mismatch: next is ExternalSubscribed('{en}') but expected ActivityScheduled('{name}','{input}')"
141                                ));
142                                return Poll::Pending;
143                            }
144                            EventKind::SubOrchestrationScheduled {
145                                name: sn, input: sin, ..
146                            } => {
147                                inner.nondeterminism_error = Some(format!(
148                                    "nondeterministic: schedule order mismatch: next is SubOrchestrationScheduled('{sn}','{sin}') but expected ActivityScheduled('{name}','{input}')"
149                                ));
150                                return Poll::Pending;
151                            }
152                            _ => {}
153                        }
154                    }
155
156                    let event_id = found_event_id.unwrap_or_else(|| {
157                        // Not in history - create new (first execution)
158                        let new_id = inner.next_event_id;
159                        inner.next_event_id += 1;
160                        let exec_id = inner.execution_id;
161                        let inst_id = inner.instance_id.clone();
162
163                        inner.history.push(Event::with_event_id(
164                            new_id,
165                            inst_id,
166                            exec_id,
167                            None,
168                            EventKind::ActivityScheduled {
169                                name: name.clone(),
170                                input: input.clone(),
171                            },
172                        ));
173
174                        inner.record_action(Action::CallActivity {
175                            scheduling_event_id: new_id,
176                            name: name.clone(),
177                            input: input.clone(),
178                        });
179
180                        new_id
181                    });
182
183                    inner.claimed_scheduling_events.insert(event_id);
184                    this.claimed_event_id.set(Some(event_id));
185                }
186
187                // claimed_event_id is guaranteed to be Some after the above block sets it
188                let our_event_id = this
189                    .claimed_event_id
190                    .get()
191                    .expect("claimed_event_id should be set after claiming");
192
193                // Step 2: Look for our completion - FIFO enforcement
194                // Find our completion in history
195                let our_completion = inner.history.iter().find_map(|e| {
196                    // source_event_id is on the Event struct
197                    if e.source_event_id != Some(our_event_id) {
198                        return None;
199                    }
200                    match &e.kind {
201                        EventKind::ActivityCompleted { result } => Some((e.event_id, Ok(result.clone()))),
202                        EventKind::ActivityFailed { details } => {
203                            debug_assert!(
204                                matches!(details, crate::ErrorDetails::Application { .. }),
205                                "INVARIANT: Only Application errors should reach orchestration code, got: {details:?}"
206                            );
207                            Some((e.event_id, Err(details.display_message())))
208                        }
209                        _ => None,
210                    }
211                });
212
213                if let Some((completion_event_id, result)) = our_completion {
214                    // Check: Are all completion events BEFORE ours consumed?
215                    if can_consume_completion(
216                        &inner.history,
217                        &inner.consumed_completions,
218                        &inner.cancelled_source_ids,
219                        completion_event_id,
220                    ) {
221                        inner.consumed_completions.insert(completion_event_id);
222                        return Poll::Ready(DurableOutput::Activity(result));
223                    }
224                }
225
226                Poll::Pending
227            }
228            Kind::Timer { delay_ms } => {
229                // Mutex lock should never fail in normal operation - if poisoned, it indicates a serious bug
230                let mut inner = this.ctx.inner.lock().expect("Mutex should not be poisoned");
231
232                // Step 1: Claim scheduling event_id
233                if this.claimed_event_id.get().is_none() {
234                    // Enforce global scheduling order
235                    let mut found_event_id = None;
236                    for event in &inner.history {
237                        let eid = event.event_id;
238                        if inner.claimed_scheduling_events.contains(&eid) {
239                            continue;
240                        }
241                        match &event.kind {
242                            EventKind::TimerCreated { .. } => {
243                                found_event_id = Some(eid);
244                                break;
245                            }
246                            EventKind::ActivityScheduled { name: n, input: inp } => {
247                                inner.nondeterminism_error = Some(format!(
248                                    "nondeterministic: schedule order mismatch: next is ActivityScheduled('{n}','{inp}') but expected TimerCreated"
249                                ));
250                                return Poll::Pending;
251                            }
252                            EventKind::ExternalSubscribed { name: en } => {
253                                inner.nondeterminism_error = Some(format!(
254                                    "nondeterministic: schedule order mismatch: next is ExternalSubscribed('{en}') but expected TimerCreated"
255                                ));
256                                return Poll::Pending;
257                            }
258                            EventKind::SubOrchestrationScheduled {
259                                name: sn, input: sin, ..
260                            } => {
261                                inner.nondeterminism_error = Some(format!(
262                                    "nondeterministic: schedule order mismatch: next is SubOrchestrationScheduled('{sn}','{sin}') but expected TimerCreated"
263                                ));
264                                return Poll::Pending;
265                            }
266                            _ => {}
267                        }
268                    }
269
270                    let event_id = found_event_id.unwrap_or_else(|| {
271                        // Not in history - create new (first execution)
272                        let new_id = inner.next_event_id;
273                        inner.next_event_id += 1;
274                        let now = inner.now_ms();
275                        let fire_at_ms = now.saturating_add(*delay_ms);
276                        let exec_id = inner.execution_id;
277                        let inst_id = inner.instance_id.clone();
278
279                        inner.history.push(Event::with_event_id(
280                            new_id,
281                            inst_id,
282                            exec_id,
283                            None,
284                            EventKind::TimerCreated { fire_at_ms },
285                        ));
286
287                        inner.record_action(Action::CreateTimer {
288                            scheduling_event_id: new_id,
289                            fire_at_ms,
290                        });
291
292                        new_id
293                    });
294
295                    inner.claimed_scheduling_events.insert(event_id);
296                    this.claimed_event_id.set(Some(event_id));
297                }
298
299                // claimed_event_id is guaranteed to be Some after the above block sets it
300                let our_event_id = this
301                    .claimed_event_id
302                    .get()
303                    .expect("claimed_event_id should be set after claiming");
304
305                // Step 2: Look for TimerFired - FIFO enforcement
306                let our_completion = inner.history.iter().find_map(|e| {
307                    if matches!(&e.kind, EventKind::TimerFired { .. }) && e.source_event_id == Some(our_event_id) {
308                        return Some(e.event_id);
309                    }
310                    None
311                });
312
313                if let Some(completion_event_id) = our_completion {
314                    // Check: Are all completion events BEFORE ours consumed?
315                    if can_consume_completion(
316                        &inner.history,
317                        &inner.consumed_completions,
318                        &inner.cancelled_source_ids,
319                        completion_event_id,
320                    ) {
321                        inner.consumed_completions.insert(completion_event_id);
322                        return Poll::Ready(DurableOutput::Timer);
323                    }
324                }
325
326                Poll::Pending
327            }
328            Kind::External { name, result } => {
329                // Check if we already have the result cached
330                if let Some(cached) = result.borrow().clone() {
331                    return Poll::Ready(DurableOutput::External(cached));
332                }
333
334                // Mutex lock should never fail in normal operation - if poisoned, it indicates a serious bug
335                let mut inner = this.ctx.inner.lock().expect("Mutex should not be poisoned");
336
337                // Step 1: Claim ExternalSubscribed event_id
338                if this.claimed_event_id.get().is_none() {
339                    // Enforce global scheduling order
340                    let mut found_event_id = None;
341                    for event in &inner.history {
342                        let eid = event.event_id;
343                        if inner.claimed_scheduling_events.contains(&eid) {
344                            continue;
345                        }
346                        match &event.kind {
347                            EventKind::ExternalSubscribed { name: n } => {
348                                if n != name {
349                                    inner.nondeterminism_error = Some(format!(
350                                        "nondeterministic: schedule order mismatch: next is ExternalSubscribed('{n}') but expected ExternalSubscribed('{name}')"
351                                    ));
352                                    return Poll::Pending;
353                                }
354                                found_event_id = Some(eid);
355                                break;
356                            }
357                            EventKind::ActivityScheduled { name: an, input: ainp } => {
358                                inner.nondeterminism_error = Some(format!(
359                                    "nondeterministic: schedule order mismatch: next is ActivityScheduled('{an}','{ainp}') but expected ExternalSubscribed('{name}')"
360                                ));
361                                return Poll::Pending;
362                            }
363                            EventKind::TimerCreated { .. } => {
364                                inner.nondeterminism_error = Some(format!(
365                                    "nondeterministic: schedule order mismatch: next is TimerCreated but expected ExternalSubscribed('{name}')"
366                                ));
367                                return Poll::Pending;
368                            }
369                            EventKind::SubOrchestrationScheduled {
370                                name: sn, input: sin, ..
371                            } => {
372                                inner.nondeterminism_error = Some(format!(
373                                    "nondeterministic: schedule order mismatch: next is SubOrchestrationScheduled('{sn}','{sin}') but expected ExternalSubscribed('{name}')"
374                                ));
375                                return Poll::Pending;
376                            }
377                            _ => {}
378                        }
379                    }
380
381                    let event_id = found_event_id.unwrap_or_else(|| {
382                        // Not in history - create new
383                        let new_id = inner.next_event_id;
384                        inner.next_event_id += 1;
385                        let exec_id = inner.execution_id;
386                        let inst_id = inner.instance_id.clone();
387
388                        inner.history.push(Event::with_event_id(
389                            new_id,
390                            inst_id,
391                            exec_id,
392                            None,
393                            EventKind::ExternalSubscribed { name: name.clone() },
394                        ));
395
396                        inner.record_action(Action::WaitExternal {
397                            scheduling_event_id: new_id,
398                            name: name.clone(),
399                        });
400
401                        new_id
402                    });
403
404                    inner.claimed_scheduling_events.insert(event_id);
405                    this.claimed_event_id.set(Some(event_id));
406                }
407
408                // claimed_event_id is guaranteed to be Some after the above block sets it
409                let _our_event_id = this
410                    .claimed_event_id
411                    .get()
412                    .expect("claimed_event_id should be set after claiming");
413
414                // Step 2: Look for ExternalEvent (special case - search by name)
415                // External events can arrive in any order
416                if !inner.consumed_external_events.contains(name)
417                    && let Some((event_id, data)) = inner.history.iter().find_map(|e| {
418                        if let EventKind::ExternalEvent { name: ext_name, data } = &e.kind
419                            && ext_name == name
420                        {
421                            return Some((e.event_id, data.clone()));
422                        }
423                        None
424                    })
425                {
426                    // Check: Are all completions BEFORE ours consumed?
427                    if can_consume_completion(
428                        &inner.history,
429                        &inner.consumed_completions,
430                        &inner.cancelled_source_ids,
431                        event_id,
432                    ) {
433                        inner.consumed_completions.insert(event_id);
434                        inner.consumed_external_events.insert(name.clone());
435                        *result.borrow_mut() = Some(data.clone());
436                        return Poll::Ready(DurableOutput::External(data));
437                    }
438                }
439
440                Poll::Pending
441            }
442            Kind::SubOrch {
443                name,
444                version,
445                instance,
446                input,
447            } => {
448                // Mutex lock should never fail in normal operation - if poisoned, it indicates a serious bug
449                let mut inner = this.ctx.inner.lock().expect("Mutex should not be poisoned");
450
451                // Step 1: Claim SubOrchestrationScheduled event_id
452                if this.claimed_event_id.get().is_none() {
453                    // Enforce global scheduling order
454                    let mut found_event_id = None;
455                    for event in &inner.history {
456                        let eid = event.event_id;
457                        if inner.claimed_scheduling_events.contains(&eid) {
458                            continue;
459                        }
460                        match &event.kind {
461                            EventKind::SubOrchestrationScheduled {
462                                name: n,
463                                input: inp,
464                                instance: inst,
465                            } => {
466                                if n != name || inp != input {
467                                    inner.nondeterminism_error = Some(format!(
468                                        "nondeterministic: schedule order mismatch: next is SubOrchestrationScheduled('{n}','{inp}') but expected SubOrchestrationScheduled('{name}','{input}')"
469                                    ));
470                                    return Poll::Pending;
471                                }
472                                *instance.borrow_mut() = inst.clone();
473                                found_event_id = Some(eid);
474                                break;
475                            }
476                            EventKind::ActivityScheduled { name: an, input: ainp } => {
477                                inner.nondeterminism_error = Some(format!(
478                                    "nondeterministic: schedule order mismatch: next is ActivityScheduled('{an}','{ainp}') but expected SubOrchestrationScheduled('{name}','{input}')"
479                                ));
480                                return Poll::Pending;
481                            }
482                            EventKind::TimerCreated { .. } => {
483                                inner.nondeterminism_error = Some(format!(
484                                    "nondeterministic: schedule order mismatch: next is TimerCreated but expected SubOrchestrationScheduled('{name}','{input}')"
485                                ));
486                                return Poll::Pending;
487                            }
488                            EventKind::ExternalSubscribed { name: en } => {
489                                inner.nondeterminism_error = Some(format!(
490                                    "nondeterministic: schedule order mismatch: next is ExternalSubscribed('{en}') but expected SubOrchestrationScheduled('{name}','{input}')"
491                                ));
492                                return Poll::Pending;
493                            }
494                            _ => {}
495                        }
496                    }
497
498                    let event_id = found_event_id.unwrap_or_else(|| {
499                        // Not in history - create new
500                        let new_id = inner.next_event_id;
501                        inner.next_event_id += 1;
502                        let exec_id = inner.execution_id;
503                        let inst_id = inner.instance_id.clone();
504                        let child_instance = format!("sub::{new_id}");
505                        *instance.borrow_mut() = child_instance.clone();
506
507                        inner.history.push(Event::with_event_id(
508                            new_id,
509                            inst_id,
510                            exec_id,
511                            None,
512                            EventKind::SubOrchestrationScheduled {
513                                name: name.clone(),
514                                instance: child_instance.clone(),
515                                input: input.clone(),
516                            },
517                        ));
518
519                        inner.record_action(Action::StartSubOrchestration {
520                            scheduling_event_id: new_id,
521                            name: name.clone(),
522                            version: version.clone(),
523                            instance: child_instance,
524                            input: input.clone(),
525                        });
526
527                        new_id
528                    });
529
530                    inner.claimed_scheduling_events.insert(event_id);
531                    this.claimed_event_id.set(Some(event_id));
532                }
533
534                // claimed_event_id is guaranteed to be Some after the above block sets it
535                let our_event_id = this
536                    .claimed_event_id
537                    .get()
538                    .expect("claimed_event_id should be set after claiming");
539
540                // Step 2: Look for SubOrch completion - FIFO enforcement
541                let our_completion = inner.history.iter().find_map(|e| {
542                    // source_event_id is on the Event struct
543                    if e.source_event_id != Some(our_event_id) {
544                        return None;
545                    }
546                    match &e.kind {
547                        EventKind::SubOrchestrationCompleted { result } => Some((e.event_id, Ok(result.clone()))),
548                        EventKind::SubOrchestrationFailed { details } => {
549                            debug_assert!(
550                                matches!(details, crate::ErrorDetails::Application { .. }),
551                                "INVARIANT: Only Application errors should reach orchestration code, got: {details:?}"
552                            );
553                            Some((e.event_id, Err(details.display_message())))
554                        }
555                        _ => None,
556                    }
557                });
558
559                if let Some((completion_event_id, result)) = our_completion {
560                    // Check: Are all completions BEFORE ours consumed?
561                    if can_consume_completion(
562                        &inner.history,
563                        &inner.consumed_completions,
564                        &inner.cancelled_source_ids,
565                        completion_event_id,
566                    ) {
567                        inner.consumed_completions.insert(completion_event_id);
568                        return Poll::Ready(DurableOutput::SubOrchestration(result));
569                    }
570                }
571
572                Poll::Pending
573            }
574            Kind::System { op, value } => {
575                // Check if we already computed the value
576                if let Some(v) = value.borrow().clone() {
577                    return Poll::Ready(DurableOutput::Activity(Ok(v)));
578                }
579
580                // Mutex lock should never fail in normal operation - if poisoned, it indicates a serious bug
581                let mut inner = this.ctx.inner.lock().expect("Mutex should not be poisoned");
582
583                // Step 1: Try to adopt from history (replay)
584                if this.claimed_event_id.get().is_none() {
585                    // Look for matching SystemCall event in history
586                    let found = inner.history.iter().find_map(|e| {
587                        if let EventKind::SystemCall {
588                            op: hist_op,
589                            value: hist_value,
590                        } = &e.kind
591                            && hist_op == op
592                            && !inner.claimed_scheduling_events.contains(&e.event_id)
593                        {
594                            return Some((e.event_id, hist_value.clone()));
595                        }
596                        None
597                    });
598
599                    if let Some((found_event_id, found_value)) = found {
600                        // Found our system call in history - adopt it
601                        inner.claimed_scheduling_events.insert(found_event_id);
602                        this.claimed_event_id.set(Some(found_event_id));
603                        *value.borrow_mut() = Some(found_value.clone());
604                        return Poll::Ready(DurableOutput::Activity(Ok(found_value)));
605                    }
606                }
607
608                // Step 2: First execution - compute value synchronously
609                if this.claimed_event_id.get().is_none() {
610                    let computed_value = match op.as_str() {
611                        crate::SYSCALL_OP_GUID => generate_guid(),
612                        crate::SYSCALL_OP_UTCNOW_MS => inner.now_ms().to_string(),
613                        s if s.starts_with(crate::SYSCALL_OP_TRACE_PREFIX) => {
614                            // Parse trace operation: "trace:{level}:{message}"
615                            let parts: Vec<&str> = s.splitn(3, ':').collect();
616                            if parts.len() == 3 {
617                                let level = parts[1];
618                                let message = parts[2];
619                                // Extract context for structured logging
620                                let instance_id = &inner.instance_id;
621                                let execution_id = inner.execution_id;
622                                let orch_name = inner.orchestration_name.as_deref().unwrap_or("unknown");
623                                let orch_version = inner.orchestration_version.as_deref().unwrap_or("unknown");
624                                let worker_id = inner.worker_id.as_deref().unwrap_or("unknown");
625
626                                // Log to tracing only on first execution (not during replay)
627                                // Include instance context for correlation
628                                match level {
629                                    "ERROR" => tracing::error!(
630                                        target: "duroxide::orchestration",
631                                        instance_id = %instance_id,
632                                        execution_id = %execution_id,
633                                        orchestration_name = %orch_name,
634                                        orchestration_version = %orch_version,
635                                        worker_id = %worker_id,
636                                        "{}", message
637                                    ),
638                                    "WARN" => tracing::warn!(
639                                        target: "duroxide::orchestration",
640                                        instance_id = %instance_id,
641                                        execution_id = %execution_id,
642                                        orchestration_name = %orch_name,
643                                        orchestration_version = %orch_version,
644                                        worker_id = %worker_id,
645                                        "{}", message
646                                    ),
647                                    "DEBUG" => tracing::debug!(
648                                        target: "duroxide::orchestration",
649                                        instance_id = %instance_id,
650                                        execution_id = %execution_id,
651                                        orchestration_name = %orch_name,
652                                        orchestration_version = %orch_version,
653                                        worker_id = %worker_id,
654                                        "{}", message
655                                    ),
656                                    _ => tracing::info!(
657                                        target: "duroxide::orchestration",
658                                        instance_id = %instance_id,
659                                        execution_id = %execution_id,
660                                        orchestration_name = %orch_name,
661                                        orchestration_version = %orch_version,
662                                        worker_id = %worker_id,
663                                        "{}", message
664                                    ),
665                                }
666                            }
667                            // trace operations don't return values, just empty string
668                            String::new()
669                        }
670                        _ => {
671                            inner.nondeterminism_error = Some(format!("unknown system operation: {op}"));
672                            return Poll::Pending;
673                        }
674                    };
675
676                    // Allocate event_id and record event
677                    let event_id = inner.next_event_id;
678                    inner.next_event_id += 1;
679                    let exec_id = inner.execution_id;
680                    let inst_id = inner.instance_id.clone();
681
682                    inner.history.push(Event::with_event_id(
683                        event_id,
684                        inst_id,
685                        exec_id,
686                        None,
687                        EventKind::SystemCall {
688                            op: op.clone(),
689                            value: computed_value.clone(),
690                        },
691                    ));
692
693                    inner.record_action(Action::SystemCall {
694                        scheduling_event_id: event_id,
695                        op: op.clone(),
696                        value: computed_value.clone(),
697                    });
698
699                    inner.claimed_scheduling_events.insert(event_id);
700                    this.claimed_event_id.set(Some(event_id));
701                    *value.borrow_mut() = Some(computed_value.clone());
702
703                    return Poll::Ready(DurableOutput::Activity(Ok(computed_value)));
704                }
705
706                Poll::Pending
707            }
708        }
709    }
710}
711
712// Compile-time contract: DurableFuture must remain Unpin because poll() relies on freely
713// projecting &mut self into its internal Kind. This assertion fails if future changes.
714#[allow(dead_code)] // Used in const assertion below
715const fn assert_unpin<T: Unpin>() {}
716const _: () = {
717    assert_unpin::<DurableFuture>();
718};
719
/// Generates a unique, UUID-shaped identifier (`8-4-4-4-12` lowercase hex digits).
///
/// The value is *not* deterministic: it is derived from the wall clock and a
/// process-wide sequence number. The system-call path that invokes this helper
/// records the computed value in history as a `SystemCall` event and adopts the
/// recorded value when it finds one there, instead of calling this again.
///
/// Layout of the 128 bits, most significant first:
/// * groups 1-3 (64 bits): nanoseconds since the Unix epoch (saturating; `0` if the
///   clock reports a time before the epoch),
/// * groups 4-5 (64 bits): a process-wide sequence number, so two calls in the same
///   process never collide even when they observe the same timestamp or run on
///   different threads.
fn generate_guid() -> String {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Shared by all threads; `AtomicUsize` is available on every target with atomics.
    static SEQUENCE: AtomicUsize = AtomicUsize::new(0);

    // Nanoseconds since the epoch fit comfortably in 64 bits (until the year ~2554);
    // saturate rather than silently truncate if that ever stops being true.
    let nanos: u64 = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX));

    // Only uniqueness matters, not ordering with other memory operations.
    // Widening cast: `usize` is at most 64 bits on supported targets.
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed) as u64;

    // Format as UUID-like string
    format!(
        "{:08x}-{:04x}-{:04x}-{:04x}-{:012x}",
        nanos >> 32,
        (nanos >> 16) & 0xFFFF,
        nanos & 0xFFFF,
        (seq >> 48) & 0xFFFF,
        seq & 0xFFFF_FFFF_FFFF
    )
}
749
// Aggregate future machinery

/// Strategy an aggregate future applies to its children when polled.
enum AggregateMode {
    /// Resolve as soon as at least one child is ready.
    Select,
    /// Resolve only once every child is ready.
    Join,
}
755
756pub enum AggregateOutput {
757    Select { winner_index: usize, output: DurableOutput },
758    Join { outputs: Vec<DurableOutput> },
759}
760
761pub struct AggregateDurableFuture {
762    ctx: OrchestrationContext,
763    children: Vec<DurableFuture>,
764    mode: AggregateMode,
765}
766
767impl AggregateDurableFuture {
768    pub(crate) fn new_select(ctx: OrchestrationContext, children: Vec<DurableFuture>) -> Self {
769        Self {
770            ctx,
771            children,
772            mode: AggregateMode::Select,
773        }
774    }
775    pub(crate) fn new_join(ctx: OrchestrationContext, children: Vec<DurableFuture>) -> Self {
776        Self {
777            ctx,
778            children,
779            mode: AggregateMode::Join,
780        }
781    }
782
783    // Note: Unconsumed completion detection removed - the cursor model naturally
784    // handles this via strict sequential consumption. Any unconsumed completions
785    // will cause a panic when the next future tries to poll.
786}
787
788impl Future for AggregateDurableFuture {
789    type Output = AggregateOutput;
790    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
791        let this = unsafe { self.get_unchecked_mut() };
792
793        match this.mode {
794            AggregateMode::Select => {
795                // Two-phase polling to handle replay correctly:
796                // Phase 1: Poll ALL children to ensure they claim their scheduling events.
797                //          During replay, the winner might return Ready immediately, but
798                //          losers still need to claim their scheduling events to avoid
799                //          nondeterminism when subsequent code schedules new operations.
800                // Phase 2: Check which child is ready and return the winner.
801                // Phase 3: Mark loser source_event_ids as cancelled so their completions
802                //          don't block FIFO ordering for subsequent operations.
803
804                // Phase 1: Ensure all children claim their scheduling events
805                let mut ready_results: Vec<Option<DurableOutput>> = vec![None; this.children.len()];
806                for (i, child) in this.children.iter_mut().enumerate() {
807                    if let Poll::Ready(output) = Pin::new(child).poll(cx) {
808                        ready_results[i] = Some(output);
809                    }
810                }
811
812                // Phase 2: Find the first ready child (winner)
813                let winner_index = ready_results.iter().position(|r| r.is_some());
814
815                if let Some(winner_idx) = winner_index {
816                    // Phase 3: Mark all loser source_event_ids as cancelled
817                    {
818                        // Mutex lock should never fail in normal operation - if poisoned, it indicates a serious bug
819                        let mut inner = this.ctx.inner.lock().expect("Mutex should not be poisoned");
820                        for (i, child) in this.children.iter().enumerate() {
821                            if i != winner_idx {
822                                // Get the loser's claimed_event_id (source_event_id for its completion)
823                                // Now accessed directly from the DurableFuture struct
824                                if let Some(source_id) = child.claimed_event_id.get() {
825                                    inner.cancelled_source_ids.insert(source_id);
826                                }
827                            }
828                        }
829                    }
830
831                    // Return the winner - safe to unwrap since winner_idx came from position() finding Some
832                    let output = ready_results[winner_idx]
833                        .take()
834                        .expect("winner_idx points to Ready result");
835                    return Poll::Ready(AggregateOutput::Select {
836                        winner_index: winner_idx,
837                        output,
838                    });
839                }
840
841                Poll::Pending
842            }
843            AggregateMode::Join => {
844                // Fixed-point polling: keep polling children until no new results appear
845                // This allows cascading consumption respecting completion FIFO ordering.
846                let mut results: Vec<Option<DurableOutput>> = vec![None; this.children.len()];
847                loop {
848                    let mut made_progress = false;
849                    for (i, child) in this.children.iter_mut().enumerate() {
850                        if results[i].is_some() {
851                            continue;
852                        }
853                        if let Poll::Ready(output) = Pin::new(child).poll(cx) {
854                            results[i] = Some(output);
855                            made_progress = true;
856                        }
857                    }
858
859                    if results.iter().all(|r| r.is_some()) {
860                        // All outputs ready: return in persisted history order of completions
861                        let mut items: Vec<(u64, usize, DurableOutput)> = Vec::with_capacity(results.len());
862                        for (i, out_opt) in results.into_iter().enumerate() {
863                            // All results are guaranteed to be Some due to the check above
864                            let out = out_opt.expect("All results should be Some at this point");
865                            // Determine completion event_id for child i
866                            let eid = {
867                                // Mutex lock should never fail in normal operation - if poisoned, it indicates a serious bug
868                                let inner = this.ctx.inner.lock().expect("Mutex should not be poisoned");
869                                let child = &this.children[i];
870                                // All children must have claimed an event_id before reaching completion
871                                let sid = child.claimed_event_id.get().expect("child must claim id");
872                                match &child.kind {
873                                    Kind::Activity { .. } => inner
874                                        .history
875                                        .iter()
876                                        .find_map(|e| {
877                                            if e.source_event_id != Some(sid) {
878                                                return None;
879                                            }
880                                            match &e.kind {
881                                                EventKind::ActivityCompleted { .. }
882                                                | EventKind::ActivityFailed { .. } => Some(e.event_id),
883                                                _ => None,
884                                            }
885                                        })
886                                        .unwrap_or(u64::MAX),
887                                    Kind::Timer { .. } => inner
888                                        .history
889                                        .iter()
890                                        .find_map(|e| {
891                                            if e.source_event_id != Some(sid) {
892                                                return None;
893                                            }
894                                            match &e.kind {
895                                                EventKind::TimerFired { .. } => Some(e.event_id),
896                                                _ => None,
897                                            }
898                                        })
899                                        .unwrap_or(u64::MAX),
900                                    Kind::External { name, .. } => {
901                                        let n = name.clone();
902                                        inner
903                                            .history
904                                            .iter()
905                                            .find_map(|e| {
906                                                if let EventKind::ExternalEvent { name: en, .. } = &e.kind
907                                                    && *en == n
908                                                {
909                                                    return Some(e.event_id);
910                                                }
911                                                None
912                                            })
913                                            .unwrap_or(u64::MAX)
914                                    }
915                                    Kind::SubOrch { .. } => inner
916                                        .history
917                                        .iter()
918                                        .find_map(|e| {
919                                            if e.source_event_id != Some(sid) {
920                                                return None;
921                                            }
922                                            match &e.kind {
923                                                EventKind::SubOrchestrationCompleted { .. }
924                                                | EventKind::SubOrchestrationFailed { .. } => Some(e.event_id),
925                                                _ => None,
926                                            }
927                                        })
928                                        .unwrap_or(u64::MAX),
929                                    Kind::System { .. } => {
930                                        // For system calls, the event itself is the completion
931                                        sid
932                                    }
933                                }
934                            };
935                            items.push((eid, i, out));
936                        }
937                        items.sort_by_key(|(eid, _i, _)| *eid);
938                        let outputs: Vec<DurableOutput> = items.into_iter().map(|(_, _, o)| o).collect();
939                        return Poll::Ready(AggregateOutput::Join { outputs });
940                    }
941
942                    if !made_progress {
943                        return Poll::Pending;
944                    }
945                    // Otherwise, loop again: newly consumed completions may unblock others
946                }
947            }
948        }
949    }
950}
951
952pub struct SelectFuture(pub(crate) AggregateDurableFuture);
953impl Future for SelectFuture {
954    type Output = (usize, DurableOutput);
955    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
956        let inner = unsafe { self.map_unchecked_mut(|s| &mut s.0) };
957        match inner.poll(cx) {
958            Poll::Ready(AggregateOutput::Select { winner_index, output }) => Poll::Ready((winner_index, output)),
959            Poll::Ready(_) => unreachable!(),
960            Poll::Pending => Poll::Pending,
961        }
962    }
963}
964
965pub struct JoinFuture(pub(crate) AggregateDurableFuture);
966impl Future for JoinFuture {
967    type Output = Vec<DurableOutput>;
968    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
969        let inner = unsafe { self.map_unchecked_mut(|s| &mut s.0) };
970        match inner.poll(cx) {
971            Poll::Ready(AggregateOutput::Join { outputs }) => Poll::Ready(outputs),
972            Poll::Ready(_) => unreachable!(),
973            Poll::Pending => Poll::Pending,
974        }
975    }
976}