supermachine 0.4.23

Run any OCI/Docker image as a hardware-isolated microVM on macOS HVF (Linux KVM and Windows WHP in progress). Single library API, zero flags for the common case, sub-100 ms cold-restore from snapshot.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
//! Per-vCPU worker and dispatch loop.
//!
//! The runner owns VM lifecycle. This module owns vCPU execution, MMIO exit
//! dispatch, PSCI handling, quiesce windows, and snapshot trigger checks.

use std::fmt;
use std::sync::Arc;

use crate::devices::mmio_bus::MmioBus;
use crate::devices::virtio::mmio::MmioVirtio;
use crate::devices::virtio::vsock::device::Vsock;
use crate::hvf::Vcpu;
use crate::vmm::coord::VcpuCoordinator;
use crate::vmm::vstate::MicroVm;

/// Snapshot / readiness-trigger configuration passed to `dispatch_vcpu`.
///
/// The dispatch loop evaluates the triggers on every iteration in this
/// order: heartbeat, pre-exec, listener-ready, workload-parked, wall-clock.
/// Whether a trigger captures a snapshot or merely reports readiness is
/// decided by `out_path` and `bake_ready_signal`.
#[derive(Debug)]
pub struct DispatchSnapshot<'a> {
    /// Wall-clock fallback: once this many milliseconds have elapsed since
    /// the dispatch loop started, the trigger fires regardless of guest
    /// state. `None` disables the fallback.
    pub after_ms: Option<u64>,
    /// Fire once the serial heartbeat counter has reached this value.
    /// `None` disables the heartbeat trigger.
    pub at_heartbeat: Option<u64>,
    /// Fire once the vsock muxer reports at least one TSI listener, or —
    /// with zero listeners — once init-oci has reported the workload as
    /// parked.
    pub on_listener: bool,
    /// When `true`, the dispatch loop fires its readiness trigger
    /// on the [`crate::devices::serial::PRE_EXEC_READY`] atomic —
    /// init-oci just printed "workload-pre-exec" and is in a brief
    /// nanosleep, giving us a stable WFI window to capture in. Used
    /// by the always-pipelined-skip-warm `.build()` path: snapshots
    /// the guest BEFORE the workload runs, so each restore re-execs
    /// the workload fresh (which is what agent-runtime users want).
    /// Saves 50–150 ms of bake time vs `on_listener`.
    ///
    /// When `on_pre_exec` AND `on_listener` are both true, pre-exec
    /// wins (it fires earlier in the boot timeline). With `with_warmup`
    /// or service-image bakes, leave this `false` to preserve the
    /// existing listener-ready semantics.
    pub on_pre_exec: bool,
    /// Length of the quiesce window (milliseconds) run before the
    /// heartbeat and listener-ready triggers act. `0` skips quiescing.
    pub quiesce_ms: u64,
    /// Destination file for the captured snapshot. `None` means no
    /// capture is performed by the dispatch loop.
    pub out_path: Option<&'a str>,
    /// Cooperative stop flag, polled at the top of every dispatch
    /// iteration; when it reads `true` the loop returns
    /// `DispatchExit::Stopped`.
    pub stop_requested: Option<&'a std::sync::atomic::AtomicBool>,
    /// Pipelined-bake (`bake-then-pool`) signal. When `out_path`
    /// is `None` AND this is `true`, the dispatch loop returns
    /// `DispatchExit::BakeReady` on the first readiness trigger
    /// (workload-parked / listener-ready / snapshot-after-ms
    /// fallback) instead of doing the no-op `take_snapshot`. The
    /// runner's outer loop then writes `BAKE_READY` on the pool
    /// ctl writer and re-enters dispatch.
    pub bake_ready_signal: bool,
    /// Path to the worker's exec-vsock unix socket, intended for
    /// driving `smpark_park` / `smpark_unpark` around a multi-vCPU
    /// capture.
    ///
    /// NOTE(review): the in-worker trigger in `dispatch_vcpu` does not
    /// read this field — the vCPU 0 dispatcher thread cannot service an
    /// agent RPC while it is the one blocking on the request, so parking
    /// is driven host-side instead. Confirm whether any other consumer
    /// still needs it.
    pub vsock_exec_path: Option<&'a str>,
}

/// Reason the vCPU 0 dispatch loop handed control back to the runner.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DispatchExit {
    /// The vCPU run was canceled (the step loop observed a `Canceled`
    /// exit reason).
    Canceled,
    /// The `stop_requested` flag was set, or the step loop asked to stop
    /// (PSCI `CPU_OFF`, an unhandled exception class, or an unknown exit
    /// reason).
    Stopped,
    /// A readiness trigger fired and the snapshot was written to
    /// `out_path`.
    SnapshotSaved,
    /// The guest issued PSCI `SYSTEM_OFF` or `SYSTEM_RESET`.
    SystemOff,
    /// Returned by `dispatch_vcpu` exactly once per worker
    /// lifetime when `bake_ready_signal` is set: the bake init
    /// has reached its readiness trigger but we're not capturing
    /// (no `out_path`). Runner uses this to surface the signal
    /// to the host and transition to pool-mode dispatch.
    BakeReady,
}

