zlayer-agent 0.11.11

Container runtime agent using libcontainer/youki
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
//! macOS libkrun VM runtime
//!
//! Implements the [`Runtime`] trait using libkrun microVMs for full Linux
//! container compatibility with GPU forwarding on macOS Apple Silicon.
//!
//! This module is only compiled on macOS targets (`#[cfg(target_os = "macos")]`).
//! It provides hardware-level VM isolation using Apple's Hypervisor.framework
//! through the libkrun library, loaded dynamically at runtime via `dlopen`.
//!
//! ## Architecture
//!
//! Each "container" is a lightweight microVM running a minimal Linux kernel:
//!
//! ```text
//! macOS Host
//!   |
//!   +-- ZLayer VmRuntime
//!        |
//!        +-- libkrun microVM (per container)
//!             |
//!             +-- Linux kernel (built into libkrun)
//!             +-- Container rootfs (extracted OCI image)
//!             +-- GPU: Venus-Vulkan (~30%) or ggml API remoting (~97%)
//!             +-- Networking: TSI (Transparent Socket Impersonation)
//! ```
//!
//! ## Key Characteristics
//!
//! - **Boot time**: 100-300ms per VM
//! - **Memory overhead**: ~100MB per VM (Linux kernel + VM infrastructure)
//! - **CPU overhead**: Near-native (hardware virtualization via Hypervisor.framework)
//! - **GPU**: Venus-Vulkan protocol (~30% native) or ggml API remoting (~97% native)
//! - **Networking**: TSI maps guest sockets through the host process
//! - **Compatibility**: Full Linux OCI containers (unlike `SandboxRuntime` which needs macOS-native binaries)
//!
//! ## Limitations
//!
//! - **One GPU container at a time**: Metal GPU context cannot be shared across VMs
//! - **Requires Hypervisor.framework**: Works on Intel and Apple Silicon Macs; GPU forwarding (Metal/Venus-Vulkan) requires Apple Silicon
//! - **No exec support**: Would require vsock or SSH agent inside the VM
//! - **vCPUs must not exceed host cores**: libkrun silently hangs otherwise
//! - **No graceful shutdown API**: VM exits when entrypoint process exits
//!
//! ## Directory Layout
//!
//! ```text
//! {data_dir}/
//!   images/
//!     {sanitized_image_name}/
//!       rootfs/           -- extracted OCI image layers
//!   vms/
//!     {service}-{replica}/
//!       rootfs/           -- copy of base image rootfs (no APFS clone needed for VMs)
//!       config.json       -- serialized ServiceSpec
//!       console.log       -- VM console output
//! ```

use crate::cgroups_stats::ContainerStats;
use crate::error::{AgentError, Result};
use crate::runtime::{ContainerId, ContainerState, Runtime};
use std::collections::HashMap;
use std::ffi::CString;
use std::net::IpAddr;
use std::os::raw::{c_char, c_int, c_uint};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use zlayer_observability::logs::{LogEntry, LogSource, LogStream};
use zlayer_spec::{RegistryAuth, ServiceSpec};

// ---------------------------------------------------------------------------
// libkrun FFI types (loaded via dlopen)
// ---------------------------------------------------------------------------

/// Function signature: `int krun_set_log_level(uint32_t level)`
///
/// Levels: 0=off, 1=error, 2=warn, 3=info, 4=debug (set to 2 in `VmRuntime::new`).
type KrunSetLogLevel = unsafe extern "C" fn(level: c_uint) -> c_int;

/// Function signature: `int32_t krun_create_ctx()`
/// Returns a context ID (>= 0) on success, or -1 on error.
type KrunCreateCtx = unsafe extern "C" fn() -> c_int;

/// Function signature: `int32_t krun_free_ctx(uint32_t ctx_id)`
type KrunFreeCtx = unsafe extern "C" fn(ctx_id: c_uint) -> c_int;

/// Function signature: `int32_t krun_set_vm_config(uint32_t ctx_id, uint8_t num_vcpus, uint32_t ram_mib)`
///
/// NOTE(review): the C prototype declares `num_vcpus` as `uint8_t`, but this
/// alias passes it as `c_uint`. Small integer args are register-passed on the
/// macOS calling conventions so this likely works, but verify against krun.h.
type KrunSetVmConfig =
    unsafe extern "C" fn(ctx_id: c_uint, num_vcpus: c_uint, ram_mib: c_uint) -> c_int;

/// Function signature: `int32_t krun_set_root(uint32_t ctx_id, const char *root_path)`
type KrunSetRoot = unsafe extern "C" fn(ctx_id: c_uint, root_path: *const c_char) -> c_int;

/// Function signature: `int32_t krun_set_workdir(uint32_t ctx_id, const char *workdir_path)`
type KrunSetWorkdir = unsafe extern "C" fn(ctx_id: c_uint, workdir_path: *const c_char) -> c_int;

/// Function signature: `int32_t krun_set_exec(uint32_t ctx_id, const char *exec_path, char *const argv[], char *const envp[])`
///
/// `argv` and `envp` are null-terminated pointer arrays; see
/// `build_c_string_array` for how they are constructed on the Rust side.
type KrunSetExec = unsafe extern "C" fn(
    ctx_id: c_uint,
    exec_path: *const c_char,
    argv: *const *const c_char,
    envp: *const *const c_char,
) -> c_int;

/// Function signature: `int32_t krun_set_gpu_options2(uint32_t ctx_id, uint32_t virgl_flags)`
type KrunSetGpuOptions2 = unsafe extern "C" fn(ctx_id: c_uint, virgl_flags: c_uint) -> c_int;

/// Function signature: `int32_t krun_set_tsi(uint32_t ctx_id)`
/// Enables Transparent Socket Impersonation networking.
type KrunSetTsi = unsafe extern "C" fn(ctx_id: c_uint) -> c_int;

/// Function signature: `int32_t krun_start_enter(uint32_t ctx_id)`
/// BLOCKS the calling thread until the VM exits; callers must invoke it from
/// a dedicated OS thread (see `VmContainer::vm_thread`), never from async code.
type KrunStartEnter = unsafe extern "C" fn(ctx_id: c_uint) -> c_int;

// ---------------------------------------------------------------------------
// Dynamically loaded libkrun library
// ---------------------------------------------------------------------------

/// Wrapper around a dynamically loaded `libkrun.dylib`.
///
/// All function pointers are resolved once at construction time via `dlsym`.
/// The underlying `libloading::Library` is kept alive for the lifetime of
/// this struct to keep the loaded symbols valid.
struct LibKrun {
    /// The loaded dynamic library handle. Must remain alive as long as the
    /// function pointers are in use; dropping it would unmap the library
    /// and dangle every pointer below. Listed first only for readability --
    /// field order does not affect drop safety here since the pointers are
    /// plain `fn` values, not borrows.
    _lib: libloading::Library,
    set_log_level: KrunSetLogLevel,
    create_ctx: KrunCreateCtx,
    free_ctx: KrunFreeCtx,
    set_vm_config: KrunSetVmConfig,
    set_root: KrunSetRoot,
    set_workdir: KrunSetWorkdir,
    set_exec: KrunSetExec,
    set_gpu_options2: KrunSetGpuOptions2,
    set_tsi: KrunSetTsi,
    start_enter: KrunStartEnter,
}

// SAFETY: LibKrun holds function pointers from a dynamically loaded library.
// The library handle (_lib) ensures the pointers remain valid. The function
// pointers themselves are safe to share across threads because they point to
// code (not mutable data). libkrun's API is documented as thread-safe for
// distinct context IDs.
#[allow(unsafe_code)]
unsafe impl Send for LibKrun {}
#[allow(unsafe_code)]
unsafe impl Sync for LibKrun {}

