1use crate::{read_to_vec, read_u64, safe_write, write_u64, Address, GrowFailed, Memory, Storable};
58use std::borrow::Cow;
59use std::cell::RefCell;
60use std::fmt;
61use std::marker::PhantomData;
62use std::thread::LocalKey;
63
64#[cfg(test)]
65mod tests;
66
/// Magic marker written at offset 0 of the index memory.
pub const INDEX_MAGIC: &[u8; 3] = b"GLI";
/// Magic marker written at offset 0 of the data memory.
pub const DATA_MAGIC: &[u8; 3] = b"GLD";

/// The only on-disk layout version this implementation reads and writes.
const LAYOUT_VERSION: u8 = 1;

/// Size of the V1 header: 3 magic bytes + 1 version byte.
const HEADER_V1_SIZE: u64 = 4;

/// Bytes reserved after the V1 header — presumably room for future header
/// growth without moving `HEADER_OFFSET`.
const RESERVED_HEADER_SIZE: u64 = 28;

/// Offset at which log content (entry counter / entry payloads) begins
/// in both memories.
const HEADER_OFFSET: u64 = HEADER_V1_SIZE + RESERVED_HEADER_SIZE;
/// The 4-byte header stored at offset 0 of both the index and the data
/// memory: 3 magic bytes identifying the memory's role, then the layout
/// version.
struct HeaderV1 {
    magic: [u8; 3],
    version: u8,
}
88
/// Errors that [`Log::init`] can return when the provided memories contain
/// data written by an incompatible or corrupted log.
#[derive(Debug, PartialEq, Eq)]
pub enum InitError {
    /// The data memory was written by a newer (unsupported) layout version.
    IncompatibleDataVersion {
        last_supported_version: u8,
        decoded_version: u8,
    },
    /// The index memory was written by a newer (unsupported) layout version.
    IncompatibleIndexVersion {
        last_supported_version: u8,
        decoded_version: u8,
    },
    /// The data memory looks like log data, but the index memory does not
    /// carry the index magic.
    InvalidIndex,
}

impl fmt::Display for InitError {
    /// Renders a human-readable description of the initialization failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::IncompatibleDataVersion { last_supported_version, decoded_version } => {
                write!(
                    f,
                    "Incompatible data version: last supported version is {}, but decoded version is {}",
                    last_supported_version, decoded_version
                )
            }
            Self::IncompatibleIndexVersion { last_supported_version, decoded_version } => {
                write!(
                    f,
                    "Incompatible index version: last supported version is {}, but decoded version is {}",
                    last_supported_version, decoded_version
                )
            }
            Self::InvalidIndex => write!(f, "Invalid index"),
        }
    }
}
125
/// Errors that can occur while appending an entry to the log.
#[derive(Debug, PartialEq, Eq)]
pub enum WriteError {
    /// The underlying memory could not be grown; `current_size` and `delta`
    /// come from the failed `grow` attempt (units per the `Memory` trait —
    /// presumably pages).
    GrowFailed { current_size: u64, delta: u64 },
}
130
131impl From<GrowFailed> for WriteError {
132 fn from(
133 GrowFailed {
134 current_size,
135 delta,
136 }: GrowFailed,
137 ) -> Self {
138 Self::GrowFailed {
139 current_size,
140 delta,
141 }
142 }
143}
144
/// Error returned by [`Log::read_entry`] when the requested index is past
/// the end of the log.
#[derive(Debug, PartialEq, Eq)]
pub struct NoSuchEntry;
147
/// An append-only list of variable-size entries stored across two memories:
/// the index memory holds the entry count followed by cumulative byte
/// offsets (one `u64` per entry), and the data memory holds the serialized
/// entry payloads back to back.
pub struct Log<T: Storable, INDEX: Memory, DATA: Memory> {
    index_memory: INDEX,
    data_memory: DATA,
    // Ties the element type `T` to the log without storing any `T` values.
    _marker: PhantomData<T>,
}
154
impl<T: Storable, INDEX: Memory, DATA: Memory> Log<T, INDEX, DATA> {
    /// Creates a new, empty log in the given memories.
    ///
    /// Writes a V1 header (magic + version) into both memories and zeroes
    /// the entry counter, discarding whatever the memories held before.
    pub fn new(index_memory: INDEX, data_memory: DATA) -> Self {
        let log = Self {
            index_memory,
            data_memory,
            _marker: PhantomData,
        };
        Self::write_header(
            &log.index_memory,
            &HeaderV1 {
                magic: *INDEX_MAGIC,
                version: LAYOUT_VERSION,
            },
        );
        Self::write_header(
            &log.data_memory,
            &HeaderV1 {
                magic: *DATA_MAGIC,
                version: LAYOUT_VERSION,
            },
        );
        // Zero the entry counter, stored right after the reserved header area.
        write_u64(&log.index_memory, Address::from(HEADER_OFFSET), 0);
        log
    }

