#[cfg(test)]
use crate::test_util::device::TestSeekableAsyncFile as SeekableAsyncFile;
#[cfg(test)]
use crate::test_util::journal::TestTransaction as Transaction;
use dashmap::DashMap;
use num_derive::FromPrimitive;
use num_traits::FromPrimitive;
use off64::int::create_u64_be;
use off64::int::Off64ReadInt;
use off64::int::Off64WriteMutInt;
use off64::usz;
use off64::Off64Read;
use rustc_hash::FxHasher;
#[cfg(not(test))]
use seekable_async_file::SeekableAsyncFile;
use std::error::Error;
use std::fmt;
use std::fmt::Display;
use std::hash::BuildHasherDefault;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use struct_name::StructName;
use struct_name_macro::StructName;
use tracing::debug;
use tracing::warn;
#[cfg(not(test))]
use write_journal::Transaction;
pub(crate) const STREVT_OFFSETOF_TYPE: u64 = 0;
pub(crate) const STREVT_OFFSETOF_BUCKET_ID: u64 = STREVT_OFFSETOF_TYPE + 1;
pub(crate) const STREVT_OFFSETOF_OBJECT_ID: u64 = STREVT_OFFSETOF_BUCKET_ID + 6;
pub(crate) const STREVT_SIZE: u64 = STREVT_OFFSETOF_OBJECT_ID + 8;
pub(crate) const STREAM_OFFSETOF_VIRTUAL_HEAD: u64 = 0;
pub(crate) const STREAM_OFFSETOF_EVENTS: u64 = STREAM_OFFSETOF_VIRTUAL_HEAD + 8;
pub(crate) fn STREAM_OFFSETOF_EVENT(event_id: u64) -> u64 {
STREAM_OFFSETOF_EVENTS + (STREVT_SIZE * (event_id % STREAM_EVENT_CAP))
}
pub(crate) const STREAM_EVENT_CAP: u64 = 8_000_000;
pub(crate) const STREAM_SIZE: u64 = STREAM_OFFSETOF_EVENTS + (STREAM_EVENT_CAP * STREVT_SIZE);
#[derive(PartialEq, Eq, Clone, Copy, Debug, FromPrimitive)]
#[repr(u8)]
pub enum StreamEventType {
ObjectCommit = 1,
ObjectDelete,
}
#[derive(Clone, Debug)]
pub struct StreamEvent {
pub typ: StreamEventType,
pub bucket_id: u64,
pub object_id: u64,
}
pub(crate) struct StreamInMemory {
virtual_head: AtomicU64,
events: DashMap<u64, StreamEvent, BuildHasherDefault<FxHasher>>,
}
pub(crate) struct Stream {
dev_offset: u64,
in_memory: Arc<StreamInMemory>,
}
#[must_use]
pub(crate) struct CreatedStreamEvent {
id: u64,
event: StreamEvent,
}
impl Stream {
pub async fn load_from_device(
dev: &SeekableAsyncFile,
dev_offset: u64,
) -> (Stream, Arc<StreamInMemory>) {
let raw_all = dev.read_at(dev_offset, STREAM_SIZE).await;
let virtual_head = raw_all.read_u64_be_at(STREAM_OFFSETOF_VIRTUAL_HEAD);
let events = DashMap::<u64, StreamEvent, BuildHasherDefault<FxHasher>>::default();
for i in 0..STREAM_EVENT_CAP {
let event_id = virtual_head + i;
let raw = raw_all.read_at(STREAM_OFFSETOF_EVENT(event_id), STREVT_SIZE);
let typ_raw = raw[usz!(STREVT_OFFSETOF_TYPE)];
if typ_raw == 0 {
break;
};
let typ = StreamEventType::from_u8(typ_raw).unwrap();
let bucket_id = raw.read_u48_be_at(STREVT_OFFSETOF_BUCKET_ID);
let object_id = raw.read_u64_be_at(STREVT_OFFSETOF_OBJECT_ID);
events.insert(event_id, StreamEvent {
bucket_id,
object_id,
typ,
});
}
debug!(event_count = events.len(), virtual_head, "stream loaded");
let in_memory = Arc::new(StreamInMemory {
events,
virtual_head: AtomicU64::new(virtual_head),
});
(
Stream {
dev_offset,
in_memory: in_memory.clone(),
},
in_memory,
)
}
pub async fn format_device(dev: &SeekableAsyncFile, dev_offset: u64) {
dev.write_at(dev_offset, vec![0u8; usz!(STREAM_SIZE)]).await;
}
pub fn create_event_on_device(
&self,
txn: &mut Transaction,
e: StreamEvent,
) -> CreatedStreamEvent {
let event_id = self.in_memory.virtual_head.fetch_add(1, Ordering::Relaxed);
if event_id > STREAM_EVENT_CAP {
self.in_memory.events.remove(&(event_id - STREAM_EVENT_CAP));
};
txn.write(
self.dev_offset + STREAM_OFFSETOF_VIRTUAL_HEAD,
create_u64_be(event_id + 1),
);
let mut raw = vec![0u8; usz!(STREVT_SIZE)];
raw[usz!(STREVT_OFFSETOF_TYPE)] = e.typ as u8;
raw.write_u48_be_at(STREVT_OFFSETOF_BUCKET_ID, e.bucket_id);
raw.write_u64_be_at(STREVT_OFFSETOF_OBJECT_ID, e.object_id);
txn.write(self.dev_offset + STREAM_OFFSETOF_EVENT(event_id), raw);
CreatedStreamEvent {
id: event_id,
event: e,
}
}
}
#[derive(Clone, Copy, PartialEq, Eq, Debug, StructName)]
pub struct StreamEventExpiredError;
impl Display for StreamEventExpiredError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(Self::struct_name())
}
}
impl Error for StreamEventExpiredError {}
impl StreamInMemory {
pub fn add_event_to_in_memory_list(&self, c: CreatedStreamEvent) {
let virtual_head = self.virtual_head.load(Ordering::Relaxed);
if virtual_head >= STREAM_EVENT_CAP && virtual_head - STREAM_EVENT_CAP > c.id {
warn!("event stream is rotating too quickly, recently created event has already expired");
return;
};
let None = self.events.insert(c.id, c.event) else {
unreachable!();
};
}
pub fn get_event(&self, id: u64) -> Result<Option<StreamEvent>, StreamEventExpiredError> {
if id >= STREAM_EVENT_CAP && self.virtual_head.load(Ordering::Relaxed) > id - STREAM_EVENT_CAP {
return Err(StreamEventExpiredError);
};
Ok(self.events.get(&id).map(|e| e.clone()))
}
}