use crate::{HypercoreError, common::BitfieldUpdate};
use async_broadcast::{InactiveReceiver, Receiver, Sender, broadcast};
/// Capacity passed to `broadcast` when `Events::new` creates the event channel.
///
/// Declared as a `const` rather than a `static`: it is a plain compile-time
/// value, and nothing in this file needs it to have a fixed memory address.
const MAX_EVENT_QUEUE_CAPACITY: usize = 32;
/// Event describing a `get` request for a given index.
///
/// Constructed by `Events::send_on_get`, which pairs it with a freshly created
/// capacity-1 broadcast channel.
#[derive(Debug, Clone)]
pub struct Get {
/// Index that was requested.
pub index: u64,
/// Sender half of the channel whose receiver `Events::send_on_get` returns.
/// NOTE(review): presumably a subscriber uses this to signal that the request
/// has been handled; confirm against the consumers of this event.
pub get_result: Sender<()>,
}
/// Payload-free event.
///
/// In this file's test it is the first event observed after an `append`.
/// NOTE(review): the exact conditions under which it is emitted are defined
/// outside this file; confirm at the call sites.
#[derive(Debug, Clone)]
pub struct DataUpgrade {}
/// Event carrying the same three fields as a `BitfieldUpdate`
/// (see the `From<&BitfieldUpdate>` impl for the field-by-field copy).
#[derive(Debug, Clone)]
pub struct Have {
/// Start of the range, copied from `BitfieldUpdate::start`.
pub start: u64,
/// Length of the range, copied from `BitfieldUpdate::length`.
pub length: u64,
/// Copied from `BitfieldUpdate::drop`.
/// NOTE(review): presumably `true` means the range was dropped rather than
/// added; confirm against `BitfieldUpdate`.
pub drop: bool,
}
impl From<&BitfieldUpdate> for Have {
    /// Builds a `Have` event by copying `start`, `length` and `drop` out of a
    /// borrowed `BitfieldUpdate`.
    fn from(update: &BitfieldUpdate) -> Self {
        // Exhaustive destructuring (no `..`): if `BitfieldUpdate` ever gains a
        // field, this stops compiling instead of silently ignoring it.
        let &BitfieldUpdate {
            start,
            length,
            drop,
        } = update;
        Self {
            start,
            length,
            drop,
        }
    }
}
/// All event kinds that can be sent through `Events`.
///
/// Each variant wraps the message struct of the same name, which is what lets
/// `impl_from_for_enum_variant!` generate the `From` conversions.
#[derive(Debug, Clone)]
pub enum Event {
/// A `get` request for an index; see `Get`.
Get(Get),
/// Payload-free notification; see `DataUpgrade`.
DataUpgrade(DataUpgrade),
/// A range described by start/length/drop; see `Have`.
Have(Have),
}
/// Generates `impl From<Payload> for Target`, wrapping the payload in the
/// variant of `Target` that has the same name as the payload type.
///
/// The second argument is used both as the payload type and as the variant
/// name, so the enum must have a variant named exactly like the wrapped type.
macro_rules! impl_from_for_enum_variant {
    ($target:ident, $payload:ident) => {
        impl From<$payload> for $target {
            fn from(inner: $payload) -> Self {
                Self::$payload(inner)
            }
        }
    };
}
// Generate `From<Get>`, `From<DataUpgrade>` and `From<Have>` for `Event`, so
// each message struct satisfies the `Into<Event>` bound on `Events::send`.
impl_from_for_enum_variant!(Event, Get);
impl_from_for_enum_variant!(Event, DataUpgrade);
impl_from_for_enum_variant!(Event, Have);
/// Owner of the broadcast channel over which `Event`s are published.
#[derive(Debug)]
pub(crate) struct Events {
/// Sending half of the event channel; `send` publishes through it.
pub(crate) channel: Sender<Event>,
/// Deactivated receiver created in `new`; it is never read from in this file.
/// NOTE(review): per the `async_broadcast` docs an `InactiveReceiver` exists
/// to keep the channel open while no active receiver is subscribed, which is
/// presumably why it is stored here; confirm.
_receiver: InactiveReceiver<Event>,
}
impl Events {
    /// Creates the event channel with room for `MAX_EVENT_QUEUE_CAPACITY`
    /// queued events.
    ///
    /// The sender has `await_active` switched off and the channel has overflow
    /// switched on. The initial receiver is deactivated and stored rather than
    /// dropped.
    pub(crate) fn new() -> Self {
        let (mut sender, initial_rx) = broadcast(MAX_EVENT_QUEUE_CAPACITY);
        sender.set_await_active(false);

        // NOTE(review): per the `async_broadcast` docs, overflow mode makes a
        // full channel discard its oldest message instead of rejecting the
        // new one; confirm this is the intended back-pressure policy.
        let mut parked_rx = initial_rx.deactivate();
        parked_rx.set_overflow(true);

        Self {
            channel: sender,
            _receiver: parked_rx,
        }
    }

