libblobd_lite/
stream.rs

1#[cfg(test)]
2use crate::test_util::device::TestSeekableAsyncFile as SeekableAsyncFile;
3#[cfg(test)]
4use crate::test_util::journal::TestTransaction as Transaction;
5use dashmap::DashMap;
6use num_derive::FromPrimitive;
7use num_traits::FromPrimitive;
8use off64::int::create_u64_be;
9use off64::int::Off64ReadInt;
10use off64::int::Off64WriteMutInt;
11use off64::usz;
12use off64::Off64Read;
13use rustc_hash::FxHasher;
14#[cfg(not(test))]
15use seekable_async_file::SeekableAsyncFile;
16use std::error::Error;
17use std::fmt;
18use std::fmt::Display;
19use std::hash::BuildHasherDefault;
20use std::sync::atomic::AtomicU64;
21use std::sync::atomic::Ordering;
22use std::sync::Arc;
23use struct_name::StructName;
24use struct_name_macro::StructName;
25use tracing::debug;
26use tracing::warn;
27#[cfg(not(test))]
28use write_journal::Transaction;
29
30/**
31
32STREAM
33======
34
35Instead of reading and writing directly from/to mmap, we simply load into memory. This avoids subtle race conditions and handling complexity, as we don't have ability to atomically read/write directly from mmap, and we need to ensure strictly sequential event IDs, careful handling of buffer wrapping, not reading and writing simultaneously (which is hard as all writes are external via the journal), etc. It's not much memory anyway and the storage layout is no better optimised than an in-memory Vec.
36
37Events aren't valid until the transaction for the changes they represent has been committed, but the event is written to the device in the same transaction. Therefore, the in-memory data structure is separate from the device data, and the event is separately inserted into the in-memory structure *after* the journal transaction has committed.
38
39We must have strictly sequential event IDs, and we must assign and store them:
40- If a replicating client asks for events past N, and N+1 does not exist but N+2 does, we must know that it's because N+1 has been erased, and not just possibly due to holes in event IDs.
41- If we don't assign one to each event or store them, when a client asks for N, we won't know if the event at (N % STREAM_EVENT_CAP) is actually N or some other multiple.
42
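The value of `virtual_head` is the ID that will be assigned to the next event; taken modulo STREAM_EVENT_CAP, it is also the ring buffer slot that event will be written to. Because every slot is derived from its event's ID in this way, we don't need to store a sequential event ID alongside every event.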
44
45Structure
46---------
47
48u64 virtual_head
49{
50  u8 type
51  u48 bucket_id
52  u64 object_id
53}[STREAM_EVENT_CAP] events_ring_buffer
54
55**/
56
// Byte layout of a single serialised stream event. Each offset is relative to the start of the event's slot.
// Event type: 1 byte.
pub(crate) const STREVT_OFFSETOF_TYPE: u64 = 0;
// Bucket ID: 6 bytes (a u48), directly after the type byte.
pub(crate) const STREVT_OFFSETOF_BUCKET_ID: u64 = STREVT_OFFSETOF_TYPE + 1;
// Object ID: 8 bytes (a u64), directly after the bucket ID.
pub(crate) const STREVT_OFFSETOF_OBJECT_ID: u64 = STREVT_OFFSETOF_BUCKET_ID + 6;
// Total size of one serialised event: 1 + 6 + 8 = 15 bytes.
pub(crate) const STREVT_SIZE: u64 = STREVT_OFFSETOF_OBJECT_ID + 8;
61
// Byte layout of the stream region. Each offset is relative to the start of the region on the device.
// The u64 `virtual_head` is stored first.
pub(crate) const STREAM_OFFSETOF_VIRTUAL_HEAD: u64 = 0;
// The ring buffer of event slots starts directly after the 8-byte head.
pub(crate) const STREAM_OFFSETOF_EVENTS: u64 = STREAM_OFFSETOF_VIRTUAL_HEAD + 8;
64pub(crate) fn STREAM_OFFSETOF_EVENT(event_id: u64) -> u64 {
65  STREAM_OFFSETOF_EVENTS + (STREVT_SIZE * (event_id % STREAM_EVENT_CAP))
66}
// Number of event slots in the ring buffer. Ensure this is large enough that a replica can be created without a high chance of the stream wrapping before the storage has been replicated. An event is only 15 bytes, so we can be generous here.
pub(crate) const STREAM_EVENT_CAP: u64 = 8_000_000;
// Total size in bytes of the stream region: the 8-byte head followed by every event slot.
pub(crate) const STREAM_SIZE: u64 = STREAM_OFFSETOF_EVENTS + (STREAM_EVENT_CAP * STREVT_SIZE);
70
71#[derive(PartialEq, Eq, Clone, Copy, Debug, FromPrimitive)]
72#[repr(u8)]
73pub enum StreamEventType {
74  // WARNING: Values must start from 1, so that empty slots are automatically detected since formatting fills storage with zero.
75  ObjectCommit = 1,
76  ObjectDelete,
77}
78
/// A single event in the stream, in its decoded in-memory form.
#[derive(Clone, Debug)]
pub struct StreamEvent {
  /// What happened to the object.
  pub typ: StreamEventType,
  /// ID of the bucket involved. It is serialised as a 48-bit big-endian integer, so it is expected to fit in 48 bits. NOTE(review): confirm how `write_u48_be_at` treats larger values.
  pub bucket_id: u64,
  /// ID of the object involved. It is serialised as a 64-bit big-endian integer.
  pub object_id: u64,
}
85
/// In-memory view of the stream, shared between the event creator (`Stream`) and readers.
pub(crate) struct StreamInMemory {
  // ID that will be assigned to the next created event. It is advanced when an event is created, before its transaction commits.
  virtual_head: AtomicU64,
  // Events keyed by event ID. Newly created events are inserted separately via `add_event_to_in_memory_list`.
  events: DashMap<u64, StreamEvent, BuildHasherDefault<FxHasher>>,
}
90
/// Handle used to create events in the stream region of the device.
pub(crate) struct Stream {
  // Offset on the device where the stream region starts.
  dev_offset: u64,
  // Shared in-memory state; the same `Arc` is also returned to the caller by `load_from_device`.
  in_memory: Arc<StreamInMemory>,
}
95
/// An event that has been written into a transaction but not yet added to the in-memory list.
///
/// It is returned by `Stream::create_event_on_device` and consumed by `StreamInMemory::add_event_to_in_memory_list`. `#[must_use]` flags the case where a caller forgets that second step.
#[must_use]
pub(crate) struct CreatedStreamEvent {
  // ID assigned to the event.
  id: u64,
  // The event itself.
  event: StreamEvent,
}
101
102impl Stream {
103  pub async fn load_from_device(
104    dev: &SeekableAsyncFile,
105    dev_offset: u64,
106  ) -> (Stream, Arc<StreamInMemory>) {
107    let raw_all = dev.read_at(dev_offset, STREAM_SIZE).await;
108    let virtual_head = raw_all.read_u64_be_at(STREAM_OFFSETOF_VIRTUAL_HEAD);
109    let events = DashMap::<u64, StreamEvent, BuildHasherDefault<FxHasher>>::default();
110    for i in 0..STREAM_EVENT_CAP {
111      let event_id = virtual_head + i;
112      let raw = raw_all.read_at(STREAM_OFFSETOF_EVENT(event_id), STREVT_SIZE);
113      let typ_raw = raw[usz!(STREVT_OFFSETOF_TYPE)];
114      if typ_raw == 0 {
115        break;
116      };
117      let typ = StreamEventType::from_u8(typ_raw).unwrap();
118      let bucket_id = raw.read_u48_be_at(STREVT_OFFSETOF_BUCKET_ID);
119      let object_id = raw.read_u64_be_at(STREVT_OFFSETOF_OBJECT_ID);
120      events.insert(event_id, StreamEvent {
121        bucket_id,
122        object_id,
123        typ,
124      });
125    }
126    debug!(event_count = events.len(), virtual_head, "stream loaded");
127    let in_memory = Arc::new(StreamInMemory {
128      events,
129      virtual_head: AtomicU64::new(virtual_head),
130    });
131    (
132      Stream {
133        dev_offset,
134        in_memory: in_memory.clone(),
135      },
136      in_memory,
137    )
138  }
139
140  pub async fn format_device(dev: &SeekableAsyncFile, dev_offset: u64) {
141    dev.write_at(dev_offset, vec![0u8; usz!(STREAM_SIZE)]).await;
142  }
143
144  /// Returns the event ID. This won't add to the in-memory list, so remember to do that after the transaction has committed.
145  pub fn create_event_on_device(
146    &self,
147    txn: &mut Transaction,
148    e: StreamEvent,
149  ) -> CreatedStreamEvent {
150    let event_id = self.in_memory.virtual_head.fetch_add(1, Ordering::Relaxed);
151    if event_id > STREAM_EVENT_CAP {
152      // This may not exist.
153      self.in_memory.events.remove(&(event_id - STREAM_EVENT_CAP));
154    };
155
156    // New head.
157    txn.write(
158      self.dev_offset + STREAM_OFFSETOF_VIRTUAL_HEAD,
159      create_u64_be(event_id + 1),
160    );
161    // New event.
162    let mut raw = vec![0u8; usz!(STREVT_SIZE)];
163    raw[usz!(STREVT_OFFSETOF_TYPE)] = e.typ as u8;
164    raw.write_u48_be_at(STREVT_OFFSETOF_BUCKET_ID, e.bucket_id);
165    raw.write_u64_be_at(STREVT_OFFSETOF_OBJECT_ID, e.object_id);
166    txn.write(self.dev_offset + STREAM_OFFSETOF_EVENT(event_id), raw);
167
168    CreatedStreamEvent {
169      id: event_id,
170      event: e,
171    }
172  }
173}
174
/// Error returned by `StreamInMemory::get_event` when the requested event is considered expired.
#[derive(Clone, Copy, PartialEq, Eq, Debug, StructName)]
pub struct StreamEventExpiredError;
177
178impl Display for StreamEventExpiredError {
179  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
180    f.write_str(Self::struct_name())
181  }
182}
183
// Marker impl so the type can be used as a standard error; it carries no source or extra data.
impl Error for StreamEventExpiredError {}
185
186impl StreamInMemory {
187  pub fn add_event_to_in_memory_list(&self, c: CreatedStreamEvent) {
188    let virtual_head = self.virtual_head.load(Ordering::Relaxed);
189    if virtual_head >= STREAM_EVENT_CAP && virtual_head - STREAM_EVENT_CAP > c.id {
190      warn!("event stream is rotating too quickly, recently created event has already expired");
191      return;
192    };
193    let None = self.events.insert(c.id, c.event) else {
194      unreachable!();
195    };
196  }
197
198  pub fn get_event(&self, id: u64) -> Result<Option<StreamEvent>, StreamEventExpiredError> {
199    if id >= STREAM_EVENT_CAP && self.virtual_head.load(Ordering::Relaxed) > id - STREAM_EVENT_CAP {
200      return Err(StreamEventExpiredError);
201    };
202    Ok(self.events.get(&id).map(|e| e.clone()))
203  }
204}