// tauri_plugin_background_service/desktop/ipc_client.rs
1//! Desktop IPC client for the GUI process.
2//!
3//! [`IpcClient`] connects to the headless sidecar's Unix domain socket and
4//! provides methods to start/stop the background service and receive events
5//! over the IPC protocol.
6//!
7//! Only available when the `desktop-service` Cargo feature is enabled.
8
9use std::path::PathBuf;
10use std::sync::atomic::AtomicBool;
11use std::sync::Arc;
12use std::time::Duration;
13
14use tauri::{Emitter, Runtime};
15
16use crate::desktop::ipc::{
17    decode_frame, encode_frame, IpcEvent, IpcMessage, IpcRequest, IpcResponse,
18};
19use crate::desktop::transport::{self, TransportReadHalf, TransportStream, TransportWriteHalf};
20use crate::error::ServiceError;
21use crate::models::{PluginEvent, ServiceStatus, StartConfig};
22
23/// IPC client for communicating with the headless sidecar service.
24///
25/// Connects to the sidecar's Unix domain socket and translates method calls
26/// into [`IpcRequest`] messages. Responses are decoded from [`IpcResponse`]
27/// frames.
28///
29/// Events from the sidecar (started/stopped/error) are read as [`IpcEvent`]
30/// frames and converted to [`PluginEvent`] for emission via the Tauri event
31/// system.
pub struct IpcClient {
    /// Underlying framed transport connection to the sidecar's socket.
    stream: TransportStream,
}
35
36impl IpcClient {
37    /// Connect to the sidecar's IPC socket at the given path.
38    pub async fn connect(path: PathBuf) -> Result<Self, ServiceError> {
39        let stream = transport::connect(&path).await?;
40        Ok(Self { stream })
41    }
42
43    /// Send a Start command to the sidecar.
44    pub async fn start(&mut self, config: StartConfig) -> Result<(), ServiceError> {
45        let request = IpcRequest::Start { config };
46        let (response, _events) = self.send_and_read(&request).await?;
47        if response.ok {
48            Ok(())
49        } else {
50            Err(ServiceError::Ipc(
51                response.error.unwrap_or_else(|| "unknown error".into()),
52            ))
53        }
54    }
55
56    /// Send a Stop command to the sidecar.
57    pub async fn stop(&mut self) -> Result<(), ServiceError> {
58        let (response, _events) = self.send_and_read(&IpcRequest::Stop).await?;
59        if response.ok {
60            Ok(())
61        } else {
62            Err(ServiceError::Ipc(
63                response.error.unwrap_or_else(|| "unknown error".into()),
64            ))
65        }
66    }
67
68    /// Send an IsRunning query to the sidecar.
69    pub async fn is_running(&mut self) -> Result<bool, ServiceError> {
70        let (response, _events) = self.send_and_read(&IpcRequest::IsRunning).await?;
71        if response.ok {
72            Ok(response
73                .data
74                .and_then(|d| d.get("running").and_then(|v| v.as_bool()))
75                .unwrap_or(false))
76        } else {
77            Err(ServiceError::Ipc(
78                response.error.unwrap_or_else(|| "unknown error".into()),
79            ))
80        }
81    }
82
83    /// Query the current service lifecycle state.
84    pub async fn get_state(&mut self) -> Result<ServiceStatus, ServiceError> {
85        let (response, _events) = self.send_and_read(&IpcRequest::GetState).await?;
86        if response.ok {
87            response
88                .data
89                .ok_or_else(|| ServiceError::Ipc("missing data in GetState response".into()))
90                .and_then(|d| {
91                    serde_json::from_value::<ServiceStatus>(d)
92                        .map_err(|e| ServiceError::Ipc(format!("deserialize GetState: {e}")))
93                })
94        } else {
95            Err(ServiceError::Ipc(
96                response.error.unwrap_or_else(|| "unknown error".into()),
97            ))
98        }
99    }
100
101    /// Read the next [`IpcEvent`] from the socket.
102    ///
103    /// Returns `None` if the connection was closed.
104    pub async fn read_event(&mut self) -> Result<Option<IpcEvent>, ServiceError> {
105        let frame = match self.read_frame().await? {
106            Some(f) => f,
107            None => return Ok(None),
108        };
109        match decode_frame(&frame).map_err(|e| ServiceError::Ipc(format!("decode event: {e}")))? {
110            IpcMessage::Event(event) => Ok(Some(event)),
111            other => Err(ServiceError::Ipc(format!(
112                "expected event frame, got {:?}",
113                std::mem::discriminant(&other),
114            ))),
115        }
116    }
117
118    /// Spawn a background task that reads [`IpcEvent`] frames and emits
119    /// [`PluginEvent`] via the given `AppHandle`.
120    ///
121    /// The task runs until the socket is closed or an error occurs.
122    pub fn listen_events<R: Runtime>(mut self, app: tauri::AppHandle<R>) {
123        tokio::spawn(async move {
124            loop {
125                match self.read_event().await {
126                    Ok(Some(event)) => {
127                        let plugin_event = ipc_event_to_plugin_event(event);
128                        let _ = app.emit("background-service://event", plugin_event);
129                    }
130                    Ok(None) => break,
131                    Err(_) => break,
132                }
133            }
134        });
135    }
136
137    // -- Private helpers -------------------------------------------------------
138
139    async fn send_and_read(
140        &mut self,
141        request: &IpcRequest,
142    ) -> Result<(IpcResponse, Vec<IpcEvent>), ServiceError> {
143        self.send_request(request).await?;
144        // The server interleaves IpcResponse and broadcast IpcEvent frames on
145        // the same socket. Read frames in a loop until we get a Response,
146        // collecting any Event frames encountered along the way.
147        let mut events = Vec::new();
148        loop {
149            let frame = self
150                .read_frame()
151                .await?
152                .ok_or_else(|| ServiceError::Ipc("connection closed".into()))?;
153            match decode_frame(&frame).map_err(|e| ServiceError::Ipc(format!("decode: {e}")))? {
154                IpcMessage::Response(resp) => return Ok((resp, events)),
155                IpcMessage::Event(e) => {
156                    events.push(e);
157                }
158                IpcMessage::Request(_) => {
159                    return Err(ServiceError::Ipc("unexpected request frame".into()));
160                }
161            }
162        }
163    }
164
165    async fn send_request(&mut self, request: &IpcRequest) -> Result<(), ServiceError> {
166        let msg = IpcMessage::Request(request.clone());
167        let frame = encode_frame(&msg).map_err(|e| ServiceError::Ipc(format!("encode: {e}")))?;
168        transport::write_frame(&mut self.stream, &frame)
169            .await
170            .map_err(ServiceError::Ipc)?;
171        Ok(())
172    }
173
174    /// Read a single length-prefixed frame from the socket.
175    ///
176    /// Returns the payload bytes only (no length prefix).
177    /// Returns `None` if the connection was closed cleanly.
178    async fn read_frame(&mut self) -> Result<Option<Vec<u8>>, ServiceError> {
179        transport::read_frame(&mut self.stream)
180            .await
181            .map_err(ServiceError::Ipc)
182    }
183}
184
185/// Convert an [`IpcEvent`] to a [`PluginEvent`].
186pub fn ipc_event_to_plugin_event(event: IpcEvent) -> PluginEvent {
187    match event {
188        IpcEvent::Started => PluginEvent::Started,
189        IpcEvent::Stopped { reason } => PluginEvent::Stopped { reason },
190        IpcEvent::Error { message } => PluginEvent::Error { message },
191    }
192}
193
194// ─── Persistent IPC Client ────────────────────────────────────────────────────
195
/// Internal command sent from the handle to the background connection task.
///
/// Each variant carries a oneshot `reply` sender through which the
/// background task returns the command's result.
enum IpcCommand {
    /// Start the background service with the given configuration.
    Start {
        config: StartConfig,
        reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
    },
    /// Stop the background service.
    Stop {
        reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
    },
    /// Ask whether the service is currently running.
    IsRunning {
        reply: tokio::sync::oneshot::Sender<Result<bool, ServiceError>>,
    },
    /// Fetch the full service lifecycle status.
    GetState {
        reply: tokio::sync::oneshot::Sender<Result<ServiceStatus, ServiceError>>,
    },
}
212
213/// Handle to a persistent IPC client that maintains a long-lived connection
214/// to the headless sidecar.
215///
216/// The background task automatically:
217/// - Relays [`IpcEvent`] frames to `app.emit("background-service://event", ...)`
218/// - Reconnects on connection failure with exponential backoff (1s–30s, up to 10 retries)
219/// - Forwards commands (start/stop/is_running) over the same connection
pub struct PersistentIpcClientHandle {
    /// Channel for forwarding commands to the background connection task.
    cmd_tx: tokio::sync::mpsc::Sender<IpcCommand>,
    /// Cancelled on drop to stop the background task.
    shutdown: tokio_util::sync::CancellationToken,
    /// Mirrors the background task's current connection state.
    connected: Arc<AtomicBool>,
}
225
impl Drop for PersistentIpcClientHandle {
    /// Cancel the shutdown token so the background connection task exits
    /// when the handle is dropped.
    fn drop(&mut self) {
        self.shutdown.cancel();
    }
}
231
232impl PersistentIpcClientHandle {
233    /// Spawn the persistent IPC client background task.
234    ///
235    /// The task immediately begins trying to connect to the socket at
236    /// `socket_path`. Events are relayed to the Tauri event system via
237    /// `app.emit()`.
238    pub fn spawn<R: Runtime>(socket_path: PathBuf, app: tauri::AppHandle<R>) -> Self {
239        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::channel(16);
240        let shutdown = tokio_util::sync::CancellationToken::new();
241        let connected = Arc::new(AtomicBool::new(false));
242
243        tokio::spawn(persistent_client_loop(
244            socket_path,
245            app,
246            cmd_rx,
247            shutdown.clone(),
248            connected.clone(),
249        ));
250
251        Self {
252            cmd_tx,
253            shutdown,
254            connected,
255        }
256    }
257
258    /// Send a Start command through the persistent connection.
259    pub async fn start(&self, config: StartConfig) -> Result<(), ServiceError> {
260        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
261        self.cmd_tx
262            .send(IpcCommand::Start {
263                config,
264                reply: reply_tx,
265            })
266            .await
267            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
268        reply_rx
269            .await
270            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
271    }
272
273    /// Send a Stop command through the persistent connection.
274    pub async fn stop(&self) -> Result<(), ServiceError> {
275        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
276        self.cmd_tx
277            .send(IpcCommand::Stop { reply: reply_tx })
278            .await
279            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
280        reply_rx
281            .await
282            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
283    }
284
285    /// Query whether the service is running through the persistent connection.
286    pub async fn is_running(&self) -> Result<bool, ServiceError> {
287        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
288        self.cmd_tx
289            .send(IpcCommand::IsRunning { reply: reply_tx })
290            .await
291            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
292        reply_rx
293            .await
294            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
295    }
296
297    /// Query the current service lifecycle state through the persistent connection.
298    pub async fn get_state(&self) -> Result<ServiceStatus, ServiceError> {
299        let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
300        self.cmd_tx
301            .send(IpcCommand::GetState { reply: reply_tx })
302            .await
303            .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
304        reply_rx
305            .await
306            .map_err(|_| ServiceError::Ipc("command dropped".into()))?
307    }
308
309    /// Returns `true` if the persistent client is currently connected to the
310    /// headless sidecar, `false` otherwise.
311    pub fn is_connected(&self) -> bool {
312        self.connected.load(std::sync::atomic::Ordering::Relaxed)
313    }
314}
315
/// Background task: maintain a persistent connection with reconnection.
///
/// Repeatedly attempts to connect to `socket_path`; when a connection
/// succeeds, runs a session via [`run_persistent_connection`], then sleeps
/// according to an exponential backoff schedule (1s–30s, jittered, at most
/// 10 attempts) before trying again. Exits when `shutdown` is cancelled or
/// the backoff schedule is exhausted. `connected` mirrors the current
/// connection state for [`PersistentIpcClientHandle::is_connected`].
async fn persistent_client_loop<R: Runtime>(
    socket_path: PathBuf,
    app: tauri::AppHandle<R>,
    mut cmd_rx: tokio::sync::mpsc::Receiver<IpcCommand>,
    shutdown: tokio_util::sync::CancellationToken,
    connected: Arc<AtomicBool>,
) {
    use backon::BackoffBuilder;

    let backoff_builder = backon::ExponentialBuilder::default()
        .with_min_delay(Duration::from_secs(1))
        .with_max_delay(Duration::from_secs(30))
        .with_max_times(10)
        .with_jitter();

    let mut attempts = backoff_builder.build();

    loop {
        tokio::select! {
            // `biased` polls the shutdown branch first, so a cancelled token
            // wins over a simultaneously-ready connect.
            biased;
            _ = shutdown.cancelled() => {
                log::info!("Persistent IPC client shutting down");
                connected.store(false, std::sync::atomic::Ordering::Relaxed);
                break;
            }
            connect_result = transport::connect(&socket_path) => {
                match connect_result {
                    Ok(stream) => {
                        log::info!("Persistent IPC client connected");
                        connected.store(true, std::sync::atomic::Ordering::Relaxed);
                        let result = run_persistent_connection(stream, &app, &mut cmd_rx, &connected).await;
                        // Reset backoff on successful connect (even if session later failed).
                        attempts = backoff_builder.build();
                        if result.is_err() {
                            log::info!("Persistent IPC connection lost, reconnecting...");
                            connected.store(false, std::sync::atomic::Ordering::Relaxed);
                        }
                    }
                    Err(_) => {
                        log::debug!("Persistent IPC client: connection failed, retrying...");
                        connected.store(false, std::sync::atomic::Ordering::Relaxed);
                    }
                }
                // Take the next delay from the schedule; `None` means the
                // backoff is exhausted and the loop gives up.
                let delay = match attempts.next() {
                    Some(d) => d,
                    None => {
                        log::warn!("Persistent IPC client: backoff exhausted, giving up");
                        break;
                    }
                };
                // Sleep for the backoff delay while remaining responsive to
                // shutdown.
                tokio::select! {
                    biased;
                    _ = shutdown.cancelled() => {
                        log::info!("Persistent IPC client shutting down");
                        connected.store(false, std::sync::atomic::Ordering::Relaxed);
                        break;
                    }
                    _ = tokio::time::sleep(delay) => {}
                }
            }
        }
    }
}
380
/// Run a single persistent connection until it fails.
///
/// Splits the stream into read/write halves:
/// - A reader task continuously reads frames and relays events to `app.emit()`.
///   When a response frame arrives, it forwards it via a shared oneshot channel.
/// - The main loop receives commands from `cmd_rx` and sends requests.
///
/// Commands are processed strictly sequentially: each request installs a
/// fresh oneshot sender in `response_slot` before being written, and the
/// reader delivers the next Response frame to whichever sender is present.
///
/// Every exit path breaks with `Err`, signalling the caller to reconnect.
///
/// NOTE(review): if [`await_response`] times out, the stale sender remains
/// in `response_slot` until the next command replaces it (or a late
/// response consumes it) — confirm `prepare_response_slot` tolerates that.
async fn run_persistent_connection<R: Runtime>(
    stream: TransportStream,
    app: &tauri::AppHandle<R>,
    cmd_rx: &mut tokio::sync::mpsc::Receiver<IpcCommand>,
    connected: &Arc<AtomicBool>,
) -> Result<(), ServiceError> {
    let (read_half, mut write_half) = transport::split(stream);

    // Shared slot for the reader task to deliver response frames.
    let response_slot: std::sync::Arc<
        tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<IpcResponse>>>,
    > = std::sync::Arc::new(tokio::sync::Mutex::new(None));

    let slot_writer = response_slot.clone();
    let app_clone = app.clone();
    let connected_reader = connected.clone();

    // Reader task: reads frames and either relays events or delivers responses.
    let reader_handle = tokio::spawn(async move {
        let mut read_half = read_half;
        loop {
            let frame = match read_frame_from(&mut read_half).await {
                Ok(Some(f)) => f,
                Ok(None) => break, // Connection closed
                Err(_) => break,
            };

            match decode_frame(&frame) {
                Ok(IpcMessage::Response(resp)) => {
                    // Deliver to the pending request, if any; a response with
                    // no waiter (e.g. after a timeout) is silently dropped.
                    let mut slot = slot_writer.lock().await;
                    if let Some(sender) = slot.take() {
                        let _ = sender.send(resp);
                    }
                    continue;
                }
                Ok(IpcMessage::Event(event)) => {
                    let plugin_event = ipc_event_to_plugin_event(event);
                    let _ = app_clone.emit("background-service://event", plugin_event);
                    continue;
                }
                Ok(IpcMessage::Request(_)) => {
                    // Only the server should originate requests.
                    log::warn!("unexpected request frame on client connection");
                    continue;
                }
                Err(e) => {
                    // Skip undecodable frames rather than killing the session.
                    log::debug!("failed to decode IPC frame: {e}");
                    continue;
                }
            }
        }
        // Reader exited — mark disconnected.
        connected_reader.store(false, std::sync::atomic::Ordering::Relaxed);
    });

    // Main loop: receive commands, send requests, wait for responses.
    let result = loop {
        tokio::select! {
            cmd = cmd_rx.recv() => {
                let cmd = match cmd {
                    Some(c) => c,
                    // All handles dropped — nothing left to serve.
                    None => break Err(ServiceError::Ipc("command channel closed".into())),
                };

                match cmd {
                    IpcCommand::Start { config, reply } => {
                        let request = IpcRequest::Start { config };
                        // Install the slot BEFORE writing the request so a
                        // fast response cannot be lost.
                        let rx = prepare_response_slot(&response_slot).await;
                        if let Err(e) = send_request_to(&mut write_half, &request).await {
                            let _ = reply.send(Err(e));
                            break Err(ServiceError::Ipc("send failed".into()));
                        }
                        let response = await_response(rx).await;
                        let result = match response {
                            Ok(resp) if resp.ok => Ok(()),
                            Ok(resp) => Err(ServiceError::Ipc(
                                resp.error.unwrap_or_else(|| "unknown error".into()),
                            )),
                            Err(e) => Err(e),
                        };
                        let _ = reply.send(result);
                    }
                    IpcCommand::Stop { reply } => {
                        let rx = prepare_response_slot(&response_slot).await;
                        if let Err(e) = send_request_to(&mut write_half, &IpcRequest::Stop).await {
                            let _ = reply.send(Err(e));
                            break Err(ServiceError::Ipc("send failed".into()));
                        }
                        let response = await_response(rx).await;
                        let result = match response {
                            Ok(resp) if resp.ok => Ok(()),
                            Ok(resp) => Err(ServiceError::Ipc(
                                resp.error.unwrap_or_else(|| "unknown error".into()),
                            )),
                            Err(e) => Err(e),
                        };
                        let _ = reply.send(result);
                    }
                    IpcCommand::IsRunning { reply } => {
                        let rx = prepare_response_slot(&response_slot).await;
                        if let Err(e) = send_request_to(&mut write_half, &IpcRequest::IsRunning).await {
                            let _ = reply.send(Err(e));
                            break Err(ServiceError::Ipc("send failed".into()));
                        }
                        let response = await_response(rx).await;
                        let result = match response {
                            // Missing/non-boolean `running` field defaults to false.
                            Ok(resp) if resp.ok => Ok(resp
                                .data
                                .and_then(|d| d.get("running").and_then(|v| v.as_bool()))
                                .unwrap_or(false)),
                            Ok(resp) => Err(ServiceError::Ipc(
                                resp.error.unwrap_or_else(|| "unknown error".into()),
                            )),
                            Err(e) => Err(e),
                        };
                        let _ = reply.send(result);
                    }
                    IpcCommand::GetState { reply } => {
                        let rx = prepare_response_slot(&response_slot).await;
                        if let Err(e) = send_request_to(&mut write_half, &IpcRequest::GetState).await {
                            let _ = reply.send(Err(e));
                            break Err(ServiceError::Ipc("send failed".into()));
                        }
                        let response = await_response(rx).await;
                        let result = match response {
                            Ok(resp) if resp.ok => resp
                                .data
                                .ok_or_else(|| ServiceError::Ipc("missing data in GetState response".into()))
                                .and_then(|d| {
                                    serde_json::from_value::<ServiceStatus>(d)
                                        .map_err(|e| ServiceError::Ipc(format!("deserialize GetState: {e}")))
                                }),
                            Ok(resp) => Err(ServiceError::Ipc(
                                resp.error.unwrap_or_else(|| "unknown error".into()),
                            )),
                            Err(e) => Err(e),
                        };
                        let _ = reply.send(result);
                    }
                }
            }
            _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
                // Timeout — check if reader is still alive
                // (the sleep restarts each select iteration, so this branch
                // fires only after 30s with no incoming command).
                if reader_handle.is_finished() {
                    break Err(ServiceError::Ipc("reader task died".into()));
                }
            }
        }
    };

    reader_handle.abort();
    result
}
539
540/// Send an IPC request frame through a write half.
541async fn send_request_to(
542    write_half: &mut TransportWriteHalf,
543    request: &IpcRequest,
544) -> Result<(), ServiceError> {
545    let msg = IpcMessage::Request(request.clone());
546    let frame = encode_frame(&msg).map_err(|e| ServiceError::Ipc(format!("encode: {e}")))?;
547    transport::write_frame(write_half, &frame)
548        .await
549        .map_err(ServiceError::Ipc)?;
550    Ok(())
551}
552
553/// Prepare the shared response slot for an upcoming request.
554///
555/// Creates a oneshot channel and stores the sender in `slot` so the reader
556/// task can deliver the next response. Returns the receiver end.
557///
558/// Must be called **before** sending the request to prevent losing fast
559/// responses that arrive before the slot is set.
560async fn prepare_response_slot(
561    slot: &std::sync::Arc<tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<IpcResponse>>>>,
562) -> tokio::sync::oneshot::Receiver<IpcResponse> {
563    let (tx, rx) = tokio::sync::oneshot::channel();
564    let mut guard = slot.lock().await;
565    debug_assert!(
566        guard.is_none(),
567        "response slot overwritten — sequential command invariant violated"
568    );
569    *guard = Some(tx);
570    rx
571}
572
573/// Await a response from the reader task with a timeout.
574///
575/// Returns `Err` if the response doesn't arrive within 10 seconds, preventing
576/// permanent hangs when the connection drops during command processing.
577async fn await_response(
578    rx: tokio::sync::oneshot::Receiver<IpcResponse>,
579) -> Result<IpcResponse, ServiceError> {
580    tokio::select! {
581        response = rx => {
582            response.map_err(|_| ServiceError::Ipc("response channel closed".into()))
583        }
584        _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
585            Err(ServiceError::Ipc("response timeout".into()))
586        }
587    }
588}
589
590/// Read a single length-prefixed frame from a read half.
591///
592/// Returns the payload bytes only (no length prefix).
593async fn read_frame_from(
594    read_half: &mut TransportReadHalf,
595) -> Result<Option<Vec<u8>>, ServiceError> {
596    transport::read_frame(read_half)
597        .await
598        .map_err(ServiceError::Ipc)
599}
600
601#[cfg(test)]
602mod tests {
603    use super::*;
604    use crate::desktop::test_helpers::{
605        setup_server, setup_server_with_factory, BlockingService, ImmediateSuccessService,
606    };
607    use std::sync::atomic::Ordering;
608    use std::time::Duration;
609    use tauri::Listener;
610
611    // -- AC1: Client connects ---------------------------------------------------
612
613    #[tokio::test]
614    async fn ipc_client_connect() {
615        let (path, shutdown, _event_tx) = setup_server();
616        let result = IpcClient::connect(path).await;
617        assert!(result.is_ok(), "client should connect: {:?}", result.err());
618        shutdown.cancel();
619    }
620
621    // -- AC2: Start command works -----------------------------------------------
622
623    #[tokio::test]
624    async fn ipc_client_send_start() {
625        let (path, shutdown, _event_tx) = setup_server();
626        let mut client = IpcClient::connect(path).await.unwrap();
627        let result = client.start(StartConfig::default()).await;
628        assert!(result.is_ok(), "start should succeed: {:?}", result.err());
629        shutdown.cancel();
630    }
631
632    // -- AC3: Stop command works ------------------------------------------------
633
634    #[tokio::test]
635    async fn ipc_client_send_stop() {
636        let (path, shutdown, _event_tx) = setup_server();
637        let mut client = IpcClient::connect(path).await.unwrap();
638        client.start(StartConfig::default()).await.unwrap();
639        let result = client.stop().await;
640        assert!(result.is_ok(), "stop should succeed: {:?}", result.err());
641        shutdown.cancel();
642    }
643
644    // -- AC4: IsRunning returns status ------------------------------------------
645
646    #[tokio::test]
647    async fn ipc_client_is_running() {
648        let (path, shutdown, _event_tx) = setup_server();
649        let mut client = IpcClient::connect(path).await.unwrap();
650
651        let running = client.is_running().await.unwrap();
652        assert!(!running, "should not be running initially");
653
654        client.start(StartConfig::default()).await.unwrap();
655        let running = client.is_running().await.unwrap();
656        assert!(running, "should be running after start");
657
658        shutdown.cancel();
659    }
660
661    // -- GetState returns ServiceStatus ------------------------------------------
662
663    #[tokio::test]
664    async fn ipc_client_get_state_initial() {
665        let (path, shutdown, _event_tx) = setup_server();
666        let mut client = IpcClient::connect(path).await.unwrap();
667
668        let status = client.get_state().await.unwrap();
669        assert!(
670            matches!(status.state, crate::models::ServiceState::Idle),
671            "expected Idle, got {:?}",
672            status.state
673        );
674        assert_eq!(status.last_error, None);
675
676        shutdown.cancel();
677    }
678
679    #[tokio::test]
680    async fn ipc_client_get_state_after_start() {
681        let (path, shutdown, _event_tx) = setup_server();
682        let mut client = IpcClient::connect(path).await.unwrap();
683
684        client.start(StartConfig::default()).await.unwrap();
685
686        // Poll until Running — Start replies at Initializing, spawned task
687        // transitions to Running asynchronously.
688        let status = tokio::time::timeout(Duration::from_secs(2), async {
689            loop {
690                let s = client.get_state().await.unwrap();
691                if matches!(s.state, crate::models::ServiceState::Running) {
692                    return s;
693                }
694                tokio::time::sleep(Duration::from_millis(10)).await;
695            }
696        })
697        .await
698        .expect("timed out waiting for Running state");
699        assert_eq!(status.last_error, None);
700
701        shutdown.cancel();
702    }
703
704    #[tokio::test]
705    async fn ipc_client_get_state_after_stop() {
706        let (path, shutdown, _event_tx) = setup_server();
707        let mut client = IpcClient::connect(path).await.unwrap();
708
709        client.start(StartConfig::default()).await.unwrap();
710        client.stop().await.unwrap();
711        let status = client.get_state().await.unwrap();
712        assert!(
713            matches!(status.state, crate::models::ServiceState::Stopped),
714            "expected Stopped, got {:?}",
715            status.state
716        );
717
718        shutdown.cancel();
719    }
720
721    // -- AC5: Events are received -----------------------------------------------
722
723    #[tokio::test]
724    async fn ipc_client_receive_events() {
725        let (path, shutdown, event_tx) =
726            setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
727        let mut client = IpcClient::connect(path).await.unwrap();
728        client.start(StartConfig::default()).await.unwrap();
729
730        // Simulate relay broadcasting Started
731        let _ = event_tx.send(IpcEvent::Started);
732
733        let event = tokio::time::timeout(Duration::from_millis(500), client.read_event())
734            .await
735            .expect("timed out waiting for event")
736            .expect("read_event failed");
737
738        assert!(event.is_some(), "should receive an event");
739        let event = event.unwrap();
740        assert!(
741            matches!(event, IpcEvent::Started),
742            "Expected Started event, got {:?}",
743            event
744        );
745
746        shutdown.cancel();
747    }
748
749    // -- Additional: Stop when not running returns error -------------------------
750
751    #[tokio::test]
752    async fn ipc_client_stop_when_not_running() {
753        let (path, shutdown, _event_tx) = setup_server();
754        let mut client = IpcClient::connect(path).await.unwrap();
755        let result = client.stop().await;
756        assert!(result.is_err(), "stop when not running should fail");
757        shutdown.cancel();
758    }
759
760    // -- Additional: Connect to nonexistent socket fails -------------------------
761
762    #[tokio::test]
763    async fn ipc_client_connect_to_nonexistent() {
764        let path = std::env::temp_dir().join("nonexistent-test-socket.sock");
765        let result = IpcClient::connect(path).await;
766        assert!(
767            result.is_err(),
768            "should fail to connect to nonexistent socket"
769        );
770    }
771
772    // -- Additional: ipc_event_to_plugin_event conversion -----------------------
773
774    #[test]
775    fn ipc_event_to_plugin_event_started() {
776        let event = IpcEvent::Started;
777        let plugin = ipc_event_to_plugin_event(event);
778        assert!(matches!(plugin, PluginEvent::Started));
779    }
780
781    #[test]
782    fn ipc_event_to_plugin_event_stopped() {
783        let event = IpcEvent::Stopped {
784            reason: "cancelled".into(),
785        };
786        let plugin = ipc_event_to_plugin_event(event);
787        match plugin {
788            PluginEvent::Stopped { reason } => assert_eq!(reason, "cancelled"),
789            other => panic!("Expected Stopped, got {other:?}"),
790        }
791    }
792
793    #[test]
794    fn ipc_event_to_plugin_event_error() {
795        let event = IpcEvent::Error {
796            message: "init failed".into(),
797        };
798        let plugin = ipc_event_to_plugin_event(event);
799        match plugin {
800            PluginEvent::Error { message } => assert_eq!(message, "init failed"),
801            other => panic!("Expected Error, got {other:?}"),
802        }
803    }
804
805    // -- Additional: Full lifecycle ---------------------------------------------
806
807    #[tokio::test]
808    async fn ipc_client_full_lifecycle() {
809        let (path, shutdown, _event_tx) = setup_server();
810        let mut client = IpcClient::connect(path).await.unwrap();
811
812        assert!(!client.is_running().await.unwrap());
813        client.start(StartConfig::default()).await.unwrap();
814        assert!(client.is_running().await.unwrap());
815        client.stop().await.unwrap();
816        assert!(!client.is_running().await.unwrap());
817
818        shutdown.cancel();
819    }
820
821    // -- Additional: listen_events spawns and converts events -------------------
822
823    #[tokio::test]
824    async fn ipc_client_listen_events() {
825        let (path, shutdown, event_tx) =
826            setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
827        let app = tauri::test::mock_app();
828
829        let received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
830        let received_clone = received.clone();
831        app.listen("background-service://event", move |_event| {
832            received_clone.store(true, Ordering::SeqCst);
833        });
834
835        let mut client = IpcClient::connect(path).await.unwrap();
836        client.start(StartConfig::default()).await.unwrap();
837        client.listen_events(app.handle().clone());
838
839        // Simulate relay broadcasting Started
840        let _ = event_tx.send(IpcEvent::Started);
841
842        tokio::time::timeout(Duration::from_millis(500), async {
843            while !received.load(Ordering::SeqCst) {
844                tokio::time::sleep(Duration::from_millis(10)).await;
845            }
846        })
847        .await
848        .expect("timed out waiting for event via listen_events");
849
850        assert!(
851            received.load(Ordering::SeqCst),
852            "should have received event"
853        );
854        shutdown.cancel();
855    }
856
857    // ═══════════════════════════════════════════════════════════════════════
858    //  IPC LOOPBACK TESTS (Step 20 — AC2, AC3, AC4)
859    // ═══════════════════════════════════════════════════════════════════════
860
861    // -- AC2: IPC loopback full lifecycle with event verification ---------------
862
    /// Comprehensive IPC loopback: IpcServer + IpcClient in the same process.
    /// Exercises start → Started event → running → stop → Stopped event → stopped.
    ///
    /// Note: IpcEvent frames must be read BEFORE other requests because
    /// `send_and_read` skips event frames looking for IpcResponse.
    /// Each event read is bounded at 500 ms so a missing broadcast fails the
    /// test quickly instead of hanging it.
    #[tokio::test]
    async fn ipc_loopback_full_lifecycle_with_events() {
        let (path, shutdown, event_tx) = setup_server();
        let mut client = IpcClient::connect(path).await.unwrap();

        // Initially not running
        assert!(
            !client.is_running().await.unwrap(),
            "should not be running initially"
        );

        // Start the service
        client
            .start(StartConfig::default())
            .await
            .expect("start should succeed");

        // Simulate relay broadcasting Started
        let _ = event_tx.send(IpcEvent::Started);

        // Read the Started event BEFORE any other request
        // (send_and_read on subsequent calls would skip buffered events)
        let started = tokio::time::timeout(Duration::from_millis(500), client.read_event())
            .await
            .expect("timed out waiting for Started event")
            .expect("read_event failed")
            .expect("should receive event");
        assert!(
            matches!(started, IpcEvent::Started),
            "Expected Started event, got {started:?}"
        );

        // Verify running (after consuming the event)
        assert!(
            client.is_running().await.unwrap(),
            "should be running after start"
        );

        // Stop the service
        client.stop().await.expect("stop should succeed");

        // Simulate relay broadcasting Stopped
        let _ = event_tx.send(IpcEvent::Stopped {
            reason: "cancelled".into(),
        });

        // Read the Stopped event BEFORE any other request
        let stopped = tokio::time::timeout(Duration::from_millis(500), client.read_event())
            .await
            .expect("timed out waiting for Stopped event")
            .expect("read_event failed")
            .expect("should receive event");
        assert!(
            matches!(stopped, IpcEvent::Stopped { .. }),
            "Expected Stopped event, got {stopped:?}"
        );

        // Verify not running
        assert!(
            !client.is_running().await.unwrap(),
            "should not be running after stop"
        );

        shutdown.cancel();
    }
933
934    // -- AC3: Event streaming converts IpcEvent to PluginEvent -------------------
935
    /// Verify events streamed through IPC are correctly converted to PluginEvent.
    ///
    /// Each event is read immediately after the command that triggered it, so
    /// a later `send_and_read` call cannot silently consume the event frame.
    #[tokio::test]
    async fn ipc_loopback_event_streaming_plugin_event_conversion() {
        let (path, shutdown, event_tx) = setup_server();
        let mut client = IpcClient::connect(path).await.unwrap();

        // Start — simulate relay broadcasting Started
        client.start(StartConfig::default()).await.unwrap();
        let _ = event_tx.send(IpcEvent::Started);
        let started_ipc = tokio::time::timeout(Duration::from_millis(500), client.read_event())
            .await
            .expect("timed out")
            .expect("read_event failed")
            .expect("should receive event");
        // Conversion under test: IpcEvent::Started → PluginEvent::Started.
        let started_plugin = ipc_event_to_plugin_event(started_ipc);
        assert!(
            matches!(started_plugin, PluginEvent::Started),
            "Expected PluginEvent::Started, got {started_plugin:?}"
        );

        // Stop — simulate relay broadcasting Stopped
        client.stop().await.unwrap();
        let _ = event_tx.send(IpcEvent::Stopped {
            reason: "cancelled".into(),
        });
        let stopped_ipc = tokio::time::timeout(Duration::from_millis(500), client.read_event())
            .await
            .expect("timed out")
            .expect("read_event failed")
            .expect("should receive event");
        // Conversion under test: the stop reason must carry through unchanged.
        let stopped_plugin = ipc_event_to_plugin_event(stopped_ipc);
        match stopped_plugin {
            PluginEvent::Stopped { reason } => {
                assert_eq!(reason, "cancelled", "Expected 'cancelled' reason");
            }
            other => panic!("Expected PluginEvent::Stopped, got {other:?}"),
        }

        shutdown.cancel();
    }
976
977    // -- AC4: Error handling — connection drop detected by client ---------------
978
    /// Verify client detects a dropped connection gracefully (no panic).
    /// Simulates the server side closing the socket mid-connection.
    #[tokio::test]
    async fn ipc_loopback_connection_drop_returns_error() {
        let path = crate::desktop::test_helpers::unique_socket_path();

        // Create a minimal "server" that accepts one connection then drops it.
        let listener = transport::bind(path.clone()).unwrap();
        let path_clone = path.clone();

        // Connect concurrently: connect() cannot complete until accept() runs below.
        let client_handle =
            tokio::spawn(async move { IpcClient::connect(path_clone).await.unwrap() });

        // Accept the connection and immediately drop the server-side stream.
        let (server_stream, _) = listener.accept().await.unwrap();
        drop(server_stream);
        // Brief pause so the peer close propagates before the client is used.
        tokio::time::sleep(Duration::from_millis(20)).await;

        let mut client = client_handle.await.unwrap();

        // Client should detect the closed connection on next operation.
        let result = client.is_running().await;
        assert!(
            result.is_err(),
            "should get error after server drops connection"
        );

        // Clean up the socket file left behind by bind().
        let _ = std::fs::remove_file(&path);
    }
1008
1009    // -- AC4: Error handling — double start returns error through IPC ------------
1010
1011    /// Verify second start (when already running) returns an IPC error.
1012    #[tokio::test]
1013    async fn ipc_loopback_double_start_returns_error() {
1014        let (path, shutdown, _event_tx) = setup_server();
1015        let mut client = IpcClient::connect(path).await.unwrap();
1016
1017        client.start(StartConfig::default()).await.unwrap();
1018
1019        let result = client.start(StartConfig::default()).await;
1020        assert!(result.is_err(), "double start should return error");
1021        let err_msg = result.unwrap_err().to_string();
1022        assert!(
1023            err_msg.to_lowercase().contains("already"),
1024            "Error should mention 'already': {err_msg}"
1025        );
1026
1027        shutdown.cancel();
1028    }
1029
1030    // ═══════════════════════════════════════════════════════════════════════
1031    //  PERSISTENT IPC CLIENT TESTS (Step 12)
1032    // ═══════════════════════════════════════════════════════════════════════
1033
1034    // -- AC1: Persistent client connects and maintains connection --
1035
1036    /// Verify the persistent client connects to a running server and can
1037    /// forward commands through the persistent connection.
1038    #[tokio::test]
1039    async fn persistent_client_connects() {
1040        let (path, shutdown, _event_tx) = setup_server();
1041        let app = tauri::test::mock_app();
1042
1043        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1044
1045        // Give the background task time to connect.
1046        tokio::time::sleep(Duration::from_millis(100)).await;
1047
1048        // Send a command through the persistent connection.
1049        let running = handle.is_running().await;
1050        assert!(
1051            running.is_ok(),
1052            "should get response via persistent connection: {:?}",
1053            running.err()
1054        );
1055        assert!(!running.unwrap(), "should not be running initially");
1056
1057        shutdown.cancel();
1058    }
1059
1060    // -- AC3: Auto-reconnect --
1061
    /// Verify the persistent client reconnects after the server restarts.
    #[tokio::test]
    async fn persistent_client_reconnects() {
        use crate::desktop::ipc_server::IpcServer;
        use crate::manager::{manager_loop, ServiceFactory};
        use tokio_util::sync::CancellationToken;

        // First server
        let (path, shutdown1, _event_tx) = setup_server();
        let app = tauri::test::mock_app();

        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());

        // Verify connected to first server.
        tokio::time::sleep(Duration::from_millis(100)).await;
        let result = handle.is_running().await;
        assert!(
            result.is_ok(),
            "should connect to first server: {:?}",
            result.err()
        );

        // Kill first server and wait for socket cleanup.
        shutdown1.cancel();
        tokio::time::sleep(Duration::from_millis(150)).await;

        // Start second server at the same path.
        // NOTE(review): the four 0.0 args and two false flags presumably mirror
        // setup_server's manager configuration — confirm against manager_loop's
        // signature if it changes.
        let (cmd_tx2, cmd_rx2) = tokio::sync::mpsc::channel(16);
        let factory: ServiceFactory<tauri::test::MockRuntime> =
            Box::new(|| Box::new(BlockingService));
        tokio::spawn(manager_loop(
            cmd_rx2, factory, 0.0, 0.0, 0.0, 0.0, false, false,
        ));
        let server2 = IpcServer::bind(path.clone(), cmd_tx2, app.handle().clone()).unwrap();
        let shutdown2 = CancellationToken::new();
        let s2 = shutdown2.clone();
        tokio::spawn(async move { server2.run(s2).await });

        // Wait for the client to reconnect (1s reconnect delay + margin).
        // Polling every 200 ms keeps the loop cheap; the 3 s outer bound
        // covers the reconnect delay with headroom.
        let reconnected = tokio::time::timeout(Duration::from_secs(3), async {
            loop {
                tokio::time::sleep(Duration::from_millis(200)).await;
                if handle.is_running().await.is_ok() {
                    break;
                }
            }
        })
        .await;
        assert!(
            reconnected.is_ok(),
            "persistent client should reconnect to second server"
        );

        shutdown2.cancel();
    }
