car-messaging 0.31.0

Multi-channel approval-transport adapters (iMessage + Slack) for the CAR daemon — inbound poller/orchestrator, Slack wire parsing, per-channel config/allowlist/pairing. Extracted from car-server-core (#418) to cut its test-binary link footprint.
//! Runtime channel supervisor + per-channel liveness (Units 1, 2, 3).
//!
//! The approval-transport adapters (`spawn_channel_pollers`) used to be spawned
//! ONCE at daemon boot, for channels already enabled in `messaging.json`. There
//! was no handle the live `messaging.config.set` path could reach to spawn a
//! channel's watcher when the user flipped it on while car-server was already
//! running — so a first-time enable did nothing until a restart (the invisible
//! failure this feature closes). [`ChannelSupervisor`] is that handle.
//!
//! It owns three things:
//!
//! 1. **The live-channel set** (`live: Mutex<HashSet<ChannelId>>`) — which
//!    channels currently have a spawned watcher. [`ChannelSupervisor::ensure_spawned`]
//!    is idempotent against this set: a second enable of an already-spawned
//!    channel is a no-op (U1). Boot records the channels it spawned via
//!    [`ChannelSupervisor::mark_spawned`].
//! 2. **The cancel signal** — the same `tokio::sync::watch::Sender<bool>` the
//!    boot registry returned, so shutdown can stop every loop. A channel spawned
//!    at runtime subscribes to the SAME signal, so it stops on shutdown too.
//! 3. **Per-channel liveness** (`ChannelLiveness`) — the real runtime health the
//!    host-gated `messaging.status` method reads and the outbound send path
//!    writes (U2/U3). `watcher_running` is derived from the live set;
//!    `last_send_*`/`last_error` are written by the send path.
//!
//! There are NO cargo feature flags here (CLAUDE.md hard rule #1). The iMessage
//! adapter the supervisor spawns is `#[cfg(target_os = "macos")]`-gated inside
//! its own spawn body; the supervisor, the live set, and the liveness struct are
//! cross-platform.

use car_server_types::channel::{ChannelId, SharedHost};
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};

/// Outbound-send health for a single channel at one moment in time.
///
/// The send path writes it (U3) and `messaging.status` reads it (U2). Every
/// field is an `Option`, so a channel that has never sent anything is simply
/// the `Default` value: all `None`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ChannelLiveness {
    /// When the most recent send attempt with a recorded outcome happened, in
    /// Unix-epoch milliseconds. Set on success and failure alike; `None` until
    /// the first recorded send.
    pub last_send_at_ms: Option<i64>,
    /// Outcome of the most recent recorded send. `None` until the first send;
    /// `Some(false)` covers BOTH a hard error and a soft `sent:false`.
    pub last_send_ok: Option<bool>,
    /// Human-readable reason for the most recent FAILURE (the hard error
    /// string or the soft `sent:false` reason). `None` when no send has
    /// happened or when the latest send succeeded: a success resets this to
    /// `None`, so a stale reason never outlives the failure it described
    /// (see [`Self::record_success`]).
    pub last_error: Option<String>,
}

impl ChannelLiveness {
    /// Write the two fields every recorded outcome updates: the attempt time
    /// and the ok flag. `last_error` is left to the caller.
    fn stamp(&mut self, at_ms: i64, ok: bool) {
        self.last_send_at_ms = Some(at_ms);
        self.last_send_ok = Some(ok);
    }

    /// Note a send that succeeded at `at_ms`: the time is stamped, the outcome
    /// becomes ok, and any error left over from an earlier failure is dropped.
    fn record_success(&mut self, at_ms: i64) {
        self.stamp(at_ms, true);
        self.last_error = None;
    }

    /// Note a send that failed at `at_ms` — a hard error and a soft
    /// `sent:false` are treated the same: the time is stamped, the outcome
    /// becomes not-ok, and `reason` replaces whatever error was stored before.
    fn record_failure(&mut self, at_ms: i64, reason: impl Into<String>) {
        self.stamp(at_ms, false);
        self.last_error = Some(reason.into());
    }
}

