zagens_runtime/runtime_serve/
http.rs1use std::net::SocketAddr;
4use std::path::PathBuf;
5use std::sync::Arc;
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use anyhow::{Context, Result, anyhow};
9use sha2::{Digest, Sha256};
10use tokio::io::AsyncBufReadExt;
11use tokio::io::BufReader;
12use tokio::net::TcpListener;
13use tokio::sync::Mutex;
14use tokio_util::sync::CancellationToken;
15
16use crate::automation_manager::{AutomationManager, AutomationSchedulerConfig, spawn_scheduler};
17use crate::config::Config;
18use crate::runtime_api::{ResumeTaskTracker, RuntimeApiState, build_router};
19use crate::runtime_threads::{RuntimeThreadManager, RuntimeThreadManagerConfig};
20use crate::session_manager::{SessionManager, default_sessions_dir};
21use crate::task_manager::{TaskManager, TaskManagerConfig};
22
/// Listener and access settings for the runtime HTTP API.
///
/// Consumed by `run_http_server`; see `Default` for the values used when the
/// caller does not override anything.
#[derive(Debug, Clone)]
pub struct RuntimeApiOptions {
    /// Host part of the bind address; combined with `port` as `host:port`.
    pub host: String,
    /// TCP port to bind. The port actually bound is read back from the
    /// listener and reported in the readiness line.
    pub port: u16,
    /// Forwarded to `TaskManagerConfig::from_runtime`.
    /// NOTE(review): presumably the task worker count — confirm against `TaskManagerConfig`.
    pub workers: usize,
    /// CORS origins handed to the API state unchanged.
    pub cors_origins: Vec<String>,
    /// Explicit auth token. When `None`, `run_http_server` falls back to the
    /// `DEEPSEEK_RUNTIME_TOKEN` environment variable; a blank token is treated
    /// as "no token".
    pub auth_token: Option<String>,
}
38
39impl Default for RuntimeApiOptions {
40 fn default() -> Self {
41 Self {
42 host: "127.0.0.1".to_string(),
43 port: 7878,
44 workers: 8,
45 cors_origins: Vec::new(),
46 auth_token: None,
47 }
48 }
49}
50
51pub async fn run_http_server(
60 config: Config,
61 workspace: PathBuf,
62 options: RuntimeApiOptions,
63) -> Result<()> {
64 let t0 = std::time::Instant::now();
65 eprintln!("[deepseek-runtime] starting HTTP API (task manager, threads, scheduler)…");
66
67 let task_cfg = TaskManagerConfig::from_runtime(
68 &config,
69 workspace.clone(),
70 config.default_text_model.clone(),
71 Some(options.workers),
72 );
73 let manager_cfg = RuntimeThreadManagerConfig::from_task_data_dir(task_cfg.data_dir.clone());
74 let sessions_dir = default_sessions_dir()
75 .unwrap_or_else(|_| zagens_config::user_data_path_or_relative("sessions"));
76 let shared_session_manager = Arc::new(
77 SessionManager::new(sessions_dir.clone()).context("Failed to create SessionManager")?,
78 );
79 let sb_config = config.clone();
80 let sb_workspace = workspace.clone();
81 let session_manager_for_threads = shared_session_manager.clone();
82 let runtime_threads = Arc::new(
83 tokio::task::spawn_blocking(move || {
84 RuntimeThreadManager::open_with_session_manager(
85 sb_config,
86 sb_workspace,
87 manager_cfg,
88 Some(session_manager_for_threads),
89 )
90 })
91 .await
92 .map_err(|e| anyhow!("RuntimeThreadManager::open panicked: {e}"))??,
93 );
94 eprintln!(
95 "[deepseek-runtime] RuntimeThreadManager::open ok (+{:?})",
96 t0.elapsed()
97 );
98 let task_manager =
99 TaskManager::start_with_runtime_manager(task_cfg, config.clone(), runtime_threads.clone())
100 .await?;
101 eprintln!(
102 "[deepseek-runtime] TaskManager::start ok (+{:?})",
103 t0.elapsed()
104 );
105 let automations = Arc::new(Mutex::new(AutomationManager::default_location()?));
106 runtime_threads.attach_automation_manager(automations.clone());
107 let scheduler_cancel = CancellationToken::new();
108 let scheduler_handle = spawn_scheduler(
109 automations.clone(),
110 task_manager.clone(),
111 scheduler_cancel.clone(),
112 AutomationSchedulerConfig::default(),
113 );
114
115 let runtime_token = options
116 .auth_token
117 .clone()
118 .or_else(|| std::env::var("DEEPSEEK_RUNTIME_TOKEN").ok())
119 .filter(|token| !token.trim().is_empty());
120 let auth_enabled = runtime_token.is_some();
121
122 let process_started_at_ms = SystemTime::now()
123 .duration_since(UNIX_EPOCH)
124 .unwrap_or_default()
125 .as_millis();
126 let token_fingerprint = {
127 let mut hasher = Sha256::new();
128 hasher.update(runtime_token.as_deref().unwrap_or(""));
129 let hash = hasher.finalize();
130 let fp: String = hash[..16].iter().map(|b| format!("{b:02x}")).collect();
131 Arc::new(fp)
132 };
133
134 let mut shared_mcp_pool = crate::mcp::McpPool::from_config_path(&config.mcp_config_path())
135 .context("Failed to load MCP config for shared pool")?;
136 if let Some(network_toml) = config.network.clone() {
137 let decider = crate::network_policy::NetworkPolicyDecider::with_default_audit(
138 network_toml.into_runtime(),
139 );
140 shared_mcp_pool = shared_mcp_pool.with_network_policy(decider);
141 }
142 let shared_mcp_pool = Arc::new(tokio::sync::Mutex::new(shared_mcp_pool));
143 crate::mcp_shared::install_shared_mcp_pool(Arc::clone(&shared_mcp_pool));
144
145 let token_fp = token_fingerprint.as_ref().clone();
146 let state = RuntimeApiState::new(
147 config.clone(),
148 workspace,
149 task_manager,
150 runtime_threads,
151 options.cors_origins.clone(),
152 config.mcp_config_path(),
153 automations,
154 runtime_token,
155 process_started_at_ms,
156 token_fingerprint,
157 shared_session_manager,
158 ResumeTaskTracker::new(),
159 shared_mcp_pool,
160 );
161 let app = build_router(state);
162
163 let addr: SocketAddr = format!("{}:{}", options.host, options.port)
164 .parse()
165 .with_context(|| format!("Invalid bind address '{}:{}'", options.host, options.port))?;
166 let listener = TcpListener::bind(addr)
167 .await
168 .with_context(|| format!("Failed to bind {addr}"))?;
169 let bound_addr = listener
172 .local_addr()
173 .with_context(|| "Failed to read bound local_addr from TcpListener")?;
174 let bound_port = bound_addr.port();
175
176 eprintln!(
177 "[deepseek-runtime] bound {bound_addr}, serving (+{:?}) — output also on stderr (see sidecar.log if launched from Zagens)",
178 t0.elapsed()
179 );
180 eprintln!("Runtime API listening on http://{bound_addr}");
181 eprintln!("Security: this server is local-first. Do not expose it to untrusted networks.");
182 if auth_enabled {
183 eprintln!("Runtime API auth: bearer token required for /v1/* routes.");
184 }
185
186 let ready_line = serde_json::json!({
191 "port": bound_port,
192 "pid": std::process::id(),
193 "token_fp": token_fp,
194 "version": env!("CARGO_PKG_VERSION"),
195 });
196 println!("DS_PICK_READY {ready_line}");
197 let _ = std::io::Write::flush(&mut std::io::stdout());
198
199 let started_at = std::time::Instant::now();
200 tokio::spawn(async move {
201 let stdin = BufReader::new(tokio::io::stdin());
202 let mut lines = stdin.lines();
203 while let Ok(Some(line)) = lines.next_line().await {
204 let op: serde_json::Value = match serde_json::from_str(&line) {
205 Ok(v) => v,
206 Err(_) => continue,
207 };
208 match op.get("op").and_then(|v| v.as_str()) {
209 Some("ping") => {
210 let seq = op.get("seq").and_then(|v| v.as_u64()).unwrap_or(0);
211 let pong = serde_json::json!({
212 "op": "pong",
213 "seq": seq,
214 "pid": std::process::id(),
215 "uptime_ms": started_at.elapsed().as_millis(),
216 });
217 println!("DS_PICK_PONG {pong}");
218 let _ = std::io::Write::flush(&mut std::io::stdout());
219 }
220 Some("drain") => {
221 let drain_resp = serde_json::json!({
222 "op": "drain",
223 "state": "draining",
224 });
225 println!("DS_PICK_DRAIN {drain_resp}");
226 let _ = std::io::Write::flush(&mut std::io::stdout());
227 break;
228 }
229 _ => {}
230 }
231 }
232 });
233
234 eprintln!("[deepseek-runtime] axum::serve started, listening on {bound_addr}");
235 let serve_result = axum::serve(listener, app)
236 .await
237 .map_err(|e| anyhow!("Runtime API server error: {e}"));
238 eprintln!(
239 "[deepseek-runtime] axum::serve returned: {:?}",
240 serve_result
241 .as_ref()
242 .map(|_| "ok")
243 .map_err(|e| format!("{e:#}"))
244 );
245 scheduler_cancel.cancel();
246 scheduler_handle.abort();
247 serve_result
248}