1#[cfg(test)]
2use crate::test_util::device::TestSeekableAsyncFile as SeekableAsyncFile;
3#[cfg(test)]
4use crate::test_util::journal::TestTransaction as Transaction;
5use dashmap::DashMap;
6use num_derive::FromPrimitive;
7use num_traits::FromPrimitive;
8use off64::int::create_u64_be;
9use off64::int::Off64ReadInt;
10use off64::int::Off64WriteMutInt;
11use off64::usz;
12use off64::Off64Read;
13use rustc_hash::FxHasher;
14#[cfg(not(test))]
15use seekable_async_file::SeekableAsyncFile;
16use std::error::Error;
17use std::fmt;
18use std::fmt::Display;
19use std::hash::BuildHasherDefault;
20use std::sync::atomic::AtomicU64;
21use std::sync::atomic::Ordering;
22use std::sync::Arc;
23use struct_name::StructName;
24use struct_name_macro::StructName;
25use tracing::debug;
26use tracing::warn;
27#[cfg(not(test))]
28use write_journal::Transaction;
29
// Layout of one serialised stream event: a 1-byte type tag, a 6-byte (u48)
// bucket ID and an 8-byte object ID, in that order.

/// Byte offset of the type tag within a serialised event.
pub(crate) const STREVT_OFFSETOF_TYPE: u64 = 0;
/// Byte offset of the bucket ID; it follows the 1-byte type tag.
pub(crate) const STREVT_OFFSETOF_BUCKET_ID: u64 = 1 + STREVT_OFFSETOF_TYPE;
/// Byte offset of the object ID; it follows the 6-byte bucket ID.
pub(crate) const STREVT_OFFSETOF_OBJECT_ID: u64 = 6 + STREVT_OFFSETOF_BUCKET_ID;
/// Total size in bytes of one serialised event.
pub(crate) const STREVT_SIZE: u64 = 8 + STREVT_OFFSETOF_OBJECT_ID;

// Layout of the stream region: an 8-byte virtual head followed by a ring of
// `STREAM_EVENT_CAP` fixed-size event slots.

/// Number of event slots in the ring.
pub(crate) const STREAM_EVENT_CAP: u64 = 8_000_000;
/// Byte offset of the virtual head counter within the stream region.
pub(crate) const STREAM_OFFSETOF_VIRTUAL_HEAD: u64 = 0;
/// Byte offset of the first event slot; it follows the 8-byte virtual head.
pub(crate) const STREAM_OFFSETOF_EVENTS: u64 = 8 + STREAM_OFFSETOF_VIRTUAL_HEAD;
/// Total size in bytes of the stream region.
pub(crate) const STREAM_SIZE: u64 = STREAM_OFFSETOF_EVENTS + (STREVT_SIZE * STREAM_EVENT_CAP);