impl LibKrun {
    /// Load libkrun dynamically from standard macOS library paths.
    ///
    /// Searches the following locations in order:
    /// 1. `libkrun.dylib` (system library path / `DYLD_LIBRARY_PATH`)
    /// 2. `/usr/local/lib/libkrun.dylib` (Intel Homebrew)
    /// 3. `/opt/homebrew/lib/libkrun.dylib` (Apple Silicon Homebrew)
    ///
    /// # Errors
    ///
    /// Returns `AgentError::Configuration` if libkrun cannot be found or if
    /// any required symbol is missing from the library.
    #[allow(unsafe_code)]
    fn load() -> Result<Self> {
        let lib_paths = [
            "libkrun.dylib",
            "/usr/local/lib/libkrun.dylib",
            "/opt/homebrew/lib/libkrun.dylib",
        ];

        let lib = lib_paths
            .iter()
            .find_map(|path| unsafe { libloading::Library::new(*path).ok() })
            .ok_or_else(|| {
                AgentError::Configuration(
                    "libkrun not found. Install via: brew tap slp/krunkit && brew install krunkit\n\
                     Searched: libkrun.dylib, /usr/local/lib/libkrun.dylib, /opt/homebrew/lib/libkrun.dylib"
                        .to_string(),
                )
            })?;

        // Load all required symbols. Each `lib.get()` call returns a reference
        // to the symbol, and we dereference it to get the function pointer.
        unsafe {
            let set_log_level: KrunSetLogLevel =
                *lib.get(b"krun_set_log_level\0").map_err(|e| {
                    AgentError::Configuration(format!("libkrun missing krun_set_log_level: {e}"))
                })?;
            let create_ctx: KrunCreateCtx = *lib.get(b"krun_create_ctx\0").map_err(|e| {
                AgentError::Configuration(format!("libkrun missing krun_create_ctx: {e}"))
            })?;
            let free_ctx: KrunFreeCtx = *lib.get(b"krun_free_ctx\0").map_err(|e| {
                AgentError::Configuration(format!("libkrun missing krun_free_ctx: {e}"))
            })?;
            let set_vm_config: KrunSetVmConfig =
                *lib.get(b"krun_set_vm_config\0").map_err(|e| {
                    AgentError::Configuration(format!("libkrun missing krun_set_vm_config: {e}"))
                })?;
            let set_root: KrunSetRoot = *lib.get(b"krun_set_root\0").map_err(|e| {
                AgentError::Configuration(format!("libkrun missing krun_set_root: {e}"))
            })?;
            let set_workdir: KrunSetWorkdir = *lib.get(b"krun_set_workdir\0").map_err(|e| {
                AgentError::Configuration(format!("libkrun missing krun_set_workdir: {e}"))
            })?;
            let set_exec: KrunSetExec = *lib.get(b"krun_set_exec\0").map_err(|e| {
                AgentError::Configuration(format!("libkrun missing krun_set_exec: {e}"))
            })?;
            let set_gpu_options2: KrunSetGpuOptions2 =
                *lib.get(b"krun_set_gpu_options2\0").map_err(|e| {
                    AgentError::Configuration(format!("libkrun missing krun_set_gpu_options2: {e}"))
                })?;
            let set_tsi: KrunSetTsi = *lib.get(b"krun_set_tsi\0").map_err(|e| {
                AgentError::Configuration(format!("libkrun missing krun_set_tsi: {e}"))
            })?;
            let start_enter: KrunStartEnter = *lib.get(b"krun_start_enter\0").map_err(|e| {
                AgentError::Configuration(format!("libkrun missing krun_start_enter: {e}"))
            })?;

            Ok(Self {
                _lib: lib,
                set_log_level,
                create_ctx,
                free_ctx,
                set_vm_config,
                set_root,
                set_workdir,
                set_exec,
                set_gpu_options2,
                set_tsi,
                start_enter,
            })
        }
    }
}

// ---------------------------------------------------------------------------
// VM container state
// ---------------------------------------------------------------------------

/// Metadata for a running libkrun microVM.
struct VmContainer {
    /// Current container state.
    state: ContainerState,
    /// libkrun context ID (returned by `krun_create_ctx`). `None` when no
    /// live context is associated with this entry.
    ctx_id: Option<u32>,
    /// Path to the VM's state directory.
    state_dir: PathBuf,
    /// Path to the rootfs provided to the VM.
    rootfs_dir: PathBuf,
    /// Path to the console log file.
    log_file: PathBuf,
    /// When the VM was started (monotonic clock; used for uptime, not
    /// wall-clock timestamps). `None` if the VM has not started.
    started_at: Option<Instant>,
    /// The original service spec.
    spec: ServiceSpec,
    /// Thread handle for the VM. `krun_start_enter()` blocks the calling
    /// thread, so we spawn it in `std::thread::spawn` (NOT `tokio::spawn`,
    /// as it would block the async runtime). The joined value is the VM's
    /// exit status as an `i32`.
    vm_thread: Option<std::thread::JoinHandle<i32>>,
    /// Whether this VM has GPU enabled (for enforcing one-GPU-at-a-time).
    gpu_enabled: bool,
    /// Memory allocated to this VM in MiB (for stats reporting).
    ram_mib: u32,
    /// Number of vCPUs allocated to this VM.
    vcpus: u32,
}

// ---------------------------------------------------------------------------
// VmRuntime
// ---------------------------------------------------------------------------

/// macOS libkrun VM-based container runtime.
///
/// Uses libkrun (dynamically loaded) to create lightweight microVMs via
/// Apple's Hypervisor.framework. Each container runs as a real Linux VM
/// with its own kernel, providing full OCI Linux container compatibility
/// on macOS.
///
/// ## GPU Access
///
/// libkrun supports GPU forwarding via Venus-Vulkan protocol and ggml API
/// remoting. Only **one GPU-enabled VM** can run at a time per host due to
/// Metal GPU context limitations. This constraint is enforced by the runtime.
///
/// ## Networking
///
/// TSI (Transparent Socket Impersonation) maps guest socket operations
/// through the host process. Guest `bind()` appears on the host, guest
/// `connect()` goes through the host. No virtual NIC or IP configuration
/// is needed. All containers appear as `127.0.0.1` from the host's perspective.
pub struct VmRuntime {
    /// Dynamically loaded libkrun API. `None` if libkrun is not installed
    /// (the runtime then operates in a degraded, non-functional state).
    api: Option<Arc<LibKrun>>,
    /// Base directory for VM state and images.
    data_dir: PathBuf,
    /// Directory for VM logs.
    log_dir: PathBuf,
    /// Active VM containers keyed by directory name (e.g., "myservice-1").
    containers: Arc<RwLock<HashMap<String, VmContainer>>>,
    /// Pulled image rootfs paths keyed by sanitized image name
    /// (see `sanitize_image_name`).
    image_rootfs: Arc<RwLock<HashMap<String, PathBuf>>>,
    /// Whether a GPU-enabled VM is currently running. Only one is allowed
    /// at a time due to Metal GPU context limitations in libkrun.
    gpu_in_use: Arc<AtomicBool>,
}

impl std::fmt::Debug for VmRuntime {
    /// Debug output omits the container/image maps and reports only whether
    /// libkrun was successfully loaded, plus the configured directories.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("VmRuntime");
        dbg.field("data_dir", &self.data_dir);
        dbg.field("log_dir", &self.log_dir);
        dbg.field("libkrun_loaded", &self.api.is_some());
        dbg.finish_non_exhaustive()
    }
}

impl VmRuntime {
    /// Create a new VM runtime.
    ///
    /// Attempts to dynamically load `libkrun.dylib`. If libkrun is not
    /// installed, the runtime is created in a degraded state where
    /// `start_container` will return an error with installation instructions.
    ///
    /// Creates the required directory hierarchy:
    /// - `{data_dir}/vms/` -- per-VM state directories
    /// - `{data_dir}/images/` -- pulled OCI image rootfs
    ///
    /// # Errors
    ///
    /// Returns an error if the required directories cannot be created.
    #[allow(unsafe_code)]
    pub fn new(_auth_ctx: Option<crate::runtime::ContainerAuthContext>) -> Result<Self> {
        let data_dir = zlayer_paths::ZLayerDirs::default_data_dir();
        let log_dir = zlayer_paths::ZLayerDirs::default_log_dir();

        Self::ensure_dir(&data_dir.join("vms"), "VM state dir")?;
        Self::ensure_dir(&data_dir.join("images"), "images dir")?;
        Self::ensure_dir(&log_dir, "log dir")?;

        // Try to load libkrun dynamically
        let api = match LibKrun::load() {
            Ok(lib) => {
                // Set log level: 0=off, 1=error, 2=warn, 3=info, 4=debug
                unsafe { (lib.set_log_level)(2) };
                tracing::info!("libkrun loaded successfully");
                Some(Arc::new(lib))
            }
            Err(e) => {
                tracing::warn!(
                    "libkrun not available: {e}. VM runtime will be non-functional. \
                     Install via: brew tap slp/krunkit && brew install krunkit"
                );
                None
            }
        };

        tracing::info!(
            data_dir = %data_dir.display(),
            log_dir = %log_dir.display(),
            libkrun_available = api.is_some(),
            "macOS VM runtime initialized"
        );

        Ok(Self {
            api,
            data_dir,
            log_dir,
            containers: Arc::new(RwLock::new(HashMap::new())),
            image_rootfs: Arc::new(RwLock::new(HashMap::new())),
            gpu_in_use: Arc::new(AtomicBool::new(false)),
        })
    }

