#[derive(Debug)]
pub struct LookupStream<A> {
inner: LookupStreamInner,
_phantom: PhantomData<fn() -> A>,
}
impl<A> LookupStream<A> {
fn new(swarm_tx: SwarmSender, reply_rx: mpsc::UnboundedReceiver<LookupResult>) -> Self {
Self {
inner: LookupStreamInner::Stream { swarm_tx, reply_rx },
_phantom: PhantomData,
}
}
pub(crate) fn new_err() -> Self {
Self {
inner: LookupStreamInner::SwarmNotBootstrapped { done: false },
_phantom: PhantomData,
}
}
}
#[derive(Debug)]
enum LookupStreamInner {
SwarmNotBootstrapped {
done: bool,
},
Stream {
swarm_tx: SwarmSender,
reply_rx: mpsc::UnboundedReceiver<Result<ActorRegistration<'static>, RegistryError>>,
},
}
impl<A: Actor + RemoteActor> Stream for LookupStream<A> {
type Item = Result<RemoteActorRef<A>, RegistryError>;
fn poll_next(
self: pin::Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
match &mut this.inner {
LookupStreamInner::SwarmNotBootstrapped { done } => {
if *done {
Poll::Ready(None)
} else {
*done = true;
Poll::Ready(Some(Err(RegistryError::SwarmNotBootstrapped)))
}
}
LookupStreamInner::Stream { swarm_tx, reply_rx } => {
match ready!(reply_rx.poll_recv(cx)) {
Some(Ok(registration)) => {
if A::REMOTE_ID != registration.remote_id {
Poll::Ready(Some(Err(RegistryError::BadActorType)))
} else {
Poll::Ready(Some(Ok(RemoteActorRef::new(
registration.actor_id,
swarm_tx.clone(),
))))
}
}
Some(Err(err)) => Poll::Ready(Some(Err(err))),
None => Poll::Ready(None),
}
}
}
}
}