1use std::path::PathBuf;
10use std::sync::atomic::AtomicBool;
11use std::sync::Arc;
12use std::time::Duration;
13
14use tauri::{Emitter, Runtime};
15
16use crate::desktop::ipc::{
17 decode_frame, encode_frame, IpcEvent, IpcMessage, IpcRequest, IpcResponse,
18};
19use crate::desktop::transport::{self, TransportReadHalf, TransportStream, TransportWriteHalf};
20use crate::error::ServiceError;
21#[cfg(test)]
22use crate::models::StopReason;
23use crate::models::{PluginEvent, ServiceStatus, StartConfig};
24
25pub struct IpcClient {
35 stream: TransportStream,
36}
37
38impl IpcClient {
39 pub async fn connect(path: PathBuf) -> Result<Self, ServiceError> {
41 let stream = transport::connect(&path).await?;
42 Ok(Self { stream })
43 }
44
45 pub async fn start(&mut self, config: StartConfig) -> Result<(), ServiceError> {
47 let request = IpcRequest::Start { config };
48 let (response, _events) = self.send_and_read(&request).await?;
49 if response.ok {
50 Ok(())
51 } else {
52 Err(ServiceError::Ipc(
53 response.error.unwrap_or_else(|| "unknown error".into()),
54 ))
55 }
56 }
57
58 pub async fn stop(&mut self) -> Result<(), ServiceError> {
60 let (response, _events) = self.send_and_read(&IpcRequest::Stop).await?;
61 if response.ok {
62 Ok(())
63 } else {
64 Err(ServiceError::Ipc(
65 response.error.unwrap_or_else(|| "unknown error".into()),
66 ))
67 }
68 }
69
70 pub async fn is_running(&mut self) -> Result<bool, ServiceError> {
72 let (response, _events) = self.send_and_read(&IpcRequest::IsRunning).await?;
73 if response.ok {
74 Ok(response
75 .data
76 .and_then(|d| d.get("running").and_then(|v| v.as_bool()))
77 .unwrap_or(false))
78 } else {
79 Err(ServiceError::Ipc(
80 response.error.unwrap_or_else(|| "unknown error".into()),
81 ))
82 }
83 }
84
85 pub async fn get_state(&mut self) -> Result<ServiceStatus, ServiceError> {
87 let (response, _events) = self.send_and_read(&IpcRequest::GetState).await?;
88 if response.ok {
89 response
90 .data
91 .ok_or_else(|| ServiceError::Ipc("missing data in GetState response".into()))
92 .and_then(|d| {
93 serde_json::from_value::<ServiceStatus>(d)
94 .map_err(|e| ServiceError::Ipc(format!("deserialize GetState: {e}")))
95 })
96 } else {
97 Err(ServiceError::Ipc(
98 response.error.unwrap_or_else(|| "unknown error".into()),
99 ))
100 }
101 }
102
103 pub async fn read_event(&mut self) -> Result<Option<IpcEvent>, ServiceError> {
107 let frame = match self.read_frame().await? {
108 Some(f) => f,
109 None => return Ok(None),
110 };
111 match decode_frame(&frame).map_err(|e| ServiceError::Ipc(format!("decode event: {e}")))? {
112 IpcMessage::Event(event) => Ok(Some(event)),
113 other => Err(ServiceError::Ipc(format!(
114 "expected event frame, got {:?}",
115 std::mem::discriminant(&other),
116 ))),
117 }
118 }
119
120 pub fn listen_events<R: Runtime>(mut self, app: tauri::AppHandle<R>) {
125 tokio::spawn(async move {
126 loop {
127 match self.read_event().await {
128 Ok(Some(event)) => {
129 let plugin_event = ipc_event_to_plugin_event(event);
130 let _ = app.emit("background-service://event", plugin_event);
131 }
132 Ok(None) => break,
133 Err(_) => break,
134 }
135 }
136 });
137 }
138
139 async fn send_and_read(
142 &mut self,
143 request: &IpcRequest,
144 ) -> Result<(IpcResponse, Vec<IpcEvent>), ServiceError> {
145 self.send_request(request).await?;
146 let mut events = Vec::new();
150 loop {
151 let frame = self
152 .read_frame()
153 .await?
154 .ok_or_else(|| ServiceError::Ipc("connection closed".into()))?;
155 match decode_frame(&frame).map_err(|e| ServiceError::Ipc(format!("decode: {e}")))? {
156 IpcMessage::Response(resp) => return Ok((resp, events)),
157 IpcMessage::Event(e) => {
158 events.push(e);
159 }
160 IpcMessage::Request(_) => {
161 return Err(ServiceError::Ipc("unexpected request frame".into()));
162 }
163 }
164 }
165 }
166
167 async fn send_request(&mut self, request: &IpcRequest) -> Result<(), ServiceError> {
168 let msg = IpcMessage::Request(request.clone());
169 let frame = encode_frame(&msg).map_err(|e| ServiceError::Ipc(format!("encode: {e}")))?;
170 transport::write_frame(&mut self.stream, &frame)
171 .await
172 .map_err(ServiceError::Ipc)?;
173 Ok(())
174 }
175
176 async fn read_frame(&mut self) -> Result<Option<Vec<u8>>, ServiceError> {
181 transport::read_frame(&mut self.stream)
182 .await
183 .map_err(ServiceError::Ipc)
184 }
185}
186
187pub fn ipc_event_to_plugin_event(event: IpcEvent) -> PluginEvent {
189 match event {
190 IpcEvent::Started => PluginEvent::Started,
191 IpcEvent::Stopped { reason } => PluginEvent::Stopped { reason },
192 IpcEvent::Error { message } => PluginEvent::Error { message },
193 }
194}
195
196enum IpcCommand {
200 Start {
201 config: StartConfig,
202 reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
203 },
204 Stop {
205 reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
206 },
207 IsRunning {
208 reply: tokio::sync::oneshot::Sender<Result<bool, ServiceError>>,
209 },
210 GetState {
211 reply: tokio::sync::oneshot::Sender<Result<ServiceStatus, ServiceError>>,
212 },
213 EnableAutoRestart {
214 config: Option<StartConfig>,
215 reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
216 },
217 DisableAutoRestart {
218 reply: tokio::sync::oneshot::Sender<Result<(), ServiceError>>,
219 },
220 GetDesiredState {
221 reply: tokio::sync::oneshot::Sender<
222 Result<Option<crate::desired_state::DesiredState>, ServiceError>,
223 >,
224 },
225 ValidateSetup {
226 reply: tokio::sync::oneshot::Sender<
227 Result<crate::models::SetupValidationReport, ServiceError>,
228 >,
229 },
230 GetLifecycleStatus {
231 reply: tokio::sync::oneshot::Sender<Result<crate::models::LifecycleStatus, ServiceError>>,
232 },
233}
234
235pub struct PersistentIpcClientHandle {
243 cmd_tx: tokio::sync::mpsc::Sender<IpcCommand>,
244 shutdown: tokio_util::sync::CancellationToken,
245 connected: Arc<AtomicBool>,
246 socket_path: PathBuf,
247}
248
249impl Drop for PersistentIpcClientHandle {
250 fn drop(&mut self) {
251 self.shutdown.cancel();
252 }
253}
254
255impl PersistentIpcClientHandle {
256 pub fn spawn<R: Runtime>(socket_path: PathBuf, app: tauri::AppHandle<R>) -> Self {
262 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::channel(16);
263 let shutdown = tokio_util::sync::CancellationToken::new();
264 let connected = Arc::new(AtomicBool::new(false));
265
266 tokio::spawn(persistent_client_loop(
267 socket_path.clone(),
268 app,
269 cmd_rx,
270 shutdown.clone(),
271 connected.clone(),
272 ));
273
274 Self {
275 cmd_tx,
276 shutdown,
277 connected,
278 socket_path,
279 }
280 }
281
282 pub async fn start(&self, config: StartConfig) -> Result<(), ServiceError> {
284 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
285 self.cmd_tx
286 .send(IpcCommand::Start {
287 config,
288 reply: reply_tx,
289 })
290 .await
291 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
292 reply_rx
293 .await
294 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
295 }
296
297 pub async fn stop(&self) -> Result<(), ServiceError> {
299 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
300 self.cmd_tx
301 .send(IpcCommand::Stop { reply: reply_tx })
302 .await
303 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
304 reply_rx
305 .await
306 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
307 }
308
309 pub async fn is_running(&self) -> Result<bool, ServiceError> {
311 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
312 self.cmd_tx
313 .send(IpcCommand::IsRunning { reply: reply_tx })
314 .await
315 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
316 reply_rx
317 .await
318 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
319 }
320
321 pub async fn get_state(&self) -> Result<ServiceStatus, ServiceError> {
323 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
324 self.cmd_tx
325 .send(IpcCommand::GetState { reply: reply_tx })
326 .await
327 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
328 reply_rx
329 .await
330 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
331 }
332
333 pub fn is_connected(&self) -> bool {
336 self.connected.load(std::sync::atomic::Ordering::Relaxed)
337 }
338
339 pub fn socket_path(&self) -> &PathBuf {
341 &self.socket_path
342 }
343
344 pub async fn wait_for_connected(&self, timeout: Duration) -> Result<bool, ServiceError> {
350 let deadline = tokio::time::Instant::now() + timeout;
351 let poll_interval = Duration::from_millis(500);
352
353 while tokio::time::Instant::now() < deadline {
354 if self.is_connected() {
355 return Ok(true);
356 }
357 let remaining = deadline - tokio::time::Instant::now();
358 let sleep_dur = poll_interval.min(remaining);
359 tokio::time::sleep(sleep_dur).await;
360 }
361
362 if self.is_connected() {
363 Ok(true)
364 } else {
365 Ok(false)
366 }
367 }
368
369 pub async fn enable_auto_restart(
371 &self,
372 config: Option<StartConfig>,
373 ) -> Result<(), ServiceError> {
374 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
375 self.cmd_tx
376 .send(IpcCommand::EnableAutoRestart {
377 config,
378 reply: reply_tx,
379 })
380 .await
381 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
382 reply_rx
383 .await
384 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
385 }
386
387 pub async fn disable_auto_restart(&self) -> Result<(), ServiceError> {
389 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
390 self.cmd_tx
391 .send(IpcCommand::DisableAutoRestart { reply: reply_tx })
392 .await
393 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
394 reply_rx
395 .await
396 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
397 }
398
399 pub async fn get_desired_state(
401 &self,
402 ) -> Result<Option<crate::desired_state::DesiredState>, ServiceError> {
403 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
404 self.cmd_tx
405 .send(IpcCommand::GetDesiredState { reply: reply_tx })
406 .await
407 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
408 reply_rx
409 .await
410 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
411 }
412
413 pub async fn validate_setup(
415 &self,
416 ) -> Result<crate::models::SetupValidationReport, ServiceError> {
417 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
418 self.cmd_tx
419 .send(IpcCommand::ValidateSetup { reply: reply_tx })
420 .await
421 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
422 reply_rx
423 .await
424 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
425 }
426
427 pub async fn get_lifecycle_status(
429 &self,
430 ) -> Result<crate::models::LifecycleStatus, ServiceError> {
431 let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
432 self.cmd_tx
433 .send(IpcCommand::GetLifecycleStatus { reply: reply_tx })
434 .await
435 .map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
436 reply_rx
437 .await
438 .map_err(|_| ServiceError::Ipc("command dropped".into()))?
439 }
440}
441
442async fn persistent_client_loop<R: Runtime>(
444 socket_path: PathBuf,
445 app: tauri::AppHandle<R>,
446 mut cmd_rx: tokio::sync::mpsc::Receiver<IpcCommand>,
447 shutdown: tokio_util::sync::CancellationToken,
448 connected: Arc<AtomicBool>,
449) {
450 use backon::BackoffBuilder;
451
452 let backoff_builder = backon::ExponentialBuilder::default()
453 .with_min_delay(Duration::from_secs(1))
454 .with_max_delay(Duration::from_secs(30))
455 .with_max_times(10)
456 .with_jitter();
457
458 let mut attempts = backoff_builder.build();
459
460 loop {
461 tokio::select! {
462 biased;
463 _ = shutdown.cancelled() => {
464 log::info!("Persistent IPC client shutting down");
465 connected.store(false, std::sync::atomic::Ordering::Relaxed);
466 break;
467 }
468 connect_result = transport::connect(&socket_path) => {
469 match connect_result {
470 Ok(stream) => {
471 log::info!("Persistent IPC client connected");
472 connected.store(true, std::sync::atomic::Ordering::Relaxed);
473 let result = run_persistent_connection(stream, &app, &mut cmd_rx, &connected).await;
474 attempts = backoff_builder.build();
476 if result.is_err() {
477 log::info!("Persistent IPC connection lost, reconnecting...");
478 connected.store(false, std::sync::atomic::Ordering::Relaxed);
479 }
480 }
481 Err(_) => {
482 log::debug!("Persistent IPC client: connection failed, retrying...");
483 connected.store(false, std::sync::atomic::Ordering::Relaxed);
484 }
485 }
486 let delay = match attempts.next() {
487 Some(d) => d,
488 None => {
489 log::warn!("Persistent IPC client: backoff exhausted, giving up");
490 break;
491 }
492 };
493 tokio::select! {
494 biased;
495 _ = shutdown.cancelled() => {
496 log::info!("Persistent IPC client shutting down");
497 connected.store(false, std::sync::atomic::Ordering::Relaxed);
498 break;
499 }
500 _ = tokio::time::sleep(delay) => {}
501 }
502 }
503 }
504 }
505}
506
507async fn run_persistent_connection<R: Runtime>(
514 stream: TransportStream,
515 app: &tauri::AppHandle<R>,
516 cmd_rx: &mut tokio::sync::mpsc::Receiver<IpcCommand>,
517 connected: &Arc<AtomicBool>,
518) -> Result<(), ServiceError> {
519 let (read_half, mut write_half) = transport::split(stream);
520
521 let response_slot: std::sync::Arc<
523 tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<IpcResponse>>>,
524 > = std::sync::Arc::new(tokio::sync::Mutex::new(None));
525
526 let slot_writer = response_slot.clone();
527 let app_clone = app.clone();
528 let connected_reader = connected.clone();
529
530 let reader_handle = tokio::spawn(async move {
532 let mut read_half = read_half;
533 loop {
534 let frame = match read_frame_from(&mut read_half).await {
535 Ok(Some(f)) => f,
536 Ok(None) => break, Err(_) => break,
538 };
539
540 match decode_frame(&frame) {
541 Ok(IpcMessage::Response(resp)) => {
542 let mut slot = slot_writer.lock().await;
543 if let Some(sender) = slot.take() {
544 let _ = sender.send(resp);
545 }
546 continue;
547 }
548 Ok(IpcMessage::Event(event)) => {
549 let plugin_event = ipc_event_to_plugin_event(event);
550 let _ = app_clone.emit("background-service://event", plugin_event);
551 continue;
552 }
553 Ok(IpcMessage::Request(_)) => {
554 log::warn!("unexpected request frame on client connection");
555 continue;
556 }
557 Err(e) => {
558 log::debug!("failed to decode IPC frame: {e}");
559 continue;
560 }
561 }
562 }
563 connected_reader.store(false, std::sync::atomic::Ordering::Relaxed);
565 });
566
567 let result = loop {
569 tokio::select! {
570 cmd = cmd_rx.recv() => {
571 let cmd = match cmd {
572 Some(c) => c,
573 None => break Err(ServiceError::Ipc("command channel closed".into())),
574 };
575
576 match cmd {
577 IpcCommand::Start { config, reply } => {
578 let request = IpcRequest::Start { config };
579 let rx = prepare_response_slot(&response_slot).await;
580 if let Err(e) = send_request_to(&mut write_half, &request).await {
581 let _ = reply.send(Err(e));
582 break Err(ServiceError::Ipc("send failed".into()));
583 }
584 let response = await_response(rx).await;
585 let result = match response {
586 Ok(resp) if resp.ok => Ok(()),
587 Ok(resp) => Err(ServiceError::Ipc(
588 resp.error.unwrap_or_else(|| "unknown error".into()),
589 )),
590 Err(e) => Err(e),
591 };
592 let _ = reply.send(result);
593 }
594 IpcCommand::Stop { reply } => {
595 let rx = prepare_response_slot(&response_slot).await;
596 if let Err(e) = send_request_to(&mut write_half, &IpcRequest::Stop).await {
597 let _ = reply.send(Err(e));
598 break Err(ServiceError::Ipc("send failed".into()));
599 }
600 let response = await_response(rx).await;
601 let result = match response {
602 Ok(resp) if resp.ok => Ok(()),
603 Ok(resp) => Err(ServiceError::Ipc(
604 resp.error.unwrap_or_else(|| "unknown error".into()),
605 )),
606 Err(e) => Err(e),
607 };
608 let _ = reply.send(result);
609 }
610 IpcCommand::IsRunning { reply } => {
611 let rx = prepare_response_slot(&response_slot).await;
612 if let Err(e) = send_request_to(&mut write_half, &IpcRequest::IsRunning).await {
613 let _ = reply.send(Err(e));
614 break Err(ServiceError::Ipc("send failed".into()));
615 }
616 let response = await_response(rx).await;
617 let result = match response {
618 Ok(resp) if resp.ok => Ok(resp
619 .data
620 .and_then(|d| d.get("running").and_then(|v| v.as_bool()))
621 .unwrap_or(false)),
622 Ok(resp) => Err(ServiceError::Ipc(
623 resp.error.unwrap_or_else(|| "unknown error".into()),
624 )),
625 Err(e) => Err(e),
626 };
627 let _ = reply.send(result);
628 }
629 IpcCommand::GetState { reply } => {
630 let rx = prepare_response_slot(&response_slot).await;
631 if let Err(e) = send_request_to(&mut write_half, &IpcRequest::GetState).await {
632 let _ = reply.send(Err(e));
633 break Err(ServiceError::Ipc("send failed".into()));
634 }
635 let response = await_response(rx).await;
636 let result = match response {
637 Ok(resp) if resp.ok => resp
638 .data
639 .ok_or_else(|| ServiceError::Ipc("missing data in GetState response".into()))
640 .and_then(|d| {
641 serde_json::from_value::<ServiceStatus>(d)
642 .map_err(|e| ServiceError::Ipc(format!("deserialize GetState: {e}")))
643 }),
644 Ok(resp) => Err(ServiceError::Ipc(
645 resp.error.unwrap_or_else(|| "unknown error".into()),
646 )),
647 Err(e) => Err(e),
648 };
649 let _ = reply.send(result);
650 }
651 IpcCommand::EnableAutoRestart { config, reply } => {
652 let request = IpcRequest::EnableAutoRestart { config };
653 let rx = prepare_response_slot(&response_slot).await;
654 if let Err(e) = send_request_to(&mut write_half, &request).await {
655 let _ = reply.send(Err(e));
656 break Err(ServiceError::Ipc("send failed".into()));
657 }
658 let response = await_response(rx).await;
659 let result = match response {
660 Ok(resp) if resp.ok => Ok(()),
661 Ok(resp) => Err(ServiceError::Ipc(
662 resp.error.unwrap_or_else(|| "unknown error".into()),
663 )),
664 Err(e) => Err(e),
665 };
666 let _ = reply.send(result);
667 }
668 IpcCommand::DisableAutoRestart { reply } => {
669 let rx = prepare_response_slot(&response_slot).await;
670 if let Err(e) = send_request_to(&mut write_half, &IpcRequest::DisableAutoRestart).await {
671 let _ = reply.send(Err(e));
672 break Err(ServiceError::Ipc("send failed".into()));
673 }
674 let response = await_response(rx).await;
675 let result = match response {
676 Ok(resp) if resp.ok => Ok(()),
677 Ok(resp) => Err(ServiceError::Ipc(
678 resp.error.unwrap_or_else(|| "unknown error".into()),
679 )),
680 Err(e) => Err(e),
681 };
682 let _ = reply.send(result);
683 }
684 IpcCommand::GetDesiredState { reply } => {
685 let rx = prepare_response_slot(&response_slot).await;
686 if let Err(e) = send_request_to(&mut write_half, &IpcRequest::GetDesiredState).await {
687 let _ = reply.send(Err(e));
688 break Err(ServiceError::Ipc("send failed".into()));
689 }
690 let response = await_response(rx).await;
691 let result = match response {
692 Ok(resp) if resp.ok => {
693 match resp.data {
694 Some(d) => serde_json::from_value::<crate::desired_state::DesiredState>(d)
695 .map(Some)
696 .map_err(|e| ServiceError::Ipc(format!("deserialize GetDesiredState: {e}"))),
697 None => Ok(None),
698 }
699 }
700 Ok(resp) => Err(ServiceError::Ipc(
701 resp.error.unwrap_or_else(|| "unknown error".into()),
702 )),
703 Err(e) => Err(e),
704 };
705 let _ = reply.send(result);
706 }
707 IpcCommand::ValidateSetup { reply } => {
708 let rx = prepare_response_slot(&response_slot).await;
709 if let Err(e) = send_request_to(&mut write_half, &IpcRequest::ValidateSetup).await {
710 let _ = reply.send(Err(e));
711 break Err(ServiceError::Ipc("send failed".into()));
712 }
713 let response = await_response(rx).await;
714 let result = match response {
715 Ok(resp) if resp.ok => {
716 match resp.data {
717 Some(d) => serde_json::from_value::<crate::models::SetupValidationReport>(d)
718 .map_err(|e| ServiceError::Ipc(format!("deserialize ValidateSetup: {e}"))),
719 None => Err(ServiceError::Ipc("missing ValidateSetup response data".into())),
720 }
721 }
722 Ok(resp) => Err(ServiceError::Ipc(
723 resp.error.unwrap_or_else(|| "unknown error".into()),
724 )),
725 Err(e) => Err(e),
726 };
727 let _ = reply.send(result);
728 }
729 IpcCommand::GetLifecycleStatus { reply } => {
730 let rx = prepare_response_slot(&response_slot).await;
731 if let Err(e) = send_request_to(&mut write_half, &IpcRequest::GetLifecycleStatus).await {
732 let _ = reply.send(Err(e));
733 break Err(ServiceError::Ipc("send failed".into()));
734 }
735 let response = await_response(rx).await;
736 let result = match response {
737 Ok(resp) if resp.ok => {
738 match resp.data {
739 Some(d) => serde_json::from_value::<crate::models::LifecycleStatus>(d)
740 .map_err(|e| ServiceError::Ipc(format!("deserialize GetLifecycleStatus: {e}"))),
741 None => Err(ServiceError::Ipc("missing GetLifecycleStatus response data".into())),
742 }
743 }
744 Ok(resp) => Err(ServiceError::Ipc(
745 resp.error.unwrap_or_else(|| "unknown error".into()),
746 )),
747 Err(e) => Err(e),
748 };
749 let _ = reply.send(result);
750 }
751 }
752 }
753 _ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
754 if reader_handle.is_finished() {
756 break Err(ServiceError::Ipc("reader task died".into()));
757 }
758 }
759 }
760 };
761
762 reader_handle.abort();
763 result
764}
765
766async fn send_request_to(
768 write_half: &mut TransportWriteHalf,
769 request: &IpcRequest,
770) -> Result<(), ServiceError> {
771 let msg = IpcMessage::Request(request.clone());
772 let frame = encode_frame(&msg).map_err(|e| ServiceError::Ipc(format!("encode: {e}")))?;
773 transport::write_frame(write_half, &frame)
774 .await
775 .map_err(ServiceError::Ipc)?;
776 Ok(())
777}
778
779async fn prepare_response_slot(
787 slot: &std::sync::Arc<tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<IpcResponse>>>>,
788) -> tokio::sync::oneshot::Receiver<IpcResponse> {
789 let (tx, rx) = tokio::sync::oneshot::channel();
790 let mut guard = slot.lock().await;
791 debug_assert!(
792 guard.is_none(),
793 "response slot overwritten — sequential command invariant violated"
794 );
795 *guard = Some(tx);
796 rx
797}
798
799async fn await_response(
804 rx: tokio::sync::oneshot::Receiver<IpcResponse>,
805) -> Result<IpcResponse, ServiceError> {
806 tokio::select! {
807 response = rx => {
808 response.map_err(|_| ServiceError::Ipc("response channel closed".into()))
809 }
810 _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
811 Err(ServiceError::Ipc("response timeout".into()))
812 }
813 }
814}
815
816async fn read_frame_from(
820 read_half: &mut TransportReadHalf,
821) -> Result<Option<Vec<u8>>, ServiceError> {
822 transport::read_frame(read_half)
823 .await
824 .map_err(ServiceError::Ipc)
825}
826
827#[cfg(test)]
828mod tests {
829 use super::*;
830 use crate::desktop::test_helpers::{
831 setup_server, setup_server_with_factory, BlockingService, ImmediateSuccessService,
832 };
833 use std::sync::atomic::Ordering;
834 use std::time::Duration;
835 use tauri::Listener;
836
837 #[tokio::test]
840 async fn ipc_client_connect() {
841 let (path, shutdown, _event_tx) = setup_server();
842 let result = IpcClient::connect(path).await;
843 assert!(result.is_ok(), "client should connect: {:?}", result.err());
844 shutdown.cancel();
845 }
846
847 #[tokio::test]
850 async fn ipc_client_send_start() {
851 let (path, shutdown, _event_tx) = setup_server();
852 let mut client = IpcClient::connect(path).await.unwrap();
853 let result = client.start(StartConfig::default()).await;
854 assert!(result.is_ok(), "start should succeed: {:?}", result.err());
855 shutdown.cancel();
856 }
857
858 #[tokio::test]
861 async fn ipc_client_send_stop() {
862 let (path, shutdown, _event_tx) = setup_server();
863 let mut client = IpcClient::connect(path).await.unwrap();
864 client.start(StartConfig::default()).await.unwrap();
865 let result = client.stop().await;
866 assert!(result.is_ok(), "stop should succeed: {:?}", result.err());
867 shutdown.cancel();
868 }
869
870 #[tokio::test]
873 async fn ipc_client_is_running() {
874 let (path, shutdown, _event_tx) = setup_server();
875 let mut client = IpcClient::connect(path).await.unwrap();
876
877 let running = client.is_running().await.unwrap();
878 assert!(!running, "should not be running initially");
879
880 client.start(StartConfig::default()).await.unwrap();
881 let running = client.is_running().await.unwrap();
882 assert!(running, "should be running after start");
883
884 shutdown.cancel();
885 }
886
887 #[tokio::test]
890 async fn ipc_client_get_state_initial() {
891 let (path, shutdown, _event_tx) = setup_server();
892 let mut client = IpcClient::connect(path).await.unwrap();
893
894 let status = client.get_state().await.unwrap();
895 assert!(
896 matches!(status.state, crate::models::ServiceState::Idle),
897 "expected Idle, got {:?}",
898 status.state
899 );
900 assert_eq!(status.last_error, None);
901
902 shutdown.cancel();
903 }
904
905 #[tokio::test]
906 async fn ipc_client_get_state_after_start() {
907 let (path, shutdown, _event_tx) = setup_server();
908 let mut client = IpcClient::connect(path).await.unwrap();
909
910 client.start(StartConfig::default()).await.unwrap();
911
912 let status = tokio::time::timeout(Duration::from_secs(2), async {
915 loop {
916 let s = client.get_state().await.unwrap();
917 if matches!(s.state, crate::models::ServiceState::Running) {
918 return s;
919 }
920 tokio::time::sleep(Duration::from_millis(10)).await;
921 }
922 })
923 .await
924 .expect("timed out waiting for Running state");
925 assert_eq!(status.last_error, None);
926
927 shutdown.cancel();
928 }
929
930 #[tokio::test]
931 async fn ipc_client_get_state_after_stop() {
932 let (path, shutdown, _event_tx) = setup_server();
933 let mut client = IpcClient::connect(path).await.unwrap();
934
935 client.start(StartConfig::default()).await.unwrap();
936 client.stop().await.unwrap();
937 let status = client.get_state().await.unwrap();
938 assert!(
939 matches!(status.state, crate::models::ServiceState::Stopped),
940 "expected Stopped, got {:?}",
941 status.state
942 );
943
944 shutdown.cancel();
945 }
946
947 #[tokio::test]
950 async fn ipc_client_receive_events() {
951 let (path, shutdown, event_tx) =
952 setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
953 let mut client = IpcClient::connect(path).await.unwrap();
954 client.start(StartConfig::default()).await.unwrap();
955
956 let _ = event_tx.send(IpcEvent::Started);
958
959 let event = tokio::time::timeout(Duration::from_millis(500), client.read_event())
960 .await
961 .expect("timed out waiting for event")
962 .expect("read_event failed");
963
964 assert!(event.is_some(), "should receive an event");
965 let event = event.unwrap();
966 assert!(
967 matches!(event, IpcEvent::Started),
968 "Expected Started event, got {:?}",
969 event
970 );
971
972 shutdown.cancel();
973 }
974
975 #[tokio::test]
978 async fn ipc_client_stop_when_not_running() {
979 let (path, shutdown, _event_tx) = setup_server();
980 let mut client = IpcClient::connect(path).await.unwrap();
981 let result = client.stop().await;
982 assert!(result.is_err(), "stop when not running should fail");
983 shutdown.cancel();
984 }
985
986 #[tokio::test]
989 async fn ipc_client_connect_to_nonexistent() {
990 let path = std::env::temp_dir().join("nonexistent-test-socket.sock");
991 let result = IpcClient::connect(path).await;
992 assert!(
993 result.is_err(),
994 "should fail to connect to nonexistent socket"
995 );
996 }
997
998 #[test]
1001 fn ipc_event_to_plugin_event_started() {
1002 let event = IpcEvent::Started;
1003 let plugin = ipc_event_to_plugin_event(event);
1004 assert!(matches!(plugin, PluginEvent::Started));
1005 }
1006
1007 #[test]
1008 fn ipc_event_to_plugin_event_stopped() {
1009 let event = IpcEvent::Stopped {
1010 reason: StopReason::UserStop,
1011 };
1012 let plugin = ipc_event_to_plugin_event(event);
1013 match plugin {
1014 PluginEvent::Stopped { reason } => assert_eq!(reason, StopReason::UserStop),
1015 other => panic!("Expected Stopped, got {other:?}"),
1016 }
1017 }
1018
1019 #[test]
1020 fn ipc_event_to_plugin_event_error() {
1021 let event = IpcEvent::Error {
1022 message: "init failed".into(),
1023 };
1024 let plugin = ipc_event_to_plugin_event(event);
1025 match plugin {
1026 PluginEvent::Error { message } => assert_eq!(message, "init failed"),
1027 other => panic!("Expected Error, got {other:?}"),
1028 }
1029 }
1030
1031 #[tokio::test]
1034 async fn ipc_client_full_lifecycle() {
1035 let (path, shutdown, _event_tx) = setup_server();
1036 let mut client = IpcClient::connect(path).await.unwrap();
1037
1038 assert!(!client.is_running().await.unwrap());
1039 client.start(StartConfig::default()).await.unwrap();
1040 assert!(client.is_running().await.unwrap());
1041 client.stop().await.unwrap();
1042 assert!(!client.is_running().await.unwrap());
1043
1044 shutdown.cancel();
1045 }
1046
1047 #[tokio::test]
1050 async fn ipc_client_listen_events() {
1051 let (path, shutdown, event_tx) =
1052 setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
1053 let app = tauri::test::mock_app();
1054
1055 let received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1056 let received_clone = received.clone();
1057 app.listen("background-service://event", move |_event| {
1058 received_clone.store(true, Ordering::SeqCst);
1059 });
1060
1061 let mut client = IpcClient::connect(path).await.unwrap();
1062 client.start(StartConfig::default()).await.unwrap();
1063 client.listen_events(app.handle().clone());
1064
1065 let _ = event_tx.send(IpcEvent::Started);
1067
1068 tokio::time::timeout(Duration::from_millis(500), async {
1069 while !received.load(Ordering::SeqCst) {
1070 tokio::time::sleep(Duration::from_millis(10)).await;
1071 }
1072 })
1073 .await
1074 .expect("timed out waiting for event via listen_events");
1075
1076 assert!(
1077 received.load(Ordering::SeqCst),
1078 "should have received event"
1079 );
1080 shutdown.cancel();
1081 }
1082
1083 #[tokio::test]
1095 async fn ipc_loopback_full_lifecycle_with_events() {
1096 let (path, shutdown, event_tx) = setup_server();
1097 let mut client = IpcClient::connect(path).await.unwrap();
1098
1099 assert!(
1101 !client.is_running().await.unwrap(),
1102 "should not be running initially"
1103 );
1104
1105 client
1107 .start(StartConfig::default())
1108 .await
1109 .expect("start should succeed");
1110
1111 let _ = event_tx.send(IpcEvent::Started);
1113
1114 let started = tokio::time::timeout(Duration::from_millis(500), client.read_event())
1117 .await
1118 .expect("timed out waiting for Started event")
1119 .expect("read_event failed")
1120 .expect("should receive event");
1121 assert!(
1122 matches!(started, IpcEvent::Started),
1123 "Expected Started event, got {started:?}"
1124 );
1125
1126 assert!(
1128 client.is_running().await.unwrap(),
1129 "should be running after start"
1130 );
1131
1132 client.stop().await.expect("stop should succeed");
1134
1135 let _ = event_tx.send(IpcEvent::Stopped {
1137 reason: StopReason::UserStop,
1138 });
1139
1140 let stopped = tokio::time::timeout(Duration::from_millis(500), client.read_event())
1142 .await
1143 .expect("timed out waiting for Stopped event")
1144 .expect("read_event failed")
1145 .expect("should receive event");
1146 assert!(
1147 matches!(stopped, IpcEvent::Stopped { .. }),
1148 "Expected Stopped event, got {stopped:?}"
1149 );
1150
1151 assert!(
1153 !client.is_running().await.unwrap(),
1154 "should not be running after stop"
1155 );
1156
1157 shutdown.cancel();
1158 }
1159
1160 #[tokio::test]
1164 async fn ipc_loopback_event_streaming_plugin_event_conversion() {
1165 let (path, shutdown, event_tx) = setup_server();
1166 let mut client = IpcClient::connect(path).await.unwrap();
1167
1168 client.start(StartConfig::default()).await.unwrap();
1170 let _ = event_tx.send(IpcEvent::Started);
1171 let started_ipc = tokio::time::timeout(Duration::from_millis(500), client.read_event())
1172 .await
1173 .expect("timed out")
1174 .expect("read_event failed")
1175 .expect("should receive event");
1176 let started_plugin = ipc_event_to_plugin_event(started_ipc);
1177 assert!(
1178 matches!(started_plugin, PluginEvent::Started),
1179 "Expected PluginEvent::Started, got {started_plugin:?}"
1180 );
1181
1182 client.stop().await.unwrap();
1184 let _ = event_tx.send(IpcEvent::Stopped {
1185 reason: StopReason::UserStop,
1186 });
1187 let stopped_ipc = tokio::time::timeout(Duration::from_millis(500), client.read_event())
1188 .await
1189 .expect("timed out")
1190 .expect("read_event failed")
1191 .expect("should receive event");
1192 let stopped_plugin = ipc_event_to_plugin_event(stopped_ipc);
1193 match stopped_plugin {
1194 PluginEvent::Stopped { reason } => {
1195 assert_eq!(reason, StopReason::UserStop, "Expected UserStop reason");
1196 }
1197 other => panic!("Expected PluginEvent::Stopped, got {other:?}"),
1198 }
1199
1200 shutdown.cancel();
1201 }
1202
1203 #[tokio::test]
1208 async fn ipc_loopback_connection_drop_returns_error() {
1209 let path = crate::desktop::test_helpers::unique_socket_path();
1210
1211 let listener = transport::bind(path.clone()).unwrap();
1213 let path_clone = path.clone();
1214
1215 let client_handle =
1216 tokio::spawn(async move { IpcClient::connect(path_clone).await.unwrap() });
1217
1218 let (server_stream, _) = listener.accept().await.unwrap();
1220 drop(server_stream);
1221 tokio::time::sleep(Duration::from_millis(20)).await;
1222
1223 let mut client = client_handle.await.unwrap();
1224
1225 let result = client.is_running().await;
1227 assert!(
1228 result.is_err(),
1229 "should get error after server drops connection"
1230 );
1231
1232 let _ = std::fs::remove_file(&path);
1233 }
1234
1235 #[tokio::test]
1239 async fn ipc_loopback_double_start_returns_error() {
1240 let (path, shutdown, _event_tx) = setup_server();
1241 let mut client = IpcClient::connect(path).await.unwrap();
1242
1243 client.start(StartConfig::default()).await.unwrap();
1244
1245 let result = client.start(StartConfig::default()).await;
1246 assert!(result.is_err(), "double start should return error");
1247 let err_msg = result.unwrap_err().to_string();
1248 assert!(
1249 err_msg.to_lowercase().contains("already"),
1250 "Error should mention 'already': {err_msg}"
1251 );
1252
1253 shutdown.cancel();
1254 }
1255
1256 #[tokio::test]
1265 async fn persistent_client_connects() {
1266 let (path, shutdown, _event_tx) = setup_server();
1267 let app = tauri::test::mock_app();
1268
1269 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1270
1271 tokio::time::sleep(Duration::from_millis(100)).await;
1273
1274 let running = handle.is_running().await;
1276 assert!(
1277 running.is_ok(),
1278 "should get response via persistent connection: {:?}",
1279 running.err()
1280 );
1281 assert!(!running.unwrap(), "should not be running initially");
1282
1283 shutdown.cancel();
1284 }
1285
1286 #[tokio::test]
1290 async fn persistent_client_reconnects() {
1291 use crate::desktop::ipc_server::IpcServer;
1292 use crate::manager::{manager_loop, ServiceFactory};
1293 use tokio_util::sync::CancellationToken;
1294
1295 let (path, shutdown1, _event_tx) = setup_server();
1297 let app = tauri::test::mock_app();
1298
1299 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1300
1301 tokio::time::sleep(Duration::from_millis(100)).await;
1303 let result = handle.is_running().await;
1304 assert!(
1305 result.is_ok(),
1306 "should connect to first server: {:?}",
1307 result.err()
1308 );
1309
1310 shutdown1.cancel();
1312 tokio::time::sleep(Duration::from_millis(150)).await;
1313
1314 let (cmd_tx2, cmd_rx2) = tokio::sync::mpsc::channel(16);
1316 let factory: ServiceFactory<tauri::test::MockRuntime> =
1317 Box::new(|| Box::new(BlockingService));
1318 tokio::spawn(manager_loop(
1319 cmd_rx2, factory, 0.0, 0.0, 0.0, 0.0, false, false, None,
1320 ));
1321 let server2 = IpcServer::bind(path.clone(), cmd_tx2, app.handle().clone()).unwrap();
1322 let shutdown2 = CancellationToken::new();
1323 let s2 = shutdown2.clone();
1324 tokio::spawn(async move { server2.run(s2).await });
1325
1326 let reconnected = tokio::time::timeout(Duration::from_secs(3), async {
1328 loop {
1329 tokio::time::sleep(Duration::from_millis(200)).await;
1330 if handle.is_running().await.is_ok() {
1331 break;
1332 }
1333 }
1334 })
1335 .await;
1336 assert!(
1337 reconnected.is_ok(),
1338 "persistent client should reconnect to second server"
1339 );
1340
1341 shutdown2.cancel();
1342 }
1343
1344 #[tokio::test]
1349 async fn event_relay() {
1350 let (path, shutdown, event_tx) =
1351 setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
1352 let app = tauri::test::mock_app();
1353
1354 let received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
1355 let received_clone = received.clone();
1356 app.listen("background-service://event", move |_event| {
1357 received_clone.store(true, Ordering::SeqCst);
1358 });
1359
1360 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1361
1362 let result = handle.start(StartConfig::default()).await;
1364 assert!(result.is_ok(), "start should succeed: {:?}", result.err());
1365
1366 let _ = event_tx.send(IpcEvent::Started);
1368
1369 tokio::time::timeout(Duration::from_millis(500), async {
1371 while !received.load(Ordering::SeqCst) {
1372 tokio::time::sleep(Duration::from_millis(10)).await;
1373 }
1374 })
1375 .await
1376 .expect("timed out waiting for event relay via app.emit()");
1377
1378 assert!(
1379 received.load(Ordering::SeqCst),
1380 "event should be relayed through app.emit()"
1381 );
1382
1383 shutdown.cancel();
1384 }
1385
1386 #[tokio::test]
1391 async fn start_stop_lifecycle() {
1392 let (path, shutdown, _event_tx) = setup_server();
1393 let app = tauri::test::mock_app();
1394
1395 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1396
1397 let running = handle.is_running().await.unwrap();
1399 assert!(!running, "should not be running initially");
1400
1401 handle
1403 .start(StartConfig::default())
1404 .await
1405 .expect("start should succeed");
1406 let running = handle.is_running().await.unwrap();
1407 assert!(running, "should be running after start");
1408
1409 handle.stop().await.expect("stop should succeed");
1411 let running = handle.is_running().await.unwrap();
1412 assert!(!running, "should not be running after stop");
1413
1414 shutdown.cancel();
1415 }
1416
1417 #[tokio::test]
1420 async fn persistent_client_get_state() {
1421 let (path, shutdown, _event_tx) = setup_server();
1422 let app = tauri::test::mock_app();
1423
1424 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1425
1426 tokio::time::sleep(Duration::from_millis(100)).await;
1428
1429 let status = handle.get_state().await.unwrap();
1430 assert!(
1431 matches!(status.state, crate::models::ServiceState::Idle),
1432 "expected Idle, got {:?}",
1433 status.state
1434 );
1435
1436 handle.start(StartConfig::default()).await.unwrap();
1437
1438 let status = tokio::time::timeout(Duration::from_secs(2), async {
1441 loop {
1442 let s = handle.get_state().await.unwrap();
1443 if matches!(s.state, crate::models::ServiceState::Running) {
1444 return s;
1445 }
1446 tokio::time::sleep(Duration::from_millis(10)).await;
1447 }
1448 })
1449 .await
1450 .expect("timed out waiting for Running state");
1451 assert!(
1452 matches!(status.state, crate::models::ServiceState::Running),
1453 "expected Running, got {:?}",
1454 status.state
1455 );
1456
1457 shutdown.cancel();
1458 }
1459
1460 #[tokio::test]
1469 async fn persistent_client_timeout_on_unresponsive_server() {
1470 let path = crate::desktop::test_helpers::unique_socket_path();
1471 let listener = transport::bind(path.clone()).unwrap();
1472
1473 let server_handle = tokio::spawn(async move {
1475 let (_stream, _) = listener.accept().await.unwrap();
1476 tokio::time::sleep(Duration::from_secs(60)).await;
1478 });
1479
1480 let app = tauri::test::mock_app();
1481 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1482
1483 tokio::time::sleep(Duration::from_millis(100)).await;
1485
1486 let result = tokio::time::timeout(
1488 Duration::from_secs(15),
1489 handle.start(StartConfig::default()),
1490 )
1491 .await;
1492
1493 assert!(
1494 result.is_ok(),
1495 "start should not hang — expected error, got outer timeout"
1496 );
1497 let inner = result.unwrap();
1498 assert!(
1499 inner.is_err(),
1500 "start should return error when server is unresponsive"
1501 );
1502
1503 server_handle.abort();
1504 let _ = std::fs::remove_file(&path);
1505 }
1506
1507 #[tokio::test]
1513 async fn persistent_client_terminates_on_handle_drop() {
1514 let (path, shutdown, _event_tx) = setup_server();
1515 let app = tauri::test::mock_app();
1516
1517 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1518
1519 tokio::time::sleep(Duration::from_millis(100)).await;
1521
1522 drop(handle);
1524
1525 tokio::time::sleep(Duration::from_secs(2)).await;
1530
1531 shutdown.cancel();
1532 }
1533
1534 async fn buffered_server(
1541 path: &std::path::Path,
1542 frames: Vec<IpcMessage>,
1543 ) -> tokio::task::JoinHandle<()> {
1544 let listener = transport::bind(path.to_path_buf()).unwrap();
1545 tokio::spawn(async move {
1546 let (mut stream, _) = listener.accept().await.unwrap();
1547 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1548 let mut len_buf = [0u8; 4];
1550 if stream.read_exact(&mut len_buf).await.is_err() {
1551 return;
1552 }
1553 let len = u32::from_be_bytes(len_buf) as usize;
1554 let mut payload = vec![0u8; len];
1555 if stream.read_exact(&mut payload).await.is_err() {
1556 return;
1557 }
1558 for msg in &frames {
1560 let frame = crate::desktop::ipc::encode_frame(msg).unwrap();
1561 if stream.write_all(&frame).await.is_err() {
1562 return;
1563 }
1564 }
1565 })
1566 }
1567
1568 #[tokio::test]
1570 async fn send_and_read_no_interleaved_events() {
1571 let path = crate::desktop::test_helpers::unique_socket_path();
1572 let server = buffered_server(
1573 &path,
1574 vec![IpcMessage::Response(IpcResponse {
1575 ok: true,
1576 data: None,
1577 error: None,
1578 })],
1579 )
1580 .await;
1581
1582 let mut client = IpcClient::connect(path.clone()).await.unwrap();
1583 let (response, events) = client.send_and_read(&IpcRequest::IsRunning).await.unwrap();
1584 assert!(response.ok, "response should be ok");
1585 assert!(
1586 events.is_empty(),
1587 "events should be empty when no events interleave, got {:?}",
1588 events
1589 );
1590
1591 server.await.unwrap();
1592 let _ = std::fs::remove_file(&path);
1593 }
1594
1595 #[tokio::test]
1597 async fn send_and_read_single_interleaved_event() {
1598 let path = crate::desktop::test_helpers::unique_socket_path();
1599 let server = buffered_server(
1600 &path,
1601 vec![
1602 IpcMessage::Event(IpcEvent::Started),
1603 IpcMessage::Response(IpcResponse {
1604 ok: true,
1605 data: None,
1606 error: None,
1607 }),
1608 ],
1609 )
1610 .await;
1611
1612 let mut client = IpcClient::connect(path.clone()).await.unwrap();
1613 let (response, events) = client
1614 .send_and_read(&IpcRequest::Start {
1615 config: StartConfig::default(),
1616 })
1617 .await
1618 .unwrap();
1619 assert!(response.ok, "response should be ok");
1620 assert_eq!(events.len(), 1, "should collect exactly one event");
1621 assert!(
1622 matches!(events[0], IpcEvent::Started),
1623 "expected Started event, got {:?}",
1624 events[0]
1625 );
1626
1627 server.await.unwrap();
1628 let _ = std::fs::remove_file(&path);
1629 }
1630
1631 #[tokio::test]
1638 async fn is_connected_false_before_server() {
1639 let app = tauri::test::mock_app();
1640 let path = crate::desktop::test_helpers::unique_socket_path();
1641 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1643 tokio::time::sleep(Duration::from_millis(50)).await;
1646 assert!(
1647 !handle.is_connected(),
1648 "should not be connected when no server is running"
1649 );
1650 let _ = std::fs::remove_file(&path);
1651 }
1652
1653 #[tokio::test]
1656 async fn is_connected_true_after_connect() {
1657 let (path, shutdown, _event_tx) = setup_server();
1658 let app = tauri::test::mock_app();
1659 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1660
1661 tokio::time::timeout(Duration::from_secs(2), async {
1663 while !handle.is_connected() {
1664 tokio::time::sleep(Duration::from_millis(50)).await;
1665 }
1666 })
1667 .await
1668 .expect("timed out waiting for is_connected to become true");
1669
1670 assert!(
1671 handle.is_connected(),
1672 "should be connected after server is up"
1673 );
1674
1675 shutdown.cancel();
1676 }
1677
1678 #[tokio::test]
1684 async fn is_connected_false_after_server_shutdown() {
1685 let path = crate::desktop::test_helpers::unique_socket_path();
1686 let path_clone = path.clone();
1687 let listener = transport::bind(path.clone()).unwrap();
1688
1689 let server_handle = tokio::spawn(async move {
1692 let (stream, _) = listener.accept().await.unwrap();
1693 tokio::time::sleep(Duration::from_millis(200)).await;
1695 drop(stream);
1697 let _ = std::fs::remove_file(&path_clone);
1699 });
1700
1701 let app = tauri::test::mock_app();
1702 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1703
1704 tokio::time::timeout(Duration::from_secs(2), async {
1706 while !handle.is_connected() {
1707 tokio::time::sleep(Duration::from_millis(50)).await;
1708 }
1709 })
1710 .await
1711 .expect("timed out waiting for initial connection");
1712
1713 assert!(handle.is_connected(), "should be connected initially");
1714
1715 tokio::time::timeout(Duration::from_secs(3), async {
1717 while handle.is_connected() {
1718 tokio::time::sleep(Duration::from_millis(50)).await;
1719 }
1720 })
1721 .await
1722 .expect("timed out waiting for is_connected to become false");
1723
1724 assert!(
1725 !handle.is_connected(),
1726 "should not be connected after server shutdown"
1727 );
1728
1729 server_handle.abort();
1730 let _ = std::fs::remove_file(&path);
1731 }
1732
1733 #[test]
1741 fn backoff_builder_produces_increasing_delays() {
1742 use backon::BackoffBuilder;
1743
1744 let builder = backon::ExponentialBuilder::default()
1745 .with_min_delay(Duration::from_secs(1))
1746 .with_max_delay(Duration::from_secs(30))
1747 .with_max_times(10)
1748 .with_jitter();
1749
1750 let mut attempts = builder.build();
1751 let mut delays = Vec::new();
1752 for d in attempts.by_ref() {
1753 delays.push(d);
1754 }
1755
1756 assert_eq!(delays.len(), 10, "should produce exactly 10 delays");
1757
1758 assert!(
1760 delays[0] >= Duration::from_millis(500),
1761 "first delay too short: {:?}",
1762 delays[0]
1763 );
1764 assert!(
1765 delays[0] <= Duration::from_secs(2),
1766 "first delay too long: {:?}",
1767 delays[0]
1768 );
1769
1770 assert!(
1772 delays[9] >= Duration::from_secs(15),
1773 "last delay should approach max: {:?}",
1774 delays[9]
1775 );
1776
1777 for d in &delays {
1779 assert!(
1780 *d <= Duration::from_secs(60),
1781 "delay exceeds max_delay + jitter margin: {:?}",
1782 d
1783 );
1784 }
1785
1786 assert!(
1788 attempts.next().is_none(),
1789 "should return None after 10 attempts"
1790 );
1791 }
1792
1793 #[ignore]
1800 #[tokio::test]
1801 async fn persistent_client_exits_after_max_retries() {
1802 let app = tauri::test::mock_app();
1803 let path = crate::desktop::test_helpers::unique_socket_path();
1804 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1805
1806 let exited = tokio::time::timeout(Duration::from_secs(180), async {
1810 loop {
1811 tokio::time::sleep(Duration::from_secs(5)).await;
1812 if let Err(e) = handle.is_running().await {
1813 if e.to_string().contains("shut down") {
1814 return;
1815 }
1816 }
1817 }
1818 })
1819 .await;
1820
1821 assert!(
1822 exited.is_ok(),
1823 "persistent client should exit after max retries"
1824 );
1825 assert!(!handle.is_connected(), "should not be connected after exit");
1826
1827 let _ = std::fs::remove_file(&path);
1828 }
1829
1830 #[tokio::test]
1834 async fn persistent_client_reconnects_after_server_restart() {
1835 use crate::desktop::ipc_server::IpcServer;
1836 use crate::manager::{manager_loop, ServiceFactory};
1837 use tokio_util::sync::CancellationToken;
1838
1839 let (path, shutdown1, _event_tx) = setup_server();
1841 let app = tauri::test::mock_app();
1842 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
1843
1844 tokio::time::timeout(Duration::from_secs(2), async {
1846 while !handle.is_connected() {
1847 tokio::time::sleep(Duration::from_millis(50)).await;
1848 }
1849 })
1850 .await
1851 .expect("should connect to first server");
1852
1853 let result = handle.is_running().await;
1855 assert!(
1856 result.is_ok(),
1857 "command should succeed on first server: {:?}",
1858 result.err()
1859 );
1860
1861 shutdown1.cancel();
1863 tokio::time::sleep(Duration::from_millis(150)).await;
1864
1865 let (cmd_tx2, cmd_rx2) = tokio::sync::mpsc::channel(16);
1867 let factory: ServiceFactory<tauri::test::MockRuntime> =
1868 Box::new(|| Box::new(BlockingService));
1869 tokio::spawn(manager_loop(
1870 cmd_rx2, factory, 0.0, 0.0, 0.0, 0.0, false, false, None,
1871 ));
1872 let server2 = IpcServer::bind(path.clone(), cmd_tx2, app.handle().clone()).unwrap();
1873 let shutdown2 = CancellationToken::new();
1874 let s2 = shutdown2.clone();
1875 tokio::spawn(async move { server2.run(s2).await });
1876
1877 let reconnected = tokio::time::timeout(Duration::from_secs(3), async {
1880 loop {
1881 if handle.is_connected() {
1882 break;
1883 }
1884 tokio::time::sleep(Duration::from_millis(100)).await;
1885 }
1886 })
1887 .await;
1888
1889 assert!(
1890 reconnected.is_ok(),
1891 "persistent client should reconnect after server restart (backoff resets)"
1892 );
1893
1894 let result = handle.is_running().await;
1896 assert!(
1897 result.is_ok(),
1898 "commands should work after reconnection: {:?}",
1899 result.err()
1900 );
1901
1902 shutdown2.cancel();
1903 }
1904
1905 #[tokio::test]
1913 async fn ipc_client_rejects_zero_length_frame() {
1914 let path = crate::desktop::test_helpers::unique_socket_path();
1915 let listener = transport::bind(path.clone()).unwrap();
1916
1917 let server_handle = tokio::spawn(async move {
1919 let (mut stream, _) = listener.accept().await.unwrap();
1920 use tokio::io::AsyncWriteExt;
1921 stream.write_all(&[0u8; 4]).await.unwrap();
1922 tokio::time::sleep(Duration::from_millis(500)).await;
1923 });
1924
1925 let mut client = IpcClient::connect(path.clone()).await.unwrap();
1926
1927 let result = client.read_frame().await;
1929 assert!(
1930 result.is_err(),
1931 "zero-length frame should return error, got {:?}",
1932 result
1933 );
1934 let err = result.unwrap_err().to_string();
1935 assert!(
1936 err.contains("zero-length frame"),
1937 "Error should mention 'zero-length frame': {err}"
1938 );
1939
1940 server_handle.abort();
1941 let _ = std::fs::remove_file(&path);
1942 }
1943
1944 #[tokio::test]
1947 async fn ipc_client_eof_returns_ok_none() {
1948 let path = crate::desktop::test_helpers::unique_socket_path();
1949 let listener = transport::bind(path.clone()).unwrap();
1950
1951 let server_handle = tokio::spawn(async move {
1953 let (stream, _) = listener.accept().await.unwrap();
1954 drop(stream);
1955 });
1956
1957 let mut client = IpcClient::connect(path.clone()).await.unwrap();
1958 tokio::time::sleep(Duration::from_millis(20)).await;
1959
1960 let result = client.read_frame().await;
1962 assert!(result.is_ok(), "EOF should return Ok, got {:?}", result);
1963 assert!(result.unwrap().is_none(), "EOF should return Ok(None)");
1964
1965 server_handle.abort();
1966 let _ = std::fs::remove_file(&path);
1967 }
1968
1969 #[tokio::test]
1975 async fn wait_for_connected_returns_immediately_when_connected() {
1976 let (path, shutdown, _event_tx) = setup_server();
1977 let app = tauri::test::mock_app();
1978 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
1979
1980 tokio::time::timeout(Duration::from_secs(2), async {
1982 while !handle.is_connected() {
1983 tokio::time::sleep(Duration::from_millis(50)).await;
1984 }
1985 })
1986 .await
1987 .expect("should connect");
1988
1989 let result = handle
1991 .wait_for_connected(Duration::from_secs(5))
1992 .await
1993 .unwrap();
1994 assert!(result, "should return true when connected");
1995
1996 shutdown.cancel();
1997 }
1998
1999 #[tokio::test]
2001 async fn wait_for_connected_times_out_when_no_server() {
2002 let app = tauri::test::mock_app();
2003 let path = crate::desktop::test_helpers::unique_socket_path();
2004 let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
2005
2006 let result = handle
2008 .wait_for_connected(Duration::from_millis(200))
2009 .await
2010 .unwrap();
2011 assert!(!result, "should return false when no server and timeout");
2012
2013 let _ = std::fs::remove_file(&path);
2014 }
2015
2016 #[tokio::test]
2018 async fn wait_for_connected_succeeds_after_server_starts() {
2019 let (path, shutdown, _event_tx) = setup_server();
2020 let app = tauri::test::mock_app();
2021 let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
2022
2023 let result = handle
2025 .wait_for_connected(Duration::from_secs(5))
2026 .await
2027 .unwrap();
2028 assert!(result, "should connect within timeout");
2029
2030 shutdown.cancel();
2031 }
2032
2033 #[tokio::test]
2035 async fn send_and_read_multiple_interleaved_events() {
2036 let path = crate::desktop::test_helpers::unique_socket_path();
2037 let server = buffered_server(
2038 &path,
2039 vec![
2040 IpcMessage::Event(IpcEvent::Started),
2041 IpcMessage::Event(IpcEvent::Error {
2042 message: "warning".into(),
2043 }),
2044 IpcMessage::Event(IpcEvent::Stopped {
2045 reason: StopReason::UserStop,
2046 }),
2047 IpcMessage::Response(IpcResponse {
2048 ok: true,
2049 data: Some(serde_json::json!({"running": false})),
2050 error: None,
2051 }),
2052 ],
2053 )
2054 .await;
2055
2056 let mut client = IpcClient::connect(path.clone()).await.unwrap();
2057 let (response, events) = client.send_and_read(&IpcRequest::IsRunning).await.unwrap();
2058 assert!(response.ok, "response should be ok");
2059 assert_eq!(events.len(), 3, "should collect all three events");
2060 assert!(
2061 matches!(events[0], IpcEvent::Started),
2062 "first event should be Started"
2063 );
2064 assert!(
2065 matches!(events[1], IpcEvent::Error { .. }),
2066 "second event should be Error"
2067 );
2068 assert!(
2069 matches!(events[2], IpcEvent::Stopped { .. }),
2070 "third event should be Stopped"
2071 );
2072
2073 server.await.unwrap();
2074 let _ = std::fs::remove_file(&path);
2075 }
2076}