use crate::approval_core::{ApprovalCore, ResolveOutcome};
use crate::channel::{
CancelSignal, ChannelId, ChannelRegistryHandles, InboundChannel, InboundSink, SharedHost,
};
use crate::host::HostState;
use crate::messaging_config::MessagingConfigStore;
use car_ffi_common::integrations::InboundMessage;
use car_proto::HostApprovalRequest;
#[cfg(test)]
use car_proto::HostApprovalStatus;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
const IMESSAGE_PRINCIPAL: &str = "imessage-transport";
const APPROVE: &str = "approve";
const DENY: &str = "deny";
pub trait MessageSender: Send + Sync {
fn send(&self, handle: &str, body: &str) -> Result<(), String>;
}
#[derive(Debug, Clone, Default)]
pub struct RealMessageSender;
impl MessageSender for RealMessageSender {
fn send(&self, handle: &str, body: &str) -> Result<(), String> {
let req = serde_json::json!({ "recipient": handle, "body": body });
let req_json = req.to_string();
car_ffi_common::integrations::messages_send(&req_json).map(|_| ())
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InboundIntent {
Approve { code: Option<String> },
Deny { code: Option<String> },
PairingCode(String),
Ignore,
}
const PAIRING_CODE_LEN: usize = 43;
const APPROVAL_CODE_LEN: usize = 2;
pub struct MessagingOrchestrator {
core: ApprovalCore,
config: MessagingConfigStore,
sender: Arc<dyn MessageSender>,
base_dir: std::path::PathBuf,
codes: Mutex<CodeMap>,
parse_calls: std::sync::atomic::AtomicU64,
}
#[derive(Default)]
struct CodeMap {
code_to_id: HashMap<String, String>,
id_to_code: HashMap<String, String>,
next: u64,
}
impl CodeMap {
fn mint(&mut self) -> String {
let n = self.next;
self.next += 1;
let letter = (b'A' + (n % 26) as u8) as char;
let num = n / 26;
format!("{letter}{num}")
}
}
impl MessagingOrchestrator {
pub fn new(
host: Arc<HostState>,
config: MessagingConfigStore,
sender: Arc<dyn MessageSender>,
base_dir: impl Into<std::path::PathBuf>,
) -> Self {
Self {
core: ApprovalCore::new(host),
config,
sender,
base_dir: base_dir.into(),
codes: Mutex::new(CodeMap::default()),
parse_calls: std::sync::atomic::AtomicU64::new(0),
}
}
fn host(&self) -> &Arc<HostState> {
self.core.host()
}
pub fn parse_call_count(&self) -> u64 {
self.parse_calls.load(std::sync::atomic::Ordering::Relaxed)
}
pub async fn observe_and_notify(&self) {
if !self.config.is_enabled().unwrap_or(false) {
return;
}
let Some(recipient) = self.paired_handle() else {
return; };
let approvals = self.host().approvals().await;
{
let live_pending: std::collections::HashSet<&str> = approvals
.iter()
.filter(|a| ApprovalCore::is_eligible_pending(a))
.map(|a| a.id.as_str())
.collect();
let mut codes = self.codes.lock().await;
let stale: Vec<String> = codes
.id_to_code
.keys()
.filter(|id| !live_pending.contains(id.as_str()))
.cloned()
.collect();
for id in stale {
if let Some(code) = codes.id_to_code.remove(&id) {
codes.code_to_id.remove(&code);
}
}
}
for approval in approvals {
if !ApprovalCore::is_eligible_pending(&approval) {
continue;
}
let code = {
let mut codes = self.codes.lock().await;
if codes.id_to_code.contains_key(&approval.id) {
continue;
}
let code = codes.mint();
codes.code_to_id.insert(code.clone(), approval.id.clone());
codes.id_to_code.insert(approval.id.clone(), code.clone());
code
};
let body = outbound_body(&approval, &code);
if let Err(e) = self.sender.send(&recipient, &body) {
{
let mut codes = self.codes.lock().await;
if let Some(c) = codes.id_to_code.remove(&approval.id) {
codes.code_to_id.remove(&c);
}
}
tracing::warn!(
approval_id = %approval.id,
error = %e,
"iMessage approval prompt send failed; rolled back code, will retry next tick"
);
}
}
}
pub async fn send_shared_prompt(&self, approval: &HostApprovalRequest, code: &str) {
if !self.config.is_enabled().unwrap_or(false) {
return;
}
let Some(recipient) = self.paired_handle() else {
return;
};
{
let mut codes = self.codes.lock().await;
if codes.id_to_code.contains_key(&approval.id) {
return;
}
codes
.code_to_id
.insert(code.to_string(), approval.id.clone());
codes
.id_to_code
.insert(approval.id.clone(), code.to_string());
}
let body = outbound_body(approval, code);
if let Err(e) = self.sender.send(&recipient, &body) {
let mut codes = self.codes.lock().await;
if let Some(c) = codes.id_to_code.remove(&approval.id) {
codes.code_to_id.remove(&c);
}
tracing::warn!(
approval_id = %approval.id,
error = %e,
"iMessage shared-code prompt send failed; rolled back, will retry next tick"
);
}
}
pub async fn handle_inbound(&self, msg: &InboundMessage) {
if !self.config.is_enabled().unwrap_or(false) {
return;
}
if !self.config.is_allowlisted(&msg.handle_id).unwrap_or(false) {
return;
}
self.parse_calls
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let intent = parse_inbound(&msg.body);
match intent {
InboundIntent::Approve { code } => {
self.resolve_intent(&msg.handle_id, code, APPROVE).await;
}
InboundIntent::Deny { code } => {
self.resolve_intent(&msg.handle_id, code, DENY).await;
}
InboundIntent::PairingCode(candidate) => {
let _ = self
.config
.validate_and_consume_pairing_code(&msg.handle_id, &candidate);
}
InboundIntent::Ignore => {}
}
}
async fn resolve_intent(&self, handle: &str, code: Option<String>, resolution: &str) {
match code {
Some(code) => {
if let Some(approval_id) = self.lookup_pending_code(&code).await {
self.resolve(&approval_id, resolution).await;
}
}
None => {
let pending = self.pending_codes().await;
match pending.len() {
0 => {}
1 => {
let approval_id = pending[0].1.clone();
self.resolve(&approval_id, resolution).await;
}
_ => {
let reply = disambiguation_body(&pending);
if let Err(e) = self.sender.send(handle, &reply) {
tracing::warn!(error = %e, "disambiguation reply send failed");
}
}
}
}
}
}
async fn resolve(&self, approval_id: &str, resolution: &str) {
if self.core.resolve(IMESSAGE_PRINCIPAL, approval_id, resolution).await
== ResolveOutcome::Resolved
{
let mut codes = self.codes.lock().await;
if let Some(code) = codes.id_to_code.remove(approval_id) {
codes.code_to_id.remove(&code);
}
}
}
async fn lookup_pending_code(&self, code: &str) -> Option<String> {
let approval_id = {
let codes = self.codes.lock().await;
codes.code_to_id.get(code).cloned()?
};
self.core
.is_id_eligible_pending(&approval_id)
.await
.then_some(approval_id)
}
async fn pending_codes(&self) -> Vec<(String, String)> {
let pending_ids: std::collections::HashSet<String> = self
.core
.eligible_pending()
.await
.into_iter()
.map(|a| a.id)
.collect();
let codes = self.codes.lock().await;
let mut out: Vec<(String, String)> = codes
.code_to_id
.iter()
.filter(|(_, id)| pending_ids.contains(*id))
.map(|(c, id)| (c.clone(), id.clone()))
.collect();
out.sort_by(|a, b| a.0.cmp(&b.0));
out
}
fn paired_handle(&self) -> Option<String> {
self.config.allowlist().ok()?.into_iter().next()
}
pub async fn poll_once<FMax, FNew>(
&self,
read_max: FMax,
read_new: FNew,
) -> Result<usize, String>
where
FMax: Fn() -> Result<i64, String>,
FNew: Fn(i64) -> Result<Vec<InboundMessage>, String>,
{
if !self.config.is_enabled().unwrap_or(false) {
return Ok(0);
}
use car_ffi_common::integrations::Watermark;
let existing = Watermark::load(&self.base_dir).map_err(|e| e.to_string())?;
let last = match existing {
Some(w) => w.last_rowid,
None => {
let seed = read_max()?;
Watermark::new(seed)
.persist(&self.base_dir)
.map_err(|e| e.to_string())?;
return Ok(0);
}
};
let rows = read_new(last)?;
if rows.is_empty() {
return Ok(0);
}
let mut max_seen = last;
for row in &rows {
if row.rowid > max_seen {
max_seen = row.rowid;
}
self.handle_inbound(row).await;
}
if max_seen > last {
Watermark::new(max_seen)
.persist(&self.base_dir)
.map_err(|e| e.to_string())?;
}
Ok(rows.len())
}
#[cfg(target_os = "macos")]
pub async fn poll_once_default_db(&self) -> Result<usize, String> {
self.poll_once(
|| car_ffi_common::integrations::messages_max_rowid().map_err(|e| e.to_string()),
|min| {
car_ffi_common::integrations::messages_read_inbound(min).map_err(|e| e.to_string())
},
)
.await
}
#[cfg(target_os = "macos")]
pub async fn run_inbound_loop(
&self,
interval: std::time::Duration,
max_iterations: Option<u32>,
mut cancel: tokio::sync::watch::Receiver<bool>,
) {
let mut iterations: u32 = 0;
loop {
if let Some(max) = max_iterations {
if iterations >= max {
break;
}
}
self.observe_and_notify().await;
if let Err(e) = self.poll_once_default_db().await {
tracing::debug!(error = %e, "messaging poll tick failed");
}
iterations += 1;
if let Some(max) = max_iterations {
if iterations >= max {
break;
}
}
tokio::select! {
_ = tokio::time::sleep(interval) => {}
_ = cancel.changed() => {
if *cancel.borrow() {
break;
}
}
}
}
}
#[cfg(target_os = "macos")]
pub async fn run_inbound_only_loop(
&self,
interval: std::time::Duration,
mut cancel: tokio::sync::watch::Receiver<bool>,
) {
loop {
if let Err(e) = self.poll_once_default_db().await {
tracing::debug!(error = %e, "messaging inbound poll tick failed");
}
tokio::select! {
_ = tokio::time::sleep(interval) => {}
_ = cancel.changed() => {
if *cancel.borrow() {
break;
}
}
}
}
}
}
#[cfg(target_os = "macos")]
#[async_trait::async_trait]
impl InboundChannel for MessagingOrchestrator {
fn channel(&self) -> ChannelId {
ChannelId::IMessage
}
async fn run(&self, _sink: &dyn InboundSink, cancel: CancelSignal) {
self.run_inbound_loop(std::time::Duration::from_secs(2), None, cancel)
.await;
}
}
pub fn spawn_channel_pollers(host: SharedHost) -> ChannelRegistryHandles {
let (cancel_tx, _cancel_rx) = tokio::sync::watch::channel(false);
let store = MessagingConfigStore::from_home();
let core = crate::approval_core::ApprovalCore::new(host.clone());
#[cfg(target_os = "macos")]
let imessage: Option<Arc<MessagingOrchestrator>> = {
if store.is_enabled_for(ChannelId::IMessage).unwrap_or(false) {
let base_dir = std::env::var_os("HOME")
.map(std::path::PathBuf::from)
.unwrap_or_else(|| std::path::PathBuf::from("."))
.join(".car");
let orch = Arc::new(MessagingOrchestrator::new(
host.clone(),
MessagingConfigStore::from_home(),
Arc::new(RealMessageSender),
base_dir,
));
let orch_in = orch.clone();
let cancel_rx = cancel_tx.subscribe();
tokio::spawn(async move {
orch_in
.run_inbound_only_loop(std::time::Duration::from_secs(2), cancel_rx)
.await;
});
Some(orch)
} else {
None
}
};
#[cfg(not(target_os = "macos"))]
let imessage: Option<Arc<MessagingOrchestrator>> = None;
let slack: Option<Arc<crate::slack_adapter::SlackAdapter>> = {
if store.is_enabled_for(ChannelId::Slack).unwrap_or(false) {
match store.slack_token_ref_for(ChannelId::Slack).unwrap_or(None) {
Some(token_ref) => {
let post_channel = store
.slack_channel_id_for(ChannelId::Slack)
.unwrap_or(None)
.unwrap_or_default();
if post_channel.is_empty() {
tracing::warn!(
"slack channel enabled but no slack_channel id configured \
(no slack_channel in messaging.json) — outbound approval \
prompts will be skipped; inbound pairing + button resolve \
still active. Set via messaging.config.set \
{{ channel: \"slack\", slack_channel: \"<id>\" }}"
);
}
let transport = Arc::new(crate::slack_adapter::RealSlackTransport::new(
token_ref.bot_token_key.clone(),
token_ref.app_token_key.clone(),
));
transport.clone().spawn_socket_loop(cancel_tx.subscribe());
let adapter = Arc::new(crate::slack_adapter::SlackAdapter::new(
host.clone(),
MessagingConfigStore::from_home(),
transport as Arc<dyn crate::slack_adapter::SlackTransport>,
post_channel,
));
let adapter_in = adapter.clone();
let cancel_rx = cancel_tx.subscribe();
tokio::spawn(async move {
let sink = NoopSink;
adapter_in.run(&sink, cancel_rx).await;
});
Some(adapter)
}
None => {
tracing::warn!(
"slack channel enabled but no tokens provisioned \
(no slack_token_ref in messaging.json) — skipping slack adapter; \
provision via messaging.config.set {{ channel: \"slack\", bot_token, app_token }}"
);
None
}
}
} else {
None
}
};
if imessage.is_some() || slack.is_some() {
let coordinator = crate::fanout::FanoutCoordinator::new(core, imessage, slack);
let mut cancel_rx = cancel_tx.subscribe();
tokio::spawn(async move {
loop {
coordinator.observe_and_fanout().await;
tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_secs(2)) => {}
_ = cancel_rx.changed() => {
if *cancel_rx.borrow() {
break;
}
}
}
}
});
}
ChannelRegistryHandles { cancel_tx }
}
struct NoopSink;
#[async_trait::async_trait]
impl InboundSink for NoopSink {
async fn deliver(&self, _channel: ChannelId, _msg: &InboundMessage) {}
}
pub fn outbound_body(approval: &HostApprovalRequest, code: &str) -> String {
format!(
"Approval needed: {action}\nReply `{code} approve` or `{code} deny`.",
action = approval.action,
code = code,
)
}
fn disambiguation_body(pending: &[(String, String)]) -> String {
let mut s = String::from(
"Multiple approvals are pending. Reply with a code, e.g. `<code> approve`:\n",
);
for (code, _id) in pending {
s.push_str(&format!("• {code}\n"));
}
s
}
fn parse_inbound(body: &str) -> InboundIntent {
let trimmed = body.trim();
if trimmed.is_empty() {
return InboundIntent::Ignore;
}
let tokens: Vec<&str> = trimmed.split_whitespace().collect();
if tokens.len() == 1 {
let t = tokens[0];
if t.eq_ignore_ascii_case(APPROVE) {
return InboundIntent::Approve { code: None };
}
if t.eq_ignore_ascii_case(DENY) {
return InboundIntent::Deny { code: None };
}
if looks_like_pairing_code(t) {
return InboundIntent::PairingCode(t.to_string());
}
return InboundIntent::Ignore;
}
if tokens.len() == 2 {
let (a, b) = (tokens[0], tokens[1]);
if let Some(intent) = code_word(a, b) {
return intent;
}
if let Some(intent) = code_word(b, a) {
return intent;
}
}
InboundIntent::Ignore
}
fn code_word(code: &str, word: &str) -> Option<InboundIntent> {
if !looks_like_approval_code(code) {
return None;
}
if word.eq_ignore_ascii_case(APPROVE) {
return Some(InboundIntent::Approve {
code: Some(code.to_uppercase()),
});
}
if word.eq_ignore_ascii_case(DENY) {
return Some(InboundIntent::Deny {
code: Some(code.to_uppercase()),
});
}
None
}
fn looks_like_approval_code(t: &str) -> bool {
let bytes = t.as_bytes();
if bytes.len() < APPROVAL_CODE_LEN || bytes.len() > 5 {
return false;
}
bytes[0].is_ascii_alphabetic() && bytes[1..].iter().all(|b| b.is_ascii_digit())
}
fn looks_like_pairing_code(t: &str) -> bool {
t.len() == PAIRING_CODE_LEN
&& t.bytes()
.all(|b| b.is_ascii_alphanumeric() || b == b'-' || b == b'_')
}
#[cfg(test)]
mod tests {
use super::*;
fn sample_approval() -> HostApprovalRequest {
HostApprovalRequest {
id: "id0".to_string(),
agent_id: None,
client_id: None,
action: "send wire transfer".to_string(),
details: serde_json::Value::Null,
options: vec![],
status: HostApprovalStatus::Pending,
created_at: chrono::Utc::now(),
resolved_at: None,
resolution: None,
}
}
#[test]
fn parse_inbound_ignores_daemon_authored_bodies() {
let prompt = outbound_body(&sample_approval(), "A0");
assert_eq!(
parse_inbound(&prompt),
InboundIntent::Ignore,
"the daemon's own outbound prompt body must parse to Ignore, got {prompt:?}"
);
let disambig = disambiguation_body(&[
("A0".to_string(), "id0".to_string()),
("B0".to_string(), "id1".to_string()),
]);
assert_eq!(
parse_inbound(&disambig),
InboundIntent::Ignore,
"the daemon's own disambiguation body must parse to Ignore, got {disambig:?}"
);
}
}