use core::iter::Extend;
use core::sync::atomic::{AtomicU64, Ordering};
use memmap2::MmapRaw;

pub struct Writer {
    pub(crate) head: Head,
}

pub struct File {
    pub(crate) head: Head,
}

pub struct FileDiscovery<'lt> {
    pub(crate) file: &'lt File,
    pub(crate) configuration: ConfigureFile,
}
#[derive(Default, Debug)]
pub struct ConfigureFile {
    /// Number of sequence entries; must be a power of two.
    pub entries: u64,
    /// Size of the data region in bytes; must be a power of two.
    pub data: u64,
    /// Initial write offset into the data ring.
    pub initial_offset: u64,
    pub(crate) layout_version: u64,
}

pub struct Head {
    head: WriteHead,
    #[allow(dead_code)]
    file: MmapRaw,
}

#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct Snapshot {
    pub offset: u64,
    pub length: u64,
}

pub(crate) trait Collect<T> {
    /// Consume one value; return `false` to ask the caller to invalidate
    /// the corresponding entry (see `WriteHead::iter_valid`).
    fn insert_one(&mut self, _: T) -> bool;
}

impl<T> Collect<T> for Vec<T> {
    fn insert_one(&mut self, val: T) -> bool {
        self.push(val);
        true
    }
}
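
// `Vec` keeps every snapshot unconditionally; contrast the `Retain` wrapper
// in `Head::retain_in_head`, which returns `false` so that `iter_valid`
// invalidates the entry in place.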

pub(crate) struct Entry<'lt> {
    index: u64,
    offset: u64,
    length: u64,
    head: &'lt mut WriteHead,
}

pub struct PreparedTransaction<'lt> {
    offset: u64,
    length: u64,
    head: &'lt mut WriteHead,
    tail: &'lt [DataPage],
}

pub(crate) struct WriteHead {
    pub(crate) cache: HeadCache,
    pub(crate) meta: &'static HeadPage,
    pub(crate) sequence: &'static [SequencePage],
    pub(crate) data: &'static [DataPage],
    pub(crate) tail: &'static [DataPage],
}

struct HeadMapRaw {
    meta: *const HeadPage,
    sequence: *const [SequencePage],
    data: *const [DataPage],
}

impl Head {
    fn fitting_power_of_two(value: u64) -> u64 {
        const HIGHEST_BIT_SET: u64 = !((!0) >> 1);
        HIGHEST_BIT_SET >> value.leading_zeros()
    }
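
    // Illustrative behavior (values are examples, not format constants):
    // rounds *down* to a power of two, e.g. 1 -> 1, 4096 -> 4096, and
    // 5000 -> 4096. Assumes `value != 0`, since `leading_zeros() == 64`
    // would overflow the shift.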

    pub(crate) fn discover(&self, cfg: &mut ConfigureFile) {
        let entry_mask = self.head.meta.entry_mask.load(Ordering::Relaxed);
        let data_mask = self.head.meta.page_mask.load(Ordering::Relaxed);
        let page_write_offset = self.head.meta.page_write_offset.load(Ordering::Relaxed);

        let layout_version = self.head.meta.version.load(Ordering::Relaxed);
        assert!(entry_mask < usize::MAX as u64);
        assert!(data_mask < usize::MAX as u64);

        let sequence = (entry_mask + 1) as usize;
        let pages = self.head.data.len();
        let psequence = sequence / SequencePage::DATA_COUNT
            + usize::from(sequence % SequencePage::DATA_COUNT != 0);

        let data_space = (pages - psequence) as u64 * core::mem::size_of::<DataPage>() as u64;
        let available_entries = Self::fitting_power_of_two(entry_mask + 1);
        let available_data = Self::fitting_power_of_two(data_space);

        cfg.entries = available_entries;
        cfg.data = available_data.min(data_mask + 1);
        cfg.initial_offset = page_write_offset;
        cfg.layout_version = layout_version;
    }

    pub(crate) fn configure(&mut self, cfg: &ConfigureFile) {
        Self::configure_head(&mut self.head, cfg)
    }

    fn configure_head(head: &mut WriteHead, cfg: &ConfigureFile) {
        assert!(cfg.entries.next_power_of_two() == cfg.entries);
        assert!(cfg.data.next_power_of_two() == cfg.data);
        assert!(cfg.is_initialized());

        head.pre_configure_entries(cfg.entries);
        head.pre_configure_pages(cfg.data);
        head.pre_configure_write(cfg.initial_offset);
        head.configure_pages();
    }

    #[inline(always)]
    pub(crate) fn valid(&self, into: &mut impl Extend<Snapshot>) {
        Self::valid_in_head(&self.head, into)
    }

    pub(crate) fn valid_at(&self, into: &mut impl Extend<Snapshot>, cfg: &ConfigureFile) {
        let mut alternate_head = WriteHead {
            cache: HeadCache { ..self.head.cache },
            ..self.head
        };

        Self::configure_head(&mut alternate_head, cfg);
        Self::valid_in_head(&alternate_head, into);
    }

    pub(crate) fn retain_at(&self, retain: &dyn super::RetainSnapshot, cfg: &ConfigureFile) {
        let mut alternate_head = WriteHead {
            cache: HeadCache { ..self.head.cache },
            ..self.head
        };

        Self::configure_head(&mut alternate_head, cfg);
        Self::retain_in_head(&alternate_head, retain);
    }

    pub(crate) fn entry_at(&self, idx: super::SnapshotIndex) -> Snapshot {
        let snapshot = self.head.entry_at_relaxed(idx.entry);
        core::sync::atomic::fence(Ordering::Acquire);
        snapshot
    }

    fn valid_in_head(head: &WriteHead, into: &mut impl Extend<Snapshot>) {
        struct Collector<T>(T);

        impl<T, V> Collect<T> for Collector<&'_ mut V>
        where
            V: Extend<T>,
        {
            fn insert_one(&mut self, val: T) -> bool {
                self.0.extend(core::iter::once(val));
                true
            }
        }

        head.iter_valid(&mut Collector(into), Ordering::Relaxed);
    }

    fn retain_in_head(head: &WriteHead, into: &dyn super::RetainSnapshot) {
        struct Retain<'lt>(&'lt dyn super::RetainSnapshot);

        impl Collect<Snapshot> for Retain<'_> {
            fn insert_one(&mut self, val: Snapshot) -> bool {
                self.0.contains(&val)
            }
        }

        head.iter_valid(&mut Retain(into), Ordering::Relaxed);
    }

    pub(crate) fn read(&self, snapshot: &Snapshot, into: &mut [u8]) {
        self.head.read(snapshot, into);
    }

    pub(crate) fn read_at(&self, snapshot: &Snapshot, into: &mut [u8], cfg: &ConfigureFile) {
        let mut alternate_head = WriteHead {
            cache: HeadCache { ..self.head.cache },
            ..self.head
        };

        Self::configure_head(&mut alternate_head, cfg);
        alternate_head.read(snapshot, into);
    }

    pub(crate) fn from_map(file: MmapRaw) -> Self {
        // Fallback head for mappings too small to hold even one head page;
        // it reports a valid layout version over empty sequence/data slices.
        static FALLBACK_HEAD: HeadPage = HeadPage {
            version: AtomicU64::new(ConfigureFile::MAGIC_VERSION),
            entry_mask: AtomicU64::new(0),
            page_mask: AtomicU64::new(0),
            page_write_offset: AtomicU64::new(0),
        };

        let ptr = file.as_mut_ptr();
        let len = file.len();

        let head = if let Some(head) = unsafe { Self::map_all_raw(ptr, len) } {
            unsafe {
                WriteHead {
                    cache: HeadCache::new(),
                    meta: &*head.meta,
                    sequence: &*head.sequence,
                    data: &*head.data,
                    tail: &[],
                }
            }
        } else {
            WriteHead {
                cache: HeadCache::new(),
                meta: &FALLBACK_HEAD,
                data: &[],
                sequence: &[],
                tail: &[],
            }
        };

        Head { head, file }
    }

    pub(crate) fn tail(&self) -> &'_ [AtomicU64] {
        DataPage::as_slice_of_u64(self.head.tail)
    }

    unsafe fn map_all_raw(ptr: *mut u8, len: usize) -> Option<HeadMapRaw> {
        let tail_len = len.checked_sub(HeadPage::PAGE_SZ)?;
        let tail = ptr.add(HeadPage::PAGE_SZ);

        // Both views initially cover the whole tail; `configure_pages`
        // later partitions it into disjoint sequence, data, and tail slices.
        let sequence_ptr = tail as *const SequencePage;
        let sequence_len = tail_len / core::mem::size_of::<SequencePage>();
        let data_ptr = tail as *const DataPage;
        let data_len = tail_len / core::mem::size_of::<DataPage>();

        Some(HeadMapRaw {
            meta: ptr as *const HeadPage,
            sequence: core::ptr::slice_from_raw_parts(sequence_ptr, sequence_len),
            data: core::ptr::slice_from_raw_parts(data_ptr, data_len),
        })
    }
}

impl ConfigureFile {
    pub(crate) const MAGIC_VERSION: u64 = 0x96c2_a6f4_b685_19b3;

    pub fn is_initialized(&self) -> bool {
        self.layout_version == Self::MAGIC_VERSION
    }

    pub fn or_insert_with(&mut self, replace: impl FnOnce(&mut Self)) {
        if !self.is_initialized() {
            replace(self);
            self.layout_version = ConfigureFile::MAGIC_VERSION;
        }
    }
}
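
// A minimal sketch of the discovery flow (caller code is hypothetical and
// the sizes are placeholders, not crate defaults):
//
//     let mut cfg = ConfigureFile::default();
//     head.discover(&mut cfg);
//     cfg.or_insert_with(|cfg| {
//         cfg.entries = 64; // must be a power of two
//         cfg.data = 1 << 16; // bytes, must be a power of two
//         cfg.initial_offset = 0;
//     });
//     head.configure(&cfg);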

impl Head {
    pub(crate) fn write_with(
        &mut self,
        data: &[u8],
        intermediate: &mut dyn FnMut(PreparedTransaction) -> bool,
    ) -> Result<u64, ()> {
        let mut entry = self.head.entry();
        let Some(end_ptr) = entry.new_write_offset(data.len()) else {
            return Err(());
        };

        entry.invalidate_heads(end_ptr);
        entry.copy_from_slice(data);

        if intermediate(PreparedTransaction {
            offset: entry.offset,
            length: entry.length,
            tail: entry.head.tail,
            head: entry.head,
        }) {
            Ok(entry.commit())
        } else {
            Err(())
        }
    }
}
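
// Sketch of a two-phase write (hypothetical caller code): the closure
// observes the reserved region before the entry becomes visible, and
// returning `false` aborts the commit.
//
//     let sequence_index = head.write_with(b"payload", &mut |tx| {
//         // e.g. publish `tx.tail()` to a peer, or veto on failure
//         true
//     })?;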

impl WriteHead {
    pub(crate) fn pre_configure_entries(&mut self, num: u64) {
        assert!(num.next_power_of_two() == num);
        self.cache.entry_mask = num - 1;
    }

    pub(crate) fn pre_configure_pages(&mut self, num: u64) {
        assert!(num.next_power_of_two() == num);
        self.cache.page_mask = num - 1;
    }

    pub(crate) fn pre_configure_write(&mut self, offset: u64) {
        self.cache.page_write_offset = offset;
    }

    pub(crate) fn configure_pages(&mut self) {
        assert_eq!(
            core::mem::size_of::<DataPage>(),
            core::mem::size_of::<SequencePage>()
        );

        let sequence: usize = (self.cache.entry_mask + 1)
            .try_into()
            .expect("Invalid configured entry mask");
        let sequence = sequence.next_power_of_two();

        let data: usize = (self.cache.page_mask + 1)
            .try_into()
            .expect("Invalid configured page mask");
        let data = data.next_power_of_two();

        let psequence = sequence / SequencePage::DATA_COUNT
            + usize::from(sequence % SequencePage::DATA_COUNT != 0);
        let pdata = data / core::mem::size_of::<DataPage>()
            + usize::from(data % core::mem::size_of::<DataPage>() != 0);

        self.sequence = &self.sequence[..psequence];
        let (data, tail) = self.data[psequence..].split_at(pdata);
        self.data = data;
        self.tail = tail;

        self.meta
            .entry_mask
            .store(self.cache.entry_mask, Ordering::Relaxed);
        self.meta
            .page_mask
            .store(self.cache.page_mask, Ordering::Relaxed);
        self.meta
            .page_write_offset
            .store(self.cache.page_write_offset, Ordering::Relaxed);

        self.meta
            .version
            .store(ConfigureFile::MAGIC_VERSION, Ordering::Release);
    }
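
    // After `configure_pages`, the mapping is partitioned as
    //
    //     [ HeadPage | psequence sequence pages | pdata data pages | tail ]
    //
    // with the masks and write offset mirrored into the on-file head page,
    // and the version stamped last with `Release` ordering.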

    pub(crate) fn entry(&mut self) -> Entry<'_> {
        let index = self.cache.entry_write_offset;
        let offset = self.cache.page_write_offset;
        Entry {
            head: self,
            length: 0,
            index,
            offset,
        }
    }

    pub(crate) fn iter_valid(&self, extend: &mut dyn Collect<Snapshot>, ordering: Ordering) {
        let max = self.meta.entry_mask.load(ordering);
        let seqs = self.sequence.iter().flat_map(|seq| &seq.data);

        for (idx, seq) in seqs.enumerate() {
            if idx as u64 > max {
                break;
            }

            let length = seq.length.load(ordering);

            // A zero length marks an empty or already invalidated slot.
            if length == 0 {
                continue;
            }

            if !extend.insert_one(Snapshot {
                length,
                offset: seq.offset.load(ordering),
            }) {
                // The collector rejected the snapshot: invalidate it in place.
                seq.length.store(0, ordering);
            }
        }
    }

    pub(crate) fn new_write_offset(&self, n: usize) -> Option<u64> {
        let len = u64::try_from(n).ok().filter(|&l| l <= self.cache.page_mask)?;
        Some(self.cache.page_write_offset.wrapping_add(len))
    }

    pub(crate) fn invalidate_heads_to(&mut self, end: u64) {
        let mut entry = self.cache.entry_read_offset;
        let mut data = self.cache.page_read_offset;

        // Reclaim ring space by invalidating the oldest entries until the
        // data read offset reaches `end`.
        loop {
            if data >= end {
                break;
            }

            // No live entries remain; everything up to `end` is free.
            if entry == self.cache.entry_write_offset {
                data = end;
                break;
            }

            let length = self.invalidate_at(entry);
            entry = entry.wrapping_add(1);
            data = data.wrapping_add(length);
        }

        self.cache.entry_read_offset = entry;
        self.cache.page_read_offset = data;
    }

    pub(crate) fn copy_from_slice(&mut self, data: &[u8]) -> u64 {
        let mut n = self.cache.page_write_offset;

        for (&b, idx) in data.iter().zip(n..) {
            self.write_at(idx, b);
            n = n.wrapping_add(1);
        }

        let count = n.wrapping_sub(self.cache.page_write_offset);
        self.cache.page_write_offset = n;
        count
    }

    pub(crate) fn read(&self, snapshot: &Snapshot, into: &mut [u8]) {
        for (b, offset) in into.iter_mut().zip(0..snapshot.length) {
            let idx = snapshot.offset.wrapping_add(offset);
            *b = self.read_at(idx);
        }
    }

    fn get_entry_atomic(&self, idx: u64) -> &SequenceEntry {
        let idx = (idx & self.cache.entry_mask) as usize;

        let page = idx / SequencePage::DATA_COUNT;
        let entry = idx % SequencePage::DATA_COUNT;

        &self.sequence[page].data[entry]
    }

    fn invalidate_at(&mut self, idx: u64) -> u64 {
        let entry = self.get_entry_atomic(idx);
        entry.length.swap(0, Ordering::Relaxed)
    }

    fn insert_at(&mut self, idx: u64, snap: Snapshot) {
        let entry = self.get_entry_atomic(idx);

        entry.offset.store(snap.offset, Ordering::Release);
        entry.length.store(snap.length, Ordering::Release);
    }

    fn entry_at_relaxed(&self, idx: u64) -> Snapshot {
        let entry = self.get_entry_atomic(idx);

        Snapshot {
            offset: entry.offset.load(Ordering::Relaxed),
            length: entry.length.load(Ordering::Relaxed),
        }
    }

    fn idx_at(&self, idx: u64) -> (usize, usize, u32) {
        let idx = idx & self.cache.page_mask;

        let offset = idx % 8;
        let idx = idx / 8;
        let shift = 8 * offset;

        let data_idx = idx as usize % DataPage::DATA_COUNT;
        let page_idx = idx as usize / DataPage::DATA_COUNT;
        (page_idx, data_idx, shift as u32)
    }
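
    // Worked example (illustrative): with a mask that leaves the index
    // unchanged, idx = 13 yields byte offset 5, word index 1, shift 40,
    // i.e. byte 13 lives in bits 40..48 of the second u64 of page 0.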

    fn write_at(&self, idx: u64, byte: u8) {
        let (page_idx, data_idx, shift) = self.idx_at(idx);
        let word = &self.data[page_idx].data[data_idx];
        let mask = 0xffu64 << shift;

        let old = word.load(Ordering::Relaxed) & !mask;
        let new = old | (u64::from(byte) << shift);
        word.store(new, Ordering::Relaxed);
    }

    fn read_at(&self, idx: u64) -> u8 {
        let (page_idx, data_idx, shift) = self.idx_at(idx);

        let word = &self.data[page_idx].data[data_idx];
        let old = word.load(Ordering::Relaxed);

        ((old >> shift) & 0xff) as u8
    }
}

impl Entry<'_> {
    pub(crate) fn commit(self) -> u64 {
        let end = self.head.cache.page_write_offset;
        self.head
            .meta
            .page_write_offset
            .store(end, Ordering::Relaxed);

        debug_assert!(
            end.wrapping_sub(self.offset) >= self.length,
            "Failed to reserve enough space in the data section for the entry, risking corrupted data with following writes"
        );

        self.head.insert_at(
            self.index,
            Snapshot {
                length: self.length,
                offset: self.offset,
            },
        );

        self.index
    }

    pub(crate) fn new_write_offset(&self, n: usize) -> Option<u64> {
        self.head.new_write_offset(n)
    }

    pub(crate) fn invalidate_heads(&mut self, end: u64) {
        self.head.invalidate_heads_to(end);
    }

    pub(crate) fn copy_from_slice(&mut self, data: &[u8]) {
        self.length += self.head.copy_from_slice(data);
    }
}

impl<'lt> PreparedTransaction<'lt> {
    pub fn replace(&mut self, data: &[u8]) {
        assert!(
            data.len() as u64 <= self.length,
            "{} > {}",
            data.len(),
            self.length
        );

        for (&b, idx) in data.iter().zip(self.offset..) {
            self.head.write_at(idx, b);
        }
    }

    pub fn tail(&self) -> &'lt [AtomicU64] {
        DataPage::as_slice_of_u64(self.tail)
    }
}

pub(crate) struct HeadCache {
    entry_mask: u64,
    entry_read_offset: u64,
    entry_write_offset: u64,
    page_mask: u64,
    page_write_offset: u64,
    page_read_offset: u64,
}

impl HeadCache {
    pub(crate) fn new() -> Self {
        HeadCache {
            entry_mask: 0,
            entry_read_offset: 0,
            entry_write_offset: 0,
            page_mask: 0,
            page_write_offset: 0,
            page_read_offset: 0,
        }
    }
}

// The page structs below are overlaid onto the memory map, so their layout
// must be stable; `#[repr(C)]` pins the field order.
#[derive(Default)]
#[repr(C)]
pub(crate) struct HeadPage {
    version: AtomicU64,
    entry_mask: AtomicU64,
    page_mask: AtomicU64,
    page_write_offset: AtomicU64,
}

impl HeadPage {
    const PAGE_SZ: usize = 4096;
}

#[repr(C)]
pub(crate) struct SequencePage {
    data: [SequenceEntry; Self::DATA_COUNT],
}

#[repr(C)]
struct SequenceEntry {
    offset: AtomicU64,
    length: AtomicU64,
}

impl Default for SequencePage {
    fn default() -> Self {
        SequencePage {
            data: [0; Self::DATA_COUNT].map(|_i| SequenceEntry {
                offset: AtomicU64::new(0),
                length: AtomicU64::new(0),
            }),
        }
    }
}

impl SequencePage {
    const DATA_COUNT: usize = 4096 / 16;
}

#[repr(C)]
pub struct DataPage {
    pub data: [AtomicU64; Self::DATA_COUNT],
}

impl DataPage {
    const DATA_COUNT: usize = 4096 / 8;

    pub fn as_slice_of_u64(this: &[DataPage]) -> &[AtomicU64] {
        // Sound because `DataPage` is `#[repr(C)]` and consists solely of
        // `DATA_COUNT` consecutive `AtomicU64` words.
        let count = Self::DATA_COUNT * this.len();
        unsafe { &*core::ptr::slice_from_raw_parts(this.as_ptr() as *const AtomicU64, count) }
    }
}

impl Default for DataPage {
    fn default() -> Self {
        DataPage {
            data: [0; Self::DATA_COUNT].map(|_i| AtomicU64::new(0)),
        }
    }
}
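
#[cfg(test)]
mod tests {
    use super::*;

    // Sanity checks for the pure helpers above; the values are illustrative
    // and assume nothing about any particular on-disk file.
    #[test]
    fn fitting_power_of_two_rounds_down() {
        assert_eq!(Head::fitting_power_of_two(1), 1);
        assert_eq!(Head::fitting_power_of_two(4096), 4096);
        assert_eq!(Head::fitting_power_of_two(5000), 4096);
        assert_eq!(Head::fitting_power_of_two(u64::MAX), 1 << 63);
    }

    #[test]
    fn data_pages_view_as_words() {
        let pages = [DataPage::default(), DataPage::default()];
        let words = DataPage::as_slice_of_u64(&pages);
        assert_eq!(words.len(), 2 * (4096 / 8));
    }
}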