use std::sync::Arc;
use futures::FutureExt;
use url::Url;
use crate::{Dialer, Repo, Transport};
/// Dialer that opens TCP connections to a fixed host and port.
///
/// Built either from a `tcp://host:port` URL via `TcpDialer::new` or directly
/// from its parts via `TcpDialer::new_host_port`.
pub struct TcpDialer {
/// Host name or IP address handed to the TCP connect call.
host: String,
/// TCP port to connect to.
port: u16,
}
/// Errors reported when building a `TcpDialer` or dialing through `Repo::dial_tcp`.
pub enum TcpDialerError {
/// The URL given to `TcpDialer::new` was rejected (wrong scheme, missing host,
/// or missing port). Carries a human-readable description of the problem.
InvalidUrl(String),
/// Produced by `Repo::dial_tcp` when the underlying `Repo::dial` call returns
/// an error; displayed using `crate::Stopped`.
RepoStopped,
}
/// User-facing rendering of [`TcpDialerError`].
impl std::fmt::Display for TcpDialerError {
    /// Writes the invalid-URL message with a fixed prefix, or defers to the
    /// `Display` text of `crate::Stopped` for the stopped-repo case.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidUrl(reason) => write!(f, "Error creating TcpDialer: {reason}"),
            Self::RepoStopped => {
                let stopped = crate::Stopped {};
                write!(f, "{stopped}")
            }
        }
    }
}
/// `Debug` output is intentionally the same text as the `Display` output.
impl std::fmt::Debug for TcpDialerError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Re-render through `Display` so both traits print the same message.
        f.write_fmt(format_args!("{}", self))
    }
}
// Empty impl: relies on the default `std::error::Error` method implementations,
// together with the `Display` and `Debug` impls for this type.
impl std::error::Error for TcpDialerError {}
impl TcpDialer {
pub fn new_host_port(host: String, port: u16) -> Self {
Self { host, port }
}
pub fn new(url: Url) -> Result<Self, TcpDialerError> {
if url.scheme() != "tcp" {
return Err(TcpDialerError::InvalidUrl(format!(
"Provided URL {url} is not of scheme tcp://!"
)));
};
let Some(host) = url.host_str() else {
return Err(TcpDialerError::InvalidUrl(format!(
"No host provided for {url}!"
)));
};
let Some(port) = url.port() else {
return Err(TcpDialerError::InvalidUrl(format!(
"No port provided for {url}!"
)));
};
Ok(Self {
host: host.to_string(),
port,
})
}
}
impl Dialer for TcpDialer {
    /// Returns the `tcp://host:port` URL describing this dialer's target.
    ///
    /// A bare IPv6 literal (for example `::1` passed to
    /// `TcpDialer::new_host_port`) is wrapped in brackets so that the resulting
    /// string is a well-formed URL; hosts that are already bracketed, host names
    /// and IPv4 addresses are used as-is.
    ///
    /// # Panics
    ///
    /// Panics if the stored host and port still do not form a parseable URL.
    fn url(&self) -> url::Url {
        let authority = if self.host.parse::<std::net::Ipv6Addr>().is_ok() {
            // URLs require IPv6 literals to be bracketed to disambiguate the port.
            format!("[{}]:{}", self.host, self.port)
        } else {
            format!("{}:{}", self.host, self.port)
        };
        Url::parse(&format!("tcp://{authority}"))
            .expect("TcpDialer host and port must form a valid tcp:// URL")
    }

    /// Opens a TCP connection to the configured host and port and wraps the
    /// stream in a [`Transport`].
    ///
    /// The returned future owns copies of the host and port, so it does not
    /// borrow from `self`. Any error from the TCP connect is returned boxed.
    fn connect(
        &self,
    ) -> futures::future::BoxFuture<
        'static,
        Result<crate::Transport, Box<dyn std::error::Error + Send + Sync + 'static>>,
    > {
        // `Url::host_str` yields IPv6 literals in bracketed form ("[::1]"), which
        // is what `TcpDialer::new` stores. The socket-address resolver only
        // accepts the bare literal, so strip the brackets when (and only when)
        // they enclose a valid IPv6 address.
        let host = self
            .host
            .strip_prefix('[')
            .and_then(|inner| inner.strip_suffix(']'))
            .filter(|inner| inner.parse::<std::net::Ipv6Addr>().is_ok())
            .unwrap_or(self.host.as_str())
            .to_owned();
        let port = self.port;
        async move {
            let io = tokio::net::TcpStream::connect((host, port)).await?;
            let transport = Transport::from_tokio_io(io);
            Ok(transport)
        }
        .boxed()
    }
}
impl Repo {
    /// Creates a [`TcpDialer`] from `url` and hands it, together with `backoff`,
    /// to `Repo::dial`.
    ///
    /// # Errors
    ///
    /// Returns [`TcpDialerError::InvalidUrl`] if `TcpDialer::new` rejects the URL,
    /// and [`TcpDialerError::RepoStopped`] if `Repo::dial` returns an error.
    pub fn dial_tcp(
        &self,
        url: Url,
        backoff: crate::BackoffConfig,
    ) -> Result<crate::DialerHandle, TcpDialerError> {
        let dialer = TcpDialer::new(url).map(Arc::new)?;
        match self.dial(backoff, dialer) {
            Ok(handle) => Ok(handle),
            // Whatever error `dial` reports is surfaced as a stopped repo.
            Err(_) => Err(TcpDialerError::RepoStopped),
        }
    }
}