Skip to main content

zagens_runtime/runtime_serve/
http.rs

1//! HTTP sidecar bootstrap — DI assembly and axum serve loop (D16 E1-d).
2
3use std::net::SocketAddr;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use anyhow::{Context, Result, anyhow};
9use sha2::{Digest, Sha256};
10use tokio::io::AsyncBufReadExt;
11use tokio::io::BufReader;
12use tokio::net::TcpListener;
13use tokio::sync::Mutex;
14use tokio_util::sync::CancellationToken;
15
16use crate::automation_manager::{AutomationManager, AutomationSchedulerConfig, spawn_scheduler};
17use crate::config::Config;
18use crate::runtime_api::{ResumeTaskTracker, RuntimeApiState, build_router};
19use crate::runtime_threads::{RuntimeThreadManager, RuntimeThreadManagerConfig};
20use crate::session_manager::{SessionManager, default_sessions_dir};
21use crate::task_manager::{TaskManager, TaskManagerConfig};
22
/// Settings for the runtime HTTP API listener, consumed by `run_http_server`.
///
/// The [`Default`] value listens on `127.0.0.1:7878` with 8 workers, no
/// additional CORS origins and no explicit bearer token.
#[derive(Debug, Clone)]
pub struct RuntimeApiOptions {
    /// Host to bind, e.g. `127.0.0.1`; combined with `port` into the listen address.
    pub host: String,
    /// Port to bind; `0` lets the OS choose an ephemeral port.
    pub port: u16,
    /// Worker count handed to the task manager configuration.
    pub workers: usize,
    /// CORS origins allowed in addition to the built-in defaults
    /// (`http://localhost:{3000,1420}`, `http://127.0.0.1:{3000,1420}` and
    /// `tauri://localhost`). Sources: the repeatable `--cors-origin` flag, the
    /// comma-separated `DEEPSEEK_CORS_ORIGINS` variable, and
    /// `[runtime_api] cors_origins` in `config.toml` (Whalescale#255 / #561).
    pub cors_origins: Vec<String>,
    /// Bearer token demanded on `/v1/*` routes. When this is `None`,
    /// `run_http_server` also consults `DEEPSEEK_RUNTIME_TOKEN`.
    pub auth_token: Option<String>,
}

