1use crate::{read_to_vec, read_u64, safe_write, write_u64, Address, GrowFailed, Memory, Storable};
58use std::borrow::Cow;
59use std::cell::RefCell;
60use std::fmt;
61use std::marker::PhantomData;
62use std::thread::LocalKey;
63
64#[cfg(test)]
65mod tests;
66
/// Magic bytes identifying the index memory of a log.
pub const INDEX_MAGIC: &[u8; 3] = b"GLI";
/// Magic bytes identifying the data memory of a log.
pub const DATA_MAGIC: &[u8; 3] = b"GLD";

/// The only layout version this code can read and write.
const LAYOUT_VERSION: u8 = 1;

/// Size of the V1 header: 3 magic bytes + 1 version byte.
const HEADER_V1_SIZE: u64 = 4;

/// Extra bytes kept free after the header, presumably reserved for
/// future header extensions (nothing in this file writes to them).
const RESERVED_HEADER_SIZE: u64 = 28;

/// Offset (in both memories) at which the actual log contents start.
const HEADER_OFFSET: u64 = HEADER_V1_SIZE + RESERVED_HEADER_SIZE;
83
/// In-memory representation of the 4-byte header written at offset 0
/// of both the index and the data memory.
struct HeaderV1 {
    // Either `INDEX_MAGIC` or `DATA_MAGIC`.
    magic: [u8; 3],
    // Layout version; only `LAYOUT_VERSION` is supported.
    version: u8,
}
88
/// Reasons why initializing a [`Log`] from existing memories can fail.
#[derive(Debug, PartialEq, Eq)]
pub enum InitError {
    /// The data memory carries a layout version this code cannot read.
    IncompatibleDataVersion {
        last_supported_version: u8,
        decoded_version: u8,
    },
    /// The index memory carries a layout version this code cannot read.
    IncompatibleIndexVersion {
        last_supported_version: u8,
        decoded_version: u8,
    },
    /// The index memory does not start with `INDEX_MAGIC`.
    InvalidIndex,
}
101
102impl fmt::Display for InitError {
103 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104 match self {
105 InitError::IncompatibleDataVersion {
106 last_supported_version,
107 decoded_version,
108 } => write!(
109 f,
110 "Incompatible data version: last supported version is {}, but decoded version is {}",
111 last_supported_version, decoded_version
112 ),
113 InitError::IncompatibleIndexVersion {
114 last_supported_version,
115 decoded_version,
116 } => write!(
117 f,
118 "Incompatible index version: last supported version is {}, but decoded version is {}",
119 last_supported_version, decoded_version
120 ),
121 InitError::InvalidIndex => write!(f, "Invalid index"),
122 }
123 }
124}
125
/// Errors that can occur while appending an entry to the log.
#[derive(Debug, PartialEq, Eq)]
pub enum WriteError {
    /// The underlying memory could not grow from `current_size` pages by
    /// `delta` pages to fit the new entry.
    GrowFailed { current_size: u64, delta: u64 },
}
130
131impl From<GrowFailed> for WriteError {
132 fn from(
133 GrowFailed {
134 current_size,
135 delta,
136 }: GrowFailed,
137 ) -> Self {
138 Self::GrowFailed {
139 current_size,
140 delta,
141 }
142 }
143}
144
/// Error returned when a requested entry index is outside the log.
#[derive(Debug, PartialEq, Eq)]
pub struct NoSuchEntry;
147
/// An append-only list of variable-size entries stored in stable memory.
///
/// The log is backed by two memories: `index_memory` holds the entry
/// count plus one cumulative end-offset per entry, and `data_memory`
/// holds the serialized entries back to back.
pub struct Log<T: Storable, INDEX: Memory, DATA: Memory> {
    index_memory: INDEX,
    data_memory: DATA,
    // The log owns no `T` values directly; this ties `T` to the type.
    _marker: PhantomData<T>,
}
154
impl<T: Storable, INDEX: Memory, DATA: Memory> Log<T, INDEX, DATA> {
    /// Creates a new empty log in the given memories, unconditionally
    /// overwriting whatever headers they previously contained.
    ///
    /// Layout written here:
    /// * both memories: a `HeaderV1` at offset 0;
    /// * index memory: the entry count (u64, initially 0) at `HEADER_OFFSET`.
    pub fn new(index_memory: INDEX, data_memory: DATA) -> Self {
        let log = Self {
            index_memory,
            data_memory,
            _marker: PhantomData,
        };
        Self::write_header(
            &log.index_memory,
            &HeaderV1 {
                magic: *INDEX_MAGIC,
                version: LAYOUT_VERSION,
            },
        );
        Self::write_header(
            &log.data_memory,
            &HeaderV1 {
                magic: *DATA_MAGIC,
                version: LAYOUT_VERSION,
            },
        );

        // Zero entries so far.
        write_u64(&log.index_memory, Address::from(HEADER_OFFSET), 0);
        log
    }

