1use crate::{read_u64, safe_write, write_u64, Address, GrowFailed, Memory, Storable};
58use std::borrow::Cow;
59use std::cell::RefCell;
60use std::fmt;
61use std::marker::PhantomData;
62use std::thread::LocalKey;
63
64#[cfg(test)]
65mod tests;
66
/// Magic bytes identifying the index memory of a log.
pub const INDEX_MAGIC: &[u8; 3] = b"GLI";
/// Magic bytes identifying the data memory of a log.
pub const DATA_MAGIC: &[u8; 3] = b"GLD";

/// The only stable-memory layout version this implementation reads and writes.
const LAYOUT_VERSION: u8 = 1;

/// Size of the v1 header: 3 magic bytes + 1 version byte.
const HEADER_V1_SIZE: u64 = 4;

/// Extra space kept after the v1 header — presumably reserved for future
/// header extensions (NOTE(review): confirm intent).
const RESERVED_HEADER_SIZE: u64 = 28;

/// Offset (in both memories) at which the actual log content begins.
const HEADER_OFFSET: u64 = HEADER_V1_SIZE + RESERVED_HEADER_SIZE;
/// The 4-byte header written at offset 0 of both the index and the data
/// memories: a 3-byte magic followed by a 1-byte layout version.
struct HeaderV1 {
    // Identifies the region as a log index (INDEX_MAGIC) or log data
    // (DATA_MAGIC) memory.
    magic: [u8; 3],
    // Layout version; only LAYOUT_VERSION is supported.
    version: u8,
}
88
/// Errors that can occur when initializing a log from existing memories.
#[derive(Debug, PartialEq, Eq)]
pub enum InitError {
    /// The data memory declares a layout version other than the one this
    /// code supports.
    IncompatibleDataVersion {
        last_supported_version: u8,
        decoded_version: u8,
    },
    /// The index memory declares a layout version other than the one this
    /// code supports.
    IncompatibleIndexVersion {
        last_supported_version: u8,
        decoded_version: u8,
    },
    /// The data memory carries a valid log header, but the index memory
    /// does not have the expected magic bytes.
    InvalidIndex,
}
101
102impl fmt::Display for InitError {
103 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104 match self {
105 InitError::IncompatibleDataVersion {
106 last_supported_version,
107 decoded_version,
108 } => write!(
109 f,
110 "Incompatible data version: last supported version is {}, but decoded version is {}",
111 last_supported_version, decoded_version
112 ),
113 InitError::IncompatibleIndexVersion {
114 last_supported_version,
115 decoded_version,
116 } => write!(
117 f,
118 "Incompatible index version: last supported version is {}, but decoded version is {}",
119 last_supported_version, decoded_version
120 ),
121 InitError::InvalidIndex => write!(f, "Invalid index"),
122 }
123 }
124}
125
/// Errors that can occur while appending entries to the log.
#[derive(Debug, PartialEq, Eq)]
pub enum WriteError {
    /// The underlying memory failed to grow from `current_size` by `delta`
    /// (values as reported by the `Memory` implementation).
    GrowFailed { current_size: u64, delta: u64 },
}
130
131impl From<GrowFailed> for WriteError {
132 fn from(
133 GrowFailed {
134 current_size,
135 delta,
136 }: GrowFailed,
137 ) -> Self {
138 Self::GrowFailed {
139 current_size,
140 delta,
141 }
142 }
143}
144
/// Error returned when the requested log entry index is out of range.
#[derive(Debug, PartialEq, Eq)]
pub struct NoSuchEntry;
147
/// An append-only list of variable-size entries stored in stable memory.
/// Entry end-offsets are kept in `index_memory`; the packed entry payloads
/// live in `data_memory`.
pub struct Log<T: Storable, INDEX: Memory, DATA: Memory> {
    // Holds the entry count followed by one u64 cumulative end offset per
    // entry (see `index_entry_offset`).
    index_memory: INDEX,
    // Holds the concatenated entry payloads after the header region.
    data_memory: DATA,
    // The log stores `T` values only in serialized form; no `T` is owned.
    _marker: PhantomData<T>,
}
154
impl<T: Storable, INDEX: Memory, DATA: Memory> Log<T, INDEX, DATA> {
    /// Creates a new empty log backed by the given memories.
    ///
    /// Note: this writes fresh headers and resets the entry counter, so it
    /// overwrites any log previously stored in these memories.
    pub fn new(index_memory: INDEX, data_memory: DATA) -> Self {
        let log = Self {
            index_memory,
            data_memory,
            _marker: PhantomData,
        };
        Self::write_header(
            &log.index_memory,
            &HeaderV1 {
                magic: *INDEX_MAGIC,
                version: LAYOUT_VERSION,
            },
        );
        Self::write_header(
            &log.data_memory,
            &HeaderV1 {
                magic: *DATA_MAGIC,
                version: LAYOUT_VERSION,
            },
        );

        // The entry counter (u64) lives at HEADER_OFFSET in the index
        // memory; a fresh log has zero entries.
        write_u64(&log.index_memory, Address::from(HEADER_OFFSET), 0);
        log
    }

