1use std::path::PathBuf;
10
11use tauri::{Emitter, Runtime};
12use tokio::io::{AsyncReadExt, AsyncWriteExt};
13use tokio::net::UnixStream;
14
15use crate::desktop::ipc::{
16 decode_frame, encode_frame, IpcEvent, IpcRequest, IpcResponse, MAX_FRAME_SIZE,
17};
18use crate::error::ServiceError;
19use crate::models::{PluginEvent, StartConfig};
20
21pub struct IpcClient {
31 stream: UnixStream,
32}
33
34impl IpcClient {
35 pub async fn connect(path: PathBuf) -> Result<Self, ServiceError> {
37 let stream = UnixStream::connect(&path)
38 .await
39 .map_err(|e| ServiceError::Ipc(format!("connect failed: {e}")))?;
40 Ok(Self { stream })
41 }
42
43 pub async fn start(&mut self, config: StartConfig) -> Result<(), ServiceError> {
45 let request = IpcRequest::Start { config };
46 let response = self.send_and_read(&request).await?;
47 if response.ok {
48 Ok(())
49 } else {
50 Err(ServiceError::Ipc(
51 response.error.unwrap_or_else(|| "unknown error".into()),
52 ))
53 }
54 }
55
56 pub async fn stop(&mut self) -> Result<(), ServiceError> {
58 let response = self.send_and_read(&IpcRequest::Stop).await?;
59 if response.ok {
60 Ok(())
61 } else {
62 Err(ServiceError::Ipc(
63 response.error.unwrap_or_else(|| "unknown error".into()),
64 ))
65 }
66 }
67
68 pub async fn is_running(&mut self) -> Result<bool, ServiceError> {
70 let response = self.send_and_read(&IpcRequest::IsRunning).await?;
71 if response.ok {
72 Ok(response
73 .data
74 .and_then(|d| d.get("running").and_then(|v| v.as_bool()))
75 .unwrap_or(false))
76 } else {
77 Err(ServiceError::Ipc(
78 response.error.unwrap_or_else(|| "unknown error".into()),
79 ))
80 }
81 }
82
83 pub async fn read_event(&mut self) -> Result<Option<IpcEvent>, ServiceError> {
87 let frame = match self.read_frame().await? {
88 Some(f) => f,
89 None => return Ok(None),
90 };
91 decode_frame::<IpcEvent>(&frame)
92 .map(Some)
93 .map_err(|e| ServiceError::Ipc(format!("decode event: {e}")))
94 }
95
96 pub fn listen_events<R: Runtime>(mut self, app: tauri::AppHandle<R>) {
101 tokio::spawn(async move {
102 loop {
103 match self.read_event().await {
104 Ok(Some(event)) => {
105 let plugin_event = ipc_event_to_plugin_event(event);
106 let _ = app.emit("background-service://event", plugin_event);
107 }
108 Ok(None) => break,
109 Err(_) => break,
110 }
111 }
112 });
113 }
114
115 async fn send_and_read(
118 &mut self,
119 request: &IpcRequest,
120 ) -> Result<IpcResponse, ServiceError> {
121 self.send_request(request).await?;
122 loop {
131 let frame = self
132 .read_frame()
133 .await?
134 .ok_or_else(|| ServiceError::Ipc("connection closed".into()))?;
135 if let Ok(resp) = decode_frame::<IpcResponse>(&frame) {
136 return Ok(resp);
137 }
138 log::debug!(
140 "send_and_read: discarding interleaved non-response frame ({} bytes)",
141 frame.len()
142 );
143 }
144 }
145
146 async fn send_request(&mut self, request: &IpcRequest) -> Result<(), ServiceError> {
147 let frame = encode_frame(request).map_err(|e| ServiceError::Ipc(format!("encode: {e}")))?;
148 self.stream
149 .write_all(&frame)
150 .await
151 .map_err(|e| ServiceError::Ipc(format!("send request: {e}")))?;
152 Ok(())
153 }
154
155 async fn read_frame(&mut self) -> Result<Option<Vec<u8>>, ServiceError> {
159 let mut len_buf = [0u8; 4];
160 match self.stream.read_exact(&mut len_buf).await {
161 Ok(_) => {}
162 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
163 Err(e) => return Err(ServiceError::Ipc(format!("read frame: {e}"))),
164 }
165 let len = u32::from_be_bytes(len_buf) as usize;
166 if len > MAX_FRAME_SIZE {
167 return Err(ServiceError::Ipc(format!("frame too large: {len}")));
168 }
169 if len == 0 {
170 return Ok(None);
171 }
172 let mut payload = vec![0u8; len];
173 self.stream
174 .read_exact(&mut payload)
175 .await
176 .map_err(|e| ServiceError::Ipc(format!("read payload: {e}")))?;
177 let mut frame = Vec::with_capacity(4 + len);
178 frame.extend_from_slice(&len_buf);
179 frame.extend_from_slice(&payload);
180 Ok(Some(frame))
181 }
182}
183
184pub fn ipc_event_to_plugin_event(event: IpcEvent) -> PluginEvent {
186 match event {
187 IpcEvent::Started => PluginEvent::Started,
188 IpcEvent::Stopped { reason } => PluginEvent::Stopped { reason },
189 IpcEvent::Error { message } => PluginEvent::Error { message },
190 }
191}
192
193enum IpcCommand {
197 Start {
198 config: StartConfig,
199 reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
200 },
201 Stop {
202 reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
203 },
204 IsRunning {
205 reply: tokio::sync::oneshot::Sender<Result<bool, ServiceError>>,
206 },
207}
208
209pub struct PersistentIpcClientHandle {
217 cmd_tx: tokio::sync::mpsc::Sender<IpcCommand>,
218 shutdown: tokio_util::sync::CancellationToken,
219}
220
221impl Drop for PersistentIpcClientHandle {
222 fn drop(&mut self) {
223 self.shutdown.cancel();
224 }
225}
226
227impl PersistentIpcClientHandle {
228 pub fn spawn<R: Runtime>(socket_path: PathBuf, app: tauri::AppHandle<R>) -> Self {
234 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::channel(16);
235 let shutdown = tokio_util::sync::CancellationToken::new();
236
237 tokio::spawn(persistent_client_loop(socket_path, app, cmd_rx, shutdown.clone()));
238
239 Self { cmd_tx, shutdown }
240 }
241
242 pub async fn start(&self, config: StartConfig) -> Result<(), ServiceError> {
244 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
245 self.cmd_tx
246 .send(IpcCommand::Start {
247 config,
248 reply: reply_tx,
249 })
250 .await
251 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
252 reply_rx.await.map_err(|_| ServiceError::Ipc("command dropped".into()))?
253 }
254
255 pub async fn stop(&self) -> Result<(), ServiceError> {
257 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
258 self.cmd_tx
259 .send(IpcCommand::Stop { reply: reply_tx })
260 .await
261 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
262 reply_rx.await.map_err(|_| ServiceError::Ipc("command dropped".into()))?
263 }
264
265 pub async fn is_running(&self) -> Result<bool, ServiceError> {
267 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
268 self.cmd_tx
269 .send(IpcCommand::IsRunning { reply: reply_tx })
270 .await
271 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
272 reply_rx.await.map_err(|_| ServiceError::Ipc("command dropped".into()))?
273 }
274}
275
276async fn persistent_client_loop<R: Runtime>(
278 socket_path: PathBuf,
279 app: tauri::AppHandle<R>,
280 mut cmd_rx: tokio::sync::mpsc::Receiver<IpcCommand>,
281 shutdown: tokio_util::sync::CancellationToken,
282) {
283 loop {
284 tokio::select! {
285 biased;
286 _ = shutdown.cancelled() => {
287 log::info!("Persistent IPC client shutting down");
288 break;
289 }
290 connect_result = UnixStream::connect(&socket_path) => {
291 match connect_result {
292 Ok(stream) => {
293 log::info!("Persistent IPC client connected");
294 if run_persistent_connection(stream, &app, &mut cmd_rx).await.is_err() {
295 log::info!("Persistent IPC connection lost, reconnecting...");
296 }
297 }
298 Err(_) => {
299 log::debug!("Persistent IPC client: connection failed, retrying...");
300 }
301 }
302 tokio::select! {
303 biased;
304 _ = shutdown.cancelled() => {
305 log::info!("Persistent IPC client shutting down");
306 break;
307 }
308 _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
309 }
310 }
311 }
312 }
313}
314
315async fn run_persistent_connection<R: Runtime>(
322 stream: UnixStream,
323 app: &tauri::AppHandle<R>,
324 cmd_rx: &mut tokio::sync::mpsc::Receiver<IpcCommand>,
325) -> Result<(), ServiceError> {
326 let (read_half, mut write_half) = stream.into_split();
327
328 let response_slot: std::sync::Arc<tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<IpcResponse>>>> =
330 std::sync::Arc::new(tokio::sync::Mutex::new(None));
331
332 let slot_writer = response_slot.clone();
333 let app_clone = app.clone();
334
335 let reader_handle = tokio::spawn(async move {
337 let mut read_half = read_half;
338 loop {
339 let frame = match read_frame_from(&mut read_half).await {
340 Ok(Some(f)) => f,
341 Ok(None) => break, Err(_) => break,
343 };
344
345 if let Ok(resp) = decode_frame::<IpcResponse>(&frame) {
347 let mut slot = slot_writer.lock().await;
348 if let Some(sender) = slot.take() {
349 let _ = sender.send(resp);
350 }
351 continue;
352 }
353
354 if let Ok(event) = decode_frame::<IpcEvent>(&frame) {
356 let plugin_event = ipc_event_to_plugin_event(event);
357 let _ = app_clone.emit("background-service://event", plugin_event);
358 continue;
359 }
360
361 }
363 });
364
365 let result = loop {
367 tokio::select! {
368 cmd = cmd_rx.recv() => {
369 let cmd = match cmd {
370 Some(c) => c,
371 None => break Err(ServiceError::Ipc("command channel closed".into())),
372 };
373
374 match cmd {
375 IpcCommand::Start { config, reply } => {
376 let request = IpcRequest::Start { config };
377 let rx = prepare_response_slot(&response_slot).await;
378 if let Err(e) = send_request_to(&mut write_half, &request).await {
379 let _ = reply.send(Err(e));
380 break Err(ServiceError::Ipc("send failed".into()));
381 }
382 let response = await_response(rx).await;
383 let result = match response {
384 Ok(resp) if resp.ok => Ok(()),
385 Ok(resp) => Err(ServiceError::Ipc(
386 resp.error.unwrap_or_else(|| "unknown error".into()),
387 )),
388 Err(e) => Err(e),
389 };
390 let _ = reply.send(result);
391 }
392 IpcCommand::Stop { reply } => {
393 let rx = prepare_response_slot(&response_slot).await;
394 if let Err(e) = send_request_to(&mut write_half, &IpcRequest::Stop).await {
395 let _ = reply.send(Err(e));
396 break Err(ServiceError::Ipc("send failed".into()));
397 }
398 let response = await_response(rx).await;
399 let result = match response {
400 Ok(resp) if resp.ok => Ok(()),
401 Ok(resp) => Err(ServiceError::Ipc(
402 resp.error.unwrap_or_else(|| "unknown error".into()),
403 )),
404 Err(e) => Err(e),
405 };
406 let _ = reply.send(result);
407 }
408 IpcCommand::IsRunning { reply } => {
409 let rx = prepare_response_slot(&response_slot).await;
410 if let Err(e) = send_request_to(&mut write_half, &IpcRequest::IsRunning).await {
411 let _ = reply.send(Err(e));
412 break Err(ServiceError::Ipc("send failed".into()));
413 }
414 let response = await_response(rx).await;
415 let result = match response {
416 Ok(resp) if resp.ok => Ok(resp
417 .data
418 .and_then(|d| d.get("running").and_then(|v| v.as_bool()))
419 .unwrap_or(false)),
420 Ok(resp) => Err(ServiceError::Ipc(
421 resp.error.unwrap_or_else(|| "unknown error".into()),
422 )),
423 Err(e) => Err(e),
424 };
425 let _ = reply.send(result);
426 }
427 }
428 }
429 _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
430 if reader_handle.is_finished() {
432 break Err(ServiceError::Ipc("reader task died".into()));
433 }
434 }
435 }
436 };
437
438 reader_handle.abort();
439 result
440}
441
442async fn send_request_to(
444 write_half: &mut tokio::net::unix::OwnedWriteHalf,
445 request: &IpcRequest,
446) -> Result<(), ServiceError> {
447 let frame = encode_frame(request).map_err(|e| ServiceError::Ipc(format!("encode: {e}")))?;
448 write_half
449 .write_all(&frame)
450 .await
451 .map_err(|e| ServiceError::Ipc(format!("send: {e}")))?;
452 Ok(())
453}
454
455async fn prepare_response_slot(
463 slot: &std::sync::Arc<tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<IpcResponse>>>>,
464) -> tokio::sync::oneshot::Receiver<IpcResponse> {
465 let (tx, rx) = tokio::sync::oneshot::channel();
466 let mut guard = slot.lock().await;
467 debug_assert!(
468 guard.is_none(),
469 "response slot overwritten — sequential command invariant violated"
470 );
471 *guard = Some(tx);
472 rx
473}
474
475async fn await_response(
480 rx: tokio::sync::oneshot::Receiver<IpcResponse>,
481) -> Result<IpcResponse, ServiceError> {
482 tokio::select! {
483 response = rx => {
484 response.map_err(|_| ServiceError::Ipc("response channel closed".into()))
485 }
486 _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
487 Err(ServiceError::Ipc("response timeout".into()))
488 }
489 }
490}
491
492async fn read_frame_from(
494 read_half: &mut tokio::net::unix::OwnedReadHalf,
495) -> Result<Option<Vec<u8>>, ServiceError> {
496 let mut len_buf = [0u8; 4];
497 match read_half.read_exact(&mut len_buf).await {
498 Ok(_) => {}
499 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
500 Err(e) => return Err(ServiceError::Ipc(format!("read frame: {e}"))),
501 }
502 let len = u32::from_be_bytes(len_buf) as usize;
503 if len > MAX_FRAME_SIZE {
504 return Err(ServiceError::Ipc(format!("frame too large: {len}")));
505 }
506 if len == 0 {
507 return Ok(None);
508 }
509 let mut payload = vec![0u8; len];
510 read_half
511 .read_exact(&mut payload)
512 .await
513 .map_err(|e| ServiceError::Ipc(format!("read payload: {e}")))?;
514 let mut frame = Vec::with_capacity(4 + len);
515 frame.extend_from_slice(&len_buf);
516 frame.extend_from_slice(&payload);
517 Ok(Some(frame))
518}
519
520#[cfg(test)]
521mod tests {
522 use super::*;
523 use crate::desktop::test_helpers::{
524 setup_server, setup_server_with_factory, BlockingService, ImmediateSuccessService,
525 };
526 use std::sync::atomic::Ordering;
527 use std::time::Duration;
528 use tauri::Listener;
529
530 #[tokio::test]
533 async fn ipc_client_connect() {
534 let (path, shutdown) = setup_server();
535 let result = IpcClient::connect(path).await;
536 assert!(result.is_ok(), "client should connect: {:?}", result.err());
537 shutdown.cancel();
538 }
539
540 #[tokio::test]
543 async fn ipc_client_send_start() {
544 let (path, shutdown) = setup_server();
545 let mut client = IpcClient::connect(path).await.unwrap();
546 let result = client.start(StartConfig::default()).await;
547 assert!(
548 result.is_ok(),
549 "start should succeed: {:?}",
550 result.err()
551 );
552 shutdown.cancel();
553 }
554
555 #[tokio::test]
558 async fn ipc_client_send_stop() {
559 let (path, shutdown) = setup_server();
560 let mut client = IpcClient::connect(path).await.unwrap();
561 client.start(StartConfig::default()).await.unwrap();
562 let result = client.stop().await;
563 assert!(
564 result.is_ok(),
565 "stop should succeed: {:?}",
566 result.err()
567 );
568 shutdown.cancel();
569 }
570
571 #[tokio::test]
574 async fn ipc_client_is_running() {
575 let (path, shutdown) = setup_server();
576 let mut client = IpcClient::connect(path).await.unwrap();
577
578 let running = client.is_running().await.unwrap();
579 assert!(!running, "should not be running initially");
580
581 client.start(StartConfig::default()).await.unwrap();
582 let running = client.is_running().await.unwrap();
583 assert!(running, "should be running after start");
584
585 shutdown.cancel();
586 }
587
588 #[tokio::test]
591 async fn ipc_client_receive_events() {
592 let (path, shutdown) =
593 setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
594 let mut client = IpcClient::connect(path).await.unwrap();
595 client.start(StartConfig::default()).await.unwrap();
596
597 let event = tokio::time::timeout(Duration::from_millis(500), client.read_event())
598 .await
599 .expect("timed out waiting for event")
600 .expect("read_event failed");
601
602 assert!(event.is_some(), "should receive an event");
603 let event = event.unwrap();
604 assert!(
605 matches!(event, IpcEvent::Started),
606 "Expected Started event, got {:?}",
607 event
608 );
609
610 shutdown.cancel();
611 }
612
613 #[tokio::test]
616 async fn ipc_client_stop_when_not_running() {
617 let (path, shutdown) = setup_server();
618 let mut client = IpcClient::connect(path).await.unwrap();
619 let result = client.stop().await;
620 assert!(result.is_err(), "stop when not running should fail");
621 shutdown.cancel();
622 }
623
624 #[tokio::test]
627 async fn ipc_client_connect_to_nonexistent() {
628 let path = std::env::temp_dir().join("nonexistent-test-socket.sock");
629 let result = IpcClient::connect(path).await;
630 assert!(
631 result.is_err(),
632 "should fail to connect to nonexistent socket"
633 );
634 }
635
636 #[test]
639 fn ipc_event_to_plugin_event_started() {
640 let event = IpcEvent::Started;
641 let plugin = ipc_event_to_plugin_event(event);
642 assert!(matches!(plugin, PluginEvent::Started));
643 }
644
645 #[test]
646 fn ipc_event_to_plugin_event_stopped() {
647 let event = IpcEvent::Stopped {
648 reason: "cancelled".into(),
649 };
650 let plugin = ipc_event_to_plugin_event(event);
651 match plugin {
652 PluginEvent::Stopped { reason } => assert_eq!(reason, "cancelled"),
653 other => panic!("Expected Stopped, got {other:?}"),
654 }
655 }
656
657 #[test]
658 fn ipc_event_to_plugin_event_error() {
659 let event = IpcEvent::Error {
660 message: "init failed".into(),
661 };
662 let plugin = ipc_event_to_plugin_event(event);
663 match plugin {
664 PluginEvent::Error { message } => assert_eq!(message, "init failed"),
665 other => panic!("Expected Error, got {other:?}"),
666 }
667 }
668
669 #[tokio::test]
672 async fn ipc_client_full_lifecycle() {
673 let (path, shutdown) = setup_server();
674 let mut client = IpcClient::connect(path).await.unwrap();
675
676 assert!(!client.is_running().await.unwrap());
677 client.start(StartConfig::default()).await.unwrap();
678 assert!(client.is_running().await.unwrap());
679 client.stop().await.unwrap();
680 assert!(!client.is_running().await.unwrap());
681
682 shutdown.cancel();
683 }
684
685 #[tokio::test]
688 async fn ipc_client_listen_events() {
689 let (path, shutdown) =
690 setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
691 let app = tauri::test::mock_app();
692
693 let received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
694 let received_clone = received.clone();
695 app.listen("background-service://event", move |_event| {
696 received_clone.store(true, Ordering::SeqCst);
697 });
698
699 let mut client = IpcClient::connect(path).await.unwrap();
700 client.start(StartConfig::default()).await.unwrap();
701 client.listen_events(app.handle().clone());
702
703 tokio::time::timeout(Duration::from_millis(500), async {
704 while !received.load(Ordering::SeqCst) {
705 tokio::time::sleep(Duration::from_millis(10)).await;
706 }
707 })
708 .await
709 .expect("timed out waiting for event via listen_events");
710
711 assert!(received.load(Ordering::SeqCst), "should have received event");
712 shutdown.cancel();
713 }
714
715 #[tokio::test]
727 async fn ipc_loopback_full_lifecycle_with_events() {
728 let (path, shutdown) = setup_server();
729 let mut client = IpcClient::connect(path).await.unwrap();
730
731 assert!(
733 !client.is_running().await.unwrap(),
734 "should not be running initially"
735 );
736
737 client
739 .start(StartConfig::default())
740 .await
741 .expect("start should succeed");
742
743 let started = tokio::time::timeout(Duration::from_millis(500), client.read_event())
746 .await
747 .expect("timed out waiting for Started event")
748 .expect("read_event failed")
749 .expect("should receive event");
750 assert!(
751 matches!(started, IpcEvent::Started),
752 "Expected Started event, got {started:?}"
753 );
754
755 assert!(
757 client.is_running().await.unwrap(),
758 "should be running after start"
759 );
760
761 client.stop().await.expect("stop should succeed");
763
764 let stopped = tokio::time::timeout(Duration::from_millis(500), client.read_event())
766 .await
767 .expect("timed out waiting for Stopped event")
768 .expect("read_event failed")
769 .expect("should receive event");
770 assert!(
771 matches!(stopped, IpcEvent::Stopped { .. }),
772 "Expected Stopped event, got {stopped:?}"
773 );
774
775 assert!(
777 !client.is_running().await.unwrap(),
778 "should not be running after stop"
779 );
780
781 shutdown.cancel();
782 }
783
784 #[tokio::test]
788 async fn ipc_loopback_event_streaming_plugin_event_conversion() {
789 let (path, shutdown) = setup_server();
790 let mut client = IpcClient::connect(path).await.unwrap();
791
792 client.start(StartConfig::default()).await.unwrap();
794 let started_ipc = tokio::time::timeout(Duration::from_millis(500), client.read_event())
795 .await
796 .expect("timed out")
797 .expect("read_event failed")
798 .expect("should receive event");
799 let started_plugin = ipc_event_to_plugin_event(started_ipc);
800 assert!(
801 matches!(started_plugin, PluginEvent::Started),
802 "Expected PluginEvent::Started, got {started_plugin:?}"
803 );
804
805 client.stop().await.unwrap();
807 let stopped_ipc = tokio::time::timeout(Duration::from_millis(500), client.read_event())
808 .await
809 .expect("timed out")
810 .expect("read_event failed")
811 .expect("should receive event");
812 let stopped_plugin = ipc_event_to_plugin_event(stopped_ipc);
813 match stopped_plugin {
814 PluginEvent::Stopped { reason } => {
815 assert_eq!(reason, "cancelled", "Expected 'cancelled' reason");
816 }
817 other => panic!("Expected PluginEvent::Stopped, got {other:?}"),
818 }
819
820 shutdown.cancel();
821 }
822
823 #[tokio::test]
828 async fn ipc_loopback_connection_drop_returns_error() {
829 let path = crate::desktop::test_helpers::unique_socket_path();
830
831 let listener = tokio::net::UnixListener::bind(&path).unwrap();
833 let path_clone = path.clone();
834
835 let client_handle = tokio::spawn(async move {
836 IpcClient::connect(path_clone).await.unwrap()
837 });
838
839 let (server_stream, _) = listener.accept().await.unwrap();
841 drop(server_stream);
842 tokio::time::sleep(Duration::from_millis(20)).await;
843
844 let mut client = client_handle.await.unwrap();
845
846 let result = client.is_running().await;
848 assert!(
849 result.is_err(),
850 "should get error after server drops connection"
851 );
852
853 let _ = std::fs::remove_file(&path);
854 }
855
856 #[tokio::test]
860 async fn ipc_loopback_double_start_returns_error() {
861 let (path, shutdown) = setup_server();
862 let mut client = IpcClient::connect(path).await.unwrap();
863
864 client.start(StartConfig::default()).await.unwrap();
865
866 let result = client.start(StartConfig::default()).await;
867 assert!(result.is_err(), "double start should return error");
868 let err_msg = result.unwrap_err().to_string();
869 assert!(
870 err_msg.to_lowercase().contains("already"),
871 "Error should mention 'already': {err_msg}"
872 );
873
874 shutdown.cancel();
875 }
876
877 #[tokio::test]
886 async fn persistent_client_connects() {
887 let (path, shutdown) = setup_server();
888 let app = tauri::test::mock_app();
889
890 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
891
892 tokio::time::sleep(Duration::from_millis(100)).await;
894
895 let running = handle.is_running().await;
897 assert!(
898 running.is_ok(),
899 "should get response via persistent connection: {:?}",
900 running.err()
901 );
902 assert!(!running.unwrap(), "should not be running initially");
903
904 shutdown.cancel();
905 }
906
907 #[tokio::test]
911 async fn persistent_client_reconnects() {
912 use crate::desktop::ipc_server::IpcServer;
913 use crate::manager::{manager_loop, ServiceFactory};
914 use tokio_util::sync::CancellationToken;
915
916 let (path, shutdown1) = setup_server();
918 let app = tauri::test::mock_app();
919
920 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
921
922 tokio::time::sleep(Duration::from_millis(100)).await;
924 let result = handle.is_running().await;
925 assert!(
926 result.is_ok(),
927 "should connect to first server: {:?}",
928 result.err()
929 );
930
931 shutdown1.cancel();
933 tokio::time::sleep(Duration::from_millis(150)).await;
934
935 let (cmd_tx2, cmd_rx2) = tokio::sync::mpsc::channel(16);
937 let factory: ServiceFactory<tauri::test::MockRuntime> =
938 Box::new(|| Box::new(BlockingService));
939 tokio::spawn(manager_loop(cmd_rx2, factory, 0.0, 0.0));
940 let server2 = IpcServer::bind(path.clone(), cmd_tx2, app.handle().clone()).unwrap();
941 let shutdown2 = CancellationToken::new();
942 let s2 = shutdown2.clone();
943 tokio::spawn(async move { server2.run(s2).await });
944
945 let reconnected = tokio::time::timeout(Duration::from_secs(3), async {
947 loop {
948 tokio::time::sleep(Duration::from_millis(200)).await;
949 if handle.is_running().await.is_ok() {
950 break;
951 }
952 }
953 })
954 .await;
955 assert!(
956 reconnected.is_ok(),
957 "persistent client should reconnect to second server"
958 );
959
960 shutdown2.cancel();
961 }
962
963 #[tokio::test]
968 async fn event_relay() {
969 let (path, shutdown) =
970 setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
971 let app = tauri::test::mock_app();
972
973 let received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
974 let received_clone = received.clone();
975 app.listen("background-service://event", move |_event| {
976 received_clone.store(true, Ordering::SeqCst);
977 });
978
979 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
980
981 let result = handle.start(StartConfig::default()).await;
983 assert!(result.is_ok(), "start should succeed: {:?}", result.err());
984
985 tokio::time::timeout(Duration::from_millis(500), async {
987 while !received.load(Ordering::SeqCst) {
988 tokio::time::sleep(Duration::from_millis(10)).await;
989 }
990 })
991 .await
992 .expect("timed out waiting for event relay via app.emit()");
993
994 assert!(
995 received.load(Ordering::SeqCst),
996 "event should be relayed through app.emit()"
997 );
998
999 shutdown.cancel();
1000 }
1001
1002 #[tokio::test]
1007 async fn start_stop_lifecycle() {
1008 let (path, shutdown) = setup_server();
1009 let app = tauri::test::mock_app();
1010
1011 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1012
1013 let running = handle.is_running().await.unwrap();
1015 assert!(!running, "should not be running initially");
1016
1017 handle
1019 .start(StartConfig::default())
1020 .await
1021 .expect("start should succeed");
1022 let running = handle.is_running().await.unwrap();
1023 assert!(running, "should be running after start");
1024
1025 handle.stop().await.expect("stop should succeed");
1027 let running = handle.is_running().await.unwrap();
1028 assert!(!running, "should not be running after stop");
1029
1030 shutdown.cancel();
1031 }
1032
1033 #[tokio::test]
1042 async fn persistent_client_timeout_on_unresponsive_server() {
1043 let path = crate::desktop::test_helpers::unique_socket_path();
1044 let listener = tokio::net::UnixListener::bind(&path).unwrap();
1045
1046 let server_handle = tokio::spawn(async move {
1048 let (_stream, _) = listener.accept().await.unwrap();
1049 tokio::time::sleep(Duration::from_secs(60)).await;
1051 });
1052
1053 let app = tauri::test::mock_app();
1054 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1055
1056 tokio::time::sleep(Duration::from_millis(100)).await;
1058
1059 let result = tokio::time::timeout(
1061 Duration::from_secs(15),
1062 handle.start(StartConfig::default()),
1063 )
1064 .await;
1065
1066 assert!(
1067 result.is_ok(),
1068 "start should not hang — expected error, got outer timeout"
1069 );
1070 let inner = result.unwrap();
1071 assert!(
1072 inner.is_err(),
1073 "start should return error when server is unresponsive"
1074 );
1075
1076 server_handle.abort();
1077 let _ = std::fs::remove_file(&path);
1078 }
1079
1080 #[tokio::test]
1086 async fn persistent_client_terminates_on_handle_drop() {
1087 let (path, shutdown) = setup_server();
1088 let app = tauri::test::mock_app();
1089
1090 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1091
1092 tokio::time::sleep(Duration::from_millis(100)).await;
1094
1095 drop(handle);
1097
1098 tokio::time::sleep(Duration::from_secs(2)).await;
1103
1104 shutdown.cancel();
1105 }
1106}