1use anyhow::Result;
6use serde_json::Value;
7use std::collections::HashMap;
8use std::io::{self, BufRead, Write};
9use std::path::PathBuf;
10use std::sync::Arc;
11use tokio::sync::mpsc;
12use tokio::sync::Mutex;
13
14use crate::commands::spawn::terminal::Harness;
15use crate::extensions::runner::{spawn_agent, AgentEvent, SpawnConfig};
16
17use super::types::*;
18
19#[derive(Debug, Clone)]
21pub struct RpcServerConfig {
22 pub working_dir: PathBuf,
24 pub default_harness: Harness,
26 pub default_model: Option<String>,
28 pub project_root: Option<PathBuf>,
30}
31
32impl Default for RpcServerConfig {
33 fn default() -> Self {
34 Self {
35 working_dir: std::env::current_dir().unwrap_or_default(),
36 default_harness: Harness::default(),
37 default_model: None,
38 project_root: None,
39 }
40 }
41}
42
43pub struct RpcServer {
45 config: RpcServerConfig,
46 active_agents: Arc<Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>,
48 event_tx: mpsc::Sender<AgentEvent>,
50 event_rx: mpsc::Receiver<AgentEvent>,
51}
52
53impl RpcServer {
54 pub fn new(config: RpcServerConfig) -> Self {
56 let (event_tx, event_rx) = mpsc::channel(1000);
57 Self {
58 config,
59 active_agents: Arc::new(Mutex::new(HashMap::new())),
60 event_tx,
61 event_rx,
62 }
63 }
64
65 pub async fn run(&mut self) -> Result<()> {
70 self.emit_notification(RpcNotification::server_ready(env!("CARGO_PKG_VERSION")))?;
72
73 let stdin = io::stdin();
74 let reader = stdin.lock();
75
76 let event_rx = std::mem::replace(&mut self.event_rx, mpsc::channel(1).1);
78 let event_forwarder = tokio::spawn(Self::forward_events(event_rx));
79
80 for line in reader.lines() {
82 let line = match line {
83 Ok(l) => l,
84 Err(e) => {
85 eprintln!("Error reading stdin: {}", e);
86 break;
87 }
88 };
89
90 if line.trim().is_empty() {
92 continue;
93 }
94
95 let request: RpcRequest = match serde_json::from_str(&line) {
97 Ok(req) => req,
98 Err(e) => {
99 let id = Self::extract_id(&line).unwrap_or(RpcId::Null);
101 self.emit_response(RpcResponse::error(
102 id,
103 RpcError::parse_error(&e.to_string()),
104 ))?;
105 continue;
106 }
107 };
108
109 if request.jsonrpc != JSONRPC_VERSION {
111 if let Some(id) = request.id.clone() {
112 self.emit_response(RpcResponse::error(
113 id,
114 RpcError::invalid_request("Invalid JSON-RPC version"),
115 ))?;
116 }
117 continue;
118 }
119
120 let should_shutdown = self.handle_request(request).await?;
122 if should_shutdown {
123 break;
124 }
125 }
126
127 self.emit_notification(RpcNotification::server_shutdown())?;
129
130 event_forwarder.abort();
132
133 Ok(())
134 }
135
136 async fn handle_request(&mut self, request: RpcRequest) -> Result<bool> {
138 let id = request.id.clone();
139 let method = request.method.as_str();
140
141 match method {
142 "ping" => {
143 if let Some(id) = id {
144 self.emit_response(RpcResponse::success(
145 id,
146 serde_json::json!({"pong": true}),
147 ))?;
148 }
149 }
150
151 "shutdown" => {
152 if let Some(id) = id {
153 self.emit_response(RpcResponse::success(
154 id,
155 serde_json::json!({"status": "shutting_down"}),
156 ))?;
157 }
158 return Ok(true);
159 }
160
161 "spawn" => {
162 let result = self.handle_spawn(request.params).await;
163 if let Some(id) = id {
164 match result {
165 Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
166 Err(e) => self.emit_response(RpcResponse::error(id, e))?,
167 }
168 }
169 }
170
171 "spawn_task" => {
172 let result = self.handle_spawn_task(request.params).await;
173 if let Some(id) = id {
174 match result {
175 Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
176 Err(e) => self.emit_response(RpcResponse::error(id, e))?,
177 }
178 }
179 }
180
181 "cancel" => {
182 let result = self.handle_cancel(request.params).await;
183 if let Some(id) = id {
184 match result {
185 Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
186 Err(e) => self.emit_response(RpcResponse::error(id, e))?,
187 }
188 }
189 }
190
191 "list_agents" => {
192 let result = self.handle_list_agents().await;
193 if let Some(id) = id {
194 self.emit_response(RpcResponse::success(id, result))?;
195 }
196 }
197
198 "list_tasks" => {
199 let result = self.handle_list_tasks(request.params).await;
200 if let Some(id) = id {
201 match result {
202 Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
203 Err(e) => self.emit_response(RpcResponse::error(id, e))?,
204 }
205 }
206 }
207
208 "get_task" => {
209 let result = self.handle_get_task(request.params).await;
210 if let Some(id) = id {
211 match result {
212 Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
213 Err(e) => self.emit_response(RpcResponse::error(id, e))?,
214 }
215 }
216 }
217
218 "set_status" => {
219 let result = self.handle_set_status(request.params).await;
220 if let Some(id) = id {
221 match result {
222 Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
223 Err(e) => self.emit_response(RpcResponse::error(id, e))?,
224 }
225 }
226 }
227
228 "next_task" => {
229 let result = self.handle_next_task(request.params).await;
230 if let Some(id) = id {
231 match result {
232 Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
233 Err(e) => self.emit_response(RpcResponse::error(id, e))?,
234 }
235 }
236 }
237
238 _ => {
239 if let Some(id) = id {
240 self.emit_response(RpcResponse::error(id, RpcError::method_not_found(method)))?;
241 }
242 }
243 }
244
245 Ok(false)
246 }
247
248 async fn handle_spawn(&self, params: Value) -> Result<Value, RpcError> {
250 let params: SpawnParams =
251 serde_json::from_value(params).map_err(|e| RpcError::invalid_params(&e.to_string()))?;
252
253 let harness = if let Some(h) = params.harness {
255 Harness::parse(&h).map_err(|e| RpcError::invalid_params(&e.to_string()))?
256 } else {
257 self.config.default_harness
258 };
259
260 let working_dir = params
262 .working_dir
263 .map(PathBuf::from)
264 .unwrap_or_else(|| self.config.working_dir.clone());
265
266 let config = SpawnConfig {
267 task_id: params.task_id.clone(),
268 prompt: params.prompt,
269 working_dir,
270 harness,
271 model: params.model.or_else(|| self.config.default_model.clone()),
272 };
273
274 let event_tx = self.event_tx.clone();
276 let task_id = params.task_id.clone();
277
278 match spawn_agent(config, event_tx).await {
279 Ok(handle) => {
280 let mut agents = self.active_agents.lock().await;
282 agents.insert(
283 task_id.clone(),
284 tokio::spawn(async move {
285 let _ = handle.await;
286 }),
287 );
288
289 Ok(serde_json::json!({
290 "status": "spawned",
291 "task_id": task_id
292 }))
293 }
294 Err(e) => Err(RpcError::spawn_failed(&e.to_string())),
295 }
296 }
297
298 async fn handle_spawn_task(&self, params: Value) -> Result<Value, RpcError> {
300 let params: SpawnTaskParams =
301 serde_json::from_value(params).map_err(|e| RpcError::invalid_params(&e.to_string()))?;
302
303 let storage = crate::storage::Storage::new(self.config.project_root.clone());
305
306 let tag = params
308 .tag
309 .or_else(|| storage.get_active_group().ok().flatten());
310 let tag =
311 tag.ok_or_else(|| RpcError::invalid_params("No tag specified and no active tag"))?;
312
313 let group = storage
315 .load_group(&tag)
316 .map_err(|e: anyhow::Error| RpcError::task_not_found(&e.to_string()))?;
317
318 let task = group
320 .tasks
321 .iter()
322 .find(|t| t.id == params.task_id)
323 .ok_or_else(|| RpcError::task_not_found(¶ms.task_id))?;
324
325 let prompt = crate::commands::spawn::agent::generate_prompt(task, &tag);
327
328 let harness = if let Some(h) = params.harness {
330 Harness::parse(&h).map_err(|e| RpcError::invalid_params(&e.to_string()))?
331 } else if let Some(agent_type) = &task.agent_type {
332 if let Some(agent_def) =
334 crate::agents::AgentDef::try_load(agent_type, &self.config.working_dir)
335 {
336 agent_def.harness().unwrap_or(self.config.default_harness)
337 } else {
338 self.config.default_harness
339 }
340 } else {
341 self.config.default_harness
342 };
343
344 let model = params.model.or_else(|| {
346 if let Some(agent_type) = &task.agent_type {
347 if let Some(agent_def) =
348 crate::agents::AgentDef::try_load(agent_type, &self.config.working_dir)
349 {
350 return agent_def.model().map(String::from);
351 }
352 }
353 self.config.default_model.clone()
354 });
355
356 let config = SpawnConfig {
357 task_id: params.task_id.clone(),
358 prompt,
359 working_dir: self.config.working_dir.clone(),
360 harness,
361 model,
362 };
363
364 let event_tx = self.event_tx.clone();
366 let task_id = params.task_id.clone();
367
368 match spawn_agent(config, event_tx).await {
369 Ok(handle) => {
370 let mut agents = self.active_agents.lock().await;
372 agents.insert(
373 task_id.clone(),
374 tokio::spawn(async move {
375 let _ = handle.await;
376 }),
377 );
378
379 Ok(serde_json::json!({
380 "status": "spawned",
381 "task_id": task_id,
382 "tag": tag
383 }))
384 }
385 Err(e) => Err(RpcError::spawn_failed(&e.to_string())),
386 }
387 }
388
389 async fn handle_cancel(&self, params: Value) -> Result<Value, RpcError> {
391 let task_id = params
392 .get("task_id")
393 .and_then(|v| v.as_str())
394 .ok_or_else(|| RpcError::invalid_params("Missing task_id"))?;
395
396 let mut agents = self.active_agents.lock().await;
397
398 if let Some(handle) = agents.remove(task_id) {
399 handle.abort();
400 Ok(serde_json::json!({
401 "status": "cancelled",
402 "task_id": task_id
403 }))
404 } else {
405 Err(RpcError::task_not_found(task_id))
406 }
407 }
408
409 async fn handle_list_agents(&self) -> Value {
411 let agents = self.active_agents.lock().await;
412 let agent_ids: Vec<&String> = agents.keys().collect();
413 serde_json::json!({
414 "agents": agent_ids,
415 "count": agent_ids.len()
416 })
417 }
418
419 async fn handle_list_tasks(&self, params: Value) -> Result<Value, RpcError> {
421 let params: ListTasksParams = serde_json::from_value(params).unwrap_or_default();
422
423 let storage = crate::storage::Storage::new(self.config.project_root.clone());
424
425 let tag = params
426 .tag
427 .or_else(|| storage.get_active_group().ok().flatten());
428 let tag =
429 tag.ok_or_else(|| RpcError::invalid_params("No tag specified and no active tag"))?;
430
431 let group = storage
432 .load_group(&tag)
433 .map_err(|e: anyhow::Error| RpcError::internal_error(&e.to_string()))?;
434
435 let tasks: Vec<Value> = group
436 .tasks
437 .iter()
438 .filter(|t| {
439 params
440 .status
441 .as_ref()
442 .map(|s| t.status.as_str().to_lowercase() == s.to_lowercase())
443 .unwrap_or(true)
444 })
445 .map(|t| {
446 serde_json::json!({
447 "id": t.id,
448 "title": t.title,
449 "status": t.status.as_str(),
450 "complexity": t.complexity,
451 "dependencies": t.dependencies
452 })
453 })
454 .collect();
455
456 Ok(serde_json::json!({
457 "tag": tag,
458 "tasks": tasks,
459 "count": tasks.len()
460 }))
461 }
462
463 async fn handle_get_task(&self, params: Value) -> Result<Value, RpcError> {
465 let params: GetTaskParams =
466 serde_json::from_value(params).map_err(|e| RpcError::invalid_params(&e.to_string()))?;
467
468 let storage = crate::storage::Storage::new(self.config.project_root.clone());
469
470 let tag = params
471 .tag
472 .or_else(|| storage.get_active_group().ok().flatten());
473 let tag =
474 tag.ok_or_else(|| RpcError::invalid_params("No tag specified and no active tag"))?;
475
476 let group = storage
477 .load_group(&tag)
478 .map_err(|e: anyhow::Error| RpcError::internal_error(&e.to_string()))?;
479
480 let task = group
481 .tasks
482 .iter()
483 .find(|t| t.id == params.task_id)
484 .ok_or_else(|| RpcError::task_not_found(¶ms.task_id))?;
485
486 Ok(serde_json::json!({
487 "id": task.id,
488 "title": task.title,
489 "description": task.description,
490 "status": task.status.as_str(),
491 "complexity": task.complexity,
492 "priority": format!("{:?}", task.priority).to_lowercase(),
493 "dependencies": task.dependencies,
494 "agent_type": task.agent_type,
495 "assigned_to": task.assigned_to
496 }))
497 }
498
499 async fn handle_set_status(&self, params: Value) -> Result<Value, RpcError> {
501 let params: SetStatusParams =
502 serde_json::from_value(params).map_err(|e| RpcError::invalid_params(&e.to_string()))?;
503
504 let storage = crate::storage::Storage::new(self.config.project_root.clone());
505
506 let tag = params
507 .tag
508 .or_else(|| storage.get_active_group().ok().flatten());
509 let tag =
510 tag.ok_or_else(|| RpcError::invalid_params("No tag specified and no active tag"))?;
511
512 let mut group = storage
513 .load_group(&tag)
514 .map_err(|e: anyhow::Error| RpcError::internal_error(&e.to_string()))?;
515
516 let task = group
518 .tasks
519 .iter_mut()
520 .find(|t| t.id == params.task_id)
521 .ok_or_else(|| RpcError::task_not_found(¶ms.task_id))?;
522
523 let new_status =
525 crate::models::task::TaskStatus::from_str(¶ms.status).ok_or_else(|| {
526 RpcError::invalid_params(&format!("Invalid status: {}", params.status))
527 })?;
528
529 let old_status = task.status.as_str().to_string();
530 task.status = new_status;
531
532 storage
534 .update_group(&tag, &group)
535 .map_err(|e: anyhow::Error| RpcError::internal_error(&e.to_string()))?;
536
537 Ok(serde_json::json!({
538 "task_id": params.task_id,
539 "old_status": old_status,
540 "new_status": params.status
541 }))
542 }
543
544 async fn handle_next_task(&self, params: Value) -> Result<Value, RpcError> {
546 let params: NextTaskParams = serde_json::from_value(params).unwrap_or_default();
547
548 let storage = crate::storage::Storage::new(self.config.project_root.clone());
549
550 let result = crate::commands::helpers::find_next_task(
552 &storage,
553 params.tag.as_deref(),
554 params.all_tags,
555 );
556
557 match result {
558 Some((task, tag)) => Ok(serde_json::json!({
559 "task_id": task.id,
560 "title": task.title,
561 "tag": tag,
562 "complexity": task.complexity
563 })),
564 None => Ok(serde_json::json!({
565 "task_id": null,
566 "message": "No tasks available"
567 })),
568 }
569 }
570
571 fn emit_response(&self, response: RpcResponse) -> Result<()> {
573 let json = serde_json::to_string(&response)?;
574 let mut stdout = io::stdout().lock();
575 writeln!(stdout, "{}", json)?;
576 stdout.flush()?;
577 Ok(())
578 }
579
580 fn emit_notification(&self, notification: RpcNotification) -> Result<()> {
582 let json = serde_json::to_string(¬ification)?;
583 let mut stdout = io::stdout().lock();
584 writeln!(stdout, "{}", json)?;
585 stdout.flush()?;
586 Ok(())
587 }
588
589 fn extract_id(json_str: &str) -> Option<RpcId> {
591 let parsed: Result<Value, _> = serde_json::from_str(json_str);
593 if let Ok(v) = parsed {
594 if let Some(id) = v.get("id") {
595 if let Some(n) = id.as_i64() {
596 return Some(RpcId::Number(n));
597 }
598 if let Some(s) = id.as_str() {
599 return Some(RpcId::String(s.to_string()));
600 }
601 }
602 }
603 None
604 }
605
606 async fn forward_events(mut event_rx: mpsc::Receiver<AgentEvent>) {
608 while let Some(event) = event_rx.recv().await {
609 let notification = match event {
610 AgentEvent::Started { task_id } => RpcNotification::agent_started(&task_id),
611 AgentEvent::Output { task_id, line } => {
612 RpcNotification::agent_output(&task_id, &line)
613 }
614 AgentEvent::Completed { result } => RpcNotification::agent_completed(
615 &result.task_id,
616 result.success,
617 result.exit_code,
618 result.duration_ms,
619 ),
620 AgentEvent::SpawnFailed { task_id, error } => {
621 RpcNotification::agent_spawn_failed(&task_id, &error)
622 }
623 };
624
625 if let Ok(json) = serde_json::to_string(¬ification) {
627 let mut stdout = io::stdout().lock();
628 let _ = writeln!(stdout, "{}", json);
629 let _ = stdout.flush();
630 }
631 }
632 }
633}
634
635#[cfg(test)]
636mod tests {
637 use super::*;
638
639 #[test]
640 fn test_server_config_default() {
641 let config = RpcServerConfig::default();
642 assert_eq!(config.default_harness, Harness::default());
643 }
644
645 #[test]
646 fn test_extract_id_number() {
647 let id = RpcServer::extract_id(r#"{"id": 42, "invalid": true}"#);
648 assert_eq!(id, Some(RpcId::Number(42)));
649 }
650
651 #[test]
652 fn test_extract_id_string() {
653 let id = RpcServer::extract_id(r#"{"id": "abc-123"}"#);
654 assert_eq!(id, Some(RpcId::String("abc-123".to_string())));
655 }
656}