mod error;
mod pubsub;
pub mod remote_consumer;
mod subscriber;
mod types;
pub use self::error::Error;
pub use self::pubsub::Pubsub;
pub use self::subscriber::{Subscriber, SubscriptionRequest};
pub use self::types::*;
#[cfg(test)]
mod test {
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use serde::{Deserialize, Serialize};
use super::subscriber::SubscriptionRequest;
use super::{Error, Event, Pubsub, Spec, Subscriber};
#[derive(Clone, Debug, Serialize, Eq, PartialEq, Deserialize)]
pub struct Message {
pub foo: u64,
pub bar: u64,
}
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Deserialize, Serialize)]
pub enum IndexTest {
Foo(u64),
Bar(u64),
}
impl Event for Message {
type Topic = IndexTest;
fn get_topics(&self) -> Vec<Self::Topic> {
vec![IndexTest::Foo(self.foo), IndexTest::Bar(self.bar)]
}
}
pub struct CustomPubSub {
pub storage: Arc<RwLock<HashMap<IndexTest, Message>>>,
}
#[async_trait::async_trait]
impl Spec for CustomPubSub {
type Topic = IndexTest;
type Event = Message;
type SubscriptionId = String;
type Context = ();
fn new_instance(_context: Self::Context) -> Arc<Self>
where
Self: Sized,
{
Arc::new(Self {
storage: Default::default(),
})
}
async fn fetch_events(
self: &Arc<Self>,
topics: Vec<<Self::Event as Event>::Topic>,
reply_to: Subscriber<Self>,
) where
Self: Sized,
{
let storage = self.storage.read().unwrap();
for index in topics {
if let Some(value) = storage.get(&index) {
let _ = reply_to.send(value.clone());
}
}
}
}
#[derive(Debug, Clone)]
pub enum SubscriptionReq {
Foo(u64),
Bar(u64),
}
impl SubscriptionRequest for SubscriptionReq {
type Topic = IndexTest;
type SubscriptionId = String;
fn try_get_topics(&self) -> Result<Vec<Self::Topic>, Error> {
Ok(vec![match self {
SubscriptionReq::Bar(n) => IndexTest::Bar(*n),
SubscriptionReq::Foo(n) => IndexTest::Foo(*n),
}])
}
fn subscription_name(&self) -> Arc<Self::SubscriptionId> {
Arc::new("test".to_owned())
}
}
#[tokio::test]
async fn delivery_twice_realtime() {
let pubsub = Pubsub::new(CustomPubSub::new_instance(()));
assert_eq!(pubsub.active_subscribers(), 0);
let mut subscriber = pubsub.subscribe(SubscriptionReq::Foo(2)).unwrap();
assert_eq!(pubsub.active_subscribers(), 1);
let _ = pubsub.publish_now(Message { foo: 2, bar: 1 });
let _ = pubsub.publish_now(Message { foo: 2, bar: 2 });
assert_eq!(subscriber.recv().await.map(|x| x.bar), Some(1));
assert_eq!(subscriber.recv().await.map(|x| x.bar), Some(2));
assert!(subscriber.try_recv().is_none());
drop(subscriber);
assert_eq!(pubsub.active_subscribers(), 0);
}
#[tokio::test]
async fn read_from_storage() {
let x = CustomPubSub::new_instance(());
let storage = x.storage.clone();
let pubsub = Pubsub::new(x);
{
let mut s = storage.write().unwrap();
s.insert(IndexTest::Bar(2), Message { foo: 3, bar: 2 });
}
let mut subscriber = pubsub.subscribe(SubscriptionReq::Bar(2)).unwrap();
assert_eq!(subscriber.recv().await.map(|x| x.foo), Some(3));
let _ = pubsub.publish_now(Message { foo: 1, bar: 2 });
assert_eq!(subscriber.recv().await.map(|x| x.foo), Some(1));
{
let mut s = storage.write().unwrap();
s.insert(IndexTest::Bar(2), Message { foo: 1, bar: 2 });
}
let mut y = pubsub.subscribe(SubscriptionReq::Bar(2)).unwrap();
assert_eq!(y.recv().await.map(|x| x.foo), Some(1));
}
}