/// Failures a vCPU worker can report to the runner.
#[derive(Debug)]
pub enum WorkerError {
    /// No listener became ready within `after_ms` milliseconds and the
    /// worker refused to take a zero-listener snapshot.
    /// NOTE(review): not constructed anywhere in this part of the file.
    ListenerReadinessTimeout {
        after_ms: u64,
    },
    /// Spawning the `quiesce-canceller` helper thread failed.
    QuiesceThreadSpawn(std::io::Error),
    /// HVF reported an error while capturing snapshot state; also used
    /// for errors from the secondary snapshot / restore rendezvous.
    SnapshotCapture(crate::hvf::Error),
    /// Writing the snapshot file at `path` failed.
    SnapshotSave {
        path: String,
        source: crate::vmm::snapshot::FileError,
    },
    /// Creating vCPU `idx` failed.
    VcpuCreate {
        idx: u32,
        source: crate::hvf::Error,
    },
    /// Running vCPU `idx` failed.
    VcpuRun {
        idx: u32,
        source: crate::hvf::Error,
    },
    /// Configuring vCPU `idx` before dispatch failed (e.g. the MPIDR_EL1
    /// write or restoring its snapshot state).
    VcpuSetup {
        idx: u32,
        source: crate::hvf::Error,
    },
    /// Masking or unmasking the virtual timer failed.
    VtimerMask(crate::hvf::Error),
}

impl fmt::Display for WorkerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            WorkerError::ListenerReadinessTimeout { after_ms } => write!(
                f,
                "listener readiness timeout after {after_ms} ms; refusing zero-listener snapshot"
            ),
            WorkerError::QuiesceThreadSpawn(e) => write!(f, "spawn quiesce canceller: {e}"),
            WorkerError::SnapshotCapture(e) => write!(f, "capture snapshot: {e:?}"),
            WorkerError::SnapshotSave { path, source } => {
                write!(f, "save snapshot {path}: {source:?}")
            }
            WorkerError::VcpuCreate { idx, source } => {
                write!(f, "create vCPU {idx}: {source:?}")
            }
            WorkerError::VcpuRun { idx, source } => {
                write!(f, "run vCPU {idx}: {source:?}")
            }
            WorkerError::VcpuSetup { idx, source } => {
                write!(f, "setup vCPU {idx}: {source:?}")
            }
            WorkerError::VtimerMask(e) => write!(f, "set vtimer mask: {e:?}"),
        }
    }
}

// `source()` is intentionally left at its default (`None`): the `Display`
// impl above already embeds the underlying error text in each message, so
// also exposing it as a source would duplicate it in error-chain printers.
impl std::error::Error for WorkerError {}

/// Outcome of a single `vcpu_step` call, consumed by the dispatch loops.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum StepExit {
    /// The run returned with a `Canceled` exit reason (forced exit).
    Canceled,
    /// The exit was handled; the caller should run the vCPU again.
    Continue,
    /// This vCPU should stop dispatching (PSCI `CPU_OFF`, unhandled
    /// exception class, or unknown exit reason).
    Stop,
    /// The guest requested PSCI `SYSTEM_OFF` or `SYSTEM_RESET`.
    SystemOff,
}

/// Opt-in QoS hint for the calling vCPU thread, applied through
/// `pthread_set_qos_class_self_np` so macOS is nudged to schedule it on
/// a P-core.
///
/// **Off by default.** Benchmarks (rust:1-slim warm rustc, 8 workers)
/// showed the scheduler's default behaviour beating both
/// USER_INTERACTIVE and USER_INITIATED for our workload: macOS already
/// places HVF-entitled foreground threads well, and forcing a higher
/// priority only adds contention with other system threads. The knob is
/// kept for benchmarking on a quiet host or for workloads that do
/// benefit.
///
/// Enable with `SUPERMACHINE_QOS=user_interactive` or
/// `SUPERMACHINE_QOS=user_initiated`. Any other value, or an unset
/// variable, makes this a no-op. The call's return code is ignored — the
/// hint is best-effort.
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
pub fn pin_vcpu_thread_to_pcore() {
    const QOS_CLASS_USER_INTERACTIVE: u32 = 0x21;
    const QOS_CLASS_USER_INITIATED: u32 = 0x19;

    unsafe extern "C" {
        fn pthread_set_qos_class_self_np(qos_class: u32, relative_priority: i32) -> i32;
    }

    let Ok(requested) = std::env::var("SUPERMACHINE_QOS") else {
        return;
    };
    let qos_class = match requested.as_str() {
        "user_interactive" => QOS_CLASS_USER_INTERACTIVE,
        "user_initiated" => QOS_CLASS_USER_INITIATED,
        _ => return,
    };

    // SAFETY: the function only affects the calling thread and takes plain
    // integer arguments; no pointers or borrowed data cross the FFI
    // boundary.
    let _ = unsafe { pthread_set_qos_class_self_np(qos_class, 0) };
}

