use crate::error::MmapVecDequeError;
use parking_lot::Mutex;
use serde::{Serialize, Deserialize};
use std::fs::{self, OpenOptions, File};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::{ptr, mem::size_of};
use memmap2::{MmapMut, MmapOptions};
use std::marker::PhantomData;
use atomicwrites::{AtomicFile, AllowOverwrite};

const DEFAULT_CHUNK_SIZE: usize = 10_000;
// Both indices start in the middle of the u64 range so that `push_front`
// can decrement `start` without underflowing.
const LARGE_OFFSET: u64 = 1 << 32;

/// On-disk metadata, serialized with `postcard` into `metadata.bin`.
/// `start` and `end` are global element indices; the deque occupies the
/// half-open range `[start, end)`.
#[derive(Serialize, Deserialize, Debug)]
struct Metadata {
    type_name: String,
    element_size: usize,
    chunk_size: usize,
    start: u64,
    end: u64,
}

impl Metadata {
    fn len(&self) -> usize {
        (self.end - self.start) as usize
    }
}

/// One memory-mapped chunk file holding `chunk_size` elements.
struct Chunk {
    mmap: MmapMut,
    file: File,
}

pub struct MmapVecDeque<T: Copy> {
    dir: PathBuf,
    meta: Mutex<Metadata>,
    chunks: Mutex<Vec<Chunk>>,
    base_chunk: Mutex<u64>,
    dirty: Mutex<bool>,
    _marker: PhantomData<T>,
}

impl<T: Copy> MmapVecDeque<T> {
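    /// Opens the deque persisted in `dir`, creating the directory, an empty
    /// `metadata.bin`, and chunk files on demand. Fails if the stored
    /// metadata disagrees with `T`'s type name, `T`'s size, or `chunk_size`.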
    pub fn open_or_create(dir: &Path, chunk_size: Option<usize>) -> Result<Self, MmapVecDequeError> {
        let chunk_size = chunk_size.unwrap_or(DEFAULT_CHUNK_SIZE);
        let element_size = size_of::<T>();
        if element_size == 0 {
            return Err(MmapVecDequeError::ZeroSizedType);
        }

        if !dir.exists() {
            fs::create_dir_all(dir)?;
        }

        let metadata_file = dir.join("metadata.bin");
        let type_name = std::any::type_name::<T>().to_string();

        let meta = if metadata_file.exists() {
            let data = fs::read(&metadata_file)?;
            let meta: Metadata = postcard::from_bytes(&data)?;
            if meta.element_size != element_size {
                return Err(MmapVecDequeError::ElementSizeMismatch {
                    stored: meta.element_size,
                    requested: element_size,
                });
            }
            if meta.type_name != type_name {
                return Err(MmapVecDequeError::TypeMismatch {
                    stored: meta.type_name.clone(),
                    requested: type_name.clone(),
                });
            }
            if meta.chunk_size != chunk_size {
                return Err(MmapVecDequeError::ChunkSizeMismatch {
                    stored: meta.chunk_size,
                    requested: chunk_size,
                });
            }
            meta
        } else {
            let meta = Metadata {
                type_name: type_name.clone(),
                element_size,
                chunk_size,
                start: LARGE_OFFSET,
                end: LARGE_OFFSET,
            };
            Self::atomic_write_metadata(dir, &meta)?;
            meta
        };

        let deque = MmapVecDeque {
            dir: dir.to_path_buf(),
            meta: Mutex::new(meta),
            chunks: Mutex::new(Vec::new()),
            base_chunk: Mutex::new(0),
            _marker: PhantomData,
            dirty: Mutex::new(false),
        };

        deque.load_chunks()?;
        Ok(deque)
    }

    fn atomic_write_metadata(dir: &Path, meta: &Metadata) -> Result<(), MmapVecDequeError> {
        let data = postcard::to_stdvec(meta)?;
        let af = AtomicFile::new(dir.join("metadata.bin"), AllowOverwrite);
        af.write(|f| f.write_all(&data))?;
        // Fsync the directory so the atomic rename itself is durable. Note
        // that opening a directory read-only works on Unix but not Windows.
        let dir_file = OpenOptions::new().read(true).open(dir)?;
        dir_file.sync_all()?;
        Ok(())
    }

    fn load_chunks(&self) -> Result<(), MmapVecDequeError> {
        let meta = self.meta.lock();
        let start_chunk = meta.start / meta.chunk_size as u64;
        let end_chunk = if meta.start == meta.end {
            start_chunk
        } else {
            (meta.end - 1) / meta.chunk_size as u64
        };
        let chunk_count = if start_chunk > end_chunk {
            1
        } else {
            (end_chunk - start_chunk) + 1
        };
        drop(meta);

        let mut chunks = self.chunks.lock();
        chunks.clear();
        for ch in start_chunk..(start_chunk + chunk_count) {
            let (mmap, file) = self.open_chunk(ch, true)?;
            chunks.push(Chunk { mmap, file });
        }
        drop(chunks);

        *self.base_chunk.lock() = start_chunk;
        Ok(())
    }

    fn chunk_path(&self, index: u64) -> PathBuf {
        self.dir.join(format!("chunk_{}.bin", index))
    }

    fn open_chunk(&self, index: u64, create: bool) -> Result<(MmapMut, File), MmapVecDequeError> {
        let meta = self.meta.lock();
        let chunk_byte_size = meta.chunk_size * meta.element_size;
        drop(meta);

        let path = self.chunk_path(index);
        if create && !path.exists() {
            let f = OpenOptions::new().write(true).create(true).open(&path)?;
            f.set_len(chunk_byte_size as u64)?;
            f.sync_all()?;
        }
        let file = OpenOptions::new().read(true).write(true).open(&path)?;
        let mmap = unsafe {
            MmapOptions::new()
                .len(chunk_byte_size)
                .map_mut(&file)?
        };
        Ok((mmap, file))
    }

    fn flush_all_chunks(&self) -> Result<(), MmapVecDequeError> {
        let chunks = self.chunks.lock();
        for chunk in chunks.iter() {
            chunk.mmap.flush()?;
            chunk.file.sync_all()?;
        }
        Ok(())
    }

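    // Translates a global element index into (position in `chunks`, element
    // offset within that chunk). E.g. with chunk_size = 4 and base_chunk = 2,
    // global index 11 maps to chunks[11 / 4 - 2] = chunks[0], offset 11 % 4 = 3.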
    fn global_to_local(&self, index: u64) -> (usize, usize) {
        let meta = self.meta.lock();
        let chunk_size = meta.chunk_size as u64;
        drop(meta);

        let base = *self.base_chunk.lock();
        let chunk_idx = ((index / chunk_size) - base) as usize;
        let elem_idx = (index % chunk_size) as usize;
        (chunk_idx, elem_idx)
    }

    fn ensure_capacity_for(&self, index: u64) -> Result<(), MmapVecDequeError> {
        let meta = self.meta.lock();
        let chunk_size = meta.chunk_size as u64;
        let needed_chunk = index / chunk_size;
        drop(meta);

        let mut chunks = self.chunks.lock();
        let base = *self.base_chunk.lock();
        let current_count = chunks.len() as u64;
        if current_count == 0 {
            let (mmap, file) = self.open_chunk(needed_chunk, true)?;
            chunks.push(Chunk { mmap, file });
            drop(chunks);
            *self.base_chunk.lock() = needed_chunk;
            return Ok(());
        }

        let current_start_chunk = base;
        let current_end_chunk = current_start_chunk + current_count - 1;

        if needed_chunk > current_end_chunk {
            // Grow forward: map additional chunks past the current tail.
            for new_idx in (current_end_chunk + 1)..=needed_chunk {
                let (mmap, file) = self.open_chunk(new_idx, true)?;
                chunks.push(Chunk { mmap, file });
            }
        } else if needed_chunk < current_start_chunk {
            // Grow backward: map additional chunks before the current head.
            for new_idx in (needed_chunk..current_start_chunk).rev() {
                let (mmap, file) = self.open_chunk(new_idx, true)?;
                chunks.insert(0, Chunk { mmap, file });
            }
            drop(chunks);
            *self.base_chunk.lock() = needed_chunk;
            return Ok(());
        }
        Ok(())
    }

    fn write_element(&self, index: u64, value: T) -> Result<(), MmapVecDequeError> {
        self.ensure_capacity_for(index)?;
        let (chunk_idx, elem_idx) = self.global_to_local(index);
        let mut chunks = self.chunks.lock();
        let meta = self.meta.lock();
        let element_size = meta.element_size;
        drop(meta);

        if chunk_idx >= chunks.len() {
            return Err(MmapVecDequeError::IndexOutOfRange);
        }

        // Go through `as_mut_ptr()` on the mutable mapping; casting the
        // `*const u8` from `as_ptr()` to `*mut u8` and writing through it
        // would be unsound.
        let mmap = &mut chunks[chunk_idx].mmap;
        let ptr = mmap.as_mut_ptr();
        unsafe {
            let elem_ptr = ptr.add(elem_idx * element_size) as *mut T;
            ptr::write(elem_ptr, value);
        }
        *self.dirty.lock() = true;
        Ok(())
    }

    fn read_element(&self, index: u64) -> Result<T, MmapVecDequeError> {
        let (chunk_idx, elem_idx) = self.global_to_local(index);
        let chunks = self.chunks.lock();
        let meta = self.meta.lock();
        let element_size = meta.element_size;
        drop(meta);

        if chunk_idx >= chunks.len() {
            return Err(MmapVecDequeError::IndexOutOfRange);
        }
        let mmap = &chunks[chunk_idx].mmap;
        let ptr = mmap.as_ptr();
        unsafe {
            let elem_ptr = ptr.add(elem_idx * element_size) as *const T;
            Ok(ptr::read(elem_ptr))
        }
    }

    pub fn len(&self) -> usize {
        self.meta.lock().len()
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

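    // Elements live at global indices [start, end); `push_back` advances
    // `end` upward while `push_front` moves `start` downward from the
    // initial LARGE_OFFSET midpoint.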
    pub fn push_back(&mut self, value: T) -> Result<(), MmapVecDequeError> {
        let pos = self.meta.lock().end;
        // Write first so the bounds never cover a slot whose write failed.
        self.write_element(pos, value)?;
        self.meta.lock().end = pos + 1;
        Ok(())
    }

    pub fn push_front(&mut self, value: T) -> Result<(), MmapVecDequeError> {
        let pos = self
            .meta
            .lock()
            .start
            .checked_sub(1)
            .ok_or_else(|| MmapVecDequeError::Other("Start index underflow".to_string()))?;
        self.write_element(pos, value)?;
        self.meta.lock().start = pos;
        Ok(())
    }

    pub fn pop_back(&mut self) -> Result<Option<T>, MmapVecDequeError> {
        let meta = self.meta.lock();
        if meta.start == meta.end {
            return Ok(None);
        }
        let pos = meta.end - 1;
        drop(meta);

        // Read before shrinking the bounds so a failed read loses nothing.
        let val = self.read_element(pos)?;
        self.meta.lock().end = pos;
        Ok(Some(val))
    }

    pub fn pop_front(&mut self) -> Result<Option<T>, MmapVecDequeError> {
        let meta = self.meta.lock();
        if meta.start == meta.end {
            return Ok(None);
        }
        let pos = meta.start;
        drop(meta);

        let val = self.read_element(pos)?;
        // `pos < end`, so `pos + 1` cannot overflow.
        self.meta.lock().start = pos + 1;
        Ok(Some(val))
    }

    pub fn front(&self) -> Option<T> {
        if self.is_empty() {
            return None;
        }
        self.get(0)
    }

    pub fn back(&self) -> Option<T> {
        let l = self.len();
        if l == 0 {
            return None;
        }
        self.get(l - 1)
    }

    pub fn clear(&mut self) -> Result<(), MmapVecDequeError> {
        let mut meta = self.meta.lock();
        meta.start = LARGE_OFFSET;
        meta.end = LARGE_OFFSET;
        drop(meta);
        *self.dirty.lock() = true;
        Ok(())
    }

    pub fn get(&self, index: usize) -> Option<T> {
        let meta = self.meta.lock();
        if index >= meta.len() {
            return None;
        }
        let global_idx = meta.start + index as u64;
        drop(meta);

        self.read_element(global_idx).ok()
    }

    /// Reads the element at `index` through the mutable mapping. Despite the
    /// name, this returns a copy (`T: Copy`), not a mutable reference; use
    /// `iter_mut` to mutate elements in place.
    pub fn get_mut(&mut self, index: usize) -> Result<Option<T>, MmapVecDequeError> {
        let meta = self.meta.lock();
        if index >= meta.len() {
            return Ok(None);
        }
        let global_idx = meta.start + index as u64;
        drop(meta);

        let (chunk_idx, elem_idx) = self.global_to_local(global_idx);
        let mut chunks = self.chunks.lock();
        if chunk_idx >= chunks.len() {
            return Err(MmapVecDequeError::IndexOutOfRange);
        }
        let mmap = &mut chunks[chunk_idx].mmap;
        let ptr = mmap.as_mut_ptr();
        unsafe {
            let elem_ptr = ptr.add(elem_idx * size_of::<T>()) as *mut T;
            Ok(Some(ptr::read(elem_ptr)))
        }
    }

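    /// Flushes dirty chunk data to disk, then atomically rewrites
    /// `metadata.bin`. The persisted bounds only ever advance here, so a
    /// crash between commits rolls back to the last committed state.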
    pub fn commit(&self) -> Result<(), MmapVecDequeError> {
        if *self.dirty.lock() {
            self.flush_all_chunks()?;
            *self.dirty.lock() = false;
        }

        let meta = self.meta.lock();
        Self::atomic_write_metadata(&self.dir, &meta)?;
        drop(meta);

        self.maybe_shrink_chunks()?;
        Ok(())
    }

    fn maybe_shrink_chunks(&self) -> Result<(), MmapVecDequeError> {
        let meta = self.meta.lock();
        let chunk_size = meta.chunk_size as u64;
        let start_chunk = meta.start / chunk_size;
        let end_chunk = if meta.end == meta.start {
            start_chunk
        } else {
            (meta.end - 1) / chunk_size
        };
        drop(meta);

        let mut chunks = self.chunks.lock();
        if chunks.is_empty() {
            return Ok(());
        }

        let mut current_start_chunk = *self.base_chunk.lock();

        // Unmap chunks that now lie entirely before `start`. This only drops
        // the mappings; the backing files are left on disk.
        while chunks.len() > 1 && current_start_chunk < start_chunk {
            chunks.remove(0);
            current_start_chunk += 1;
        }

        // Unmap chunks that now lie entirely past `end`.
        while chunks.len() > 1 {
            let current_end_chunk = current_start_chunk + chunks.len() as u64 - 1;
            if current_end_chunk > end_chunk {
                chunks.pop();
            } else {
                break;
            }
        }

        *self.base_chunk.lock() = current_start_chunk;

        Ok(())
    }

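    /// Returns an iterator over copies of the elements. The returned `Iter`
    /// holds the chunk lock, so chunks cannot be remapped or shrunk while
    /// the collected raw pointers are in use.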
    pub fn iter(&self) -> Iter<'_, T> {
        let len = self.len();
        let mut pointers = Vec::with_capacity(len);

        let meta = self.meta.lock();
        let start = meta.start;
        let chunk_size = meta.chunk_size as u64;
        let element_size = meta.element_size;
        drop(meta);

        let base = *self.base_chunk.lock();
        let chunks = self.chunks.lock();
        for i in 0..len {
            let global_idx = start + i as u64;
            let chunk_idx = ((global_idx / chunk_size) - base) as usize;
            let elem_idx = (global_idx % chunk_size) as usize;
            let mmap = &chunks[chunk_idx].mmap;
            let ptr = mmap.as_ptr();
            let elem_ptr = unsafe { ptr.add(elem_idx * element_size) as *const T };
            pointers.push(elem_ptr);
        }

        Iter {
            pointers,
            index: 0,
            len,
            _marker: PhantomData,
            // Keep the lock so the chunks (and thus the pointers) stay valid
            // for the iterator's lifetime.
            _chunks_guard: chunks,
        }
    }

    pub fn iter_mut(&mut self) -> IterMut<'_, T> {
        let len = self.len();

        let meta = self.meta.lock();
        let start = meta.start;
        let chunk_size = meta.chunk_size as u64;
        let element_size = meta.element_size;
        drop(meta);

        let base = *self.base_chunk.lock();
        let mut chunks_guard = self.chunks.lock();

        let mut pointers = Vec::with_capacity(len);
        for i in 0..len {
            let global_idx = start + i as u64;
            let chunk_idx = ((global_idx / chunk_size) - base) as usize;
            let elem_idx = (global_idx % chunk_size) as usize;
            let mmap = &mut chunks_guard[chunk_idx].mmap;
            let ptr = mmap.as_mut_ptr();
            let elem_ptr = unsafe { ptr.add(elem_idx * element_size) as *mut T };
            pointers.push(elem_ptr);
        }

        IterMut {
            pointers,
            index: 0,
            len,
            _marker: PhantomData,
            _chunks_guard: chunks_guard,
        }
    }
}

pub struct Iter<'a, T: Copy> {
    pointers: Vec<*const T>,
    index: usize,
    len: usize,
    _marker: PhantomData<&'a T>,
    // Holding the guard prevents the chunks from being unmapped while the
    // raw pointers above are dereferenced.
    _chunks_guard: parking_lot::MutexGuard<'a, Vec<Chunk>>,
}

impl<'a, T: Copy> Iterator for Iter<'a, T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        if self.index < self.len {
            let ptr = self.pointers[self.index];
            self.index += 1;
            unsafe { Some(*ptr) }
        } else {
            None
        }
    }

    // An accurate size_hint is required for the `ExactSizeIterator` impl
    // below; the default `(0, None)` would make its `len()` panic.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.len - self.index;
        (remaining, Some(remaining))
    }
}

impl<'a, T: Copy> ExactSizeIterator for Iter<'a, T> {}

pub struct IterMut<'a, T: Copy> {
    pointers: Vec<*mut T>,
    index: usize,
    len: usize,
    _marker: PhantomData<&'a mut T>,
    _chunks_guard: parking_lot::MutexGuard<'a, Vec<Chunk>>,
}

impl<'a, T: Copy> Iterator for IterMut<'a, T> {
    type Item = &'a mut T;

    fn next(&mut self) -> Option<Self::Item> {
        if self.index < self.len {
            let ptr = self.pointers[self.index];
            self.index += 1;
            unsafe { Some(&mut *ptr) }
        } else {
            None
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.len - self.index;
        (remaining, Some(remaining))
    }
}

impl<'a, T: Copy> ExactSizeIterator for IterMut<'a, T> {}
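
// A minimal usage sketch, written as a test. It relies only on the API
// defined above; the temporary-directory handling via `std::env::temp_dir`
// is illustrative, not part of the crate.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn push_pop_roundtrip() {
        let dir = std::env::temp_dir().join("mmap_vecdeque_doc_example");
        let _ = fs::remove_dir_all(&dir);

        let mut dq: MmapVecDeque<u64> =
            MmapVecDeque::open_or_create(&dir, Some(16)).unwrap();
        dq.push_back(1).unwrap();
        dq.push_back(2).unwrap();
        dq.push_front(0).unwrap();
        assert_eq!(dq.len(), 3);
        assert_eq!(dq.iter().collect::<Vec<_>>(), vec![0, 1, 2]);

        // Persist data and metadata so a reopen sees the same contents.
        dq.commit().unwrap();
        assert_eq!(dq.pop_front().unwrap(), Some(0));
        assert_eq!(dq.pop_back().unwrap(), Some(2));
    }
}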