1117
1118    // -- AC2: Event relay via app.emit() --
1119
    /// Verify events from the server are relayed to `app.emit()` by the
    /// persistent client's background reader task.
    #[tokio::test]
    async fn event_relay() {
        let (path, shutdown, event_tx) =
            setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
        let app = tauri::test::mock_app();

        // Flag flipped by the Tauri listener once the relayed event arrives.
        let received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let received_clone = received.clone();
        app.listen("background-service://event", move |_event| {
            received_clone.store(true, Ordering::SeqCst);
        });

        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());

        // Start the service — the reader task should relay the Started event.
        let result = handle.start(StartConfig::default()).await;
        assert!(result.is_ok(), "start should succeed: {:?}", result.err());

        // Simulate relay broadcasting Started
        let _ = event_tx.send(IpcEvent::Started);

        // Wait for the event to be relayed via app.emit().
        // Poll the flag instead of blocking: the listener fires on Tauri's side.
        tokio::time::timeout(Duration::from_millis(500), async {
            while !received.load(Ordering::SeqCst) {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        })
        .await
        .expect("timed out waiting for event relay via app.emit()");

        assert!(
            received.load(Ordering::SeqCst),
            "event should be relayed through app.emit()"
        );

        shutdown.cancel();
    }
1159
1160    // -- AC4: Start/Stop lifecycle through persistent client --
1161
1162    /// Verify the full start → running → stop → not-running lifecycle works
1163    /// through the persistent IPC client.
1164    #[tokio::test]
1165    async fn start_stop_lifecycle() {
1166        let (path, shutdown, _event_tx) = setup_server();
1167        let app = tauri::test::mock_app();
1168
1169        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1170
1171        // Initially not running.
1172        let running = handle.is_running().await.unwrap();
1173        assert!(!running, "should not be running initially");
1174
1175        // Start.
1176        handle
1177            .start(StartConfig::default())
1178            .await
1179            .expect("start should succeed");
1180        let running = handle.is_running().await.unwrap();
1181        assert!(running, "should be running after start");
1182
1183        // Stop.
1184        handle.stop().await.expect("stop should succeed");
1185        let running = handle.is_running().await.unwrap();
1186        assert!(!running, "should not be running after stop");
1187
1188        shutdown.cancel();
1189    }
1190
1191    // -- GetState through persistent client --
1192
    /// Verify GetState round-trips through the persistent client: Idle before
    /// start, then Running once the spawned service task has transitioned.
    #[tokio::test]
    async fn persistent_client_get_state() {
        let (path, shutdown, _event_tx) = setup_server();
        let app = tauri::test::mock_app();

        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());

        // Give the background task time to connect.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let status = handle.get_state().await.unwrap();
        assert!(
            matches!(status.state, crate::models::ServiceState::Idle),
            "expected Idle, got {:?}",
            status.state
        );

        handle.start(StartConfig::default()).await.unwrap();

        // Poll until Running — race between Start reply (Initializing) and
        // spawned task transition to Running.
        let status = tokio::time::timeout(Duration::from_secs(2), async {
            loop {
                let s = handle.get_state().await.unwrap();
                if matches!(s.state, crate::models::ServiceState::Running) {
                    return s;
                }
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        })
        .await
        .expect("timed out waiting for Running state");
        // Redundant with the poll's exit condition, but documents the intent.
        assert!(
            matches!(status.state, crate::models::ServiceState::Running),
            "expected Running, got {:?}",
            status.state
        );

        shutdown.cancel();
    }