    /// Create `path` (and any missing parents), mapping failure to a
    /// `Configuration` error that names `what` (e.g. "VM state dir").
    ///
    /// Extracted helper: `new` previously repeated this create/map_err
    /// stanza three times; the error message format is preserved exactly.
    fn ensure_dir(path: &std::path::Path, what: &str) -> Result<()> {
        std::fs::create_dir_all(path).map_err(|e| {
            AgentError::Configuration(format!(
                "Failed to create {} {}: {}",
                what,
                path.display(),
                e
            ))
        })
    }

    /// Generate a directory name for a container from its [`ContainerId`].
    fn container_dir_name(id: &ContainerId) -> String {
        format!("{}-{}", id.service, id.replica)
    }

    /// Get the base VM state directory for a container.
    fn vm_dir(&self, id: &ContainerId) -> PathBuf {
        self.data_dir.join("vms").join(Self::container_dir_name(id))
    }

    /// Get the images base directory.
    fn images_dir(&self) -> PathBuf {
        self.data_dir.join("images")
    }

    /// Require that libkrun is loaded, returning the API handle or an error.
    fn require_api(&self) -> Result<&Arc<LibKrun>> {
        self.api.as_ref().ok_or_else(|| {
            AgentError::Configuration(
                "libkrun is not available. Cannot start VM containers.\n\
                 Install via: brew tap slp/krunkit && brew install krunkit"
                    .to_string(),
            )
        })
    }
}

// ---------------------------------------------------------------------------
// Helper functions
// ---------------------------------------------------------------------------

/// Sanitize an image name for use as a filesystem directory name.
///
/// Maps the OCI reference separators `/` (repository path), `:` (tag) and
/// `@` (digest) to `_`, yielding a single safe path component.
fn sanitize_image_name(image: &str) -> String {
    image
        .chars()
        .map(|c| match c {
            '/' | ':' | '@' => '_',
            other => other,
        })
        .collect()
}

/// Parse a memory string like "512Mi" or "2Gi" into mebibytes (MiB).
///
/// Accepts `Gi`, `Mi`, `Ki` suffixes or a raw byte count. Returns `None`
/// for unparseable input or when the result does not fit in `u32` --
/// previously the `Gi` path computed `v * 1024` unchecked, which panics
/// in debug builds (and wraps in release) on absurdly large inputs.
fn parse_memory_to_mib(s: &str) -> Option<u32> {
    let s = s.trim();
    if let Some(num) = s.strip_suffix("Gi") {
        // GiB -> MiB, overflow-checked.
        num.parse::<u32>().ok().and_then(|v| v.checked_mul(1024))
    } else if let Some(num) = s.strip_suffix("Mi") {
        num.parse::<u32>().ok()
    } else if let Some(num) = s.strip_suffix("Ki") {
        // KiB -> MiB (integer division truncates toward zero).
        num.parse::<u32>().ok().map(|v| v / 1024)
    } else {
        // Raw bytes -> MiB; reject results that don't fit in u32 instead
        // of silently truncating with an `as` cast.
        let bytes = s.parse::<u64>().ok()?;
        u32::try_from(bytes / (1024 * 1024)).ok()
    }
}

/// Clamp the requested vCPU count to not exceed host physical cores.
///
/// libkrun silently hangs if more vCPUs are configured than the host
/// has physical cores. This function prevents that.
#[allow(clippy::cast_possible_truncation)]
fn safe_vcpu_count(requested: u32) -> u32 {
    let host_cores = num_cpus::get() as u32;
    let clamped = requested.min(host_cores).max(1);
    if requested > host_cores {
        tracing::warn!(
            requested = requested,
            host_cores = host_cores,
            clamped = clamped,
            "Clamping vCPU count to host core count (libkrun hangs if exceeded)"
        );
    }
    clamped
}

/// Resolve the entrypoint command from a [`ServiceSpec`].
///
/// Checks `spec.command.entrypoint` and `spec.command.args` in order,
/// then falls back to `/bin/sh`.
fn resolve_entrypoint(spec: &ServiceSpec) -> (String, Vec<String>) {
    // Use entrypoint if specified
    if let Some(ref entrypoint) = spec.command.entrypoint {
        if !entrypoint.is_empty() {
            let program = entrypoint[0].clone();
            let mut args: Vec<String> = entrypoint[1..].to_vec();

            // Append args from spec.command.args if present
            if let Some(ref extra_args) = spec.command.args {
                args.extend(extra_args.iter().cloned());
            }

            return (program, args);
        }
    }

    // Use args as command if no entrypoint
    if let Some(ref cmd_args) = spec.command.args {
        if !cmd_args.is_empty() {
            let program = cmd_args[0].clone();
            let args = cmd_args[1..].to_vec();
            return (program, args);
        }
    }

    // Fallback: use /bin/sh (Linux rootfs inside VM)
    ("/bin/sh".to_string(), vec![])
}

/// Convert a Rust string to a null-terminated `CString`, returning an error
/// with context if the string contains interior null bytes.
fn to_cstring(s: &str, context: &str) -> Result<CString> {
    match CString::new(s) {
        Ok(cs) => Ok(cs),
        Err(e) => Err(AgentError::InvalidSpec(format!(
            "{context} contains null byte: {e}"
        ))),
    }
}

/// Build a null-terminated array of `CString` pointers for passing to C.
///
/// Returns a tuple of `(Vec<CString>, Vec<*const c_char>)` where the
/// second vec is the pointer array (null-terminated) suitable for C's
/// `char *const argv[]`. The first vec must be kept alive as long as the
/// pointers are in use -- the pointers point into its heap buffers.
fn build_c_string_array(strings: &[String]) -> Result<(Vec<CString>, Vec<*const c_char>)> {
    let mut owned = Vec::with_capacity(strings.len());
    for s in strings {
        let cs = CString::new(s.as_str())
            .map_err(|e| AgentError::InvalidSpec(format!("String contains null byte: {e}")))?;
        owned.push(cs);
    }

    // Pointer array, terminated by a trailing null as C expects.
    let ptrs: Vec<*const c_char> = owned
        .iter()
        .map(|cs| cs.as_ptr())
        .chain(std::iter::once(std::ptr::null()))
        .collect();

    Ok((owned, ptrs))
}

/// Build environment variable array in "KEY=VALUE" format for C.
///
/// Note: `HashMap` iteration order is unspecified, so the resulting envp
/// ordering is arbitrary (harmless for environment variables).
fn build_env_array(env: &HashMap<String, String>) -> Result<(Vec<CString>, Vec<*const c_char>)> {
    let pairs: Vec<String> = env
        .iter()
        .map(|(key, value)| format!("{key}={value}"))
        .collect();
    build_c_string_array(&pairs)
}

// ---------------------------------------------------------------------------
// Runtime trait implementation
// ---------------------------------------------------------------------------

#[async_trait::async_trait]
impl Runtime for VmRuntime {
    /// Pull an image to local storage with default policy (`IfNotPresent`).
    async fn pull_image(&self, image: &str) -> Result<()> {
        // Delegate to the policy-aware variant with no inline credentials.
        let policy = zlayer_spec::PullPolicy::IfNotPresent;
        self.pull_image_with_policy(image, policy, None).await
    }

