arcbox-agent 0.4.9

Guest agent for ArcBox VMs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
//! EnsureRuntime state machine (platform-independent, testable).
//!
//! Non-blocking semantics: the RPC never awaits the start sequence inline.
//! The first caller transitions `NotStarted`/`Failed` → `Starting`, spawns
//! the actual start work as a background task, and returns `STATUS_STARTING`
//! immediately. Subsequent callers see `Starting` and also return
//! `STATUS_STARTING`. Once the background task publishes its result:
//! - `Ready` → callers get cached endpoint/message with `STATUS_REUSED`.
//! - `Failed` → the *next* caller with `start_if_needed=true` transitions
//!   back to `Starting` and spawns a fresh start; callers don't observe a
//!   cached failed response because the expected recovery mode is "ask the
//!   agent to try again." Probe-only (`start_if_needed=false`) callers
//!   still receive the cached response when state is `Ready`; otherwise
//!   they bypass the state machine and re-run `probe_fn`.
//!
//! Why non-blocking: the host-side vsock transport uses a short per-RPC
//! deadline (~5s). Blocking the RPC for the full cold-boot window (up to
//! ~90s waiting on containerd + dockerd) causes the daemon to close the
//! socketpair mid-call, and the agent's eventual response write fails with
//! EPIPE. Returning immediately keeps the RPC well under the deadline and
//! lets the daemon poll with its own backoff.

use std::future::Future;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use std::sync::OnceLock;

// STATUS_STARTED is still a valid terminal status the start task can report
// internally. The status itself is not persisted (`RuntimeState::Ready` keeps
// only the endpoint and message), so callers see STATUS_REUSED once state is
// settled.
#[allow(unused_imports)]
pub use arcbox_constants::status::{
    RUNTIME_FAILED as STATUS_FAILED, RUNTIME_REUSED as STATUS_REUSED,
    RUNTIME_STARTED as STATUS_STARTED, RUNTIME_STARTING as STATUS_STARTING,
};
use arcbox_protocol::agent::RuntimeEnsureResponse;
use futures::FutureExt;
use tokio::sync::Mutex;

/// Where the runtime currently sits in its start lifecycle.
///
/// Transitions driven by `ensure_runtime`: `NotStarted`/`Failed` → `Starting`
/// when a caller becomes the driver, then `Starting` → `Ready` or `Failed`
/// when the background start task publishes its outcome.
// `dead_code` is allowed here as on the other items in this module,
// presumably because some builds never call into it — TODO confirm.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum RuntimeState {
    /// Nobody has asked for the runtime yet.
    NotStarted,
    /// A background task owns the current start attempt and has not reported.
    Starting,
    /// The runtime came up; these values are replayed to later callers.
    Ready { endpoint: String, message: String },
    /// The latest attempt did not reach ready; the next caller passing
    /// `start_if_needed=true` launches a fresh attempt.
    Failed { message: String },
}

/// Coordinator that serializes runtime start attempts and caches the outcome.
///
/// The async mutex around [`RuntimeState`] is what lets exactly one caller
/// claim the driver role and what the background start task locks to publish
/// its result.
// `dead_code` allowance mirrors the rest of this module.
#[allow(dead_code)]
pub struct RuntimeGuard {
    pub state: Mutex<RuntimeState>,
}

