duroxide 0.1.27

Durable code execution framework for Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
#![allow(clippy::unwrap_used)]
#![allow(clippy::clone_on_ref_ptr)]
#![allow(clippy::expect_used)]

mod common;

use duroxide::runtime::{self, registry::ActivityRegistry};
use duroxide::{ActivityContext, OrchestrationContext, OrchestrationRegistry};
use std::sync::Arc;
use std::time::Duration;

#[tokio::test]
async fn test_new_guid() {
    let store = Arc::new(
        duroxide::providers::sqlite::SqliteProvider::new_in_memory()
            .await
            .unwrap(),
    );
    let activities = ActivityRegistry::builder().build();

    let orchestrations = OrchestrationRegistry::builder()
        .register("TestGuid", |ctx: OrchestrationContext, _input: String| async move {
            let guid1 = ctx.new_guid().await?;
            let guid2 = ctx.new_guid().await?;

            // GUIDs should be different
            assert_ne!(guid1, guid2);

            // GUIDs should be valid hex strings (excluding hyphens)
            assert!(guid1.chars().filter(|c| *c != '-').all(|c| c.is_ascii_hexdigit()));
            assert!(guid2.chars().filter(|c| *c != '-').all(|c| c.is_ascii_hexdigit()));

            Ok(format!("{guid1},{guid2}"))
        })
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = duroxide::Client::new(store.clone());
    client.start_orchestration("test-guid", "TestGuid", "").await.unwrap();
    let client = duroxide::Client::new(store.clone());
    let status = client
        .wait_for_orchestration("test-guid", tokio::time::Duration::from_secs(5))
        .await
        .unwrap();

    if let duroxide::runtime::OrchestrationStatus::Completed { output, .. } = status {
        // Result should contain two different GUIDs
        let parts: Vec<&str> = output.split(',').collect();
        assert_eq!(parts.len(), 2);
        assert_ne!(parts[0], parts[1]);
    } else {
        panic!("Orchestration did not complete successfully: {status:?}");
    }

    rt.shutdown(None).await;
}

#[tokio::test]
async fn test_utc_now_ms() {
    let store = Arc::new(
        duroxide::providers::sqlite::SqliteProvider::new_in_memory()
            .await
            .unwrap(),
    );
    let activities = ActivityRegistry::builder().build();

    let orchestrations = OrchestrationRegistry::builder()
        .register("TestTime", |ctx: OrchestrationContext, _input: String| async move {
            let time1 = ctx.utc_now().await?;

            // Add a small timer to ensure time progresses
            ctx.schedule_timer(Duration::from_millis(100)).await;

            let time2 = ctx.utc_now().await?;

            // Convert to milliseconds for validation
            let t1 = time1
                .duration_since(std::time::UNIX_EPOCH)
                .map_err(|e| e.to_string())?
                .as_millis() as u64;
            let t2 = time2
                .duration_since(std::time::UNIX_EPOCH)
                .map_err(|e| e.to_string())?
                .as_millis() as u64;

            // Times should be reasonable (after year 2020)
            assert!(t1 > 1577836800000); // Jan 1, 2020
            assert!(t2 > 1577836800000);

            // Second time should be after first (since we had a timer in between)
            assert!(t2 >= t1);

            Ok(format!("{t1},{t2}"))
        })
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = duroxide::Client::new(store.clone());
    client.start_orchestration("test-time", "TestTime", "").await.unwrap();
    let client = duroxide::Client::new(store.clone());
    let status = client
        .wait_for_orchestration("test-time", tokio::time::Duration::from_secs(5))
        .await
        .unwrap();

    if let duroxide::runtime::OrchestrationStatus::Completed { output, .. } = status {
        // Result should contain two timestamps
        let parts: Vec<&str> = output.split(',').collect();
        assert_eq!(parts.len(), 2);
    } else {
        panic!("Orchestration did not complete successfully: {status:?}");
    }

    rt.shutdown(None).await;
}

