use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use heed::{Env, EnvOpenOptions};
use crate::config::VarveConfig;
use crate::constants;
use crate::error::Result;
use crate::global::GlobalReader;
use crate::stream::{Stream, StreamCore};
use crate::types::{GlobalEventsDb, GlobalSequence, StreamIndexDb, StreamMetaDb};
#[cfg(feature = "notify")]
use crate::notify::WriteWatcher;
/// Handle pair for one named stream's backing databases.
///
/// Cached in `Varve::stream_dbs` so a stream's databases are created/opened
/// at most once per `Varve` instance.
struct StreamDbs {
    /// The stream's index database (see `StreamIndexDb`).
    index_db: StreamIndexDb,
    /// The stream's metadata database (see `StreamMetaDb`).
    meta_db: StreamMetaDb,
}
/// Top-level event-store handle.
///
/// Owns the LMDB environment, the single global events database, the shared
/// next-global-sequence counter, and a cache of per-stream database handles.
pub struct Varve {
    /// LMDB environment; cloned into `StreamCore`s and `GlobalReader`s.
    env: Env,
    /// Database holding events across all streams, keyed by global sequence
    /// (its last key is used on open to recover `next_global_seq`).
    global_db: GlobalEventsDb,
    /// Next global sequence number to assign; shared with every stream so
    /// global sequence numbers are totally ordered across streams.
    next_global_seq: Arc<AtomicU64>,
    /// Already-opened per-stream database handles, keyed by stream name.
    stream_dbs: HashMap<String, StreamDbs>,
    /// Write-notification hub (only with the `notify` feature).
    #[cfg(feature = "notify")]
    watcher: WriteWatcher,
}
impl Varve {
    /// Opens (or creates) a Varve store at `path` with the default configuration.
    ///
    /// # Errors
    /// Returns any error from opening the environment or creating the global
    /// events database.
    pub fn new(path: impl AsRef<Path>) -> Result<Self> {
        Self::with_config(path, VarveConfig::default())
    }

    /// Opens (or creates) a Varve store at `path` with an explicit `config`.
    ///
    /// The next global sequence number is recovered from the last key in the
    /// global events database, so appends resume where a previous process
    /// left off.
    ///
    /// # Errors
    /// Returns any error from the environment open, database creation, or the
    /// initial read transaction.
    pub fn with_config(path: impl AsRef<Path>, config: VarveConfig) -> Result<Self> {
        // SAFETY: `EnvOpenOptions::open` is unsafe per heed's contract; the
        // caller must not open the same environment path in incompatible ways
        // concurrently.
        let env = unsafe {
            EnvOpenOptions::new()
                .read_txn_with_tls()
                .max_dbs(config.max_dbs)
                .map_size(config.map_size)
                .open(path)?
        };
        // Create (or open) the single global events database up front.
        let global_db: GlobalEventsDb = {
            let mut wtxn = env.write_txn()?;
            let db = env.create_database(&mut wtxn, Some(constants::GLOBAL_EVENTS_DB_NAME))?;
            wtxn.commit()?;
            db
        };
        // Next sequence = one past the highest committed key, or 0 for a
        // fresh store. `saturating_add` guards the (practically unreachable)
        // u64::MAX case instead of wrapping back to 0.
        let next_global_seq = {
            let rtxn = env.read_txn()?;
            match global_db.last(&rtxn)? {
                Some((last_key, _)) => last_key.saturating_add(1),
                None => 0,
            }
        };
        let next_global_seq_arc = Arc::new(AtomicU64::new(next_global_seq));
        // Seed the watcher with the recovered sequence so subscribers observe
        // the correct starting point after a reopen.
        #[cfg(feature = "notify")]
        let watcher = WriteWatcher::new(GlobalSequence(next_global_seq));
        Ok(Self {
            env,
            global_db,
            next_global_seq: next_global_seq_arc,
            stream_dbs: HashMap::new(),
            #[cfg(feature = "notify")]
            watcher,
        })
    }

    /// Returns the next global sequence number that will be assigned.
    ///
    /// Relaxed ordering suffices: this is a standalone counter read with no
    /// dependent memory accesses.
    pub fn next_global_seq(&self) -> GlobalSequence {
        GlobalSequence(self.next_global_seq.load(Ordering::Relaxed))
    }

