1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use std::path::PathBuf;
use async_std::io::{Read, Write};
use async_std::net::{TcpListener, TcpStream};
use async_std::os::unix::net::{UnixListener, UnixStream};
use async_tls::TlsAcceptor;
use async_trait::async_trait;
use crate::error::Error;
use crate::network::tls::{get_tls_connector, get_tls_listener};
use crate::settings::Shared;
pub fn socket_cleanup(settings: &Shared) -> Result<(), std::io::Error> {
if settings.use_unix_socket && PathBuf::from(&settings.unix_socket_path()).exists() {
std::fs::remove_file(&settings.unix_socket_path())?;
}
Ok(())
}
#[async_trait]
pub trait Listener: Sync + Send {
async fn accept<'a>(&'a self) -> Result<GenericStream, Error>;
}
pub(crate) struct TlsTcpListener {
tcp_listener: TcpListener,
tls_acceptor: TlsAcceptor,
}
#[async_trait]
impl Listener for TlsTcpListener {
async fn accept<'a>(&'a self) -> Result<GenericStream, Error> {
let (stream, _) = self.tcp_listener.accept().await?;
Ok(Box::new(self.tls_acceptor.accept(stream).await?))
}
}
#[async_trait]
impl Listener for UnixListener {
async fn accept<'a>(&'a self) -> Result<GenericStream, Error> {
let (stream, _) = self.accept().await?;
Ok(Box::new(stream))
}
}
pub trait Stream: Read + Write + Unpin + Send {}
impl Stream for UnixStream {}
impl Stream for async_tls::server::TlsStream<TcpStream> {}
impl Stream for async_tls::client::TlsStream<TcpStream> {}
pub type GenericListener = Box<dyn Listener>;
pub type GenericStream = Box<dyn Stream>;
pub async fn get_client_stream(settings: &Shared) -> Result<GenericStream, Error> {
if settings.use_unix_socket {
if !PathBuf::from(&settings.unix_socket_path()).exists() {
return Err(Error::FileNotFound(format!(
"Unix socket at path {:?}. Is the daemon started?",
&settings.unix_socket_path
)));
}
let stream = UnixStream::connect(&settings.unix_socket_path()).await?;
return Ok(Box::new(stream));
}
let address = format!("{}:{}", &settings.host, &settings.port);
let tcp_stream = TcpStream::connect(&address).await.map_err(|_| {
Error::Connection(format!(
"Failed to connect to the daemon on {}. Did you start it?",
&address
))
})?;
let tls_connector = get_tls_connector(&settings)
.await
.map_err(|err| Error::Connection(format!("Failed to initialize tls connector {}.", err)))?;
let stream = tls_connector
.connect("pueue.local", tcp_stream)
.await
.map_err(|err| Error::Connection(format!("Failed to initialize tls {}.", err)))?;
Ok(Box::new(stream))
}
pub async fn get_listener(settings: &Shared) -> Result<GenericListener, Error> {
if settings.use_unix_socket {
if PathBuf::from(&settings.unix_socket_path()).exists() {
if get_client_stream(&settings).await.is_ok() {
return Err(Error::UnixSocketExists);
}
std::fs::remove_file(&settings.unix_socket_path())?;
}
return Ok(Box::new(
UnixListener::bind(&settings.unix_socket_path()).await?,
));
}
let address = format!("{}:{}", &settings.host, &settings.port);
let tcp_listener = TcpListener::bind(&address).await?;
let tls_acceptor = get_tls_listener(&settings)?;
let tls_listener = TlsTcpListener {
tcp_listener,
tls_acceptor,
};
Ok(Box::new(tls_listener))
}