supermachine 0.7.44

Run any OCI/Docker image as a hardware-isolated microVM on macOS HVF (Linux KVM and Windows WHP in progress). Single library API, zero flags for the common case, sub-100 ms cold-restore from snapshot.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
// Status: TX FIFO direct-to stdout-or-sink, RX byte-channel for
// host→guest control bytes (added 0.7.43 for the kernel-boot cache
// resume signal), line-aware heartbeat detection.
//
// PL011 register map (subset we implement):
//   0x000  UARTDR     data register; write = TX byte, read = RX byte
//                     (RX-side reads from the host-pushed queue;
//                      empty → 0)
//   0x018  UARTFR     flag register; bit 7 TXFE always 1, bit 4 RXFE
//                     reflects RX queue emptiness
//   0x024  UARTIBRD   integer baud divisor (write-ignored)
//   0x028  UARTFBRD   fractional baud divisor (write-ignored)
//   0x02c  UARTLCR_H  line control (write-ignored)
//   0x030  UARTCR     control (write-ignored)
//   0x038  UARTIMSC   interrupt mask (kernel sets bit 4 = RX IRQ
//                     enable; we honor it implicitly — we always
//                     raise SPI 33 on a push, kernel handles based
//                     on its own mask)
//   0x03c  UARTRIS    raw interrupt status; bit 4 = RXRIS, set
//                     while RX queue non-empty
//   0x040  UARTMIS    masked interrupt status (= RIS & IMSC)
//   0x044  UARTICR    interrupt clear (write-ignored; RXRIS auto-
//                     clears when the FIFO drains via UARTDR reads)
//   0xfe0  UARTPeriphID0 = 0x11 (PL011 magic)
//   0xfe4  UARTPeriphID1 = 0x10
//   0xfe8  UARTPeriphID2 = 0x14 (revision 1 in bits 7:4, designer nibble 0x4)
//   0xfec  UARTPeriphID3 = 0x00
//   0xff0  UARTPCellID0  = 0x0d
//   0xff4  UARTPCellID1  = 0xf0
//   0xff8  UARTPCellID2  = 0x05
//   0xffc  UARTPCellID3  = 0xb1
//
// IRQ semantics on supermachine (0.7.43+):
//
//   FDT declares the PL011 IRQ as IRQ_TYPE_LEVEL_HI, and this device
//   drives the line as a level through the `IRQ_RAISER` callback:
//   `push_rx_byte` asserts SPI 33, and the UARTDR read that empties
//   the RX queue deasserts it. Bytes pushed while the line is
//   already high do not get a pulse of their own, so multi-byte RX
//   relies on the kernel-side ISR draining UARTDR until RXFE is set.
//
//   NOTE(review): an earlier revision of this comment described
//   HVF's `hv_gic_set_spi` as edge-triggered with auto-deassert on
//   EOI (see fdt.rs:181) and treated the RX line as an edge pulse,
//   which was sufficient for the single-byte kernel-boot-cache
//   resume protocol. The code below now deasserts explicitly;
//   confirm which GIC behavior is current before relying on either
//   description.

use std::collections::VecDeque;
use std::io::Write;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::{Mutex, OnceLock};

use super::mmio_bus::MmioDevice;

/// Host-pushed RX bytes. Filled by `push_rx_byte`, drained one byte
/// at a time by guest UARTDR reads (or wholesale by
/// `drain_rx_queue_for_snapshot`). Empty by default; only used for
/// host→guest control tokens such as the kernel-boot cache resume
/// signal.
static RX_QUEUE: Mutex<VecDeque<u8>> = Mutex::new(VecDeque::new());

/// UARTIMSC value as written by the guest. Bit 4 = RX IRQ enable;
/// bit 6 = RT IRQ (receive timeout); others unused by Linux's PL011
/// driver. Stored so that UARTIMSC reads return what the guest wrote
/// and UARTMIS reads can apply the mask. UARTRIS is the raw status
/// and does not consult this value.
static UART_IMSC: AtomicU32 = AtomicU32::new(0);