    /// Initializes a log from previously written memories, or creates a
    /// fresh log if the data memory does not look like one.
    ///
    /// # Errors
    ///
    /// * `IncompatibleDataVersion` / `IncompatibleIndexVersion` if a header
    ///   declares a layout version other than `LAYOUT_VERSION`.
    /// * `InvalidIndex` if the data memory carries a valid log header but
    ///   the index memory does not.
    pub fn init(index_memory: INDEX, data_memory: DATA) -> Result<Self, InitError> {
        // An empty or unrecognized data memory means there is no existing
        // log; start from scratch (this also rewrites the index memory).
        if data_memory.size() == 0 {
            return Ok(Self::new(index_memory, data_memory));
        }
        let data_header = Self::read_header(&data_memory);
        if &data_header.magic != DATA_MAGIC {
            return Ok(Self::new(index_memory, data_memory));
        }

        if data_header.version != LAYOUT_VERSION {
            return Err(InitError::IncompatibleDataVersion {
                last_supported_version: LAYOUT_VERSION,
                decoded_version: data_header.version,
            });
        }

        let index_header = Self::read_header(&index_memory);
        if &index_header.magic != INDEX_MAGIC {
            return Err(InitError::InvalidIndex);
        }

        if index_header.version != LAYOUT_VERSION {
            return Err(InitError::IncompatibleIndexVersion {
                last_supported_version: LAYOUT_VERSION,
                decoded_version: index_header.version,
            });
        }

        // Debug builds verify the index invariant (monotonically
        // non-decreasing offsets) before trusting the memories.
        #[cfg(debug_assertions)]
        {
            assert_eq!(Ok(()), Self::validate_v1_index(&index_memory));
        }

        Ok(Self {
            index_memory,
            data_memory,
            _marker: PhantomData,
        })
    }

    /// Writes the magic + version header to the first 4 bytes of `memory`,
    /// growing it to at least one page first if necessary.
    ///
    /// # Panics
    ///
    /// Panics if the first page cannot be allocated.
    fn write_header(memory: &impl Memory, header: &HeaderV1) {
        if memory.size() < 1 {
            assert!(
                memory.grow(1) != -1,
                "failed to allocate the first memory page"
            );
        }
        memory.write(0, &header.magic);
        memory.write(3, &[header.version]);
    }

    /// Reads the 4-byte header (3 magic bytes + 1 version byte) from the
    /// start of `memory`.
    ///
    /// NOTE(review): assumes the memory has at least one page; `init`
    /// checks this for the data memory but reads the index header without
    /// a size check — confirm the `Memory` contract for reads past `size()`.
    fn read_header(memory: &impl Memory) -> HeaderV1 {
        let mut magic = [0u8; 3];
        let mut version = [0u8; 1];
        memory.read(0, &mut magic);
        memory.read(3, &mut version);
        HeaderV1 {
            magic,
            version: version[0],
        }
    }

    /// Debug-only sanity check: the per-entry end offsets stored in the
    /// index must be monotonically non-decreasing.
    #[cfg(debug_assertions)]
    fn validate_v1_index(memory: &INDEX) -> Result<(), String> {
        let num_entries = read_u64(memory, Address::from(HEADER_OFFSET));

        if num_entries == 0 {
            return Ok(());
        }

        // The first slot after the counter holds the end offset of entry 0.
        let mut prev_entry = read_u64(memory, Address::from(HEADER_OFFSET + 8));
        for i in 1..num_entries {
            let entry = read_u64(memory, Address::from(HEADER_OFFSET + 8 + i * 8));
            if entry < prev_entry {
                return Err(format!("invalid entry I[{i}]: {entry} < {prev_entry}"));
            }
            prev_entry = entry;
        }
        Ok(())
    }