impl RuntimeGuard {
    /// Creates a guard whose state is `NotStarted`.
    #[allow(dead_code)]
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for RuntimeGuard {
    /// A fresh guard: no start has been attempted.
    fn default() -> Self {
        RuntimeGuard {
            state: Mutex::new(RuntimeState::NotStarted),
        }
    }
}

/// Hands out the process-wide [`RuntimeGuard`], creating it on first use.
///
/// The singleton lives behind an `Arc` so every caller receives an owned
/// handle it can move into a spawned task (the background start task needs
/// one). All handles point at the same shared state.
#[allow(dead_code)]
pub fn runtime_guard() -> Arc<RuntimeGuard> {
    static GUARD: OnceLock<Arc<RuntimeGuard>> = OnceLock::new();
    let shared = GUARD.get_or_init(|| Arc::new(RuntimeGuard::new()));
    Arc::clone(shared)
}

/// Non-blocking EnsureRuntime handler.
///
/// - If state is `Ready`: returns the cached endpoint/message with `STATUS_REUSED`
///   (this applies to probe-only calls too).
/// - If `start_if_needed == false`: invokes `probe_fn` and returns its result.
/// - If state is `Starting`: returns `STATUS_STARTING` immediately (daemon polls).
/// - Otherwise (`NotStarted` or `Failed`): transitions state to `Starting`,
///   spawns the future built by `make_start()` as a background task, and
///   returns `STATUS_STARTING`.
///
/// The background task awaits the start future and publishes the outcome to
/// `guard.state` (transitioning to `Ready` or `Failed`). If the start future
/// panics, the task catches the unwind and publishes a `Failed` state so the
/// state machine always settles and the daemon's poll loop can make progress.
/// If `make_start()` itself panics while building the future, nothing is
/// spawned. In that case state is set to `Failed` and this call returns a
/// `STATUS_FAILED` response instead of unwinding.
///
/// Both `make_start` and `probe_fn` are `FnOnce() -> Fut` (not bare futures)
/// so the caller does not construct a future on hot `Ready`/`Starting`
/// polling paths where neither will be invoked.
///
/// The start future is handed to `tokio::spawn`, so this must run inside a
/// Tokio runtime.
#[allow(dead_code)]
pub async fn ensure_runtime<F, Fut, P, PFut>(
    guard: Arc<RuntimeGuard>,
    start_if_needed: bool,
    make_start: F,
    probe_fn: P,
) -> RuntimeEnsureResponse
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = RuntimeEnsureResponse> + Send + 'static,
    P: FnOnce() -> PFut,
    PFut: Future<Output = RuntimeEnsureResponse>,
{
    // Fast path: Ready → return cached state. Checked before the probe so a
    // probe-only caller skips `probe_fn` once the runtime is up.
    {
        let state = guard.state.lock().await;
        if let RuntimeState::Ready { endpoint, message } = &*state {
            return reused_response(endpoint, message);
        }
    }

    if !start_if_needed {
        return probe_fn().await;
    }

    // Decide whether to become the driver. The lock was released above, so
    // re-inspect: another caller may have finished or started meanwhile.
    {
        let mut state = guard.state.lock().await;
        match &*state {
            RuntimeState::Ready { endpoint, message } => {
                // Another call finished while we waited for the lock.
                return reused_response(endpoint, message);
            }
            // Someone else is already driving; just report progress.
            RuntimeState::Starting => return starting_response(),
            RuntimeState::NotStarted | RuntimeState::Failed { .. } => {
                *state = RuntimeState::Starting;
            }
        }
    }

    // We are the driver and state is `Starting`. `make_start` runs
    // synchronously on this task, outside the spawned task's catch_unwind.
    // An unguarded panic here would leave state at `Starting` forever, and
    // every later call would report "in progress". Catch it and settle.
    let fut = match std::panic::catch_unwind(AssertUnwindSafe(make_start)) {
        Ok(fut) => fut,
        Err(panic_payload) => {
            let detail = panic_message(&panic_payload);
            tracing::error!(
                "ensure_runtime make_start panicked: {}; marking runtime Failed",
                detail
            );
            let message = format!("runtime start panicked: {detail}");
            *guard.state.lock().await = RuntimeState::Failed {
                message: message.clone(),
            };
            return RuntimeEnsureResponse {
                ready: false,
                endpoint: String::new(),
                message,
                status: STATUS_FAILED.to_string(),
            };
        }
    };

    let guard_for_task = Arc::clone(&guard);
    tokio::spawn(async move {
        // Catch panics from the start future so a bug there can't leave
        // state stuck at `Starting` forever. `AssertUnwindSafe` is sound
        // here: we discard the inner future on panic and publish a
        // fresh `Failed` state directly.
        let response = match AssertUnwindSafe(fut).catch_unwind().await {
            Ok(resp) => resp,
            Err(panic_payload) => {
                let message = panic_message(&panic_payload);
                tracing::error!(
                    "ensure_runtime start task panicked: {}; marking runtime Failed",
                    message
                );
                RuntimeEnsureResponse {
                    ready: false,
                    endpoint: String::new(),
                    message: format!("runtime start panicked: {message}"),
                    status: STATUS_FAILED.to_string(),
                }
            }
        };
        // Move the fields into the settled state rather than cloning them.
        // `status` is not stored: callers of a `Ready` state get STATUS_REUSED.
        let RuntimeEnsureResponse {
            ready,
            endpoint,
            message,
            ..
        } = response;
        let settled = if ready {
            RuntimeState::Ready { endpoint, message }
        } else {
            RuntimeState::Failed { message }
        };
        *guard_for_task.state.lock().await = settled;
    });

    // We just spawned the driver task. Return the pending marker so the
    // daemon polls with backoff.
    starting_response()
}

/// Response for a caller that finds the runtime already `Ready`.
fn reused_response(endpoint: &str, message: &str) -> RuntimeEnsureResponse {
    RuntimeEnsureResponse {
        ready: true,
        endpoint: endpoint.to_string(),
        message: message.to_string(),
        status: STATUS_REUSED.to_string(),
    }
}

/// Pending marker returned while a background start task is in flight.
fn starting_response() -> RuntimeEnsureResponse {
    RuntimeEnsureResponse {
        ready: false,
        endpoint: String::new(),
        message: "runtime start in progress".to_string(),
        status: STATUS_STARTING.to_string(),
    }
}

/// Best-effort extraction of a human-readable message from a panic payload.
///
/// Panic payloads are typically a `&'static str` (literal message) or a
/// `String` (formatted message). Any other payload type yields a fixed
/// placeholder.
///
/// The parameter is `&Box<..>` rather than `&dyn Any` on purpose. Passing
/// `&Box<dyn Any + Send>` where `&(dyn Any + Send)` is expected coerces the
/// `Box` itself into the trait object, and every downcast would then miss.
fn panic_message(payload: &Box<dyn std::any::Any + Send>) -> String {
    let literal = payload
        .downcast_ref::<&'static str>()
        .map(|text| (*text).to_owned());
    literal
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "panic payload was not a string".to_string())
}

