use {
super::super::{
Streams,
accept::{ConsumerHandshake, StartStream},
consumer::builder::ConsumerConfig,
status::{State, Stats},
},
crate::{
Datum,
discovery::{Discovery, PeerEntry},
network::{LocalNode, error::*, link::*},
primitives::Short,
streams::{NotAllowed, StreamNotFound, status::ChannelInfo},
},
backoff::backoff::Backoff,
core::{future::pending, ops::ControlFlow, time::Duration},
futures::{FutureExt, TryFutureExt},
iroh::EndpointAddr,
std::sync::Arc,
tokio::sync::{mpsc, watch},
tokio_util::sync::{CancellationToken, ReusableBoxFuture},
};
/// Background worker that keeps one stream subscription to a single producer
/// alive: it (re)connects with backoff, performs the consumer handshake and
/// forwards every received datum to the consumer's channel.
///
/// NOTE: Rust drops fields in declaration order, so the order below also
/// decides when the in-flight futures (and the link they own) are torn down
/// relative to the channels and the cancellation token.
pub(super) struct Receiver<D: Datum> {
/// Consumer settings: stream id, criteria and the backoff policy factory.
config: Arc<ConsumerConfig>,
/// Used to re-sync the discovery catalog when the producer reports `UnknownPeer`.
discovery: Discovery,
/// The local node; supplies the network id and dials the producer.
local: LocalNode,
/// The producer being subscribed to; shared with the returned `ChannelInfo`.
peer: Arc<PeerEntry>,
/// Destination for received datums, paired with the byte count reported by
/// `recv_with_size`.
data_tx: mpsc::UnboundedSender<(D, usize)>,
/// Publishes the subscription state; `run` exits once it becomes `Terminated`.
state_tx: watch::Sender<State>,
/// Token derived (via `child_token`) from the caller's token; the worker also
/// cancels it itself on unrecoverable errors or when backoff is exhausted.
cancel: CancellationToken,
/// In-flight receive on the current link; hands the link back with the result.
next_recv: ReusableBoxFuture<'static, (DatumRecvResult<D>, Link<Streams>)>,
/// In-flight connection attempt (backoff wait, dial, handshake, validation).
next_connect: ReusableBoxFuture<'static, Result<Link<Streams>, LinkError>>,
/// Reconnection backoff policy; `None` until the first connection attempt
/// instantiates it from `config.backoff`.
backoff: Option<Box<dyn Backoff + Send + Sync + 'static>>,
/// Counters (datums, bytes, connect/disconnect) shared with `ChannelInfo`.
stats: Arc<Stats>,
}
impl<D: Datum> Receiver<D> {
    /// Spawns a detached tokio task that subscribes to `config.stream_id` on
    /// `peer` and forwards each received datum, together with its byte count,
    /// to `data_tx`.
    ///
    /// The worker runs under a child of `cancel`: cancelling the caller's
    /// token stops the worker, while the worker cancelling itself (on an
    /// unrecoverable error) leaves the caller's token and its siblings alone.
    ///
    /// Returns a [`ChannelInfo`] sharing the worker's statistics, peer entry
    /// and state channel, so the caller can observe the subscription.
    pub fn spawn(
        peer: PeerEntry,
        local: &LocalNode,
        discovery: &Discovery,
        cancel: &CancellationToken,
        data_tx: &mpsc::UnboundedSender<(D, usize)>,
        config: Arc<ConsumerConfig>,
    ) -> ChannelInfo {
        let peer = Arc::new(peer);
        let stats = Arc::new(Stats::default());
        let (state_tx, state) = watch::channel(State::Connecting);

        let channel_info = ChannelInfo {
            stream_id: config.stream_id,
            criteria: config.criteria.clone(),
            producer_id: *peer.id(),
            consumer_id: local.id(),
            stats: Arc::clone(&stats),
            peer: Arc::clone(&peer),
            state,
        };

        let worker = Self {
            config,
            discovery: discovery.clone(),
            local: local.clone(),
            peer,
            data_tx: data_tx.clone(),
            state_tx,
            // One level of nesting is all that is needed to tie the worker's
            // lifetime to the caller while letting it cancel only itself.
            cancel: cancel.child_token(),
            next_recv: ReusableBoxFuture::new(pending()),
            next_connect: ReusableBoxFuture::new(pending()),
            backoff: None,
            stats,
        };
        tokio::spawn(worker.run());
        channel_info
    }
}
impl<D: Datum> Receiver<D> {
/// Drives the subscription until its state becomes `State::Terminated`.
///
/// Starts the first connection attempt, then multiplexes three events: the
/// in-flight receive completing, the in-flight connection attempt
/// completing, and the published state reaching `Terminated`.
pub async fn run(mut self) {
    let first_attempt = self.connect();
    self.next_connect.set(first_attempt);
    let mut state_rx = self.state_tx.subscribe();

    loop {
        tokio::select! {
            (received, link) = &mut self.next_recv => self.on_next_recv(received, link),
            outcome = &mut self.next_connect => self.handle_connect_result(outcome),
            _ = state_rx.wait_for(|current| *current == State::Terminated) => {
                tracing::debug!(
                    stream_id = %Short(self.config.stream_id),
                    producer_id = %Short(&self.peer.id()),
                    "stream subscription terminated",
                );
                break;
            }
        }
    }
}
fn on_next_recv(&mut self, result: DatumRecvResult<D>, link: Link<Streams>) {
match result {
Ok((datum, bytes_len)) => {
self.data_tx.send((datum, bytes_len)).ok();
self.stats.increment_datums();
self.stats.increment_bytes(bytes_len);
if !self.cancel.is_cancelled() {
if let Some(ref mut backoff) = self.backoff {
backoff.reset();
}
self.next_recv.set(self.make_next_recv_future(link));
}
}
Err(error) => {
self.handle_recv_error(error, Some(link));
}
}
}
/// Builds the future for the next receive on `link`.
///
/// The link is moved into the future and handed back next to the receive
/// result so the caller can keep using it. The future is fused, so polling it
/// again after it has completed simply stays pending.
#[expect(clippy::unused_self)]
fn make_next_recv_future(
    &self,
    mut link: Link<Streams>,
) -> impl Future<Output = (Result<(D, usize), LinkError>, Link<Streams>)> + 'static
{
    let recv_then_return_link = async move {
        let outcome = link
            .recv_with_size::<D>()
            .map_err(LinkError::Recv)
            .await;
        (outcome, link)
    };
    recv_then_return_link.fuse()
}
fn connect(
&mut self,
) -> impl Future<Output = Result<Link<Streams>, LinkError>> + 'static {
self.state_tx.send(State::Connecting).ok();
let net_id = *self.local.network_id();
let cancel = self.cancel.clone();
let config = Arc::clone(&self.config);
let peer_id = *self.peer.id();
let peer_addr = self.peer.address().clone();
let backoff_fut = self.apply_backoff();
let connect_fut =
self.local.connect_with_cancel::<Streams>(peer_addr, cancel);
async move {
if backoff_fut.await.is_break() {
return Err(LinkError::Cancelled);
}
tracing::debug!(
stream_id = %Short(config.stream_id),
producer_id = %Short(&peer_id),
criteria = ?config.criteria,
"connecting to stream producer",
);
let mut link = connect_fut.await?;
link
.send(&ConsumerHandshake::new(
net_id,
config.stream_id,
config.criteria.clone(),
))
.await?;
let start = link.recv::<StartStream>().await?;
if start.network_id() != net_id {
tracing::warn!(
stream_id = %Short(config.stream_id),
producer_id = %Short(&peer_id),
expected_network = %Short(net_id),
received_network = %Short(start.network_id()),
"producer is on a different network",
);
link.close(DifferentNetwork).await.ok();
return Err(LinkError::Recv(RecvError::closed(DifferentNetwork)));
}
if start.stream_id() != config.stream_id {
tracing::warn!(
stream_id = %Short(config.stream_id),
producer_id = %Short(&peer_id),
"producer is producing a different stream than requested",
);
link.close(StreamNotFound).await.ok();
return Err(LinkError::Recv(RecvError::closed(StreamNotFound)));
}
Ok(link)
}
.fuse()
}
/// Classifies a link error and decides whether the subscription ends or is
/// retried.
///
/// Called both for receive failures (with the link that failed) and for
/// failed connection attempts (with `None`). The link argument is unused and
/// is simply dropped here.
///
/// Outcomes, in match order:
/// * cancellation: terminate quietly;
/// * undecodable datum: terminate with a warning;
/// * producer closed with `UnknownPeer`: re-sync the discovery catalog and
///   reconnect (returns early, skipping the default reconnect below);
/// * `NotAllowed`, `GracefulShutdown`, `StreamNotFound`, `DifferentNetwork`:
///   terminate with a warning;
/// * any other close reason or error: warn, then reconnect via `connect()`,
///   which applies the backoff policy.
fn handle_recv_error(&mut self, error: LinkError, _: Option<Link<Streams>>) {
// Extracted up front because `error` is moved into the match scrutinee.
let close_reason = error.close_reason().cloned();
// Ends the subscription: cancels this worker's token, publishes
// `State::Terminated` (which makes `run` exit), records the disconnect and
// returns from `handle_recv_error`. The two-argument form logs a warning first.
macro_rules! unrecoverable {
() => {
self.cancel.cancel();
self.state_tx.send(State::Terminated).ok();
self.stats.disconnected();
return;
};
($msg:expr, $e:expr) => {
tracing::warn!(
stream_id = %Short(self.config.stream_id),
producer_id = %Short(&self.peer.id()),
criteria = ?self.config.criteria,
error = %$e,
$msg,
);
self.cancel.cancel();
self.state_tx.send(State::Terminated).ok();
self.stats.disconnected();
return;
};
}
// Arm order matters: error-kind checks come before close-reason checks, and
// the catch-all close-reason arm must stay after the specific ones.
match (error, close_reason) {
(LinkError::Cancelled | LinkError::Recv(RecvError::Cancelled), _) => {
unrecoverable!();
}
(LinkError::Recv(RecvError::Decode(err)), _) => {
unrecoverable!("producer sent invalid datum", err);
}
(_, Some(reason)) if reason == UnknownPeer => {
// The producer does not know us yet: sync the catalog, then retry.
let addr = self.peer.address().clone();
let reconnect_fut = self.sync_catalog_then_connect(addr);
self.next_connect.set(reconnect_fut);
return;
}
(_, Some(reason)) if reason == NotAllowed => {
unrecoverable!("not allowed", reason);
}
(_, Some(reason)) if reason == GracefulShutdown => {
unrecoverable!("producer is shutting down", "none");
}
(e, Some(reason)) if reason == StreamNotFound => {
unrecoverable!("producer does not have the requested stream", e);
}
(e, Some(reason)) if reason == DifferentNetwork => {
unrecoverable!("producer is on a different network", e);
}
(_, Some(reason)) => {
// Unrecognized close reason: log it and fall through to a reconnect.
tracing::warn!(
stream_id = %Short(self.config.stream_id),
producer_id = %Short(&self.peer.id()),
reason = %reason,
"subscription refused by producer",
);
}
(e, _) => {
// Any other error without a close reason is treated as transient.
tracing::warn!(
error = %e,
stream_id = %Short(self.config.stream_id),
producer_id = %Short(self.peer.id()),
);
}
}
// Default path for recoverable errors: retry (subject to backoff).
let fut = self.connect();
self.next_connect.set(fut);
}
/// Reacts to the outcome of a connection attempt.
///
/// On success: logs the connection, publishes `State::Connected`, records it
/// in the stats and arms the first receive on the new link. Failures are
/// routed to [`Self::handle_recv_error`] without a link.
fn handle_connect_result(
    &mut self,
    result: Result<Link<Streams>, LinkError>,
) {
    let link = match result {
        Ok(link) => link,
        Err(error) => {
            self.handle_recv_error(error, None);
            return;
        }
    };

    tracing::info!(
        stream_id = %Short(self.config.stream_id),
        producer_id = %Short(&self.peer.id()),
        criteria = ?self.config.criteria,
        "connected to stream producer",
    );
    self.state_tx.send(State::Connected).ok();
    self.stats.connected();
    self.next_recv.set(self.make_next_recv_future(link));
}
/// Consults and advances the reconnection backoff policy.
///
/// * First call: instantiates the policy from `config.backoff` and allows an
///   immediate attempt.
/// * Later calls: the returned future sleeps for the policy's next delay,
///   resolving to `Break` instead if the worker is cancelled while waiting.
/// * Exhausted policy: the worker cancels itself and publishes
///   `State::Terminated` synchronously, and the future resolves to `Break`
///   immediately.
///
/// The future does not borrow `self` (`use<D>`), so it can be stored inside
/// the `'static` connect future.
fn apply_backoff(
    &mut self,
) -> impl Future<Output = ControlFlow<()>> + use<D> {
    let producer_id = *self.peer.id();

    // `Ok(delay)` means "sleep for `delay` before continuing";
    // `Err(flow)` is a decision that needs no waiting at all.
    let plan: Result<Duration, ControlFlow<()>> = match self.backoff {
        Some(ref mut policy) => match policy.next_backoff() {
            Some(delay) => Ok(delay),
            None => {
                tracing::debug!(
                    stream_id = %Short(self.config.stream_id),
                    producer_id = %Short(producer_id),
                    criteria = ?self.config.criteria,
                    "exhausted all reconnection attempts, terminating",
                );
                self.cancel.cancel();
                self.state_tx.send(State::Terminated).ok();
                Err(ControlFlow::Break(()))
            }
        },
        None => {
            self.backoff = Some((self.config.backoff)());
            Err(ControlFlow::Continue(()))
        }
    };

    let stream_id = self.config.stream_id;
    let cancel = self.cancel.clone();
    async move {
        let duration = match plan {
            Ok(duration) => duration,
            Err(flow) => return flow,
        };
        tracing::debug!(
            stream_id = %Short(stream_id),
            producer_id = %Short(producer_id),
            "waiting {duration:?} before reconnecting",
        );
        tokio::select! {
            () = tokio::time::sleep(duration) => ControlFlow::Continue(()),
            () = cancel.cancelled() => ControlFlow::Break(()),
        }
    }
    .fuse()
}
fn sync_catalog_then_connect(
&mut self,
peer_addr: EndpointAddr,
) -> impl Future<Output = Result<Link<Streams>, LinkError>> + 'static {
let stream_id = self.config.stream_id;
let discovery = self.discovery.clone();
let producer_id = *self.peer.id();
let connect_fut = self.connect();
tracing::trace!(
stream_id = %Short(stream_id),
producer_id = %Short(producer_id),
"producer is not recognizing this consumer, will sync catalog then reconnect",
);
async move {
discovery.sync_with(peer_addr).await.map_err(|e| {
tracing::warn!(
stream_id = %Short(stream_id),
producer_id = %Short(&producer_id),
error = %e,
"failed to sync discovery catalog with producer",
);
LinkError::Cancelled
})?;
connect_fut.await
}
.fuse()
}
}
/// Outcome of a single receive: the decoded datum plus the byte count reported
/// by `recv_with_size`, or the link error that ended the receive.
type DatumRecvResult<D> = Result<(D, usize), LinkError>;