use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use tauri::{Emitter, Runtime};
use crate::desktop::ipc::{
decode_frame, encode_frame, IpcEvent, IpcMessage, IpcRequest, IpcResponse,
};
use crate::desktop::transport::{self, TransportReadHalf, TransportStream, TransportWriteHalf};
use crate::error::ServiceError;
use crate::models::{PluginEvent, ServiceStatus, StartConfig};
/// Request/response IPC client that owns a single transport connection.
///
/// Every method on this type reads from and writes to the same `stream`, so
/// requests are issued one at a time through `&mut self`.
pub struct IpcClient {
/// The connected transport stream used for both sending requests and
/// reading response/event frames.
stream: TransportStream,
}
impl IpcClient {
pub async fn connect(path: PathBuf) -> Result<Self, ServiceError> {
let stream = transport::connect(&path).await?;
Ok(Self { stream })
}
pub async fn start(&mut self, config: StartConfig) -> Result<(), ServiceError> {
let request = IpcRequest::Start { config };
let (response, _events) = self.send_and_read(&request).await?;
if response.ok {
Ok(())
} else {
Err(ServiceError::Ipc(
response.error.unwrap_or_else(|| "unknown error".into()),
))
}
}
pub async fn stop(&mut self) -> Result<(), ServiceError> {
let (response, _events) = self.send_and_read(&IpcRequest::Stop).await?;
if response.ok {
Ok(())
} else {
Err(ServiceError::Ipc(
response.error.unwrap_or_else(|| "unknown error".into()),
))
}
}
pub async fn is_running(&mut self) -> Result<bool, ServiceError> {
let (response, _events) = self.send_and_read(&IpcRequest::IsRunning).await?;
if response.ok {
Ok(response
.data
.and_then(|d| d.get("running").and_then(|v| v.as_bool()))
.unwrap_or(false))
} else {
Err(ServiceError::Ipc(
response.error.unwrap_or_else(|| "unknown error".into()),
))
}
}
pub async fn get_state(&mut self) -> Result<ServiceStatus, ServiceError> {
let (response, _events) = self.send_and_read(&IpcRequest::GetState).await?;
if response.ok {
response
.data
.ok_or_else(|| ServiceError::Ipc("missing data in GetState response".into()))
.and_then(|d| {
serde_json::from_value::<ServiceStatus>(d)
.map_err(|e| ServiceError::Ipc(format!("deserialize GetState: {e}")))
})
} else {
Err(ServiceError::Ipc(
response.error.unwrap_or_else(|| "unknown error".into()),
))
}
}
pub async fn read_event(&mut self) -> Result<Option<IpcEvent>, ServiceError> {
let frame = match self.read_frame().await? {
Some(f) => f,
None => return Ok(None),
};
match decode_frame(&frame).map_err(|e| ServiceError::Ipc(format!("decode event: {e}")))? {
IpcMessage::Event(event) => Ok(Some(event)),
other => Err(ServiceError::Ipc(format!(
"expected event frame, got {:?}",
std::mem::discriminant(&other),
))),
}
}
pub fn listen_events<R: Runtime>(mut self, app: tauri::AppHandle<R>) {
tokio::spawn(async move {
loop {
match self.read_event().await {
Ok(Some(event)) => {
let plugin_event = ipc_event_to_plugin_event(event);
let _ = app.emit("background-service://event", plugin_event);
}
Ok(None) => break,
Err(_) => break,
}
}
});
}
async fn send_and_read(
&mut self,
request: &IpcRequest,
) -> Result<(IpcResponse, Vec<IpcEvent>), ServiceError> {
self.send_request(request).await?;
let mut events = Vec::new();
loop {
let frame = self
.read_frame()
.await?
.ok_or_else(|| ServiceError::Ipc("connection closed".into()))?;
match decode_frame(&frame).map_err(|e| ServiceError::Ipc(format!("decode: {e}")))? {
IpcMessage::Response(resp) => return Ok((resp, events)),
IpcMessage::Event(e) => {
events.push(e);
}
IpcMessage::Request(_) => {
return Err(ServiceError::Ipc("unexpected request frame".into()));
}
}
}
}
async fn send_request(&mut self, request: &IpcRequest) -> Result<(), ServiceError> {
let msg = IpcMessage::Request(request.clone());
let frame = encode_frame(&msg).map_err(|e| ServiceError::Ipc(format!("encode: {e}")))?;
transport::write_frame(&mut self.stream, &frame)
.await
.map_err(ServiceError::Ipc)?;
Ok(())
}
async fn read_frame(&mut self) -> Result<Option<Vec<u8>>, ServiceError> {
transport::read_frame(&mut self.stream)
.await
.map_err(ServiceError::Ipc)
}
}
pub fn ipc_event_to_plugin_event(event: IpcEvent) -> PluginEvent {
match event {
IpcEvent::Started => PluginEvent::Started,
IpcEvent::Stopped { reason } => PluginEvent::Stopped { reason },
IpcEvent::Error { message } => PluginEvent::Error { message },
}
}
/// One-shot channel on which the connection task delivers a command's outcome.
type CommandReply<T> = tokio::sync::oneshot::Sender<Result<T, ServiceError>>;

