1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{Result, bail};
6use axum::extract::{Request, State};
7use axum::http::{HeaderValue, Method, StatusCode, header};
8use axum::middleware::{self, Next};
9use axum::response::{IntoResponse, Response};
10use axum::routing::{get, post};
11use axum::{Json, Router};
12use codewhale_agent::ModelRegistry;
13use codewhale_config::{CliRuntimeOverrides, ConfigStore};
14use codewhale_core::Runtime;
15use codewhale_hooks::{HookDispatcher, JsonlHookSink, StdoutHookSink, UnixSocketHookSink};
16use codewhale_mcp::McpManager;
17use codewhale_protocol::{
18 AppRequest, AppResponse, PromptRequest, PromptResponse, ThreadGoalClearParams,
19 ThreadGoalGetParams, ThreadGoalSetParams, ThreadRequest, ThreadResponse,
20};
21use codewhale_state::StateStore;
22use codewhale_tools::{ToolCall, ToolRegistry};
23use serde::de::DeserializeOwned;
24use serde::{Deserialize, Serialize};
25use serde_json::{Value, json};
26use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
27use tokio::sync::{Mutex, RwLock};
28use tower_http::cors::CorsLayer;
29use uuid::Uuid;
30
31mod chat_completions;
32
/// Browser origins the CORS layer always allows.
///
/// `cors_layer` starts from this list and appends any extra origins supplied
/// by the caller.
const DEFAULT_CORS_ORIGINS: &[&str] = &[
    "http://localhost",
    "http://localhost:1420",
    "http://localhost:3000",
    "http://localhost:5173",
    "http://127.0.0.1",
    "http://127.0.0.1:1420",
    "tauri://localhost",
];
42
/// Settings for the HTTP transport of the app server.
#[derive(Clone)]
pub struct AppServerOptions {
    /// Socket address the HTTP listener binds to.
    pub listen: SocketAddr,
    /// Optional config file location, handed to the config store when the
    /// shared state is built.
    pub config_path: Option<PathBuf>,
    /// Bearer token expected from HTTP clients. Surrounding whitespace is
    /// trimmed and a blank token is rejected; when absent (and auth is not
    /// disabled) a token is generated at startup.
    pub auth_token: Option<String>,
    /// Turns bearer-token checks off; only accepted for loopback listeners.
    pub insecure_no_auth: bool,
    /// Additional CORS origins allowed on top of the built-in defaults.
    pub cors_origins: Vec<String>,
}

