Skip to main content

scud/rpc/
server.rs

//! JSON-RPC server for IPC communication.
//!
//! Reads line-delimited JSON-RPC requests from stdin and writes responses and
//! notifications to stdout, one JSON document per line.
4
5use anyhow::Result;
6use serde_json::Value;
7use std::collections::HashMap;
8use std::io::{self, BufRead, Write};
9use std::path::PathBuf;
10use std::sync::Arc;
11use tokio::sync::mpsc;
12use tokio::sync::Mutex;
13
14use crate::commands::spawn::terminal::Harness;
15use crate::extensions::runner::{spawn_agent, AgentEvent, SpawnConfig};
16
17use super::types::*;
18
19/// RPC Server configuration
20#[derive(Debug, Clone)]
21pub struct RpcServerConfig {
22    /// Default working directory for agents
23    pub working_dir: PathBuf,
24    /// Default harness to use
25    pub default_harness: Harness,
26    /// Default model to use
27    pub default_model: Option<String>,
28    /// Project root for task storage access
29    pub project_root: Option<PathBuf>,
30}
31
32impl Default for RpcServerConfig {
33    fn default() -> Self {
34        Self {
35            working_dir: std::env::current_dir().unwrap_or_default(),
36            default_harness: Harness::default(),
37            default_model: None,
38            project_root: None,
39        }
40    }
41}
42
43/// JSON RPC Server for subagent communication
44pub struct RpcServer {
45    config: RpcServerConfig,
46    /// Active agent task handles
47    active_agents: Arc<Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>,
48    /// Event channel for agent events
49    event_tx: mpsc::Sender<AgentEvent>,
50    event_rx: mpsc::Receiver<AgentEvent>,
51}
52
53impl RpcServer {
54    /// Create a new RPC server with the given configuration
55    pub fn new(config: RpcServerConfig) -> Self {
56        let (event_tx, event_rx) = mpsc::channel(1000);
57        Self {
58            config,
59            active_agents: Arc::new(Mutex::new(HashMap::new())),
60            event_tx,
61            event_rx,
62        }
63    }
64
65    /// Run the RPC server loop
66    ///
67    /// Reads JSON RPC requests from stdin and writes responses/events to stdout.
68    /// This function blocks until a shutdown request is received or stdin closes.
69    pub async fn run(&mut self) -> Result<()> {
70        // Emit server ready notification
71        self.emit_notification(RpcNotification::server_ready(env!("CARGO_PKG_VERSION")))?;
72
73        let stdin = io::stdin();
74        let reader = stdin.lock();
75
76        // Spawn a task to forward agent events to stdout
77        let event_rx = std::mem::replace(&mut self.event_rx, mpsc::channel(1).1);
78        let event_forwarder = tokio::spawn(Self::forward_events(event_rx));
79
80        // Read requests from stdin line by line
81        for line in reader.lines() {
82            let line = match line {
83                Ok(l) => l,
84                Err(e) => {
85                    eprintln!("Error reading stdin: {}", e);
86                    break;
87                }
88            };
89
90            // Skip empty lines
91            if line.trim().is_empty() {
92                continue;
93            }
94
95            // Parse the request
96            let request: RpcRequest = match serde_json::from_str(&line) {
97                Ok(req) => req,
98                Err(e) => {
99                    // Try to extract ID for error response
100                    let id = Self::extract_id(&line).unwrap_or(RpcId::Null);
101                    self.emit_response(RpcResponse::error(
102                        id,
103                        RpcError::parse_error(&e.to_string()),
104                    ))?;
105                    continue;
106                }
107            };
108
109            // Validate JSON RPC version
110            if request.jsonrpc != JSONRPC_VERSION {
111                if let Some(id) = request.id.clone() {
112                    self.emit_response(RpcResponse::error(
113                        id,
114                        RpcError::invalid_request("Invalid JSON-RPC version"),
115                    ))?;
116                }
117                continue;
118            }
119
120            // Handle the request
121            let should_shutdown = self.handle_request(request).await?;
122            if should_shutdown {
123                break;
124            }
125        }
126
127        // Emit shutdown notification
128        self.emit_notification(RpcNotification::server_shutdown())?;
129
130        // Cancel the event forwarder
131        event_forwarder.abort();
132
133        Ok(())
134    }
135
136    /// Handle a single RPC request
137    async fn handle_request(&mut self, request: RpcRequest) -> Result<bool> {
138        let id = request.id.clone();
139        let method = request.method.as_str();
140
141        match method {
142            "ping" => {
143                if let Some(id) = id {
144                    self.emit_response(RpcResponse::success(
145                        id,
146                        serde_json::json!({"pong": true}),
147                    ))?;
148                }
149            }
150
151            "shutdown" => {
152                if let Some(id) = id {
153                    self.emit_response(RpcResponse::success(
154                        id,
155                        serde_json::json!({"status": "shutting_down"}),
156                    ))?;
157                }
158                return Ok(true);
159            }
160
161            "spawn" => {
162                let result = self.handle_spawn(request.params).await;
163                if let Some(id) = id {
164                    match result {
165                        Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
166                        Err(e) => self.emit_response(RpcResponse::error(id, e))?,
167                    }
168                }
169            }
170
171            "spawn_task" => {
172                let result = self.handle_spawn_task(request.params).await;
173                if let Some(id) = id {
174                    match result {
175                        Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
176                        Err(e) => self.emit_response(RpcResponse::error(id, e))?,
177                    }
178                }
179            }
180
181            "cancel" => {
182                let result = self.handle_cancel(request.params).await;
183                if let Some(id) = id {
184                    match result {
185                        Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
186                        Err(e) => self.emit_response(RpcResponse::error(id, e))?,
187                    }
188                }
189            }
190
191            "list_agents" => {
192                let result = self.handle_list_agents().await;
193                if let Some(id) = id {
194                    self.emit_response(RpcResponse::success(id, result))?;
195                }
196            }
197
198            "list_tasks" => {
199                let result = self.handle_list_tasks(request.params).await;
200                if let Some(id) = id {
201                    match result {
202                        Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
203                        Err(e) => self.emit_response(RpcResponse::error(id, e))?,
204                    }
205                }
206            }
207
208            "get_task" => {
209                let result = self.handle_get_task(request.params).await;
210                if let Some(id) = id {
211                    match result {
212                        Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
213                        Err(e) => self.emit_response(RpcResponse::error(id, e))?,
214                    }
215                }
216            }
217
218            "set_status" => {
219                let result = self.handle_set_status(request.params).await;
220                if let Some(id) = id {
221                    match result {
222                        Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
223                        Err(e) => self.emit_response(RpcResponse::error(id, e))?,
224                    }
225                }
226            }
227
228            "next_task" => {
229                let result = self.handle_next_task(request.params).await;
230                if let Some(id) = id {
231                    match result {
232                        Ok(value) => self.emit_response(RpcResponse::success(id, value))?,
233                        Err(e) => self.emit_response(RpcResponse::error(id, e))?,
234                    }
235                }
236            }
237
238            _ => {
239                if let Some(id) = id {
240                    self.emit_response(RpcResponse::error(id, RpcError::method_not_found(method)))?;
241                }
242            }
243        }
244
245        Ok(false)
246    }
247
248    /// Handle "spawn" method - spawn agent with custom prompt
249    async fn handle_spawn(&self, params: Value) -> Result<Value, RpcError> {
250        let params: SpawnParams =
251            serde_json::from_value(params).map_err(|e| RpcError::invalid_params(&e.to_string()))?;
252
253        // Parse harness
254        let harness = if let Some(h) = params.harness {
255            Harness::parse(&h).map_err(|e| RpcError::invalid_params(&e.to_string()))?
256        } else {
257            self.config.default_harness
258        };
259
260        // Determine working directory
261        let working_dir = params
262            .working_dir
263            .map(PathBuf::from)
264            .unwrap_or_else(|| self.config.working_dir.clone());
265
266        let config = SpawnConfig {
267            task_id: params.task_id.clone(),
268            prompt: params.prompt,
269            working_dir,
270            harness,
271            model: params.model.or_else(|| self.config.default_model.clone()),
272        };
273
274        // Spawn the agent
275        let event_tx = self.event_tx.clone();
276        let task_id = params.task_id.clone();
277
278        match spawn_agent(config, event_tx).await {
279            Ok(handle) => {
280                // Track the agent handle
281                let mut agents = self.active_agents.lock().await;
282                agents.insert(
283                    task_id.clone(),
284                    tokio::spawn(async move {
285                        let _ = handle.await;
286                    }),
287                );
288
289                Ok(serde_json::json!({
290                    "status": "spawned",
291                    "task_id": task_id
292                }))
293            }
294            Err(e) => Err(RpcError::spawn_failed(&e.to_string())),
295        }
296    }
297
298    /// Handle "spawn_task" method - spawn agent for a task in the graph
299    async fn handle_spawn_task(&self, params: Value) -> Result<Value, RpcError> {
300        let params: SpawnTaskParams =
301            serde_json::from_value(params).map_err(|e| RpcError::invalid_params(&e.to_string()))?;
302
303        // Load task from storage
304        let storage = crate::storage::Storage::new(self.config.project_root.clone());
305
306        // Get the tag
307        let tag = params
308            .tag
309            .or_else(|| storage.get_active_group().ok().flatten());
310        let tag =
311            tag.ok_or_else(|| RpcError::invalid_params("No tag specified and no active tag"))?;
312
313        // Load the task group
314        let group = storage
315            .load_group(&tag)
316            .map_err(|e: anyhow::Error| RpcError::task_not_found(&e.to_string()))?;
317
318        // Find the task
319        let task = group
320            .tasks
321            .iter()
322            .find(|t| t.id == params.task_id)
323            .ok_or_else(|| RpcError::task_not_found(&params.task_id))?;
324
325        // Generate prompt for the task
326        let prompt = crate::commands::spawn::agent::generate_prompt(task, &tag);
327
328        // Parse harness (prefer params, then task's agent_type, then default)
329        let harness = if let Some(h) = params.harness {
330            Harness::parse(&h).map_err(|e| RpcError::invalid_params(&e.to_string()))?
331        } else if let Some(agent_type) = &task.agent_type {
332            // Try to load from agent definition
333            if let Some(agent_def) =
334                crate::agents::AgentDef::try_load(agent_type, &self.config.working_dir)
335            {
336                agent_def.harness().unwrap_or(self.config.default_harness)
337            } else {
338                self.config.default_harness
339            }
340        } else {
341            self.config.default_harness
342        };
343
344        // Get model
345        let model = params.model.or_else(|| {
346            if let Some(agent_type) = &task.agent_type {
347                if let Some(agent_def) =
348                    crate::agents::AgentDef::try_load(agent_type, &self.config.working_dir)
349                {
350                    return agent_def.model().map(String::from);
351                }
352            }
353            self.config.default_model.clone()
354        });
355
356        let config = SpawnConfig {
357            task_id: params.task_id.clone(),
358            prompt,
359            working_dir: self.config.working_dir.clone(),
360            harness,
361            model,
362        };
363
364        // Spawn the agent
365        let event_tx = self.event_tx.clone();
366        let task_id = params.task_id.clone();
367
368        match spawn_agent(config, event_tx).await {
369            Ok(handle) => {
370                // Track the agent handle
371                let mut agents = self.active_agents.lock().await;
372                agents.insert(
373                    task_id.clone(),
374                    tokio::spawn(async move {
375                        let _ = handle.await;
376                    }),
377                );
378
379                Ok(serde_json::json!({
380                    "status": "spawned",
381                    "task_id": task_id,
382                    "tag": tag
383                }))
384            }
385            Err(e) => Err(RpcError::spawn_failed(&e.to_string())),
386        }
387    }
388
389    /// Handle "cancel" method - cancel a running agent
390    async fn handle_cancel(&self, params: Value) -> Result<Value, RpcError> {
391        let task_id = params
392            .get("task_id")
393            .and_then(|v| v.as_str())
394            .ok_or_else(|| RpcError::invalid_params("Missing task_id"))?;
395
396        let mut agents = self.active_agents.lock().await;
397
398        if let Some(handle) = agents.remove(task_id) {
399            handle.abort();
400            Ok(serde_json::json!({
401                "status": "cancelled",
402                "task_id": task_id
403            }))
404        } else {
405            Err(RpcError::task_not_found(task_id))
406        }
407    }
408
409    /// Handle "list_agents" method - list currently active agents
410    async fn handle_list_agents(&self) -> Value {
411        let agents = self.active_agents.lock().await;
412        let agent_ids: Vec<&String> = agents.keys().collect();
413        serde_json::json!({
414            "agents": agent_ids,
415            "count": agent_ids.len()
416        })
417    }
418
419    /// Handle "list_tasks" method - list tasks from storage
420    async fn handle_list_tasks(&self, params: Value) -> Result<Value, RpcError> {
421        let params: ListTasksParams = serde_json::from_value(params).unwrap_or_default();
422
423        let storage = crate::storage::Storage::new(self.config.project_root.clone());
424
425        let tag = params
426            .tag
427            .or_else(|| storage.get_active_group().ok().flatten());
428        let tag =
429            tag.ok_or_else(|| RpcError::invalid_params("No tag specified and no active tag"))?;
430
431        let group = storage
432            .load_group(&tag)
433            .map_err(|e: anyhow::Error| RpcError::internal_error(&e.to_string()))?;
434
435        let tasks: Vec<Value> = group
436            .tasks
437            .iter()
438            .filter(|t| {
439                params
440                    .status
441                    .as_ref()
442                    .map(|s| t.status.as_str().to_lowercase() == s.to_lowercase())
443                    .unwrap_or(true)
444            })
445            .map(|t| {
446                serde_json::json!({
447                    "id": t.id,
448                    "title": t.title,
449                    "status": t.status.as_str(),
450                    "complexity": t.complexity,
451                    "dependencies": t.dependencies
452                })
453            })
454            .collect();
455
456        Ok(serde_json::json!({
457            "tag": tag,
458            "tasks": tasks,
459            "count": tasks.len()
460        }))
461    }
462
463    /// Handle "get_task" method - get a single task
464    async fn handle_get_task(&self, params: Value) -> Result<Value, RpcError> {
465        let params: GetTaskParams =
466            serde_json::from_value(params).map_err(|e| RpcError::invalid_params(&e.to_string()))?;
467
468        let storage = crate::storage::Storage::new(self.config.project_root.clone());
469
470        let tag = params
471            .tag
472            .or_else(|| storage.get_active_group().ok().flatten());
473        let tag =
474            tag.ok_or_else(|| RpcError::invalid_params("No tag specified and no active tag"))?;
475
476        let group = storage
477            .load_group(&tag)
478            .map_err(|e: anyhow::Error| RpcError::internal_error(&e.to_string()))?;
479
480        let task = group
481            .tasks
482            .iter()
483            .find(|t| t.id == params.task_id)
484            .ok_or_else(|| RpcError::task_not_found(&params.task_id))?;
485
486        Ok(serde_json::json!({
487            "id": task.id,
488            "title": task.title,
489            "description": task.description,
490            "status": task.status.as_str(),
491            "complexity": task.complexity,
492            "priority": format!("{:?}", task.priority).to_lowercase(),
493            "dependencies": task.dependencies,
494            "agent_type": task.agent_type,
495            "assigned_to": task.assigned_to
496        }))
497    }
498
499    /// Handle "set_status" method - update task status
500    async fn handle_set_status(&self, params: Value) -> Result<Value, RpcError> {
501        let params: SetStatusParams =
502            serde_json::from_value(params).map_err(|e| RpcError::invalid_params(&e.to_string()))?;
503
504        let storage = crate::storage::Storage::new(self.config.project_root.clone());
505
506        let tag = params
507            .tag
508            .or_else(|| storage.get_active_group().ok().flatten());
509        let tag =
510            tag.ok_or_else(|| RpcError::invalid_params("No tag specified and no active tag"))?;
511
512        let mut group = storage
513            .load_group(&tag)
514            .map_err(|e: anyhow::Error| RpcError::internal_error(&e.to_string()))?;
515
516        // Find and update the task
517        let task = group
518            .tasks
519            .iter_mut()
520            .find(|t| t.id == params.task_id)
521            .ok_or_else(|| RpcError::task_not_found(&params.task_id))?;
522
523        // Parse status
524        let new_status =
525            crate::models::task::TaskStatus::from_str(&params.status).ok_or_else(|| {
526                RpcError::invalid_params(&format!("Invalid status: {}", params.status))
527            })?;
528
529        let old_status = task.status.as_str().to_string();
530        task.status = new_status;
531
532        // Save the group
533        storage
534            .update_group(&tag, &group)
535            .map_err(|e: anyhow::Error| RpcError::internal_error(&e.to_string()))?;
536
537        Ok(serde_json::json!({
538            "task_id": params.task_id,
539            "old_status": old_status,
540            "new_status": params.status
541        }))
542    }
543
544    /// Handle "next_task" method - get next available task
545    async fn handle_next_task(&self, params: Value) -> Result<Value, RpcError> {
546        let params: NextTaskParams = serde_json::from_value(params).unwrap_or_default();
547
548        let storage = crate::storage::Storage::new(self.config.project_root.clone());
549
550        // Use next command logic
551        let result = crate::commands::helpers::find_next_task(
552            &storage,
553            params.tag.as_deref(),
554            params.all_tags,
555        );
556
557        match result {
558            Some((task, tag)) => Ok(serde_json::json!({
559                "task_id": task.id,
560                "title": task.title,
561                "tag": tag,
562                "complexity": task.complexity
563            })),
564            None => Ok(serde_json::json!({
565                "task_id": null,
566                "message": "No tasks available"
567            })),
568        }
569    }
570
571    /// Emit a JSON RPC response to stdout
572    fn emit_response(&self, response: RpcResponse) -> Result<()> {
573        let json = serde_json::to_string(&response)?;
574        let mut stdout = io::stdout().lock();
575        writeln!(stdout, "{}", json)?;
576        stdout.flush()?;
577        Ok(())
578    }
579
580    /// Emit a JSON RPC notification to stdout
581    fn emit_notification(&self, notification: RpcNotification) -> Result<()> {
582        let json = serde_json::to_string(&notification)?;
583        let mut stdout = io::stdout().lock();
584        writeln!(stdout, "{}", json)?;
585        stdout.flush()?;
586        Ok(())
587    }
588
589    /// Try to extract ID from malformed JSON for error responses
590    fn extract_id(json_str: &str) -> Option<RpcId> {
591        // Simple heuristic: try to parse just enough to get the id
592        let parsed: Result<Value, _> = serde_json::from_str(json_str);
593        if let Ok(v) = parsed {
594            if let Some(id) = v.get("id") {
595                if let Some(n) = id.as_i64() {
596                    return Some(RpcId::Number(n));
597                }
598                if let Some(s) = id.as_str() {
599                    return Some(RpcId::String(s.to_string()));
600                }
601            }
602        }
603        None
604    }
605
606    /// Forward agent events to stdout as JSON RPC notifications
607    async fn forward_events(mut event_rx: mpsc::Receiver<AgentEvent>) {
608        while let Some(event) = event_rx.recv().await {
609            let notification = match event {
610                AgentEvent::Started { task_id } => RpcNotification::agent_started(&task_id),
611                AgentEvent::Output { task_id, line } => {
612                    RpcNotification::agent_output(&task_id, &line)
613                }
614                AgentEvent::Completed { result } => RpcNotification::agent_completed(
615                    &result.task_id,
616                    result.success,
617                    result.exit_code,
618                    result.duration_ms,
619                ),
620                AgentEvent::SpawnFailed { task_id, error } => {
621                    RpcNotification::agent_spawn_failed(&task_id, &error)
622                }
623            };
624
625            // Write to stdout
626            if let Ok(json) = serde_json::to_string(&notification) {
627                let mut stdout = io::stdout().lock();
628                let _ = writeln!(stdout, "{}", json);
629                let _ = stdout.flush();
630            }
631        }
632    }
633}
634
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration picks the default harness.
    #[test]
    fn test_server_config_default() {
        let RpcServerConfig {
            default_harness, ..
        } = RpcServerConfig::default();
        assert_eq!(default_harness, Harness::default());
    }

    /// A numeric id is recovered from valid JSON that is not a valid request.
    #[test]
    fn test_extract_id_number() {
        let input = r#"{"id": 42, "invalid": true}"#;
        let expected = Some(RpcId::Number(42));
        assert_eq!(RpcServer::extract_id(input), expected);
    }

    /// A string id is recovered verbatim.
    #[test]
    fn test_extract_id_string() {
        let input = r#"{"id": "abc-123"}"#;
        let expected = Some(RpcId::String("abc-123".to_string()));
        assert_eq!(RpcServer::extract_id(input), expected);
    }
}