/// Fallback for targets other than aarch64 macOS: there is no QoS hint to
/// apply, so this does nothing.
#[cfg(not(all(target_os = "macos", target_arch = "aarch64")))]
pub fn pin_vcpu_thread_to_pcore() {}


/// Thread entry point for a secondary vCPU (`idx` > 0).
///
/// Applies the optional QoS hint to the current thread, then runs the
/// secondary worker to completion. A worker error is logged to stderr
/// rather than returned, since this function is the top of the thread.
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
pub fn run_secondary(
    idx: u32,
    coord: Arc<VcpuCoordinator>,
    bus: Arc<MmioBus>,
    restore_state: Option<crate::vmm::snapshot::PerVcpuState>,
) {
    pin_vcpu_thread_to_pcore();
    match run_secondary_inner(idx, coord, bus, restore_state) {
        Ok(()) => {}
        Err(e) => eprintln!("  [vcpu-{idx}] worker error: {e}"),
    }
}

/// Create secondary vCPU `idx`, bring it to a runnable state and dispatch
/// it until it stops.
///
/// Two bring-up paths:
/// * `restore_state` is `Some`: load the captured per-vCPU state, mark the
///   coordinator slot as on, and dispatch immediately.
/// * `restore_state` is `None`: park until the coordinator delivers a PSCI
///   `CPU_ON` for this vCPU, program the boot registers, then dispatch.
///   If the wait ends without a `CPU_ON`, returns `Ok(())`.
///
/// # Errors
/// * `WorkerError::VcpuCreate` if the vCPU cannot be created.
/// * `WorkerError::VcpuSetup` if MPIDR_EL1, the restored state, or any
///   boot register cannot be written.
/// * Whatever `dispatch_vcpu_inner` reports while running.
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
fn run_secondary_inner(
    idx: u32,
    coord: Arc<VcpuCoordinator>,
    bus: Arc<MmioBus>,
    restore_state: Option<crate::vmm::snapshot::PerVcpuState>,
) -> Result<(), WorkerError> {
    use applevisor_sys as av;

    let vcpu = Vcpu::new().map_err(|source| WorkerError::VcpuCreate { idx, source })?;
    // MPIDR_EL1 must equal the vCPU index (matches FDT cpu@N reg + GIC
    // redistributor frame). Without this, GIC PPIs never route here.
    vcpu.set_sys_reg(av::hv_sys_reg_t::MPIDR_EL1, idx as u64)
        .map_err(|source| WorkerError::VcpuSetup { idx, source })?;
    // Register the handle so the snapshot trigger thread can
    // `hv_vcpus_exit` us when it needs to capture state.
    coord.register_secondary(vcpu.handle());

    if let Some(st) = restore_state {
        // Multi-vCPU restore path. The snapshot already has our
        // full register state; load it and dispatch directly. No
        // PSCI park, no waiting for CPU_ON — the kernel onlined
        // us before snapshot fired and "thinks" we're still up,
        // which is true once we restore state and start running.
        eprintln!("  [vcpu-{idx}] restoring from snapshot");
        crate::vmm::snapshot::restore_vcpu_state(&vcpu, &st)
            .map_err(|source| WorkerError::VcpuSetup { idx, source })?;
        coord.slots[idx as usize]
            .on
            .store(true, std::sync::atomic::Ordering::SeqCst);
        return dispatch_vcpu_inner(idx, &vcpu, &bus, &coord);
    }

    // Boot-from-scratch path. Secondary parks waiting for vcpu0
    // to issue PSCI CPU_ON during kernel SMP bring-up.
    eprintln!("  [vcpu-{idx}] parked, waiting for PSCI CPU_ON");
    let Some((entry, ctx_id)) = coord.wait_for_run(idx) else {
        return Ok(());
    };
    eprintln!("  [vcpu-{idx}] CPU_ON entry=0x{entry:x} ctx=0x{ctx_id:x}");
    // Linux aarch64 boot protocol for secondaries: same as primary:
    // EL1h, DAIF masked, MMU off. X0 = context id. X1..X3 = 0.
    //
    // A failed write here must not be ignored: entering the guest with a
    // stale PC or PSTATE would run the secondary from an arbitrary state,
    // so surface it as a setup error like the MPIDR write above.
    let boot_regs = [
        (av::hv_reg_t::CPSR, 0x3c5),
        (av::hv_reg_t::PC, entry),
        (av::hv_reg_t::X0, ctx_id),
        (av::hv_reg_t::X1, 0),
        (av::hv_reg_t::X2, 0),
        (av::hv_reg_t::X3, 0),
    ];
    for (reg, value) in boot_regs {
        vcpu.set_reg(reg, value)
            .map_err(|source| WorkerError::VcpuSetup { idx, source })?;
    }
    dispatch_vcpu_inner(idx, &vcpu, &bus, &coord)
}

