Skip to main content

ipckit/
file_channel.rs

1//! File-based IPC Channel
2//!
3//! A simple IPC mechanism using local files for communication.
4//! This is useful for frontend-backend communication where:
5//! - Backend (Python) writes to a file, Frontend reads it
6//! - Frontend writes to another file, Backend reads it
7//!
8//! ## Protocol
9//!
10//! Each message file contains:
11//! - Line 1: Message ID (UUID)
12//! - Line 2: Timestamp (Unix epoch in milliseconds)
13//! - Line 3: Message type (request/response/event)
14//! - Line 4+: JSON payload
15//!
16//! ## File Structure
17//!
18//! ```text
19//! {channel_dir}/
20//! ├── backend_to_frontend.json   # Backend writes, Frontend reads
21//! ├── frontend_to_backend.json   # Frontend writes, Backend reads
22//! ├── backend_to_frontend.lock   # Lock file for atomic writes
23//! ├── frontend_to_backend.lock   # Lock file for atomic writes
24//! └── .channel_info              # Channel metadata
25//! ```
26
27use crate::error::{IpcError, Result};
28use serde::{Deserialize, Serialize};
29use std::fs::{self, OpenOptions};
30use std::io::Write;
31use std::path::{Path, PathBuf};
32use std::time::{Duration, SystemTime, UNIX_EPOCH};
33
34/// Message types for file-based IPC
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
36#[serde(rename_all = "lowercase")]
37pub enum MessageType {
38    Request,
39    Response,
40    Event,
41}
42
43/// A message in the file-based IPC protocol
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct FileMessage {
46    /// Unique message ID
47    pub id: String,
48    /// Timestamp in milliseconds since Unix epoch
49    pub timestamp: u64,
50    /// Message type
51    #[serde(rename = "type")]
52    pub msg_type: MessageType,
53    /// For responses, the ID of the request being responded to
54    #[serde(skip_serializing_if = "Option::is_none")]
55    pub reply_to: Option<String>,
56    /// Method name (for requests)
57    #[serde(skip_serializing_if = "Option::is_none")]
58    pub method: Option<String>,
59    /// Message payload (JSON value)
60    pub payload: serde_json::Value,
61    /// Error message (for error responses)
62    #[serde(skip_serializing_if = "Option::is_none")]
63    pub error: Option<String>,
64}
65
66impl FileMessage {
67    /// Create a new request message
68    pub fn request(method: &str, payload: serde_json::Value) -> Self {
69        Self {
70            id: uuid_v4(),
71            timestamp: current_timestamp_ms(),
72            msg_type: MessageType::Request,
73            reply_to: None,
74            method: Some(method.to_string()),
75            payload,
76            error: None,
77        }
78    }
79
80    /// Create a response message
81    pub fn response(request_id: &str, payload: serde_json::Value) -> Self {
82        Self {
83            id: uuid_v4(),
84            timestamp: current_timestamp_ms(),
85            msg_type: MessageType::Response,
86            reply_to: Some(request_id.to_string()),
87            method: None,
88            payload,
89            error: None,
90        }
91    }
92
93    /// Create an error response
94    pub fn error_response(request_id: &str, error: &str) -> Self {
95        Self {
96            id: uuid_v4(),
97            timestamp: current_timestamp_ms(),
98            msg_type: MessageType::Response,
99            reply_to: Some(request_id.to_string()),
100            method: None,
101            payload: serde_json::Value::Null,
102            error: Some(error.to_string()),
103        }
104    }
105
106    /// Create an event message (no response expected)
107    pub fn event(name: &str, payload: serde_json::Value) -> Self {
108        Self {
109            id: uuid_v4(),
110            timestamp: current_timestamp_ms(),
111            msg_type: MessageType::Event,
112            reply_to: None,
113            method: Some(name.to_string()),
114            payload,
115            error: None,
116        }
117    }
118}
119
120/// File-based IPC channel for backend (Python/Rust) side
121pub struct FileChannel {
122    /// Channel directory
123    dir: PathBuf,
124    /// File for outgoing messages (backend -> frontend)
125    outbox_path: PathBuf,
126    /// File for incoming messages (frontend -> backend)
127    inbox_path: PathBuf,
128    /// Last processed message ID from inbox
129    last_inbox_id: Option<String>,
130    /// Last processed message timestamp
131    last_inbox_timestamp: u64,
132}
133
134impl FileChannel {
135    /// Create or open a file channel
136    ///
137    /// # Arguments
138    /// * `dir` - Directory for channel files (will be created if not exists)
139    /// * `is_backend` - True for backend side, false for frontend side
140    pub fn new<P: AsRef<Path>>(dir: P, is_backend: bool) -> Result<Self> {
141        let dir = dir.as_ref().to_path_buf();
142
143        // Create directory if not exists
144        fs::create_dir_all(&dir)?;
145
146        // Determine file paths based on role
147        let (outbox_path, inbox_path) = if is_backend {
148            (
149                dir.join("backend_to_frontend.json"),
150                dir.join("frontend_to_backend.json"),
151            )
152        } else {
153            (
154                dir.join("frontend_to_backend.json"),
155                dir.join("backend_to_frontend.json"),
156            )
157        };
158
159        // Create channel info file
160        let info_path = dir.join(".channel_info");
161        if !info_path.exists() {
162            let info = serde_json::json!({
163                "version": "1.0",
164                "created": current_timestamp_ms(),
165                "protocol": "file-ipc"
166            });
167            fs::write(&info_path, serde_json::to_string_pretty(&info).unwrap())?;
168        }
169
170        // Initialize empty message files if not exist
171        for path in [&outbox_path, &inbox_path] {
172            if !path.exists() {
173                fs::write(path, "[]")?;
174            }
175        }
176
177        Ok(Self {
178            dir,
179            outbox_path,
180            inbox_path,
181            last_inbox_id: None,
182            last_inbox_timestamp: 0,
183        })
184    }
185
186    /// Create a backend-side channel
187    pub fn backend<P: AsRef<Path>>(dir: P) -> Result<Self> {
188        Self::new(dir, true)
189    }
190
191    /// Create a frontend-side channel
192    pub fn frontend<P: AsRef<Path>>(dir: P) -> Result<Self> {
193        Self::new(dir, false)
194    }
195
196    /// Get the channel directory
197    pub fn dir(&self) -> &Path {
198        &self.dir
199    }
200
201    /// Send a message (write to outbox)
202    pub fn send(&self, message: &FileMessage) -> Result<()> {
203        let lock_path = self.outbox_path.with_extension("lock");
204        let _lock = FileLock::acquire(&lock_path)?;
205
206        // Read existing messages
207        let mut messages = self.read_message_file(&self.outbox_path)?;
208
209        // Add new message
210        messages.push(message.clone());
211
212        // Keep only recent messages (last 100)
213        if messages.len() > 100 {
214            let skip_count = messages.len() - 100;
215            messages = messages.into_iter().skip(skip_count).collect();
216        }
217
218        // Write back atomically
219        let temp_path = self.outbox_path.with_extension("tmp");
220        let content = serde_json::to_string_pretty(&messages)
221            .map_err(|e| IpcError::serialization(e.to_string()))?;
222        fs::write(&temp_path, &content)?;
223        fs::rename(&temp_path, &self.outbox_path)?;
224
225        Ok(())
226    }
227
228    /// Send a request and return the message ID
229    pub fn send_request(&self, method: &str, params: serde_json::Value) -> Result<String> {
230        let msg = FileMessage::request(method, params);
231        let id = msg.id.clone();
232        self.send(&msg)?;
233        Ok(id)
234    }
235
236    /// Send a response to a request
237    pub fn send_response(&self, request_id: &str, result: serde_json::Value) -> Result<()> {
238        let msg = FileMessage::response(request_id, result);
239        self.send(&msg)
240    }
241
242    /// Send an error response
243    pub fn send_error(&self, request_id: &str, error: &str) -> Result<()> {
244        let msg = FileMessage::error_response(request_id, error);
245        self.send(&msg)
246    }
247
248    /// Send an event
249    pub fn send_event(&self, name: &str, payload: serde_json::Value) -> Result<()> {
250        let msg = FileMessage::event(name, payload);
251        self.send(&msg)
252    }
253
254    /// Receive new messages from inbox
255    pub fn recv(&mut self) -> Result<Vec<FileMessage>> {
256        let messages = self.read_message_file(&self.inbox_path)?;
257
258        // Filter to only new messages
259        let new_messages: Vec<FileMessage> = messages
260            .into_iter()
261            .filter(|m| {
262                m.timestamp > self.last_inbox_timestamp
263                    || (m.timestamp == self.last_inbox_timestamp
264                        && self.last_inbox_id.as_ref() != Some(&m.id))
265            })
266            .collect();
267
268        // Update last processed
269        if let Some(last) = new_messages.last() {
270            self.last_inbox_timestamp = last.timestamp;
271            self.last_inbox_id = Some(last.id.clone());
272        }
273
274        Ok(new_messages)
275    }
276
277    /// Receive a single new message (non-blocking)
278    pub fn recv_one(&mut self) -> Result<Option<FileMessage>> {
279        let messages = self.recv()?;
280        Ok(messages.into_iter().next())
281    }
282
283    /// Wait for a response to a specific request
284    pub fn wait_response(&mut self, request_id: &str, timeout: Duration) -> Result<FileMessage> {
285        let start = std::time::Instant::now();
286        let poll_interval = Duration::from_millis(50);
287
288        loop {
289            let messages = self.recv()?;
290
291            for msg in messages {
292                if msg.msg_type == MessageType::Response
293                    && msg.reply_to.as_ref() == Some(&request_id.to_string())
294                {
295                    return Ok(msg);
296                }
297            }
298
299            if start.elapsed() > timeout {
300                return Err(IpcError::Timeout);
301            }
302
303            std::thread::sleep(poll_interval);
304        }
305    }
306
307    /// Poll for new messages with a callback
308    pub fn poll<F>(&mut self, interval: Duration, mut callback: F) -> Result<()>
309    where
310        F: FnMut(FileMessage) -> bool,
311    {
312        loop {
313            let messages = self.recv()?;
314
315            for msg in messages {
316                if !callback(msg) {
317                    return Ok(());
318                }
319            }
320
321            std::thread::sleep(interval);
322        }
323    }
324
325    /// Clear all messages in both inbox and outbox
326    pub fn clear(&self) -> Result<()> {
327        fs::write(&self.outbox_path, "[]")?;
328        fs::write(&self.inbox_path, "[]")?;
329        Ok(())
330    }
331
332    /// Read messages from a file
333    fn read_message_file(&self, path: &Path) -> Result<Vec<FileMessage>> {
334        if !path.exists() {
335            return Ok(Vec::new());
336        }
337
338        let content = fs::read_to_string(path)?;
339        if content.trim().is_empty() || content.trim() == "[]" {
340            return Ok(Vec::new());
341        }
342
343        serde_json::from_str(&content).map_err(|e| IpcError::deserialization(e.to_string()))
344    }
345}
346
347/// Simple file-based lock for atomic operations
348struct FileLock {
349    path: PathBuf,
350}
351
352impl FileLock {
353    fn acquire(path: &Path) -> Result<Self> {
354        let path = path.to_path_buf();
355        let max_attempts = 50;
356        let wait_time = Duration::from_millis(10);
357
358        for _ in 0..max_attempts {
359            match OpenOptions::new().write(true).create_new(true).open(&path) {
360                Ok(mut file) => {
361                    // Write PID to lock file
362                    let _ = writeln!(file, "{}", std::process::id());
363                    return Ok(Self { path });
364                }
365                Err(_) => {
366                    std::thread::sleep(wait_time);
367                }
368            }
369        }
370
371        // Force acquire if lock is stale (older than 5 seconds)
372        if let Ok(metadata) = fs::metadata(&path) {
373            if let Ok(modified) = metadata.modified() {
374                if modified.elapsed().unwrap_or_default() > Duration::from_secs(5) {
375                    let _ = fs::remove_file(&path);
376                    return Self::acquire(&path);
377                }
378            }
379        }
380
381        Err(IpcError::Timeout)
382    }
383}
384
385impl Drop for FileLock {
386    fn drop(&mut self) {
387        let _ = fs::remove_file(&self.path);
388    }
389}
390
391/// Generate a simple UUID v4
392fn uuid_v4() -> String {
393    use std::collections::hash_map::RandomState;
394    use std::hash::{BuildHasher, Hasher};
395
396    let state = RandomState::new();
397    let mut hasher = state.build_hasher();
398    hasher.write_u64(current_timestamp_ms());
399    hasher.write_usize(std::process::id() as usize);
400    let h1 = hasher.finish();
401
402    let state2 = RandomState::new();
403    let mut hasher2 = state2.build_hasher();
404    hasher2.write_u64(h1);
405    let h2 = hasher2.finish();
406
407    format!(
408        "{:08x}-{:04x}-4{:03x}-{:04x}-{:012x}",
409        (h1 >> 32) as u32,
410        (h1 >> 16) as u16,
411        h1 as u16 & 0x0FFF,
412        (h2 >> 48) as u16 & 0x3FFF | 0x8000,
413        h2 & 0xFFFFFFFFFFFF
414    )
415}
416
417/// Get current timestamp in milliseconds
418fn current_timestamp_ms() -> u64 {
419    SystemTime::now()
420        .duration_since(UNIX_EPOCH)
421        .unwrap_or_default()
422        .as_millis() as u64
423}
424
425#[cfg(test)]
426mod tests {
427    use super::*;
428    use std::thread;
429    use tempfile::tempdir;
430
431    #[test]
432    fn test_file_channel_basic() {
433        let dir = tempdir().unwrap();
434
435        let mut backend = FileChannel::backend(dir.path()).unwrap();
436        let mut frontend = FileChannel::frontend(dir.path()).unwrap();
437
438        // Backend sends request
439        let msg = FileMessage::request("ping", serde_json::json!({}));
440        backend.send(&msg).unwrap();
441
442        // Frontend receives
443        let received = frontend.recv().unwrap();
444        assert_eq!(received.len(), 1);
445        assert_eq!(received[0].method.as_ref().unwrap(), "ping");
446
447        // Frontend sends response
448        frontend
449            .send_response(&received[0].id, serde_json::json!({"pong": true}))
450            .unwrap();
451
452        // Backend receives response
453        let responses = backend.recv().unwrap();
454        assert_eq!(responses.len(), 1);
455        assert_eq!(responses[0].reply_to.as_ref().unwrap(), &received[0].id);
456    }
457
458    #[test]
459    fn test_file_channel_concurrent() {
460        let dir = tempdir().unwrap();
461        let dir_path = dir.path().to_path_buf();
462
463        let handle = thread::spawn({
464            let dir_path = dir_path.clone();
465            move || {
466                let mut frontend = FileChannel::frontend(&dir_path).unwrap();
467                thread::sleep(Duration::from_millis(100));
468
469                // Wait for request
470                loop {
471                    let msgs = frontend.recv().unwrap();
472                    for msg in msgs {
473                        if msg.method.as_ref() == Some(&"test".to_string()) {
474                            frontend
475                                .send_response(&msg.id, serde_json::json!({"ok": true}))
476                                .unwrap();
477                            return;
478                        }
479                    }
480                    thread::sleep(Duration::from_millis(50));
481                }
482            }
483        });
484
485        let mut backend = FileChannel::backend(&dir_path).unwrap();
486        let request_id = backend
487            .send_request("test", serde_json::json!({"value": 42}))
488            .unwrap();
489
490        let response = backend
491            .wait_response(&request_id, Duration::from_secs(5))
492            .unwrap();
493        assert!(response.payload.get("ok").unwrap().as_bool().unwrap());
494
495        handle.join().unwrap();
496    }
497}