impl Default for RuntimeApiOptions {
    /// Loopback host, port 7878, eight workers, no extra origins, no token.
    fn default() -> Self {
        let host = String::from("127.0.0.1");
        Self {
            host,
            port: 7878,
            workers: 8,
            cors_origins: vec![],
            auth_token: None,
        }
    }
}
50
51/// Start the runtime API server.
52///
53/// `options.port == 0` is now accepted and means "let the OS pick an ephemeral
54/// port". The actually bound port is reported back to the supervisor via the
55/// `DS_PICK_READY` line (`port: <bound>`) and through the `local_addr().port()`
56/// log line below; Zagens desktop consumes it via `tokio::sync::watch::<u16>`
57/// (see `crates/desktop/src/sidecar.rs` D2 work). The guard that previously
58/// rejected port 0 was removed in this commit (D2 follow-up).
59pub async fn run_http_server(
60    config: Config,
61    workspace: PathBuf,
62    options: RuntimeApiOptions,
63) -> Result<()> {
64    let t0 = std::time::Instant::now();
65    eprintln!("[deepseek-runtime] starting HTTP API (task manager, threads, scheduler)…");
66
67    let task_cfg = TaskManagerConfig::from_runtime(
68        &config,
69        workspace.clone(),
70        config.default_text_model.clone(),
71        Some(options.workers),
72    );
73    let manager_cfg = RuntimeThreadManagerConfig::from_task_data_dir(task_cfg.data_dir.clone());
74    let sessions_dir = default_sessions_dir()
75        .unwrap_or_else(|_| zagens_config::user_data_path_or_relative("sessions"));
76    let shared_session_manager = Arc::new(
77        SessionManager::new(sessions_dir.clone()).context("Failed to create SessionManager")?,
78    );
79    let sb_config = config.clone();
80    let sb_workspace = workspace.clone();
81    let session_manager_for_threads = shared_session_manager.clone();
82    let runtime_threads = Arc::new(
83        tokio::task::spawn_blocking(move || {
84            RuntimeThreadManager::open_with_session_manager(
85                sb_config,
86                sb_workspace,
87                manager_cfg,
88                Some(session_manager_for_threads),
89            )
90        })
91        .await
92        .map_err(|e| anyhow!("RuntimeThreadManager::open panicked: {e}"))??,
93    );
94    eprintln!(
95        "[deepseek-runtime] RuntimeThreadManager::open ok (+{:?})",
96        t0.elapsed()
97    );
98    let task_manager =
99        TaskManager::start_with_runtime_manager(task_cfg, config.clone(), runtime_threads.clone())
100            .await?;
101    eprintln!(
102        "[deepseek-runtime] TaskManager::start ok (+{:?})",
103        t0.elapsed()
104    );
105    let automations = Arc::new(Mutex::new(AutomationManager::default_location()?));
106    runtime_threads.attach_automation_manager(automations.clone());
107    let scheduler_cancel = CancellationToken::new();
108    let scheduler_handle = spawn_scheduler(
109        automations.clone(),
110        task_manager.clone(),
111        scheduler_cancel.clone(),
112        AutomationSchedulerConfig::default(),
113    );
114
115    let runtime_token = options
116        .auth_token
117        .clone()
118        .or_else(|| std::env::var("DEEPSEEK_RUNTIME_TOKEN").ok())
119        .filter(|token| !token.trim().is_empty());
120    let auth_enabled = runtime_token.is_some();
121
122    let process_started_at_ms = SystemTime::now()
123        .duration_since(UNIX_EPOCH)
124        .unwrap_or_default()
125        .as_millis();
126    let token_fingerprint = {
127        let mut hasher = Sha256::new();
128        hasher.update(runtime_token.as_deref().unwrap_or(""));
129        let hash = hasher.finalize();
130        let fp: String = hash[..16].iter().map(|b| format!("{b:02x}")).collect();
131        Arc::new(fp)
132    };
133
134    let mut shared_mcp_pool = crate::mcp::McpPool::from_config_path(&config.mcp_config_path())
135        .context("Failed to load MCP config for shared pool")?;
136    if let Some(network_toml) = config.network.clone() {
137        let decider = crate::network_policy::NetworkPolicyDecider::with_default_audit(
138            network_toml.into_runtime(),
139        );
140        shared_mcp_pool = shared_mcp_pool.with_network_policy(decider);
141    }
142    let shared_mcp_pool = Arc::new(tokio::sync::Mutex::new(shared_mcp_pool));
143    crate::mcp_shared::install_shared_mcp_pool(Arc::clone(&shared_mcp_pool));
144
145    let token_fp = token_fingerprint.as_ref().clone();
146    let state = RuntimeApiState::new(
147        config.clone(),
148        workspace,
149        task_manager,
150        runtime_threads,
151        options.cors_origins.clone(),
152        config.mcp_config_path(),
153        automations,
154        runtime_token,
155        process_started_at_ms,
156        token_fingerprint,
157        shared_session_manager,
158        ResumeTaskTracker::new(),
159        shared_mcp_pool,
160    );
161    let app = build_router(state);
162
163    let addr: SocketAddr = format!("{}:{}", options.host, options.port)
164        .parse()
165        .with_context(|| format!("Invalid bind address '{}:{}'", options.host, options.port))?;
166    let listener = TcpListener::bind(addr)
167        .await
168        .with_context(|| format!("Failed to bind {addr}"))?;
169    // Report the actual bound port so callers that pass `--port 0` (or hit ephemeral fallback)
170    // can discover the real listener; `options.port` may differ from `local_addr().port()`.
171    let bound_addr = listener
172        .local_addr()
173        .with_context(|| "Failed to read bound local_addr from TcpListener")?;
174    let bound_port = bound_addr.port();
175
176    eprintln!(
177        "[deepseek-runtime] bound {bound_addr}, serving (+{:?}) — output also on stderr (see sidecar.log if launched from Zagens)",
178        t0.elapsed()
179    );
180    eprintln!("Runtime API listening on http://{bound_addr}");
181    eprintln!("Security: this server is local-first. Do not expose it to untrusted networks.");
182    if auth_enabled {
183        eprintln!("Runtime API auth: bearer token required for /v1/* routes.");
184    }
185
186    // Signal READY to the supervisor via stdout (line protocol).
187    // Zagens's supervisor waits for this line before considering the sidecar healthy.
188    // `port` MUST be the actually bound port (not the requested one) so the desktop
189    // shell can discover ephemeral ports from `--port 0`.
190    let ready_line = serde_json::json!({
191        "port": bound_port,
192        "pid": std::process::id(),
193        "token_fp": token_fp,
194        "version": env!("CARGO_PKG_VERSION"),
195    });
196    println!("DS_PICK_READY {ready_line}");
197    let _ = std::io::Write::flush(&mut std::io::stdout());
198
199    let started_at = std::time::Instant::now();
200    tokio::spawn(async move {
201        let stdin = BufReader::new(tokio::io::stdin());
202        let mut lines = stdin.lines();
203        while let Ok(Some(line)) = lines.next_line().await {
204            let op: serde_json::Value = match serde_json::from_str(&line) {
205                Ok(v) => v,
206                Err(_) => continue,
207            };
208            match op.get("op").and_then(|v| v.as_str()) {
209                Some("ping") => {
210                    let seq = op.get("seq").and_then(|v| v.as_u64()).unwrap_or(0);
211                    let pong = serde_json::json!({
212                        "op": "pong",
213                        "seq": seq,
214                        "pid": std::process::id(),
215                        "uptime_ms": started_at.elapsed().as_millis(),
216                    });
217                    println!("DS_PICK_PONG {pong}");
218                    let _ = std::io::Write::flush(&mut std::io::stdout());
219                }
220                Some("drain") => {
221                    let drain_resp = serde_json::json!({
222                        "op": "drain",
223                        "state": "draining",
224                    });
225                    println!("DS_PICK_DRAIN {drain_resp}");
226                    let _ = std::io::Write::flush(&mut std::io::stdout());
227                    break;
228                }
229                _ => {}
230            }
231        }
232    });
233
234    eprintln!("[deepseek-runtime] axum::serve started, listening on {bound_addr}");
235    let serve_result = axum::serve(listener, app)
236        .await
237        .map_err(|e| anyhow!("Runtime API server error: {e}"));
238    eprintln!(
239        "[deepseek-runtime] axum::serve returned: {:?}",
240        serve_result
241            .as_ref()
242            .map(|_| "ok")
243            .map_err(|e| format!("{e:#}"))
244    );
245    scheduler_cancel.cancel();
246    scheduler_handle.abort();
247    serve_result
248}