Skip to main content

tauri_plugin_background_service/desktop/
ipc_client.rs

1//! Desktop IPC client for the GUI process.
2//!
3//! [`IpcClient`] connects to the headless sidecar's Unix domain socket and
4//! provides methods to start/stop the background service and receive events
5//! over the IPC protocol.
6//!
7//! Only available when the `desktop-service` Cargo feature is enabled.
8
9use std::path::PathBuf;
10use std::sync::atomic::AtomicBool;
11use std::sync::Arc;
12use std::time::Duration;
13
14use tauri::{Emitter, Runtime};
15
16use crate::desktop::ipc::{
17    decode_frame, encode_frame, IpcEvent, IpcMessage, IpcRequest, IpcResponse,
18};
19use crate::desktop::transport::{self, TransportReadHalf, TransportStream, TransportWriteHalf};
20use crate::error::ServiceError;
21use crate::models::{PluginEvent, ServiceStatus, StartConfig};
22
23/// IPC client for communicating with the headless sidecar service.
24///
25/// Connects to the sidecar's Unix domain socket and translates method calls
26/// into [`IpcRequest`] messages. Responses are decoded from [`IpcResponse`]
27/// frames.
28///
29/// Events from the sidecar (started/stopped/error) are read as [`IpcEvent`]
30/// frames and converted to [`PluginEvent`] for emission via the Tauri event
31/// system.
32pub struct IpcClient {
33    stream: TransportStream,
34}
35
36impl IpcClient {
37    /// Connect to the sidecar's IPC socket at the given path.
38    pub async fn connect(path: PathBuf) -> Result<Self, ServiceError> {
39        let stream = transport::connect(&path).await?;
40        Ok(Self { stream })
41    }
42
43    /// Send a Start command to the sidecar.
44    pub async fn start(&mut self, config: StartConfig) -> Result<(), ServiceError> {
45        let request = IpcRequest::Start { config };
46        let (response, _events) = self.send_and_read(&request).await?;
47        if response.ok {
48            Ok(())
49        } else {
50            Err(ServiceError::Ipc(
51                response.error.unwrap_or_else(|| "unknown error".into()),
52            ))
53        }
54    }
55
56    /// Send a Stop command to the sidecar.
57    pub async fn stop(&mut self) -> Result<(), ServiceError> {
58        let (response, _events) = self.send_and_read(&IpcRequest::Stop).await?;
59        if response.ok {
60            Ok(())
61        } else {
62            Err(ServiceError::Ipc(
63                response.error.unwrap_or_else(|| "unknown error".into()),
64            ))
65        }
66    }
67
68    /// Send an IsRunning query to the sidecar.
69    pub async fn is_running(&mut self) -> Result<bool, ServiceError> {
70        let (response, _events) = self.send_and_read(&IpcRequest::IsRunning).await?;
71        if response.ok {
72            Ok(response
73                .data
74                .and_then(|d| d.get("running").and_then(|v| v.as_bool()))
75                .unwrap_or(false))
76        } else {
77            Err(ServiceError::Ipc(
78                response.error.unwrap_or_else(|| "unknown error".into()),
79            ))
80        }
81    }
82
83    /// Query the current service lifecycle state.
84    pub async fn get_state(&mut self) -> Result<ServiceStatus, ServiceError> {
85        let (response, _events) = self.send_and_read(&IpcRequest::GetState).await?;
86        if response.ok {
87            response
88                .data
89                .ok_or_else(|| ServiceError::Ipc("missing data in GetState response".into()))
90                .and_then(|d| {
91                    serde_json::from_value::<ServiceStatus>(d)
92                        .map_err(|e| ServiceError::Ipc(format!("deserialize GetState: {e}")))
93                })
94        } else {
95            Err(ServiceError::Ipc(
96                response.error.unwrap_or_else(|| "unknown error".into()),
97            ))
98        }
99    }
100
101    /// Read the next [`IpcEvent`] from the socket.
102    ///
103    /// Returns `None` if the connection was closed.
104    pub async fn read_event(&mut self) -> Result<Option<IpcEvent>, ServiceError> {
105        let frame = match self.read_frame().await? {
106            Some(f) => f,
107            None => return Ok(None),
108        };
109        match decode_frame(&frame).map_err(|e| ServiceError::Ipc(format!("decode event: {e}")))? {
110            IpcMessage::Event(event) => Ok(Some(event)),
111            other => Err(ServiceError::Ipc(format!(
112                "expected event frame, got {:?}",
113                std::mem::discriminant(&other),
114            ))),
115        }
116    }
117
118    /// Spawn a background task that reads [`IpcEvent`] frames and emits
119    /// [`PluginEvent`] via the given `AppHandle`.
120    ///
121    /// The task runs until the socket is closed or an error occurs.
122    pub fn listen_events<R: Runtime>(mut self, app: tauri::AppHandle<R>) {
123        tokio::spawn(async move {
124            loop {
125                match self.read_event().await {
126                    Ok(Some(event)) => {
127                        let plugin_event = ipc_event_to_plugin_event(event);
128                        let _ = app.emit("background-service://event", plugin_event);
129                    }
130                    Ok(None) => break,
131                    Err(_) => break,
132                }
133            }
134        });
135    }
136
137    // -- Private helpers -------------------------------------------------------
138
139    async fn send_and_read(
140        &mut self,
141        request: &IpcRequest,
142    ) -> Result<(IpcResponse, Vec<IpcEvent>), ServiceError> {
143        self.send_request(request).await?;
144        // The server interleaves IpcResponse and broadcast IpcEvent frames on
145        // the same socket. Read frames in a loop until we get a Response,
146        // collecting any Event frames encountered along the way.
147        let mut events = Vec::new();
148        loop {
149            let frame = self
150                .read_frame()
151                .await?
152                .ok_or_else(|| ServiceError::Ipc("connection closed".into()))?;
153            match decode_frame(&frame).map_err(|e| ServiceError::Ipc(format!("decode: {e}")))? {
154                IpcMessage::Response(resp) => return Ok((resp, events)),
155                IpcMessage::Event(e) => {
156                    events.push(e);
157                }
158                IpcMessage::Request(_) => {
159                    return Err(ServiceError::Ipc("unexpected request frame".into()));
160                }
161            }
162        }
163    }
164
165    async fn send_request(&mut self, request: &IpcRequest) -> Result<(), ServiceError> {
166        let msg = IpcMessage::Request(request.clone());
167        let frame = encode_frame(&msg).map_err(|e| ServiceError::Ipc(format!("encode: {e}")))?;
168        transport::write_frame(&mut self.stream, &frame)
169            .await
170            .map_err(ServiceError::Ipc)?;
171        Ok(())
172    }
173
174    /// Read a single length-prefixed frame from the socket.
175    ///
176    /// Returns the payload bytes only (no length prefix).
177    /// Returns `None` if the connection was closed cleanly.
178    async fn read_frame(&mut self) -> Result<Option<Vec<u8>>, ServiceError> {
179        transport::read_frame(&mut self.stream)
180            .await
181            .map_err(ServiceError::Ipc)
182    }
183}
184
185/// Convert an [`IpcEvent`] to a [`PluginEvent`].
186pub fn ipc_event_to_plugin_event(event: IpcEvent) -> PluginEvent {
187    match event {
188        IpcEvent::Started => PluginEvent::Started,
189        IpcEvent::Stopped { reason } => PluginEvent::Stopped { reason },
190        IpcEvent::Error { message } => PluginEvent::Error { message },
191    }
192}
193
194// ─── Persistent IPC Client ────────────────────────────────────────────────────
195
196/// Internal command sent from the handle to the background connection task.
197enum IpcCommand {
198    Start {
199        config: StartConfig,
200        reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
201    },
202    Stop {
203        reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
204    },
205    IsRunning {
206        reply: tokio::sync::oneshot::Sender<Result<bool, ServiceError>>,
207    },
208    GetState {
209        reply: tokio::sync::oneshot::Sender<Result<ServiceStatus, ServiceError>>,
210    },
211    EnableAutoRestart {
212        config: Option<StartConfig>,
213        reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
214    },
215    DisableAutoRestart {
216        reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
217    },
218    GetDesiredState {
219        reply: tokio::sync::oneshot::Sender<
220            Result<Option<crate::desired_state::DesiredState>, ServiceError>,
221        >,
222    },
223    ValidateSetup {
224        reply: tokio::sync::oneshot::Sender<
225            Result<crate::models::SetupValidationReport, ServiceError>,
226        >,
227    },
228}
229
230/// Handle to a persistent IPC client that maintains a long-lived connection
231/// to the headless sidecar.
232///
233/// The background task automatically:
234/// - Relays [`IpcEvent`] frames to `app.emit("background-service://event", ...)`
235/// - Reconnects on connection failure with exponential backoff (1s–30s, up to 10 retries)
236/// - Forwards commands (start/stop/is_running) over the same connection
237pub struct PersistentIpcClientHandle {
238    cmd_tx: tokio::sync::mpsc::Sender<IpcCommand>,
239    shutdown: tokio_util::sync::CancellationToken,
240    connected: Arc<AtomicBool>,
241    socket_path: PathBuf,
242}
243
244impl Drop for PersistentIpcClientHandle {
245    fn drop(&mut self) {
246        self.shutdown.cancel();
247    }
248}
249
250impl PersistentIpcClientHandle {
251    /// Spawn the persistent IPC client background task.
252    ///
253    /// The task immediately begins trying to connect to the socket at
254    /// `socket_path`. Events are relayed to the Tauri event system via
255    /// `app.emit()`.
256    pub fn spawn<R: Runtime>(socket_path: PathBuf, app: tauri::AppHandle<R>) -> Self {
257        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::channel(16);
258        let shutdown = tokio_util::sync::CancellationToken::new();
259        let connected = Arc::new(AtomicBool::new(false));
260
261        tokio::spawn(persistent_client_loop(
262            socket_path.clone(),
263            app,
264            cmd_rx,
265            shutdown.clone(),
266            connected.clone(),
267        ));
268
269        Self {
270            cmd_tx,
271            shutdown,
272            connected,
273            socket_path,
274        }
275    }
276
277    /// Send a Start command through the persistent connection.
278    pub async fn start(&self, config: StartConfig) -> Result<(), ServiceError> {
279        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
280        self.cmd_tx
281            .send(IpcCommand::Start {
282                config,
283                reply: reply_tx,
284            })
285            .await
286            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
287        reply_rx
288            .await
289            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
290    }
291
292    /// Send a Stop command through the persistent connection.
293    pub async fn stop(&self) -> Result<(), ServiceError> {
294        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
295        self.cmd_tx
296            .send(IpcCommand::Stop { reply: reply_tx })
297            .await
298            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
299        reply_rx
300            .await
301            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
302    }
303
304    /// Query whether the service is running through the persistent connection.
305    pub async fn is_running(&self) -> Result<bool, ServiceError> {
306        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
307        self.cmd_tx
308            .send(IpcCommand::IsRunning { reply: reply_tx })
309            .await
310            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
311        reply_rx
312            .await
313            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
314    }
315
316    /// Query the current service lifecycle state through the persistent connection.
317    pub async fn get_state(&self) -> Result<ServiceStatus, ServiceError> {
318        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
319        self.cmd_tx
320            .send(IpcCommand::GetState { reply: reply_tx })
321            .await
322            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
323        reply_rx
324            .await
325            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
326    }
327
328    /// Returns `true` if the persistent client is currently connected to the
329    /// headless sidecar, `false` otherwise.
330    pub fn is_connected(&self) -> bool {
331        self.connected.load(std::sync::atomic::Ordering::Relaxed)
332    }
333
334    /// Returns the socket path this client is configured to connect to.
335    pub fn socket_path(&self) -> &PathBuf {
336        &self.socket_path
337    }
338
339    /// Wait until the persistent client is connected, polling `is_connected()`
340    /// at 500ms intervals.
341    ///
342    /// Returns `Ok(true)` if connected within the timeout, `Ok(false)` if the
343    /// timeout elapsed without connecting.
344    pub async fn wait_for_connected(&self, timeout: Duration) -> Result<bool, ServiceError> {
345        let deadline = tokio::time::Instant::now() + timeout;
346        let poll_interval = Duration::from_millis(500);
347
348        while tokio::time::Instant::now() < deadline {
349            if self.is_connected() {
350                return Ok(true);
351            }
352            let remaining = deadline - tokio::time::Instant::now();
353            let sleep_dur = poll_interval.min(remaining);
354            tokio::time::sleep(sleep_dur).await;
355        }
356
357        if self.is_connected() {
358            Ok(true)
359        } else {
360            Ok(false)
361        }
362    }
363
364    /// Enable auto-restart through the persistent connection.
365    pub async fn enable_auto_restart(
366        &self,
367        config: Option<StartConfig>,
368    ) -> Result<(), ServiceError> {
369        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
370        self.cmd_tx
371            .send(IpcCommand::EnableAutoRestart {
372                config,
373                reply: reply_tx,
374            })
375            .await
376            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
377        reply_rx
378            .await
379            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
380    }
381
382    /// Disable auto-restart through the persistent connection.
383    pub async fn disable_auto_restart(&self) -> Result<(), ServiceError> {
384        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
385        self.cmd_tx
386            .send(IpcCommand::DisableAutoRestart { reply: reply_tx })
387            .await
388            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
389        reply_rx
390            .await
391            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
392    }
393
394    /// Get the persisted desired-state through the persistent connection.
395    pub async fn get_desired_state(
396        &self,
397    ) -> Result<Option<crate::desired_state::DesiredState>, ServiceError> {
398        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
399        self.cmd_tx
400            .send(IpcCommand::GetDesiredState { reply: reply_tx })
401            .await
402            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
403        reply_rx
404            .await
405            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
406    }
407
408    /// Validate background service setup prerequisites through the persistent connection.
409    pub async fn validate_setup(
410        &self,
411    ) -> Result<crate::models::SetupValidationReport, ServiceError> {
412        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
413        self.cmd_tx
414            .send(IpcCommand::ValidateSetup { reply: reply_tx })
415            .await
416            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
417        reply_rx
418            .await
419            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
420    }
421}
422
423/// Background task: maintain a persistent connection with reconnection.
424async fn persistent_client_loop<R: Runtime>(
425    socket_path: PathBuf,
426    app: tauri::AppHandle<R>,
427    mut cmd_rx: tokio::sync::mpsc::Receiver<IpcCommand>,
428    shutdown: tokio_util::sync::CancellationToken,
429    connected: Arc<AtomicBool>,
430) {
431    use backon::BackoffBuilder;
432
433    let backoff_builder = backon::ExponentialBuilder::default()
434        .with_min_delay(Duration::from_secs(1))
435        .with_max_delay(Duration::from_secs(30))
436        .with_max_times(10)
437        .with_jitter();
438
439    let mut attempts = backoff_builder.build();
440
441    loop {
442        tokio::select! {
443            biased;
444            _ = shutdown.cancelled() => {
445                log::info!("Persistent IPC client shutting down");
446                connected.store(false, std::sync::atomic::Ordering::Relaxed);
447                break;
448            }
449            connect_result = transport::connect(&socket_path) => {
450                match connect_result {
451                    Ok(stream) => {
452                        log::info!("Persistent IPC client connected");
453                        connected.store(true, std::sync::atomic::Ordering::Relaxed);
454                        let result = run_persistent_connection(stream, &app, &mut cmd_rx, &connected).await;
455                        // Reset backoff on successful connect (even if session later failed).
456                        attempts = backoff_builder.build();
457                        if result.is_err() {
458                            log::info!("Persistent IPC connection lost, reconnecting...");
459                            connected.store(false, std::sync::atomic::Ordering::Relaxed);
460                        }
461                    }
462                    Err(_) => {
463                        log::debug!("Persistent IPC client: connection failed, retrying...");
464                        connected.store(false, std::sync::atomic::Ordering::Relaxed);
465                    }
466                }
467                let delay = match attempts.next() {
468                    Some(d) => d,
469                    None => {
470                        log::warn!("Persistent IPC client: backoff exhausted, giving up");
471                        break;
472                    }
473                };
474                tokio::select! {
475                    biased;
476                    _ = shutdown.cancelled() => {
477                        log::info!("Persistent IPC client shutting down");
478                        connected.store(false, std::sync::atomic::Ordering::Relaxed);
479                        break;
480                    }
481                    _ = tokio::time::sleep(delay) => {}
482                }
483            }
484        }
485    }
486}
487
488/// Run a single persistent connection until it fails.
489///
490/// Splits the stream into read/write halves:
491/// - A reader task continuously reads frames and relays events to `app.emit()`.
492///   When a response frame arrives, it forwards it via a shared oneshot channel.
493/// - The main loop receives commands from `cmd_rx` and sends requests.
494async fn run_persistent_connection<R: Runtime>(
495    stream: TransportStream,
496    app: &tauri::AppHandle<R>,
497    cmd_rx: &mut tokio::sync::mpsc::Receiver<IpcCommand>,
498    connected: &Arc<AtomicBool>,
499) -> Result<(), ServiceError> {
500    let (read_half, mut write_half) = transport::split(stream);
501
502    // Shared slot for the reader task to deliver response frames.
503    let response_slot: std::sync::Arc<
504        tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<IpcResponse>>>,
505    > = std::sync::Arc::new(tokio::sync::Mutex::new(None));
506
507    let slot_writer = response_slot.clone();
508    let app_clone = app.clone();
509    let connected_reader = connected.clone();
510
511    // Reader task: reads frames and either relays events or delivers responses.
512    let reader_handle = tokio::spawn(async move {
513        let mut read_half = read_half;
514        loop {
515            let frame = match read_frame_from(&mut read_half).await {
516                Ok(Some(f)) => f,
517                Ok(None) => break, // Connection closed
518                Err(_) => break,
519            };
520
521            match decode_frame(&frame) {
522                Ok(IpcMessage::Response(resp)) => {
523                    let mut slot = slot_writer.lock().await;
524                    if let Some(sender) = slot.take() {
525                        let _ = sender.send(resp);
526                    }
527                    continue;
528                }
529                Ok(IpcMessage::Event(event)) => {
530                    let plugin_event = ipc_event_to_plugin_event(event);
531                    let _ = app_clone.emit("background-service://event", plugin_event);
532                    continue;
533                }
534                Ok(IpcMessage::Request(_)) => {
535                    log::warn!("unexpected request frame on client connection");
536                    continue;
537                }
538                Err(e) => {
539                    log::debug!("failed to decode IPC frame: {e}");
540                    continue;
541                }
542            }
543        }
544        // Reader exited — mark disconnected.
545        connected_reader.store(false, std::sync::atomic::Ordering::Relaxed);
546    });
547
548    // Main loop: receive commands, send requests, wait for responses.
549    let result = loop {
550        tokio::select! {
551            cmd = cmd_rx.recv() => {
552                let cmd = match cmd {
553                    Some(c) => c,
554                    None => break Err(ServiceError::Ipc("command channel closed".into())),
555                };
556
557                match cmd {
558                    IpcCommand::Start { config, reply } => {
559                        let request = IpcRequest::Start { config };
560                        let rx = prepare_response_slot(&response_slot).await;
561                        if let Err(e) = send_request_to(&mut write_half, &request).await {
562                            let _ = reply.send(Err(e));
563                            break Err(ServiceError::Ipc("send failed".into()));
564                        }
565                        let response = await_response(rx).await;
566                        let result = match response {
567                            Ok(resp) if resp.ok => Ok(()),
568                            Ok(resp) => Err(ServiceError::Ipc(
569                                resp.error.unwrap_or_else(|| "unknown error".into()),
570                            )),
571                            Err(e) => Err(e),
572                        };
573                        let _ = reply.send(result);
574                    }
575                    IpcCommand::Stop { reply } => {
576                        let rx = prepare_response_slot(&response_slot).await;
577                        if let Err(e) = send_request_to(&mut write_half, &IpcRequest::Stop).await {
578                            let _ = reply.send(Err(e));
579                            break Err(ServiceError::Ipc("send failed".into()));
580                        }
581                        let response = await_response(rx).await;
582                        let result = match response {
583                            Ok(resp) if resp.ok => Ok(()),
584                            Ok(resp) => Err(ServiceError::Ipc(
585                                resp.error.unwrap_or_else(|| "unknown error".into()),
586                            )),
587                            Err(e) => Err(e),
588                        };
589                        let _ = reply.send(result);
590                    }
591                    IpcCommand::IsRunning { reply } => {
592                        let rx = prepare_response_slot(&response_slot).await;
593                        if let Err(e) = send_request_to(&mut write_half, &IpcRequest::IsRunning).await {
594                            let _ = reply.send(Err(e));
595                            break Err(ServiceError::Ipc("send failed".into()));
596                        }
597                        let response = await_response(rx).await;
598                        let result = match response {
599                            Ok(resp) if resp.ok => Ok(resp
600                                .data
601                                .and_then(|d| d.get("running").and_then(|v| v.as_bool()))
602                                .unwrap_or(false)),
603                            Ok(resp) => Err(ServiceError::Ipc(
604                                resp.error.unwrap_or_else(|| "unknown error".into()),
605                            )),
606                            Err(e) => Err(e),
607                        };
608                        let _ = reply.send(result);
609                    }
610                    IpcCommand::GetState { reply } => {
611                        let rx = prepare_response_slot(&response_slot).await;
612                        if let Err(e) = send_request_to(&mut write_half, &IpcRequest::GetState).await {
613                            let _ = reply.send(Err(e));
614                            break Err(ServiceError::Ipc("send failed".into()));
615                        }
616                        let response = await_response(rx).await;
617                        let result = match response {
618                            Ok(resp) if resp.ok => resp
619                                .data
620                                .ok_or_else(|| ServiceError::Ipc("missing data in GetState response".into()))
621                                .and_then(|d| {
622                                    serde_json::from_value::<ServiceStatus>(d)
623                                        .map_err(|e| ServiceError::Ipc(format!("deserialize GetState: {e}")))
624                                }),
625                            Ok(resp) => Err(ServiceError::Ipc(
626                                resp.error.unwrap_or_else(|| "unknown error".into()),
627                            )),
628                            Err(e) => Err(e),
629                        };
630                        let _ = reply.send(result);
631                    }
632                    IpcCommand::EnableAutoRestart { config, reply } => {
633                        let request = IpcRequest::EnableAutoRestart { config };
634                        let rx = prepare_response_slot(&response_slot).await;
635                        if let Err(e) = send_request_to(&mut write_half, &request).await {
636                            let _ = reply.send(Err(e));
637                            break Err(ServiceError::Ipc("send failed".into()));
638                        }
639                        let response = await_response(rx).await;
640                        let result = match response {
641                            Ok(resp) if resp.ok => Ok(()),
642                            Ok(resp) => Err(ServiceError::Ipc(
643                                resp.error.unwrap_or_else(|| "unknown error".into()),
644                            )),
645                            Err(e) => Err(e),
646                        };
647                        let _ = reply.send(result);
648                    }
649                    IpcCommand::DisableAutoRestart { reply } => {
650                        let rx = prepare_response_slot(&response_slot).await;
651                        if let Err(e) = send_request_to(&mut write_half, &IpcRequest::DisableAutoRestart).await {
652                            let _ = reply.send(Err(e));
653                            break Err(ServiceError::Ipc("send failed".into()));
654                        }
655                        let response = await_response(rx).await;
656                        let result = match response {
657                            Ok(resp) if resp.ok => Ok(()),
658                            Ok(resp) => Err(ServiceError::Ipc(
659                                resp.error.unwrap_or_else(|| "unknown error".into()),
660                            )),
661                            Err(e) => Err(e),
662                        };
663                        let _ = reply.send(result);
664                    }
665                    IpcCommand::GetDesiredState { reply } => {
666                        let rx = prepare_response_slot(&response_slot).await;
667                        if let Err(e) = send_request_to(&mut write_half, &IpcRequest::GetDesiredState).await {
668                            let _ = reply.send(Err(e));
669                            break Err(ServiceError::Ipc("send failed".into()));
670                        }
671                        let response = await_response(rx).await;
672                        let result = match response {
673                            Ok(resp) if resp.ok => {
674                                match resp.data {
675                                    Some(d) => serde_json::from_value::<crate::desired_state::DesiredState>(d)
676                                        .map(Some)
677                                        .map_err(|e| ServiceError::Ipc(format!("deserialize GetDesiredState: {e}"))),
678                                    None => Ok(None),
679                                }
680                            }
681                            Ok(resp) => Err(ServiceError::Ipc(
682                                resp.error.unwrap_or_else(|| "unknown error".into()),
683                            )),
684                            Err(e) => Err(e),
685                        };
686                        let _ = reply.send(result);
687                    }
688                    IpcCommand::ValidateSetup { reply } => {
689                        let rx = prepare_response_slot(&response_slot).await;
690                        if let Err(e) = send_request_to(&mut write_half, &IpcRequest::ValidateSetup).await {
691                            let _ = reply.send(Err(e));
692                            break Err(ServiceError::Ipc("send failed".into()));
693                        }
694                        let response = await_response(rx).await;
695                        let result = match response {
696                            Ok(resp) if resp.ok => {
697                                match resp.data {
698                                    Some(d) => serde_json::from_value::<crate::models::SetupValidationReport>(d)
699                                        .map_err(|e| ServiceError::Ipc(format!("deserialize ValidateSetup: {e}"))),
700                                    None => Err(ServiceError::Ipc("missing ValidateSetup response data".into())),
701                                }
702                            }
703                            Ok(resp) => Err(ServiceError::Ipc(
704                                resp.error.unwrap_or_else(|| "unknown error".into()),
705                            )),
706                            Err(e) => Err(e),
707                        };
708                        let _ = reply.send(result);
709                    }
710                }
711            }
712            _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
713                // Timeout — check if reader is still alive
714                if reader_handle.is_finished() {
715                    break Err(ServiceError::Ipc("reader task died".into()));
716                }
717            }
718        }
719    };
720
721    reader_handle.abort();
722    result
723}
724
725/// Send an IPC request frame through a write half.
726async fn send_request_to(
727    write_half: &mut TransportWriteHalf,
728    request: &IpcRequest,
729) -> Result<(), ServiceError> {
730    let msg = IpcMessage::Request(request.clone());
731    let frame = encode_frame(&msg).map_err(|e| ServiceError::Ipc(format!("encode: {e}")))?;
732    transport::write_frame(write_half, &frame)
733        .await
734        .map_err(ServiceError::Ipc)?;
735    Ok(())
736}
737
738/// Prepare the shared response slot for an upcoming request.
739///
740/// Creates a oneshot channel and stores the sender in `slot` so the reader
741/// task can deliver the next response. Returns the receiver end.
742///
743/// Must be called **before** sending the request to prevent losing fast
744/// responses that arrive before the slot is set.
745async fn prepare_response_slot(
746    slot: &std::sync::Arc<tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<IpcResponse>>>>,
747) -> tokio::sync::oneshot::Receiver<IpcResponse> {
748    let (tx, rx) = tokio::sync::oneshot::channel();
749    let mut guard = slot.lock().await;
750    debug_assert!(
751        guard.is_none(),
752        "response slot overwritten — sequential command invariant violated"
753    );
754    *guard = Some(tx);
755    rx
756}
757
758/// Await a response from the reader task with a timeout.
759///
760/// Returns `Err` if the response doesn't arrive within 10 seconds, preventing
761/// permanent hangs when the connection drops during command processing.
762async fn await_response(
763    rx: tokio::sync::oneshot::Receiver<IpcResponse>,
764) -> Result<IpcResponse, ServiceError> {
765    tokio::select! {
766        response = rx => {
767            response.map_err(|_| ServiceError::Ipc("response channel closed".into()))
768        }
769        _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
770            Err(ServiceError::Ipc("response timeout".into()))
771        }
772    }
773}
774
775/// Read a single length-prefixed frame from a read half.
776///
777/// Returns the payload bytes only (no length prefix).
778async fn read_frame_from(
779    read_half: &mut TransportReadHalf,
780) -> Result<Option<Vec<u8>>, ServiceError> {
781    transport::read_frame(read_half)
782        .await
783        .map_err(ServiceError::Ipc)
784}
785
786#[cfg(test)]
787mod tests {
788    use super::*;
789    use crate::desktop::test_helpers::{
790        setup_server, setup_server_with_factory, BlockingService, ImmediateSuccessService,
791    };
792    use std::sync::atomic::Ordering;
793    use std::time::Duration;
794    use tauri::Listener;
795
796    // -- AC1: Client connects ---------------------------------------------------
797
798    #[tokio::test]
799    async fn ipc_client_connect() {
800        let (path, shutdown, _event_tx) = setup_server();
801        let result = IpcClient::connect(path).await;
802        assert!(result.is_ok(), "client should connect: {:?}", result.err());
803        shutdown.cancel();
804    }
805
806    // -- AC2: Start command works -----------------------------------------------
807
808    #[tokio::test]
809    async fn ipc_client_send_start() {
810        let (path, shutdown, _event_tx) = setup_server();
811        let mut client = IpcClient::connect(path).await.unwrap();
812        let result = client.start(StartConfig::default()).await;
813        assert!(result.is_ok(), "start should succeed: {:?}", result.err());
814        shutdown.cancel();
815    }
816
817    // -- AC3: Stop command works ------------------------------------------------
818
819    #[tokio::test]
820    async fn ipc_client_send_stop() {
821        let (path, shutdown, _event_tx) = setup_server();
822        let mut client = IpcClient::connect(path).await.unwrap();
823        client.start(StartConfig::default()).await.unwrap();
824        let result = client.stop().await;
825        assert!(result.is_ok(), "stop should succeed: {:?}", result.err());
826        shutdown.cancel();
827    }
828
829    // -- AC4: IsRunning returns status ------------------------------------------
830
831    #[tokio::test]
832    async fn ipc_client_is_running() {
833        let (path, shutdown, _event_tx) = setup_server();
834        let mut client = IpcClient::connect(path).await.unwrap();
835
836        let running = client.is_running().await.unwrap();
837        assert!(!running, "should not be running initially");
838
839        client.start(StartConfig::default()).await.unwrap();
840        let running = client.is_running().await.unwrap();
841        assert!(running, "should be running after start");
842
843        shutdown.cancel();
844    }
845
846    // -- GetState returns ServiceStatus ------------------------------------------
847
848    #[tokio::test]
849    async fn ipc_client_get_state_initial() {
850        let (path, shutdown, _event_tx) = setup_server();
851        let mut client = IpcClient::connect(path).await.unwrap();
852
853        let status = client.get_state().await.unwrap();
854        assert!(
855            matches!(status.state, crate::models::ServiceState::Idle),
856            "expected Idle, got {:?}",
857            status.state
858        );
859        assert_eq!(status.last_error, None);
860
861        shutdown.cancel();
862    }
863
864    #[tokio::test]
865    async fn ipc_client_get_state_after_start() {
866        let (path, shutdown, _event_tx) = setup_server();
867        let mut client = IpcClient::connect(path).await.unwrap();
868
869        client.start(StartConfig::default()).await.unwrap();
870
871        // Poll until Running — Start replies at Initializing, spawned task
872        // transitions to Running asynchronously.
873        let status = tokio::time::timeout(Duration::from_secs(2), async {
874            loop {
875                let s = client.get_state().await.unwrap();
876                if matches!(s.state, crate::models::ServiceState::Running) {
877                    return s;
878                }
879                tokio::time::sleep(Duration::from_millis(10)).await;
880            }
881        })
882        .await
883        .expect("timed out waiting for Running state");
884        assert_eq!(status.last_error, None);
885
886        shutdown.cancel();
887    }
888
889    #[tokio::test]
890    async fn ipc_client_get_state_after_stop() {
891        let (path, shutdown, _event_tx) = setup_server();
892        let mut client = IpcClient::connect(path).await.unwrap();
893
894        client.start(StartConfig::default()).await.unwrap();
895        client.stop().await.unwrap();
896        let status = client.get_state().await.unwrap();
897        assert!(
898            matches!(status.state, crate::models::ServiceState::Stopped),
899            "expected Stopped, got {:?}",
900            status.state
901        );
902
903        shutdown.cancel();
904    }
905
906    // -- AC5: Events are received -----------------------------------------------
907
908    #[tokio::test]
909    async fn ipc_client_receive_events() {
910        let (path, shutdown, event_tx) =
911            setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
912        let mut client = IpcClient::connect(path).await.unwrap();
913        client.start(StartConfig::default()).await.unwrap();
914
915        // Simulate relay broadcasting Started
916        let _ = event_tx.send(IpcEvent::Started);
917
918        let event = tokio::time::timeout(Duration::from_millis(500), client.read_event())
919            .await
920            .expect("timed out waiting for event")
921            .expect("read_event failed");
922
923        assert!(event.is_some(), "should receive an event");
924        let event = event.unwrap();
925        assert!(
926            matches!(event, IpcEvent::Started),
927            "Expected Started event, got {:?}",
928            event
929        );
930
931        shutdown.cancel();
932    }
933
934    // -- Additional: Stop when not running returns error -------------------------
935
936    #[tokio::test]
937    async fn ipc_client_stop_when_not_running() {
938        let (path, shutdown, _event_tx) = setup_server();
939        let mut client = IpcClient::connect(path).await.unwrap();
940        let result = client.stop().await;
941        assert!(result.is_err(), "stop when not running should fail");
942        shutdown.cancel();
943    }
944
945    // -- Additional: Connect to nonexistent socket fails -------------------------
946
947    #[tokio::test]
948    async fn ipc_client_connect_to_nonexistent() {
949        let path = std::env::temp_dir().join("nonexistent-test-socket.sock");
950        let result = IpcClient::connect(path).await;
951        assert!(
952            result.is_err(),
953            "should fail to connect to nonexistent socket"
954        );
955    }
956
957    // -- Additional: ipc_event_to_plugin_event conversion -----------------------
958
959    #[test]
960    fn ipc_event_to_plugin_event_started() {
961        let event = IpcEvent::Started;
962        let plugin = ipc_event_to_plugin_event(event);
963        assert!(matches!(plugin, PluginEvent::Started));
964    }
965
966    #[test]
967    fn ipc_event_to_plugin_event_stopped() {
968        let event = IpcEvent::Stopped {
969            reason: "cancelled".into(),
970        };
971        let plugin = ipc_event_to_plugin_event(event);
972        match plugin {
973            PluginEvent::Stopped { reason } => assert_eq!(reason, "cancelled"),
974            other => panic!("Expected Stopped, got {other:?}"),
975        }
976    }
977
978    #[test]
979    fn ipc_event_to_plugin_event_error() {
980        let event = IpcEvent::Error {
981            message: "init failed".into(),
982        };
983        let plugin = ipc_event_to_plugin_event(event);
984        match plugin {
985            PluginEvent::Error { message } => assert_eq!(message, "init failed"),
986            other => panic!("Expected Error, got {other:?}"),
987        }
988    }
989
990    // -- Additional: Full lifecycle ---------------------------------------------
991
992    #[tokio::test]
993    async fn ipc_client_full_lifecycle() {
994        let (path, shutdown, _event_tx) = setup_server();
995        let mut client = IpcClient::connect(path).await.unwrap();
996
997        assert!(!client.is_running().await.unwrap());
998        client.start(StartConfig::default()).await.unwrap();
999        assert!(client.is_running().await.unwrap());
1000        client.stop().await.unwrap();
1001        assert!(!client.is_running().await.unwrap());
1002
1003        shutdown.cancel();
1004    }
1005
1006    // -- Additional: listen_events spawns and converts events -------------------
1007
1008    #[tokio::test]
1009    async fn ipc_client_listen_events() {
1010        let (path, shutdown, event_tx) =
1011            setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
1012        let app = tauri::test::mock_app();
1013
1014        let received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1015        let received_clone = received.clone();
1016        app.listen("background-service://event", move |_event| {
1017            received_clone.store(true, Ordering::SeqCst);
1018        });
1019
1020        let mut client = IpcClient::connect(path).await.unwrap();
1021        client.start(StartConfig::default()).await.unwrap();
1022        client.listen_events(app.handle().clone());
1023
1024        // Simulate relay broadcasting Started
1025        let _ = event_tx.send(IpcEvent::Started);
1026
1027        tokio::time::timeout(Duration::from_millis(500), async {
1028            while !received.load(Ordering::SeqCst) {
1029                tokio::time::sleep(Duration::from_millis(10)).await;
1030            }
1031        })
1032        .await
1033        .expect("timed out waiting for event via listen_events");
1034
1035        assert!(
1036            received.load(Ordering::SeqCst),
1037            "should have received event"
1038        );
1039        shutdown.cancel();
1040    }
1041
1042    // ═══════════════════════════════════════════════════════════════════════
1043    //  IPC LOOPBACK TESTS (Step 20 — AC2, AC3, AC4)
1044    // ═══════════════════════════════════════════════════════════════════════
1045
1046    // -- AC2: IPC loopback full lifecycle with event verification ---------------
1047
1048    /// Comprehensive IPC loopback: IpcServer + IpcClient in the same process.
1049    /// Exercises start → Started event → running → stop → Stopped event → stopped.
1050    ///
1051    /// Note: IpcEvent frames must be read BEFORE other requests because
1052    /// `send_and_read` skips event frames looking for IpcResponse.
1053    #[tokio::test]
1054    async fn ipc_loopback_full_lifecycle_with_events() {
1055        let (path, shutdown, event_tx) = setup_server();
1056        let mut client = IpcClient::connect(path).await.unwrap();
1057
1058        // Initially not running
1059        assert!(
1060            !client.is_running().await.unwrap(),
1061            "should not be running initially"
1062        );
1063
1064        // Start the service
1065        client
1066            .start(StartConfig::default())
1067            .await
1068            .expect("start should succeed");
1069
1070        // Simulate relay broadcasting Started
1071        let _ = event_tx.send(IpcEvent::Started);
1072
1073        // Read the Started event BEFORE any other request
1074        // (send_and_read on subsequent calls would skip buffered events)
1075        let started = tokio::time::timeout(Duration::from_millis(500), client.read_event())
1076            .await
1077            .expect("timed out waiting for Started event")
1078            .expect("read_event failed")
1079            .expect("should receive event");
1080        assert!(
1081            matches!(started, IpcEvent::Started),
1082            "Expected Started event, got {started:?}"
1083        );
1084
1085        // Verify running (after consuming the event)
1086        assert!(
1087            client.is_running().await.unwrap(),
1088            "should be running after start"
1089        );
1090
1091        // Stop the service
1092        client.stop().await.expect("stop should succeed");
1093
1094        // Simulate relay broadcasting Stopped
1095        let _ = event_tx.send(IpcEvent::Stopped {
1096            reason: "cancelled".into(),
1097        });
1098
1099        // Read the Stopped event BEFORE any other request
1100        let stopped = tokio::time::timeout(Duration::from_millis(500), client.read_event())
1101            .await
1102            .expect("timed out waiting for Stopped event")
1103            .expect("read_event failed")
1104            .expect("should receive event");
1105        assert!(
1106            matches!(stopped, IpcEvent::Stopped { .. }),
1107            "Expected Stopped event, got {stopped:?}"
1108        );
1109
1110        // Verify not running
1111        assert!(
1112            !client.is_running().await.unwrap(),
1113            "should not be running after stop"
1114        );
1115
1116        shutdown.cancel();
1117    }
1118
1119    // -- AC3: Event streaming converts IpcEvent to PluginEvent -------------------
1120
1121    /// Verify events streamed through IPC are correctly converted to PluginEvent.
1122    #[tokio::test]
1123    async fn ipc_loopback_event_streaming_plugin_event_conversion() {
1124        let (path, shutdown, event_tx) = setup_server();
1125        let mut client = IpcClient::connect(path).await.unwrap();
1126
1127        // Start — simulate relay broadcasting Started
1128        client.start(StartConfig::default()).await.unwrap();
1129        let _ = event_tx.send(IpcEvent::Started);
1130        let started_ipc = tokio::time::timeout(Duration::from_millis(500), client.read_event())
1131            .await
1132            .expect("timed out")
1133            .expect("read_event failed")
1134            .expect("should receive event");
1135        let started_plugin = ipc_event_to_plugin_event(started_ipc);
1136        assert!(
1137            matches!(started_plugin, PluginEvent::Started),
1138            "Expected PluginEvent::Started, got {started_plugin:?}"
1139        );
1140
1141        // Stop — simulate relay broadcasting Stopped
1142        client.stop().await.unwrap();
1143        let _ = event_tx.send(IpcEvent::Stopped {
1144            reason: "cancelled".into(),
1145        });
1146        let stopped_ipc = tokio::time::timeout(Duration::from_millis(500), client.read_event())
1147            .await
1148            .expect("timed out")
1149            .expect("read_event failed")
1150            .expect("should receive event");
1151        let stopped_plugin = ipc_event_to_plugin_event(stopped_ipc);
1152        match stopped_plugin {
1153            PluginEvent::Stopped { reason } => {
1154                assert_eq!(reason, "cancelled", "Expected 'cancelled' reason");
1155            }
1156            other => panic!("Expected PluginEvent::Stopped, got {other:?}"),
1157        }
1158
1159        shutdown.cancel();
1160    }
1161
1162    // -- AC4: Error handling — connection drop detected by client ---------------
1163
1164    /// Verify client detects a dropped connection gracefully (no panic).
1165    /// Simulates the server side closing the socket mid-connection.
1166    #[tokio::test]
1167    async fn ipc_loopback_connection_drop_returns_error() {
1168        let path = crate::desktop::test_helpers::unique_socket_path();
1169
1170        // Create a minimal "server" that accepts one connection then drops it.
1171        let listener = transport::bind(path.clone()).unwrap();
1172        let path_clone = path.clone();
1173
1174        let client_handle =
1175            tokio::spawn(async move { IpcClient::connect(path_clone).await.unwrap() });
1176
1177        // Accept the connection and immediately drop the server-side stream.
1178        let (server_stream, _) = listener.accept().await.unwrap();
1179        drop(server_stream);
1180        tokio::time::sleep(Duration::from_millis(20)).await;
1181
1182        let mut client = client_handle.await.unwrap();
1183
1184        // Client should detect the closed connection on next operation.
1185        let result = client.is_running().await;
1186        assert!(
1187            result.is_err(),
1188            "should get error after server drops connection"
1189        );
1190
1191        let _ = std::fs::remove_file(&path);
1192    }
1193
1194    // -- AC4: Error handling — double start returns error through IPC ------------
1195
1196    /// Verify second start (when already running) returns an IPC error.
1197    #[tokio::test]
1198    async fn ipc_loopback_double_start_returns_error() {
1199        let (path, shutdown, _event_tx) = setup_server();
1200        let mut client = IpcClient::connect(path).await.unwrap();
1201
1202        client.start(StartConfig::default()).await.unwrap();
1203
1204        let result = client.start(StartConfig::default()).await;
1205        assert!(result.is_err(), "double start should return error");
1206        let err_msg = result.unwrap_err().to_string();
1207        assert!(
1208            err_msg.to_lowercase().contains("already"),
1209            "Error should mention 'already': {err_msg}"
1210        );
1211
1212        shutdown.cancel();
1213    }
1214
1215    // ═══════════════════════════════════════════════════════════════════════
1216    //  PERSISTENT IPC CLIENT TESTS (Step 12)
1217    // ═══════════════════════════════════════════════════════════════════════
1218
1219    // -- AC1: Persistent client connects and maintains connection --
1220
1221    /// Verify the persistent client connects to a running server and can
1222    /// forward commands through the persistent connection.
1223    #[tokio::test]
1224    async fn persistent_client_connects() {
1225        let (path, shutdown, _event_tx) = setup_server();
1226        let app = tauri::test::mock_app();
1227
1228        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1229
1230        // Give the background task time to connect.
1231        tokio::time::sleep(Duration::from_millis(100)).await;
1232
1233        // Send a command through the persistent connection.
1234        let running = handle.is_running().await;
1235        assert!(
1236            running.is_ok(),
1237            "should get response via persistent connection: {:?}",
1238            running.err()
1239        );
1240        assert!(!running.unwrap(), "should not be running initially");
1241
1242        shutdown.cancel();
1243    }
1244
1245    // -- AC3: Auto-reconnect --
1246
1247    /// Verify the persistent client reconnects after the server restarts.
1248    #[tokio::test]
1249    async fn persistent_client_reconnects() {
1250        use crate::desktop::ipc_server::IpcServer;
1251        use crate::manager::{manager_loop, ServiceFactory};
1252        use tokio_util::sync::CancellationToken;
1253
1254        // First server
1255        let (path, shutdown1, _event_tx) = setup_server();
1256        let app = tauri::test::mock_app();
1257
1258        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1259
1260        // Verify connected to first server.
1261        tokio::time::sleep(Duration::from_millis(100)).await;
1262        let result = handle.is_running().await;
1263        assert!(
1264            result.is_ok(),
1265            "should connect to first server: {:?}",
1266            result.err()
1267        );
1268
1269        // Kill first server and wait for socket cleanup.
1270        shutdown1.cancel();
1271        tokio::time::sleep(Duration::from_millis(150)).await;
1272
1273        // Start second server at the same path.
1274        let (cmd_tx2, cmd_rx2) = tokio::sync::mpsc::channel(16);
1275        let factory: ServiceFactory<tauri::test::MockRuntime> =
1276            Box::new(|| Box::new(BlockingService));
1277        tokio::spawn(manager_loop(
1278            cmd_rx2, factory, 0.0, 0.0, 0.0, 0.0, false, false, None,
1279        ));
1280        let server2 = IpcServer::bind(path.clone(), cmd_tx2, app.handle().clone()).unwrap();
1281        let shutdown2 = CancellationToken::new();
1282        let s2 = shutdown2.clone();
1283        tokio::spawn(async move { server2.run(s2).await });
1284
1285        // Wait for the client to reconnect (1s reconnect delay + margin).
1286        let reconnected = tokio::time::timeout(Duration::from_secs(3), async {
1287            loop {
1288                tokio::time::sleep(Duration::from_millis(200)).await;
1289                if handle.is_running().await.is_ok() {
1290                    break;
1291                }
1292            }
1293        })
1294        .await;
1295        assert!(
1296            reconnected.is_ok(),
1297            "persistent client should reconnect to second server"
1298        );
1299
1300        shutdown2.cancel();
1301    }
1302
1303    // -- AC2: Event relay via app.emit() --
1304
1305    /// Verify events from the server are relayed to `app.emit()` by the
1306    /// persistent client's background reader task.
1307    #[tokio::test]
1308    async fn event_relay() {
1309        let (path, shutdown, event_tx) =
1310            setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
1311        let app = tauri::test::mock_app();
1312
1313        let received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1314        let received_clone = received.clone();
1315        app.listen("background-service://event", move |_event| {
1316            received_clone.store(true, Ordering::SeqCst);
1317        });
1318
1319        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1320
1321        // Start the service — the reader task should relay the Started event.
1322        let result = handle.start(StartConfig::default()).await;
1323        assert!(result.is_ok(), "start should succeed: {:?}", result.err());
1324
1325        // Simulate relay broadcasting Started
1326        let _ = event_tx.send(IpcEvent::Started);
1327
1328        // Wait for the event to be relayed via app.emit().
1329        tokio::time::timeout(Duration::from_millis(500), async {
1330            while !received.load(Ordering::SeqCst) {
1331                tokio::time::sleep(Duration::from_millis(10)).await;
1332            }
1333        })
1334        .await
1335        .expect("timed out waiting for event relay via app.emit()");
1336
1337        assert!(
1338            received.load(Ordering::SeqCst),
1339            "event should be relayed through app.emit()"
1340        );
1341
1342        shutdown.cancel();
1343    }
1344
1345    // -- AC4: Start/Stop lifecycle through persistent client --
1346
1347    /// Verify the full start → running → stop → not-running lifecycle works
1348    /// through the persistent IPC client.
1349    #[tokio::test]
1350    async fn start_stop_lifecycle() {
1351        let (path, shutdown, _event_tx) = setup_server();
1352        let app = tauri::test::mock_app();
1353
1354        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1355
1356        // Initially not running.
1357        let running = handle.is_running().await.unwrap();
1358        assert!(!running, "should not be running initially");
1359
1360        // Start.
1361        handle
1362            .start(StartConfig::default())
1363            .await
1364            .expect("start should succeed");
1365        let running = handle.is_running().await.unwrap();
1366        assert!(running, "should be running after start");
1367
1368        // Stop.
1369        handle.stop().await.expect("stop should succeed");
1370        let running = handle.is_running().await.unwrap();
1371        assert!(!running, "should not be running after stop");
1372
1373        shutdown.cancel();
1374    }
1375
1376    // -- GetState through persistent client --
1377
1378    #[tokio::test]
1379    async fn persistent_client_get_state() {
1380        let (path, shutdown, _event_tx) = setup_server();
1381        let app = tauri::test::mock_app();
1382
1383        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1384
1385        // Give the background task time to connect.
1386        tokio::time::sleep(Duration::from_millis(100)).await;
1387
1388        let status = handle.get_state().await.unwrap();
1389        assert!(
1390            matches!(status.state, crate::models::ServiceState::Idle),
1391            "expected Idle, got {:?}",
1392            status.state
1393        );
1394
1395        handle.start(StartConfig::default()).await.unwrap();
1396
1397        // Poll until Running — race between Start reply (Initializing) and
1398        // spawned task transition to Running.
1399        let status = tokio::time::timeout(Duration::from_secs(2), async {
1400            loop {
1401                let s = handle.get_state().await.unwrap();
1402                if matches!(s.state, crate::models::ServiceState::Running) {
1403                    return s;
1404                }
1405                tokio::time::sleep(Duration::from_millis(10)).await;
1406            }
1407        })
1408        .await
1409        .expect("timed out waiting for Running state");
1410        assert!(
1411            matches!(status.state, crate::models::ServiceState::Running),
1412            "expected Running, got {:?}",
1413            status.state
1414        );
1415
1416        shutdown.cancel();
1417    }
1418
1419    // -- Fix: Timeout prevents permanent hang on unresponsive server --
1420
1421    /// Verify the persistent client returns an error (not hang) when the
1422    /// server accepts a connection but never responds to a command.
1423    ///
1424    /// This is a regression test for the critical bug where `wait_for_response`
1425    /// had no timeout — a dropped connection during command processing caused
1426    /// both the reconnect loop and the caller to hang permanently.
1427    #[tokio::test]
1428    async fn persistent_client_timeout_on_unresponsive_server() {
1429        let path = crate::desktop::test_helpers::unique_socket_path();
1430        let listener = transport::bind(path.clone()).unwrap();
1431
1432        // Server that accepts the connection but never responds.
1433        let server_handle = tokio::spawn(async move {
1434            let (_stream, _) = listener.accept().await.unwrap();
1435            // Hold connection open — never send a response.
1436            tokio::time::sleep(Duration::from_secs(60)).await;
1437        });
1438
1439        let app = tauri::test::mock_app();
1440        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1441
1442        // Give the background task time to connect.
1443        tokio::time::sleep(Duration::from_millis(100)).await;
1444
1445        // Start should timeout and return an error, not hang forever.
1446        let result = tokio::time::timeout(
1447            Duration::from_secs(15),
1448            handle.start(StartConfig::default()),
1449        )
1450        .await;
1451
1452        assert!(
1453            result.is_ok(),
1454            "start should not hang — expected error, got outer timeout"
1455        );
1456        let inner = result.unwrap();
1457        assert!(
1458            inner.is_err(),
1459            "start should return error when server is unresponsive"
1460        );
1461
1462        server_handle.abort();
1463        let _ = std::fs::remove_file(&path);
1464    }
1465
1466    // -- C1: Persistent client terminates on handle drop --
1467
1468    /// Verify that dropping `PersistentIpcClientHandle` causes the background
1469    /// reconnection task to stop (via `CancellationToken`), preventing resource
1470    /// leaks where the task reconnects forever after the handle is dropped.
1471    #[tokio::test]
1472    async fn persistent_client_terminates_on_handle_drop() {
1473        let (path, shutdown, _event_tx) = setup_server();
1474        let app = tauri::test::mock_app();
1475
1476        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1477
1478        // Give the background task time to connect.
1479        tokio::time::sleep(Duration::from_millis(100)).await;
1480
1481        // Drop the handle — this should cancel the shutdown token.
1482        drop(handle);
1483
1484        // The background task should terminate within a bounded time.
1485        // We can't observe the JoinHandle directly (it's fire-and-forget),
1486        // but we can verify the socket isn't being reconnected to by checking
1487        // that server shutdown succeeds cleanly.
1488        tokio::time::sleep(Duration::from_secs(2)).await;
1489
1490        shutdown.cancel();
1491    }
1492
1493    // ═══════════════════════════════════════════════════════════════════════
1494    //  BUFFERED EVENTS TESTS (Step 4)
1495    // ═══════════════════════════════════════════════════════════════════════
1496
1497    /// Helper: create a raw server that sends specific frames in response to
1498    /// any request, giving full control over the event/response interleaving.
1499    async fn buffered_server(
1500        path: &std::path::Path,
1501        frames: Vec<IpcMessage>,
1502    ) -> tokio::task::JoinHandle<()> {
1503        let listener = transport::bind(path.to_path_buf()).unwrap();
1504        tokio::spawn(async move {
1505            let (mut stream, _) = listener.accept().await.unwrap();
1506            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1507            // Read and discard the incoming request.
1508            let mut len_buf = [0u8; 4];
1509            if stream.read_exact(&mut len_buf).await.is_err() {
1510                return;
1511            }
1512            let len = u32::from_be_bytes(len_buf) as usize;
1513            let mut payload = vec![0u8; len];
1514            if stream.read_exact(&mut payload).await.is_err() {
1515                return;
1516            }
1517            // Send the pre-programmed frames in order.
1518            for msg in &frames {
1519                let frame = crate::desktop::ipc::encode_frame(msg).unwrap();
1520                if stream.write_all(&frame).await.is_err() {
1521                    return;
1522                }
1523            }
1524        })
1525    }
1526
1527    /// send_and_read returns response with empty event list when no events interleave.
1528    #[tokio::test]
1529    async fn send_and_read_no_interleaved_events() {
1530        let path = crate::desktop::test_helpers::unique_socket_path();
1531        let server = buffered_server(
1532            &path,
1533            vec![IpcMessage::Response(IpcResponse {
1534                ok: true,
1535                data: None,
1536                error: None,
1537            })],
1538        )
1539        .await;
1540
1541        let mut client = IpcClient::connect(path.clone()).await.unwrap();
1542        let (response, events) = client.send_and_read(&IpcRequest::IsRunning).await.unwrap();
1543        assert!(response.ok, "response should be ok");
1544        assert!(
1545            events.is_empty(),
1546            "events should be empty when no events interleave, got {:?}",
1547            events
1548        );
1549
1550        server.await.unwrap();
1551        let _ = std::fs::remove_file(&path);
1552    }
1553
1554    /// send_and_read collects a single interleaved event alongside the response.
1555    #[tokio::test]
1556    async fn send_and_read_single_interleaved_event() {
1557        let path = crate::desktop::test_helpers::unique_socket_path();
1558        let server = buffered_server(
1559            &path,
1560            vec![
1561                IpcMessage::Event(IpcEvent::Started),
1562                IpcMessage::Response(IpcResponse {
1563                    ok: true,
1564                    data: None,
1565                    error: None,
1566                }),
1567            ],
1568        )
1569        .await;
1570
1571        let mut client = IpcClient::connect(path.clone()).await.unwrap();
1572        let (response, events) = client
1573            .send_and_read(&IpcRequest::Start {
1574                config: StartConfig::default(),
1575            })
1576            .await
1577            .unwrap();
1578        assert!(response.ok, "response should be ok");
1579        assert_eq!(events.len(), 1, "should collect exactly one event");
1580        assert!(
1581            matches!(events[0], IpcEvent::Started),
1582            "expected Started event, got {:?}",
1583            events[0]
1584        );
1585
1586        server.await.unwrap();
1587        let _ = std::fs::remove_file(&path);
1588    }
1589
1590    // ═══════════════════════════════════════════════════════════════════════
1591    //  IS_CONNECTED TESTS (Step 5)
1592    // ═══════════════════════════════════════════════════════════════════════
1593
1594    /// is_connected() returns false before the background task has connected
1595    /// to any server.
1596    #[tokio::test]
1597    async fn is_connected_false_before_server() {
1598        let app = tauri::test::mock_app();
1599        let path = crate::desktop::test_helpers::unique_socket_path();
1600        // No server running — spawn handle pointing at a nonexistent socket.
1601        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1602        // The background task may or may not have attempted a connection yet,
1603        // but it should definitely NOT be connected.
1604        tokio::time::sleep(Duration::from_millis(50)).await;
1605        assert!(
1606            !handle.is_connected(),
1607            "should not be connected when no server is running"
1608        );
1609        let _ = std::fs::remove_file(&path);
1610    }
1611
1612    /// is_connected() returns true once the persistent client has established
1613    /// a connection to a running server.
1614    #[tokio::test]
1615    async fn is_connected_true_after_connect() {
1616        let (path, shutdown, _event_tx) = setup_server();
1617        let app = tauri::test::mock_app();
1618        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1619
1620        // Wait for the background task to connect.
1621        tokio::time::timeout(Duration::from_secs(2), async {
1622            while !handle.is_connected() {
1623                tokio::time::sleep(Duration::from_millis(50)).await;
1624            }
1625        })
1626        .await
1627        .expect("timed out waiting for is_connected to become true");
1628
1629        assert!(
1630            handle.is_connected(),
1631            "should be connected after server is up"
1632        );
1633
1634        shutdown.cancel();
1635    }
1636
1637    /// is_connected() returns false after the server shuts down and the
1638    /// persistent client detects the disconnection.
1639    ///
1640    /// Uses a minimal server that accepts one connection then explicitly drops
1641    /// it, guaranteeing the reader task exits and sets connected = false.
1642    #[tokio::test]
1643    async fn is_connected_false_after_server_shutdown() {
1644        let path = crate::desktop::test_helpers::unique_socket_path();
1645        let path_clone = path.clone();
1646        let listener = transport::bind(path.clone()).unwrap();
1647
1648        // Server that accepts a connection, waits briefly, then drops
1649        // everything (stream + listener), preventing reconnection.
1650        let server_handle = tokio::spawn(async move {
1651            let (stream, _) = listener.accept().await.unwrap();
1652            // Hold the connection briefly so the client can connect.
1653            tokio::time::sleep(Duration::from_millis(200)).await;
1654            // Drop the stream — reader will detect EOF.
1655            drop(stream);
1656            // Drop the listener (moved into this closure) and clean up socket.
1657            let _ = std::fs::remove_file(&path_clone);
1658        });
1659
1660        let app = tauri::test::mock_app();
1661        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1662
1663        // Wait for connection.
1664        tokio::time::timeout(Duration::from_secs(2), async {
1665            while !handle.is_connected() {
1666                tokio::time::sleep(Duration::from_millis(50)).await;
1667            }
1668        })
1669        .await
1670        .expect("timed out waiting for initial connection");
1671
1672        assert!(handle.is_connected(), "should be connected initially");
1673
1674        // Wait for the server to drop the connection and listener.
1675        tokio::time::timeout(Duration::from_secs(3), async {
1676            while handle.is_connected() {
1677                tokio::time::sleep(Duration::from_millis(50)).await;
1678            }
1679        })
1680        .await
1681        .expect("timed out waiting for is_connected to become false");
1682
1683        assert!(
1684            !handle.is_connected(),
1685            "should not be connected after server shutdown"
1686        );
1687
1688        server_handle.abort();
1689        let _ = std::fs::remove_file(&path);
1690    }
1691
1692    // ═══════════════════════════════════════════════════════════════════════
1693    //  BACKOFF BEHAVIOR TESTS (Step 6c)
1694    // ═══════════════════════════════════════════════════════════════════════
1695
1696    /// Verify the ExponentialBuilder config used in persistent_client_loop
1697    /// produces increasing delays, respects the 30s max, and exhausts after
1698    /// exactly 10 attempts.
1699    #[test]
1700    fn backoff_builder_produces_increasing_delays() {
1701        use backon::BackoffBuilder;
1702
1703        let builder = backon::ExponentialBuilder::default()
1704            .with_min_delay(Duration::from_secs(1))
1705            .with_max_delay(Duration::from_secs(30))
1706            .with_max_times(10)
1707            .with_jitter();
1708
1709        let mut attempts = builder.build();
1710        let mut delays = Vec::new();
1711        while let Some(d) = attempts.next() {
1712            delays.push(d);
1713        }
1714
1715        assert_eq!(delays.len(), 10, "should produce exactly 10 delays");
1716
1717        // First delay ≈ 1s (with jitter, allow 0.5–2s).
1718        assert!(
1719            delays[0] >= Duration::from_millis(500),
1720            "first delay too short: {:?}",
1721            delays[0]
1722        );
1723        assert!(
1724            delays[0] <= Duration::from_secs(2),
1725            "first delay too long: {:?}",
1726            delays[0]
1727        );
1728
1729        // Last delay should be at or near the 30s cap.
1730        assert!(
1731            delays[9] >= Duration::from_secs(15),
1732            "last delay should approach max: {:?}",
1733            delays[9]
1734        );
1735
1736        // All delays capped — with jitter, allow up to 2× max_delay.
1737        for d in &delays {
1738            assert!(
1739                *d <= Duration::from_secs(60),
1740                "delay exceeds max_delay + jitter margin: {:?}",
1741                d
1742            );
1743        }
1744
1745        // Iterator is exhausted after 10 attempts.
1746        assert!(
1747            attempts.next().is_none(),
1748            "should return None after 10 attempts"
1749        );
1750    }
1751
1752    /// Verify the persistent client stops retrying after exhausting its backoff
1753    /// budget and that subsequent commands fail with "shut down" (channel closed).
1754    ///
1755    /// With min_delay=1s, max_delay=30s, max_times=10, total retry time ≈ 152s.
1756    /// Marked `#[ignore]` to avoid slowing down normal test runs.
1757    /// Run with `cargo test -- --ignored`.
1758    #[ignore]
1759    #[tokio::test]
1760    async fn persistent_client_exits_after_max_retries() {
1761        let app = tauri::test::mock_app();
1762        let path = crate::desktop::test_helpers::unique_socket_path();
1763        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1764
1765        // Wait for the background task to exhaust retries.
1766        // Poll is_running() — once the loop exits, the command channel closes
1767        // and we get "shut down" instead of a timeout.
1768        let exited = tokio::time::timeout(Duration::from_secs(180), async {
1769            loop {
1770                tokio::time::sleep(Duration::from_secs(5)).await;
1771                if let Err(e) = handle.is_running().await {
1772                    if e.to_string().contains("shut down") {
1773                        return;
1774                    }
1775                }
1776            }
1777        })
1778        .await;
1779
1780        assert!(
1781            exited.is_ok(),
1782            "persistent client should exit after max retries"
1783        );
1784        assert!(!handle.is_connected(), "should not be connected after exit");
1785
1786        let _ = std::fs::remove_file(&path);
1787    }
1788
1789    /// Verify the persistent client reconnects after a server restart and that
1790    /// the backoff resets (reconnection starts from ~1s min_delay, not an
1791    /// accumulated delay).
1792    #[tokio::test]
1793    async fn persistent_client_reconnects_after_server_restart() {
1794        use crate::desktop::ipc_server::IpcServer;
1795        use crate::manager::{manager_loop, ServiceFactory};
1796        use tokio_util::sync::CancellationToken;
1797
1798        // Start first server.
1799        let (path, shutdown1, _event_tx) = setup_server();
1800        let app = tauri::test::mock_app();
1801        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1802
1803        // Wait for connection to first server.
1804        tokio::time::timeout(Duration::from_secs(2), async {
1805            while !handle.is_connected() {
1806                tokio::time::sleep(Duration::from_millis(50)).await;
1807            }
1808        })
1809        .await
1810        .expect("should connect to first server");
1811
1812        // Verify commands work through first connection.
1813        let result = handle.is_running().await;
1814        assert!(
1815            result.is_ok(),
1816            "command should succeed on first server: {:?}",
1817            result.err()
1818        );
1819
1820        // Kill first server.
1821        shutdown1.cancel();
1822        tokio::time::sleep(Duration::from_millis(150)).await;
1823
1824        // Start second server at the same path.
1825        let (cmd_tx2, cmd_rx2) = tokio::sync::mpsc::channel(16);
1826        let factory: ServiceFactory<tauri::test::MockRuntime> =
1827            Box::new(|| Box::new(BlockingService));
1828        tokio::spawn(manager_loop(
1829            cmd_rx2, factory, 0.0, 0.0, 0.0, 0.0, false, false, None,
1830        ));
1831        let server2 = IpcServer::bind(path.clone(), cmd_tx2, app.handle().clone()).unwrap();
1832        let shutdown2 = CancellationToken::new();
1833        let s2 = shutdown2.clone();
1834        tokio::spawn(async move { server2.run(s2).await });
1835
1836        // Client should reconnect within ~1s (backoff resets to min_delay after
1837        // a successful session, so the first retry is ~1s, not accumulated).
1838        let reconnected = tokio::time::timeout(Duration::from_secs(3), async {
1839            loop {
1840                if handle.is_connected() {
1841                    break;
1842                }
1843                tokio::time::sleep(Duration::from_millis(100)).await;
1844            }
1845        })
1846        .await;
1847
1848        assert!(
1849            reconnected.is_ok(),
1850            "persistent client should reconnect after server restart (backoff resets)"
1851        );
1852
1853        // Verify commands work through the new connection.
1854        let result = handle.is_running().await;
1855        assert!(
1856            result.is_ok(),
1857            "commands should work after reconnection: {:?}",
1858            result.err()
1859        );
1860
1861        shutdown2.cancel();
1862    }
1863
1864    // ═══════════════════════════════════════════════════════════════════════
1865    //  C3: ZERO-LENGTH FRAME REJECTION TESTS
1866    // ═══════════════════════════════════════════════════════════════════════
1867
1868    /// Verify that receiving a zero-length frame (\x00\x00\x00\x00) from the
1869    /// server produces an error, not `Ok(None)`. Zero-length frames are
1870    /// protocol violations and must be rejected explicitly.
1871    #[tokio::test]
1872    async fn ipc_client_rejects_zero_length_frame() {
1873        let path = crate::desktop::test_helpers::unique_socket_path();
1874        let listener = transport::bind(path.clone()).unwrap();
1875
1876        // Server that sends a zero-length frame immediately after accepting.
1877        let server_handle = tokio::spawn(async move {
1878            let (mut stream, _) = listener.accept().await.unwrap();
1879            use tokio::io::AsyncWriteExt;
1880            stream.write_all(&[0u8; 4]).await.unwrap();
1881            tokio::time::sleep(Duration::from_millis(500)).await;
1882        });
1883
1884        let mut client = IpcClient::connect(path.clone()).await.unwrap();
1885
1886        // Reading a frame should return an error, not Ok(None)
1887        let result = client.read_frame().await;
1888        assert!(
1889            result.is_err(),
1890            "zero-length frame should return error, got {:?}",
1891            result
1892        );
1893        let err = result.unwrap_err().to_string();
1894        assert!(
1895            err.contains("zero-length frame"),
1896            "Error should mention 'zero-length frame': {err}"
1897        );
1898
1899        server_handle.abort();
1900        let _ = std::fs::remove_file(&path);
1901    }
1902
1903    /// Verify that an actual EOF (connection drop) still returns `Ok(None)`.
1904    /// This is the "clean close" case — distinct from a zero-length frame.
1905    #[tokio::test]
1906    async fn ipc_client_eof_returns_ok_none() {
1907        let path = crate::desktop::test_helpers::unique_socket_path();
1908        let listener = transport::bind(path.clone()).unwrap();
1909
1910        // Server that accepts a connection then immediately drops it.
1911        let server_handle = tokio::spawn(async move {
1912            let (stream, _) = listener.accept().await.unwrap();
1913            drop(stream);
1914        });
1915
1916        let mut client = IpcClient::connect(path.clone()).await.unwrap();
1917        tokio::time::sleep(Duration::from_millis(20)).await;
1918
1919        // Reading a frame should return Ok(None) for clean EOF
1920        let result = client.read_frame().await;
1921        assert!(result.is_ok(), "EOF should return Ok, got {:?}", result);
1922        assert!(result.unwrap().is_none(), "EOF should return Ok(None)");
1923
1924        server_handle.abort();
1925        let _ = std::fs::remove_file(&path);
1926    }
1927
1928    // ═══════════════════════════════════════════════════════════════════════
1929    //  WAIT_FOR_CONNECTED TESTS (Step 12)
1930    // ═══════════════════════════════════════════════════════════════════════
1931
1932    /// wait_for_connected returns Ok immediately when already connected.
1933    #[tokio::test]
1934    async fn wait_for_connected_returns_immediately_when_connected() {
1935        let (path, shutdown, _event_tx) = setup_server();
1936        let app = tauri::test::mock_app();
1937        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1938
1939        // Wait for initial connection.
1940        tokio::time::timeout(Duration::from_secs(2), async {
1941            while !handle.is_connected() {
1942                tokio::time::sleep(Duration::from_millis(50)).await;
1943            }
1944        })
1945        .await
1946        .expect("should connect");
1947
1948        // Already connected → should return Ok immediately.
1949        let result = handle
1950            .wait_for_connected(Duration::from_secs(5))
1951            .await
1952            .unwrap();
1953        assert!(result, "should return true when connected");
1954
1955        shutdown.cancel();
1956    }
1957
1958    /// wait_for_connected returns Err(timeout) when no server is running.
1959    #[tokio::test]
1960    async fn wait_for_connected_times_out_when_no_server() {
1961        let app = tauri::test::mock_app();
1962        let path = crate::desktop::test_helpers::unique_socket_path();
1963        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1964
1965        // No server → should time out quickly.
1966        let result = handle
1967            .wait_for_connected(Duration::from_millis(200))
1968            .await
1969            .unwrap();
1970        assert!(!result, "should return false when no server and timeout");
1971
1972        let _ = std::fs::remove_file(&path);
1973    }
1974
1975    /// wait_for_connected returns Ok once the server starts and client connects.
1976    #[tokio::test]
1977    async fn wait_for_connected_succeeds_after_server_starts() {
1978        let (path, shutdown, _event_tx) = setup_server();
1979        let app = tauri::test::mock_app();
1980        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1981
1982        // Server is running — wait_for_connected should succeed within timeout.
1983        let result = handle
1984            .wait_for_connected(Duration::from_secs(5))
1985            .await
1986            .unwrap();
1987        assert!(result, "should connect within timeout");
1988
1989        shutdown.cancel();
1990    }
1991
1992    /// send_and_read collects multiple consecutive events before the response.
1993    #[tokio::test]
1994    async fn send_and_read_multiple_interleaved_events() {
1995        let path = crate::desktop::test_helpers::unique_socket_path();
1996        let server = buffered_server(
1997            &path,
1998            vec![
1999                IpcMessage::Event(IpcEvent::Started),
2000                IpcMessage::Event(IpcEvent::Error {
2001                    message: "warning".into(),
2002                }),
2003                IpcMessage::Event(IpcEvent::Stopped {
2004                    reason: "cancelled".into(),
2005                }),
2006                IpcMessage::Response(IpcResponse {
2007                    ok: true,
2008                    data: Some(serde_json::json!({"running": false})),
2009                    error: None,
2010                }),
2011            ],
2012        )
2013        .await;
2014
2015        let mut client = IpcClient::connect(path.clone()).await.unwrap();
2016        let (response, events) = client.send_and_read(&IpcRequest::IsRunning).await.unwrap();
2017        assert!(response.ok, "response should be ok");
2018        assert_eq!(events.len(), 3, "should collect all three events");
2019        assert!(
2020            matches!(events[0], IpcEvent::Started),
2021            "first event should be Started"
2022        );
2023        assert!(
2024            matches!(events[1], IpcEvent::Error { .. }),
2025            "second event should be Error"
2026        );
2027        assert!(
2028            matches!(events[2], IpcEvent::Stopped { .. }),
2029            "third event should be Stopped"
2030        );
2031
2032        server.await.unwrap();
2033        let _ = std::fs::remove_file(&path);
2034    }
2035}