#[tokio::test]
async fn test_system_calls_deterministic_replay() {
    let store = Arc::new(
        duroxide::providers::sqlite::SqliteProvider::new_in_memory()
            .await
            .unwrap(),
    );
    let activities = ActivityRegistry::builder().build();

    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "TestDeterminism",
            |ctx: OrchestrationContext, _input: String| async move {
                let guid = ctx.new_guid().await?;
                let time = ctx.utc_now().await?;

                // Use values in some computation
                let time_ms = time
                    .duration_since(std::time::UNIX_EPOCH)
                    .map_err(|e| e.to_string())?
                    .as_millis() as u64;
                let result = format!("guid:{guid},time:{time_ms}");

                Ok(result)
            },
        )
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activities.clone(), orchestrations.clone()).await;

    // Run orchestration first time
    let instance = "test-determinism";
    let client = duroxide::Client::new(store.clone());
    client
        .start_orchestration(instance, "TestDeterminism", "")
        .await
        .unwrap();
    let client = duroxide::Client::new(store.clone());
    let status1 = client
        .wait_for_orchestration(instance, tokio::time::Duration::from_secs(5))
        .await
        .unwrap();

    let output1 = if let duroxide::runtime::OrchestrationStatus::Completed { output, .. } = status1 {
        output
    } else {
        panic!("First run did not complete successfully: {status1:?}");
    };

    rt.shutdown(None).await;

    // Start new runtime with same store (simulating restart)
    let rt2 = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;

    // The orchestration should complete with the same result due to deterministic replay
    let client2 = duroxide::Client::new(store.clone());
    let status2 = client2
        .wait_for_orchestration(instance, tokio::time::Duration::from_secs(5))
        .await
        .unwrap();

    let output2 = if let duroxide::runtime::OrchestrationStatus::Completed { output, .. } = status2 {
        output
    } else {
        panic!("Second run did not complete successfully: {status2:?}");
    };

    // Outputs should be identical
    assert_eq!(output1, output2);

    rt2.shutdown(None).await;
}

#[tokio::test]
async fn test_system_calls_with_select() {
    let store = Arc::new(
        duroxide::providers::sqlite::SqliteProvider::new_in_memory()
            .await
            .unwrap(),
    );
    let activities = ActivityRegistry::builder()
        .register("QuickTask", |_ctx: ActivityContext, _: String| async move {
            Ok("task_done".to_string())
        })
        .build();

    let orchestrations = OrchestrationRegistry::builder()
        .register("TestSelect", |ctx: OrchestrationContext, _input: String| async move {
            // Test: System calls should work correctly with activities in select/join

            // First, get a system call result
            let guid = ctx.new_guid().await?;

            // Test select2 with activities - system calls complete synchronously in the background
            let activity1 = ctx.schedule_activity("QuickTask", "task1");
            let activity2 = ctx.schedule_activity("QuickTask", "task2");

            let (winner_idx, output) = ctx.select2(activity1, activity2).await.into_tuple();

            let first_result = match output {
                Ok(s) => s,
                Err(e) => format!("error: {e}"),
            };

            // Get another system call to verify they work throughout the orchestration
            let time = ctx.utc_now().await?;

            // Verify both system calls returned valid values
            assert!(guid.len() == 36, "GUID should be valid");
            let time_ms = time
                .duration_since(std::time::UNIX_EPOCH)
                .map_err(|e| e.to_string())?
                .as_millis() as u64;
            assert!(time_ms > 0, "Time should be positive");

            Ok(format!(
                "winner:{},result:{},guid_len:{},time_valid:true",
                winner_idx,
                first_result,
                guid.len()
            ))
        })
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = duroxide::Client::new(store.clone());
    client
        .start_orchestration("test-select", "TestSelect", "")
        .await
        .unwrap();
    let status = client
        .wait_for_orchestration("test-select", tokio::time::Duration::from_secs(5))
        .await
        .unwrap();

    if let duroxide::runtime::OrchestrationStatus::Completed { output, .. } = status {
        println!("Output: {output}");
        // Output should contain winner index, result, guid validation, and time validation
        assert!(output.starts_with("winner:"), "Output should start with 'winner:'");
        assert!(output.contains("result:task_done"), "Output should contain task result");
        assert!(output.contains("guid_len:36"), "GUID should be 36 chars");
        assert!(output.contains("time_valid:true"), "Time should be valid");
    } else {
        panic!("Orchestration did not complete successfully: {status:?}");
    }

    rt.shutdown(None).await;
}

