use anyhow::{bail, Context, Result};
use rusqlite::Connection;
use serde_json::{json, Value};
use std::env;
use std::fs;
use std::path::PathBuf;
use std::process::Command;
use std::thread;
use std::time::Duration;
use crate::codex::{
filter_watch_events, parse_event_filter, start_codex_watch_receiver, sync_state_from_live,
watch_events_from_sync_result, watch_thread_error_event, CodexAppServerClient,
};
use crate::state::{
create_state_db, deliver_due_outbound_events, enqueue_outbound_event, pending_outbound_count,
record_transport_delivery, should_emit_for_away_window, state_db_path,
transport_delivery_exists, OutboxDeliverySummary,
};
use crate::telegram::{deliver_telegram_event, process_telegram_updates, telegram_set_my_commands};
use crate::{
daemon_config_path, load_daemon_config, notification_event_id, now_millis, shell_quote,
state_dir_path, DaemonConfig,
};
/// Describes how the daemon is installed and managed as a per-user service.
///
/// Produced by `daemon_service_spec`. All `*_command` fields are shell command
/// lines; in this file the uninstall, start and stop commands are executed via
/// `/bin/sh -c`, while the install and status commands are only reported back
/// to the caller.
#[derive(Debug, Clone)]
pub(crate) struct DaemonServiceSpec {
/// Where the service definition file lives (launchd plist or systemd unit).
pub(crate) service_path: PathBuf,
/// File the service's standard output is directed to.
pub(crate) stdout_log: PathBuf,
/// File the service's standard error is directed to.
pub(crate) stderr_log: PathBuf,
/// The label on macOS; the label with a `.service` suffix on Linux.
pub(crate) unit_name: String,
/// Full text of the service definition file.
pub(crate) contents: String,
/// Command that registers the written service file with the service manager.
pub(crate) install_command: String,
/// Command that unregisters the service; built so that it never fails (`|| true`).
pub(crate) uninstall_command: String,
/// Command that starts (or restarts) the service.
pub(crate) start_command: String,
/// Command that stops the service.
pub(crate) stop_command: String,
/// Command that prints the service manager's view of the service.
pub(crate) status_command: String,
}
/// Default service label (reverse-DNS style). It becomes the launchd label or
/// the base of the systemd unit name when passed to `daemon_service_spec`.
pub(crate) const DEFAULT_DAEMON_LABEL: &str = "com.hanifcarroll.codex-telegram-bridge";
/// Extracts the timestamp attached to a watch event.
///
/// Candidates are tried in priority order and the first one holding an
/// unsigned integer wins: top-level `updatedAt`, top-level `observedAt`, then
/// the nested `thread.updatedAt`. Returns `None` when none of them qualifies.
fn event_observed_at(event: &Value) -> Option<u64> {
    if let Some(updated_at) = event.get("updatedAt").and_then(Value::as_u64) {
        return Some(updated_at);
    }
    if let Some(observed_at) = event.get("observedAt").and_then(Value::as_u64) {
        return Some(observed_at);
    }
    event.pointer("/thread/updatedAt").and_then(Value::as_u64)
}
/// Decides whether `event` should be queued, given an away-mode snapshot.
///
/// Nothing is queued unless away mode is on. When it is on, the decision is
/// delegated to `should_emit_for_away_window`, which receives the snapshot's
/// `awayStartedAt` value and the event's own timestamp.
fn should_enqueue_daemon_notification(away_status: &Value, event: &Value) -> bool {
    if !away_notifications_enabled_from_status(away_status) {
        return false;
    }
    let away_started_at = away_status.get("awayStartedAt").and_then(Value::as_u64);
    should_emit_for_away_window(away_started_at, event_observed_at(event))
}