/// Optional callback to drive PL011's SPI line on the GIC. The
/// argument is the level: `true` asserts, `false` deasserts.
///
/// Wired by `vmm/builder.rs` to
/// `crate::hvf::gic_set_spi(SERIAL_IRQ, level)`. Both directions
/// matter: FDT declares PL011 as `IRQ_TYPE_LEVEL_HI`, so the GIC
/// continuously samples the input. If we assert without
/// deasserting after the kernel drains UARTDR, the GIC keeps
/// re-firing the IRQ, the kernel sees "nobody cared", and
/// disables the IRQ permanently after ~5 spurious calls.
///
/// Deassertion happens in the UARTDR read path: when a UARTDR
/// read empties the RX queue, we lower the line.
///
/// Left as `None` in unit tests + during snapshot capture (the RX
/// queue is required to be empty at snapshot time, so no IRQ is
/// in flight).
///
/// NOTE(review): a `OnceLock` cannot be cleared once it has been
/// set, so "during snapshot capture" presumably refers to processes
/// that never call `set_pl011_irq_raiser` — confirm.
static IRQ_RAISER: OnceLock<Box<dyn Fn(bool) + Send + Sync>> = OnceLock::new();

/// Install the callback that drives the PL011 SPI line. Intended to
/// be called once while the VMM is being constructed.
///
/// `IRQ_RAISER` is a `OnceLock`: the first callback wins and every
/// later call is silently ignored.
pub fn set_pl011_irq_raiser<F>(f: F)
where
    F: Fn(bool) + Send + Sync + 'static,
{
    let callback: Box<dyn Fn(bool) + Send + Sync> = Box::new(f);
    // `set` hands the box back as `Err` when a callback is already
    // installed; dropping it keeps the original registration.
    drop(IRQ_RAISER.set(callback));
}

/// Append one byte to the PL011 RX queue, then assert the UART RX
/// IRQ line through the registered raiser (if any). This is the
/// host→guest path for control signals such as the kernel-boot
/// cache resume byte.
///
/// Returns the queue depth measured right after the push.
///
/// CONTRACT: callers must not push while a snapshot is being
/// captured. The IRQ injection mutates GIC state; capturing
/// mid-injection would record a half-asserted line that
/// `gic_state_restore` may not reconstruct correctly. In practice,
/// the bake driver only pushes after `snapshot::save_compact_to_file`
/// returns; for the kernel-boot-cache flow this is automatic.
pub fn push_rx_byte(b: u8) -> usize {
    // Scope the guard so the queue lock is released before the IRQ
    // callback runs.
    let queued = {
        let mut rx = RX_QUEUE.lock().unwrap();
        rx.push_back(b);
        rx.len()
    };

    // Raise the line. It is expected to stay high (LEVEL_HI per FDT)
    // until a UARTDR read empties the queue, at which point the read
    // path lowers it again.
    if let Some(set_line) = IRQ_RAISER.get() {
        set_line(true);
    }

    queued
}

/// Discard everything in the RX queue without touching the IRQ
/// line. Snapshot capture calls this to enforce "no in-flight RX
/// bytes" before state is saved.
///
/// Returns how many bytes were thrown away. The expected value is 0;
/// anything else means some other caller violated the no-push-
/// during-snapshot contract.
pub fn drain_rx_queue_for_snapshot() -> usize {
    RX_QUEUE.lock().unwrap().drain(..).count()
}

/// Process-global heartbeat count. Bumped each time a complete line
/// containing the literal "heartbeat counter=" arrives on PL011.
/// Used by `--snapshot-at N` (capture once HEARTBEAT_COUNT >= N) and
/// `--quiesce-ms` (post-heartbeat wait-for-WFI).
///
/// Like every marker-driven static in this file, it is only updated
/// while line-aware detection is switched on via
/// `set_heartbeat_detection(true)`.
pub static HEARTBEAT_COUNT: AtomicU64 = AtomicU64::new(0);

/// Set when a complete line containing "parking PID 1" arrives on
/// PL011 — i.e. init-oci's workload exited and pid 1 is now idle.
/// The bake worker uses this as an "early fallback" snapshot trigger
/// for non-service images: instead of waiting out the full
/// `--snapshot-after-ms` timeout (default 7 s) when no listener
/// appears, snapshot as soon as init reports parking. Service images
/// (nginx, redis, etc.) never trip this — their workload doesn't exit.
pub static WORKLOAD_PARKED: AtomicBool = AtomicBool::new(false);