1233
1234    // -- Fix: Timeout prevents permanent hang on unresponsive server --
1235
    /// Verify the persistent client returns an error (not hang) when the
    /// server accepts a connection but never responds to a command.
    ///
    /// This is a regression test for the critical bug where `wait_for_response`
    /// had no timeout — a dropped connection during command processing caused
    /// both the reconnect loop and the caller to hang permanently.
    #[tokio::test]
    async fn persistent_client_timeout_on_unresponsive_server() {
        let path = crate::desktop::test_helpers::unique_socket_path();
        let listener = transport::bind(path.clone()).unwrap();

        // Server that accepts the connection but never responds.
        let server_handle = tokio::spawn(async move {
            let (_stream, _) = listener.accept().await.unwrap();
            // Hold connection open — never send a response.
            tokio::time::sleep(Duration::from_secs(60)).await;
        });

        let app = tauri::test::mock_app();
        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());

        // Give the background task time to connect.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Start should timeout and return an error, not hang forever.
        // The 15 s outer bound presumably exceeds the client's internal
        // response timeout — confirm if that timeout is ever raised.
        let result = tokio::time::timeout(
            Duration::from_secs(15),
            handle.start(StartConfig::default()),
        )
        .await;

        assert!(
            result.is_ok(),
            "start should not hang — expected error, got outer timeout"
        );
        let inner = result.unwrap();
        assert!(
            inner.is_err(),
            "start should return error when server is unresponsive"
        );

        // Abort the still-sleeping server task and clean up the socket file.
        server_handle.abort();
        let _ = std::fs::remove_file(&path);
    }
