1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
use crate::client::UnixClientTransport;
use crate::misc::parse_uds_addr;
use async_trait::async_trait;
use rsocket_rust::{error::RSocketError, transport::ServerTransport, Result};
use tokio::net::UnixListener;

#[derive(Debug)]
pub struct UnixServerTransport {
    addr: String,
    listener: Option<UnixListener>,
}

impl UnixServerTransport {
    fn new(addr: String) -> UnixServerTransport {
        UnixServerTransport {
            addr,
            listener: None,
        }
    }
}

#[async_trait]
impl ServerTransport for UnixServerTransport {
    type Item = UnixClientTransport;

    async fn start(&mut self) -> Result<()> {
        if self.listener.is_some() {
            return Ok(());
        }
        match UnixListener::bind(&self.addr) {
            Ok(listener) => {
                self.listener = Some(listener);
                debug!("listening on: {}", &self.addr);
                Ok(())
            }
            Err(e) => Err(RSocketError::IO(e).into()),
        }
    }

    async fn next(&mut self) -> Option<Result<Self::Item>> {
        match self.listener.as_mut() {
            Some(listener) => match listener.accept().await {
                Ok((socket, _)) => Some(Ok(UnixClientTransport::from(socket))),
                Err(e) => Some(Err(RSocketError::IO(e).into())),
            },
            None => None,
        }
    }
}

impl Drop for UnixServerTransport {
    fn drop(&mut self) {
        if let Err(e) = std::fs::remove_file(&self.addr) {
            warn!("remove unix sock file failed: {}", e);
        }
    }
}

impl From<String> for UnixServerTransport {
    fn from(addr: String) -> UnixServerTransport {
        UnixServerTransport::new(parse_uds_addr(addr))
    }
}

impl From<&str> for UnixServerTransport {
    fn from(addr: &str) -> UnixServerTransport {
        UnixServerTransport::new(parse_uds_addr(addr))
    }
}