//! Unix-specific networking helpers: socket cleanup plus listener/stream abstractions over
//! Unix sockets and TLS-encrypted TCP connections.

use std::path::PathBuf;

use async_std::io::{Read, Write};
use async_std::net::{TcpListener, TcpStream};
use async_std::os::unix::net::{UnixListener, UnixStream};
use async_tls::TlsAcceptor;
use async_trait::async_trait;

use crate::error::Error;
use crate::network::tls::{get_tls_connector, get_tls_listener};
use crate::settings::Shared;

/// Unix specific cleanup handling when getting a SIGINT/SIGTERM.
///
/// Deletes the Unix socket file, but only if `settings.use_unix_socket` is set and a file
/// currently exists at the configured socket path. In every other case this is a no-op.
///
/// # Errors
///
/// Returns the underlying `std::io::Error` if removing the socket file fails.
pub fn socket_cleanup(settings: &Shared) -> Result<(), std::io::Error> {
    // Nothing to clean up when the daemon isn't configured to use a unix socket.
    if !settings.use_unix_socket {
        return Ok(());
    }

    // Only try to delete the socket file if it's actually there.
    let socket_path = settings.unix_socket_path();
    if PathBuf::from(&socket_path).exists() {
        std::fs::remove_file(&socket_path)?;
    }

    Ok(())
}

/// A new trait, which can be used to represent Unix- and TcpListeners. \
/// This is necessary to easily write generic functions where both types can be used.
///
/// Implementors have to be `Sync + Send`. The trait goes through `#[async_trait]`, which is
/// what allows the `async fn` in the trait definition.
#[async_trait]
pub trait Listener: Sync + Send {
    /// Wait for the next incoming connection and hand it back as a boxed [`Stream`].
    ///
    /// Any failure while accepting the connection is reported as an [`Error`].
    async fn accept<'a>(&'a self) -> Result<GenericStream, Error>;
}

/// This is a helper struct for TCP connections.
/// TCP should always be used in conjunction with TLS.
/// That's why this helper exists, which encapsulates the logic of accepting a new
/// connection and initializing the TLS layer on top of it.
/// This way we can expose an `accept` function and implement the Listener trait.
pub(crate) struct TlsTcpListener {
    /// The plain TCP listener that accepts the raw incoming connections.
    tcp_listener: TcpListener,
    /// The acceptor that is run on every accepted TCP stream to set up the TLS layer.
    tls_acceptor: TlsAcceptor,
}

#[async_trait]
impl Listener for TlsTcpListener {
    /// Accept the next TCP connection and wrap it in a TLS session.
    ///
    /// The TLS accept step is awaited right here, so the returned stream is already
    /// encrypted. An error from either the TCP accept or the TLS accept is propagated.
    async fn accept<'a>(&'a self) -> Result<GenericStream, Error> {
        // The peer address isn't needed, only the raw stream.
        let (tcp_stream, _peer_addr) = self.tcp_listener.accept().await?;

        // Initialize the TLS layer on top of the fresh TCP connection.
        let tls_stream = self.tls_acceptor.accept(tcp_stream).await?;

        Ok(Box::new(tls_stream))
    }
}

#[async_trait]
impl Listener for UnixListener {
    async fn accept<'a>(&'a self) -> Result<GenericStream, Error> {
        let (stream, _) = self.accept().await?;
        Ok(Box::new(stream))
    }
}

/// A new trait, which can be used to represent Unix- and Tls encrypted TcpStreams. \
/// This is necessary to write generic functions where both types can be used.
///
/// The trait has no methods of its own. It only bundles the `Read + Write + Unpin + Send`
/// bounds, so that all supported stream types can be used as a single trait object.
pub trait Stream: Read + Write + Unpin + Send {}
// Plain unix socket stream.
impl Stream for UnixStream {}
// Server side of a TLS encrypted TCP connection.
impl Stream for async_tls::server::TlsStream<TcpStream> {}
// Client side of a TLS encrypted TCP connection.
impl Stream for async_tls::client::TlsStream<TcpStream> {}

