use std::future::Future;
use futures::future::TryFutureExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tower::{util::Oneshot, Service};
use zebra_chain::{chain_tip::NoChainTip, parameters::Network};
use crate::{
peer::{self, Client, ConnectedAddr, HandshakeRequest},
peer_set::ActiveConnectionCounter,
BoxError, Config, PeerSocketAddr, Request, Response,
};
#[cfg(test)]
mod tests;
pub fn connect_isolated<PeerTransport>(
network: &Network,
data_stream: PeerTransport,
user_agent: String,
) -> impl Future<Output = Result<Client, BoxError>>
where
PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let nil_inbound_service =
tower::service_fn(|_req| async move { Ok::<Response, BoxError>(Response::Nil) });
connect_isolated_with_inbound(network, data_stream, user_agent, nil_inbound_service)
}
pub fn connect_isolated_with_inbound<PeerTransport, InboundService>(
network: &Network,
data_stream: PeerTransport,
user_agent: String,
inbound_service: InboundService,
) -> impl Future<Output = Result<Client, BoxError>>
where
PeerTransport: AsyncRead + AsyncWrite + Unpin + Send + 'static,
InboundService:
Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
InboundService::Future: Send,
{
let config = Config {
network: network.clone(),
..Config::default()
};
let handshake = peer::Handshake::builder()
.with_config(config)
.with_inbound_service(inbound_service)
.with_user_agent(user_agent)
.with_latest_chain_tip(NoChainTip)
.finish()
.expect("provided mandatory builder parameters");
let connected_addr = ConnectedAddr::new_isolated();
let connection_tracker = ActiveConnectionCounter::new_counter().track_connection();
Oneshot::new(
handshake,
HandshakeRequest {
data_stream,
connected_addr,
connection_tracker,
},
)
}
pub fn connect_isolated_tcp_direct(
network: &Network,
addr: impl Into<PeerSocketAddr>,
user_agent: String,
) -> impl Future<Output = Result<Client, BoxError>> {
let nil_inbound_service =
tower::service_fn(|_req| async move { Ok::<Response, BoxError>(Response::Nil) });
connect_isolated_tcp_direct_with_inbound(network, addr, user_agent, nil_inbound_service)
}
pub fn connect_isolated_tcp_direct_with_inbound<InboundService>(
network: &Network,
addr: impl Into<PeerSocketAddr>,
user_agent: String,
inbound_service: InboundService,
) -> impl Future<Output = Result<Client, BoxError>>
where
InboundService:
Service<Request, Response = Response, Error = BoxError> + Clone + Send + 'static,
InboundService::Future: Send,
{
let addr = addr.into();
let network = network.clone();
tokio::net::TcpStream::connect(*addr)
.err_into()
.and_then(move |tcp_stream| {
connect_isolated_with_inbound(&network, tcp_stream, user_agent, inbound_service)
})
}