Skip to main content

tauri_plugin_background_service/desktop/
ipc_client.rs

1//! Desktop IPC client for the GUI process.
2//!
3//! [`IpcClient`] connects to the headless sidecar's Unix domain socket and
4//! provides methods to start/stop the background service and receive events
5//! over the IPC protocol.
6//!
7//! Only available when the `desktop-service` Cargo feature is enabled.
8
9use std::path::PathBuf;
10use std::sync::atomic::AtomicBool;
11use std::sync::Arc;
12use std::time::Duration;
13
14use tauri::{Emitter, Runtime};
15
16use crate::desktop::ipc::{
17    decode_frame, encode_frame, IpcEvent, IpcMessage, IpcRequest, IpcResponse,
18};
19use crate::desktop::transport::{self, TransportReadHalf, TransportStream, TransportWriteHalf};
20use crate::error::ServiceError;
21#[cfg(test)]
22use crate::models::StopReason;
23use crate::models::{PluginEvent, ServiceStatus, StartConfig};
24
25/// IPC client for communicating with the headless sidecar service.
26///
27/// Connects to the sidecar's Unix domain socket and translates method calls
28/// into [`IpcRequest`] messages. Responses are decoded from [`IpcResponse`]
29/// frames.
30///
31/// Events from the sidecar (started/stopped/error) are read as [`IpcEvent`]
32/// frames and converted to [`PluginEvent`] for emission via the Tauri event
33/// system.
34pub struct IpcClient {
35    stream: TransportStream,
36}
37
38impl IpcClient {
39    /// Connect to the sidecar's IPC socket at the given path.
40    pub async fn connect(path: PathBuf) -> Result<Self, ServiceError> {
41        let stream = transport::connect(&path).await?;
42        Ok(Self { stream })
43    }
44
45    /// Send a Start command to the sidecar.
46    pub async fn start(&mut self, config: StartConfig) -> Result<(), ServiceError> {
47        let request = IpcRequest::Start { config };
48        let (response, _events) = self.send_and_read(&request).await?;
49        if response.ok {
50            Ok(())
51        } else {
52            Err(ServiceError::Ipc(
53                response.error.unwrap_or_else(|| "unknown error".into()),
54            ))
55        }
56    }
57
58    /// Send a Stop command to the sidecar.
59    pub async fn stop(&mut self) -> Result<(), ServiceError> {
60        let (response, _events) = self.send_and_read(&IpcRequest::Stop).await?;
61        if response.ok {
62            Ok(())
63        } else {
64            Err(ServiceError::Ipc(
65                response.error.unwrap_or_else(|| "unknown error".into()),
66            ))
67        }
68    }
69
70    /// Send an IsRunning query to the sidecar.
71    pub async fn is_running(&mut self) -> Result<bool, ServiceError> {
72        let (response, _events) = self.send_and_read(&IpcRequest::IsRunning).await?;
73        if response.ok {
74            Ok(response
75                .data
76                .and_then(|d| d.get("running").and_then(|v| v.as_bool()))
77                .unwrap_or(false))
78        } else {
79            Err(ServiceError::Ipc(
80                response.error.unwrap_or_else(|| "unknown error".into()),
81            ))
82        }
83    }
84
85    /// Query the current service lifecycle state.
86    pub async fn get_state(&mut self) -> Result<ServiceStatus, ServiceError> {
87        let (response, _events) = self.send_and_read(&IpcRequest::GetState).await?;
88        if response.ok {
89            response
90                .data
91                .ok_or_else(|| ServiceError::Ipc("missing data in GetState response".into()))
92                .and_then(|d| {
93                    serde_json::from_value::<ServiceStatus>(d)
94                        .map_err(|e| ServiceError::Ipc(format!("deserialize GetState: {e}")))
95                })
96        } else {
97            Err(ServiceError::Ipc(
98                response.error.unwrap_or_else(|| "unknown error".into()),
99            ))
100        }
101    }
102
103    /// Read the next [`IpcEvent`] from the socket.
104    ///
105    /// Returns `None` if the connection was closed.
106    pub async fn read_event(&mut self) -> Result<Option<IpcEvent>, ServiceError> {
107        let frame = match self.read_frame().await? {
108            Some(f) => f,
109            None => return Ok(None),
110        };
111        match decode_frame(&frame).map_err(|e| ServiceError::Ipc(format!("decode event: {e}")))? {
112            IpcMessage::Event(event) => Ok(Some(event)),
113            other => Err(ServiceError::Ipc(format!(
114                "expected event frame, got {:?}",
115                std::mem::discriminant(&other),
116            ))),
117        }
118    }
119
120    /// Spawn a background task that reads [`IpcEvent`] frames and emits
121    /// [`PluginEvent`] via the given `AppHandle`.
122    ///
123    /// The task runs until the socket is closed or an error occurs.
124    pub fn listen_events<R: Runtime>(mut self, app: tauri::AppHandle<R>) {
125        tokio::spawn(async move {
126            loop {
127                match self.read_event().await {
128                    Ok(Some(event)) => {
129                        let plugin_event = ipc_event_to_plugin_event(event);
130                        let _ = app.emit("background-service://event", plugin_event);
131                    }
132                    Ok(None) => break,
133                    Err(_) => break,
134                }
135            }
136        });
137    }
138
139    // -- Private helpers -------------------------------------------------------
140
141    async fn send_and_read(
142        &mut self,
143        request: &IpcRequest,
144    ) -> Result<(IpcResponse, Vec<IpcEvent>), ServiceError> {
145        self.send_request(request).await?;
146        // The server interleaves IpcResponse and broadcast IpcEvent frames on
147        // the same socket. Read frames in a loop until we get a Response,
148        // collecting any Event frames encountered along the way.
149        let mut events = Vec::new();
150        loop {
151            let frame = self
152                .read_frame()
153                .await?
154                .ok_or_else(|| ServiceError::Ipc("connection closed".into()))?;
155            match decode_frame(&frame).map_err(|e| ServiceError::Ipc(format!("decode: {e}")))? {
156                IpcMessage::Response(resp) => return Ok((resp, events)),
157                IpcMessage::Event(e) => {
158                    events.push(e);
159                }
160                IpcMessage::Request(_) => {
161                    return Err(ServiceError::Ipc("unexpected request frame".into()));
162                }
163            }
164        }
165    }
166
167    async fn send_request(&mut self, request: &IpcRequest) -> Result<(), ServiceError> {
168        let msg = IpcMessage::Request(request.clone());
169        let frame = encode_frame(&msg).map_err(|e| ServiceError::Ipc(format!("encode: {e}")))?;
170        transport::write_frame(&mut self.stream, &frame)
171            .await
172            .map_err(ServiceError::Ipc)?;
173        Ok(())
174    }
175
176    /// Read a single length-prefixed frame from the socket.
177    ///
178    /// Returns the payload bytes only (no length prefix).
179    /// Returns `None` if the connection was closed cleanly.
180    async fn read_frame(&mut self) -> Result<Option<Vec<u8>>, ServiceError> {
181        transport::read_frame(&mut self.stream)
182            .await
183            .map_err(ServiceError::Ipc)
184    }
185}
186
187/// Convert an [`IpcEvent`] to a [`PluginEvent`].
188pub fn ipc_event_to_plugin_event(event: IpcEvent) -> PluginEvent {
189    match event {
190        IpcEvent::Started => PluginEvent::Started,
191        IpcEvent::Stopped { reason } => PluginEvent::Stopped { reason },
192        IpcEvent::Error { message } => PluginEvent::Error { message },
193    }
194}
195
196// ─── Persistent IPC Client ────────────────────────────────────────────────────
197
198/// Internal command sent from the handle to the background connection task.
199enum IpcCommand {
200    Start {
201        config: StartConfig,
202        reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
203    },
204    Stop {
205        reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
206    },
207    IsRunning {
208        reply: tokio::sync::oneshot::Sender<Result<bool, ServiceError>>,
209    },
210    GetState {
211        reply: tokio::sync::oneshot::Sender<Result<ServiceStatus, ServiceError>>,
212    },
213    EnableAutoRestart {
214        config: Option<StartConfig>,
215        reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
216    },
217    DisableAutoRestart {
218        reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
219    },
220    GetDesiredState {
221        reply: tokio::sync::oneshot::Sender<
222            Result<Option<crate::desired_state::DesiredState>, ServiceError>,
223        >,
224    },
225    ValidateSetup {
226        reply: tokio::sync::oneshot::Sender<
227            Result<crate::models::SetupValidationReport, ServiceError>,
228        >,
229    },
230    GetLifecycleStatus {
231        reply: tokio::sync::oneshot::Sender<Result<crate::models::LifecycleStatus, ServiceError>>,
232    },
233}
234
235/// Handle to a persistent IPC client that maintains a long-lived connection
236/// to the headless sidecar.
237///
238/// The background task automatically:
239/// - Relays [`IpcEvent`] frames to `app.emit("background-service://event", ...)`
240/// - Reconnects on connection failure with exponential backoff (1s–30s, up to 10 retries)
241/// - Forwards commands (start/stop/is_running) over the same connection
242pub struct PersistentIpcClientHandle {
243    cmd_tx: tokio::sync::mpsc::Sender<IpcCommand>,
244    shutdown: tokio_util::sync::CancellationToken,
245    connected: Arc<AtomicBool>,
246    socket_path: PathBuf,
247}
248
249impl Drop for PersistentIpcClientHandle {
250    fn drop(&mut self) {
251        self.shutdown.cancel();
252    }
253}
254
255impl PersistentIpcClientHandle {
256    /// Spawn the persistent IPC client background task.
257    ///
258    /// The task immediately begins trying to connect to the socket at
259    /// `socket_path`. Events are relayed to the Tauri event system via
260    /// `app.emit()`.
261    pub fn spawn<R: Runtime>(socket_path: PathBuf, app: tauri::AppHandle<R>) -> Self {
262        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::channel(16);
263        let shutdown = tokio_util::sync::CancellationToken::new();
264        let connected = Arc::new(AtomicBool::new(false));
265
266        tokio::spawn(persistent_client_loop(
267            socket_path.clone(),
268            app,
269            cmd_rx,
270            shutdown.clone(),
271            connected.clone(),
272        ));
273
274        Self {
275            cmd_tx,
276            shutdown,
277            connected,
278            socket_path,
279        }
280    }
281
282    /// Send a Start command through the persistent connection.
283    pub async fn start(&self, config: StartConfig) -> Result<(), ServiceError> {
284        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
285        self.cmd_tx
286            .send(IpcCommand::Start {
287                config,
288                reply: reply_tx,
289            })
290            .await
291            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
292        reply_rx
293            .await
294            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
295    }
296
297    /// Send a Stop command through the persistent connection.
298    pub async fn stop(&self) -> Result<(), ServiceError> {
299        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
300        self.cmd_tx
301            .send(IpcCommand::Stop { reply: reply_tx })
302            .await
303            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
304        reply_rx
305            .await
306            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
307    }
308
309    /// Query whether the service is running through the persistent connection.
310    pub async fn is_running(&self) -> Result<bool, ServiceError> {
311        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
312        self.cmd_tx
313            .send(IpcCommand::IsRunning { reply: reply_tx })
314            .await
315            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
316        reply_rx
317            .await
318            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
319    }
320
321    /// Query the current service lifecycle state through the persistent connection.
322    pub async fn get_state(&self) -> Result<ServiceStatus, ServiceError> {
323        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
324        self.cmd_tx
325            .send(IpcCommand::GetState { reply: reply_tx })
326            .await
327            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
328        reply_rx
329            .await
330            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
331    }
332
333    /// Returns `true` if the persistent client is currently connected to the
334    /// headless sidecar, `false` otherwise.
335    pub fn is_connected(&self) -> bool {
336        self.connected.load(std::sync::atomic::Ordering::Relaxed)
337    }
338
339    /// Returns the socket path this client is configured to connect to.
340    pub fn socket_path(&self) -> &PathBuf {
341        &self.socket_path
342    }
343
344    /// Wait until the persistent client is connected, polling `is_connected()`
345    /// at 500ms intervals.
346    ///
347    /// Returns `Ok(true)` if connected within the timeout, `Ok(false)` if the
348    /// timeout elapsed without connecting.
349    pub async fn wait_for_connected(&self, timeout: Duration) -> Result<bool, ServiceError> {
350        let deadline = tokio::time::Instant::now() + timeout;
351        let poll_interval = Duration::from_millis(500);
352
353        while tokio::time::Instant::now() < deadline {
354            if self.is_connected() {
355                return Ok(true);
356            }
357            let remaining = deadline - tokio::time::Instant::now();
358            let sleep_dur = poll_interval.min(remaining);
359            tokio::time::sleep(sleep_dur).await;
360        }
361
362        if self.is_connected() {
363            Ok(true)
364        } else {
365            Ok(false)
366        }
367    }
368
369    /// Enable auto-restart through the persistent connection.
370    pub async fn enable_auto_restart(
371        &self,
372        config: Option<StartConfig>,
373    ) -> Result<(), ServiceError> {
374        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
375        self.cmd_tx
376            .send(IpcCommand::EnableAutoRestart {
377                config,
378                reply: reply_tx,
379            })
380            .await
381            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
382        reply_rx
383            .await
384            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
385    }
386
387    /// Disable auto-restart through the persistent connection.
388    pub async fn disable_auto_restart(&self) -> Result<(), ServiceError> {
389        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
390        self.cmd_tx
391            .send(IpcCommand::DisableAutoRestart { reply: reply_tx })
392            .await
393            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
394        reply_rx
395            .await
396            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
397    }
398
399    /// Get the persisted desired-state through the persistent connection.
400    pub async fn get_desired_state(
401        &self,
402    ) -> Result<Option<crate::desired_state::DesiredState>, ServiceError> {
403        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
404        self.cmd_tx
405            .send(IpcCommand::GetDesiredState { reply: reply_tx })
406            .await
407            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
408        reply_rx
409            .await
410            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
411    }
412
413    /// Validate background service setup prerequisites through the persistent connection.
414    pub async fn validate_setup(
415        &self,
416    ) -> Result<crate::models::SetupValidationReport, ServiceError> {
417        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
418        self.cmd_tx
419            .send(IpcCommand::ValidateSetup { reply: reply_tx })
420            .await
421            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
422        reply_rx
423            .await
424            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
425    }
426
427    /// Get the complete lifecycle status snapshot.
428    pub async fn get_lifecycle_status(
429        &self,
430    ) -> Result<crate::models::LifecycleStatus, ServiceError> {
431        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
432        self.cmd_tx
433            .send(IpcCommand::GetLifecycleStatus { reply: reply_tx })
434            .await
435            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
436        reply_rx
437            .await
438            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
439    }
440}
441
442/// Background task: maintain a persistent connection with reconnection.
443async fn persistent_client_loop<R: Runtime>(
444    socket_path: PathBuf,
445    app: tauri::AppHandle<R>,
446    mut cmd_rx: tokio::sync::mpsc::Receiver<IpcCommand>,
447    shutdown: tokio_util::sync::CancellationToken,
448    connected: Arc<AtomicBool>,
449) {
450    use backon::BackoffBuilder;
451
452    let backoff_builder = backon::ExponentialBuilder::default()
453        .with_min_delay(Duration::from_secs(1))
454        .with_max_delay(Duration::from_secs(30))
455        .with_max_times(10)
456        .with_jitter();
457
458    let mut attempts = backoff_builder.build();
459
460    loop {
461        tokio::select! {
462            biased;
463            _ = shutdown.cancelled() => {
464                log::info!("Persistent IPC client shutting down");
465                connected.store(false, std::sync::atomic::Ordering::Relaxed);
466                break;
467            }
468            connect_result = transport::connect(&socket_path) => {
469                match connect_result {
470                    Ok(stream) => {
471                        log::info!("Persistent IPC client connected");
472                        connected.store(true, std::sync::atomic::Ordering::Relaxed);
473                        let result = run_persistent_connection(stream, &app, &mut cmd_rx, &connected).await;
474                        // Reset backoff on successful connect (even if session later failed).
475                        attempts = backoff_builder.build();
476                        if result.is_err() {
477                            log::info!("Persistent IPC connection lost, reconnecting...");
478                            connected.store(false, std::sync::atomic::Ordering::Relaxed);
479                        }
480                    }
481                    Err(_) => {
482                        log::debug!("Persistent IPC client: connection failed, retrying...");
483                        connected.store(false, std::sync::atomic::Ordering::Relaxed);
484                    }
485                }
486                let delay = match attempts.next() {
487                    Some(d) => d,
488                    None => {
489                        log::warn!("Persistent IPC client: backoff exhausted, giving up");
490                        break;
491                    }
492                };
493                tokio::select! {
494                    biased;
495                    _ = shutdown.cancelled() => {
496                        log::info!("Persistent IPC client shutting down");
497                        connected.store(false, std::sync::atomic::Ordering::Relaxed);
498                        break;
499                    }
500                    _ = tokio::time::sleep(delay) => {}
501                }
502            }
503        }
504    }
505}
506
507/// Run a single persistent connection until it fails.
508///
509/// Splits the stream into read/write halves:
510/// - A reader task continuously reads frames and relays events to `app.emit()`.
511///   When a response frame arrives, it forwards it via a shared oneshot channel.
512/// - The main loop receives commands from `cmd_rx` and sends requests.
513async fn run_persistent_connection<R: Runtime>(
514    stream: TransportStream,
515    app: &tauri::AppHandle<R>,
516    cmd_rx: &mut tokio::sync::mpsc::Receiver<IpcCommand>,
517    connected: &Arc<AtomicBool>,
518) -> Result<(), ServiceError> {
519    let (read_half, mut write_half) = transport::split(stream);
520
521    // Shared slot for the reader task to deliver response frames.
522    let response_slot: std::sync::Arc<
523        tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<IpcResponse>>>,
524    > = std::sync::Arc::new(tokio::sync::Mutex::new(None));
525
526    let slot_writer = response_slot.clone();
527    let app_clone = app.clone();
528    let connected_reader = connected.clone();
529
530    // Reader task: reads frames and either relays events or delivers responses.
531    let reader_handle = tokio::spawn(async move {
532        let mut read_half = read_half;
533        loop {
534            let frame = match read_frame_from(&mut read_half).await {
535                Ok(Some(f)) => f,
536                Ok(None) => break, // Connection closed
537                Err(_) => break,
538            };
539
540            match decode_frame(&frame) {
541                Ok(IpcMessage::Response(resp)) => {
542                    let mut slot = slot_writer.lock().await;
543                    if let Some(sender) = slot.take() {
544                        let _ = sender.send(resp);
545                    }
546                    continue;
547                }
548                Ok(IpcMessage::Event(event)) => {
549                    let plugin_event = ipc_event_to_plugin_event(event);
550                    let _ = app_clone.emit("background-service://event", plugin_event);
551                    continue;
552                }
553                Ok(IpcMessage::Request(_)) => {
554                    log::warn!("unexpected request frame on client connection");
555                    continue;
556                }
557                Err(e) => {
558                    log::debug!("failed to decode IPC frame: {e}");
559                    continue;
560                }
561            }
562        }
563        // Reader exited — mark disconnected.
564        connected_reader.store(false, std::sync::atomic::Ordering::Relaxed);
565    });
566
567    // Main loop: receive commands, send requests, wait for responses.
568    let result = loop {
569        tokio::select! {
570            cmd = cmd_rx.recv() => {
571                let cmd = match cmd {
572                    Some(c) => c,
573                    None => break Err(ServiceError::Ipc("command channel closed".into())),
574                };
575
576                match cmd {
577                    IpcCommand::Start { config, reply } => {
578                        let request = IpcRequest::Start { config };
579                        let rx = prepare_response_slot(&response_slot).await;
580                        if let Err(e) = send_request_to(&mut write_half, &request).await {
581                            let _ = reply.send(Err(e));
582                            break Err(ServiceError::Ipc("send failed".into()));
583                        }
584                        let response = await_response(rx).await;
585                        let result = match response {
586                            Ok(resp) if resp.ok => Ok(()),
587                            Ok(resp) => Err(ServiceError::Ipc(
588                                resp.error.unwrap_or_else(|| "unknown error".into()),
589                            )),
590                            Err(e) => Err(e),
591                        };
592                        let _ = reply.send(result);
593                    }
594                    IpcCommand::Stop { reply } => {
595                        let rx = prepare_response_slot(&response_slot).await;
596                        if let Err(e) = send_request_to(&mut write_half, &IpcRequest::Stop).await {
597                            let _ = reply.send(Err(e));
598                            break Err(ServiceError::Ipc("send failed".into()));
599                        }
600                        let response = await_response(rx).await;
601                        let result = match response {
602                            Ok(resp) if resp.ok => Ok(()),
603                            Ok(resp) => Err(ServiceError::Ipc(
604                                resp.error.unwrap_or_else(|| "unknown error".into()),
605                            )),
606                            Err(e) => Err(e),
607                        };
608                        let _ = reply.send(result);
609                    }
610                    IpcCommand::IsRunning { reply } => {
611                        let rx = prepare_response_slot(&response_slot).await;
612                        if let Err(e) = send_request_to(&mut write_half, &IpcRequest::IsRunning).await {
613                            let _ = reply.send(Err(e));
614                            break Err(ServiceError::Ipc("send failed".into()));
615                        }
616                        let response = await_response(rx).await;
617                        let result = match response {
618                            Ok(resp) if resp.ok => Ok(resp
619                                .data
620                                .and_then(|d| d.get("running").and_then(|v| v.as_bool()))
621                                .unwrap_or(false)),
622                            Ok(resp) => Err(ServiceError::Ipc(
623                                resp.error.unwrap_or_else(|| "unknown error".into()),
624                            )),
625                            Err(e) => Err(e),
626                        };
627                        let _ = reply.send(result);
628                    }
629                    IpcCommand::GetState { reply } => {
630                        let rx = prepare_response_slot(&response_slot).await;
631                        if let Err(e) = send_request_to(&mut write_half, &IpcRequest::GetState).await {
632                            let _ = reply.send(Err(e));
633                            break Err(ServiceError::Ipc("send failed".into()));
634                        }
635                        let response = await_response(rx).await;
636                        let result = match response {
637                            Ok(resp) if resp.ok => resp
638                                .data
639                                .ok_or_else(|| ServiceError::Ipc("missing data in GetState response".into()))
640                                .and_then(|d| {
641                                    serde_json::from_value::<ServiceStatus>(d)
642                                        .map_err(|e| ServiceError::Ipc(format!("deserialize GetState: {e}")))
643                                }),
644                            Ok(resp) => Err(ServiceError::Ipc(
645                                resp.error.unwrap_or_else(|| "unknown error".into()),
646                            )),
647                            Err(e) => Err(e),
648                        };
649                        let _ = reply.send(result);
650                    }
651                    IpcCommand::EnableAutoRestart { config, reply } => {
652                        let request = IpcRequest::EnableAutoRestart { config };
653                        let rx = prepare_response_slot(&response_slot).await;
654                        if let Err(e) = send_request_to(&mut write_half, &request).await {
655                            let _ = reply.send(Err(e));
656                            break Err(ServiceError::Ipc("send failed".into()));
657                        }
658                        let response = await_response(rx).await;
659                        let result = match response {
660                            Ok(resp) if resp.ok => Ok(()),
661                            Ok(resp) => Err(ServiceError::Ipc(
662                                resp.error.unwrap_or_else(|| "unknown error".into()),
663                            )),
664                            Err(e) => Err(e),
665                        };
666                        let _ = reply.send(result);
667                    }
668                    IpcCommand::DisableAutoRestart { reply } => {
669                        let rx = prepare_response_slot(&response_slot).await;
670                        if let Err(e) = send_request_to(&mut write_half, &IpcRequest::DisableAutoRestart).await {
671                            let _ = reply.send(Err(e));
672                            break Err(ServiceError::Ipc("send failed".into()));
673                        }
674                        let response = await_response(rx).await;
675                        let result = match response {
676                            Ok(resp) if resp.ok => Ok(()),
677                            Ok(resp) => Err(ServiceError::Ipc(
678                                resp.error.unwrap_or_else(|| "unknown error".into()),
679                            )),
680                            Err(e) => Err(e),
681                        };
682                        let _ = reply.send(result);
683                    }
684                    IpcCommand::GetDesiredState { reply } => {
685                        let rx = prepare_response_slot(&response_slot).await;
686                        if let Err(e) = send_request_to(&mut write_half, &IpcRequest::GetDesiredState).await {
687                            let _ = reply.send(Err(e));
688                            break Err(ServiceError::Ipc("send failed".into()));
689                        }
690                        let response = await_response(rx).await;
691                        let result = match response {
692                            Ok(resp) if resp.ok => {
693                                match resp.data {
694                                    Some(d) => serde_json::from_value::<crate::desired_state::DesiredState>(d)
695                                        .map(Some)
696                                        .map_err(|e| ServiceError::Ipc(format!("deserialize GetDesiredState: {e}"))),
697                                    None => Ok(None),
698                                }
699                            }
700                            Ok(resp) => Err(ServiceError::Ipc(
701                                resp.error.unwrap_or_else(|| "unknown error".into()),
702                            )),
703                            Err(e) => Err(e),
704                        };
705                        let _ = reply.send(result);
706                    }
707                    IpcCommand::ValidateSetup { reply } => {
708                        let rx = prepare_response_slot(&response_slot).await;
709                        if let Err(e) = send_request_to(&mut write_half, &IpcRequest::ValidateSetup).await {
710                            let _ = reply.send(Err(e));
711                            break Err(ServiceError::Ipc("send failed".into()));
712                        }
713                        let response = await_response(rx).await;
714                        let result = match response {
715                            Ok(resp) if resp.ok => {
716                                match resp.data {
717                                    Some(d) => serde_json::from_value::<crate::models::SetupValidationReport>(d)
718                                        .map_err(|e| ServiceError::Ipc(format!("deserialize ValidateSetup: {e}"))),
719                                    None => Err(ServiceError::Ipc("missing ValidateSetup response data".into())),
720                                }
721                            }
722                            Ok(resp) => Err(ServiceError::Ipc(
723                                resp.error.unwrap_or_else(|| "unknown error".into()),
724                            )),
725                            Err(e) => Err(e),
726                        };
727                        let _ = reply.send(result);
728                    }
729                    IpcCommand::GetLifecycleStatus { reply } => {
730                        let rx = prepare_response_slot(&response_slot).await;
731                        if let Err(e) = send_request_to(&mut write_half, &IpcRequest::GetLifecycleStatus).await {
732                            let _ = reply.send(Err(e));
733                            break Err(ServiceError::Ipc("send failed".into()));
734                        }
735                        let response = await_response(rx).await;
736                        let result = match response {
737                            Ok(resp) if resp.ok => {
738                                match resp.data {
739                                    Some(d) => serde_json::from_value::<crate::models::LifecycleStatus>(d)
740                                        .map_err(|e| ServiceError::Ipc(format!("deserialize GetLifecycleStatus: {e}"))),
741                                    None => Err(ServiceError::Ipc("missing GetLifecycleStatus response data".into())),
742                                }
743                            }
744                            Ok(resp) => Err(ServiceError::Ipc(
745                                resp.error.unwrap_or_else(|| "unknown error".into()),
746                            )),
747                            Err(e) => Err(e),
748                        };
749                        let _ = reply.send(result);
750                    }
751                }
752            }
753            _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
754                // Timeout — check if reader is still alive
755                if reader_handle.is_finished() {
756                    break Err(ServiceError::Ipc("reader task died".into()));
757                }
758            }
759        }
760    };
761
762    reader_handle.abort();
763    result
764}
765
766/// Send an IPC request frame through a write half.
767async fn send_request_to(
768    write_half: &mut TransportWriteHalf,
769    request: &IpcRequest,
770) -> Result<(), ServiceError> {
771    let msg = IpcMessage::Request(request.clone());
772    let frame = encode_frame(&msg).map_err(|e| ServiceError::Ipc(format!("encode: {e}")))?;
773    transport::write_frame(write_half, &frame)
774        .await
775        .map_err(ServiceError::Ipc)?;
776    Ok(())
777}
778
779/// Prepare the shared response slot for an upcoming request.
780///
781/// Creates a oneshot channel and stores the sender in `slot` so the reader
782/// task can deliver the next response. Returns the receiver end.
783///
784/// Must be called **before** sending the request to prevent losing fast
785/// responses that arrive before the slot is set.
786async fn prepare_response_slot(
787    slot: &std::sync::Arc<tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<IpcResponse>>>>,
788) -> tokio::sync::oneshot::Receiver<IpcResponse> {
789    let (tx, rx) = tokio::sync::oneshot::channel();
790    let mut guard = slot.lock().await;
791    debug_assert!(
792        guard.is_none(),
793        "response slot overwritten — sequential command invariant violated"
794    );
795    *guard = Some(tx);
796    rx
797}
798
799/// Await a response from the reader task with a timeout.
800///
801/// Returns `Err` if the response doesn't arrive within 10 seconds, preventing
802/// permanent hangs when the connection drops during command processing.
803async fn await_response(
804    rx: tokio::sync::oneshot::Receiver<IpcResponse>,
805) -> Result<IpcResponse, ServiceError> {
806    tokio::select! {
807        response = rx => {
808            response.map_err(|_| ServiceError::Ipc("response channel closed".into()))
809        }
810        _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
811            Err(ServiceError::Ipc("response timeout".into()))
812        }
813    }
814}
815
816/// Read a single length-prefixed frame from a read half.
817///
818/// Returns the payload bytes only (no length prefix).
819async fn read_frame_from(
820    read_half: &mut TransportReadHalf,
821) -> Result<Option<Vec<u8>>, ServiceError> {
822    transport::read_frame(read_half)
823        .await
824        .map_err(ServiceError::Ipc)
825}
826
827#[cfg(test)]
828mod tests {
829    use super::*;
830    use crate::desktop::test_helpers::{
831        setup_server, setup_server_with_factory, BlockingService, ImmediateSuccessService,
832    };
833    use std::sync::atomic::Ordering;
834    use std::time::Duration;
835    use tauri::Listener;
836
837    // -- AC1: Client connects ---------------------------------------------------
838
839    #[tokio::test]
840    async fn ipc_client_connect() {
841        let (path, shutdown, _event_tx) = setup_server();
842        let result = IpcClient::connect(path).await;
843        assert!(result.is_ok(), "client should connect: {:?}", result.err());
844        shutdown.cancel();
845    }
846
847    // -- AC2: Start command works -----------------------------------------------
848
849    #[tokio::test]
850    async fn ipc_client_send_start() {
851        let (path, shutdown, _event_tx) = setup_server();
852        let mut client = IpcClient::connect(path).await.unwrap();
853        let result = client.start(StartConfig::default()).await;
854        assert!(result.is_ok(), "start should succeed: {:?}", result.err());
855        shutdown.cancel();
856    }
857
858    // -- AC3: Stop command works ------------------------------------------------
859
860    #[tokio::test]
861    async fn ipc_client_send_stop() {
862        let (path, shutdown, _event_tx) = setup_server();
863        let mut client = IpcClient::connect(path).await.unwrap();
864        client.start(StartConfig::default()).await.unwrap();
865        let result = client.stop().await;
866        assert!(result.is_ok(), "stop should succeed: {:?}", result.err());
867        shutdown.cancel();
868    }
869
870    // -- AC4: IsRunning returns status ------------------------------------------
871
872    #[tokio::test]
873    async fn ipc_client_is_running() {
874        let (path, shutdown, _event_tx) = setup_server();
875        let mut client = IpcClient::connect(path).await.unwrap();
876
877        let running = client.is_running().await.unwrap();
878        assert!(!running, "should not be running initially");
879
880        client.start(StartConfig::default()).await.unwrap();
881        let running = client.is_running().await.unwrap();
882        assert!(running, "should be running after start");
883
884        shutdown.cancel();
885    }
886
887    // -- GetState returns ServiceStatus ------------------------------------------
888
889    #[tokio::test]
890    async fn ipc_client_get_state_initial() {
891        let (path, shutdown, _event_tx) = setup_server();
892        let mut client = IpcClient::connect(path).await.unwrap();
893
894        let status = client.get_state().await.unwrap();
895        assert!(
896            matches!(status.state, crate::models::ServiceState::Idle),
897            "expected Idle, got {:?}",
898            status.state
899        );
900        assert_eq!(status.last_error, None);
901
902        shutdown.cancel();
903    }
904
905    #[tokio::test]
906    async fn ipc_client_get_state_after_start() {
907        let (path, shutdown, _event_tx) = setup_server();
908        let mut client = IpcClient::connect(path).await.unwrap();
909
910        client.start(StartConfig::default()).await.unwrap();
911
912        // Poll until Running — Start replies at Initializing, spawned task
913        // transitions to Running asynchronously.
914        let status = tokio::time::timeout(Duration::from_secs(2), async {
915            loop {
916                let s = client.get_state().await.unwrap();
917                if matches!(s.state, crate::models::ServiceState::Running) {
918                    return s;
919                }
920                tokio::time::sleep(Duration::from_millis(10)).await;
921            }
922        })
923        .await
924        .expect("timed out waiting for Running state");
925        assert_eq!(status.last_error, None);
926
927        shutdown.cancel();
928    }
929
930    #[tokio::test]
931    async fn ipc_client_get_state_after_stop() {
932        let (path, shutdown, _event_tx) = setup_server();
933        let mut client = IpcClient::connect(path).await.unwrap();
934
935        client.start(StartConfig::default()).await.unwrap();
936        client.stop().await.unwrap();
937        let status = client.get_state().await.unwrap();
938        assert!(
939            matches!(status.state, crate::models::ServiceState::Stopped),
940            "expected Stopped, got {:?}",
941            status.state
942        );
943
944        shutdown.cancel();
945    }
946
947    // -- AC5: Events are received -----------------------------------------------
948
949    #[tokio::test]
950    async fn ipc_client_receive_events() {
951        let (path, shutdown, event_tx) =
952            setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
953        let mut client = IpcClient::connect(path).await.unwrap();
954        client.start(StartConfig::default()).await.unwrap();
955
956        // Simulate relay broadcasting Started
957        let _ = event_tx.send(IpcEvent::Started);
958
959        let event = tokio::time::timeout(Duration::from_millis(500), client.read_event())
960            .await
961            .expect("timed out waiting for event")
962            .expect("read_event failed");
963
964        assert!(event.is_some(), "should receive an event");
965        let event = event.unwrap();
966        assert!(
967            matches!(event, IpcEvent::Started),
968            "Expected Started event, got {:?}",
969            event
970        );
971
972        shutdown.cancel();
973    }
974
975    // -- Additional: Stop when not running returns error -------------------------
976
977    #[tokio::test]
978    async fn ipc_client_stop_when_not_running() {
979        let (path, shutdown, _event_tx) = setup_server();
980        let mut client = IpcClient::connect(path).await.unwrap();
981        let result = client.stop().await;
982        assert!(result.is_err(), "stop when not running should fail");
983        shutdown.cancel();
984    }
985
986    // -- Additional: Connect to nonexistent socket fails -------------------------
987
988    #[tokio::test]
989    async fn ipc_client_connect_to_nonexistent() {
990        let path = std::env::temp_dir().join("nonexistent-test-socket.sock");
991        let result = IpcClient::connect(path).await;
992        assert!(
993            result.is_err(),
994            "should fail to connect to nonexistent socket"
995        );
996    }
997
998    // -- Additional: ipc_event_to_plugin_event conversion -----------------------
999
/// `IpcEvent::Started` converts to `PluginEvent::Started`.
#[test]
fn ipc_event_to_plugin_event_started() {
    let converted = ipc_event_to_plugin_event(IpcEvent::Started);
    assert!(matches!(converted, PluginEvent::Started));
}
1006
/// `IpcEvent::Stopped` converts to `PluginEvent::Stopped` and keeps its reason.
#[test]
fn ipc_event_to_plugin_event_stopped() {
    let converted = ipc_event_to_plugin_event(IpcEvent::Stopped {
        reason: StopReason::UserStop,
    });
    match converted {
        PluginEvent::Stopped { reason } => assert_eq!(reason, StopReason::UserStop),
        other => panic!("Expected Stopped, got {other:?}"),
    }
}
1018
/// `IpcEvent::Error` converts to `PluginEvent::Error` and keeps its message.
#[test]
fn ipc_event_to_plugin_event_error() {
    let converted = ipc_event_to_plugin_event(IpcEvent::Error {
        message: "init failed".into(),
    });
    match converted {
        PluginEvent::Error { message } => assert_eq!(message, "init failed"),
        other => panic!("Expected Error, got {other:?}"),
    }
}
1030
1031    // -- Additional: Full lifecycle ---------------------------------------------
1032
1033    #[tokio::test]
1034    async fn ipc_client_full_lifecycle() {
1035        let (path, shutdown, _event_tx) = setup_server();
1036        let mut client = IpcClient::connect(path).await.unwrap();
1037
1038        assert!(!client.is_running().await.unwrap());
1039        client.start(StartConfig::default()).await.unwrap();
1040        assert!(client.is_running().await.unwrap());
1041        client.stop().await.unwrap();
1042        assert!(!client.is_running().await.unwrap());
1043
1044        shutdown.cancel();
1045    }
1046
1047    // -- Additional: listen_events spawns and converts events -------------------
1048
1049    #[tokio::test]
1050    async fn ipc_client_listen_events() {
1051        let (path, shutdown, event_tx) =
1052            setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
1053        let app = tauri::test::mock_app();
1054
1055        let received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1056        let received_clone = received.clone();
1057        app.listen("background-service://event", move |_event| {
1058            received_clone.store(true, Ordering::SeqCst);
1059        });
1060
1061        let mut client = IpcClient::connect(path).await.unwrap();
1062        client.start(StartConfig::default()).await.unwrap();
1063        client.listen_events(app.handle().clone());
1064
1065        // Simulate relay broadcasting Started
1066        let _ = event_tx.send(IpcEvent::Started);
1067
1068        tokio::time::timeout(Duration::from_millis(500), async {
1069            while !received.load(Ordering::SeqCst) {
1070                tokio::time::sleep(Duration::from_millis(10)).await;
1071            }
1072        })
1073        .await
1074        .expect("timed out waiting for event via listen_events");
1075
1076        assert!(
1077            received.load(Ordering::SeqCst),
1078            "should have received event"
1079        );
1080        shutdown.cancel();
1081    }
1082
1083    // ═══════════════════════════════════════════════════════════════════════
1084    //  IPC LOOPBACK TESTS (Step 20 — AC2, AC3, AC4)
1085    // ═══════════════════════════════════════════════════════════════════════
1086
1087    // -- AC2: IPC loopback full lifecycle with event verification ---------------
1088
1089    /// Comprehensive IPC loopback: IpcServer + IpcClient in the same process.
1090    /// Exercises start → Started event → running → stop → Stopped event → stopped.
1091    ///
1092    /// Note: IpcEvent frames must be read BEFORE other requests because
1093    /// `send_and_read` skips event frames looking for IpcResponse.
1094    #[tokio::test]
1095    async fn ipc_loopback_full_lifecycle_with_events() {
1096        let (path, shutdown, event_tx) = setup_server();
1097        let mut client = IpcClient::connect(path).await.unwrap();
1098
1099        // Initially not running
1100        assert!(
1101            !client.is_running().await.unwrap(),
1102            "should not be running initially"
1103        );
1104
1105        // Start the service
1106        client
1107            .start(StartConfig::default())
1108            .await
1109            .expect("start should succeed");
1110
1111        // Simulate relay broadcasting Started
1112        let _ = event_tx.send(IpcEvent::Started);
1113
1114        // Read the Started event BEFORE any other request
1115        // (send_and_read on subsequent calls would skip buffered events)
1116        let started = tokio::time::timeout(Duration::from_millis(500), client.read_event())
1117            .await
1118            .expect("timed out waiting for Started event")
1119            .expect("read_event failed")
1120            .expect("should receive event");
1121        assert!(
1122            matches!(started, IpcEvent::Started),
1123            "Expected Started event, got {started:?}"
1124        );
1125
1126        // Verify running (after consuming the event)
1127        assert!(
1128            client.is_running().await.unwrap(),
1129            "should be running after start"
1130        );
1131
1132        // Stop the service
1133        client.stop().await.expect("stop should succeed");
1134
1135        // Simulate relay broadcasting Stopped
1136        let _ = event_tx.send(IpcEvent::Stopped {
1137            reason: StopReason::UserStop,
1138        });
1139
1140        // Read the Stopped event BEFORE any other request
1141        let stopped = tokio::time::timeout(Duration::from_millis(500), client.read_event())
1142            .await
1143            .expect("timed out waiting for Stopped event")
1144            .expect("read_event failed")
1145            .expect("should receive event");
1146        assert!(
1147            matches!(stopped, IpcEvent::Stopped { .. }),
1148            "Expected Stopped event, got {stopped:?}"
1149        );
1150
1151        // Verify not running
1152        assert!(
1153            !client.is_running().await.unwrap(),
1154            "should not be running after stop"
1155        );
1156
1157        shutdown.cancel();
1158    }
1159
1160    // -- AC3: Event streaming converts IpcEvent to PluginEvent -------------------
1161
1162    /// Verify events streamed through IPC are correctly converted to PluginEvent.
1163    #[tokio::test]
1164    async fn ipc_loopback_event_streaming_plugin_event_conversion() {
1165        let (path, shutdown, event_tx) = setup_server();
1166        let mut client = IpcClient::connect(path).await.unwrap();
1167
1168        // Start — simulate relay broadcasting Started
1169        client.start(StartConfig::default()).await.unwrap();
1170        let _ = event_tx.send(IpcEvent::Started);
1171        let started_ipc = tokio::time::timeout(Duration::from_millis(500), client.read_event())
1172            .await
1173            .expect("timed out")
1174            .expect("read_event failed")
1175            .expect("should receive event");
1176        let started_plugin = ipc_event_to_plugin_event(started_ipc);
1177        assert!(
1178            matches!(started_plugin, PluginEvent::Started),
1179            "Expected PluginEvent::Started, got {started_plugin:?}"
1180        );
1181
1182        // Stop — simulate relay broadcasting Stopped
1183        client.stop().await.unwrap();
1184        let _ = event_tx.send(IpcEvent::Stopped {
1185            reason: StopReason::UserStop,
1186        });
1187        let stopped_ipc = tokio::time::timeout(Duration::from_millis(500), client.read_event())
1188            .await
1189            .expect("timed out")
1190            .expect("read_event failed")
1191            .expect("should receive event");
1192        let stopped_plugin = ipc_event_to_plugin_event(stopped_ipc);
1193        match stopped_plugin {
1194            PluginEvent::Stopped { reason } => {
1195                assert_eq!(reason, StopReason::UserStop, "Expected UserStop reason");
1196            }
1197            other => panic!("Expected PluginEvent::Stopped, got {other:?}"),
1198        }
1199
1200        shutdown.cancel();
1201    }
1202
1203    // -- AC4: Error handling — connection drop detected by client ---------------
1204
1205    /// Verify client detects a dropped connection gracefully (no panic).
1206    /// Simulates the server side closing the socket mid-connection.
1207    #[tokio::test]
1208    async fn ipc_loopback_connection_drop_returns_error() {
1209        let path = crate::desktop::test_helpers::unique_socket_path();
1210
1211        // Create a minimal "server" that accepts one connection then drops it.
1212        let listener = transport::bind(path.clone()).unwrap();
1213        let path_clone = path.clone();
1214
1215        let client_handle =
1216            tokio::spawn(async move { IpcClient::connect(path_clone).await.unwrap() });
1217
1218        // Accept the connection and immediately drop the server-side stream.
1219        let (server_stream, _) = listener.accept().await.unwrap();
1220        drop(server_stream);
1221        tokio::time::sleep(Duration::from_millis(20)).await;
1222
1223        let mut client = client_handle.await.unwrap();
1224
1225        // Client should detect the closed connection on next operation.
1226        let result = client.is_running().await;
1227        assert!(
1228            result.is_err(),
1229            "should get error after server drops connection"
1230        );
1231
1232        let _ = std::fs::remove_file(&path);
1233    }
1234
1235    // -- AC4: Error handling — double start returns error through IPC ------------
1236
1237    /// Verify second start (when already running) returns an IPC error.
1238    #[tokio::test]
1239    async fn ipc_loopback_double_start_returns_error() {
1240        let (path, shutdown, _event_tx) = setup_server();
1241        let mut client = IpcClient::connect(path).await.unwrap();
1242
1243        client.start(StartConfig::default()).await.unwrap();
1244
1245        let result = client.start(StartConfig::default()).await;
1246        assert!(result.is_err(), "double start should return error");
1247        let err_msg = result.unwrap_err().to_string();
1248        assert!(
1249            err_msg.to_lowercase().contains("already"),
1250            "Error should mention 'already': {err_msg}"
1251        );
1252
1253        shutdown.cancel();
1254    }
1255
1256    // ═══════════════════════════════════════════════════════════════════════
1257    //  PERSISTENT IPC CLIENT TESTS (Step 12)
1258    // ═══════════════════════════════════════════════════════════════════════
1259
1260    // -- AC1: Persistent client connects and maintains connection --
1261
1262    /// Verify the persistent client connects to a running server and can
1263    /// forward commands through the persistent connection.
1264    #[tokio::test]
1265    async fn persistent_client_connects() {
1266        let (path, shutdown, _event_tx) = setup_server();
1267        let app = tauri::test::mock_app();
1268
1269        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1270
1271        // Give the background task time to connect.
1272        tokio::time::sleep(Duration::from_millis(100)).await;
1273
1274        // Send a command through the persistent connection.
1275        let running = handle.is_running().await;
1276        assert!(
1277            running.is_ok(),
1278            "should get response via persistent connection: {:?}",
1279            running.err()
1280        );
1281        assert!(!running.unwrap(), "should not be running initially");
1282
1283        shutdown.cancel();
1284    }
1285
1286    // -- AC3: Auto-reconnect --
1287
1288    /// Verify the persistent client reconnects after the server restarts.
1289    #[tokio::test]
1290    async fn persistent_client_reconnects() {
1291        use crate::desktop::ipc_server::IpcServer;
1292        use crate::manager::{manager_loop, ServiceFactory};
1293        use tokio_util::sync::CancellationToken;
1294
1295        // First server
1296        let (path, shutdown1, _event_tx) = setup_server();
1297        let app = tauri::test::mock_app();
1298
1299        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1300
1301        // Verify connected to first server.
1302        tokio::time::sleep(Duration::from_millis(100)).await;
1303        let result = handle.is_running().await;
1304        assert!(
1305            result.is_ok(),
1306            "should connect to first server: {:?}",
1307            result.err()
1308        );
1309
1310        // Kill first server and wait for socket cleanup.
1311        shutdown1.cancel();
1312        tokio::time::sleep(Duration::from_millis(150)).await;
1313
1314        // Start second server at the same path.
1315        let (cmd_tx2, cmd_rx2) = tokio::sync::mpsc::channel(16);
1316        let factory: ServiceFactory<tauri::test::MockRuntime> =
1317            Box::new(|| Box::new(BlockingService));
1318        tokio::spawn(manager_loop(
1319            cmd_rx2, factory, 0.0, 0.0, 0.0, 0.0, false, false, None,
1320        ));
1321        let server2 = IpcServer::bind(path.clone(), cmd_tx2, app.handle().clone()).unwrap();
1322        let shutdown2 = CancellationToken::new();
1323        let s2 = shutdown2.clone();
1324        tokio::spawn(async move { server2.run(s2).await });
1325
1326        // Wait for the client to reconnect (1s reconnect delay + margin).
1327        let reconnected = tokio::time::timeout(Duration::from_secs(3), async {
1328            loop {
1329                tokio::time::sleep(Duration::from_millis(200)).await;
1330                if handle.is_running().await.is_ok() {
1331                    break;
1332                }
1333            }
1334        })
1335        .await;
1336        assert!(
1337            reconnected.is_ok(),
1338            "persistent client should reconnect to second server"
1339        );
1340
1341        shutdown2.cancel();
1342    }
1343
1344    // -- AC2: Event relay via app.emit() --
1345
1346    /// Verify events from the server are relayed to `app.emit()` by the
1347    /// persistent client's background reader task.
1348    #[tokio::test]
1349    async fn event_relay() {
1350        let (path, shutdown, event_tx) =
1351            setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
1352        let app = tauri::test::mock_app();
1353
1354        let received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1355        let received_clone = received.clone();
1356        app.listen("background-service://event", move |_event| {
1357            received_clone.store(true, Ordering::SeqCst);
1358        });
1359
1360        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1361
1362        // Start the service — the reader task should relay the Started event.
1363        let result = handle.start(StartConfig::default()).await;
1364        assert!(result.is_ok(), "start should succeed: {:?}", result.err());
1365
1366        // Simulate relay broadcasting Started
1367        let _ = event_tx.send(IpcEvent::Started);
1368
1369        // Wait for the event to be relayed via app.emit().
1370        tokio::time::timeout(Duration::from_millis(500), async {
1371            while !received.load(Ordering::SeqCst) {
1372                tokio::time::sleep(Duration::from_millis(10)).await;
1373            }
1374        })
1375        .await
1376        .expect("timed out waiting for event relay via app.emit()");
1377
1378        assert!(
1379            received.load(Ordering::SeqCst),
1380            "event should be relayed through app.emit()"
1381        );
1382
1383        shutdown.cancel();
1384    }
1385
1386    // -- AC4: Start/Stop lifecycle through persistent client --
1387
1388    /// Verify the full start → running → stop → not-running lifecycle works
1389    /// through the persistent IPC client.
1390    #[tokio::test]
1391    async fn start_stop_lifecycle() {
1392        let (path, shutdown, _event_tx) = setup_server();
1393        let app = tauri::test::mock_app();
1394
1395        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1396
1397        // Initially not running.
1398        let running = handle.is_running().await.unwrap();
1399        assert!(!running, "should not be running initially");
1400
1401        // Start.
1402        handle
1403            .start(StartConfig::default())
1404            .await
1405            .expect("start should succeed");
1406        let running = handle.is_running().await.unwrap();
1407        assert!(running, "should be running after start");
1408
1409        // Stop.
1410        handle.stop().await.expect("stop should succeed");
1411        let running = handle.is_running().await.unwrap();
1412        assert!(!running, "should not be running after stop");
1413
1414        shutdown.cancel();
1415    }
1416
1417    // -- GetState through persistent client --
1418
1419    #[tokio::test]
1420    async fn persistent_client_get_state() {
1421        let (path, shutdown, _event_tx) = setup_server();
1422        let app = tauri::test::mock_app();
1423
1424        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1425
1426        // Give the background task time to connect.
1427        tokio::time::sleep(Duration::from_millis(100)).await;
1428
1429        let status = handle.get_state().await.unwrap();
1430        assert!(
1431            matches!(status.state, crate::models::ServiceState::Idle),
1432            "expected Idle, got {:?}",
1433            status.state
1434        );
1435
1436        handle.start(StartConfig::default()).await.unwrap();
1437
1438        // Poll until Running — race between Start reply (Initializing) and
1439        // spawned task transition to Running.
1440        let status = tokio::time::timeout(Duration::from_secs(2), async {
1441            loop {
1442                let s = handle.get_state().await.unwrap();
1443                if matches!(s.state, crate::models::ServiceState::Running) {
1444                    return s;
1445                }
1446                tokio::time::sleep(Duration::from_millis(10)).await;
1447            }
1448        })
1449        .await
1450        .expect("timed out waiting for Running state");
1451        assert!(
1452            matches!(status.state, crate::models::ServiceState::Running),
1453            "expected Running, got {:?}",
1454            status.state
1455        );
1456
1457        shutdown.cancel();
1458    }
1459
1460    // -- Fix: Timeout prevents permanent hang on unresponsive server --
1461
1462    /// Verify the persistent client returns an error (not hang) when the
1463    /// server accepts a connection but never responds to a command.
1464    ///
1465    /// This is a regression test for the critical bug where `wait_for_response`
1466    /// had no timeout — a dropped connection during command processing caused
1467    /// both the reconnect loop and the caller to hang permanently.
1468    #[tokio::test]
1469    async fn persistent_client_timeout_on_unresponsive_server() {
1470        let path = crate::desktop::test_helpers::unique_socket_path();
1471        let listener = transport::bind(path.clone()).unwrap();
1472
1473        // Server that accepts the connection but never responds.
1474        let server_handle = tokio::spawn(async move {
1475            let (_stream, _) = listener.accept().await.unwrap();
1476            // Hold connection open — never send a response.
1477            tokio::time::sleep(Duration::from_secs(60)).await;
1478        });
1479
1480        let app = tauri::test::mock_app();
1481        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1482
1483        // Give the background task time to connect.
1484        tokio::time::sleep(Duration::from_millis(100)).await;
1485
1486        // Start should timeout and return an error, not hang forever.
1487        let result = tokio::time::timeout(
1488            Duration::from_secs(15),
1489            handle.start(StartConfig::default()),
1490        )
1491        .await;
1492
1493        assert!(
1494            result.is_ok(),
1495            "start should not hang — expected error, got outer timeout"
1496        );
1497        let inner = result.unwrap();
1498        assert!(
1499            inner.is_err(),
1500            "start should return error when server is unresponsive"
1501        );
1502
1503        server_handle.abort();
1504        let _ = std::fs::remove_file(&path);
1505    }
1506
1507    // -- C1: Persistent client terminates on handle drop --
1508
1509    /// Verify that dropping `PersistentIpcClientHandle` causes the background
1510    /// reconnection task to stop (via `CancellationToken`), preventing resource
1511    /// leaks where the task reconnects forever after the handle is dropped.
1512    #[tokio::test]
1513    async fn persistent_client_terminates_on_handle_drop() {
1514        let (path, shutdown, _event_tx) = setup_server();
1515        let app = tauri::test::mock_app();
1516
1517        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1518
1519        // Give the background task time to connect.
1520        tokio::time::sleep(Duration::from_millis(100)).await;
1521
1522        // Drop the handle — this should cancel the shutdown token.
1523        drop(handle);
1524
1525        // The background task should terminate within a bounded time.
1526        // We can't observe the JoinHandle directly (it's fire-and-forget),
1527        // but we can verify the socket isn't being reconnected to by checking
1528        // that server shutdown succeeds cleanly.
1529        tokio::time::sleep(Duration::from_secs(2)).await;
1530
1531        shutdown.cancel();
1532    }
1533
1534    // ═══════════════════════════════════════════════════════════════════════
1535    //  BUFFERED EVENTS TESTS (Step 4)
1536    // ═══════════════════════════════════════════════════════════════════════
1537
1538    /// Helper: create a raw server that sends specific frames in response to
1539    /// any request, giving full control over the event/response interleaving.
1540    async fn buffered_server(
1541        path: &std::path::Path,
1542        frames: Vec<IpcMessage>,
1543    ) -> tokio::task::JoinHandle<()> {
1544        let listener = transport::bind(path.to_path_buf()).unwrap();
1545        tokio::spawn(async move {
1546            let (mut stream, _) = listener.accept().await.unwrap();
1547            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1548            // Read and discard the incoming request.
1549            let mut len_buf = [0u8; 4];
1550            if stream.read_exact(&mut len_buf).await.is_err() {
1551                return;
1552            }
1553            let len = u32::from_be_bytes(len_buf) as usize;
1554            let mut payload = vec![0u8; len];
1555            if stream.read_exact(&mut payload).await.is_err() {
1556                return;
1557            }
1558            // Send the pre-programmed frames in order.
1559            for msg in &frames {
1560                let frame = crate::desktop::ipc::encode_frame(msg).unwrap();
1561                if stream.write_all(&frame).await.is_err() {
1562                    return;
1563                }
1564            }
1565        })
1566    }
1567
1568    /// send_and_read returns response with empty event list when no events interleave.
1569    #[tokio::test]
1570    async fn send_and_read_no_interleaved_events() {
1571        let path = crate::desktop::test_helpers::unique_socket_path();
1572        let server = buffered_server(
1573            &path,
1574            vec![IpcMessage::Response(IpcResponse {
1575                ok: true,
1576                data: None,
1577                error: None,
1578            })],
1579        )
1580        .await;
1581
1582        let mut client = IpcClient::connect(path.clone()).await.unwrap();
1583        let (response, events) = client.send_and_read(&IpcRequest::IsRunning).await.unwrap();
1584        assert!(response.ok, "response should be ok");
1585        assert!(
1586            events.is_empty(),
1587            "events should be empty when no events interleave, got {:?}",
1588            events
1589        );
1590
1591        server.await.unwrap();
1592        let _ = std::fs::remove_file(&path);
1593    }
1594
1595    /// send_and_read collects a single interleaved event alongside the response.
1596    #[tokio::test]
1597    async fn send_and_read_single_interleaved_event() {
1598        let path = crate::desktop::test_helpers::unique_socket_path();
1599        let server = buffered_server(
1600            &path,
1601            vec![
1602                IpcMessage::Event(IpcEvent::Started),
1603                IpcMessage::Response(IpcResponse {
1604                    ok: true,
1605                    data: None,
1606                    error: None,
1607                }),
1608            ],
1609        )
1610        .await;
1611
1612        let mut client = IpcClient::connect(path.clone()).await.unwrap();
1613        let (response, events) = client
1614            .send_and_read(&IpcRequest::Start {
1615                config: StartConfig::default(),
1616            })
1617            .await
1618            .unwrap();
1619        assert!(response.ok, "response should be ok");
1620        assert_eq!(events.len(), 1, "should collect exactly one event");
1621        assert!(
1622            matches!(events[0], IpcEvent::Started),
1623            "expected Started event, got {:?}",
1624            events[0]
1625        );
1626
1627        server.await.unwrap();
1628        let _ = std::fs::remove_file(&path);
1629    }
1630
1631    // ═══════════════════════════════════════════════════════════════════════
1632    //  IS_CONNECTED TESTS (Step 5)
1633    // ═══════════════════════════════════════════════════════════════════════
1634
1635    /// is_connected() returns false before the background task has connected
1636    /// to any server.
1637    #[tokio::test]
1638    async fn is_connected_false_before_server() {
1639        let app = tauri::test::mock_app();
1640        let path = crate::desktop::test_helpers::unique_socket_path();
1641        // No server running — spawn handle pointing at a nonexistent socket.
1642        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1643        // The background task may or may not have attempted a connection yet,
1644        // but it should definitely NOT be connected.
1645        tokio::time::sleep(Duration::from_millis(50)).await;
1646        assert!(
1647            !handle.is_connected(),
1648            "should not be connected when no server is running"
1649        );
1650        let _ = std::fs::remove_file(&path);
1651    }
1652
1653    /// is_connected() returns true once the persistent client has established
1654    /// a connection to a running server.
1655    #[tokio::test]
1656    async fn is_connected_true_after_connect() {
1657        let (path, shutdown, _event_tx) = setup_server();
1658        let app = tauri::test::mock_app();
1659        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1660
1661        // Wait for the background task to connect.
1662        tokio::time::timeout(Duration::from_secs(2), async {
1663            while !handle.is_connected() {
1664                tokio::time::sleep(Duration::from_millis(50)).await;
1665            }
1666        })
1667        .await
1668        .expect("timed out waiting for is_connected to become true");
1669
1670        assert!(
1671            handle.is_connected(),
1672            "should be connected after server is up"
1673        );
1674
1675        shutdown.cancel();
1676    }
1677
1678    /// is_connected() returns false after the server shuts down and the
1679    /// persistent client detects the disconnection.
1680    ///
1681    /// Uses a minimal server that accepts one connection then explicitly drops
1682    /// it, guaranteeing the reader task exits and sets connected = false.
1683    #[tokio::test]
1684    async fn is_connected_false_after_server_shutdown() {
1685        let path = crate::desktop::test_helpers::unique_socket_path();
1686        let path_clone = path.clone();
1687        let listener = transport::bind(path.clone()).unwrap();
1688
1689        // Server that accepts a connection, waits briefly, then drops
1690        // everything (stream + listener), preventing reconnection.
1691        let server_handle = tokio::spawn(async move {
1692            let (stream, _) = listener.accept().await.unwrap();
1693            // Hold the connection briefly so the client can connect.
1694            tokio::time::sleep(Duration::from_millis(200)).await;
1695            // Drop the stream — reader will detect EOF.
1696            drop(stream);
1697            // Drop the listener (moved into this closure) and clean up socket.
1698            let _ = std::fs::remove_file(&path_clone);
1699        });
1700
1701        let app = tauri::test::mock_app();
1702        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1703
1704        // Wait for connection.
1705        tokio::time::timeout(Duration::from_secs(2), async {
1706            while !handle.is_connected() {
1707                tokio::time::sleep(Duration::from_millis(50)).await;
1708            }
1709        })
1710        .await
1711        .expect("timed out waiting for initial connection");
1712
1713        assert!(handle.is_connected(), "should be connected initially");
1714
1715        // Wait for the server to drop the connection and listener.
1716        tokio::time::timeout(Duration::from_secs(3), async {
1717            while handle.is_connected() {
1718                tokio::time::sleep(Duration::from_millis(50)).await;
1719            }
1720        })
1721        .await
1722        .expect("timed out waiting for is_connected to become false");
1723
1724        assert!(
1725            !handle.is_connected(),
1726            "should not be connected after server shutdown"
1727        );
1728
1729        server_handle.abort();
1730        let _ = std::fs::remove_file(&path);
1731    }
1732
1733    // ═══════════════════════════════════════════════════════════════════════
1734    //  BACKOFF BEHAVIOR TESTS (Step 6c)
1735    // ═══════════════════════════════════════════════════════════════════════
1736
1737    /// Verify the ExponentialBuilder config used in persistent_client_loop
1738    /// produces increasing delays, respects the 30s max, and exhausts after
1739    /// exactly 10 attempts.
1740    #[test]
1741    fn backoff_builder_produces_increasing_delays() {
1742        use backon::BackoffBuilder;
1743
1744        let builder = backon::ExponentialBuilder::default()
1745            .with_min_delay(Duration::from_secs(1))
1746            .with_max_delay(Duration::from_secs(30))
1747            .with_max_times(10)
1748            .with_jitter();
1749
1750        let mut attempts = builder.build();
1751        let mut delays = Vec::new();
1752        for d in attempts.by_ref() {
1753            delays.push(d);
1754        }
1755
1756        assert_eq!(delays.len(), 10, "should produce exactly 10 delays");
1757
1758        // First delay ≈ 1s (with jitter, allow 0.5–2s).
1759        assert!(
1760            delays[0] >= Duration::from_millis(500),
1761            "first delay too short: {:?}",
1762            delays[0]
1763        );
1764        assert!(
1765            delays[0] <= Duration::from_secs(2),
1766            "first delay too long: {:?}",
1767            delays[0]
1768        );
1769
1770        // Last delay should be at or near the 30s cap.
1771        assert!(
1772            delays[9] >= Duration::from_secs(15),
1773            "last delay should approach max: {:?}",
1774            delays[9]
1775        );
1776
1777        // All delays capped — with jitter, allow up to 2× max_delay.
1778        for d in &delays {
1779            assert!(
1780                *d <= Duration::from_secs(60),
1781                "delay exceeds max_delay + jitter margin: {:?}",
1782                d
1783            );
1784        }
1785
1786        // Iterator is exhausted after 10 attempts.
1787        assert!(
1788            attempts.next().is_none(),
1789            "should return None after 10 attempts"
1790        );
1791    }
1792
1793    /// Verify the persistent client stops retrying after exhausting its backoff
1794    /// budget and that subsequent commands fail with "shut down" (channel closed).
1795    ///
1796    /// With min_delay=1s, max_delay=30s, max_times=10, total retry time ≈ 152s.
1797    /// Marked `#[ignore]` to avoid slowing down normal test runs.
1798    /// Run with `cargo test -- --ignored`.
1799    #[ignore]
1800    #[tokio::test]
1801    async fn persistent_client_exits_after_max_retries() {
1802        let app = tauri::test::mock_app();
1803        let path = crate::desktop::test_helpers::unique_socket_path();
1804        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1805
1806        // Wait for the background task to exhaust retries.
1807        // Poll is_running() — once the loop exits, the command channel closes
1808        // and we get "shut down" instead of a timeout.
1809        let exited = tokio::time::timeout(Duration::from_secs(180), async {
1810            loop {
1811                tokio::time::sleep(Duration::from_secs(5)).await;
1812                if let Err(e) = handle.is_running().await {
1813                    if e.to_string().contains("shut down") {
1814                        return;
1815                    }
1816                }
1817            }
1818        })
1819        .await;
1820
1821        assert!(
1822            exited.is_ok(),
1823            "persistent client should exit after max retries"
1824        );
1825        assert!(!handle.is_connected(), "should not be connected after exit");
1826
1827        let _ = std::fs::remove_file(&path);
1828    }
1829
1830    /// Verify the persistent client reconnects after a server restart and that
1831    /// the backoff resets (reconnection starts from ~1s min_delay, not an
1832    /// accumulated delay).
1833    #[tokio::test]
1834    async fn persistent_client_reconnects_after_server_restart() {
1835        use crate::desktop::ipc_server::IpcServer;
1836        use crate::manager::{manager_loop, ServiceFactory};
1837        use tokio_util::sync::CancellationToken;
1838
1839        // Start first server.
1840        let (path, shutdown1, _event_tx) = setup_server();
1841        let app = tauri::test::mock_app();
1842        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1843
1844        // Wait for connection to first server.
1845        tokio::time::timeout(Duration::from_secs(2), async {
1846            while !handle.is_connected() {
1847                tokio::time::sleep(Duration::from_millis(50)).await;
1848            }
1849        })
1850        .await
1851        .expect("should connect to first server");
1852
1853        // Verify commands work through first connection.
1854        let result = handle.is_running().await;
1855        assert!(
1856            result.is_ok(),
1857            "command should succeed on first server: {:?}",
1858            result.err()
1859        );
1860
1861        // Kill first server.
1862        shutdown1.cancel();
1863        tokio::time::sleep(Duration::from_millis(150)).await;
1864
1865        // Start second server at the same path.
1866        let (cmd_tx2, cmd_rx2) = tokio::sync::mpsc::channel(16);
1867        let factory: ServiceFactory<tauri::test::MockRuntime> =
1868            Box::new(|| Box::new(BlockingService));
1869        tokio::spawn(manager_loop(
1870            cmd_rx2, factory, 0.0, 0.0, 0.0, 0.0, false, false, None,
1871        ));
1872        let server2 = IpcServer::bind(path.clone(), cmd_tx2, app.handle().clone()).unwrap();
1873        let shutdown2 = CancellationToken::new();
1874        let s2 = shutdown2.clone();
1875        tokio::spawn(async move { server2.run(s2).await });
1876
1877        // Client should reconnect within ~1s (backoff resets to min_delay after
1878        // a successful session, so the first retry is ~1s, not accumulated).
1879        let reconnected = tokio::time::timeout(Duration::from_secs(3), async {
1880            loop {
1881                if handle.is_connected() {
1882                    break;
1883                }
1884                tokio::time::sleep(Duration::from_millis(100)).await;
1885            }
1886        })
1887        .await;
1888
1889        assert!(
1890            reconnected.is_ok(),
1891            "persistent client should reconnect after server restart (backoff resets)"
1892        );
1893
1894        // Verify commands work through the new connection.
1895        let result = handle.is_running().await;
1896        assert!(
1897            result.is_ok(),
1898            "commands should work after reconnection: {:?}",
1899            result.err()
1900        );
1901
1902        shutdown2.cancel();
1903    }
1904
1905    // ═══════════════════════════════════════════════════════════════════════
1906    //  C3: ZERO-LENGTH FRAME REJECTION TESTS
1907    // ═══════════════════════════════════════════════════════════════════════
1908
1909    /// Verify that receiving a zero-length frame (\x00\x00\x00\x00) from the
1910    /// server produces an error, not `Ok(None)`. Zero-length frames are
1911    /// protocol violations and must be rejected explicitly.
1912    #[tokio::test]
1913    async fn ipc_client_rejects_zero_length_frame() {
1914        let path = crate::desktop::test_helpers::unique_socket_path();
1915        let listener = transport::bind(path.clone()).unwrap();
1916
1917        // Server that sends a zero-length frame immediately after accepting.
1918        let server_handle = tokio::spawn(async move {
1919            let (mut stream, _) = listener.accept().await.unwrap();
1920            use tokio::io::AsyncWriteExt;
1921            stream.write_all(&[0u8; 4]).await.unwrap();
1922            tokio::time::sleep(Duration::from_millis(500)).await;
1923        });
1924
1925        let mut client = IpcClient::connect(path.clone()).await.unwrap();
1926
1927        // Reading a frame should return an error, not Ok(None)
1928        let result = client.read_frame().await;
1929        assert!(
1930            result.is_err(),
1931            "zero-length frame should return error, got {:?}",
1932            result
1933        );
1934        let err = result.unwrap_err().to_string();
1935        assert!(
1936            err.contains("zero-length frame"),
1937            "Error should mention 'zero-length frame': {err}"
1938        );
1939
1940        server_handle.abort();
1941        let _ = std::fs::remove_file(&path);
1942    }
1943
1944    /// Verify that an actual EOF (connection drop) still returns `Ok(None)`.
1945    /// This is the "clean close" case — distinct from a zero-length frame.
1946    #[tokio::test]
1947    async fn ipc_client_eof_returns_ok_none() {
1948        let path = crate::desktop::test_helpers::unique_socket_path();
1949        let listener = transport::bind(path.clone()).unwrap();
1950
1951        // Server that accepts a connection then immediately drops it.
1952        let server_handle = tokio::spawn(async move {
1953            let (stream, _) = listener.accept().await.unwrap();
1954            drop(stream);
1955        });
1956
1957        let mut client = IpcClient::connect(path.clone()).await.unwrap();
1958        tokio::time::sleep(Duration::from_millis(20)).await;
1959
1960        // Reading a frame should return Ok(None) for clean EOF
1961        let result = client.read_frame().await;
1962        assert!(result.is_ok(), "EOF should return Ok, got {:?}", result);
1963        assert!(result.unwrap().is_none(), "EOF should return Ok(None)");
1964
1965        server_handle.abort();
1966        let _ = std::fs::remove_file(&path);
1967    }
1968
1969    // ═══════════════════════════════════════════════════════════════════════
1970    //  WAIT_FOR_CONNECTED TESTS (Step 12)
1971    // ═══════════════════════════════════════════════════════════════════════
1972
1973    /// wait_for_connected returns Ok immediately when already connected.
1974    #[tokio::test]
1975    async fn wait_for_connected_returns_immediately_when_connected() {
1976        let (path, shutdown, _event_tx) = setup_server();
1977        let app = tauri::test::mock_app();
1978        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1979
1980        // Wait for initial connection.
1981        tokio::time::timeout(Duration::from_secs(2), async {
1982            while !handle.is_connected() {
1983                tokio::time::sleep(Duration::from_millis(50)).await;
1984            }
1985        })
1986        .await
1987        .expect("should connect");
1988
1989        // Already connected → should return Ok immediately.
1990        let result = handle
1991            .wait_for_connected(Duration::from_secs(5))
1992            .await
1993            .unwrap();
1994        assert!(result, "should return true when connected");
1995
1996        shutdown.cancel();
1997    }
1998
1999    /// wait_for_connected returns Err(timeout) when no server is running.
2000    #[tokio::test]
2001    async fn wait_for_connected_times_out_when_no_server() {
2002        let app = tauri::test::mock_app();
2003        let path = crate::desktop::test_helpers::unique_socket_path();
2004        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
2005
2006        // No server → should time out quickly.
2007        let result = handle
2008            .wait_for_connected(Duration::from_millis(200))
2009            .await
2010            .unwrap();
2011        assert!(!result, "should return false when no server and timeout");
2012
2013        let _ = std::fs::remove_file(&path);
2014    }
2015
2016    /// wait_for_connected returns Ok once the server starts and client connects.
2017    #[tokio::test]
2018    async fn wait_for_connected_succeeds_after_server_starts() {
2019        let (path, shutdown, _event_tx) = setup_server();
2020        let app = tauri::test::mock_app();
2021        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
2022
2023        // Server is running — wait_for_connected should succeed within timeout.
2024        let result = handle
2025            .wait_for_connected(Duration::from_secs(5))
2026            .await
2027            .unwrap();
2028        assert!(result, "should connect within timeout");
2029
2030        shutdown.cancel();
2031    }
2032
2033    /// send_and_read collects multiple consecutive events before the response.
2034    #[tokio::test]
2035    async fn send_and_read_multiple_interleaved_events() {
2036        let path = crate::desktop::test_helpers::unique_socket_path();
2037        let server = buffered_server(
2038            &path,
2039            vec![
2040                IpcMessage::Event(IpcEvent::Started),
2041                IpcMessage::Event(IpcEvent::Error {
2042                    message: "warning".into(),
2043                }),
2044                IpcMessage::Event(IpcEvent::Stopped {
2045                    reason: StopReason::UserStop,
2046                }),
2047                IpcMessage::Response(IpcResponse {
2048                    ok: true,
2049                    data: Some(serde_json::json!({"running": false})),
2050                    error: None,
2051                }),
2052            ],
2053        )
2054        .await;
2055
2056        let mut client = IpcClient::connect(path.clone()).await.unwrap();
2057        let (response, events) = client.send_and_read(&IpcRequest::IsRunning).await.unwrap();
2058        assert!(response.ok, "response should be ok");
2059        assert_eq!(events.len(), 3, "should collect all three events");
2060        assert!(
2061            matches!(events[0], IpcEvent::Started),
2062            "first event should be Started"
2063        );
2064        assert!(
2065            matches!(events[1], IpcEvent::Error { .. }),
2066            "second event should be Error"
2067        );
2068        assert!(
2069            matches!(events[2], IpcEvent::Stopped { .. }),
2070            "third event should be Stopped"
2071        );
2072
2073        server.await.unwrap();
2074        let _ = std::fs::remove_file(&path);
2075    }
2076}