/// Set when a complete line containing "init-oci: workload-pre-exec"
/// arrives on PL011 — i.e. init-oci has finished its setup and is
/// about to fork+exec the workload (it then nanosleeps 100 ms to
/// give us a stable WFI window). Used as the bake-ready trigger for
/// the always-pipelined-skip-warm `.build()` path: capture the
/// guest BEFORE the workload runs, so each restore re-executes the
/// workload fresh (which is what agent-runtime users want). Saves
/// 50-150 ms of bake time vs waiting for the workload's listener.
///
/// Gated host-side by the bake driver's `on_pre_exec` flag — only
/// active for skip_warm_snapshot=true bakes. With_warmup + service-
/// image bakes leave `on_pre_exec=false` so this marker is detected
/// but ignored, preserving the listener-ready snapshot semantics.
pub static PRE_EXEC_READY: AtomicBool = AtomicBool::new(false);

/// Set when a complete line containing `[SUPERMACHINE-SMPARK]
/// state_gpa=0x<HEX>` arrives on PL011. init-oci prints this
/// after a successful `finit_module(smpark.ko)` call (if the
/// module exposes the `/sys/kernel/smpark/state_gpa` sysfs file).
/// The value is the guest-physical address of smpark.ko's
/// unpark-signal page — the host-direct cycle-2 UNPARK path
/// uses this to skip the agent vsock RPC entirely.
///
/// Zero = "not announced" (older smpark.ko without the sysfs
/// file, or single-vCPU bake where the module isn't loaded).
/// Host falls back to the agent vsock RPC path when zero.
///
/// At most 16 hex digits are consumed, stopping at the first
/// non-hex byte. A line that carries the prefix but no hex digit
/// leaves the previously stored value untouched.
pub static SMPARK_STATE_GPA: AtomicU64 = AtomicU64::new(0);

/// Optional sink: when set, PL011 bytes go here instead of stdout.
/// Wired by `--log-sink FILE` at startup.
pub static LOG_SINK: Mutex<Option<std::fs::File>> = Mutex::new(None);
/// Flipped to `true` by `set_log_sink` after it installs a file in
/// `LOG_SINK`. The TX path reads this flag (instead of locking
/// `LOG_SINK`) when deciding whether a byte can be dropped outright.
static LOG_SINK_SET: AtomicBool = AtomicBool::new(false);
/// Master switch for line-aware marker detection in the TX path;
/// toggled by `set_heartbeat_detection`. Off by default, in which
/// case no marker static is ever updated.
static HEARTBEAT_DETECTION_ENABLED: AtomicBool = AtomicBool::new(false);

/// Sliding buffer of the current PL011 output line, reset on '\n'.
/// Only filled while marker detection is enabled; once it holds
/// 4096 bytes, further bytes of that line are dropped.
static PL011_LINE: Mutex<Vec<u8>> = Mutex::new(Vec::new());

/// Marker substring we look for to bump HEARTBEAT_COUNT. The match is
/// `windows().any(...)` so it can sit anywhere in the line.
const HEARTBEAT_MARKER: &[u8] = b"heartbeat counter=";

/// Marker for [`WORKLOAD_PARKED`] — init-oci writes "parking PID 1
/// (exit=N)" when its forked workload exits and pid 1 goes idle.
const PARKED_MARKER: &[u8] = b"parking PID 1";

/// Marker for [`PRE_EXEC_READY`] — init-oci writes
/// "init-oci: workload-pre-exec" right before forking its workload
/// child (then nanosleeps 100 ms so the host can capture).
const PRE_EXEC_MARKER: &[u8] = b"init-oci: workload-pre-exec";

/// Prefix for [`SMPARK_STATE_GPA`] capture. Line shape (printed
/// by init-oci after loading smpark.ko):
///   `[SUPERMACHINE-SMPARK] state_gpa=0x12345678abcdef00`
/// The prefix includes the `0x`, so the hex digits parsed are the
/// ones that immediately follow it.
const SMPARK_GPA_PREFIX: &[u8] = b"[SUPERMACHINE-SMPARK] state_gpa=0x";

/// 0.7.43+ kernel-boot cache marker. init-oci writes this line
/// right after smpark.ko load + before `try_pivot_to_overlay()`,
/// then blocks in `read(0, ...)`. The host:
///   * Cache-WRITE bake: snapshots to the kernel-boot-cache file,
///     then pushes 'R\n' resume token to PL011 RX.
///   * Cache-READ bake: restores from cache → init-oci is already
///     paused-in-read() from the cached snapshot → host pushes
///     'R\n' after restore completes.
///
/// Marker line shape:
///   `[SUPERMACHINE-KCACHE] ready`
///
/// Detected by `KCACHE_READY` atomic which the bake driver polls.
/// This separates the marker DETECTION (here in the PL011 thread,
/// must be lightweight) from the marker ACTION (snapshot + push,
/// done on the bake driver thread). Without the split, the action
/// would block the PL011 write path and the guest's UART TX would
/// stall.
const KCACHE_READY_MARKER: &[u8] = b"[SUPERMACHINE-KCACHE] ready";

