use std::io;
use std::iter::once;
use std::str::{self, FromStr};
use std::time::Duration;
use gistit_project::var;
use gistit_proto::bytes::BytesMut;
use libp2p::core::upgrade::{read_length_prefixed, write_length_prefixed};
use libp2p::core::ProtocolName;
use libp2p::futures::{AsyncRead, AsyncWrite, AsyncWriteExt};
use libp2p::{autonat, Multiaddr, NetworkBehaviour};
use libp2p::autonat::{Behaviour as Autonat, Event as AutonatEvent};
use libp2p::core::PeerId;
use libp2p::identify::{Identify, IdentifyConfig, IdentifyEvent};
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent};
use libp2p::ping::{Behaviour as PingBehaviour, Config as PingConfig, Event as PingEvent, Ping};
use libp2p::relay::v2::client::{self, Client, Event as ClientEvent};
use libp2p::relay::v2::relay::{self, Event as RelayEvent, Relay};
use libp2p::request_response::{
ProtocolSupport, RequestResponse, RequestResponseCodec, RequestResponseConfig,
RequestResponseEvent,
};
use async_trait::async_trait;
use gistit_proto::prost::Message;
use gistit_proto::Gistit;
use crate::config::Config;
use crate::Result;
pub const BOOTNODES: [&str; 4] = [
"QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
"QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
"QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
"QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
];
pub const BOOTADDR: &str = "/dnsaddr/bootstrap.libp2p.io";
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "Event", event_process = false)]
pub struct Behaviour {
pub request_response: RequestResponse<ExchangeCodec>,
pub kademlia: Kademlia<MemoryStore>,
pub identify: Identify,
pub relay: Relay,
pub autonat: Autonat,
pub ping: Ping,
pub client: Client,
}
impl Behaviour {
pub fn new_behaviour_and_transport(
config: &Config,
) -> Result<(Self, client::transport::ClientTransport)> {
let request_response = RequestResponse::new(
ExchangeCodec,
once((ExchangeProtocol, ProtocolSupport::Full)),
RequestResponseConfig::default(),
);
let kademlia = {
let mut cfg = KademliaConfig::default();
cfg.set_query_timeout(Duration::from_secs(5 * 60));
let store = MemoryStore::new(config.peer_id);
let mut behaviour = Kademlia::with_config(config.peer_id, store, cfg);
let bootaddr = Multiaddr::from_str(BOOTADDR)?;
if config.bootstrap {
for peer in BOOTNODES {
behaviour.add_address(
&PeerId::from_str(peer).expect("peer id to be valid"),
bootaddr.clone(),
);
}
behaviour.bootstrap().expect("to bootstrap");
}
behaviour
};
let identify = Identify::new(IdentifyConfig::new(
"/ipfs/0.1.0".into(),
config.keypair.public(),
));
let relay = relay::Relay::new(
PeerId::from(config.keypair.public()),
relay::Config::default(),
);
let (client_transport, client) =
client::Client::new_transport_and_behaviour(config.peer_id);
let autonat = {
let mut behaviour = autonat::Behaviour::new(
PeerId::from(config.keypair.public()),
autonat::Config::default(),
);
if config.bootstrap {
for peer in BOOTNODES {
let bootaddr = Multiaddr::from_str(BOOTADDR)?;
behaviour.add_server(
PeerId::from_str(peer).expect("peer id to be valid"),
Some(bootaddr),
);
}
}
behaviour
};
let ping = PingBehaviour::new(PingConfig::new().with_keep_alive(true));
Ok((
Self {
request_response,
kademlia,
identify,
relay,
autonat,
ping,
client,
},
client_transport,
))
}
}
#[derive(Debug)]
pub enum Event {
RequestResponse(RequestResponseEvent<Request, Response>),
Kademlia(KademliaEvent),
Identify(IdentifyEvent),
Relay(RelayEvent),
Autonat(AutonatEvent),
Ping(PingEvent),
Client(ClientEvent),
}
impl From<RequestResponseEvent<Request, Response>> for Event {
fn from(event: RequestResponseEvent<Request, Response>) -> Self {
Self::RequestResponse(event)
}
}
impl From<KademliaEvent> for Event {
fn from(event: KademliaEvent) -> Self {
Self::Kademlia(event)
}
}
impl From<IdentifyEvent> for Event {
fn from(event: IdentifyEvent) -> Self {
Self::Identify(event)
}
}
impl From<RelayEvent> for Event {
fn from(event: RelayEvent) -> Self {
Self::Relay(event)
}
}
impl From<AutonatEvent> for Event {
fn from(event: AutonatEvent) -> Self {
Self::Autonat(event)
}
}
impl From<PingEvent> for Event {
fn from(event: PingEvent) -> Self {
Self::Ping(event)
}
}
impl From<ClientEvent> for Event {
fn from(event: ClientEvent) -> Self {
Self::Client(event)
}
}
#[derive(Debug, Clone)]
pub struct ExchangeProtocol;
impl ProtocolName for ExchangeProtocol {
fn protocol_name(&self) -> &[u8] {
b"/gistit/1"
}
}
#[derive(Clone)]
pub struct ExchangeCodec;
#[derive(Debug, Clone, PartialEq)]
pub struct Request(pub Vec<u8>);
#[derive(Debug, Clone, PartialEq)]
pub struct Response(pub Gistit);
impl std::fmt::Display for Response {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.0)
}
}
impl std::error::Error for Response {
fn description(&self) -> &str {
"failed to respond"
}
}
#[async_trait]
impl RequestResponseCodec for ExchangeCodec {
type Protocol = ExchangeProtocol;
type Request = Request;
type Response = Response;
async fn read_request<T: Send + Unpin + AsyncRead>(
&mut self,
_: &Self::Protocol,
io: &mut T,
) -> io::Result<Self::Request> {
let hash = read_length_prefixed(io, var::GISTIT_HASH_LENGTH).await?;
log::debug!("Read request {:?}", std::str::from_utf8(&hash).unwrap());
if hash.is_empty() {
Err(io::ErrorKind::UnexpectedEof.into())
} else {
Ok(Request(hash))
}
}
async fn read_response<T: Send + Unpin + AsyncRead>(
&mut self,
_: &Self::Protocol,
io: &mut T,
) -> io::Result<Self::Response> {
let bytes = read_length_prefixed(io, var::GISTIT_MAX_SIZE).await?;
let gistit = Gistit::decode(&*bytes).map_err(|_| io::ErrorKind::InvalidInput)?;
log::debug!("Read response: {:?}", gistit);
if bytes.is_empty() {
Err(io::ErrorKind::UnexpectedEof.into())
} else {
Ok(Response(gistit))
}
}
async fn write_request<T: Send + Unpin + AsyncWrite>(
&mut self,
_: &Self::Protocol,
io: &mut T,
Request(req): Self::Request,
) -> io::Result<()> {
log::debug!("Write request {:?}", std::str::from_utf8(&req).unwrap());
write_length_prefixed(io, req).await?;
io.close().await?;
Ok(())
}
async fn write_response<T>(
&mut self,
_: &Self::Protocol,
io: &mut T,
Response(gistit): Self::Response,
) -> io::Result<()>
where
T: AsyncWrite + Unpin + Send,
{
let mut buf = BytesMut::with_capacity(var::GISTIT_MAX_SIZE);
gistit
.encode(&mut buf)
.map_err(|_| io::ErrorKind::InvalidInput)?;
log::debug!("Write response {:?} bytes", buf.len());
write_length_prefixed(io, buf).await?;
io.close().await?;
Ok(())
}
}