post_cortex_daemon/daemon/
stdio_proxy.rs1use crate::daemon::DaemonConfig;
10use arc_swap::ArcSwap;
11use std::io::{self, BufRead};
12use std::process::{Command, Stdio};
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::net::TcpStream;
16use tokio::time::sleep;
17use tracing::{error, info};
18
19const MAX_STARTUP_WAIT_SECS: u64 = 30;
20const STARTUP_CHECK_INTERVAL_MS: u64 = 100;
21
22pub async fn is_daemon_running(host: &str, port: u16) -> bool {
24 let addr = format!("{}:{}", host, port);
25 TcpStream::connect(&addr).await.is_ok()
26}
27
28pub fn start_daemon_background(config: &DaemonConfig) -> Result<(), String> {
30 let current_exe =
31 std::env::current_exe().map_err(|e| format!("Failed to get current executable: {}", e))?;
32
33 info!("Starting daemon in background: {:?}", current_exe);
34
35 let child = Command::new(¤t_exe)
36 .args(["start", "--daemon"])
37 .env("PCX_PORT", config.port.to_string())
38 .env("PCX_HOST", &config.host)
39 .env("PCX_DATA_DIR", &config.data_directory)
40 .stdin(Stdio::null())
41 .stdout(Stdio::null())
42 .stderr(Stdio::null())
43 .spawn()
44 .map_err(|e| format!("Failed to spawn daemon: {}", e))?;
45
46 info!("Daemon process spawned with PID: {}", child.id());
47 Ok(())
48}
49
50pub async fn wait_for_daemon(host: &str, port: u16) -> Result<(), String> {
52 let start = std::time::Instant::now();
53 let timeout = Duration::from_secs(MAX_STARTUP_WAIT_SECS);
54
55 while start.elapsed() < timeout {
56 if is_daemon_running(host, port).await {
57 info!("Daemon is ready on {}:{}", host, port);
58 return Ok(());
59 }
60 sleep(Duration::from_millis(STARTUP_CHECK_INTERVAL_MS)).await;
61 }
62
63 Err(format!(
64 "Daemon failed to start within {} seconds",
65 MAX_STARTUP_WAIT_SECS
66 ))
67}
68
69pub async fn ensure_daemon_running(config: &DaemonConfig) -> Result<(), String> {
71 if is_daemon_running(&config.host, config.port).await {
72 info!("Daemon already running on {}:{}", config.host, config.port);
73 return Ok(());
74 }
75
76 info!("Daemon not running, starting in background...");
77 start_daemon_background(config)?;
78 wait_for_daemon(&config.host, config.port).await
79}
80
81pub async fn run_stdio_proxy(config: DaemonConfig) -> Result<(), String> {
83 ensure_daemon_running(&config).await?;
85
86 let mcp_url = format!("http://{}:{}/mcp", config.host, config.port);
87 let client = reqwest::Client::new();
88
89 info!("Connecting to daemon at {}", mcp_url);
90
91 let session_id: Arc<ArcSwap<Option<String>>> = Arc::new(ArcSwap::from_pointee(None));
93
94 let stdin_handle = tokio::task::spawn_blocking({
96 let client = client.clone();
97 let mcp_url = mcp_url.clone();
98 let session_id = session_id.clone();
99
100 move || {
101 let stdin = io::stdin();
102 let reader = stdin.lock();
103 let rt = tokio::runtime::Handle::current();
104
105 for line in reader.lines() {
106 match line {
107 Ok(line) if !line.trim().is_empty() => {
108 let client = client.clone();
109 let url = mcp_url.clone();
110 let session_id = session_id.clone();
111
112 rt.block_on(async {
113 let mut request = client
115 .post(&url)
116 .header("Content-Type", "application/json")
117 .header("Accept", "application/json, text/event-stream");
118
119 let current_sid = session_id.load();
121 if let Some(ref id) = **current_sid {
122 request = request.header("Mcp-Session-Id", id.clone());
123 }
124
125 match request.body(line).send().await {
126 Ok(resp) => {
127 if let Some(new_sid) = resp.headers().get("mcp-session-id") {
129 if let Ok(sid_str) = new_sid.to_str() {
130 if session_id.load().is_none() {
131 info!("Got session ID: {}", sid_str);
132 session_id
133 .store(Arc::new(Some(sid_str.to_string())));
134 }
135 }
136 }
137
138 if resp.status().is_success() {
139 match resp.text().await {
141 Ok(body) => {
142 for event in body.split("\n\n") {
144 if let Some(data) = event.strip_prefix("data: ")
145 {
146 let data = data.trim();
147 if !data.is_empty() {
148 println!("{}", data);
149 }
150 }
151 }
152 }
153 Err(e) => {
154 error!("Failed to read response: {}", e);
155 }
156 }
157 } else {
158 error!("Request failed with status: {}", resp.status());
159 if let Ok(body) = resp.text().await {
160 error!("Response: {}", body);
161 }
162 }
163 }
164 Err(e) => {
165 error!("Failed to send request: {}", e);
166 }
167 }
168 });
169 }
170 Ok(_) => {} Err(e) => {
172 error!("Error reading stdin: {}", e);
173 break;
174 }
175 }
176 }
177 }
178 });
179
180 let _ = stdin_handle.await;
182
183 Ok(())
184}