1use std::path::PathBuf;
10use std::sync::atomic::AtomicBool;
11use std::sync::Arc;
12use std::time::Duration;
13
14use tauri::{Emitter, Runtime};
15
16use crate::desktop::ipc::{
17 decode_frame, encode_frame, IpcEvent, IpcMessage, IpcRequest, IpcResponse,
18};
19use crate::desktop::transport::{self, TransportReadHalf, TransportStream, TransportWriteHalf};
20use crate::error::ServiceError;
21use crate::models::{PluginEvent, ServiceStatus, StartConfig};
22
/// IPC client that owns a single transport connection.
///
/// Requests are written to, and frames are read back from, the same stream.
pub struct IpcClient {
    /// Connected transport stream used for both sending requests and reading frames.
    stream: TransportStream,
}
35
36impl IpcClient {
37 pub async fn connect(path: PathBuf) -> Result<Self, ServiceError> {
39 let stream = transport::connect(&path).await?;
40 Ok(Self { stream })
41 }
42
43 pub async fn start(&mut self, config: StartConfig) -> Result<(), ServiceError> {
45 let request = IpcRequest::Start { config };
46 let (response, _events) = self.send_and_read(&request).await?;
47 if response.ok {
48 Ok(())
49 } else {
50 Err(ServiceError::Ipc(
51 response.error.unwrap_or_else(|| "unknown error".into()),
52 ))
53 }
54 }
55
56 pub async fn stop(&mut self) -> Result<(), ServiceError> {
58 let (response, _events) = self.send_and_read(&IpcRequest::Stop).await?;
59 if response.ok {
60 Ok(())
61 } else {
62 Err(ServiceError::Ipc(
63 response.error.unwrap_or_else(|| "unknown error".into()),
64 ))
65 }
66 }
67
68 pub async fn is_running(&mut self) -> Result<bool, ServiceError> {
70 let (response, _events) = self.send_and_read(&IpcRequest::IsRunning).await?;
71 if response.ok {
72 Ok(response
73 .data
74 .and_then(|d| d.get("running").and_then(|v| v.as_bool()))
75 .unwrap_or(false))
76 } else {
77 Err(ServiceError::Ipc(
78 response.error.unwrap_or_else(|| "unknown error".into()),
79 ))
80 }
81 }
82
83 pub async fn get_state(&mut self) -> Result<ServiceStatus, ServiceError> {
85 let (response, _events) = self.send_and_read(&IpcRequest::GetState).await?;
86 if response.ok {
87 response
88 .data
89 .ok_or_else(|| ServiceError::Ipc("missing data in GetState response".into()))
90 .and_then(|d| {
91 serde_json::from_value::<ServiceStatus>(d)
92 .map_err(|e| ServiceError::Ipc(format!("deserialize GetState: {e}")))
93 })
94 } else {
95 Err(ServiceError::Ipc(
96 response.error.unwrap_or_else(|| "unknown error".into()),
97 ))
98 }
99 }
100
101 pub async fn read_event(&mut self) -> Result<Option<IpcEvent>, ServiceError> {
105 let frame = match self.read_frame().await? {
106 Some(f) => f,
107 None => return Ok(None),
108 };
109 match decode_frame(&frame).map_err(|e| ServiceError::Ipc(format!("decode event: {e}")))? {
110 IpcMessage::Event(event) => Ok(Some(event)),
111 other => Err(ServiceError::Ipc(format!(
112 "expected event frame, got {:?}",
113 std::mem::discriminant(&other),
114 ))),
115 }
116 }
117
118 pub fn listen_events<R: Runtime>(mut self, app: tauri::AppHandle<R>) {
123 tokio::spawn(async move {
124 loop {
125 match self.read_event().await {
126 Ok(Some(event)) => {
127 let plugin_event = ipc_event_to_plugin_event(event);
128 let _ = app.emit("background-service://event", plugin_event);
129 }
130 Ok(None) => break,
131 Err(_) => break,
132 }
133 }
134 });
135 }
136
137 async fn send_and_read(
140 &mut self,
141 request: &IpcRequest,
142 ) -> Result<(IpcResponse, Vec<IpcEvent>), ServiceError> {
143 self.send_request(request).await?;
144 let mut events = Vec::new();
148 loop {
149 let frame = self
150 .read_frame()
151 .await?
152 .ok_or_else(|| ServiceError::Ipc("connection closed".into()))?;
153 match decode_frame(&frame).map_err(|e| ServiceError::Ipc(format!("decode: {e}")))? {
154 IpcMessage::Response(resp) => return Ok((resp, events)),
155 IpcMessage::Event(e) => {
156 events.push(e);
157 }
158 IpcMessage::Request(_) => {
159 return Err(ServiceError::Ipc("unexpected request frame".into()));
160 }
161 }
162 }
163 }
164
165 async fn send_request(&mut self, request: &IpcRequest) -> Result<(), ServiceError> {
166 let msg = IpcMessage::Request(request.clone());
167 let frame = encode_frame(&msg).map_err(|e| ServiceError::Ipc(format!("encode: {e}")))?;
168 transport::write_frame(&mut self.stream, &frame)
169 .await
170 .map_err(ServiceError::Ipc)?;
171 Ok(())
172 }
173
174 async fn read_frame(&mut self) -> Result<Option<Vec<u8>>, ServiceError> {
179 transport::read_frame(&mut self.stream)
180 .await
181 .map_err(ServiceError::Ipc)
182 }
183}
184
185pub fn ipc_event_to_plugin_event(event: IpcEvent) -> PluginEvent {
187 match event {
188 IpcEvent::Started => PluginEvent::Started,
189 IpcEvent::Stopped { reason } => PluginEvent::Stopped { reason },
190 IpcEvent::Error { message } => PluginEvent::Error { message },
191 }
192}
193
/// Commands sent from `PersistentIpcClientHandle` to the background connection task.
///
/// Every variant carries a oneshot `reply` sender on which the task reports the
/// outcome of the corresponding IPC request.
enum IpcCommand {
    /// Issue a `Start` request with the given configuration.
    Start {
        config: StartConfig,
        reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
    },
    /// Issue a `Stop` request.
    Stop {
        reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
    },
    /// Issue an `IsRunning` request; the reply carries the reported flag.
    IsRunning {
        reply: tokio::sync::oneshot::Sender<Result<bool, ServiceError>>,
    },
    /// Issue a `GetState` request; the reply carries the deserialized status.
    GetState {
        reply: tokio::sync::oneshot::Sender<Result<ServiceStatus, ServiceError>>,
    },
    /// Issue an `EnableAutoRestart` request with an optional configuration.
    EnableAutoRestart {
        config: Option<StartConfig>,
        reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
    },
    /// Issue a `DisableAutoRestart` request.
    DisableAutoRestart {
        reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
    },
    /// Issue a `GetDesiredState` request; the reply is `None` when the response
    /// carries no data.
    GetDesiredState {
        reply: tokio::sync::oneshot::Sender<
            Result<Option<crate::desired_state::DesiredState>, ServiceError>,
        >,
    },
    /// Issue a `ValidateSetup` request; the reply carries the validation report.
    ValidateSetup {
        reply: tokio::sync::oneshot::Sender<
            Result<crate::models::SetupValidationReport, ServiceError>,
        >,
    },
}
229
/// Handle to a background task that keeps an IPC connection open and serves
/// commands sent through a channel.
pub struct PersistentIpcClientHandle {
    /// Sending side of the command channel consumed by the background task.
    cmd_tx: tokio::sync::mpsc::Sender<IpcCommand>,
    /// Token used to ask the background task to stop; cancelled when the handle is dropped.
    shutdown: tokio_util::sync::CancellationToken,
    /// Connection flag shared with the background task, which updates it.
    connected: Arc<AtomicBool>,
    /// Socket path this handle was spawned with.
    socket_path: PathBuf,
}
243
/// Cancels the shutdown token when the handle goes out of scope.
impl Drop for PersistentIpcClientHandle {
    fn drop(&mut self) {
        // A clone of this token is handed to the background task in `spawn`, so
        // cancelling it here signals that task to stop.
        self.shutdown.cancel();
    }
}
249
250impl PersistentIpcClientHandle {
251 pub fn spawn<R: Runtime>(socket_path: PathBuf, app: tauri::AppHandle<R>) -> Self {
257 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::channel(16);
258 let shutdown = tokio_util::sync::CancellationToken::new();
259 let connected = Arc::new(AtomicBool::new(false));
260
261 tokio::spawn(persistent_client_loop(
262 socket_path.clone(),
263 app,
264 cmd_rx,
265 shutdown.clone(),
266 connected.clone(),
267 ));
268
269 Self {
270 cmd_tx,
271 shutdown,
272 connected,
273 socket_path,
274 }
275 }
276
277 pub async fn start(&self, config: StartConfig) -> Result<(), ServiceError> {
279 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
280 self.cmd_tx
281 .send(IpcCommand::Start {
282 config,
283 reply: reply_tx,
284 })
285 .await
286 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
287 reply_rx
288 .await
289 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
290 }
291
292 pub async fn stop(&self) -> Result<(), ServiceError> {
294 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
295 self.cmd_tx
296 .send(IpcCommand::Stop { reply: reply_tx })
297 .await
298 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
299 reply_rx
300 .await
301 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
302 }
303
304 pub async fn is_running(&self) -> Result<bool, ServiceError> {
306 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
307 self.cmd_tx
308 .send(IpcCommand::IsRunning { reply: reply_tx })
309 .await
310 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
311 reply_rx
312 .await
313 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
314 }
315
316 pub async fn get_state(&self) -> Result<ServiceStatus, ServiceError> {
318 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
319 self.cmd_tx
320 .send(IpcCommand::GetState { reply: reply_tx })
321 .await
322 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
323 reply_rx
324 .await
325 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
326 }
327
328 pub fn is_connected(&self) -> bool {
331 self.connected.load(std::sync::atomic::Ordering::Relaxed)
332 }
333
334 pub fn socket_path(&self) -> &PathBuf {
336 &self.socket_path
337 }
338
339 pub async fn wait_for_connected(&self, timeout: Duration) -> Result<bool, ServiceError> {
345 let deadline = tokio::time::Instant::now() + timeout;
346 let poll_interval = Duration::from_millis(500);
347
348 while tokio::time::Instant::now() < deadline {
349 if self.is_connected() {
350 return Ok(true);
351 }
352 let remaining = deadline - tokio::time::Instant::now();
353 let sleep_dur = poll_interval.min(remaining);
354 tokio::time::sleep(sleep_dur).await;
355 }
356
357 if self.is_connected() {
358 Ok(true)
359 } else {
360 Ok(false)
361 }
362 }
363
364 pub async fn enable_auto_restart(
366 &self,
367 config: Option<StartConfig>,
368 ) -> Result<(), ServiceError> {
369 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
370 self.cmd_tx
371 .send(IpcCommand::EnableAutoRestart {
372 config,
373 reply: reply_tx,
374 })
375 .await
376 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
377 reply_rx
378 .await
379 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
380 }
381
382 pub async fn disable_auto_restart(&self) -> Result<(), ServiceError> {
384 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
385 self.cmd_tx
386 .send(IpcCommand::DisableAutoRestart { reply: reply_tx })
387 .await
388 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
389 reply_rx
390 .await
391 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
392 }
393
394 pub async fn get_desired_state(
396 &self,
397 ) -> Result<Option<crate::desired_state::DesiredState>, ServiceError> {
398 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
399 self.cmd_tx
400 .send(IpcCommand::GetDesiredState { reply: reply_tx })
401 .await
402 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
403 reply_rx
404 .await
405 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
406 }
407
408 pub async fn validate_setup(
410 &self,
411 ) -> Result<crate::models::SetupValidationReport, ServiceError> {
412 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
413 self.cmd_tx
414 .send(IpcCommand::ValidateSetup { reply: reply_tx })
415 .await
416 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
417 reply_rx
418 .await
419 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
420 }
421}
422
423async fn persistent_client_loop<R: Runtime>(
425 socket_path: PathBuf,
426 app: tauri::AppHandle<R>,
427 mut cmd_rx: tokio::sync::mpsc::Receiver<IpcCommand>,
428 shutdown: tokio_util::sync::CancellationToken,
429 connected: Arc<AtomicBool>,
430) {
431 use backon::BackoffBuilder;
432
433 let backoff_builder = backon::ExponentialBuilder::default()
434 .with_min_delay(Duration::from_secs(1))
435 .with_max_delay(Duration::from_secs(30))
436 .with_max_times(10)
437 .with_jitter();
438
439 let mut attempts = backoff_builder.build();
440
441 loop {
442 tokio::select! {
443 biased;
444 _ = shutdown.cancelled() => {
445 log::info!("Persistent IPC client shutting down");
446 connected.store(false, std::sync::atomic::Ordering::Relaxed);
447 break;
448 }
449 connect_result = transport::connect(&socket_path) => {
450 match connect_result {
451 Ok(stream) => {
452 log::info!("Persistent IPC client connected");
453 connected.store(true, std::sync::atomic::Ordering::Relaxed);
454 let result = run_persistent_connection(stream, &app, &mut cmd_rx, &connected).await;
455 attempts = backoff_builder.build();
457 if result.is_err() {
458 log::info!("Persistent IPC connection lost, reconnecting...");
459 connected.store(false, std::sync::atomic::Ordering::Relaxed);
460 }
461 }
462 Err(_) => {
463 log::debug!("Persistent IPC client: connection failed, retrying...");
464 connected.store(false, std::sync::atomic::Ordering::Relaxed);
465 }
466 }
467 let delay = match attempts.next() {
468 Some(d) => d,
469 None => {
470 log::warn!("Persistent IPC client: backoff exhausted, giving up");
471 break;
472 }
473 };
474 tokio::select! {
475 biased;
476 _ = shutdown.cancelled() => {
477 log::info!("Persistent IPC client shutting down");
478 connected.store(false, std::sync::atomic::Ordering::Relaxed);
479 break;
480 }
481 _ = tokio::time::sleep(delay) => {}
482 }
483 }
484 }
485 }
486}
487
488async fn run_persistent_connection<R: Runtime>(
495 stream: TransportStream,
496 app: &tauri::AppHandle<R>,
497 cmd_rx: &mut tokio::sync::mpsc::Receiver<IpcCommand>,
498 connected: &Arc<AtomicBool>,
499) -> Result<(), ServiceError> {
500 let (read_half, mut write_half) = transport::split(stream);
501
502 let response_slot: std::sync::Arc<
504 tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<IpcResponse>>>,
505 > = std::sync::Arc::new(tokio::sync::Mutex::new(None));
506
507 let slot_writer = response_slot.clone();
508 let app_clone = app.clone();
509 let connected_reader = connected.clone();
510
511 let reader_handle = tokio::spawn(async move {
513 let mut read_half = read_half;
514 loop {
515 let frame = match read_frame_from(&mut read_half).await {
516 Ok(Some(f)) => f,
517 Ok(None) => break, Err(_) => break,
519 };
520
521 match decode_frame(&frame) {
522 Ok(IpcMessage::Response(resp)) => {
523 let mut slot = slot_writer.lock().await;
524 if let Some(sender) = slot.take() {
525 let _ = sender.send(resp);
526 }
527 continue;
528 }
529 Ok(IpcMessage::Event(event)) => {
530 let plugin_event = ipc_event_to_plugin_event(event);
531 let _ = app_clone.emit("background-service://event", plugin_event);
532 continue;
533 }
534 Ok(IpcMessage::Request(_)) => {
535 log::warn!("unexpected request frame on client connection");
536 continue;
537 }
538 Err(e) => {
539 log::debug!("failed to decode IPC frame: {e}");
540 continue;
541 }
542 }
543 }
544 connected_reader.store(false, std::sync::atomic::Ordering::Relaxed);
546 });
547
548 let result = loop {
550 tokio::select! {
551 cmd = cmd_rx.recv() => {
552 let cmd = match cmd {
553 Some(c) => c,
554 None => break Err(ServiceError::Ipc("command channel closed".into())),
555 };
556
557 match cmd {
558 IpcCommand::Start { config, reply } => {
559 let request = IpcRequest::Start { config };
560 let rx = prepare_response_slot(&response_slot).await;
561 if let Err(e) = send_request_to(&mut write_half, &request).await {
562 let _ = reply.send(Err(e));
563 break Err(ServiceError::Ipc("send failed".into()));
564 }
565 let response = await_response(rx).await;
566 let result = match response {
567 Ok(resp) if resp.ok => Ok(()),
568 Ok(resp) => Err(ServiceError::Ipc(
569 resp.error.unwrap_or_else(|| "unknown error".into()),
570 )),
571 Err(e) => Err(e),
572 };
573 let _ = reply.send(result);
574 }
575 IpcCommand::Stop { reply } => {
576 let rx = prepare_response_slot(&response_slot).await;
577 if let Err(e) = send_request_to(&mut write_half, &IpcRequest::Stop).await {
578 let _ = reply.send(Err(e));
579 break Err(ServiceError::Ipc("send failed".into()));
580 }
581 let response = await_response(rx).await;
582 let result = match response {
583 Ok(resp) if resp.ok => Ok(()),
584 Ok(resp) => Err(ServiceError::Ipc(
585 resp.error.unwrap_or_else(|| "unknown error".into()),
586 )),
587 Err(e) => Err(e),
588 };
589 let _ = reply.send(result);
590 }
591 IpcCommand::IsRunning { reply } => {
592 let rx = prepare_response_slot(&response_slot).await;
593 if let Err(e) = send_request_to(&mut write_half, &IpcRequest::IsRunning).await {
594 let _ = reply.send(Err(e));
595 break Err(ServiceError::Ipc("send failed".into()));
596 }
597 let response = await_response(rx).await;
598 let result = match response {
599 Ok(resp) if resp.ok => Ok(resp
600 .data
601 .and_then(|d| d.get("running").and_then(|v| v.as_bool()))
602 .unwrap_or(false)),
603 Ok(resp) => Err(ServiceError::Ipc(
604 resp.error.unwrap_or_else(|| "unknown error".into()),
605 )),
606 Err(e) => Err(e),
607 };
608 let _ = reply.send(result);
609 }
610 IpcCommand::GetState { reply } => {
611 let rx = prepare_response_slot(&response_slot).await;
612 if let Err(e) = send_request_to(&mut write_half, &IpcRequest::GetState).await {
613 let _ = reply.send(Err(e));
614 break Err(ServiceError::Ipc("send failed".into()));
615 }
616 let response = await_response(rx).await;
617 let result = match response {
618 Ok(resp) if resp.ok => resp
619 .data
620 .ok_or_else(|| ServiceError::Ipc("missing data in GetState response".into()))
621 .and_then(|d| {
622 serde_json::from_value::<ServiceStatus>(d)
623 .map_err(|e| ServiceError::Ipc(format!("deserialize GetState: {e}")))
624 }),
625 Ok(resp) => Err(ServiceError::Ipc(
626 resp.error.unwrap_or_else(|| "unknown error".into()),
627 )),
628 Err(e) => Err(e),
629 };
630 let _ = reply.send(result);
631 }
632 IpcCommand::EnableAutoRestart { config, reply } => {
633 let request = IpcRequest::EnableAutoRestart { config };
634 let rx = prepare_response_slot(&response_slot).await;
635 if let Err(e) = send_request_to(&mut write_half, &request).await {
636 let _ = reply.send(Err(e));
637 break Err(ServiceError::Ipc("send failed".into()));
638 }
639 let response = await_response(rx).await;
640 let result = match response {
641 Ok(resp) if resp.ok => Ok(()),
642 Ok(resp) => Err(ServiceError::Ipc(
643 resp.error.unwrap_or_else(|| "unknown error".into()),
644 )),
645 Err(e) => Err(e),
646 };
647 let _ = reply.send(result);
648 }
649 IpcCommand::DisableAutoRestart { reply } => {
650 let rx = prepare_response_slot(&response_slot).await;
651 if let Err(e) = send_request_to(&mut write_half, &IpcRequest::DisableAutoRestart).await {
652 let _ = reply.send(Err(e));
653 break Err(ServiceError::Ipc("send failed".into()));
654 }
655 let response = await_response(rx).await;
656 let result = match response {
657 Ok(resp) if resp.ok => Ok(()),
658 Ok(resp) => Err(ServiceError::Ipc(
659 resp.error.unwrap_or_else(|| "unknown error".into()),
660 )),
661 Err(e) => Err(e),
662 };
663 let _ = reply.send(result);
664 }
665 IpcCommand::GetDesiredState { reply } => {
666 let rx = prepare_response_slot(&response_slot).await;
667 if let Err(e) = send_request_to(&mut write_half, &IpcRequest::GetDesiredState).await {
668 let _ = reply.send(Err(e));
669 break Err(ServiceError::Ipc("send failed".into()));
670 }
671 let response = await_response(rx).await;
672 let result = match response {
673 Ok(resp) if resp.ok => {
674 match resp.data {
675 Some(d) => serde_json::from_value::<crate::desired_state::DesiredState>(d)
676 .map(Some)
677 .map_err(|e| ServiceError::Ipc(format!("deserialize GetDesiredState: {e}"))),
678 None => Ok(None),
679 }
680 }
681 Ok(resp) => Err(ServiceError::Ipc(
682 resp.error.unwrap_or_else(|| "unknown error".into()),
683 )),
684 Err(e) => Err(e),
685 };
686 let _ = reply.send(result);
687 }
688 IpcCommand::ValidateSetup { reply } => {
689 let rx = prepare_response_slot(&response_slot).await;
690 if let Err(e) = send_request_to(&mut write_half, &IpcRequest::ValidateSetup).await {
691 let _ = reply.send(Err(e));
692 break Err(ServiceError::Ipc("send failed".into()));
693 }
694 let response = await_response(rx).await;
695 let result = match response {
696 Ok(resp) if resp.ok => {
697 match resp.data {
698 Some(d) => serde_json::from_value::<crate::models::SetupValidationReport>(d)
699 .map_err(|e| ServiceError::Ipc(format!("deserialize ValidateSetup: {e}"))),
700 None => Err(ServiceError::Ipc("missing ValidateSetup response data".into())),
701 }
702 }
703 Ok(resp) => Err(ServiceError::Ipc(
704 resp.error.unwrap_or_else(|| "unknown error".into()),
705 )),
706 Err(e) => Err(e),
707 };
708 let _ = reply.send(result);
709 }
710 }
711 }
712 _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
713 if reader_handle.is_finished() {
715 break Err(ServiceError::Ipc("reader task died".into()));
716 }
717 }
718 }
719 };
720
721 reader_handle.abort();
722 result
723}
724
725async fn send_request_to(
727 write_half: &mut TransportWriteHalf,
728 request: &IpcRequest,
729) -> Result<(), ServiceError> {
730 let msg = IpcMessage::Request(request.clone());
731 let frame = encode_frame(&msg).map_err(|e| ServiceError::Ipc(format!("encode: {e}")))?;
732 transport::write_frame(write_half, &frame)
733 .await
734 .map_err(ServiceError::Ipc)?;
735 Ok(())
736}
737
738async fn prepare_response_slot(
746 slot: &std::sync::Arc<tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<IpcResponse>>>>,
747) -> tokio::sync::oneshot::Receiver<IpcResponse> {
748 let (tx, rx) = tokio::sync::oneshot::channel();
749 let mut guard = slot.lock().await;
750 debug_assert!(
751 guard.is_none(),
752 "response slot overwritten — sequential command invariant violated"
753 );
754 *guard = Some(tx);
755 rx
756}
757
758async fn await_response(
763 rx: tokio::sync::oneshot::Receiver<IpcResponse>,
764) -> Result<IpcResponse, ServiceError> {
765 tokio::select! {
766 response = rx => {
767 response.map_err(|_| ServiceError::Ipc("response channel closed".into()))
768 }
769 _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
770 Err(ServiceError::Ipc("response timeout".into()))
771 }
772 }
773}
774
775async fn read_frame_from(
779 read_half: &mut TransportReadHalf,
780) -> Result<Option<Vec<u8>>, ServiceError> {
781 transport::read_frame(read_half)
782 .await
783 .map_err(ServiceError::Ipc)
784}
785
786#[cfg(test)]
787mod tests {
788 use super::*;
789 use crate::desktop::test_helpers::{
790 setup_server, setup_server_with_factory, BlockingService, ImmediateSuccessService,
791 };
792 use std::sync::atomic::Ordering;
793 use std::time::Duration;
794 use tauri::Listener;
795
796 #[tokio::test]
799 async fn ipc_client_connect() {
800 let (path, shutdown, _event_tx) = setup_server();
801 let result = IpcClient::connect(path).await;
802 assert!(result.is_ok(), "client should connect: {:?}", result.err());
803 shutdown.cancel();
804 }
805
806 #[tokio::test]
809 async fn ipc_client_send_start() {
810 let (path, shutdown, _event_tx) = setup_server();
811 let mut client = IpcClient::connect(path).await.unwrap();
812 let result = client.start(StartConfig::default()).await;
813 assert!(result.is_ok(), "start should succeed: {:?}", result.err());
814 shutdown.cancel();
815 }
816
817 #[tokio::test]
820 async fn ipc_client_send_stop() {
821 let (path, shutdown, _event_tx) = setup_server();
822 let mut client = IpcClient::connect(path).await.unwrap();
823 client.start(StartConfig::default()).await.unwrap();
824 let result = client.stop().await;
825 assert!(result.is_ok(), "stop should succeed: {:?}", result.err());
826 shutdown.cancel();
827 }
828
829 #[tokio::test]
832 async fn ipc_client_is_running() {
833 let (path, shutdown, _event_tx) = setup_server();
834 let mut client = IpcClient::connect(path).await.unwrap();
835
836 let running = client.is_running().await.unwrap();
837 assert!(!running, "should not be running initially");
838
839 client.start(StartConfig::default()).await.unwrap();
840 let running = client.is_running().await.unwrap();
841 assert!(running, "should be running after start");
842
843 shutdown.cancel();
844 }
845
846 #[tokio::test]
849 async fn ipc_client_get_state_initial() {
850 let (path, shutdown, _event_tx) = setup_server();
851 let mut client = IpcClient::connect(path).await.unwrap();
852
853 let status = client.get_state().await.unwrap();
854 assert!(
855 matches!(status.state, crate::models::ServiceState::Idle),
856 "expected Idle, got {:?}",
857 status.state
858 );
859 assert_eq!(status.last_error, None);
860
861 shutdown.cancel();
862 }
863
864 #[tokio::test]
865 async fn ipc_client_get_state_after_start() {
866 let (path, shutdown, _event_tx) = setup_server();
867 let mut client = IpcClient::connect(path).await.unwrap();
868
869 client.start(StartConfig::default()).await.unwrap();
870
871 let status = tokio::time::timeout(Duration::from_secs(2), async {
874 loop {
875 let s = client.get_state().await.unwrap();
876 if matches!(s.state, crate::models::ServiceState::Running) {
877 return s;
878 }
879 tokio::time::sleep(Duration::from_millis(10)).await;
880 }
881 })
882 .await
883 .expect("timed out waiting for Running state");
884 assert_eq!(status.last_error, None);
885
886 shutdown.cancel();
887 }
888
889 #[tokio::test]
890 async fn ipc_client_get_state_after_stop() {
891 let (path, shutdown, _event_tx) = setup_server();
892 let mut client = IpcClient::connect(path).await.unwrap();
893
894 client.start(StartConfig::default()).await.unwrap();
895 client.stop().await.unwrap();
896 let status = client.get_state().await.unwrap();
897 assert!(
898 matches!(status.state, crate::models::ServiceState::Stopped),
899 "expected Stopped, got {:?}",
900 status.state
901 );
902
903 shutdown.cancel();
904 }
905
906 #[tokio::test]
909 async fn ipc_client_receive_events() {
910 let (path, shutdown, event_tx) =
911 setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
912 let mut client = IpcClient::connect(path).await.unwrap();
913 client.start(StartConfig::default()).await.unwrap();
914
915 let _ = event_tx.send(IpcEvent::Started);
917
918 let event = tokio::time::timeout(Duration::from_millis(500), client.read_event())
919 .await
920 .expect("timed out waiting for event")
921 .expect("read_event failed");
922
923 assert!(event.is_some(), "should receive an event");
924 let event = event.unwrap();
925 assert!(
926 matches!(event, IpcEvent::Started),
927 "Expected Started event, got {:?}",
928 event
929 );
930
931 shutdown.cancel();
932 }
933
934 #[tokio::test]
937 async fn ipc_client_stop_when_not_running() {
938 let (path, shutdown, _event_tx) = setup_server();
939 let mut client = IpcClient::connect(path).await.unwrap();
940 let result = client.stop().await;
941 assert!(result.is_err(), "stop when not running should fail");
942 shutdown.cancel();
943 }
944
945 #[tokio::test]
948 async fn ipc_client_connect_to_nonexistent() {
949 let path = std::env::temp_dir().join("nonexistent-test-socket.sock");
950 let result = IpcClient::connect(path).await;
951 assert!(
952 result.is_err(),
953 "should fail to connect to nonexistent socket"
954 );
955 }
956
957 #[test]
960 fn ipc_event_to_plugin_event_started() {
961 let event = IpcEvent::Started;
962 let plugin = ipc_event_to_plugin_event(event);
963 assert!(matches!(plugin, PluginEvent::Started));
964 }
965
966 #[test]
967 fn ipc_event_to_plugin_event_stopped() {
968 let event = IpcEvent::Stopped {
969 reason: "cancelled".into(),
970 };
971 let plugin = ipc_event_to_plugin_event(event);
972 match plugin {
973 PluginEvent::Stopped { reason } => assert_eq!(reason, "cancelled"),
974 other => panic!("Expected Stopped, got {other:?}"),
975 }
976 }
977
978 #[test]
979 fn ipc_event_to_plugin_event_error() {
980 let event = IpcEvent::Error {
981 message: "init failed".into(),
982 };
983 let plugin = ipc_event_to_plugin_event(event);
984 match plugin {
985 PluginEvent::Error { message } => assert_eq!(message, "init failed"),
986 other => panic!("Expected Error, got {other:?}"),
987 }
988 }
989
990 #[tokio::test]
993 async fn ipc_client_full_lifecycle() {
994 let (path, shutdown, _event_tx) = setup_server();
995 let mut client = IpcClient::connect(path).await.unwrap();
996
997 assert!(!client.is_running().await.unwrap());
998 client.start(StartConfig::default()).await.unwrap();
999 assert!(client.is_running().await.unwrap());
1000 client.stop().await.unwrap();
1001 assert!(!client.is_running().await.unwrap());
1002
1003 shutdown.cancel();
1004 }
1005
1006 #[tokio::test]
1009 async fn ipc_client_listen_events() {
1010 let (path, shutdown, event_tx) =
1011 setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
1012 let app = tauri::test::mock_app();
1013
1014 let received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1015 let received_clone = received.clone();
1016 app.listen("background-service://event", move |_event| {
1017 received_clone.store(true, Ordering::SeqCst);
1018 });
1019
1020 let mut client = IpcClient::connect(path).await.unwrap();
1021 client.start(StartConfig::default()).await.unwrap();
1022 client.listen_events(app.handle().clone());
1023
1024 let _ = event_tx.send(IpcEvent::Started);
1026
1027 tokio::time::timeout(Duration::from_millis(500), async {
1028 while !received.load(Ordering::SeqCst) {
1029 tokio::time::sleep(Duration::from_millis(10)).await;
1030 }
1031 })
1032 .await
1033 .expect("timed out waiting for event via listen_events");
1034
1035 assert!(
1036 received.load(Ordering::SeqCst),
1037 "should have received event"
1038 );
1039 shutdown.cancel();
1040 }
1041
1042 #[tokio::test]
1054 async fn ipc_loopback_full_lifecycle_with_events() {
1055 let (path, shutdown, event_tx) = setup_server();
1056 let mut client = IpcClient::connect(path).await.unwrap();
1057
1058 assert!(
1060 !client.is_running().await.unwrap(),
1061 "should not be running initially"
1062 );
1063
1064 client
1066 .start(StartConfig::default())
1067 .await
1068 .expect("start should succeed");
1069
1070 let _ = event_tx.send(IpcEvent::Started);
1072
1073 let started = tokio::time::timeout(Duration::from_millis(500), client.read_event())
1076 .await
1077 .expect("timed out waiting for Started event")
1078 .expect("read_event failed")
1079 .expect("should receive event");
1080 assert!(
1081 matches!(started, IpcEvent::Started),
1082 "Expected Started event, got {started:?}"
1083 );
1084
1085 assert!(
1087 client.is_running().await.unwrap(),
1088 "should be running after start"
1089 );
1090
1091 client.stop().await.expect("stop should succeed");
1093
1094 let _ = event_tx.send(IpcEvent::Stopped {
1096 reason: "cancelled".into(),
1097 });
1098
1099 let stopped = tokio::time::timeout(Duration::from_millis(500), client.read_event())
1101 .await
1102 .expect("timed out waiting for Stopped event")
1103 .expect("read_event failed")
1104 .expect("should receive event");
1105 assert!(
1106 matches!(stopped, IpcEvent::Stopped { .. }),
1107 "Expected Stopped event, got {stopped:?}"
1108 );
1109
1110 assert!(
1112 !client.is_running().await.unwrap(),
1113 "should not be running after stop"
1114 );
1115
1116 shutdown.cancel();
1117 }
1118
1119 #[tokio::test]
1123 async fn ipc_loopback_event_streaming_plugin_event_conversion() {
1124 let (path, shutdown, event_tx) = setup_server();
1125 let mut client = IpcClient::connect(path).await.unwrap();
1126
1127 client.start(StartConfig::default()).await.unwrap();
1129 let _ = event_tx.send(IpcEvent::Started);
1130 let started_ipc = tokio::time::timeout(Duration::from_millis(500), client.read_event())
1131 .await
1132 .expect("timed out")
1133 .expect("read_event failed")
1134 .expect("should receive event");
1135 let started_plugin = ipc_event_to_plugin_event(started_ipc);
1136 assert!(
1137 matches!(started_plugin, PluginEvent::Started),
1138 "Expected PluginEvent::Started, got {started_plugin:?}"
1139 );
1140
1141 client.stop().await.unwrap();
1143 let _ = event_tx.send(IpcEvent::Stopped {
1144 reason: "cancelled".into(),
1145 });
1146 let stopped_ipc = tokio::time::timeout(Duration::from_millis(500), client.read_event())
1147 .await
1148 .expect("timed out")
1149 .expect("read_event failed")
1150 .expect("should receive event");
1151 let stopped_plugin = ipc_event_to_plugin_event(stopped_ipc);
1152 match stopped_plugin {
1153 PluginEvent::Stopped { reason } => {
1154 assert_eq!(reason, "cancelled", "Expected 'cancelled' reason");
1155 }
1156 other => panic!("Expected PluginEvent::Stopped, got {other:?}"),
1157 }
1158
1159 shutdown.cancel();
1160 }
1161
1162 #[tokio::test]
1167 async fn ipc_loopback_connection_drop_returns_error() {
1168 let path = crate::desktop::test_helpers::unique_socket_path();
1169
1170 let listener = transport::bind(path.clone()).unwrap();
1172 let path_clone = path.clone();
1173
1174 let client_handle =
1175 tokio::spawn(async move { IpcClient::connect(path_clone).await.unwrap() });
1176
1177 let (server_stream, _) = listener.accept().await.unwrap();
1179 drop(server_stream);
1180 tokio::time::sleep(Duration::from_millis(20)).await;
1181
1182 let mut client = client_handle.await.unwrap();
1183
1184 let result = client.is_running().await;
1186 assert!(
1187 result.is_err(),
1188 "should get error after server drops connection"
1189 );
1190
1191 let _ = std::fs::remove_file(&path);
1192 }
1193
1194 #[tokio::test]
1198 async fn ipc_loopback_double_start_returns_error() {
1199 let (path, shutdown, _event_tx) = setup_server();
1200 let mut client = IpcClient::connect(path).await.unwrap();
1201
1202 client.start(StartConfig::default()).await.unwrap();
1203
1204 let result = client.start(StartConfig::default()).await;
1205 assert!(result.is_err(), "double start should return error");
1206 let err_msg = result.unwrap_err().to_string();
1207 assert!(
1208 err_msg.to_lowercase().contains("already"),
1209 "Error should mention 'already': {err_msg}"
1210 );
1211
1212 shutdown.cancel();
1213 }
1214
1215 #[tokio::test]
1224 async fn persistent_client_connects() {
1225 let (path, shutdown, _event_tx) = setup_server();
1226 let app = tauri::test::mock_app();
1227
1228 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1229
1230 tokio::time::sleep(Duration::from_millis(100)).await;
1232
1233 let running = handle.is_running().await;
1235 assert!(
1236 running.is_ok(),
1237 "should get response via persistent connection: {:?}",
1238 running.err()
1239 );
1240 assert!(!running.unwrap(), "should not be running initially");
1241
1242 shutdown.cancel();
1243 }
1244
1245 #[tokio::test]
1249 async fn persistent_client_reconnects() {
1250 use crate::desktop::ipc_server::IpcServer;
1251 use crate::manager::{manager_loop, ServiceFactory};
1252 use tokio_util::sync::CancellationToken;
1253
1254 let (path, shutdown1, _event_tx) = setup_server();
1256 let app = tauri::test::mock_app();
1257
1258 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1259
1260 tokio::time::sleep(Duration::from_millis(100)).await;
1262 let result = handle.is_running().await;
1263 assert!(
1264 result.is_ok(),
1265 "should connect to first server: {:?}",
1266 result.err()
1267 );
1268
1269 shutdown1.cancel();
1271 tokio::time::sleep(Duration::from_millis(150)).await;
1272
1273 let (cmd_tx2, cmd_rx2) = tokio::sync::mpsc::channel(16);
1275 let factory: ServiceFactory<tauri::test::MockRuntime> =
1276 Box::new(|| Box::new(BlockingService));
1277 tokio::spawn(manager_loop(
1278 cmd_rx2, factory, 0.0, 0.0, 0.0, 0.0, false, false, None,
1279 ));
1280 let server2 = IpcServer::bind(path.clone(), cmd_tx2, app.handle().clone()).unwrap();
1281 let shutdown2 = CancellationToken::new();
1282 let s2 = shutdown2.clone();
1283 tokio::spawn(async move { server2.run(s2).await });
1284
1285 let reconnected = tokio::time::timeout(Duration::from_secs(3), async {
1287 loop {
1288 tokio::time::sleep(Duration::from_millis(200)).await;
1289 if handle.is_running().await.is_ok() {
1290 break;
1291 }
1292 }
1293 })
1294 .await;
1295 assert!(
1296 reconnected.is_ok(),
1297 "persistent client should reconnect to second server"
1298 );
1299
1300 shutdown2.cancel();
1301 }
1302
1303 #[tokio::test]
1308 async fn event_relay() {
1309 let (path, shutdown, event_tx) =
1310 setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
1311 let app = tauri::test::mock_app();
1312
1313 let received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1314 let received_clone = received.clone();
1315 app.listen("background-service://event", move |_event| {
1316 received_clone.store(true, Ordering::SeqCst);
1317 });
1318
1319 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1320
1321 let result = handle.start(StartConfig::default()).await;
1323 assert!(result.is_ok(), "start should succeed: {:?}", result.err());
1324
1325 let _ = event_tx.send(IpcEvent::Started);
1327
1328 tokio::time::timeout(Duration::from_millis(500), async {
1330 while !received.load(Ordering::SeqCst) {
1331 tokio::time::sleep(Duration::from_millis(10)).await;
1332 }
1333 })
1334 .await
1335 .expect("timed out waiting for event relay via app.emit()");
1336
1337 assert!(
1338 received.load(Ordering::SeqCst),
1339 "event should be relayed through app.emit()"
1340 );
1341
1342 shutdown.cancel();
1343 }
1344
1345 #[tokio::test]
1350 async fn start_stop_lifecycle() {
1351 let (path, shutdown, _event_tx) = setup_server();
1352 let app = tauri::test::mock_app();
1353
1354 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1355
1356 let running = handle.is_running().await.unwrap();
1358 assert!(!running, "should not be running initially");
1359
1360 handle
1362 .start(StartConfig::default())
1363 .await
1364 .expect("start should succeed");
1365 let running = handle.is_running().await.unwrap();
1366 assert!(running, "should be running after start");
1367
1368 handle.stop().await.expect("stop should succeed");
1370 let running = handle.is_running().await.unwrap();
1371 assert!(!running, "should not be running after stop");
1372
1373 shutdown.cancel();
1374 }
1375
1376 #[tokio::test]
1379 async fn persistent_client_get_state() {
1380 let (path, shutdown, _event_tx) = setup_server();
1381 let app = tauri::test::mock_app();
1382
1383 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1384
1385 tokio::time::sleep(Duration::from_millis(100)).await;
1387
1388 let status = handle.get_state().await.unwrap();
1389 assert!(
1390 matches!(status.state, crate::models::ServiceState::Idle),
1391 "expected Idle, got {:?}",
1392 status.state
1393 );
1394
1395 handle.start(StartConfig::default()).await.unwrap();
1396
1397 let status = tokio::time::timeout(Duration::from_secs(2), async {
1400 loop {
1401 let s = handle.get_state().await.unwrap();
1402 if matches!(s.state, crate::models::ServiceState::Running) {
1403 return s;
1404 }
1405 tokio::time::sleep(Duration::from_millis(10)).await;
1406 }
1407 })
1408 .await
1409 .expect("timed out waiting for Running state");
1410 assert!(
1411 matches!(status.state, crate::models::ServiceState::Running),
1412 "expected Running, got {:?}",
1413 status.state
1414 );
1415
1416 shutdown.cancel();
1417 }
1418
1419 #[tokio::test]
1428 async fn persistent_client_timeout_on_unresponsive_server() {
1429 let path = crate::desktop::test_helpers::unique_socket_path();
1430 let listener = transport::bind(path.clone()).unwrap();
1431
1432 let server_handle = tokio::spawn(async move {
1434 let (_stream, _) = listener.accept().await.unwrap();
1435 tokio::time::sleep(Duration::from_secs(60)).await;
1437 });
1438
1439 let app = tauri::test::mock_app();
1440 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1441
1442 tokio::time::sleep(Duration::from_millis(100)).await;
1444
1445 let result = tokio::time::timeout(
1447 Duration::from_secs(15),
1448 handle.start(StartConfig::default()),
1449 )
1450 .await;
1451
1452 assert!(
1453 result.is_ok(),
1454 "start should not hang — expected error, got outer timeout"
1455 );
1456 let inner = result.unwrap();
1457 assert!(
1458 inner.is_err(),
1459 "start should return error when server is unresponsive"
1460 );
1461
1462 server_handle.abort();
1463 let _ = std::fs::remove_file(&path);
1464 }
1465
1466 #[tokio::test]
1472 async fn persistent_client_terminates_on_handle_drop() {
1473 let (path, shutdown, _event_tx) = setup_server();
1474 let app = tauri::test::mock_app();
1475
1476 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1477
1478 tokio::time::sleep(Duration::from_millis(100)).await;
1480
1481 drop(handle);
1483
1484 tokio::time::sleep(Duration::from_secs(2)).await;
1489
1490 shutdown.cancel();
1491 }
1492
1493 async fn buffered_server(
1500 path: &std::path::Path,
1501 frames: Vec<IpcMessage>,
1502 ) -> tokio::task::JoinHandle<()> {
1503 let listener = transport::bind(path.to_path_buf()).unwrap();
1504 tokio::spawn(async move {
1505 let (mut stream, _) = listener.accept().await.unwrap();
1506 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1507 let mut len_buf = [0u8; 4];
1509 if stream.read_exact(&mut len_buf).await.is_err() {
1510 return;
1511 }
1512 let len = u32::from_be_bytes(len_buf) as usize;
1513 let mut payload = vec![0u8; len];
1514 if stream.read_exact(&mut payload).await.is_err() {
1515 return;
1516 }
1517 for msg in &frames {
1519 let frame = crate::desktop::ipc::encode_frame(msg).unwrap();
1520 if stream.write_all(&frame).await.is_err() {
1521 return;
1522 }
1523 }
1524 })
1525 }
1526
1527 #[tokio::test]
1529 async fn send_and_read_no_interleaved_events() {
1530 let path = crate::desktop::test_helpers::unique_socket_path();
1531 let server = buffered_server(
1532 &path,
1533 vec![IpcMessage::Response(IpcResponse {
1534 ok: true,
1535 data: None,
1536 error: None,
1537 })],
1538 )
1539 .await;
1540
1541 let mut client = IpcClient::connect(path.clone()).await.unwrap();
1542 let (response, events) = client.send_and_read(&IpcRequest::IsRunning).await.unwrap();
1543 assert!(response.ok, "response should be ok");
1544 assert!(
1545 events.is_empty(),
1546 "events should be empty when no events interleave, got {:?}",
1547 events
1548 );
1549
1550 server.await.unwrap();
1551 let _ = std::fs::remove_file(&path);
1552 }
1553
1554 #[tokio::test]
1556 async fn send_and_read_single_interleaved_event() {
1557 let path = crate::desktop::test_helpers::unique_socket_path();
1558 let server = buffered_server(
1559 &path,
1560 vec![
1561 IpcMessage::Event(IpcEvent::Started),
1562 IpcMessage::Response(IpcResponse {
1563 ok: true,
1564 data: None,
1565 error: None,
1566 }),
1567 ],
1568 )
1569 .await;
1570
1571 let mut client = IpcClient::connect(path.clone()).await.unwrap();
1572 let (response, events) = client
1573 .send_and_read(&IpcRequest::Start {
1574 config: StartConfig::default(),
1575 })
1576 .await
1577 .unwrap();
1578 assert!(response.ok, "response should be ok");
1579 assert_eq!(events.len(), 1, "should collect exactly one event");
1580 assert!(
1581 matches!(events[0], IpcEvent::Started),
1582 "expected Started event, got {:?}",
1583 events[0]
1584 );
1585
1586 server.await.unwrap();
1587 let _ = std::fs::remove_file(&path);
1588 }
1589
1590 #[tokio::test]
1597 async fn is_connected_false_before_server() {
1598 let app = tauri::test::mock_app();
1599 let path = crate::desktop::test_helpers::unique_socket_path();
1600 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1602 tokio::time::sleep(Duration::from_millis(50)).await;
1605 assert!(
1606 !handle.is_connected(),
1607 "should not be connected when no server is running"
1608 );
1609 let _ = std::fs::remove_file(&path);
1610 }
1611
1612 #[tokio::test]
1615 async fn is_connected_true_after_connect() {
1616 let (path, shutdown, _event_tx) = setup_server();
1617 let app = tauri::test::mock_app();
1618 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1619
1620 tokio::time::timeout(Duration::from_secs(2), async {
1622 while !handle.is_connected() {
1623 tokio::time::sleep(Duration::from_millis(50)).await;
1624 }
1625 })
1626 .await
1627 .expect("timed out waiting for is_connected to become true");
1628
1629 assert!(
1630 handle.is_connected(),
1631 "should be connected after server is up"
1632 );
1633
1634 shutdown.cancel();
1635 }
1636
1637 #[tokio::test]
1643 async fn is_connected_false_after_server_shutdown() {
1644 let path = crate::desktop::test_helpers::unique_socket_path();
1645 let path_clone = path.clone();
1646 let listener = transport::bind(path.clone()).unwrap();
1647
1648 let server_handle = tokio::spawn(async move {
1651 let (stream, _) = listener.accept().await.unwrap();
1652 tokio::time::sleep(Duration::from_millis(200)).await;
1654 drop(stream);
1656 let _ = std::fs::remove_file(&path_clone);
1658 });
1659
1660 let app = tauri::test::mock_app();
1661 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1662
1663 tokio::time::timeout(Duration::from_secs(2), async {
1665 while !handle.is_connected() {
1666 tokio::time::sleep(Duration::from_millis(50)).await;
1667 }
1668 })
1669 .await
1670 .expect("timed out waiting for initial connection");
1671
1672 assert!(handle.is_connected(), "should be connected initially");
1673
1674 tokio::time::timeout(Duration::from_secs(3), async {
1676 while handle.is_connected() {
1677 tokio::time::sleep(Duration::from_millis(50)).await;
1678 }
1679 })
1680 .await
1681 .expect("timed out waiting for is_connected to become false");
1682
1683 assert!(
1684 !handle.is_connected(),
1685 "should not be connected after server shutdown"
1686 );
1687
1688 server_handle.abort();
1689 let _ = std::fs::remove_file(&path);
1690 }
1691
/// Checks the delay sequence of a `backon` exponential backoff configured
/// with a 1 s minimum delay, a 30 s maximum delay, 10 attempts and jitter.
///
/// The bounds are deliberately loose because jitter is enabled: only the
/// first delay, the last delay, a global upper bound and the number of
/// attempts are verified.
///
/// NOTE(review): the builder is constructed locally; presumably it mirrors
/// the reconnect policy used by the persistent client — confirm the two are
/// kept in sync.
#[test]
fn backoff_builder_produces_increasing_delays() {
    use backon::BackoffBuilder;

    let builder = backon::ExponentialBuilder::default()
        .with_min_delay(Duration::from_secs(1))
        .with_max_delay(Duration::from_secs(30))
        .with_max_times(10)
        .with_jitter();

    let mut attempts = builder.build();
    // `by_ref` drains the backoff without consuming it, so exhaustion can be
    // re-checked on the same value at the end of the test.
    let delays: Vec<Duration> = attempts.by_ref().collect();

    assert_eq!(delays.len(), 10, "should produce exactly 10 delays");

    // First delay: expected near the configured minimum.
    assert!(
        delays[0] >= Duration::from_millis(500),
        "first delay too short: {:?}",
        delays[0]
    );
    assert!(
        delays[0] <= Duration::from_secs(2),
        "first delay too long: {:?}",
        delays[0]
    );

    // Last delay: expected to have grown towards the configured maximum.
    assert!(
        delays[9] >= Duration::from_secs(15),
        "last delay should approach max: {:?}",
        delays[9]
    );

    // No delay may exceed twice the configured maximum.
    for d in &delays {
        assert!(
            *d <= Duration::from_secs(60),
            "delay exceeds max_delay + jitter margin: {:?}",
            d
        );
    }

    // The backoff stays exhausted after the configured number of attempts.
    assert!(
        attempts.next().is_none(),
        "should return None after 10 attempts"
    );
}
1751
1752 #[ignore]
1759 #[tokio::test]
1760 async fn persistent_client_exits_after_max_retries() {
1761 let app = tauri::test::mock_app();
1762 let path = crate::desktop::test_helpers::unique_socket_path();
1763 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1764
1765 let exited = tokio::time::timeout(Duration::from_secs(180), async {
1769 loop {
1770 tokio::time::sleep(Duration::from_secs(5)).await;
1771 if let Err(e) = handle.is_running().await {
1772 if e.to_string().contains("shut down") {
1773 return;
1774 }
1775 }
1776 }
1777 })
1778 .await;
1779
1780 assert!(
1781 exited.is_ok(),
1782 "persistent client should exit after max retries"
1783 );
1784 assert!(!handle.is_connected(), "should not be connected after exit");
1785
1786 let _ = std::fs::remove_file(&path);
1787 }
1788
1789 #[tokio::test]
1793 async fn persistent_client_reconnects_after_server_restart() {
1794 use crate::desktop::ipc_server::IpcServer;
1795 use crate::manager::{manager_loop, ServiceFactory};
1796 use tokio_util::sync::CancellationToken;
1797
1798 let (path, shutdown1, _event_tx) = setup_server();
1800 let app = tauri::test::mock_app();
1801 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1802
1803 tokio::time::timeout(Duration::from_secs(2), async {
1805 while !handle.is_connected() {
1806 tokio::time::sleep(Duration::from_millis(50)).await;
1807 }
1808 })
1809 .await
1810 .expect("should connect to first server");
1811
1812 let result = handle.is_running().await;
1814 assert!(
1815 result.is_ok(),
1816 "command should succeed on first server: {:?}",
1817 result.err()
1818 );
1819
1820 shutdown1.cancel();
1822 tokio::time::sleep(Duration::from_millis(150)).await;
1823
1824 let (cmd_tx2, cmd_rx2) = tokio::sync::mpsc::channel(16);
1826 let factory: ServiceFactory<tauri::test::MockRuntime> =
1827 Box::new(|| Box::new(BlockingService));
1828 tokio::spawn(manager_loop(
1829 cmd_rx2, factory, 0.0, 0.0, 0.0, 0.0, false, false, None,
1830 ));
1831 let server2 = IpcServer::bind(path.clone(), cmd_tx2, app.handle().clone()).unwrap();
1832 let shutdown2 = CancellationToken::new();
1833 let s2 = shutdown2.clone();
1834 tokio::spawn(async move { server2.run(s2).await });
1835
1836 let reconnected = tokio::time::timeout(Duration::from_secs(3), async {
1839 loop {
1840 if handle.is_connected() {
1841 break;
1842 }
1843 tokio::time::sleep(Duration::from_millis(100)).await;
1844 }
1845 })
1846 .await;
1847
1848 assert!(
1849 reconnected.is_ok(),
1850 "persistent client should reconnect after server restart (backoff resets)"
1851 );
1852
1853 let result = handle.is_running().await;
1855 assert!(
1856 result.is_ok(),
1857 "commands should work after reconnection: {:?}",
1858 result.err()
1859 );
1860
1861 shutdown2.cancel();
1862 }
1863
1864 #[tokio::test]
1872 async fn ipc_client_rejects_zero_length_frame() {
1873 let path = crate::desktop::test_helpers::unique_socket_path();
1874 let listener = transport::bind(path.clone()).unwrap();
1875
1876 let server_handle = tokio::spawn(async move {
1878 let (mut stream, _) = listener.accept().await.unwrap();
1879 use tokio::io::AsyncWriteExt;
1880 stream.write_all(&[0u8; 4]).await.unwrap();
1881 tokio::time::sleep(Duration::from_millis(500)).await;
1882 });
1883
1884 let mut client = IpcClient::connect(path.clone()).await.unwrap();
1885
1886 let result = client.read_frame().await;
1888 assert!(
1889 result.is_err(),
1890 "zero-length frame should return error, got {:?}",
1891 result
1892 );
1893 let err = result.unwrap_err().to_string();
1894 assert!(
1895 err.contains("zero-length frame"),
1896 "Error should mention 'zero-length frame': {err}"
1897 );
1898
1899 server_handle.abort();
1900 let _ = std::fs::remove_file(&path);
1901 }
1902
1903 #[tokio::test]
1906 async fn ipc_client_eof_returns_ok_none() {
1907 let path = crate::desktop::test_helpers::unique_socket_path();
1908 let listener = transport::bind(path.clone()).unwrap();
1909
1910 let server_handle = tokio::spawn(async move {
1912 let (stream, _) = listener.accept().await.unwrap();
1913 drop(stream);
1914 });
1915
1916 let mut client = IpcClient::connect(path.clone()).await.unwrap();
1917 tokio::time::sleep(Duration::from_millis(20)).await;
1918
1919 let result = client.read_frame().await;
1921 assert!(result.is_ok(), "EOF should return Ok, got {:?}", result);
1922 assert!(result.unwrap().is_none(), "EOF should return Ok(None)");
1923
1924 server_handle.abort();
1925 let _ = std::fs::remove_file(&path);
1926 }
1927
1928 #[tokio::test]
1934 async fn wait_for_connected_returns_immediately_when_connected() {
1935 let (path, shutdown, _event_tx) = setup_server();
1936 let app = tauri::test::mock_app();
1937 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1938
1939 tokio::time::timeout(Duration::from_secs(2), async {
1941 while !handle.is_connected() {
1942 tokio::time::sleep(Duration::from_millis(50)).await;
1943 }
1944 })
1945 .await
1946 .expect("should connect");
1947
1948 let result = handle
1950 .wait_for_connected(Duration::from_secs(5))
1951 .await
1952 .unwrap();
1953 assert!(result, "should return true when connected");
1954
1955 shutdown.cancel();
1956 }
1957
1958 #[tokio::test]
1960 async fn wait_for_connected_times_out_when_no_server() {
1961 let app = tauri::test::mock_app();
1962 let path = crate::desktop::test_helpers::unique_socket_path();
1963 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1964
1965 let result = handle
1967 .wait_for_connected(Duration::from_millis(200))
1968 .await
1969 .unwrap();
1970 assert!(!result, "should return false when no server and timeout");
1971
1972 let _ = std::fs::remove_file(&path);
1973 }
1974
1975 #[tokio::test]
1977 async fn wait_for_connected_succeeds_after_server_starts() {
1978 let (path, shutdown, _event_tx) = setup_server();
1979 let app = tauri::test::mock_app();
1980 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1981
1982 let result = handle
1984 .wait_for_connected(Duration::from_secs(5))
1985 .await
1986 .unwrap();
1987 assert!(result, "should connect within timeout");
1988
1989 shutdown.cancel();
1990 }
1991
1992 #[tokio::test]
1994 async fn send_and_read_multiple_interleaved_events() {
1995 let path = crate::desktop::test_helpers::unique_socket_path();
1996 let server = buffered_server(
1997 &path,
1998 vec![
1999 IpcMessage::Event(IpcEvent::Started),
2000 IpcMessage::Event(IpcEvent::Error {
2001 message: "warning".into(),
2002 }),
2003 IpcMessage::Event(IpcEvent::Stopped {
2004 reason: "cancelled".into(),
2005 }),
2006 IpcMessage::Response(IpcResponse {
2007 ok: true,
2008 data: Some(serde_json::json!({"running": false})),
2009 error: None,
2010 }),
2011 ],
2012 )
2013 .await;
2014
2015 let mut client = IpcClient::connect(path.clone()).await.unwrap();
2016 let (response, events) = client.send_and_read(&IpcRequest::IsRunning).await.unwrap();
2017 assert!(response.ok, "response should be ok");
2018 assert_eq!(events.len(), 3, "should collect all three events");
2019 assert!(
2020 matches!(events[0], IpcEvent::Started),
2021 "first event should be Started"
2022 );
2023 assert!(
2024 matches!(events[1], IpcEvent::Error { .. }),
2025 "second event should be Error"
2026 );
2027 assert!(
2028 matches!(events[2], IpcEvent::Stopped { .. }),
2029 "third event should be Stopped"
2030 );
2031
2032 server.await.unwrap();
2033 let _ = std::fs::remove_file(&path);
2034 }
2035}