#[tokio::test]
async fn test_system_calls_join_with_activities() {
    let store = Arc::new(
        duroxide::providers::sqlite::SqliteProvider::new_in_memory()
            .await
            .unwrap(),
    );
    let activities = ActivityRegistry::builder()
        .register("SlowTask", |_ctx: ActivityContext, input: String| async move {
            Ok(format!("processed:{input}"))
        })
        .build();

    let orchestrations = OrchestrationRegistry::builder()
        .register("TestJoin", |ctx: OrchestrationContext, _input: String| async move {
            // Test 1: Call system calls and activity separately since they have different return types
            let guid = ctx.new_guid().await?;
            let time = ctx.utc_now().await?;
            let activity_result = ctx.schedule_activity("SlowTask", "data1").await?;

            // Validate the values
            assert_eq!(guid.len(), 36, "GUID should be 36 chars");
            let time_ms = time
                .duration_since(std::time::UNIX_EPOCH)
                .map_err(|e| e.to_string())?
                .as_millis() as u64;
            assert!(time_ms > 0, "Time should be positive");
            assert_eq!(activity_result, "processed:data1");

            // Test 2: Select between two activities (same type)
            let activity1 = ctx.schedule_activity("SlowTask", "data2");
            let activity2 = ctx.schedule_activity("SlowTask", "data3");

            let (winner_idx, output) = ctx.select2(activity1, activity2).await.into_tuple();

            let winner_result = match output {
                Ok(s) => s,
                Err(e) => panic!("Expected activity output: {e}"),
            };

            // System call should typically win since it completes synchronously
            // But we accept either winner

            Ok(format!(
                "guid_len:{},time:{},activity:{},winner:{},winner_result:{}",
                guid.len(),
                time_ms,
                activity_result,
                winner_idx,
                winner_result
            ))
        })
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = duroxide::Client::new(store.clone());
    client.start_orchestration("test-join", "TestJoin", "").await.unwrap();
    let status = client
        .wait_for_orchestration("test-join", tokio::time::Duration::from_secs(5))
        .await
        .unwrap();

    if let duroxide::runtime::OrchestrationStatus::Completed { output, .. } = status {
        println!("Output: {output}");
        assert!(output.contains("guid_len:36"), "GUID should be 36 chars");
        assert!(
            output.contains("activity:processed:data1"),
            "Activity should process correctly"
        );
        assert!(output.contains("winner:"), "Should have winner");
    } else {
        panic!("Orchestration did not complete successfully: {status:?}");
    }

    rt.shutdown(None).await;
}

/// Test: Verify that utc_now() used as activity input replays correctly.
///
/// This test verifies replay correctness:
/// 1. First turn: utc_now returns T1, waits for external event
/// 2. External event triggers second turn (replay)
/// 3. On replay, utc_now should return the SAME value T1 from history
/// 4. Activity is scheduled with T1 as input - should match history
#[tokio::test]
async fn test_utc_now_as_activity_input_replays_correctly() {
    let (store, _td) = common::create_sqlite_store_disk().await;

    let activities = ActivityRegistry::builder()
        .register(
            "ProcessWithTimestamp",
            |_ctx: ActivityContext, input: String| async move { Ok(format!("processed:{}", input)) },
        )
        .build();

    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "TestUtcNowReplayAsInput",
            |ctx: OrchestrationContext, _input: String| async move {
                // Get a timestamp
                let time = ctx.utc_now().await?;
                let time_ms = time
                    .duration_since(std::time::UNIX_EPOCH)
                    .map_err(|e| e.to_string())?
                    .as_millis() as u64;

                // Wait for external event - this forces a turn boundary
                let _ = ctx.schedule_wait("continue").await;

                // Use timestamp as input to an activity
                // On replay, utc_now must return the same value or this will cause nondeterminism
                let result = ctx
                    .schedule_activity("ProcessWithTimestamp", time_ms.to_string())
                    .await?;

                Ok(result)
            },
        )
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activities.clone(), orchestrations.clone()).await;
    let client = duroxide::Client::new(store.clone());

    client
        .start_orchestration("test-utcnow-replay", "TestUtcNowReplayAsInput", "")
        .await
        .unwrap();

    // Wait for the external subscription to be registered
    let subscribed = common::wait_for_subscription(store.clone(), "test-utcnow-replay", "continue", 2000).await;
    assert!(subscribed, "Orchestration should subscribe to 'continue' event");

    // Wait a bit so wall-clock time advances
    tokio::time::sleep(Duration::from_millis(200)).await;

    // Send the external event - this triggers replay of the orchestration
    client
        .raise_event("test-utcnow-replay", "continue", "go")
        .await
        .unwrap();

    let status = client
        .wait_for_orchestration("test-utcnow-replay", Duration::from_secs(10))
        .await
        .unwrap();

    match status {
        duroxide::runtime::OrchestrationStatus::Completed { output, .. } => {
            println!("Orchestration completed: {}", output);
            assert!(output.starts_with("processed:"), "Should have processed the timestamp");
        }
        duroxide::runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("Orchestration failed: {}", details.display_message());
        }
        other => {
            panic!("Unexpected status: {other:?}");
        }
    }

    rt.shutdown(None).await;
}

