use heed::{Env, RoTxn, WithTls};
use crate::error::Result;
use crate::event::GlobalEvent;
use crate::types::{GlobalEventRecord, GlobalEventsDb, GlobalSequence};
#[cfg(feature = "notify")]
use crate::notify::WriteWatcher;
pub struct GlobalReader {
pub(crate) env: Env,
pub(crate) global_db: GlobalEventsDb,
pub(crate) scratch: rkyv::util::AlignedVec<16>,
#[cfg(feature = "notify")]
pub(crate) watcher: WriteWatcher,
}
impl GlobalReader {
pub fn get(&mut self, global_seq: GlobalSequence) -> Result<Option<GlobalEvent>> {
let rtxn = self.env.read_txn()?;
let bytes = self.global_db.get(&rtxn, &global_seq.0)?;
match bytes {
Some(b) => {
self.scratch.clear();
self.scratch.extend_from_slice(b);
match GlobalEventRecord::from_bytes(&self.scratch) {
Some(record) => Ok(Some(GlobalEvent {
global_seq,
stream_name: record.stream_name,
stream_id: record.stream_id,
stream_seq: record.stream_seq,
payload: record.payload,
})),
None => Ok(None),
}
}
None => Ok(None),
}
}
pub fn get_bytes(&mut self, global_seq: GlobalSequence) -> Result<Option<&[u8]>> {
let rtxn = self.env.read_txn()?;
let bytes = self.global_db.get(&rtxn, &global_seq.0)?;
match bytes {
Some(b) => {
self.scratch.clear();
self.scratch.extend_from_slice(b);
Ok(GlobalEventRecord::payload_from_bytes(&self.scratch))
}
None => Ok(None),
}
}
pub fn iter_from(&self, from: GlobalSequence) -> Result<GlobalIterator<'_>> {
let rtxn = self.env.read_txn()?;
Ok(GlobalIterator {
db: self.global_db,
rtxn,
from: from.0,
})
}
#[cfg(feature = "notify")]
pub fn watcher(&self) -> WriteWatcher {
self.watcher.clone()
}
}
impl Clone for GlobalReader {
fn clone(&self) -> Self {
Self {
env: self.env.clone(),
global_db: self.global_db,
scratch: rkyv::util::AlignedVec::new(),
#[cfg(feature = "notify")]
watcher: self.watcher.clone(),
}
}
}
pub struct GlobalIterator<'a> {
pub(crate) db: GlobalEventsDb,
pub(crate) rtxn: RoTxn<'a, WithTls>,
pub(crate) from: u64,
}
impl<'a> GlobalIterator<'a> {
pub fn collect_all(self) -> Result<Vec<GlobalEvent>> {
let mut results = Vec::new();
let iter = self.db.range(&self.rtxn, &(self.from..))?;
for item in iter {
let (seq, bytes) = item?;
if let Some(record) = GlobalEventRecord::from_bytes(bytes) {
results.push(GlobalEvent {
global_seq: GlobalSequence(seq),
stream_name: record.stream_name,
stream_id: record.stream_id,
stream_seq: record.stream_seq,
payload: record.payload,
});
}
}
Ok(results)
}
pub fn for_each<F>(self, mut f: F) -> Result<()>
where
F: FnMut(GlobalEvent),
{
let iter = self.db.range(&self.rtxn, &(self.from..))?;
for item in iter {
let (seq, bytes) = item?;
if let Some(record) = GlobalEventRecord::from_bytes(bytes) {
f(GlobalEvent {
global_seq: GlobalSequence(seq),
stream_name: record.stream_name,
stream_id: record.stream_id,
stream_seq: record.stream_seq,
payload: record.payload,
});
}
}
Ok(())
}
}
#[cfg(all(test, feature = "notify"))]
mod notify_tests {
use rstest::fixture;
use rstest::rstest;
use tempfile::TempDir;
use crate::{GlobalSequence, StreamId, Varve};
struct TestDb {
varve: Varve,
_dir: TempDir,
}
#[fixture]
fn db() -> TestDb {
let dir = tempfile::tempdir().expect("Failed to create temp dir");
let varve = Varve::new(dir.path()).expect("Failed to create Varve");
TestDb { varve, _dir: dir }
}
#[rstest]
fn global_reader_watcher_sees_new_writes(mut db: TestDb) {
let global_reader = db.varve.global_reader();
let watcher = global_reader.watcher();
assert_eq!(watcher.committed_next_global_seq(), GlobalSequence(0));
let mut stream = db
.varve
.stream::<u64, 64>("events")
.expect("Failed to create stream");
stream
.append(StreamId(1), &123u64)
.expect("Failed to append");
assert_eq!(watcher.committed_next_global_seq(), GlobalSequence(1));
}
#[rstest]
fn global_reader_clone_preserves_watcher(mut db: TestDb) {
let global_reader = db.varve.global_reader();
let global_reader2 = global_reader.clone();
let w1 = global_reader.watcher();
let w2 = global_reader2.watcher();
assert_eq!(w1.committed_next_global_seq(), GlobalSequence(0));
assert_eq!(w2.committed_next_global_seq(), GlobalSequence(0));
let mut stream = db
.varve
.stream::<u64, 64>("events")
.expect("Failed to create stream");
stream.append(StreamId(1), &1u64).unwrap();
stream.append(StreamId(1), &2u64).unwrap();
assert_eq!(w1.committed_next_global_seq(), GlobalSequence(2));
assert_eq!(w2.committed_next_global_seq(), GlobalSequence(2));
}
}