Skip to main content

codex_telegram_bridge/
lib.rs

1use anyhow::{anyhow, bail, Context, Result};
2use serde::Serialize;
3use serde_json::{json, Value};
4use sha2::{Digest, Sha256};
5use std::collections::BTreeSet;
6use std::process::Command;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9mod cli;
10mod codex;
11mod config;
12mod daemon;
13mod mcp;
14mod projects;
15mod state;
16mod telegram;
17
18use crate::cli::{
19    AwayCommands, Cli, Commands, DaemonCommands, HermesCommands, ProjectCommands, TelegramCommands,
20};
21use crate::codex::{
22    attach_follow_result, build_show_thread_result, classify_app_server_error_message,
23    collect_follow_events, filter_watch_events, follow_result_summary, fork_thread_dry_run,
24    fork_thread_live_result, get_away_mode, normalized_message, parse_event_filter,
25    resolve_codex_binary, run_exec_hook, set_away_mode, start_codex_watch_receiver,
26    start_new_thread_dry_run, start_thread_in_cwd, sync_state_from_live, thread_cwd_from_response,
27    thread_id_from_response, turn_start_params, watch_events_from_sync_result,
28    watch_thread_error_event, CodexAppServerClient, FollowRun,
29};
30#[cfg(test)]
31use crate::codex::{derive_pending_prompt, normalize_thread_snapshot};
32use crate::daemon::{
33    daemon_service_logs, daemon_service_spec, daemon_service_status, install_daemon_service,
34    run_daemon, start_daemon_service, stop_daemon_service, uninstall_daemon_service,
35    DEFAULT_DAEMON_LABEL,
36};
37use crate::mcp::run_mcp_server;
38use crate::projects::{build_registered_project, ensure_unique_project_id, slugify_project_token};
39use crate::state::{
40    archive_result, create_state_db, list_inbox_from_db, list_waiting_from_db,
41    observed_workspaces_from_db, record_action, resolve_archive_targets, state_db_path,
42    unarchive_thread_result, ObservedWorkspace,
43};
44#[cfg(test)]
45use crate::state::{classify_inbox_item, create_state_db_in_memory, BridgeThreadSnapshot};
46#[cfg(test)]
47use crate::state::{
48    deliver_due_outbound_events, enqueue_outbound_event, pending_outbound_count,
49    record_transport_delivery, transport_delivery_exists, OutboxDeliverySummary,
50};
51use crate::telegram::{
52    telegram_disable_result, telegram_setup_result, telegram_status_result, telegram_test_result,
53};
54use clap::Parser;
55pub(crate) use config::{
56    daemon_config_path, load_daemon_config, merged_daemon_config, read_daemon_config_raw,
57    redacted_daemon_config, resolve_telegram_bot_token, write_daemon_config, DaemonConfig,
58    RegisteredProject, SetupOptions, TelegramConfig, TelegramSetupOptions,
59};
60pub(crate) use state::state_dir_path;
61
62#[derive(Serialize)]
63struct ErrorEnvelope {
64    ok: bool,
65    error: ErrorBody,
66}
67
68#[derive(Serialize)]
69struct ErrorBody {
70    code: &'static str,
71    message: String,
72    classified: Value,
73}
74
75#[derive(Serialize)]
76struct DoctorEnvelope {
77    ok: bool,
78    codex: DoctorCodex,
79    bridge: DoctorBridge,
80}
81
82#[derive(Serialize)]
83struct DoctorCodex {
84    resolved_path: String,
85    source: String,
86    version_stdout: String,
87}
88
89#[derive(Serialize)]
90struct DoctorBridge {
91    config_path: String,
92    config_exists: bool,
93    telegram_configured: bool,
94    daemon_service_path: String,
95    daemon_service_exists: bool,
96}
97
98#[derive(Serialize)]
99struct ReplyResult<'a> {
100    ok: bool,
101    action: &'a str,
102    dry_run: bool,
103    thread_id: &'a str,
104    message: &'a str,
105    sent_at: u64,
106}
107
108#[derive(Serialize)]
109struct ApproveResult<'a> {
110    ok: bool,
111    action: &'a str,
112    dry_run: bool,
113    thread_id: &'a str,
114    decision: &'a str,
115    sent_text: &'a str,
116    sent_at: u64,
117}
118
119pub fn main_entry() -> anyhow::Result<()> {
120    run()
121}
122
123pub fn render_error_envelope(error: &anyhow::Error) -> String {
124    let envelope = ErrorEnvelope {
125        ok: false,
126        error: ErrorBody {
127            code: "internal_error",
128            message: format!("{error:#}"),
129            classified: classify_app_server_error_message(&format!("{error:#}")),
130        },
131    };
132
133    serde_json::to_string(&envelope).expect("serialize error envelope")
134}
135
136fn run() -> Result<()> {
137    let cli = Cli::parse();
138
139    match cli.command {
140        Commands::Setup {
141            bot_token,
142            chat_id,
143            allowed_user_id,
144            events,
145            bridge_command,
146            daemon_label,
147            install_daemon,
148            start_daemon,
149            register_hermes,
150            hermes_server_name,
151            hermes_command,
152            pair_timeout_ms,
153            dry_run,
154        } => {
155            let result = setup_result(SetupOptions {
156                bot_token: bot_token.as_deref(),
157                chat_id: chat_id.as_deref(),
158                allowed_user_id: allowed_user_id.as_deref(),
159                events: &events,
160                bridge_command: &bridge_command,
161                daemon_label: &daemon_label,
162                install_daemon,
163                start_daemon,
164                register_hermes,
165                hermes_server_name: &hermes_server_name,
166                hermes_command: &hermes_command,
167                dry_run,
168                pair_timeout_ms,
169            })?;
170            println!("{}", serde_json::to_string(&result)?);
171        }
172        Commands::Doctor => {
173            let resolved = resolve_codex_binary()?;
174            let output = Command::new(&resolved.path)
175                .arg("--version")
176                .output()
177                .with_context(|| {
178                    format!("failed to execute {} --version", resolved.path.display())
179                })?;
180            if !output.status.success() {
181                bail!(
182                    "codex binary {} returned non-zero exit status for --version",
183                    resolved.path.display()
184                );
185            }
186            let payload = DoctorEnvelope {
187                ok: true,
188                codex: DoctorCodex {
189                    resolved_path: resolved.path.display().to_string(),
190                    source: resolved.source.to_string(),
191                    version_stdout: String::from_utf8_lossy(&output.stdout).trim().to_string(),
192                },
193                bridge: doctor_bridge()?,
194            };
195            println!("{}", serde_json::to_string(&payload)?);
196        }
197        Commands::Away { command } => {
198            let now = now_millis()?;
199            let db_path = state_db_path()?;
200            let conn = create_state_db(&db_path)?;
201            let payload = match command {
202                AwayCommands::On => set_away_mode(&conn, true, now)?,
203                AwayCommands::Off => set_away_mode(&conn, false, now)?,
204                AwayCommands::Status => get_away_mode(&conn)?,
205            };
206            println!("{}", serde_json::to_string(&payload)?);
207        }
208        Commands::Threads { limit } => {
209            let now = now_millis()?;
210            let db_path = state_db_path()?;
211            let conn = create_state_db(&db_path)?;
212            let mut client = CodexAppServerClient::connect()?;
213            let result = sync_state_from_live(&mut client, &conn, now, limit, false)?;
214            println!(
215                "{}",
216                serde_json::to_string(&json!({
217                    "threads": result["threads"].clone()
218                }))?
219            );
220        }
221        Commands::Follow {
222            thread_id,
223            message,
224            duration,
225            poll_interval,
226            events,
227        } => {
228            let event_filter = parse_event_filter(events.as_deref());
229            let mut client = CodexAppServerClient::connect()?;
230            let events = collect_follow_events(
231                &mut client,
232                &thread_id,
233                message.as_deref(),
234                duration,
235                poll_interval,
236                event_filter.as_ref(),
237            )?;
238            for event in &events {
239                println!("{}", serde_json::to_string(event)?);
240            }
241            println!(
242                "{}",
243                serde_json::to_string(&follow_result_summary(
244                    &thread_id, duration, &events, false,
245                ))?
246            );
247        }
248        Commands::Unarchive { thread_id, dry_run } => {
249            let now = now_millis()?;
250            let db_path = state_db_path()?;
251            let conn = create_state_db(&db_path)?;
252            let live_result = if dry_run {
253                None
254            } else {
255                let mut client = CodexAppServerClient::connect()?;
256                Some(client.request("thread/unarchive", json!({ "threadId": thread_id }))?)
257            };
258            println!(
259                "{}",
260                serde_json::to_string(&unarchive_thread_result(
261                    &conn,
262                    &thread_id,
263                    dry_run,
264                    now,
265                    live_result
266                )?)?
267            );
268        }
269        Commands::Waiting { project, limit } => {
270            let now = now_millis()?;
271            let db_path = state_db_path()?;
272            let conn = create_state_db(&db_path)?;
273            let mut client = CodexAppServerClient::connect()?;
274            sync_state_from_live(&mut client, &conn, now, limit.max(25), false)?;
275            let result = list_waiting_from_db(&conn, project.as_deref(), limit)?;
276            println!("{}", serde_json::to_string(&result)?);
277        }
278        Commands::Inbox {
279            project,
280            status,
281            attention,
282            waiting_on,
283            limit,
284        } => {
285            let now = now_millis()?;
286            let db_path = state_db_path()?;
287            let conn = create_state_db(&db_path)?;
288            let mut client = CodexAppServerClient::connect()?;
289            sync_state_from_live(&mut client, &conn, now, limit.max(25), false)?;
290            let result = list_inbox_from_db(
291                &conn,
292                now,
293                project.as_deref(),
294                status.as_deref(),
295                attention.as_deref(),
296                waiting_on.as_deref(),
297                limit,
298            )?;
299            println!("{}", serde_json::to_string(&result)?);
300        }
301        Commands::Watch { once, exec, events } => {
302            let filter = parse_event_filter(events.as_deref());
303            let db_path = state_db_path()?;
304            let conn = create_state_db(&db_path)?;
305            if once {
306                let now = now_millis()?;
307                let mut client = CodexAppServerClient::connect()?;
308                let sync_result = match sync_state_from_live(&mut client, &conn, now, 50, true) {
309                    Ok(sync_result) => sync_result,
310                    Err(error) => {
311                        let filtered = filter_watch_events(
312                            vec![watch_thread_error_event(&error)],
313                            filter.as_ref(),
314                        );
315                        for event in &filtered {
316                            if let Some(command) = exec.as_deref() {
317                                run_exec_hook(command, event)?;
318                            }
319                        }
320                        println!("{}", serde_json::to_string(&json!({ "events": filtered }))?);
321                        return Ok(());
322                    }
323                };
324                let filtered = watch_events_from_sync_result(&sync_result, vec![], filter.as_ref());
325                for event in &filtered {
326                    if let Some(command) = exec.as_deref() {
327                        run_exec_hook(command, event)?;
328                    }
329                }
330                println!("{}", serde_json::to_string(&json!({ "events": filtered }))?);
331            } else {
332                println!(
333                    "{}",
334                    serde_json::to_string(
335                        &json!({ "type": "watch_started", "away": get_away_mode(&conn)?["away"] })
336                    )?
337                );
338                let mut last = String::new();
339                let watch_rx = start_codex_watch_receiver().ok();
340                loop {
341                    let now = now_millis()?;
342                    let mut client = CodexAppServerClient::connect()?;
343                    let filtered = match sync_state_from_live(&mut client, &conn, now, 50, true) {
344                        Ok(sync_result) => watch_events_from_sync_result(
345                            &sync_result,
346                            client.drain_notifications(),
347                            filter.as_ref(),
348                        ),
349                        Err(error) => filter_watch_events(
350                            vec![watch_thread_error_event(&error)],
351                            filter.as_ref(),
352                        ),
353                    };
354                    let serialized = serde_json::to_string(&filtered)?;
355                    if serialized != last {
356                        last = serialized;
357                        for event in filtered {
358                            println!("{}", serde_json::to_string(&event)?);
359                            if let Some(command) = exec.as_deref() {
360                                run_exec_hook(command, &event)?;
361                            }
362                        }
363                    }
364                    if let Some(rx) = watch_rx.as_ref() {
365                        rx.recv_timeout(std::time::Duration::from_millis(1500));
366                    } else {
367                        std::thread::sleep(std::time::Duration::from_millis(1500));
368                    }
369                }
370            }
371        }
372        Commands::Daemon { command } => match command {
373            DaemonCommands::Run {
374                once,
375                poll_interval,
376                timeout_ms,
377            } => {
378                run_daemon(once, poll_interval, Duration::from_millis(timeout_ms))?;
379            }
380            DaemonCommands::Install {
381                dry_run,
382                label,
383                bridge_command,
384            } => {
385                let result = install_daemon_service(&label, &bridge_command, dry_run)?;
386                println!("{}", serde_json::to_string(&result)?);
387            }
388            DaemonCommands::Uninstall { dry_run, label } => {
389                let result = uninstall_daemon_service(&label, dry_run)?;
390                println!("{}", serde_json::to_string(&result)?);
391            }
392            DaemonCommands::Start { dry_run, label } => {
393                let result = start_daemon_service(&label, dry_run)?;
394                println!("{}", serde_json::to_string(&result)?);
395            }
396            DaemonCommands::Stop { dry_run, label } => {
397                let result = stop_daemon_service(&label, dry_run)?;
398                println!("{}", serde_json::to_string(&result)?);
399            }
400            DaemonCommands::Status { label } => {
401                let result = daemon_service_status(&label)?;
402                println!("{}", serde_json::to_string(&result)?);
403            }
404            DaemonCommands::Logs { label } => {
405                let result = daemon_service_logs(&label)?;
406                println!("{}", serde_json::to_string(&result)?);
407            }
408        },
409        Commands::Telegram { command } => match command {
410            TelegramCommands::Setup {
411                bot_token,
412                chat_id,
413                allowed_user_id,
414                events,
415                bridge_command,
416                pair_timeout_ms,
417                dry_run,
418            } => {
419                let result = telegram_setup_result(TelegramSetupOptions {
420                    bot_token: bot_token.as_deref(),
421                    chat_id: chat_id.as_deref(),
422                    allowed_user_id: allowed_user_id.as_deref(),
423                    events: &events,
424                    bridge_command: &bridge_command,
425                    dry_run,
426                    pair_timeout_ms,
427                })?;
428                println!("{}", serde_json::to_string(&result)?);
429            }
430            TelegramCommands::Status => {
431                let result = telegram_status_result()?;
432                println!("{}", serde_json::to_string(&result)?);
433            }
434            TelegramCommands::Test {
435                message,
436                timeout_ms,
437                dry_run,
438            } => {
439                let result =
440                    telegram_test_result(&message, Duration::from_millis(timeout_ms), dry_run)?;
441                println!("{}", serde_json::to_string(&result)?);
442            }
443            TelegramCommands::Disable { dry_run } => {
444                let result = telegram_disable_result(dry_run)?;
445                println!("{}", serde_json::to_string(&result)?);
446            }
447        },
448        Commands::Projects { command } => match command {
449            ProjectCommands::List { observed_limit } => {
450                let result = projects_list_result(observed_limit)?;
451                println!("{}", serde_json::to_string(&result)?);
452            }
453            ProjectCommands::Add {
454                cwd,
455                id,
456                label,
457                aliases,
458                dry_run,
459            } => {
460                let result =
461                    project_add_result(&cwd, id.as_deref(), label.as_deref(), &aliases, dry_run)?;
462                println!("{}", serde_json::to_string(&result)?);
463            }
464            ProjectCommands::Import { limit, dry_run } => {
465                let result = project_import_result(limit, dry_run)?;
466                println!("{}", serde_json::to_string(&result)?);
467            }
468            ProjectCommands::Remove { id, dry_run } => {
469                let result = project_remove_result(&id, dry_run)?;
470                println!("{}", serde_json::to_string(&result)?);
471            }
472        },
473        Commands::Sync { limit } => {
474            let now = now_millis()?;
475            let db_path = state_db_path()?;
476            let conn = create_state_db(&db_path)?;
477            let mut client = CodexAppServerClient::connect()?;
478            let result = sync_state_from_live(&mut client, &conn, now, limit, false)?;
479            println!("{}", serde_json::to_string(&result)?);
480        }
481        Commands::New {
482            cwd,
483            message,
484            dry_run,
485            follow,
486            stream,
487            duration,
488            poll_interval,
489            events,
490            prompt,
491        } => {
492            let message = normalized_message(message.as_deref()).or_else(|| {
493                let joined = prompt.join(" ").trim().to_string();
494                (!joined.is_empty()).then_some(joined)
495            });
496            if dry_run {
497                println!(
498                    "{}",
499                    serde_json::to_string(&start_new_thread_dry_run(
500                        cwd.as_deref(),
501                        message.as_deref()
502                    ))?
503                );
504            } else {
505                let mut client = CodexAppServerClient::connect()?;
506                let result = start_thread_in_cwd(&mut client, cwd.as_deref(), message.as_deref())?;
507                let result = if follow {
508                    let db_path = state_db_path()?;
509                    let conn = create_state_db(&db_path)?;
510                    let filter = parse_event_filter(events.as_deref());
511                    if let Some(thread_id) = result.get("threadId").and_then(Value::as_str) {
512                        attach_follow_result(
513                            result.clone(),
514                            &mut client,
515                            &conn,
516                            FollowRun {
517                                thread_id,
518                                duration_ms: duration,
519                                poll_interval_ms: poll_interval,
520                                event_filter: filter.as_ref(),
521                                stream,
522                            },
523                        )?
524                    } else {
525                        result
526                    }
527                } else {
528                    result
529                };
530                println!("{}", serde_json::to_string(&result)?);
531            }
532        }
533        Commands::Fork {
534            thread_id,
535            message,
536            dry_run,
537            follow,
538            stream,
539            duration,
540            poll_interval,
541            events,
542            prompt,
543        } => {
544            let message = normalized_message(message.as_deref()).or_else(|| {
545                let joined = prompt.join(" ").trim().to_string();
546                (!joined.is_empty()).then_some(joined)
547            });
548            if dry_run {
549                println!(
550                    "{}",
551                    serde_json::to_string(&fork_thread_dry_run(&thread_id, message.as_deref()))?
552                );
553            } else {
554                let mut client = CodexAppServerClient::connect()?;
555                let forked = client.request("thread/fork", json!({ "threadId": thread_id }))?;
556                let new_thread_id = thread_id_from_response(&forked);
557                let forked_cwd = thread_cwd_from_response(&forked, None);
558                let started = match (new_thread_id.as_deref(), message.as_deref()) {
559                    (Some(new_thread_id), Some(message)) if !message.trim().is_empty() => {
560                        Some(client.request(
561                            "turn/start",
562                            turn_start_params(new_thread_id, forked_cwd.as_deref(), message),
563                        )?)
564                    }
565                    _ => None,
566                };
567                let result =
568                    fork_thread_live_result(&thread_id, message.as_deref(), forked, started);
569                let result = if follow {
570                    let db_path = state_db_path()?;
571                    let conn = create_state_db(&db_path)?;
572                    let filter = parse_event_filter(events.as_deref());
573                    if let Some(thread_id) = result.get("threadId").and_then(Value::as_str) {
574                        attach_follow_result(
575                            result.clone(),
576                            &mut client,
577                            &conn,
578                            FollowRun {
579                                thread_id,
580                                duration_ms: duration,
581                                poll_interval_ms: poll_interval,
582                                event_filter: filter.as_ref(),
583                                stream,
584                            },
585                        )?
586                    } else {
587                        result
588                    }
589                } else {
590                    result
591                };
592                println!("{}", serde_json::to_string(&result)?);
593            }
594        }
595        Commands::Archive {
596            thread_id_option,
597            thread_ids,
598            project,
599            status,
600            attention,
601            limit,
602            dry_run,
603            yes,
604        } => {
605            let mut targets = Vec::new();
606            if let Some(raw) = thread_id_option {
607                let raw = raw.as_str();
608                targets.extend(
609                    raw.split(',')
610                        .map(str::trim)
611                        .filter(|value| !value.is_empty())
612                        .map(str::to_string),
613                );
614            }
615            targets.extend(thread_ids);
616            let now = now_millis()?;
617            let db_path = state_db_path()?;
618            let conn = create_state_db(&db_path)?;
619            if !dry_run && targets.is_empty() && !yes {
620                bail!("Refusing bulk archive without --yes or --dry-run");
621            }
622            let mut client = if dry_run {
623                None
624            } else {
625                Some(CodexAppServerClient::connect()?)
626            };
627            if !dry_run && targets.is_empty() {
628                if let Some(client) = client.as_mut() {
629                    sync_state_from_live(client, &conn, now, 50, false)?;
630                }
631            }
632            let selection = resolve_archive_targets(
633                &conn,
634                &targets,
635                project.as_deref(),
636                status.as_deref(),
637                attention.as_deref(),
638                limit,
639                now,
640            )?;
641            if !dry_run && selection.using_filter_selection && !yes {
642                bail!("Refusing bulk archive without --yes or --dry-run");
643            }
644            if dry_run {
645                let results = selection
646                    .targets
647                    .into_iter()
648                    .map(|thread_id| json!({ "threadId": thread_id, "status": "would_archive" }))
649                    .collect::<Vec<_>>();
650                println!("{}", serde_json::to_string(&archive_result(true, results))?);
651            } else {
652                let mut results = Vec::new();
653                for target in selection.targets {
654                    let result = client
655                        .as_mut()
656                        .context("archive client missing")?
657                        .request("thread/archive", json!({ "threadId": target }))?;
658                    record_action(
659                        &conn,
660                        &target,
661                        "archive",
662                        json!({ "result": result, "archivedAt": now }),
663                        now,
664                    )?;
665                    results.push(json!({
666                        "threadId": target,
667                        "status": "archived",
668                        "result": result
669                    }));
670                }
671                println!(
672                    "{}",
673                    serde_json::to_string(&archive_result(false, results))?
674                );
675            }
676        }
677        Commands::Show { thread_id } => {
678            let db_path = state_db_path()?;
679            let conn = create_state_db(&db_path)?;
680            let mut client = CodexAppServerClient::connect()?;
681            let result = client.request(
682                "thread/read",
683                json!({
684                    "threadId": thread_id,
685                    "includeTurns": true
686                }),
687            )?;
688            println!(
689                "{}",
690                serde_json::to_string(
691                    &build_show_thread_result(Some(&conn), &thread_id, result,)?
692                )?
693            );
694        }
695        Commands::Reply {
696            thread_id,
697            message,
698            dry_run,
699            follow,
700            stream,
701            duration,
702            poll_interval,
703            events,
704            prompt,
705        } => {
706            let message = message
707                .or_else(|| {
708                    let joined = prompt.join(" ").trim().to_string();
709                    (!joined.is_empty()).then_some(joined)
710                })
711                .unwrap_or_default()
712                .trim()
713                .to_string();
714            if message.is_empty() {
715                bail!("Reply message cannot be empty");
716            }
717            let sent_at = now_millis()?;
718            if dry_run {
719                let payload = ReplyResult {
720                    ok: true,
721                    action: "reply",
722                    dry_run: true,
723                    thread_id: &thread_id,
724                    message: &message,
725                    sent_at,
726                };
727                println!("{}", serde_json::to_string(&payload)?);
728            } else {
729                let mut client = CodexAppServerClient::connect()?;
730                let resumed = client.request("thread/resume", json!({ "threadId": thread_id }))?;
731                let started = client.request(
732                    "turn/start",
733                    json!({
734                        "threadId": thread_id,
735                        "input": [{
736                            "type": "text",
737                            "text": message,
738                            "text_elements": []
739                        }]
740                    }),
741                )?;
742                let db_path = state_db_path()?;
743                let conn = create_state_db(&db_path)?;
744                record_action(
745                    &conn,
746                    &thread_id,
747                    "reply",
748                    json!({
749                        "message": message,
750                        "resumed": resumed,
751                        "started": started,
752                        "sentAt": sent_at
753                    }),
754                    sent_at,
755                )?;
756                let result = json!({
757                    "ok": true,
758                    "action": "reply",
759                    "threadId": thread_id,
760                    "message": message,
761                    "sentAt": sent_at,
762                    "resumed": resumed,
763                    "started": started
764                });
765                let result = if follow {
766                    let filter = parse_event_filter(events.as_deref());
767                    attach_follow_result(
768                        result,
769                        &mut client,
770                        &conn,
771                        FollowRun {
772                            thread_id: &thread_id,
773                            duration_ms: duration,
774                            poll_interval_ms: poll_interval,
775                            event_filter: filter.as_ref(),
776                            stream,
777                        },
778                    )?
779                } else {
780                    result
781                };
782                println!("{}", serde_json::to_string(&result)?);
783            }
784        }
785        Commands::Approve {
786            thread_id,
787            decision,
788            dry_run,
789            follow,
790            stream,
791            duration,
792            poll_interval,
793            events,
794            positional_decision,
795        } => {
796            let normalized = decision
797                .or(positional_decision)
798                .unwrap_or_default()
799                .trim()
800                .to_lowercase();
801            let sent_text = match normalized.as_str() {
802                "approve" => "YES",
803                "deny" => "NO",
804                _ => bail!("Approval decision must be approve or deny"),
805            };
806            let sent_at = now_millis()?;
807            if dry_run {
808                let payload = ApproveResult {
809                    ok: true,
810                    action: "approve",
811                    dry_run: true,
812                    thread_id: &thread_id,
813                    decision: &normalized,
814                    sent_text,
815                    sent_at,
816                };
817                println!("{}", serde_json::to_string(&payload)?);
818            } else {
819                let mut client = CodexAppServerClient::connect()?;
820                let resumed = client.request("thread/resume", json!({ "threadId": thread_id }))?;
821                let started = client.request(
822                    "turn/start",
823                    json!({
824                        "threadId": thread_id,
825                        "input": [{
826                            "type": "text",
827                            "text": sent_text,
828                            "text_elements": []
829                        }]
830                    }),
831                )?;
832                let db_path = state_db_path()?;
833                let conn = create_state_db(&db_path)?;
834                record_action(
835                    &conn,
836                    &thread_id,
837                    "approve",
838                    json!({
839                        "decision": normalized,
840                        "sentText": sent_text,
841                        "resumed": resumed,
842                        "started": started,
843                        "sentAt": sent_at
844                    }),
845                    sent_at,
846                )?;
847                let result = json!({
848                    "ok": true,
849                    "action": "approve",
850                    "threadId": thread_id,
851                    "decision": normalized,
852                    "sentText": sent_text,
853                    "sentAt": sent_at,
854                    "resumed": resumed,
855                    "started": started
856                });
857                let result = if follow {
858                    let filter = parse_event_filter(events.as_deref());
859                    attach_follow_result(
860                        result,
861                        &mut client,
862                        &conn,
863                        FollowRun {
864                            thread_id: &thread_id,
865                            duration_ms: duration,
866                            poll_interval_ms: poll_interval,
867                            event_filter: filter.as_ref(),
868                            stream,
869                        },
870                    )?
871                } else {
872                    result
873                };
874                println!("{}", serde_json::to_string(&result)?);
875            }
876        }
877        Commands::Mcp => {
878            let stdin = std::io::stdin();
879            let stdout = std::io::stdout();
880            run_mcp_server(stdin.lock(), stdout.lock())?;
881        }
882        Commands::Hermes { command } => match command {
883            HermesCommands::Install {
884                server_name,
885                hermes_command,
886                bridge_command,
887                dry_run,
888            } => {
889                let result = run_hermes_install(HermesInstallOptions {
890                    server_name: &server_name,
891                    hermes_command: &hermes_command,
892                    bridge_command: &bridge_command,
893                    dry_run,
894                })?;
895                println!("{}", serde_json::to_string(&result)?);
896            }
897        },
898    }
899
900    Ok(())
901}
902
903fn now_millis() -> Result<u64> {
904    Ok(SystemTime::now()
905        .duration_since(UNIX_EPOCH)
906        .map_err(|e| anyhow!(e))?
907        .as_millis() as u64)
908}
909
910fn importable_projects_from_observed(
911    observed: &[ObservedWorkspace],
912    existing_projects: &[RegisteredProject],
913) -> Vec<RegisteredProject> {
914    let mut projects = Vec::new();
915    let mut existing_ids = existing_projects
916        .iter()
917        .map(|project| project.id.clone())
918        .collect::<BTreeSet<_>>();
919    let existing_cwds = existing_projects
920        .iter()
921        .map(|project| project.cwd.clone())
922        .collect::<BTreeSet<_>>();
923    for workspace in observed {
924        if existing_cwds.contains(&workspace.cwd)
925            || projects
926                .iter()
927                .any(|project: &RegisteredProject| project.cwd == workspace.cwd)
928        {
929            continue;
930        }
931        let base_id =
932            slugify_project_token(&workspace.label).unwrap_or_else(|| "project".to_string());
933        let id = ensure_unique_project_id(&base_id, &existing_ids);
934        existing_ids.insert(id.clone());
935        projects.push(RegisteredProject {
936            id,
937            label: workspace.label.clone(),
938            cwd: workspace.cwd.clone(),
939            aliases: Vec::new(),
940        });
941    }
942    projects
943}
944
945fn daemon_run_command(bridge_command: &str) -> String {
946    format!("{} daemon run", shell_quote(bridge_command))
947}
948
949fn doctor_bridge() -> Result<DoctorBridge> {
950    let config_path = daemon_config_path()?;
951    let config = read_daemon_config_raw()?;
952    let service = daemon_service_spec(DEFAULT_DAEMON_LABEL, "codex-telegram-bridge")?;
953    Ok(DoctorBridge {
954        config_path: config_path.display().to_string(),
955        config_exists: config_path.exists(),
956        telegram_configured: config
957            .as_ref()
958            .and_then(|config| config.telegram.as_ref())
959            .is_some(),
960        daemon_service_path: service.service_path.display().to_string(),
961        daemon_service_exists: service.service_path.exists(),
962    })
963}
964
965fn setup_result(options: SetupOptions<'_>) -> Result<Value> {
966    let resolved = resolve_codex_binary()?;
967    let telegram = telegram_setup_result(TelegramSetupOptions {
968        bot_token: options.bot_token,
969        chat_id: options.chat_id,
970        allowed_user_id: options.allowed_user_id,
971        events: options.events,
972        bridge_command: options.bridge_command,
973        dry_run: options.dry_run,
974        pair_timeout_ms: options.pair_timeout_ms,
975    })?;
976    let daemon_install = if options.install_daemon {
977        Some(install_daemon_service(
978            options.daemon_label,
979            options.bridge_command,
980            options.dry_run,
981        )?)
982    } else {
983        None
984    };
985    let daemon_start = if options.start_daemon {
986        Some(start_daemon_service(options.daemon_label, options.dry_run)?)
987    } else {
988        None
989    };
990    let hermes = if options.register_hermes {
991        Some(run_hermes_install(HermesInstallOptions {
992            server_name: options.hermes_server_name,
993            hermes_command: options.hermes_command,
994            bridge_command: options.bridge_command,
995            dry_run: options.dry_run,
996        })?)
997    } else {
998        None
999    };
1000
1001    Ok(json!({
1002        "ok": true,
1003        "action": "setup",
1004        "dryRun": options.dry_run,
1005        "codex": {
1006            "resolvedPath": resolved.path.display().to_string(),
1007            "source": resolved.source
1008        },
1009        "telegram": telegram,
1010        "daemon": {
1011            "install": daemon_install,
1012            "start": daemon_start
1013        },
1014        "hermes": hermes,
1015        "nextStep": if options.dry_run {
1016            "Run setup without --dry-run, then use away on when leaving your computer."
1017        } else {
1018            "Use away on when leaving your computer. Reply to Codex Telegram messages to keep working from Telegram."
1019        }
1020    }))
1021}
1022
1023fn projects_list_result(observed_limit: u64) -> Result<Value> {
1024    let config = load_daemon_config()?;
1025    let db_path = state_db_path()?;
1026    let conn = create_state_db(&db_path)?;
1027    let observed = observed_workspaces_from_db(&conn, observed_limit)?;
1028    let importable = importable_projects_from_observed(&observed, &config.projects);
1029    Ok(json!({
1030        "ok": true,
1031        "action": "projects_list",
1032        "configured": config.projects,
1033        "observed": observed.into_iter().map(|workspace| json!({
1034            "label": workspace.label,
1035            "cwd": workspace.cwd,
1036            "lastSeenAt": workspace.last_seen_at
1037        })).collect::<Vec<_>>(),
1038        "importable": importable
1039    }))
1040}
1041
1042fn project_add_result(
1043    cwd: &str,
1044    id: Option<&str>,
1045    label: Option<&str>,
1046    aliases: &[String],
1047    dry_run: bool,
1048) -> Result<Value> {
1049    let mut config = load_daemon_config()?;
1050    let project = build_registered_project(cwd, id, label, aliases, &config.projects)?;
1051    if config
1052        .projects
1053        .iter()
1054        .any(|existing| existing.cwd == project.cwd)
1055    {
1056        bail!("project cwd `{}` is already registered", project.cwd);
1057    }
1058    if !dry_run {
1059        config.projects.push(project.clone());
1060        write_daemon_config(&config)?;
1061    }
1062    Ok(json!({
1063        "ok": true,
1064        "action": "projects_add",
1065        "dryRun": dry_run,
1066        "project": project
1067    }))
1068}
1069
1070fn project_import_result(limit: u64, dry_run: bool) -> Result<Value> {
1071    let mut config = load_daemon_config()?;
1072    let db_path = state_db_path()?;
1073    let conn = create_state_db(&db_path)?;
1074    let observed = observed_workspaces_from_db(&conn, limit)?;
1075    let importable = importable_projects_from_observed(&observed, &config.projects);
1076    if !dry_run && !importable.is_empty() {
1077        config.projects.extend(importable.clone());
1078        write_daemon_config(&config)?;
1079    }
1080    Ok(json!({
1081        "ok": true,
1082        "action": "projects_import",
1083        "dryRun": dry_run,
1084        "imported": importable,
1085        "count": importable.len()
1086    }))
1087}
1088
1089fn project_remove_result(id: &str, dry_run: bool) -> Result<Value> {
1090    let mut config = load_daemon_config()?;
1091    let Some(project) = config
1092        .projects
1093        .iter()
1094        .find(|project| project.id == id)
1095        .cloned()
1096    else {
1097        bail!("project `{id}` was not found");
1098    };
1099    if !dry_run {
1100        config.projects.retain(|candidate| candidate.id != id);
1101        write_daemon_config(&config)?;
1102    }
1103    Ok(json!({
1104        "ok": true,
1105        "action": "projects_remove",
1106        "dryRun": dry_run,
1107        "project": project
1108    }))
1109}
1110
1111const DEFAULT_NOTIFICATION_EVENTS: &str = "thread_waiting,thread_completed";
1112
1113#[derive(Debug, Clone, Copy)]
1114struct HermesInstallOptions<'a> {
1115    server_name: &'a str,
1116    hermes_command: &'a str,
1117    bridge_command: &'a str,
1118    dry_run: bool,
1119}
1120
1121fn run_hermes_install(options: HermesInstallOptions<'_>) -> Result<Value> {
1122    let server_name = options.server_name.trim();
1123    let hermes_command = options.hermes_command.trim();
1124    let bridge_command = options.bridge_command.trim();
1125
1126    if server_name.is_empty() {
1127        bail!("server name cannot be empty");
1128    }
1129    if hermes_command.is_empty() {
1130        bail!("hermes command cannot be empty");
1131    }
1132    if bridge_command.is_empty() {
1133        bail!("bridge command cannot be empty");
1134    }
1135
1136    let mcp_args = vec![
1137        "mcp".to_string(),
1138        "add".to_string(),
1139        server_name.to_string(),
1140        "--command".to_string(),
1141        bridge_command.to_string(),
1142        "--args".to_string(),
1143        "mcp".to_string(),
1144    ];
1145
1146    let base = json!({
1147        "ok": true,
1148        "action": "hermes_install",
1149        "dryRun": options.dry_run,
1150        "serverName": server_name,
1151        "hermesCommand": hermes_command,
1152        "bridgeCommand": bridge_command,
1153        "args": mcp_args.clone(),
1154        "mcp": {
1155            "configured": true,
1156            "args": mcp_args.clone(),
1157            "nextStep": "Restart Hermes so it reconnects to MCP servers and discovers codex_* tools."
1158        },
1159        "nextStep": "Restart Hermes for MCP discovery. Telegram notifications are configured with the top-level setup command, not through Hermes."
1160    });
1161    if options.dry_run {
1162        return Ok(base);
1163    }
1164
1165    let mcp_output = Command::new(hermes_command)
1166        .args(&mcp_args)
1167        .output()
1168        .with_context(|| format!("failed to run {hermes_command} mcp add"))?;
1169    if !mcp_output.status.success() {
1170        bail!(
1171            "Hermes MCP registration failed with status {}: {}",
1172            mcp_output.status,
1173            String::from_utf8_lossy(&mcp_output.stderr).trim()
1174        );
1175    }
1176
1177    Ok(json!({
1178        "ok": true,
1179        "action": "hermes_install",
1180        "dryRun": false,
1181        "serverName": server_name,
1182        "hermesCommand": hermes_command,
1183        "bridgeCommand": bridge_command,
1184        "args": mcp_args,
1185        "mcp": {
1186            "configured": true,
1187            "stdout": String::from_utf8_lossy(&mcp_output.stdout).trim(),
1188            "stderr": String::from_utf8_lossy(&mcp_output.stderr).trim()
1189        },
1190        "nextStep": "Restart Hermes for MCP discovery. Telegram notifications are configured with the top-level setup command, not through Hermes."
1191    }))
1192}
1193
1194fn redact_secret_text(text: &str, secret: &str) -> String {
1195    if secret.is_empty() {
1196        text.to_string()
1197    } else {
1198        text.replace(secret, "<redacted>")
1199    }
1200}
1201
1202fn shell_quote(value: &str) -> String {
1203    if !value.is_empty()
1204        && value.chars().all(|c| {
1205            c.is_ascii_alphanumeric() || matches!(c, '/' | '.' | '_' | '-' | ',' | ':' | '=')
1206        })
1207    {
1208        value.to_string()
1209    } else {
1210        format!("'{}'", value.replace('\'', "'\\''"))
1211    }
1212}
1213
1214fn event_thread_id(event: &Value) -> Option<String> {
1215    event
1216        .get("threadId")
1217        .and_then(Value::as_str)
1218        .or_else(|| event.pointer("/thread/threadId").and_then(Value::as_str))
1219        .or_else(|| event.pointer("/thread/id").and_then(Value::as_str))
1220        .map(str::to_string)
1221}
1222
1223fn notification_event_id(event: &Value) -> String {
1224    let event_type = event
1225        .get("type")
1226        .and_then(Value::as_str)
1227        .unwrap_or("codex_event");
1228    let thread_id = event_thread_id(event).unwrap_or_else(|| "unknown".to_string());
1229    let discriminator = event
1230        .get("eventKey")
1231        .and_then(Value::as_str)
1232        .map(str::to_string)
1233        .or_else(|| {
1234            event
1235                .get("updatedAt")
1236                .and_then(Value::as_u64)
1237                .map(|value| value.to_string())
1238        })
1239        .or_else(|| {
1240            event
1241                .get("observedAt")
1242                .and_then(Value::as_u64)
1243                .map(|value| value.to_string())
1244        })
1245        .unwrap_or_else(|| {
1246            serde_json::to_string(event)
1247                .map(|raw| sha256_hex(raw.as_bytes()))
1248                .unwrap_or_else(|_| "event".to_string())
1249        });
1250    sanitize_delivery_id(&format!("codex:{event_type}:{thread_id}:{discriminator}"))
1251}
1252
1253fn sanitize_delivery_id(value: &str) -> String {
1254    value
1255        .chars()
1256        .map(|c| {
1257            if c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '.') {
1258                c
1259            } else {
1260                '-'
1261            }
1262        })
1263        .collect()
1264}
1265
1266fn sha256_hex(message: &[u8]) -> String {
1267    hex_lower(&Sha256::digest(message))
1268}
1269
1270fn hex_lower(bytes: &[u8]) -> String {
1271    const HEX: &[u8; 16] = b"0123456789abcdef";
1272    let mut output = String::with_capacity(bytes.len() * 2);
1273    for byte in bytes {
1274        output.push(HEX[(byte >> 4) as usize] as char);
1275        output.push(HEX[(byte & 0x0f) as usize] as char);
1276    }
1277    output
1278}
1279
1280#[cfg(test)]
1281mod tests {
1282    use super::*;
1283
1284    #[test]
1285    fn derives_waiting_prompt_from_status_flags() {
1286        let summary = json!({
1287            "id": "thr_reply",
1288            "name": null,
1289            "cwd": "/tmp/reply",
1290            "updatedAt": 123,
1291            "status": {
1292                "type": "active",
1293                "activeFlags": ["waitingOnUserInput"]
1294            }
1295        });
1296        let thread = json!({
1297            "id": "thr_reply",
1298            "cwd": "/tmp/reply",
1299            "status": {
1300                "type": "active",
1301                "activeFlags": ["waitingOnUserInput"]
1302            },
1303            "turns": [
1304                {
1305                    "status": "in_progress",
1306                    "items": [
1307                        {
1308                            "type": "agentMessage",
1309                            "phase": "final_answer",
1310                            "text": "Can you confirm the plan?"
1311                        }
1312                    ]
1313                }
1314            ]
1315        });
1316
1317        let snapshot = normalize_thread_snapshot(&summary, &thread).expect("snapshot");
1318        let prompt = snapshot.pending_prompt.expect("pending prompt");
1319        assert_eq!(prompt.kind, "reply");
1320        assert_eq!(
1321            prompt.question.as_deref(),
1322            Some("Can you confirm the plan?")
1323        );
1324        assert_eq!(snapshot.last_turn_status.as_deref(), Some("in_progress"));
1325    }
1326
1327    #[test]
1328    fn hermes_install_dry_run_builds_mcp_add_command() {
1329        let result = run_hermes_install(HermesInstallOptions {
1330            server_name: "codex",
1331            hermes_command: "hermes-se",
1332            bridge_command: "codex-telegram-bridge",
1333            dry_run: true,
1334        })
1335        .expect("dry-run install result");
1336
1337        assert_eq!(result["action"], "hermes_install");
1338        assert_eq!(result["dryRun"], true);
1339        assert_eq!(result["hermesCommand"], "hermes-se");
1340        assert_eq!(
1341            result["args"],
1342            json!([
1343                "mcp",
1344                "add",
1345                "codex",
1346                "--command",
1347                "codex-telegram-bridge",
1348                "--args",
1349                "mcp"
1350            ])
1351        );
1352        assert_eq!(
1353            result["mcp"]["nextStep"],
1354            "Restart Hermes so it reconnects to MCP servers and discovers codex_* tools."
1355        );
1356        assert!(
1357            result.get("notificationLane").is_none(),
1358            "Hermes install should not configure Telegram notifications"
1359        );
1360    }
1361
1362    #[test]
1363    fn outbound_events_dedupe_retry_and_deliver_durably() {
1364        let conn = create_state_db_in_memory().expect("db");
1365        let event = json!({
1366            "type": "thread_waiting",
1367            "threadId": "thr_1",
1368            "updatedAt": 42
1369        });
1370
1371        assert!(enqueue_outbound_event(&conn, &event, 1000).expect("enqueue"));
1372        assert!(
1373            !enqueue_outbound_event(&conn, &event, 1001).expect("dedupe"),
1374            "same delivery id should not enqueue twice"
1375        );
1376
1377        let failed = deliver_due_outbound_events(&conn, 1000, 10, |_| bail!("Hermes offline"))
1378            .expect("failed delivery summary");
1379        assert_eq!(
1380            failed,
1381            OutboxDeliverySummary {
1382                attempted: 1,
1383                delivered: 0,
1384                failed: 1
1385            }
1386        );
1387        assert_eq!(pending_outbound_count(&conn).expect("pending"), 1);
1388
1389        let delayed = deliver_due_outbound_events(&conn, 1000, 10, |_| Ok(json!({"ok": true})))
1390            .expect("not due summary");
1391        assert_eq!(delayed.attempted, 0);
1392
1393        let delivered = deliver_due_outbound_events(&conn, 2000, 10, |_| Ok(json!({"ok": true})))
1394            .expect("delivered summary");
1395        assert_eq!(
1396            delivered,
1397            OutboxDeliverySummary {
1398                attempted: 1,
1399                delivered: 1,
1400                failed: 0
1401            }
1402        );
1403        assert_eq!(pending_outbound_count(&conn).expect("pending"), 0);
1404    }
1405
1406    #[test]
1407    fn transport_delivery_log_tracks_each_transport_once() {
1408        let conn = create_state_db_in_memory().expect("db");
1409
1410        assert!(
1411            !transport_delivery_exists(&conn, "event_1", "telegram").expect("lookup"),
1412            "transport should not start delivered"
1413        );
1414        record_transport_delivery(
1415            &conn,
1416            "event_1",
1417            "telegram",
1418            &json!({ "messageId": 111 }),
1419            1000,
1420        )
1421        .expect("record delivery");
1422
1423        assert!(
1424            transport_delivery_exists(&conn, "event_1", "telegram").expect("lookup"),
1425            "recorded transport should be treated as delivered"
1426        );
1427        assert!(
1428            !transport_delivery_exists(&conn, "event_1", "hermes").expect("lookup"),
1429            "other transports for the same event must remain pending"
1430        );
1431    }
1432
1433    #[test]
1434    fn inbox_age_seconds_handles_mixed_timestamp_units() {
1435        let snapshot = snapshot_fixture(
1436            "thr_recent",
1437            "/tmp/project",
1438            1_776_219_396,
1439            "notLoaded",
1440            vec![],
1441            Some("completed"),
1442        );
1443
1444        let item = classify_inbox_item(&snapshot, 1_776_219_400_000);
1445
1446        assert_eq!(item.age_seconds, Some(4));
1447    }
1448
1449    fn snapshot_fixture(
1450        thread_id: &str,
1451        cwd: &str,
1452        updated_at: u64,
1453        status_type: &str,
1454        status_flags: Vec<&str>,
1455        last_turn_status: Option<&str>,
1456    ) -> BridgeThreadSnapshot {
1457        let status_flags_vec = status_flags
1458            .into_iter()
1459            .map(|s| s.to_string())
1460            .collect::<Vec<_>>();
1461        BridgeThreadSnapshot {
1462            thread_id: thread_id.to_string(),
1463            name: None,
1464            cwd: Some(cwd.to_string()),
1465            updated_at: Some(updated_at),
1466            status_type: status_type.to_string(),
1467            status_flags: status_flags_vec.clone(),
1468            last_turn_status: last_turn_status.map(|s| s.to_string()),
1469            last_preview: Some(format!("preview for {thread_id}")),
1470            pending_prompt: derive_pending_prompt(
1471                thread_id,
1472                &status_flags_vec,
1473                Some(format!("preview for {thread_id}")),
1474            ),
1475        }
1476    }
1477}