car-messaging 0.31.0

Multi-channel approval-transport adapters (iMessage + Slack) for the CAR daemon — inbound poller/orchestrator, Slack wire parsing, per-channel config/allowlist/pairing. Extracted from car-server-core (#418) to cut its test-binary link footprint.
//! U4 — Host-gated `messaging.test_send` self-test (daemon-side logic).
//!
//! `send_test` is a PURE send probe: it sends a fixed, clearly-labeled body to
//! the paired handle, records the outcome into liveness (so a pass genuinely
//! proves Automation works and a failure surfaces like a real send), and mints
//! NO approval/pairing mapping — it resolves nothing. These tests pin:
//!
//! - Not enabled → clear error, no send attempted.
//! - Enabled but unpaired → clear error, no send attempted.
//! - Enabled + paired + success → Ok; liveness `last_send_*` updated; a
//!   concurrently-pending real approval is NOT resolved/consumed by the test.
//! - Enabled + paired + hard-fail (Automation denied) → the reason surfaces.
//!
//! The host-gate itself (a non-host caller is rejected) is the SAME
//! `require_approval_authority` boundary the existing `messaging.config.*` /
//! `messaging.pairing.*` methods use — covered by `messaging_config_gate.rs`;
//! `messaging.status` / `messaging.test_send` call it identically (verified by
//! source inspection in the handler), so an inbound message (no host token) can
//! never reach them.

use car_proto::{CreateHostApprovalRequest, HostApprovalStatus};
use car_messaging::channel_supervisor::{ChannelLiveness, SharedLiveness};
use car_messaging::messaging_config::{ChannelId, MessagingConfigStore};
use car_messaging::messaging_orchestrator::{
    MessageSender, MessagingOrchestrator, SendOutcome, TEST_SEND_BODY,
};
use car_server_types::host::HostState;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tempfile::TempDir;

const PAIRED: &str = "+15551234567";

/// Test double for [`MessageSender`]: every send is logged, and every send
/// returns the same canned outcome chosen at construction time.
struct CapturingSender {
    /// `Some(reason)` ⇒ each send returns `Err(reason)`; `None` ⇒ each send
    /// succeeds.
    fail: Option<String>,
    /// The `(handle, body)` pairs passed to `send`, in call order.
    sent: Mutex<Vec<(String, String)>>,
}
impl CapturingSender {
    /// Shared constructor: an empty call log plus the given canned outcome.
    fn with_outcome(fail: Option<String>) -> Arc<Self> {
        Arc::new(Self {
            fail,
            sent: Mutex::default(),
        })
    }

    /// A sender whose sends always succeed.
    fn ok() -> Arc<Self> {
        Self::with_outcome(None)
    }

    /// A sender whose sends always fail with `reason`.
    fn hard_fail(reason: &str) -> Arc<Self> {
        Self::with_outcome(Some(reason.to_owned()))
    }

    /// Returns a copy of the call log recorded so far.
    fn calls(&self) -> Vec<(String, String)> {
        let log = self.sent.lock().unwrap();
        log.to_vec()
    }
}
impl MessageSender for CapturingSender {
    /// Logs the `(handle, body)` pair, then answers with the canned outcome:
    /// the configured failure reason if there is one, success otherwise.
    fn send(&self, handle: &str, body: &str) -> Result<SendOutcome, String> {
        // Record first, so even a failing send shows up in the call log.
        let record = (handle.to_owned(), body.to_owned());
        self.sent.lock().unwrap().push(record);

        if let Some(reason) = &self.fail {
            return Err(reason.clone());
        }
        Ok(SendOutcome::ok())
    }
}

/// Returns a new shared liveness map with no channel entries in it.
fn fresh_liveness() -> SharedLiveness {
    let no_channels_yet = HashMap::new();
    Arc::new(Mutex::new(no_channels_yet))
}

fn snapshot(liveness: &SharedLiveness) -> ChannelLiveness {
    liveness
        .lock()
        .unwrap()
        .get(&ChannelId::IMessage)
        .cloned()
        .unwrap_or_default()
}

/// Assembles an orchestrator over a fresh temp directory.
///
/// `enabled` / `paired` decide whether the config store is switched on and
/// whether [`PAIRED`] is added as a handle before the orchestrator is created.
/// The orchestrator receives its own store opened on the same directory
/// (presumably so it reads the seeded state back from disk — confirm against
/// `MessagingConfigStore`), plus the supplied `sender` and `liveness`.
///
/// Returns the orchestrator, the host state it was wired to, and the
/// `TempDir` guard so the caller decides how long the directory lives.
fn build(
    enabled: bool,
    paired: bool,
    sender: Arc<dyn MessageSender>,
    liveness: SharedLiveness,
) -> (MessagingOrchestrator, Arc<HostState>, TempDir) {
    let dir = TempDir::new().expect("tempdir");
    let base = dir.path();

    // Seed the on-disk config through a dedicated store handle.
    let seed = MessagingConfigStore::with_base_dir(base);
    if enabled {
        seed.set_enabled(true).unwrap();
    }
    if paired {
        seed.add_handle(PAIRED).unwrap();
    }

    let host = Arc::new(HostState::new());
    let orchestrator_store = MessagingConfigStore::with_base_dir(base);
    let orch = MessagingOrchestrator::with_liveness(
        host.clone(),
        orchestrator_store,
        sender,
        base.to_path_buf(),
        liveness,
    );

    (orch, host, dir)
}

/// With the channel switched off (even though a handle is paired),
/// `send_test` must fail with an error mentioning "off" and must never reach
/// the sender.
#[tokio::test]
async fn disabled_returns_clear_error_no_send() {
    let sender = CapturingSender::ok();
    let sender_dyn: Arc<dyn MessageSender> = sender.clone();
    let (orch, _host, _dir) = build(false, true, sender_dyn, fresh_liveness());

    let err = orch.send_test().await.expect_err("disabled ⇒ error");

    let lowered = err.to_lowercase();
    assert!(
        lowered.contains("off"),
        "the error names the channel being off: {err}"
    );

    let attempts = sender.calls();
    assert_eq!(attempts.len(), 0, "no send attempted when disabled");
}

/// With the channel on but no handle paired, `send_test` must fail with an
/// error that mentions pairing and must never reach the sender.
#[tokio::test]
async fn enabled_unpaired_returns_clear_error_no_send() {
    let sender = CapturingSender::ok();
    let sender_dyn: Arc<dyn MessageSender> = sender.clone();
    let (orch, _host, _dir) = build(true, false, sender_dyn, fresh_liveness());

    let err = orch.send_test().await.expect_err("unpaired ⇒ error");

    // Case-insensitive match on either wording of the hint.
    let lowered = err.to_lowercase();
    let mentions_pairing = lowered.contains("paired") || lowered.contains("pairing");
    assert!(
        mentions_pairing,
        "the error tells the user to pair first: {err}"
    );

    let attempts = sender.calls();
    assert_eq!(attempts.len(), 0, "no send attempted when unpaired");
}

/// Happy path: enabled + paired + a succeeding sender.
///
/// Pins four things: exactly one send of `TEST_SEND_BODY` to [`PAIRED`]; the
/// liveness entry records a successful send with a timestamp; an approval that
/// was pending beforehand is still pending afterwards; and a subsequent
/// `observe_and_notify` still emits exactly one further, non-test message.
#[tokio::test]
async fn paired_success_sends_fixed_body_updates_liveness_creates_no_mapping() {
    let sender = CapturingSender::ok();
    let liveness = fresh_liveness();
    let sender_dyn: Arc<dyn MessageSender> = sender.clone();
    let (orch, host, _dir) = build(true, true, sender_dyn, liveness.clone());

    // A genuine approval is created and left pending BEFORE the test send, so
    // we can check afterwards that the test send neither mapped nor resolved
    // it.
    let approval_id = host
        .create_approval(
            None,
            CreateHostApprovalRequest {
                agent_id: None,
                action: String::from("approve a real action"),
                details: serde_json::Value::Null,
                options: Vec::new(),
                system_level: true,
            },
        )
        .await
        .unwrap()
        .id;

    orch.send_test().await.expect("paired + ok ⇒ Ok");

    // One send, addressed to the paired handle, carrying the fixed test body.
    let calls = sender.calls();
    assert_eq!(calls.len(), 1, "exactly one test send");
    let only = &calls[0];
    assert_eq!(only.0, PAIRED, "sent to the paired handle");
    assert_eq!(only.1, TEST_SEND_BODY, "sent the fixed labeled test body");

    // The outcome was written into the shared liveness map.
    let snap = snapshot(&liveness);
    assert_eq!(snap.last_send_ok, Some(true), "test send recorded as success");
    assert!(snap.last_send_at_ms.is_some(), "last delivered is set");

    // The pre-existing approval is still there and still pending.
    let all_approvals = host.approvals().await;
    let still = all_approvals
        .into_iter()
        .find(|candidate| candidate.id == approval_id)
        .expect("approval still present");
    assert_eq!(
        still.status,
        HostApprovalStatus::Pending,
        "the test send did not resolve the real pending approval"
    );

    // A regular outbound observe pass afterwards sends exactly ONE more
    // message for that approval, and it is not the test body.
    orch.observe_and_notify().await;
    let after = sender.calls();
    assert_eq!(
        after.len(),
        2,
        "the real approval still gets exactly one prompt after the test send"
    );
    let prompt = &after[1];
    assert_ne!(
        prompt.1, TEST_SEND_BODY,
        "the approval prompt is a real prompt, not the test body"
    );
}

/// Enabled + paired, but the sender hard-fails: `send_test` must return an
/// error carrying the sender's reason (checked by substring), and the failure
/// must be written into liveness rather than swallowed.
#[tokio::test]
async fn paired_hard_fail_surfaces_reason() {
    let sender = CapturingSender::hard_fail("osascript failed: not authorized (Automation denied)");
    let liveness = fresh_liveness();
    let sender_dyn: Arc<dyn MessageSender> = sender.clone();
    let (orch, _host, _dir) = build(true, true, sender_dyn, liveness.clone());

    let err = orch.send_test().await.expect_err("hard fail ⇒ error");

    // Either the exact "not authorized" fragment or any mention of Automation
    // counts as the reason having surfaced.
    let names_denial = err.contains("not authorized") || err.to_lowercase().contains("automation");
    assert!(
        names_denial,
        "the Automation-denied reason surfaces verbatim: {err}"
    );

    // Liveness reflects the failed send and keeps the error.
    let snap = snapshot(&liveness);
    assert_eq!(snap.last_send_ok, Some(false), "hard fail recorded as failure");
    assert!(snap.last_error.is_some(), "the error is recorded for the pane");
}