/// Commands sent from `PersistentIpcClientHandle` to its background task.
///
/// Every variant carries a `reply` sender on which the result is returned.
enum IpcCommand {
    /// Start the service with the given configuration.
    Start {
        config: StartConfig,
        reply: CommandReply<()>,
    },
    /// Stop the service.
    Stop { reply: CommandReply<()> },
    /// Query whether the service is running.
    IsRunning { reply: CommandReply<bool> },
    /// Fetch the service status.
    GetState { reply: CommandReply<ServiceStatus> },
    /// Enable auto-restart, optionally with a configuration.
    EnableAutoRestart {
        config: Option<StartConfig>,
        reply: CommandReply<()>,
    },
    /// Disable auto-restart.
    DisableAutoRestart { reply: CommandReply<()> },
    /// Fetch the desired state, if the peer reports one.
    GetDesiredState {
        reply: CommandReply<Option<crate::desired_state::DesiredState>>,
    },
    /// Request a setup validation report.
    ValidateSetup {
        reply: CommandReply<crate::models::SetupValidationReport>,
    },
}
/// Handle to a background task that keeps an IPC connection open and
/// reconnects when it is lost.
///
/// Commands are forwarded to the task over `cmd_tx`; dropping the handle
/// cancels `shutdown`.
pub struct PersistentIpcClientHandle {
/// Sending side of the command queue consumed by the background task.
cmd_tx: tokio::sync::mpsc::Sender<IpcCommand>,
/// Token cancelled on drop to tell the background task to stop.
shutdown: tokio_util::sync::CancellationToken,
/// Flag shared with the background task; read by `is_connected`.
connected: Arc<AtomicBool>,
/// Path of the socket the background task connects to.
socket_path: PathBuf,
}
/// Cancels the shutdown token when the handle goes out of scope so the
/// background client loop, which holds a clone of the token, can stop.
impl Drop for PersistentIpcClientHandle {
fn drop(&mut self) {
self.shutdown.cancel();
}
}
impl PersistentIpcClientHandle {
pub fn spawn<R: Runtime>(socket_path: PathBuf, app: tauri::AppHandle<R>) -> Self {
let (cmd_tx, cmd_rx) = tokio::sync::mpsc::channel(16);
let shutdown = tokio_util::sync::CancellationToken::new();
let connected = Arc::new(AtomicBool::new(false));
tokio::spawn(persistent_client_loop(
socket_path.clone(),
app,
cmd_rx,
shutdown.clone(),
connected.clone(),
));
Self {
cmd_tx,
shutdown,
connected,
socket_path,
}
}
pub async fn start(&self, config: StartConfig) -> Result<(), ServiceError> {
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
self.cmd_tx
.send(IpcCommand::Start {
config,
reply: reply_tx,
})
.await
.map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
reply_rx
.await
.map_err(|_| ServiceError::Ipc("command dropped".into()))?
}
pub async fn stop(&self) -> Result<(), ServiceError> {
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
self.cmd_tx
.send(IpcCommand::Stop { reply: reply_tx })
.await
.map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
reply_rx
.await
.map_err(|_| ServiceError::Ipc("command dropped".into()))?
}
pub async fn is_running(&self) -> Result<bool, ServiceError> {
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
self.cmd_tx
.send(IpcCommand::IsRunning { reply: reply_tx })
.await
.map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
reply_rx
.await
.map_err(|_| ServiceError::Ipc("command dropped".into()))?
}
pub async fn get_state(&self) -> Result<ServiceStatus, ServiceError> {
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
self.cmd_tx
.send(IpcCommand::GetState { reply: reply_tx })
.await
.map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
reply_rx
.await
.map_err(|_| ServiceError::Ipc("command dropped".into()))?
}
pub fn is_connected(&self) -> bool {
self.connected.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn socket_path(&self) -> &PathBuf {
&self.socket_path
}
pub async fn wait_for_connected(&self, timeout: Duration) -> Result<bool, ServiceError> {
let deadline = tokio::time::Instant::now() + timeout;
let poll_interval = Duration::from_millis(500);
while tokio::time::Instant::now() < deadline {
if self.is_connected() {
return Ok(true);
}
let remaining = deadline - tokio::time::Instant::now();
let sleep_dur = poll_interval.min(remaining);
tokio::time::sleep(sleep_dur).await;
}
if self.is_connected() {
Ok(true)
} else {
Ok(false)
}
}
pub async fn enable_auto_restart(
&self,
config: Option<StartConfig>,
) -> Result<(), ServiceError> {
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
self.cmd_tx
.send(IpcCommand::EnableAutoRestart {
config,
reply: reply_tx,
})
.await
.map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
reply_rx
.await
.map_err(|_| ServiceError::Ipc("command dropped".into()))?
}
pub async fn disable_auto_restart(&self) -> Result<(), ServiceError> {
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
self.cmd_tx
.send(IpcCommand::DisableAutoRestart { reply: reply_tx })
.await
.map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
reply_rx
.await
.map_err(|_| ServiceError::Ipc("command dropped".into()))?
}
pub async fn get_desired_state(
&self,
) -> Result<Option<crate::desired_state::DesiredState>, ServiceError> {
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
self.cmd_tx
.send(IpcCommand::GetDesiredState { reply: reply_tx })
.await
.map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
reply_rx
.await
.map_err(|_| ServiceError::Ipc("command dropped".into()))?
}
pub async fn validate_setup(
&self,
) -> Result<crate::models::SetupValidationReport, ServiceError> {
let (reply_tx, reply_rx) = tokio::sync::oneshot::channel();
self.cmd_tx
.send(IpcCommand::ValidateSetup { reply: reply_tx })
.await
.map_err(|_| ServiceError::Ipc("persistent client shut down".into()))?;
reply_rx
.await
.map_err(|_| ServiceError::Ipc("command dropped".into()))?
}
}
async fn persistent_client_loop<R: Runtime>(
socket_path: PathBuf,
app: tauri::AppHandle<R>,
mut cmd_rx: tokio::sync::mpsc::Receiver<IpcCommand>,
shutdown: tokio_util::sync::CancellationToken,
connected: Arc<AtomicBool>,
) {
use backon::BackoffBuilder;
let backoff_builder = backon::ExponentialBuilder::default()
.with_min_delay(Duration::from_secs(1))
.with_max_delay(Duration::from_secs(30))
.with_max_times(10)
.with_jitter();
let mut attempts = backoff_builder.build();
loop {
tokio::select! {
biased;
_ = shutdown.cancelled() => {
log::info!("Persistent IPC client shutting down");
connected.store(false, std::sync::atomic::Ordering::Relaxed);
break;
}
connect_result = transport::connect(&socket_path) => {
match connect_result {
Ok(stream) => {
log::info!("Persistent IPC client connected");
connected.store(true, std::sync::atomic::Ordering::Relaxed);
let result = run_persistent_connection(stream, &app, &mut cmd_rx, &connected).await;
attempts = backoff_builder.build();
if result.is_err() {
log::info!("Persistent IPC connection lost, reconnecting...");
connected.store(false, std::sync::atomic::Ordering::Relaxed);
}
}
Err(_) => {
log::debug!("Persistent IPC client: connection failed, retrying...");
connected.store(false, std::sync::atomic::Ordering::Relaxed);
}
}
let delay = match attempts.next() {
Some(d) => d,
None => {
log::warn!("Persistent IPC client: backoff exhausted, giving up");
break;
}
};
tokio::select! {
biased;
_ = shutdown.cancelled() => {
log::info!("Persistent IPC client shutting down");
connected.store(false, std::sync::atomic::Ordering::Relaxed);
break;
}
_ = tokio::time::sleep(delay) => {}
}
}
}
}
}
async fn run_persistent_connection<R: Runtime>(
stream: TransportStream,
app: &tauri::AppHandle<R>,
cmd_rx: &mut tokio::sync::mpsc::Receiver<IpcCommand>,
connected: &Arc<AtomicBool>,
) -> Result<(), ServiceError> {
let (read_half, mut write_half) = transport::split(stream);
let response_slot: std::sync::Arc<
tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<IpcResponse>>>,
> = std::sync::Arc::new(tokio::sync::Mutex::new(None));
let slot_writer = response_slot.clone();
let app_clone = app.clone();
let connected_reader = connected.clone();
let reader_handle = tokio::spawn(async move {
let mut read_half = read_half;
loop {
let frame = match read_frame_from(&mut read_half).await {
Ok(Some(f)) => f,
Ok(None) => break, Err(_) => break,
};
match decode_frame(&frame) {
Ok(IpcMessage::Response(resp)) => {
let mut slot = slot_writer.lock().await;
if let Some(sender) = slot.take() {
let _ = sender.send(resp);
}
continue;
}
Ok(IpcMessage::Event(event)) => {
let plugin_event = ipc_event_to_plugin_event(event);
let _ = app_clone.emit("background-service://event", plugin_event);
continue;
}
Ok(IpcMessage::Request(_)) => {
log::warn!("unexpected request frame on client connection");
continue;
}
Err(e) => {
log::debug!("failed to decode IPC frame: {e}");
continue;
}
}
}
connected_reader.store(false, std::sync::atomic::Ordering::Relaxed);
});
let result = loop {
tokio::select! {
cmd = cmd_rx.recv() => {
let cmd = match cmd {
Some(c) => c,
None => break Err(ServiceError::Ipc("command channel closed".into())),
};
match cmd {
IpcCommand::Start { config, reply } => {
let request = IpcRequest::Start { config };
let rx = prepare_response_slot(&response_slot).await;
if let Err(e) = send_request_to(&mut write_half, &request).await {
let _ = reply.send(Err(e));
break Err(ServiceError::Ipc("send failed".into()));
}
let response = await_response(rx).await;
let result = match response {
Ok(resp) if resp.ok => Ok(()),
Ok(resp) => Err(ServiceError::Ipc(
resp.error.unwrap_or_else(|| "unknown error".into()),
)),
Err(e) => Err(e),
};
let _ = reply.send(result);
}
IpcCommand::Stop { reply } => {
let rx = prepare_response_slot(&response_slot).await;
if let Err(e) = send_request_to(&mut write_half, &IpcRequest::Stop).await {
let _ = reply.send(Err(e));
break Err(ServiceError::Ipc("send failed".into()));
}
let response = await_response(rx).await;
let result = match response {
Ok(resp) if resp.ok => Ok(()),
Ok(resp) => Err(ServiceError::Ipc(
resp.error.unwrap_or_else(|| "unknown error".into()),
)),
Err(e) => Err(e),
};
let _ = reply.send(result);
}
IpcCommand::IsRunning { reply } => {
let rx = prepare_response_slot(&response_slot).await;
if let Err(e) = send_request_to(&mut write_half, &IpcRequest::IsRunning).await {
let _ = reply.send(Err(e));
break Err(ServiceError::Ipc("send failed".into()));
}
let response = await_response(rx).await;
let result = match response {
Ok(resp) if resp.ok => Ok(resp
.data
.and_then(|d| d.get("running").and_then(|v| v.as_bool()))
.unwrap_or(false)),
Ok(resp) => Err(ServiceError::Ipc(
resp.error.unwrap_or_else(|| "unknown error".into()),
)),
Err(e) => Err(e),
};
let _ = reply.send(result);
}
IpcCommand::GetState { reply } => {
let rx = prepare_response_slot(&response_slot).await;
if let Err(e) = send_request_to(&mut write_half, &IpcRequest::GetState).await {
let _ = reply.send(Err(e));
break Err(ServiceError::Ipc("send failed".into()));
}
let response = await_response(rx).await;
let result = match response {
Ok(resp) if resp.ok => resp
.data
.ok_or_else(|| ServiceError::Ipc("missing data in GetState response".into()))
.and_then(|d| {
serde_json::from_value::<ServiceStatus>(d)
.map_err(|e| ServiceError::Ipc(format!("deserialize GetState: {e}")))
}),
Ok(resp) => Err(ServiceError::Ipc(
resp.error.unwrap_or_else(|| "unknown error".into()),
)),
Err(e) => Err(e),
};
let _ = reply.send(result);
}
IpcCommand::EnableAutoRestart { config, reply } => {
let request = IpcRequest::EnableAutoRestart { config };
let rx = prepare_response_slot(&response_slot).await;
if let Err(e) = send_request_to(&mut write_half, &request).await {
let _ = reply.send(Err(e));
break Err(ServiceError::Ipc("send failed".into()));
}
let response = await_response(rx).await;
let result = match response {
Ok(resp) if resp.ok => Ok(()),
Ok(resp) => Err(ServiceError::Ipc(
resp.error.unwrap_or_else(|| "unknown error".into()),
)),
Err(e) => Err(e),
};
let _ = reply.send(result);
}
IpcCommand::DisableAutoRestart { reply } => {
let rx = prepare_response_slot(&response_slot).await;
if let Err(e) = send_request_to(&mut write_half, &IpcRequest::DisableAutoRestart).await {
let _ = reply.send(Err(e));
break Err(ServiceError::Ipc("send failed".into()));
}
let response = await_response(rx).await;
let result = match response {
Ok(resp) if resp.ok => Ok(()),
Ok(resp) => Err(ServiceError::Ipc(
resp.error.unwrap_or_else(|| "unknown error".into()),
)),
Err(e) => Err(e),
};
let _ = reply.send(result);
}
IpcCommand::GetDesiredState { reply } => {
let rx = prepare_response_slot(&response_slot).await;
if let Err(e) = send_request_to(&mut write_half, &IpcRequest::GetDesiredState).await {
let _ = reply.send(Err(e));
break Err(ServiceError::Ipc("send failed".into()));
}
let response = await_response(rx).await;
let result = match response {
Ok(resp) if resp.ok => {
match resp.data {
Some(d) => serde_json::from_value::<crate::desired_state::DesiredState>(d)
.map(Some)
.map_err(|e| ServiceError::Ipc(format!("deserialize GetDesiredState: {e}"))),
None => Ok(None),
}
}
Ok(resp) => Err(ServiceError::Ipc(
resp.error.unwrap_or_else(|| "unknown error".into()),
)),
Err(e) => Err(e),
};
let _ = reply.send(result);
}
IpcCommand::ValidateSetup { reply } => {
let rx = prepare_response_slot(&response_slot).await;
if let Err(e) = send_request_to(&mut write_half, &IpcRequest::ValidateSetup).await {
let _ = reply.send(Err(e));
break Err(ServiceError::Ipc("send failed".into()));
}
let response = await_response(rx).await;
let result = match response {
Ok(resp) if resp.ok => {
match resp.data {
Some(d) => serde_json::from_value::<crate::models::SetupValidationReport>(d)
.map_err(|e| ServiceError::Ipc(format!("deserialize ValidateSetup: {e}"))),
None => Err(ServiceError::Ipc("missing ValidateSetup response data".into())),
}
}
Ok(resp) => Err(ServiceError::Ipc(
resp.error.unwrap_or_else(|| "unknown error".into()),
)),
Err(e) => Err(e),
};
let _ = reply.send(result);
}
}
}
_ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {
if reader_handle.is_finished() {
break Err(ServiceError::Ipc("reader task died".into()));
}
}
}
};
reader_handle.abort();
result
}
/// Wraps `request` in a request message, encodes it, and writes the frame to
/// `write_half`.
///
/// # Errors
/// Returns `ServiceError::Ipc` with an `encode: …` message when encoding
/// fails, or with the transport's own message when the write fails.
async fn send_request_to(
    write_half: &mut TransportWriteHalf,
    request: &IpcRequest,
) -> Result<(), ServiceError> {
    let envelope = IpcMessage::Request(request.clone());
    let encoded = match encode_frame(&envelope) {
        Ok(bytes) => bytes,
        Err(e) => return Err(ServiceError::Ipc(format!("encode: {e}"))),
    };
    match transport::write_frame(write_half, &encoded).await {
        Ok(_) => Ok(()),
        Err(reason) => Err(ServiceError::Ipc(reason)),
    }
}
/// Creates a fresh one-shot channel, stores its sender in `slot`, and returns
/// the receiver.
///
/// In debug builds this asserts that the slot was empty beforehand.
async fn prepare_response_slot(
    slot: &std::sync::Arc<tokio::sync::Mutex<Option<tokio::sync::oneshot::Sender<IpcResponse>>>>,
) -> tokio::sync::oneshot::Receiver<IpcResponse> {
    let (sender, receiver) = tokio::sync::oneshot::channel();
    {
        // Keep the lock only for the duration of the check-and-store.
        let mut pending = slot.lock().await;
        debug_assert!(
            pending.is_none(),
            "response slot overwritten — sequential command invariant violated"
        );
        *pending = Some(sender);
    }
    receiver
}
async fn await_response(
rx: tokio::sync::oneshot::Receiver<IpcResponse>,
) -> Result<IpcResponse, ServiceError> {
tokio::select! {
response = rx => {
response.map_err(|_| ServiceError::Ipc("response channel closed".into()))
}
_ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
Err(ServiceError::Ipc("response timeout".into()))
}
}
}
/// Reads one raw frame from `read_half`, converting the transport's error
/// message into `ServiceError::Ipc`. A result of `Ok(None)` is passed through
/// unchanged.
async fn read_frame_from(
    read_half: &mut TransportReadHalf,
) -> Result<Option<Vec<u8>>, ServiceError> {
    match transport::read_frame(read_half).await {
        Ok(frame) => Ok(frame),
        Err(reason) => Err(ServiceError::Ipc(reason)),
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::desktop::test_helpers::{
setup_server, setup_server_with_factory, BlockingService, ImmediateSuccessService,
};
use std::sync::atomic::Ordering;
use std::time::Duration;
use tauri::Listener;
#[tokio::test]
async fn ipc_client_connect() {
let (path, shutdown, _event_tx) = setup_server();
let result = IpcClient::connect(path).await;
assert!(result.is_ok(), "client should connect: {:?}", result.err());
shutdown.cancel();
}
#[tokio::test]
async fn ipc_client_send_start() {
let (path, shutdown, _event_tx) = setup_server();
let mut client = IpcClient::connect(path).await.unwrap();
let result = client.start(StartConfig::default()).await;
assert!(result.is_ok(), "start should succeed: {:?}", result.err());
shutdown.cancel();
}
#[tokio::test]
async fn ipc_client_send_stop() {
let (path, shutdown, _event_tx) = setup_server();
let mut client = IpcClient::connect(path).await.unwrap();
client.start(StartConfig::default()).await.unwrap();
let result = client.stop().await;
assert!(result.is_ok(), "stop should succeed: {:?}", result.err());
shutdown.cancel();
}
#[tokio::test]
async fn ipc_client_is_running() {
let (path, shutdown, _event_tx) = setup_server();
let mut client = IpcClient::connect(path).await.unwrap();
let running = client.is_running().await.unwrap();
assert!(!running, "should not be running initially");
client.start(StartConfig::default()).await.unwrap();
let running = client.is_running().await.unwrap();
assert!(running, "should be running after start");
shutdown.cancel();
}
#[tokio::test]
async fn ipc_client_get_state_initial() {
let (path, shutdown, _event_tx) = setup_server();
let mut client = IpcClient::connect(path).await.unwrap();
let status = client.get_state().await.unwrap();
assert!(
matches!(status.state, crate::models::ServiceState::Idle),
"expected Idle, got {:?}",
status.state
);
assert_eq!(status.last_error, None);
shutdown.cancel();
}
#[tokio::test]
async fn ipc_client_get_state_after_start() {
let (path, shutdown, _event_tx) = setup_server();
let mut client = IpcClient::connect(path).await.unwrap();
client.start(StartConfig::default()).await.unwrap();
let status = tokio::time::timeout(Duration::from_secs(2), async {
loop {
let s = client.get_state().await.unwrap();
if matches!(s.state, crate::models::ServiceState::Running) {
return s;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("timed out waiting for Running state");
assert_eq!(status.last_error, None);
shutdown.cancel();
}
#[tokio::test]
async fn ipc_client_get_state_after_stop() {
let (path, shutdown, _event_tx) = setup_server();
let mut client = IpcClient::connect(path).await.unwrap();
client.start(StartConfig::default()).await.unwrap();
client.stop().await.unwrap();
let status = client.get_state().await.unwrap();
assert!(
matches!(status.state, crate::models::ServiceState::Stopped),
"expected Stopped, got {:?}",
status.state
);
shutdown.cancel();
}
#[tokio::test]
async fn ipc_client_receive_events() {
let (path, shutdown, event_tx) =
setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
let mut client = IpcClient::connect(path).await.unwrap();
client.start(StartConfig::default()).await.unwrap();
let _ = event_tx.send(IpcEvent::Started);
let event = tokio::time::timeout(Duration::from_millis(500), client.read_event())
.await
.expect("timed out waiting for event")
.expect("read_event failed");
assert!(event.is_some(), "should receive an event");
let event = event.unwrap();
assert!(
matches!(event, IpcEvent::Started),
"Expected Started event, got {:?}",
event
);
shutdown.cancel();
}
#[tokio::test]
async fn ipc_client_stop_when_not_running() {
let (path, shutdown, _event_tx) = setup_server();
let mut client = IpcClient::connect(path).await.unwrap();
let result = client.stop().await;
assert!(result.is_err(), "stop when not running should fail");
shutdown.cancel();
}
#[tokio::test]
async fn ipc_client_connect_to_nonexistent() {
let path = std::env::temp_dir().join("nonexistent-test-socket.sock");
let result = IpcClient::connect(path).await;
assert!(
result.is_err(),
"should fail to connect to nonexistent socket"
);
}
#[test]
fn ipc_event_to_plugin_event_started() {
let event = IpcEvent::Started;
let plugin = ipc_event_to_plugin_event(event);
assert!(matches!(plugin, PluginEvent::Started));
}
#[test]
fn ipc_event_to_plugin_event_stopped() {
let event = IpcEvent::Stopped {
reason: "cancelled".into(),
};
let plugin = ipc_event_to_plugin_event(event);
match plugin {
PluginEvent::Stopped { reason } => assert_eq!(reason, "cancelled"),
other => panic!("Expected Stopped, got {other:?}"),
}
}
#[test]
fn ipc_event_to_plugin_event_error() {
let event = IpcEvent::Error {
message: "init failed".into(),
};
let plugin = ipc_event_to_plugin_event(event);
match plugin {
PluginEvent::Error { message } => assert_eq!(message, "init failed"),
other => panic!("Expected Error, got {other:?}"),
}
}
#[tokio::test]
async fn ipc_client_full_lifecycle() {
let (path, shutdown, _event_tx) = setup_server();
let mut client = IpcClient::connect(path).await.unwrap();
assert!(!client.is_running().await.unwrap());
client.start(StartConfig::default()).await.unwrap();
assert!(client.is_running().await.unwrap());
client.stop().await.unwrap();
assert!(!client.is_running().await.unwrap());
shutdown.cancel();
}
#[tokio::test]
async fn ipc_client_listen_events() {
let (path, shutdown, event_tx) =
setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
let app = tauri::test::mock_app();
let received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let received_clone = received.clone();
app.listen("background-service://event", move |_event| {
received_clone.store(true, Ordering::SeqCst);
});
let mut client = IpcClient::connect(path).await.unwrap();
client.start(StartConfig::default()).await.unwrap();
client.listen_events(app.handle().clone());
let _ = event_tx.send(IpcEvent::Started);
tokio::time::timeout(Duration::from_millis(500), async {
while !received.load(Ordering::SeqCst) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("timed out waiting for event via listen_events");
assert!(
received.load(Ordering::SeqCst),
"should have received event"
);
shutdown.cancel();
}
#[tokio::test]
async fn ipc_loopback_full_lifecycle_with_events() {
let (path, shutdown, event_tx) = setup_server();
let mut client = IpcClient::connect(path).await.unwrap();
assert!(
!client.is_running().await.unwrap(),
"should not be running initially"
);
client
.start(StartConfig::default())
.await
.expect("start should succeed");
let _ = event_tx.send(IpcEvent::Started);
let started = tokio::time::timeout(Duration::from_millis(500), client.read_event())
.await
.expect("timed out waiting for Started event")
.expect("read_event failed")
.expect("should receive event");
assert!(
matches!(started, IpcEvent::Started),
"Expected Started event, got {started:?}"
);
assert!(
client.is_running().await.unwrap(),
"should be running after start"
);
client.stop().await.expect("stop should succeed");
let _ = event_tx.send(IpcEvent::Stopped {
reason: "cancelled".into(),
});
let stopped = tokio::time::timeout(Duration::from_millis(500), client.read_event())
.await
.expect("timed out waiting for Stopped event")
.expect("read_event failed")
.expect("should receive event");
assert!(
matches!(stopped, IpcEvent::Stopped { .. }),
"Expected Stopped event, got {stopped:?}"
);
assert!(
!client.is_running().await.unwrap(),
"should not be running after stop"
);
shutdown.cancel();
}
#[tokio::test]
async fn ipc_loopback_event_streaming_plugin_event_conversion() {
let (path, shutdown, event_tx) = setup_server();
let mut client = IpcClient::connect(path).await.unwrap();
client.start(StartConfig::default()).await.unwrap();
let _ = event_tx.send(IpcEvent::Started);
let started_ipc = tokio::time::timeout(Duration::from_millis(500), client.read_event())
.await
.expect("timed out")
.expect("read_event failed")
.expect("should receive event");
let started_plugin = ipc_event_to_plugin_event(started_ipc);
assert!(
matches!(started_plugin, PluginEvent::Started),
"Expected PluginEvent::Started, got {started_plugin:?}"
);
client.stop().await.unwrap();
let _ = event_tx.send(IpcEvent::Stopped {
reason: "cancelled".into(),
});
let stopped_ipc = tokio::time::timeout(Duration::from_millis(500), client.read_event())
.await
.expect("timed out")
.expect("read_event failed")
.expect("should receive event");
let stopped_plugin = ipc_event_to_plugin_event(stopped_ipc);
match stopped_plugin {
PluginEvent::Stopped { reason } => {
assert_eq!(reason, "cancelled", "Expected 'cancelled' reason");
}
other => panic!("Expected PluginEvent::Stopped, got {other:?}"),
}
shutdown.cancel();
}
#[tokio::test]
async fn ipc_loopback_connection_drop_returns_error() {
let path = crate::desktop::test_helpers::unique_socket_path();
let listener = transport::bind(path.clone()).unwrap();
let path_clone = path.clone();
let client_handle =
tokio::spawn(async move { IpcClient::connect(path_clone).await.unwrap() });
let (server_stream, _) = listener.accept().await.unwrap();
drop(server_stream);
tokio::time::sleep(Duration::from_millis(20)).await;
let mut client = client_handle.await.unwrap();
let result = client.is_running().await;
assert!(
result.is_err(),
"should get error after server drops connection"
);
let _ = std::fs::remove_file(&path);
}
#[tokio::test]
async fn ipc_loopback_double_start_returns_error() {
let (path, shutdown, _event_tx) = setup_server();
let mut client = IpcClient::connect(path).await.unwrap();
client.start(StartConfig::default()).await.unwrap();
let result = client.start(StartConfig::default()).await;
assert!(result.is_err(), "double start should return error");
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.to_lowercase().contains("already"),
"Error should mention 'already': {err_msg}"
);
shutdown.cancel();
}
#[tokio::test]
async fn persistent_client_connects() {
let (path, shutdown, _event_tx) = setup_server();
let app = tauri::test::mock_app();
let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
tokio::time::sleep(Duration::from_millis(100)).await;
let running = handle.is_running().await;
assert!(
running.is_ok(),
"should get response via persistent connection: {:?}",
running.err()
);
assert!(!running.unwrap(), "should not be running initially");
shutdown.cancel();
}
#[tokio::test]
async fn persistent_client_reconnects() {
use crate::desktop::ipc_server::IpcServer;
use crate::manager::{manager_loop, ServiceFactory};
use tokio_util::sync::CancellationToken;
let (path, shutdown1, _event_tx) = setup_server();
let app = tauri::test::mock_app();
let handle = PersistentIpcClientHandle::spawn(path.clone(), app.handle().clone());
tokio::time::sleep(Duration::from_millis(100)).await;
let result = handle.is_running().await;
assert!(
result.is_ok(),
"should connect to first server: {:?}",
result.err()
);
shutdown1.cancel();
tokio::time::sleep(Duration::from_millis(150)).await;
let (cmd_tx2, cmd_rx2) = tokio::sync::mpsc::channel(16);
let factory: ServiceFactory<tauri::test::MockRuntime> =
Box::new(|| Box::new(BlockingService));
tokio::spawn(manager_loop(
cmd_rx2, factory, 0.0, 0.0, 0.0, 0.0, false, false, None,
));
let server2 = IpcServer::bind(path.clone(), cmd_tx2, app.handle().clone()).unwrap();
let shutdown2 = CancellationToken::new();
let s2 = shutdown2.clone();
tokio::spawn(async move { server2.run(s2).await });
let reconnected = tokio::time::timeout(Duration::from_secs(3), async {
loop {
tokio::time::sleep(Duration::from_millis(200)).await;
if handle.is_running().await.is_ok() {
break;
}
}
})
.await;
assert!(
reconnected.is_ok(),
"persistent client should reconnect to second server"
);
shutdown2.cancel();
}
#[tokio::test]
async fn event_relay() {
let (path, shutdown, event_tx) =
setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
let app = tauri::test::mock_app();
let received = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let received_clone = received.clone();
app.listen("background-service://event", move |_event| {
received_clone.store(true, Ordering::SeqCst);
});
let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
let result = handle.start(StartConfig::default()).await;
assert!(result.is_ok(), "start should succeed: {:?}", result.err());
let _ = event_tx.send(IpcEvent::Started);
tokio::time::timeout(Duration::from_millis(500), async {
while !received.load(Ordering::SeqCst) {
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("timed out waiting for event relay via app.emit()");
assert!(
received.load(Ordering::SeqCst),
"event should be relayed through app.emit()"
);
shutdown.cancel();
}
/// Drives the handle through start and stop, checking `is_running()` before
/// the start, after the start, and after the stop.
#[tokio::test]
async fn start_stop_lifecycle() {
    // `_event_tx` is bound (not `_`) so the sender lives for the whole test.
    let (socket_path, server_shutdown, _event_tx) = setup_server();
    let app = tauri::test::mock_app();
    let client = PersistentIpcClientHandle::spawn(socket_path, app.handle().clone());

    assert!(
        !client.is_running().await.unwrap(),
        "should not be running initially"
    );

    client
        .start(StartConfig::default())
        .await
        .expect("start should succeed");
    assert!(
        client.is_running().await.unwrap(),
        "should be running after start"
    );

    client.stop().await.expect("stop should succeed");
    assert!(
        !client.is_running().await.unwrap(),
        "should not be running after stop"
    );

    server_shutdown.cancel();
}
#[tokio::test]
async fn persistent_client_get_state() {
let (path, shutdown, _event_tx) = setup_server();
let app = tauri::test::mock_app();
let handle = PersistentIpcClientHandle::spawn(path, app.handle().clone());
tokio::time::sleep(Duration::from_millis(100)).await;
let status = handle.get_state().await.unwrap();
assert!(
matches!(status.state, crate::models::ServiceState::Idle),
"expected Idle, got {:?}",
status.state
);
handle.start(StartConfig::default()).await.unwrap();
let status = tokio::time::timeout(Duration::from_secs(2), async {
loop {
let s = handle.get_state().await.unwrap();
if matches!(s.state, crate::models::ServiceState::Running) {
return s;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("timed out waiting for Running state");
assert!(
matches!(status.state, crate::models::ServiceState::Running),
"expected Running, got {:?}",
status.state
);
shutdown.cancel();
}
/// A server that accepts the connection but never answers must make `start()`
/// return an error by itself instead of hanging.
#[tokio::test]
async fn persistent_client_timeout_on_unresponsive_server() {
    let socket_path = crate::desktop::test_helpers::unique_socket_path();
    let listener = transport::bind(socket_path.clone()).unwrap();

    // Accept once, then hold the connection open without reading or writing.
    let silent_server = tokio::spawn(async move {
        let (_held_connection, _) = listener.accept().await.unwrap();
        tokio::time::sleep(Duration::from_secs(60)).await;
    });

    let app = tauri::test::mock_app();
    let client = PersistentIpcClientHandle::spawn(socket_path.clone(), app.handle().clone());
    tokio::time::sleep(Duration::from_millis(100)).await;

    // The outer 15 s limit only protects the test run; hitting it is a failure.
    let outcome = tokio::time::timeout(
        Duration::from_secs(15),
        client.start(StartConfig::default()),
    )
    .await;
    match outcome {
        Ok(inner) => assert!(
            inner.is_err(),
            "start should return error when server is unresponsive"
        ),
        Err(_) => panic!("start should not hang — expected error, got outer timeout"),
    }

    silent_server.abort();
    let _ = std::fs::remove_file(&socket_path);
}
/// Spawns a persistent client, drops its handle, and waits two seconds before
/// shutting the server down.
///
/// NOTE(review): nothing is asserted here, so the test passes as long as no
/// panic occurs; confirm whether background-task termination should be
/// observed explicitly.
#[tokio::test]
async fn persistent_client_terminates_on_handle_drop() {
    let (socket_path, server_shutdown, _event_tx) = setup_server();
    let app = tauri::test::mock_app();

    let client = PersistentIpcClientHandle::spawn(socket_path, app.handle().clone());
    // Short pause before the handle goes away.
    tokio::time::sleep(Duration::from_millis(100)).await;
    drop(client);

    // Grace period after the drop.
    tokio::time::sleep(Duration::from_secs(2)).await;
    server_shutdown.cancel();
}
/// Test helper: binds a listener at `path` and spawns a task that accepts one
/// connection, reads a single length-prefixed request (4-byte big-endian
/// length, then payload), and writes every message in `frames` back in order.
///
/// The task returns silently on any read or write error. The returned handle
/// lets the caller wait for the task to finish.
async fn buffered_server(
    path: &std::path::Path,
    frames: Vec<IpcMessage>,
) -> tokio::task::JoinHandle<()> {
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    let listener = transport::bind(path.to_path_buf()).unwrap();
    tokio::spawn(async move {
        let (mut conn, _) = listener.accept().await.unwrap();

        // Consume the request; its content is not inspected.
        let mut header = [0u8; 4];
        if conn.read_exact(&mut header).await.is_err() {
            return;
        }
        let mut body = vec![0u8; u32::from_be_bytes(header) as usize];
        if conn.read_exact(&mut body).await.is_err() {
            return;
        }

        // Reply with the canned frames back to back.
        for message in frames.iter() {
            let encoded = crate::desktop::ipc::encode_frame(message).unwrap();
            if conn.write_all(&encoded).await.is_err() {
                return;
            }
        }
    })
}
/// When the server replies with only a response frame, `send_and_read` must
/// return that response together with an empty event list.
#[tokio::test]
async fn send_and_read_no_interleaved_events() {
    let socket_path = crate::desktop::test_helpers::unique_socket_path();

    let reply = IpcMessage::Response(IpcResponse {
        ok: true,
        data: None,
        error: None,
    });
    let server_task = buffered_server(&socket_path, vec![reply]).await;

    let mut client = IpcClient::connect(socket_path.clone()).await.unwrap();
    let (response, events) = client.send_and_read(&IpcRequest::IsRunning).await.unwrap();

    assert!(response.ok, "response should be ok");
    assert!(
        events.is_empty(),
        "events should be empty when no events interleave, got {:?}",
        events
    );

    server_task.await.unwrap();
    let _ = std::fs::remove_file(&socket_path);
}
/// When one event frame precedes the response, `send_and_read` must return the
/// response and hand back exactly that event.
#[tokio::test]
async fn send_and_read_single_interleaved_event() {
    let socket_path = crate::desktop::test_helpers::unique_socket_path();

    // Frames the server writes, in order: one event, then the response.
    let scripted = vec![
        IpcMessage::Event(IpcEvent::Started),
        IpcMessage::Response(IpcResponse {
            ok: true,
            data: None,
            error: None,
        }),
    ];
    let server_task = buffered_server(&socket_path, scripted).await;

    let mut client = IpcClient::connect(socket_path.clone()).await.unwrap();
    let request = IpcRequest::Start {
        config: StartConfig::default(),
    };
    let (response, events) = client.send_and_read(&request).await.unwrap();

    assert!(response.ok, "response should be ok");
    assert_eq!(events.len(), 1, "should collect exactly one event");
    assert!(
        matches!(events[0], IpcEvent::Started),
        "expected Started event, got {:?}",
        events[0]
    );

    server_task.await.unwrap();
    let _ = std::fs::remove_file(&socket_path);
}
/// With nothing listening on the socket path, `is_connected()` must still be
/// false 50 ms after the client was spawned.
#[tokio::test]
async fn is_connected_false_before_server() {
    let socket_path = crate::desktop::test_helpers::unique_socket_path();
    let app = tauri::test::mock_app();
    let client = PersistentIpcClientHandle::spawn(socket_path.clone(), app.handle().clone());

    tokio::time::sleep(Duration::from_millis(50)).await;

    let connected = client.is_connected();
    assert!(
        !connected,
        "should not be connected when no server is running"
    );
    let _ = std::fs::remove_file(&socket_path);
}
/// With a server running, `is_connected()` must turn true within two seconds.
#[tokio::test]
async fn is_connected_true_after_connect() {
    let (socket_path, server_shutdown, _event_tx) = setup_server();
    let app = tauri::test::mock_app();
    let client = PersistentIpcClientHandle::spawn(socket_path, app.handle().clone());

    // Poll every 50 ms until the handle reports a live connection.
    let until_connected = async {
        loop {
            if client.is_connected() {
                break;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    };
    tokio::time::timeout(Duration::from_secs(2), until_connected)
        .await
        .expect("timed out waiting for is_connected to become true");

    assert!(
        client.is_connected(),
        "should be connected after server is up"
    );
    server_shutdown.cancel();
}
/// After the server side drops the accepted connection and removes the socket
/// file, `is_connected()` must go back to false within three seconds.
#[tokio::test]
async fn is_connected_false_after_server_shutdown() {
    let socket_path = crate::desktop::test_helpers::unique_socket_path();
    let listener = transport::bind(socket_path.clone()).unwrap();

    // Accept one connection, keep it for 200 ms, then close it and delete the
    // socket file.
    let path_for_server = socket_path.clone();
    let short_lived_server = tokio::spawn(async move {
        let (connection, _) = listener.accept().await.unwrap();
        tokio::time::sleep(Duration::from_millis(200)).await;
        drop(connection);
        let _ = std::fs::remove_file(&path_for_server);
    });

    let app = tauri::test::mock_app();
    let client = PersistentIpcClientHandle::spawn(socket_path.clone(), app.handle().clone());

    let until_connected = async {
        loop {
            if client.is_connected() {
                break;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    };
    tokio::time::timeout(Duration::from_secs(2), until_connected)
        .await
        .expect("timed out waiting for initial connection");
    assert!(client.is_connected(), "should be connected initially");

    let until_disconnected = async {
        loop {
            if !client.is_connected() {
                break;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    };
    tokio::time::timeout(Duration::from_secs(3), until_disconnected)
        .await
        .expect("timed out waiting for is_connected to become false");
    assert!(
        !client.is_connected(),
        "should not be connected after server shutdown"
    );

    short_lived_server.abort();
    let _ = std::fs::remove_file(&socket_path);
}
/// Exercises a `backon` exponential backoff configured with a 1 s minimum
/// delay, a 30 s maximum delay, 10 attempts, and jitter enabled.
///
/// Because jitter randomises each delay, only loose bounds are asserted: the
/// first delay lies in 0.5 s..=2 s, the last is at least 15 s, no delay
/// exceeds 60 s, and the sequence ends after exactly 10 items.
#[test]
fn backoff_builder_produces_increasing_delays() {
    use backon::BackoffBuilder;
    let builder = backon::ExponentialBuilder::default()
        .with_min_delay(Duration::from_secs(1))
        .with_max_delay(Duration::from_secs(30))
        .with_max_times(10)
        .with_jitter();
    let mut attempts = builder.build();

    // Drain through `by_ref()` so `attempts` remains usable for the
    // exhaustion check at the end.
    let delays: Vec<_> = attempts.by_ref().collect();

    assert_eq!(delays.len(), 10, "should produce exactly 10 delays");
    assert!(
        delays[0] >= Duration::from_millis(500),
        "first delay too short: {:?}",
        delays[0]
    );
    assert!(
        delays[0] <= Duration::from_secs(2),
        "first delay too long: {:?}",
        delays[0]
    );
    assert!(
        delays[9] >= Duration::from_secs(15),
        "last delay should approach max: {:?}",
        delays[9]
    );
    for delay in &delays {
        assert!(
            *delay <= Duration::from_secs(60),
            "delay exceeds max_delay + jitter margin: {:?}",
            delay
        );
    }
    assert!(
        attempts.next().is_none(),
        "should return None after 10 attempts"
    );
}
/// With no server ever available, the persistent client must eventually give
/// up: `is_running()` starts failing with an error whose text contains
/// "shut down", and `is_connected()` is false afterwards.
///
/// Marked `#[ignore]`; the wait is allowed to take up to 180 s.
#[ignore]
#[tokio::test]
async fn persistent_client_exits_after_max_retries() {
    let app = tauri::test::mock_app();
    let socket_path = crate::desktop::test_helpers::unique_socket_path();
    let client = PersistentIpcClientHandle::spawn(socket_path.clone(), app.handle().clone());

    // Probe every 5 s until a command fails with the "shut down" error.
    let until_shut_down = async {
        loop {
            tokio::time::sleep(Duration::from_secs(5)).await;
            let gave_up = matches!(
                client.is_running().await,
                Err(e) if e.to_string().contains("shut down")
            );
            if gave_up {
                break;
            }
        }
    };
    let exited = tokio::time::timeout(Duration::from_secs(180), until_shut_down).await;

    assert!(
        exited.is_ok(),
        "persistent client should exit after max retries"
    );
    assert!(!client.is_connected(), "should not be connected after exit");
    let _ = std::fs::remove_file(&socket_path);
}
/// Connects to a first server, shuts it down, brings up a second server on the
/// same socket path, and checks that the same handle reconnects within three
/// seconds and can issue commands again.
#[tokio::test]
async fn persistent_client_reconnects_after_server_restart() {
    use crate::desktop::ipc_server::IpcServer;
    use crate::manager::{manager_loop, ServiceFactory};
    use tokio_util::sync::CancellationToken;

    // --- First server -----------------------------------------------------
    let (socket_path, first_shutdown, _event_tx) = setup_server();
    let app = tauri::test::mock_app();
    let client = PersistentIpcClientHandle::spawn(socket_path.clone(), app.handle().clone());

    let until_first_connect = async {
        loop {
            if client.is_connected() {
                break;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    };
    tokio::time::timeout(Duration::from_secs(2), until_first_connect)
        .await
        .expect("should connect to first server");

    let first_probe = client.is_running().await;
    assert!(
        first_probe.is_ok(),
        "command should succeed on first server: {:?}",
        first_probe.err()
    );

    // --- Restart ----------------------------------------------------------
    first_shutdown.cancel();
    tokio::time::sleep(Duration::from_millis(150)).await;

    let (commands_tx, commands_rx) = tokio::sync::mpsc::channel(16);
    let factory: ServiceFactory<tauri::test::MockRuntime> =
        Box::new(|| Box::new(BlockingService));
    tokio::spawn(manager_loop(
        commands_rx, factory, 0.0, 0.0, 0.0, 0.0, false, false, None,
    ));

    let second_server =
        IpcServer::bind(socket_path.clone(), commands_tx, app.handle().clone()).unwrap();
    let second_shutdown = CancellationToken::new();
    let second_shutdown_for_task = second_shutdown.clone();
    tokio::spawn(async move { second_server.run(second_shutdown_for_task).await });

    // --- Reconnection -----------------------------------------------------
    let until_reconnect = async {
        while !client.is_connected() {
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    };
    let reconnected = tokio::time::timeout(Duration::from_secs(3), until_reconnect).await;
    assert!(
        reconnected.is_ok(),
        "persistent client should reconnect after server restart (backoff resets)"
    );

    let second_probe = client.is_running().await;
    assert!(
        second_probe.is_ok(),
        "commands should work after reconnection: {:?}",
        second_probe.err()
    );
    second_shutdown.cancel();
}
/// A frame whose 4-byte length prefix is all zeros must make `read_frame()`
/// fail with an error mentioning "zero-length frame".
#[tokio::test]
async fn ipc_client_rejects_zero_length_frame() {
    use tokio::io::AsyncWriteExt;

    let socket_path = crate::desktop::test_helpers::unique_socket_path();
    let listener = transport::bind(socket_path.clone()).unwrap();

    // Send only a zero length prefix, then keep the connection open briefly.
    let bad_server = tokio::spawn(async move {
        let (mut conn, _) = listener.accept().await.unwrap();
        conn.write_all(&[0u8; 4]).await.unwrap();
        tokio::time::sleep(Duration::from_millis(500)).await;
    });

    let mut client = IpcClient::connect(socket_path.clone()).await.unwrap();
    let outcome = client.read_frame().await;
    assert!(
        outcome.is_err(),
        "zero-length frame should return error, got {:?}",
        outcome
    );

    let err = outcome.unwrap_err().to_string();
    assert!(
        err.contains("zero-length frame"),
        "Error should mention 'zero-length frame': {err}"
    );

    bad_server.abort();
    let _ = std::fs::remove_file(&socket_path);
}
/// When the peer closes the connection without sending anything,
/// `read_frame()` must report `Ok(None)` rather than an error.
#[tokio::test]
async fn ipc_client_eof_returns_ok_none() {
    let socket_path = crate::desktop::test_helpers::unique_socket_path();
    let listener = transport::bind(socket_path.clone()).unwrap();

    // Accept and immediately close the connection.
    let closing_server = tokio::spawn(async move {
        let (connection, _) = listener.accept().await.unwrap();
        drop(connection);
    });

    let mut client = IpcClient::connect(socket_path.clone()).await.unwrap();
    // Brief pause before reading so the server task has time to run.
    tokio::time::sleep(Duration::from_millis(20)).await;

    let outcome = client.read_frame().await;
    assert!(outcome.is_ok(), "EOF should return Ok, got {:?}", outcome);
    let frame = outcome.unwrap();
    assert!(frame.is_none(), "EOF should return Ok(None)");

    closing_server.abort();
    let _ = std::fs::remove_file(&socket_path);
}
/// Once the handle already reports a connection, `wait_for_connected` must
/// return `true`.
#[tokio::test]
async fn wait_for_connected_returns_immediately_when_connected() {
    let (socket_path, server_shutdown, _event_tx) = setup_server();
    let app = tauri::test::mock_app();
    let client = PersistentIpcClientHandle::spawn(socket_path, app.handle().clone());

    // Establish the precondition: the handle is connected before the call.
    let until_connected = async {
        loop {
            if client.is_connected() {
                break;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    };
    tokio::time::timeout(Duration::from_secs(2), until_connected)
        .await
        .expect("should connect");

    let connected = client
        .wait_for_connected(Duration::from_secs(5))
        .await
        .unwrap();
    assert!(connected, "should return true when connected");

    server_shutdown.cancel();
}
/// With no server listening, `wait_for_connected` given a 200 ms timeout must
/// return `false`.
#[tokio::test]
async fn wait_for_connected_times_out_when_no_server() {
    let app = tauri::test::mock_app();
    let socket_path = crate::desktop::test_helpers::unique_socket_path();
    let client = PersistentIpcClientHandle::spawn(socket_path.clone(), app.handle().clone());

    let patience = Duration::from_millis(200);
    let connected = client.wait_for_connected(patience).await.unwrap();
    assert!(!connected, "should return false when no server and timeout");

    let _ = std::fs::remove_file(&socket_path);
}
/// With a server available, `wait_for_connected` called right after spawning
/// the client must return `true` within its five-second timeout.
#[tokio::test]
async fn wait_for_connected_succeeds_after_server_starts() {
    let (socket_path, server_shutdown, _event_tx) = setup_server();
    let app = tauri::test::mock_app();
    let client = PersistentIpcClientHandle::spawn(socket_path, app.handle().clone());

    let patience = Duration::from_secs(5);
    let connected = client.wait_for_connected(patience).await.unwrap();
    assert!(connected, "should connect within timeout");

    server_shutdown.cancel();
}
/// When three event frames precede the response, `send_and_read` must return
/// the response plus all three events in the order they were sent.
#[tokio::test]
async fn send_and_read_multiple_interleaved_events() {
    let socket_path = crate::desktop::test_helpers::unique_socket_path();

    // Frames the server writes, in order: three events, then the response.
    let scripted = vec![
        IpcMessage::Event(IpcEvent::Started),
        IpcMessage::Event(IpcEvent::Error {
            message: "warning".into(),
        }),
        IpcMessage::Event(IpcEvent::Stopped {
            reason: "cancelled".into(),
        }),
        IpcMessage::Response(IpcResponse {
            ok: true,
            data: Some(serde_json::json!({"running": false})),
            error: None,
        }),
    ];
    let server_task = buffered_server(&socket_path, scripted).await;

    let mut client = IpcClient::connect(socket_path.clone()).await.unwrap();
    let (response, events) = client.send_and_read(&IpcRequest::IsRunning).await.unwrap();

    assert!(response.ok, "response should be ok");
    assert_eq!(events.len(), 3, "should collect all three events");
    assert!(
        matches!(events[0], IpcEvent::Started),
        "first event should be Started"
    );
    assert!(
        matches!(events[1], IpcEvent::Error { .. }),
        "second event should be Error"
    );
    assert!(
        matches!(events[2], IpcEvent::Stopped { .. }),
        "third event should be Stopped"
    );

    server_task.await.unwrap();
    let _ = std::fs::remove_file(&socket_path);
}
}