    /// Pull an image to local storage with a specific policy.
    ///
    /// Uses `zlayer_registry` to pull OCI image layers and extract them.
    /// Unlike the `SandboxRuntime`, VM containers run real Linux -- so standard
    /// Linux OCI images work directly.
    ///
    /// The `_auth` parameter is accepted for trait conformance (§3.10) but
    /// currently ignored: credentials flow through the existing
    /// `AuthResolver` hostname lookup. Callers that need inline auth should
    /// use the Docker runtime.
    ///
    /// # Errors
    ///
    /// Returns `AgentError::PullFailed` if the rootfs directory cannot be
    /// created, the blob cache cannot be opened, the layer pull fails, layer
    /// extraction fails, or -- under `PullPolicy::Never` -- no local rootfs
    /// is present.
    async fn pull_image_with_policy(
        &self,
        image: &str,
        policy: zlayer_spec::PullPolicy,
        _auth: Option<&RegistryAuth>,
    ) -> Result<()> {
        // The sanitized name doubles as the on-disk directory name and the
        // key in the `image_rootfs` map.
        let safe_name = sanitize_image_name(image);
        let image_dir = self.images_dir().join(&safe_name);
        let rootfs_dir = image_dir.join("rootfs");

        match policy {
            zlayer_spec::PullPolicy::Always | zlayer_spec::PullPolicy::Newer => {
                /* always re-pull; drift detection happens at the service layer */
                // NOTE(review): when rootfs_dir already exists from a previous
                // pull, layers are extracted over the old contents without
                // clearing them first -- confirm stale files are acceptable.
            }
            zlayer_spec::PullPolicy::IfNotPresent => {
                if rootfs_dir.exists() {
                    tracing::debug!(image = %image, "Image already present, skipping pull");
                    // Re-register the path so restarts still find the rootfs.
                    let mut images = self.image_rootfs.write().await;
                    images.insert(safe_name, rootfs_dir);
                    return Ok(());
                }
            }
            zlayer_spec::PullPolicy::Never => {
                if !rootfs_dir.exists() {
                    return Err(AgentError::PullFailed {
                        image: image.to_string(),
                        reason: "Image not present and pull policy is Never".to_string(),
                    });
                }
                let mut images = self.image_rootfs.write().await;
                images.insert(safe_name, rootfs_dir);
                return Ok(());
            }
        }

        tracing::info!(
            image = %image,
            "Pulling image for VM runtime (Linux OCI images supported natively)"
        );

        tokio::fs::create_dir_all(&rootfs_dir)
            .await
            .map_err(|e| AgentError::PullFailed {
                image: image.to_string(),
                reason: format!("Failed to create rootfs dir: {e}"),
            })?;

        // Use zlayer-registry to pull and extract OCI image layers.
        // The blob cache is shared across all images under images_dir.
        let cache_path = self.images_dir().join("blobs.redb");
        let cache_type = zlayer_registry::CacheType::persistent_at(&cache_path);
        let blob_cache = cache_type
            .build()
            .await
            .map_err(|e| AgentError::PullFailed {
                image: image.to_string(),
                reason: format!("Failed to open blob cache: {e}"),
            })?;

        let puller = zlayer_registry::ImagePuller::with_cache(blob_cache);
        // Inline `_auth` is intentionally ignored here (see doc comment);
        // anonymous access is used for the registry pull.
        let auth = zlayer_registry::RegistryAuth::Anonymous;

        let layers = puller
            .pull_image(image, &auth)
            .await
            .map_err(|e| AgentError::PullFailed {
                image: image.to_string(),
                reason: format!("Failed to pull image layers: {e}"),
            })?;

        tracing::info!(
            image = %image,
            layer_count = layers.len(),
            "Extracting layers to image rootfs"
        );

        let mut unpacker = zlayer_registry::LayerUnpacker::new(rootfs_dir.clone());
        unpacker
            .unpack_layers(&layers)
            .await
            .map_err(|e| AgentError::PullFailed {
                image: image.to_string(),
                reason: format!("Failed to extract rootfs: {e}"),
            })?;

        // Track the rootfs path
        let mut images = self.image_rootfs.write().await;
        images.insert(safe_name, rootfs_dir.clone());

        tracing::info!(
            image = %image,
            rootfs = %rootfs_dir.display(),
            "Image pulled successfully"
        );

        Ok(())
    }

    /// Create a container (prepare VM rootfs and state directory).
    ///
    /// Unlike the `SandboxRuntime`, no Seatbelt profile is needed -- the VM
    /// provides full hardware-level isolation. The rootfs is copied (not
    /// APFS-cloned) since each VM has its own filesystem view through
    /// the virtualization layer.
    ///
    /// # Errors
    ///
    /// Returns `AgentError::CreateFailed` if the VM directory cannot be
    /// created, the image rootfs is missing (pull first), the rootfs copy
    /// fails, or the spec cannot be serialized/written to `config.json`.
    #[allow(clippy::too_many_lines)]
    async fn create_container(&self, id: &ContainerId, spec: &ServiceSpec) -> Result<()> {
        let dir_name = Self::container_dir_name(id);
        let vm_dir = self.vm_dir(id);
        let rootfs_dir = vm_dir.join("rootfs");

        // Clean up stale VM directory if it exists (e.g. left over from a
        // crashed agent). Removal failure is logged but not fatal; the
        // create_dir_all below will surface a hard error if the dir is unusable.
        if vm_dir.exists() {
            tracing::warn!(
                container = %dir_name,
                "Stale VM directory found, cleaning up"
            );
            if let Err(e) = tokio::fs::remove_dir_all(&vm_dir).await {
                tracing::warn!(
                    container = %dir_name,
                    error = %e,
                    "Failed to remove stale VM directory"
                );
            }
        }

        // Create VM state directory
        tokio::fs::create_dir_all(&vm_dir)
            .await
            .map_err(|e| AgentError::CreateFailed {
                id: dir_name.clone(),
                reason: format!("Failed to create VM dir {}: {e}", vm_dir.display()),
            })?;

        // Locate the base image rootfs: prefer the tracked path from a prior
        // pull, falling back to the conventional on-disk location.
        let image_name_str = spec.image.name.to_string();
        let safe_image_name = sanitize_image_name(&image_name_str);
        let image_rootfs = {
            let images = self.image_rootfs.read().await;
            images.get(&safe_image_name).cloned()
        };
        let image_rootfs =
            image_rootfs.unwrap_or_else(|| self.images_dir().join(&safe_image_name).join("rootfs"));

        if !image_rootfs.exists() {
            return Err(AgentError::CreateFailed {
                id: dir_name.clone(),
                reason: format!(
                    "Image rootfs not found at {}. Run pull_image first.",
                    image_rootfs.display()
                ),
            });
        }

        // Copy rootfs to VM directory. No APFS clone needed -- each VM
        // has its own filesystem view through virtio-fs.
        // NOTE(review): a full recursive copy can be slow for large images;
        // presumably acceptable at current image sizes -- confirm.
        tracing::debug!(
            container = %dir_name,
            src = %image_rootfs.display(),
            dst = %rootfs_dir.display(),
            "Copying rootfs for VM"
        );
        copy_directory_recursive(&image_rootfs, &rootfs_dir)
            .await
            .map_err(|e| AgentError::CreateFailed {
                id: dir_name.clone(),
                reason: format!(
                    "Failed to copy rootfs from {} to {}: {e}",
                    image_rootfs.display(),
                    rootfs_dir.display()
                ),
            })?;

        // Write config to disk so the container can be inspected/recovered
        // independently of the in-memory map.
        let config_json =
            serde_json::to_string_pretty(spec).map_err(|e| AgentError::CreateFailed {
                id: dir_name.clone(),
                reason: format!("Failed to serialize spec: {e}"),
            })?;
        tokio::fs::write(vm_dir.join("config.json"), &config_json)
            .await
            .map_err(|e| AgentError::CreateFailed {
                id: dir_name.clone(),
                reason: format!("Failed to write config.json: {e}"),
            })?;

        let log_file = vm_dir.join("console.log");

        // Determine if GPU is requested (only the "apple" vendor maps to
        // libkrun's Metal-backed GPU forwarding).
        let gpu_enabled = spec
            .resources
            .gpu
            .as_ref()
            .is_some_and(|gpu| gpu.vendor == "apple");

        // Determine vCPU count from spec (default 2); fractional CPU shares
        // are rounded up and clamped to at least 1 full vCPU.
        #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
        let vcpus = spec
            .resources
            .cpu
            .map_or(2, |cpu| (cpu.ceil() as u32).max(1));
        let vcpus = safe_vcpu_count(vcpus);

        // Determine RAM from spec (default 512 MiB, minimum 128 MiB for VM overhead)
        let ram_mib = spec
            .resources
            .memory
            .as_ref()
            .and_then(|m| parse_memory_to_mib(m))
            .unwrap_or(512)
            .max(128); // 128 MiB minimum for Linux kernel + init

        // Register the container as pending; the libkrun context is created
        // later in start_container.
        let container = VmContainer {
            state: ContainerState::Pending,
            ctx_id: None,
            state_dir: vm_dir,
            rootfs_dir,
            log_file,
            started_at: None,
            spec: spec.clone(),
            vm_thread: None,
            gpu_enabled,
            ram_mib,
            vcpus,
        };

        let mut containers = self.containers.write().await;
        containers.insert(dir_name.clone(), container);

        tracing::info!(
            container = %dir_name,
            image = %spec.image.name,
            vcpus = vcpus,
            ram_mib = ram_mib,
            gpu = gpu_enabled,
            "VM container created"
        );

        Ok(())
    }

