1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::Result;
6use axum::extract::State;
7use axum::routing::{get, post};
8use axum::{Json, Router};
9use codewhale_agent::ModelRegistry;
10use codewhale_config::{CliRuntimeOverrides, ConfigStore};
11use codewhale_core::Runtime;
12use codewhale_execpolicy::ExecPolicyEngine;
13use codewhale_hooks::{HookDispatcher, JsonlHookSink, StdoutHookSink};
14use codewhale_mcp::McpManager;
15use codewhale_protocol::{
16 AppRequest, AppResponse, PromptRequest, PromptResponse, ThreadRequest, ThreadResponse,
17};
18use codewhale_state::StateStore;
19use codewhale_tools::{ToolCall, ToolRegistry};
20use serde::de::DeserializeOwned;
21use serde::{Deserialize, Serialize};
22use serde_json::{Value, json};
23use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
24use tokio::sync::{Mutex, RwLock};
25use tower_http::cors::CorsLayer;
26
/// Launch settings for the HTTP flavour of the app server (see [`run`]).
#[derive(Clone, Debug)]
pub struct AppServerOptions {
    /// Socket address the TCP listener binds to.
    pub listen: SocketAddr,
    /// Optional configuration file path, forwarded to state construction.
    pub config_path: Option<PathBuf>,
}
32
33#[derive(Clone)]
34struct AppState {
35 config_path: Option<PathBuf>,
36 config: Arc<RwLock<codewhale_config::ConfigToml>>,
37 runtime: Arc<Mutex<Runtime>>,
38 registry: ModelRegistry,
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
42struct ToolCallRequest {
43 call: ToolCall,
44 #[serde(default)]
45 cwd: Option<PathBuf>,
46}
47
48#[derive(Debug, Deserialize)]
49struct JsonRpcRequest {
50 #[serde(default)]
51 jsonrpc: Option<String>,
52 #[serde(default)]
53 id: Option<Value>,
54 method: String,
55 #[serde(default)]
56 params: Value,
57}
58
59#[derive(Debug)]
60struct JsonRpcError {
61 code: i64,
62 message: String,
63 data: Option<Value>,
64}
65
66#[derive(Debug)]
67struct StdioDispatchResult {
68 result: Value,
69 should_exit: bool,
70}
71
72#[derive(Debug, Deserialize)]
73struct ConfigGetParams {
74 key: String,
75}
76
77#[derive(Debug, Deserialize)]
78struct ConfigSetParams {
79 key: String,
80 value: String,
81}
82
83#[derive(Debug, Deserialize)]
84struct ThreadIdParams {
85 thread_id: String,
86}
87
88#[derive(Debug, Deserialize)]
89struct ThreadMessageParams {
90 thread_id: String,
91 input: String,
92}
93
94pub async fn run(options: AppServerOptions) -> Result<()> {
95 let state = build_state(options.config_path.clone())?;
96
97 let app = Router::new()
98 .route("/healthz", get(healthz))
99 .route("/thread", post(thread_handler))
100 .route("/app", post(app_handler))
101 .route("/prompt", post(prompt_handler))
102 .route("/tool", post(tool_handler))
103 .route("/jobs", get(jobs_handler))
104 .route("/mcp/startup", post(mcp_startup_handler))
105 .layer(CorsLayer::permissive())
106 .with_state(state);
107
108 let listener = tokio::net::TcpListener::bind(options.listen).await?;
109 axum::serve(listener, app).await?;
110 Ok(())
111}
112
113pub async fn run_stdio(config_path: Option<PathBuf>) -> Result<()> {
114 let state = build_state(config_path)?;
115 let stdin = tokio::io::stdin();
116 let stdout = tokio::io::stdout();
117 let mut reader = BufReader::new(stdin).lines();
118 let mut writer = tokio::io::BufWriter::new(stdout);
119 while let Some(line) = reader.next_line().await? {
120 if line.trim().is_empty() {
121 continue;
122 }
123
124 let request: JsonRpcRequest = match serde_json::from_str(&line) {
125 Ok(value) => value,
126 Err(err) => {
127 let response = jsonrpc_error(
128 None,
129 JsonRpcError::parse_error(format!("invalid json: {err}")),
130 );
131 writer.write_all(response.to_string().as_bytes()).await?;
132 writer.write_all(b"\n").await?;
133 writer.flush().await?;
134 continue;
135 }
136 };
137
138 if request
139 .jsonrpc
140 .as_deref()
141 .is_some_and(|version| version != "2.0")
142 {
143 let response = jsonrpc_error(
144 request.id,
145 JsonRpcError::invalid_request("jsonrpc version must be 2.0"),
146 );
147 writer.write_all(response.to_string().as_bytes()).await?;
148 writer.write_all(b"\n").await?;
149 writer.flush().await?;
150 continue;
151 }
152
153 let response = match dispatch_stdio_request(&state, &request.method, request.params).await {
154 Ok(dispatch) => {
155 let encoded = jsonrpc_result(request.id, dispatch.result);
156 writer.write_all(encoded.to_string().as_bytes()).await?;
157 writer.write_all(b"\n").await?;
158 writer.flush().await?;
159 if dispatch.should_exit {
160 break;
161 }
162 continue;
163 }
164 Err(err) => jsonrpc_error(request.id, err),
165 };
166
167 writer.write_all(response.to_string().as_bytes()).await?;
168 writer.write_all(b"\n").await?;
169 writer.flush().await?;
170 }
171
172 Ok(())
173}
174
175async fn healthz() -> Json<Value> {
176 Json(json!({
177 "status": "ok",
178 "protocol": "v2",
179 "service": "deepseek-app-server"
180 }))
181}
182
183async fn thread_handler(
184 State(state): State<AppState>,
185 Json(req): Json<ThreadRequest>,
186) -> Json<ThreadResponse> {
187 let mut runtime = state.runtime.lock().await;
188 match runtime.handle_thread(req).await {
189 Ok(res) => Json(res),
190 Err(err) => Json(ThreadResponse {
191 thread_id: "error".to_string(),
192 status: format!("error:{err}"),
193 thread: None,
194 threads: Vec::new(),
195 model: None,
196 model_provider: None,
197 cwd: None,
198 approval_policy: None,
199 sandbox: None,
200 events: Vec::new(),
201 data: json!({}),
202 }),
203 }
204}
205
206async fn prompt_handler(
207 State(state): State<AppState>,
208 Json(req): Json<PromptRequest>,
209) -> Json<PromptResponse> {
210 let mut runtime = state.runtime.lock().await;
211 let overrides = CliRuntimeOverrides::default();
212 match runtime.handle_prompt(req, &overrides).await {
213 Ok(res) => Json(res),
214 Err(err) => Json(PromptResponse {
215 output: err.to_string(),
216 model: "unknown".to_string(),
217 events: Vec::new(),
218 }),
219 }
220}
221
222async fn tool_handler(
223 State(state): State<AppState>,
224 Json(req): Json<ToolCallRequest>,
225) -> Json<Value> {
226 let runtime = state.runtime.lock().await;
227 let cwd = req
228 .cwd
229 .unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")));
230 match runtime
231 .invoke_tool(
232 req.call,
233 codewhale_execpolicy::AskForApproval::OnRequest,
234 &cwd,
235 )
236 .await
237 {
238 Ok(value) => Json(value),
239 Err(err) => Json(json!({ "ok": false, "error": err.to_string() })),
240 }
241}
242
243async fn jobs_handler(State(state): State<AppState>) -> Json<AppResponse> {
244 let runtime = state.runtime.lock().await;
245 Json(runtime.app_status())
246}
247
248async fn mcp_startup_handler(State(state): State<AppState>) -> Json<Value> {
249 let runtime = state.runtime.lock().await;
250 let summary = runtime.mcp_startup().await;
251 Json(json!({
252 "ok": true,
253 "summary": summary
254 }))
255}
256
257async fn app_handler(
258 State(state): State<AppState>,
259 Json(req): Json<AppRequest>,
260) -> Json<AppResponse> {
261 Json(process_app_request(&state, req).await)
262}
263
264fn build_state(config_path: Option<PathBuf>) -> Result<AppState> {
265 let store = ConfigStore::load(config_path.clone())?;
266 let config = store.config.clone();
267 let registry = ModelRegistry::default();
268
269 let state_db_path = config_path
270 .as_ref()
271 .and_then(|p| p.parent().map(|parent| parent.join("state.db")));
272 let state_store = StateStore::open(state_db_path)?;
273
274 let mut hooks = HookDispatcher::default();
275 hooks.add_sink(Arc::new(StdoutHookSink));
276 let hook_log_path = config_path
277 .as_ref()
278 .and_then(|p| p.parent().map(|parent| parent.join("events.jsonl")))
279 .unwrap_or_else(|| PathBuf::from(".deepseek/events.jsonl"));
280 hooks.add_sink(Arc::new(JsonlHookSink::new(hook_log_path)));
281
282 let runtime = Runtime::new(
283 config.clone(),
284 registry.clone(),
285 state_store,
286 Arc::new(ToolRegistry::default()),
287 Arc::new(McpManager::default()),
288 ExecPolicyEngine::new(Vec::new(), Vec::new()),
289 hooks,
290 );
291
292 Ok(AppState {
293 config_path,
294 config: Arc::new(RwLock::new(config)),
295 runtime: Arc::new(Mutex::new(runtime)),
296 registry,
297 })
298}
299
300fn params_or_object(params: Value) -> Value {
301 if params.is_null() { json!({}) } else { params }
302}
303
304fn parse_params<T: DeserializeOwned>(params: Value) -> std::result::Result<T, JsonRpcError> {
305 serde_json::from_value(params).map_err(|err| JsonRpcError::invalid_params(err.to_string()))
306}
307
308fn jsonrpc_result(id: Option<Value>, result: Value) -> Value {
309 json!({
310 "jsonrpc": "2.0",
311 "id": id.unwrap_or(Value::Null),
312 "result": result
313 })
314}
315
316fn jsonrpc_error(id: Option<Value>, err: JsonRpcError) -> Value {
317 json!({
318 "jsonrpc": "2.0",
319 "id": id.unwrap_or(Value::Null),
320 "error": {
321 "code": err.code,
322 "message": err.message,
323 "data": err.data
324 }
325 })
326}
327
328impl JsonRpcError {
329 fn parse_error(message: impl Into<String>) -> Self {
330 Self {
331 code: -32700,
332 message: message.into(),
333 data: None,
334 }
335 }
336
337 fn invalid_request(message: impl Into<String>) -> Self {
338 Self {
339 code: -32600,
340 message: message.into(),
341 data: None,
342 }
343 }
344
345 fn method_not_found(method: &str) -> Self {
346 Self {
347 code: -32601,
348 message: format!("unsupported method: {method}"),
349 data: None,
350 }
351 }
352
353 fn invalid_params(message: impl Into<String>) -> Self {
354 Self {
355 code: -32602,
356 message: message.into(),
357 data: None,
358 }
359 }
360
361 fn internal(message: impl Into<String>) -> Self {
362 Self {
363 code: -32603,
364 message: message.into(),
365 data: None,
366 }
367 }
368}
369
370async fn handle_thread_request(
371 state: &AppState,
372 req: ThreadRequest,
373) -> std::result::Result<ThreadResponse, JsonRpcError> {
374 let mut runtime = state.runtime.lock().await;
375 runtime
376 .handle_thread(req)
377 .await
378 .map_err(|err| JsonRpcError::internal(err.to_string()))
379}
380
381async fn handle_prompt_request(
382 state: &AppState,
383 req: PromptRequest,
384) -> std::result::Result<PromptResponse, JsonRpcError> {
385 let mut runtime = state.runtime.lock().await;
386 runtime
387 .handle_prompt(req, &CliRuntimeOverrides::default())
388 .await
389 .map_err(|err| JsonRpcError::internal(err.to_string()))
390}
391
392async fn dispatch_stdio_request(
393 state: &AppState,
394 method: &str,
395 params: Value,
396) -> std::result::Result<StdioDispatchResult, JsonRpcError> {
397 let outcome = match method {
398 "healthz" | "app/healthz" => StdioDispatchResult {
399 result: json!({
400 "status": "ok",
401 "service": "deepseek-app-server",
402 "transport": "stdio"
403 }),
404 should_exit: false,
405 },
406 "capabilities" => StdioDispatchResult {
407 result: json!({
408 "transport": "stdio",
409 "families": ["thread/*", "app/*", "prompt/*"],
410 "methods": [
411 "healthz",
412 "thread/capabilities",
413 "thread/request",
414 "thread/create",
415 "thread/start",
416 "thread/resume",
417 "thread/fork",
418 "thread/list",
419 "thread/read",
420 "thread/set_name",
421 "thread/archive",
422 "thread/unarchive",
423 "thread/message",
424 "app/capabilities",
425 "app/request",
426 "app/config/get",
427 "app/config/set",
428 "app/config/unset",
429 "app/config/list",
430 "app/models",
431 "app/thread_loaded_list",
432 "prompt/capabilities",
433 "prompt/request",
434 "prompt/run",
435 "shutdown"
436 ]
437 }),
438 should_exit: false,
439 },
440 "thread/capabilities" => StdioDispatchResult {
441 result: json!({
442 "methods": [
443 "thread/request",
444 "thread/create",
445 "thread/start",
446 "thread/resume",
447 "thread/fork",
448 "thread/list",
449 "thread/read",
450 "thread/set_name",
451 "thread/archive",
452 "thread/unarchive",
453 "thread/message"
454 ]
455 }),
456 should_exit: false,
457 },
458 "thread/request" => {
459 let request: ThreadRequest = parse_params(params)?;
460 let response = handle_thread_request(state, request).await?;
461 StdioDispatchResult {
462 result: serde_json::to_value(response)
463 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
464 should_exit: false,
465 }
466 }
467 "thread/create" => {
468 #[derive(Debug, Deserialize)]
469 struct CreateParams {
470 #[serde(default)]
471 metadata: Value,
472 }
473 let parsed: CreateParams = parse_params(params_or_object(params))?;
474 let response = handle_thread_request(
475 state,
476 ThreadRequest::Create {
477 metadata: parsed.metadata,
478 },
479 )
480 .await?;
481 StdioDispatchResult {
482 result: serde_json::to_value(response)
483 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
484 should_exit: false,
485 }
486 }
487 "thread/start" => {
488 let request = ThreadRequest::Start(parse_params(params_or_object(params))?);
489 let response = handle_thread_request(state, request).await?;
490 StdioDispatchResult {
491 result: serde_json::to_value(response)
492 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
493 should_exit: false,
494 }
495 }
496 "thread/resume" => {
497 let request = ThreadRequest::Resume(parse_params(params_or_object(params))?);
498 let response = handle_thread_request(state, request).await?;
499 StdioDispatchResult {
500 result: serde_json::to_value(response)
501 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
502 should_exit: false,
503 }
504 }
505 "thread/fork" => {
506 let request = ThreadRequest::Fork(parse_params(params_or_object(params))?);
507 let response = handle_thread_request(state, request).await?;
508 StdioDispatchResult {
509 result: serde_json::to_value(response)
510 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
511 should_exit: false,
512 }
513 }
514 "thread/list" => {
515 let request = ThreadRequest::List(parse_params(params_or_object(params))?);
516 let response = handle_thread_request(state, request).await?;
517 StdioDispatchResult {
518 result: serde_json::to_value(response)
519 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
520 should_exit: false,
521 }
522 }
523 "thread/read" => {
524 let request = ThreadRequest::Read(parse_params(params_or_object(params))?);
525 let response = handle_thread_request(state, request).await?;
526 StdioDispatchResult {
527 result: serde_json::to_value(response)
528 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
529 should_exit: false,
530 }
531 }
532 "thread/set_name" | "thread/set-name" => {
533 let request = ThreadRequest::SetName(parse_params(params_or_object(params))?);
534 let response = handle_thread_request(state, request).await?;
535 StdioDispatchResult {
536 result: serde_json::to_value(response)
537 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
538 should_exit: false,
539 }
540 }
541 "thread/archive" => {
542 let parsed: ThreadIdParams = parse_params(params_or_object(params))?;
543 let response = handle_thread_request(
544 state,
545 ThreadRequest::Archive {
546 thread_id: parsed.thread_id,
547 },
548 )
549 .await?;
550 StdioDispatchResult {
551 result: serde_json::to_value(response)
552 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
553 should_exit: false,
554 }
555 }
556 "thread/unarchive" => {
557 let parsed: ThreadIdParams = parse_params(params_or_object(params))?;
558 let response = handle_thread_request(
559 state,
560 ThreadRequest::Unarchive {
561 thread_id: parsed.thread_id,
562 },
563 )
564 .await?;
565 StdioDispatchResult {
566 result: serde_json::to_value(response)
567 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
568 should_exit: false,
569 }
570 }
571 "thread/message" => {
572 let parsed: ThreadMessageParams = parse_params(params_or_object(params))?;
573 let response = handle_thread_request(
574 state,
575 ThreadRequest::Message {
576 thread_id: parsed.thread_id,
577 input: parsed.input,
578 },
579 )
580 .await?;
581 StdioDispatchResult {
582 result: serde_json::to_value(response)
583 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
584 should_exit: false,
585 }
586 }
587 "app/capabilities" => {
588 let response = process_app_request(state, AppRequest::Capabilities).await;
589 StdioDispatchResult {
590 result: serde_json::to_value(response)
591 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
592 should_exit: false,
593 }
594 }
595 "app/request" => {
596 let request: AppRequest = parse_params(params)?;
597 let response = process_app_request(state, request).await;
598 StdioDispatchResult {
599 result: serde_json::to_value(response)
600 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
601 should_exit: false,
602 }
603 }
604 "app/config/get" => {
605 let parsed: ConfigGetParams = parse_params(params_or_object(params))?;
606 let response =
607 process_app_request(state, AppRequest::ConfigGet { key: parsed.key }).await;
608 StdioDispatchResult {
609 result: serde_json::to_value(response)
610 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
611 should_exit: false,
612 }
613 }
614 "app/config/set" => {
615 let parsed: ConfigSetParams = parse_params(params_or_object(params))?;
616 let response = process_app_request(
617 state,
618 AppRequest::ConfigSet {
619 key: parsed.key,
620 value: parsed.value,
621 },
622 )
623 .await;
624 StdioDispatchResult {
625 result: serde_json::to_value(response)
626 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
627 should_exit: false,
628 }
629 }
630 "app/config/unset" => {
631 let parsed: ConfigGetParams = parse_params(params_or_object(params))?;
632 let response =
633 process_app_request(state, AppRequest::ConfigUnset { key: parsed.key }).await;
634 StdioDispatchResult {
635 result: serde_json::to_value(response)
636 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
637 should_exit: false,
638 }
639 }
640 "app/config/list" => {
641 let response = process_app_request(state, AppRequest::ConfigList).await;
642 StdioDispatchResult {
643 result: serde_json::to_value(response)
644 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
645 should_exit: false,
646 }
647 }
648 "app/models" => {
649 let response = process_app_request(state, AppRequest::Models).await;
650 StdioDispatchResult {
651 result: serde_json::to_value(response)
652 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
653 should_exit: false,
654 }
655 }
656 "app/thread_loaded_list" | "app/thread-loaded-list" => {
657 let response = process_app_request(state, AppRequest::ThreadLoadedList).await;
658 StdioDispatchResult {
659 result: serde_json::to_value(response)
660 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
661 should_exit: false,
662 }
663 }
664 "prompt/capabilities" => StdioDispatchResult {
665 result: json!({
666 "methods": ["prompt/request", "prompt/run"]
667 }),
668 should_exit: false,
669 },
670 "prompt/request" | "prompt/run" => {
671 let request: PromptRequest = parse_params(params)?;
672 let response = handle_prompt_request(state, request).await?;
673 StdioDispatchResult {
674 result: serde_json::to_value(response)
675 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
676 should_exit: false,
677 }
678 }
679 "shutdown" => StdioDispatchResult {
680 result: json!({"ok": true, "status": "stopped"}),
681 should_exit: true,
682 },
683 _ => return Err(JsonRpcError::method_not_found(method)),
684 };
685 Ok(outcome)
686}
687
688async fn process_app_request(state: &AppState, req: AppRequest) -> AppResponse {
689 match req {
690 AppRequest::Capabilities => AppResponse {
691 ok: true,
692 data: json!({
693 "routes": ["/thread", "/app", "/prompt", "/tool", "/jobs", "/mcp/startup"],
694 "config": ["get", "set", "unset", "list"],
695 "events": ["response_start", "response_delta", "response_end", "tool_call_start", "tool_call_result", "mcp_startup_update", "mcp_startup_complete"],
696 "transport": "stdio+http",
697 "config_path": state.config_path.as_ref().map(|p| p.display().to_string()),
698 }),
699 events: Vec::new(),
700 },
701 AppRequest::ConfigGet { key } => {
702 let cfg = state.config.read().await;
703 AppResponse {
704 ok: true,
705 data: json!({ "key": key, "value": cfg.get_value(&key) }),
706 events: Vec::new(),
707 }
708 }
709 AppRequest::ConfigSet { key, value } => {
710 let mut cfg = state.config.write().await;
711 let result = cfg.set_value(&key, &value);
712 let ok = result.is_ok();
713 let message = result.err().map(|e| e.to_string());
714 let snapshot = cfg.clone();
715 drop(cfg);
716 let _ = persist_config(state, snapshot).await;
717 AppResponse {
718 ok,
719 data: json!({ "key": key, "value": value, "error": message }),
720 events: Vec::new(),
721 }
722 }
723 AppRequest::ConfigUnset { key } => {
724 let mut cfg = state.config.write().await;
725 let result = cfg.unset_value(&key);
726 let ok = result.is_ok();
727 let message = result.err().map(|e| e.to_string());
728 let snapshot = cfg.clone();
729 drop(cfg);
730 let _ = persist_config(state, snapshot).await;
731 AppResponse {
732 ok,
733 data: json!({ "key": key, "error": message }),
734 events: Vec::new(),
735 }
736 }
737 AppRequest::ConfigList => {
738 let cfg = state.config.read().await;
739 AppResponse {
740 ok: true,
741 data: json!({ "values": cfg.list_values() }),
742 events: Vec::new(),
743 }
744 }
745 AppRequest::Models => AppResponse {
746 ok: true,
747 data: json!({ "models": state.registry.list() }),
748 events: Vec::new(),
749 },
750 AppRequest::ThreadLoadedList => {
751 let mut runtime = state.runtime.lock().await;
752 let response = runtime
753 .handle_thread(codewhale_protocol::ThreadRequest::List(
754 codewhale_protocol::ThreadListParams {
755 include_archived: false,
756 limit: Some(50),
757 },
758 ))
759 .await;
760 match response {
761 Ok(thread_resp) => AppResponse {
762 ok: true,
763 data: json!({ "threads": thread_resp.threads }),
764 events: thread_resp.events,
765 },
766 Err(err) => AppResponse {
767 ok: false,
768 data: json!({ "error": err.to_string() }),
769 events: Vec::new(),
770 },
771 }
772 }
773 }
774}
775
776async fn persist_config(state: &AppState, config: codewhale_config::ConfigToml) -> Result<()> {
777 if state.config_path.is_none() {
778 return Ok(());
779 }
780 let mut store = ConfigStore::load(state.config_path.clone())?;
781 store.config = config;
782 store.save()
783}