/// Set when the [`KCACHE_READY_MARKER`] line completes on PL011.
/// The bake driver polls this atomic during the cache-write flow:
/// when it flips to true, send `SNAPSHOT_ASYNC` for the cache
/// path, then push 'R\n' to resume init-oci.
/// One-shot per bake — reset to false at worker spawn.
pub static KCACHE_READY: AtomicBool = AtomicBool::new(false);

/// Set when init-oci writes the "resumed token=…" line, signaling
/// it received the kcache resume token and moved past the pause.
/// The kcache-HIT retry loop (`bin/worker.rs::supermachine-kcache-
/// resume`) polls this and stops re-pushing once it flips, so we
/// don't pile up extra bytes in the TTY queue that the post-pivot
/// workload might inadvertently consume.
pub static KCACHE_RESUMED: AtomicBool = AtomicBool::new(false);

/// Marker for [`KCACHE_RESUMED`]. Matched as a substring of a
/// completed line, so whatever follows "resumed" (the `token=…`
/// part) is not inspected.
const KCACHE_RESUMED_MARKER: &[u8] = b"[SUPERMACHINE-KCACHE] resumed";

/// 0.7.44+ pre-exec sync marker. init-oci writes this line right
/// before blocking on `read(0, ...)` at the pre-exec pause point
/// (post-pivot, immediately before forking the workload). The host:
///   * Bake (pre_exec_sync mode): after `take_snapshot("pre-exec")`
///     completes, pushes 'P\n' resume token via PL011 RX so init-oci
///     unblocks and forks the workload.
///   * Restore from a pre_exec_sync snapshot: the snapshot captured
///     the guest paused inside `read(0)`; the worker's
///     `supermachine-pre-exec-resume` thread pushes 'P\n' after
///     restore completes.
///
/// Marker line shape:
///   `[SUPERMACHINE-PRE-EXEC] ready`
///
/// Why a new marker class instead of reusing PRE_EXEC_READY:
/// PRE_EXEC_READY fires on the existing "init-oci: workload-pre-exec"
/// line which is printed BEFORE the new pause. The bake driver needs
/// to know when to push the unblock byte; using PRE_EXEC_READY for
/// both "snapshot trigger" and "push timing" conflates two signals
/// (the snapshot fires immediately, the push must wait until the
/// snapshot save returns).
const PRE_EXEC_SYNC_READY_MARKER: &[u8] = b"[SUPERMACHINE-PRE-EXEC] ready";

/// Set when [`PRE_EXEC_SYNC_READY_MARKER`] completes on PL011.
/// Bake driver and worker.rs's resume thread both watch this to
/// know init-oci has reached the pause point and is now blocked on
/// `read(0)` waiting for the unblock byte.
/// One-shot per bake — reset to false at worker spawn.
pub static PRE_EXEC_SYNC_READY: AtomicBool = AtomicBool::new(false);

/// Set when init-oci writes the pre-exec-sync resumed line after
/// receiving the unblock byte. The restore-time resume thread
/// (`bin/worker.rs::supermachine-pre-exec-resume`) polls this and
/// stops re-pushing once it flips, so we don't pile bytes into the
/// TTY queue that the forked workload might inadvertently consume.
pub static PRE_EXEC_SYNC_RESUMED: AtomicBool = AtomicBool::new(false);

/// Marker for [`PRE_EXEC_SYNC_RESUMED`]. Matched as a substring
/// anywhere in a completed PL011 output line.
const PRE_EXEC_SYNC_RESUMED_MARKER: &[u8] = b"[SUPERMACHINE-PRE-EXEC] resumed";

/// Whether guest console output may be echoed to the host's stdout.
///
/// Controlled by the `SUPERMACHINE_CONSOLE_LOG` environment
/// variable: the exact values `0`, `false`, `no` and `off` disable
/// echoing; anything else (including unset or non-UTF-8) leaves it
/// enabled. The variable is read once and the answer is cached for
/// the lifetime of the process.
fn console_log_enabled() -> bool {
    static ENABLED: OnceLock<bool> = OnceLock::new();
    *ENABLED.get_or_init(|| match std::env::var("SUPERMACHINE_CONSOLE_LOG") {
        Ok(value) => !["0", "false", "no", "off"].contains(&value.as_str()),
        Err(_) => true,
    })
}