    /// Initializes a log from (possibly previously used) memories.
    ///
    /// If the data memory is empty, or does not start with the data magic,
    /// the memories are (re)initialized as a fresh empty log — note that
    /// this overwrites unrecognized contents.
    ///
    /// # Errors
    ///
    /// * `IncompatibleDataVersion` / `IncompatibleIndexVersion` if either
    ///   memory was written by an unsupported layout version.
    /// * `InvalidIndex` if the data memory carries valid log data but the
    ///   index memory does not carry the index magic.
    pub fn init(index_memory: INDEX, data_memory: DATA) -> Result<Self, InitError> {
        if data_memory.size() == 0 {
            // Never used before: create a fresh log.
            return Ok(Self::new(index_memory, data_memory));
        }
        let data_header = Self::read_header(&data_memory);
        if &data_header.magic != DATA_MAGIC {
            // Not log data: treat the memories as uninitialized.
            return Ok(Self::new(index_memory, data_memory));
        }

        if data_header.version != LAYOUT_VERSION {
            return Err(InitError::IncompatibleDataVersion {
                last_supported_version: LAYOUT_VERSION,
                decoded_version: data_header.version,
            });
        }

        let index_header = Self::read_header(&index_memory);
        if &index_header.magic != INDEX_MAGIC {
            return Err(InitError::InvalidIndex);
        }

        if index_header.version != LAYOUT_VERSION {
            return Err(InitError::IncompatibleIndexVersion {
                last_supported_version: LAYOUT_VERSION,
                decoded_version: index_header.version,
            });
        }

        // Debug builds also verify the index invariant (offsets
        // non-decreasing) before trusting the stored data.
        #[cfg(debug_assertions)]
        {
            assert_eq!(Ok(()), Self::validate_v1_index(&index_memory));
        }

        Ok(Self {
            index_memory,
            data_memory,
            _marker: PhantomData,
        })
    }

    /// Writes the header (magic at offset 0, version at offset 3) into the
    /// given memory, growing it to at least one page first if needed.
    ///
    /// # Panics
    ///
    /// Panics if the first memory page cannot be allocated.
    fn write_header(memory: &impl Memory, header: &HeaderV1) {
        if memory.size() < 1 {
            assert!(
                memory.grow(1) != -1,
                "failed to allocate the first memory page"
            );
        }
        memory.write(0, &header.magic);
        memory.write(3, &[header.version]);
    }

    /// Reads the header back from the given memory.
    ///
    /// NOTE(review): assumes the memory is at least one page long — callers
    /// check `size() > 0` (or have just written the header) first.
    fn read_header(memory: &impl Memory) -> HeaderV1 {
        let mut magic = [0u8; 3];
        let mut version = [0u8; 1];
        memory.read(0, &mut magic);
        memory.read(3, &mut version);
        HeaderV1 {
            magic,
            version: version[0],
        }
    }

    /// Debug-only sanity check: the index must contain a non-decreasing
    /// sequence of cumulative entry offsets.
    #[cfg(debug_assertions)]
    fn validate_v1_index(memory: &INDEX) -> Result<(), String> {
        let num_entries = read_u64(memory, Address::from(HEADER_OFFSET));

        if num_entries == 0 {
            return Ok(());
        }

        // Slot i lives at HEADER_OFFSET + 8 (counter) + i * 8.
        let mut prev_entry = read_u64(memory, Address::from(HEADER_OFFSET + 8));
        for i in 1..num_entries {
            let entry = read_u64(memory, Address::from(HEADER_OFFSET + 8 + i * 8));
            if entry < prev_entry {
                return Err(format!("invalid entry I[{i}]: {entry} < {prev_entry}"));
            }
            prev_entry = entry;
        }
        Ok(())
    }

    /// Consumes the log and returns the underlying index and data memories.
    pub fn into_memories(self) -> (INDEX, DATA) {
        (self.index_memory, self.data_memory)
    }