    /// Start a container by creating and booting a libkrun microVM.
    ///
    /// This method:
    /// 1. Creates a libkrun context via `krun_create_ctx()`
    /// 2. Configures the VM: vCPUs, RAM, rootfs, workdir, GPU, networking
    /// 3. Sets the entrypoint command and environment variables
    /// 4. Spawns `krun_start_enter()` on a dedicated OS thread (it blocks
    ///    until the VM exits, so it cannot run on the tokio runtime)
    /// 5. Updates state to `Running`
    ///
    /// # GPU Constraint
    ///
    /// Only one GPU-enabled VM can run at a time. If a GPU container is
    /// already running, this method returns an error.
    ///
    /// # Errors
    ///
    /// Returns `NotFound` if the container was never created, and
    /// `StartFailed`/`InvalidSpec` for configuration failures. Every error
    /// path after context creation frees the libkrun context and releases
    /// the GPU flag.
    #[allow(clippy::too_many_lines, unsafe_code)]
    async fn start_container(&self, id: &ContainerId) -> Result<()> {
        let dir_name = Self::container_dir_name(id);
        let api = self.require_api()?;

        // Extract what we need from the container state, then release the lock
        let (rootfs_dir, log_file, spec, gpu_enabled, vcpus, ram_mib) = {
            let containers = self.containers.read().await;
            let container = containers
                .get(&dir_name)
                .ok_or_else(|| AgentError::NotFound {
                    container: dir_name.clone(),
                    reason: "Container not created".to_string(),
                })?;
            (
                container.rootfs_dir.clone(),
                container.log_file.clone(),
                container.spec.clone(),
                container.gpu_enabled,
                container.vcpus,
                container.ram_mib,
            )
        };

        // Enforce one-GPU-at-a-time constraint. `swap` atomically claims the
        // GPU slot; from here on, every error path must release it.
        if gpu_enabled {
            let was_in_use = self.gpu_in_use.swap(true, Ordering::SeqCst);
            if was_in_use {
                return Err(AgentError::StartFailed {
                    id: dir_name,
                    reason: "Cannot start GPU-enabled VM: another GPU VM is already running. \
                             libkrun supports only one GPU-enabled VM at a time due to Metal \
                             GPU context limitations. Stop the existing GPU container first, \
                             or use the SandboxRuntime (Approach A) which supports multiple \
                             GPU processes via Metal's built-in multi-process support."
                        .to_string(),
                });
            }
        }

        // Resolve entrypoint
        let (program, args) = resolve_entrypoint(&spec);

        tracing::info!(
            container = %dir_name,
            program = %program,
            args = ?args,
            vcpus = vcpus,
            ram_mib = ram_mib,
            gpu = gpu_enabled,
            "Starting libkrun VM"
        );

        // --- Configure libkrun context ---

        // 1. Create context
        let ctx_id = unsafe { (api.create_ctx)() };
        if ctx_id < 0 {
            // No context exists yet, so only the GPU claim needs rollback.
            if gpu_enabled {
                self.gpu_in_use.store(false, Ordering::SeqCst);
            }
            return Err(AgentError::StartFailed {
                id: dir_name,
                reason: format!("krun_create_ctx failed with code {ctx_id}"),
            });
        }
        #[allow(clippy::cast_sign_loss)]
        let ctx_id = ctx_id as u32;

        // Helper closure to clean up context on error. Every failure path
        // below MUST go through this so neither the krun context nor the
        // GPU flag is leaked.
        let cleanup_on_error = |api: &LibKrun, ctx: u32, gpu: bool, gpu_flag: &AtomicBool| {
            unsafe { (api.free_ctx)(ctx) };
            if gpu {
                gpu_flag.store(false, Ordering::SeqCst);
            }
        };

        // 2. Configure VM resources
        let ret = unsafe { (api.set_vm_config)(ctx_id, vcpus, ram_mib) };
        if ret != 0 {
            cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
            return Err(AgentError::StartFailed {
                id: dir_name,
                reason: format!(
                    "krun_set_vm_config({vcpus} vCPUs, {ram_mib} MiB) failed with code {ret}"
                ),
            });
        }

        // 3. Set rootfs (virtio-fs passthrough)
        //
        // BUGFIX: this conversion previously used `?`, which propagated the
        // error without freeing the context or releasing the GPU flag.
        let rootfs_cstr =
            match to_cstring(rootfs_dir.to_str().unwrap_or("/invalid"), "rootfs path") {
                Ok(c) => c,
                Err(e) => {
                    cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
                    return Err(e);
                }
            };
        let ret = unsafe { (api.set_root)(ctx_id, rootfs_cstr.as_ptr()) };
        if ret != 0 {
            cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
            return Err(AgentError::StartFailed {
                id: dir_name,
                reason: format!(
                    "krun_set_root({}) failed with code {ret}",
                    rootfs_dir.display()
                ),
            });
        }

        // 4. Set working directory if specified
        if let Some(ref workdir) = spec.command.workdir {
            // BUGFIX: same context leak as above on CString failure.
            let workdir_cstr = match to_cstring(workdir, "workdir path") {
                Ok(c) => c,
                Err(e) => {
                    cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
                    return Err(e);
                }
            };
            let ret = unsafe { (api.set_workdir)(ctx_id, workdir_cstr.as_ptr()) };
            if ret != 0 {
                cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
                return Err(AgentError::StartFailed {
                    id: dir_name,
                    reason: format!("krun_set_workdir({workdir}) failed with code {ret}"),
                });
            }
        }

        // 5. Configure GPU if requested
        if gpu_enabled {
            // Flags: 0 = auto-detect Metal backend on Apple Silicon.
            // libkrun auto-selects Venus-Vulkan or ggml remoting based on
            // the guest workload.
            let ret = unsafe { (api.set_gpu_options2)(ctx_id, 0) };
            if ret != 0 {
                cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
                return Err(AgentError::StartFailed {
                    id: dir_name,
                    reason: format!("krun_set_gpu_options2 failed with code {ret}"),
                });
            }
            tracing::info!(
                container = %dir_name,
                "GPU forwarding enabled (Venus-Vulkan/ggml)"
            );
        }

        // 6. Enable TSI networking
        let ret = unsafe { (api.set_tsi)(ctx_id) };
        if ret != 0 {
            cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
            return Err(AgentError::StartFailed {
                id: dir_name,
                reason: format!("krun_set_tsi failed with code {ret}"),
            });
        }

        // 7. Set entrypoint command and environment
        //
        // BUGFIX: all fallible conversions below previously used `?`, leaking
        // the context (and GPU flag) on failure.
        let exec_cstr = match to_cstring(&program, "exec path") {
            Ok(c) => c,
            Err(e) => {
                cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
                return Err(e);
            }
        };

        // Build argv: [program, ...args]. `args` is not used past this point,
        // so move it instead of cloning.
        let mut argv_strings = vec![program.clone()];
        argv_strings.extend(args);
        // Scope raw pointer arrays so they don't live across .await
        let ret = {
            let arrays = build_c_string_array(&argv_strings)
                .and_then(|argv| build_env_array(&spec.env).map(|envp| (argv, envp)));
            let ((_argv_cstrings, argv_ptrs), (_envp_cstrings, envp_ptrs)) = match arrays {
                Ok(pair) => pair,
                Err(e) => {
                    cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
                    return Err(e);
                }
            };

            // SAFETY: `_argv_cstrings` / `_envp_cstrings` own the storage the
            // pointer arrays reference and remain alive for this whole block.
            unsafe {
                (api.set_exec)(
                    ctx_id,
                    exec_cstr.as_ptr(),
                    argv_ptrs.as_ptr(),
                    envp_ptrs.as_ptr(),
                )
            }
        };
        if ret != 0 {
            cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
            return Err(AgentError::StartFailed {
                id: dir_name,
                reason: format!("krun_set_exec({program}) failed with code {ret}"),
            });
        }

        // 8. Spawn the VM on a dedicated OS thread.
        //
        // CRITICAL: `krun_start_enter()` BLOCKS the calling thread until the
        // VM exits. We must NOT call it from a tokio task, as it would block
        // the entire async runtime. Use `std::thread::spawn` instead.
        let api_clone = Arc::clone(api);
        let thread_dir_name = dir_name.clone();
        let thread_log_file = log_file.clone();

        let vm_thread = std::thread::Builder::new()
            .name(format!("krun-vm-{dir_name}"))
            .spawn(move || {
                tracing::debug!(
                    container = %thread_dir_name,
                    ctx_id = ctx_id,
                    "VM thread starting krun_start_enter"
                );

                // Create log file for any pre-start output (libkrun logs to stderr)
                // This is best-effort -- the VM will still run if logging fails.
                let _ = std::fs::File::create(&thread_log_file);

                // This call BLOCKS until the VM exits (entrypoint process exits).
                let exit_code = unsafe { (api_clone.start_enter)(ctx_id) };

                tracing::info!(
                    container = %thread_dir_name,
                    ctx_id = ctx_id,
                    exit_code = exit_code,
                    "VM exited"
                );

                exit_code
            })
            .map_err(|e| {
                cleanup_on_error(api, ctx_id, gpu_enabled, &self.gpu_in_use);
                AgentError::StartFailed {
                    id: dir_name.clone(),
                    reason: format!("Failed to spawn VM thread: {e}"),
                }
            })?;

        // Update container state
        let mut containers = self.containers.write().await;
        if let Some(container) = containers.get_mut(&dir_name) {
            container.ctx_id = Some(ctx_id);
            container.state = ContainerState::Running;
            container.started_at = Some(Instant::now());
            container.vm_thread = Some(vm_thread);
        }

        tracing::info!(
            container = %dir_name,
            ctx_id = ctx_id,
            "VM started"
        );

        Ok(())
    }