/// Per-vCPU dispatch loop. Returns when the vCPU is canceled / shut down,
/// when a stop is requested, or when a readiness trigger produced a
/// snapshot or a bake-ready signal. Snapshot trigger only runs on vCPU 0.
///
/// Every iteration first polls `snapshot.stop_requested`, then evaluates
/// the readiness triggers in this order — heartbeat, pre-exec,
/// listener-ready, workload-parked, wall-clock — and finally runs one
/// `vcpu_step`. A trigger that fires either returns
/// `DispatchExit::BakeReady` (when `bake_ready_signal` is set and there is
/// no `out_path`) or captures a snapshot to `out_path` and returns
/// `DispatchExit::SnapshotSaved`.
///
/// # Errors
/// * `WorkerError::SnapshotCapture` / `WorkerError::SnapshotSave` when the
///   streaming capture fails. Paused secondaries are released before the
///   error is returned.
/// * Errors from `quiesce_to_wfi` and `vcpu_step`.
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
pub fn dispatch_vcpu(
    idx: u32,
    vcpu: &Vcpu,
    bus: &Arc<MmioBus>,
    coord: &Arc<VcpuCoordinator>,
    all_mmio: &[Arc<MmioVirtio>],
    vsock: &Arc<Vsock>,
    vm: &MicroVm,
    snapshot: DispatchSnapshot<'_>,
) -> Result<DispatchExit, WorkerError> {
    use crate::devices::serial::HEARTBEAT_COUNT;
    use crate::vmm::snapshot;
    use std::sync::atomic::Ordering;

    let dispatch_start = std::time::Instant::now();
    let mut quiesced = false;
    // Captures and saves a snapshot to `out_path`. Returns `Ok(false)`
    // without doing anything when no output path is configured.
    let take_snapshot = |reason: &str| -> Result<bool, WorkerError> {
        let Some(out) = snapshot.out_path else {
            return Ok(false);
        };
        eprintln!("  snapshot trigger ({reason})");
        let t0 = std::time::Instant::now();
        // Note: smpark park/unpark is NOT driven from this in-
        // worker snapshot trigger path. The worker's vCPU 0
        // dispatcher thread is what would issue the agent RPC,
        // but it's also what runs hv_vcpu_run for vCPU 0 — so
        // the agent (which lives on vCPU 0) can never service
        // the request while we're blocking here. Result: 5s
        // timeout per snapshot. Multi-vCPU snapshot reliability
        // requires the host-driven pipelined-bake path; the
        // sequential bake's heartbeat trigger remains
        // intermittent-on-N>1.
        //
        // See bake.rs::run_native_supermachine_bake_pipelined
        // for the working path; PooledVm::snapshot also drives
        // smpark host-side from the embedder-facing API.
        let secondary_handles = coord.secondary_handles_snapshot();
        let have_secondaries = !secondary_handles.is_empty();
        if have_secondaries {
            coord.request_snapshot_pause(&secondary_handles);
        }
        let virtio = snapshot::VirtioSnapshot {
            mmio: all_mmio.iter().map(|m| m.capture_state()).collect(),
            vsock_listeners: vsock.muxer().capture_tsi_listeners(),
        };
        let secondary_states = if have_secondaries {
            coord.take_secondary_states()
        } else {
            Vec::new()
        };
        // Streaming capture+save: skips the 2 GiB intermediate
        // RAM copy that the legacy capture_snapshot allocates.
        // Same correctness invariant — guest is paused for the
        // duration via the secondary rendezvous above.
        let captured =
            snapshot::capture_and_save_streaming(vm, &virtio, &secondary_states, out);
        // Release the secondaries before looking at the result: on a
        // failed capture they must not be left parked at the rendezvous.
        if have_secondaries {
            coord.release_after_snapshot();
        }
        let write_stats = captured.map_err(|e| match e {
            snapshot::SnapshotStreamError::Hvf(h) => WorkerError::SnapshotCapture(h),
            snapshot::SnapshotStreamError::Io(io) => WorkerError::SnapshotSave {
                path: out.to_string(),
                source: io,
            },
        })?;
        let total_us = t0.elapsed().as_micros();
        eprintln!("\n  snapshot ({reason}): total {total_us} us (capture+save streamed), RAM {} MiB (data {} MiB, zero {} MiB), mmio={} listeners={} -> {out}",
            write_stats.ram_bytes / (1024*1024),
            write_stats.ram_data_bytes / (1024*1024),
            write_stats.ram_zero_bytes / (1024*1024),
            virtio.mmio.len(), virtio.vsock_listeners.len());
        Ok(true)
    };
    // Bake-then-pool readiness signal — fires the equivalent
    // of `take_snapshot` triggers (heartbeat / listener /
    // workload-parked / wall-clock) but returns a different
    // DispatchExit so the runner can hand control to the host
    // for warmup. Only one of `out_path` and
    // `bake_ready_signal` is set at any time. Both inputs are fixed for
    // the lifetime of this call, so this is computed once.
    let bake_signal = snapshot.bake_ready_signal && snapshot.out_path.is_none();
    let mut listener_quiesced = false;
    loop {
        if snapshot
            .stop_requested
            .is_some_and(|flag| flag.load(Ordering::SeqCst))
        {
            return Ok(DispatchExit::Stopped);
        }

        // Heartbeat-triggered snapshot fires at a known guest-init point,
        // not mid-handshake.
        if let Some(target) = snapshot.at_heartbeat {
            if HEARTBEAT_COUNT.load(Ordering::SeqCst) >= target {
                if !quiesced && snapshot.quiesce_ms > 0 {
                    eprintln!(
                        "  quiescing for {} ms before snapshot...",
                        snapshot.quiesce_ms
                    );
                    if let Some(exit) = quiesce_to_wfi(vcpu, bus, coord, snapshot.quiesce_ms)? {
                        return Ok(exit);
                    }
                    quiesced = true;
                }
                if bake_signal {
                    return Ok(DispatchExit::BakeReady);
                }
                if snapshot.out_path.is_some() && take_snapshot("heartbeat")? {
                    return Ok(DispatchExit::SnapshotSaved);
                }
            }
        }
        // Pre-exec trigger: init-oci just printed "workload-pre-exec"
        // and is now in a nanosleep — the vCPU is in clean WFI (no
        // in-flight syscall, no runnable user task, agent in epoll
        // wait). No quiesce needed — fire immediately. This fires
        // BEFORE on_listener even when both flags are set, because
        // pre-exec is earlier in the boot timeline. Saves ~50 ms vs
        // the on_listener / parked-PID-1 path on the common case.
        //
        // Cost on workloads where parked-PID-1 would have fired
        // first (alpine /bin/sh, rust:1-slim default CMD): pre-exec
        // fires ~5-10 ms BEFORE the workload would have parked, so
        // there's a tiny shift to slightly earlier capture. Net:
        // small win.
        //
        // Big win on workloads where listener-ready would have been
        // slow (JVM, heavy-import python, slow-binding services):
        // pre-exec fires ~150 ms after kernel boot regardless of
        // workload's startup time, vs waiting out the wall-clock
        // 7-second fallback. ~15× speedup measured on
        // python:slim with `python3 -c "time.sleep(60)"` (7700 ms
        // → 470 ms).
        if snapshot.on_pre_exec
            && crate::devices::serial::PRE_EXEC_READY.load(Ordering::SeqCst)
        {
            if bake_signal {
                eprintln!("  init-oci pre-workload-exec; bake-ready");
                return Ok(DispatchExit::BakeReady);
            }
            if snapshot.out_path.is_some() && take_snapshot("pre-exec")? {
                return Ok(DispatchExit::SnapshotSaved);
            }
        }
        // OCI server readiness: once the guest has registered a TSI listener,
        // the customer process has completed bind/listen.
        if snapshot.on_listener {
            let listeners = vsock.muxer().listener_count();
            if listeners > 0 {
                if !listener_quiesced {
                    eprintln!("  listener readiness: {listeners} TSI listener(s)");
                    if snapshot.quiesce_ms > 0 {
                        eprintln!(
                            "  quiescing listener-ready guest for {} ms before snapshot...",
                            snapshot.quiesce_ms
                        );
                        if let Some(exit) = quiesce_to_wfi(vcpu, bus, coord, snapshot.quiesce_ms)? {
                            return Ok(exit);
                        }
                    }
                    listener_quiesced = true;
                }
                if bake_signal {
                    return Ok(DispatchExit::BakeReady);
                }
                if snapshot.out_path.is_some() && take_snapshot("listener-ready")? {
                    return Ok(DispatchExit::SnapshotSaved);
                }
            }
        }
        // Early non-service fallback: init-oci has logged
        // "parking PID 1" (workload forked + exited, pid 1 is now
        // idle). This fires within ~100 ms of the workload exit
        // for non-service images (rust:1-slim, python:slim,
        // bash) — much faster than waiting out the full
        // --snapshot-after-ms wall-clock fallback. Service images
        // (nginx, redis) never trip this — their workload doesn't
        // exit, so the listener-ready branch above wins.
        if snapshot.on_listener
            && vsock.muxer().listener_count() == 0
            && crate::devices::serial::WORKLOAD_PARKED.load(Ordering::SeqCst)
        {
            if bake_signal {
                eprintln!("  init-oci parked PID 1; bake-ready");
                return Ok(DispatchExit::BakeReady);
            }
            if snapshot.out_path.is_some() {
                eprintln!(
                    "  init-oci parked PID 1 (no listener); capturing init-state snapshot"
                );
                if take_snapshot("workload-parked")? {
                    return Ok(DispatchExit::SnapshotSaved);
                }
            }
        }
        // Wall-clock fallback. Fires after `--snapshot-after-ms`
        // regardless of whether a listener appeared. Reached only
        // when neither the listener-ready nor workload-parked
        // branches fired — i.e. a service image whose listener
        // never bound (misconfigured nginx, etc.). We capture
        // whatever state we have so the user can investigate
        // rather than getting a hard error from the bake.
        if let Some(after_ms) = snapshot.after_ms {
            if dispatch_start.elapsed().as_millis() as u64 >= after_ms {
                if snapshot.on_listener && vsock.muxer().listener_count() == 0 {
                    eprintln!(
                        "  no listener after {after_ms} ms; falling back to wall-clock snapshot"
                    );
                }
                if bake_signal {
                    return Ok(DispatchExit::BakeReady);
                }
                if snapshot.out_path.is_some() && take_snapshot("wall-clock")? {
                    return Ok(DispatchExit::SnapshotSaved);
                }
            }
        }
        match vcpu_step(idx, vcpu, bus, coord)? {
            StepExit::Canceled => return Ok(DispatchExit::Canceled),
            StepExit::Continue => {}
            StepExit::Stop => return Ok(DispatchExit::Stopped),
            StepExit::SystemOff => return Ok(DispatchExit::SystemOff),
        }
    }
}

