aegis-streaming
Real-time streaming engine for the Aegis Database Platform.
Overview
aegis-streaming provides pub/sub messaging, change data capture (CDC), event sourcing, and stream processing capabilities. It enables real-time data flows and reactive applications.
Features
- Pub/Sub Messaging - Topic-based message publishing and subscription
- Change Data Capture - Track all data changes in real time
- Event Sourcing - Store events as the source of truth
- Stream Processing - Transform and aggregate streaming data
- Persistent Subscriptions - Durable message delivery
Architecture
┌─────────────────────────────────────────────────┐
│                Streaming Engine                 │
├─────────────────────────────────────────────────┤
│                 Channel Manager                 │
│  ┌──────────┬──────────────┬─────────────────┐  │
│  │  Topics  │  Partitions  │    Consumer     │  │
│  │          │              │     Groups      │  │
│  └──────────┴──────────────┴─────────────────┘  │
├─────────────────────────────────────────────────┤
│                   CDC Engine                    │
│  ┌──────────┬──────────────┬─────────────────┐  │
│  │   WAL    │    Change    │  Subscription   │  │
│  │  Reader  │   Tracker    │     Manager     │  │
│  └──────────┴──────────────┴─────────────────┘  │
├─────────────────────────────────────────────────┤
│            Event Store (Append-Only)            │
└─────────────────────────────────────────────────┘
Modules
| Module | Description |
|---|---|
| `engine` | Main streaming engine |
| `channel` | Pub/sub channel management |
| `stream` | Stream abstraction |
| `subscriber` | Subscription handling |
| `cdc` | Change data capture |
| `event` | Event definitions |
Usage
[]
= { = "../aegis-streaming" }
Pub/Sub Messaging
use ;
let engine = new?;
// Create a topic
engine.create_topic?;
// Publish messages
engine.publish.await?;
// Subscribe to messages
let mut subscriber = engine.subscribe.await?;
while let Some = subscriber.next.await
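
To make the topic-based pub/sub model concrete, here is a minimal in-memory sketch built only on the standard library. It is not the aegis-streaming API: the `Broker` type and its method signatures are hypothetical, and it is synchronous, whereas the engine above is async. It only illustrates the fan-out behaviour, where every subscriber of a topic receives its own copy of each message.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical in-memory broker; not the aegis-streaming API.
#[derive(Default)]
pub struct Broker {
    // One list of subscriber senders per topic name.
    topics: HashMap<String, Vec<Sender<String>>>,
}

impl Broker {
    /// Registers a topic. Returns false if it already exists.
    pub fn create_topic(&mut self, name: &str) -> bool {
        if self.topics.contains_key(name) {
            return false;
        }
        self.topics.insert(name.to_string(), Vec::new());
        true
    }

    /// Subscribes to a topic, or returns None if the topic is unknown.
    pub fn subscribe(&mut self, topic: &str) -> Option<Receiver<String>> {
        let subscribers = self.topics.get_mut(topic)?;
        let (tx, rx) = channel();
        subscribers.push(tx);
        Some(rx)
    }

    /// Sends a copy of the message to every live subscriber and returns
    /// how many received it. Disconnected subscribers are removed.
    pub fn publish(&mut self, topic: &str, message: &str) -> usize {
        match self.topics.get_mut(topic) {
            Some(subscribers) => {
                subscribers.retain(|tx| tx.send(message.to_string()).is_ok());
                subscribers.len()
            }
            None => 0,
        }
    }
}
```

A subscriber that has gone away is detected lazily on the next publish, which keeps the sketch short; a real engine would more likely track subscriptions explicitly.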
Change Data Capture (CDC)
use ;
let cdc = new?;
// Subscribe to changes on a table
let mut changes = cdc.subscribe.await?;
while let Some = changes.next.await
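
As an illustration of what a CDC consumer typically does with a change feed, the sketch below replays row-level changes onto a local replica. The `Change` enum and its fields are assumptions made for this example; the actual change type emitted by the `cdc` module is not shown in this README.

```rust
use std::collections::HashMap;

/// Hypothetical row-level change record; variant and field names are assumptions.
#[derive(Debug, Clone, PartialEq)]
pub enum Change {
    Insert { key: u64, value: String },
    Update { key: u64, value: String },
    Delete { key: u64 },
}

/// Applies an ordered change feed to a local replica of the table.
/// Inserts and updates overwrite the row; deletes remove it.
pub fn apply_changes(replica: &mut HashMap<u64, String>, changes: &[Change]) {
    for change in changes {
        match change {
            Change::Insert { key, value } | Change::Update { key, value } => {
                replica.insert(*key, value.clone());
            }
            Change::Delete { key } => {
                replica.remove(key);
            }
        }
    }
}
```

Because every operation here is an overwrite or a removal, replaying the same feed twice leaves the replica unchanged, which is convenient when changes may be redelivered.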
Event Sourcing
use ;
let store = new?;
// Append events
store.append.await?;
// Load aggregate from events
let events = store.load.await?;
let account = from_events;
println!;
// Subscribe to events
let mut stream = store.subscribe.await?;
while let Some = stream.next.await
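
The "load aggregate from events" step above can be sketched as a fold over the event history. The `AccountEvent` variants and the `Account` fields below are hypothetical, since the real event definitions are not shown here; only the name `from_events` is taken from the example above.

```rust
/// Hypothetical account events; the real event types are not shown above.
#[derive(Debug, Clone, PartialEq)]
pub enum AccountEvent {
    Opened { owner: String },
    Deposited { amount: i64 },
    Withdrawn { amount: i64 },
}

/// Aggregate state derived purely from events.
#[derive(Debug, Default, PartialEq)]
pub struct Account {
    pub owner: String,
    pub balance: i64,
}

impl Account {
    /// Applies a single event to the current state and returns the new state.
    fn apply(mut self, event: &AccountEvent) -> Self {
        match event {
            AccountEvent::Opened { owner } => self.owner = owner.clone(),
            AccountEvent::Deposited { amount } => self.balance += *amount,
            AccountEvent::Withdrawn { amount } => self.balance -= *amount,
        }
        self
    }

    /// Rebuilds the aggregate by folding over its event history in order.
    pub fn from_events(events: &[AccountEvent]) -> Self {
        events
            .iter()
            .fold(Account::default(), |account, event| account.apply(event))
    }
}
```

Keeping `apply` free of side effects means the same function can serve both replay from the store and live updates from a subscription.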
Stream Processing
use ;
let processor = new;
// Define processing pipeline
processor
.source
.filter
.map
.window
.aggregate
.sink
.start
.await?;
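
The pipeline stages above (filter, map, window, aggregate, sink) can be illustrated on an in-memory batch with plain iterators. This is a sketch under assumptions, not the processor's API: the `Reading` type, the tumbling-window choice, and the sum aggregate are all hypothetical stand-ins for whatever the real pipeline is configured with.

```rust
use std::collections::BTreeMap;

/// Hypothetical reading type: (timestamp in seconds, value).
pub type Reading = (u64, f64);

/// Drops negative values, scales the rest, groups them into tumbling
/// windows of `window_secs`, and sums each window.
/// Returns (window start, sum) pairs ordered by window start.
pub fn windowed_sums(input: &[Reading], window_secs: u64, scale: f64) -> Vec<(u64, f64)> {
    assert!(window_secs > 0, "window size must be positive");
    let mut windows: BTreeMap<u64, f64> = BTreeMap::new();
    input
        .iter()
        // filter: keep non-negative readings only
        .filter(|(_, value)| *value >= 0.0)
        // map: apply the scale factor
        .map(|(ts, value)| (*ts, *value * scale))
        // window + aggregate: bucket by window start and accumulate a sum
        .for_each(|(ts, value)| {
            let start = ts - ts % window_secs;
            *windows.entry(start).or_insert(0.0) += value;
        });
    // sink: hand the ordered results back to the caller
    windows.into_iter().collect()
}
```

A streaming implementation would emit each window as it closes rather than collecting everything first, but the per-record logic is the same.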
Consumer Groups
// Multiple consumers in a group share the load
let consumer1 = engine.subscribe.await?;
let consumer2 = engine.subscribe.await?;
// Each message is delivered to only one consumer in the group
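
The load-sharing rule can be sketched with a round-robin dispatcher. How aegis-streaming actually assigns messages within a group (round-robin, by partition, or otherwise) is not stated here, so the strategy and the `ConsumerGroup` type below are assumptions chosen for illustration.

```rust
/// Hypothetical consumer group that assigns each message to exactly one member.
pub struct ConsumerGroup {
    // One inbox per consumer in the group.
    inboxes: Vec<Vec<String>>,
    // Index of the consumer that receives the next message.
    next: usize,
}

impl ConsumerGroup {
    /// Creates a group with the given number of consumers (must be non-zero).
    pub fn new(consumers: usize) -> Self {
        assert!(consumers > 0, "a group needs at least one consumer");
        Self { inboxes: vec![Vec::new(); consumers], next: 0 }
    }

    /// Delivers the message to one consumer, chosen round-robin,
    /// and returns that consumer's index.
    pub fn deliver(&mut self, message: &str) -> usize {
        let index = self.next;
        self.inboxes[index].push(message.to_string());
        self.next = (self.next + 1) % self.inboxes.len();
        index
    }

    /// Returns the messages delivered to one consumer so far.
    pub fn inbox(&self, consumer: usize) -> &[String] {
        &self.inboxes[consumer]
    }
}
```

With two consumers, three messages land as two on the first and one on the second, and no message appears in both inboxes.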
Message Delivery Guarantees
| Mode | Description |
|---|---|
| `AtMostOnce` | Fire and forget, may lose messages |
| `AtLeastOnce` | Guaranteed delivery, may duplicate |
| `ExactlyOnce` | Guaranteed exactly-once (with transactions) |
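
Since `AtLeastOnce` may deliver a message more than once, consumers usually need to be idempotent. One common approach, sketched below, is to remember the IDs already processed and skip repeats. The `IdempotentConsumer` type and the assumption that each message carries a unique numeric ID are hypothetical; the README does not describe the message envelope.

```rust
use std::collections::HashSet;

/// Hypothetical consumer that tolerates redelivery by remembering
/// which message IDs it has already processed.
#[derive(Default)]
pub struct IdempotentConsumer {
    seen: HashSet<u64>,
    /// Running total of all amounts processed exactly once.
    pub total: i64,
}

impl IdempotentConsumer {
    /// Processes a message once. Returns false if the ID was seen before,
    /// in which case the message is ignored.
    pub fn handle(&mut self, message_id: u64, amount: i64) -> bool {
        // `insert` returns false when the ID is already in the set.
        if !self.seen.insert(message_id) {
            return false;
        }
        self.total += amount;
        true
    }
}
```

The set grows without bound in this sketch; a long-running consumer would need to expire old IDs or persist a high-water mark instead.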
Configuration
[]
= "1MB"
= "7d"
[]
= 500
= false
= "30s"
[]
= true
= 10000
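
The configuration above uses suffixed values such as "7d" and "30s". How the crate parses them is not documented here; the following is only a sketch of one way to turn such a string into a `std::time::Duration`. The `parse_duration` helper and the set of accepted suffixes (`s`, `m`, `h`, `d`) are assumptions.

```rust
use std::time::Duration;

/// Hypothetical helper: parses strings like "30s", "5m", "2h" or "7d".
/// Returns None for empty input, unknown suffixes, or non-numeric values.
pub fn parse_duration(text: &str) -> Option<Duration> {
    let text = text.trim();
    // Split off the final byte as the unit suffix.
    let split = text.len().checked_sub(1)?;
    if !text.is_char_boundary(split) {
        return None;
    }
    let (number, unit) = text.split_at(split);
    let value: u64 = number.parse().ok()?;
    let seconds_per_unit: u64 = match unit {
        "s" => 1,
        "m" => 60,
        "h" => 3_600,
        "d" => 86_400,
        _ => return None,
    };
    // Guard against overflow for very large values.
    value.checked_mul(seconds_per_unit).map(Duration::from_secs)
}
```

Size values such as "1MB" would need a separate parser; this helper rejects them rather than guessing.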
Tests
Test count: 31
License
Apache-2.0