use std::marker::PhantomData;
use std::sync::Arc;
use heed::{RoTxn, WithTls};
use crate::error::Result;
use crate::stream::core::StreamCore;
use crate::types::{
GlobalEventRecord, GlobalEventsDb, StreamId, StreamIndexDb, StreamKey, StreamSequence,
};
#[cfg(feature = "notify")]
use crate::notify::WriteWatcher;
/// Read-only, typed handle onto a single event stream.
///
/// `T` is the event payload type; it is never stored, only used to type the
/// archived views returned by the accessor methods.
pub struct StreamReader<T> {
    /// Shared stream state (environment + databases); clones share this `Arc`.
    pub(crate) core: Arc<StreamCore>,
    /// 16-byte-aligned reuse buffer: `get_bytes` copies the payload here so
    /// the returned slice can outlive the short-lived read transaction.
    pub(crate) scratch: rkyv::util::AlignedVec<16>,
    /// Ties the reader to payload type `T` without owning a `T`.
    pub(crate) _marker: PhantomData<T>,
}
impl<T> StreamReader<T> {
pub fn name(&self) -> &str {
&self.core.stream_name
}
pub fn get_bytes(
&mut self,
stream_id: StreamId,
stream_seq: StreamSequence,
) -> Result<Option<&[u8]>> {
let index_key = StreamKey::new(stream_id, stream_seq);
let rtxn = self.core.env.read_txn()?;
let Some(global_seq) = self.core.index_db.get(&rtxn, &index_key)? else {
return Ok(None);
};
let Some(global_bytes) = self.core.global_db.get(&rtxn, &global_seq)? else {
return Ok(None);
};
let Some(payload) = GlobalEventRecord::payload_from_bytes(global_bytes) else {
return Ok(None);
};
self.scratch.clear();
self.scratch.extend_from_slice(payload);
Ok(Some(&self.scratch))
}
pub fn get_archived(
&mut self,
stream_id: StreamId,
stream_seq: StreamSequence,
) -> Result<Option<&rkyv::Archived<T>>>
where
T: rkyv::Archive,
rkyv::Archived<T>: rkyv::Portable
+ for<'a> rkyv::bytecheck::CheckBytes<
rkyv::api::high::HighValidator<'a, rkyv::rancor::Error>,
>,
{
let Some(bytes) = self.get_bytes(stream_id, stream_seq)? else {
return Ok(None);
};
let archived = rkyv::access::<rkyv::Archived<T>, rkyv::rancor::Error>(bytes)?;
Ok(Some(archived))
}
pub unsafe fn get_archived_unchecked(
&mut self,
stream_id: StreamId,
stream_seq: StreamSequence,
) -> Result<Option<&rkyv::Archived<T>>>
where
T: rkyv::Archive,
rkyv::Archived<T>: rkyv::Portable,
{
let Some(bytes) = self.get_bytes(stream_id, stream_seq)? else {
return Ok(None);
};
Ok(Some(unsafe {
rkyv::access_unchecked::<rkyv::Archived<T>>(bytes)
}))
}
pub fn iter_stream(
&self,
stream_id: StreamId,
from: Option<StreamSequence>,
) -> Result<StreamIterator<'_>> {
let rtxn = self.core.env.read_txn()?;
let start_key = StreamKey::new(stream_id, from.unwrap_or(StreamSequence(0)));
let end_key = StreamKey::new(StreamId(stream_id.0 + 1), StreamSequence(0));
Ok(StreamIterator {
index_db: self.core.index_db,
global_db: self.core.global_db,
rtxn,
start_key,
end_key,
stream_id,
})
}
#[cfg(feature = "notify")]
pub fn watcher(&self) -> WriteWatcher {
self.core.watcher.clone()
}
}
impl<T> Clone for StreamReader<T> {
    /// Clones the reader: the core is shared via `Arc`, while the scratch
    /// buffer starts out empty — its contents are a per-reader copy cache and
    /// must never be shared between readers.
    fn clone(&self) -> Self {
        let core = Arc::clone(&self.core);
        Self {
            core,
            scratch: rkyv::util::AlignedVec::new(),
            _marker: PhantomData,
        }
    }
}
/// Snapshot iterator over one stream's events.
///
/// Owns a read transaction for its whole lifetime, so it sees a consistent
/// view of the databases while iterating.
pub struct StreamIterator<'a> {
    /// Per-stream index: `StreamKey -> global sequence`.
    pub(crate) index_db: StreamIndexDb,
    /// Global event log: global sequence -> serialized record bytes.
    pub(crate) global_db: GlobalEventsDb,
    /// Read transaction kept open for the iterator's lifetime.
    pub(crate) rtxn: RoTxn<'a, WithTls>,
    /// Inclusive start of the key range to walk.
    pub(crate) start_key: StreamKey,
    /// Exclusive end of the key range (first key of the next stream id).
    pub(crate) end_key: StreamKey,
    /// Stream id being iterated; used as a defensive guard while walking.
    pub(crate) stream_id: StreamId,
}
impl<'a> StreamIterator<'a> {
    /// Collects every `(sequence, payload)` pair in range into owned vectors,
    /// consuming the iterator (and closing its read transaction).
    pub fn collect_bytes(self) -> Result<Vec<(StreamSequence, Vec<u8>)>> {
        let mut results = Vec::new();
        // Delegate to the single traversal implementation in `for_each`
        // instead of duplicating the range/guard/decode logic here.
        self.for_each(|stream_seq, payload| {
            results.push((stream_seq, payload.to_vec()));
        })?;
        Ok(results)
    }