    /// Stop a container by destroying the libkrun VM context.
    ///
    /// libkrun does not have a clean shutdown API (`krun_stop` does not exist).
    /// The VM exits when the entrypoint process exits. To force stop:
    ///
    /// 1. Call `krun_free_ctx()` to destroy the VM context (abrupt)
    /// 2. Wait for the VM thread to complete
    ///
    /// NOTE(review): the `timeout` parameter is currently only logged -- the
    /// thread join below has no deadline and relies on `krun_free_ctx` having
    /// torn the VM down. Consider enforcing the timeout on the join.
    #[allow(unsafe_code)]
    async fn stop_container(&self, id: &ContainerId, timeout: Duration) -> Result<()> {
        let dir_name = Self::container_dir_name(id);

        // Mark the container as stopping and take the thread handle while
        // holding the write lock, then release it before any blocking work.
        let (ctx_id, gpu_enabled, vm_thread) = {
            let mut containers = self.containers.write().await;
            let container = containers
                .get_mut(&dir_name)
                .ok_or_else(|| AgentError::NotFound {
                    container: dir_name.clone(),
                    reason: "Container not found".to_string(),
                })?;

            // If already exited, nothing to do
            if matches!(
                container.state,
                ContainerState::Exited { .. } | ContainerState::Failed { .. }
            ) {
                return Ok(());
            }

            container.state = ContainerState::Stopping;

            (
                container.ctx_id,
                container.gpu_enabled,
                container.vm_thread.take(),
            )
        };

        tracing::info!(
            container = %dir_name,
            ctx_id = ?ctx_id,
            timeout = ?timeout,
            "Stopping VM"
        );

        // Destroy the VM context to force the VM to exit.
        // This is the only way to stop a libkrun VM externally.
        if let (Some(api), Some(ctx)) = (&self.api, ctx_id) {
            let ret = unsafe { (api.free_ctx)(ctx) };
            if ret != 0 {
                tracing::warn!(
                    container = %dir_name,
                    ctx_id = ctx,
                    ret = ret,
                    "krun_free_ctx returned non-zero (VM may have already exited)"
                );
            }
        }

        // Wait for the VM thread to finish. The join runs on the blocking
        // pool because std thread joins would stall the async runtime.
        if let Some(thread) = vm_thread {
            let exit_code = tokio::task::spawn_blocking(move || {
                // std::thread::JoinHandle doesn't support timeout directly.
                // We rely on krun_free_ctx having terminated the VM above.
                // Give it a reasonable window then report.
                thread.join().unwrap_or_else(|_| {
                    tracing::warn!("VM thread panicked during join");
                    -1
                })
            })
            .await
            .unwrap_or(-1);

            let mut containers = self.containers.write().await;
            if let Some(c) = containers.get_mut(&dir_name) {
                c.state = ContainerState::Exited { code: exit_code };
                c.ctx_id = None;
            }

            tracing::info!(
                container = %dir_name,
                exit_code = exit_code,
                "VM stopped"
            );
        } else {
            // No thread -- mark as exited
            let mut containers = self.containers.write().await;
            if let Some(c) = containers.get_mut(&dir_name) {
                c.state = ContainerState::Exited { code: 0 };
                c.ctx_id = None;
            }
        }

        // Release GPU lock if this was a GPU container
        if gpu_enabled {
            self.gpu_in_use.store(false, Ordering::SeqCst);
        }

        Ok(())
    }

    /// Remove a container.
    ///
    /// Stops the VM if still running, removes the state directory, and
    /// removes the container from internal tracking.
    ///
    /// # Errors
    ///
    /// Returns `AgentError::Internal` if the state directory cannot be
    /// deleted; stop failures are best-effort and do not abort removal.
    #[allow(unsafe_code)]
    async fn remove_container(&self, id: &ContainerId) -> Result<()> {
        let dir_name = Self::container_dir_name(id);

        tracing::info!(container = %dir_name, "Removing VM container");

        // Stop the VM if it is still running.
        // NOTE(review): the read-lock check followed by a separate stop call
        // is a small TOCTOU window (state can change between locks); appears
        // intentional as best-effort -- confirm.
        {
            let containers = self.containers.read().await;
            if let Some(c) = containers.get(&dir_name) {
                if matches!(
                    c.state,
                    ContainerState::Running
                        | ContainerState::Stopping
                        | ContainerState::Initializing
                ) {
                    drop(containers);
                    // Best-effort stop
                    let _ = self.stop_container(id, Duration::from_secs(5)).await;
                }
            }
        }

        // Remove from tracking, capturing state_dir for cleanup below
        let removed_state_dir = {
            let mut containers = self.containers.write().await;
            if let Some(c) = containers.remove(&dir_name) {
                // Release GPU lock if needed
                if c.gpu_enabled
                    && matches!(c.state, ContainerState::Running | ContainerState::Stopping)
                {
                    self.gpu_in_use.store(false, Ordering::SeqCst);
                }

                // Free context if still active
                if let (Some(api), Some(ctx)) = (&self.api, c.ctx_id) {
                    let _ = unsafe { (api.free_ctx)(ctx) };
                }
                Some(c.state_dir)
            } else {
                None
            }
        };

        // Remove state directory (fall back to the conventional path if the
        // container was never tracked, e.g. after an agent restart).
        let vm_dir = removed_state_dir.unwrap_or_else(|| self.vm_dir(id));
        if vm_dir.exists() {
            tokio::fs::remove_dir_all(&vm_dir).await.map_err(|e| {
                AgentError::Internal(format!("Failed to remove VM dir {}: {e}", vm_dir.display()))
            })?;
        }

        tracing::info!(container = %dir_name, "VM container removed");
        Ok(())
    }

