mod cchan;
pub(crate) mod chan;
pub(crate) mod config;
mod connstream;
mod holepunch;
mod ichan;
mod istream;
mod tls;
use crate::consts::ALPN_CONEC;
use crate::types::{ConecConn, ConecConnError};
use crate::Coord;
pub use chan::{BroadcastCounting, BroadcastCountingError, ClientChanError};
use chan::{ClientChan, ClientChanDriver, ClientChanRef};
use config::{CertGenError, ClientConfig};
use connstream::ConnectingStreamHandle;
pub use connstream::{ConnectingStream, ConnectingStreamError};
use holepunch::{Holepunch, HolepunchDriver, HolepunchEvent, HolepunchRef};
pub use ichan::{ClosingChannel, ConnectingChannel, IncomingChannelsError, NewChannelError};
use ichan::{IncomingChannels, IncomingChannelsDriver, IncomingChannelsRef};
pub use istream::{IncomingStreams, NewInStream, StreamId};
use istream::{IncomingStreamsDriver, IncomingStreamsRef};
use err_derive::Error;
use futures::channel::mpsc;
use quinn::{crypto::rustls::TLSError, ClientConfigBuilder, Endpoint, EndpointError, ParseError};
use std::net::UdpSocket;
#[derive(Debug, Error)]
pub enum ClientError {
#[error(display = "Adding certificate authority: {:?}", _0)]
CertificateAuthority(#[source] webpki::Error),
#[error(display = "Binding port: {:?}", _0)]
Bind(#[source] EndpointError),
#[error(display = "Connecting to coordinator: {:?}", _0)]
Connect(#[source] ConecConnError),
#[error(display = "Connecting control stream to coordinator: {:?}", _0)]
Control(#[error(source, no_from)] ConecConnError),
#[error(display = "Generating certificate for client: {:?}", _0)]
CertificateGen(#[source] CertGenError),
#[error(display = "Certificate chain: {:?}", _0)]
CertificateChain(#[source] TLSError),
#[error(display = "Ephemeral cert: {:?}", _0)]
CertificateParse(#[source] ParseError),
#[error(display = "Starting new stream: {:?}", _0)]
NewStream(#[source] ClientChanError),
}
def_into_error!(ClientError);
pub struct Client {
#[allow(dead_code)]
in_streams: IncomingStreamsRef,
in_channels: IncomingChannels,
#[allow(dead_code)]
holepunch: Option<Holepunch>,
coord: ClientChan,
ctr: u64,
}
impl Client {
pub async fn new(config: ClientConfig) -> Result<(Self, IncomingStreams), ClientError> {
let config = {
let mut config = config;
config.gen_certs()?;
config
};
let (cert, privkey, key) = config.cert_and_key.unwrap();
let mut qccb = ClientConfigBuilder::new({
let mut qcc = quinn::ClientConfig::default();
let clt_cert = cert.iter().next().unwrap().0.clone();
qcc.crypto = tls::build_rustls_client_config(clt_cert, key)?;
qcc
});
qccb.protocols(ALPN_CONEC);
if config.keylog {
qccb.enable_keylog();
}
if let Some(ca) = config.extra_ca {
qccb.add_certificate_authority(ca)?;
}
let qcc = qccb.build();
let mut endpoint = Endpoint::builder();
endpoint.default_client_config(qcc.clone());
if config.listen {
let qsc = Coord::build_config(
config.stateless_retry,
config.keylog,
cert,
privkey,
config.client_ca.clone(),
)?;
endpoint.listen(qsc);
}
let (socket, (mut endpoint, incoming)) = {
let socket = UdpSocket::bind(&config.srcaddr).map_err(EndpointError::Socket)?;
let socket2 = socket.try_clone().map_err(EndpointError::Socket)?;
(socket, endpoint.with_socket(socket2)?)
};
let (mut conn, ibi) = ConecConn::connect(&mut endpoint, &config.coord, config.addr, None).await?;
let ctrl = conn
.connect_ctrl(config.id.clone())
.await
.map_err(ClientError::Control)?;
let (stream_sender, incoming_streams) = mpsc::unbounded();
let (chan_sender, chan_events) = mpsc::unbounded();
let (in_channels, ichan_sender) = {
let (inner, sender) = IncomingChannelsRef::new(
endpoint,
config.id,
config.keepalive,
incoming,
stream_sender.clone(),
qcc,
config.client_ca,
chan_sender.clone(),
);
let driver = IncomingChannelsDriver(inner.clone());
tokio::spawn(async move { driver.await });
(IncomingChannels::new(inner, sender.clone()), sender)
};
let (holepunch, holepunch_sender) = if config.holepunch && config.listen {
let (inner, sender) = HolepunchRef::new(socket);
let driver = HolepunchDriver(inner.clone());
tokio::spawn(async move { driver.await });
(Some(Holepunch(inner)), Some(sender))
} else {
(None, None)
};
let coord = {
let inner = ClientChanRef::new(conn, ctrl, ichan_sender, holepunch_sender, chan_events, config.listen);
let driver = ClientChanDriver::new(inner.clone(), config.keepalive);
tokio::spawn(async move { driver.await });
ClientChan::new(inner, chan_sender)
};
let in_streams = IncomingStreamsRef::new(ibi, stream_sender);
let driver = IncomingStreamsDriver(in_streams.clone());
tokio::spawn(async move { driver.await });
Ok((
Self {
in_streams,
in_channels,
holepunch,
coord,
ctr: 1u64 << 63,
},
incoming_streams,
))
}
pub fn new_proxied_stream(&mut self, to: String) -> ConnectingStream {
self.new_x_stream(to, StreamId::Proxied)
}
pub fn new_direct_stream(&mut self, to: String) -> ConnectingStream {
self.new_x_stream(to, StreamId::Direct)
}
fn new_x_stream<F>(&mut self, to: String, as_id: F) -> ConnectingStream
where
F: FnOnce(u64) -> StreamId,
{
let sid = as_id(self.ctr);
self.ctr += 1;
self.new_stream_with_id(to, sid)
}
pub fn new_stream_with_id(&self, to: String, sid: StreamId) -> ConnectingStream {
match sid {
StreamId::Proxied(sid) => self.coord.new_stream(to, sid),
StreamId::Direct(sid) => self.in_channels.new_stream(to, sid),
}
}
pub fn new_channel(&mut self, to: String) -> ConnectingChannel {
let ctr = self.ctr;
self.ctr += 1;
self.in_channels.new_channel(to, ctr)
}
pub fn close_channel(&self, peer: String) -> ClosingChannel {
self.in_channels.close_channel(peer)
}
pub fn new_broadcast(&mut self, chan: String) -> ConnectingStream {
let ctr = self.ctr;
self.ctr += 1;
self.coord.new_broadcast(chan, ctr)
}
pub fn new_stream(&mut self, to: String) -> ConnectingStream {
let csnd = self.coord.get_sender();
let isnd = self.in_channels.get_sender();
let conn_chan = self.new_channel(to.clone());
let sid = self.ctr;
self.ctr += 1;
ConnectingStream::new(Some((conn_chan, csnd, isnd, to, sid))).0
}
pub fn get_broadcast_count(&mut self, chan: String) -> BroadcastCounting {
let ctr = self.ctr;
self.ctr += 1;
self.coord.get_broadcast_count(chan, ctr)
}
}