/// Run the vCPU for a bounded quiesce window before a snapshot.
///
/// Masks the virtual timer, spawns a helper thread that sleeps `wait_ms`
/// milliseconds and then forces the vCPU out of its run via
/// `hv_vcpus_exit`, and keeps stepping the vCPU until a step reports
/// something other than `Continue`. The helper is always joined and the
/// vtimer always unmasked before returning — including when a step
/// fails, in which case the join waits out the remainder of the window.
///
/// # Returns
/// * `Ok(None)` — the window ended (`Canceled`) or the step loop asked to
///   stop; the caller proceeds with its trigger.
/// * `Ok(Some(DispatchExit::SystemOff))` — the guest powered off or reset
///   during the window.
///
/// # Errors
/// * `WorkerError::VtimerMask` if masking or unmasking the vtimer fails.
/// * `WorkerError::QuiesceThreadSpawn` if the helper cannot be spawned.
/// * Any error from `vcpu_step`; it takes precedence over an unmask error.
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
pub(crate) fn quiesce_to_wfi(
    vcpu: &Vcpu,
    bus: &MmioBus,
    coord: &VcpuCoordinator,
    wait_ms: u64,
) -> Result<Option<DispatchExit>, WorkerError> {
    use applevisor_sys as av;

    vcpu.set_vtimer_mask(true)
        .map_err(WorkerError::VtimerMask)?;
    let h = vcpu.handle();
    let canceller = std::thread::Builder::new()
        .name("quiesce-canceller".into())
        .spawn(move || {
            std::thread::sleep(std::time::Duration::from_millis(wait_ms));
            // SAFETY: handle valid for caller's quiesce window — this
            // thread is joined below before the function returns.
            unsafe {
                let _ = av::hv_vcpus_exit(&h, 1);
            }
        })
        .map_err(WorkerError::QuiesceThreadSpawn)?;
    // Do not use `?` inside this loop: an early return would skip the join
    // and the vtimer unmask below, leaving the helper to fire on a handle
    // the caller may already have torn down.
    // NOTE(review): `Stop` is treated like the end of the window rather
    // than surfaced as a stop — confirm that is intended.
    let result: Result<Option<DispatchExit>, WorkerError> = loop {
        match vcpu_step(0, vcpu, bus, coord) {
            Ok(StepExit::Continue) => {}
            Ok(StepExit::Canceled) | Ok(StepExit::Stop) => break Ok(None),
            Ok(StepExit::SystemOff) => break Ok(Some(DispatchExit::SystemOff)),
            Err(e) => break Err(e),
        }
    };
    let _ = canceller.join();
    let unmask = vcpu.set_vtimer_mask(false).map_err(WorkerError::VtimerMask);
    // A step error wins over an unmask error; otherwise report the unmask
    // failure, and only then the step outcome.
    result.and_then(|exit| unmask.map(|()| exit))
}