/// Redirect PL011 TX output to the file at `path`.
///
/// The file is opened in append mode and created if it does not
/// exist. On success the file replaces any previously installed
/// sink and `LOG_SINK_SET` is raised so the TX path knows a sink is
/// present.
///
/// # Errors
///
/// Returns the `std::io::Error` from opening the file; in that case
/// neither `LOG_SINK` nor `LOG_SINK_SET` is modified.
pub fn set_log_sink(path: &str) -> std::io::Result<()> {
    let mut options = std::fs::OpenOptions::new();
    options.append(true).create(true);
    let file = options.open(path)?;

    {
        let mut slot = LOG_SINK.lock().unwrap();
        *slot = Some(file);
    }
    // Publish the flag only after the sink is actually in place.
    LOG_SINK_SET.store(true, Ordering::Release);
    Ok(())
}

/// Turn line-aware marker detection on the PL011 TX path on or off.
///
/// Despite the name, this gates every marker scanned in the TX
/// path (heartbeat, parked, pre-exec, kcache, smpark GPA), not just
/// the heartbeat counter.
pub fn set_heartbeat_detection(enabled: bool) {
    HEARTBEAT_DETECTION_ENABLED.store(enabled, Ordering::Release)
}

/// Handle for the emulated PL011 UART.
///
/// The type itself carries no data: every piece of device state
/// (RX queue, interrupt mask, log sink, line buffer) lives in
/// module-level statics, so all instances observe the same device.
#[derive(Debug, Default)]
pub struct SerialPl011;

impl SerialPl011 {
    /// Create a device handle. Equivalent to `SerialPl011::default()`.
    pub fn new() -> Self {
        Self
    }
}

impl MmioDevice for SerialPl011 {
    fn read(&self, offset: u64, _size: u8) -> u64 {
        match offset {
            0x000 => {
                // UARTDR — pop the next byte off the RX queue, or 0
                // when empty. The PL011 spec puts the byte in the
                // low 8 bits and flag bits (FE/PE/BE/OE) in 8-11;
                // we always return clean bytes (no framing errors).
                //
                // If this read drains the queue, lower the IRQ line
                // so the GIC stops sampling it as HIGH (LEVEL_HI in
                // FDT). Without this, the kernel disables IRQ 13
                // after ~5 spurious calls and subsequent RX bytes
                // can't be delivered.
                let (byte, now_empty) = {
                    let mut q = RX_QUEUE.lock().unwrap();
                    let b = q.pop_front().unwrap_or(0);
                    (b, q.is_empty())
                };
                if now_empty {
                    if let Some(raiser) = IRQ_RAISER.get() {
                        raiser(false);
                    }
                }
                byte as u64
            }
            0x018 => {
                // UARTFR — TXFE always 1 (TX FIFO empty/ready),
                // RXFE reflects RX queue empty state.
                let rxfe = if RX_QUEUE.lock().unwrap().is_empty() {
                    1u64 << 4
                } else {
                    0
                };
                (1u64 << 7) | rxfe
            }
            0x038 => UART_IMSC.load(Ordering::Acquire) as u64,
            0x03c => {
                // UARTRIS — raw interrupt status. Bit 4 (RXRIS) is
                // set while the RX FIFO has at least one byte. All
                // other RIS bits stay 0 (we don't implement TX
                // IRQ, modem status, etc).
                if RX_QUEUE.lock().unwrap().is_empty() {
                    0
                } else {
                    1u64 << 4
                }
            }
            0x040 => {
                // UARTMIS — UARTRIS & UARTIMSC.
                let ris = if RX_QUEUE.lock().unwrap().is_empty() {
                    0u32
                } else {
                    1u32 << 4
                };
                (ris & UART_IMSC.load(Ordering::Acquire)) as u64
            }
            0xfe0 => 0x11,
            0xfe4 => 0x10,
            0xfe8 => 0x14,
            0xfec => 0x00,
            0xff0 => 0x0d,
            0xff4 => 0xf0,
            0xff8 => 0x05,
            0xffc => 0xb1,
            _ => 0,
        }
    }