/// Queues every event in `events` that passes the away-mode policy.
///
/// The away-mode status is read from the database once per batch rather than
/// once per event: it is loop-invariant, and a single snapshot also guarantees
/// that every event in the batch is judged against the same away window.
///
/// * `conn` - state database connection.
/// * `events` - candidate watch events.
/// * `now` - timestamp forwarded to `enqueue_outbound_event`.
///
/// Returns the number of events for which `enqueue_outbound_event` reported
/// `true`.
///
/// # Errors
///
/// Fails if the away-mode lookup or any enqueue operation fails.
pub(crate) fn enqueue_daemon_notification_events(
    conn: &Connection,
    events: &[Value],
    now: u64,
) -> Result<usize> {
    // An empty batch needs no database access at all.
    if events.is_empty() {
        return Ok(0);
    }
    let away_status = crate::get_away_mode(conn)?;
    let mut enqueued = 0usize;
    for event in events {
        if should_enqueue_daemon_notification(&away_status, event)
            && enqueue_outbound_event(conn, event, now)?
        {
            enqueued += 1;
        }
    }
    Ok(enqueued)
}
/// Returns `true` only when the status object carries the boolean `"away": true`.
///
/// A missing key, a non-boolean value, or `false` all count as "not away".
fn away_notifications_enabled_from_status(away_status: &Value) -> bool {
    matches!(away_status.get("away"), Some(Value::Bool(true)))
}
/// Reads the current away-mode status from the database and reports whether
/// away notifications are switched on.
///
/// # Errors
///
/// Fails if the away-mode lookup fails.
fn away_notifications_enabled(conn: &Connection) -> Result<bool> {
    let away_status = crate::get_away_mode(conn)?;
    Ok(away_notifications_enabled_from_status(&away_status))
}
/// Delivers due outbox events over Telegram (batch limit of 100 passed to
/// `deliver_due_outbound_events`).
///
/// For each event the delivery callback:
/// 1. requires a Telegram configuration (errors otherwise),
/// 2. skips the send when a Telegram delivery is already recorded for the
///    event id, reporting `"skipped": "already_delivered"` instead,
/// 3. otherwise sends the event and records the delivery result.
///
/// The callback's result is always wrapped as `{ "telegram": <result> }`.
fn deliver_outbound_events(
    conn: &Connection,
    config: &DaemonConfig,
    now: u64,
    timeout: Duration,
) -> Result<OutboxDeliverySummary> {
    deliver_due_outbound_events(conn, now, 100, |event| {
        let event_id = notification_event_id(event);
        let telegram = config
            .telegram
            .as_ref()
            .context("Telegram is not configured. Run setup first.")?;
        if transport_delivery_exists(conn, &event_id, "telegram")? {
            // Already sent in an earlier attempt: report it without re-sending.
            return Ok(json!({
                "telegram": { "ok": true, "transport": "telegram", "skipped": "already_delivered" }
            }));
        }
        let outcome = deliver_telegram_event(conn, telegram, event, now, timeout)?;
        record_transport_delivery(conn, &event_id, "telegram", &outcome, now)?;
        Ok(json!({ "telegram": outcome }))
    })
}
fn daemon_cycle(
conn: &Connection,
config: &DaemonConfig,
now: u64,
timeout: Duration,
) -> Result<Value> {
let filter = parse_event_filter(Some(&config.events));
let events = match CodexAppServerClient::connect().and_then(|mut client| {
let sync_result = sync_state_from_live(&mut client, conn, now, 50, true)?;
Ok(watch_events_from_sync_result(
&sync_result,
client.drain_notifications(),
filter.as_ref(),
))
}) {
Ok(events) => events,
Err(error) => filter_watch_events(vec![watch_thread_error_event(&error)], filter.as_ref()),
};
let enqueued = enqueue_daemon_notification_events(conn, &events, now)?;
let delivery = if away_notifications_enabled(conn)? {
deliver_outbound_events(conn, config, now, timeout)?
} else {
OutboxDeliverySummary::default()
};
let telegram_updates = match config.telegram.as_ref() {
Some(telegram) => match process_telegram_updates(conn, telegram, now, timeout) {
Ok(result) => result,
Err(error) => json!({
"ok": false,
"transport": "telegram",
"error": format!("{error:#}")
}),
},
None => Value::Null,
};
Ok(json!({
"ok": true,
"action": "daemon_cycle",
"observed": events.len(),
"enqueued": enqueued,
"delivery": delivery,
"telegramUpdates": telegram_updates,
"pending": pending_outbound_count(conn)?
}))
}
/// Runs the notification daemon.
///
/// * `once` - run a single cycle, print its JSON summary, and return.
/// * `poll_interval` - pause between cycles, in milliseconds.
/// * `timeout` - timeout forwarded to the Telegram calls.
///
/// In continuous mode the function first tries to register the Telegram bot
/// commands (best effort; the outcome is only reported in the
/// `daemon_started` line), then loops forever: run a cycle, print its summary,
/// wait. Every summary is printed to stdout as one JSON document per line.
///
/// Waiting works as follows. If a Codex watch receiver could be started, a
/// message on it ends the wait early so the next cycle runs immediately.
/// Any other outcome waits out the full poll interval. In particular, when the
/// sending side of the channel has gone away, `recv_timeout` fails
/// immediately; the remaining time is then slept explicitly so the loop cannot
/// degrade into back-to-back cycles with no pause.
///
/// # Errors
///
/// Fails if configuration or the state database cannot be loaded, or if any
/// cycle returns an error. In continuous mode this function only returns on
/// error.
pub(crate) fn run_daemon(once: bool, poll_interval: u64, timeout: Duration) -> Result<()> {
    let config = load_daemon_config()?;
    let db_path = state_db_path()?;
    let conn = create_state_db(&db_path)?;
    if once {
        let result = daemon_cycle(&conn, &config, now_millis()?, timeout)?;
        println!("{}", serde_json::to_string(&result)?);
        return Ok(());
    }
    // Registration failure must not prevent the daemon from starting.
    let telegram_commands = config.telegram.as_ref().map(|telegram| {
        telegram_set_my_commands(telegram, timeout)
            .map(|_| json!({ "registered": true }))
            .unwrap_or_else(|error| {
                json!({
                    "registered": false,
                    "error": format!("{error:#}")
                })
            })
    });
    println!(
        "{}",
        serde_json::to_string(&json!({
            "ok": true,
            "action": "daemon_started",
            "configPath": daemon_config_path()?.display().to_string(),
            "events": config.events,
            "telegramCommands": telegram_commands
        }))?
    );
    let poll_interval = Duration::from_millis(poll_interval);
    // Without a watcher the daemon still works, it just polls on a fixed interval.
    let watch_rx = start_codex_watch_receiver().ok();
    loop {
        let result = daemon_cycle(&conn, &config, now_millis()?, timeout)?;
        println!("{}", serde_json::to_string(&result)?);

        let wait_started = std::time::Instant::now();
        let woken_by_watcher = match watch_rx.as_ref() {
            Some(rx) => rx.recv_timeout(poll_interval).is_ok(),
            None => false,
        };
        if !woken_by_watcher {
            // After a genuine timeout nothing (or almost nothing) remains. After
            // an immediate failure (sender hung up) or with no watcher at all,
            // this sleeps the whole interval.
            if let Some(remaining) = poll_interval.checked_sub(wait_started.elapsed()) {
                thread::sleep(remaining);
            }
        }
    }
}
/// Trims `label` and checks that it is usable as a service label.
///
/// Returns the trimmed label.
///
/// # Errors
///
/// Fails when the trimmed label is empty, or when it contains a forward
/// slash, a backslash, or a single quote.
fn validate_daemon_label(label: &str) -> Result<&str> {
    let label = label.trim();
    if label.is_empty() {
        bail!("daemon label cannot be empty");
    }
    let has_unsupported_char = label.chars().any(|ch| matches!(ch, '/' | '\\' | '\''));
    if has_unsupported_char {
        bail!("daemon label contains unsupported characters");
    }
    Ok(label)
}
/// Builds the service definition and management commands for the daemon.
///
/// `label` is validated with `validate_daemon_label`; `bridge_command` is
/// trimmed, must not be empty, and is resolved with
/// `resolve_service_bridge_command` before being written into the service
/// file. Log files are `daemon.out.log` and `daemon.err.log` inside the state
/// directory.
///
/// * macOS: a launchd plist at `~/Library/LaunchAgents/<label>.plist` that runs
///   `<bridge> daemon run` with `RunAtLoad` and `KeepAlive` enabled.
/// * Linux: a systemd user unit at `~/.config/systemd/user/<unit>` with
///   `Restart=always`; `.service` is appended to the label unless it already
///   ends with it.
///
/// This function only computes the spec; it does not touch the service file or
/// run any command.
///
/// # Errors
///
/// Fails on an invalid label, an empty bridge command, an unavailable state or
/// home directory, or any platform other than macOS and Linux.
pub(crate) fn daemon_service_spec(label: &str, bridge_command: &str) -> Result<DaemonServiceSpec> {
let label = validate_daemon_label(label)?;
let bridge_command = bridge_command.trim();
if bridge_command.is_empty() {
bail!("bridge command cannot be empty");
}
let service_bridge_command = resolve_service_bridge_command(bridge_command);
let state_dir = state_dir_path()?;
let stdout_log = state_dir.join("daemon.out.log");
let stderr_log = state_dir.join("daemon.err.log");
// Argument vector for the launchd plist; the systemd unit spells the same
// invocation out in its ExecStart line instead.
let run_args = [
service_bridge_command.clone(),
"daemon".to_string(),
"run".to_string(),
];
// `cfg!` expands to a compile-time boolean, so both branches are always
// type-checked regardless of the target platform.
if cfg!(target_os = "macos") {
let service_path = dirs::home_dir()
.context("home directory is not available")?
.join("Library")
.join("LaunchAgents")
.join(format!("{label}.plist"));
// One <string> element per argument, XML-escaped.
let args_xml = run_args
.iter()
.map(|arg| format!(" <string>{}</string>", xml_escape(arg)))
.collect::<Vec<_>>()
.join("\n");
let contents = format!(
r#"<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>{}</string>
<key>ProgramArguments</key>
<array>
{}
</array>
<key>RunAtLoad</key>
<true/>
<key>KeepAlive</key>
<true/>
<key>StandardOutPath</key>
<string>{}</string>
<key>StandardErrorPath</key>
<string>{}</string>
</dict>
</plist>
"#,
xml_escape(label),
args_xml,
xml_escape(&stdout_log.display().to_string()),
xml_escape(&stderr_log.display().to_string())
);
// All launchctl commands target the calling user's GUI domain; `$(id -u)`
// is expanded by the shell when the command runs.
let quoted_path = shell_quote(&service_path.display().to_string());
let bootstrap_command = format!("launchctl bootstrap gui/$(id -u) {quoted_path}");
let kickstart_command =
format!("launchctl kickstart -k gui/$(id -u)/{}", shell_quote(label));
let bootout_command = format!("launchctl bootout gui/$(id -u)/{}", shell_quote(label));
Ok(DaemonServiceSpec {
service_path,
stdout_log,
stderr_log,
unit_name: label.to_string(),
contents,
install_command: bootstrap_command.clone(),
// Uninstall must succeed even when the service was never loaded.
uninstall_command: format!("{bootout_command} 2>/dev/null || true"),
// If bootstrap fails, fall back to kickstart (`-k`) on the same label.
start_command: format!("{bootstrap_command} 2>/dev/null || {kickstart_command}"),
stop_command: bootout_command,
status_command: format!("launchctl print gui/$(id -u)/{}", shell_quote(label)),
})
} else if cfg!(target_os = "linux") {
let unit_name = if label.ends_with(".service") {
label.to_string()
} else {
format!("{label}.service")
};
let service_path = dirs::home_dir()
.context("home directory is not available")?
.join(".config")
.join("systemd")
.join("user")
.join(&unit_name);
let contents = format!(
"[Unit]\nDescription=Codex Telegram Bridge notification daemon\n\n[Service]\nType=simple\nExecStart={} daemon run\nRestart=always\nRestartSec=2\nStandardOutput=append:{}\nStandardError=append:{}\n\n[Install]\nWantedBy=default.target\n",
shell_quote(&service_bridge_command),
stdout_log.display(),
stderr_log.display()
);
Ok(DaemonServiceSpec {
service_path,
stdout_log,
stderr_log,
unit_name: unit_name.clone(),
contents,
// Install and start are the same command line: reload unit files, then
// enable and start the unit.
install_command: format!(
"systemctl --user daemon-reload && systemctl --user enable --now {}",
shell_quote(&unit_name)
),
// Uninstall must succeed even when the unit is not enabled.
uninstall_command: format!(
"systemctl --user disable --now {} 2>/dev/null || true",
shell_quote(&unit_name)
),
start_command: format!(
"systemctl --user daemon-reload && systemctl --user enable --now {}",
shell_quote(&unit_name)
),
stop_command: format!("systemctl --user stop {}", shell_quote(&unit_name)),
status_command: format!("systemctl --user status {}", shell_quote(&unit_name)),
})
} else {
bail!("daemon service install is only supported on macOS launchd and Linux systemd")
}
}
/// Turns the bridge command into the form written into the service file.
///
/// * A bare command name (no `/`) is looked up on `PATH`; if the lookup fails
///   the trimmed name is returned unchanged.
/// * An absolute path is returned as is.
/// * A relative path is joined onto the current working directory; if the
///   working directory cannot be determined the trimmed input is returned.
fn resolve_service_bridge_command(bridge_command: &str) -> String {
    let command = bridge_command.trim();
    if !command.contains('/') {
        return match which::which(command) {
            Ok(resolved) => resolved.display().to_string(),
            Err(_) => command.to_string(),
        };
    }
    let candidate = PathBuf::from(command);
    if candidate.is_absolute() {
        return candidate.display().to_string();
    }
    match env::current_dir() {
        Ok(cwd) => cwd.join(candidate).display().to_string(),
        Err(_) => command.to_string(),
    }
}
pub(crate) fn install_daemon_service(
label: &str,
bridge_command: &str,
dry_run: bool,
) -> Result<Value> {
let spec = daemon_service_spec(label, bridge_command)?;
if !dry_run {
if let Some(parent) = spec.service_path.parent() {
fs::create_dir_all(parent)?;
}
fs::write(&spec.service_path, &spec.contents)?;
}
Ok(json!({
"ok": true,
"action": "daemon_install",
"dryRun": dry_run,
"label": spec.unit_name,
"servicePath": spec.service_path,
"runCommand": crate::daemon_run_command(bridge_command),
"installCommand": spec.install_command,
"startCommand": spec.start_command,
"stopCommand": spec.stop_command,
"statusCommand": spec.status_command,
"logs": {
"stdout": spec.stdout_log,
"stderr": spec.stderr_log
},
"contents": if dry_run { Some(spec.contents) } else { None }
}))
}
pub(crate) fn uninstall_daemon_service(label: &str, dry_run: bool) -> Result<Value> {
let spec = daemon_service_spec(label, "codex-telegram-bridge")?;
let output = if dry_run {
None
} else {
Some(run_shell_command(&spec.uninstall_command)?)
};
if !dry_run && spec.service_path.exists() {
fs::remove_file(&spec.service_path)?;
}
Ok(json!({
"ok": true,
"action": "daemon_uninstall",
"dryRun": dry_run,
"label": spec.unit_name,
"servicePath": spec.service_path,
"uninstallCommand": spec.uninstall_command,
"output": output
}))
}
/// Runs `command` with `/bin/sh -c` and captures its outcome.
///
/// Returns a JSON object with the exit `status` (`null` when the process has
/// no exit code), a `success` flag, and the whitespace-trimmed, lossily
/// UTF-8-decoded `stdout` and `stderr`. A non-zero exit is not an error.
///
/// # Errors
///
/// Fails only when the shell process cannot be run.
fn run_shell_command(command: &str) -> Result<Value> {
    let mut shell = Command::new("/bin/sh");
    shell.arg("-c").arg(command);
    let finished = shell
        .output()
        .with_context(|| format!("failed to run `{command}`"))?;

    let stdout = String::from_utf8_lossy(&finished.stdout);
    let stderr = String::from_utf8_lossy(&finished.stderr);
    Ok(json!({
        "status": finished.status.code(),
        "success": finished.status.success(),
        "stdout": stdout.trim(),
        "stderr": stderr.trim()
    }))
}
pub(crate) fn start_daemon_service(label: &str, dry_run: bool) -> Result<Value> {
let spec = daemon_service_spec(label, "codex-telegram-bridge")?;
let command = spec.start_command.clone();
let output = if dry_run {
None
} else {
Some(run_shell_command(&command)?)
};
Ok(json!({
"ok": true,
"action": "daemon_start",
"dryRun": dry_run,
"label": spec.unit_name,
"command": command,
"output": output
}))
}
pub(crate) fn stop_daemon_service(label: &str, dry_run: bool) -> Result<Value> {
let spec = daemon_service_spec(label, "codex-telegram-bridge")?;
let command = spec.stop_command.clone();
let output = if dry_run {
None
} else {
Some(run_shell_command(&command)?)
};
Ok(json!({
"ok": true,
"action": "daemon_stop",
"dryRun": dry_run,
"label": spec.unit_name,
"command": command,
"output": output
}))
}
pub(crate) fn daemon_service_status(label: &str) -> Result<Value> {
let spec = daemon_service_spec(label, "codex-telegram-bridge")?;
let config_path = daemon_config_path()?;
Ok(json!({
"ok": true,
"action": "daemon_status",
"label": spec.unit_name,
"configPath": config_path,
"configExists": config_path.exists(),
"servicePath": spec.service_path,
"serviceExists": spec.service_path.exists(),
"statusCommand": spec.status_command,
"logs": {
"stdout": spec.stdout_log,
"stderr": spec.stderr_log
}
}))
}
pub(crate) fn daemon_service_logs(label: &str) -> Result<Value> {
let spec = daemon_service_spec(label, "codex-telegram-bridge")?;
Ok(json!({
"ok": true,
"action": "daemon_logs",
"label": spec.unit_name,
"stdout": spec.stdout_log,
"stderr": spec.stderr_log,
"tailCommand": format!(
"tail -f {} {}",
shell_quote(&spec.stdout_log.display().to_string()),
shell_quote(&spec.stderr_log.display().to_string())
)
}))
}
/// Escapes `value` for safe inclusion in XML text or attribute content.
///
/// The five characters with predefined XML entities are replaced by their
/// entity references (named `amp`, `lt`, `gt`, `quot` and `apos`); every other
/// character is copied through unchanged. The input is scanned once, so an
/// ampersand that is already part of the input is escaped exactly once and
/// entities produced here are never escaped again.
fn xml_escape(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        let entity_name = match ch {
            '&' => "amp",
            '<' => "lt",
            '>' => "gt",
            '"' => "quot",
            '\'' => "apos",
            _ => {
                escaped.push(ch);
                continue;
            }
        };
        // An entity reference is the name wrapped in an ampersand and a semicolon.
        escaped.push('&');
        escaped.push_str(entity_name);
        escaped.push(';');
    }
    escaped
}
// Unit tests for service-file generation and the away-mode notification policy.
// The policy tests run against an in-memory state database.
#[cfg(test)]
mod tests {
use super::*;
use crate::codex::set_away_mode;
use crate::state::{create_state_db_in_memory, pending_outbound_count};
// A relative bridge path must appear in the generated service file as an
// absolute path rooted at the current working directory.
#[test]
fn daemon_install_dry_run_resolves_relative_bridge_command_for_services() {
let result =
install_daemon_service(DEFAULT_DAEMON_LABEL, "bin/codex-telegram-bridge", true)
.expect("daemon install dry run");
let expected = std::env::current_dir()
.expect("cwd")
.join("bin/codex-telegram-bridge")
.display()
.to_string();
assert!(result["contents"]
.as_str()
.expect("service contents")
.contains(&expected));
}
// The same event is rejected while away mode is off and accepted once it is on.
#[test]
fn daemon_notification_policy_only_enqueues_events_while_away() {
let conn = create_state_db_in_memory().expect("db");
let event = json!({
"type": "thread_waiting",
"threadId": "thr_1",
"updatedAt": 1500
});
let off_count =
enqueue_daemon_notification_events(&conn, std::slice::from_ref(&event), 2000)
.expect("away off enqueue");
assert_eq!(
off_count, 0,
"daemon should stay quiet while user is present"
);
set_away_mode(&conn, true, 1000).expect("away on");
let on_count =
enqueue_daemon_notification_events(&conn, &[event], 2000).expect("away on enqueue");
assert_eq!(on_count, 1, "daemon should notify while user is away");
}
// Away mode starts at 2000: the event stamped 1500 is dropped, the one
// stamped 2500 is queued.
#[test]
fn daemon_notification_policy_skips_events_before_away_started() {
let conn = create_state_db_in_memory().expect("db");
set_away_mode(&conn, true, 2000).expect("away on");
let events = vec![
json!({
"type": "thread_waiting",
"threadId": "thr_old",
"updatedAt": 1500
}),
json!({
"type": "thread_waiting",
"threadId": "thr_new",
"updatedAt": 2500
}),
];
let count = enqueue_daemon_notification_events(&conn, &events, 3000).expect("enqueue");
assert_eq!(count, 1);
assert_eq!(pending_outbound_count(&conn).expect("pending"), 1);
}
// The away start is given in milliseconds while the event timestamps are in
// seconds; the policy must still keep only the event after the away start.
#[test]
fn daemon_notification_policy_accepts_codex_second_timestamps() {
let conn = create_state_db_in_memory().expect("db");
set_away_mode(&conn, true, 1_776_219_288_240).expect("away on");
let events = vec![
json!({
"type": "thread_completed",
"threadId": "thr_old",
"updatedAt": 1_776_219_200
}),
json!({
"type": "thread_completed",
"threadId": "thr_new",
"updatedAt": 1_776_219_396
}),
];
let count = enqueue_daemon_notification_events(&conn, &events, 1_776_219_397_000)
.expect("mixed timestamp enqueue");
assert_eq!(count, 1);
assert_eq!(pending_outbound_count(&conn).expect("pending"), 1);
}
// Turning away mode off discards queued notifications and reports how many
// were cleared.
#[test]
fn away_off_clears_pending_daemon_notifications() {
let conn = create_state_db_in_memory().expect("db");
set_away_mode(&conn, true, 1000).expect("away on");
let event = json!({
"type": "thread_waiting",
"threadId": "thr_1",
"updatedAt": 1500
});
enqueue_daemon_notification_events(&conn, &[event], 2000).expect("enqueue");
assert_eq!(pending_outbound_count(&conn).expect("pending"), 1);
let disabled = set_away_mode(&conn, false, 2500).expect("away off");
assert_eq!(disabled["away"], false);
assert_eq!(disabled["clearedPendingNotifications"], 1);
assert_eq!(pending_outbound_count(&conn).expect("pending"), 0);
}
}