agcodex_mcp_server/
lib.rs1#![deny(clippy::print_stdout, clippy::print_stderr)]
3
4use std::io::Result as IoResult;
5use std::path::PathBuf;
6
7use agcodex_mcp_types::JSONRPCMessage;
8use tokio::io::AsyncBufReadExt;
9use tokio::io::AsyncWriteExt;
10use tokio::io::BufReader;
11use tokio::io::{self};
12use tokio::sync::mpsc;
13use tracing::debug;
14use tracing::error;
15use tracing::info;
16use tracing_subscriber::EnvFilter;
17
18mod codex_message_processor;
19mod codex_tool_config;
20mod codex_tool_runner;
21mod error_code;
22mod exec_approval;
23mod json_to_toml;
24pub(crate) mod message_processor;
25mod outgoing_message;
26mod patch_approval;
27
28use crate::message_processor::MessageProcessor;
29use crate::outgoing_message::OutgoingMessage;
30use crate::outgoing_message::OutgoingMessageSender;
31
32pub use crate::codex_tool_config::CodexToolCallParam;
33pub use crate::codex_tool_config::CodexToolCallReplyParam;
34pub use crate::exec_approval::ExecApprovalElicitRequestParams;
35pub use crate::exec_approval::ExecApprovalResponse;
36pub use crate::patch_approval::PatchApprovalElicitRequestParams;
37pub use crate::patch_approval::PatchApprovalResponse;
38
/// Capacity of the bounded `mpsc` channels created in `run_main`: the same
/// value is used for the incoming (stdin -> processor) channel and the
/// outgoing (processor -> stdout) channel.
const CHANNEL_CAPACITY: usize = 128;
43
44pub async fn run_main(codex_linux_sandbox_exe: Option<PathBuf>) -> IoResult<()> {
45 tracing_subscriber::fmt()
48 .with_writer(std::io::stderr)
49 .with_env_filter(EnvFilter::from_default_env())
50 .init();
51
52 let (incoming_tx, mut incoming_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
54 let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingMessage>(CHANNEL_CAPACITY);
55
56 let stdin_reader_handle = tokio::spawn({
58 let incoming_tx = incoming_tx.clone();
59 async move {
60 let stdin = io::stdin();
61 let reader = BufReader::new(stdin);
62 let mut lines = reader.lines();
63
64 while let Some(line) = lines.next_line().await.unwrap_or_default() {
65 match serde_json::from_str::<JSONRPCMessage>(&line) {
66 Ok(msg) => {
67 if incoming_tx.send(msg).await.is_err() {
68 break;
70 }
71 }
72 Err(e) => error!("Failed to deserialize JSONRPCMessage: {e}"),
73 }
74 }
75
76 debug!("stdin reader finished (EOF)");
77 }
78 });
79
80 let processor_handle = tokio::spawn({
82 let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
83 let mut processor = MessageProcessor::new(outgoing_message_sender, codex_linux_sandbox_exe);
84 async move {
85 while let Some(msg) = incoming_rx.recv().await {
86 match msg {
87 JSONRPCMessage::Request(r) => processor.process_request(r).await,
88 JSONRPCMessage::Response(r) => processor.process_response(r).await,
89 JSONRPCMessage::Notification(n) => processor.process_notification(n).await,
90 JSONRPCMessage::Error(e) => processor.process_error(e),
91 }
92 }
93
94 info!("processor task exited (channel closed)");
95 }
96 });
97
98 let stdout_writer_handle = tokio::spawn(async move {
100 let mut stdout = io::stdout();
101 while let Some(outgoing_message) = outgoing_rx.recv().await {
102 let msg: JSONRPCMessage = outgoing_message.into();
103 match serde_json::to_string(&msg) {
104 Ok(json) => {
105 if let Err(e) = stdout.write_all(json.as_bytes()).await {
106 error!("Failed to write to stdout: {e}");
107 break;
108 }
109 if let Err(e) = stdout.write_all(b"\n").await {
110 error!("Failed to write newline to stdout: {e}");
111 break;
112 }
113 if let Err(e) = stdout.flush().await {
114 error!("Failed to flush stdout: {e}");
115 break;
116 }
117 }
118 Err(e) => error!("Failed to serialize JSONRPCMessage: {e}"),
119 }
120 }
121
122 info!("stdout writer exited (channel closed)");
123 });
124
125 let _ = tokio::join!(stdin_reader_handle, processor_handle, stdout_writer_handle);
129
130 Ok(())
131}