    /// Returns the first entry, or `None` if the log is empty.
    #[inline]
    pub fn first(&self) -> Option<T> {
        self.get(0)
    }

    /// Returns the last entry, or `None` if the log is empty.
    #[inline]
    pub fn last(&self) -> Option<T> {
        let len = self.len();
        if len == 0 {
            return None;
        }
        self.get(len - 1)
    }

    /// Loads a log from existing memories, or creates a fresh one if the
    /// data memory is empty or carries no recognizable data header.
    ///
    /// # Panics
    ///
    /// Panics if either memory carries a layout version other than
    /// `LAYOUT_VERSION`, or if the data header is valid but the index
    /// header's magic is wrong (the two memories do not belong together).
    pub fn init(index_memory: INDEX, data_memory: DATA) -> Self {
        if data_memory.size() == 0 {
            // Nothing stored yet: initialize from scratch.
            return Self::new(index_memory, data_memory);
        }
        let data_header = Self::read_header(&data_memory);
        if &data_header.magic != DATA_MAGIC {
            // The memory holds something that is not log data; treat it
            // as uninitialized and overwrite it.
            return Self::new(index_memory, data_memory);
        }

        if data_header.version != LAYOUT_VERSION {
            panic!(
                "Failed to initialize log: {}",
                InitError::IncompatibleDataVersion {
                    last_supported_version: LAYOUT_VERSION,
                    decoded_version: data_header.version,
                }
            );
        }

        let index_header = Self::read_header(&index_memory);
        if &index_header.magic != INDEX_MAGIC {
            panic!("Failed to initialize log: {}", InitError::InvalidIndex);
        }

        if index_header.version != LAYOUT_VERSION {
            panic!(
                "Failed to initialize log: {}",
                InitError::IncompatibleIndexVersion {
                    last_supported_version: LAYOUT_VERSION,
                    decoded_version: index_header.version,
                }
            );
        }

        // Extra integrity check in debug builds only: verify the index
        // offsets are monotonically non-decreasing.
        #[cfg(debug_assertions)]
        {
            assert_eq!(Ok(()), Self::validate_v1_index(&index_memory));
        }

        Self {
            index_memory,
            data_memory,
            _marker: PhantomData,
        }
    }

    /// Writes the 4-byte header (magic + version) at offset 0, growing
    /// the memory by one page first if it is still empty.
    fn write_header(memory: &impl Memory, header: &HeaderV1) {
        if memory.size() < 1 {
            assert!(
                memory.grow(1) != -1,
                "failed to allocate the first memory page"
            );
        }
        memory.write(0, &header.magic);
        memory.write(3, &[header.version]);
    }

    /// Reads the 4-byte header (magic + version) from offset 0.
    /// NOTE(review): assumes the memory has at least one page; callers
    /// check `size() > 0` (or have just written the header) before calling.
    fn read_header(memory: &impl Memory) -> HeaderV1 {
        let mut magic = [0u8; 3];
        let mut version = [0u8; 1];
        memory.read(0, &mut magic);
        memory.read(3, &mut version);
        HeaderV1 {
            magic,
            version: version[0],
        }
    }

    /// Debug-build sanity check: the per-entry end offsets stored in the
    /// index must be monotonically non-decreasing.
    #[cfg(debug_assertions)]
    fn validate_v1_index(memory: &INDEX) -> Result<(), String> {
        let num_entries = read_u64(memory, Address::from(HEADER_OFFSET));

        if num_entries == 0 {
            return Ok(());
        }

        // The first index slot lives right after the u64 entry count.
        let mut prev_entry = read_u64(memory, Address::from(HEADER_OFFSET + 8));
        for i in 1..num_entries {
            let entry = read_u64(memory, Address::from(HEADER_OFFSET + 8 + i * 8));
            if entry < prev_entry {
                return Err(format!("invalid entry I[{i}]: {entry} < {prev_entry}"));
            }
            prev_entry = entry;
        }
        Ok(())
    }