/// Dispatch loop for a secondary vCPU.
///
/// Before every guest entry the vCPU visits the two coordinator
/// rendezvous points (snapshot pause, then cycle-restore), and then runs
/// one `vcpu_step`. The loop ends with `Ok(())` on `Stop`, `SystemOff`,
/// or a `Canceled` exit that was not caused by a pending snapshot/restore
/// request.
///
/// # Errors
/// Rendezvous failures are reported as `WorkerError::SnapshotCapture`;
/// step failures are passed through unchanged.
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
fn dispatch_vcpu_inner(
    idx: u32,
    vcpu: &Vcpu,
    bus: &Arc<MmioBus>,
    coord: &Arc<VcpuCoordinator>,
) -> Result<(), WorkerError> {
    use std::sync::atomic::Ordering::Acquire;

    // True while vcpu0 has a snapshot or restore rendezvous outstanding.
    // Checked snapshot-first, restore-second.
    let rendezvous_pending =
        || coord.snapshot_request.load(Acquire) || coord.restore_request.load(Acquire);

    loop {
        // Snapshot rendezvous: vcpu0 sets `snapshot_request` and kicks
        // every secondary out of `hv_vcpu_run` with `hv_vcpus_exit`; the
        // state capture itself happens here, at the top of the loop.
        coord
            .maybe_pause_for_snapshot(idx, vcpu)
            .map_err(WorkerError::SnapshotCapture)?;

        // Cycle-restore rendezvous, the mirror image of the above: vcpu0
        // (the pool-mode driver) raises `restore_request` once RAM is
        // remapped and its own state restored, and each secondary applies
        // its target state on its OWNING thread, as HVF requires. Skipping
        // this with restore_on_release(true) + multi-vCPU leaves the
        // secondaries on a stale trajectory and the next softirq/IPI path
        // panics.
        coord
            .maybe_apply_restore(idx, vcpu)
            .map_err(WorkerError::SnapshotCapture)?;

        match vcpu_step(idx, vcpu, bus, coord)? {
            StepExit::Continue => {}
            // A forced exit caused by a snapshot/restore trigger is not a
            // shutdown: go around again so the rendezvous above handles it.
            StepExit::Canceled if rendezvous_pending() => {}
            // Genuine cancellation, or the vCPU / system is going down.
            StepExit::Canceled | StepExit::Stop | StepExit::SystemOff => return Ok(()),
        }
    }
}

