aurora-db 0.5.7

A lightweight, real-time embedded database with built-in PubSub, reactive queries, background workers, and intelligent caching.
Documentation
pub mod events;

pub mod channel;
pub mod listener;

pub use channel::ChangeChannel;
pub use events::{ChangeEvent, ChangeType, EventFilter};
pub use listener::ChangeListener;

use crate::error::Result;
use dashmap::DashMap;
use std::sync::Arc;
use tokio::sync::broadcast;

pub struct PubSubSystem {
    channels: Arc<DashMap<String, broadcast::Sender<ChangeEvent>>>,
    global_channel: broadcast::Sender<ChangeEvent>,
    buffer_size: usize,
}

impl PubSubSystem {
    pub fn new(buffer_size: usize) -> Self {
        let (global_tx, _) = broadcast::channel(buffer_size);

        Self {
            channels: Arc::new(DashMap::new()),
            global_channel: global_tx,
            buffer_size,
        }
    }

    pub fn publish(&self, event: ChangeEvent) -> Result<()> {
        if let Some(tx) = self.channels.get(&event.collection) {
            let _ = tx.send(event.clone());
        }

        let _ = self.global_channel.send(event);

        Ok(())
    }

    pub fn listen(&self, collection: impl Into<String>) -> ChangeListener {
        let collection = collection.into();

        if !self.channels.contains_key(&collection) {
            self.channels
                .retain(|_, sender| sender.receiver_count() > 0);
        }

        let tx = self
            .channels
            .entry(collection.clone())
            .or_insert_with(|| broadcast::channel(self.buffer_size).0)
            .clone();

        ChangeListener::new(collection, tx.subscribe())
    }

    pub fn listen_all(&self) -> ChangeListener {
        ChangeListener::new("*".to_string(), self.global_channel.subscribe())
    }

    pub fn listener_count(&self, collection: &str) -> usize {
        self.channels
            .get(collection)
            .map(|tx| tx.receiver_count())
            .unwrap_or(0)
    }

    pub fn total_listeners(&self) -> usize {
        self.channels
            .iter()
            .map(|entry| entry.value().receiver_count())
            .sum::<usize>()
            + self.global_channel.receiver_count()
    }
}

impl Clone for PubSubSystem {
    fn clone(&self) -> Self {
        Self {
            channels: Arc::clone(&self.channels),
            global_channel: self.global_channel.clone(),
            buffer_size: self.buffer_size,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{Document, Value};
    use std::collections::HashMap;

    #[tokio::test]
    async fn test_pubsub_basic() {
        let pubsub = PubSubSystem::new(100);

        let mut listener = pubsub.listen("users");

        let mut data = HashMap::new();
        data.insert("id".to_string(), Value::String("123".into()));
        data.insert("name".to_string(), Value::String("Alice".into()));

        let event = ChangeEvent::insert("users", "123", Document {
            _sid: "123".to_string(),
            data,
        });

        pubsub.publish(event.clone()).unwrap();

        let received = listener.recv().await.unwrap();
        assert_eq!(received.collection, "users");
        assert_eq!(received._sid, "123");
        assert!(matches!(received.change_type, ChangeType::Insert));
    }

    #[tokio::test]
    async fn test_pubsub_multiple_listeners() {
        let pubsub = PubSubSystem::new(100);

        let mut listener1 = pubsub.listen("users");
        let mut listener2 = pubsub.listen("users");

        assert_eq!(pubsub.listener_count("users"), 2);

        let event = ChangeEvent::delete("users", "123");

        pubsub.publish(event).unwrap();

        assert!(listener1.recv().await.is_ok());
        assert!(listener2.recv().await.is_ok());
    }

    #[tokio::test]
    async fn test_pubsub_global_listener() {
        let pubsub = PubSubSystem::new(100);

        let mut global_listener = pubsub.listen_all();

        pubsub.publish(ChangeEvent::delete("users", "1")).unwrap();
        pubsub.publish(ChangeEvent::delete("posts", "2")).unwrap();

        let event1 = global_listener.recv().await.unwrap();
        let event2 = global_listener.recv().await.unwrap();

        assert_eq!(event1.collection, "users");
        assert_eq!(event2.collection, "posts");
    }
}