    /// Converts `evt` into an `Event` and attempts a non-blocking broadcast.
    ///
    /// # Errors
    ///
    /// As written this never fails: the outcome of the broadcast attempt is
    /// discarded and `Ok(())` is always returned.
    pub(crate) fn send<T: Into<Event>>(&self, evt: T) -> Result<(), HypercoreError> {
        let event: Event = evt.into();
        // Best-effort delivery: the broadcast attempt errors when no
        // replicators are subscribed, which is not a failure for the caller.
        let _ = self.channel.try_broadcast(event);
        Ok(())
    }

    /// Publishes a `Get` event for `index` and returns the receiving half of
    /// a new capacity-1 channel whose sender travels inside that event.
    pub(crate) fn send_on_get(&self, index: u64) -> Receiver<()> {
        let (mut reply_tx, reply_rx) = broadcast(1);
        reply_tx.set_await_active(false);

        let request = Get {
            index,
            get_result: reply_tx,
        };
        // `send` always returns `Ok(())`, so there is nothing to propagate.
        let _ = self.send(request);

        reply_rx
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::replication::CoreMethodsError;

    /// Checks which events a subscriber observes for one `append` followed by
    /// one `get` of an index that yields `None`.
    #[async_std::test]
    async fn test_events() -> Result<(), CoreMethodsError> {
        let mut core = crate::core::tests::create_hypercore_with_data(0).await?;
        // Subscribe before doing any work so no event can be missed.
        let mut rx = core.event_subscribe();

        // Background task: gather exactly two events, then hand the receiver back.
        // NOTE(review): `Err` results from `recv` are skipped and retried; if the
        // channel were ever closed this loop would presumably never finish.
        let append_collector = async_std::task::spawn(async move {
            let mut seen = vec![];
            while seen.len() < 2 {
                if let Ok(evt) = rx.recv().await {
                    seen.push(evt);
                }
            }
            (seen, rx)
        });

        core.append(b"foo").await?;
        let (append_events, mut rx) = append_collector.await;

        // An append is expected to produce `DataUpgrade` followed by a `Have`
        // covering the single new entry.
        assert!(matches!(append_events[0], Event::DataUpgrade(_)));
        assert!(matches!(
            append_events[1],
            Event::Have(Have {
                start: 0,
                length: 1,
                drop: false
            })
        ));
        // Nothing else should be queued.
        assert!(rx.is_empty());

        // Background task: gather exactly one event for the upcoming `get`.
        let get_collector = async_std::task::spawn(async move {
            let mut seen = vec![];
            while seen.len() < 1 {
                if let Ok(evt) = rx.recv().await {
                    seen.push(evt);
                }
            }
            (seen, rx)
        });

        // Index 1 yields `None` here, yet the request is still announced.
        assert_eq!(core.get(1).await?, None);
        let (get_events, rx) = get_collector.await;

        assert!(matches!(
            get_events[0],
            Event::Get(Get {
                index: 1,
                get_result: _
            })
        ));
        assert!(rx.is_empty());

        Ok(())
    }
}