mmap_vecdeque/
mmap_vecdeque.rs
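//! A `VecDeque`-like container whose elements live in memory-mapped chunk files
//! on disk. Elements are `Copy` plain-old-data; the deque is addressed by a
//! virtual `u64` index range `[start, end)` stored in `metadata.bin`, and each
//! fixed-size chunk of that range maps to its own `chunk_<n>.bin` file.
//! Metadata is persisted when the deque is first created and on every `commit`.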

use crate::error::MmapVecDequeError;
use parking_lot::Mutex;
use serde::{Serialize, Deserialize};
use std::fs::{self, OpenOptions, File};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::{ptr, mem::size_of};
use memmap2::{MmapMut, MmapOptions};
use std::marker::PhantomData;
use atomicwrites::{AtomicFile, AllowOverwrite};

// Number of elements per chunk file when the caller does not specify one.
const DEFAULT_CHUNK_SIZE: usize = 10_000;
// Initial value for both `start` and `end`; starting well above zero leaves room
// for `push_front` to decrement `start` without underflowing.
const LARGE_OFFSET: u64 = 1 << 32;

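/// On-disk metadata, serialized with `postcard` and written atomically to
/// `metadata.bin`. `start` and `end` are global element indices: the deque
/// currently holds the half-open range `[start, end)`.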
#[derive(Serialize, Deserialize, Debug)]
struct Metadata {
  type_name: String,
  element_size: usize,
  chunk_size: usize,
  start: u64,
  end: u64,
}

impl Metadata {
  fn len(&self) -> usize {
    (self.end - self.start) as usize
  }
}

struct Chunk {
  mmap: MmapMut,
  file: File,
}

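/// A persistent, chunked double-ended queue backed by memory-mapped files.
///
/// `chunks` holds the currently mapped chunk files as a contiguous run;
/// `base_chunk` records which on-disk chunk index corresponds to `chunks[0]`.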
pub struct MmapVecDeque<T: Copy> {
  dir: PathBuf,
  meta: Mutex<Metadata>,
  chunks: Mutex<Vec<Chunk>>,
  base_chunk: Mutex<u64>, // Tracks which chunk index corresponds to chunks[0]
  _marker: PhantomData<T>,
  dirty: Mutex<bool>,
}

impl<T: Copy> MmapVecDeque<T> {
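  /// Opens an existing deque in `dir` or creates a new one. When metadata is
  /// already present, the stored type name, element size, and chunk size must
  /// match the requested ones; otherwise an error is returned.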
  pub fn open_or_create(dir: &Path, chunk_size: Option<usize>) -> Result<Self, MmapVecDequeError> {
    let chunk_size = chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
    let element_size = size_of::<T>();
    if element_size == 0 {
      return Err(MmapVecDequeError::ZeroSizedType);
    }

    if !dir.exists() {
      fs::create_dir_all(dir)?;
    }

    let metadata_file = dir.join("metadata.bin");
    let type_name = std::any::type_name::<T>().to_string();

    let meta = if metadata_file.exists() {
      let data = fs::read(&metadata_file)?;
      let meta: Metadata = postcard::from_bytes(&data)?;
      if meta.element_size != element_size {
        return Err(MmapVecDequeError::ElementSizeMismatch {
          stored: meta.element_size,
          requested: element_size,
        });
      }
      if meta.type_name != type_name {
        return Err(MmapVecDequeError::TypeMismatch {
          stored: meta.type_name.clone(),
          requested: type_name.clone(),
        });
      }
      if meta.chunk_size != chunk_size {
        return Err(MmapVecDequeError::ChunkSizeMismatch {
          stored: meta.chunk_size,
          requested: chunk_size,
        });
      }
      meta
    } else {
      let meta = Metadata {
        type_name: type_name.clone(),
        element_size,
        chunk_size,
        start: LARGE_OFFSET,
        end: LARGE_OFFSET,
      };
      Self::atomic_write_metadata(dir, &meta)?;
      meta
    };

    let deque = MmapVecDeque {
      dir: dir.to_path_buf(),
      meta: Mutex::new(meta),
      chunks: Mutex::new(Vec::new()),
      base_chunk: Mutex::new(0),
      _marker: PhantomData,
      dirty: Mutex::new(false),
    };

    deque.load_chunks()?;
    Ok(deque)
  }

  fn atomic_write_metadata(dir: &Path, meta: &Metadata) -> Result<(), MmapVecDequeError> {
    let data = postcard::to_stdvec(meta)?;
    let af = AtomicFile::new(dir.join("metadata.bin"), AllowOverwrite);
    af.write(|f| {
      f.write_all(&data)
    })?;
    // Sync the containing directory as well so the atomic rename itself is durable (Unix behaviour).
    let dir_file = OpenOptions::new().read(true).open(dir)?;
    dir_file.sync_all()?;
    Ok(())
  }

  fn load_chunks(&self) -> Result<(), MmapVecDequeError> {
    let meta = self.meta.lock();
    // Determine which chunk files the current range [start, end) spans.
    let start_chunk = meta.start / meta.chunk_size as u64;
    let end_chunk = if meta.start == meta.end {
      start_chunk
    } else {
      (meta.end - 1) / meta.chunk_size as u64
    };
    let chunk_count = if start_chunk > end_chunk {
      1
    } else {
      (end_chunk - start_chunk) + 1
    };
    drop(meta);

    let mut chunks = self.chunks.lock();
    chunks.clear();
    for ch in start_chunk..(start_chunk + chunk_count) {
      let (mmap, file) = self.open_chunk(ch, true)?;
      chunks.push(Chunk { mmap, file });
    }
    drop(chunks);

    *self.base_chunk.lock() = start_chunk;
    Ok(())
  }

  fn chunk_path(&self, index: u64) -> PathBuf {
    self.dir.join(format!("chunk_{}.bin", index))
  }

  fn open_chunk(&self, index: u64, create: bool) -> Result<(MmapMut, File), MmapVecDequeError> {
    let meta = self.meta.lock();
    let chunk_byte_size = meta.chunk_size * meta.element_size;
    drop(meta);

    let path = self.chunk_path(index);
    if create && !path.exists() {
      let f = OpenOptions::new().write(true).create(true).open(&path)?;
      f.set_len(chunk_byte_size as u64)?;
      f.sync_all()?;
    }
    let file = OpenOptions::new().read(true).write(true).open(&path)?;
    let mmap = unsafe {
      MmapOptions::new()
        .len(chunk_byte_size)
        .map_mut(&file)?
    };
    Ok((mmap, file))
  }

  fn flush_all_chunks(&self) -> Result<(), MmapVecDequeError> {
    let chunks = self.chunks.lock();
    for chunk in chunks.iter() {
      chunk.mmap.flush()?;
      chunk.file.sync_all()?;
    }
    Ok(())
  }

  // Translates a global element index into (index into `chunks`, element index within that chunk).
  fn global_to_local(&self, index: u64) -> (usize, usize) {
    let meta = self.meta.lock();
    let chunk_size = meta.chunk_size as u64;
    drop(meta);

    let base = *self.base_chunk.lock();
    let chunk_idx = ((index / chunk_size) - base) as usize;
    let elem_idx = (index % chunk_size) as usize;
    (chunk_idx, elem_idx)
  }

  fn ensure_capacity_for(&self, index: u64) -> Result<(), MmapVecDequeError> {
    let meta = self.meta.lock();
    let chunk_size = meta.chunk_size as u64;
    let needed_chunk = index / chunk_size;
    drop(meta);

    let mut chunks = self.chunks.lock();
    let base = *self.base_chunk.lock();
    let current_count = chunks.len() as u64;
    if current_count == 0 {
      let (mmap, file) = self.open_chunk(needed_chunk, true)?;
      chunks.push(Chunk { mmap, file });
      drop(chunks);
      *self.base_chunk.lock() = needed_chunk;
      return Ok(());
    }

    let current_start_chunk = base;
    let current_end_chunk = current_start_chunk + current_count - 1;

    if needed_chunk > current_end_chunk {
      // Add chunks at the end
      for new_idx in (current_end_chunk + 1)..=needed_chunk {
        let (mmap, file) = self.open_chunk(new_idx, true)?;
        chunks.push(Chunk { mmap, file });
      }
    } else if needed_chunk < current_start_chunk {
      // Add chunks at the front
      for new_idx in (needed_chunk..current_start_chunk).rev() {
        let (mmap, file) = self.open_chunk(new_idx, true)?;
        chunks.insert(0, Chunk { mmap, file });
      }
      drop(chunks);
      *self.base_chunk.lock() = needed_chunk;
      return Ok(());
    }
    drop(chunks);
    Ok(())
  }

  fn write_element(&self, index: u64, value: T) -> Result<(), MmapVecDequeError> {
    self.ensure_capacity_for(index)?;
    let (chunk_idx, elem_idx) = self.global_to_local(index);
    let mut chunks = self.chunks.lock();
    let meta = self.meta.lock();
    let element_size = meta.element_size;
    drop(meta);

    if chunk_idx >= chunks.len() {
      return Err(MmapVecDequeError::IndexOutOfRange);
    }

    let mmap = &mut chunks[chunk_idx].mmap;
    unsafe {
      // The offset stays within the mapping: `elem_idx < chunk_size` and the
      // chunk file is `chunk_size * element_size` bytes long.
      let elem_ptr = mmap.as_mut_ptr().add(elem_idx * element_size) as *mut T;
      ptr::write(elem_ptr, value);
    }
    *self.dirty.lock() = true;
    Ok(())
  }

  fn read_element(&self, index: u64) -> Result<T, MmapVecDequeError> {
    let (chunk_idx, elem_idx) = self.global_to_local(index);
    let chunks = self.chunks.lock();
    let meta = self.meta.lock();
    let element_size = meta.element_size;
    drop(meta);

    if chunk_idx >= chunks.len() {
      return Err(MmapVecDequeError::IndexOutOfRange);
    }
    let mmap = &chunks[chunk_idx].mmap;
    let ptr = mmap.as_ptr();
    unsafe {
      let elem_ptr = ptr.add(elem_idx * element_size) as *const T;
      Ok(ptr::read(elem_ptr))
    }
  }

  pub fn len(&self) -> usize {
    let meta = self.meta.lock();
    meta.len()
  }

  pub fn is_empty(&self) -> bool {
    self.len() == 0
  }

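  /// Appends an element at the back. The element bytes go straight into the
  /// memory-mapped chunk, but the updated `end` index is only persisted to
  /// `metadata.bin` when `commit` is called.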
  pub fn push_back(&mut self, value: T) -> Result<(), MmapVecDequeError> {
    let pos = self.meta.lock().end;
    // Write the element before advancing `end`, so a failed write leaves the deque unchanged.
    self.write_element(pos, value)?;
    self.meta.lock().end = pos + 1;
    Ok(())
  }

  pub fn push_front(&mut self, value: T) -> Result<(), MmapVecDequeError> {
    let pos = self.meta.lock().start
      .checked_sub(1)
      .ok_or_else(|| MmapVecDequeError::Other("Start index underflow".to_string()))?;
    // Write the element before moving `start`, so a failed write leaves the deque unchanged.
    self.write_element(pos, value)?;
    self.meta.lock().start = pos;
    Ok(())
  }

  pub fn pop_back(&mut self) -> Result<Option<T>, MmapVecDequeError> {
    let meta = self.meta.lock();
    if meta.start == meta.end {
      return Ok(None);
    }
    let pos = meta.end - 1;
    drop(meta);

    // Read the element before shrinking the range, so a failed read leaves the deque unchanged.
    let val = self.read_element(pos)?;
    self.meta.lock().end = pos;
    Ok(Some(val))
  }

  pub fn pop_front(&mut self) -> Result<Option<T>, MmapVecDequeError> {
    let meta = self.meta.lock();
    if meta.start == meta.end {
      return Ok(None);
    }
    let pos = meta.start;
    let next = pos
      .checked_add(1)
      .ok_or_else(|| MmapVecDequeError::Other("Start index overflow".to_string()))?;
    drop(meta);

    let val = self.read_element(pos)?;
    self.meta.lock().start = next;
    Ok(Some(val))
  }

  pub fn front(&self) -> Option<T> {
    if self.is_empty() {
      return None;
    }
    self.get(0)
  }

  pub fn back(&self) -> Option<T> {
    let l = self.len();
    if l == 0 {
      return None;
    }
    self.get(l - 1)
  }

  pub fn clear(&mut self) -> Result<(), MmapVecDequeError> {
    let mut meta = self.meta.lock();
    meta.start = LARGE_OFFSET;
    meta.end = LARGE_OFFSET;
    drop(meta);
    *self.dirty.lock() = true;
    Ok(())
  }

  pub fn get(&self, index: usize) -> Option<T> {
    let meta = self.meta.lock();
    if index >= meta.len() {
      return None;
    }
    let global_idx = meta.start + index as u64;
    drop(meta);

    match self.read_element(global_idx) {
      Ok(val) => Some(val),
      Err(_) => None,
    }
  }

  /// Note: despite its name, this currently returns a copy of the element, not
  /// a mutable reference into the mapping.
  pub fn get_mut(&mut self, index: usize) -> Result<Option<T>, MmapVecDequeError> {
    let meta = self.meta.lock();
    if index >= meta.len() {
      return Ok(None);
    }
    let global_idx = meta.start + index as u64;
    drop(meta);

    let mut chunks = self.chunks.lock();
    let (chunk_idx, elem_idx) = self.global_to_local(global_idx);
    if chunk_idx >= chunks.len() {
      return Err(MmapVecDequeError::IndexOutOfRange);
    }
    let mmap = &mut chunks[chunk_idx].mmap;
    let ptr = mmap.as_mut_ptr();
    unsafe {
      let elem_ptr = ptr.add(elem_idx * size_of::<T>()) as *mut T;
      let value = ptr::read(elem_ptr);
      Ok(Some(value))
    }
  }

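  /// Flushes dirty chunk data to disk, atomically rewrites `metadata.bin`, and
  /// unmaps chunks that no longer overlap the live range. Until `commit` runs,
  /// pushes and pops are only reflected in the mapped pages and the in-memory
  /// metadata.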
  pub fn commit(&self) -> Result<(), MmapVecDequeError> {
    if *self.dirty.lock() {
      self.flush_all_chunks()?;
      *self.dirty.lock() = false;
    }

    let meta = self.meta.lock();
    Self::atomic_write_metadata(&self.dir, &*meta)?;
    drop(meta);

    self.maybe_shrink_chunks()?;
    Ok(())
  }

  fn maybe_shrink_chunks(&self) -> Result<(), MmapVecDequeError> {
    let meta = self.meta.lock();
    let chunk_size = meta.chunk_size as u64;
    let start_chunk = meta.start / chunk_size;
    let end_chunk = if meta.end == meta.start {
      start_chunk
    } else {
      (meta.end - 1) / chunk_size
    };
    drop(meta);

    let mut chunks = self.chunks.lock();
    let base = *self.base_chunk.lock();
    let mut current_len = chunks.len() as u64;
    if current_len == 0 {
      return Ok(());
    }

    let mut current_start_chunk = base;

    // Unmap front chunks the live range no longer touches.
    // The backing chunk files themselves are left on disk.
    while chunks.len() > 1 && current_start_chunk < start_chunk {
      chunks.remove(0);
      current_start_chunk += 1;
    }

    // Unmap end chunks the live range no longer touches.
    while chunks.len() > 1 {
      current_len = chunks.len() as u64;
      let current_end_chunk = current_start_chunk + current_len - 1;
      if current_end_chunk > end_chunk {
        chunks.pop();
      } else {
        break;
      }
    }

    *self.base_chunk.lock() = current_start_chunk;

    Ok(())
  }

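  /// Returns an iterator that yields copies of the elements in order.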
  pub fn iter(&self) -> Iter<'_, T> {
    let len = self.len();
    let mut pointers = Vec::with_capacity(len);

    let meta = self.meta.lock();
    let start = meta.start;
    let chunk_size = meta.chunk_size as u64;
    let element_size = meta.element_size;
    drop(meta);

    let base = *self.base_chunk.lock();
    // Lock chunks and keep the guard inside the iterator (as `iter_mut` does),
    // so the mappings cannot be unmapped while raw pointers are outstanding.
    let chunks_guard = self.chunks.lock();
    for i in 0..len {
      let global_idx = start + i as u64;
      let chunk_idx = ((global_idx / chunk_size) - base) as usize;
      let elem_idx = (global_idx % chunk_size) as usize;
      let mmap = &chunks_guard[chunk_idx].mmap;
      let ptr = mmap.as_ptr();
      let elem_ptr = unsafe { ptr.add(elem_idx * element_size) as *const T };
      pointers.push(elem_ptr);
    }

    Iter {
      pointers,
      index: 0,
      len,
      _marker: PhantomData,
      _chunks_guard: chunks_guard,
    }
  }

  pub fn iter_mut(&mut self) -> IterMut<'_, T> {
    let len = self.len();

    let meta = self.meta.lock();
    let start = meta.start;
    let chunk_size = meta.chunk_size as u64;
    let element_size = meta.element_size;
    drop(meta);

    let base = *self.base_chunk.lock();
    // Lock chunks for the entire iteration.
    // The guard is stored in the iterator to keep it alive.
    let mut chunks_guard = self.chunks.lock();

    let mut pointers = Vec::with_capacity(len);
    for i in 0..len {
      let global_idx = start + i as u64;
      let chunk_idx = ((global_idx / chunk_size) - base) as usize;
      let elem_idx = (global_idx % chunk_size) as usize;
      let mmap = &mut chunks_guard[chunk_idx].mmap;
      let ptr = mmap.as_mut_ptr();
      let elem_ptr = unsafe { ptr.add(elem_idx * element_size) as *mut T };
      pointers.push(elem_ptr);
    }

    IterMut {
      pointers,
      index: 0,
      len,
      _marker: PhantomData,
      _chunks_guard: chunks_guard,
    }
  }
}

pub struct Iter<'a, T: Copy> {
  pointers: Vec<*const T>,
  index: usize,
  len: usize,
  _marker: PhantomData<&'a T>,
  // Keep the chunks guard so memory remains valid and locked (mirrors IterMut).
  _chunks_guard: parking_lot::MutexGuard<'a, Vec<Chunk>>,
}

impl<'a, T: Copy> Iterator for Iter<'a, T> {
  type Item = T;

  fn next(&mut self) -> Option<Self::Item> {
    if self.index < self.len {
      let ptr = self.pointers[self.index];
      self.index += 1;
      unsafe { Some(*ptr) }
    } else {
      None
    }
  }

  fn size_hint(&self) -> (usize, Option<usize>) {
    // Exact remaining length, required for the ExactSizeIterator impl below.
    let remaining = self.len - self.index;
    (remaining, Some(remaining))
  }
}

impl<'a, T: Copy> ExactSizeIterator for Iter<'a, T> {}

pub struct IterMut<'a, T: Copy> {
  pointers: Vec<*mut T>,
  index: usize,
  len: usize,
  _marker: PhantomData<&'a mut T>,
  // Keep the chunks guard so memory remains valid and locked.
  _chunks_guard: parking_lot::MutexGuard<'a, Vec<Chunk>>,
}

impl<'a, T: Copy> Iterator for IterMut<'a, T> {
  type Item = &'a mut T;

  fn next(&mut self) -> Option<Self::Item> {
    if self.index < self.len {
      let ptr = self.pointers[self.index];
      self.index += 1;
      unsafe { Some(&mut *ptr) }
    } else {
      None
    }
  }

  fn size_hint(&self) -> (usize, Option<usize>) {
    // Exact remaining length, required for the ExactSizeIterator impl below.
    let remaining = self.len - self.index;
    (remaining, Some(remaining))
  }
}

impl<'a, T: Copy> ExactSizeIterator for IterMut<'a, T> {}
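
// A minimal usage sketch, not part of the original module: it exercises push/pop,
// indexed access, and commit against a throwaway directory. It assumes the crate
// builds as above (memmap2, parking_lot, postcard, atomicwrites available) and that
// `MmapVecDequeError` implements `Debug` so `unwrap` can be used; the directory
// name is illustrative only.
#[cfg(test)]
mod usage_sketch {
  use super::*;

  #[test]
  fn push_pop_roundtrip() {
    let dir = std::env::temp_dir().join("mmap_vecdeque_usage_sketch");
    let _ = std::fs::remove_dir_all(&dir);

    // Small chunk size so the example crosses a chunk boundary quickly.
    let mut dq: MmapVecDeque<u64> = MmapVecDeque::open_or_create(&dir, Some(4)).unwrap();
    assert!(dq.is_empty());

    for i in 0..6u64 {
      dq.push_back(i).unwrap();
    }
    dq.push_front(99).unwrap();

    assert_eq!(dq.len(), 7);
    assert_eq!(dq.front(), Some(99));
    assert_eq!(dq.back(), Some(5));
    assert_eq!(dq.get(1), Some(0));
    assert_eq!(dq.pop_front().unwrap(), Some(99));
    assert_eq!(dq.pop_back().unwrap(), Some(5));

    // Persist chunk data and metadata; a reopened deque sees the same contents.
    dq.commit().unwrap();
    drop(dq);

    let reopened: MmapVecDeque<u64> = MmapVecDeque::open_or_create(&dir, Some(4)).unwrap();
    assert_eq!(reopened.len(), 5);
    assert_eq!(reopened.iter().collect::<Vec<_>>(), vec![0, 1, 2, 3, 4]);
  }
}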