1use anyhow::{anyhow, bail, Context, Result};
2use serde::Serialize;
3use serde_json::{json, Value};
4use sha2::{Digest, Sha256};
5use std::collections::BTreeSet;
6use std::process::Command;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9mod cli;
10mod codex;
11mod config;
12mod daemon;
13mod mcp;
14mod projects;
15mod state;
16mod telegram;
17
18use crate::cli::{
19 AwayCommands, Cli, Commands, DaemonCommands, HermesCommands, ProjectCommands, TelegramCommands,
20};
21use crate::codex::{
22 attach_follow_result, build_show_thread_result, classify_app_server_error_message,
23 collect_follow_events, filter_watch_events, follow_result_summary, fork_thread_dry_run,
24 fork_thread_live_result, get_away_mode, normalized_message, parse_event_filter,
25 resolve_codex_binary, run_exec_hook, set_away_mode, start_codex_watch_receiver,
26 start_new_thread_dry_run, start_thread_in_cwd, sync_state_from_live, thread_cwd_from_response,
27 thread_id_from_response, turn_start_params, watch_events_from_sync_result,
28 watch_thread_error_event, CodexAppServerClient, FollowRun,
29};
30#[cfg(test)]
31use crate::codex::{derive_pending_prompt, normalize_thread_snapshot};
32use crate::daemon::{
33 daemon_service_logs, daemon_service_spec, daemon_service_status, install_daemon_service,
34 run_daemon, start_daemon_service, stop_daemon_service, uninstall_daemon_service,
35 DEFAULT_DAEMON_LABEL,
36};
37use crate::mcp::run_mcp_server;
38use crate::projects::{build_registered_project, ensure_unique_project_id, slugify_project_token};
39use crate::state::{
40 archive_result, create_state_db, list_inbox_from_db, list_waiting_from_db,
41 observed_workspaces_from_db, record_action, resolve_archive_targets, state_db_path,
42 unarchive_thread_result, ObservedWorkspace,
43};
44#[cfg(test)]
45use crate::state::{classify_inbox_item, create_state_db_in_memory, BridgeThreadSnapshot};
46#[cfg(test)]
47use crate::state::{
48 deliver_due_outbound_events, enqueue_outbound_event, pending_outbound_count,
49 record_transport_delivery, transport_delivery_exists, OutboxDeliverySummary,
50};
51use crate::telegram::{
52 telegram_disable_result, telegram_setup_result, telegram_status_result, telegram_test_result,
53};
54use clap::Parser;
55pub(crate) use config::{
56 daemon_config_path, load_daemon_config, merged_daemon_config, read_daemon_config_raw,
57 redacted_daemon_config, resolve_telegram_bot_token, write_daemon_config, DaemonConfig,
58 RegisteredProject, SetupOptions, TelegramConfig, TelegramSetupOptions,
59};
60pub(crate) use state::state_dir_path;
61
/// Top-level JSON document produced by `render_error_envelope` when a
/// command fails.
#[derive(Serialize)]
struct ErrorEnvelope {
    /// Success flag; `render_error_envelope` always sets this to `false`.
    ok: bool,
    /// Details of the failure.
    error: ErrorBody,
}
67
/// Failure details nested inside an `ErrorEnvelope`.
#[derive(Serialize)]
struct ErrorBody {
    /// Static error code; `render_error_envelope` uses `"internal_error"`.
    code: &'static str,
    /// The error rendered with its context chain (`{error:#}` formatting).
    message: String,
    /// Result of `classify_app_server_error_message` applied to the message.
    classified: Value,
}
74
/// JSON payload printed by the `doctor` subcommand.
#[derive(Serialize)]
struct DoctorEnvelope {
    /// Success flag; the `doctor` subcommand sets it to `true` when it
    /// reaches the point of printing this payload.
    ok: bool,
    /// Information about the resolved Codex binary.
    codex: DoctorCodex,
    /// Information about the bridge's own configuration and daemon service.
    bridge: DoctorBridge,
}
81
/// Codex binary section of the `doctor` report.
#[derive(Serialize)]
struct DoctorCodex {
    /// Display form of the path returned by `resolve_codex_binary`.
    resolved_path: String,
    /// String form of the `source` reported by `resolve_codex_binary`.
    source: String,
    /// Trimmed, lossily UTF-8-decoded stdout of running the binary with
    /// `--version`.
    version_stdout: String,
}
88
/// Bridge section of the `doctor` report, filled in by `doctor_bridge`.
#[derive(Serialize)]
struct DoctorBridge {
    /// Display form of the daemon config path.
    config_path: String,
    /// Whether the config path exists on disk.
    config_exists: bool,
    /// Whether the raw daemon config was read and contains a `telegram` entry.
    telegram_configured: bool,
    /// Display form of the daemon service file path from the service spec.
    daemon_service_path: String,
    /// Whether the daemon service file exists on disk.
    daemon_service_exists: bool,
}
97
/// JSON payload printed by the `reply` subcommand on its dry-run path.
#[derive(Serialize)]
struct ReplyResult<'a> {
    /// Success flag.
    ok: bool,
    /// Action name; the `reply` subcommand uses `"reply"`.
    action: &'a str,
    /// Whether this result describes a dry run.
    dry_run: bool,
    /// Thread the reply targets.
    thread_id: &'a str,
    /// Trimmed reply text.
    message: &'a str,
    /// Timestamp taken from `now_millis` (milliseconds since the Unix epoch).
    sent_at: u64,
}
107
/// JSON payload printed by the `approve` subcommand on its dry-run path.
#[derive(Serialize)]
struct ApproveResult<'a> {
    /// Success flag.
    ok: bool,
    /// Action name; the `approve` subcommand uses `"approve"`.
    action: &'a str,
    /// Whether this result describes a dry run.
    dry_run: bool,
    /// Thread the decision targets.
    thread_id: &'a str,
    /// Normalized (trimmed, lower-cased) decision: `approve` or `deny`.
    decision: &'a str,
    /// Text that stands for the decision: `YES` for approve, `NO` for deny.
    sent_text: &'a str,
    /// Timestamp taken from `now_millis` (milliseconds since the Unix epoch).
    sent_at: u64,
}
118
/// Public entry point of the CLI: delegates to `run` and returns its result
/// unchanged.
pub fn main_entry() -> anyhow::Result<()> {
    run()
}
122
123pub fn render_error_envelope(error: &anyhow::Error) -> String {
124 let envelope = ErrorEnvelope {
125 ok: false,
126 error: ErrorBody {
127 code: "internal_error",
128 message: format!("{error:#}"),
129 classified: classify_app_server_error_message(&format!("{error:#}")),
130 },
131 };
132
133 serde_json::to_string(&envelope).expect("serialize error envelope")
134}
135
136fn run() -> Result<()> {
137 let cli = Cli::parse();
138
139 match cli.command {
140 Commands::Setup {
141 bot_token,
142 chat_id,
143 allowed_user_id,
144 events,
145 bridge_command,
146 daemon_label,
147 install_daemon,
148 start_daemon,
149 register_hermes,
150 hermes_server_name,
151 hermes_command,
152 pair_timeout_ms,
153 dry_run,
154 } => {
155 let result = setup_result(SetupOptions {
156 bot_token: bot_token.as_deref(),
157 chat_id: chat_id.as_deref(),
158 allowed_user_id: allowed_user_id.as_deref(),
159 events: &events,
160 bridge_command: &bridge_command,
161 daemon_label: &daemon_label,
162 install_daemon,
163 start_daemon,
164 register_hermes,
165 hermes_server_name: &hermes_server_name,
166 hermes_command: &hermes_command,
167 dry_run,
168 pair_timeout_ms,
169 })?;
170 println!("{}", serde_json::to_string(&result)?);
171 }
172 Commands::Doctor => {
173 let resolved = resolve_codex_binary()?;
174 let output = Command::new(&resolved.path)
175 .arg("--version")
176 .output()
177 .with_context(|| {
178 format!("failed to execute {} --version", resolved.path.display())
179 })?;
180 if !output.status.success() {
181 bail!(
182 "codex binary {} returned non-zero exit status for --version",
183 resolved.path.display()
184 );
185 }
186 let payload = DoctorEnvelope {
187 ok: true,
188 codex: DoctorCodex {
189 resolved_path: resolved.path.display().to_string(),
190 source: resolved.source.to_string(),
191 version_stdout: String::from_utf8_lossy(&output.stdout).trim().to_string(),
192 },
193 bridge: doctor_bridge()?,
194 };
195 println!("{}", serde_json::to_string(&payload)?);
196 }
197 Commands::Away { command } => {
198 let now = now_millis()?;
199 let db_path = state_db_path()?;
200 let conn = create_state_db(&db_path)?;
201 let payload = match command {
202 AwayCommands::On => set_away_mode(&conn, true, now)?,
203 AwayCommands::Off => set_away_mode(&conn, false, now)?,
204 AwayCommands::Status => get_away_mode(&conn)?,
205 };
206 println!("{}", serde_json::to_string(&payload)?);
207 }
208 Commands::Threads { limit } => {
209 let now = now_millis()?;
210 let db_path = state_db_path()?;
211 let conn = create_state_db(&db_path)?;
212 let mut client = CodexAppServerClient::connect()?;
213 let result = sync_state_from_live(&mut client, &conn, now, limit, false)?;
214 println!(
215 "{}",
216 serde_json::to_string(&json!({
217 "threads": result["threads"].clone()
218 }))?
219 );
220 }
221 Commands::Follow {
222 thread_id,
223 message,
224 duration,
225 poll_interval,
226 events,
227 } => {
228 let event_filter = parse_event_filter(events.as_deref());
229 let mut client = CodexAppServerClient::connect()?;
230 let events = collect_follow_events(
231 &mut client,
232 &thread_id,
233 message.as_deref(),
234 duration,
235 poll_interval,
236 event_filter.as_ref(),
237 )?;
238 for event in &events {
239 println!("{}", serde_json::to_string(event)?);
240 }
241 println!(
242 "{}",
243 serde_json::to_string(&follow_result_summary(
244 &thread_id, duration, &events, false,
245 ))?
246 );
247 }
248 Commands::Unarchive { thread_id, dry_run } => {
249 let now = now_millis()?;
250 let db_path = state_db_path()?;
251 let conn = create_state_db(&db_path)?;
252 let live_result = if dry_run {
253 None
254 } else {
255 let mut client = CodexAppServerClient::connect()?;
256 Some(client.request("thread/unarchive", json!({ "threadId": thread_id }))?)
257 };
258 println!(
259 "{}",
260 serde_json::to_string(&unarchive_thread_result(
261 &conn,
262 &thread_id,
263 dry_run,
264 now,
265 live_result
266 )?)?
267 );
268 }
269 Commands::Waiting { project, limit } => {
270 let now = now_millis()?;
271 let db_path = state_db_path()?;
272 let conn = create_state_db(&db_path)?;
273 let mut client = CodexAppServerClient::connect()?;
274 sync_state_from_live(&mut client, &conn, now, limit.max(25), false)?;
275 let result = list_waiting_from_db(&conn, project.as_deref(), limit)?;
276 println!("{}", serde_json::to_string(&result)?);
277 }
278 Commands::Inbox {
279 project,
280 status,
281 attention,
282 waiting_on,
283 limit,
284 } => {
285 let now = now_millis()?;
286 let db_path = state_db_path()?;
287 let conn = create_state_db(&db_path)?;
288 let mut client = CodexAppServerClient::connect()?;
289 sync_state_from_live(&mut client, &conn, now, limit.max(25), false)?;
290 let result = list_inbox_from_db(
291 &conn,
292 now,
293 project.as_deref(),
294 status.as_deref(),
295 attention.as_deref(),
296 waiting_on.as_deref(),
297 limit,
298 )?;
299 println!("{}", serde_json::to_string(&result)?);
300 }
301 Commands::Watch { once, exec, events } => {
302 let filter = parse_event_filter(events.as_deref());
303 let db_path = state_db_path()?;
304 let conn = create_state_db(&db_path)?;
305 if once {
306 let now = now_millis()?;
307 let mut client = CodexAppServerClient::connect()?;
308 let sync_result = match sync_state_from_live(&mut client, &conn, now, 50, true) {
309 Ok(sync_result) => sync_result,
310 Err(error) => {
311 let filtered = filter_watch_events(
312 vec![watch_thread_error_event(&error)],
313 filter.as_ref(),
314 );
315 for event in &filtered {
316 if let Some(command) = exec.as_deref() {
317 run_exec_hook(command, event)?;
318 }
319 }
320 println!("{}", serde_json::to_string(&json!({ "events": filtered }))?);
321 return Ok(());
322 }
323 };
324 let filtered = watch_events_from_sync_result(&sync_result, vec![], filter.as_ref());
325 for event in &filtered {
326 if let Some(command) = exec.as_deref() {
327 run_exec_hook(command, event)?;
328 }
329 }
330 println!("{}", serde_json::to_string(&json!({ "events": filtered }))?);
331 } else {
332 println!(
333 "{}",
334 serde_json::to_string(
335 &json!({ "type": "watch_started", "away": get_away_mode(&conn)?["away"] })
336 )?
337 );
338 let mut last = String::new();
339 let watch_rx = start_codex_watch_receiver().ok();
340 loop {
341 let now = now_millis()?;
342 let mut client = CodexAppServerClient::connect()?;
343 let filtered = match sync_state_from_live(&mut client, &conn, now, 50, true) {
344 Ok(sync_result) => watch_events_from_sync_result(
345 &sync_result,
346 client.drain_notifications(),
347 filter.as_ref(),
348 ),
349 Err(error) => filter_watch_events(
350 vec![watch_thread_error_event(&error)],
351 filter.as_ref(),
352 ),
353 };
354 let serialized = serde_json::to_string(&filtered)?;
355 if serialized != last {
356 last = serialized;
357 for event in filtered {
358 println!("{}", serde_json::to_string(&event)?);
359 if let Some(command) = exec.as_deref() {
360 run_exec_hook(command, &event)?;
361 }
362 }
363 }
364 if let Some(rx) = watch_rx.as_ref() {
365 rx.recv_timeout(std::time::Duration::from_millis(1500));
366 } else {
367 std::thread::sleep(std::time::Duration::from_millis(1500));
368 }
369 }
370 }
371 }
372 Commands::Daemon { command } => match command {
373 DaemonCommands::Run {
374 once,
375 poll_interval,
376 timeout_ms,
377 } => {
378 run_daemon(once, poll_interval, Duration::from_millis(timeout_ms))?;
379 }
380 DaemonCommands::Install {
381 dry_run,
382 label,
383 bridge_command,
384 } => {
385 let result = install_daemon_service(&label, &bridge_command, dry_run)?;
386 println!("{}", serde_json::to_string(&result)?);
387 }
388 DaemonCommands::Uninstall { dry_run, label } => {
389 let result = uninstall_daemon_service(&label, dry_run)?;
390 println!("{}", serde_json::to_string(&result)?);
391 }
392 DaemonCommands::Start { dry_run, label } => {
393 let result = start_daemon_service(&label, dry_run)?;
394 println!("{}", serde_json::to_string(&result)?);
395 }
396 DaemonCommands::Stop { dry_run, label } => {
397 let result = stop_daemon_service(&label, dry_run)?;
398 println!("{}", serde_json::to_string(&result)?);
399 }
400 DaemonCommands::Status { label } => {
401 let result = daemon_service_status(&label)?;
402 println!("{}", serde_json::to_string(&result)?);
403 }
404 DaemonCommands::Logs { label } => {
405 let result = daemon_service_logs(&label)?;
406 println!("{}", serde_json::to_string(&result)?);
407 }
408 },
409 Commands::Telegram { command } => match command {
410 TelegramCommands::Setup {
411 bot_token,
412 chat_id,
413 allowed_user_id,
414 events,
415 bridge_command,
416 pair_timeout_ms,
417 dry_run,
418 } => {
419 let result = telegram_setup_result(TelegramSetupOptions {
420 bot_token: bot_token.as_deref(),
421 chat_id: chat_id.as_deref(),
422 allowed_user_id: allowed_user_id.as_deref(),
423 events: &events,
424 bridge_command: &bridge_command,
425 dry_run,
426 pair_timeout_ms,
427 })?;
428 println!("{}", serde_json::to_string(&result)?);
429 }
430 TelegramCommands::Status => {
431 let result = telegram_status_result()?;
432 println!("{}", serde_json::to_string(&result)?);
433 }
434 TelegramCommands::Test {
435 message,
436 timeout_ms,
437 dry_run,
438 } => {
439 let result =
440 telegram_test_result(&message, Duration::from_millis(timeout_ms), dry_run)?;
441 println!("{}", serde_json::to_string(&result)?);
442 }
443 TelegramCommands::Disable { dry_run } => {
444 let result = telegram_disable_result(dry_run)?;
445 println!("{}", serde_json::to_string(&result)?);
446 }
447 },
448 Commands::Projects { command } => match command {
449 ProjectCommands::List { observed_limit } => {
450 let result = projects_list_result(observed_limit)?;
451 println!("{}", serde_json::to_string(&result)?);
452 }
453 ProjectCommands::Add {
454 cwd,
455 id,
456 label,
457 aliases,
458 dry_run,
459 } => {
460 let result =
461 project_add_result(&cwd, id.as_deref(), label.as_deref(), &aliases, dry_run)?;
462 println!("{}", serde_json::to_string(&result)?);
463 }
464 ProjectCommands::Import { limit, dry_run } => {
465 let result = project_import_result(limit, dry_run)?;
466 println!("{}", serde_json::to_string(&result)?);
467 }
468 ProjectCommands::Remove { id, dry_run } => {
469 let result = project_remove_result(&id, dry_run)?;
470 println!("{}", serde_json::to_string(&result)?);
471 }
472 },
473 Commands::Sync { limit } => {
474 let now = now_millis()?;
475 let db_path = state_db_path()?;
476 let conn = create_state_db(&db_path)?;
477 let mut client = CodexAppServerClient::connect()?;
478 let result = sync_state_from_live(&mut client, &conn, now, limit, false)?;
479 println!("{}", serde_json::to_string(&result)?);
480 }
481 Commands::New {
482 cwd,
483 message,
484 dry_run,
485 follow,
486 stream,
487 duration,
488 poll_interval,
489 events,
490 prompt,
491 } => {
492 let message = normalized_message(message.as_deref()).or_else(|| {
493 let joined = prompt.join(" ").trim().to_string();
494 (!joined.is_empty()).then_some(joined)
495 });
496 if dry_run {
497 println!(
498 "{}",
499 serde_json::to_string(&start_new_thread_dry_run(
500 cwd.as_deref(),
501 message.as_deref()
502 ))?
503 );
504 } else {
505 let mut client = CodexAppServerClient::connect()?;
506 let result = start_thread_in_cwd(&mut client, cwd.as_deref(), message.as_deref())?;
507 let result = if follow {
508 let db_path = state_db_path()?;
509 let conn = create_state_db(&db_path)?;
510 let filter = parse_event_filter(events.as_deref());
511 if let Some(thread_id) = result.get("threadId").and_then(Value::as_str) {
512 attach_follow_result(
513 result.clone(),
514 &mut client,
515 &conn,
516 FollowRun {
517 thread_id,
518 duration_ms: duration,
519 poll_interval_ms: poll_interval,
520 event_filter: filter.as_ref(),
521 stream,
522 },
523 )?
524 } else {
525 result
526 }
527 } else {
528 result
529 };
530 println!("{}", serde_json::to_string(&result)?);
531 }
532 }
533 Commands::Fork {
534 thread_id,
535 message,
536 dry_run,
537 follow,
538 stream,
539 duration,
540 poll_interval,
541 events,
542 prompt,
543 } => {
544 let message = normalized_message(message.as_deref()).or_else(|| {
545 let joined = prompt.join(" ").trim().to_string();
546 (!joined.is_empty()).then_some(joined)
547 });
548 if dry_run {
549 println!(
550 "{}",
551 serde_json::to_string(&fork_thread_dry_run(&thread_id, message.as_deref()))?
552 );
553 } else {
554 let mut client = CodexAppServerClient::connect()?;
555 let forked = client.request("thread/fork", json!({ "threadId": thread_id }))?;
556 let new_thread_id = thread_id_from_response(&forked);
557 let forked_cwd = thread_cwd_from_response(&forked, None);
558 let started = match (new_thread_id.as_deref(), message.as_deref()) {
559 (Some(new_thread_id), Some(message)) if !message.trim().is_empty() => {
560 Some(client.request(
561 "turn/start",
562 turn_start_params(new_thread_id, forked_cwd.as_deref(), message),
563 )?)
564 }
565 _ => None,
566 };
567 let result =
568 fork_thread_live_result(&thread_id, message.as_deref(), forked, started);
569 let result = if follow {
570 let db_path = state_db_path()?;
571 let conn = create_state_db(&db_path)?;
572 let filter = parse_event_filter(events.as_deref());
573 if let Some(thread_id) = result.get("threadId").and_then(Value::as_str) {
574 attach_follow_result(
575 result.clone(),
576 &mut client,
577 &conn,
578 FollowRun {
579 thread_id,
580 duration_ms: duration,
581 poll_interval_ms: poll_interval,
582 event_filter: filter.as_ref(),
583 stream,
584 },
585 )?
586 } else {
587 result
588 }
589 } else {
590 result
591 };
592 println!("{}", serde_json::to_string(&result)?);
593 }
594 }
595 Commands::Archive {
596 thread_id_option,
597 thread_ids,
598 project,
599 status,
600 attention,
601 limit,
602 dry_run,
603 yes,
604 } => {
605 let mut targets = Vec::new();
606 if let Some(raw) = thread_id_option {
607 let raw = raw.as_str();
608 targets.extend(
609 raw.split(',')
610 .map(str::trim)
611 .filter(|value| !value.is_empty())
612 .map(str::to_string),
613 );
614 }
615 targets.extend(thread_ids);
616 let now = now_millis()?;
617 let db_path = state_db_path()?;
618 let conn = create_state_db(&db_path)?;
619 if !dry_run && targets.is_empty() && !yes {
620 bail!("Refusing bulk archive without --yes or --dry-run");
621 }
622 let mut client = if dry_run {
623 None
624 } else {
625 Some(CodexAppServerClient::connect()?)
626 };
627 if !dry_run && targets.is_empty() {
628 if let Some(client) = client.as_mut() {
629 sync_state_from_live(client, &conn, now, 50, false)?;
630 }
631 }
632 let selection = resolve_archive_targets(
633 &conn,
634 &targets,
635 project.as_deref(),
636 status.as_deref(),
637 attention.as_deref(),
638 limit,
639 now,
640 )?;
641 if !dry_run && selection.using_filter_selection && !yes {
642 bail!("Refusing bulk archive without --yes or --dry-run");
643 }
644 if dry_run {
645 let results = selection
646 .targets
647 .into_iter()
648 .map(|thread_id| json!({ "threadId": thread_id, "status": "would_archive" }))
649 .collect::<Vec<_>>();
650 println!("{}", serde_json::to_string(&archive_result(true, results))?);
651 } else {
652 let mut results = Vec::new();
653 for target in selection.targets {
654 let result = client
655 .as_mut()
656 .context("archive client missing")?
657 .request("thread/archive", json!({ "threadId": target }))?;
658 record_action(
659 &conn,
660 &target,
661 "archive",
662 json!({ "result": result, "archivedAt": now }),
663 now,
664 )?;
665 results.push(json!({
666 "threadId": target,
667 "status": "archived",
668 "result": result
669 }));
670 }
671 println!(
672 "{}",
673 serde_json::to_string(&archive_result(false, results))?
674 );
675 }
676 }
677 Commands::Show { thread_id } => {
678 let db_path = state_db_path()?;
679 let conn = create_state_db(&db_path)?;
680 let mut client = CodexAppServerClient::connect()?;
681 let result = client.request(
682 "thread/read",
683 json!({
684 "threadId": thread_id,
685 "includeTurns": true
686 }),
687 )?;
688 println!(
689 "{}",
690 serde_json::to_string(
691 &build_show_thread_result(Some(&conn), &thread_id, result,)?
692 )?
693 );
694 }
695 Commands::Reply {
696 thread_id,
697 message,
698 dry_run,
699 follow,
700 stream,
701 duration,
702 poll_interval,
703 events,
704 prompt,
705 } => {
706 let message = message
707 .or_else(|| {
708 let joined = prompt.join(" ").trim().to_string();
709 (!joined.is_empty()).then_some(joined)
710 })
711 .unwrap_or_default()
712 .trim()
713 .to_string();
714 if message.is_empty() {
715 bail!("Reply message cannot be empty");
716 }
717 let sent_at = now_millis()?;
718 if dry_run {
719 let payload = ReplyResult {
720 ok: true,
721 action: "reply",
722 dry_run: true,
723 thread_id: &thread_id,
724 message: &message,
725 sent_at,
726 };
727 println!("{}", serde_json::to_string(&payload)?);
728 } else {
729 let mut client = CodexAppServerClient::connect()?;
730 let resumed = client.request("thread/resume", json!({ "threadId": thread_id }))?;
731 let started = client.request(
732 "turn/start",
733 json!({
734 "threadId": thread_id,
735 "input": [{
736 "type": "text",
737 "text": message,
738 "text_elements": []
739 }]
740 }),
741 )?;
742 let db_path = state_db_path()?;
743 let conn = create_state_db(&db_path)?;
744 record_action(
745 &conn,
746 &thread_id,
747 "reply",
748 json!({
749 "message": message,
750 "resumed": resumed,
751 "started": started,
752 "sentAt": sent_at
753 }),
754 sent_at,
755 )?;
756 let result = json!({
757 "ok": true,
758 "action": "reply",
759 "threadId": thread_id,
760 "message": message,
761 "sentAt": sent_at,
762 "resumed": resumed,
763 "started": started
764 });
765 let result = if follow {
766 let filter = parse_event_filter(events.as_deref());
767 attach_follow_result(
768 result,
769 &mut client,
770 &conn,
771 FollowRun {
772 thread_id: &thread_id,
773 duration_ms: duration,
774 poll_interval_ms: poll_interval,
775 event_filter: filter.as_ref(),
776 stream,
777 },
778 )?
779 } else {
780 result
781 };
782 println!("{}", serde_json::to_string(&result)?);
783 }
784 }
785 Commands::Approve {
786 thread_id,
787 decision,
788 dry_run,
789 follow,
790 stream,
791 duration,
792 poll_interval,
793 events,
794 positional_decision,
795 } => {
796 let normalized = decision
797 .or(positional_decision)
798 .unwrap_or_default()
799 .trim()
800 .to_lowercase();
801 let sent_text = match normalized.as_str() {
802 "approve" => "YES",
803 "deny" => "NO",
804 _ => bail!("Approval decision must be approve or deny"),
805 };
806 let sent_at = now_millis()?;
807 if dry_run {
808 let payload = ApproveResult {
809 ok: true,
810 action: "approve",
811 dry_run: true,
812 thread_id: &thread_id,
813 decision: &normalized,
814 sent_text,
815 sent_at,
816 };
817 println!("{}", serde_json::to_string(&payload)?);
818 } else {
819 let mut client = CodexAppServerClient::connect()?;
820 let resumed = client.request("thread/resume", json!({ "threadId": thread_id }))?;
821 let started = client.request(
822 "turn/start",
823 json!({
824 "threadId": thread_id,
825 "input": [{
826 "type": "text",
827 "text": sent_text,
828 "text_elements": []
829 }]
830 }),
831 )?;
832 let db_path = state_db_path()?;
833 let conn = create_state_db(&db_path)?;
834 record_action(
835 &conn,
836 &thread_id,
837 "approve",
838 json!({
839 "decision": normalized,
840 "sentText": sent_text,
841 "resumed": resumed,
842 "started": started,
843 "sentAt": sent_at
844 }),
845 sent_at,
846 )?;
847 let result = json!({
848 "ok": true,
849 "action": "approve",
850 "threadId": thread_id,
851 "decision": normalized,
852 "sentText": sent_text,
853 "sentAt": sent_at,
854 "resumed": resumed,
855 "started": started
856 });
857 let result = if follow {
858 let filter = parse_event_filter(events.as_deref());
859 attach_follow_result(
860 result,
861 &mut client,
862 &conn,
863 FollowRun {
864 thread_id: &thread_id,
865 duration_ms: duration,
866 poll_interval_ms: poll_interval,
867 event_filter: filter.as_ref(),
868 stream,
869 },
870 )?
871 } else {
872 result
873 };
874 println!("{}", serde_json::to_string(&result)?);
875 }
876 }
877 Commands::Mcp => {
878 let stdin = std::io::stdin();
879 let stdout = std::io::stdout();
880 run_mcp_server(stdin.lock(), stdout.lock())?;
881 }
882 Commands::Hermes { command } => match command {
883 HermesCommands::Install {
884 server_name,
885 hermes_command,
886 bridge_command,
887 dry_run,
888 } => {
889 let result = run_hermes_install(HermesInstallOptions {
890 server_name: &server_name,
891 hermes_command: &hermes_command,
892 bridge_command: &bridge_command,
893 dry_run,
894 })?;
895 println!("{}", serde_json::to_string(&result)?);
896 }
897 },
898 }
899
900 Ok(())
901}
902
903fn now_millis() -> Result<u64> {
904 Ok(SystemTime::now()
905 .duration_since(UNIX_EPOCH)
906 .map_err(|e| anyhow!(e))?
907 .as_millis() as u64)
908}
909
910fn importable_projects_from_observed(
911 observed: &[ObservedWorkspace],
912 existing_projects: &[RegisteredProject],
913) -> Vec<RegisteredProject> {
914 let mut projects = Vec::new();
915 let mut existing_ids = existing_projects
916 .iter()
917 .map(|project| project.id.clone())
918 .collect::<BTreeSet<_>>();
919 let existing_cwds = existing_projects
920 .iter()
921 .map(|project| project.cwd.clone())
922 .collect::<BTreeSet<_>>();
923 for workspace in observed {
924 if existing_cwds.contains(&workspace.cwd)
925 || projects
926 .iter()
927 .any(|project: &RegisteredProject| project.cwd == workspace.cwd)
928 {
929 continue;
930 }
931 let base_id =
932 slugify_project_token(&workspace.label).unwrap_or_else(|| "project".to_string());
933 let id = ensure_unique_project_id(&base_id, &existing_ids);
934 existing_ids.insert(id.clone());
935 projects.push(RegisteredProject {
936 id,
937 label: workspace.label.clone(),
938 cwd: workspace.cwd.clone(),
939 aliases: Vec::new(),
940 });
941 }
942 projects
943}
944
945fn daemon_run_command(bridge_command: &str) -> String {
946 format!("{} daemon run", shell_quote(bridge_command))
947}
948
949fn doctor_bridge() -> Result<DoctorBridge> {
950 let config_path = daemon_config_path()?;
951 let config = read_daemon_config_raw()?;
952 let service = daemon_service_spec(DEFAULT_DAEMON_LABEL, "codex-telegram-bridge")?;
953 Ok(DoctorBridge {
954 config_path: config_path.display().to_string(),
955 config_exists: config_path.exists(),
956 telegram_configured: config
957 .as_ref()
958 .and_then(|config| config.telegram.as_ref())
959 .is_some(),
960 daemon_service_path: service.service_path.display().to_string(),
961 daemon_service_exists: service.service_path.exists(),
962 })
963}
964
965fn setup_result(options: SetupOptions<'_>) -> Result<Value> {
966 let resolved = resolve_codex_binary()?;
967 let telegram = telegram_setup_result(TelegramSetupOptions {
968 bot_token: options.bot_token,
969 chat_id: options.chat_id,
970 allowed_user_id: options.allowed_user_id,
971 events: options.events,
972 bridge_command: options.bridge_command,
973 dry_run: options.dry_run,
974 pair_timeout_ms: options.pair_timeout_ms,
975 })?;
976 let daemon_install = if options.install_daemon {
977 Some(install_daemon_service(
978 options.daemon_label,
979 options.bridge_command,
980 options.dry_run,
981 )?)
982 } else {
983 None
984 };
985 let daemon_start = if options.start_daemon {
986 Some(start_daemon_service(options.daemon_label, options.dry_run)?)
987 } else {
988 None
989 };
990 let hermes = if options.register_hermes {
991 Some(run_hermes_install(HermesInstallOptions {
992 server_name: options.hermes_server_name,
993 hermes_command: options.hermes_command,
994 bridge_command: options.bridge_command,
995 dry_run: options.dry_run,
996 })?)
997 } else {
998 None
999 };
1000
1001 Ok(json!({
1002 "ok": true,
1003 "action": "setup",
1004 "dryRun": options.dry_run,
1005 "codex": {
1006 "resolvedPath": resolved.path.display().to_string(),
1007 "source": resolved.source
1008 },
1009 "telegram": telegram,
1010 "daemon": {
1011 "install": daemon_install,
1012 "start": daemon_start
1013 },
1014 "hermes": hermes,
1015 "nextStep": if options.dry_run {
1016 "Run setup without --dry-run, then use away on when leaving your computer."
1017 } else {
1018 "Use away on when leaving your computer. Reply to Codex Telegram messages to keep working from Telegram."
1019 }
1020 }))
1021}
1022
1023fn projects_list_result(observed_limit: u64) -> Result<Value> {
1024 let config = load_daemon_config()?;
1025 let db_path = state_db_path()?;
1026 let conn = create_state_db(&db_path)?;
1027 let observed = observed_workspaces_from_db(&conn, observed_limit)?;
1028 let importable = importable_projects_from_observed(&observed, &config.projects);
1029 Ok(json!({
1030 "ok": true,
1031 "action": "projects_list",
1032 "configured": config.projects,
1033 "observed": observed.into_iter().map(|workspace| json!({
1034 "label": workspace.label,
1035 "cwd": workspace.cwd,
1036 "lastSeenAt": workspace.last_seen_at
1037 })).collect::<Vec<_>>(),
1038 "importable": importable
1039 }))
1040}
1041
1042fn project_add_result(
1043 cwd: &str,
1044 id: Option<&str>,
1045 label: Option<&str>,
1046 aliases: &[String],
1047 dry_run: bool,
1048) -> Result<Value> {
1049 let mut config = load_daemon_config()?;
1050 let project = build_registered_project(cwd, id, label, aliases, &config.projects)?;
1051 if config
1052 .projects
1053 .iter()
1054 .any(|existing| existing.cwd == project.cwd)
1055 {
1056 bail!("project cwd `{}` is already registered", project.cwd);
1057 }
1058 if !dry_run {
1059 config.projects.push(project.clone());
1060 write_daemon_config(&config)?;
1061 }
1062 Ok(json!({
1063 "ok": true,
1064 "action": "projects_add",
1065 "dryRun": dry_run,
1066 "project": project
1067 }))
1068}
1069
1070fn project_import_result(limit: u64, dry_run: bool) -> Result<Value> {
1071 let mut config = load_daemon_config()?;
1072 let db_path = state_db_path()?;
1073 let conn = create_state_db(&db_path)?;
1074 let observed = observed_workspaces_from_db(&conn, limit)?;
1075 let importable = importable_projects_from_observed(&observed, &config.projects);
1076 if !dry_run && !importable.is_empty() {
1077 config.projects.extend(importable.clone());
1078 write_daemon_config(&config)?;
1079 }
1080 Ok(json!({
1081 "ok": true,
1082 "action": "projects_import",
1083 "dryRun": dry_run,
1084 "imported": importable,
1085 "count": importable.len()
1086 }))
1087}
1088
1089fn project_remove_result(id: &str, dry_run: bool) -> Result<Value> {
1090 let mut config = load_daemon_config()?;
1091 let Some(project) = config
1092 .projects
1093 .iter()
1094 .find(|project| project.id == id)
1095 .cloned()
1096 else {
1097 bail!("project `{id}` was not found");
1098 };
1099 if !dry_run {
1100 config.projects.retain(|candidate| candidate.id != id);
1101 write_daemon_config(&config)?;
1102 }
1103 Ok(json!({
1104 "ok": true,
1105 "action": "projects_remove",
1106 "dryRun": dry_run,
1107 "project": project
1108 }))
1109}
1110
/// Comma-separated event names `thread_waiting` and `thread_completed`.
// NOTE(review): presumably the default notification event filter; the code
// that consumes this constant is not visible in this part of the file.
const DEFAULT_NOTIFICATION_EVENTS: &str = "thread_waiting,thread_completed";
1112
/// Inputs for `run_hermes_install`.
#[derive(Debug, Clone, Copy)]
struct HermesInstallOptions<'a> {
    /// Name under which the MCP server is added in Hermes.
    server_name: &'a str,
    /// Hermes executable that is invoked with `mcp add ...`.
    hermes_command: &'a str,
    /// Bridge executable passed to Hermes as the server's `--command`.
    bridge_command: &'a str,
    /// When set, the invocation is only described and Hermes is not run.
    dry_run: bool,
}
1120
1121fn run_hermes_install(options: HermesInstallOptions<'_>) -> Result<Value> {
1122 let server_name = options.server_name.trim();
1123 let hermes_command = options.hermes_command.trim();
1124 let bridge_command = options.bridge_command.trim();
1125
1126 if server_name.is_empty() {
1127 bail!("server name cannot be empty");
1128 }
1129 if hermes_command.is_empty() {
1130 bail!("hermes command cannot be empty");
1131 }
1132 if bridge_command.is_empty() {
1133 bail!("bridge command cannot be empty");
1134 }
1135
1136 let mcp_args = vec![
1137 "mcp".to_string(),
1138 "add".to_string(),
1139 server_name.to_string(),
1140 "--command".to_string(),
1141 bridge_command.to_string(),
1142 "--args".to_string(),
1143 "mcp".to_string(),
1144 ];
1145
1146 let base = json!({
1147 "ok": true,
1148 "action": "hermes_install",
1149 "dryRun": options.dry_run,
1150 "serverName": server_name,
1151 "hermesCommand": hermes_command,
1152 "bridgeCommand": bridge_command,
1153 "args": mcp_args.clone(),
1154 "mcp": {
1155 "configured": true,
1156 "args": mcp_args.clone(),
1157 "nextStep": "Restart Hermes so it reconnects to MCP servers and discovers codex_* tools."
1158 },
1159 "nextStep": "Restart Hermes for MCP discovery. Telegram notifications are configured with the top-level setup command, not through Hermes."
1160 });
1161 if options.dry_run {
1162 return Ok(base);
1163 }
1164
1165 let mcp_output = Command::new(hermes_command)
1166 .args(&mcp_args)
1167 .output()
1168 .with_context(|| format!("failed to run {hermes_command} mcp add"))?;
1169 if !mcp_output.status.success() {
1170 bail!(
1171 "Hermes MCP registration failed with status {}: {}",
1172 mcp_output.status,
1173 String::from_utf8_lossy(&mcp_output.stderr).trim()
1174 );
1175 }
1176
1177 Ok(json!({
1178 "ok": true,
1179 "action": "hermes_install",
1180 "dryRun": false,
1181 "serverName": server_name,
1182 "hermesCommand": hermes_command,
1183 "bridgeCommand": bridge_command,
1184 "args": mcp_args,
1185 "mcp": {
1186 "configured": true,
1187 "stdout": String::from_utf8_lossy(&mcp_output.stdout).trim(),
1188 "stderr": String::from_utf8_lossy(&mcp_output.stderr).trim()
1189 },
1190 "nextStep": "Restart Hermes for MCP discovery. Telegram notifications are configured with the top-level setup command, not through Hermes."
1191 }))
1192}
1193
/// Returns `text` with every occurrence of `secret` replaced by `<redacted>`.
///
/// An empty `secret` leaves the text untouched; replacing the empty pattern
/// would otherwise insert the marker between every character.
fn redact_secret_text(text: &str, secret: &str) -> String {
    match secret {
        "" => text.to_owned(),
        needle => text.replace(needle, "<redacted>"),
    }
}
1201
/// Quotes `value` for use as a single POSIX-shell word.
///
/// Non-empty values made up only of ASCII alphanumerics and `/ . _ - , : =`
/// are returned unchanged. Anything else (including the empty string) is
/// wrapped in single quotes, with each embedded `'` rewritten as `'\''`.
fn shell_quote(value: &str) -> String {
    let needs_quoting = value.is_empty()
        || value.chars().any(|ch| {
            !(ch.is_ascii_alphanumeric() || matches!(ch, '/' | '.' | '_' | '-' | ',' | ':' | '='))
        });

    if needs_quoting {
        // Close the quote, emit an escaped quote, and reopen it.
        let escaped = value.replace('\'', "'\\''");
        return format!("'{}'", escaped);
    }
    value.to_owned()
}
1213
1214fn event_thread_id(event: &Value) -> Option<String> {
1215 event
1216 .get("threadId")
1217 .and_then(Value::as_str)
1218 .or_else(|| event.pointer("/thread/threadId").and_then(Value::as_str))
1219 .or_else(|| event.pointer("/thread/id").and_then(Value::as_str))
1220 .map(str::to_string)
1221}
1222
1223fn notification_event_id(event: &Value) -> String {
1224 let event_type = event
1225 .get("type")
1226 .and_then(Value::as_str)
1227 .unwrap_or("codex_event");
1228 let thread_id = event_thread_id(event).unwrap_or_else(|| "unknown".to_string());
1229 let discriminator = event
1230 .get("eventKey")
1231 .and_then(Value::as_str)
1232 .map(str::to_string)
1233 .or_else(|| {
1234 event
1235 .get("updatedAt")
1236 .and_then(Value::as_u64)
1237 .map(|value| value.to_string())
1238 })
1239 .or_else(|| {
1240 event
1241 .get("observedAt")
1242 .and_then(Value::as_u64)
1243 .map(|value| value.to_string())
1244 })
1245 .unwrap_or_else(|| {
1246 serde_json::to_string(event)
1247 .map(|raw| sha256_hex(raw.as_bytes()))
1248 .unwrap_or_else(|_| "event".to_string())
1249 });
1250 sanitize_delivery_id(&format!("codex:{event_type}:{thread_id}:{discriminator}"))
1251}
1252
/// Replaces every character that is not an ASCII alphanumeric, `-`, `_` or
/// `.` with `-`, leaving the number of characters unchanged.
fn sanitize_delivery_id(value: &str) -> String {
    let mut sanitized = String::with_capacity(value.len());
    for ch in value.chars() {
        let allowed = ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' || ch == '.';
        sanitized.push(if allowed { ch } else { '-' });
    }
    sanitized
}
1265
1266fn sha256_hex(message: &[u8]) -> String {
1267 hex_lower(&Sha256::digest(message))
1268}
1269
/// Encodes `bytes` as lowercase hexadecimal, two characters per byte with the
/// high nibble first.
fn hex_lower(bytes: &[u8]) -> String {
    const HEX: &[u8; 16] = b"0123456789abcdef";
    bytes
        .iter()
        .flat_map(|&byte| [byte >> 4, byte & 0x0f])
        .map(|nibble| char::from(HEX[usize::from(nibble)]))
        .collect()
}
1279
// Unit tests for the bridge: snapshot normalization, Hermes install dry-run
// output, the outbound event queue, the per-transport delivery log and inbox
// age calculation.
#[cfg(test)]
mod tests {
    use super::*;

