// post_cortex_daemon/daemon/stdio_proxy.rs
use crate::daemon::DaemonConfig;
use arc_swap::ArcSwap;
use std::io::{self, BufRead};
use std::process::{Command, Stdio};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::time::sleep;
use tracing::{error, info};
18
/// Upper bound, in seconds, on how long `wait_for_daemon` keeps polling
/// before it reports that the daemon failed to start.
const MAX_STARTUP_WAIT_SECS: u64 = 30;

/// Pause, in milliseconds, between two consecutive reachability probes
/// while waiting for the daemon to come up.
const STARTUP_CHECK_INTERVAL_MS: u64 = 100;
21
22pub async fn is_daemon_running(host: &str, port: u16) -> bool {
24 let addr = format!("{}:{}", host, port);
25 TcpStream::connect(&addr).await.is_ok()
26}
27
28pub fn start_daemon_background(config: &DaemonConfig) -> Result<(), String> {
30 let current_exe =
31 std::env::current_exe().map_err(|e| format!("Failed to get current executable: {}", e))?;
32
33 info!("Starting daemon in background: {:?}", current_exe);
34
35 let child = Command::new(¤t_exe)
36 .args(["start", "--daemon"])
37 .env("PCX_PORT", config.port.to_string())
38 .env("PCX_HOST", &config.host)
39 .env("PCX_DATA_DIR", &config.data_directory)
40 .stdin(Stdio::null())
41 .stdout(Stdio::null())
42 .stderr(Stdio::null())
43 .spawn()
44 .map_err(|e| format!("Failed to spawn daemon: {}", e))?;
45
46 info!("Daemon process spawned with PID: {}", child.id());
47 Ok(())
48}
49
50pub async fn wait_for_daemon(host: &str, port: u16) -> Result<(), String> {
52 let start = std::time::Instant::now();
53 let timeout = Duration::from_secs(MAX_STARTUP_WAIT_SECS);
54
55 while start.elapsed() < timeout {
56 if is_daemon_running(host, port).await {
57 info!("Daemon is ready on {}:{}", host, port);
58 return Ok(());
59 }
60 sleep(Duration::from_millis(STARTUP_CHECK_INTERVAL_MS)).await;
61 }
62
63 Err(format!(
64 "Daemon failed to start within {} seconds",
65 MAX_STARTUP_WAIT_SECS
66 ))
67}
68
69pub async fn ensure_daemon_running(config: &DaemonConfig) -> Result<(), String> {
71 if is_daemon_running(&config.host, config.port).await {
72 info!("Daemon already running on {}:{}", config.host, config.port);
73 return Ok(());
74 }
75
76 info!("Daemon not running, starting in background...");
77 start_daemon_background(config)?;
78 wait_for_daemon(&config.host, config.port).await
79}
80
81pub async fn run_stdio_proxy(config: DaemonConfig) -> Result<(), String> {
83 ensure_daemon_running(&config).await?;
85
86 let mcp_url = format!("http://{}:{}/mcp", config.host, config.port);
87 let client = reqwest::Client::new();
88
89 info!("Connecting to daemon at {}", mcp_url);
90
91 let session_id: Arc<ArcSwap<Option<String>>> = Arc::new(ArcSwap::from_pointee(None));
93
94 let stdin_handle = tokio::task::spawn_blocking({
96 let client = client.clone();
97 let mcp_url = mcp_url.clone();
98 let session_id = session_id.clone();
99
100 move || {
101 let stdin = io::stdin();
102 let reader = stdin.lock();
103 let rt = tokio::runtime::Handle::current();
104
105 for line in reader.lines() {
106 match line {
107 Ok(line) if !line.trim().is_empty() => {
108 let client = client.clone();
109 let url = mcp_url.clone();
110 let session_id = session_id.clone();
111
112 rt.block_on(async {
113 let mut request = client
115 .post(&url)
116 .header("Content-Type", "application/json")
117 .header("Accept", "application/json, text/event-stream");
118
119 let current_sid = session_id.load();
121 if let Some(ref id) = **current_sid {
122 request = request.header("Mcp-Session-Id", id.clone());
123 }
124
125 match request.body(line).send().await {
126 Ok(resp) => {
127 if let Some(new_sid) = resp.headers().get("mcp-session-id")
129 && let Ok(sid_str) = new_sid.to_str()
130 && session_id.load().is_none()
131 {
132 info!("Got session ID: {}", sid_str);
133 session_id.store(Arc::new(Some(sid_str.to_string())));
134 }
135
136 if resp.status().is_success() {
137 match resp.text().await {
139 Ok(body) => {
140 for event in body.split("\n\n") {
142 if let Some(data) = event.strip_prefix("data: ")
143 {
144 let data = data.trim();
145 if !data.is_empty() {
146 println!("{}", data);
147 }
148 }
149 }
150 }
151 Err(e) => {
152 error!("Failed to read response: {}", e);
153 }
154 }
155 } else {
156 error!("Request failed with status: {}", resp.status());
157 if let Ok(body) = resp.text().await {
158 error!("Response: {}", body);
159 }
160 }
161 }
162 Err(e) => {
163 error!("Failed to send request: {}", e);
164 }
165 }
166 });
167 }
168 Ok(_) => {} Err(e) => {
170 error!("Error reading stdin: {}", e);
171 break;
172 }
173 }
174 }
175 }
176 });
177
178 let _ = stdin_handle.await;
180
181 Ok(())
182}