    fn write(&self, offset: u64, value: u64, _size: u8) {
        if offset == 0x038 {
            // UARTIMSC — kernel sets/clears IRQ enable bits. We
            // store the full word but only RX bits (4 = RX, 6 = RT)
            // affect us. Storing the rest is harmless and lets the
            // kernel read back what it wrote.
            UART_IMSC.store(value as u32, Ordering::Release);
            return;
        }
        if offset == 0x044 {
            // UARTICR — write-only "clear" register. Bit 4 clears
            // RXRIS, but RXRIS is derived from queue state, not
            // stored. Real PL011s would also clear RT (timeout) on
            // ICR write; we don't implement RT either. So this is
            // a true no-op for us — the kernel writes it after
            // UARTDR drains to formally ack, which is fine.
            return;
        }
        if offset != 0x000 {
            return;
        }
        let b = (value & 0xff) as u8;

        if !console_log_enabled()
            && !LOG_SINK_SET.load(Ordering::Acquire)
            && !HEARTBEAT_DETECTION_ENABLED.load(Ordering::Acquire)
        {
            return;
        }

        // Route output: log sink wins if set, else stdout. Locking
        // the sink mutex per byte is cheap (single-vCPU contention is
        // none; multi-vCPU contention is rare since the kernel
        // serializes PL011 writes via a port spinlock anyway).
        {
            let mut sink = LOG_SINK.lock().unwrap();
            if let Some(f) = sink.as_mut() {
                let _ = f.write_all(&[b]);
            } else if console_log_enabled() {
                drop(sink);
                let mut out = std::io::stdout().lock();
                let _ = out.write_all(&[b]);
                let _ = out.flush();
            }
        }

        if !HEARTBEAT_DETECTION_ENABLED.load(Ordering::Acquire) {
            return;
        }

        // Line-aware marker detection (heartbeat + workload-parked).
        let mut line = PL011_LINE.lock().unwrap();
        if b == b'\n' {
            if line
                .windows(HEARTBEAT_MARKER.len())
                .any(|w| w == HEARTBEAT_MARKER)
            {
                HEARTBEAT_COUNT.fetch_add(1, Ordering::SeqCst);
            }
            if line
                .windows(PARKED_MARKER.len())
                .any(|w| w == PARKED_MARKER)
            {
                WORKLOAD_PARKED.store(true, Ordering::SeqCst);
            }
            if line
                .windows(PRE_EXEC_MARKER.len())
                .any(|w| w == PRE_EXEC_MARKER)
            {
                PRE_EXEC_READY.store(true, Ordering::SeqCst);
            }
            // Capture the smpark unpark-signal GPA. The line format
            // is one-shot — init-oci prints once per boot, right
            // after `finit_module(smpark.ko)`. Survives snapshot/
            // restore because the saved-state inherits this atomic.
            // 0.7.43+ kernel-boot cache marker detection. Flip the
            // atomic so the `pool-kcache-marker-watcher` thread
            // (in vmm/pool.rs) emits `MARKER_KCACHE_READY` on the
            // supervisor socket for the bake driver to react to.
            //
            // FALLBACK auto-push: if no bake driver is paying
            // attention to MARKER_KCACHE_READY (i.e. the kcache
            // orchestration isn't wired yet), we also push the
            // resume token from here so bakes with `kcache=1`
            // don't hang for 30 s waiting on init-oci's read
            // timeout. Once `bake.rs` learns to take a cache
            // snapshot in response to MARKER_KCACHE_READY and push
            // PUSH_RX 52 + PUSH_RX 0a itself, drop this fallback
            // (the bake driver needs to control the push timing
            // so the snapshot captures init-oci paused-in-read,
            // not post-resume).
            //
            // The fallback is gated on
            // `SUPERMACHINE_KCACHE_AUTO_PUSH != 0`; the bake
            // driver sets `SUPERMACHINE_KCACHE_AUTO_PUSH=0` in
            // the worker env when it intends to handle the
            // marker itself.
            if line
                .windows(KCACHE_READY_MARKER.len())
                .any(|w| w == KCACHE_READY_MARKER)
            {
                KCACHE_READY.store(true, Ordering::SeqCst);
                let auto_push_disabled = std::env::var(
                    "SUPERMACHINE_KCACHE_AUTO_PUSH",
                )
                .map(|v| v == "0" || v == "false")
                .unwrap_or(false);
                if !auto_push_disabled {
                    push_rx_byte(b'R');
                    push_rx_byte(b'\n');
                }
            }
            if line
                .windows(KCACHE_RESUMED_MARKER.len())
                .any(|w| w == KCACHE_RESUMED_MARKER)
            {
                KCACHE_RESUMED.store(true, Ordering::SeqCst);
            }
            // 0.7.44+ pre-exec sync marker. init-oci writes
            // "[SUPERMACHINE-PRE-EXEC] ready" right before blocking
            // on read(0). Set the atomic so the bake driver / restore
            // resume thread knows the guest is now paused-in-read.
            //
            // NO fallback auto-push here. Unlike kcache (which has
            // a fallback push for the "no bake driver wired" case),
            // pre-exec-sync is a new opt-in path: the bake driver
            // always wires the push via the on_pre_exec snapshot
            // dispatch, and the restore path always wires it via
            // the worker's resume thread. If you see init-oci hang
            // for 30 s on the pre-exec read, the orchestration is
            // broken — silently auto-pushing here would hide the
            // bug.
            if line
                .windows(PRE_EXEC_SYNC_READY_MARKER.len())
                .any(|w| w == PRE_EXEC_SYNC_READY_MARKER)
            {
                PRE_EXEC_SYNC_READY.store(true, Ordering::SeqCst);
            }
            if line
                .windows(PRE_EXEC_SYNC_RESUMED_MARKER.len())
                .any(|w| w == PRE_EXEC_SYNC_RESUMED_MARKER)
            {
                PRE_EXEC_SYNC_RESUMED.store(true, Ordering::SeqCst);
            }
            if let Some(pos) = line
                .windows(SMPARK_GPA_PREFIX.len())
                .position(|w| w == SMPARK_GPA_PREFIX)
            {
                let hex_start = pos + SMPARK_GPA_PREFIX.len();
                let mut acc: u64 = 0;
                let mut any = false;
                // u64 = 16 hex chars max; stop early on first
                // non-hex byte so trailing whitespace / newlines
                // don't pollute the accumulator.
                for &b in line[hex_start..].iter().take(16) {
                    let nib = match b {
                        b'0'..=b'9' => b - b'0',
                        b'a'..=b'f' => b - b'a' + 10,
                        b'A'..=b'F' => b - b'A' + 10,
                        _ => break,
                    };
                    acc = (acc << 4) | nib as u64;
                    any = true;
                }
                if any {
                    SMPARK_STATE_GPA.store(acc, Ordering::SeqCst);
                }
            }
            line.clear();
        } else if line.len() < 4096 {
            line.push(b);
        }
    }

