Skip to main content

post_cortex_daemon/daemon/
stdio_proxy.rs

1// Copyright (c) 2025 Julius ML
2// MIT License
3
4//! stdio-to-daemon proxy with auto-start capability
5//!
6//! This module provides a stdio interface that proxies MCP requests to a running daemon.
7//! If no daemon is running, it automatically starts one in the background.
8
9use crate::daemon::DaemonConfig;
10use arc_swap::ArcSwap;
11use std::io::{self, BufRead};
12use std::process::{Command, Stdio};
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::net::TcpStream;
16use tokio::time::sleep;
17use tracing::{error, info};
18
19const MAX_STARTUP_WAIT_SECS: u64 = 30;
20const STARTUP_CHECK_INTERVAL_MS: u64 = 100;
21
22/// Check if daemon is running by attempting TCP connection
23pub async fn is_daemon_running(host: &str, port: u16) -> bool {
24    let addr = format!("{}:{}", host, port);
25    TcpStream::connect(&addr).await.is_ok()
26}
27
28/// Start daemon in background process
29pub fn start_daemon_background(config: &DaemonConfig) -> Result<(), String> {
30    let current_exe =
31        std::env::current_exe().map_err(|e| format!("Failed to get current executable: {}", e))?;
32
33    info!("Starting daemon in background: {:?}", current_exe);
34
35    let child = Command::new(&current_exe)
36        .args(["start", "--daemon"])
37        .env("PCX_PORT", config.port.to_string())
38        .env("PCX_HOST", &config.host)
39        .env("PCX_DATA_DIR", &config.data_directory)
40        .stdin(Stdio::null())
41        .stdout(Stdio::null())
42        .stderr(Stdio::null())
43        .spawn()
44        .map_err(|e| format!("Failed to spawn daemon: {}", e))?;
45
46    info!("Daemon process spawned with PID: {}", child.id());
47    Ok(())
48}
49
50/// Wait for daemon to become ready
51pub async fn wait_for_daemon(host: &str, port: u16) -> Result<(), String> {
52    let start = std::time::Instant::now();
53    let timeout = Duration::from_secs(MAX_STARTUP_WAIT_SECS);
54
55    while start.elapsed() < timeout {
56        if is_daemon_running(host, port).await {
57            info!("Daemon is ready on {}:{}", host, port);
58            return Ok(());
59        }
60        sleep(Duration::from_millis(STARTUP_CHECK_INTERVAL_MS)).await;
61    }
62
63    Err(format!(
64        "Daemon failed to start within {} seconds",
65        MAX_STARTUP_WAIT_SECS
66    ))
67}
68
69/// Ensure daemon is running, starting it if necessary
70pub async fn ensure_daemon_running(config: &DaemonConfig) -> Result<(), String> {
71    if is_daemon_running(&config.host, config.port).await {
72        info!("Daemon already running on {}:{}", config.host, config.port);
73        return Ok(());
74    }
75
76    info!("Daemon not running, starting in background...");
77    start_daemon_background(config)?;
78    wait_for_daemon(&config.host, config.port).await
79}
80
81/// Run stdio proxy mode - bridges stdin/stdout to daemon via Streamable HTTP
82pub async fn run_stdio_proxy(config: DaemonConfig) -> Result<(), String> {
83    // Ensure daemon is running
84    ensure_daemon_running(&config).await?;
85
86    let mcp_url = format!("http://{}:{}/mcp", config.host, config.port);
87    let client = reqwest::Client::new();
88
89    info!("Connecting to daemon at {}", mcp_url);
90
91    // Session ID storage (lock-free via ArcSwap)
92    let session_id: Arc<ArcSwap<Option<String>>> = Arc::new(ArcSwap::from_pointee(None));
93
94    // Stdin reader - send requests to daemon and write responses to stdout
95    let stdin_handle = tokio::task::spawn_blocking({
96        let client = client.clone();
97        let mcp_url = mcp_url.clone();
98        let session_id = session_id.clone();
99
100        move || {
101            let stdin = io::stdin();
102            let reader = stdin.lock();
103            let rt = tokio::runtime::Handle::current();
104
105            for line in reader.lines() {
106                match line {
107                    Ok(line) if !line.trim().is_empty() => {
108                        let client = client.clone();
109                        let url = mcp_url.clone();
110                        let session_id = session_id.clone();
111
112                        rt.block_on(async {
113                            // Build request with proper headers for Streamable HTTP
114                            let mut request = client
115                                .post(&url)
116                                .header("Content-Type", "application/json")
117                                .header("Accept", "application/json, text/event-stream");
118
119                            // Add session ID header if we have one (lock-free load)
120                            let current_sid = session_id.load();
121                            if let Some(ref id) = **current_sid {
122                                request = request.header("Mcp-Session-Id", id.clone());
123                            }
124
125                            match request.body(line).send().await {
126                                Ok(resp) => {
127                                    // Extract session ID from response header (lock-free store)
128                                    if let Some(new_sid) = resp.headers().get("mcp-session-id")
129                                        && let Ok(sid_str) = new_sid.to_str()
130                                        && session_id.load().is_none()
131                                    {
132                                        info!("Got session ID: {}", sid_str);
133                                        session_id.store(Arc::new(Some(sid_str.to_string())));
134                                    }
135
136                                    if resp.status().is_success() {
137                                        // Read response body and parse SSE events
138                                        match resp.text().await {
139                                            Ok(body) => {
140                                                // Parse SSE format: "data: {...}\n\n"
141                                                for event in body.split("\n\n") {
142                                                    if let Some(data) = event.strip_prefix("data: ")
143                                                    {
144                                                        let data = data.trim();
145                                                        if !data.is_empty() {
146                                                            println!("{}", data);
147                                                        }
148                                                    }
149                                                }
150                                            }
151                                            Err(e) => {
152                                                error!("Failed to read response: {}", e);
153                                            }
154                                        }
155                                    } else {
156                                        error!("Request failed with status: {}", resp.status());
157                                        if let Ok(body) = resp.text().await {
158                                            error!("Response: {}", body);
159                                        }
160                                    }
161                                }
162                                Err(e) => {
163                                    error!("Failed to send request: {}", e);
164                                }
165                            }
166                        });
167                    }
168                    Ok(_) => {} // Empty line, skip
169                    Err(e) => {
170                        error!("Error reading stdin: {}", e);
171                        break;
172                    }
173                }
174            }
175        }
176    });
177
178    // Wait for stdin to close (client disconnected)
179    let _ = stdin_handle.await;
180
181    Ok(())
182}