    /// Returns `true` if the log contains no entries.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns the number of index memory bytes in use: header area, entry
    /// counter, and one 8-byte offset slot per entry.
    pub fn index_size_bytes(&self) -> u64 {
        let num_entries = read_u64(&self.index_memory, Address::from(HEADER_OFFSET));
        // The address of the slot one past the last entry equals the number
        // of index bytes in use.
        self.index_entry_offset(num_entries).get()
    }

    /// Returns the number of data memory bytes in use, header area included.
    pub fn data_size_bytes(&self) -> u64 {
        self.log_size_bytes() + HEADER_OFFSET
    }

    /// Returns the total size in bytes of all entry payloads.
    pub fn log_size_bytes(&self) -> u64 {
        let num_entries = self.len();
        if num_entries == 0 {
            0
        } else {
            // Offsets are cumulative, so the last one is the total size.
            read_u64(&self.index_memory, self.index_entry_offset(num_entries - 1))
        }
    }

    /// Returns the number of entries in the log.
    pub fn len(&self) -> u64 {
        read_u64(&self.index_memory, Address::from(HEADER_OFFSET))
    }

    /// Returns the entry at index `idx`, decoded with `T::from_bytes`, or
    /// `None` if `idx` is out of bounds.
    pub fn get(&self, idx: u64) -> Option<T> {
        let mut buf = vec![];
        self.read_entry(idx, &mut buf).ok()?;
        Some(T::from_bytes(Cow::Owned(buf)))
    }