    fn len(&self) -> u64 {
        super::super::arch::aarch64::layout::SERIAL_MMIO_SIZE
    }
}

#[cfg(test)]
mod rx_tests {
    //! Unit tests for the 0.7.43+ PL011 RX path. They exercise the
    //! MMIO emulation on its own: no HVF and no IRQ injection, since
    //! both need a real guest.
    //!
    //! RX_QUEUE and UART_IMSC are process-global statics, and cargo
    //! runs unit tests in parallel by default, so every test here
    //! holds a module-wide mutex for its whole duration.
    use super::*;
    use std::sync::Mutex as StdMutex;

    // Register offsets used by the tests.
    const UARTDR: u64 = 0x000;
    const UARTFR: u64 = 0x018;
    const UARTIMSC: u64 = 0x038;
    const UARTRIS: u64 = 0x03c;
    const UARTMIS: u64 = 0x040;
    const UARTICR: u64 = 0x044;

    /// Bit 4: RXFE in UARTFR, RX interrupt in RIS / MIS / IMSC / ICR.
    const RX_BIT: u64 = 1 << 4;
    /// Bit 7 of UARTFR: TX FIFO empty.
    const TXFE: u64 = 1 << 7;

    static SERIALIZE: StdMutex<()> = StdMutex::new(());