/// Test: Verify that new_guid() used as activity input replays correctly.
///
/// Similar to utc_now test - new_guid must return the same value on replay.
#[tokio::test]
async fn test_new_guid_as_activity_input_replays_correctly() {
    let (store, _td) = common::create_sqlite_store_disk().await;

    let activities = ActivityRegistry::builder()
        .register("ProcessWithId", |_ctx: ActivityContext, input: String| async move {
            Ok(format!("processed:{}", input))
        })
        .build();

    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "TestGuidNondeterminism",
            |ctx: OrchestrationContext, _input: String| async move {
                // Get a GUID
                let guid = ctx.new_guid().await?;

                // Wait for external event - this forces a turn boundary
                let _ = ctx.schedule_wait("continue").await;

                // Use guid as input to an activity
                // On replay, new_guid must return the same value or this will cause nondeterminism
                let result = ctx.schedule_activity("ProcessWithId", guid).await?;

                Ok(result)
            },
        )
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activities.clone(), orchestrations.clone()).await;
    let client = duroxide::Client::new(store.clone());

    client
        .start_orchestration("test-guid-replay", "TestGuidNondeterminism", "")
        .await
        .unwrap();

    // Wait for the external subscription to be registered
    let subscribed = common::wait_for_subscription(store.clone(), "test-guid-replay", "continue", 2000).await;
    assert!(subscribed, "Orchestration should subscribe to 'continue' event");

    // Send the external event - this triggers replay
    client.raise_event("test-guid-replay", "continue", "go").await.unwrap();

    let status = client
        .wait_for_orchestration("test-guid-replay", Duration::from_secs(10))
        .await
        .unwrap();

    match status {
        duroxide::runtime::OrchestrationStatus::Completed { output, .. } => {
            println!("Orchestration completed: {}", output);
            assert!(output.starts_with("processed:"), "Should have processed the guid");
        }
        duroxide::runtime::OrchestrationStatus::Failed { details, .. } => {
            panic!("Orchestration failed: {}", details.display_message());
        }
        other => {
            panic!("Unexpected status: {other:?}");
        }
    }

    rt.shutdown(None).await;
}

#[tokio::test]
async fn test_activity_then_syscall_ordering() {
    let store = Arc::new(
        duroxide::providers::sqlite::SqliteProvider::new_in_memory()
            .await
            .unwrap(),
    );

    let activities = ActivityRegistry::builder()
        .register("A", |_ctx: ActivityContext, _input: String| async move {
            Ok("a".to_string())
        })
        .register("B", |_ctx: ActivityContext, input: String| async move {
            Ok(format!("b:{input}"))
        })
        .build();

    let orchestrations = OrchestrationRegistry::builder()
        .register("Ordering", |ctx: OrchestrationContext, _input: String| async move {
            let _ = ctx.schedule_activity("A", "").await?;
            let guid = ctx.new_guid().await?;
            let _ = ctx.schedule_activity("B", guid).await?;
            Ok("ok".to_string())
        })
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = duroxide::Client::new(store.clone());

    client
        .start_orchestration("test-ordering-1", "Ordering", "")
        .await
        .unwrap();
    let status = client
        .wait_for_orchestration("test-ordering-1", Duration::from_secs(5))
        .await
        .unwrap();
    assert!(matches!(
        status,
        duroxide::runtime::OrchestrationStatus::Completed { .. }
    ));

    let history = client.read_execution_history("test-ordering-1", 1).await.unwrap();
    let scheduled_names: Vec<String> = history
        .iter()
        .filter_map(|e| match &e.kind {
            duroxide::EventKind::ActivityScheduled { name, .. } => Some(name.clone()),
            _ => None,
        })
        .collect();

    // Ordering should be stable: A, syscall(new_guid), B
    assert_eq!(scheduled_names.len(), 3);
    assert_eq!(scheduled_names[0], "A");
    assert_eq!(scheduled_names[1], "__duroxide_syscall:new_guid");
    assert_eq!(scheduled_names[2], "B");

    rt.shutdown(None).await;
}