// Unit tests for the EnsureRuntime state machine. Each test builds its own
// `RuntimeGuard` instead of using the `runtime_guard()` singleton, so tests
// don't share state. Closures that must not run are stubbed with
// `panic!`/`unreachable!()` inside their async blocks.
//
// Note: a panic inside a *start* future is caught by `ensure_runtime`'s
// spawned task (see `panic_in_start_task_transitions_to_failed`). For start
// closures, the real guard is therefore the follow-up assertions, not the
// panic itself.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    /// Canned successful start result: `ready = true`, endpoint `vsock:2375`,
    /// status `STATUS_STARTED`.
    fn make_ready_response() -> RuntimeEnsureResponse {
        RuntimeEnsureResponse {
            ready: true,
            endpoint: "vsock:2375".to_string(),
            message: "docker socket ready".to_string(),
            status: STATUS_STARTED.to_string(),
        }
    }

    /// Canned unsuccessful result: `ready = false`, empty endpoint, status
    /// `STATUS_FAILED`. Used both as a start outcome and as a probe result.
    fn make_failed_response() -> RuntimeEnsureResponse {
        RuntimeEnsureResponse {
            ready: false,
            endpoint: String::new(),
            message: "docker socket missing".to_string(),
            status: STATUS_FAILED.to_string(),
        }
    }

    /// Waits for the background task to settle state into Ready or Failed.
    ///
    /// Polls every 2ms. If `deadline_ms` elapses first, it returns whatever
    /// state is current (e.g. still `Starting`) so the caller's assertion
    /// reports it.
    async fn wait_settled(guard: &RuntimeGuard, deadline_ms: u64) -> RuntimeState {
        let deadline = std::time::Instant::now() + Duration::from_millis(deadline_ms);
        loop {
            // Scope the lock so it is released before sleeping; the spawned
            // start task needs it to publish its result.
            {
                let state = guard.state.lock().await;
                match &*state {
                    RuntimeState::Ready { .. } | RuntimeState::Failed { .. } => {
                        return state.clone();
                    }
                    _ => {}
                }
            }
            if std::time::Instant::now() >= deadline {
                let state = guard.state.lock().await;
                return state.clone();
            }
            tokio::time::sleep(Duration::from_millis(2)).await;
        }
    }

    /// The first `start_if_needed` call returns STARTING without waiting, and
    /// the spawned task then settles state to Ready.
    #[tokio::test]
    async fn first_call_returns_starting_and_state_becomes_ready() {
        let guard = Arc::new(RuntimeGuard::new());

        let r = ensure_runtime(
            Arc::clone(&guard),
            true,
            || async { make_ready_response() },
            || async { unreachable!("probe should not be called when start_if_needed=true") },
        )
        .await;

        assert!(!r.ready);
        assert_eq!(r.status, STATUS_STARTING);

        let settled = wait_settled(&guard, 500).await;
        assert!(
            matches!(&settled, RuntimeState::Ready { .. }),
            "expected Ready, got {:?}",
            settled
        );
    }

    /// Once Ready, calls hit the fast path and return the cached endpoint
    /// with STATUS_REUSED.
    #[tokio::test]
    async fn second_call_after_ready_returns_reused() {
        let guard = Arc::new(RuntimeGuard::new());

        // Drive start + wait for Ready.
        let _ = ensure_runtime(
            Arc::clone(&guard),
            true,
            || async { make_ready_response() },
            || async { unreachable!() },
        )
        .await;
        let _ = wait_settled(&guard, 500).await;

        // Subsequent call hits fast path.
        let r = ensure_runtime(
            Arc::clone(&guard),
            true,
            || async { panic!("start_fn should not be called after Ready") },
            || async { unreachable!() },
        )
        .await;

        assert!(r.ready);
        assert_eq!(r.status, STATUS_REUSED);
        assert_eq!(r.endpoint, "vsock:2375");
    }

    /// While a start is in flight, further callers get STARTING and do not
    /// launch a second start.
    #[tokio::test]
    async fn call_while_starting_returns_starting() {
        let guard = Arc::new(RuntimeGuard::new());
        // The start future parks on this Notify, pinning state at Starting
        // until the test releases it.
        let gate = Arc::new(tokio::sync::Notify::new());
        let gate_for_start = Arc::clone(&gate);

        // First call spawns start that blocks on `gate` — state stays Starting.
        let r1 = ensure_runtime(
            Arc::clone(&guard),
            true,
            move || {
                let gate = gate_for_start;
                async move {
                    gate.notified().await;
                    make_ready_response()
                }
            },
            || async { unreachable!() },
        )
        .await;
        assert_eq!(r1.status, STATUS_STARTING);

        // Second call while Starting returns immediately with Starting.
        let r2 = ensure_runtime(
            Arc::clone(&guard),
            true,
            || async { panic!("start_fn must not re-run while Starting") },
            || async { unreachable!() },
        )
        .await;
        assert!(!r2.ready);
        assert_eq!(r2.status, STATUS_STARTING);

        // Release the start task and wait for settlement.
        gate.notify_one();
        let settled = wait_settled(&guard, 500).await;
        assert!(matches!(settled, RuntimeState::Ready { .. }));
    }

    /// Failed is not sticky: the next `start_if_needed` call spawns a fresh
    /// start, which can succeed.
    #[tokio::test]
    async fn failed_start_transitions_to_failed_and_retry_can_succeed() {
        let guard = Arc::new(RuntimeGuard::new());

        let _ = ensure_runtime(
            Arc::clone(&guard),
            true,
            || async { make_failed_response() },
            || async { unreachable!() },
        )
        .await;
        let settled = wait_settled(&guard, 500).await;
        assert!(matches!(settled, RuntimeState::Failed { .. }));

        // Retry: new start is spawned.
        let r = ensure_runtime(
            Arc::clone(&guard),
            true,
            || async { make_ready_response() },
            || async { unreachable!() },
        )
        .await;
        assert_eq!(r.status, STATUS_STARTING);
        let settled = wait_settled(&guard, 500).await;
        assert!(matches!(settled, RuntimeState::Ready { .. }));
    }

    /// `start_if_needed=false` returns the probe's result verbatim and
    /// leaves state at NotStarted.
    #[tokio::test]
    async fn probe_only_does_not_spawn_start() {
        let guard = Arc::new(RuntimeGuard::new());

        let r = ensure_runtime(
            Arc::clone(&guard),
            false,
            || async { panic!("start_fn must not run when start_if_needed=false") },
            || async { make_failed_response() },
        )
        .await;

        assert!(!r.ready);
        assert_eq!(r.status, STATUS_FAILED);

        // State untouched.
        let state = guard.state.lock().await;
        assert!(matches!(&*state, RuntimeState::NotStarted));
    }

    /// Five callers released together by a barrier: all see STARTING, and
    /// exactly one start future runs (counted by `start_count`).
    #[tokio::test]
    async fn concurrent_callers_spawn_start_exactly_once() {
        let guard = Arc::new(RuntimeGuard::new());
        let start_count = Arc::new(AtomicU32::new(0));
        let barrier = Arc::new(tokio::sync::Barrier::new(5));

        let mut handles = Vec::new();
        for _ in 0..5 {
            let guard = Arc::clone(&guard);
            let start_count = Arc::clone(&start_count);
            let barrier = Arc::clone(&barrier);

            handles.push(tokio::spawn(async move {
                barrier.wait().await;
                ensure_runtime(
                    guard,
                    true,
                    move || {
                        let start_count = Arc::clone(&start_count);
                        async move {
                            // Incremented inside the future, so this counts
                            // start futures that actually ran.
                            start_count.fetch_add(1, Ordering::SeqCst);
                            tokio::time::sleep(Duration::from_millis(20)).await;
                            make_ready_response()
                        }
                    },
                    || async { unreachable!() },
                )
                .await
            }));
        }

        for h in handles {
            let r = h.await.unwrap();
            assert_eq!(r.status, STATUS_STARTING);
        }

        let settled = wait_settled(&guard, 500).await;
        assert!(matches!(settled, RuntimeState::Ready { .. }));
        assert_eq!(start_count.load(Ordering::SeqCst), 1);
    }

    /// A panicking start future settles to Failed instead of wedging at
    /// Starting, and a later call can retry successfully.
    #[tokio::test]
    async fn panic_in_start_task_transitions_to_failed() {
        let guard = Arc::new(RuntimeGuard::new());

        let r = ensure_runtime(
            Arc::clone(&guard),
            true,
            || async { panic!("boom from start_fn") },
            || async { unreachable!() },
        )
        .await;
        assert_eq!(r.status, STATUS_STARTING);

        // Without the catch_unwind guard, state would stay Starting forever.
        // With it, the panic is converted into Failed so the daemon's poll
        // loop has a terminal state to observe.
        let settled = wait_settled(&guard, 500).await;
        assert!(
            matches!(&settled, RuntimeState::Failed { .. }),
            "expected Failed after panic, got {:?}",
            settled
        );

        // A subsequent start_if_needed=true call retries cleanly.
        let r = ensure_runtime(
            Arc::clone(&guard),
            true,
            || async { make_ready_response() },
            || async { unreachable!() },
        )
        .await;
        assert_eq!(r.status, STATUS_STARTING);
        let settled = wait_settled(&guard, 500).await;
        assert!(matches!(settled, RuntimeState::Ready { .. }));
    }

    /// Probe-only calls also take the Ready fast path; `probe_fn` is never
    /// invoked once the runtime is up.
    #[tokio::test]
    async fn probe_after_ready_returns_reused_without_running_probe() {
        let guard = Arc::new(RuntimeGuard::new());

        // Start + settle.
        let _ = ensure_runtime(
            Arc::clone(&guard),
            true,
            || async { make_ready_response() },
            || async { unreachable!() },
        )
        .await;
        let _ = wait_settled(&guard, 500).await;

        // Probe-only: fast path hits Ready before probe_fn is awaited.
        let r = ensure_runtime(
            Arc::clone(&guard),
            false,
            || async { panic!("start_fn must not run") },
            || async { panic!("probe_fn must not run when state is already Ready") },
        )
        .await;

        assert!(r.ready);
        assert_eq!(r.status, STATUS_REUSED);
    }
}