Skip to main content

tauri_plugin_background_service/desktop/
ipc_client.rs

1//! Desktop IPC client for the GUI process.
2//!
3//! [`IpcClient`] connects to the headless sidecar's Unix domain socket and
4//! provides methods to start/stop the background service and receive events
5//! over the IPC protocol.
6//!
7//! Only available when the `desktop-service` Cargo feature is enabled.
8
9use std::path::PathBuf;
10
11use tauri::{Emitter, Runtime};
12use tokio::io::{AsyncReadExt, AsyncWriteExt};
13use tokio::net::UnixStream;
14
15use crate::desktop::ipc::{
16    decode_frame, encode_frame, IpcEvent, IpcRequest, IpcResponse, MAX_FRAME_SIZE,
17};
18use crate::error::ServiceError;
19use crate::models::{PluginEvent, StartConfig};
20
21/// IPC client for communicating with the headless sidecar service.
22///
23/// Connects to the sidecar's Unix domain socket and translates method calls
24/// into [`IpcRequest`] messages. Responses are decoded from [`IpcResponse`]
25/// frames.
26///
27/// Events from the sidecar (started/stopped/error) are read as [`IpcEvent`]
28/// frames and converted to [`PluginEvent`] for emission via the Tauri event
29/// system.
pub struct IpcClient {
    // Connected Unix-domain socket to the sidecar; all request, response and
    // event frames flow over this single stream.
    stream: UnixStream,
}
33
34impl IpcClient {
35    /// Connect to the sidecar's IPC socket at the given path.
36    pub async fn connect(path: PathBuf) -> Result<Self, ServiceError> {
37        let stream = UnixStream::connect(&path)
38            .await
39            .map_err(|e| ServiceError::Ipc(format!("connect failed: {e}")))?;
40        Ok(Self { stream })
41    }
42
43    /// Send a Start command to the sidecar.
44    pub async fn start(&mut self, config: StartConfig) -> Result<(), ServiceError> {
45        let request = IpcRequest::Start { config };
46        let response = self.send_and_read(&request).await?;
47        if response.ok {
48            Ok(())
49        } else {
50            Err(ServiceError::Ipc(
51                response.error.unwrap_or_else(|| "unknown error".into()),
52            ))
53        }
54    }
55
56    /// Send a Stop command to the sidecar.
57    pub async fn stop(&mut self) -> Result<(), ServiceError> {
58        let response = self.send_and_read(&IpcRequest::Stop).await?;
59        if response.ok {
60            Ok(())
61        } else {
62            Err(ServiceError::Ipc(
63                response.error.unwrap_or_else(|| "unknown error".into()),
64            ))
65        }
66    }
67
68    /// Send an IsRunning query to the sidecar.
69    pub async fn is_running(&mut self) -> Result<bool, ServiceError> {
70        let response = self.send_and_read(&IpcRequest::IsRunning).await?;
71        if response.ok {
72            Ok(response
73                .data
74                .and_then(|d| d.get("running").and_then(|v| v.as_bool()))
75                .unwrap_or(false))
76        } else {
77            Err(ServiceError::Ipc(
78                response.error.unwrap_or_else(|| "unknown error".into()),
79            ))
80        }
81    }
82
83    /// Read the next [`IpcEvent`] from the socket.
84    ///
85    /// Returns `None` if the connection was closed.
86    pub async fn read_event(&mut self) -> Result<Option<IpcEvent>, ServiceError> {
87        let frame = match self.read_frame().await? {
88            Some(f) => f,
89            None => return Ok(None),
90        };
91        decode_frame::<IpcEvent>(&frame)
92            .map(Some)
93            .map_err(|e| ServiceError::Ipc(format!("decode event: {e}")))
94    }
95
96    /// Spawn a background task that reads [`IpcEvent`] frames and emits
97    /// [`PluginEvent`] via the given `AppHandle`.
98    ///
99    /// The task runs until the socket is closed or an error occurs.
100    pub fn listen_events<R: Runtime>(mut self, app: tauri::AppHandle<R>) {
101        tokio::spawn(async move {
102            loop {
103                match self.read_event().await {
104                    Ok(Some(event)) => {
105                        let plugin_event = ipc_event_to_plugin_event(event);
106                        let _ = app.emit("background-service://event", plugin_event);
107                    }
108                    Ok(None) => break,
109                    Err(_) => break,
110                }
111            }
112        });
113    }
114
115    // -- Private helpers -------------------------------------------------------
116
117    async fn send_and_read(
118        &mut self,
119        request: &IpcRequest,
120    ) -> Result<IpcResponse, ServiceError> {
121        self.send_request(request).await?;
122        // The server interleaves IpcResponse and broadcast IpcEvent frames on
123        // the same socket. Read frames in a loop until we get one that
124        // deserialises as an IpcResponse.
125        //
126        // IpcEvent frames encountered here are discarded. Direct IpcClient users
127        // should use `listen_events()` (which takes ownership of self) for event
128        // consumption. For PersistentIpcClientHandle, the background reader task
129        // handles events — these interleaved frames are redundant.
130        loop {
131            let frame = self
132                .read_frame()
133                .await?
134                .ok_or_else(|| ServiceError::Ipc("connection closed".into()))?;
135            if let Ok(resp) = decode_frame::<IpcResponse>(&frame) {
136                return Ok(resp);
137            }
138            // Not a response frame — log it at debug level and skip.
139            log::debug!(
140                "send_and_read: discarding interleaved non-response frame ({} bytes)",
141                frame.len()
142            );
143        }
144    }
145
146    async fn send_request(&mut self, request: &IpcRequest) -> Result<(), ServiceError> {
147        let frame = encode_frame(request).map_err(|e| ServiceError::Ipc(format!("encode: {e}")))?;
148        self.stream
149            .write_all(&frame)
150            .await
151            .map_err(|e| ServiceError::Ipc(format!("send request: {e}")))?;
152        Ok(())
153    }
154
155    /// Read a single length-prefixed frame from the socket.
156    ///
157    /// Returns `None` if the connection was closed cleanly.
158    async fn read_frame(&mut self) -> Result<Option<Vec<u8>>, ServiceError> {
159        let mut len_buf = [0u8; 4];
160        match self.stream.read_exact(&mut len_buf).await {
161            Ok(_) => {}
162            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
163            Err(e) => return Err(ServiceError::Ipc(format!("read frame: {e}"))),
164        }
165        let len = u32::from_be_bytes(len_buf) as usize;
166        if len > MAX_FRAME_SIZE {
167            return Err(ServiceError::Ipc(format!("frame too large: {len}")));
168        }
169        if len == 0 {
170            return Ok(None);
171        }
172        let mut payload = vec![0u8; len];
173        self.stream
174            .read_exact(&mut payload)
175            .await
176            .map_err(|e| ServiceError::Ipc(format!("read payload: {e}")))?;
177        let mut frame = Vec::with_capacity(4 + len);
178        frame.extend_from_slice(&len_buf);
179        frame.extend_from_slice(&payload);
180        Ok(Some(frame))
181    }
182}
183
184/// Convert an [`IpcEvent`] to a [`PluginEvent`].
185pub fn ipc_event_to_plugin_event(event: IpcEvent) -> PluginEvent {
186    match event {
187        IpcEvent::Started => PluginEvent::Started,
188        IpcEvent::Stopped { reason } => PluginEvent::Stopped { reason },
189        IpcEvent::Error { message } => PluginEvent::Error { message },
190    }
191}
192
193// ─── Persistent IPC Client ────────────────────────────────────────────────────
194
195/// Internal command sent from the handle to the background connection task.
/// Internal command sent from the handle to the background connection task.
///
/// Every variant carries a `reply` oneshot sender; the connection task sends
/// exactly one result per command, or drops the sender on failure (which the
/// handle surfaces as a "command dropped" error).
enum IpcCommand {
    /// Start the background service with the given configuration.
    Start {
        config: StartConfig,
        reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
    },
    /// Stop the background service.
    Stop {
        reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
    },
    /// Query whether the background service is currently running.
    IsRunning {
        reply: tokio::sync::oneshot::Sender<Result<bool, ServiceError>>,
    },
}
208
209/// Handle to a persistent IPC client that maintains a long-lived connection
210/// to the headless sidecar.
211///
212/// The background task automatically:
213/// - Relays [`IpcEvent`] frames to `app.emit("background-service://event", ...)`
214/// - Reconnects on connection failure with a 1-second delay
215/// - Forwards commands (start/stop/is_running) over the same connection
pub struct PersistentIpcClientHandle {
    // Queues commands for the background connection task (bounded channel).
    cmd_tx: tokio::sync::mpsc::Sender<IpcCommand>,
    // Cancelled in Drop so the background task exits instead of reconnecting
    // forever once the handle is gone.
    shutdown: tokio_util::sync::CancellationToken,
}
220
impl Drop for PersistentIpcClientHandle {
    fn drop(&mut self) {
        // Signal the background task (spawned in `spawn()`) to stop; without
        // this it would keep reconnecting with no way to deliver results.
        self.shutdown.cancel();
    }
}
226
227impl PersistentIpcClientHandle {
228    /// Spawn the persistent IPC client background task.
229    ///
230    /// The task immediately begins trying to connect to the socket at
231    /// `socket_path`. Events are relayed to the Tauri event system via
232    /// `app.emit()`.
233    pub fn spawn<R: Runtime>(socket_path: PathBuf, app: tauri::AppHandle<R>) -> Self {
234        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::channel(16);
235        let shutdown = tokio_util::sync::CancellationToken::new();
236
237        tokio::spawn(persistent_client_loop(socket_path, app, cmd_rx, shutdown.clone()));
238
239        Self { cmd_tx, shutdown }
240    }
241
242    /// Send a Start command through the persistent connection.
243    pub async fn start(&self, config: StartConfig) -> Result<(), ServiceError> {
244        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
245        self.cmd_tx
246            .send(IpcCommand::Start {
247                config,
248                reply: reply_tx,
249            })
250            .await
251            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
252        reply_rx.await.map_err(|_| ServiceError::Ipc("command dropped".into()))?
253    }
254
255    /// Send a Stop command through the persistent connection.
256    pub async fn stop(&self) -> Result<(), ServiceError> {
257        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
258        self.cmd_tx
259            .send(IpcCommand::Stop { reply: reply_tx })
260            .await
261            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
262        reply_rx.await.map_err(|_| ServiceError::Ipc("command dropped".into()))?
263    }
264
265    /// Query whether the service is running through the persistent connection.
266    pub async fn is_running(&self) -> Result<bool, ServiceError> {
267        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
268        self.cmd_tx
269            .send(IpcCommand::IsRunning { reply: reply_tx })
270            .await
271            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
272        reply_rx.await.map_err(|_| ServiceError::Ipc("command dropped".into()))?
273    }
274}
275
276/// Background task: maintain a persistent connection with reconnection.
/// Background task: maintain a persistent connection with reconnection.
///
/// The outer loop alternates between a connect attempt and a fixed 1-second
/// back-off sleep. Both race against `shutdown` in `biased` selects, so
/// cancellation always wins and the task exits promptly.
///
/// Commands that arrive while disconnected stay queued in `cmd_rx` (bounded
/// channel created in `PersistentIpcClientHandle::spawn`) and are serviced
/// after the next successful connect.
async fn persistent_client_loop<R: Runtime>(
    socket_path: PathBuf,
    app: tauri::AppHandle<R>,
    mut cmd_rx: tokio::sync::mpsc::Receiver<IpcCommand>,
    shutdown: tokio_util::sync::CancellationToken,
) {
    loop {
        tokio::select! {
            biased;
            _ = shutdown.cancelled() => {
                log::info!("Persistent IPC client shutting down");
                break;
            }
            connect_result = UnixStream::connect(&socket_path) => {
                match connect_result {
                    Ok(stream) => {
                        log::info!("Persistent IPC client connected");
                        // run_persistent_connection returns when the socket
                        // drops or the command channel closes; either way we
                        // fall through to the back-off sleep and retry.
                        if run_persistent_connection(stream, &app, &mut cmd_rx).await.is_err() {
                            log::info!("Persistent IPC connection lost, reconnecting...");
                        }
                    }
                    Err(_) => {
                        log::debug!("Persistent IPC client: connection failed, retrying...");
                    }
                }
                // 1-second reconnect delay, still interruptible by shutdown.
                tokio::select! {
                    biased;
                    _ = shutdown.cancelled() => {
                        log::info!("Persistent IPC client shutting down");
                        break;
                    }
                    _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
                }
            }
        }
    }
}
314
315/// Run a single persistent connection until it fails.
316///
317/// Splits the stream into read/write halves:
318/// - A reader task continuously reads frames and relays events to `app.emit()`.
319///   When a response frame arrives, it forwards it via a shared oneshot channel.
320/// - The main loop receives commands from `cmd_rx` and sends requests.
/// Run a single persistent connection until it fails.
///
/// Splits the stream into read/write halves:
/// - A reader task continuously reads frames and relays events to `app.emit()`.
///   When a response frame arrives, it forwards it via a shared oneshot channel.
/// - The main loop receives commands from `cmd_rx` and sends requests.
///
/// Invariant: commands are processed strictly one at a time — the main loop
/// awaits each response before taking the next command — which is why a single
/// shared response slot suffices (see `prepare_response_slot`).
///
/// Returns `Err` when a send fails, the command channel closes, or the reader
/// task is observed dead; the caller (`persistent_client_loop`) then
/// reconnects. The reader task is always aborted before returning.
async fn run_persistent_connection<R: Runtime>(
    stream: UnixStream,
    app: &tauri::AppHandle<R>,
    cmd_rx: &mut tokio::sync::mpsc::Receiver<IpcCommand>,
) -> Result<(), ServiceError> {
    let (read_half, mut write_half) = stream.into_split();

    // Shared slot for the reader task to deliver response frames.
    let response_slot: std::sync::Arc<tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<IpcResponse>>>> =
        std::sync::Arc::new(tokio::sync::Mutex::new(None));

    let slot_writer = response_slot.clone();
    let app_clone = app.clone();

    // Reader task: reads frames and either relays events or delivers responses.
    let reader_handle = tokio::spawn(async move {
        let mut read_half = read_half;
        loop {
            let frame = match read_frame_from(&mut read_half).await {
                Ok(Some(f)) => f,
                Ok(None) => break, // Connection closed
                Err(_) => break,
            };

            // Try to decode as IpcResponse first (command reply)
            if let Ok(resp) = decode_frame::<IpcResponse>(&frame) {
                let mut slot = slot_writer.lock().await;
                if let Some(sender) = slot.take() {
                    // Receiver may already be gone (await_response timed
                    // out) — ignore the send error.
                    let _ = sender.send(resp);
                }
                continue;
            }

            // Try to decode as IpcEvent
            if let Ok(event) = decode_frame::<IpcEvent>(&frame) {
                let plugin_event = ipc_event_to_plugin_event(event);
                // Emit failures are non-fatal (e.g. no listeners registered).
                let _ = app_clone.emit("background-service://event", plugin_event);
                continue;
            }

            // Unknown frame type — skip
        }
    });

    // Main loop: receive commands, send requests, wait for responses.
    let result = loop {
        tokio::select! {
            cmd = cmd_rx.recv() => {
                let cmd = match cmd {
                    Some(c) => c,
                    None => break Err(ServiceError::Ipc("command channel closed".into())),
                };

                match cmd {
                    IpcCommand::Start { config, reply } => {
                        let request = IpcRequest::Start { config };
                        // Arm the response slot BEFORE sending so a fast reply
                        // cannot arrive while no sender is installed.
                        let rx = prepare_response_slot(&response_slot).await;
                        if let Err(e) = send_request_to(&mut write_half, &request).await {
                            let _ = reply.send(Err(e));
                            break Err(ServiceError::Ipc("send failed".into()));
                        }
                        let response = await_response(rx).await;
                        let result = match response {
                            Ok(resp) if resp.ok => Ok(()),
                            Ok(resp) => Err(ServiceError::Ipc(
                                resp.error.unwrap_or_else(|| "unknown error".into()),
                            )),
                            Err(e) => Err(e),
                        };
                        let _ = reply.send(result);
                    }
                    IpcCommand::Stop { reply } => {
                        // Same slot-then-send ordering as Start.
                        let rx = prepare_response_slot(&response_slot).await;
                        if let Err(e) = send_request_to(&mut write_half, &IpcRequest::Stop).await {
                            let _ = reply.send(Err(e));
                            break Err(ServiceError::Ipc("send failed".into()));
                        }
                        let response = await_response(rx).await;
                        let result = match response {
                            Ok(resp) if resp.ok => Ok(()),
                            Ok(resp) => Err(ServiceError::Ipc(
                                resp.error.unwrap_or_else(|| "unknown error".into()),
                            )),
                            Err(e) => Err(e),
                        };
                        let _ = reply.send(result);
                    }
                    IpcCommand::IsRunning { reply } => {
                        let rx = prepare_response_slot(&response_slot).await;
                        if let Err(e) = send_request_to(&mut write_half, &IpcRequest::IsRunning).await {
                            let _ = reply.send(Err(e));
                            break Err(ServiceError::Ipc("send failed".into()));
                        }
                        let response = await_response(rx).await;
                        let result = match response {
                            // Missing/non-bool `running` field is reported as
                            // false rather than an error.
                            Ok(resp) if resp.ok => Ok(resp
                                .data
                                .and_then(|d| d.get("running").and_then(|v| v.as_bool()))
                                .unwrap_or(false)),
                            Ok(resp) => Err(ServiceError::Ipc(
                                resp.error.unwrap_or_else(|| "unknown error".into()),
                            )),
                            Err(e) => Err(e),
                        };
                        let _ = reply.send(result);
                    }
                }
            }
            _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
                // Timeout — check if reader is still alive
                // The sleep future is re-created on every select iteration,
                // so this liveness check fires only after 30s with no commands.
                if reader_handle.is_finished() {
                    break Err(ServiceError::Ipc("reader task died".into()));
                }
            }
        }
    };

    reader_handle.abort();
    result
}
441
442/// Send an IPC request frame through a write half.
443async fn send_request_to(
444    write_half: &mut tokio::net::unix::OwnedWriteHalf,
445    request: &IpcRequest,
446) -> Result<(), ServiceError> {
447    let frame = encode_frame(request).map_err(|e| ServiceError::Ipc(format!("encode: {e}")))?;
448    write_half
449        .write_all(&frame)
450        .await
451        .map_err(|e| ServiceError::Ipc(format!("send: {e}")))?;
452    Ok(())
453}
454
455/// Prepare the shared response slot for an upcoming request.
456///
457/// Creates a oneshot channel and stores the sender in `slot` so the reader
458/// task can deliver the next response. Returns the receiver end.
459///
460/// Must be called **before** sending the request to prevent losing fast
461/// responses that arrive before the slot is set.
462async fn prepare_response_slot(
463    slot: &std::sync::Arc<tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<IpcResponse>>>>,
464) -> tokio::sync::oneshot::Receiver<IpcResponse> {
465    let (tx, rx) = tokio::sync::oneshot::channel();
466    let mut guard = slot.lock().await;
467    debug_assert!(
468        guard.is_none(),
469        "response slot overwritten — sequential command invariant violated"
470    );
471    *guard = Some(tx);
472    rx
473}
474
475/// Await a response from the reader task with a timeout.
476///
477/// Returns `Err` if the response doesn't arrive within 10 seconds, preventing
478/// permanent hangs when the connection drops during command processing.
479async fn await_response(
480    rx: tokio::sync::oneshot::Receiver<IpcResponse>,
481) -> Result<IpcResponse, ServiceError> {
482    tokio::select! {
483        response = rx => {
484            response.map_err(|_| ServiceError::Ipc("response channel closed".into()))
485        }
486        _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
487            Err(ServiceError::Ipc("response timeout".into()))
488        }
489    }
490}
491
492/// Read a single length-prefixed frame from a read half.
493async fn read_frame_from(
494    read_half: &mut tokio::net::unix::OwnedReadHalf,
495) -> Result<Option<Vec<u8>>, ServiceError> {
496    let mut len_buf = [0u8; 4];
497    match read_half.read_exact(&mut len_buf).await {
498        Ok(_) => {}
499        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
500        Err(e) => return Err(ServiceError::Ipc(format!("read frame: {e}"))),
501    }
502    let len = u32::from_be_bytes(len_buf) as usize;
503    if len > MAX_FRAME_SIZE {
504        return Err(ServiceError::Ipc(format!("frame too large: {len}")));
505    }
506    if len == 0 {
507        return Ok(None);
508    }
509    let mut payload = vec![0u8; len];
510    read_half
511        .read_exact(&mut payload)
512        .await
513        .map_err(|e| ServiceError::Ipc(format!("read payload: {e}")))?;
514    let mut frame = Vec::with_capacity(4 + len);
515    frame.extend_from_slice(&len_buf);
516    frame.extend_from_slice(&payload);
517    Ok(Some(frame))
518}
519
520#[cfg(test)]
521mod tests {
522    use super::*;
523    use crate::desktop::test_helpers::{
524        setup_server, setup_server_with_factory, BlockingService, ImmediateSuccessService,
525    };
526    use std::sync::atomic::Ordering;
527    use std::time::Duration;
528    use tauri::Listener;
529
530    // -- AC1: Client connects ---------------------------------------------------
531
    /// AC1: `IpcClient::connect` succeeds against a live test server socket.
    #[tokio::test]
    async fn ipc_client_connect() {
        let (path, shutdown) = setup_server();
        let result = IpcClient::connect(path).await;
        assert!(result.is_ok(), "client should connect: {:?}", result.err());
        shutdown.cancel();
    }
539
540    // -- AC2: Start command works -----------------------------------------------
541
    /// AC2: a Start command round-trips to the test server and returns Ok.
    #[tokio::test]
    async fn ipc_client_send_start() {
        let (path, shutdown) = setup_server();
        let mut client = IpcClient::connect(path).await.unwrap();
        let result = client.start(StartConfig::default()).await;
        assert!(
            result.is_ok(),
            "start should succeed: {:?}",
            result.err()
        );
        shutdown.cancel();
    }
554
555    // -- AC3: Stop command works ------------------------------------------------
556
    /// AC3: Stop succeeds after a prior Start.
    #[tokio::test]
    async fn ipc_client_send_stop() {
        let (path, shutdown) = setup_server();
        let mut client = IpcClient::connect(path).await.unwrap();
        // Must start first — stopping a non-running service is an error
        // (covered by ipc_client_stop_when_not_running).
        client.start(StartConfig::default()).await.unwrap();
        let result = client.stop().await;
        assert!(
            result.is_ok(),
            "stop should succeed: {:?}",
            result.err()
        );
        shutdown.cancel();
    }
570
571    // -- AC4: IsRunning returns status ------------------------------------------
572
    /// AC4: IsRunning reflects the service state before and after Start.
    #[tokio::test]
    async fn ipc_client_is_running() {
        let (path, shutdown) = setup_server();
        let mut client = IpcClient::connect(path).await.unwrap();

        let running = client.is_running().await.unwrap();
        assert!(!running, "should not be running initially");

        client.start(StartConfig::default()).await.unwrap();
        let running = client.is_running().await.unwrap();
        assert!(running, "should be running after start");

        shutdown.cancel();
    }
587
588    // -- AC5: Events are received -----------------------------------------------
589
    /// AC5: a Started event is delivered over the socket after Start.
    #[tokio::test]
    async fn ipc_client_receive_events() {
        // ImmediateSuccessService emits Started as soon as it is launched.
        let (path, shutdown) =
            setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
        let mut client = IpcClient::connect(path).await.unwrap();
        client.start(StartConfig::default()).await.unwrap();

        // Bounded wait so a regression hangs for 500ms, not forever.
        let event = tokio::time::timeout(Duration::from_millis(500), client.read_event())
            .await
            .expect("timed out waiting for event")
            .expect("read_event failed");

        assert!(event.is_some(), "should receive an event");
        let event = event.unwrap();
        assert!(
            matches!(event, IpcEvent::Started),
            "Expected Started event, got {:?}",
            event
        );

        shutdown.cancel();
    }
612
613    // -- Additional: Stop when not running returns error -------------------------
614
    /// Stopping a service that was never started must surface an error.
    #[tokio::test]
    async fn ipc_client_stop_when_not_running() {
        let (path, shutdown) = setup_server();
        let mut client = IpcClient::connect(path).await.unwrap();
        let result = client.stop().await;
        assert!(result.is_err(), "stop when not running should fail");
        shutdown.cancel();
    }
623
624    // -- Additional: Connect to nonexistent socket fails -------------------------
625
    /// Connecting to a socket path with no listener must return an error,
    /// not hang or panic.
    #[tokio::test]
    async fn ipc_client_connect_to_nonexistent() {
        let path = std::env::temp_dir().join("nonexistent-test-socket.sock");
        let result = IpcClient::connect(path).await;
        assert!(
            result.is_err(),
            "should fail to connect to nonexistent socket"
        );
    }
635
636    // -- Additional: ipc_event_to_plugin_event conversion -----------------------
637
638    #[test]
639    fn ipc_event_to_plugin_event_started() {
640        let event = IpcEvent::Started;
641        let plugin = ipc_event_to_plugin_event(event);
642        assert!(matches!(plugin, PluginEvent::Started));
643    }
644
645    #[test]
646    fn ipc_event_to_plugin_event_stopped() {
647        let event = IpcEvent::Stopped {
648            reason: "cancelled".into(),
649        };
650        let plugin = ipc_event_to_plugin_event(event);
651        match plugin {
652            PluginEvent::Stopped { reason } => assert_eq!(reason, "cancelled"),
653            other => panic!("Expected Stopped, got {other:?}"),
654        }
655    }
656
657    #[test]
658    fn ipc_event_to_plugin_event_error() {
659        let event = IpcEvent::Error {
660            message: "init failed".into(),
661        };
662        let plugin = ipc_event_to_plugin_event(event);
663        match plugin {
664            PluginEvent::Error { message } => assert_eq!(message, "init failed"),
665            other => panic!("Expected Error, got {other:?}"),
666        }
667    }
668
669    // -- Additional: Full lifecycle ---------------------------------------------
670
    /// End-to-end state machine: not running → start → running → stop → not
    /// running, all over one connection.
    #[tokio::test]
    async fn ipc_client_full_lifecycle() {
        let (path, shutdown) = setup_server();
        let mut client = IpcClient::connect(path).await.unwrap();

        assert!(!client.is_running().await.unwrap());
        client.start(StartConfig::default()).await.unwrap();
        assert!(client.is_running().await.unwrap());
        client.stop().await.unwrap();
        assert!(!client.is_running().await.unwrap());

        shutdown.cancel();
    }
684
685    // -- Additional: listen_events spawns and converts events -------------------
686
    /// `listen_events` spawns a reader task that converts IPC events and
    /// emits them on "background-service://event".
    #[tokio::test]
    async fn ipc_client_listen_events() {
        let (path, shutdown) =
            setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
        let app = tauri::test::mock_app();

        // Flag flipped by the Tauri listener when any event arrives.
        let received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let received_clone = received.clone();
        app.listen("background-service://event", move |_event| {
            received_clone.store(true, Ordering::SeqCst);
        });

        let mut client = IpcClient::connect(path).await.unwrap();
        client.start(StartConfig::default()).await.unwrap();
        // listen_events consumes the client; events now flow via the app.
        client.listen_events(app.handle().clone());

        // Poll the flag with a 500ms ceiling so a regression can't hang CI.
        tokio::time::timeout(Duration::from_millis(500), async {
            while !received.load(Ordering::SeqCst) {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        })
        .await
        .expect("timed out waiting for event via listen_events");

        assert!(received.load(Ordering::SeqCst), "should have received event");
        shutdown.cancel();
    }
714
715    // ═══════════════════════════════════════════════════════════════════════
716    //  IPC LOOPBACK TESTS (Step 20 — AC2, AC3, AC4)
717    // ═══════════════════════════════════════════════════════════════════════
718
719    // -- AC2: IPC loopback full lifecycle with event verification ---------------
720
721    /// Comprehensive IPC loopback: IpcServer + IpcClient in the same process.
722    /// Exercises start → Started event → running → stop → Stopped event → stopped.
723    ///
724    /// Note: IpcEvent frames must be read BEFORE other requests because
725    /// `send_and_read` skips event frames looking for IpcResponse.
    #[tokio::test]
    async fn ipc_loopback_full_lifecycle_with_events() {
        let (path, shutdown) = setup_server();
        let mut client = IpcClient::connect(path).await.unwrap();

        // Initially not running
        assert!(
            !client.is_running().await.unwrap(),
            "should not be running initially"
        );

        // Start the service
        client
            .start(StartConfig::default())
            .await
            .expect("start should succeed");

        // Read the Started event BEFORE any other request
        // (send_and_read on subsequent calls would skip buffered events)
        let started = tokio::time::timeout(Duration::from_millis(500), client.read_event())
            .await
            .expect("timed out waiting for Started event")
            .expect("read_event failed")
            .expect("should receive event");
        assert!(
            matches!(started, IpcEvent::Started),
            "Expected Started event, got {started:?}"
        );

        // Verify running (after consuming the event)
        assert!(
            client.is_running().await.unwrap(),
            "should be running after start"
        );

        // Stop the service
        client.stop().await.expect("stop should succeed");

        // Read the Stopped event BEFORE any other request
        // (same ordering constraint as the Started event above)
        let stopped = tokio::time::timeout(Duration::from_millis(500), client.read_event())
            .await
            .expect("timed out waiting for Stopped event")
            .expect("read_event failed")
            .expect("should receive event");
        assert!(
            matches!(stopped, IpcEvent::Stopped { .. }),
            "Expected Stopped event, got {stopped:?}"
        );

        // Verify not running
        assert!(
            !client.is_running().await.unwrap(),
            "should not be running after stop"
        );

        shutdown.cancel();
    }
783
784    // -- AC3: Event streaming converts IpcEvent to PluginEvent -------------------
785
786    /// Verify events streamed through IPC are correctly converted to PluginEvent.
    #[tokio::test]
    async fn ipc_loopback_event_streaming_plugin_event_conversion() {
        let (path, shutdown) = setup_server();
        let mut client = IpcClient::connect(path).await.unwrap();

        // Start — expect Started event → PluginEvent::Started
        client.start(StartConfig::default()).await.unwrap();
        let started_ipc = tokio::time::timeout(Duration::from_millis(500), client.read_event())
            .await
            .expect("timed out")
            .expect("read_event failed")
            .expect("should receive event");
        let started_plugin = ipc_event_to_plugin_event(started_ipc);
        assert!(
            matches!(started_plugin, PluginEvent::Started),
            "Expected PluginEvent::Started, got {started_plugin:?}"
        );

        // Stop — expect Stopped event → PluginEvent::Stopped
        client.stop().await.unwrap();
        let stopped_ipc = tokio::time::timeout(Duration::from_millis(500), client.read_event())
            .await
            .expect("timed out")
            .expect("read_event failed")
            .expect("should receive event");
        let stopped_plugin = ipc_event_to_plugin_event(stopped_ipc);
        match stopped_plugin {
            // "cancelled" is the reason the test server reports for a
            // client-requested stop.
            PluginEvent::Stopped { reason } => {
                assert_eq!(reason, "cancelled", "Expected 'cancelled' reason");
            }
            other => panic!("Expected PluginEvent::Stopped, got {other:?}"),
        }

        shutdown.cancel();
    }
822
823    // -- AC4: Error handling — connection drop detected by client ---------------
824
825    /// Verify client detects a dropped connection gracefully (no panic).
826    /// Simulates the server side closing the socket mid-connection.
827    #[tokio::test]
828    async fn ipc_loopback_connection_drop_returns_error() {
829        let path = crate::desktop::test_helpers::unique_socket_path();
830
831        // Create a minimal "server" that accepts one connection then drops it.
832        let listener = tokio::net::UnixListener::bind(&path).unwrap();
833        let path_clone = path.clone();
834
835        let client_handle = tokio::spawn(async move {
836            IpcClient::connect(path_clone).await.unwrap()
837        });
838
839        // Accept the connection and immediately drop the server-side stream.
840        let (server_stream, _) = listener.accept().await.unwrap();
841        drop(server_stream);
842        tokio::time::sleep(Duration::from_millis(20)).await;
843
844        let mut client = client_handle.await.unwrap();
845
846        // Client should detect the closed connection on next operation.
847        let result = client.is_running().await;
848        assert!(
849            result.is_err(),
850            "should get error after server drops connection"
851        );
852
853        let _ = std::fs::remove_file(&path);
854    }
855
856    // -- AC4: Error handling — double start returns error through IPC ------------
857
858    /// Verify second start (when already running) returns an IPC error.
859    #[tokio::test]
860    async fn ipc_loopback_double_start_returns_error() {
861        let (path, shutdown) = setup_server();
862        let mut client = IpcClient::connect(path).await.unwrap();
863
864        client.start(StartConfig::default()).await.unwrap();
865
866        let result = client.start(StartConfig::default()).await;
867        assert!(result.is_err(), "double start should return error");
868        let err_msg = result.unwrap_err().to_string();
869        assert!(
870            err_msg.to_lowercase().contains("already"),
871            "Error should mention 'already': {err_msg}"
872        );
873
874        shutdown.cancel();
875    }
876
877    // ═══════════════════════════════════════════════════════════════════════
878    //  PERSISTENT IPC CLIENT TESTS (Step 12)
879    // ═══════════════════════════════════════════════════════════════════════
880
881    // -- AC1: Persistent client connects and maintains connection --
882
883    /// Verify the persistent client connects to a running server and can
884    /// forward commands through the persistent connection.
885    #[tokio::test]
886    async fn persistent_client_connects() {
887        let (path, shutdown) = setup_server();
888        let app = tauri::test::mock_app();
889
890        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
891
892        // Give the background task time to connect.
893        tokio::time::sleep(Duration::from_millis(100)).await;
894
895        // Send a command through the persistent connection.
896        let running = handle.is_running().await;
897        assert!(
898            running.is_ok(),
899            "should get response via persistent connection: {:?}",
900            running.err()
901        );
902        assert!(!running.unwrap(), "should not be running initially");
903
904        shutdown.cancel();
905    }
906
907    // -- AC3: Auto-reconnect --
908
909    /// Verify the persistent client reconnects after the server restarts.
910    #[tokio::test]
911    async fn persistent_client_reconnects() {
912        use crate::desktop::ipc_server::IpcServer;
913        use crate::manager::{manager_loop, ServiceFactory};
914        use tokio_util::sync::CancellationToken;
915
916        // First server
917        let (path, shutdown1) = setup_server();
918        let app = tauri::test::mock_app();
919
920        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
921
922        // Verify connected to first server.
923        tokio::time::sleep(Duration::from_millis(100)).await;
924        let result = handle.is_running().await;
925        assert!(
926            result.is_ok(),
927            "should connect to first server: {:?}",
928            result.err()
929        );
930
931        // Kill first server and wait for socket cleanup.
932        shutdown1.cancel();
933        tokio::time::sleep(Duration::from_millis(150)).await;
934
935        // Start second server at the same path.
936        let (cmd_tx2, cmd_rx2) = tokio::sync::mpsc::channel(16);
937        let factory: ServiceFactory<tauri::test::MockRuntime> =
938            Box::new(|| Box::new(BlockingService));
939        tokio::spawn(manager_loop(cmd_rx2, factory, 0.0, 0.0));
940        let server2 = IpcServer::bind(path.clone(), cmd_tx2, app.handle().clone()).unwrap();
941        let shutdown2 = CancellationToken::new();
942        let s2 = shutdown2.clone();
943        tokio::spawn(async move { server2.run(s2).await });
944
945        // Wait for the client to reconnect (1s reconnect delay + margin).
946        let reconnected = tokio::time::timeout(Duration::from_secs(3), async {
947            loop {
948                tokio::time::sleep(Duration::from_millis(200)).await;
949                if handle.is_running().await.is_ok() {
950                    break;
951                }
952            }
953        })
954        .await;
955        assert!(
956            reconnected.is_ok(),
957            "persistent client should reconnect to second server"
958        );
959
960        shutdown2.cancel();
961    }
962
963    // -- AC2: Event relay via app.emit() --
964
965    /// Verify events from the server are relayed to `app.emit()` by the
966    /// persistent client's background reader task.
967    #[tokio::test]
968    async fn event_relay() {
969        let (path, shutdown) =
970            setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
971        let app = tauri::test::mock_app();
972
973        let received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
974        let received_clone = received.clone();
975        app.listen("background-service://event", move |_event| {
976            received_clone.store(true, Ordering::SeqCst);
977        });
978
979        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
980
981        // Start the service — the reader task should relay the Started event.
982        let result = handle.start(StartConfig::default()).await;
983        assert!(result.is_ok(), "start should succeed: {:?}", result.err());
984
985        // Wait for the event to be relayed via app.emit().
986        tokio::time::timeout(Duration::from_millis(500), async {
987            while !received.load(Ordering::SeqCst) {
988                tokio::time::sleep(Duration::from_millis(10)).await;
989            }
990        })
991        .await
992        .expect("timed out waiting for event relay via app.emit()");
993
994        assert!(
995            received.load(Ordering::SeqCst),
996            "event should be relayed through app.emit()"
997        );
998
999        shutdown.cancel();
1000    }
1001
1002    // -- AC4: Start/Stop lifecycle through persistent client --
1003
1004    /// Verify the full start → running → stop → not-running lifecycle works
1005    /// through the persistent IPC client.
1006    #[tokio::test]
1007    async fn start_stop_lifecycle() {
1008        let (path, shutdown) = setup_server();
1009        let app = tauri::test::mock_app();
1010
1011        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1012
1013        // Initially not running.
1014        let running = handle.is_running().await.unwrap();
1015        assert!(!running, "should not be running initially");
1016
1017        // Start.
1018        handle
1019            .start(StartConfig::default())
1020            .await
1021            .expect("start should succeed");
1022        let running = handle.is_running().await.unwrap();
1023        assert!(running, "should be running after start");
1024
1025        // Stop.
1026        handle.stop().await.expect("stop should succeed");
1027        let running = handle.is_running().await.unwrap();
1028        assert!(!running, "should not be running after stop");
1029
1030        shutdown.cancel();
1031    }
1032
1033    // -- Fix: Timeout prevents permanent hang on unresponsive server --
1034
1035    /// Verify the persistent client returns an error (not hang) when the
1036    /// server accepts a connection but never responds to a command.
1037    ///
1038    /// This is a regression test for the critical bug where `wait_for_response`
1039    /// had no timeout — a dropped connection during command processing caused
1040    /// both the reconnect loop and the caller to hang permanently.
1041    #[tokio::test]
1042    async fn persistent_client_timeout_on_unresponsive_server() {
1043        let path = crate::desktop::test_helpers::unique_socket_path();
1044        let listener = tokio::net::UnixListener::bind(&path).unwrap();
1045
1046        // Server that accepts the connection but never responds.
1047        let server_handle = tokio::spawn(async move {
1048            let (_stream, _) = listener.accept().await.unwrap();
1049            // Hold connection open — never send a response.
1050            tokio::time::sleep(Duration::from_secs(60)).await;
1051        });
1052
1053        let app = tauri::test::mock_app();
1054        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1055
1056        // Give the background task time to connect.
1057        tokio::time::sleep(Duration::from_millis(100)).await;
1058
1059        // Start should timeout and return an error, not hang forever.
1060        let result = tokio::time::timeout(
1061            Duration::from_secs(15),
1062            handle.start(StartConfig::default()),
1063        )
1064        .await;
1065
1066        assert!(
1067            result.is_ok(),
1068            "start should not hang — expected error, got outer timeout"
1069        );
1070        let inner = result.unwrap();
1071        assert!(
1072            inner.is_err(),
1073            "start should return error when server is unresponsive"
1074        );
1075
1076        server_handle.abort();
1077        let _ = std::fs::remove_file(&path);
1078    }
1079
1080    // -- C1: Persistent client terminates on handle drop --
1081
1082    /// Verify that dropping `PersistentIpcClientHandle` causes the background
1083    /// reconnection task to stop (via `CancellationToken`), preventing resource
1084    /// leaks where the task reconnects forever after the handle is dropped.
1085    #[tokio::test]
1086    async fn persistent_client_terminates_on_handle_drop() {
1087        let (path, shutdown) = setup_server();
1088        let app = tauri::test::mock_app();
1089
1090        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1091
1092        // Give the background task time to connect.
1093        tokio::time::sleep(Duration::from_millis(100)).await;
1094
1095        // Drop the handle — this should cancel the shutdown token.
1096        drop(handle);
1097
1098        // The background task should terminate within a bounded time.
1099        // We can't observe the JoinHandle directly (it's fire-and-forget),
1100        // but we can verify the socket isn't being reconnected to by checking
1101        // that server shutdown succeeds cleanly.
1102        tokio::time::sleep(Duration::from_secs(2)).await;
1103
1104        shutdown.cancel();
1105    }
1106}