    /// Walks the index range `[start_key, end_key)` and invokes `f` with each
    /// event's stream sequence and borrowed payload bytes.
    ///
    /// Entries whose global record or payload is missing are skipped; the
    /// walk stops early if a key for a different stream id shows up inside
    /// the range (defensive guard on key ordering).
    pub fn for_each<F>(self, mut f: F) -> Result<()>
    where
        F: FnMut(StreamSequence, &[u8]),
    {
        let iter = self
            .index_db
            .range(&self.rtxn, &(self.start_key..self.end_key))?;
        for item in iter {
            let (key, global_seq) = item?;
            if key.stream_id != self.stream_id {
                break;
            }
            if let Some(global_bytes) = self.global_db.get(&self.rtxn, &global_seq)? {
                if let Some(payload) = GlobalEventRecord::payload_from_bytes(global_bytes) {
                    f(key.stream_seq, payload);
                }
            }
        }
        Ok(())
    }
}
#[cfg(all(test, feature = "notify"))]
mod notify_tests {
    use rstest::fixture;
    use rstest::rstest;
    use tempfile::TempDir;
    use crate::{GlobalSequence, StreamId, Varve};
    /// Test harness: a `Varve` instance backed by a temporary directory.
    struct TestDb {
        varve: Varve,
        // Held only to keep the temp directory alive until the test ends.
        _dir: TempDir,
    }
    /// Fresh database per test, in its own temp dir.
    #[fixture]
    fn db() -> TestDb {
        let dir = tempfile::tempdir().expect("Failed to create temp dir");
        let varve = Varve::new(dir.path()).expect("Failed to create Varve");
        TestDb { varve, _dir: dir }
    }
    /// A watcher obtained from a reader observes the committed global
    /// sequence advancing when the stream is appended to.
    #[rstest]
    fn stream_reader_watcher_sees_new_writes(mut db: TestDb) {
        let mut stream = db
            .varve
            .stream::<u64, 64>("events")
            .expect("Failed to create stream");
        let reader = stream.reader();
        let watcher = reader.watcher();
        // No writes yet: next global sequence starts at 0.
        assert_eq!(watcher.committed_next_global_seq(), GlobalSequence(0));
        stream
            .append(StreamId(1), &123u64)
            .expect("Failed to append");
        assert_eq!(watcher.committed_next_global_seq(), GlobalSequence(1));
    }
    /// Cloning a reader yields a watcher that tracks the same environment:
    /// both watchers see the same committed sequence before and after writes.
    #[rstest]
    fn stream_reader_clone_preserves_watcher(mut db: TestDb) {
        let mut stream = db
            .varve
            .stream::<u64, 64>("events")
            .expect("Failed to create stream");
        let reader = stream.reader();
        let reader2 = reader.clone();
        let w1 = reader.watcher();
        let w2 = reader2.watcher();
        assert_eq!(w1.committed_next_global_seq(), GlobalSequence(0));
        assert_eq!(w2.committed_next_global_seq(), GlobalSequence(0));
        stream.append(StreamId(1), &1u64).unwrap();
        stream.append(StreamId(1), &2u64).unwrap();
        assert_eq!(w1.committed_next_global_seq(), GlobalSequence(2));
        assert_eq!(w2.committed_next_global_seq(), GlobalSequence(2));
    }
}