    /// Returns a typed handle to the stream called `name`, creating its
    /// backing databases on first use. The const parameter `N` is forwarded
    /// to `Stream` (presumably an inline serialization buffer size — see
    /// `Stream`).
    pub fn stream<T, const N: usize>(&mut self, name: &str) -> Result<Stream<T, N>> {
        let (index_db, meta_db) = self.get_or_create_stream_dbs(name)?;
        let stream_core = Arc::new(StreamCore {
            env: self.env.clone(),
            stream_name: name.to_string(),
            index_db,
            meta_db,
            global_db: self.global_db,
            // Every stream shares the one counter, so global sequence numbers
            // are totally ordered across streams.
            next_global_seq: Arc::clone(&self.next_global_seq),
            #[cfg(feature = "notify")]
            watcher: self.watcher.clone(),
        });
        Ok(Stream::new(stream_core))
    }

    /// Returns the cached database handles for `name`, creating both the
    /// index and meta databases on a cache miss.
    ///
    /// Both databases are created inside a single write transaction, so
    /// creation is atomic — either both exist afterwards or neither does —
    /// and only one commit is paid. (Previously each database was created in
    /// its own transaction, which could leave a dangling index database if
    /// the second creation failed.)
    fn get_or_create_stream_dbs(&mut self, name: &str) -> Result<(StreamIndexDb, StreamMetaDb)> {
        if let Some(dbs) = self.stream_dbs.get(name) {
            return Ok((dbs.index_db, dbs.meta_db));
        }
        let index_db_name = format!(
            "{}{}{}",
            constants::STREAM_DB_PREFIX,
            name,
            constants::STREAM_INDEX_SUFFIX
        );
        let meta_db_name = format!(
            "{}{}{}",
            constants::STREAM_DB_PREFIX,
            name,
            constants::STREAM_META_SUFFIX
        );
        // One transaction for both creations: atomic and half the commits.
        let (index_db, meta_db): (StreamIndexDb, StreamMetaDb) = {
            let mut wtxn = self.env.write_txn()?;
            let index_db = self.env.create_database(&mut wtxn, Some(&index_db_name))?;
            let meta_db = self.env.create_database(&mut wtxn, Some(&meta_db_name))?;
            wtxn.commit()?;
            (index_db, meta_db)
        };
        self.stream_dbs
            .insert(name.to_string(), StreamDbs { index_db, meta_db });
        Ok((index_db, meta_db))
    }

    /// Returns a reader over the global event log, with its own aligned
    /// scratch buffer for rkyv access.
    pub fn global_reader(&self) -> GlobalReader {
        GlobalReader {
            env: self.env.clone(),
            global_db: self.global_db,
            scratch: rkyv::util::AlignedVec::new(),
            #[cfg(feature = "notify")]
            watcher: self.watcher.clone(),
        }
    }