    /// Consumes the log and returns the underlying memories.
    pub fn into_memories(self) -> (INDEX, DATA) {
        (self.index_memory, self.data_memory)
    }

    /// Returns true if the log contains no entries.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Number of bytes the index memory uses: header + entry count +
    /// one u64 slot per entry (the address one past the last slot).
    pub fn index_size_bytes(&self) -> u64 {
        let num_entries = read_u64(&self.index_memory, Address::from(HEADER_OFFSET));
        self.index_entry_offset(num_entries).get()
    }

    /// Number of bytes the data memory uses: header + entry payloads.
    pub fn data_size_bytes(&self) -> u64 {
        self.log_size_bytes() + HEADER_OFFSET
    }

    /// Total size of all entry payloads, in bytes (excluding the header).
    /// This is the cumulative end offset stored for the last entry.
    pub fn log_size_bytes(&self) -> u64 {
        let num_entries = self.len();
        if num_entries == 0 {
            0
        } else {
            read_u64(&self.index_memory, self.index_entry_offset(num_entries - 1))
        }
    }

    /// Number of entries in the log, read from the index memory.
    pub fn len(&self) -> u64 {
        read_u64(&self.index_memory, Address::from(HEADER_OFFSET))
    }

    /// Returns the entry at `idx` decoded via `T::from_bytes`, or `None`
    /// if `idx` is out of range.
    pub fn get(&self, idx: u64) -> Option<T> {
        let mut buf = vec![];
        self.read_entry(idx, &mut buf).ok()?;
        Some(T::from_bytes(Cow::Owned(buf)))
    }

