#[cfg(windows)]
use futures::{SinkExt, StreamExt};
#[cfg(windows)]
use std::sync::Arc;
#[cfg(windows)]
use std::time::Duration;
#[cfg(windows)]
use tokio::net::windows::named_pipe::{NamedPipeServer, ServerOptions};
#[cfg(windows)]
use tokio::sync::{mpsc, Mutex};
#[cfg(windows)]
use tokio_util::codec::Framed;
#[cfg(windows)]
use tracing::{debug, error, info, instrument, warn};
#[cfg(windows)]
use crate::core::codec::PacketCodec;
#[cfg(windows)]
use crate::error::Result;
#[cfg(windows)]
#[instrument(skip(pipe_name), fields(pipe = %pipe_name))]
pub async fn start_server(pipe_name: &str) -> Result<()> {
let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1);
let shutdown_tx_clone = shutdown_tx.clone();
tokio::spawn(async move {
if let Ok(()) = tokio::signal::ctrl_c().await {
info!("Received CTRL+C signal, shutting down");
let _ = shutdown_tx_clone.send(()).await;
}
});
start_server_with_shutdown(pipe_name, shutdown_rx).await
}
#[cfg(windows)]
#[instrument(skip(pipe_name, shutdown_rx), fields(pipe = %pipe_name))]
pub async fn start_server_with_shutdown(
pipe_name: &str,
mut shutdown_rx: mpsc::Receiver<()>,
) -> Result<()> {
info!(pipe = %pipe_name, "Starting named pipe server");
let active_connections = Arc::new(Mutex::new(0u32));
let mut server = ServerOptions::new()
.first_pipe_instance(true)
.create(pipe_name)?;
info!(pipe = %pipe_name, "Named pipe server listening");
loop {
tokio::select! {
_ = shutdown_rx.recv() => {
info!("Shutting down server. Waiting for connections to close...");
let timeout = tokio::time::sleep(Duration::from_secs(10));
tokio::pin!(timeout);
loop {
tokio::select! {
_ = &mut timeout => {
warn!("Shutdown timeout reached, forcing exit");
break;
}
_ = tokio::time::sleep(Duration::from_millis(500)) => {
let connections = *active_connections.lock().await;
info!(connections = %connections, "Waiting for connections to close");
if connections == 0 {
info!("All connections closed, shutting down");
break;
}
}
}
}
return Ok(());
}
result = server.connect() => {
match result {
Ok(()) => {
let active_connections = active_connections.clone();
{
let mut count = active_connections.lock().await;
*count += 1;
info!(connections = *count, "New pipe connection established");
}
let client_pipe = server;
server = ServerOptions::new().create(pipe_name)?;
tokio::spawn(async move {
handle_pipe_connection(client_pipe, active_connections).await;
});
}
Err(e) => {
error!(error = %e, "Error accepting pipe connection");
match ServerOptions::new().create(pipe_name) {
Ok(new_server) => {
server = new_server;
debug!("Recreated server pipe after error");
}
Err(recreate_err) => {
error!(error = %recreate_err, "Failed to recreate server pipe");
return Err(recreate_err.into());
}
}
}
}
}
}
}
}
#[cfg(windows)]
async fn handle_pipe_connection(pipe: NamedPipeServer, active_connections: Arc<Mutex<u32>>) {
let mut framed = Framed::new(pipe, PacketCodec);
while let Some(result) = framed.next().await {
match result {
Ok(packet) => {
debug!("Received packet of {} bytes", packet.payload.len());
if let Err(e) = framed.send(packet).await {
error!(error = %e, "Failed to send packet");
break;
}
}
Err(e) => {
error!(error = %e, "Error reading from pipe");
break;
}
}
}
let mut count = active_connections.lock().await;
*count -= 1;
info!(connections = *count, "Pipe connection closed");
}
#[cfg(windows)]
#[instrument(skip(pipe_name), fields(pipe = %pipe_name))]
pub async fn connect(
pipe_name: &str,
) -> Result<Framed<tokio::net::windows::named_pipe::NamedPipeClient, PacketCodec>> {
use tokio::net::windows::named_pipe::ClientOptions;
let client = ClientOptions::new().open(pipe_name)?;
info!(pipe = %pipe_name, "Connected to named pipe");
Ok(Framed::new(client, PacketCodec))
}
#[cfg(not(windows))]
compile_error!("This module is only available on Windows");