pub mod client_handler;
pub mod session_bridge;
mod session_relay;
mod session_setup;
mod shared;
pub mod socket;
use crate::session::SessionManager;
use std::sync::Arc;
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::Mutex;
use tracing::{info, warn};
pub use socket::socket_path;
/// Drops `value` on a blocking-pool thread and waits up to five seconds for that to finish.
///
/// The value is moved into a `spawn_blocking` closure so that its destructor does not run on
/// the async task calling this function. Nothing is returned; the two failure outcomes are
/// only logged at `warn` level, tagged with `label`:
///
/// * the blocking task could not be joined (reported as "drop task panicked"), or
/// * the five-second wait elapsed before the task completed.
pub(super) async fn drop_blocking_with_timeout<T>(value: T, label: &str)
where
    T: Send + 'static,
{
    /// Upper bound on how long the caller waits for the drop to complete.
    const DROP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

    let label = String::from(label);
    let drop_task = tokio::task::spawn_blocking(move || drop(value));

    match tokio::time::timeout(DROP_TIMEOUT, drop_task).await {
        Err(_elapsed) => warn!(%label, "timed out dropping value on blocking thread"),
        Ok(Err(join_err)) => warn!(%label, error = %join_err, "drop task panicked"),
        Ok(Ok(())) => {}
    }
}
/// Period of the background task in `run_server` that takes dead sessions out of the
/// session manager and drops them.
const CLEANUP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(30);
/// Runs the Unix-socket server until SIGTERM or SIGINT is received.
///
/// Startup sequence:
/// 1. SIGHUP is ignored for the whole process.
/// 2. SIGTERM/SIGINT listeners are installed.
/// 3. If something already accepts connections on the socket path, startup is refused;
///    otherwise an existing socket file is treated as stale and removed (best-effort).
/// 4. The listener is bound and a guard is created that removes the socket file when this
///    function returns.
///
/// While running, a background task wakes every `CLEANUP_INTERVAL`, takes dead sessions out
/// of the session manager and drops them on a blocking thread. Each accepted connection whose
/// peer uid passes `peer_uid_allowed` is served in its own spawned task.
///
/// On shutdown the background task is stopped and every remaining session is drained from the
/// manager and dropped on a blocking thread.
///
/// # Errors
///
/// Returns an error if SIGHUP cannot be ignored, the signal listeners cannot be installed,
/// the socket path cannot be determined, another server is already listening on the path, or
/// binding the socket fails.
pub async fn run_server() -> anyhow::Result<()> {
    use nix::sys::signal::{signal, SigHandler, Signal};

    // SAFETY: `SigIgn` installs no handler function, so no code of ours ever runs in signal
    // context; the previous SIGHUP disposition is intentionally discarded.
    unsafe { signal(Signal::SIGHUP, SigHandler::SigIgn) }
        .map_err(|e| anyhow::anyhow!("failed to ignore SIGHUP: {}", e))?;

    // Install the termination listeners before the socket exists. Creating a listener replaces
    // the default "terminate the process" disposition, so a SIGTERM/SIGINT that arrives during
    // startup is observed by the loop below and goes through the normal shutdown path instead
    // of killing the process and leaving the socket file behind.
    let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?;
    let mut sigint = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())?;

    let path = socket_path()?;
    if path.exists() {
        // A successful connect means a live server owns the socket; any failure is taken to
        // mean the file is a leftover from a previous run.
        match tokio::net::UnixStream::connect(&path).await {
            Ok(_) => {
                anyhow::bail!("another server is already running on {:?}", path);
            }
            Err(_) => {
                // Best-effort: if removal fails, the bind below reports the real error.
                if let Err(e) = std::fs::remove_file(&path) {
                    warn!(path = ?path, error = %e, "failed to remove stale socket");
                }
            }
        }
    }

    let listener = UnixListener::bind(&path)?;
    info!(path = ?path, "server listening");
    // Removes the socket file on every exit path from here on.
    let _socket_guard = SocketGuard(path);

    let manager = Arc::new(Mutex::new(SessionManager::new()));

    let cleanup_manager = manager.clone();
    let cleanup_handle = tokio::spawn(async move {
        let mut interval = tokio::time::interval(CLEANUP_INTERVAL);
        loop {
            interval.tick().await;
            // Hold the manager lock only long enough to take the dead sessions; dropping
            // them happens afterwards, off the lock.
            let dead_sessions = {
                let mut mgr = cleanup_manager.lock().await;
                mgr.take_dead_sessions()
            };
            if !dead_sessions.is_empty() {
                drop_blocking_with_timeout(dead_sessions, "dead session cleanup").await;
            }
        }
    });

    loop {
        tokio::select! {
            result = listener.accept() => {
                match result {
                    Ok((stream, _)) => {
                        if !peer_uid_allowed(&stream) {
                            // Rejected peers are logged by `peer_uid_allowed`; dropping the
                            // stream closes the connection.
                            continue;
                        }
                        let manager = manager.clone();
                        tokio::spawn(async move {
                            if let Err(e) = client_handler::handle_client(stream, manager).await {
                                warn!(error = %e, "client error");
                            }
                        });
                    }
                    Err(e) => {
                        warn!(error = %e, "accept failed, retrying");
                        // Brief pause so a persistent accept error does not spin the loop.
                        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
                    }
                }
            }
            _ = sigterm.recv() => {
                info!("received SIGTERM, shutting down");
                break;
            }
            _ = sigint.recv() => {
                info!("received SIGINT, shutting down");
                break;
            }
        }
    }

    // Stop the periodic cleanup and wait for it to finish unwinding; the join result of an
    // aborted task carries nothing useful.
    cleanup_handle.abort();
    let _ = cleanup_handle.await;

    let all_sessions: Vec<crate::session::Session> = {
        let mut mgr = manager.lock().await;
        mgr.drain_all()
    };
    if !all_sessions.is_empty() {
        info!(
            count = all_sessions.len(),
            "cleaning up sessions on shutdown"
        );
        drop_blocking_with_timeout(all_sessions, "shutdown session cleanup").await;
    }

    Ok(())
}
/// Reports whether the peer of `stream` runs under the same uid as this process.
///
/// Returns `true` only when the peer credentials can be read and their uid equals the
/// server's effective uid. Every rejection (foreign uid or unreadable credentials) is logged
/// at `warn` level before `false` is returned.
fn peer_uid_allowed(stream: &UnixStream) -> bool {
    let server_uid = nix::unistd::geteuid().as_raw();

    let peer_uid = match stream.peer_cred() {
        Ok(cred) => cred.uid(),
        Err(e) => {
            warn!(error = %e, "cannot read peer credentials, rejecting connection");
            return false;
        }
    };

    let same_user = peer_uid == server_uid;
    if !same_user {
        warn!(
            peer_uid = peer_uid,
            server_uid = server_uid,
            "rejecting connection from foreign uid"
        );
    }
    same_user
}
/// Removes the wrapped filesystem path when dropped.
///
/// Removal is best-effort: any error (for example, the file is already gone) is ignored.
struct SocketGuard(std::path::PathBuf);

impl Drop for SocketGuard {
    fn drop(&mut self) {
        let Self(socket_file) = self;
        // Nothing useful can be done about a failure while dropping, so discard the result.
        std::fs::remove_file(socket_file.as_path()).ok();
    }
}