/// Returns the byte offset, relative to the start of the stream region, of the
/// slot used by `event_id`. Slots are reused: IDs that are congruent modulo
/// `STREAM_EVENT_CAP` map to the same slot.
#[allow(non_snake_case)]
pub(crate) fn STREAM_OFFSETOF_EVENT(event_id: u64) -> u64 {
    let slot = event_id % STREAM_EVENT_CAP;
    STREAM_OFFSETOF_EVENTS + (slot * STREVT_SIZE)
}
70
71#[derive(PartialEq, Eq, Clone, Copy, Debug, FromPrimitive)]
72#[repr(u8)]
73pub enum StreamEventType {
74 ObjectCommit = 1,
76 ObjectDelete,
77}
78
79#[derive(Clone, Debug)]
80pub struct StreamEvent {
81 pub typ: StreamEventType,
82 pub bucket_id: u64,
83 pub object_id: u64,
84}
85
86pub(crate) struct StreamInMemory {
87 virtual_head: AtomicU64,
88 events: DashMap<u64, StreamEvent, BuildHasherDefault<FxHasher>>,
89}
90
91pub(crate) struct Stream {
92 dev_offset: u64,
93 in_memory: Arc<StreamInMemory>,
94}
95
96#[must_use]
97pub(crate) struct CreatedStreamEvent {
98 id: u64,
99 event: StreamEvent,
100}
101
102impl Stream {
103 pub async fn load_from_device(
104 dev: &SeekableAsyncFile,
105 dev_offset: u64,
106 ) -> (Stream, Arc<StreamInMemory>) {
107 let raw_all = dev.read_at(dev_offset, STREAM_SIZE).await;
108 let virtual_head = raw_all.read_u64_be_at(STREAM_OFFSETOF_VIRTUAL_HEAD);
109 let events = DashMap::<u64, StreamEvent, BuildHasherDefault<FxHasher>>::default();
110 for i in 0..STREAM_EVENT_CAP {
111 let event_id = virtual_head + i;
112 let raw = raw_all.read_at(STREAM_OFFSETOF_EVENT(event_id), STREVT_SIZE);
113 let typ_raw = raw[usz!(STREVT_OFFSETOF_TYPE)];
114 if typ_raw == 0 {
115 break;
116 };
117 let typ = StreamEventType::from_u8(typ_raw).unwrap();
118 let bucket_id = raw.read_u48_be_at(STREVT_OFFSETOF_BUCKET_ID);
119 let object_id = raw.read_u64_be_at(STREVT_OFFSETOF_OBJECT_ID);
120 events.insert(event_id, StreamEvent {
121 bucket_id,
122 object_id,
123 typ,
124 });
125 }
126 debug!(event_count = events.len(), virtual_head, "stream loaded");
127 let in_memory = Arc::new(StreamInMemory {
128 events,
129 virtual_head: AtomicU64::new(virtual_head),
130 });
131 (
132 Stream {
133 dev_offset,
134 in_memory: in_memory.clone(),
135 },
136 in_memory,
137 )
138 }
139
140 pub async fn format_device(dev: &SeekableAsyncFile, dev_offset: u64) {
141 dev.write_at(dev_offset, vec![0u8; usz!(STREAM_SIZE)]).await;
142 }
143
144 pub fn create_event_on_device(
146 &self,
147 txn: &mut Transaction,
148 e: StreamEvent,
149 ) -> CreatedStreamEvent {
150 let event_id = self.in_memory.virtual_head.fetch_add(1, Ordering::Relaxed);
151 if event_id > STREAM_EVENT_CAP {
152 self.in_memory.events.remove(&(event_id - STREAM_EVENT_CAP));
154 };
155
156 txn.write(
158 self.dev_offset + STREAM_OFFSETOF_VIRTUAL_HEAD,
159 create_u64_be(event_id + 1),
160 );
161 let mut raw = vec![0u8; usz!(STREVT_SIZE)];
163 raw[usz!(STREVT_OFFSETOF_TYPE)] = e.typ as u8;
164 raw.write_u48_be_at(STREVT_OFFSETOF_BUCKET_ID, e.bucket_id);
165 raw.write_u64_be_at(STREVT_OFFSETOF_OBJECT_ID, e.object_id);
166 txn.write(self.dev_offset + STREAM_OFFSETOF_EVENT(event_id), raw);
167
168 CreatedStreamEvent {
169 id: event_id,
170 event: e,
171 }
172 }
173}
174
175#[derive(Clone, Copy, PartialEq, Eq, Debug, StructName)]
176pub struct StreamEventExpiredError;
177
178impl Display for StreamEventExpiredError {
179 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180 f.write_str(Self::struct_name())
181 }
182}
183
184impl Error for StreamEventExpiredError {}
185
186impl StreamInMemory {
187 pub fn add_event_to_in_memory_list(&self, c: CreatedStreamEvent) {
188 let virtual_head = self.virtual_head.load(Ordering::Relaxed);
189 if virtual_head >= STREAM_EVENT_CAP && virtual_head - STREAM_EVENT_CAP > c.id {
190 warn!("event stream is rotating too quickly, recently created event has already expired");
191 return;
192 };
193 let None = self.events.insert(c.id, c.event) else {
194 unreachable!();
195 };
196 }
197
198 pub fn get_event(&self, id: u64) -> Result<Option<StreamEvent>, StreamEventExpiredError> {
199 if id >= STREAM_EVENT_CAP && self.virtual_head.load(Ordering::Relaxed) > id - STREAM_EVENT_CAP {
200 return Err(StreamEventExpiredError);
201 };
202 Ok(self.events.get(&id).map(|e| e.clone()))
203 }
204}