1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{Result, bail};
6use axum::extract::{Request, State};
7use axum::http::{HeaderValue, Method, StatusCode, header};
8use axum::middleware::{self, Next};
9use axum::response::{IntoResponse, Response};
10use axum::routing::{get, post};
11use axum::{Json, Router};
12use codewhale_agent::ModelRegistry;
13use codewhale_config::{CliRuntimeOverrides, ConfigStore};
14use codewhale_core::Runtime;
15use codewhale_hooks::{HookDispatcher, JsonlHookSink, StdoutHookSink, UnixSocketHookSink};
16use codewhale_mcp::McpManager;
17use codewhale_protocol::{
18 AppRequest, AppResponse, PromptRequest, PromptResponse, ThreadGoalClearParams,
19 ThreadGoalGetParams, ThreadGoalSetParams, ThreadRequest, ThreadResponse, UserInputAnswerEvent,
20};
21use codewhale_state::StateStore;
22use codewhale_tools::{ToolCall, ToolRegistry};
23use serde::de::DeserializeOwned;
24use serde::{Deserialize, Serialize};
25use serde_json::{Value, json};
26use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
27use tokio::sync::{Mutex, RwLock};
28use tower_http::cors::CorsLayer;
29use uuid::Uuid;
30
/// Answers submitted for a single user-input request.
///
/// Used as the value type of `AppState::pending_user_input`, where entries are
/// keyed by the request id supplied with `AppRequest::SubmitUserInput`.
type PendingUserInputAnswers = Vec<UserInputAnswerEvent>;
40
41mod chat_completions;
42
/// Origins that are always allowed by the CORS layer.
///
/// `cors_layer` starts from this list (loopback `localhost` / `127.0.0.1`
/// origins plus `tauri://localhost`) and appends any extra origins passed by
/// the caller.
const DEFAULT_CORS_ORIGINS: &[&str] = &[
    "http://localhost",
    "http://localhost:1420",
    "http://localhost:3000",
    "http://localhost:5173",
    "http://127.0.0.1",
    "http://127.0.0.1:1420",
    "tauri://localhost",
];
52
/// Options controlling how the HTTP app server is started by [`run`].
#[derive(Clone)]
pub struct AppServerOptions {
    /// Socket address the HTTP listener binds to.
    pub listen: SocketAddr,
    /// Optional path of the configuration file to load.
    pub config_path: Option<PathBuf>,
    /// Bearer token supplied by the operator, if any.
    pub auth_token: Option<String>,
    /// When set, HTTP authentication is disabled (only accepted on loopback binds).
    pub insecure_no_auth: bool,
    /// Additional CORS origins allowed on top of the built-in defaults.
    pub cors_origins: Vec<String>,
}