/// Enter the guest once on `vcpu` and service whatever made it exit.
///
/// Exception exits are dispatched on the ESR exception class:
/// data aborts go to the MMIO bus, HVC calls are decoded as PSCI,
/// trapped system-register accesses read as zero / ignore writes, and
/// SMC calls return -1 in X0. Any other class, and any unknown exit
/// reason, stops this vCPU.
///
/// # Errors
/// `WorkerError::VcpuRun` when the run itself fails.
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
fn vcpu_step(
    idx: u32,
    vcpu: &Vcpu,
    bus: &MmioBus,
    coord: &VcpuCoordinator,
) -> Result<StepExit, WorkerError> {
    use crate::hvf::ExitReason;
    use crate::vmm::coord::*;
    use crate::vmm::exit_profile::{self, Stage};
    use applevisor_sys as av;

    // ESR exception-class (bits [31:26]) values handled below.
    const EC_HVC: u64 = 0x16;
    const EC_SMC: u64 = 0x17;
    const EC_SYSREG: u64 = 0x18;
    const EC_DATA_ABORT: u64 = 0x24;

    let entered = std::time::Instant::now();
    let exit = vcpu
        .run()
        .map_err(|source| WorkerError::VcpuRun { idx, source })?;
    exit_profile::record(Stage::VcpuRun, entered.elapsed().as_micros() as u64);

    let esr = exit.exception.syndrome;
    let ec = (esr >> 26) & 0x3f;
    // Register reads that fail are treated as zero.
    let read_reg = |reg: av::hv_reg_t| vcpu.get_reg(reg).unwrap_or(0);

    match ExitReason::from(exit.reason as u32) {
        ExitReason::Exception => match ec {
            EC_DATA_ABORT => {
                exit_profile::record(Stage::DataAbort, 0);
                handle_data_abort_vcpu(vcpu, bus, esr, exit.exception.physical_address);
            }
            EC_HVC => {
                exit_profile::record(Stage::Hvc, 0);
                // HVC: PSCI dispatch. X0 carries the function id; the
                // status computed here is written back to X0.
                let function_id = read_reg(av::hv_reg_t::X0) as u32;
                let status: i64 = match function_id {
                    PSCI_VERSION => 0x10000, // PSCI 1.0
                    PSCI_FEATURES => match read_reg(av::hv_reg_t::X1) as u32 {
                        PSCI_VERSION | PSCI_CPU_ON | PSCI_CPU_OFF | PSCI_AFFINITY_INFO
                        | PSCI_FEATURES | PSCI_SYSTEM_OFF | PSCI_SYSTEM_RESET => 0,
                        _ => PSCI_NOT_SUPPORTED,
                    },
                    PSCI_CPU_ON => {
                        let target = read_reg(av::hv_reg_t::X1) as u32;
                        let entry = read_reg(av::hv_reg_t::X2);
                        let ctx_id = read_reg(av::hv_reg_t::X3);
                        eprintln!("  [vcpu-{idx}] PSCI CPU_ON target={target} entry=0x{entry:x}");
                        coord.cpu_on(target, entry, ctx_id)
                    }
                    PSCI_CPU_OFF => {
                        // The PSCI spec says CPU_OFF doesn't return; stop dispatching.
                        eprintln!("  [vcpu-{idx}] PSCI CPU_OFF");
                        return Ok(StepExit::Stop);
                    }
                    PSCI_AFFINITY_INFO => {
                        let target = read_reg(av::hv_reg_t::X1) as u32;
                        coord.affinity_info(target)
                    }
                    PSCI_SYSTEM_OFF | PSCI_SYSTEM_RESET => {
                        eprintln!("  [vcpu-{idx}] PSCI SYSTEM_OFF/RESET - exiting");
                        return Ok(StepExit::SystemOff);
                    }
                    _ => PSCI_NOT_SUPPORTED,
                };
                vcpu.set_reg(av::hv_reg_t::X0, status as u64).ok();
            }
            EC_SYSREG => {
                exit_profile::record(Stage::Svc, 0);
                let pc = read_reg(av::hv_reg_t::PC);
                // ISS: Rt in bits [9:5], direction in bit 0 (1 = read).
                let iss = esr & 0x01ff_ffff;
                let rt = ((iss >> 5) & 0x1f) as u32;
                let is_read = (iss & 1) != 0;
                // Reads yield zero (Rt == 31 is not written); writes are
                // dropped. Either way, step past the instruction.
                if is_read && rt < 31 {
                    vcpu.set_x(rt, 0).ok();
                }
                vcpu.set_reg(av::hv_reg_t::PC, pc + 4).ok();
            }
            EC_SMC => {
                // Report -1 in X0 and step past the instruction.
                let pc = read_reg(av::hv_reg_t::PC);
                vcpu.set_x(0, (-1i64) as u64).ok();
                vcpu.set_reg(av::hv_reg_t::PC, pc + 4).ok();
            }
            _ => {
                let pc = read_reg(av::hv_reg_t::PC);
                eprintln!("  [vcpu-{idx}] unhandled EC={ec:#x} ESR=0x{esr:x} PC=0x{pc:x}");
                return Ok(StepExit::Stop);
            }
        },
        ExitReason::VTimerActivated => {
            exit_profile::record(Stage::Vtimer, 0);
        }
        ExitReason::Canceled => {
            eprintln!("  [vcpu-{idx}] canceled");
            return Ok(StepExit::Canceled);
        }
        ExitReason::Unknown(v) => {
            eprintln!("  [vcpu-{idx}] unknown exit {v}");
            return Ok(StepExit::Stop);
        }
    }
    Ok(StepExit::Continue)
}