    /// Returns an iterator over the log's entries, oldest first.
    pub fn iter(&self) -> Iter<'_, T, INDEX, DATA> {
        Iter {
            log: self,
            buf: vec![],
            pos: 0,
        }
    }

    /// Reads the raw serialized bytes of entry `idx` into `buf`.
    ///
    /// # Errors
    ///
    /// Returns `NoSuchEntry` if `idx` is past the end of the log.
    pub fn read_entry(&self, idx: u64, buf: &mut Vec<u8>) -> Result<(), NoSuchEntry> {
        let (offset, len) = self.entry_meta(idx).ok_or(NoSuchEntry)?;
        read_to_vec(&self.data_memory, (HEADER_OFFSET + offset).into(), buf, len);
        Ok(())
    }

    /// Appends a new entry to the log and returns its index.
    ///
    /// # Errors
    ///
    /// Returns `WriteError::GrowFailed` if either memory cannot grow to fit
    /// the entry; the observable log state (its length) is then unchanged.
    ///
    /// # Panics
    ///
    /// Panics with "address overflow" if the total payload size would
    /// overflow `u64`.
    pub fn append(&self, item: &T) -> Result<u64, WriteError> {
        let idx = self.len();
        // The previous entry's cumulative end offset is where this
        // entry's payload begins.
        let data_offset = if idx == 0 {
            0
        } else {
            read_u64(&self.index_memory, self.index_entry_offset(idx - 1))
        };

        let bytes = item.to_bytes();
        // Cumulative offset recorded in the index slot for this entry.
        let new_offset = data_offset
            .checked_add(bytes.len() as u64)
            .expect("address overflow");

        let entry_offset = HEADER_OFFSET
            .checked_add(data_offset)
            .expect("address overflow");

        debug_assert!(new_offset >= data_offset);

        // Write the payload first, then the index slot, and bump the
        // counter last — presumably so that a trap mid-append leaves the
        // log at its previous length; confirm against the crate's
        // consistency model.
        safe_write(&self.data_memory, entry_offset, &bytes[..])?;
        safe_write(
            &self.index_memory,
            self.index_entry_offset(idx).get(),
            &new_offset.to_le_bytes(),
        )?;
        write_u64(&self.index_memory, Address::from(HEADER_OFFSET), idx + 1);

        // Debug-only round-trip check of the freshly written entry.
        debug_assert_eq!(self.get(idx).unwrap().to_bytes(), bytes);

        Ok(idx)
    }

    /// Returns `(payload_offset, payload_len)` for entry `idx`, or `None`
    /// if `idx` is out of bounds.
    fn entry_meta(&self, idx: u64) -> Option<(u64, usize)> {
        if self.len() <= idx {
            return None;
        }

        if idx == 0 {
            // The first entry starts at offset 0; its length equals the
            // first cumulative offset.
            Some((
                0,
                read_u64(&self.index_memory, self.index_entry_offset(0)) as usize,
            ))
        } else {
            // Length is the difference of two adjacent cumulative offsets.
            let offset = read_u64(&self.index_memory, self.index_entry_offset(idx - 1));
            let next = read_u64(&self.index_memory, self.index_entry_offset(idx));

            debug_assert!(offset <= next);

            Some((offset, (next - offset) as usize))
        }
    }

    /// Returns the address of the index slot for entry `idx`:
    /// `HEADER_OFFSET` + 8 bytes of entry counter + 8 bytes per slot.
    fn index_entry_offset(&self, idx: u64) -> Address {
        Address::from(
            HEADER_OFFSET
                + std::mem::size_of::<u64>() as u64
                + idx * (std::mem::size_of::<u64>() as u64),
        )
    }
}
407
/// Iterator over the entries of a [`Log`], borrowing the log for its
/// lifetime.
pub struct Iter<'a, T, I, D>
where
    T: Storable,
    I: Memory,
    D: Memory,
{
    log: &'a Log<T, I, D>,
    // Scratch buffer reused across `next` calls to avoid reallocating
    // for every entry.
    buf: Vec<u8>,
    // Index of the next entry to yield.
    pos: u64,
}
418
419impl<T, I, D> Iterator for Iter<'_, T, I, D>
420where
421 T: Storable,
422 I: Memory,
423 D: Memory,
424{
425 type Item = T;
426
427 fn next(&mut self) -> Option<T> {
428 match self.log.read_entry(self.pos, &mut self.buf) {
429 Ok(()) => {
430 self.pos = self.pos.saturating_add(1);
431 Some(T::from_bytes(Cow::Borrowed(&self.buf)))
432 }
433 Err(NoSuchEntry) => None,
434 }
435 }
436
437 fn size_hint(&self) -> (usize, Option<usize>) {
438 (self.log.len().saturating_sub(self.pos) as usize, None)
439 }
440
441 fn count(self) -> usize {
442 let n = self.log.len().saturating_sub(self.pos);
443 if n > usize::MAX as u64 {
444 panic!("The number of items in the log {n} does not fit into usize");
445 }
446 n as usize
447 }
448
449 fn nth(&mut self, n: usize) -> Option<T> {
450 self.pos = self.pos.saturating_add(n as u64);
451 self.next()
452 }
453}
454
455pub fn iter_thread_local<T, I, D>(
457 local_key: &'static LocalKey<RefCell<Log<T, I, D>>>,
458) -> ThreadLocalRefIterator<T, I, D>
459where
460 T: Storable,
461 I: Memory,
462 D: Memory,
463{
464 ThreadLocalRefIterator {
465 log: local_key,
466 buf: vec![],
467 pos: 0,
468 }
469}
470
/// Iterator over a [`Log`] living in a thread-local `RefCell`; borrows the
/// cell only for the duration of each `next` call.
pub struct ThreadLocalRefIterator<T, I, D>
where
    T: Storable + 'static,
    I: Memory + 'static,
    D: Memory + 'static,
{
    log: &'static LocalKey<RefCell<Log<T, I, D>>>,
    // Scratch buffer reused across `next` calls.
    buf: Vec<u8>,
    // Index of the next entry to yield.
    pos: u64,
}
481
482impl<T, I, D> Iterator for ThreadLocalRefIterator<T, I, D>
483where
484 T: Storable,
485 I: Memory,
486 D: Memory,
487{
488 type Item = T;
489
490 fn next(&mut self) -> Option<T> {
491 self.log.with(
492 |log| match log.borrow().read_entry(self.pos, &mut self.buf) {
493 Ok(()) => {
494 self.pos = self.pos.saturating_add(1);
495 Some(T::from_bytes(Cow::Borrowed(&self.buf)))
496 }
497 Err(NoSuchEntry) => None,
498 },
499 )
500 }
501
502 fn size_hint(&self) -> (usize, Option<usize>) {
503 let count = self.log.with(|cell| cell.borrow().len());
504 (count.saturating_sub(self.pos) as usize, None)
505 }
506
507 fn count(self) -> usize {
508 self.size_hint().0
509 }
510
511 fn nth(&mut self, n: usize) -> Option<T> {
512 self.pos = self.pos.saturating_add(n as u64);
513 self.next()
514 }
515}