scud/rpc/
server.rs

1//! JSON RPC server for IPC communication
2//!
3//! Handles stdin/stdout communication with JSON RPC protocol.
4
5use anyhow::Result;
6use serde_json::Value;
7use std::collections::HashMap;
8use std::io::{self, BufRead, Write};
9use std::path::PathBuf;
10use std::sync::Arc;
11use tokio::sync::mpsc;
12use tokio::sync::Mutex;
13
14use crate::commands::spawn::terminal::Harness;
15use crate::extensions::runner::{spawn_agent, AgentEvent, SpawnConfig};
16
17use super::types::*;
18
19/// RPC Server configuration
20#[derive(Debug, Clone)]
21pub struct RpcServerConfig {
22    /// Default working directory for agents
23    pub working_dir: PathBuf,
24    /// Default harness to use
25    pub default_harness: Harness,
26    /// Default model to use
27    pub default_model: Option<String>,
28    /// Project root for task storage access
29    pub project_root: Option<PathBuf>,
30}
31
32impl Default for RpcServerConfig {
33    fn default() -> Self {
34        Self {
35            working_dir: std::env::current_dir().unwrap_or_default(),
36            default_harness: Harness::default(),
37            default_model: None,
38            project_root: None,
39        }
40    }
41}
42
43/// JSON RPC Server for subagent communication
44pub struct RpcServer {
45    config: RpcServerConfig,
46    /// Active agent task handles
47    active_agents: Arc<Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>,
48    /// Event channel for agent events
49    event_tx: mpsc::Sender<AgentEvent>,
50    event_rx: mpsc::Receiver<AgentEvent>,
51}
52
53impl RpcServer {
54    /// Create a new RPC server with the given configuration
55    pub fn new(config: RpcServerConfig) -> Self {
56        let (event_tx, event_rx) = mpsc::channel(1000);
57        Self {
58            config,
59            active_agents: Arc::new(Mutex::new(HashMap::new())),
60            event_tx,
61            event_rx,
62        }
63    }
64
65    /// Run the RPC server loop
66    ///
67    /// Reads JSON RPC requests from stdin and writes responses/events to stdout.
68    /// This function blocks until a shutdown request is received or stdin closes.
69    pub async fn run(&mut self) -> Result<()> {
70        // Emit server ready notification
71        self.emit_notification(RpcNotification::server_ready(env!("CARGO_PKG_VERSION")))?;
72
73        let stdin = io::stdin();
74        let reader = stdin.lock();
75
76        // Spawn a task to forward agent events to stdout
77        let event_rx = std::mem::replace(&mut self.event_rx, mpsc::channel(1).1);
78        let event_forwarder = tokio::spawn(Self::forward_events(event_rx));
79
80        // Read requests from stdin line by line
81        for line in reader.lines() {
82            let line = match line {
83                Ok(l) => l,
84                Err(e) => {
85                    eprintln!("Error reading stdin: {}", e);
86                    break;
87                }
88            };
89
90            // Skip empty lines
91            if line.trim().is_empty() {
92                continue;
93            }
94
95            // Parse the request
96            let request: RpcRequest = match serde_json::from_str(&line) {
97                Ok(req) => req,
98                Err(e) => {
99                    // Try to extract ID for error response
100                    let id = Self::extract_id(&line).unwrap_or(RpcId::Null);
101                    self.emit_response(RpcResponse::error(
102                        id,
103                        RpcError::parse_error(&e.to_string()),
104                    ))?;
105                    continue;
106                }
107            };
108
109            // Validate JSON RPC version
110            if request.jsonrpc != JSONRPC_VERSION {
111                if let Some(id) = request.id.clone() {
112                    self.emit_response(RpcResponse::error(
113                        id,
114                        RpcError::invalid_request("Invalid JSON-RPC version"),
115                    ))?;
116                }
117                continue;
118            }
119
120            // Handle the request
121            let should_shutdown = self.handle_request(request).await?;
122            if should_shutdown {
123                break;
124            }
125        }
126
127        // Emit shutdown notification
128        self.emit_notification(RpcNotification::server_shutdown())?;
129
130        // Cancel the event forwarder
131        event_forwarder.abort();
132
133        Ok(())
134    }
135
136    /// Handle a single RPC request
137    async fn handle_request(&mut self, request: RpcRequest) -> Result<bool> {
138        let id = request.id.clone();
139        let method = request.method.as_str();
140
141        match method {
142            "ping" => {
143                if let Some(id) = id {
144                    self.emit_response(RpcResponse::success(
145                        id,
146                        serde_json::json!({"pong": true}),
147                    ))?;
148                }
149            }
150
151            "shutdown" => {
152                if let Some(id) = id {
153                    self.emit_response(RpcResponse::success(
154                        id,
155                        serde_json::json!({"status": "shutting_down"}),
156                    ))?;
157                }
158                return Ok(true);
159            }
160
161            "spawn" => {
162                let result = self.handle_spawn(request.params).await;
163                if let Some(id) = id {
164                    match result {
165                        Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
166                        Err(e) => self.emit_response(RpcResponse::error(id, e))?,
167                    }
168                }
169            }
170
171            "spawn_task" => {
172                let result = self.handle_spawn_task(request.params).await;
173                if let Some(id) = id {
174                    match result {
175                        Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
176                        Err(e) => self.emit_response(RpcResponse::error(id, e))?,
177                    }
178                }
179            }
180
181            "cancel" => {
182                let result = self.handle_cancel(request.params).await;
183                if let Some(id) = id {
184                    match result {
185                        Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
186                        Err(e) => self.emit_response(RpcResponse::error(id, e))?,
187                    }
188                }
189            }
190
191            "list_agents" => {
192                let result = self.handle_list_agents().await;
193                if let Some(id) = id {
194                    self.emit_response(RpcResponse::success(id, result))?;
195                }
196            }
197
198            "list_tasks" => {
199                let result = self.handle_list_tasks(request.params).await;
200                if let Some(id) = id {
201                    match result {
202                        Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
203                        Err(e) => self.emit_response(RpcResponse::error(id, e))?,
204                    }
205                }
206            }
207
208            "get_task" => {
209                let result = self.handle_get_task(request.params).await;
210                if let Some(id) = id {
211                    match result {
212                        Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
213                        Err(e) => self.emit_response(RpcResponse::error(id, e))?,
214                    }
215                }
216            }
217
218            "set_status" => {
219                let result = self.handle_set_status(request.params).await;
220                if let Some(id) = id {
221                    match result {
222                        Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
223                        Err(e) => self.emit_response(RpcResponse::error(id, e))?,
224                    }
225                }
226            }
227
228            "next_task" => {
229                let result = self.handle_next_task(request.params).await;
230                if let Some(id) = id {
231                    match result {
232                        Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
233                        Err(e) => self.emit_response(RpcResponse::error(id, e))?,
234                    }
235                }
236            }
237
238            _ => {
239                if let Some(id) = id {
240                    self.emit_response(RpcResponse::error(id, RpcError::method_not_found(method)))?;
241                }
242            }
243        }
244
245        Ok(false)
246    }
247
248    /// Handle "spawn" method - spawn agent with custom prompt
249    async fn handle_spawn(&self, params: Value) -> Result<Value, RpcError> {
250        let params: SpawnParams =
251            serde_json::from_value(params).map_err(|e| RpcError::invalid_params(&e.to_string()))?;
252
253        // Parse harness
254        let harness = if let Some(h) = params.harness {
255            Harness::parse(&h).map_err(|e| RpcError::invalid_params(&e.to_string()))?
256        } else {
257            self.config.default_harness
258        };
259
260        // Determine working directory
261        let working_dir = params
262            .working_dir
263            .map(PathBuf::from)
264            .unwrap_or_else(|| self.config.working_dir.clone());
265
266        let config = SpawnConfig {
267            task_id: params.task_id.clone(),
268            prompt: params.prompt,
269            working_dir,
270            harness,
271            model: params.model.or_else(|| self.config.default_model.clone()),
272        };
273
274        // Spawn the agent
275        let event_tx = self.event_tx.clone();
276        let task_id = params.task_id.clone();
277
278        match spawn_agent(config, event_tx).await {
279            Ok(handle) => {
280                // Track the agent handle
281                let mut agents = self.active_agents.lock().await;
282                agents.insert(task_id.clone(), tokio::spawn(async move {
283                    let _ = handle.await;
284                }));
285
286                Ok(serde_json::json!({
287                    "status": "spawned",
288                    "task_id": task_id
289                }))
290            }
291            Err(e) => Err(RpcError::spawn_failed(&e.to_string())),
292        }
293    }
294
295    /// Handle "spawn_task" method - spawn agent for a task in the graph
296    async fn handle_spawn_task(&self, params: Value) -> Result<Value, RpcError> {
297        let params: SpawnTaskParams =
298            serde_json::from_value(params).map_err(|e| RpcError::invalid_params(&e.to_string()))?;
299
300        // Load task from storage
301        let storage = crate::storage::Storage::new(self.config.project_root.clone());
302
303        // Get the tag
304        let tag = params
305            .tag
306            .or_else(|| storage.get_active_group().ok().flatten());
307        let tag = tag.ok_or_else(|| RpcError::invalid_params("No tag specified and no active tag"))?;
308
309        // Load the task group
310        let group = storage
311            .load_group(&tag)
312            .map_err(|e: anyhow::Error| RpcError::task_not_found(&e.to_string()))?;
313
314        // Find the task
315        let task = group
316            .tasks
317            .iter()
318            .find(|t| t.id == params.task_id)
319            .ok_or_else(|| RpcError::task_not_found(&params.task_id))?;
320
321        // Generate prompt for the task
322        let prompt = crate::commands::spawn::agent::generate_prompt(task, &tag);
323
324        // Parse harness (prefer params, then task's agent_type, then default)
325        let harness = if let Some(h) = params.harness {
326            Harness::parse(&h).map_err(|e| RpcError::invalid_params(&e.to_string()))?
327        } else if let Some(agent_type) = &task.agent_type {
328            // Try to load from agent definition
329            if let Some(agent_def) =
330                crate::agents::AgentDef::try_load(agent_type, &self.config.working_dir)
331            {
332                agent_def.harness().unwrap_or(self.config.default_harness)
333            } else {
334                self.config.default_harness
335            }
336        } else {
337            self.config.default_harness
338        };
339
340        // Get model
341        let model = params.model.or_else(|| {
342            if let Some(agent_type) = &task.agent_type {
343                if let Some(agent_def) =
344                    crate::agents::AgentDef::try_load(agent_type, &self.config.working_dir)
345                {
346                    return agent_def.model().map(String::from);
347                }
348            }
349            self.config.default_model.clone()
350        });
351
352        let config = SpawnConfig {
353            task_id: params.task_id.clone(),
354            prompt,
355            working_dir: self.config.working_dir.clone(),
356            harness,
357            model,
358        };
359
360        // Spawn the agent
361        let event_tx = self.event_tx.clone();
362        let task_id = params.task_id.clone();
363
364        match spawn_agent(config, event_tx).await {
365            Ok(handle) => {
366                // Track the agent handle
367                let mut agents = self.active_agents.lock().await;
368                agents.insert(task_id.clone(), tokio::spawn(async move {
369                    let _ = handle.await;
370                }));
371
372                Ok(serde_json::json!({
373                    "status": "spawned",
374                    "task_id": task_id,
375                    "tag": tag
376                }))
377            }
378            Err(e) => Err(RpcError::spawn_failed(&e.to_string())),
379        }
380    }
381
382    /// Handle "cancel" method - cancel a running agent
383    async fn handle_cancel(&self, params: Value) -> Result<Value, RpcError> {
384        let task_id = params
385            .get("task_id")
386            .and_then(|v| v.as_str())
387            .ok_or_else(|| RpcError::invalid_params("Missing task_id"))?;
388
389        let mut agents = self.active_agents.lock().await;
390
391        if let Some(handle) = agents.remove(task_id) {
392            handle.abort();
393            Ok(serde_json::json!({
394                "status": "cancelled",
395                "task_id": task_id
396            }))
397        } else {
398            Err(RpcError::task_not_found(task_id))
399        }
400    }
401
402    /// Handle "list_agents" method - list currently active agents
403    async fn handle_list_agents(&self) -> Value {
404        let agents = self.active_agents.lock().await;
405        let agent_ids: Vec<&String> = agents.keys().collect();
406        serde_json::json!({
407            "agents": agent_ids,
408            "count": agent_ids.len()
409        })
410    }
411
412    /// Handle "list_tasks" method - list tasks from storage
413    async fn handle_list_tasks(&self, params: Value) -> Result<Value, RpcError> {
414        let params: ListTasksParams = serde_json::from_value(params).unwrap_or_default();
415
416        let storage = crate::storage::Storage::new(self.config.project_root.clone());
417
418        let tag = params
419            .tag
420            .or_else(|| storage.get_active_group().ok().flatten());
421        let tag = tag.ok_or_else(|| RpcError::invalid_params("No tag specified and no active tag"))?;
422
423        let group = storage
424            .load_group(&tag)
425            .map_err(|e: anyhow::Error| RpcError::internal_error(&e.to_string()))?;
426
427        let tasks: Vec<Value> = group
428            .tasks
429            .iter()
430            .filter(|t| {
431                params
432                    .status
433                    .as_ref()
434                    .map(|s| t.status.as_str().to_lowercase() == s.to_lowercase())
435                    .unwrap_or(true)
436            })
437            .map(|t| {
438                serde_json::json!({
439                    "id": t.id,
440                    "title": t.title,
441                    "status": t.status.as_str(),
442                    "complexity": t.complexity,
443                    "dependencies": t.dependencies
444                })
445            })
446            .collect();
447
448        Ok(serde_json::json!({
449            "tag": tag,
450            "tasks": tasks,
451            "count": tasks.len()
452        }))
453    }
454
455    /// Handle "get_task" method - get a single task
456    async fn handle_get_task(&self, params: Value) -> Result<Value, RpcError> {
457        let params: GetTaskParams =
458            serde_json::from_value(params).map_err(|e| RpcError::invalid_params(&e.to_string()))?;
459
460        let storage = crate::storage::Storage::new(self.config.project_root.clone());
461
462        let tag = params
463            .tag
464            .or_else(|| storage.get_active_group().ok().flatten());
465        let tag = tag.ok_or_else(|| RpcError::invalid_params("No tag specified and no active tag"))?;
466
467        let group = storage
468            .load_group(&tag)
469            .map_err(|e: anyhow::Error| RpcError::internal_error(&e.to_string()))?;
470
471        let task = group
472            .tasks
473            .iter()
474            .find(|t| t.id == params.task_id)
475            .ok_or_else(|| RpcError::task_not_found(&params.task_id))?;
476
477        Ok(serde_json::json!({
478            "id": task.id,
479            "title": task.title,
480            "description": task.description,
481            "status": task.status.as_str(),
482            "complexity": task.complexity,
483            "priority": format!("{:?}", task.priority).to_lowercase(),
484            "dependencies": task.dependencies,
485            "agent_type": task.agent_type,
486            "assigned_to": task.assigned_to
487        }))
488    }
489
490    /// Handle "set_status" method - update task status
491    async fn handle_set_status(&self, params: Value) -> Result<Value, RpcError> {
492        let params: SetStatusParams =
493            serde_json::from_value(params).map_err(|e| RpcError::invalid_params(&e.to_string()))?;
494
495        let storage = crate::storage::Storage::new(self.config.project_root.clone());
496
497        let tag = params
498            .tag
499            .or_else(|| storage.get_active_group().ok().flatten());
500        let tag = tag.ok_or_else(|| RpcError::invalid_params("No tag specified and no active tag"))?;
501
502        let mut group = storage
503            .load_group(&tag)
504            .map_err(|e: anyhow::Error| RpcError::internal_error(&e.to_string()))?;
505
506        // Find and update the task
507        let task = group
508            .tasks
509            .iter_mut()
510            .find(|t| t.id == params.task_id)
511            .ok_or_else(|| RpcError::task_not_found(&params.task_id))?;
512
513        // Parse status
514        let new_status = crate::models::task::TaskStatus::from_str(&params.status)
515            .ok_or_else(|| RpcError::invalid_params(&format!("Invalid status: {}", params.status)))?;
516
517        let old_status = task.status.as_str().to_string();
518        task.status = new_status;
519
520        // Save the group
521        storage
522            .update_group(&tag, &group)
523            .map_err(|e: anyhow::Error| RpcError::internal_error(&e.to_string()))?;
524
525        Ok(serde_json::json!({
526            "task_id": params.task_id,
527            "old_status": old_status,
528            "new_status": params.status
529        }))
530    }
531
532    /// Handle "next_task" method - get next available task
533    async fn handle_next_task(&self, params: Value) -> Result<Value, RpcError> {
534        let params: NextTaskParams = serde_json::from_value(params).unwrap_or_default();
535
536        let storage = crate::storage::Storage::new(self.config.project_root.clone());
537
538        // Use next command logic
539        let result = crate::commands::helpers::find_next_task(
540            &storage,
541            params.tag.as_deref(),
542            params.all_tags,
543        );
544
545        match result {
546            Some((task, tag)) => Ok(serde_json::json!({
547                "task_id": task.id,
548                "title": task.title,
549                "tag": tag,
550                "complexity": task.complexity
551            })),
552            None => Ok(serde_json::json!({
553                "task_id": null,
554                "message": "No tasks available"
555            })),
556        }
557    }
558
559    /// Emit a JSON RPC response to stdout
560    fn emit_response(&self, response: RpcResponse) -> Result<()> {
561        let json = serde_json::to_string(&response)?;
562        let mut stdout = io::stdout().lock();
563        writeln!(stdout, "{}", json)?;
564        stdout.flush()?;
565        Ok(())
566    }
567
568    /// Emit a JSON RPC notification to stdout
569    fn emit_notification(&self, notification: RpcNotification) -> Result<()> {
570        let json = serde_json::to_string(&notification)?;
571        let mut stdout = io::stdout().lock();
572        writeln!(stdout, "{}", json)?;
573        stdout.flush()?;
574        Ok(())
575    }
576
577    /// Try to extract ID from malformed JSON for error responses
578    fn extract_id(json_str: &str) -> Option<RpcId> {
579        // Simple heuristic: try to parse just enough to get the id
580        let parsed: Result<Value, _> = serde_json::from_str(json_str);
581        if let Ok(v) = parsed {
582            if let Some(id) = v.get("id") {
583                if let Some(n) = id.as_i64() {
584                    return Some(RpcId::Number(n));
585                }
586                if let Some(s) = id.as_str() {
587                    return Some(RpcId::String(s.to_string()));
588                }
589            }
590        }
591        None
592    }
593
594    /// Forward agent events to stdout as JSON RPC notifications
595    async fn forward_events(mut event_rx: mpsc::Receiver<AgentEvent>) {
596        while let Some(event) = event_rx.recv().await {
597            let notification = match event {
598                AgentEvent::Started { task_id } => RpcNotification::agent_started(&task_id),
599                AgentEvent::Output { task_id, line } => RpcNotification::agent_output(&task_id, &line),
600                AgentEvent::Completed { result } => RpcNotification::agent_completed(
601                    &result.task_id,
602                    result.success,
603                    result.exit_code,
604                    result.duration_ms,
605                ),
606                AgentEvent::SpawnFailed { task_id, error } => {
607                    RpcNotification::agent_spawn_failed(&task_id, &error)
608                }
609            };
610
611            // Write to stdout
612            if let Ok(json) = serde_json::to_string(&notification) {
613                let mut stdout = io::stdout().lock();
614                let _ = writeln!(stdout, "{}", json);
615                let _ = stdout.flush();
616            }
617        }
618    }
619}
620
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration selects the default harness.
    #[test]
    fn test_server_config_default() {
        let harness = RpcServerConfig::default().default_harness;
        assert_eq!(harness, Harness::default());
    }

    /// A numeric id is recovered from valid JSON that is not a valid request.
    #[test]
    fn test_extract_id_number() {
        let input = r#"{"id": 42, "invalid": true}"#;
        let expected = Some(RpcId::Number(42));
        assert_eq!(RpcServer::extract_id(input), expected);
    }

    /// A string id is recovered verbatim.
    #[test]
    fn test_extract_id_string() {
        let input = r#"{"id": "abc-123"}"#;
        let expected = Some(RpcId::String("abc-123".to_string()));
        assert_eq!(RpcServer::extract_id(input), expected);
    }
}