use futures::channel::mpsc::UnboundedSender;
use futures::stream::StreamExt;
use hypercore_protocol::{hypercore::Hypercore, Channel, ChannelReceiver, Message};
use random_access_storage::RandomAccess;
use std::fmt::Debug;
use std::sync::Arc;
use tracing::{debug, instrument};
use super::{
messaging::{create_broadcast_message, create_initial_synchronize},
on_message, PeerState,
};
use crate::{
common::{utils::Mutex, FeedEvent, FeedEventContent},
PeermergeError,
};
#[instrument(level = "debug", skip_all)]
pub(super) async fn on_feed<T>(
mut hypercore: Arc<Mutex<Hypercore<T>>>,
mut peer_state: PeerState,
mut channel: Channel,
mut channel_receiver: ChannelReceiver<Message>,
feed_event_sender: &mut UnboundedSender<FeedEvent>,
) -> Result<(), PeermergeError>
where
T: RandomAccess + Debug + Send + 'static,
{
let messages = create_initial_synchronize(&mut hypercore, &mut peer_state).await;
channel.send_batch(&messages).await?;
debug!("Start listening on channel messages");
while let Some(message) = channel_receiver.next().await {
let exit = process_message(
message,
&mut hypercore,
&mut peer_state,
&mut channel,
feed_event_sender,
)
.await?;
if exit {
break;
}
}
debug!("Exiting");
Ok(())
}
#[instrument(level = "debug", skip_all)]
pub(super) async fn on_doc_feed<T>(
mut hypercore: Arc<Mutex<Hypercore<T>>>,
mut peer_state: PeerState,
mut channel: Channel,
mut channel_receiver: ChannelReceiver<Message>,
feed_event_sender: &mut UnboundedSender<FeedEvent>,
) -> Result<(), PeermergeError>
where
T: RandomAccess + Debug + Send + 'static,
{
let mut messages: Vec<Message> = vec![create_broadcast_message(
peer_state.feeds_state.as_ref().unwrap(),
&peer_state.child_documents,
None,
)];
messages.extend(create_initial_synchronize(&mut hypercore, &mut peer_state).await);
channel.send_batch(&messages).await?;
debug!("Start listening on doc channel messages");
while let Some(message) = channel_receiver.next().await {
let exit = process_message(
message,
&mut hypercore,
&mut peer_state,
&mut channel,
feed_event_sender,
)
.await?;
if exit {
break;
}
}
debug!("Exiting");
Ok(())
}
#[instrument(level = "debug", skip_all)]
async fn process_message<T>(
message: Message,
hypercore: &mut Arc<Mutex<Hypercore<T>>>,
peer_state: &mut PeerState,
channel: &mut Channel,
feed_event_sender: &mut UnboundedSender<FeedEvent>,
) -> Result<bool, PeermergeError>
where
T: RandomAccess + Debug + Send + 'static,
{
let events = on_message(hypercore, peer_state, channel, message).await?;
let mut disconnect_channel_id = None;
for event in events {
if let FeedEventContent::FeedDisconnected { channel } = event.content {
disconnect_channel_id = Some(channel);
}
feed_event_sender.unbounded_send(event)?;
}
if let Some(id) = disconnect_channel_id {
debug!("Disconnected channel {id}");
return Ok(true);
}
Ok(false)
}