    // A thread whose status carries the `waitingOnUserInput` flag should
    // normalize into a snapshot with a pending "reply" prompt whose question
    // is the agent's last message text.
    #[test]
    fn derives_waiting_prompt_from_status_flags() {
        let summary = json!({
            "id": "thr_reply",
            "name": null,
            "cwd": "/tmp/reply",
            "updatedAt": 123,
            "status": {
                "type": "active",
                "activeFlags": ["waitingOnUserInput"]
            }
        });
        let thread = json!({
            "id": "thr_reply",
            "cwd": "/tmp/reply",
            "status": {
                "type": "active",
                "activeFlags": ["waitingOnUserInput"]
            },
            "turns": [
                {
                    "status": "in_progress",
                    "items": [
                        {
                            "type": "agentMessage",
                            "phase": "final_answer",
                            "text": "Can you confirm the plan?"
                        }
                    ]
                }
            ]
        });

        let snapshot = normalize_thread_snapshot(&summary, &thread).expect("snapshot");
        let prompt = snapshot.pending_prompt.expect("pending prompt");
        assert_eq!(prompt.kind, "reply");
        assert_eq!(
            prompt.question.as_deref(),
            Some("Can you confirm the plan?")
        );
        assert_eq!(snapshot.last_turn_status.as_deref(), Some("in_progress"));
    }