/// Convenience type, so we don't have to write `Box<dyn Listener>` all the time.
pub type GenericListener = Box<dyn Listener>;
/// Convenience type, so we don't have to write `Box<dyn Stream>` all the time. \
/// This also prevents name collisions, since `Stream` is imported in many preludes.
pub type GenericStream = Box<dyn Stream>;

/// Get a new stream for the client. \
/// This can either be a UnixStream or a Tls encrypted TCPStream, depending on the parameters.
///
/// If `settings.use_unix_socket` is set, a connection to the unix socket at
/// `settings.unix_socket_path()` is opened. Otherwise a TCP connection to
/// `settings.host:settings.port` is established and a TLS session is initialized on top of it.
///
/// # Errors
///
/// - `Error::FileNotFound` if the unix socket should be used, but doesn't exist.
/// - `Error::Connection` if the TCP connection, the TLS connector setup or the TLS
///   initialization fails.
/// - Whatever `Error` the failure of the unix socket connect converts into.
pub async fn get_client_stream(settings: &Shared) -> Result<GenericStream, Error> {
    // Create a unix socket, if the config says so.
    if settings.use_unix_socket {
        // Resolve the path once, so the check, the error message and the connect call are
        // guaranteed to refer to the very same path.
        let socket_path = settings.unix_socket_path();
        if !PathBuf::from(&socket_path).exists() {
            return Err(Error::FileNotFound(format!(
                "Unix socket at path {:?}. Is the daemon started?",
                &socket_path
            )));
        }
        let stream = UnixStream::connect(&socket_path).await?;
        return Ok(Box::new(stream));
    }

    // Connect to the daemon via TCP
    let address = format!("{}:{}", &settings.host, &settings.port);
    let tcp_stream = TcpStream::connect(&address).await.map_err(|_| {
        Error::Connection(format!(
            "Failed to connect to the daemon on {}. Did you start it?",
            &address
        ))
    })?;

    // Get the configured rustls TlsConnector
    let tls_connector = get_tls_connector(settings)
        .await
        .map_err(|err| Error::Connection(format!("Failed to initialize tls connector {}.", err)))?;

    // Initialize the TLS layer.
    // The domain handed to the connector is always the fixed name "pueue.local".
    let stream = tls_connector
        .connect("pueue.local", tcp_stream)
        .await
        .map_err(|err| Error::Connection(format!("Failed to initialize tls {}.", err)))?;

    Ok(Box::new(stream))
}

/// Get a new listener for the daemon. \
/// This can either be a UnixListener or a TCPlistener, depending on the parameters.
///
/// With `settings.use_unix_socket` set, the listener is bound to the configured unix socket
/// path. Otherwise a TCP listener on `settings.host:settings.port` is combined with a TLS
/// acceptor.
///
/// # Errors
///
/// - `Error::UnixSocketExists` if the socket file exists and a client connection to it succeeds.
/// - Any error from removing a stale socket file, from binding the listener or from setting up
///   the TLS acceptor.
pub async fn get_listener(settings: &Shared) -> Result<GenericListener, Error> {
    if !settings.use_unix_socket {
        // This is the listener, which accepts low-level TCP connections
        let address = format!("{}:{}", &settings.host, &settings.port);
        let tcp_listener = TcpListener::bind(&address).await?;

        // This is the TLS acceptor, which initializes the TLS layer
        let tls_acceptor = get_tls_listener(settings)?;

        // Bundle both, so connections are accepted and wrapped in TLS in one go.
        return Ok(Box::new(TlsTcpListener {
            tcp_listener,
            tls_acceptor,
        }));
    }

    let socket_path = settings.unix_socket_path();

    // A socket file might be left over from a previous run.
    // If somebody still answers on it, another daemon is running and we must not take over.
    // If nobody answers, the file is stale and gets removed before binding.
    if PathBuf::from(&socket_path).exists() {
        let daemon_is_alive = get_client_stream(settings).await.is_ok();
        if daemon_is_alive {
            return Err(Error::UnixSocketExists);
        }

        std::fs::remove_file(&socket_path)?;
    }

    let unix_listener = UnixListener::bind(&socket_path).await?;

    Ok(Box::new(unix_listener))
}