use std::{io, net::SocketAddr, time::Duration};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream, ToSocketAddrs},
task::JoinHandle,
};
use super::{registry, wire};
const DEFAULT_GRAVE_WINDOW: Duration = Duration::from_secs(5);
#[derive(Debug, Clone)]
pub struct Console {
grave_window: Duration,
}
impl Default for Console {
fn default() -> Self {
Console {
grave_window: DEFAULT_GRAVE_WINDOW,
}
}
}
impl Console {
pub fn builder() -> Console {
Console::default()
}
pub fn grave_window(mut self, grave_window: Duration) -> Self {
self.grave_window = grave_window;
self
}
pub async fn serve(self, addr: impl ToSocketAddrs) -> io::Result<ConsoleHandle> {
let listener = TcpListener::bind(addr).await?;
let local_addr = listener.local_addr()?;
let grave_window = self.grave_window;
let task = tokio::spawn(async move {
loop {
match listener.accept().await {
Ok((stream, _peer)) => {
tokio::spawn(serve_client(stream, grave_window));
}
Err(_err) => {
#[cfg(feature = "tracing")]
tracing::warn!("console failed to accept connection: {_err}");
}
}
}
});
Ok(ConsoleHandle { task, local_addr })
}
}
pub async fn serve(addr: impl ToSocketAddrs) -> io::Result<ConsoleHandle> {
Console::builder().serve(addr).await
}
#[derive(Debug)]
#[must_use = "keep the handle to later call shutdown(); dropping it detaches the server, which keeps running"]
pub struct ConsoleHandle {
task: JoinHandle<()>,
local_addr: SocketAddr,
}
impl ConsoleHandle {
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}
pub fn shutdown(self) {
self.task.abort();
}
}
async fn serve_client(mut stream: TcpStream, grave_window: Duration) {
let mut request = [0u8; 1];
loop {
if stream.read_exact(&mut request).await.is_err() {
break;
}
let message = wire::Message::Snapshot(registry::snapshot(grave_window).await);
let bytes = match rmp_serde::to_vec_named(&message) {
Ok(bytes) => bytes,
Err(_err) => {
#[cfg(feature = "tracing")]
tracing::error!("console failed to encode snapshot: {_err}");
break;
}
};
let len = (bytes.len() as u32).to_be_bytes();
if stream.write_all(&len).await.is_err() || stream.write_all(&bytes).await.is_err() {
break;
}
}
}