// Written by hand (instead of derived) so the bearer token never appears in
// debug output: a present token is rendered as `<redacted>`.
impl std::fmt::Debug for AppServerOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let masked_token = self.auth_token.as_ref().map(|_| "<redacted>");

        let mut out = f.debug_struct("AppServerOptions");
        out.field("listen", &self.listen);
        out.field("config_path", &self.config_path);
        out.field("auth_token", &masked_token);
        out.field("insecure_no_auth", &self.insecure_no_auth);
        out.field("cors_origins", &self.cors_origins);
        out.finish()
    }
}
66
/// State shared by every HTTP handler and by the stdio dispatcher.
///
/// Cloning is cheap for the lock-protected members: they are `Arc`s, so all
/// clones observe the same config and runtime.
#[derive(Clone)]
struct AppState {
    /// Path the config was loaded from; `None` disables persistence in
    /// `persist_config`.
    config_path: Option<PathBuf>,
    /// Live configuration, read by handlers and mutated by config set/unset.
    config: Arc<RwLock<codewhale_config::ConfigToml>>,
    /// The runtime; handlers take this mutex for the duration of a call.
    runtime: Arc<Mutex<Runtime>>,
    /// Model registry used to answer `AppRequest::Models`.
    registry: ModelRegistry,
    /// Expected bearer token; `None` makes the auth middleware pass requests
    /// through unchecked.
    auth_token: Option<String>,
}
75
/// JSON body accepted by the `/tool` route.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ToolCallRequest {
    /// The tool invocation to run.
    call: ToolCall,
    /// Working directory for the call; when omitted the handler falls back to
    /// the process's current directory (or `.` if that cannot be read).
    #[serde(default)]
    cwd: Option<PathBuf>,
}
82
/// One line-delimited JSON-RPC request read from stdin by `run_stdio`.
#[derive(Debug, Deserialize)]
struct JsonRpcRequest {
    /// Protocol version; optional, but when present it must be `"2.0"`.
    #[serde(default)]
    jsonrpc: Option<String>,
    /// Caller-chosen request id, echoed back in the response (`null` if absent).
    #[serde(default)]
    id: Option<Value>,
    /// Method name routed by `dispatch_stdio_request`.
    method: String,
    /// Method parameters; defaults to JSON `null` when omitted.
    #[serde(default)]
    params: Value,
}
93
/// Error payload for a JSON-RPC response; rendered by `jsonrpc_error` as the
/// `error` object (`code`, `message`, `data`).
#[derive(Debug)]
struct JsonRpcError {
    /// Numeric error code (the constructors use the -327xx / -326xx range).
    code: i64,
    /// Human-readable description.
    message: String,
    /// Optional extra detail; the constructors in this file leave it `None`.
    data: Option<Value>,
}
100
/// Successful outcome of dispatching one stdio request.
#[derive(Debug)]
struct StdioDispatchResult {
    /// Value placed in the JSON-RPC `result` field.
    result: Value,
    /// When true, `run_stdio` stops reading after writing this response.
    should_exit: bool,
}
106
/// Which transport an `AppRequest` arrived on.
///
/// `process_app_request` uses this to pick how config values are read:
/// `Http` goes through `get_display_value`, `Stdio` through `get_value`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AppTransport {
    /// Request came in through the HTTP `/app` route.
    Http,
    /// Request came in through the stdio JSON-RPC loop.
    Stdio,
}
112
/// Parameters for the stdio `app/config/get` and `app/config/unset` methods.
#[derive(Debug, Deserialize)]
struct ConfigGetParams {
    /// Config key to read or remove.
    key: String,
}
117
/// Parameters for the stdio `app/config/set` method.
#[derive(Debug, Deserialize)]
struct ConfigSetParams {
    /// Config key to write.
    key: String,
    /// New value, passed to the config as a string.
    value: String,
}
123
/// Parameters for the stdio `thread/archive` and `thread/unarchive` methods.
#[derive(Debug, Deserialize)]
struct ThreadIdParams {
    /// Identifier of the thread to act on.
    thread_id: String,
}
128
/// Parameters for the stdio `thread/message` method.
#[derive(Debug, Deserialize)]
struct ThreadMessageParams {
    /// Identifier of the thread receiving the message.
    thread_id: String,
    /// Message text forwarded as `ThreadRequest::Message::input`.
    input: String,
}
134
135pub async fn run(options: AppServerOptions) -> Result<()> {
136 let auth_token = resolve_auth_token(&options)?;
137 let state = build_state(options.config_path.clone(), auth_token)?;
138 let app = app_router(state, &options.cors_origins);
139
140 let listener = tokio::net::TcpListener::bind(options.listen).await?;
141 axum::serve(listener, app).await?;
142 Ok(())
143}
144
145fn app_router(state: AppState, cors_origins: &[String]) -> Router {
146 let protected_routes = Router::new()
147 .route("/thread", post(thread_handler))
148 .route("/app", post(app_handler))
149 .route("/prompt", post(prompt_handler))
150 .route("/tool", post(tool_handler))
151 .route("/jobs", get(jobs_handler))
152 .route("/mcp/startup", post(mcp_startup_handler))
153 .route_layer(middleware::from_fn_with_state(
154 state.clone(),
155 require_app_server_token,
156 ));
157
158 Router::new()
159 .route("/healthz", get(healthz))
160 .route(
161 "/v1/chat/completions",
162 post(chat_completions::chat_completions_handler),
163 )
164 .merge(protected_routes)
165 .layer(cors_layer(cors_origins))
166 .with_state(state)
167}
168
169pub async fn run_stdio(config_path: Option<PathBuf>) -> Result<()> {
170 let state = build_state(config_path, None)?;
171 let stdin = tokio::io::stdin();
172 let stdout = tokio::io::stdout();
173 let mut reader = BufReader::new(stdin).lines();
174 let mut writer = tokio::io::BufWriter::new(stdout);
175 while let Some(line) = reader.next_line().await? {
176 if line.trim().is_empty() {
177 continue;
178 }
179
180 let request: JsonRpcRequest = match serde_json::from_str(&line) {
181 Ok(value) => value,
182 Err(err) => {
183 let response = jsonrpc_error(
184 None,
185 JsonRpcError::parse_error(format!("invalid json: {err}")),
186 );
187 writer.write_all(response.to_string().as_bytes()).await?;
188 writer.write_all(b"\n").await?;
189 writer.flush().await?;
190 continue;
191 }
192 };
193
194 if request
195 .jsonrpc
196 .as_deref()
197 .is_some_and(|version| version != "2.0")
198 {
199 let response = jsonrpc_error(
200 request.id,
201 JsonRpcError::invalid_request("jsonrpc version must be 2.0"),
202 );
203 writer.write_all(response.to_string().as_bytes()).await?;
204 writer.write_all(b"\n").await?;
205 writer.flush().await?;
206 continue;
207 }
208
209 let response = match dispatch_stdio_request(&state, &request.method, request.params).await {
210 Ok(dispatch) => {
211 let encoded = jsonrpc_result(request.id, dispatch.result);
212 writer.write_all(encoded.to_string().as_bytes()).await?;
213 writer.write_all(b"\n").await?;
214 writer.flush().await?;
215 if dispatch.should_exit {
216 break;
217 }
218 continue;
219 }
220 Err(err) => jsonrpc_error(request.id, err),
221 };
222
223 writer.write_all(response.to_string().as_bytes()).await?;
224 writer.write_all(b"\n").await?;
225 writer.flush().await?;
226 }
227
228 Ok(())
229}
230
231async fn healthz() -> Json<Value> {
232 Json(json!({
233 "status": "ok",
234 "protocol": "v2",
235 "service": "deepseek-app-server"
236 }))
237}
238
239async fn thread_handler(
240 State(state): State<AppState>,
241 Json(req): Json<ThreadRequest>,
242) -> Json<ThreadResponse> {
243 let mut runtime = state.runtime.lock().await;
244 match runtime.handle_thread(req).await {
245 Ok(res) => Json(res),
246 Err(err) => Json(ThreadResponse {
247 thread_id: "error".to_string(),
248 status: format!("error:{err}"),
249 thread: None,
250 threads: Vec::new(),
251 goal: None,
252 model: None,
253 model_provider: None,
254 cwd: None,
255 approval_policy: None,
256 sandbox: None,
257 events: Vec::new(),
258 data: json!({}),
259 }),
260 }
261}
262
263async fn prompt_handler(
264 State(state): State<AppState>,
265 Json(req): Json<PromptRequest>,
266) -> Json<PromptResponse> {
267 let mut runtime = state.runtime.lock().await;
268 let overrides = CliRuntimeOverrides::default();
269 match runtime.handle_prompt(req, &overrides).await {
270 Ok(res) => Json(res),
271 Err(err) => Json(PromptResponse {
272 output: err.to_string(),
273 model: "unknown".to_string(),
274 events: Vec::new(),
275 }),
276 }
277}
278
279async fn tool_handler(
280 State(state): State<AppState>,
281 Json(req): Json<ToolCallRequest>,
282) -> Json<Value> {
283 let runtime = state.runtime.lock().await;
284 let cwd = req
285 .cwd
286 .unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")));
287 let approval_mode = {
289 let cfg = state.config.read().await;
290 cfg.approval_policy
291 .as_deref()
292 .and_then(|p| match p.trim().to_ascii_lowercase().as_str() {
293 "auto" | "yolo" => Some(codewhale_execpolicy::AskForApproval::UnlessTrusted),
294 "never" | "deny" => Some(codewhale_execpolicy::AskForApproval::Never),
295 _ => None,
296 })
297 .unwrap_or(codewhale_execpolicy::AskForApproval::OnRequest)
298 };
299 match runtime.invoke_tool(req.call, approval_mode, &cwd).await {
300 Ok(value) => Json(value),
301 Err(err) => Json(json!({ "ok": false, "error": err.to_string() })),
302 }
303}
304
305async fn jobs_handler(State(state): State<AppState>) -> Json<AppResponse> {
306 let runtime = state.runtime.lock().await;
307 Json(runtime.app_status())
308}
309
310async fn mcp_startup_handler(State(state): State<AppState>) -> Json<Value> {
311 let runtime = state.runtime.lock().await;
312 let summary = runtime.mcp_startup().await;
313 Json(json!({
314 "ok": true,
315 "summary": summary
316 }))
317}
318
319async fn app_handler(
320 State(state): State<AppState>,
321 Json(req): Json<AppRequest>,
322) -> Json<AppResponse> {
323 Json(process_app_request(&state, req, AppTransport::Http).await)
324}
325
326fn build_state(config_path: Option<PathBuf>, auth_token: Option<String>) -> Result<AppState> {
327 let store = ConfigStore::load(config_path.clone())?;
328 let config = store.config.clone();
329 let exec_policy = store.exec_policy_engine();
330 let registry = ModelRegistry::default();
331
332 let state_db_path = config_path
333 .as_ref()
334 .and_then(|p| p.parent().map(|parent| parent.join("state.db")));
335 let state_store = StateStore::open(state_db_path)?;
336
337 let mut hooks = HookDispatcher::default();
338 hooks.add_sink(Arc::new(StdoutHookSink));
339 let hook_log_path = config_path
340 .as_ref()
341 .and_then(|p| p.parent().map(|parent| parent.join("events.jsonl")))
342 .unwrap_or_else(|| PathBuf::from(".deepseek/events.jsonl"));
343 hooks.add_sink(Arc::new(JsonlHookSink::new(hook_log_path)));
344
345 if let Some(socket_path) = config
346 .hook_sinks
347 .as_ref()
348 .and_then(|sinks| sinks.unix_socket_path.as_ref())
349 .filter(|path| !path.as_os_str().is_empty())
350 {
351 hooks.add_sink(Arc::new(UnixSocketHookSink::new(socket_path.clone())));
352 }
353
354 let runtime = Runtime::new(
355 config.clone(),
356 registry.clone(),
357 state_store,
358 Arc::new(ToolRegistry::default()),
359 Arc::new(McpManager::default()),
360 exec_policy,
361 hooks,
362 );
363
364 Ok(AppState {
365 config_path,
366 config: Arc::new(RwLock::new(config)),
367 runtime: Arc::new(Mutex::new(runtime)),
368 registry,
369 auth_token,
370 })
371}
372
373fn resolve_auth_token(options: &AppServerOptions) -> Result<Option<String>> {
374 let configured = options.auth_token.as_ref().map(|token| token.trim());
375 if let Some(token) = configured
376 && token.is_empty()
377 {
378 bail!("app-server auth token cannot be empty");
379 }
380
381 if options.insecure_no_auth {
382 if !options.listen.ip().is_loopback() {
383 bail!("refusing unauthenticated app-server bind on non-loopback address");
384 }
385 eprintln!("warning: app-server HTTP auth disabled by --insecure-no-auth");
386 return Ok(None);
387 }
388
389 let token = configured
390 .map(str::to_string)
391 .unwrap_or_else(|| format!("cwapp_{}", Uuid::new_v4().simple()));
392 if options.auth_token.is_some() {
393 eprintln!("app-server auth: bearer token required for HTTP routes.");
394 } else {
395 eprintln!("app-server auth: generated bearer token for this process.");
396 eprintln!(" Authorization: Bearer {token}");
397 eprintln!(" Pass --auth-token or set CODEWHALE_APP_SERVER_TOKEN for a stable token.");
398 }
399 Ok(Some(token))
400}
401
402fn cors_layer(extra_origins: &[String]) -> CorsLayer {
403 let mut origins: Vec<HeaderValue> = DEFAULT_CORS_ORIGINS
404 .iter()
405 .filter_map(|origin| HeaderValue::from_str(origin).ok())
406 .collect();
407 for raw in extra_origins {
408 let trimmed = raw.trim();
409 if trimmed.is_empty() {
410 continue;
411 }
412 match HeaderValue::from_str(trimmed) {
413 Ok(value) if !origins.contains(&value) => origins.push(value),
414 Ok(_) => {}
415 Err(err) => {
416 eprintln!("warning: ignoring invalid app-server CORS origin `{trimmed}`: {err}")
417 }
418 }
419 }
420
421 CorsLayer::new()
422 .allow_origin(origins)
423 .allow_methods([Method::GET, Method::POST, Method::OPTIONS])
424 .allow_headers([header::AUTHORIZATION, header::CONTENT_TYPE])
425}
426
427async fn require_app_server_token(
428 State(state): State<AppState>,
429 req: Request,
430 next: Next,
431) -> Response {
432 let Some(expected) = state.auth_token.as_deref() else {
433 return next.run(req).await;
434 };
435 let authorized = req
436 .headers()
437 .get(header::AUTHORIZATION)
438 .and_then(|value| value.to_str().ok())
439 .and_then(|raw| raw.strip_prefix("Bearer "))
440 .is_some_and(|token| token == expected);
441
442 if authorized {
443 next.run(req).await
444 } else {
445 (
446 StatusCode::UNAUTHORIZED,
447 Json(json!({
448 "error": {
449 "message": "app-server bearer token required",
450 "status": StatusCode::UNAUTHORIZED.as_u16(),
451 }
452 })),
453 )
454 .into_response()
455 }
456}
457
458fn params_or_object(params: Value) -> Value {
459 if params.is_null() { json!({}) } else { params }
460}
461
462fn parse_params<T: DeserializeOwned>(params: Value) -> std::result::Result<T, JsonRpcError> {
463 serde_json::from_value(params).map_err(|err| JsonRpcError::invalid_params(err.to_string()))
464}
465
466fn jsonrpc_result(id: Option<Value>, result: Value) -> Value {
467 json!({
468 "jsonrpc": "2.0",
469 "id": id.unwrap_or(Value::Null),
470 "result": result
471 })
472}
473
474fn jsonrpc_error(id: Option<Value>, err: JsonRpcError) -> Value {
475 json!({
476 "jsonrpc": "2.0",
477 "id": id.unwrap_or(Value::Null),
478 "error": {
479 "code": err.code,
480 "message": err.message,
481 "data": err.data
482 }
483 })
484}
485
486impl JsonRpcError {
487 fn parse_error(message: impl Into<String>) -> Self {
488 Self {
489 code: -32700,
490 message: message.into(),
491 data: None,
492 }
493 }
494
495 fn invalid_request(message: impl Into<String>) -> Self {
496 Self {
497 code: -32600,
498 message: message.into(),
499 data: None,
500 }
501 }
502
503 fn method_not_found(method: &str) -> Self {
504 Self {
505 code: -32601,
506 message: format!("unsupported method: {method}"),
507 data: None,
508 }
509 }
510
511 fn invalid_params(message: impl Into<String>) -> Self {
512 Self {
513 code: -32602,
514 message: message.into(),
515 data: None,
516 }
517 }
518
519 fn internal(message: impl Into<String>) -> Self {
520 Self {
521 code: -32603,
522 message: message.into(),
523 data: None,
524 }
525 }
526}
527
528async fn handle_thread_request(
529 state: &AppState,
530 req: ThreadRequest,
531) -> std::result::Result<ThreadResponse, JsonRpcError> {
532 let mut runtime = state.runtime.lock().await;
533 runtime
534 .handle_thread(req)
535 .await
536 .map_err(|err| JsonRpcError::internal(err.to_string()))
537}
538
539async fn handle_prompt_request(
540 state: &AppState,
541 req: PromptRequest,
542) -> std::result::Result<PromptResponse, JsonRpcError> {
543 let mut runtime = state.runtime.lock().await;
544 runtime
545 .handle_prompt(req, &CliRuntimeOverrides::default())
546 .await
547 .map_err(|err| JsonRpcError::internal(err.to_string()))
548}
549
550async fn dispatch_stdio_request(
551 state: &AppState,
552 method: &str,
553 params: Value,
554) -> std::result::Result<StdioDispatchResult, JsonRpcError> {
555 let outcome = match method {
556 "healthz" | "app/healthz" => StdioDispatchResult {
557 result: json!({
558 "status": "ok",
559 "service": "deepseek-app-server",
560 "transport": "stdio"
561 }),
562 should_exit: false,
563 },
564 "capabilities" => StdioDispatchResult {
565 result: json!({
566 "transport": "stdio",
567 "families": ["thread/*", "app/*", "prompt/*"],
568 "methods": [
569 "healthz",
570 "thread/capabilities",
571 "thread/request",
572 "thread/create",
573 "thread/start",
574 "thread/resume",
575 "thread/fork",
576 "thread/list",
577 "thread/read",
578 "thread/set_name",
579 "thread/goal/set",
580 "thread/goal/get",
581 "thread/goal/clear",
582 "thread/archive",
583 "thread/unarchive",
584 "thread/message",
585 "app/capabilities",
586 "app/request",
587 "app/config/get",
588 "app/config/set",
589 "app/config/unset",
590 "app/config/list",
591 "app/models",
592 "app/thread_loaded_list",
593 "prompt/capabilities",
594 "prompt/request",
595 "prompt/run",
596 "shutdown"
597 ]
598 }),
599 should_exit: false,
600 },
601 "thread/capabilities" => StdioDispatchResult {
602 result: json!({
603 "methods": [
604 "thread/request",
605 "thread/create",
606 "thread/start",
607 "thread/resume",
608 "thread/fork",
609 "thread/list",
610 "thread/read",
611 "thread/set_name",
612 "thread/goal/set",
613 "thread/goal/get",
614 "thread/goal/clear",
615 "thread/archive",
616 "thread/unarchive",
617 "thread/message"
618 ]
619 }),
620 should_exit: false,
621 },
622 "thread/request" => {
623 let request: ThreadRequest = parse_params(params)?;
624 let response = handle_thread_request(state, request).await?;
625 StdioDispatchResult {
626 result: serde_json::to_value(response)
627 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
628 should_exit: false,
629 }
630 }
631 "thread/create" => {
632 #[derive(Debug, Deserialize)]
633 struct CreateParams {
634 #[serde(default)]
635 metadata: Value,
636 }
637 let parsed: CreateParams = parse_params(params_or_object(params))?;
638 let response = handle_thread_request(
639 state,
640 ThreadRequest::Create {
641 metadata: parsed.metadata,
642 },
643 )
644 .await?;
645 StdioDispatchResult {
646 result: serde_json::to_value(response)
647 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
648 should_exit: false,
649 }
650 }
651 "thread/start" => {
652 let request = ThreadRequest::Start(parse_params(params_or_object(params))?);
653 let response = handle_thread_request(state, request).await?;
654 StdioDispatchResult {
655 result: serde_json::to_value(response)
656 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
657 should_exit: false,
658 }
659 }
660 "thread/resume" => {
661 let request = ThreadRequest::Resume(parse_params(params_or_object(params))?);
662 let response = handle_thread_request(state, request).await?;
663 StdioDispatchResult {
664 result: serde_json::to_value(response)
665 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
666 should_exit: false,
667 }
668 }
669 "thread/fork" => {
670 let request = ThreadRequest::Fork(parse_params(params_or_object(params))?);
671 let response = handle_thread_request(state, request).await?;
672 StdioDispatchResult {
673 result: serde_json::to_value(response)
674 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
675 should_exit: false,
676 }
677 }
678 "thread/list" => {
679 let request = ThreadRequest::List(parse_params(params_or_object(params))?);
680 let response = handle_thread_request(state, request).await?;
681 StdioDispatchResult {
682 result: serde_json::to_value(response)
683 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
684 should_exit: false,
685 }
686 }
687 "thread/read" => {
688 let request = ThreadRequest::Read(parse_params(params_or_object(params))?);
689 let response = handle_thread_request(state, request).await?;
690 StdioDispatchResult {
691 result: serde_json::to_value(response)
692 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
693 should_exit: false,
694 }
695 }
696 "thread/set_name" | "thread/set-name" => {
697 let request = ThreadRequest::SetName(parse_params(params_or_object(params))?);
698 let response = handle_thread_request(state, request).await?;
699 StdioDispatchResult {
700 result: serde_json::to_value(response)
701 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
702 should_exit: false,
703 }
704 }
705 "thread/goal/set" | "thread/goal_set" | "thread/goal-set" => {
706 let request = ThreadRequest::GoalSet(parse_params::<ThreadGoalSetParams>(
707 params_or_object(params),
708 )?);
709 let response = handle_thread_request(state, request).await?;
710 StdioDispatchResult {
711 result: serde_json::to_value(response)
712 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
713 should_exit: false,
714 }
715 }
716 "thread/goal/get" | "thread/goal_get" | "thread/goal-get" => {
717 let request = ThreadRequest::GoalGet(parse_params::<ThreadGoalGetParams>(
718 params_or_object(params),
719 )?);
720 let response = handle_thread_request(state, request).await?;
721 StdioDispatchResult {
722 result: serde_json::to_value(response)
723 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
724 should_exit: false,
725 }
726 }
727 "thread/goal/clear" | "thread/goal_clear" | "thread/goal-clear" => {
728 let request = ThreadRequest::GoalClear(parse_params::<ThreadGoalClearParams>(
729 params_or_object(params),
730 )?);
731 let response = handle_thread_request(state, request).await?;
732 StdioDispatchResult {
733 result: serde_json::to_value(response)
734 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
735 should_exit: false,
736 }
737 }
738 "thread/archive" => {
739 let parsed: ThreadIdParams = parse_params(params_or_object(params))?;
740 let response = handle_thread_request(
741 state,
742 ThreadRequest::Archive {
743 thread_id: parsed.thread_id,
744 },
745 )
746 .await?;
747 StdioDispatchResult {
748 result: serde_json::to_value(response)
749 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
750 should_exit: false,
751 }
752 }
753 "thread/unarchive" => {
754 let parsed: ThreadIdParams = parse_params(params_or_object(params))?;
755 let response = handle_thread_request(
756 state,
757 ThreadRequest::Unarchive {
758 thread_id: parsed.thread_id,
759 },
760 )
761 .await?;
762 StdioDispatchResult {
763 result: serde_json::to_value(response)
764 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
765 should_exit: false,
766 }
767 }
768 "thread/message" => {
769 let parsed: ThreadMessageParams = parse_params(params_or_object(params))?;
770 let response = handle_thread_request(
771 state,
772 ThreadRequest::Message {
773 thread_id: parsed.thread_id,
774 input: parsed.input,
775 },
776 )
777 .await?;
778 StdioDispatchResult {
779 result: serde_json::to_value(response)
780 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
781 should_exit: false,
782 }
783 }
784 "app/capabilities" => {
785 let response =
786 process_app_request(state, AppRequest::Capabilities, AppTransport::Stdio).await;
787 StdioDispatchResult {
788 result: serde_json::to_value(response)
789 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
790 should_exit: false,
791 }
792 }
793 "app/request" => {
794 let request: AppRequest = parse_params(params)?;
795 let response = process_app_request(state, request, AppTransport::Stdio).await;
796 StdioDispatchResult {
797 result: serde_json::to_value(response)
798 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
799 should_exit: false,
800 }
801 }
802 "app/config/get" => {
803 let parsed: ConfigGetParams = parse_params(params_or_object(params))?;
804 let response = process_app_request(
805 state,
806 AppRequest::ConfigGet { key: parsed.key },
807 AppTransport::Stdio,
808 )
809 .await;
810 StdioDispatchResult {
811 result: serde_json::to_value(response)
812 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
813 should_exit: false,
814 }
815 }
816 "app/config/set" => {
817 let parsed: ConfigSetParams = parse_params(params_or_object(params))?;
818 let response = process_app_request(
819 state,
820 AppRequest::ConfigSet {
821 key: parsed.key,
822 value: parsed.value,
823 },
824 AppTransport::Stdio,
825 )
826 .await;
827 StdioDispatchResult {
828 result: serde_json::to_value(response)
829 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
830 should_exit: false,
831 }
832 }
833 "app/config/unset" => {
834 let parsed: ConfigGetParams = parse_params(params_or_object(params))?;
835 let response = process_app_request(
836 state,
837 AppRequest::ConfigUnset { key: parsed.key },
838 AppTransport::Stdio,
839 )
840 .await;
841 StdioDispatchResult {
842 result: serde_json::to_value(response)
843 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
844 should_exit: false,
845 }
846 }
847 "app/config/list" => {
848 let response =
849 process_app_request(state, AppRequest::ConfigList, AppTransport::Stdio).await;
850 StdioDispatchResult {
851 result: serde_json::to_value(response)
852 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
853 should_exit: false,
854 }
855 }
856 "app/models" => {
857 let response =
858 process_app_request(state, AppRequest::Models, AppTransport::Stdio).await;
859 StdioDispatchResult {
860 result: serde_json::to_value(response)
861 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
862 should_exit: false,
863 }
864 }
865 "app/thread_loaded_list" | "app/thread-loaded-list" => {
866 let response =
867 process_app_request(state, AppRequest::ThreadLoadedList, AppTransport::Stdio).await;
868 StdioDispatchResult {
869 result: serde_json::to_value(response)
870 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
871 should_exit: false,
872 }
873 }
874 "prompt/capabilities" => StdioDispatchResult {
875 result: json!({
876 "methods": ["prompt/request", "prompt/run"]
877 }),
878 should_exit: false,
879 },
880 "prompt/request" | "prompt/run" => {
881 let request: PromptRequest = parse_params(params)?;
882 let response = handle_prompt_request(state, request).await?;
883 StdioDispatchResult {
884 result: serde_json::to_value(response)
885 .map_err(|err| JsonRpcError::internal(err.to_string()))?,
886 should_exit: false,
887 }
888 }
889 "shutdown" => StdioDispatchResult {
890 result: json!({"ok": true, "status": "stopped"}),
891 should_exit: true,
892 },
893 _ => return Err(JsonRpcError::method_not_found(method)),
894 };
895 Ok(outcome)
896}
897
898async fn process_app_request(
899 state: &AppState,
900 req: AppRequest,
901 transport: AppTransport,
902) -> AppResponse {
903 match req {
904 AppRequest::Capabilities => AppResponse {
905 ok: true,
906 data: json!({
907 "routes": ["/thread", "/app", "/prompt", "/tool", "/jobs", "/mcp/startup"],
908 "config": ["get", "set", "unset", "list"],
909 "events": ["response_start", "response_delta", "response_end", "tool_call_start", "tool_call_result", "mcp_startup_update", "mcp_startup_complete"],
910 "transport": "stdio+http",
911 "config_path": state.config_path.as_ref().map(|p| p.display().to_string()),
912 }),
913 events: Vec::new(),
914 },
915 AppRequest::ConfigGet { key } => {
916 let cfg = state.config.read().await;
917 let value = match transport {
918 AppTransport::Http => cfg.get_display_value(&key),
919 AppTransport::Stdio => cfg.get_value(&key),
920 };
921 AppResponse {
922 ok: true,
923 data: json!({ "key": key, "value": value }),
924 events: Vec::new(),
925 }
926 }
927 AppRequest::ConfigSet { key, value } => {
928 let mut cfg = state.config.write().await;
929 let result = cfg.set_value(&key, &value);
930 let ok = result.is_ok();
931 let message = result.err().map(|e| e.to_string());
932 let snapshot = cfg.clone();
933 drop(cfg);
934 if let Err(e) = persist_config(state, snapshot).await {
935 tracing::error!("Failed to persist config after set: {e}");
936 }
937 AppResponse {
938 ok,
939 data: json!({ "key": key, "value": value, "error": message }),
940 events: Vec::new(),
941 }
942 }
943 AppRequest::ConfigUnset { key } => {
944 let mut cfg = state.config.write().await;
945 let result = cfg.unset_value(&key);
946 let ok = result.is_ok();
947 let message = result.err().map(|e| e.to_string());
948 let snapshot = cfg.clone();
949 drop(cfg);
950 if let Err(e) = persist_config(state, snapshot).await {
951 tracing::error!("Failed to persist config after unset: {e}");
952 }
953 AppResponse {
954 ok,
955 data: json!({ "key": key, "error": message }),
956 events: Vec::new(),
957 }
958 }
959 AppRequest::ConfigList => {
960 let cfg = state.config.read().await;
961 AppResponse {
962 ok: true,
963 data: json!({ "values": cfg.list_values() }),
964 events: Vec::new(),
965 }
966 }
967 AppRequest::Models => AppResponse {
968 ok: true,
969 data: json!({ "models": state.registry.list() }),
970 events: Vec::new(),
971 },
972 AppRequest::ThreadLoadedList => {
973 let mut runtime = state.runtime.lock().await;
974 let response = runtime
975 .handle_thread(codewhale_protocol::ThreadRequest::List(
976 codewhale_protocol::ThreadListParams {
977 include_archived: false,
978 limit: Some(50),
979 },
980 ))
981 .await;
982 match response {
983 Ok(thread_resp) => AppResponse {
984 ok: true,
985 data: json!({ "threads": thread_resp.threads }),
986 events: thread_resp.events,
987 },
988 Err(err) => AppResponse {
989 ok: false,
990 data: json!({ "error": err.to_string() }),
991 events: Vec::new(),
992 },
993 }
994 }
995 }
996}
997
998async fn persist_config(state: &AppState, config: codewhale_config::ConfigToml) -> Result<()> {
999 if state.config_path.is_none() {
1000 return Ok(());
1001 }
1002 let mut store = ConfigStore::load(state.config_path.clone())?;
1003 store.config = config;
1004 store.save()
1005}
1006
1007#[cfg(test)]
1008mod tests {
1009 use super::*;
1010 use axum::body::{Body, to_bytes};
1011 use codewhale_protocol::AppRequest;
1012 use std::fs;
1013 use tower::ServiceExt;
1014
1015 fn app_with_config(auth_token: Option<&str>) -> (Router, tempfile::TempDir) {
1016 let tmp = tempfile::tempdir().expect("tempdir");
1017 let config_path = tmp.path().join("config.toml");
1018 fs::write(&config_path, "api_key = \"sk-deepseek-secret\"\n").expect("write config");
1019 let state = build_state(
1020 Some(config_path),
1021 auth_token.map(std::string::ToString::to_string),
1022 )
1023 .expect("state");
1024 (app_router(state, &[]), tmp)
1025 }
1026
1027 async fn response_body_json(response: Response) -> Value {
1028 let bytes = to_bytes(response.into_body(), usize::MAX)
1029 .await
1030 .expect("body bytes");
1031 serde_json::from_slice(&bytes).expect("json response")
1032 }
1033
1034 #[tokio::test]
1035 async fn http_app_routes_require_bearer_token_when_auth_enabled() {
1036 let (app, _tmp) = app_with_config(Some("test-token"));
1037 let response = app
1038 .oneshot(
1039 Request::builder()
1040 .method(Method::POST)
1041 .uri("/app")
1042 .header(header::CONTENT_TYPE, "application/json")
1043 .body(Body::from(
1044 serde_json::to_vec(&AppRequest::ConfigGet {
1045 key: "api_key".to_string(),
1046 })
1047 .expect("request json"),
1048 ))
1049 .expect("request"),
1050 )
1051 .await
1052 .expect("response");
1053
1054 assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
1055 }
1056
1057 #[tokio::test]
1058 async fn http_config_get_redacts_sensitive_values_after_auth() {
1059 let (app, _tmp) = app_with_config(Some("test-token"));
1060 let response = app
1061 .oneshot(
1062 Request::builder()
1063 .method(Method::POST)
1064 .uri("/app")
1065 .header(header::AUTHORIZATION, "Bearer test-token")
1066 .header(header::CONTENT_TYPE, "application/json")
1067 .body(Body::from(
1068 serde_json::to_vec(&AppRequest::ConfigGet {
1069 key: "api_key".to_string(),
1070 })
1071 .expect("request json"),
1072 ))
1073 .expect("request"),
1074 )
1075 .await
1076 .expect("response");
1077
1078 assert_eq!(response.status(), StatusCode::OK);
1079 let body = response_body_json(response).await;
1080 assert_eq!(body["data"]["value"], "sk-d***cret");
1081 }
1082
1083 #[tokio::test]
1084 async fn cors_does_not_allow_arbitrary_origins() {
1085 let (app, _tmp) = app_with_config(Some("test-token"));
1086 let response = app
1087 .oneshot(
1088 Request::builder()
1089 .method(Method::GET)
1090 .uri("/healthz")
1091 .header(header::ORIGIN, "https://attacker.example")
1092 .body(Body::empty())
1093 .expect("request"),
1094 )
1095 .await
1096 .expect("response");
1097
1098 assert_eq!(response.status(), StatusCode::OK);
1099 assert!(
1100 response
1101 .headers()
1102 .get(header::ACCESS_CONTROL_ALLOW_ORIGIN)
1103 .is_none()
1104 );
1105 }
1106
1107 #[tokio::test]
1108 async fn build_state_loads_permissions_into_runtime_policy() {
1109 let tmp = tempfile::tempdir().expect("tempdir");
1110 let config_path = tmp.path().join("config.toml");
1111 fs::write(&config_path, "api_key = \"sk-deepseek-secret\"\n").expect("write config");
1112 fs::write(
1113 tmp.path().join("permissions.toml"),
1114 r#"
1115 [[rules]]
1116 tool = "exec_shell"
1117 command = "cargo test"
1118 "#,
1119 )
1120 .expect("write permissions");
1121
1122 let state = build_state(Some(config_path), None).expect("state");
1123 let runtime = state.runtime.lock().await;
1124 let decision = runtime
1125 .exec_policy
1126 .check(codewhale_execpolicy::ExecPolicyContext {
1127 command: "cargo test --workspace",
1128 cwd: "/workspace",
1129 tool: Some("exec_shell"),
1130 path: None,
1131 ask_for_approval: codewhale_execpolicy::AskForApproval::UnlessTrusted,
1132 sandbox_mode: Some("workspace-write"),
1133 })
1134 .expect("policy check");
1135
1136 assert!(decision.allow);
1137 assert!(decision.requires_approval);
1138 assert_eq!(
1139 decision.matched_rule.as_deref(),
1140 Some("tool=exec_shell command=cargo test")
1141 );
1142 }
1143
/// Disabling auth while listening on `0.0.0.0` must be refused by
/// `resolve_auth_token` with the "refusing unauthenticated app-server bind"
/// error.
#[test]
fn non_loopback_bind_without_auth_fails_fast() {
    let exposed = AppServerOptions {
        listen: "0.0.0.0:8787".parse().expect("socket addr"),
        insecure_no_auth: true,
        auth_token: None,
        config_path: None,
        cors_origins: vec![],
    };

    let message = resolve_auth_token(&exposed)
        .expect_err("non-loopback unauth should fail")
        .to_string();
    assert!(message.contains("refusing unauthenticated app-server bind"));
}
1160
1161 #[tokio::test]
1162 async fn stdio_transport_keeps_raw_config_get_for_legacy_clients() {
1163 let tmp = tempfile::tempdir().expect("tempdir");
1164 let config_path = tmp.path().join("config.toml");
1165 fs::write(&config_path, "").expect("write config");
1166 let state = build_state(Some(config_path), None).expect("state");
1167 {
1168 let mut cfg = state.config.write().await;
1169 cfg.api_key = Some("sk-deepseek-secret".to_string());
1170 }
1171
1172 let response = process_app_request(
1173 &state,
1174 AppRequest::ConfigGet {
1175 key: "api_key".to_string(),
1176 },
1177 AppTransport::Stdio,
1178 )
1179 .await;
1180
1181 assert_eq!(response.data["value"], "sk-deepseek-secret");
1182 }
1183
1184 #[tokio::test]
1185 async fn stdio_thread_goal_methods_round_trip_persisted_goal() {
1186 let tmp = tempfile::tempdir().expect("tempdir");
1187 let config_path = tmp.path().join("config.toml");
1188 fs::write(&config_path, "").expect("write config");
1189 let state = build_state(Some(config_path), None).expect("state");
1190
1191 let capabilities = dispatch_stdio_request(&state, "thread/capabilities", json!({}))
1192 .await
1193 .expect("thread capabilities");
1194 assert!(
1195 capabilities.result["methods"]
1196 .as_array()
1197 .expect("methods")
1198 .iter()
1199 .any(|method| method == "thread/goal/set")
1200 );
1201
1202 let started = dispatch_stdio_request(&state, "thread/start", json!({}))
1203 .await
1204 .expect("start thread");
1205 let thread_id = started.result["thread_id"]
1206 .as_str()
1207 .expect("thread id")
1208 .to_string();
1209
1210 let set = dispatch_stdio_request(
1211 &state,
1212 "thread/goal/set",
1213 json!({
1214 "thread_id": thread_id,
1215 "objective": "Release 0.8.59",
1216 "token_budget": 59000
1217 }),
1218 )
1219 .await
1220 .expect("set goal");
1221 assert_eq!(set.result["status"], "ok");
1222 assert_eq!(set.result["goal"]["objective"], "Release 0.8.59");
1223 assert_eq!(set.result["goal"]["status"], "active");
1224
1225 let got = dispatch_stdio_request(
1226 &state,
1227 "thread/goal/get",
1228 json!({
1229 "thread_id": thread_id
1230 }),
1231 )
1232 .await
1233 .expect("get goal");
1234 assert_eq!(got.result["goal"]["token_budget"], 59000);
1235
1236 let cleared = dispatch_stdio_request(
1237 &state,
1238 "thread/goal/clear",
1239 json!({
1240 "thread_id": thread_id
1241 }),
1242 )
1243 .await
1244 .expect("clear goal");
1245 assert_eq!(cleared.result["status"], "cleared");
1246 assert_eq!(cleared.result["data"]["cleared"], true);
1247 }
1248
// Snapshot of the method names expected from the stdio `capabilities` call.
// `capabilities_method_set_is_stable` compares the live list against this one
// with `assert_eq!`, so both membership and order are significant.
const EXPECTED_CAPABILITY_METHODS: &[&str] = &[
    "healthz",
    // thread/* methods
    "thread/capabilities",
    "thread/request",
    "thread/create",
    "thread/start",
    "thread/resume",
    "thread/fork",
    "thread/list",
    "thread/read",
    "thread/set_name",
    "thread/goal/set",
    "thread/goal/get",
    "thread/goal/clear",
    "thread/archive",
    "thread/unarchive",
    "thread/message",
    // app/* methods
    "app/capabilities",
    "app/request",
    "app/config/get",
    "app/config/set",
    "app/config/unset",
    "app/config/list",
    "app/models",
    "app/thread_loaded_list",
    // prompt/* methods
    "prompt/capabilities",
    "prompt/request",
    "prompt/run",
    "shutdown",
];
1287
1288 fn capability_test_state() -> (AppState, tempfile::TempDir) {
1289 let tmp = tempfile::tempdir().expect("tempdir");
1290 let config_path = tmp.path().join("config.toml");
1291 fs::write(&config_path, "").expect("write config");
1292 let state = build_state(Some(config_path), None).expect("state");
1293 (state, tmp)
1294 }
1295
1296 #[tokio::test]
1297 async fn capabilities_method_set_is_stable() {
1298 let (state, _tmp) = capability_test_state();
1299 let caps = dispatch_stdio_request(&state, "capabilities", json!({}))
1300 .await
1301 .expect("capabilities dispatch");
1302 let methods: Vec<String> = caps.result["methods"]
1303 .as_array()
1304 .expect("methods array")
1305 .iter()
1306 .map(|m| m.as_str().expect("method string").to_string())
1307 .collect();
1308 assert_eq!(
1309 methods, EXPECTED_CAPABILITY_METHODS,
1310 "app-server stdio capability set drifted; update the dispatcher, this \
1311 snapshot, and docs/RUNTIME_API.md together"
1312 );
1313 }
1314
1315 #[tokio::test]
1316 async fn every_advertised_capability_is_dispatchable() {
1317 let (state, _tmp) = capability_test_state();
1318 for method in EXPECTED_CAPABILITY_METHODS {
1322 if let Err(err) = dispatch_stdio_request(&state, method, json!({})).await {
1323 assert_ne!(
1324 err.code,
1325 JsonRpcError::method_not_found(method).code,
1326 "advertised capability `{method}` is not dispatchable"
1327 );
1328 }
1329 }
1330 }
1331
/// An explicitly supplied auth token that is empty or whitespace-only must be
/// rejected with a "cannot be empty" error.
#[test]
fn auth_token_empty_string_fails() {
    // Exercise the literally empty token as well as whitespace-only tokens, so
    // the test covers what its name promises and not just the blank-space case.
    for blank in ["", " ", "   "] {
        let options = AppServerOptions {
            listen: "127.0.0.1:0".parse().expect("addr"),
            config_path: None,
            auth_token: Some(blank.to_string()),
            insecure_no_auth: false,
            cors_origins: Vec::new(),
        };
        let err = resolve_auth_token(&options).expect_err("empty token should fail");
        assert!(
            err.to_string().contains("cannot be empty"),
            "unexpected error for token {blank:?}: {err}"
        );
    }
}
1346
/// With no token supplied and auth left enabled, `resolve_auth_token` must
/// produce a token, and that token must start with `cwapp_`.
#[test]
fn auth_token_generated_when_none_provided() {
    let defaults = AppServerOptions {
        listen: "127.0.0.1:0".parse().expect("addr"),
        auth_token: None,
        insecure_no_auth: false,
        config_path: None,
        cors_origins: vec![],
    };

    let generated = resolve_auth_token(&defaults).unwrap();
    assert!(generated.is_some_and(|token| token.starts_with("cwapp_")));
}
1360
/// A non-empty token supplied by the caller must come back unchanged.
#[test]
fn auth_token_explicit_is_preserved() {
    let with_token = AppServerOptions {
        listen: "127.0.0.1:0".parse().expect("addr"),
        auth_token: Some(String::from("my-secret")),
        insecure_no_auth: false,
        config_path: None,
        cors_origins: vec![],
    };

    let resolved = resolve_auth_token(&with_token).unwrap();
    assert_eq!(resolved.as_deref(), Some("my-secret"));
}
1373
/// With `insecure_no_auth` set on a loopback listener and no token supplied,
/// resolution succeeds and yields no token at all.
#[test]
fn insecure_no_auth_on_loopback_returns_none() {
    let loopback_no_auth = AppServerOptions {
        listen: "127.0.0.1:0".parse().expect("addr"),
        insecure_no_auth: true,
        auth_token: None,
        config_path: None,
        cors_origins: vec![],
    };

    assert!(resolve_auth_token(&loopback_no_auth).unwrap().is_none());
}
1386
/// Smoke test: building the CORS layer with no extra origins must not panic.
///
/// NOTE(review): the layer is not inspected, so this does not actually verify
/// which origins it contains.
#[test]
fn cors_layer_includes_default_origins() {
    let _layer = cors_layer(&[]);
}
1395
/// Smoke test: building the CORS layer with one extra origin must not panic.
///
/// NOTE(review): the layer is not inspected, so this does not actually verify
/// that the extra origin was added.
#[test]
fn cors_layer_adds_extra_origins() {
    let extras = vec![String::from("https://example.com")];
    let _layer = cors_layer(&extras);
}
1402
/// Smoke test: empty and whitespace-only extra origins must not make layer
/// construction panic.
///
/// NOTE(review): the layer is not inspected, so "skips" is only checked to the
/// extent that construction succeeds.
#[test]
fn cors_layer_skips_empty_origins() {
    let extras = vec![String::new(), String::from(" ")];
    let _layer = cors_layer(&extras);
}
1409
/// `Value::Null` params are normalised to an empty JSON object.
#[test]
fn params_or_object_returns_object_for_null() {
    let normalised = params_or_object(Value::Null);
    let empty_object = json!({});
    assert_eq!(normalised, empty_object);
}
1417
/// Non-null params are returned unchanged.
#[test]
fn params_or_object_passthrough_for_non_null() {
    let original = json!({"key": "value"});
    let expected = original.clone();
    assert_eq!(params_or_object(original), expected);
}
1424
/// A success envelope carries the `2.0` version tag, echoes the request id and
/// nests the payload under `result`.
#[test]
fn jsonrpc_result_format() {
    let envelope = jsonrpc_result(Some(json!(1)), json!({"ok": true}));

    assert_eq!(envelope["jsonrpc"].as_str(), Some("2.0"));
    assert_eq!(envelope["id"].as_i64(), Some(1));
    assert_eq!(envelope["result"]["ok"].as_bool(), Some(true));
}
1432
/// When no request id is given, the envelope's `id` is JSON `null`.
#[test]
fn jsonrpc_result_null_id() {
    let envelope = jsonrpc_result(None, json!(null));
    assert!(envelope["id"].is_null());
}
1438
/// An error envelope carries the `2.0` version tag, echoes the request id and
/// exposes the error's code and message under `error`.
#[test]
fn jsonrpc_error_format() {
    let envelope = jsonrpc_error(Some(json!(2)), JsonRpcError::internal("oops"));
    let error = &envelope["error"];

    assert_eq!(envelope["jsonrpc"].as_str(), Some("2.0"));
    assert_eq!(envelope["id"].as_i64(), Some(2));
    assert_eq!(error["code"].as_i64(), Some(-32603));
    assert_eq!(error["message"].as_str(), Some("oops"));
}
1447
/// Each `JsonRpcError` constructor must yield the error code pinned here.
#[test]
fn jsonrpc_error_codes() {
    // Same order in both arrays: parse error, invalid request, method not
    // found, invalid params, internal error.
    let actual = [
        JsonRpcError::parse_error("").code,
        JsonRpcError::invalid_request("").code,
        JsonRpcError::method_not_found("x").code,
        JsonRpcError::invalid_params("").code,
        JsonRpcError::internal("").code,
    ];
    assert_eq!(actual, [-32700, -32600, -32601, -32602, -32603]);
}
1456
/// The `Debug` rendering of `AppServerOptions` must hide the auth token behind
/// `<redacted>` while still showing non-secret data such as the listen port.
#[test]
fn app_server_options_debug_does_not_leak_token() {
    let rendered = format!(
        "{:?}",
        AppServerOptions {
            listen: "127.0.0.1:8080".parse().expect("addr"),
            config_path: None,
            auth_token: Some(String::from("secret-token")),
            insecure_no_auth: false,
            cors_origins: vec![String::from("https://example.com")],
        }
    );

    assert!(!rendered.contains("secret-token"));
    for visible in ["<redacted>", "8080"] {
        assert!(rendered.contains(visible));
    }
}
1473
/// The built-in CORS allow-list must keep the local dev-server origins on
/// ports 3000 and 5173 and the Tauri origin.
#[test]
fn default_cors_origins_include_common_dev_ports() {
    let required = [
        "http://localhost:3000",
        "http://localhost:5173",
        "tauri://localhost",
    ];
    for origin in required {
        assert!(DEFAULT_CORS_ORIGINS.contains(&origin));
    }
}
1482}