1use crate::{read_to_vec, read_u64, safe_write, write_u64, Address, GrowFailed, Memory, Storable};
58use std::borrow::Cow;
59use std::cell::RefCell;
60use std::fmt;
61use std::marker::PhantomData;
62use std::thread::LocalKey;
63
64#[cfg(test)]
65mod tests;
66
67pub const INDEX_MAGIC: &[u8; 3] = b"GLI";
69pub const DATA_MAGIC: &[u8; 3] = b"GLD";
71
72const LAYOUT_VERSION: u8 = 1;
74
75const HEADER_V1_SIZE: u64 = 4;
77
78const RESERVED_HEADER_SIZE: u64 = 28;
80
81const HEADER_OFFSET: u64 = HEADER_V1_SIZE + RESERVED_HEADER_SIZE;
83
84struct HeaderV1 {
85 magic: [u8; 3],
86 version: u8,
87}
88
89#[derive(Debug, PartialEq, Eq)]
90pub enum InitError {
91 IncompatibleDataVersion {
92 last_supported_version: u8,
93 decoded_version: u8,
94 },
95 IncompatibleIndexVersion {
96 last_supported_version: u8,
97 decoded_version: u8,
98 },
99 InvalidIndex,
100}
101
102impl fmt::Display for InitError {
103 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
104 match self {
105 InitError::IncompatibleDataVersion {
106 last_supported_version,
107 decoded_version,
108 } => write!(
109 f,
110 "Incompatible data version: last supported version is {}, but decoded version is {}",
111 last_supported_version, decoded_version
112 ),
113 InitError::IncompatibleIndexVersion {
114 last_supported_version,
115 decoded_version,
116 } => write!(
117 f,
118 "Incompatible index version: last supported version is {}, but decoded version is {}",
119 last_supported_version, decoded_version
120 ),
121 InitError::InvalidIndex => write!(f, "Invalid index"),
122 }
123 }
124}
125
126#[derive(Debug, PartialEq, Eq)]
127pub enum WriteError {
128 GrowFailed { current_size: u64, delta: u64 },
129}
130
131impl From<GrowFailed> for WriteError {
132 fn from(
133 GrowFailed {
134 current_size,
135 delta,
136 }: GrowFailed,
137 ) -> Self {
138 Self::GrowFailed {
139 current_size,
140 delta,
141 }
142 }
143}
144
145#[derive(Debug, PartialEq, Eq)]
146pub struct NoSuchEntry;
147
148pub struct Log<T: Storable, INDEX: Memory, DATA: Memory> {
150 index_memory: INDEX,
151 data_memory: DATA,
152 _marker: PhantomData<T>,
153}
154
155impl<T: Storable, INDEX: Memory, DATA: Memory> Log<T, INDEX, DATA> {
156 pub fn new(index_memory: INDEX, data_memory: DATA) -> Self {
158 let log = Self {
159 index_memory,
160 data_memory,
161 _marker: PhantomData,
162 };
163 Self::write_header(
164 &log.index_memory,
165 &HeaderV1 {
166 magic: *INDEX_MAGIC,
167 version: LAYOUT_VERSION,
168 },
169 );
170 Self::write_header(
171 &log.data_memory,
172 &HeaderV1 {
173 magic: *DATA_MAGIC,
174 version: LAYOUT_VERSION,
175 },
176 );
177
178 write_u64(&log.index_memory, Address::from(HEADER_OFFSET), 0);
180 log
181 }
182
183 pub fn init(index_memory: INDEX, data_memory: DATA) -> Self {
187 if data_memory.size() == 0 {
189 return Self::new(index_memory, data_memory);
190 }
191 let data_header = Self::read_header(&data_memory);
192 if &data_header.magic != DATA_MAGIC {
193 return Self::new(index_memory, data_memory);
194 }
195
196 if data_header.version != LAYOUT_VERSION {
197 panic!(
198 "Failed to initialize log: {}",
199 InitError::IncompatibleDataVersion {
200 last_supported_version: LAYOUT_VERSION,
201 decoded_version: data_header.version,
202 }
203 );
204 }
205
206 let index_header = Self::read_header(&index_memory);
207 if &index_header.magic != INDEX_MAGIC {
208 panic!("Failed to initialize log: {}", InitError::InvalidIndex);
209 }
210
211 if index_header.version != LAYOUT_VERSION {
212 panic!(
213 "Failed to initialize log: {}",
214 InitError::IncompatibleIndexVersion {
215 last_supported_version: LAYOUT_VERSION,
216 decoded_version: index_header.version,
217 }
218 );
219 }
220
221 #[cfg(debug_assertions)]
222 {
223 assert_eq!(Ok(()), Self::validate_v1_index(&index_memory));
224 }
225
226 Self {
227 index_memory,
228 data_memory,
229 _marker: PhantomData,
230 }
231 }
232
233 fn write_header(memory: &impl Memory, header: &HeaderV1) {
235 if memory.size() < 1 {
236 assert!(
237 memory.grow(1) != -1,
238 "failed to allocate the first memory page"
239 );
240 }
241 memory.write(0, &header.magic);
242 memory.write(3, &[header.version]);
243 }
244
245 fn read_header(memory: &impl Memory) -> HeaderV1 {
248 let mut magic = [0u8; 3];
249 let mut version = [0u8; 1];
250 memory.read(0, &mut magic);
251 memory.read(3, &mut version);
252 HeaderV1 {
253 magic,
254 version: version[0],
255 }
256 }
257
258 #[cfg(debug_assertions)]
259 fn validate_v1_index(memory: &INDEX) -> Result<(), String> {
260 let num_entries = read_u64(memory, Address::from(HEADER_OFFSET));
261
262 if num_entries == 0 {
263 return Ok(());
264 }
265
266 let mut prev_entry = read_u64(memory, Address::from(HEADER_OFFSET + 8));
268 for i in 1..num_entries {
269 let entry = read_u64(memory, Address::from(HEADER_OFFSET + 8 + i * 8));
270 if entry < prev_entry {
271 return Err(format!("invalid entry I[{i}]: {entry} < {prev_entry}"));
272 }
273 prev_entry = entry;
274 }
275 Ok(())
276 }
277
278 pub fn into_memories(self) -> (INDEX, DATA) {
280 (self.index_memory, self.data_memory)
281 }
282
283 pub fn is_empty(&self) -> bool {
285 self.len() == 0
286 }
287
288 pub fn index_size_bytes(&self) -> u64 {
290 let num_entries = read_u64(&self.index_memory, Address::from(HEADER_OFFSET));
291 self.index_entry_offset(num_entries).get()
292 }
293
294 pub fn data_size_bytes(&self) -> u64 {
296 self.log_size_bytes() + HEADER_OFFSET
297 }
298
299 pub fn log_size_bytes(&self) -> u64 {
301 let num_entries = self.len();
302 if num_entries == 0 {
303 0
304 } else {
305 read_u64(&self.index_memory, self.index_entry_offset(num_entries - 1))
306 }
307 }
308
309 pub fn len(&self) -> u64 {
311 read_u64(&self.index_memory, Address::from(HEADER_OFFSET))
312 }
313
314 pub fn get(&self, idx: u64) -> Option<T> {
317 let mut buf = vec![];
318 self.read_entry(idx, &mut buf).ok()?;
319 Some(T::from_bytes(Cow::Owned(buf)))
320 }
321
322 pub fn iter(&self) -> Iter<'_, T, INDEX, DATA> {
324 Iter {
325 log: self,
326 buf: vec![],
327 pos: 0,
328 }
329 }
330
331 pub fn read_entry(&self, idx: u64, buf: &mut Vec<u8>) -> Result<(), NoSuchEntry> {
339 let (offset, len) = self.entry_meta(idx).ok_or(NoSuchEntry)?;
340 read_to_vec(&self.data_memory, (HEADER_OFFSET + offset).into(), buf, len);
341 Ok(())
342 }
343
344 pub fn append(&self, item: &T) -> Result<u64, WriteError> {
349 let idx = self.len();
350 let data_offset = if idx == 0 {
351 0
352 } else {
353 read_u64(&self.index_memory, self.index_entry_offset(idx - 1))
354 };
355
356 let bytes = item.to_bytes();
357 let new_offset = data_offset
358 .checked_add(bytes.len() as u64)
359 .expect("address overflow");
360
361 let entry_offset = HEADER_OFFSET
362 .checked_add(data_offset)
363 .expect("address overflow");
364
365 debug_assert!(new_offset >= data_offset);
366
367 safe_write(&self.data_memory, entry_offset, &bytes[..])?;
369
370 safe_write(
372 &self.index_memory,
373 self.index_entry_offset(idx).get(),
374 &new_offset.to_le_bytes(),
375 )?;
376 write_u64(&self.index_memory, Address::from(HEADER_OFFSET), idx + 1);
378
379 debug_assert_eq!(self.get(idx).unwrap().to_bytes(), bytes);
380
381 Ok(idx)
382 }
383
384 fn entry_meta(&self, idx: u64) -> Option<(u64, usize)> {
386 if self.len() <= idx {
387 return None;
388 }
389
390 if idx == 0 {
391 Some((
392 0,
393 read_u64(&self.index_memory, self.index_entry_offset(0)) as usize,
394 ))
395 } else {
396 let offset = read_u64(&self.index_memory, self.index_entry_offset(idx - 1));
397 let next = read_u64(&self.index_memory, self.index_entry_offset(idx));
398
399 debug_assert!(offset <= next);
400
401 Some((offset, (next - offset) as usize))
402 }
403 }
404
405 fn index_entry_offset(&self, idx: u64) -> Address {
407 Address::from(
408 HEADER_OFFSET + std::mem::size_of::<u64>() as u64 + idx * (std::mem::size_of::<u64>() as u64), )
411 }
412}
413
414pub struct Iter<'a, T, I, D>
415where
416 T: Storable,
417 I: Memory,
418 D: Memory,
419{
420 log: &'a Log<T, I, D>,
421 buf: Vec<u8>,
422 pos: u64,
423}
424
425impl<T, I, D> Iterator for Iter<'_, T, I, D>
426where
427 T: Storable,
428 I: Memory,
429 D: Memory,
430{
431 type Item = T;
432
433 fn next(&mut self) -> Option<T> {
434 match self.log.read_entry(self.pos, &mut self.buf) {
435 Ok(()) => {
436 self.pos = self.pos.saturating_add(1);
437 Some(T::from_bytes(Cow::Borrowed(&self.buf)))
438 }
439 Err(NoSuchEntry) => None,
440 }
441 }
442
443 fn size_hint(&self) -> (usize, Option<usize>) {
444 (self.log.len().saturating_sub(self.pos) as usize, None)
445 }
446
447 fn count(self) -> usize {
448 let n = self.log.len().saturating_sub(self.pos);
449 if n > usize::MAX as u64 {
450 panic!("The number of items in the log {n} does not fit into usize");
451 }
452 n as usize
453 }
454
455 fn nth(&mut self, n: usize) -> Option<T> {
456 self.pos = self.pos.saturating_add(n as u64);
457 self.next()
458 }
459}
460
461pub fn iter_thread_local<T, I, D>(
463 local_key: &'static LocalKey<RefCell<Log<T, I, D>>>,
464) -> ThreadLocalRefIterator<T, I, D>
465where
466 T: Storable,
467 I: Memory,
468 D: Memory,
469{
470 ThreadLocalRefIterator {
471 log: local_key,
472 buf: vec![],
473 pos: 0,
474 }
475}
476
477pub struct ThreadLocalRefIterator<T, I, D>
478where
479 T: Storable + 'static,
480 I: Memory + 'static,
481 D: Memory + 'static,
482{
483 log: &'static LocalKey<RefCell<Log<T, I, D>>>,
484 buf: Vec<u8>,
485 pos: u64,
486}
487
488impl<T, I, D> Iterator for ThreadLocalRefIterator<T, I, D>
489where
490 T: Storable,
491 I: Memory,
492 D: Memory,
493{
494 type Item = T;
495
496 fn next(&mut self) -> Option<T> {
497 self.log.with(
498 |log| match log.borrow().read_entry(self.pos, &mut self.buf) {
499 Ok(()) => {
500 self.pos = self.pos.saturating_add(1);
501 Some(T::from_bytes(Cow::Borrowed(&self.buf)))
502 }
503 Err(NoSuchEntry) => None,
504 },
505 )
506 }
507
508 fn size_hint(&self) -> (usize, Option<usize>) {
509 let count = self.log.with(|cell| cell.borrow().len());
510 (count.saturating_sub(self.pos) as usize, None)
511 }
512
513 fn count(self) -> usize {
514 self.size_hint().0
515 }
516
517 fn nth(&mut self, n: usize) -> Option<T> {
518 self.pos = self.pos.saturating_add(n as u64);
519 self.next()
520 }
521}