1use anyhow::Result;
6use serde_json::Value;
7use std::collections::HashMap;
8use std::io::{self, BufRead, Write};
9use std::path::PathBuf;
10use std::sync::Arc;
11use tokio::sync::mpsc;
12use tokio::sync::Mutex;
13
14use crate::commands::spawn::terminal::Harness;
15use crate::extensions::runner::{spawn_agent, AgentEvent, SpawnConfig};
16
17use super::types::*;
18
19#[derive(Debug, Clone)]
21pub struct RpcServerConfig {
22 pub working_dir: PathBuf,
24 pub default_harness: Harness,
26 pub default_model: Option<String>,
28 pub project_root: Option<PathBuf>,
30}
31
32impl Default for RpcServerConfig {
33 fn default() -> Self {
34 Self {
35 working_dir: std::env::current_dir().unwrap_or_default(),
36 default_harness: Harness::default(),
37 default_model: None,
38 project_root: None,
39 }
40 }
41}
42
43pub struct RpcServer {
45 config: RpcServerConfig,
46 active_agents: Arc<Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>,
48 event_tx: mpsc::Sender<AgentEvent>,
50 event_rx: mpsc::Receiver<AgentEvent>,
51}
52
53impl RpcServer {
54 pub fn new(config: RpcServerConfig) -> Self {
56 let (event_tx, event_rx) = mpsc::channel(1000);
57 Self {
58 config,
59 active_agents: Arc::new(Mutex::new(HashMap::new())),
60 event_tx,
61 event_rx,
62 }
63 }
64
65 pub async fn run(&mut self) -> Result<()> {
70 self.emit_notification(RpcNotification::server_ready(env!("CARGO_PKG_VERSION")))?;
72
73 let stdin = io::stdin();
74 let reader = stdin.lock();
75
76 let event_rx = std::mem::replace(&mut self.event_rx, mpsc::channel(1).1);
78 let event_forwarder = tokio::spawn(Self::forward_events(event_rx));
79
80 for line in reader.lines() {
82 let line = match line {
83 Ok(l) => l,
84 Err(e) => {
85 eprintln!("Error reading stdin: {}", e);
86 break;
87 }
88 };
89
90 if line.trim().is_empty() {
92 continue;
93 }
94
95 let request: RpcRequest = match serde_json::from_str(&line) {
97 Ok(req) => req,
98 Err(e) => {
99 let id = Self::extract_id(&line).unwrap_or(RpcId::Null);
101 self.emit_response(RpcResponse::error(
102 id,
103 RpcError::parse_error(&e.to_string()),
104 ))?;
105 continue;
106 }
107 };
108
109 if request.jsonrpc != JSONRPC_VERSION {
111 if let Some(id) = request.id.clone() {
112 self.emit_response(RpcResponse::error(
113 id,
114 RpcError::invalid_request("Invalid JSON-RPC version"),
115 ))?;
116 }
117 continue;
118 }
119
120 let should_shutdown = self.handle_request(request).await?;
122 if should_shutdown {
123 break;
124 }
125 }
126
127 self.emit_notification(RpcNotification::server_shutdown())?;
129
130 event_forwarder.abort();
132
133 Ok(())
134 }
135
136 async fn handle_request(&mut self, request: RpcRequest) -> Result<bool> {
138 let id = request.id.clone();
139 let method = request.method.as_str();
140
141 match method {
142 "ping" => {
143 if let Some(id) = id {
144 self.emit_response(RpcResponse::success(
145 id,
146 serde_json::json!({"pong": true}),
147 ))?;
148 }
149 }
150
151 "shutdown" => {
152 if let Some(id) = id {
153 self.emit_response(RpcResponse::success(
154 id,
155 serde_json::json!({"status": "shutting_down"}),
156 ))?;
157 }
158 return Ok(true);
159 }
160
161 "spawn" => {
162 let result = self.handle_spawn(request.params).await;
163 if let Some(id) = id {
164 match result {
165 Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
166 Err(e) => self.emit_response(RpcResponse::error(id, e))?,
167 }
168 }
169 }
170
171 "spawn_task" => {
172 let result = self.handle_spawn_task(request.params).await;
173 if let Some(id) = id {
174 match result {
175 Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
176 Err(e) => self.emit_response(RpcResponse::error(id, e))?,
177 }
178 }
179 }
180
181 "cancel" => {
182 let result = self.handle_cancel(request.params).await;
183 if let Some(id) = id {
184 match result {
185 Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
186 Err(e) => self.emit_response(RpcResponse::error(id, e))?,
187 }
188 }
189 }
190
191 "list_agents" => {
192 let result = self.handle_list_agents().await;
193 if let Some(id) = id {
194 self.emit_response(RpcResponse::success(id, result))?;
195 }
196 }
197
198 "list_tasks" => {
199 let result = self.handle_list_tasks(request.params).await;
200 if let Some(id) = id {
201 match result {
202 Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
203 Err(e) => self.emit_response(RpcResponse::error(id, e))?,
204 }
205 }
206 }
207
208 "get_task" => {
209 let result = self.handle_get_task(request.params).await;
210 if let Some(id) = id {
211 match result {
212 Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
213 Err(e) => self.emit_response(RpcResponse::error(id, e))?,
214 }
215 }
216 }
217
218 "set_status" => {
219 let result = self.handle_set_status(request.params).await;
220 if let Some(id) = id {
221 match result {
222 Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
223 Err(e) => self.emit_response(RpcResponse::error(id, e))?,
224 }
225 }
226 }
227
228 "next_task" => {
229 let result = self.handle_next_task(request.params).await;
230 if let Some(id) = id {
231 match result {
232 Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
233 Err(e) => self.emit_response(RpcResponse::error(id, e))?,
234 }
235 }
236 }
237
238 _ => {
239 if let Some(id) = id {
240 self.emit_response(RpcResponse::error(id, RpcError::method_not_found(method)))?;
241 }
242 }
243 }
244
245 Ok(false)
246 }
247
248 async fn handle_spawn(&self, params: Value) -> Result<Value, RpcError> {
250 let params: SpawnParams =
251 serde_json::from_value(params).map_err(|e| RpcError::invalid_params(&e.to_string()))?;
252
253 let harness = if let Some(h) = params.harness {
255 Harness::parse(&h).map_err(|e| RpcError::invalid_params(&e.to_string()))?
256 } else {
257 self.config.default_harness
258 };
259
260 let working_dir = params
262 .working_dir
263 .map(PathBuf::from)
264 .unwrap_or_else(|| self.config.working_dir.clone());
265
266 let config = SpawnConfig {
267 task_id: params.task_id.clone(),
268 prompt: params.prompt,
269 working_dir,
270 harness,
271 model: params.model.or_else(|| self.config.default_model.clone()),
272 };
273
274 let event_tx = self.event_tx.clone();
276 let task_id = params.task_id.clone();
277
278 match spawn_agent(config, event_tx).await {
279 Ok(handle) => {
280 let mut agents = self.active_agents.lock().await;
282 agents.insert(task_id.clone(), tokio::spawn(async move {
283 let _ = handle.await;
284 }));
285
286 Ok(serde_json::json!({
287 "status": "spawned",
288 "task_id": task_id
289 }))
290 }
291 Err(e) => Err(RpcError::spawn_failed(&e.to_string())),
292 }
293 }
294
295 async fn handle_spawn_task(&self, params: Value) -> Result<Value, RpcError> {
297 let params: SpawnTaskParams =
298 serde_json::from_value(params).map_err(|e| RpcError::invalid_params(&e.to_string()))?;
299
300 let storage = crate::storage::Storage::new(self.config.project_root.clone());
302
303 let tag = params
305 .tag
306 .or_else(|| storage.get_active_group().ok().flatten());
307 let tag = tag.ok_or_else(|| RpcError::invalid_params("No tag specified and no active tag"))?;
308
309 let group = storage
311 .load_group(&tag)
312 .map_err(|e: anyhow::Error| RpcError::task_not_found(&e.to_string()))?;
313
314 let task = group
316 .tasks
317 .iter()
318 .find(|t| t.id == params.task_id)
319 .ok_or_else(|| RpcError::task_not_found(¶ms.task_id))?;
320
321 let prompt = crate::commands::spawn::agent::generate_prompt(task, &tag);
323
324 let harness = if let Some(h) = params.harness {
326 Harness::parse(&h).map_err(|e| RpcError::invalid_params(&e.to_string()))?
327 } else if let Some(agent_type) = &task.agent_type {
328 if let Some(agent_def) =
330 crate::agents::AgentDef::try_load(agent_type, &self.config.working_dir)
331 {
332 agent_def.harness().unwrap_or(self.config.default_harness)
333 } else {
334 self.config.default_harness
335 }
336 } else {
337 self.config.default_harness
338 };
339
340 let model = params.model.or_else(|| {
342 if let Some(agent_type) = &task.agent_type {
343 if let Some(agent_def) =
344 crate::agents::AgentDef::try_load(agent_type, &self.config.working_dir)
345 {
346 return agent_def.model().map(String::from);
347 }
348 }
349 self.config.default_model.clone()
350 });
351
352 let config = SpawnConfig {
353 task_id: params.task_id.clone(),
354 prompt,
355 working_dir: self.config.working_dir.clone(),
356 harness,
357 model,
358 };
359
360 let event_tx = self.event_tx.clone();
362 let task_id = params.task_id.clone();
363
364 match spawn_agent(config, event_tx).await {
365 Ok(handle) => {
366 let mut agents = self.active_agents.lock().await;
368 agents.insert(task_id.clone(), tokio::spawn(async move {
369 let _ = handle.await;
370 }));
371
372 Ok(serde_json::json!({
373 "status": "spawned",
374 "task_id": task_id,
375 "tag": tag
376 }))
377 }
378 Err(e) => Err(RpcError::spawn_failed(&e.to_string())),
379 }
380 }
381
382 async fn handle_cancel(&self, params: Value) -> Result<Value, RpcError> {
384 let task_id = params
385 .get("task_id")
386 .and_then(|v| v.as_str())
387 .ok_or_else(|| RpcError::invalid_params("Missing task_id"))?;
388
389 let mut agents = self.active_agents.lock().await;
390
391 if let Some(handle) = agents.remove(task_id) {
392 handle.abort();
393 Ok(serde_json::json!({
394 "status": "cancelled",
395 "task_id": task_id
396 }))
397 } else {
398 Err(RpcError::task_not_found(task_id))
399 }
400 }
401
402 async fn handle_list_agents(&self) -> Value {
404 let agents = self.active_agents.lock().await;
405 let agent_ids: Vec<&String> = agents.keys().collect();
406 serde_json::json!({
407 "agents": agent_ids,
408 "count": agent_ids.len()
409 })
410 }
411
412 async fn handle_list_tasks(&self, params: Value) -> Result<Value, RpcError> {
414 let params: ListTasksParams = serde_json::from_value(params).unwrap_or_default();
415
416 let storage = crate::storage::Storage::new(self.config.project_root.clone());
417
418 let tag = params
419 .tag
420 .or_else(|| storage.get_active_group().ok().flatten());
421 let tag = tag.ok_or_else(|| RpcError::invalid_params("No tag specified and no active tag"))?;
422
423 let group = storage
424 .load_group(&tag)
425 .map_err(|e: anyhow::Error| RpcError::internal_error(&e.to_string()))?;
426
427 let tasks: Vec<Value> = group
428 .tasks
429 .iter()
430 .filter(|t| {
431 params
432 .status
433 .as_ref()
434 .map(|s| t.status.as_str().to_lowercase() == s.to_lowercase())
435 .unwrap_or(true)
436 })
437 .map(|t| {
438 serde_json::json!({
439 "id": t.id,
440 "title": t.title,
441 "status": t.status.as_str(),
442 "complexity": t.complexity,
443 "dependencies": t.dependencies
444 })
445 })
446 .collect();
447
448 Ok(serde_json::json!({
449 "tag": tag,
450 "tasks": tasks,
451 "count": tasks.len()
452 }))
453 }
454
455 async fn handle_get_task(&self, params: Value) -> Result<Value, RpcError> {
457 let params: GetTaskParams =
458 serde_json::from_value(params).map_err(|e| RpcError::invalid_params(&e.to_string()))?;
459
460 let storage = crate::storage::Storage::new(self.config.project_root.clone());
461
462 let tag = params
463 .tag
464 .or_else(|| storage.get_active_group().ok().flatten());
465 let tag = tag.ok_or_else(|| RpcError::invalid_params("No tag specified and no active tag"))?;
466
467 let group = storage
468 .load_group(&tag)
469 .map_err(|e: anyhow::Error| RpcError::internal_error(&e.to_string()))?;
470
471 let task = group
472 .tasks
473 .iter()
474 .find(|t| t.id == params.task_id)
475 .ok_or_else(|| RpcError::task_not_found(¶ms.task_id))?;
476
477 Ok(serde_json::json!({
478 "id": task.id,
479 "title": task.title,
480 "description": task.description,
481 "status": task.status.as_str(),
482 "complexity": task.complexity,
483 "priority": format!("{:?}", task.priority).to_lowercase(),
484 "dependencies": task.dependencies,
485 "agent_type": task.agent_type,
486 "assigned_to": task.assigned_to
487 }))
488 }
489
490 async fn handle_set_status(&self, params: Value) -> Result<Value, RpcError> {
492 let params: SetStatusParams =
493 serde_json::from_value(params).map_err(|e| RpcError::invalid_params(&e.to_string()))?;
494
495 let storage = crate::storage::Storage::new(self.config.project_root.clone());
496
497 let tag = params
498 .tag
499 .or_else(|| storage.get_active_group().ok().flatten());
500 let tag = tag.ok_or_else(|| RpcError::invalid_params("No tag specified and no active tag"))?;
501
502 let mut group = storage
503 .load_group(&tag)
504 .map_err(|e: anyhow::Error| RpcError::internal_error(&e.to_string()))?;
505
506 let task = group
508 .tasks
509 .iter_mut()
510 .find(|t| t.id == params.task_id)
511 .ok_or_else(|| RpcError::task_not_found(¶ms.task_id))?;
512
513 let new_status = crate::models::task::TaskStatus::from_str(¶ms.status)
515 .ok_or_else(|| RpcError::invalid_params(&format!("Invalid status: {}", params.status)))?;
516
517 let old_status = task.status.as_str().to_string();
518 task.status = new_status;
519
520 storage
522 .update_group(&tag, &group)
523 .map_err(|e: anyhow::Error| RpcError::internal_error(&e.to_string()))?;
524
525 Ok(serde_json::json!({
526 "task_id": params.task_id,
527 "old_status": old_status,
528 "new_status": params.status
529 }))
530 }
531
532 async fn handle_next_task(&self, params: Value) -> Result<Value, RpcError> {
534 let params: NextTaskParams = serde_json::from_value(params).unwrap_or_default();
535
536 let storage = crate::storage::Storage::new(self.config.project_root.clone());
537
538 let result = crate::commands::helpers::find_next_task(
540 &storage,
541 params.tag.as_deref(),
542 params.all_tags,
543 );
544
545 match result {
546 Some((task, tag)) => Ok(serde_json::json!({
547 "task_id": task.id,
548 "title": task.title,
549 "tag": tag,
550 "complexity": task.complexity
551 })),
552 None => Ok(serde_json::json!({
553 "task_id": null,
554 "message": "No tasks available"
555 })),
556 }
557 }
558
559 fn emit_response(&self, response: RpcResponse) -> Result<()> {
561 let json = serde_json::to_string(&response)?;
562 let mut stdout = io::stdout().lock();
563 writeln!(stdout, "{}", json)?;
564 stdout.flush()?;
565 Ok(())
566 }
567
568 fn emit_notification(&self, notification: RpcNotification) -> Result<()> {
570 let json = serde_json::to_string(¬ification)?;
571 let mut stdout = io::stdout().lock();
572 writeln!(stdout, "{}", json)?;
573 stdout.flush()?;
574 Ok(())
575 }
576
577 fn extract_id(json_str: &str) -> Option<RpcId> {
579 let parsed: Result<Value, _> = serde_json::from_str(json_str);
581 if let Ok(v) = parsed {
582 if let Some(id) = v.get("id") {
583 if let Some(n) = id.as_i64() {
584 return Some(RpcId::Number(n));
585 }
586 if let Some(s) = id.as_str() {
587 return Some(RpcId::String(s.to_string()));
588 }
589 }
590 }
591 None
592 }
593
594 async fn forward_events(mut event_rx: mpsc::Receiver<AgentEvent>) {
596 while let Some(event) = event_rx.recv().await {
597 let notification = match event {
598 AgentEvent::Started { task_id } => RpcNotification::agent_started(&task_id),
599 AgentEvent::Output { task_id, line } => RpcNotification::agent_output(&task_id, &line),
600 AgentEvent::Completed { result } => RpcNotification::agent_completed(
601 &result.task_id,
602 result.success,
603 result.exit_code,
604 result.duration_ms,
605 ),
606 AgentEvent::SpawnFailed { task_id, error } => {
607 RpcNotification::agent_spawn_failed(&task_id, &error)
608 }
609 };
610
611 if let Ok(json) = serde_json::to_string(¬ification) {
613 let mut stdout = io::stdout().lock();
614 let _ = writeln!(stdout, "{}", json);
615 let _ = stdout.flush();
616 }
617 }
618 }
619}
620
621#[cfg(test)]
622mod tests {
623 use super::*;
624
625 #[test]
626 fn test_server_config_default() {
627 let config = RpcServerConfig::default();
628 assert_eq!(config.default_harness, Harness::default());
629 }
630
631 #[test]
632 fn test_extract_id_number() {
633 let id = RpcServer::extract_id(r#"{"id": 42, "invalid": true}"#);
634 assert_eq!(id, Some(RpcId::Number(42)));
635 }
636
637 #[test]
638 fn test_extract_id_string() {
639 let id = RpcServer::extract_id(r#"{"id": "abc-123"}"#);
640 assert_eq!(id, Some(RpcId::String("abc-123".to_string())));
641 }
642}