    /// Returns an iterator over all entries, starting from the first.
    pub fn iter(&self) -> Iter<'_, T, INDEX, DATA> {
        Iter {
            log: self,
            buf: vec![],
            pos: 0,
        }
    }

    /// Reads the raw bytes of entry `idx` into `buf` (replacing its
    /// contents), or returns `NoSuchEntry` if `idx` is out of range.
    pub fn read_entry(&self, idx: u64, buf: &mut Vec<u8>) -> Result<(), NoSuchEntry> {
        let (offset, len) = self.entry_meta(idx).ok_or(NoSuchEntry)?;
        read_to_vec(&self.data_memory, (HEADER_OFFSET + offset).into(), buf, len);
        Ok(())
    }

    /// Appends `item` to the log and returns its index.
    ///
    /// Write order matters for crash safety: the payload and its index
    /// slot are written first, and the entry count is bumped last, so a
    /// partially completed append is never observable through `get`/`len`.
    ///
    /// # Errors
    ///
    /// Returns `WriteError::GrowFailed` if either memory cannot grow to
    /// fit the new data; in that case the log state is unchanged.
    pub fn append(&self, item: &T) -> Result<u64, WriteError> {
        let idx = self.len();
        // Start of the new payload = cumulative end of the previous one.
        let data_offset = if idx == 0 {
            0
        } else {
            read_u64(&self.index_memory, self.index_entry_offset(idx - 1))
        };

        let bytes = item.to_bytes();
        let new_offset = data_offset
            .checked_add(bytes.len() as u64)
            .expect("address overflow");

        let entry_offset = HEADER_OFFSET
            .checked_add(data_offset)
            .expect("address overflow");

        debug_assert!(new_offset >= data_offset);

        safe_write(&self.data_memory, entry_offset, &bytes[..])?;

        safe_write(
            &self.index_memory,
            self.index_entry_offset(idx).get(),
            &new_offset.to_le_bytes(),
        )?;
        // Commit point: the entry becomes visible only now.
        write_u64(&self.index_memory, Address::from(HEADER_OFFSET), idx + 1);

        debug_assert_eq!(self.get(idx).unwrap().to_bytes(), bytes);

        Ok(idx)
    }

    /// Returns `(payload_offset, payload_len)` of entry `idx` relative to
    /// `HEADER_OFFSET` in the data memory, or `None` if out of range.
    /// Derived from the cumulative end offsets in the index.
    fn entry_meta(&self, idx: u64) -> Option<(u64, usize)> {
        if self.len() <= idx {
            return None;
        }

        if idx == 0 {
            Some((
                0,
                read_u64(&self.index_memory, self.index_entry_offset(0)) as usize,
            ))
        } else {
            let offset = read_u64(&self.index_memory, self.index_entry_offset(idx - 1));
            let next = read_u64(&self.index_memory, self.index_entry_offset(idx));

            debug_assert!(offset <= next);

            Some((offset, (next - offset) as usize))
        }
    }

    /// Address of the index slot for entry `idx`:
    /// header + u64 entry count + `idx` preceding u64 slots.
    fn index_entry_offset(&self, idx: u64) -> Address {
        Address::from(
            HEADER_OFFSET + std::mem::size_of::<u64>() as u64 + idx * (std::mem::size_of::<u64>() as u64),
        )
    }
}
433
/// Iterator over the entries of a [`Log`], borrowing the log for its
/// lifetime and reusing a single read buffer across entries.
pub struct Iter<'a, T, I, D>
where
    T: Storable,
    I: Memory,
    D: Memory,
{
    log: &'a Log<T, I, D>,
    // Scratch buffer reused for every `read_entry` call.
    buf: Vec<u8>,
    // Index of the next entry to yield.
    pos: u64,
}
444
445impl<T, I, D> Iterator for Iter<'_, T, I, D>
446where
447 T: Storable,
448 I: Memory,
449 D: Memory,
450{
451 type Item = T;
452
453 fn next(&mut self) -> Option<T> {
454 match self.log.read_entry(self.pos, &mut self.buf) {
455 Ok(()) => {
456 self.pos = self.pos.saturating_add(1);
457 Some(T::from_bytes(Cow::Borrowed(&self.buf)))
458 }
459 Err(NoSuchEntry) => None,
460 }
461 }
462
463 fn size_hint(&self) -> (usize, Option<usize>) {
464 (self.log.len().saturating_sub(self.pos) as usize, None)
465 }
466
467 fn count(self) -> usize {
468 let n = self.log.len().saturating_sub(self.pos);
469 if n > usize::MAX as u64 {
470 panic!("The number of items in the log {n} does not fit into usize");
471 }
472 n as usize
473 }
474
475 fn nth(&mut self, n: usize) -> Option<T> {
476 self.pos = self.pos.saturating_add(n as u64);
477 self.next()
478 }
479}
480
481pub fn iter_thread_local<T, I, D>(
483 local_key: &'static LocalKey<RefCell<Log<T, I, D>>>,
484) -> ThreadLocalRefIterator<T, I, D>
485where
486 T: Storable,
487 I: Memory,
488 D: Memory,
489{
490 ThreadLocalRefIterator {
491 log: local_key,
492 buf: vec![],
493 pos: 0,
494 }
495}
496
/// Iterator over a [`Log`] that lives in a thread-local `RefCell`,
/// re-borrowing the cell on every `next` call.
pub struct ThreadLocalRefIterator<T, I, D>
where
    T: Storable + 'static,
    I: Memory + 'static,
    D: Memory + 'static,
{
    log: &'static LocalKey<RefCell<Log<T, I, D>>>,
    // Scratch buffer reused for every `read_entry` call.
    buf: Vec<u8>,
    // Index of the next entry to yield.
    pos: u64,
}
507
508impl<T, I, D> Iterator for ThreadLocalRefIterator<T, I, D>
509where
510 T: Storable,
511 I: Memory,
512 D: Memory,
513{
514 type Item = T;
515
516 fn next(&mut self) -> Option<T> {
517 self.log.with(
518 |log| match log.borrow().read_entry(self.pos, &mut self.buf) {
519 Ok(()) => {
520 self.pos = self.pos.saturating_add(1);
521 Some(T::from_bytes(Cow::Borrowed(&self.buf)))
522 }
523 Err(NoSuchEntry) => None,
524 },
525 )
526 }
527
528 fn size_hint(&self) -> (usize, Option<usize>) {
529 let count = self.log.with(|cell| cell.borrow().len());
530 (count.saturating_sub(self.pos) as usize, None)
531 }
532
533 fn count(self) -> usize {
534 self.size_hint().0
535 }
536
537 fn nth(&mut self, n: usize) -> Option<T> {
538 self.pos = self.pos.saturating_add(n as u64);
539 self.next()
540 }
541}