1use std::path::PathBuf;
10use std::sync::atomic::AtomicBool;
11use std::sync::Arc;
12use std::time::Duration;
13
14use tauri::{Emitter, Runtime};
15
16use crate::desktop::ipc::{
17 decode_frame, encode_frame, IpcEvent, IpcMessage, IpcRequest, IpcResponse,
18};
19use crate::desktop::transport::{self, TransportReadHalf, TransportStream, TransportWriteHalf};
20use crate::error::ServiceError;
21use crate::models::{PluginEvent, ServiceStatus, StartConfig};
22
/// IPC client that owns a single transport connection to the service.
///
/// Built with [`IpcClient::connect`]. Every request method takes `&mut self`,
/// so requests on one client are issued one at a time.
pub struct IpcClient {
    /// The underlying transport stream used for both reads and writes.
    stream: TransportStream,
}
35
36impl IpcClient {
37 pub async fn connect(path: PathBuf) -> Result<Self, ServiceError> {
39 let stream = transport::connect(&path).await?;
40 Ok(Self { stream })
41 }
42
43 pub async fn start(&mut self, config: StartConfig) -> Result<(), ServiceError> {
45 let request = IpcRequest::Start { config };
46 let (response, _events) = self.send_and_read(&request).await?;
47 if response.ok {
48 Ok(())
49 } else {
50 Err(ServiceError::Ipc(
51 response.error.unwrap_or_else(|| "unknown error".into()),
52 ))
53 }
54 }
55
56 pub async fn stop(&mut self) -> Result<(), ServiceError> {
58 let (response, _events) = self.send_and_read(&IpcRequest::Stop).await?;
59 if response.ok {
60 Ok(())
61 } else {
62 Err(ServiceError::Ipc(
63 response.error.unwrap_or_else(|| "unknown error".into()),
64 ))
65 }
66 }
67
68 pub async fn is_running(&mut self) -> Result<bool, ServiceError> {
70 let (response, _events) = self.send_and_read(&IpcRequest::IsRunning).await?;
71 if response.ok {
72 Ok(response
73 .data
74 .and_then(|d| d.get("running").and_then(|v| v.as_bool()))
75 .unwrap_or(false))
76 } else {
77 Err(ServiceError::Ipc(
78 response.error.unwrap_or_else(|| "unknown error".into()),
79 ))
80 }
81 }
82
83 pub async fn get_state(&mut self) -> Result<ServiceStatus, ServiceError> {
85 let (response, _events) = self.send_and_read(&IpcRequest::GetState).await?;
86 if response.ok {
87 response
88 .data
89 .ok_or_else(|| ServiceError::Ipc("missing data in GetState response".into()))
90 .and_then(|d| {
91 serde_json::from_value::<ServiceStatus>(d)
92 .map_err(|e| ServiceError::Ipc(format!("deserialize GetState: {e}")))
93 })
94 } else {
95 Err(ServiceError::Ipc(
96 response.error.unwrap_or_else(|| "unknown error".into()),
97 ))
98 }
99 }
100
101 pub async fn read_event(&mut self) -> Result<Option<IpcEvent>, ServiceError> {
105 let frame = match self.read_frame().await? {
106 Some(f) => f,
107 None => return Ok(None),
108 };
109 match decode_frame(&frame).map_err(|e| ServiceError::Ipc(format!("decode event: {e}")))? {
110 IpcMessage::Event(event) => Ok(Some(event)),
111 other => Err(ServiceError::Ipc(format!(
112 "expected event frame, got {:?}",
113 std::mem::discriminant(&other),
114 ))),
115 }
116 }
117
118 pub fn listen_events<R: Runtime>(mut self, app: tauri::AppHandle<R>) {
123 tokio::spawn(async move {
124 loop {
125 match self.read_event().await {
126 Ok(Some(event)) => {
127 let plugin_event = ipc_event_to_plugin_event(event);
128 let _ = app.emit("background-service://event", plugin_event);
129 }
130 Ok(None) => break,
131 Err(_) => break,
132 }
133 }
134 });
135 }
136
137 async fn send_and_read(
140 &mut self,
141 request: &IpcRequest,
142 ) -> Result<(IpcResponse, Vec<IpcEvent>), ServiceError> {
143 self.send_request(request).await?;
144 let mut events = Vec::new();
148 loop {
149 let frame = self
150 .read_frame()
151 .await?
152 .ok_or_else(|| ServiceError::Ipc("connection closed".into()))?;
153 match decode_frame(&frame).map_err(|e| ServiceError::Ipc(format!("decode: {e}")))? {
154 IpcMessage::Response(resp) => return Ok((resp, events)),
155 IpcMessage::Event(e) => {
156 events.push(e);
157 }
158 IpcMessage::Request(_) => {
159 return Err(ServiceError::Ipc("unexpected request frame".into()));
160 }
161 }
162 }
163 }
164
165 async fn send_request(&mut self, request: &IpcRequest) -> Result<(), ServiceError> {
166 let msg = IpcMessage::Request(request.clone());
167 let frame = encode_frame(&msg).map_err(|e| ServiceError::Ipc(format!("encode: {e}")))?;
168 transport::write_frame(&mut self.stream, &frame)
169 .await
170 .map_err(ServiceError::Ipc)?;
171 Ok(())
172 }
173
174 async fn read_frame(&mut self) -> Result<Option<Vec<u8>>, ServiceError> {
179 transport::read_frame(&mut self.stream)
180 .await
181 .map_err(ServiceError::Ipc)
182 }
183}
184
185pub fn ipc_event_to_plugin_event(event: IpcEvent) -> PluginEvent {
187 match event {
188 IpcEvent::Started => PluginEvent::Started,
189 IpcEvent::Stopped { reason } => PluginEvent::Stopped { reason },
190 IpcEvent::Error { message } => PluginEvent::Error { message },
191 }
192}
193
/// A command queued for the persistent client task.
///
/// Each variant carries a oneshot `reply` sender through which the task
/// delivers the outcome back to the caller.
enum IpcCommand {
    /// Start the service with `config`.
    Start {
        config: StartConfig,
        reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
    },
    /// Stop the service.
    Stop {
        reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
    },
    /// Ask whether the service is running.
    IsRunning {
        reply: tokio::sync::oneshot::Sender<Result<bool, ServiceError>>,
    },
    /// Fetch the service status.
    GetState {
        reply: tokio::sync::oneshot::Sender<Result<ServiceStatus, ServiceError>>,
    },
}
212
/// Handle to a background task that maintains a reconnecting IPC connection.
///
/// Dropping the handle cancels `shutdown`.
pub struct PersistentIpcClientHandle {
    /// Queue of commands consumed by the background task.
    cmd_tx: tokio::sync::mpsc::Sender<IpcCommand>,
    /// Token the background task watches to know when to stop.
    shutdown: tokio_util::sync::CancellationToken,
    /// Connection flag written by the background task, read by `is_connected`.
    connected: Arc<AtomicBool>,
}
225
impl Drop for PersistentIpcClientHandle {
    /// Cancels the shutdown token so the background task stops.
    fn drop(&mut self) {
        self.shutdown.cancel();
    }
}
231
232impl PersistentIpcClientHandle {
233 pub fn spawn<R: Runtime>(socket_path: PathBuf, app: tauri::AppHandle<R>) -> Self {
239 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::channel(16);
240 let shutdown = tokio_util::sync::CancellationToken::new();
241 let connected = Arc::new(AtomicBool::new(false));
242
243 tokio::spawn(persistent_client_loop(
244 socket_path,
245 app,
246 cmd_rx,
247 shutdown.clone(),
248 connected.clone(),
249 ));
250
251 Self {
252 cmd_tx,
253 shutdown,
254 connected,
255 }
256 }
257
258 pub async fn start(&self, config: StartConfig) -> Result<(), ServiceError> {
260 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
261 self.cmd_tx
262 .send(IpcCommand::Start {
263 config,
264 reply: reply_tx,
265 })
266 .await
267 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
268 reply_rx
269 .await
270 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
271 }
272
273 pub async fn stop(&self) -> Result<(), ServiceError> {
275 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
276 self.cmd_tx
277 .send(IpcCommand::Stop { reply: reply_tx })
278 .await
279 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
280 reply_rx
281 .await
282 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
283 }
284
285 pub async fn is_running(&self) -> Result<bool, ServiceError> {
287 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
288 self.cmd_tx
289 .send(IpcCommand::IsRunning { reply: reply_tx })
290 .await
291 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
292 reply_rx
293 .await
294 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
295 }
296
297 pub async fn get_state(&self) -> Result<ServiceStatus, ServiceError> {
299 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
300 self.cmd_tx
301 .send(IpcCommand::GetState { reply: reply_tx })
302 .await
303 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
304 reply_rx
305 .await
306 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
307 }
308
309 pub fn is_connected(&self) -> bool {
312 self.connected.load(std::sync::atomic::Ordering::Relaxed)
313 }
314}
315
316async fn persistent_client_loop<R: Runtime>(
318 socket_path: PathBuf,
319 app: tauri::AppHandle<R>,
320 mut cmd_rx: tokio::sync::mpsc::Receiver<IpcCommand>,
321 shutdown: tokio_util::sync::CancellationToken,
322 connected: Arc<AtomicBool>,
323) {
324 use backon::BackoffBuilder;
325
326 let backoff_builder = backon::ExponentialBuilder::default()
327 .with_min_delay(Duration::from_secs(1))
328 .with_max_delay(Duration::from_secs(30))
329 .with_max_times(10)
330 .with_jitter();
331
332 let mut attempts = backoff_builder.build();
333
334 loop {
335 tokio::select! {
336 biased;
337 _ = shutdown.cancelled() => {
338 log::info!("Persistent IPC client shutting down");
339 connected.store(false, std::sync::atomic::Ordering::Relaxed);
340 break;
341 }
342 connect_result = transport::connect(&socket_path) => {
343 match connect_result {
344 Ok(stream) => {
345 log::info!("Persistent IPC client connected");
346 connected.store(true, std::sync::atomic::Ordering::Relaxed);
347 let result = run_persistent_connection(stream, &app, &mut cmd_rx, &connected).await;
348 attempts = backoff_builder.build();
350 if result.is_err() {
351 log::info!("Persistent IPC connection lost, reconnecting...");
352 connected.store(false, std::sync::atomic::Ordering::Relaxed);
353 }
354 }
355 Err(_) => {
356 log::debug!("Persistent IPC client: connection failed, retrying...");
357 connected.store(false, std::sync::atomic::Ordering::Relaxed);
358 }
359 }
360 let delay = match attempts.next() {
361 Some(d) => d,
362 None => {
363 log::warn!("Persistent IPC client: backoff exhausted, giving up");
364 break;
365 }
366 };
367 tokio::select! {
368 biased;
369 _ = shutdown.cancelled() => {
370 log::info!("Persistent IPC client shutting down");
371 connected.store(false, std::sync::atomic::Ordering::Relaxed);
372 break;
373 }
374 _ = tokio::time::sleep(delay) => {}
375 }
376 }
377 }
378 }
379}
380
381async fn run_persistent_connection<R: Runtime>(
388 stream: TransportStream,
389 app: &tauri::AppHandle<R>,
390 cmd_rx: &mut tokio::sync::mpsc::Receiver<IpcCommand>,
391 connected: &Arc<AtomicBool>,
392) -> Result<(), ServiceError> {
393 let (read_half, mut write_half) = transport::split(stream);
394
395 let response_slot: std::sync::Arc<
397 tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<IpcResponse>>>,
398 > = std::sync::Arc::new(tokio::sync::Mutex::new(None));
399
400 let slot_writer = response_slot.clone();
401 let app_clone = app.clone();
402 let connected_reader = connected.clone();
403
404 let reader_handle = tokio::spawn(async move {
406 let mut read_half = read_half;
407 loop {
408 let frame = match read_frame_from(&mut read_half).await {
409 Ok(Some(f)) => f,
410 Ok(None) => break, Err(_) => break,
412 };
413
414 match decode_frame(&frame) {
415 Ok(IpcMessage::Response(resp)) => {
416 let mut slot = slot_writer.lock().await;
417 if let Some(sender) = slot.take() {
418 let _ = sender.send(resp);
419 }
420 continue;
421 }
422 Ok(IpcMessage::Event(event)) => {
423 let plugin_event = ipc_event_to_plugin_event(event);
424 let _ = app_clone.emit("background-service://event", plugin_event);
425 continue;
426 }
427 Ok(IpcMessage::Request(_)) => {
428 log::warn!("unexpected request frame on client connection");
429 continue;
430 }
431 Err(e) => {
432 log::debug!("failed to decode IPC frame: {e}");
433 continue;
434 }
435 }
436 }
437 connected_reader.store(false, std::sync::atomic::Ordering::Relaxed);
439 });
440
441 let result = loop {
443 tokio::select! {
444 cmd = cmd_rx.recv() => {
445 let cmd = match cmd {
446 Some(c) => c,
447 None => break Err(ServiceError::Ipc("command channel closed".into())),
448 };
449
450 match cmd {
451 IpcCommand::Start { config, reply } => {
452 let request = IpcRequest::Start { config };
453 let rx = prepare_response_slot(&response_slot).await;
454 if let Err(e) = send_request_to(&mut write_half, &request).await {
455 let _ = reply.send(Err(e));
456 break Err(ServiceError::Ipc("send failed".into()));
457 }
458 let response = await_response(rx).await;
459 let result = match response {
460 Ok(resp) if resp.ok => Ok(()),
461 Ok(resp) => Err(ServiceError::Ipc(
462 resp.error.unwrap_or_else(|| "unknown error".into()),
463 )),
464 Err(e) => Err(e),
465 };
466 let _ = reply.send(result);
467 }
468 IpcCommand::Stop { reply } => {
469 let rx = prepare_response_slot(&response_slot).await;
470 if let Err(e) = send_request_to(&mut write_half, &IpcRequest::Stop).await {
471 let _ = reply.send(Err(e));
472 break Err(ServiceError::Ipc("send failed".into()));
473 }
474 let response = await_response(rx).await;
475 let result = match response {
476 Ok(resp) if resp.ok => Ok(()),
477 Ok(resp) => Err(ServiceError::Ipc(
478 resp.error.unwrap_or_else(|| "unknown error".into()),
479 )),
480 Err(e) => Err(e),
481 };
482 let _ = reply.send(result);
483 }
484 IpcCommand::IsRunning { reply } => {
485 let rx = prepare_response_slot(&response_slot).await;
486 if let Err(e) = send_request_to(&mut write_half, &IpcRequest::IsRunning).await {
487 let _ = reply.send(Err(e));
488 break Err(ServiceError::Ipc("send failed".into()));
489 }
490 let response = await_response(rx).await;
491 let result = match response {
492 Ok(resp) if resp.ok => Ok(resp
493 .data
494 .and_then(|d| d.get("running").and_then(|v| v.as_bool()))
495 .unwrap_or(false)),
496 Ok(resp) => Err(ServiceError::Ipc(
497 resp.error.unwrap_or_else(|| "unknown error".into()),
498 )),
499 Err(e) => Err(e),
500 };
501 let _ = reply.send(result);
502 }
503 IpcCommand::GetState { reply } => {
504 let rx = prepare_response_slot(&response_slot).await;
505 if let Err(e) = send_request_to(&mut write_half, &IpcRequest::GetState).await {
506 let _ = reply.send(Err(e));
507 break Err(ServiceError::Ipc("send failed".into()));
508 }
509 let response = await_response(rx).await;
510 let result = match response {
511 Ok(resp) if resp.ok => resp
512 .data
513 .ok_or_else(|| ServiceError::Ipc("missing data in GetState response".into()))
514 .and_then(|d| {
515 serde_json::from_value::<ServiceStatus>(d)
516 .map_err(|e| ServiceError::Ipc(format!("deserialize GetState: {e}")))
517 }),
518 Ok(resp) => Err(ServiceError::Ipc(
519 resp.error.unwrap_or_else(|| "unknown error".into()),
520 )),
521 Err(e) => Err(e),
522 };
523 let _ = reply.send(result);
524 }
525 }
526 }
527 _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
528 if reader_handle.is_finished() {
530 break Err(ServiceError::Ipc("reader task died".into()));
531 }
532 }
533 }
534 };
535
536 reader_handle.abort();
537 result
538}
539
540async fn send_request_to(
542 write_half: &mut TransportWriteHalf,
543 request: &IpcRequest,
544) -> Result<(), ServiceError> {
545 let msg = IpcMessage::Request(request.clone());
546 let frame = encode_frame(&msg).map_err(|e| ServiceError::Ipc(format!("encode: {e}")))?;
547 transport::write_frame(write_half, &frame)
548 .await
549 .map_err(ServiceError::Ipc)?;
550 Ok(())
551}
552
553async fn prepare_response_slot(
561 slot: &std::sync::Arc<tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<IpcResponse>>>>,
562) -> tokio::sync::oneshot::Receiver<IpcResponse> {
563 let (tx, rx) = tokio::sync::oneshot::channel();
564 let mut guard = slot.lock().await;
565 debug_assert!(
566 guard.is_none(),
567 "response slot overwritten — sequential command invariant violated"
568 );
569 *guard = Some(tx);
570 rx
571}
572
573async fn await_response(
578 rx: tokio::sync::oneshot::Receiver<IpcResponse>,
579) -> Result<IpcResponse, ServiceError> {
580 tokio::select! {
581 response = rx => {
582 response.map_err(|_| ServiceError::Ipc("response channel closed".into()))
583 }
584 _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
585 Err(ServiceError::Ipc("response timeout".into()))
586 }
587 }
588}
589
590async fn read_frame_from(
594 read_half: &mut TransportReadHalf,
595) -> Result<Option<Vec<u8>>, ServiceError> {
596 transport::read_frame(read_half)
597 .await
598 .map_err(ServiceError::Ipc)
599}
600
601#[cfg(test)]
602mod tests {
603 use super::*;
604 use crate::desktop::test_helpers::{
605 setup_server, setup_server_with_factory, BlockingService, ImmediateSuccessService,
606 };
607 use std::sync::atomic::Ordering;
608 use std::time::Duration;
609 use tauri::Listener;
610
611 #[tokio::test]
614 async fn ipc_client_connect() {
615 let (path, shutdown, _event_tx) = setup_server();
616 let result = IpcClient::connect(path).await;
617 assert!(result.is_ok(), "client should connect: {:?}", result.err());
618 shutdown.cancel();
619 }
620
621 #[tokio::test]
624 async fn ipc_client_send_start() {
625 let (path, shutdown, _event_tx) = setup_server();
626 let mut client = IpcClient::connect(path).await.unwrap();
627 let result = client.start(StartConfig::default()).await;
628 assert!(result.is_ok(), "start should succeed: {:?}", result.err());
629 shutdown.cancel();
630 }
631
632 #[tokio::test]
635 async fn ipc_client_send_stop() {
636 let (path, shutdown, _event_tx) = setup_server();
637 let mut client = IpcClient::connect(path).await.unwrap();
638 client.start(StartConfig::default()).await.unwrap();
639 let result = client.stop().await;
640 assert!(result.is_ok(), "stop should succeed: {:?}", result.err());
641 shutdown.cancel();
642 }
643
644 #[tokio::test]
647 async fn ipc_client_is_running() {
648 let (path, shutdown, _event_tx) = setup_server();
649 let mut client = IpcClient::connect(path).await.unwrap();
650
651 let running = client.is_running().await.unwrap();
652 assert!(!running, "should not be running initially");
653
654 client.start(StartConfig::default()).await.unwrap();
655 let running = client.is_running().await.unwrap();
656 assert!(running, "should be running after start");
657
658 shutdown.cancel();
659 }
660
661 #[tokio::test]
664 async fn ipc_client_get_state_initial() {
665 let (path, shutdown, _event_tx) = setup_server();
666 let mut client = IpcClient::connect(path).await.unwrap();
667
668 let status = client.get_state().await.unwrap();
669 assert!(
670 matches!(status.state, crate::models::ServiceState::Idle),
671 "expected Idle, got {:?}",
672 status.state
673 );
674 assert_eq!(status.last_error, None);
675
676 shutdown.cancel();
677 }
678
679 #[tokio::test]
680 async fn ipc_client_get_state_after_start() {
681 let (path, shutdown, _event_tx) = setup_server();
682 let mut client = IpcClient::connect(path).await.unwrap();
683
684 client.start(StartConfig::default()).await.unwrap();
685
686 let status = tokio::time::timeout(Duration::from_secs(2), async {
689 loop {
690 let s = client.get_state().await.unwrap();
691 if matches!(s.state, crate::models::ServiceState::Running) {
692 return s;
693 }
694 tokio::time::sleep(Duration::from_millis(10)).await;
695 }
696 })
697 .await
698 .expect("timed out waiting for Running state");
699 assert_eq!(status.last_error, None);
700
701 shutdown.cancel();
702 }
703
704 #[tokio::test]
705 async fn ipc_client_get_state_after_stop() {
706 let (path, shutdown, _event_tx) = setup_server();
707 let mut client = IpcClient::connect(path).await.unwrap();
708
709 client.start(StartConfig::default()).await.unwrap();
710 client.stop().await.unwrap();
711 let status = client.get_state().await.unwrap();
712 assert!(
713 matches!(status.state, crate::models::ServiceState::Stopped),
714 "expected Stopped, got {:?}",
715 status.state
716 );
717
718 shutdown.cancel();
719 }
720
721 #[tokio::test]
724 async fn ipc_client_receive_events() {
725 let (path, shutdown, event_tx) =
726 setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
727 let mut client = IpcClient::connect(path).await.unwrap();
728 client.start(StartConfig::default()).await.unwrap();
729
730 let _ = event_tx.send(IpcEvent::Started);
732
733 let event = tokio::time::timeout(Duration::from_millis(500), client.read_event())
734 .await
735 .expect("timed out waiting for event")
736 .expect("read_event failed");
737
738 assert!(event.is_some(), "should receive an event");
739 let event = event.unwrap();
740 assert!(
741 matches!(event, IpcEvent::Started),
742 "Expected Started event, got {:?}",
743 event
744 );
745
746 shutdown.cancel();
747 }
748
749 #[tokio::test]
752 async fn ipc_client_stop_when_not_running() {
753 let (path, shutdown, _event_tx) = setup_server();
754 let mut client = IpcClient::connect(path).await.unwrap();
755 let result = client.stop().await;
756 assert!(result.is_err(), "stop when not running should fail");
757 shutdown.cancel();
758 }
759
760 #[tokio::test]
763 async fn ipc_client_connect_to_nonexistent() {
764 let path = std::env::temp_dir().join("nonexistent-test-socket.sock");
765 let result = IpcClient::connect(path).await;
766 assert!(
767 result.is_err(),
768 "should fail to connect to nonexistent socket"
769 );
770 }
771
772 #[test]
775 fn ipc_event_to_plugin_event_started() {
776 let event = IpcEvent::Started;
777 let plugin = ipc_event_to_plugin_event(event);
778 assert!(matches!(plugin, PluginEvent::Started));
779 }
780
781 #[test]
782 fn ipc_event_to_plugin_event_stopped() {
783 let event = IpcEvent::Stopped {
784 reason: "cancelled".into(),
785 };
786 let plugin = ipc_event_to_plugin_event(event);
787 match plugin {
788 PluginEvent::Stopped { reason } => assert_eq!(reason, "cancelled"),
789 other => panic!("Expected Stopped, got {other:?}"),
790 }
791 }
792
793 #[test]
794 fn ipc_event_to_plugin_event_error() {
795 let event = IpcEvent::Error {
796 message: "init failed".into(),
797 };
798 let plugin = ipc_event_to_plugin_event(event);
799 match plugin {
800 PluginEvent::Error { message } => assert_eq!(message, "init failed"),
801 other => panic!("Expected Error, got {other:?}"),
802 }
803 }
804
805 #[tokio::test]
808 async fn ipc_client_full_lifecycle() {
809 let (path, shutdown, _event_tx) = setup_server();
810 let mut client = IpcClient::connect(path).await.unwrap();
811
812 assert!(!client.is_running().await.unwrap());
813 client.start(StartConfig::default()).await.unwrap();
814 assert!(client.is_running().await.unwrap());
815 client.stop().await.unwrap();
816 assert!(!client.is_running().await.unwrap());
817
818 shutdown.cancel();
819 }
820
821 #[tokio::test]
824 async fn ipc_client_listen_events() {
825 let (path, shutdown, event_tx) =
826 setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
827 let app = tauri::test::mock_app();
828
829 let received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
830 let received_clone = received.clone();
831 app.listen("background-service://event", move |_event| {
832 received_clone.store(true, Ordering::SeqCst);
833 });
834
835 let mut client = IpcClient::connect(path).await.unwrap();
836 client.start(StartConfig::default()).await.unwrap();
837 client.listen_events(app.handle().clone());
838
839 let _ = event_tx.send(IpcEvent::Started);
841
842 tokio::time::timeout(Duration::from_millis(500), async {
843 while !received.load(Ordering::SeqCst) {
844 tokio::time::sleep(Duration::from_millis(10)).await;
845 }
846 })
847 .await
848 .expect("timed out waiting for event via listen_events");
849
850 assert!(
851 received.load(Ordering::SeqCst),
852 "should have received event"
853 );
854 shutdown.cancel();
855 }
856
857 #[tokio::test]
869 async fn ipc_loopback_full_lifecycle_with_events() {
870 let (path, shutdown, event_tx) = setup_server();
871 let mut client = IpcClient::connect(path).await.unwrap();
872
873 assert!(
875 !client.is_running().await.unwrap(),
876 "should not be running initially"
877 );
878
879 client
881 .start(StartConfig::default())
882 .await
883 .expect("start should succeed");
884
885 let _ = event_tx.send(IpcEvent::Started);
887
888 let started = tokio::time::timeout(Duration::from_millis(500), client.read_event())
891 .await
892 .expect("timed out waiting for Started event")
893 .expect("read_event failed")
894 .expect("should receive event");
895 assert!(
896 matches!(started, IpcEvent::Started),
897 "Expected Started event, got {started:?}"
898 );
899
900 assert!(
902 client.is_running().await.unwrap(),
903 "should be running after start"
904 );
905
906 client.stop().await.expect("stop should succeed");
908
909 let _ = event_tx.send(IpcEvent::Stopped {
911 reason: "cancelled".into(),
912 });
913
914 let stopped = tokio::time::timeout(Duration::from_millis(500), client.read_event())
916 .await
917 .expect("timed out waiting for Stopped event")
918 .expect("read_event failed")
919 .expect("should receive event");
920 assert!(
921 matches!(stopped, IpcEvent::Stopped { .. }),
922 "Expected Stopped event, got {stopped:?}"
923 );
924
925 assert!(
927 !client.is_running().await.unwrap(),
928 "should not be running after stop"
929 );
930
931 shutdown.cancel();
932 }
933
934 #[tokio::test]
938 async fn ipc_loopback_event_streaming_plugin_event_conversion() {
939 let (path, shutdown, event_tx) = setup_server();
940 let mut client = IpcClient::connect(path).await.unwrap();
941
942 client.start(StartConfig::default()).await.unwrap();
944 let _ = event_tx.send(IpcEvent::Started);
945 let started_ipc = tokio::time::timeout(Duration::from_millis(500), client.read_event())
946 .await
947 .expect("timed out")
948 .expect("read_event failed")
949 .expect("should receive event");
950 let started_plugin = ipc_event_to_plugin_event(started_ipc);
951 assert!(
952 matches!(started_plugin, PluginEvent::Started),
953 "Expected PluginEvent::Started, got {started_plugin:?}"
954 );
955
956 client.stop().await.unwrap();
958 let _ = event_tx.send(IpcEvent::Stopped {
959 reason: "cancelled".into(),
960 });
961 let stopped_ipc = tokio::time::timeout(Duration::from_millis(500), client.read_event())
962 .await
963 .expect("timed out")
964 .expect("read_event failed")
965 .expect("should receive event");
966 let stopped_plugin = ipc_event_to_plugin_event(stopped_ipc);
967 match stopped_plugin {
968 PluginEvent::Stopped { reason } => {
969 assert_eq!(reason, "cancelled", "Expected 'cancelled' reason");
970 }
971 other => panic!("Expected PluginEvent::Stopped, got {other:?}"),
972 }
973
974 shutdown.cancel();
975 }
976
977 #[tokio::test]
982 async fn ipc_loopback_connection_drop_returns_error() {
983 let path = crate::desktop::test_helpers::unique_socket_path();
984
985 let listener = transport::bind(path.clone()).unwrap();
987 let path_clone = path.clone();
988
989 let client_handle =
990 tokio::spawn(async move { IpcClient::connect(path_clone).await.unwrap() });
991
992 let (server_stream, _) = listener.accept().await.unwrap();
994 drop(server_stream);
995 tokio::time::sleep(Duration::from_millis(20)).await;
996
997 let mut client = client_handle.await.unwrap();
998
999 let result = client.is_running().await;
1001 assert!(
1002 result.is_err(),
1003 "should get error after server drops connection"
1004 );
1005
1006 let _ = std::fs::remove_file(&path);
1007 }
1008
1009 #[tokio::test]
1013 async fn ipc_loopback_double_start_returns_error() {
1014 let (path, shutdown, _event_tx) = setup_server();
1015 let mut client = IpcClient::connect(path).await.unwrap();
1016
1017 client.start(StartConfig::default()).await.unwrap();
1018
1019 let result = client.start(StartConfig::default()).await;
1020 assert!(result.is_err(), "double start should return error");
1021 let err_msg = result.unwrap_err().to_string();
1022 assert!(
1023 err_msg.to_lowercase().contains("already"),
1024 "Error should mention 'already': {err_msg}"
1025 );
1026
1027 shutdown.cancel();
1028 }
1029
1030 #[tokio::test]
1039 async fn persistent_client_connects() {
1040 let (path, shutdown, _event_tx) = setup_server();
1041 let app = tauri::test::mock_app();
1042
1043 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1044
1045 tokio::time::sleep(Duration::from_millis(100)).await;
1047
1048 let running = handle.is_running().await;
1050 assert!(
1051 running.is_ok(),
1052 "should get response via persistent connection: {:?}",
1053 running.err()
1054 );
1055 assert!(!running.unwrap(), "should not be running initially");
1056
1057 shutdown.cancel();
1058 }
1059
1060 #[tokio::test]
1064 async fn persistent_client_reconnects() {
1065 use crate::desktop::ipc_server::IpcServer;
1066 use crate::manager::{manager_loop, ServiceFactory};
1067 use tokio_util::sync::CancellationToken;
1068
1069 let (path, shutdown1, _event_tx) = setup_server();
1071 let app = tauri::test::mock_app();
1072
1073 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1074
1075 tokio::time::sleep(Duration::from_millis(100)).await;
1077 let result = handle.is_running().await;
1078 assert!(
1079 result.is_ok(),
1080 "should connect to first server: {:?}",
1081 result.err()
1082 );
1083
1084 shutdown1.cancel();
1086 tokio::time::sleep(Duration::from_millis(150)).await;
1087
1088 let (cmd_tx2, cmd_rx2) = tokio::sync::mpsc::channel(16);
1090 let factory: ServiceFactory<tauri::test::MockRuntime> =
1091 Box::new(|| Box::new(BlockingService));
1092 tokio::spawn(manager_loop(
1093 cmd_rx2, factory, 0.0, 0.0, 0.0, 0.0, false, false,
1094 ));
1095 let server2 = IpcServer::bind(path.clone(), cmd_tx2, app.handle().clone()).unwrap();
1096 let shutdown2 = CancellationToken::new();
1097 let s2 = shutdown2.clone();
1098 tokio::spawn(async move { server2.run(s2).await });
1099
1100 let reconnected = tokio::time::timeout(Duration::from_secs(3), async {
1102 loop {
1103 tokio::time::sleep(Duration::from_millis(200)).await;
1104 if handle.is_running().await.is_ok() {
1105 break;
1106 }
1107 }
1108 })
1109 .await;
1110 assert!(
1111 reconnected.is_ok(),
1112 "persistent client should reconnect to second server"
1113 );
1114
1115 shutdown2.cancel();
1116 }
1117
1118 #[tokio::test]
1123 async fn event_relay() {
1124 let (path, shutdown, event_tx) =
1125 setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
1126 let app = tauri::test::mock_app();
1127
1128 let received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1129 let received_clone = received.clone();
1130 app.listen("background-service://event", move |_event| {
1131 received_clone.store(true, Ordering::SeqCst);
1132 });
1133
1134 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1135
1136 let result = handle.start(StartConfig::default()).await;
1138 assert!(result.is_ok(), "start should succeed: {:?}", result.err());
1139
1140 let _ = event_tx.send(IpcEvent::Started);
1142
1143 tokio::time::timeout(Duration::from_millis(500), async {
1145 while !received.load(Ordering::SeqCst) {
1146 tokio::time::sleep(Duration::from_millis(10)).await;
1147 }
1148 })
1149 .await
1150 .expect("timed out waiting for event relay via app.emit()");
1151
1152 assert!(
1153 received.load(Ordering::SeqCst),
1154 "event should be relayed through app.emit()"
1155 );
1156
1157 shutdown.cancel();
1158 }
1159
1160 #[tokio::test]
1165 async fn start_stop_lifecycle() {
1166 let (path, shutdown, _event_tx) = setup_server();
1167 let app = tauri::test::mock_app();
1168
1169 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1170
1171 let running = handle.is_running().await.unwrap();
1173 assert!(!running, "should not be running initially");
1174
1175 handle
1177 .start(StartConfig::default())
1178 .await
1179 .expect("start should succeed");
1180 let running = handle.is_running().await.unwrap();
1181 assert!(running, "should be running after start");
1182
1183 handle.stop().await.expect("stop should succeed");
1185 let running = handle.is_running().await.unwrap();
1186 assert!(!running, "should not be running after stop");
1187
1188 shutdown.cancel();
1189 }
1190
1191 #[tokio::test]
1194 async fn persistent_client_get_state() {
1195 let (path, shutdown, _event_tx) = setup_server();
1196 let app = tauri::test::mock_app();
1197
1198 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1199
1200 tokio::time::sleep(Duration::from_millis(100)).await;
1202
1203 let status = handle.get_state().await.unwrap();
1204 assert!(
1205 matches!(status.state, crate::models::ServiceState::Idle),
1206 "expected Idle, got {:?}",
1207 status.state
1208 );
1209
1210 handle.start(StartConfig::default()).await.unwrap();
1211
1212 let status = tokio::time::timeout(Duration::from_secs(2), async {
1215 loop {
1216 let s = handle.get_state().await.unwrap();
1217 if matches!(s.state, crate::models::ServiceState::Running) {
1218 return s;
1219 }
1220 tokio::time::sleep(Duration::from_millis(10)).await;
1221 }
1222 })
1223 .await
1224 .expect("timed out waiting for Running state");
1225 assert!(
1226 matches!(status.state, crate::models::ServiceState::Running),
1227 "expected Running, got {:?}",
1228 status.state
1229 );
1230
1231 shutdown.cancel();
1232 }
1233
1234 #[tokio::test]
1243 async fn persistent_client_timeout_on_unresponsive_server() {
1244 let path = crate::desktop::test_helpers::unique_socket_path();
1245 let listener = transport::bind(path.clone()).unwrap();
1246
1247 let server_handle = tokio::spawn(async move {
1249 let (_stream, _) = listener.accept().await.unwrap();
1250 tokio::time::sleep(Duration::from_secs(60)).await;
1252 });
1253
1254 let app = tauri::test::mock_app();
1255 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1256
1257 tokio::time::sleep(Duration::from_millis(100)).await;
1259
1260 let result = tokio::time::timeout(
1262 Duration::from_secs(15),
1263 handle.start(StartConfig::default()),
1264 )
1265 .await;
1266
1267 assert!(
1268 result.is_ok(),
1269 "start should not hang — expected error, got outer timeout"
1270 );
1271 let inner = result.unwrap();
1272 assert!(
1273 inner.is_err(),
1274 "start should return error when server is unresponsive"
1275 );
1276
1277 server_handle.abort();
1278 let _ = std::fs::remove_file(&path);
1279 }
1280
1281 #[tokio::test]
1287 async fn persistent_client_terminates_on_handle_drop() {
1288 let (path, shutdown, _event_tx) = setup_server();
1289 let app = tauri::test::mock_app();
1290
1291 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1292
1293 tokio::time::sleep(Duration::from_millis(100)).await;
1295
1296 drop(handle);
1298
1299 tokio::time::sleep(Duration::from_secs(2)).await;
1304
1305 shutdown.cancel();
1306 }
1307
1308 async fn buffered_server(
1315 path: &std::path::Path,
1316 frames: Vec<IpcMessage>,
1317 ) -> tokio::task::JoinHandle<()> {
1318 let listener = transport::bind(path.to_path_buf()).unwrap();
1319 tokio::spawn(async move {
1320 let (mut stream, _) = listener.accept().await.unwrap();
1321 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1322 let mut len_buf = [0u8; 4];
1324 if stream.read_exact(&mut len_buf).await.is_err() {
1325 return;
1326 }
1327 let len = u32::from_be_bytes(len_buf) as usize;
1328 let mut payload = vec![0u8; len];
1329 if stream.read_exact(&mut payload).await.is_err() {
1330 return;
1331 }
1332 for msg in &frames {
1334 let frame = crate::desktop::ipc::encode_frame(msg).unwrap();
1335 if stream.write_all(&frame).await.is_err() {
1336 return;
1337 }
1338 }
1339 })
1340 }
1341
1342 #[tokio::test]
1344 async fn send_and_read_no_interleaved_events() {
1345 let path = crate::desktop::test_helpers::unique_socket_path();
1346 let server = buffered_server(
1347 &path,
1348 vec![IpcMessage::Response(IpcResponse {
1349 ok: true,
1350 data: None,
1351 error: None,
1352 })],
1353 )
1354 .await;
1355
1356 let mut client = IpcClient::connect(path.clone()).await.unwrap();
1357 let (response, events) = client.send_and_read(&IpcRequest::IsRunning).await.unwrap();
1358 assert!(response.ok, "response should be ok");
1359 assert!(
1360 events.is_empty(),
1361 "events should be empty when no events interleave, got {:?}",
1362 events
1363 );
1364
1365 server.await.unwrap();
1366 let _ = std::fs::remove_file(&path);
1367 }
1368
1369 #[tokio::test]
1371 async fn send_and_read_single_interleaved_event() {
1372 let path = crate::desktop::test_helpers::unique_socket_path();
1373 let server = buffered_server(
1374 &path,
1375 vec![
1376 IpcMessage::Event(IpcEvent::Started),
1377 IpcMessage::Response(IpcResponse {
1378 ok: true,
1379 data: None,
1380 error: None,
1381 }),
1382 ],
1383 )
1384 .await;
1385
1386 let mut client = IpcClient::connect(path.clone()).await.unwrap();
1387 let (response, events) = client
1388 .send_and_read(&IpcRequest::Start {
1389 config: StartConfig::default(),
1390 })
1391 .await
1392 .unwrap();
1393 assert!(response.ok, "response should be ok");
1394 assert_eq!(events.len(), 1, "should collect exactly one event");
1395 assert!(
1396 matches!(events[0], IpcEvent::Started),
1397 "expected Started event, got {:?}",
1398 events[0]
1399 );
1400
1401 server.await.unwrap();
1402 let _ = std::fs::remove_file(&path);
1403 }
1404
1405 #[tokio::test]
1412 async fn is_connected_false_before_server() {
1413 let app = tauri::test::mock_app();
1414 let path = crate::desktop::test_helpers::unique_socket_path();
1415 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1417 tokio::time::sleep(Duration::from_millis(50)).await;
1420 assert!(
1421 !handle.is_connected(),
1422 "should not be connected when no server is running"
1423 );
1424 let _ = std::fs::remove_file(&path);
1425 }
1426
1427 #[tokio::test]
1430 async fn is_connected_true_after_connect() {
1431 let (path, shutdown, _event_tx) = setup_server();
1432 let app = tauri::test::mock_app();
1433 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1434
1435 tokio::time::timeout(Duration::from_secs(2), async {
1437 while !handle.is_connected() {
1438 tokio::time::sleep(Duration::from_millis(50)).await;
1439 }
1440 })
1441 .await
1442 .expect("timed out waiting for is_connected to become true");
1443
1444 assert!(
1445 handle.is_connected(),
1446 "should be connected after server is up"
1447 );
1448
1449 shutdown.cancel();
1450 }
1451
1452 #[tokio::test]
1458 async fn is_connected_false_after_server_shutdown() {
1459 let path = crate::desktop::test_helpers::unique_socket_path();
1460 let path_clone = path.clone();
1461 let listener = transport::bind(path.clone()).unwrap();
1462
1463 let server_handle = tokio::spawn(async move {
1466 let (stream, _) = listener.accept().await.unwrap();
1467 tokio::time::sleep(Duration::from_millis(200)).await;
1469 drop(stream);
1471 let _ = std::fs::remove_file(&path_clone);
1473 });
1474
1475 let app = tauri::test::mock_app();
1476 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1477
1478 tokio::time::timeout(Duration::from_secs(2), async {
1480 while !handle.is_connected() {
1481 tokio::time::sleep(Duration::from_millis(50)).await;
1482 }
1483 })
1484 .await
1485 .expect("timed out waiting for initial connection");
1486
1487 assert!(handle.is_connected(), "should be connected initially");
1488
1489 tokio::time::timeout(Duration::from_secs(3), async {
1491 while handle.is_connected() {
1492 tokio::time::sleep(Duration::from_millis(50)).await;
1493 }
1494 })
1495 .await
1496 .expect("timed out waiting for is_connected to become false");
1497
1498 assert!(
1499 !handle.is_connected(),
1500 "should not be connected after server shutdown"
1501 );
1502
1503 server_handle.abort();
1504 let _ = std::fs::remove_file(&path);
1505 }
1506
1507 #[test]
1515 fn backoff_builder_produces_increasing_delays() {
1516 use backon::BackoffBuilder;
1517
1518 let builder = backon::ExponentialBuilder::default()
1519 .with_min_delay(Duration::from_secs(1))
1520 .with_max_delay(Duration::from_secs(30))
1521 .with_max_times(10)
1522 .with_jitter();
1523
1524 let mut attempts = builder.build();
1525 let mut delays = Vec::new();
1526 while let Some(d) = attempts.next() {
1527 delays.push(d);
1528 }
1529
1530 assert_eq!(delays.len(), 10, "should produce exactly 10 delays");
1531
1532 assert!(
1534 delays[0] >= Duration::from_millis(500),
1535 "first delay too short: {:?}",
1536 delays[0]
1537 );
1538 assert!(
1539 delays[0] <= Duration::from_secs(2),
1540 "first delay too long: {:?}",
1541 delays[0]
1542 );
1543
1544 assert!(
1546 delays[9] >= Duration::from_secs(15),
1547 "last delay should approach max: {:?}",
1548 delays[9]
1549 );
1550
1551 for d in &delays {
1553 assert!(
1554 *d <= Duration::from_secs(60),
1555 "delay exceeds max_delay + jitter margin: {:?}",
1556 d
1557 );
1558 }
1559
1560 assert!(
1562 attempts.next().is_none(),
1563 "should return None after 10 attempts"
1564 );
1565 }
1566
1567 #[ignore]
1574 #[tokio::test]
1575 async fn persistent_client_exits_after_max_retries() {
1576 let app = tauri::test::mock_app();
1577 let path = crate::desktop::test_helpers::unique_socket_path();
1578 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1579
1580 let exited = tokio::time::timeout(Duration::from_secs(180), async {
1584 loop {
1585 tokio::time::sleep(Duration::from_secs(5)).await;
1586 if let Err(e) = handle.is_running().await {
1587 if e.to_string().contains("shut down") {
1588 return;
1589 }
1590 }
1591 }
1592 })
1593 .await;
1594
1595 assert!(
1596 exited.is_ok(),
1597 "persistent client should exit after max retries"
1598 );
1599 assert!(!handle.is_connected(), "should not be connected after exit");
1600
1601 let _ = std::fs::remove_file(&path);
1602 }
1603
1604 #[tokio::test]
1608 async fn persistent_client_reconnects_after_server_restart() {
1609 use crate::desktop::ipc_server::IpcServer;
1610 use crate::manager::{manager_loop, ServiceFactory};
1611 use tokio_util::sync::CancellationToken;
1612
1613 let (path, shutdown1, _event_tx) = setup_server();
1615 let app = tauri::test::mock_app();
1616 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1617
1618 tokio::time::timeout(Duration::from_secs(2), async {
1620 while !handle.is_connected() {
1621 tokio::time::sleep(Duration::from_millis(50)).await;
1622 }
1623 })
1624 .await
1625 .expect("should connect to first server");
1626
1627 let result = handle.is_running().await;
1629 assert!(
1630 result.is_ok(),
1631 "command should succeed on first server: {:?}",
1632 result.err()
1633 );
1634
1635 shutdown1.cancel();
1637 tokio::time::sleep(Duration::from_millis(150)).await;
1638
1639 let (cmd_tx2, cmd_rx2) = tokio::sync::mpsc::channel(16);
1641 let factory: ServiceFactory<tauri::test::MockRuntime> =
1642 Box::new(|| Box::new(BlockingService));
1643 tokio::spawn(manager_loop(
1644 cmd_rx2, factory, 0.0, 0.0, 0.0, 0.0, false, false,
1645 ));
1646 let server2 = IpcServer::bind(path.clone(), cmd_tx2, app.handle().clone()).unwrap();
1647 let shutdown2 = CancellationToken::new();
1648 let s2 = shutdown2.clone();
1649 tokio::spawn(async move { server2.run(s2).await });
1650
1651 let reconnected = tokio::time::timeout(Duration::from_secs(3), async {
1654 loop {
1655 if handle.is_connected() {
1656 break;
1657 }
1658 tokio::time::sleep(Duration::from_millis(100)).await;
1659 }
1660 })
1661 .await;
1662
1663 assert!(
1664 reconnected.is_ok(),
1665 "persistent client should reconnect after server restart (backoff resets)"
1666 );
1667
1668 let result = handle.is_running().await;
1670 assert!(
1671 result.is_ok(),
1672 "commands should work after reconnection: {:?}",
1673 result.err()
1674 );
1675
1676 shutdown2.cancel();
1677 }
1678
1679 #[tokio::test]
1687 async fn ipc_client_rejects_zero_length_frame() {
1688 let path = crate::desktop::test_helpers::unique_socket_path();
1689 let listener = transport::bind(path.clone()).unwrap();
1690
1691 let server_handle = tokio::spawn(async move {
1693 let (mut stream, _) = listener.accept().await.unwrap();
1694 use tokio::io::AsyncWriteExt;
1695 stream.write_all(&[0u8; 4]).await.unwrap();
1696 tokio::time::sleep(Duration::from_millis(500)).await;
1697 });
1698
1699 let mut client = IpcClient::connect(path.clone()).await.unwrap();
1700
1701 let result = client.read_frame().await;
1703 assert!(
1704 result.is_err(),
1705 "zero-length frame should return error, got {:?}",
1706 result
1707 );
1708 let err = result.unwrap_err().to_string();
1709 assert!(
1710 err.contains("zero-length frame"),
1711 "Error should mention 'zero-length frame': {err}"
1712 );
1713
1714 server_handle.abort();
1715 let _ = std::fs::remove_file(&path);
1716 }
1717
1718 #[tokio::test]
1721 async fn ipc_client_eof_returns_ok_none() {
1722 let path = crate::desktop::test_helpers::unique_socket_path();
1723 let listener = transport::bind(path.clone()).unwrap();
1724
1725 let server_handle = tokio::spawn(async move {
1727 let (stream, _) = listener.accept().await.unwrap();
1728 drop(stream);
1729 });
1730
1731 let mut client = IpcClient::connect(path.clone()).await.unwrap();
1732 tokio::time::sleep(Duration::from_millis(20)).await;
1733
1734 let result = client.read_frame().await;
1736 assert!(result.is_ok(), "EOF should return Ok, got {:?}", result);
1737 assert!(result.unwrap().is_none(), "EOF should return Ok(None)");
1738
1739 server_handle.abort();
1740 let _ = std::fs::remove_file(&path);
1741 }
1742
1743 #[tokio::test]
1745 async fn send_and_read_multiple_interleaved_events() {
1746 let path = crate::desktop::test_helpers::unique_socket_path();
1747 let server = buffered_server(
1748 &path,
1749 vec![
1750 IpcMessage::Event(IpcEvent::Started),
1751 IpcMessage::Event(IpcEvent::Error {
1752 message: "warning".into(),
1753 }),
1754 IpcMessage::Event(IpcEvent::Stopped {
1755 reason: "cancelled".into(),
1756 }),
1757 IpcMessage::Response(IpcResponse {
1758 ok: true,
1759 data: Some(serde_json::json!({"running": false})),
1760 error: None,
1761 }),
1762 ],
1763 )
1764 .await;
1765
1766 let mut client = IpcClient::connect(path.clone()).await.unwrap();
1767 let (response, events) = client.send_and_read(&IpcRequest::IsRunning).await.unwrap();
1768 assert!(response.ok, "response should be ok");
1769 assert_eq!(events.len(), 3, "should collect all three events");
1770 assert!(
1771 matches!(events[0], IpcEvent::Started),
1772 "first event should be Started"
1773 );
1774 assert!(
1775 matches!(events[1], IpcEvent::Error { .. }),
1776 "second event should be Error"
1777 );
1778 assert!(
1779 matches!(events[2], IpcEvent::Stopped { .. }),
1780 "third event should be Stopped"
1781 );
1782
1783 server.await.unwrap();
1784 let _ = std::fs::remove_file(&path);
1785 }
1786}