    /// Consumes the log and returns the underlying memories.
    pub fn into_memories(self) -> (INDEX, DATA) {
        (self.index_memory, self.data_memory)
    }

    /// Returns true if the log contains no entries.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the number of index-memory bytes in use: the header region,
    /// the entry counter, and one u64 slot per entry.
    pub fn index_size_bytes(&self) -> u64 {
        let num_entries = read_u64(&self.index_memory, Address::from(HEADER_OFFSET));
        self.index_entry_offset(num_entries).get()
    }

    /// Returns the number of data-memory bytes in use: the header region
    /// plus all entry payloads.
    pub fn data_size_bytes(&self) -> u64 {
        self.log_size_bytes() + HEADER_OFFSET
    }

    /// Returns the total size of all entry payloads in bytes.
    ///
    /// The index stores cumulative end offsets, so the last entry's end
    /// offset is exactly the total payload size.
    pub fn log_size_bytes(&self) -> u64 {
        let num_entries = self.len();
        if num_entries == 0 {
            0
        } else {
            read_u64(&self.index_memory, self.index_entry_offset(num_entries - 1))
        }
    }

    /// Returns the number of entries in the log.
    pub fn len(&self) -> u64 {
        read_u64(&self.index_memory, Address::from(HEADER_OFFSET))
    }

    /// Returns the entry at `idx`, decoded via `T::from_bytes`, or `None`
    /// if `idx` is out of range.
    pub fn get(&self, idx: u64) -> Option<T> {
        let mut buf = vec![];
        self.read_entry(idx, &mut buf).ok()?;
        Some(T::from_bytes(Cow::Owned(buf)))
    }