    /// Get container state.
    ///
    /// Checks whether the VM thread is still alive to detect exits. On a
    /// detected exit this method also transitions the container to a
    /// terminal state and releases the GPU flag, so it has side effects
    /// beyond a pure read.
    async fn container_state(&self, id: &ContainerId) -> Result<ContainerState> {
        let dir_name = Self::container_dir_name(id);

        // Write lock: we may mutate state/vm_thread below.
        let mut containers = self.containers.write().await;
        let container = containers
            .get_mut(&dir_name)
            .ok_or_else(|| AgentError::NotFound {
                container: dir_name.clone(),
                reason: "Container not found".to_string(),
            })?;

        // If already in a terminal state, return it
        match &container.state {
            ContainerState::Exited { .. } | ContainerState::Failed { .. } => {
                return Ok(container.state.clone());
            }
            ContainerState::Pending => return Ok(ContainerState::Pending),
            _ => {}
        }

        // Check if the VM thread has finished
        if let Some(ref thread) = container.vm_thread {
            if thread.is_finished() {
                // Thread finished -- take ownership to join it.
                // join() cannot block here since is_finished() was true.
                let Some(thread) = container.vm_thread.take() else {
                    return Ok(container.state.clone());
                };
                let Ok(exit_code) = thread.join() else {
                    container.state = ContainerState::Failed {
                        reason: "VM thread panicked".to_string(),
                    };
                    return Ok(container.state.clone());
                };

                container.state = ContainerState::Exited { code: exit_code };
                container.ctx_id = None;

                // Release GPU lock
                if container.gpu_enabled {
                    self.gpu_in_use.store(false, Ordering::SeqCst);
                }
            }
            // else: thread still running, state is Running
        }

        Ok(container.state.clone())
    }

    /// Get container logs from the VM console output.
    ///
    /// Reads the console log file. If `tail > 0`, returns only the last
    /// `tail` lines. A missing log file yields an empty list rather than
    /// an error (the VM may not have produced output yet).
    async fn container_logs(&self, id: &ContainerId, tail: usize) -> Result<Vec<LogEntry>> {
        let dir_name = Self::container_dir_name(id);

        // Resolve the log path under a briefly-held read lock.
        let log_file = {
            let containers = self.containers.read().await;
            let Some(container) = containers.get(&dir_name) else {
                return Err(AgentError::NotFound {
                    container: dir_name.clone(),
                    reason: "Container not found".to_string(),
                });
            };
            container.log_file.clone()
        };

        let content = match tokio::fs::read_to_string(&log_file).await {
            Ok(text) => text,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
            Err(e) => {
                return Err(AgentError::Internal(format!(
                    "Failed to read VM console log {}: {e}",
                    log_file.display()
                )));
            }
        };

        // Console output carries no per-line timestamps or stream tags, so
        // every entry gets the read time and is attributed to stdout.
        let now = chrono::Utc::now();
        let source = LogSource::Container(id.to_string());
        let mut entries: Vec<LogEntry> = content
            .lines()
            .map(|line| LogEntry {
                timestamp: now,
                stream: LogStream::Stdout,
                message: line.to_string(),
                source: source.clone(),
                service: None,
                deployment: None,
            })
            .collect();

        // Keep only the trailing `tail` lines when a tail was requested.
        if tail > 0 {
            let excess = entries.len().saturating_sub(tail);
            if excess > 0 {
                entries.drain(..excess);
            }
        }

        Ok(entries)
    }

    /// Execute a command inside the VM container.
    ///
    /// **NOT SUPPORTED** in the VM runtime. libkrun does not provide an API
    /// to execute arbitrary commands inside a running VM. Implementing this
    /// would require:
    /// - A vsock-based agent running inside the guest
    /// - Or SSH access into the VM
    ///
    /// Neither is currently set up. Use the `SandboxRuntime` if exec is required.
    async fn exec(&self, id: &ContainerId, _cmd: &[String]) -> Result<(i32, String, String)> {
        let dir_name = Self::container_dir_name(id);

        // Validate the container id first so callers get a NotFound error
        // (not the unsupported-operation error) for unknown containers.
        {
            let containers = self.containers.read().await;
            if !containers.contains_key(&dir_name) {
                return Err(AgentError::NotFound {
                    container: dir_name,
                    reason: "Container not found".to_string(),
                });
            }
        }

        Err(AgentError::Internal(
            "exec is not supported in the VM runtime. libkrun does not provide an API \
             to execute commands inside a running VM. This would require a vsock-based \
             agent or SSH server inside the guest. Use the SandboxRuntime (MacSandbox) \
             if you need exec capability."
                .to_string(),
        ))
    }

    /// Get container resource statistics.
    ///
    /// **Limited in VM runtime.** Since the VM runs as a host thread (not a
    /// separate process), we cannot use `proc_pidinfo` to get per-container
    /// stats. Instead, we report the configured resource limits as approximate
    /// values.
    ///
    /// For accurate per-container stats, use the `SandboxRuntime` which has
    /// real per-process monitoring via `proc_pidinfo`.
    async fn get_container_stats(&self, id: &ContainerId) -> Result<ContainerStats> {
        let dir_name = Self::container_dir_name(id);

        let containers = self.containers.read().await;
        let Some(container) = containers.get(&dir_name) else {
            return Err(AgentError::NotFound {
                container: dir_name.clone(),
                reason: "Container not found".to_string(),
            });
        };

        // Stats are only meaningful while the VM is actually running.
        if !matches!(container.state, ContainerState::Running) {
            return Err(AgentError::Internal(
                "Container is not running -- cannot collect stats".to_string(),
            ));
        }

        // We cannot get real per-VM stats without an in-guest agent.
        // Report configured limits as approximate values.
        #[allow(clippy::cast_possible_truncation)]
        let uptime_usec = match container.started_at {
            Some(started) => started.elapsed().as_micros() as u64,
            None => 0,
        };

        let memory_limit = u64::from(container.ram_mib) * 1024 * 1024;

        Ok(ContainerStats {
            // Approximate CPU usage based on uptime (not real usage).
            // A proper implementation would require an in-guest metrics agent.
            cpu_usage_usec: uptime_usec,
            // Report memory limit as current usage (conservative estimate).
            // The actual usage inside the VM is not visible from the host
            // without an in-guest agent.
            memory_bytes: memory_limit,
            memory_limit,
            timestamp: Instant::now(),
        })
    }