/// The spawn closure a [`ChannelSupervisor`] calls to start a channel's
/// watcher loop at runtime.
///
/// Arguments, in order: the [`ChannelId`] to start, the supervisor's
/// `SharedHost`, the shared liveness map, and a fresh cancel receiver
/// subscribed to the supervisor's cancel signal.
///
/// Returns `Ok(())` if the watcher was spawned (or is a no-op on this
/// platform), `Err` with a reason if the channel cannot be spawned (e.g. an
/// unprovisioned Slack channel) — the caller surfaces that.
///
/// Boxed + `Send + Sync` so the supervisor can hold one closure for the
/// process lifetime and call it from the async `messaging.config.set` handler.
/// It is a `Fn` (not `FnOnce`/`FnMut`): the same closure is invoked once per
/// channel that gets enabled, through a shared `&self`.
pub type SpawnFn = Box<
    dyn Fn(ChannelId, &SharedHost, &SharedLiveness, tokio::sync::watch::Receiver<bool>)
            -> Result<(), String>
        + Send
        + Sync,
>;

/// Shared, lock-guarded per-channel liveness map, keyed by [`ChannelId`].
/// Written by the send path (U3) and read by `messaging.status` (U2). A channel
/// has no entry until its first send outcome is recorded.
///
/// A `std::sync::Mutex` (not tokio) because every access is a tiny, non-await
/// critical section — never hold the guard across an `.await`.
pub type SharedLiveness = Arc<Mutex<HashMap<ChannelId, ChannelLiveness>>>;

/// Runtime supervisor for the approval-transport channels. Held as an
/// `Arc` on [`crate::session::ServerState`] (lazy-initialized at boot by
/// `spawn_channel_pollers`) so the host-gated `messaging.config.set` handler
/// can reach it to spawn a channel's watcher the instant the user enables it —
/// no daemon/app restart (U1).
///
/// All methods take `&self`; the mutable state (`live`, `liveness`) sits behind
/// `std::sync::Mutex`es that are only held for short, non-await sections.
pub struct ChannelSupervisor {
    /// Shared host the spawned adapters resolve approvals against. Passed by
    /// reference to the spawn closure on every runtime spawn.
    host: SharedHost,
    /// Which channels currently have a spawned watcher. Guards idempotency:
    /// `ensure_spawned` is a no-op for a channel already in this set. A channel
    /// is also present while its spawn is still in flight (the slot is reserved
    /// before the closure runs) and is only ever removed when that spawn
    /// returns an error — disabling a channel does not remove it.
    live: Mutex<HashSet<ChannelId>>,
    /// Cancel signal shared with every spawned loop (boot + runtime). Flip to
    /// `true` to stop them all on shutdown. Runtime spawns get their receiver
    /// from `subscribe()` on this sender.
    cancel_tx: tokio::sync::watch::Sender<bool>,
    /// The spawn closure for a single channel's watcher. `None` only in unit
    /// tests that exercise the live-set bookkeeping without spawning real loops.
    spawn: Option<SpawnFn>,
    /// Per-channel send health, shared with the send path and `messaging.status`.
    liveness: SharedLiveness,
}

