Skip to main content

oparry_wrapper/
ipc.rs

//! IPC (Inter-Process Communication) for the Claude Code wrapper.
//!
//! This module handles communication between Claude Code and Parry
//! using a newline-delimited JSON protocol over stdin/stdout.
5
6use crate::protocol::{ClaudeRequest, ClaudeResponse};
7use oparry_core::{Error, Result};
8use std::io::{self, BufRead, BufReader, Write};
9use std::sync::{Arc, Mutex};
10use tracing::{debug, error, info, trace, warn};
11
/// IPC channel for Claude Code communication.
///
/// Both stream handles are stored behind `Arc<Mutex<..>>`, so every clone of a
/// channel refers to the same underlying reader and writer.
#[derive(Debug)]
pub struct IpcChannel {
    /// Input reader (buffered stdin)
    reader: Arc<Mutex<BufReader<io::Stdin>>>,
    /// Output writer (stdout)
    writer: Arc<Mutex<io::Stdout>>,
    /// Whether we're running in interactive mode
    interactive: bool,
}
21
22impl Clone for IpcChannel {
23    fn clone(&self) -> Self {
24        Self {
25            reader: Arc::clone(&self.reader),
26            writer: Arc::clone(&self.writer),
27            interactive: self.interactive,
28        }
29    }
30}
31
32impl IpcChannel {
33    /// Create a new IPC channel using stdin/stdout
34    pub fn stdio() -> Self {
35        Self {
36            reader: Arc::new(Mutex::new(BufReader::new(io::stdin()))),
37            writer: Arc::new(Mutex::new(io::stdout())),
38            interactive: true,
39        }
40    }
41
42    /// Create a non-interactive channel for testing
43    pub fn buffered() -> Self {
44        Self {
45            reader: Arc::new(Mutex::new(BufReader::new(io::stdin()))),
46            writer: Arc::new(Mutex::new(io::stdout())),
47            interactive: false,
48        }
49    }
50
51    /// Read a single JSON request from stdin
52    ///
53    /// Expected format: newline-delimited JSON
54    pub fn read_request(&self) -> Result<Option<ClaudeRequest>> {
55        let mut reader = self.reader.lock()
56            .map_err(|e| Error::Wrapper(format!("Failed to lock reader: {}", e)))?;
57
58        let mut line = String::new();
59        let bytes_read = reader.read_line(&mut line)
60            .map_err(|e| Error::Wrapper(format!("Failed to read from stdin: {}", e)))?;
61
62        if bytes_read == 0 {
63            // EOF - clean shutdown
64            debug!("Received EOF on stdin, shutting down");
65            return Ok(None);
66        }
67
68        let line = line.trim();
69        if line.is_empty() {
70            // Skip empty lines
71            trace!("Skipping empty line");
72            return Ok(None);
73        }
74
75        debug!("Received request: {}", line);
76
77        // Parse JSON request
78        let request = ClaudeRequest::from_json(line)?;
79        Ok(Some(request))
80    }
81
82    /// Send a response to Claude Code via stdout
83    pub fn send_response(&self, response: &ClaudeResponse) -> Result<()> {
84        let json = response.to_json()?;
85        let output = format!("{}\n", json);
86
87        debug!("Sending response: {}", json);
88
89        let mut writer = self.writer.lock()
90            .map_err(|e| Error::Wrapper(format!("Failed to lock writer: {}", e)))?;
91
92        writer.write_all(output.as_bytes())
93            .map_err(|e| Error::Wrapper(format!("Failed to write to stdout: {}", e)))?;
94        writer.flush()
95            .map_err(|e| Error::Wrapper(format!("Failed to flush stdout: {}", e)))?;
96
97        Ok(())
98    }
99
100    /// Enter the main IPC loop
101    ///
102    /// This blocks and handles requests until EOF is received
103    pub fn run_loop<F>(self, mut handler: F) -> Result<()>
104    where
105        F: FnMut(ClaudeRequest) -> Result<ClaudeResponse>,
106    {
107        info!("Starting IPC loop in {} mode",
108            if self.interactive { "interactive" } else { "buffered" });
109
110        loop {
111            match self.read_request() {
112                Ok(Some(request)) => {
113                    trace!("Processing request: {:?}", request.id());
114
115                    let response = handler(request.clone());
116
117                    match response {
118                        Ok(resp) => {
119                            if let Err(e) = self.send_response(&resp) {
120                                error!("Failed to send response: {}", e);
121                                // Try to send error response
122                                let error_resp = ClaudeResponse::protocol_error(format!("Internal error: {}", e));
123                                let _ = self.send_response(&error_resp);
124                                return Err(e);
125                            }
126                        }
127                        Err(e) => {
128                            // Send protocol error for handler failures
129                            warn!("Handler failed: {}", e);
130                            let error_resp = ClaudeResponse::protocol_error(format!("Handler error: {}", e));
131                            if let Err(send_err) = self.send_response(&error_resp) {
132                                error!("Failed to send error response: {}", send_err);
133                                return Err(e);
134                            }
135                        }
136                    }
137                }
138                Ok(None) => {
139                    // Empty line or EOF
140                    if self.reader.lock()
141                        .map(|mut r| r.fill_buf().map(|b| b.is_empty()).unwrap_or(false))
142                        .unwrap_or(false)
143                    {
144                        info!("EOF detected, exiting IPC loop");
145                        break;
146                    }
147                    continue;
148                }
149                Err(e) => {
150                    error!("Failed to read request: {}", e);
151                    let error_resp = ClaudeResponse::protocol_error(format!("Parse error: {}", e));
152                    self.send_response(&error_resp)?;
153                    return Err(e);
154                }
155            }
156        }
157
158        info!("IPC loop terminated cleanly");
159        Ok(())
160    }
161
162    /// Check if this is an interactive channel
163    pub fn is_interactive(&self) -> bool {
164        self.interactive
165    }
166}
167
168impl Default for IpcChannel {
169    fn default() -> Self {
170        Self::stdio()
171    }
172}
173
/// Simple in-memory channel for testing.
///
/// Compiled only for test builds. Both queues are shared, lockable vectors
/// that tests can inspect directly.
#[cfg(test)]
pub struct MockIpcChannel {
    /// Requests recorded on this channel; drained by `take_received`.
    pub received_requests: Arc<Mutex<Vec<ClaudeRequest>>>,
    /// Responses queued via `queue_response`.
    pub responses_to_send: Arc<Mutex<Vec<ClaudeResponse>>>,
}
180
#[cfg(test)]
impl MockIpcChannel {
    /// Create a mock channel with both queues empty.
    pub fn new() -> Self {
        Self {
            received_requests: Arc::default(),
            responses_to_send: Arc::default(),
        }
    }