    // With `dry_run: true` the install returns before spawning any process,
    // so the `hermes-se` command does not need to exist on the test machine.
    #[test]
    fn hermes_install_dry_run_builds_mcp_add_command() {
        let result = run_hermes_install(HermesInstallOptions {
            server_name: "codex",
            hermes_command: "hermes-se",
            bridge_command: "codex-telegram-bridge",
            dry_run: true,
        })
        .expect("dry-run install result");

        assert_eq!(result["action"], "hermes_install");
        assert_eq!(result["dryRun"], true);
        assert_eq!(result["hermesCommand"], "hermes-se");
        assert_eq!(
            result["args"],
            json!([
                "mcp",
                "add",
                "codex",
                "--command",
                "codex-telegram-bridge",
                "--args",
                "mcp"
            ])
        );
        assert_eq!(
            result["mcp"]["nextStep"],
            "Restart Hermes so it reconnects to MCP servers and discovers codex_* tools."
        );
        // The install result must not carry a `notificationLane` key.
        assert!(
            result.get("notificationLane").is_none(),
            "Hermes install should not configure Telegram notifications"
        );
    }

    // Exercises the outbox lifecycle: enqueue, duplicate rejection, a failed
    // delivery that leaves the event pending, and a later successful delivery.
    #[test]
    fn outbound_events_dedupe_retry_and_deliver_durably() {
        let conn = create_state_db_in_memory().expect("db");
        let event = json!({
            "type": "thread_waiting",
            "threadId": "thr_1",
            "updatedAt": 42
        });

        // The same event enqueued twice is only accepted the first time.
        assert!(enqueue_outbound_event(&conn, &event, 1000).expect("enqueue"));
        assert!(
            !enqueue_outbound_event(&conn, &event, 1001).expect("dedupe"),
            "same delivery id should not enqueue twice"
        );

        // A failing delivery callback counts as attempted + failed and keeps
        // the event in the pending set.
        let failed = deliver_due_outbound_events(&conn, 1000, 10, |_| bail!("Hermes offline"))
            .expect("failed delivery summary");
        assert_eq!(
            failed,
            OutboxDeliverySummary {
                attempted: 1,
                delivered: 0,
                failed: 1
            }
        );
        assert_eq!(pending_outbound_count(&conn).expect("pending"), 1);

        // Retrying at the same timestamp attempts nothing.
        // NOTE(review): presumably the failure schedules a later retry time;
        // confirm against the state module.
        let delayed = deliver_due_outbound_events(&conn, 1000, 10, |_| Ok(json!({"ok": true})))
            .expect("not due summary");
        assert_eq!(delayed.attempted, 0);

        // At a later timestamp the event is attempted again and delivered.
        let delivered = deliver_due_outbound_events(&conn, 2000, 10, |_| Ok(json!({"ok": true})))
            .expect("delivered summary");
        assert_eq!(
            delivered,
            OutboxDeliverySummary {
                attempted: 1,
                delivered: 1,
                failed: 0
            }
        );
        assert_eq!(pending_outbound_count(&conn).expect("pending"), 0);
    }