    /// Wait for a container to exit and return its exit code.
    ///
    /// Blocks (asynchronously) until the VM thread completes. Safe to call
    /// from multiple tasks concurrently: the first caller to observe thread
    /// completion joins it and records the `Exited` state; later callers
    /// return the recorded exit code.
    ///
    /// # Errors
    ///
    /// Returns [`AgentError::NotFound`] if the container does not exist (or
    /// disappears while waiting), and [`AgentError::Internal`] if the VM
    /// failed or has no active thread to wait on.
    async fn wait_container(&self, id: &ContainerId) -> Result<i32> {
        let dir_name = Self::container_dir_name(id);

        // Fast path: already exited -- only a read lock needed.
        {
            let containers = self.containers.read().await;
            let container = containers
                .get(&dir_name)
                .ok_or_else(|| AgentError::NotFound {
                    container: dir_name.clone(),
                    reason: "Container not found".to_string(),
                })?;

            if let ContainerState::Exited { code } = &container.state {
                return Ok(*code);
            }
        }

        tracing::debug!(container = %dir_name, "Waiting for VM to exit");

        // Poll the VM thread status until it completes.
        // We poll rather than taking the thread handle because multiple
        // callers might be waiting on the same container.
        loop {
            {
                let mut containers = self.containers.write().await;
                let container = match containers.get_mut(&dir_name) {
                    Some(c) => c,
                    None => {
                        return Err(AgentError::NotFound {
                            container: dir_name,
                            reason: "Container disappeared while waiting".to_string(),
                        });
                    }
                };

                // Terminal states recorded by another waiter or the monitor.
                match &container.state {
                    ContainerState::Exited { code } => return Ok(*code),
                    ContainerState::Failed { reason } => {
                        return Err(AgentError::Internal(format!("VM failed: {reason}")));
                    }
                    _ => {}
                }

                // Single inspect-then-take instead of the original redundant
                // `if let Some(ref thread)` + inner `take()` double-check.
                let finished = match container.vm_thread.as_ref() {
                    Some(thread) => thread.is_finished(),
                    // No thread -- container was never started or the thread
                    // was already joined elsewhere.
                    None => {
                        return Err(AgentError::Internal(
                            "VM has no active thread to wait on".to_string(),
                        ));
                    }
                };

                if finished {
                    // `join` returns promptly here because `is_finished` was
                    // true, so holding the write lock across it is fine.
                    let thread = container
                        .vm_thread
                        .take()
                        .expect("vm_thread was Some just above");
                    let exit_code = thread.join().unwrap_or(-1);
                    container.state = ContainerState::Exited { code: exit_code };
                    container.ctx_id = None;

                    // Release the single GPU slot if this VM held it.
                    if container.gpu_enabled {
                        self.gpu_in_use.store(false, Ordering::SeqCst);
                    }

                    tracing::info!(
                        container = %dir_name,
                        exit_code = exit_code,
                        "VM exited (via wait)"
                    );

                    return Ok(exit_code);
                }
            }

            // Poll interval -- 250ms is a reasonable balance between
            // responsiveness and overhead
            tokio::time::sleep(Duration::from_millis(250)).await;
        }
    }

    /// Get container logs as structured entries.
    ///
    /// Reads the VM console log file and wraps each line in a `LogEntry`.
    /// All entries share a single read-time timestamp and are attributed to
    /// stdout, because the console log carries no per-line metadata.
    async fn get_logs(&self, id: &ContainerId) -> Result<Vec<LogEntry>> {
        let dir_name = Self::container_dir_name(id);

        let log_path = {
            let guard = self.containers.read().await;
            guard
                .get(&dir_name)
                .map(|c| c.log_file.clone())
                .ok_or_else(|| AgentError::NotFound {
                    container: dir_name.clone(),
                    reason: "Container not found".to_string(),
                })?
        };

        let text = match tokio::fs::read_to_string(&log_path).await {
            Ok(s) => s,
            // A missing log file just means nothing has been written yet.
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
            Err(e) => {
                return Err(AgentError::Internal(format!(
                    "Failed to read VM console log: {e}"
                )));
            }
        };

        let read_time = chrono::Utc::now();
        let src = LogSource::Container(id.to_string());
        Ok(text
            .lines()
            .map(|line| LogEntry {
                timestamp: read_time,
                stream: LogStream::Stdout,
                message: line.to_owned(),
                source: src.clone(),
                service: None,
                deployment: None,
            })
            .collect())
    }

    /// Get the PID of a container's main process.
    ///
    /// Always `None` for VM containers: the main process runs inside the
    /// guest's PID namespace, so its PID is meaningless on the host.
    async fn get_container_pid(&self, id: &ContainerId) -> Result<Option<u32>> {
        let dir_name = Self::container_dir_name(id);

        // Validate that the container exists, then report no host-visible PID.
        let guard = self.containers.read().await;
        if guard.contains_key(&dir_name) {
            Ok(None)
        } else {
            Err(AgentError::NotFound {
                container: dir_name,
                reason: "Container not found".to_string(),
            })
        }
    }

    /// Get the IP address of a container.
    ///
    /// Always `127.0.0.1` (localhost) for VM containers. TSI (Transparent
    /// Socket Impersonation) funnels guest socket operations through the
    /// host process, so guest services bind their ports directly on the
    /// host and the proxy manager routes traffic by port.
    ///
    /// ## Multi-replica note
    ///
    /// Unlike the `SandboxRuntime`, the `VmRuntime` has no port conflict
    /// problem with multiple replicas: each VM gets a full network stack via
    /// libkrun's TSI, so the guest binds inside its own namespace and TSI
    /// maps it to a unique host port automatically. The trait's default
    /// `Ok(None)` for `get_container_port_override` is therefore correct.
    async fn get_container_ip(&self, id: &ContainerId) -> Result<Option<IpAddr>> {
        let dir_name = Self::container_dir_name(id);

        let guard = self.containers.read().await;
        if !guard.contains_key(&dir_name) {
            return Err(AgentError::NotFound {
                container: dir_name,
                reason: "Container not found".to_string(),
            });
        }

        // Every VM appears as localhost from the host's point of view.
        Ok(Some(IpAddr::V4(std::net::Ipv4Addr::LOCALHOST)))
    }

    /// Kill a VM container.
    ///
    /// libkrun offers no way to deliver POSIX signals into the guest; the
    /// only available action is `SIGKILL`-equivalent teardown of the VM
    /// context via `stop_container`. The `signal` parameter is validated
    /// for API parity, but only `SIGKILL` and `SIGTERM` map to a meaningful
    /// action (both destroy the VM). Any other accepted signal is rejected
    /// with [`AgentError::Unsupported`].
    async fn kill_container(&self, id: &ContainerId, signal: Option<&str>) -> Result<()> {
        let requested = signal.unwrap_or("SIGKILL");
        let canonical = crate::runtime::validate_signal(requested)?;
        if canonical == "SIGKILL" || canonical == "SIGTERM" {
            // Immediate teardown: zero grace period.
            self.stop_container(id, Duration::from_secs(0)).await
        } else {
            Err(AgentError::Unsupported(format!(
                "signal '{canonical}' is not supported by the macOS VM runtime (only SIGKILL and SIGTERM map to VM teardown)"
            )))
        }
    }

    /// Tagging is not supported by the macOS VM runtime.
    ///
    /// Images are materialized as VM disk images inside per-container state
    /// directories rather than a shared content-addressed registry, so there
    /// is nothing to tag.
    async fn tag_image(&self, _source: &str, _target: &str) -> Result<()> {
        Err(AgentError::Unsupported(
            "tag_image is not supported by the macOS VM runtime".to_string(),
        ))
    }
}

// ---------------------------------------------------------------------------
// Directory copy helper
// ---------------------------------------------------------------------------

/// Recursively copy a directory tree.
///
/// Unlike the `SandboxRuntime`'s `clone_directory_recursive`, which uses
/// APFS `clonefile` for `CoW`, this performs a plain copy. VMs see their
/// own filesystem view through `virtio-fs`, so `CoW` is not needed.
async fn copy_directory_recursive(
    src: &std::path::Path,
    dst: &std::path::Path,
) -> std::io::Result<()> {
    tokio::fs::create_dir_all(dst).await?;

    let mut reader = tokio::fs::read_dir(src).await?;
    while let Some(entry) = reader.next_entry().await? {
        let from = entry.path();
        let to = dst.join(entry.file_name());
        let kind = entry.file_type().await?;

        if kind.is_dir() {
            // Box the recursive call -- async recursion needs indirection.
            Box::pin(copy_directory_recursive(&from, &to)).await?;
        } else if kind.is_file() {
            tokio::fs::copy(&from, &to).await?;
        } else if kind.is_symlink() {
            // Recreate the link itself rather than following it.
            let target = tokio::fs::read_link(&from).await?;
            tokio::fs::symlink(&target, &to).await?;
        }
    }

    // Mirror the source directory's permission bits onto the copy.
    let perms = tokio::fs::metadata(src).await?.permissions();
    tokio::fs::set_permissions(dst, perms).await?;

    Ok(())
}