    /// Take the serialization lock and reset the shared device state.
    /// The returned guard must be kept alive until the test ends.
    fn lock_and_reset() -> std::sync::MutexGuard<'static, ()> {
        // A poisoned lock only means a sibling test panicked; the
        // lock guards no data, so recovering the guard is safe.
        let guard = match SERIALIZE.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        drain_rx_queue_for_snapshot();
        UART_IMSC.store(0, Ordering::SeqCst);
        guard
    }

    /// True when UARTFR reports the RX FIFO as empty.
    fn rxfe_is_set(dev: &SerialPl011) -> bool {
        dev.read(UARTFR, 4) & RX_BIT != 0
    }

    #[test]
    fn empty_queue_reads_zero_with_rxfe_set() {
        let _guard = lock_and_reset();
        let dev = SerialPl011::new();
        // Popping an empty queue through UARTDR yields 0.
        assert_eq!(dev.read(UARTDR, 4), 0);
        // UARTFR is TXFE | RXFE = 0x90.
        assert_eq!(dev.read(UARTFR, 4), TXFE | RX_BIT);
        // No RXRIS while the queue is empty.
        assert_eq!(dev.read(UARTRIS, 4), 0);
    }

    #[test]
    fn push_then_read_round_trip() {
        let _guard = lock_and_reset();
        let dev = SerialPl011::new();
        push_rx_byte(b'X');
        // A queued byte clears RXFE and asserts RXRIS.
        assert!(!rxfe_is_set(&dev));
        assert_eq!(dev.read(UARTRIS, 4), RX_BIT);
        // UARTDR hands the byte back.
        assert_eq!(dev.read(UARTDR, 4), u64::from(b'X'));
        // Drained: RXFE is set again and RXRIS is gone.
        assert!(rxfe_is_set(&dev));
        assert_eq!(dev.read(UARTRIS, 4), 0);
    }

    #[test]
    fn mis_masks_with_imsc() {
        let _guard = lock_and_reset();
        let dev = SerialPl011::new();
        push_rx_byte(b'A');
        // IMSC is 0, so MIS masks the RX bit off.
        assert_eq!(dev.read(UARTMIS, 4), 0);
        // The kernel enables the RX IRQ through IMSC bit 4.
        dev.write(UARTIMSC, RX_BIT, 4);
        assert_eq!(UART_IMSC.load(Ordering::SeqCst), RX_BIT as u32);
        // MIS now shows RIS & IMSC = bit 4.
        assert_eq!(dev.read(UARTMIS, 4), RX_BIT);
        // Draining through UARTDR clears MIS again.
        assert_eq!(dev.read(UARTDR, 4), u64::from(b'A'));
        assert_eq!(dev.read(UARTMIS, 4), 0);
    }

    #[test]
    fn icr_write_is_noop() {
        let _guard = lock_and_reset();
        let dev = SerialPl011::new();
        push_rx_byte(b'B');
        assert_eq!(dev.read(UARTRIS, 4), RX_BIT);
        // RXRIS is derived from queue state, so an ICR write must
        // not clear it; only UARTDR reads pop the queue.
        dev.write(UARTICR, RX_BIT, 4);
        assert_eq!(dev.read(UARTRIS, 4), RX_BIT);
        // The byte is still there to be read.
        assert_eq!(dev.read(UARTDR, 4), u64::from(b'B'));
    }

    #[test]
    fn fifo_order_preserved() {
        let _guard = lock_and_reset();
        let dev = SerialPl011::new();
        let payload = *b"abc";
        for byte in payload {
            push_rx_byte(byte);
        }
        for byte in payload {
            assert_eq!(dev.read(UARTDR, 4), u64::from(byte));
        }
        // Nothing left after the drain.
        assert_eq!(dev.read(UARTDR, 4), 0);
        assert!(rxfe_is_set(&dev));
    }

    #[test]
    fn drain_for_snapshot_returns_count() {
        let _guard = lock_and_reset();
        push_rx_byte(b'1');
        push_rx_byte(b'2');
        assert_eq!(drain_rx_queue_for_snapshot(), 2);
        // A second drain finds nothing.
        assert_eq!(drain_rx_queue_for_snapshot(), 0);
    }

    #[test]
    fn periph_id_registers_unchanged() {
        // The kernel driver probes the PL011 / PrimeCell ID words
        // and refuses the device on mismatch, so the 0.7.43 RX work
        // must not have disturbed them.
        let _guard = lock_and_reset();
        let dev = SerialPl011::new();
        let expected: [(u64, u64); 8] = [
            (0xfe0, 0x11),
            (0xfe4, 0x10),
            (0xfe8, 0x14),
            (0xfec, 0x00),
            (0xff0, 0x0d),
            (0xff4, 0xf0),
            (0xff8, 0x05),
            (0xffc, 0xb1),
        ];
        for (offset, value) in expected {
            assert_eq!(dev.read(offset, 4), value);
        }
    }
}