use crate::codec::{Cookie, Message, NewRegistration, RendezvousCodec};
use crate::handler::Error;
use crate::handler::PROTOCOL_IDENT;
use crate::substream_handler::{FutureSubstream, Next, PassthroughProtocol, SubstreamHandler};
use crate::{ErrorCode, Namespace, Registration, Ttl};
use asynchronous_codec::Framed;
use futures::{SinkExt, TryFutureExt, TryStreamExt};
use libp2p_swarm::{NegotiatedSubstream, SubstreamProtocol};
use std::task::Context;
use void::Void;
pub struct Stream(FutureSubstream<OutEvent, Error>);
impl SubstreamHandler for Stream {
type InEvent = Void;
type OutEvent = OutEvent;
type Error = Error;
type OpenInfo = OpenInfo;
fn upgrade(
open_info: Self::OpenInfo,
) -> SubstreamProtocol<PassthroughProtocol, Self::OpenInfo> {
SubstreamProtocol::new(PassthroughProtocol::new(PROTOCOL_IDENT), open_info)
}
fn new(substream: NegotiatedSubstream, info: Self::OpenInfo) -> Self {
let mut stream = Framed::new(substream, RendezvousCodec::default());
let sent_message = match info {
OpenInfo::RegisterRequest(new_registration) => Message::Register(new_registration),
OpenInfo::UnregisterRequest(namespace) => Message::Unregister(namespace),
OpenInfo::DiscoverRequest {
namespace,
cookie,
limit,
} => Message::Discover {
namespace,
cookie,
limit,
},
};
Self(FutureSubstream::new(async move {
use Message::*;
use OutEvent::*;
stream
.send(sent_message.clone())
.map_err(Error::WriteMessage)
.await?;
let received_message = stream.try_next().map_err(Error::ReadMessage).await?;
let received_message = received_message.ok_or(Error::UnexpectedEndOfStream)?;
let event = match (sent_message, received_message) {
(Register(registration), RegisterResponse(Ok(ttl))) => Registered {
namespace: registration.namespace,
ttl,
},
(Register(registration), RegisterResponse(Err(error))) => {
RegisterFailed(registration.namespace, error)
}
(Discover { .. }, DiscoverResponse(Ok((registrations, cookie)))) => Discovered {
registrations,
cookie,
},
(Discover { namespace, .. }, DiscoverResponse(Err(error))) => {
DiscoverFailed { namespace, error }
}
(.., other) => return Err(Error::BadMessage(other)),
};
stream.close().map_err(Error::WriteMessage).await?;
Ok(event)
}))
}
fn inject_event(self, event: Self::InEvent) -> Self {
void::unreachable(event)
}
fn advance(self, cx: &mut Context<'_>) -> Result<Next<Self, Self::OutEvent>, Self::Error> {
Ok(self.0.advance(cx)?.map_state(Stream))
}
}
#[derive(Debug, Clone)]
pub enum OutEvent {
Registered {
namespace: Namespace,
ttl: Ttl,
},
RegisterFailed(Namespace, ErrorCode),
Discovered {
registrations: Vec<Registration>,
cookie: Cookie,
},
DiscoverFailed {
namespace: Option<Namespace>,
error: ErrorCode,
},
}
#[allow(clippy::large_enum_variant)]
#[allow(clippy::enum_variant_names)]
#[derive(Debug)]
pub enum OpenInfo {
RegisterRequest(NewRegistration),
UnregisterRequest(Namespace),
DiscoverRequest {
namespace: Option<Namespace>,
cookie: Option<Cookie>,
limit: Option<Ttl>,
},
}