    // Delivery records are keyed by (event id, transport): recording one
    // transport must not mark another transport as delivered.
    #[test]
    fn transport_delivery_log_tracks_each_transport_once() {
        let conn = create_state_db_in_memory().expect("db");

        assert!(
            !transport_delivery_exists(&conn, "event_1", "telegram").expect("lookup"),
            "transport should not start delivered"
        );
        record_transport_delivery(
            &conn,
            "event_1",
            "telegram",
            &json!({ "messageId": 111 }),
            1000,
        )
        .expect("record delivery");

        assert!(
            transport_delivery_exists(&conn, "event_1", "telegram").expect("lookup"),
            "recorded transport should be treated as delivered"
        );
        assert!(
            !transport_delivery_exists(&conn, "event_1", "hermes").expect("lookup"),
            "other transports for the same event must remain pending"
        );
    }

    // The snapshot's `updated_at` has three fewer digits than the `now`
    // argument (it looks like seconds versus milliseconds); the computed age
    // is still expected to be 4 seconds.
    #[test]
    fn inbox_age_seconds_handles_mixed_timestamp_units() {
        let snapshot = snapshot_fixture(
            "thr_recent",
            "/tmp/project",
            1_776_219_396,
            "notLoaded",
            vec![],
            Some("completed"),
        );

        let item = classify_inbox_item(&snapshot, 1_776_219_400_000);

        assert_eq!(item.age_seconds, Some(4));
    }

    // Builds a `BridgeThreadSnapshot` for tests. `name` is always `None`, the
    // preview is `"preview for <thread_id>"`, and the pending prompt is
    // derived from the status flags and that same preview text.
    fn snapshot_fixture(
        thread_id: &str,
        cwd: &str,
        updated_at: u64,
        status_type: &str,
        status_flags: Vec<&str>,
        last_turn_status: Option<&str>,
    ) -> BridgeThreadSnapshot {
        let status_flags_vec = status_flags
            .into_iter()
            .map(|s| s.to_string())
            .collect::<Vec<_>>();
        BridgeThreadSnapshot {
            thread_id: thread_id.to_string(),
            name: None,
            cwd: Some(cwd.to_string()),
            updated_at: Some(updated_at),
            status_type: status_type.to_string(),
            status_flags: status_flags_vec.clone(),
            last_turn_status: last_turn_status.map(|s| s.to_string()),
            last_preview: Some(format!("preview for {thread_id}")),
            pending_prompt: derive_pending_prompt(
                thread_id,
                &status_flags_vec,
                Some(format!("preview for {thread_id}")),
            ),
        }
    }
}