use std::path::PathBuf;
use tauri::{AppHandle, Runtime};
use tokio::io::AsyncWriteExt;
use tokio::sync::{broadcast, mpsc};
use tokio_util::sync::CancellationToken;
use crate::desktop::ipc::{encode_frame, IpcEvent, IpcMessage, IpcRequest, IpcResponse};
use crate::desktop::transport::{self, TransportListener, TransportReadHalf, TransportStream};
use crate::error::ServiceError;
use crate::manager::ManagerCommand;
/// Reasons why reading or decoding one request frame failed.
///
/// `request_reader` reports only the `Json` variant back to the client; every
/// other variant ends the read loop.
#[non_exhaustive]
enum ReadError {
/// Transport-level failure, also used when the peer closed the connection.
Io(#[allow(dead_code)] std::io::Error),
/// The payload was not a valid request frame; carries a human-readable reason.
Json(String),
/// The frame reader reported an oversized frame.
/// NOTE(review): `read_request` always stores `0` here; the real size is not propagated.
TooLarge(#[allow(dead_code)] usize),
/// The frame reader reported a zero-length frame.
ZeroLength,
}
/// Message handed from the per-connection reader task to the connection loop.
enum Incoming {
/// A successfully decoded request.
Request(IpcRequest),
/// A frame that could not be decoded as a request; the text is sent back to the client.
Error(String),
/// Reading failed for any other reason; the connection loop stops when it sees this.
Done,
}
/// IPC server that accepts client connections on a transport listener.
///
/// Every accepted connection is served by `handle_connection`, which forwards
/// requests over `cmd_tx` and relays events published on `event_tx`.
pub(crate) struct IpcServer<R: Runtime> {
/// Listener that incoming connections are accepted from.
listener: TransportListener,
/// Sender for `ManagerCommand`s; cloned into every connection task.
cmd_tx: mpsc::Sender<ManagerCommand<R>>,
/// Application handle; cloned into every connection task.
app: AppHandle<R>,
/// Broadcast sender that each connection subscribes to for events.
event_tx: broadcast::Sender<IpcEvent>,
/// Path the listener was bound to; handed to `transport::cleanup` when `run` exits.
socket_path: PathBuf,
}
impl<R: Runtime> IpcServer<R> {
pub fn bind(
path: PathBuf,
cmd_tx: mpsc::Sender<ManagerCommand<R>>,
app: AppHandle<R>,
) -> Result<Self, ServiceError> {
let listener = transport::bind(path.clone())?;
let (event_tx, _) = broadcast::channel(32);
Ok(Self {
listener,
cmd_tx,
app,
event_tx,
socket_path: path,
})
}
pub fn event_sender(&self) -> broadcast::Sender<IpcEvent> {
self.event_tx.clone()
}
pub async fn run(mut self, shutdown: CancellationToken) {
let socket_path = self.socket_path.clone();
loop {
tokio::select! {
accept_result = transport::accept(&mut self.listener) => {
match accept_result {
Ok(stream) => {
let cmd_tx = self.cmd_tx.clone();
let app = self.app.clone();
let event_tx = self.event_tx.clone();
tokio::spawn(handle_connection(stream, cmd_tx, app, event_tx));
}
Err(e) => {
log::warn!("IPC accept error: {e}");
break;
}
}
}
_ = shutdown.cancelled() => {
log::info!("IPC server shutting down");
break;
}
}
}
transport::cleanup(&socket_path);
}
}
async fn request_reader(mut stream: TransportReadHalf, tx: mpsc::Sender<Incoming>) {
loop {
match read_request(&mut stream).await {
Ok(req) => {
if tx.send(Incoming::Request(req)).await.is_err() {
break;
}
}
Err(ReadError::Json(msg)) => {
if tx.send(Incoming::Error(msg)).await.is_err() {
break;
}
}
Err(_) => {
let _ = tx.send(Incoming::Done).await;
break;
}
}
}
}
/// Serves a single client connection until it closes or an error occurs.
///
/// The connection is dropped immediately when `transport::peer_cred_check`
/// rejects it. Otherwise a reader task decodes requests while this loop
/// interleaves two duties on the write half: answering requests and relaying
/// broadcast events. The reader task is aborted when the loop ends.
async fn handle_connection<R: Runtime>(
    stream: TransportStream,
    cmd_tx: mpsc::Sender<ManagerCommand<R>>,
    app: AppHandle<R>,
    event_tx: broadcast::Sender<IpcEvent>,
) {
    if !transport::peer_cred_check(&stream) {
        return;
    }

    let mut event_rx = event_tx.subscribe();
    let (stream_read, mut stream_write) = transport::split(stream);
    let (incoming_tx, mut incoming_rx) = mpsc::channel::<Incoming>(16);
    let reader_handle = tokio::spawn(request_reader(stream_read, incoming_tx));

    loop {
        tokio::select! {
            incoming = incoming_rx.recv() => {
                // Build and encode the response; encode failures are logged
                // with a message specific to the kind of response.
                let encoded = match incoming {
                    Some(Incoming::Request(request)) => {
                        let response = handle_request(request, &cmd_tx, &app).await;
                        encode_frame(&IpcMessage::Response(response))
                            .map_err(|e| log::warn!("IPC encode response error: {e}"))
                    }
                    Some(Incoming::Error(msg)) => {
                        let response = error_response(&msg);
                        encode_frame(&IpcMessage::Response(response))
                            .map_err(|e| log::warn!("IPC encode error response: {e}"))
                    }
                    Some(Incoming::Done) | None => break,
                };
                let Ok(frame) = encoded else { break };
                if stream_write.write_all(&frame).await.is_err() {
                    break;
                }
            }
            event_result = event_rx.recv() => {
                let event = match event_result {
                    Ok(event) => event,
                    // Missed events are only reported; the connection stays up.
                    Err(broadcast::error::RecvError::Lagged(n)) => {
                        log::warn!("IPC client lagged {n} events");
                        continue;
                    }
                    Err(_) => break,
                };
                let encoded = encode_frame(&IpcMessage::Event(event))
                    .map_err(|e| log::warn!("IPC encode event error: {e}"));
                let Ok(frame) = encoded else { break };
                if stream_write.write_all(&frame).await.is_err() {
                    break;
                }
            }
        }
    }

    reader_handle.abort();
}
async fn read_request<R: tokio::io::AsyncRead + Unpin>(
stream: &mut R,
) -> Result<IpcRequest, ReadError> {
let payload = match transport::read_frame(stream).await {
Ok(Some(p)) => p,
Ok(None) => {
return Err(ReadError::Io(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"connection closed",
)))
}
Err(e) => {
if e.contains("too large") {
return Err(ReadError::TooLarge(0));
}
if e.contains("zero-length") {
return Err(ReadError::ZeroLength);
}
return Err(ReadError::Io(std::io::Error::other(e)));
}
};
match serde_json::from_slice::<IpcMessage>(&payload) {
Ok(IpcMessage::Request(req)) => Ok(req),
Ok(_) => Err(ReadError::Json("expected request frame".into())),
Err(e) => Err(ReadError::Json(e.to_string())),
}
}
async fn handle_request<R: Runtime>(
request: IpcRequest,
cmd_tx: &mpsc::Sender<ManagerCommand<R>>,
app: &AppHandle<R>,
) -> IpcResponse {
match request {
IpcRequest::Start { config } => {
let (reply, rx) = tokio::sync::oneshot::channel();
if cmd_tx
.send(ManagerCommand::Start {
config,
reply,
app: app.clone(),
})
.await
.is_err()
{
return error_response("manager shut down");
}
match rx.await {
Ok(Ok(())) => IpcResponse {
ok: true,
data: None,
error: None,
},
Ok(Err(e)) => IpcResponse {
ok: false,
data: None,
error: Some(e.to_string()),
},
Err(_) => error_response("manager dropped reply"),
}
}
IpcRequest::Stop => {
let (reply, rx) = tokio::sync::oneshot::channel();
if cmd_tx.send(ManagerCommand::Stop { reply }).await.is_err() {
return error_response("manager shut down");
}
match rx.await {
Ok(Ok(())) => IpcResponse {
ok: true,
data: None,
error: None,
},
Ok(Err(e)) => IpcResponse {
ok: false,
data: None,
error: Some(e.to_string()),
},
Err(_) => error_response("manager dropped reply"),
}
}
IpcRequest::IsRunning => {
let (reply, rx) = tokio::sync::oneshot::channel();
if cmd_tx
.send(ManagerCommand::IsRunning { reply })
.await
.is_err()
{
return error_response("manager shut down");
}
match rx.await {
Ok(running) => IpcResponse {
ok: true,
data: Some(serde_json::json!({ "running": running })),
error: None,
},
Err(_) => error_response("manager dropped reply"),
}
}
IpcRequest::GetState => {
let (reply, rx) = tokio::sync::oneshot::channel();
if cmd_tx
.send(ManagerCommand::GetState { reply })
.await
.is_err()
{
return error_response("manager shut down");
}
match rx.await {
Ok(status) => IpcResponse {
ok: true,
data: Some(serde_json::to_value(&status).unwrap_or_default()),
error: None,
},
Err(_) => error_response("manager dropped reply"),
}
}
IpcRequest::EnableAutoRestart { config } => {
let (reply, rx) = tokio::sync::oneshot::channel();
if cmd_tx
.send(ManagerCommand::EnableAutoRestart { config, reply })
.await
.is_err()
{
return error_response("manager shut down");
}
match rx.await {
Ok(Ok(())) => IpcResponse {
ok: true,
data: None,
error: None,
},
Ok(Err(e)) => error_response(&e.to_string()),
Err(_) => error_response("manager dropped reply"),
}
}
IpcRequest::DisableAutoRestart => {
let (reply, rx) = tokio::sync::oneshot::channel();
if cmd_tx
.send(ManagerCommand::DisableAutoRestart { reply })
.await
.is_err()
{
return error_response("manager shut down");
}
match rx.await {
Ok(Ok(())) => IpcResponse {
ok: true,
data: None,
error: None,
},
Ok(Err(e)) => error_response(&e.to_string()),
Err(_) => error_response("manager dropped reply"),
}
}
IpcRequest::GetDesiredState => {
let (reply, rx) = tokio::sync::oneshot::channel();
if cmd_tx
.send(ManagerCommand::GetDesiredState { reply })
.await
.is_err()
{
return error_response("manager shut down");
}
match rx.await {
Ok(Some(state)) => IpcResponse {
ok: true,
data: Some(serde_json::to_value(&state).unwrap_or_default()),
error: None,
},
Ok(None) => IpcResponse {
ok: true,
data: None,
error: None,
},
Err(_) => error_response("manager dropped reply"),
}
}
IpcRequest::ValidateSetup => {
use crate::validator::SetupValidator;
let (platform, _) = crate::capabilities::CapabilityProvider::detect_platform(None);
let report = SetupValidator::validate(platform);
IpcResponse {
ok: true,
data: Some(serde_json::to_value(&report).unwrap_or_default()),
error: None,
}
}
}
}
/// Builds a failed response (`ok: false`, no data) whose error text is `msg`.
fn error_response(msg: &str) -> IpcResponse {
    let error = Some(String::from(msg));
    IpcResponse {
        ok: false,
        data: None,
        error,
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::desktop::test_helpers::{
connect, read_event, read_response, send_request, setup_server_raw, unique_socket_path,
BlockingService, ImmediateSuccessService,
};
use std::time::Duration;
/// Creates a test server using the given service factory.
///
/// Thin wrapper around `setup_server_raw`; returns the server, the path
/// clients connect to, and the token used to shut the server down.
fn setup_server_with_factory(
factory: crate::manager::ServiceFactory<tauri::test::MockRuntime>,
) -> (
IpcServer<tauri::test::MockRuntime>,
PathBuf,
CancellationToken,
) {
setup_server_raw(factory)
}
fn setup_server() -> (
IpcServer<tauri::test::MockRuntime>,
PathBuf,
CancellationToken,
) {
setup_server_raw(Box::new(|| Box::new(BlockingService)))
}
/// A client can connect while the server is running.
#[tokio::test]
async fn ipc_server_accepts_connection() {
    let (server, socket, stop) = setup_server();
    let server_task = tokio::spawn(server.run(stop.clone()));

    let attempt = transport::connect(&socket).await;
    assert!(attempt.is_ok(), "client should connect");

    stop.cancel();
    let _ = server_task.await;
}
/// A `Start` request is answered with a successful response.
#[tokio::test]
async fn ipc_server_start_command() {
    let (server, socket, stop) = setup_server();
    let server_task = tokio::spawn(server.run(stop.clone()));
    let mut client = connect(&socket).await;

    let start = IpcRequest::Start {
        config: crate::models::StartConfig::default(),
    };
    send_request(&mut client, &start).await;
    let reply = read_response(&mut client).await;
    assert!(reply.ok, "Start should succeed: {:?}", reply.error);

    stop.cancel();
    let _ = server_task.await;
}
/// `Stop` succeeds after a successful `Start`.
#[tokio::test]
async fn ipc_server_stop_command() {
    let (server, socket, stop) = setup_server();
    let server_task = tokio::spawn(server.run(stop.clone()));
    let mut client = connect(&socket).await;

    let start = IpcRequest::Start {
        config: crate::models::StartConfig::default(),
    };
    send_request(&mut client, &start).await;
    let started = read_response(&mut client).await;
    assert!(started.ok);

    send_request(&mut client, &IpcRequest::Stop).await;
    let stopped = read_response(&mut client).await;
    assert!(stopped.ok, "Stop should succeed: {:?}", stopped.error);

    stop.cancel();
    let _ = server_task.await;
}
/// An event published on the server's broadcast sender reaches a connected client.
#[tokio::test]
async fn ipc_server_streams_started_event() {
    let (server, socket, stop) =
        setup_server_with_factory(Box::new(|| Box::new(ImmediateSuccessService)));
    let events = server.event_sender();
    let server_task = tokio::spawn(server.run(stop.clone()));
    let mut client = connect(&socket).await;

    let start = IpcRequest::Start {
        config: crate::models::StartConfig::default(),
    };
    send_request(&mut client, &start).await;
    let reply = read_response(&mut client).await;
    assert!(reply.ok);

    let _ = events.send(IpcEvent::Started);
    let wait = Duration::from_millis(500);
    let event = tokio::time::timeout(wait, read_event(&mut client))
        .await
        .expect("timed out waiting for Started event");
    assert!(
        matches!(event, IpcEvent::Started),
        "Expected Started event, got {:?}",
        event
    );

    stop.cancel();
    let _ = server_task.await;
}
/// A frame with a non-JSON payload gets an error response, and the connection
/// remains usable afterwards.
#[tokio::test]
async fn ipc_server_rejects_malformed_frame() {
    let (server, socket, stop) = setup_server();
    let server_task = tokio::spawn(server.run(stop.clone()));
    let mut client = connect(&socket).await;

    // Hand-build a frame: 4-byte big-endian length prefix followed by the payload.
    let garbage = b"not valid json!!!";
    let mut raw = (garbage.len() as u32).to_be_bytes().to_vec();
    raw.extend_from_slice(garbage);
    client.write_all(&raw).await.unwrap();

    let rejection = read_response(&mut client).await;
    assert!(!rejection.ok, "should be error response");
    assert!(rejection.error.is_some(), "should have error message");

    send_request(&mut client, &IpcRequest::IsRunning).await;
    let follow_up = read_response(&mut client).await;
    assert!(
        follow_up.ok,
        "connection should still work after malformed frame"
    );

    stop.cancel();
    let _ = server_task.await;
}
/// The server keeps accepting connections after a client disconnects.
#[tokio::test]
async fn ipc_server_handles_client_disconnect() {
    let (server, socket, stop) = setup_server();
    let server_task = tokio::spawn(server.run(stop.clone()));

    // Connect and hang up right away.
    drop(connect(&socket).await);
    tokio::time::sleep(Duration::from_millis(50)).await;

    let second_attempt = transport::connect(&socket).await;
    assert!(
        second_attempt.is_ok(),
        "server should still accept connections after client disconnect"
    );

    stop.cancel();
    let _ = server_task.await;
}
/// `IsRunning` reports `false` before anything was started.
#[tokio::test]
async fn ipc_server_is_running_returns_false_initially() {
    let (server, socket, stop) = setup_server();
    let server_task = tokio::spawn(server.run(stop.clone()));
    let mut client = connect(&socket).await;

    send_request(&mut client, &IpcRequest::IsRunning).await;
    let reply = read_response(&mut client).await;
    assert!(reply.ok);
    let payload = reply.data.unwrap();
    assert_eq!(payload["running"], false);

    stop.cancel();
    let _ = server_task.await;
}
/// `GetState` reports the idle state with no last error on a fresh server.
#[tokio::test]
async fn ipc_server_get_state_returns_idle_initially() {
    let (server, socket, stop) = setup_server();
    let server_task = tokio::spawn(server.run(stop.clone()));
    let mut client = connect(&socket).await;

    send_request(&mut client, &IpcRequest::GetState).await;
    let reply = read_response(&mut client).await;
    assert!(reply.ok, "GetState should succeed: {:?}", reply.error);

    let snapshot = reply.data.unwrap();
    assert_eq!(snapshot["state"], "idle");
    assert_eq!(snapshot["lastError"], serde_json::Value::Null);

    stop.cancel();
    let _ = server_task.await;
}
/// After `Start`, polling `GetState` eventually reports the running state.
#[tokio::test]
async fn ipc_server_get_state_returns_running_after_start() {
    let (server, socket, stop) = setup_server();
    let server_task = tokio::spawn(server.run(stop.clone()));
    let mut client = connect(&socket).await;

    let start = IpcRequest::Start {
        config: crate::models::StartConfig::default(),
    };
    send_request(&mut client, &start).await;
    let started = read_response(&mut client).await;
    assert!(started.ok);

    // Poll every 10 ms until the state flips, bounded by a 2 s timeout.
    let poll_until_running = async {
        loop {
            send_request(&mut client, &IpcRequest::GetState).await;
            let reply = read_response(&mut client).await;
            assert!(reply.ok, "GetState should succeed: {:?}", reply.error);
            let snapshot = reply.data.unwrap();
            if snapshot["state"] == "running" {
                break snapshot;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    };
    let state = tokio::time::timeout(Duration::from_secs(2), poll_until_running)
        .await
        .expect("timed out waiting for Running state");
    assert_eq!(state["state"], "running");

    stop.cancel();
    let _ = server_task.await;
}
/// `Stop` without a prior `Start` fails with a "not running" error.
#[tokio::test]
async fn ipc_server_stop_when_not_running() {
    let (server, socket, stop) = setup_server();
    let server_task = tokio::spawn(server.run(stop.clone()));
    let mut client = connect(&socket).await;

    send_request(&mut client, &IpcRequest::Stop).await;
    let reply = read_response(&mut client).await;
    assert!(!reply.ok, "stop when not running should fail");
    let reason = reply.error.unwrap();
    assert!(reason.contains("not running"));

    stop.cancel();
    let _ = server_task.await;
}
/// A `Stopped` event published after `Stop` is delivered to the client.
#[tokio::test]
async fn ipc_server_stopped_event_on_stop() {
    let (server, socket, stop) = setup_server();
    let events = server.event_sender();
    let server_task = tokio::spawn(server.run(stop.clone()));
    let mut client = connect(&socket).await;
    let wait = Duration::from_millis(500);

    let start = IpcRequest::Start {
        config: crate::models::StartConfig::default(),
    };
    send_request(&mut client, &start).await;
    let started = read_response(&mut client).await;
    assert!(started.ok);

    // Publish and drain the Started event so it does not precede Stopped below.
    let _ = events.send(IpcEvent::Started);
    let _ = tokio::time::timeout(wait, read_event(&mut client)).await;

    send_request(&mut client, &IpcRequest::Stop).await;
    let stopped = read_response(&mut client).await;
    assert!(stopped.ok);

    let _ = events.send(IpcEvent::Stopped {
        reason: "cancelled".into(),
    });
    let event = tokio::time::timeout(wait, read_event(&mut client))
        .await
        .expect("timed out waiting for Stopped event");
    assert!(
        matches!(event, IpcEvent::Stopped { .. }),
        "Expected Stopped event, got {:?}",
        event
    );

    stop.cancel();
    let _ = server_task.await;
}
/// State changed through one client is visible to a second client.
#[tokio::test]
async fn ipc_server_multiple_clients() {
    let (server, socket, stop) = setup_server();
    let events = server.event_sender();
    let server_task = tokio::spawn(server.run(stop.clone()));
    let mut first = connect(&socket).await;
    let mut second = connect(&socket).await;
    let wait = Duration::from_millis(500);

    let start = IpcRequest::Start {
        config: crate::models::StartConfig::default(),
    };
    send_request(&mut first, &start).await;
    let started = read_response(&mut first).await;
    assert!(started.ok);

    // Drain the broadcast event on both clients before issuing the next request.
    let _ = events.send(IpcEvent::Started);
    let _ = tokio::time::timeout(wait, read_event(&mut first)).await;
    let _ = tokio::time::timeout(wait, read_event(&mut second)).await;

    send_request(&mut second, &IpcRequest::IsRunning).await;
    let status = read_response(&mut second).await;
    assert!(status.ok);
    let payload = status.data.unwrap();
    assert_eq!(payload["running"], true);

    stop.cancel();
    let _ = server_task.await;
}
/// Cancelling the shutdown token stops the server and removes the socket path.
#[tokio::test]
async fn ipc_server_graceful_shutdown() {
    let (server, socket, stop) = setup_server();
    let server_task = tokio::spawn(server.run(stop.clone()));

    let attempt = transport::connect(&socket).await;
    assert!(attempt.is_ok(), "should connect before shutdown");

    stop.cancel();
    let _ = server_task.await;
    tokio::time::sleep(Duration::from_millis(50)).await;

    let still_there = socket.exists();
    assert!(
        !still_there,
        "socket file should be removed after graceful shutdown"
    );
}
/// One published event is delivered to every connected client.
#[tokio::test]
async fn ipc_server_broadcasts_events_to_all_clients() {
    let (server, socket, stop) = setup_server();
    let events = server.event_sender();
    let server_task = tokio::spawn(server.run(stop.clone()));
    let mut first = connect(&socket).await;
    let mut second = connect(&socket).await;
    let wait = Duration::from_millis(500);

    let start = IpcRequest::Start {
        config: crate::models::StartConfig::default(),
    };
    send_request(&mut first, &start).await;
    let started = read_response(&mut first).await;
    assert!(started.ok);

    let _ = events.send(IpcEvent::Started);

    let seen_by_first = tokio::time::timeout(wait, read_event(&mut first))
        .await
        .expect("client 1 timed out waiting for Started event");
    assert!(
        matches!(seen_by_first, IpcEvent::Started),
        "Client 1: expected Started, got {:?}",
        seen_by_first
    );

    let seen_by_second = tokio::time::timeout(wait, read_event(&mut second))
        .await
        .expect("client 2 timed out waiting for broadcast Started event");
    assert!(
        matches!(seen_by_second, IpcEvent::Started),
        "Client 2: expected broadcast Started, got {:?}",
        seen_by_second
    );

    stop.cancel();
    let _ = server_task.await;
}
/// A single published event is delivered exactly once to a client.
#[tokio::test]
async fn ipc_server_no_duplicate_events() {
    let (server, socket, stop) = setup_server();
    let events = server.event_sender();
    let server_task = tokio::spawn(server.run(stop.clone()));
    let mut client = connect(&socket).await;

    let start = IpcRequest::Start {
        config: crate::models::StartConfig::default(),
    };
    send_request(&mut client, &start).await;
    let started = read_response(&mut client).await;
    assert!(started.ok);

    let _ = events.send(IpcEvent::Started);
    let first = tokio::time::timeout(Duration::from_millis(500), read_event(&mut client))
        .await
        .expect("timed out waiting for Started event");
    assert!(matches!(first, IpcEvent::Started));

    // Nothing else was published, so a second read must time out.
    let second =
        tokio::time::timeout(Duration::from_millis(100), read_event(&mut client)).await;
    assert!(
        second.is_err(),
        "should not receive a duplicate Started event"
    );

    stop.cancel();
    let _ = server_task.await;
}
/// A connection from the test process itself passes the peer credential check
/// and gets its request answered.
#[tokio::test]
async fn peer_cred_check() {
    let (server, socket, stop) = setup_server();
    let server_task = tokio::spawn(server.run(stop.clone()));
    let mut client = connect(&socket).await;

    send_request(&mut client, &IpcRequest::IsRunning).await;
    let reply = read_response(&mut client).await;
    assert!(reply.ok, "same-UID connection should pass peer-cred check");

    stop.cancel();
    let _ = server_task.await;
}
/// `bind` succeeds even when a leftover regular file occupies the socket path.
#[tokio::test]
async fn ipc_server_bind_removes_stale_socket() {
    let socket = unique_socket_path();
    let app = tauri::test::mock_app();
    let (cmd_tx, _cmd_rx) = mpsc::channel(16);

    // Simulate a stale leftover at the socket path.
    std::fs::write(&socket, b"stale").unwrap();
    assert!(socket.exists());

    let bound = IpcServer::bind(socket.clone(), cmd_tx, app.handle().clone());
    assert!(bound.is_ok(), "bind should remove stale socket");

    let _ = std::fs::remove_file(&socket);
}
/// `bind` refuses a socket path that is a symlink to an existing file.
///
/// Gated on Unix because it relies on `std::os::unix::fs::symlink`, which does
/// not exist on other targets.
#[cfg(unix)]
#[tokio::test]
async fn ipc_server_bind_rejects_symlink() {
    let target = unique_socket_path();
    let link = unique_socket_path();
    std::fs::write(&target, b"target").unwrap();
    std::os::unix::fs::symlink(&target, &link).unwrap();
    let app = tauri::test::mock_app();
    let (cmd_tx, _cmd_rx) = mpsc::channel(16);

    let result = IpcServer::bind(link.clone(), cmd_tx, app.handle().clone());

    // Remove the fixtures before asserting so a failing assertion leaks nothing.
    let _ = std::fs::remove_file(&link);
    let _ = std::fs::remove_file(&target);

    assert!(result.is_err(), "bind should reject symlink");
    let err = result.err().unwrap().to_string();
    assert!(
        err.contains("symlink"),
        "Error should mention symlink: {err}"
    );
}
/// `bind` refuses a socket path that is a symlink whose target does not exist.
///
/// Gated on Unix because it relies on `std::os::unix::fs::symlink`, which does
/// not exist on other targets.
#[cfg(unix)]
#[tokio::test]
async fn ipc_server_bind_rejects_dangling_symlink() {
    let link = unique_socket_path();
    let target = unique_socket_path();
    assert!(!target.exists(), "target must not exist for dangling test");
    std::os::unix::fs::symlink(&target, &link).unwrap();
    // `exists()` follows the link, so a dangling symlink reports false.
    let link_resolves = link.exists();
    let app = tauri::test::mock_app();
    let (cmd_tx, _cmd_rx) = mpsc::channel(16);

    let result = IpcServer::bind(link.clone(), cmd_tx, app.handle().clone());

    // Remove the fixture before asserting so a failing assertion leaks nothing.
    let _ = std::fs::remove_file(&link);

    assert!(!link_resolves, "dangling symlink must report !exists()");
    assert!(result.is_err(), "bind should reject dangling symlink");
    let err = result.err().unwrap().to_string();
    assert!(
        err.contains("symlink"),
        "Error should mention symlink: {err}"
    );
}
/// After a zero-length frame the server must not send any data back.
///
/// A closed connection, a read error, and a read timeout are all accepted;
/// only actually receiving bytes fails the test.
#[tokio::test]
async fn ipc_server_rejects_zero_length_frame() {
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    let (server, socket, stop) = setup_server();
    let server_task = tokio::spawn(server.run(stop.clone()));
    let mut client = connect(&socket).await;

    // A bare length prefix of zero with no payload.
    client.write_all(&[0u8; 4]).await.unwrap();

    let mut probe = [0u8; 1];
    let outcome =
        tokio::time::timeout(Duration::from_millis(500), client.read(&mut probe)).await;
    if let Ok(Ok(n)) = outcome {
        if n != 0 {
            panic!("Expected connection close after zero-length frame, but read {n} bytes");
        }
    }

    stop.cancel();
    let _ = server_task.await;
}
/// An event published by a separate relay task is delivered exactly once.
#[tokio::test]
async fn ipc_server_no_duplicate_events_with_relay() {
    let (server, socket, stop) = setup_server();
    let events = server.event_sender();
    let server_task = tokio::spawn(server.run(stop.clone()));
    let mut client = connect(&socket).await;

    // Background relay publishes Started 50 ms from now.
    let relay = events.clone();
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        let _ = relay.send(IpcEvent::Started);
    });

    let start = IpcRequest::Start {
        config: crate::models::StartConfig::default(),
    };
    send_request(&mut client, &start).await;
    let started = read_response(&mut client).await;
    assert!(started.ok, "Start should succeed");

    let event1 = tokio::time::timeout(Duration::from_millis(500), read_event(&mut client))
        .await
        .expect("timed out waiting for first Started event");
    assert!(
        matches!(event1, IpcEvent::Started),
        "Expected Started, got {event1:?}"
    );

    let repeat =
        tokio::time::timeout(Duration::from_millis(200), read_event(&mut client)).await;
    assert!(
        repeat.is_err(),
        "should not receive a duplicate Started event when relay is active"
    );

    stop.cancel();
    let _ = server_task.await;
}
}