car-messaging 0.31.0

Multi-channel approval-transport adapters (iMessage + Slack) for the CAR daemon — inbound poller/orchestrator, Slack wire parsing, per-channel config/allowlist/pairing. Extracted from car-server-core (#418) to cut its test-binary link footprint.
//! U1 — Spawn the channel watcher on runtime enable (the invisible-restart fix).
//!
//! Before this change the daemon only spawned a channel's poller/observer at
//! BOOT, for channels already enabled. Toggling a channel on while car-server
//! was already running did NOT spawn the loop — so a first-time enable did
//! nothing until a restart, completely invisibly. The [`ChannelSupervisor`]
//! closes that: `ensure_spawned` (called from `messaging.config.set` on an
//! off→on transition) spawns the watcher immediately, and is idempotent so a
//! second enable / a boot-spawned channel never double-spawns.
//!
//! These tests drive the supervisor's PUBLIC API with a counting spawn closure
//! (no macOS chat.db, no real loops) so the spawn bookkeeping is asserted
//! deterministically. The real per-channel spawn body is covered by the
//! `spawn_channel_pollers` boot path (exercised on-device per the plan's
//! scenarios — actual iMessage delivery needs a signed build + device).

use car_messaging::channel_supervisor::{ChannelSupervisor, SharedLiveness, SpawnFn};
use car_messaging::messaging_config::ChannelId;
use car_server_types::host::HostState;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Build a supervisor whose spawn closure just increments `counter` per channel
/// it spawns (so a test can assert the spawn happened exactly once). Returns the
/// supervisor and the shared counter.
fn counting_supervisor() -> (Arc<ChannelSupervisor>, Arc<AtomicUsize>) {
    let host = Arc::new(HostState::new());
    let (cancel_tx, _rx) = tokio::sync::watch::channel(false);
    let counter = Arc::new(AtomicUsize::new(0));
    let counter_for_closure = counter.clone();
    let spawn: SpawnFn = Box::new(
        move |_channel: ChannelId,
              _host: &Arc<HostState>,
              _liveness: &SharedLiveness,
              _cancel: tokio::sync::watch::Receiver<bool>|
              -> Result<(), String> {
            counter_for_closure.fetch_add(1, Ordering::SeqCst);
            Ok(())
        },
    );
    let sup = Arc::new(ChannelSupervisor::new(host, cancel_tx, spawn));
    (sup, counter)
}

#[test]
fn runtime_enable_spawns_watcher_once() {
    let (sup, count) = counting_supervisor();
    assert!(!sup.is_spawned(ChannelId::IMessage));

    // First enable at runtime (channel was off at boot) → watcher spawned.
    sup.ensure_spawned(ChannelId::IMessage).unwrap();
    assert!(sup.is_spawned(ChannelId::IMessage));
    assert_eq!(count.load(Ordering::SeqCst), 1, "exactly one spawn");
}

#[test]
fn re_enable_is_idempotent_no_duplicate_watcher() {
    let (sup, count) = counting_supervisor();

    sup.ensure_spawned(ChannelId::IMessage).unwrap();
    // A second enable of an already-spawned channel must NOT spawn again.
    sup.ensure_spawned(ChannelId::IMessage).unwrap();
    sup.ensure_spawned(ChannelId::IMessage).unwrap();
    assert_eq!(
        count.load(Ordering::SeqCst),
        1,
        "an already-spawned channel is not re-spawned (idempotent)"
    );
}

#[test]
fn boot_spawned_channel_is_not_double_spawned_by_a_later_enable() {
    let (sup, count) = counting_supervisor();
    // Simulate the boot path having already spawned this channel.
    sup.mark_spawned(ChannelId::IMessage);
    assert!(sup.is_spawned(ChannelId::IMessage));

    // A later runtime enable (e.g. a redundant config.set) must be a no-op.
    sup.ensure_spawned(ChannelId::IMessage).unwrap();
    assert_eq!(
        count.load(Ordering::SeqCst),
        0,
        "a channel already spawned at boot is never re-spawned by a later set"
    );
}

#[test]
fn channels_are_independent() {
    let (sup, count) = counting_supervisor();
    sup.ensure_spawned(ChannelId::IMessage).unwrap();
    assert!(sup.is_spawned(ChannelId::IMessage));
    assert!(!sup.is_spawned(ChannelId::Slack));

    sup.ensure_spawned(ChannelId::Slack).unwrap();
    assert!(sup.is_spawned(ChannelId::Slack));
    assert_eq!(
        count.load(Ordering::SeqCst),
        2,
        "each channel spawns its own watcher exactly once"
    );
}

#[test]
fn spawn_error_leaves_channel_not_live_so_a_later_enable_retries() {
    // A spawn closure that fails the FIRST attempt and succeeds afterwards —
    // models an unprovisioned Slack channel that is then provisioned.
    let host = Arc::new(HostState::new());
    let (cancel_tx, _rx) = tokio::sync::watch::channel(false);
    let attempts = Arc::new(AtomicUsize::new(0));
    let attempts_for_closure = attempts.clone();
    let spawn: SpawnFn = Box::new(
        move |_channel: ChannelId,
              _host: &Arc<HostState>,
              _liveness: &SharedLiveness,
              _cancel: tokio::sync::watch::Receiver<bool>|
              -> Result<(), String> {
            let n = attempts_for_closure.fetch_add(1, Ordering::SeqCst);
            if n == 0 {
                Err("not provisioned yet".to_string())
            } else {
                Ok(())
            }
        },
    );
    let sup = ChannelSupervisor::new(host, cancel_tx, spawn);

    // First enable fails — the channel must NOT be marked live (so a retry can
    // re-attempt once the precondition is met).
    let err = sup.ensure_spawned(ChannelId::Slack);
    assert!(err.is_err(), "the first spawn failed");
    assert!(
        !sup.is_spawned(ChannelId::Slack),
        "a failed spawn does not mark the channel live"
    );

    // Second enable succeeds and marks it live.
    sup.ensure_spawned(ChannelId::Slack).unwrap();
    assert!(sup.is_spawned(ChannelId::Slack));
    assert_eq!(attempts.load(Ordering::SeqCst), 2, "the retry re-attempted");
}