#[tokio::test]
async fn test_multiple_syscalls_same_type() {
    let store = Arc::new(
        duroxide::providers::sqlite::SqliteProvider::new_in_memory()
            .await
            .unwrap(),
    );
    let activities = ActivityRegistry::builder().build();

    let orchestrations = OrchestrationRegistry::builder()
        .register("TwoGuids", |ctx: OrchestrationContext, _input: String| async move {
            let g1 = ctx.new_guid().await?;
            let g2 = ctx.new_guid().await?;
            assert_ne!(g1, g2);
            Ok(format!("{g1},{g2}"))
        })
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activities.clone(), orchestrations.clone()).await;
    let client = duroxide::Client::new(store.clone());
    client
        .start_orchestration("test-two-guids", "TwoGuids", "")
        .await
        .unwrap();
    let status1 = client
        .wait_for_orchestration("test-two-guids", Duration::from_secs(5))
        .await
        .unwrap();
    let output1 = match status1 {
        duroxide::runtime::OrchestrationStatus::Completed { output, .. } => output,
        other => panic!("Unexpected status: {other:?}"),
    };
    rt.shutdown(None).await;

    // Restart runtime and ensure replay is stable
    let rt2 = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let status2 = client
        .wait_for_orchestration("test-two-guids", Duration::from_secs(5))
        .await
        .unwrap();
    let output2 = match status2 {
        duroxide::runtime::OrchestrationStatus::Completed { output, .. } => output,
        other => panic!("Unexpected status: {other:?}"),
    };
    assert_eq!(output1, output2);
    rt2.shutdown(None).await;
}

#[tokio::test(flavor = "current_thread")]
async fn test_syscalls_work_in_single_thread_mode() {
    let store = Arc::new(
        duroxide::providers::sqlite::SqliteProvider::new_in_memory()
            .await
            .unwrap(),
    );
    let activities = ActivityRegistry::builder().build();

    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "SingleThreadSyscalls",
            |ctx: OrchestrationContext, _input: String| async move {
                for _ in 0..3 {
                    let _ = ctx.new_guid().await?;
                    let _ = ctx.utc_now().await?;
                }
                Ok("ok".to_string())
            },
        )
        .build();

    let options = runtime::RuntimeOptions {
        orchestration_concurrency: 1,
        worker_concurrency: 1,
        ..Default::default()
    };

    let rt = runtime::Runtime::start_with_options(store.clone(), activities, orchestrations, options).await;
    let client = duroxide::Client::new(store.clone());
    client
        .start_orchestration("test-single-thread-syscalls", "SingleThreadSyscalls", "")
        .await
        .unwrap();
    let status = client
        .wait_for_orchestration("test-single-thread-syscalls", Duration::from_secs(5))
        .await
        .unwrap();
    assert!(matches!(
        status,
        duroxide::runtime::OrchestrationStatus::Completed { .. }
    ));
    rt.shutdown(None).await;
}

#[tokio::test]
async fn test_cancellation_with_pending_syscall() {
    let (store, _td) = common::create_sqlite_store_disk().await;
    let activities = ActivityRegistry::builder().build();

    let orchestrations = OrchestrationRegistry::builder()
        .register(
            "CancelSyscall",
            |ctx: OrchestrationContext, _input: String| async move {
                // Ensure syscall activity is exercised before cancellation.
                let _ = ctx.utc_now().await?;
                // Then wait so we can cancel deterministically.
                let _ = ctx.schedule_wait("hold").await;
                Ok("done".to_string())
            },
        )
        .build();

    let rt = runtime::Runtime::start_with_store(store.clone(), activities, orchestrations).await;
    let client = duroxide::Client::new(store.clone());
    client
        .start_orchestration("test-cancel-syscall", "CancelSyscall", "")
        .await
        .unwrap();

    let subscribed = common::wait_for_subscription(store.clone(), "test-cancel-syscall", "hold", 2000).await;
    assert!(subscribed, "Orchestration should subscribe to 'hold' event");

    client
        .cancel_instance("test-cancel-syscall", "test cancellation")
        .await
        .unwrap();

    let status = client
        .wait_for_orchestration("test-cancel-syscall", Duration::from_secs(10))
        .await
        .unwrap();

    match status {
        duroxide::runtime::OrchestrationStatus::Failed { details, .. } => match details {
            duroxide::ErrorDetails::Application {
                kind: duroxide::AppErrorKind::Cancelled { .. },
                ..
            } => {}
            other => panic!("Expected cancelled application error, got: {other:?}"),
        },
        other => panic!("Expected Failed cancellation status, got: {other:?}"),
    }

    rt.shutdown(None).await;
}