    /// Returns a clone of the write watcher for notification subscribers.
    #[cfg(feature = "notify")]
    pub fn watcher(&self) -> WriteWatcher {
        self.watcher.clone()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{StreamId, StreamSequence};
    use rkyv::{Archive, Deserialize, Serialize};
    use tempfile::tempdir;

    /// Fixed-size test event with only primitive fields (no heap data), so
    /// the non-allocating `append` path can be used.
    #[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize)]
    #[rkyv(attr(derive(Debug)))]
    pub struct SimpleEvent {
        pub id: u64,
        pub timestamp: u64,
        pub value: i32,
    }

    /// String-bearing test event, exercised through `append_alloc`.
    #[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize)]
    #[rkyv(attr(derive(Debug)))]
    pub struct OrderEvent {
        pub order_id: String,
        pub customer_id: String,
        pub amount: u64,
    }

    /// Second string-bearing event type, used alongside `OrderEvent` in
    /// multi-stream tests.
    #[derive(Debug, Clone, PartialEq, Archive, Serialize, Deserialize)]
    #[rkyv(attr(derive(Debug)))]
    pub struct UserEvent {
        pub user_id: String,
        pub email: String,
        pub action: String,
    }

    /// A fresh stream starts both sequences at 0, and an appended event
    /// round-trips byte-for-byte through rkyv access.
    #[test]
    fn test_create_stream_and_append_simple_event() {
        let dir = tempdir().expect("Failed to create temp dir");
        let mut varve = Varve::new(dir.path()).expect("Failed to create Varve");
        let mut stream = varve
            .stream::<SimpleEvent, 1024>("test_stream")
            .expect("Failed to create stream");
        let event = SimpleEvent {
            id: 1,
            timestamp: 1702400000,
            value: 42,
        };
        let stream_id = StreamId(100);
        let (stream_seq, global_seq) = stream
            .append(stream_id, &event)
            .expect("Failed to append event");
        // First append: both the per-stream and global sequences are 0.
        assert_eq!(stream_seq.0, 0);
        assert_eq!(global_seq.0, 0);
        let bytes = stream
            .get_bytes(stream_id, stream_seq)
            .expect("Failed to get bytes")
            .expect("Event not found");
        let archived =
            rkyv::access::<rkyv::Archived<SimpleEvent>, rkyv::rancor::Error>(&bytes).unwrap();
        assert_eq!(archived.id, 1);
        assert_eq!(archived.timestamp, 1702400000);
        assert_eq!(archived.value, 42);
    }

    /// Sequences increment by one per append, and every event is readable
    /// back by its stream sequence.
    #[test]
    fn test_append_multiple_events_to_same_stream() {
        let dir = tempdir().expect("Failed to create temp dir");
        let mut varve = Varve::new(dir.path()).expect("Failed to create Varve");
        let mut stream = varve
            .stream::<SimpleEvent, 1024>("events")
            .expect("Failed to create stream");
        let stream_id = StreamId(1);
        for i in 0..10u64 {
            let event = SimpleEvent {
                id: i,
                timestamp: 1702400000 + i,
                value: (i * 10) as i32,
            };
            let (stream_seq, global_seq) = stream
                .append(stream_id, &event)
                .expect("Failed to append event");
            // One stream, one StreamId: both sequences advance in lockstep.
            assert_eq!(stream_seq.0, i);
            assert_eq!(global_seq.0, i);
        }
        for i in 0..10u64 {
            let bytes = stream
                .get_bytes(stream_id, StreamSequence(i))
                .expect("Failed to get bytes")
                .expect("Event not found");
            let archived =
                rkyv::access::<rkyv::Archived<SimpleEvent>, rkyv::rancor::Error>(&bytes).unwrap();
            assert_eq!(archived.id, i);
        }
    }

    /// Different `StreamId`s within one named stream keep independent
    /// per-id sequence counters, and reads are isolated per id.
    #[test]
    fn test_multiple_stream_ids_in_same_stream() {
        let dir = tempdir().expect("Failed to create temp dir");
        let mut varve = Varve::new(dir.path()).expect("Failed to create Varve");
        let mut stream = varve
            .stream::<SimpleEvent, 1024>("events")
            .expect("Failed to create stream");
        let stream_id_1 = StreamId(1);
        let stream_id_2 = StreamId(2);
        let (seq1_0, _) = stream
            .append(
                stream_id_1,
                &SimpleEvent {
                    id: 100,
                    timestamp: 1,
                    value: 1,
                },
            )
            .unwrap();
        assert_eq!(seq1_0.0, 0);
        let (seq2_0, _) = stream
            .append(
                stream_id_2,
                &SimpleEvent {
                    id: 200,
                    timestamp: 2,
                    value: 2,
                },
            )
            .unwrap();
        // A different StreamId starts its own sequence at 0.
        assert_eq!(seq2_0.0, 0);
        let (seq1_1, _) = stream
            .append(
                stream_id_1,
                &SimpleEvent {
                    id: 101,
                    timestamp: 3,
                    value: 3,
                },
            )
            .unwrap();
        // The first id continues from where it left off.
        assert_eq!(seq1_1.0, 1);
        let bytes = stream
            .get_bytes(stream_id_1, StreamSequence(0))
            .unwrap()
            .unwrap();
        let archived =
            rkyv::access::<rkyv::Archived<SimpleEvent>, rkyv::rancor::Error>(&bytes).unwrap();
        assert_eq!(archived.id, 100);
        let bytes = stream
            .get_bytes(stream_id_2, StreamSequence(0))
            .unwrap()
            .unwrap();
        let archived =
            rkyv::access::<rkyv::Archived<SimpleEvent>, rkyv::rancor::Error>(&bytes).unwrap();
        assert_eq!(archived.id, 200);
    }

    /// The global sequence is shared across differently-named streams while
    /// each stream's own sequence stays independent.
    #[test]
    fn test_multiple_stream_names_share_global_sequence() {
        let dir = tempdir().expect("Failed to create temp dir");
        let mut varve = Varve::new(dir.path()).expect("Failed to create Varve");
        let mut orders = varve
            .stream::<OrderEvent, 4096>("orders")
            .expect("Failed to create orders stream");
        let mut users = varve
            .stream::<UserEvent, 4096>("users")
            .expect("Failed to create users stream");
        let (stream_seq_1, global_seq_1) = orders
            .append_alloc(
                StreamId(1),
                &OrderEvent {
                    order_id: "ord_001".to_string(),
                    customer_id: "cust_001".to_string(),
                    amount: 100,
                },
            )
            .unwrap();
        assert_eq!(global_seq_1.0, 0);
        let (stream_seq_2, global_seq_2) = users
            .append_alloc(
                StreamId(1),
                &UserEvent {
                    user_id: "usr_001".to_string(),
                    email: "test@example.com".to_string(),
                    action: "registered".to_string(),
                },
            )
            .unwrap();
        // Global sequence keeps counting across stream names.
        assert_eq!(global_seq_2.0, 1);
        let (stream_seq_3, global_seq_3) = orders
            .append_alloc(
                StreamId(2),
                &OrderEvent {
                    order_id: "ord_002".to_string(),
                    customer_id: "cust_002".to_string(),
                    amount: 200,
                },
            )
            .unwrap();
        assert_eq!(global_seq_3.0, 2);
        // Each (stream, StreamId) pair still starts its own sequence at 0.
        assert_eq!(stream_seq_1.0, 0);
        assert_eq!(stream_seq_2.0, 0);
        assert_eq!(stream_seq_3.0, 0);
    }

    /// The same `StreamId` used in two differently-named streams advances
    /// two independent per-stream sequences.
    #[test]
    fn test_same_stream_id_different_stream_names() {
        let dir = tempdir().expect("Failed to create temp dir");
        let mut varve = Varve::new(dir.path()).expect("Failed to create Varve");
        let mut orders = varve
            .stream::<SimpleEvent, 1024>("orders")
            .expect("Failed to create orders stream");
        let mut users = varve
            .stream::<SimpleEvent, 1024>("users")
            .expect("Failed to create users stream");
        let (order_seq, _) = orders
            .append(
                StreamId(1),
                &SimpleEvent {
                    id: 100,
                    timestamp: 1,
                    value: 1,
                },
            )
            .unwrap();
        assert_eq!(order_seq.0, 0);
        let (user_seq, _) = users
            .append(
                StreamId(1),
                &SimpleEvent {
                    id: 200,
                    timestamp: 2,
                    value: 2,
                },
            )
            .unwrap();
        assert_eq!(user_seq.0, 0);
        let (order_seq_2, _) = orders
            .append(
                StreamId(1),
                &SimpleEvent {
                    id: 101,
                    timestamp: 3,
                    value: 3,
                },
            )
            .unwrap();
        assert_eq!(order_seq_2.0, 1);
        let (user_seq_2, _) = users
            .append(
                StreamId(1),
                &SimpleEvent {
                    id: 201,
                    timestamp: 4,
                    value: 4,
                },
            )
            .unwrap();
        assert_eq!(user_seq_2.0, 1);
    }

    /// `append_alloc` round-trips string fields intact.
    #[test]
    fn test_append_alloc_with_strings() {
        let dir = tempdir().expect("Failed to create temp dir");
        let mut varve = Varve::new(dir.path()).expect("Failed to create Varve");
        let mut stream = varve
            .stream::<OrderEvent, 4096>("orders")
            .expect("Failed to create stream");
        let stream_id = StreamId(12345);
        let event = OrderEvent {
            order_id: "ord_abc123".to_string(),
            customer_id: "cust_xyz789".to_string(),
            amount: 9999,
        };
        let (stream_seq, global_seq) = stream
            .append_alloc(stream_id, &event)
            .expect("Failed to append event");
        assert_eq!(stream_seq.0, 0);
        assert_eq!(global_seq.0, 0);
        let bytes = stream.get_bytes(stream_id, stream_seq).unwrap().unwrap();
        let archived =
            rkyv::access::<rkyv::Archived<OrderEvent>, rkyv::rancor::Error>(&bytes).unwrap();
        assert_eq!(archived.order_id.as_str(), "ord_abc123");
        assert_eq!(archived.customer_id.as_str(), "cust_xyz789");
        assert_eq!(archived.amount, 9999);
    }

    /// `append_batch` assigns contiguous stream and global sequences to
    /// every element, in order.
    #[test]
    fn test_batch_append() {
        let dir = tempdir().expect("Failed to create temp dir");
        let mut varve = Varve::new(dir.path()).expect("Failed to create Varve");
        let mut stream = varve
            .stream::<SimpleEvent, 1024>("events")
            .expect("Failed to create stream");
        let stream_id = StreamId(1);
        let events: Vec<SimpleEvent> = (0..100)
            .map(|i| SimpleEvent {
                id: i,
                timestamp: 1702400000 + i,
                value: (i * 10) as i32,
            })
            .collect();
        let results = stream
            .append_batch(stream_id, &events)
            .expect("Failed to batch append");
        assert_eq!(results.len(), 100);
        for (i, (stream_seq, global_seq)) in results.iter().enumerate() {
            assert_eq!(stream_seq.0, i as u64);
            assert_eq!(global_seq.0, i as u64);
        }
    }

    /// `GlobalReader::iter_from` yields events from all streams interleaved
    /// in global-sequence order, tagged with their stream names.
    #[test]
    fn test_global_iteration() {
        let dir = tempdir().expect("Failed to create temp dir");
        let mut varve = Varve::new(dir.path()).expect("Failed to create Varve");
        let mut orders = varve
            .stream::<OrderEvent, 4096>("orders")
            .expect("Failed to create orders stream");
        orders
            .append_alloc(
                StreamId(1),
                &OrderEvent {
                    order_id: "ord_001".to_string(),
                    customer_id: "cust_001".to_string(),
                    amount: 100,
                },
            )
            .unwrap();
        let mut users = varve
            .stream::<UserEvent, 4096>("users")
            .expect("Failed to create users stream");
        users
            .append_alloc(
                StreamId(1),
                &UserEvent {
                    user_id: "usr_001".to_string(),
                    email: "test@example.com".to_string(),
                    action: "registered".to_string(),
                },
            )
            .unwrap();
        // Re-requesting "orders" exercises the stream-handle cache path.
        let mut orders2 = varve
            .stream::<OrderEvent, 4096>("orders")
            .expect("Failed to get orders stream");
        orders2
            .append_alloc(
                StreamId(2),
                &OrderEvent {
                    order_id: "ord_002".to_string(),
                    customer_id: "cust_002".to_string(),
                    amount: 200,
                },
            )
            .unwrap();
        let reader = varve.global_reader();
        let iter = reader
            .iter_from(GlobalSequence(0))
            .expect("Failed to create iterator");
        let events = iter.collect_all().expect("Failed to collect events");
        assert_eq!(events.len(), 3);
        // Interleaved appends come back in global order with stream names.
        assert_eq!(events[0].stream_name, "orders");
        assert_eq!(events[0].global_seq.0, 0);
        assert_eq!(events[1].stream_name, "users");
        assert_eq!(events[1].global_seq.0, 1);
        assert_eq!(events[2].stream_name, "orders");
        assert_eq!(events[2].global_seq.0, 2);
    }

    /// Stream reader iteration from the start (`None`) returns every event
    /// for the id, in sequence order.
    #[test]
    fn test_stream_reader_iteration() {
        let dir = tempdir().expect("Failed to create temp dir");
        let mut varve = Varve::new(dir.path()).expect("Failed to create Varve");
        let mut stream = varve
            .stream::<SimpleEvent, 1024>("events")
            .expect("Failed to create stream");
        let stream_id = StreamId(42);
        for i in 0..5u64 {
            stream
                .append(
                    stream_id,
                    &SimpleEvent {
                        id: i,
                        timestamp: 1000 + i,
                        value: (i * 2) as i32,
                    },
                )
                .unwrap();
        }
        let reader = stream.reader();
        let iter = reader
            .iter_stream(stream_id, None)
            .expect("Failed to create iterator");
        let events = iter.collect_bytes().expect("Failed to collect bytes");
        assert_eq!(events.len(), 5);
        for (i, (seq, bytes)) in events.iter().enumerate() {
            assert_eq!(seq.0, i as u64);
            let archived =
                rkyv::access::<rkyv::Archived<SimpleEvent>, rkyv::rancor::Error>(bytes).unwrap();
            assert_eq!(archived.id, i as u64);
        }
    }

    /// Stream reader iteration honors an explicit starting sequence
    /// (inclusive, per the 5..10 expectation below).
    #[test]
    fn test_stream_reader_iteration_from_sequence() {
        let dir = tempdir().expect("Failed to create temp dir");
        let mut varve = Varve::new(dir.path()).expect("Failed to create Varve");
        let mut stream = varve
            .stream::<SimpleEvent, 1024>("events")
            .expect("Failed to create stream");
        let stream_id = StreamId(1);
        for i in 0..10u64 {
            stream
                .append(
                    stream_id,
                    &SimpleEvent {
                        id: i,
                        timestamp: 1000 + i,
                        value: (i * 2) as i32,
                    },
                )
                .unwrap();
        }
        let reader = stream.reader();
        let iter = reader
            .iter_stream(stream_id, Some(StreamSequence(5)))
            .expect("Failed to create iterator");
        let events = iter.collect_bytes().expect("Failed to collect bytes");
        assert_eq!(events.len(), 5);
        for (i, (seq, _)) in events.iter().enumerate() {
            assert_eq!(seq.0, (5 + i) as u64);
        }
    }

    /// Closing and reopening the store recovers both the global and
    /// per-stream sequence counters and keeps previously written events.
    #[test]
    fn test_persistence_across_reopen() {
        let dir = tempdir().expect("Failed to create temp dir");
        {
            let mut varve = Varve::new(dir.path()).expect("Failed to create Varve");
            let mut stream = varve
                .stream::<SimpleEvent, 1024>("events")
                .expect("Failed to create stream");
            let stream_id = StreamId(1);
            stream
                .append(
                    stream_id,
                    &SimpleEvent {
                        id: 0,
                        timestamp: 1000,
                        value: 10,
                    },
                )
                .unwrap();
            stream
                .append(
                    stream_id,
                    &SimpleEvent {
                        id: 1,
                        timestamp: 1001,
                        value: 20,
                    },
                )
                .unwrap();
        }
        // Drop above closes the environment; reopen from the same path.
        {
            let mut varve = Varve::new(dir.path()).expect("Failed to reopen Varve");
            assert_eq!(varve.next_global_seq().0, 2);
            let mut stream = varve
                .stream::<SimpleEvent, 1024>("events")
                .expect("Failed to get stream");
            let stream_id = StreamId(1);
            let (stream_seq, global_seq) = stream
                .append(
                    stream_id,
                    &SimpleEvent {
                        id: 2,
                        timestamp: 1002,
                        value: 30,
                    },
                )
                .unwrap();
            // Appends continue from the recovered counters, not from 0.
            assert_eq!(stream_seq.0, 2);
            assert_eq!(global_seq.0, 2);
            let reader = stream.reader();
            let iter = reader.iter_stream(stream_id, None).unwrap();
            let events = iter.collect_bytes().unwrap();
            assert_eq!(events.len(), 3);
        }
    }

    /// With the `notify` feature, the watcher's committed sequence tracks
    /// `next_global_seq` on open, after appends, and across a reopen.
    #[test]
    #[cfg(feature = "notify")]
    fn test_watcher_initializes_to_next_global_seq_on_open_and_reopen() {
        let dir = tempdir().expect("Failed to create temp dir");
        {
            let mut varve = Varve::new(dir.path()).expect("Failed to create Varve");
            assert_eq!(
                varve.watcher().committed_next_global_seq(),
                varve.next_global_seq()
            );
            let mut stream = varve
                .stream::<u64, 64>("events")
                .expect("Failed to create stream");
            stream.append(StreamId(1), &1u64).unwrap();
            stream.append(StreamId(1), &2u64).unwrap();
            assert_eq!(varve.next_global_seq(), GlobalSequence(2));
            assert_eq!(
                varve.watcher().committed_next_global_seq(),
                GlobalSequence(2)
            );
        }
        let varve = Varve::new(dir.path()).expect("Failed to reopen Varve");
        assert_eq!(varve.next_global_seq(), GlobalSequence(2));
        assert_eq!(
            varve.watcher().committed_next_global_seq(),
            GlobalSequence(2)
        );
    }
}