    /// Append `response` to the queue of responses to send.
    ///
    /// Panics if the queue's mutex is poisoned.
    pub fn queue_response(&self, response: ClaudeResponse) {
        let mut queue = self.responses_to_send.lock().unwrap();
        queue.push(response);
    }

    /// Remove and return every recorded request, leaving the record empty.
    ///
    /// Panics if the record's mutex is poisoned.
    pub fn take_received(&self) -> Vec<ClaudeRequest> {
        std::mem::take(&mut *self.received_requests.lock().unwrap())
    }
}
199
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::{WriteFileRequest, IssueDetail, IssueSeverity};
    use std::path::PathBuf;

    /// A `WriteFile` request keeps its id, path and content across a
    /// `to_json` / `from_json` round trip.
    #[test]
    fn test_request_serialization_roundtrip() {
        let payload = WriteFileRequest {
            id: "test-1".to_string(),
            path: PathBuf::from("src/test.ts"),
            content: "export const test = true;".to_string(),
            encoding: Some("utf-8".to_string()),
            create_dirs: Some(true),
        };
        let original = ClaudeRequest::WriteFile(payload);

        let encoded = original.to_json().unwrap();
        let decoded = ClaudeRequest::from_json(&encoded).unwrap();

        if let ClaudeRequest::WriteFile(write) = decoded {
            assert_eq!(write.id, "test-1");
            assert_eq!(write.path, PathBuf::from("src/test.ts"));
            assert_eq!(write.content, "export const test = true;");
        } else {
            panic!("Wrong request type");
        }
    }

    /// A rejected response serializes its type tag, request id, reason and
    /// issue code into the JSON output.
    #[test]
    fn test_response_serialization() {
        let issue = IssueDetail {
            code: "test-error".to_string(),
            level: IssueSeverity::Error,
            message: "Test error".to_string(),
            line: Some(10),
            column: Some(5),
            suggestion: Some("Fix it".to_string()),
            context: None,
        };
        let response = ClaudeResponse::rejected("req-1", "Validation failed", vec![issue]);

        let json = response.to_json().unwrap();
        for needle in ["rejected", "test-error", "Validation failed"] {
            assert!(json.contains(needle));
        }

        // Parse it back
        let value: serde_json::Value = serde_json::from_str(&json).unwrap();
        assert_eq!(value["type"], "rejected");
        assert_eq!(value["request_id"], "req-1");
    }

    /// Queued responses accumulate on the mock channel.
    #[test]
    fn test_mock_channel() {
        let mock = MockIpcChannel::new();

        // Queue some responses
        mock.queue_response(ClaudeResponse::Pong);
        mock.queue_response(ClaudeResponse::approved("test-1"));

        let queued = mock.responses_to_send.lock().unwrap().len();
        assert_eq!(queued, 2);
    }
}
268}