    /// Returns an iterator over the log's entries that reuses a single
    /// internal buffer across reads.
    pub fn iter(&self) -> Iter<'_, T, INDEX, DATA> {
        Iter {
            log: self,
            buf: vec![],
            pos: 0,
        }
    }

    /// Reads the raw bytes of entry `idx` into `buf`, resizing `buf` to the
    /// entry's exact length.
    ///
    /// # Errors
    ///
    /// Returns `NoSuchEntry` if `idx` is out of range.
    pub fn read_entry(&self, idx: u64, buf: &mut Vec<u8>) -> Result<(), NoSuchEntry> {
        let (offset, len) = self.entry_meta(idx).ok_or(NoSuchEntry)?;
        buf.resize(len, 0);
        // Payloads are stored after the data memory's header region.
        self.data_memory.read(HEADER_OFFSET + offset, buf);
        Ok(())
    }

    /// Appends the serialization of `item` and returns its index.
    ///
    /// Writes are ordered so that the payload and its index slot land
    /// before the entry counter is bumped: a failure part-way through
    /// leaves previous entries intact and the new entry invisible.
    ///
    /// # Errors
    ///
    /// Returns `WriteError::GrowFailed` if either memory cannot grow enough
    /// to hold the new entry.
    ///
    /// # Panics
    ///
    /// Panics on u64 address overflow (expect "address overflow").
    pub fn append(&self, item: &T) -> Result<u64, WriteError> {
        let idx = self.len();
        // Start of the new payload = end offset of the previous entry.
        let data_offset = if idx == 0 {
            0
        } else {
            read_u64(&self.index_memory, self.index_entry_offset(idx - 1))
        };

        let bytes = item.to_bytes();
        let new_offset = data_offset
            .checked_add(bytes.len() as u64)
            .expect("address overflow");

        let entry_offset = HEADER_OFFSET
            .checked_add(data_offset)
            .expect("address overflow");

        debug_assert!(new_offset >= data_offset);

        // 1) Write the payload into the data memory.
        safe_write(&self.data_memory, entry_offset, &bytes[..])?;

        // 2) Record the entry's end offset in its index slot.
        safe_write(
            &self.index_memory,
            self.index_entry_offset(idx).get(),
            &new_offset.to_le_bytes(),
        )?;
        // 3) Publish the entry by bumping the counter last.
        write_u64(&self.index_memory, Address::from(HEADER_OFFSET), idx + 1);

        // Round-trip check (debug builds only).
        debug_assert_eq!(self.get(idx).unwrap().to_bytes(), bytes);

        Ok(idx)
    }

    /// Returns the (start offset, byte length) of entry `idx` within the
    /// data region, or `None` if `idx` is out of range.
    fn entry_meta(&self, idx: u64) -> Option<(u64, usize)> {
        if self.len() <= idx {
            return None;
        }

        if idx == 0 {
            // Entry 0 starts at offset 0; its end offset equals its length.
            Some((
                0,
                read_u64(&self.index_memory, self.index_entry_offset(0)) as usize,
            ))
        } else {
            // Entry i spans [end(i - 1), end(i)).
            let offset = read_u64(&self.index_memory, self.index_entry_offset(idx - 1));
            let next = read_u64(&self.index_memory, self.index_entry_offset(idx));

            debug_assert!(offset <= next);

            Some((offset, (next - offset) as usize))
        }
    }

    /// Returns the address of the index slot holding entry `idx`'s end
    /// offset: header region, then the u64 entry counter, then 8 bytes per
    /// preceding entry.
    fn index_entry_offset(&self, idx: u64) -> Address {
        Address::from(
            HEADER_OFFSET
                + std::mem::size_of::<u64>() as u64
                + idx * (std::mem::size_of::<u64>() as u64),
        )
    }
}
408
/// Iterator over the entries of a `Log`, reusing one byte buffer for all
/// reads.
pub struct Iter<'a, T, I, D>
where
    T: Storable,
    I: Memory,
    D: Memory,
{
    // The log being iterated.
    log: &'a Log<T, I, D>,
    // Scratch buffer reused for each entry read.
    buf: Vec<u8>,
    // Index of the next entry to read.
    pos: u64,
}
419
420impl<T, I, D> Iterator for Iter<'_, T, I, D>
421where
422 T: Storable,
423 I: Memory,
424 D: Memory,
425{
426 type Item = T;
427
428 fn next(&mut self) -> Option<T> {
429 match self.log.read_entry(self.pos, &mut self.buf) {
430 Ok(()) => {
431 self.pos = self.pos.saturating_add(1);
432 Some(T::from_bytes(Cow::Borrowed(&self.buf)))
433 }
434 Err(NoSuchEntry) => None,
435 }
436 }
437
438 fn size_hint(&self) -> (usize, Option<usize>) {
439 (self.log.len().saturating_sub(self.pos) as usize, None)
440 }
441
442 fn count(self) -> usize {
443 let n = self.log.len().saturating_sub(self.pos);
444 if n > usize::MAX as u64 {
445 panic!("The number of items in the log {n} does not fit into usize");
446 }
447 n as usize
448 }
449
450 fn nth(&mut self, n: usize) -> Option<T> {
451 self.pos = self.pos.saturating_add(n as u64);
452 self.next()
453 }
454}
455
456pub fn iter_thread_local<T, I, D>(
458 local_key: &'static LocalKey<RefCell<Log<T, I, D>>>,
459) -> ThreadLocalRefIterator<T, I, D>
460where
461 T: Storable,
462 I: Memory,
463 D: Memory,
464{
465 ThreadLocalRefIterator {
466 log: local_key,
467 buf: vec![],
468 pos: 0,
469 }
470}
471
/// Iterator over a log stored in a thread-local `RefCell`; the cell is
/// borrowed afresh on each read.
pub struct ThreadLocalRefIterator<T, I, D>
where
    T: Storable + 'static,
    I: Memory + 'static,
    D: Memory + 'static,
{
    // The thread-local cell holding the log.
    log: &'static LocalKey<RefCell<Log<T, I, D>>>,
    // Scratch buffer reused for each entry read.
    buf: Vec<u8>,
    // Index of the next entry to read.
    pos: u64,
}
482
483impl<T, I, D> Iterator for ThreadLocalRefIterator<T, I, D>
484where
485 T: Storable,
486 I: Memory,
487 D: Memory,
488{
489 type Item = T;
490
491 fn next(&mut self) -> Option<T> {
492 self.log.with(
493 |log| match log.borrow().read_entry(self.pos, &mut self.buf) {
494 Ok(()) => {
495 self.pos = self.pos.saturating_add(1);
496 Some(T::from_bytes(Cow::Borrowed(&self.buf)))
497 }
498 Err(NoSuchEntry) => None,
499 },
500 )
501 }
502
503 fn size_hint(&self) -> (usize, Option<usize>) {
504 let count = self.log.with(|cell| cell.borrow().len());
505 (count.saturating_sub(self.pos) as usize, None)
506 }
507
508 fn count(self) -> usize {
509 self.size_hint().0
510 }
511
512 fn nth(&mut self, n: usize) -> Option<T> {
513 self.pos = self.pos.saturating_add(n as u64);
514 self.next()
515 }
516}