aurora-db 0.5.7

A lightweight, real-time embedded database with built-in PubSub, reactive queries, background workers, and intelligent caching.
Documentation
# Aurora PubSub Module

Real-time event streaming for Aurora DB. This module provides a publish/subscribe system that notifies listeners whenever documents are inserted, updated, or deleted.

## Overview

- **`PubSubSystem`** — central hub; holds per-collection broadcast channels and a global catch-all channel
- **`ChangeChannel`** — thin wrapper around `tokio::sync::broadcast`
- **`ChangeEvent`** / **`ChangeType`** — event payload (Insert / Update / Delete)
- **`EventFilter`** — composable predicate tree applied on the listener side
- **`ChangeListener`** / **`ChangeStream`** — consumer types that drive async iteration

## Source: `channel.rs`

```rust
use super::events::ChangeEvent;
use tokio::sync::broadcast;

/// Wrapper around broadcast channel for change events
pub struct ChangeChannel {
    sender: broadcast::Sender<ChangeEvent>,
}

impl ChangeChannel {
    pub fn new(buffer_size: usize) -> Self {
        let (sender, _) = broadcast::channel(buffer_size);
        Self { sender }
    }

    pub fn publish(&self, event: ChangeEvent) -> Result<(), broadcast::error::SendError<ChangeEvent>> {
        self.sender.send(event).map(|_| ())
    }

    pub fn subscribe(&self) -> broadcast::Receiver<ChangeEvent> {
        self.sender.subscribe()
    }

    pub fn receiver_count(&self) -> usize {
        self.sender.receiver_count()
    }
}
```

## Source: `events.rs`

```rust
/// Type of change that occurred
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangeType { Insert, Update, Delete }

/// Change event containing information about a database modification
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChangeEvent {
    pub collection: String,
    pub change_type: ChangeType,
    pub id: String,
    pub document: Option<Document>,
    pub old_document: Option<Document>,
}

/// Composable filter applied on the listener side — zero cost when unused
#[derive(Debug, Clone)]
pub enum EventFilter {
    All,
    ChangeType(ChangeType),
    HasField(String),
    FieldEquals(String, Value),
    FieldChanged(String),
    And(Vec<EventFilter>),
    Or(Vec<EventFilter>),
    Not(Box<EventFilter>),
    Gt(String, Value), Gte(String, Value),
    Lt(String, Value), Lte(String, Value),
    Ne(String, Value),
    In(String, Value), NotIn(String, Value),
    Contains(String, Value),
    StartsWith(String, Value),
    EndsWith(String, Value),
    IsNull(String), IsNotNull(String),
    /// Regex match — stores a *pre-compiled* `regex::Regex` to avoid per-event recompilation
    Matches(String, regex::Regex),
}
```

## Source: `listener.rs`

```rust
pub struct ChangeListener {
    collection: String,
    receiver: broadcast::Receiver<ChangeEvent>,
    filter: Option<EventFilter>,
}

impl ChangeListener {
    /// Add a filter — only events matching the predicate are delivered
    pub fn filter(mut self, filter: EventFilter) -> Self { ... }

    /// Async receive; skips non-matching events transparently
    pub async fn recv(&mut self) -> Result<ChangeEvent, broadcast::error::RecvError> { ... }

    /// Non-blocking receive
    pub fn try_recv(&mut self) -> Result<ChangeEvent, broadcast::error::TryRecvError> { ... }

    /// Convert into a stream for async-for iteration
    pub fn into_stream(self) -> ChangeStream { ... }
}
```

## Source: `mod.rs`

```rust
pub struct PubSubSystem {
    channels: Arc<DashMap<String, broadcast::Sender<ChangeEvent>>>,
    global_channel: broadcast::Sender<ChangeEvent>,
    buffer_size: usize,
}

impl PubSubSystem {
    pub fn new(buffer_size: usize) -> Self { ... }

    /// Publish to the collection channel + global channel
    pub fn publish(&self, event: ChangeEvent) -> Result<()> { ... }

    /// Subscribe to a specific collection
    pub fn listen(&self, collection: impl Into<String>) -> ChangeListener { ... }

    /// Subscribe to all collections
    pub fn listen_all(&self) -> ChangeListener { ... }
}
```

## Aurora entry points

Users never construct `PubSubSystem` directly — it lives inside `Aurora` and is exposed through three methods:

### `db.stream(aql)` — AQL subscription (async)

Subscribe via an AQL `subscription { ... }` query. Returns a `ChangeListener` filtered to the collection and fields named in the query.

```rust
let mut listener = db.stream(r#"
    subscription {
        products(where: { active: { eq: true } }) {
            id
            name
        }
    }
"#).await?;

while let Ok(event) = listener.recv().await {
    println!("Change: {:?} on {}", event.change_type, event._sid);
}
```

### `db.listen(collection)` — Fluent API

Subscribe to a single collection. Chain `.filter()` to narrow events.

```rust
use aurora_db::pubsub::{EventFilter, ChangeType};

// All changes to "orders"
let mut listener = db.listen("orders");

// Only inserts where amount > 1000
let mut listener = db.listen("orders")
    .filter(EventFilter::And(vec![
        EventFilter::ChangeType(ChangeType::Insert),
        EventFilter::Gt("amount".into(), Value::Float(1000.0)),
    ]));

tokio::spawn(async move {
    while let Ok(event) = listener.recv().await {
        println!("[{}] {:?}", event._sid, event.change_type);
    }
});
```

### `db.listen_all()` — Global listener

Receives every change across all collections. Useful for audit trails, replication, and CDC.

```rust
let mut listener = db.listen_all();

tokio::spawn(async move {
    while let Ok(event) = listener.recv().await {
        println!("Change in {}: {:?}", event.collection, event.change_type);
    }
});
```

## Internal usage

```rust
let pubsub = PubSubSystem::new(256);

// Filtered listener — only active-user inserts
let mut listener = pubsub
    .listen("users")
    .filter(EventFilter::And(vec![
        EventFilter::ChangeType(ChangeType::Insert),
        EventFilter::FieldEquals("active".into(), Value::Bool(true)),
    ]));

pubsub.publish(ChangeEvent::insert("users", "u1", doc))?;

let event = listener.recv().await?;
println!("Got: {:?}", event.change_type);
```