/// Hand-written so that the bearer token never ends up in debug output: a
/// present token is rendered as `Some("<redacted>")`.
impl std::fmt::Debug for AppServerOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            listen,
            config_path,
            auth_token,
            insecure_no_auth,
            cors_origins,
        } = self;
        // Keep the Some/None distinction visible while hiding the secret itself.
        let masked_token = auth_token.as_ref().map(|_| "<redacted>");

        let mut out = f.debug_struct("AppServerOptions");
        out.field("listen", listen);
        out.field("config_path", config_path);
        out.field("auth_token", &masked_token);
        out.field("insecure_no_auth", insecure_no_auth);
        out.field("cors_origins", cors_origins);
        out.finish()
    }
}
76
/// Shared state handed to every HTTP handler and to the stdio dispatcher.
///
/// Cloning is cheap for the lock-protected fields: they are `Arc`s pointing at
/// the same underlying data.
#[derive(Clone)]
struct AppState {
    /// Path the configuration was loaded from; `persist_config` is a no-op when `None`.
    config_path: Option<PathBuf>,
    /// In-memory configuration read and mutated by the `app/config/*` requests.
    config: Arc<RwLock<codewhale_config::ConfigToml>>,
    /// The runtime; handlers lock it for the duration of each call.
    runtime: Arc<Mutex<Runtime>>,
    /// Model registry listed by `AppRequest::Models`.
    registry: ModelRegistry,
    /// Expected bearer token for protected HTTP routes; `None` disables the check.
    auth_token: Option<String>,
    /// Answers recorded by `AppRequest::SubmitUserInput`, keyed by request id.
    pending_user_input: Arc<Mutex<std::collections::HashMap<String, PendingUserInputAnswers>>>,
}
89
/// JSON body accepted by the `/tool` route.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ToolCallRequest {
    /// The tool invocation to run.
    call: ToolCall,
    /// Working directory for the call; when omitted, `tool_handler` falls back
    /// to the process's current directory (or `.` if that cannot be read).
    #[serde(default)]
    cwd: Option<PathBuf>,
}
96
/// One line-delimited JSON-RPC request read from stdin by `run_stdio`.
#[derive(Debug, Deserialize)]
struct JsonRpcRequest {
    /// Protocol version; optional, but rejected by `run_stdio` when present and not `"2.0"`.
    #[serde(default)]
    jsonrpc: Option<String>,
    /// Request id echoed back in the response (`null` when absent).
    #[serde(default)]
    id: Option<Value>,
    /// Method name routed by `dispatch_stdio_request`.
    method: String,
    /// Method parameters; defaults to the default `Value` when the field is missing.
    #[serde(default)]
    params: Value,
}
107
/// Error payload serialized by `jsonrpc_error` into the `error` member of a
/// JSON-RPC response.
#[derive(Debug)]
struct JsonRpcError {
    /// Numeric error code (the constructors in this file use -32700 … -32603).
    code: i64,
    /// Human-readable description.
    message: String,
    /// Optional extra data; every constructor in this file leaves it `None`.
    data: Option<Value>,
}
114
/// Successful outcome of dispatching one stdio JSON-RPC method.
#[derive(Debug)]
struct StdioDispatchResult {
    /// Value placed in the `result` member of the JSON-RPC response.
    result: Value,
    /// When true, `run_stdio` stops its read loop after sending the response.
    should_exit: bool,
}
120
/// Transport an `AppRequest` arrived on.
///
/// `process_app_request` uses it to pick between `get_display_value` (HTTP)
/// and `get_value` (stdio) when answering `ConfigGet`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AppTransport {
    /// Request came in through the HTTP `/app` route.
    Http,
    /// Request came in through the stdio JSON-RPC loop.
    Stdio,
}
126
/// Parameters of the stdio `app/config/get` and `app/config/unset` methods.
#[derive(Debug, Deserialize)]
struct ConfigGetParams {
    /// Configuration key to look up or remove.
    key: String,
}
131
/// Parameters of the stdio `app/config/set` method.
#[derive(Debug, Deserialize)]
struct ConfigSetParams {
    /// Configuration key to assign.
    key: String,
    /// New value, passed to the config as a string.
    value: String,
}
137
/// Parameters of the stdio `thread/archive` and `thread/unarchive` methods.
#[derive(Debug, Deserialize)]
struct ThreadIdParams {
    /// Identifier of the thread to act on.
    thread_id: String,
}
142
/// Parameters of the stdio `thread/message` method.
#[derive(Debug, Deserialize)]
struct ThreadMessageParams {
    /// Identifier of the thread receiving the message.
    thread_id: String,
    /// Message text forwarded as `ThreadRequest::Message::input`.
    input: String,
}
148
149pub async fn run(options: AppServerOptions) -> Result<()> {
150 let auth_token = resolve_auth_token(&options)?;
151 let state = build_state(options.config_path.clone(), auth_token)?;
152 let app = app_router(state, &options.cors_origins);
153
154 let listener = tokio::net::TcpListener::bind(options.listen).await?;
155 axum::serve(listener, app).await?;
156 Ok(())
157}
158
159fn app_router(state: AppState, cors_origins: &[String]) -> Router {
160 let protected_routes = Router::new()
161 .route("/thread", post(thread_handler))
162 .route("/app", post(app_handler))
163 .route("/prompt", post(prompt_handler))
164 .route("/tool", post(tool_handler))
165 .route("/jobs", get(jobs_handler))
166 .route("/mcp/startup", post(mcp_startup_handler))
167 .route_layer(middleware::from_fn_with_state(
168 state.clone(),
169 require_app_server_token,
170 ));
171
172 Router::new()
173 .route("/healthz", get(healthz))
174 .route(
175 "/v1/chat/completions",
176 post(chat_completions::chat_completions_handler),
177 )
178 .merge(protected_routes)
179 .layer(cors_layer(cors_origins))
180 .with_state(state)
181}
182
183pub async fn run_stdio(config_path: Option<PathBuf>) -> Result<()> {
184 let state = build_state(config_path, None)?;
185 let stdin = tokio::io::stdin();
186 let stdout = tokio::io::stdout();
187 let mut reader = BufReader::new(stdin).lines();
188 let mut writer = tokio::io::BufWriter::new(stdout);
189 while let Some(line) = reader.next_line().await? {
190 if line.trim().is_empty() {
191 continue;
192 }
193
194 let request: JsonRpcRequest = match serde_json::from_str(&line) {
195 Ok(value) => value,
196 Err(err) => {
197 let response = jsonrpc_error(
198 None,
199 JsonRpcError::parse_error(format!("invalid json: {err}")),
200 );
201 writer.write_all(response.to_string().as_bytes()).await?;
202 writer.write_all(b"\n").await?;
203 writer.flush().await?;
204 continue;
205 }
206 };
207
208 if request
209 .jsonrpc
210 .as_deref()
211 .is_some_and(|version| version != "2.0")
212 {
213 let response = jsonrpc_error(
214 request.id,
215 JsonRpcError::invalid_request("jsonrpc version must be 2.0"),
216 );
217 writer.write_all(response.to_string().as_bytes()).await?;
218 writer.write_all(b"\n").await?;
219 writer.flush().await?;
220 continue;
221 }
222
223 let response = match dispatch_stdio_request(&state, &request.method, request.params).await {
224 Ok(dispatch) => {
225 let encoded = jsonrpc_result(request.id, dispatch.result);
226 writer.write_all(encoded.to_string().as_bytes()).await?;
227 writer.write_all(b"\n").await?;
228 writer.flush().await?;
229 if dispatch.should_exit {
230 break;
231 }
232 continue;
233 }
234 Err(err) => jsonrpc_error(request.id, err),
235 };
236
237 writer.write_all(response.to_string().as_bytes()).await?;
238 writer.write_all(b"\n").await?;
239 writer.flush().await?;
240 }
241
242 Ok(())
243}
244
245async fn healthz() -> Json<Value> {
246 Json(json!({
247 "status": "ok",
248 "protocol": "v2",
249 "service": "deepseek-app-server"
250 }))
251}
252
253async fn thread_handler(
254 State(state): State<AppState>,
255 Json(req): Json<ThreadRequest>,
256) -> Json<ThreadResponse> {
257 let mut runtime = state.runtime.lock().await;
258 match runtime.handle_thread(req).await {
259 Ok(res) => Json(res),
260 Err(err) => Json(ThreadResponse {
261 thread_id: "error".to_string(),
262 status: format!("error:{err}"),
263 thread: None,
264 threads: Vec::new(),
265 goal: None,
266 model: None,
267 model_provider: None,
268 cwd: None,
269 approval_policy: None,
270 sandbox: None,
271 events: Vec::new(),
272 data: json!({}),
273 }),
274 }
275}
276
277async fn prompt_handler(
278 State(state): State<AppState>,
279 Json(req): Json<PromptRequest>,
280) -> Json<PromptResponse> {
281 let mut runtime = state.runtime.lock().await;
282 let overrides = CliRuntimeOverrides::default();
283 match runtime.handle_prompt(req, &overrides).await {
284 Ok(res) => Json(res),
285 Err(err) => Json(PromptResponse {
286 output: err.to_string(),
287 model: "unknown".to_string(),
288 events: Vec::new(),
289 }),
290 }
291}
292
293async fn tool_handler(
294 State(state): State<AppState>,
295 Json(req): Json<ToolCallRequest>,
296) -> Json<Value> {
297 let runtime = state.runtime.lock().await;
298 let cwd = req
299 .cwd
300 .unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")));
301 let approval_mode = {
303 let cfg = state.config.read().await;
304 cfg.approval_policy
305 .as_deref()
306 .and_then(|p| match p.trim().to_ascii_lowercase().as_str() {
307 "auto" | "yolo" => Some(codewhale_execpolicy::AskForApproval::UnlessTrusted),
308 "never" | "deny" => Some(codewhale_execpolicy::AskForApproval::Never),
309 _ => None,
310 })
311 .unwrap_or(codewhale_execpolicy::AskForApproval::OnRequest)
312 };
313 match runtime.invoke_tool(req.call, approval_mode, &cwd).await {
314 Ok(value) => Json(value),
315 Err(err) => Json(json!({ "ok": false, "error": err.to_string() })),
316 }
317}
318
319async fn jobs_handler(State(state): State<AppState>) -> Json<AppResponse> {
320 let runtime = state.runtime.lock().await;
321 Json(runtime.app_status())
322}
323
324async fn mcp_startup_handler(State(state): State<AppState>) -> Json<Value> {
325 let runtime = state.runtime.lock().await;
326 let summary = runtime.mcp_startup().await;
327 Json(json!({
328 "ok": true,
329 "summary": summary
330 }))
331}
332
333async fn app_handler(
334 State(state): State<AppState>,
335 Json(req): Json<AppRequest>,
336) -> Json<AppResponse> {
337 Json(process_app_request(&state, req, AppTransport::Http).await)
338}
339
340fn build_state(config_path: Option<PathBuf>, auth_token: Option<String>) -> Result<AppState> {
341 let store = ConfigStore::load(config_path.clone())?;
342 let config = store.config.clone();
343 let exec_policy = store.exec_policy_engine();
344 let registry = ModelRegistry::default();
345
346 let state_db_path = config_path
347 .as_ref()
348 .and_then(|p| p.parent().map(|parent| parent.join("state.db")));
349 let state_store = StateStore::open(state_db_path)?;
350
351 let mut hooks = HookDispatcher::default();
352 hooks.add_sink(Arc::new(StdoutHookSink));
353 let hook_log_path = config_path
354 .as_ref()
355 .and_then(|p| p.parent().map(|parent| parent.join("events.jsonl")))
356 .unwrap_or_else(|| PathBuf::from(".deepseek/events.jsonl"));
357 hooks.add_sink(Arc::new(JsonlHookSink::new(hook_log_path)));
358
359 if let Some(socket_path) = config
360 .hook_sinks
361 .as_ref()
362 .and_then(|sinks| sinks.unix_socket_path.as_ref())
363 .filter(|path| !path.as_os_str().is_empty())
364 {
365 hooks.add_sink(Arc::new(UnixSocketHookSink::new(socket_path.clone())));
366 }
367
368 let runtime = Runtime::new(
369 config.clone(),
370 registry.clone(),
371 state_store,
372 Arc::new(ToolRegistry::default()),
373 Arc::new(McpManager::default()),
374 exec_policy,
375 hooks,
376 );
377
378 Ok(AppState {
379 config_path,
380 config: Arc::new(RwLock::new(config)),
381 runtime: Arc::new(Mutex::new(runtime)),
382 registry,
383 auth_token,
384 pending_user_input: Arc::new(Mutex::new(std::collections::HashMap::new())),
385 })
386}
387
388fn resolve_auth_token(options: &AppServerOptions) -> Result<Option<String>> {
389 let configured = options.auth_token.as_ref().map(|token| token.trim());
390 if let Some(token) = configured
391 && token.is_empty()
392 {
393 bail!("app-server auth token cannot be empty");
394 }
395
396 if options.insecure_no_auth {
397 if !options.listen.ip().is_loopback() {
398 bail!("refusing unauthenticated app-server bind on non-loopback address");
399 }
400 eprintln!("warning: app-server HTTP auth disabled by --insecure-no-auth");
401 return Ok(None);
402 }
403
404 let token = configured
405 .map(str::to_string)
406 .unwrap_or_else(|| format!("cwapp_{}", Uuid::new_v4().simple()));
407 if options.auth_token.is_some() {
408 eprintln!("app-server auth: bearer token required for HTTP routes.");
409 } else {
410 eprintln!("app-server auth: generated bearer token for this process.");
411 eprintln!(" Authorization: Bearer {token}");
412 eprintln!(" Pass --auth-token or set CODEWHALE_APP_SERVER_TOKEN for a stable token.");
413 }
414 Ok(Some(token))
415}
416
417fn cors_layer(extra_origins: &[String]) -> CorsLayer {
418 let mut origins: Vec<HeaderValue> = DEFAULT_CORS_ORIGINS
419 .iter()
420 .filter_map(|origin| HeaderValue::from_str(origin).ok())
421 .collect();
422 for raw in extra_origins {
423 let trimmed = raw.trim();
424 if trimmed.is_empty() {
425 continue;
426 }
427 match HeaderValue::from_str(trimmed) {
428 Ok(value) if !origins.contains(&value) => origins.push(value),
429 Ok(_) => {}
430 Err(err) => {
431 eprintln!("warning: ignoring invalid app-server CORS origin `{trimmed}`: {err}")
432 }
433 }
434 }
435
436 CorsLayer::new()
437 .allow_origin(origins)
438 .allow_methods([Method::GET, Method::POST, Method::OPTIONS])
439 .allow_headers([header::AUTHORIZATION, header::CONTENT_TYPE])
440}
441
442async fn require_app_server_token(
443 State(state): State<AppState>,
444 req: Request,
445 next: Next,
446) -> Response {
447 let Some(expected) = state.auth_token.as_deref() else {
448 return next.run(req).await;
449 };
450 let authorized = req
451 .headers()
452 .get(header::AUTHORIZATION)
453 .and_then(|value| value.to_str().ok())
454 .and_then(|raw| raw.strip_prefix("Bearer "))
455 .is_some_and(|token| token == expected);
456
457 if authorized {
458 next.run(req).await
459 } else {
460 (
461 StatusCode::UNAUTHORIZED,
462 Json(json!({
463 "error": {
464 "message": "app-server bearer token required",
465 "status": StatusCode::UNAUTHORIZED.as_u16(),
466 }
467 })),
468 )
469 .into_response()
470 }
471}
472
473fn params_or_object(params: Value) -> Value {
474 if params.is_null() { json!({}) } else { params }
475}
476
477fn parse_params<T: DeserializeOwned>(params: Value) -> std::result::Result<T, JsonRpcError> {
478 serde_json::from_value(params).map_err(|err| JsonRpcError::invalid_params(err.to_string()))
479}
480
481fn jsonrpc_result(id: Option<Value>, result: Value) -> Value {
482 json!({
483 "jsonrpc": "2.0",
484 "id": id.unwrap_or(Value::Null),
485 "result": result
486 })
487}
488
489fn jsonrpc_error(id: Option<Value>, err: JsonRpcError) -> Value {
490 json!({
491 "jsonrpc": "2.0",
492 "id": id.unwrap_or(Value::Null),
493 "error": {
494 "code": err.code,
495 "message": err.message,
496 "data": err.data
497 }
498 })
499}
500
501impl JsonRpcError {
502 fn parse_error(message: impl Into<String>) -> Self {
503 Self {
504 code: -32700,
505 message: message.into(),
506 data: None,
507 }
508 }
509
510 fn invalid_request(message: impl Into<String>) -> Self {
511 Self {
512 code: -32600,
513 message: message.into(),
514 data: None,
515 }
516 }
517
518 fn method_not_found(method: &str) -> Self {
519 Self {
520 code: -32601,
521 message: format!("unsupported method: {method}"),
522 data: None,
523 }
524 }
525
526 fn invalid_params(message: impl Into<String>) -> Self {
527 Self {
528 code: -32602,
529 message: message.into(),
530 data: None,
531 }
532 }
533
534 fn internal(message: impl Into<String>) -> Self {
535 Self {
536 code: -32603,
537 message: message.into(),
538 data: None,
539 }
540 }
541}
542
543async fn handle_thread_request(
544 state: &AppState,
545 req: ThreadRequest,
546) -> std::result::Result<ThreadResponse, JsonRpcError> {
547 let mut runtime = state.runtime.lock().await;
548 runtime
549 .handle_thread(req)
550 .await
551 .map_err(|err| JsonRpcError::internal(err.to_string()))
552}
553
554async fn handle_prompt_request(
555 state: &AppState,
556 req: PromptRequest,
557) -> std::result::Result<PromptResponse, JsonRpcError> {
558 let mut runtime = state.runtime.lock().await;
559 runtime
560 .handle_prompt(req, &CliRuntimeOverrides::default())
561 .await
562 .map_err(|err| JsonRpcError::internal(err.to_string()))
563}
564
565async fn dispatch_stdio_request(
566 state: &AppState,
567 method: &str,
568 params: Value,
569) -> std::result::Result<StdioDispatchResult, JsonRpcError> {
570 let outcome = match method {
571 "healthz" | "app/healthz" => StdioDispatchResult {
572 result: json!({
573 "status": "ok",
574 "service": "deepseek-app-server",
575 "transport": "stdio"
576 }),
577 should_exit: false,
578 },
579 "capabilities" => StdioDispatchResult {
580 result: json!({
581 "transport": "stdio",
582 "families": ["thread/*", "app/*", "prompt/*"],
583 "methods": [
584 "healthz",
585 "thread/capabilities",
586 "thread/request",
587 "thread/create",
588 "thread/start",
589 "thread/resume",
590 "thread/fork",
591 "thread/list",
592 "thread/read",
593 "thread/set_name",
594 "thread/goal/set",
595 "thread/goal/get",
596 "thread/goal/clear",
597 "thread/archive",
598 "thread/unarchive",
599 "thread/message",
600 "app/capabilities",
601 "app/request",
602 "app/config/get",
603 "app/config/set",
604 "app/config/unset",
605 "app/config/list",
606 "app/models",
607 "app/thread_loaded_list",
608 "prompt/capabilities",
609 "prompt/request",
610 "prompt/run",
611 "shutdown"
612 ]
613 }),
614 should_exit: false,
615 },
616 "thread/capabilities" => StdioDispatchResult {
617 result: json!({
618 "methods": [
619 "thread/request",
620 "thread/create",
621 "thread/start",
622 "thread/resume",
623 "thread/fork",
624 "thread/list",
625 "thread/read",
626 "thread/set_name",
627 "thread/goal/set",
628 "thread/goal/get",
629 "thread/goal/clear",
630 "thread/archive",
631 "thread/unarchive",
632 "thread/message"
633 ]
634 }),
635 should_exit: false,
636 },
637 "thread/request" => {
638 let request: ThreadRequest = parse_params(params)?;
639 let response = handle_thread_request(state, request).await?;
640 StdioDispatchResult {
641 result: serde_json::to_value(response)
642 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
643 should_exit: false,
644 }
645 }
646 "thread/create" => {
647 #[derive(Debug, Deserialize)]
648 struct CreateParams {
649 #[serde(default)]
650 metadata: Value,
651 }
652 let parsed: CreateParams = parse_params(params_or_object(params))?;
653 let response = handle_thread_request(
654 state,
655 ThreadRequest::Create {
656 metadata: parsed.metadata,
657 },
658 )
659 .await?;
660 StdioDispatchResult {
661 result: serde_json::to_value(response)
662 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
663 should_exit: false,
664 }
665 }
666 "thread/start" => {
667 let request = ThreadRequest::Start(parse_params(params_or_object(params))?);
668 let response = handle_thread_request(state, request).await?;
669 StdioDispatchResult {
670 result: serde_json::to_value(response)
671 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
672 should_exit: false,
673 }
674 }
675 "thread/resume" => {
676 let request = ThreadRequest::Resume(parse_params(params_or_object(params))?);
677 let response = handle_thread_request(state, request).await?;
678 StdioDispatchResult {
679 result: serde_json::to_value(response)
680 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
681 should_exit: false,
682 }
683 }
684 "thread/fork" => {
685 let request = ThreadRequest::Fork(parse_params(params_or_object(params))?);
686 let response = handle_thread_request(state, request).await?;
687 StdioDispatchResult {
688 result: serde_json::to_value(response)
689 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
690 should_exit: false,
691 }
692 }
693 "thread/list" => {
694 let request = ThreadRequest::List(parse_params(params_or_object(params))?);
695 let response = handle_thread_request(state, request).await?;
696 StdioDispatchResult {
697 result: serde_json::to_value(response)
698 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
699 should_exit: false,
700 }
701 }
702 "thread/read" => {
703 let request = ThreadRequest::Read(parse_params(params_or_object(params))?);
704 let response = handle_thread_request(state, request).await?;
705 StdioDispatchResult {
706 result: serde_json::to_value(response)
707 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
708 should_exit: false,
709 }
710 }
711 "thread/set_name" | "thread/set-name" => {
712 let request = ThreadRequest::SetName(parse_params(params_or_object(params))?);
713 let response = handle_thread_request(state, request).await?;
714 StdioDispatchResult {
715 result: serde_json::to_value(response)
716 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
717 should_exit: false,
718 }
719 }
720 "thread/goal/set" | "thread/goal_set" | "thread/goal-set" => {
721 let request = ThreadRequest::GoalSet(parse_params::<ThreadGoalSetParams>(
722 params_or_object(params),
723 )?);
724 let response = handle_thread_request(state, request).await?;
725 StdioDispatchResult {
726 result: serde_json::to_value(response)
727 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
728 should_exit: false,
729 }
730 }
731 "thread/goal/get" | "thread/goal_get" | "thread/goal-get" => {
732 let request = ThreadRequest::GoalGet(parse_params::<ThreadGoalGetParams>(
733 params_or_object(params),
734 )?);
735 let response = handle_thread_request(state, request).await?;
736 StdioDispatchResult {
737 result: serde_json::to_value(response)
738 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
739 should_exit: false,
740 }
741 }
742 "thread/goal/clear" | "thread/goal_clear" | "thread/goal-clear" => {
743 let request = ThreadRequest::GoalClear(parse_params::<ThreadGoalClearParams>(
744 params_or_object(params),
745 )?);
746 let response = handle_thread_request(state, request).await?;
747 StdioDispatchResult {
748 result: serde_json::to_value(response)
749 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
750 should_exit: false,
751 }
752 }
753 "thread/archive" => {
754 let parsed: ThreadIdParams = parse_params(params_or_object(params))?;
755 let response = handle_thread_request(
756 state,
757 ThreadRequest::Archive {
758 thread_id: parsed.thread_id,
759 },
760 )
761 .await?;
762 StdioDispatchResult {
763 result: serde_json::to_value(response)
764 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
765 should_exit: false,
766 }
767 }
768 "thread/unarchive" => {
769 let parsed: ThreadIdParams = parse_params(params_or_object(params))?;
770 let response = handle_thread_request(
771 state,
772 ThreadRequest::Unarchive {
773 thread_id: parsed.thread_id,
774 },
775 )
776 .await?;
777 StdioDispatchResult {
778 result: serde_json::to_value(response)
779 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
780 should_exit: false,
781 }
782 }
783 "thread/message" => {
784 let parsed: ThreadMessageParams = parse_params(params_or_object(params))?;
785 let response = handle_thread_request(
786 state,
787 ThreadRequest::Message {
788 thread_id: parsed.thread_id,
789 input: parsed.input,
790 },
791 )
792 .await?;
793 StdioDispatchResult {
794 result: serde_json::to_value(response)
795 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
796 should_exit: false,
797 }
798 }
799 "app/capabilities" => {
800 let response =
801 process_app_request(state, AppRequest::Capabilities, AppTransport::Stdio).await;
802 StdioDispatchResult {
803 result: serde_json::to_value(response)
804 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
805 should_exit: false,
806 }
807 }
808 "app/request" => {
809 let request: AppRequest = parse_params(params)?;
810 let response = process_app_request(state, request, AppTransport::Stdio).await;
811 StdioDispatchResult {
812 result: serde_json::to_value(response)
813 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
814 should_exit: false,
815 }
816 }
817 "app/config/get" => {
818 let parsed: ConfigGetParams = parse_params(params_or_object(params))?;
819 let response = process_app_request(
820 state,
821 AppRequest::ConfigGet { key: parsed.key },
822 AppTransport::Stdio,
823 )
824 .await;
825 StdioDispatchResult {
826 result: serde_json::to_value(response)
827 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
828 should_exit: false,
829 }
830 }
831 "app/config/set" => {
832 let parsed: ConfigSetParams = parse_params(params_or_object(params))?;
833 let response = process_app_request(
834 state,
835 AppRequest::ConfigSet {
836 key: parsed.key,
837 value: parsed.value,
838 },
839 AppTransport::Stdio,
840 )
841 .await;
842 StdioDispatchResult {
843 result: serde_json::to_value(response)
844 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
845 should_exit: false,
846 }
847 }
848 "app/config/unset" => {
849 let parsed: ConfigGetParams = parse_params(params_or_object(params))?;
850 let response = process_app_request(
851 state,
852 AppRequest::ConfigUnset { key: parsed.key },
853 AppTransport::Stdio,
854 )
855 .await;
856 StdioDispatchResult {
857 result: serde_json::to_value(response)
858 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
859 should_exit: false,
860 }
861 }
862 "app/config/list" => {
863 let response =
864 process_app_request(state, AppRequest::ConfigList, AppTransport::Stdio).await;
865 StdioDispatchResult {
866 result: serde_json::to_value(response)
867 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
868 should_exit: false,
869 }
870 }
871 "app/models" => {
872 let response =
873 process_app_request(state, AppRequest::Models, AppTransport::Stdio).await;
874 StdioDispatchResult {
875 result: serde_json::to_value(response)
876 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
877 should_exit: false,
878 }
879 }
880 "app/thread_loaded_list" | "app/thread-loaded-list" => {
881 let response =
882 process_app_request(state, AppRequest::ThreadLoadedList, AppTransport::Stdio).await;
883 StdioDispatchResult {
884 result: serde_json::to_value(response)
885 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
886 should_exit: false,
887 }
888 }
889 "prompt/capabilities" => StdioDispatchResult {
890 result: json!({
891 "methods": ["prompt/request", "prompt/run"]
892 }),
893 should_exit: false,
894 },
895 "prompt/request" | "prompt/run" => {
896 let request: PromptRequest = parse_params(params)?;
897 let response = handle_prompt_request(state, request).await?;
898 StdioDispatchResult {
899 result: serde_json::to_value(response)
900 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
901 should_exit: false,
902 }
903 }
904 "shutdown" => StdioDispatchResult {
905 result: json!({"ok": true, "status": "stopped"}),
906 should_exit: true,
907 },
908 _ => return Err(JsonRpcError::method_not_found(method)),
909 };
910 Ok(outcome)
911}
912
913async fn process_app_request(
914 state: &AppState,
915 req: AppRequest,
916 transport: AppTransport,
917) -> AppResponse {
918 match req {
919 AppRequest::Capabilities => AppResponse {
920 ok: true,
921 data: json!({
922 "routes": ["/thread", "/app", "/prompt", "/tool", "/jobs", "/mcp/startup"],
923 "config": ["get", "set", "unset", "list"],
924 "events": ["response_start", "response_delta", "response_end", "tool_call_start", "tool_call_result", "mcp_startup_update", "mcp_startup_complete"],
925 "transport": "stdio+http",
926 "config_path": state.config_path.as_ref().map(|p| p.display().to_string()),
927 }),
928 events: Vec::new(),
929 },
930 AppRequest::ConfigGet { key } => {
931 let cfg = state.config.read().await;
932 let value = match transport {
933 AppTransport::Http => cfg.get_display_value(&key),
934 AppTransport::Stdio => cfg.get_value(&key),
935 };
936 AppResponse {
937 ok: true,
938 data: json!({ "key": key, "value": value }),
939 events: Vec::new(),
940 }
941 }
942 AppRequest::ConfigSet { key, value } => {
943 let mut cfg = state.config.write().await;
944 let result = cfg.set_value(&key, &value);
945 let ok = result.is_ok();
946 let message = result.err().map(|e| e.to_string());
947 let snapshot = cfg.clone();
948 drop(cfg);
949 if let Err(e) = persist_config(state, snapshot).await {
950 tracing::error!("Failed to persist config after set: {e}");
951 }
952 AppResponse {
953 ok,
954 data: json!({ "key": key, "value": value, "error": message }),
955 events: Vec::new(),
956 }
957 }
958 AppRequest::ConfigUnset { key } => {
959 let mut cfg = state.config.write().await;
960 let result = cfg.unset_value(&key);
961 let ok = result.is_ok();
962 let message = result.err().map(|e| e.to_string());
963 let snapshot = cfg.clone();
964 drop(cfg);
965 if let Err(e) = persist_config(state, snapshot).await {
966 tracing::error!("Failed to persist config after unset: {e}");
967 }
968 AppResponse {
969 ok,
970 data: json!({ "key": key, "error": message }),
971 events: Vec::new(),
972 }
973 }
974 AppRequest::ConfigList => {
975 let cfg = state.config.read().await;
976 AppResponse {
977 ok: true,
978 data: json!({ "values": cfg.list_values() }),
979 events: Vec::new(),
980 }
981 }
982 AppRequest::Models => AppResponse {
983 ok: true,
984 data: json!({ "models": state.registry.list() }),
985 events: Vec::new(),
986 },
987 AppRequest::ThreadLoadedList => {
988 let mut runtime = state.runtime.lock().await;
989 let response = runtime
990 .handle_thread(codewhale_protocol::ThreadRequest::List(
991 codewhale_protocol::ThreadListParams {
992 include_archived: false,
993 limit: Some(50),
994 },
995 ))
996 .await;
997 match response {
998 Ok(thread_resp) => AppResponse {
999 ok: true,
1000 data: json!({ "threads": thread_resp.threads }),
1001 events: thread_resp.events,
1002 },
1003 Err(err) => AppResponse {
1004 ok: false,
1005 data: json!({ "error": err.to_string() }),
1006 events: Vec::new(),
1007 },
1008 }
1009 }
1010 AppRequest::SubmitUserInput {
1011 request_id,
1012 answers,
1013 } => {
1014 let mut pending = state.pending_user_input.lock().await;
1020 if pending.contains_key(&request_id) {
1021 return AppResponse {
1022 ok: false,
1023 data: json!({
1024 "error": "request_id already resolved",
1025 "request_id": request_id,
1026 }),
1027 events: Vec::new(),
1028 };
1029 }
1030 pending.insert(request_id.clone(), answers);
1031 AppResponse {
1032 ok: true,
1033 data: json!({ "request_id": request_id, "resolved": true }),
1034 events: Vec::new(),
1035 }
1036 }
1037 }
1038}
1039
1040async fn persist_config(state: &AppState, config: codewhale_config::ConfigToml) -> Result<()> {
1041 if state.config_path.is_none() {
1042 return Ok(());
1043 }
1044 let mut store = ConfigStore::load(state.config_path.clone())?;
1045 store.config = config;
1046 store.save()
1047}
1048
1049#[cfg(test)]
1050mod tests {
1051 use super::*;
1052 use axum::body::{Body, to_bytes};
1053 use codewhale_protocol::AppRequest;
1054 use std::fs;
1055 use tower::ServiceExt;
1056
1057 fn app_with_config(auth_token: Option<&str>) -> (Router, tempfile::TempDir) {
1058 let tmp = tempfile::tempdir().expect("tempdir");
1059 let config_path = tmp.path().join("config.toml");
1060 fs::write(&config_path, "api_key = \"sk-deepseek-secret\"\n").expect("write config");
1061 let state = build_state(
1062 Some(config_path),
1063 auth_token.map(std::string::ToString::to_string),
1064 )
1065 .expect("state");
1066 (app_router(state, &[]), tmp)
1067 }
1068
1069 async fn response_body_json(response: Response) -> Value {
1070 let bytes = to_bytes(response.into_body(), usize::MAX)
1071 .await
1072 .expect("body bytes");
1073 serde_json::from_slice(&bytes).expect("json response")
1074 }
1075
1076 #[tokio::test]
1077 async fn http_app_routes_require_bearer_token_when_auth_enabled() {
1078 let (app, _tmp) = app_with_config(Some("test-token"));
1079 let response = app
1080 .oneshot(
1081 Request::builder()
1082 .method(Method::POST)
1083 .uri("/app")
1084 .header(header::CONTENT_TYPE, "application/json")
1085 .body(Body::from(
1086 serde_json::to_vec(&AppRequest::ConfigGet {
1087 key: "api_key".to_string(),
1088 })
1089 .expect("request json"),
1090 ))
1091 .expect("request"),
1092 )
1093 .await
1094 .expect("response");
1095
1096 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
1097 }
1098
1099 #[tokio::test]
1100 async fn http_config_get_redacts_sensitive_values_after_auth() {
1101 let (app, _tmp) = app_with_config(Some("test-token"));
1102 let response = app
1103 .oneshot(
1104 Request::builder()
1105 .method(Method::POST)
1106 .uri("/app")
1107 .header(header::AUTHORIZATION, "Bearer test-token")
1108 .header(header::CONTENT_TYPE, "application/json")
1109 .body(Body::from(
1110 serde_json::to_vec(&AppRequest::ConfigGet {
1111 key: "api_key".to_string(),
1112 })
1113 .expect("request json"),
1114 ))
1115 .expect("request"),
1116 )
1117 .await
1118 .expect("response");
1119
1120 assert_eq!(response.status(), StatusCode::OK);
1121 let body = response_body_json(response).await;
1122 assert_eq!(body["data"]["value"], "sk-d***cret");
1123 }
1124
1125 #[tokio::test]
1126 async fn cors_does_not_allow_arbitrary_origins() {
1127 let (app, _tmp) = app_with_config(Some("test-token"));
1128 let response = app
1129 .oneshot(
1130 Request::builder()
1131 .method(Method::GET)
1132 .uri("/healthz")
1133 .header(header::ORIGIN, "https://attacker.example")
1134 .body(Body::empty())
1135 .expect("request"),
1136 )
1137 .await
1138 .expect("response");
1139
1140 assert_eq!(response.status(), StatusCode::OK);
1141 assert!(
1142 response
1143 .headers()
1144 .get(header::ACCESS_CONTROL_ALLOW_ORIGIN)
1145 .is_none()
1146 );
1147 }
1148
1149 #[tokio::test]
1150 async fn build_state_loads_permissions_into_runtime_policy() {
1151 let tmp = tempfile::tempdir().expect("tempdir");
1152 let config_path = tmp.path().join("config.toml");
1153 fs::write(&config_path, "api_key = \"sk-deepseek-secret\"\n").expect("write config");
1154 fs::write(
1155 tmp.path().join("permissions.toml"),
1156 r#"
1157 [[rules]]
1158 tool = "exec_shell"
1159 command = "cargo test"
1160 "#,
1161 )
1162 .expect("write permissions");
1163
1164 let state = build_state(Some(config_path), None).expect("state");
1165 let runtime = state.runtime.lock().await;
1166 let decision = runtime
1167 .exec_policy
1168 .check(codewhale_execpolicy::ExecPolicyContext {
1169 command: "cargo test --workspace",
1170 cwd: "/workspace",
1171 tool: Some("exec_shell"),
1172 path: None,
1173 ask_for_approval: codewhale_execpolicy::AskForApproval::UnlessTrusted,
1174 sandbox_mode: Some("workspace-write"),
1175 })
1176 .expect("policy check");
1177
1178 assert!(decision.allow);
1179 assert!(decision.requires_approval);
1180 assert_eq!(
1181 decision.matched_rule.as_deref(),
1182 Some("tool=exec_shell command=cargo test")
1183 );
1184 }
1185
/// Requesting `insecure_no_auth` while listening on `0.0.0.0` must be
/// rejected by `resolve_auth_token` instead of silently starting unauthenticated.
#[test]
fn non_loopback_bind_without_auth_fails_fast() {
    let listen = "0.0.0.0:8787".parse().expect("socket addr");
    let options = AppServerOptions {
        listen,
        config_path: None,
        auth_token: None,
        insecure_no_auth: true,
        cors_origins: vec![],
    };

    let message = resolve_auth_token(&options)
        .expect_err("non-loopback unauth should fail")
        .to_string();

    assert!(message.contains("refusing unauthenticated app-server bind"));
}
1202
1203 #[tokio::test]
1204 async fn stdio_transport_keeps_raw_config_get_for_legacy_clients() {
1205 let tmp = tempfile::tempdir().expect("tempdir");
1206 let config_path = tmp.path().join("config.toml");
1207 fs::write(&config_path, "").expect("write config");
1208 let state = build_state(Some(config_path), None).expect("state");
1209 {
1210 let mut cfg = state.config.write().await;
1211 cfg.api_key = Some("sk-deepseek-secret".to_string());
1212 }
1213
1214 let response = process_app_request(
1215 &state,
1216 AppRequest::ConfigGet {
1217 key: "api_key".to_string(),
1218 },
1219 AppTransport::Stdio,
1220 )
1221 .await;
1222
1223 assert_eq!(response.data["value"], "sk-deepseek-secret");
1224 }
1225
1226 #[tokio::test]
1227 async fn stdio_thread_goal_methods_round_trip_persisted_goal() {
1228 let tmp = tempfile::tempdir().expect("tempdir");
1229 let config_path = tmp.path().join("config.toml");
1230 fs::write(&config_path, "").expect("write config");
1231 let state = build_state(Some(config_path), None).expect("state");
1232
1233 let capabilities = dispatch_stdio_request(&state, "thread/capabilities", json!({}))
1234 .await
1235 .expect("thread capabilities");
1236 assert!(
1237 capabilities.result["methods"]
1238 .as_array()
1239 .expect("methods")
1240 .iter()
1241 .any(|method| method == "thread/goal/set")
1242 );
1243
1244 let started = dispatch_stdio_request(&state, "thread/start", json!({}))
1245 .await
1246 .expect("start thread");
1247 let thread_id = started.result["thread_id"]
1248 .as_str()
1249 .expect("thread id")
1250 .to_string();
1251
1252 let set = dispatch_stdio_request(
1253 &state,
1254 "thread/goal/set",
1255 json!({
1256 "thread_id": thread_id,
1257 "objective": "Release 0.8.59",
1258 "token_budget": 59000
1259 }),
1260 )
1261 .await
1262 .expect("set goal");
1263 assert_eq!(set.result["status"], "ok");
1264 assert_eq!(set.result["goal"]["objective"], "Release 0.8.59");
1265 assert_eq!(set.result["goal"]["status"], "active");
1266
1267 let got = dispatch_stdio_request(
1268 &state,
1269 "thread/goal/get",
1270 json!({
1271 "thread_id": thread_id
1272 }),
1273 )
1274 .await
1275 .expect("get goal");
1276 assert_eq!(got.result["goal"]["token_budget"], 59000);
1277
1278 let cleared = dispatch_stdio_request(
1279 &state,
1280 "thread/goal/clear",
1281 json!({
1282 "thread_id": thread_id
1283 }),
1284 )
1285 .await
1286 .expect("clear goal");
1287 assert_eq!(cleared.result["status"], "cleared");
1288 assert_eq!(cleared.result["data"]["cleared"], true);
1289 }
1290
/// Snapshot of the method names the stdio `capabilities` response is expected
/// to advertise.
///
/// `capabilities_method_set_is_stable` compares the advertised list against
/// this slice with `assert_eq!`, so the order of entries matters as well as
/// their presence. `every_advertised_capability_is_dispatchable` iterates the
/// same slice and dispatches each entry.
const EXPECTED_CAPABILITY_METHODS: &[&str] = &[
    "healthz",
    "thread/capabilities",
    "thread/request",
    "thread/create",
    "thread/start",
    "thread/resume",
    "thread/fork",
    "thread/list",
    "thread/read",
    "thread/set_name",
    "thread/goal/set",
    "thread/goal/get",
    "thread/goal/clear",
    "thread/archive",
    "thread/unarchive",
    "thread/message",
    "app/capabilities",
    "app/request",
    "app/config/get",
    "app/config/set",
    "app/config/unset",
    "app/config/list",
    "app/models",
    "app/thread_loaded_list",
    "prompt/capabilities",
    "prompt/request",
    "prompt/run",
    "shutdown",
];
1329
1330 fn capability_test_state() -> (AppState, tempfile::TempDir) {
1331 let tmp = tempfile::tempdir().expect("tempdir");
1332 let config_path = tmp.path().join("config.toml");
1333 fs::write(&config_path, "").expect("write config");
1334 let state = build_state(Some(config_path), None).expect("state");
1335 (state, tmp)
1336 }
1337
1338 #[tokio::test]
1339 async fn capabilities_method_set_is_stable() {
1340 let (state, _tmp) = capability_test_state();
1341 let caps = dispatch_stdio_request(&state, "capabilities", json!({}))
1342 .await
1343 .expect("capabilities dispatch");
1344 let methods: Vec<String> = caps.result["methods"]
1345 .as_array()
1346 .expect("methods array")
1347 .iter()
1348 .map(|m| m.as_str().expect("method string").to_string())
1349 .collect();
1350 assert_eq!(
1351 methods, EXPECTED_CAPABILITY_METHODS,
1352 "app-server stdio capability set drifted; update the dispatcher, this \
1353 snapshot, and docs/RUNTIME_API.md together"
1354 );
1355 }
1356
1357 #[tokio::test]
1358 async fn every_advertised_capability_is_dispatchable() {
1359 let (state, _tmp) = capability_test_state();
1360 for method in EXPECTED_CAPABILITY_METHODS {
1364 if let Err(err) = dispatch_stdio_request(&state, method, json!({})).await {
1365 assert_ne!(
1366 err.code,
1367 JsonRpcError::method_not_found(method).code,
1368 "advertised capability `{method}` is not dispatchable"
1369 );
1370 }
1371 }
1372 }
1373
/// A configured auth token consisting only of a space is rejected with a
/// "cannot be empty" error.
#[test]
fn auth_token_empty_string_fails() {
    let listen = "127.0.0.1:0".parse().expect("addr");
    let options = AppServerOptions {
        listen,
        config_path: None,
        auth_token: Some(String::from(" ")),
        insecure_no_auth: false,
        cors_origins: vec![],
    };

    let message = resolve_auth_token(&options)
        .expect_err("empty token should fail")
        .to_string();

    assert!(message.contains("cannot be empty"));
}
1388
/// With no token configured and auth not disabled, `resolve_auth_token`
/// must produce a token, and that token must start with `cwapp_`.
#[test]
fn auth_token_generated_when_none_provided() {
    let options = AppServerOptions {
        listen: "127.0.0.1:0".parse().expect("addr"),
        config_path: None,
        auth_token: None,
        insecure_no_auth: false,
        cors_origins: Vec::new(),
    };

    // Unwrap both layers with a message each, so a failure says which
    // expectation broke instead of a bare `unwrap` panic.
    let token = resolve_auth_token(&options)
        .expect("token resolution should succeed")
        .expect("a token should be generated when none is provided");

    assert!(token.starts_with("cwapp_"));
}
1402
/// An explicitly configured token is returned unchanged by
/// `resolve_auth_token`.
#[test]
fn auth_token_explicit_is_preserved() {
    let options = AppServerOptions {
        listen: "127.0.0.1:0".parse().expect("addr"),
        config_path: None,
        auth_token: Some("my-secret".to_string()),
        insecure_no_auth: false,
        cors_origins: Vec::new(),
    };

    // `expect` rather than `unwrap`, matching the rest of this module and
    // giving the failure some context.
    let token = resolve_auth_token(&options).expect("explicit token should resolve");

    assert_eq!(token.as_deref(), Some("my-secret"));
}
1415
/// With `insecure_no_auth` set and a `127.0.0.1` listen address,
/// `resolve_auth_token` succeeds and yields no token.
#[test]
fn insecure_no_auth_on_loopback_returns_none() {
    let options = AppServerOptions {
        listen: "127.0.0.1:0".parse().expect("addr"),
        config_path: None,
        auth_token: None,
        insecure_no_auth: true,
        cors_origins: Vec::new(),
    };

    // `expect` rather than `unwrap`, matching the rest of this module and
    // giving the failure some context.
    let token = resolve_auth_token(&options)
        .expect("unauthenticated loopback bind should be accepted");

    assert!(token.is_none());
}
1428
/// Smoke test: building the CORS layer with no extra origins must not panic.
/// The resulting layer is not inspected.
#[test]
fn cors_layer_includes_default_origins() {
    let _ = cors_layer(&[]);
}
1437
/// Smoke test: building the CORS layer with one extra origin must not panic.
/// The resulting layer is not inspected.
#[test]
fn cors_layer_adds_extra_origins() {
    let extra_origins = [String::from("https://example.com")];
    let _ = cors_layer(&extra_origins);
}
1444
/// Smoke test: empty and whitespace-only extra origins must not make
/// CORS layer construction panic. The resulting layer is not inspected.
#[test]
fn cors_layer_skips_empty_origins() {
    let blank_origins = [String::new(), String::from(" ")];
    let _ = cors_layer(&blank_origins);
}
1451
/// `null` params are normalised to an empty JSON object.
#[test]
fn params_or_object_returns_object_for_null() {
    let empty_object = json!({});
    assert_eq!(params_or_object(Value::Null), empty_object);
}
1459
/// Non-null params come back unchanged.
#[test]
fn params_or_object_passthrough_for_non_null() {
    let params = json!({"key": "value"});
    let expected = params.clone();
    assert_eq!(params_or_object(params), expected);
}
1466
/// A success envelope carries the protocol version, the request id and the
/// payload under `result`.
#[test]
fn jsonrpc_result_format() {
    let request_id = Some(json!(1));
    let payload = json!({"ok": true});

    let envelope = jsonrpc_result(request_id, payload);

    assert_eq!(envelope["jsonrpc"], "2.0");
    assert_eq!(envelope["id"], 1);
    assert_eq!(envelope["result"]["ok"], true);
}
1474
/// A missing request id is rendered as JSON `null` in the envelope.
#[test]
fn jsonrpc_result_null_id() {
    let envelope = jsonrpc_result(None, Value::Null);
    assert!(envelope["id"].is_null());
}
1480
/// An error envelope carries the protocol version, the request id, and the
/// error's code and message under `error`.
#[test]
fn jsonrpc_error_format() {
    let failure = JsonRpcError::internal("oops");

    let envelope = jsonrpc_error(Some(json!(2)), failure);

    assert_eq!(envelope["jsonrpc"], "2.0");
    assert_eq!(envelope["id"], 2);
    let error = &envelope["error"];
    assert_eq!(error["code"], -32603);
    assert_eq!(error["message"], "oops");
}
1489
/// Each `JsonRpcError` constructor yields its expected numeric code.
#[test]
fn jsonrpc_error_codes() {
    let cases = [
        (JsonRpcError::parse_error("").code, -32700),
        (JsonRpcError::invalid_request("").code, -32600),
        (JsonRpcError::method_not_found("x").code, -32601),
        (JsonRpcError::invalid_params("").code, -32602),
        (JsonRpcError::internal("").code, -32603),
    ];
    for (actual, expected) in cases {
        assert_eq!(actual, expected);
    }
}
1498
/// The `Debug` rendering of `AppServerOptions` hides the auth token behind
/// `<redacted>` while still showing non-secret fields such as the port.
#[test]
fn app_server_options_debug_does_not_leak_token() {
    let listen = "127.0.0.1:8080".parse().expect("addr");
    let options = AppServerOptions {
        listen,
        config_path: None,
        auth_token: Some(String::from("secret-token")),
        insecure_no_auth: false,
        cors_origins: vec![String::from("https://example.com")],
    };

    let rendered = format!("{:?}", options);

    assert!(!rendered.contains("secret-token"));
    assert!(rendered.contains("<redacted>"));
    assert!(rendered.contains("8080"));
}
1515
/// The built-in CORS allow-list covers these local development origins and
/// the Tauri scheme.
#[test]
fn default_cors_origins_include_common_dev_ports() {
    let required = [
        "http://localhost:3000",
        "http://localhost:5173",
        "tauri://localhost",
    ];
    for origin in required {
        assert!(DEFAULT_CORS_ORIGINS.contains(&origin));
    }
}
1524}