1280
1281    // -- C1: Persistent client terminates on handle drop --
1282
    /// Verify that dropping `PersistentIpcClientHandle` causes the background
    /// reconnection task to stop (via `CancellationToken`), preventing resource
    /// leaks where the task reconnects forever after the handle is dropped.
    ///
    /// NOTE(review): this test has no direct assertion — it only exercises the
    /// drop path and relies on the run completing cleanly. Consider exposing a
    /// task-terminated signal so the termination becomes observable.
    #[tokio::test]
    async fn persistent_client_terminates_on_handle_drop() {
        let (path, shutdown, _event_tx) = setup_server();
        let app = tauri::test::mock_app();

        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());

        // Give the background task time to connect.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Drop the handle — this should cancel the shutdown token.
        drop(handle);

        // The background task should terminate within a bounded time.
        // We can't observe the JoinHandle directly (it's fire-and-forget),
        // but we can verify the socket isn't being reconnected to by checking
        // that server shutdown succeeds cleanly.
        tokio::time::sleep(Duration::from_secs(2)).await;

        shutdown.cancel();
    }
1307
1308    // ═══════════════════════════════════════════════════════════════════════
1309    //  BUFFERED EVENTS TESTS (Step 4)
1310    // ═══════════════════════════════════════════════════════════════════════
1311
1312    /// Helper: create a raw server that sends specific frames in response to
1313    /// any request, giving full control over the event/response interleaving.
1314    async fn buffered_server(
1315        path: &std::path::Path,
1316        frames: Vec<IpcMessage>,
1317    ) -> tokio::task::JoinHandle<()> {
1318        let listener = transport::bind(path.to_path_buf()).unwrap();
1319        tokio::spawn(async move {
1320            let (mut stream, _) = listener.accept().await.unwrap();
1321            use tokio::io::{AsyncReadExt, AsyncWriteExt};
1322            // Read and discard the incoming request.
1323            let mut len_buf = [0u8; 4];
1324            if stream.read_exact(&mut len_buf).await.is_err() {
1325                return;
1326            }
1327            let len = u32::from_be_bytes(len_buf) as usize;
1328            let mut payload = vec![0u8; len];
1329            if stream.read_exact(&mut payload).await.is_err() {
1330                return;
1331            }
1332            // Send the pre-programmed frames in order.
1333            for msg in &frames {
1334                let frame = crate::desktop::ipc::encode_frame(msg).unwrap();
1335                if stream.write_all(&frame).await.is_err() {
1336                    return;
1337                }
1338            }
1339        })
1340    }
1341
1342    /// send_and_read returns response with empty event list when no events interleave.
1343    #[tokio::test]
1344    async fn send_and_read_no_interleaved_events() {
1345        let path = crate::desktop::test_helpers::unique_socket_path();
1346        let server = buffered_server(
1347            &path,
1348            vec![IpcMessage::Response(IpcResponse {
1349                ok: true,
1350                data: None,
1351                error: None,
1352            })],
1353        )
1354        .await;
1355
1356        let mut client = IpcClient::connect(path.clone()).await.unwrap();
1357        let (response, events) = client.send_and_read(&IpcRequest::IsRunning).await.unwrap();
1358        assert!(response.ok, "response should be ok");
1359        assert!(
1360            events.is_empty(),
1361            "events should be empty when no events interleave, got {:?}",
1362            events
1363        );
1364
1365        server.await.unwrap();
1366        let _ = std::fs::remove_file(&path);
1367    }
1368
1369    /// send_and_read collects a single interleaved event alongside the response.
1370    #[tokio::test]
1371    async fn send_and_read_single_interleaved_event() {
1372        let path = crate::desktop::test_helpers::unique_socket_path();
1373        let server = buffered_server(
1374            &path,
1375            vec![
1376                IpcMessage::Event(IpcEvent::Started),
1377                IpcMessage::Response(IpcResponse {
1378                    ok: true,
1379                    data: None,
1380                    error: None,
1381                }),
1382            ],
1383        )
1384        .await;
1385
1386        let mut client = IpcClient::connect(path.clone()).await.unwrap();
1387        let (response, events) = client
1388            .send_and_read(&IpcRequest::Start {
1389                config: StartConfig::default(),
1390            })
1391            .await
1392            .unwrap();
1393        assert!(response.ok, "response should be ok");
1394        assert_eq!(events.len(), 1, "should collect exactly one event");
1395        assert!(
1396            matches!(events[0], IpcEvent::Started),
1397            "expected Started event, got {:?}",
1398            events[0]
1399        );
1400
1401        server.await.unwrap();
1402        let _ = std::fs::remove_file(&path);
1403    }
1404
1405    // ═══════════════════════════════════════════════════════════════════════
1406    //  IS_CONNECTED TESTS (Step 5)
1407    // ═══════════════════════════════════════════════════════════════════════
1408
1409    /// is_connected() returns false before the background task has connected
1410    /// to any server.
1411    #[tokio::test]
1412    async fn is_connected_false_before_server() {
1413        let app = tauri::test::mock_app();
1414        let path = crate::desktop::test_helpers::unique_socket_path();
1415        // No server running — spawn handle pointing at a nonexistent socket.
1416        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1417        // The background task may or may not have attempted a connection yet,
1418        // but it should definitely NOT be connected.
1419        tokio::time::sleep(Duration::from_millis(50)).await;
1420        assert!(
1421            !handle.is_connected(),
1422            "should not be connected when no server is running"
1423        );
1424        let _ = std::fs::remove_file(&path);
1425    }
1426
1427    /// is_connected() returns true once the persistent client has established
1428    /// a connection to a running server.
1429    #[tokio::test]
1430    async fn is_connected_true_after_connect() {
1431        let (path, shutdown, _event_tx) = setup_server();
1432        let app = tauri::test::mock_app();
1433        let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1434
1435        // Wait for the background task to connect.
1436        tokio::time::timeout(Duration::from_secs(2), async {
1437            while !handle.is_connected() {
1438                tokio::time::sleep(Duration::from_millis(50)).await;
1439            }
1440        })
1441        .await
1442        .expect("timed out waiting for is_connected to become true");
1443
1444        assert!(
1445            handle.is_connected(),
1446            "should be connected after server is up"
1447        );
1448
1449        shutdown.cancel();
1450    }
1451
    /// is_connected() returns false after the server shuts down and the
    /// persistent client detects the disconnection.
    ///
    /// Uses a minimal server that accepts one connection then explicitly drops
    /// it, guaranteeing the reader task exits and sets connected = false.
    #[tokio::test]
    async fn is_connected_false_after_server_shutdown() {
        let path = crate::desktop::test_helpers::unique_socket_path();
        let path_clone = path.clone();
        let listener = transport::bind(path.clone()).unwrap();

        // Server that accepts a connection, waits briefly, then drops
        // everything (stream + listener), preventing reconnection.
        let server_handle = tokio::spawn(async move {
            let (stream, _) = listener.accept().await.unwrap();
            // Hold the connection briefly so the client can connect.
            tokio::time::sleep(Duration::from_millis(200)).await;
            // Drop the stream — reader will detect EOF.
            drop(stream);
            // Drop the listener (moved into this closure) and clean up socket.
            let _ = std::fs::remove_file(&path_clone);
        });

        let app = tauri::test::mock_app();
        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());

        // Wait for connection.
        tokio::time::timeout(Duration::from_secs(2), async {
            while !handle.is_connected() {
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        })
        .await
        .expect("timed out waiting for initial connection");

        assert!(handle.is_connected(), "should be connected initially");

        // Wait for the server to drop the connection and listener.
        // 3 s bound covers the 200 ms server delay plus detection latency.
        tokio::time::timeout(Duration::from_secs(3), async {
            while handle.is_connected() {
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
        })
        .await
        .expect("timed out waiting for is_connected to become false");

        assert!(
            !handle.is_connected(),
            "should not be connected after server shutdown"
        );

        server_handle.abort();
        let _ = std::fs::remove_file(&path);
    }
1506
1507    // ═══════════════════════════════════════════════════════════════════════
1508    //  BACKOFF BEHAVIOR TESTS (Step 6c)
1509    // ═══════════════════════════════════════════════════════════════════════
1510
1511    /// Verify the ExponentialBuilder config used in persistent_client_loop
1512    /// produces increasing delays, respects the 30s max, and exhausts after
1513    /// exactly 10 attempts.
1514    #[test]
1515    fn backoff_builder_produces_increasing_delays() {
1516        use backon::BackoffBuilder;
1517
1518        let builder = backon::ExponentialBuilder::default()
1519            .with_min_delay(Duration::from_secs(1))
1520            .with_max_delay(Duration::from_secs(30))
1521            .with_max_times(10)
1522            .with_jitter();
1523
1524        let mut attempts = builder.build();
1525        let mut delays = Vec::new();
1526        while let Some(d) = attempts.next() {
1527            delays.push(d);
1528        }
1529
1530        assert_eq!(delays.len(), 10, "should produce exactly 10 delays");
1531
1532        // First delay ≈ 1s (with jitter, allow 0.5–2s).
1533        assert!(
1534            delays[0] >= Duration::from_millis(500),
1535            "first delay too short: {:?}",
1536            delays[0]
1537        );
1538        assert!(
1539            delays[0] <= Duration::from_secs(2),
1540            "first delay too long: {:?}",
1541            delays[0]
1542        );
1543
1544        // Last delay should be at or near the 30s cap.
1545        assert!(
1546            delays[9] >= Duration::from_secs(15),
1547            "last delay should approach max: {:?}",
1548            delays[9]
1549        );
1550
1551        // All delays capped — with jitter, allow up to 2× max_delay.
1552        for d in &delays {
1553            assert!(
1554                *d <= Duration::from_secs(60),
1555                "delay exceeds max_delay + jitter margin: {:?}",
1556                d
1557            );
1558        }
1559
1560        // Iterator is exhausted after 10 attempts.
1561        assert!(
1562            attempts.next().is_none(),
1563            "should return None after 10 attempts"
1564        );
1565    }
1566
1567    /// Verify the persistent client stops retrying after exhausting its backoff
1568    /// budget and that subsequent commands fail with "shut down" (channel closed).
1569    ///
1570    /// With min_delay=1s, max_delay=30s, max_times=10, total retry time ≈ 152s.
1571    /// Marked `#[ignore]` to avoid slowing down normal test runs.
1572    /// Run with `cargo test -- --ignored`.
1573    #[ignore]
1574    #[tokio::test]
1575    async fn persistent_client_exits_after_max_retries() {
1576        let app = tauri::test::mock_app();
1577        let path = crate::desktop::test_helpers::unique_socket_path();
1578        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1579
1580        // Wait for the background task to exhaust retries.
1581        // Poll is_running() — once the loop exits, the command channel closes
1582        // and we get "shut down" instead of a timeout.
1583        let exited = tokio::time::timeout(Duration::from_secs(180), async {
1584            loop {
1585                tokio::time::sleep(Duration::from_secs(5)).await;
1586                if let Err(e) = handle.is_running().await {
1587                    if e.to_string().contains("shut down") {
1588                        return;
1589                    }
1590                }
1591            }
1592        })
1593        .await;
1594
1595        assert!(
1596            exited.is_ok(),
1597            "persistent client should exit after max retries"
1598        );
1599        assert!(!handle.is_connected(), "should not be connected after exit");
1600
1601        let _ = std::fs::remove_file(&path);
1602    }
1603
1604    /// Verify the persistent client reconnects after a server restart and that
1605    /// the backoff resets (reconnection starts from ~1s min_delay, not an
1606    /// accumulated delay).
1607    #[tokio::test]
1608    async fn persistent_client_reconnects_after_server_restart() {
1609        use crate::desktop::ipc_server::IpcServer;
1610        use crate::manager::{manager_loop, ServiceFactory};
1611        use tokio_util::sync::CancellationToken;
1612
1613        // Start first server.
1614        let (path, shutdown1, _event_tx) = setup_server();
1615        let app = tauri::test::mock_app();
1616        let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1617
1618        // Wait for connection to first server.
1619        tokio::time::timeout(Duration::from_secs(2), async {
1620            while !handle.is_connected() {
1621                tokio::time::sleep(Duration::from_millis(50)).await;
1622            }
1623        })
1624        .await
1625        .expect("should connect to first server");
1626
1627        // Verify commands work through first connection.
1628        let result = handle.is_running().await;
1629        assert!(
1630            result.is_ok(),
1631            "command should succeed on first server: {:?}",
1632            result.err()
1633        );
1634
1635        // Kill first server.
1636        shutdown1.cancel();
1637        tokio::time::sleep(Duration::from_millis(150)).await;
1638
1639        // Start second server at the same path.
1640        let (cmd_tx2, cmd_rx2) = tokio::sync::mpsc::channel(16);
1641        let factory: ServiceFactory<tauri::test::MockRuntime> =
1642            Box::new(|| Box::new(BlockingService));
1643        tokio::spawn(manager_loop(
1644            cmd_rx2, factory, 0.0, 0.0, 0.0, 0.0, false, false,
1645        ));
1646        let server2 = IpcServer::bind(path.clone(), cmd_tx2, app.handle().clone()).unwrap();
1647        let shutdown2 = CancellationToken::new();
1648        let s2 = shutdown2.clone();
1649        tokio::spawn(async move { server2.run(s2).await });
1650
1651        // Client should reconnect within ~1s (backoff resets to min_delay after
1652        // a successful session, so the first retry is ~1s, not accumulated).
1653        let reconnected = tokio::time::timeout(Duration::from_secs(3), async {
1654            loop {
1655                if handle.is_connected() {
1656                    break;
1657                }
1658                tokio::time::sleep(Duration::from_millis(100)).await;
1659            }
1660        })
1661        .await;
1662
1663        assert!(
1664            reconnected.is_ok(),
1665            "persistent client should reconnect after server restart (backoff resets)"
1666        );
1667
1668        // Verify commands work through the new connection.
1669        let result = handle.is_running().await;
1670        assert!(
1671            result.is_ok(),
1672            "commands should work after reconnection: {:?}",
1673            result.err()
1674        );
1675
1676        shutdown2.cancel();
1677    }
1678
1679    // ═══════════════════════════════════════════════════════════════════════
1680    //  C3: ZERO-LENGTH FRAME REJECTION TESTS
1681    // ═══════════════════════════════════════════════════════════════════════
1682
1683    /// Verify that receiving a zero-length frame (\x00\x00\x00\x00) from the
1684    /// server produces an error, not `Ok(None)`. Zero-length frames are
1685    /// protocol violations and must be rejected explicitly.
1686    #[tokio::test]
1687    async fn ipc_client_rejects_zero_length_frame() {
1688        let path = crate::desktop::test_helpers::unique_socket_path();
1689        let listener = transport::bind(path.clone()).unwrap();
1690
1691        // Server that sends a zero-length frame immediately after accepting.
1692        let server_handle = tokio::spawn(async move {
1693            let (mut stream, _) = listener.accept().await.unwrap();
1694            use tokio::io::AsyncWriteExt;
1695            stream.write_all(&[0u8; 4]).await.unwrap();
1696            tokio::time::sleep(Duration::from_millis(500)).await;
1697        });
1698
1699        let mut client = IpcClient::connect(path.clone()).await.unwrap();
1700
1701        // Reading a frame should return an error, not Ok(None)
1702        let result = client.read_frame().await;
1703        assert!(
1704            result.is_err(),
1705            "zero-length frame should return error, got {:?}",
1706            result
1707        );
1708        let err = result.unwrap_err().to_string();
1709        assert!(
1710            err.contains("zero-length frame"),
1711            "Error should mention 'zero-length frame': {err}"
1712        );
1713
1714        server_handle.abort();
1715        let _ = std::fs::remove_file(&path);
1716    }
1717
1718    /// Verify that an actual EOF (connection drop) still returns `Ok(None)`.
1719    /// This is the "clean close" case — distinct from a zero-length frame.
1720    #[tokio::test]
1721    async fn ipc_client_eof_returns_ok_none() {
1722        let path = crate::desktop::test_helpers::unique_socket_path();
1723        let listener = transport::bind(path.clone()).unwrap();
1724
1725        // Server that accepts a connection then immediately drops it.
1726        let server_handle = tokio::spawn(async move {
1727            let (stream, _) = listener.accept().await.unwrap();
1728            drop(stream);
1729        });
1730
1731        let mut client = IpcClient::connect(path.clone()).await.unwrap();
1732        tokio::time::sleep(Duration::from_millis(20)).await;
1733
1734        // Reading a frame should return Ok(None) for clean EOF
1735        let result = client.read_frame().await;
1736        assert!(result.is_ok(), "EOF should return Ok, got {:?}", result);
1737        assert!(result.unwrap().is_none(), "EOF should return Ok(None)");
1738
1739        server_handle.abort();
1740        let _ = std::fs::remove_file(&path);
1741    }
1742
1743    /// send_and_read collects multiple consecutive events before the response.
1744    #[tokio::test]
1745    async fn send_and_read_multiple_interleaved_events() {
1746        let path = crate::desktop::test_helpers::unique_socket_path();
1747        let server = buffered_server(
1748            &path,
1749            vec![
1750                IpcMessage::Event(IpcEvent::Started),
1751                IpcMessage::Event(IpcEvent::Error {
1752                    message: "warning".into(),
1753                }),
1754                IpcMessage::Event(IpcEvent::Stopped {
1755                    reason: "cancelled".into(),
1756                }),
1757                IpcMessage::Response(IpcResponse {
1758                    ok: true,
1759                    data: Some(serde_json::json!({"running": false})),
1760                    error: None,
1761                }),
1762            ],
1763        )
1764        .await;
1765
1766        let mut client = IpcClient::connect(path.clone()).await.unwrap();
1767        let (response, events) = client.send_and_read(&IpcRequest::IsRunning).await.unwrap();
1768        assert!(response.ok, "response should be ok");
1769        assert_eq!(events.len(), 3, "should collect all three events");
1770        assert!(
1771            matches!(events[0], IpcEvent::Started),
1772            "first event should be Started"
1773        );
1774        assert!(
1775            matches!(events[1], IpcEvent::Error { .. }),
1776            "second event should be Error"
1777        );
1778        assert!(
1779            matches!(events[2], IpcEvent::Stopped { .. }),
1780            "third event should be Stopped"
1781        );
1782
1783        server.await.unwrap();
1784        let _ = std::fs::remove_file(&path);
1785    }
1786}