#![feature(doc_cfg)]
mod alertable;
mod event;
mod publisher;
mod ring_buffer;
mod sequence;
mod subscriber;
mod wait_strategy;
#[cfg(feature = "async")]
#[doc(cfg(feature = "async"))]
mod futures;
#[cfg(feature = "async")]
#[doc(cfg(feature = "async"))]
pub use crate::futures::{AsyncPublisher, AsyncSubscriber, PublishError};
#[cfg(feature = "async")]
#[doc(cfg(feature = "async"))]
pub use ::futures::{SinkExt, StreamExt};
pub use event::EventRead;
pub use publisher::Publisher;
pub use subscriber::Subscriber;
pub use wait_strategy::WaitStrategy;
use crate::ring_buffer::RingBuffer;
use crate::sequence::Sequence;
use std::sync::Arc;
#[derive(Clone)]
pub struct Eventador {
ring: Arc<RingBuffer>,
}
impl Eventador {
pub fn new(capacity: u64) -> anyhow::Result<Self> {
Ok(Self {
ring: Arc::new(RingBuffer::new(capacity, WaitStrategy::AllSubscribers)?),
})
}
pub fn with_strategy(capacity: u64, wait_strategy: WaitStrategy) -> anyhow::Result<Self> {
Ok(Self {
ring: Arc::new(RingBuffer::new(capacity, wait_strategy)?),
})
}
pub fn publish<T: 'static + Send + Sync>(&self, message: T) {
let sequence = self.ring.next();
if let Some(event_store) = self.ring.get_envelope(sequence).clone() {
event_store.overwrite::<T>(sequence, message);
}
}
pub fn publisher(&self) -> Publisher {
Publisher::new(self.ring.clone())
}
pub fn subscribe<T: 'static + Send>(&self) -> Subscriber<T> {
let sequence = Arc::new(Sequence::with_value(self.ring.sequencer().get() + 1));
self.ring
.sequencer()
.register_gating_sequence(sequence.clone());
Subscriber::new(self.ring.clone(), sequence)
}
#[cfg(feature = "async")]
#[doc(cfg(feature = "async"))]
pub fn async_publisher<T: 'static + Send + Sync + Unpin>(
&self,
buffer_size: usize,
) -> AsyncPublisher<T> {
AsyncPublisher::new(self.ring.clone(), buffer_size)
}
#[cfg(feature = "async")]
#[doc(cfg(feature = "async"))]
pub fn async_subscriber<T: Send + Unpin>(&self) -> AsyncSubscriber<T> {
let sequence = Arc::new(Sequence::with_value(self.ring.sequencer().get() + 1));
self.ring
.sequencer()
.register_gating_sequence(sequence.clone());
AsyncSubscriber::new(self.ring.clone(), sequence)
}
}
impl From<RingBuffer> for Eventador {
fn from(ring: RingBuffer) -> Self {
Self {
ring: Arc::new(ring),
}
}
}
impl From<Arc<RingBuffer>> for Eventador {
fn from(ring: Arc<RingBuffer>) -> Self {
Self { ring }
}
}
#[cfg(test)]
mod tests {
#[cfg(feature = "async")]
use crate::futures::publisher::{AsyncPublisher, PublishError};
#[cfg(feature = "async")]
use futures::{
future::{AbortHandle, Abortable},
SinkExt, StreamExt,
};
#[cfg(feature = "async")]
use async_channel::unbounded;
#[cfg(feature = "async")]
use ntest::timeout;
use crate::Eventador;
#[test]
fn publish_and_subscribe() {
let res = Eventador::new(2);
assert!(res.is_ok());
let eventbus: Eventador = res.unwrap();
let subscriber = eventbus.subscribe::<usize>();
assert_eq!(1, subscriber.sequence());
let mut i: usize = 1234;
eventbus.publish(i);
let mut msg = subscriber.recv();
assert_eq!(i, *msg);
i += 1111;
let eventbus2 = eventbus.clone();
std::thread::spawn(move || {
std::thread::sleep(std::time::Duration::from_secs(1));
eventbus2.publish(i);
});
msg = subscriber.recv();
assert_eq!(i, *msg);
}
#[async_std::test]
#[timeout(5000)]
#[cfg(feature = "async")]
async fn async_publish() {
println!("Starting test!");
let res = Eventador::new(4);
assert!(res.is_ok());
let eventbus: Eventador = res.unwrap();
let mut subscriber = eventbus.async_subscriber::<usize>();
let mut publisher: AsyncPublisher<usize> = eventbus.async_publisher(4);
let (sender, mut receiver) = unbounded::<Result<usize, PublishError>>();
let mut i: usize = 1234;
let mut sent = sender.send(Ok(i)).await;
assert!(sent.is_ok());
let (handle, reg) = AbortHandle::new_pair();
async_std::task::spawn(Abortable::new(
async move {
publisher.send_all(&mut receiver).await.unwrap();
},
reg,
));
let mut msg = subscriber.next().await.unwrap();
assert_eq!(i, *msg);
println!("Passed part 1!");
i += 1111;
let eventbus2 = eventbus.clone();
async_std::task::spawn(async move {
async_std::task::sleep(std::time::Duration::from_secs(1)).await;
eventbus2.publish(i);
});
msg = subscriber.next().await.unwrap();
assert_eq!(i, *msg);
println!("Passed part 2!");
i += 1111;
sent = sender.send(Ok(i)).await;
assert!(sent.is_ok());
msg = subscriber.next().await.unwrap();
assert_eq!(i, *msg);
println!("Passed part 3! Done.");
handle.abort();
}
#[derive(Debug, Eq, PartialEq)]
enum TestEnum {
SampleA,
}
#[test]
fn enum_specific_subscription() {
let res = Eventador::new(4);
assert!(res.is_ok());
println!("Passed part 1!");
let eventbus: Eventador = res.unwrap();
let subscriber = eventbus.subscribe::<TestEnum>();
assert_eq!(1, subscriber.sequence());
println!("Passed part 2!");
eventbus.publish(TestEnum::SampleA);
let msg = subscriber.recv();
assert_eq!(TestEnum::SampleA, *msg);
println!("Passed part 3! Done.");
}
}