use std::io;
use std::os::unix::net::UnixListener;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use crate::RpcServer;
struct IdleState {
conn_count: usize,
deadline: Option<Instant>,
}
fn lock<T>(m: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
m.lock().unwrap_or_else(|e| e.into_inner())
}
pub fn serve_unix<F: FnOnce()>(
server: Arc<RpcServer>,
path: &str,
idle_timeout: Option<Duration>,
shutdown: Arc<AtomicBool>,
on_bound: F,
) -> io::Result<()> {
let _ = std::fs::remove_file(path);
let listener = UnixListener::bind(path)?;
listener.set_nonblocking(true).ok();
on_bound();
let startup_deadline = idle_timeout.map(|t| Instant::now() + t.max(Duration::from_secs(60)));
let state = Arc::new(Mutex::new(IdleState {
conn_count: 0,
deadline: startup_deadline,
}));
let mut threads: Vec<thread::JoinHandle<()>> = Vec::new();
loop {
if shutdown.load(Ordering::Relaxed) {
break;
}
if idle_timeout.is_some() {
let st = lock(&state);
if st.conn_count == 0 {
if let Some(dl) = st.deadline {
if Instant::now() >= dl {
break;
}
}
}
}
match listener.accept() {
Ok((mut conn, _)) => {
conn.set_nonblocking(false).ok();
{
let mut st = lock(&state);
st.conn_count += 1;
st.deadline = None; }
let srv = server.clone();
let state2 = state.clone();
threads.push(thread::spawn(move || {
if let Ok(mut reader) = conn.try_clone() {
srv.serve(&mut reader, &mut conn);
}
let mut st = lock(&state2);
st.conn_count -= 1;
if st.conn_count == 0 {
if let Some(t) = idle_timeout {
st.deadline = Some(Instant::now() + t);
}
}
}));
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
thread::sleep(Duration::from_millis(50));
}
Err(_) => break,
}
}
drop(listener);
let _ = std::fs::remove_file(path);
let deadline = Instant::now() + Duration::from_secs(2);
for t in threads {
if Instant::now() >= deadline {
break;
}
let _ = t.join();
}
Ok(())
}