/// Emulate one guest MMIO access that trapped as a data abort
/// (ESR exception class 0x24).
///
/// Operates on any vCPU so secondary vCPU threads can dispatch their own MMIO
/// traps.
///
/// * `esr` — exception syndrome; its ISS supplies ISV (bit 24), the access
///   size SAS (bits 23:22), the transfer register SRT (bits 20:16) and the
///   write-not-read flag WnR (bit 6).
/// * `gpa` — faulting guest-physical address, used as the bus address.
///
/// Writes are forwarded to `bus.write`, reads to `bus.read`; an unhandled
/// read yields 0. Register index 31 denotes the zero register: it stores
/// as 0 and is never written on a load. When ISV is clear the access
/// cannot be decoded from the syndrome, so the instruction is skipped
/// without emulation. In all cases PC is advanced by 4; register access
/// failures are ignored (best-effort).
#[cfg(all(target_os = "macos", target_arch = "aarch64"))]
fn handle_data_abort_vcpu(vcpu: &Vcpu, bus: &MmioBus, esr: u64, gpa: u64) {
    use crate::vmm::exit_profile;
    use applevisor_sys as av;

    let pc = vcpu.get_reg(av::hv_reg_t::PC).unwrap_or(0);
    let far = gpa;
    let iss = esr & 0x01ff_ffff;
    let isv = ((iss >> 24) & 1) != 0;
    if !isv {
        vcpu.set_reg(av::hv_reg_t::PC, pc + 4).ok();
        return;
    }
    let sas = ((iss >> 22) & 0x3) as u8;
    let size: u8 = 1 << sas;
    let srt = ((iss >> 16) & 0x1f) as u32;
    let wnr = ((iss >> 6) & 1) != 0;

    if wnr {
        // SRT == 31 encodes XZR/WZR (e.g. `str wzr, [x0]`), not a real
        // general-purpose register: the stored value is zero. Mirrors the
        // `srt < 31` guard on the load path below.
        let val = if srt < 31 {
            vcpu.get_x(srt).unwrap_or(0)
        } else {
            0
        };
        let t0 = std::time::Instant::now();
        let handled = bus.write(far, val, size);
        exit_profile::record(
            exit_profile::mmio_stage(far, true),
            t0.elapsed().as_micros() as u64,
        );
        if !handled && std::env::var("SUPERMACHINE_TRACE").is_ok() {
            eprintln!("MMIO W {far:#x} = {val:#x} sz={size} (unhandled)");
        }
    } else {
        let t0 = std::time::Instant::now();
        let val = bus.read(far, size);
        exit_profile::record(
            exit_profile::mmio_stage(far, false),
            t0.elapsed().as_micros() as u64,
        );
        if val.is_none() && std::env::var("SUPERMACHINE_TRACE").is_ok() {
            eprintln!("MMIO R {far:#x} sz={size} (unhandled, returning 0)");
        }
        let val = val.unwrap_or(0);
        // Loads targeting the zero register discard the value.
        if srt < 31 {
            vcpu.set_x(srt, val).ok();
        }
    }
    vcpu.set_reg(av::hv_reg_t::PC, pc + 4).ok();
}