impl ChannelSupervisor {
    /// Build a supervisor over the given host, cancel signal, and spawn closure.
    /// The boot path (`spawn_channel_pollers`) constructs it, records the
    /// channels it already spawned via [`Self::mark_spawned`], and stores it on
    /// `ServerState`.
    pub fn new(
        host: SharedHost,
        cancel_tx: tokio::sync::watch::Sender<bool>,
        spawn: SpawnFn,
    ) -> Self {
        Self {
            host,
            live: Mutex::new(HashSet::new()),
            cancel_tx,
            spawn: Some(spawn),
            liveness: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Build a supervisor with NO spawn closure — for unit tests that assert the
    /// idempotent live-set bookkeeping without standing up real watcher loops.
    /// `ensure_spawned` records the channel as live but spawns nothing.
    #[cfg(test)]
    pub fn new_for_test(host: SharedHost) -> Self {
        let (cancel_tx, _rx) = tokio::sync::watch::channel(false);
        Self {
            host,
            live: Mutex::new(HashSet::new()),
            cancel_tx,
            spawn: None,
            liveness: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// The shared liveness map (so the boot path can hand the SAME `Arc` to the
    /// adapters it spawns, and `messaging.status` can read it).
    pub fn liveness(&self) -> SharedLiveness {
        self.liveness.clone()
    }

    /// Stop every spawned adapter loop (boot + runtime) by flipping the shared
    /// cancel signal. Called by the daemon shutdown path. Idempotent.
    pub fn cancel_all(&self) {
        let _ = self.cancel_tx.send(true);
    }

    /// Record that `channel`'s watcher is already running (called by the boot
    /// path for each channel it spawned at startup). Idempotent.
    pub fn mark_spawned(&self, channel: ChannelId) {
        self.live.lock().unwrap().insert(channel);
    }

    /// Whether `channel`'s watcher is currently spawned. Read by
    /// `messaging.status` for the `watcher_running` field (U2).
    pub fn is_spawned(&self, channel: ChannelId) -> bool {
        self.live.lock().unwrap().contains(&channel)
    }

    /// Ensure `channel`'s watcher is spawned (U1 — the invisible-restart fix).
    /// Idempotent: if the channel is already live, this is a no-op and returns
    /// `Ok(())`. Otherwise it calls the spawn closure with a fresh cancel
    /// receiver, records the channel as live on success, and returns the
    /// closure's result. A spawn error leaves the channel NOT marked live (so a
    /// later retry can re-attempt).
    ///
    /// Called from `messaging.config.set` after a successful off→on transition.
    /// Disable does NOT abort the loop (KTD1) — `poll_once`/`observe_and_notify`
    /// already gate on the enabled flag, so a disabled channel does zero work
    /// per tick; aborting would only add lifecycle complexity.
    pub fn ensure_spawned(&self, channel: ChannelId) -> Result<(), String> {
        // Idempotency guard FIRST — a second enable is a no-op (no duplicate
        // watcher), held under the live-set lock so two concurrent set calls
        // can't both pass the check and double-spawn.
        {
            let mut live = self.live.lock().unwrap();
            if live.contains(&channel) {
                return Ok(());
            }
            // Reserve the slot BEFORE spawning so a concurrent call short-
            // circuits. If the spawn fails we remove it again below.
            live.insert(channel);
        }

        let result = match &self.spawn {
            Some(spawn) => spawn(
                channel,
                &self.host,
                &self.liveness,
                self.cancel_tx.subscribe(),
            ),
            // No spawn closure (test supervisor): the channel is recorded live
            // above; there is nothing to start.
            None => Ok(()),
        };

        if result.is_err() {
            // Roll back the reservation so a later enable can retry.
            self.live.lock().unwrap().remove(&channel);
        }
        result
    }

    /// Record a successful send for `channel` into the shared liveness (U3).
    /// Stamps `now`, marks ok, clears any prior error.
    pub fn record_send_success(&self, channel: ChannelId) {
        let now = now_ms();
        self.liveness
            .lock()
            .unwrap()
            .entry(channel)
            .or_default()
            .record_success(now);
    }

    /// Record a failed send for `channel` (hard error OR soft `sent:false`)
    /// into the shared liveness (U3). Stamps `now`, marks not-ok, stores the
    /// reason.
    pub fn record_send_failure(&self, channel: ChannelId, reason: impl Into<String>) {
        let now = now_ms();
        self.liveness
            .lock()
            .unwrap()
            .entry(channel)
            .or_default()
            .record_failure(now, reason);
    }

    /// Snapshot `channel`'s liveness (default if no send recorded yet). Read by
    /// `messaging.status`.
    pub fn liveness_snapshot(&self, channel: ChannelId) -> ChannelLiveness {
        self.liveness
            .lock()
            .unwrap()
            .get(&channel)
            .cloned()
            .unwrap_or_default()
    }
}

/// Free function: mark `channel`'s latest send as successful in a
/// [`SharedLiveness`] map.
///
/// The spawned adapters hold the `Arc<Mutex<…>>` directly rather than the
/// whole supervisor, so they record through this function; the send path and
/// `messaging.status` therefore agree. The channel's entry is created on
/// first use, the timestamp is taken before the lock is acquired, and any
/// previously stored error is cleared.
pub fn liveness_record_success(liveness: &SharedLiveness, channel: ChannelId) {
    let stamped_at = now_ms();
    let mut by_channel = liveness.lock().unwrap();
    let entry = by_channel.entry(channel).or_default();
    entry.record_success(stamped_at);
}

/// Free function: record a failed send into a [`SharedLiveness`] map.
///
/// Covers a hard error and a soft `sent:false` alike: the channel's entry is
/// created on first use, stamped with the current time, marked not-ok, and
/// `reason` replaces any previously stored error.
///
/// Both the timestamp and the `reason` conversion are done BEFORE the lock is
/// taken. The conversion may allocate (or run a caller-supplied `Into` impl),
/// and keeping it outside the guard keeps the critical section to the map
/// update alone and means a panic in the conversion cannot poison the map.
pub fn liveness_record_failure(
    liveness: &SharedLiveness,
    channel: ChannelId,
    reason: impl Into<String>,
) {
    let now = now_ms();
    let reason: String = reason.into();
    liveness
        .lock()
        .unwrap()
        .entry(channel)
        .or_default()
        .record_failure(now, reason);
}

/// Current wall-clock time as Unix-epoch milliseconds, for the last-delivered
/// display.
///
/// This reads `SystemTime`, which is NOT monotonic: the value can jump in
/// either direction if the system clock is adjusted. That is acceptable here
/// (the UI shows "2m ago"), but the result must not be used to measure
/// elapsed time.
///
/// Returns `0` if the system clock reports a time before the Unix epoch, and
/// saturates at `i64::MAX` rather than wrapping if the millisecond count does
/// not fit in an `i64`.
fn now_ms() -> i64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        // `as_millis` yields a `u128`; clamp first so the narrowing cast below
        // is lossless instead of silently truncating.
        Ok(elapsed) => elapsed.as_millis().min(i64::MAX as u128) as i64,
        // Clock is set before 1970: report the epoch itself.
        Err(_) => 0,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use car_server_types::host::HostState;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// A fresh host for a supervisor under test.
    fn host() -> SharedHost {
        Arc::new(HostState::new())
    }

    /// A supervisor with a REAL spawn closure that bumps `calls` on every
    /// invocation and then fails with `failure` (or succeeds when `None`).
    /// Lets the tests observe how often the supervisor actually spawns, which
    /// the closure-less `new_for_test` supervisor cannot show.
    fn supervisor_with_spawn(
        calls: Arc<AtomicUsize>,
        failure: Option<&'static str>,
    ) -> ChannelSupervisor {
        let (cancel_tx, _rx) = tokio::sync::watch::channel(false);
        let spawn: SpawnFn = Box::new(
            move |_channel: ChannelId,
                  _host: &SharedHost,
                  _liveness: &SharedLiveness,
                  _cancel: tokio::sync::watch::Receiver<bool>|
                  -> Result<(), String> {
                calls.fetch_add(1, Ordering::SeqCst);
                match failure {
                    Some(reason) => Err(reason.to_string()),
                    None => Ok(()),
                }
            },
        );
        ChannelSupervisor::new(host(), cancel_tx, spawn)
    }

    #[test]
    fn ensure_spawned_is_idempotent() {
        let sup = ChannelSupervisor::new_for_test(host());
        assert!(!sup.is_spawned(ChannelId::IMessage));

        // First enable records the channel live.
        sup.ensure_spawned(ChannelId::IMessage).unwrap();
        assert!(sup.is_spawned(ChannelId::IMessage));

        // Second enable is a no-op — still live, and no error is returned.
        sup.ensure_spawned(ChannelId::IMessage).unwrap();
        assert!(sup.is_spawned(ChannelId::IMessage));

        // A different channel is independent.
        assert!(!sup.is_spawned(ChannelId::Slack));
    }

    #[test]
    fn ensure_spawned_runs_the_spawn_closure_once_per_channel() {
        let calls = Arc::new(AtomicUsize::new(0));
        let sup = supervisor_with_spawn(Arc::clone(&calls), None);

        // Two enables of the same channel ⇒ exactly one spawn.
        sup.ensure_spawned(ChannelId::IMessage).unwrap();
        sup.ensure_spawned(ChannelId::IMessage).unwrap();
        assert_eq!(calls.load(Ordering::SeqCst), 1);
        assert!(sup.is_spawned(ChannelId::IMessage));

        // A channel the boot path already marked live is never re-spawned.
        sup.mark_spawned(ChannelId::Slack);
        sup.ensure_spawned(ChannelId::Slack).unwrap();
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn failed_spawn_rolls_back_so_a_retry_can_run() {
        let calls = Arc::new(AtomicUsize::new(0));
        let sup = supervisor_with_spawn(Arc::clone(&calls), Some("slack not provisioned"));

        // The closure's error is returned unchanged and the channel is NOT live.
        let err = sup.ensure_spawned(ChannelId::Slack).unwrap_err();
        assert_eq!(err, "slack not provisioned");
        assert!(!sup.is_spawned(ChannelId::Slack));
        assert_eq!(calls.load(Ordering::SeqCst), 1);

        // Because the reservation was rolled back, a retry spawns again rather
        // than short-circuiting as "already live".
        assert!(sup.ensure_spawned(ChannelId::Slack).is_err());
        assert_eq!(calls.load(Ordering::SeqCst), 2);
        assert!(!sup.is_spawned(ChannelId::Slack));
    }

    #[test]
    fn runtime_spawn_observes_cancel_all() {
        // The closure parks the receiver it was handed so the test can read it.
        let parked: Arc<Mutex<Option<tokio::sync::watch::Receiver<bool>>>> =
            Arc::new(Mutex::new(None));
        let sink = Arc::clone(&parked);
        let (cancel_tx, _rx) = tokio::sync::watch::channel(false);
        let spawn: SpawnFn = Box::new(
            move |_channel: ChannelId,
                  _host: &SharedHost,
                  _liveness: &SharedLiveness,
                  cancel: tokio::sync::watch::Receiver<bool>|
                  -> Result<(), String> {
                *sink.lock().unwrap() = Some(cancel);
                Ok(())
            },
        );
        let sup = ChannelSupervisor::new(host(), cancel_tx, spawn);

        sup.ensure_spawned(ChannelId::IMessage).unwrap();
        let cancel_rx = parked
            .lock()
            .unwrap()
            .take()
            .expect("spawn closure should have received a cancel receiver");

        assert!(!*cancel_rx.borrow());
        sup.cancel_all();
        assert!(*cancel_rx.borrow());

        // Idempotent: a second cancel leaves the signal set.
        sup.cancel_all();
        assert!(*cancel_rx.borrow());
    }

    #[test]
    fn mark_spawned_reflects_in_is_spawned() {
        let sup = ChannelSupervisor::new_for_test(host());
        sup.mark_spawned(ChannelId::IMessage);
        assert!(sup.is_spawned(ChannelId::IMessage));
        assert!(!sup.is_spawned(ChannelId::Slack));
    }

    #[test]
    fn liveness_records_success_and_failure() {
        let sup = ChannelSupervisor::new_for_test(host());

        // No send yet ⇒ default snapshot.
        let snap = sup.liveness_snapshot(ChannelId::IMessage);
        assert_eq!(snap, ChannelLiveness::default());
        assert!(snap.last_send_at_ms.is_none());

        // Record success.
        sup.record_send_success(ChannelId::IMessage);
        let snap = sup.liveness_snapshot(ChannelId::IMessage);
        assert_eq!(snap.last_send_ok, Some(true));
        assert!(snap.last_send_at_ms.is_some());
        assert!(snap.last_error.is_none());

        // Record failure — overrides ok, sets the reason.
        sup.record_send_failure(ChannelId::IMessage, "recipient not found");
        let snap = sup.liveness_snapshot(ChannelId::IMessage);
        assert_eq!(snap.last_send_ok, Some(false));
        assert_eq!(snap.last_error.as_deref(), Some("recipient not found"));

        // A later success clears the error.
        sup.record_send_success(ChannelId::IMessage);
        let snap = sup.liveness_snapshot(ChannelId::IMessage);
        assert_eq!(snap.last_send_ok, Some(true));
        assert!(snap.last_error.is_none());
    }

    #[test]
    fn free_functions_write_the_map_the_supervisor_reads() {
        let sup = ChannelSupervisor::new_for_test(host());
        let shared = sup.liveness();

        // What an adapter records through the bare map is what status reads.
        liveness_record_failure(&shared, ChannelId::Slack, "channel_not_found");
        let snap = sup.liveness_snapshot(ChannelId::Slack);
        assert_eq!(snap.last_send_ok, Some(false));
        assert_eq!(snap.last_error.as_deref(), Some("channel_not_found"));

        liveness_record_success(&shared, ChannelId::Slack);
        let snap = sup.liveness_snapshot(ChannelId::Slack);
        assert_eq!(snap.last_send_ok, Some(true));
        assert!(snap.last_error.is_none());

        // Channels are tracked independently.
        assert_eq!(
            sup.liveness_snapshot(ChannelId::IMessage),
            ChannelLiveness::default()
        );
    }
}