1use crate::error::{IpcError, Result};
28use serde::{Deserialize, Serialize};
29use std::fs::{self, OpenOptions};
30use std::io::Write;
31use std::path::{Path, PathBuf};
32use std::time::{Duration, SystemTime, UNIX_EPOCH};
33
34#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
36#[serde(rename_all = "lowercase")]
37pub enum MessageType {
38 Request,
39 Response,
40 Event,
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct FileMessage {
46 pub id: String,
48 pub timestamp: u64,
50 #[serde(rename = "type")]
52 pub msg_type: MessageType,
53 #[serde(skip_serializing_if = "Option::is_none")]
55 pub reply_to: Option<String>,
56 #[serde(skip_serializing_if = "Option::is_none")]
58 pub method: Option<String>,
59 pub payload: serde_json::Value,
61 #[serde(skip_serializing_if = "Option::is_none")]
63 pub error: Option<String>,
64}
65
66impl FileMessage {
67 pub fn request(method: &str, payload: serde_json::Value) -> Self {
69 Self {
70 id: uuid_v4(),
71 timestamp: current_timestamp_ms(),
72 msg_type: MessageType::Request,
73 reply_to: None,
74 method: Some(method.to_string()),
75 payload,
76 error: None,
77 }
78 }
79
80 pub fn response(request_id: &str, payload: serde_json::Value) -> Self {
82 Self {
83 id: uuid_v4(),
84 timestamp: current_timestamp_ms(),
85 msg_type: MessageType::Response,
86 reply_to: Some(request_id.to_string()),
87 method: None,
88 payload,
89 error: None,
90 }
91 }
92
93 pub fn error_response(request_id: &str, error: &str) -> Self {
95 Self {
96 id: uuid_v4(),
97 timestamp: current_timestamp_ms(),
98 msg_type: MessageType::Response,
99 reply_to: Some(request_id.to_string()),
100 method: None,
101 payload: serde_json::Value::Null,
102 error: Some(error.to_string()),
103 }
104 }
105
106 pub fn event(name: &str, payload: serde_json::Value) -> Self {
108 Self {
109 id: uuid_v4(),
110 timestamp: current_timestamp_ms(),
111 msg_type: MessageType::Event,
112 reply_to: None,
113 method: Some(name.to_string()),
114 payload,
115 error: None,
116 }
117 }
118}
119
/// One endpoint of a bidirectional message channel backed by two JSON files in a
/// shared directory.
pub struct FileChannel {
    /// Directory that holds the channel files.
    dir: PathBuf,
    /// File this endpoint writes its outgoing messages to.
    outbox_path: PathBuf,
    /// File this endpoint reads incoming messages from.
    inbox_path: PathBuf,
    /// Id of the most recent inbox message handed to the caller, if any.
    last_inbox_id: Option<String>,
    /// Timestamp of that message; `0` until something has been received.
    last_inbox_timestamp: u64,
}
133
134impl FileChannel {
135 pub fn new<P: AsRef<Path>>(dir: P, is_backend: bool) -> Result<Self> {
141 let dir = dir.as_ref().to_path_buf();
142
143 fs::create_dir_all(&dir)?;
145
146 let (outbox_path, inbox_path) = if is_backend {
148 (
149 dir.join("backend_to_frontend.json"),
150 dir.join("frontend_to_backend.json"),
151 )
152 } else {
153 (
154 dir.join("frontend_to_backend.json"),
155 dir.join("backend_to_frontend.json"),
156 )
157 };
158
159 let info_path = dir.join(".channel_info");
161 if !info_path.exists() {
162 let info = serde_json::json!({
163 "version": "1.0",
164 "created": current_timestamp_ms(),
165 "protocol": "file-ipc"
166 });
167 fs::write(&info_path, serde_json::to_string_pretty(&info).unwrap())?;
168 }
169
170 for path in [&outbox_path, &inbox_path] {
172 if !path.exists() {
173 fs::write(path, "[]")?;
174 }
175 }
176
177 Ok(Self {
178 dir,
179 outbox_path,
180 inbox_path,
181 last_inbox_id: None,
182 last_inbox_timestamp: 0,
183 })
184 }
185
186 pub fn backend<P: AsRef<Path>>(dir: P) -> Result<Self> {
188 Self::new(dir, true)
189 }
190
191 pub fn frontend<P: AsRef<Path>>(dir: P) -> Result<Self> {
193 Self::new(dir, false)
194 }
195
196 pub fn dir(&self) -> &Path {
198 &self.dir
199 }
200
201 pub fn send(&self, message: &FileMessage) -> Result<()> {
203 let lock_path = self.outbox_path.with_extension("lock");
204 let _lock = FileLock::acquire(&lock_path)?;
205
206 let mut messages = self.read_message_file(&self.outbox_path)?;
208
209 messages.push(message.clone());
211
212 if messages.len() > 100 {
214 let skip_count = messages.len() - 100;
215 messages = messages.into_iter().skip(skip_count).collect();
216 }
217
218 let temp_path = self.outbox_path.with_extension("tmp");
220 let content = serde_json::to_string_pretty(&messages)
221 .map_err(|e| IpcError::serialization(e.to_string()))?;
222 fs::write(&temp_path, &content)?;
223 fs::rename(&temp_path, &self.outbox_path)?;
224
225 Ok(())
226 }
227
228 pub fn send_request(&self, method: &str, params: serde_json::Value) -> Result<String> {
230 let msg = FileMessage::request(method, params);
231 let id = msg.id.clone();
232 self.send(&msg)?;
233 Ok(id)
234 }
235
236 pub fn send_response(&self, request_id: &str, result: serde_json::Value) -> Result<()> {
238 let msg = FileMessage::response(request_id, result);
239 self.send(&msg)
240 }
241
242 pub fn send_error(&self, request_id: &str, error: &str) -> Result<()> {
244 let msg = FileMessage::error_response(request_id, error);
245 self.send(&msg)
246 }
247
248 pub fn send_event(&self, name: &str, payload: serde_json::Value) -> Result<()> {
250 let msg = FileMessage::event(name, payload);
251 self.send(&msg)
252 }
253
254 pub fn recv(&mut self) -> Result<Vec<FileMessage>> {
256 let messages = self.read_message_file(&self.inbox_path)?;
257
258 let new_messages: Vec<FileMessage> = messages
260 .into_iter()
261 .filter(|m| {
262 m.timestamp > self.last_inbox_timestamp
263 || (m.timestamp == self.last_inbox_timestamp
264 && self.last_inbox_id.as_ref() != Some(&m.id))
265 })
266 .collect();
267
268 if let Some(last) = new_messages.last() {
270 self.last_inbox_timestamp = last.timestamp;
271 self.last_inbox_id = Some(last.id.clone());
272 }
273
274 Ok(new_messages)
275 }
276
277 pub fn recv_one(&mut self) -> Result<Option<FileMessage>> {
279 let messages = self.recv()?;
280 Ok(messages.into_iter().next())
281 }
282
283 pub fn wait_response(&mut self, request_id: &str, timeout: Duration) -> Result<FileMessage> {
285 let start = std::time::Instant::now();
286 let poll_interval = Duration::from_millis(50);
287
288 loop {
289 let messages = self.recv()?;
290
291 for msg in messages {
292 if msg.msg_type == MessageType::Response
293 && msg.reply_to.as_ref() == Some(&request_id.to_string())
294 {
295 return Ok(msg);
296 }
297 }
298
299 if start.elapsed() > timeout {
300 return Err(IpcError::Timeout);
301 }
302
303 std::thread::sleep(poll_interval);
304 }
305 }
306
307 pub fn poll<F>(&mut self, interval: Duration, mut callback: F) -> Result<()>
309 where
310 F: FnMut(FileMessage) -> bool,
311 {
312 loop {
313 let messages = self.recv()?;
314
315 for msg in messages {
316 if !callback(msg) {
317 return Ok(());
318 }
319 }
320
321 std::thread::sleep(interval);
322 }
323 }
324
325 pub fn clear(&self) -> Result<()> {
327 fs::write(&self.outbox_path, "[]")?;
328 fs::write(&self.inbox_path, "[]")?;
329 Ok(())
330 }
331
332 fn read_message_file(&self, path: &Path) -> Result<Vec<FileMessage>> {
334 if !path.exists() {
335 return Ok(Vec::new());
336 }
337
338 let content = fs::read_to_string(path)?;
339 if content.trim().is_empty() || content.trim() == "[]" {
340 return Ok(Vec::new());
341 }
342
343 serde_json::from_str(&content).map_err(|e| IpcError::deserialization(e.to_string()))
344 }
345}
346
/// Guard for a lock file; the file is deleted again when the guard is dropped.
struct FileLock {
    /// Location of the lock file owned by this guard.
    path: PathBuf,
}
351
352impl FileLock {
353 fn acquire(path: &Path) -> Result<Self> {
354 let path = path.to_path_buf();
355 let max_attempts = 50;
356 let wait_time = Duration::from_millis(10);
357
358 for _ in 0..max_attempts {
359 match OpenOptions::new().write(true).create_new(true).open(&path) {
360 Ok(mut file) => {
361 let _ = writeln!(file, "{}", std::process::id());
363 return Ok(Self { path });
364 }
365 Err(_) => {
366 std::thread::sleep(wait_time);
367 }
368 }
369 }
370
371 if let Ok(metadata) = fs::metadata(&path) {
373 if let Ok(modified) = metadata.modified() {
374 if modified.elapsed().unwrap_or_default() > Duration::from_secs(5) {
375 let _ = fs::remove_file(&path);
376 return Self::acquire(&path);
377 }
378 }
379 }
380
381 Err(IpcError::Timeout)
382 }
383}
384
385impl Drop for FileLock {
386 fn drop(&mut self) {
387 let _ = fs::remove_file(&self.path);
388 }
389}
390
391fn uuid_v4() -> String {
393 use std::collections::hash_map::RandomState;
394 use std::hash::{BuildHasher, Hasher};
395
396 let state = RandomState::new();
397 let mut hasher = state.build_hasher();
398 hasher.write_u64(current_timestamp_ms());
399 hasher.write_usize(std::process::id() as usize);
400 let h1 = hasher.finish();
401
402 let state2 = RandomState::new();
403 let mut hasher2 = state2.build_hasher();
404 hasher2.write_u64(h1);
405 let h2 = hasher2.finish();
406
407 format!(
408 "{:08x}-{:04x}-4{:03x}-{:04x}-{:012x}",
409 (h1 >> 32) as u32,
410 (h1 >> 16) as u16,
411 h1 as u16 & 0x0FFF,
412 (h2 >> 48) as u16 & 0x3FFF | 0x8000,
413 h2 & 0xFFFFFFFFFFFF
414 )
415}
416
/// Milliseconds elapsed since the Unix epoch, or `0` if the system clock reports a time
/// before the epoch.
fn current_timestamp_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
424
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use tempfile::tempdir;

    /// A request sent by the backend reaches the frontend, and the reply travels back.
    #[test]
    fn test_file_channel_basic() {
        let workspace = tempdir().unwrap();

        let mut backend = FileChannel::backend(workspace.path()).unwrap();
        let mut frontend = FileChannel::frontend(workspace.path()).unwrap();

        let ping = FileMessage::request("ping", serde_json::json!({}));
        backend.send(&ping).unwrap();

        let inbound = frontend.recv().unwrap();
        assert_eq!(inbound.len(), 1);
        let request = &inbound[0];
        assert_eq!(request.method.as_ref().unwrap(), "ping");

        frontend
            .send_response(&request.id, serde_json::json!({"pong": true}))
            .unwrap();

        let replies = backend.recv().unwrap();
        assert_eq!(replies.len(), 1);
        assert_eq!(replies[0].reply_to.as_ref().unwrap(), &request.id);
    }

    /// A frontend running on another thread answers a request the backend is waiting on.
    #[test]
    fn test_file_channel_concurrent() {
        let workspace = tempdir().unwrap();
        let shared_dir = workspace.path().to_path_buf();

        let responder_dir = shared_dir.clone();
        let responder = thread::spawn(move || {
            let mut frontend = FileChannel::frontend(&responder_dir).unwrap();
            thread::sleep(Duration::from_millis(100));

            // Keep draining the inbox until the "test" request shows up, then answer it.
            loop {
                let request = frontend
                    .recv()
                    .unwrap()
                    .into_iter()
                    .find(|msg| msg.method.as_deref() == Some("test"));

                if let Some(request) = request {
                    frontend
                        .send_response(&request.id, serde_json::json!({"ok": true}))
                        .unwrap();
                    return;
                }
                thread::sleep(Duration::from_millis(50));
            }
        });

        let mut backend = FileChannel::backend(&shared_dir).unwrap();
        let request_id = backend
            .send_request("test", serde_json::json!({"value": 42}))
            .unwrap();

        let response = backend
            .wait_response(&request_id, Duration::from_secs(5))
            .unwrap();
        assert!(response.payload.get("ok").unwrap().as_bool().unwrap());

        responder.join().unwrap();
    }
}