1use crate::{
5 AllocatedSection, MAIN_MAGIC, MainHeader, SECTION_MAGIC, SectionHandle, SectionHeader,
6 SectionStorage, UnifiedLogRead, UnifiedLogStatus, UnifiedLogWrite,
7};
8
9use crate::SECTION_HEADER_COMPACT_SIZE;
10
11use AllocatedSection::Section;
12use bincode::config::standard;
13use bincode::error::EncodeError;
14use bincode::{Encode, decode_from_slice, encode_into_slice};
15use core::slice::from_raw_parts_mut;
16use cu29_traits::{CuError, CuResult, UnifiedLogType};
17use memmap2::{Mmap, MmapMut};
18use std::fs::{File, OpenOptions};
19use std::io::Read;
20use std::mem::ManuallyDrop;
21use std::path::{Path, PathBuf};
22use std::{io, mem};
23
/// Section storage backed by a byte slice taken from a memory-mapped slab.
pub struct MmapSectionStorage {
    /// Bytes of the section: the header block first, then the appended entries.
    buffer: &'static mut [u8],
    /// Write cursor inside `buffer` where the next entry is encoded.
    offset: usize,
    /// Number of bytes reserved at the start of `buffer` for the section header.
    block_size: usize,
}

impl MmapSectionStorage {
    /// Wraps `buffer` as a section storage whose header occupies the first
    /// `block_size` bytes. The write cursor starts at 0 until the storage is
    /// initialized.
    pub fn new(buffer: &'static mut [u8], block_size: usize) -> Self {
        Self {
            buffer,
            offset: 0,
            block_size,
        }
    }

    /// Returns the address of the first byte of the section buffer.
    ///
    /// Uses `as_ptr()` rather than indexing element 0 so that an empty buffer
    /// yields a pointer instead of an out-of-bounds panic.
    pub fn buffer_ptr(&self) -> *const u8 {
        self.buffer.as_ptr()
    }
}
43
44impl SectionStorage for MmapSectionStorage {
45 fn initialize<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
46 self.post_update_header(header)?;
47 self.offset = self.block_size;
48 Ok(self.offset)
49 }
50
51 fn post_update_header<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
52 encode_into_slice(header, &mut self.buffer[0..], standard())
53 }
54
55 fn append<E: Encode>(&mut self, entry: &E) -> Result<usize, EncodeError> {
56 let size = encode_into_slice(entry, &mut self.buffer[self.offset..], standard())?;
57 self.offset += size;
58 Ok(size)
59 }
60
61 fn flush(&mut self) -> CuResult<usize> {
62 Ok(self.offset)
64 }
65}
66
/// A unified logger opened either for reading or for writing, as produced by
/// `MmapUnifiedLoggerBuilder::build`.
pub enum MmapUnifiedLogger {
    /// Reader over an existing log.
    Read(MmapUnifiedLoggerRead),
    /// Writer that creates the log files.
    Write(MmapUnifiedLoggerWrite),
}
73
/// Builder collecting the options needed to open a `MmapUnifiedLogger`.
pub struct MmapUnifiedLoggerBuilder {
    /// Base path of the log; slab files are derived from it.
    file_base_name: Option<PathBuf>,
    /// Size of each slab file; required when building a writer.
    preallocated_size: Option<usize>,
    /// Request write access; a writer is built only if `create` is also set.
    write: bool,
    /// Request file creation; a writer is built only if `write` is also set.
    create: bool,
}
81
82impl Default for MmapUnifiedLoggerBuilder {
83 fn default() -> Self {
84 Self::new()
85 }
86}
87
88impl MmapUnifiedLoggerBuilder {
89 pub fn new() -> Self {
90 Self {
91 file_base_name: None,
92 preallocated_size: None,
93 write: false,
94 create: false, }
96 }
97
98 pub fn file_base_name(mut self, file_path: &Path) -> Self {
100 self.file_base_name = Some(file_path.to_path_buf());
101 self
102 }
103
104 pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
105 self.preallocated_size = Some(preallocated_size);
106 self
107 }
108
109 pub fn write(mut self, write: bool) -> Self {
110 self.write = write;
111 self
112 }
113
114 pub fn create(mut self, create: bool) -> Self {
115 self.create = create;
116 self
117 }
118
119 pub fn build(self) -> io::Result<MmapUnifiedLogger> {
120 let page_size = page_size::get();
121
122 if self.write && self.create {
123 let file_path = self.file_base_name.ok_or_else(|| {
124 io::Error::new(
125 io::ErrorKind::InvalidInput,
126 "File path is required for write mode",
127 )
128 })?;
129 let preallocated_size = self.preallocated_size.ok_or_else(|| {
130 io::Error::new(
131 io::ErrorKind::InvalidInput,
132 "Preallocated size is required for write mode",
133 )
134 })?;
135 let ulw = MmapUnifiedLoggerWrite::new(&file_path, preallocated_size, page_size)?;
136 Ok(MmapUnifiedLogger::Write(ulw))
137 } else {
138 let file_path = self.file_base_name.ok_or_else(|| {
139 io::Error::new(io::ErrorKind::InvalidInput, "File path is required")
140 })?;
141 let ulr = MmapUnifiedLoggerRead::new(&file_path)?;
142 Ok(MmapUnifiedLogger::Read(ulr))
143 }
144 }
145}
146
/// One memory-mapped slab file of the log together with its write bookkeeping.
struct SlabEntry {
    /// Backing file; trimmed to the used length when the slab is dropped.
    file: File,
    /// Writable mapping of `file`. Wrapped in `ManuallyDrop` so `Drop` can
    /// unmap it explicitly before trimming the file.
    mmap_buffer: ManuallyDrop<MmapMut>,
    /// Offset in the slab just past the last byte handed out so far.
    current_global_position: usize,
    /// Start offsets of sections that were handed out and not yet flushed.
    sections_offsets_in_flight: Vec<usize>,
    /// Everything before this offset has already been scheduled for flushing.
    flushed_until_offset: usize,
    /// Alignment applied to every section start and size.
    page_size: usize,
    /// Start offset of the most recently written end-of-log marker, if any.
    temporary_end_marker: Option<usize>,
}
156
157impl Drop for SlabEntry {
158 fn drop(&mut self) {
159 self.flush_until(self.current_global_position);
160 unsafe { ManuallyDrop::drop(&mut self.mmap_buffer) };
162 if let Err(error) = self.file.set_len(self.current_global_position as u64) {
163 eprintln!("Failed to trim datalogger file: {}", error);
164 }
165
166 if !self.sections_offsets_in_flight.is_empty() {
167 eprintln!("Error: Slab not full flushed.");
168 }
169 }
170}
171
172impl SlabEntry {
173 fn new(file: File, page_size: usize) -> io::Result<Self> {
174 let mmap_buffer = ManuallyDrop::new(
175 unsafe { MmapMut::map_mut(&file) }
177 .map_err(|e| io::Error::new(e.kind(), format!("Failed to map file: {e}")))?,
178 );
179 Ok(Self {
180 file,
181 mmap_buffer,
182 current_global_position: 0,
183 sections_offsets_in_flight: Vec::with_capacity(16),
184 flushed_until_offset: 0,
185 page_size,
186 temporary_end_marker: None,
187 })
188 }
189
190 fn flush_until(&mut self, until_position: usize) {
192 if (self.flushed_until_offset == until_position) || (until_position == 0) {
194 return;
195 }
196 self.mmap_buffer
197 .flush_async_range(
198 self.flushed_until_offset,
199 until_position - self.flushed_until_offset,
200 )
201 .expect("Failed to flush memory map");
202 self.flushed_until_offset = until_position;
203 }
204
205 fn clear_temporary_end_marker(&mut self) {
206 if let Some(marker_start) = self.temporary_end_marker.take() {
207 self.current_global_position = marker_start;
208 if self.flushed_until_offset > marker_start {
209 self.flushed_until_offset = marker_start;
210 }
211 }
212 }
213
214 fn write_end_marker(&mut self, temporary: bool) -> CuResult<()> {
215 let block_size = SECTION_HEADER_COMPACT_SIZE as usize;
216 let marker_start = self.align_to_next_page(self.current_global_position);
217 let total_marker_size = block_size; let marker_end = marker_start + total_marker_size;
219 if marker_end > self.mmap_buffer.len() {
220 return Err("Not enough space to write end-of-log marker".into());
221 }
222
223 let header = SectionHeader {
224 magic: SECTION_MAGIC,
225 block_size: SECTION_HEADER_COMPACT_SIZE,
226 entry_type: UnifiedLogType::LastEntry,
227 offset_to_next_section: total_marker_size as u32,
228 used: 0,
229 is_open: temporary,
230 };
231
232 encode_into_slice(
233 &header,
234 &mut self.mmap_buffer
235 [marker_start..marker_start + SECTION_HEADER_COMPACT_SIZE as usize],
236 standard(),
237 )
238 .map_err(|e| CuError::new_with_cause("Failed to encode end-of-log header", e))?;
239
240 self.temporary_end_marker = Some(marker_start);
241 self.current_global_position = marker_end;
242 Ok(())
243 }
244
245 fn is_it_my_section(&self, section: &SectionHandle<MmapSectionStorage>) -> bool {
246 let storage = section.get_storage();
247 let ptr = storage.buffer_ptr();
248 (ptr >= self.mmap_buffer.as_ptr())
249 && (ptr as usize)
250 < (self.mmap_buffer.as_ref().as_ptr() as usize + self.mmap_buffer.as_ref().len())
251 }
252
253 fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
256 section
257 .post_update_header()
258 .expect("Failed to update section header");
259
260 let storage = section.get_storage();
261 let ptr = storage.buffer_ptr();
262
263 if ptr < self.mmap_buffer.as_ptr()
264 || ptr as usize > self.mmap_buffer.as_ptr() as usize + self.mmap_buffer.len()
265 {
266 panic!("Invalid section buffer, not in the slab");
267 }
268
269 let base = self.mmap_buffer.as_ptr() as usize;
270 self.sections_offsets_in_flight
271 .retain(|&x| x != ptr as usize - base);
272
273 if self.sections_offsets_in_flight.is_empty() {
274 self.flush_until(self.current_global_position);
275 return;
276 }
277 if self.flushed_until_offset < self.sections_offsets_in_flight[0] {
278 self.flush_until(self.sections_offsets_in_flight[0]);
279 }
280 }
281
282 #[inline]
283 fn align_to_next_page(&self, ptr: usize) -> usize {
284 (ptr + self.page_size - 1) & !(self.page_size - 1)
285 }
286
287 fn add_section(
289 &mut self,
290 entry_type: UnifiedLogType,
291 requested_section_size: usize,
292 ) -> AllocatedSection<MmapSectionStorage> {
293 self.current_global_position = self.align_to_next_page(self.current_global_position);
295 let section_size = self.align_to_next_page(requested_section_size) as u32;
296
297 if self.current_global_position + section_size as usize > self.mmap_buffer.len() {
299 return AllocatedSection::NoMoreSpace;
300 }
301
302 #[cfg(feature = "compact")]
303 let block_size = SECTION_HEADER_COMPACT_SIZE;
304
305 #[cfg(not(feature = "compact"))]
306 let block_size = self.page_size as u16;
307
308 let section_header = SectionHeader {
309 magic: SECTION_MAGIC,
310 block_size,
311 entry_type,
312 offset_to_next_section: section_size,
313 used: 0u32,
314 is_open: true,
315 };
316
317 self.sections_offsets_in_flight
319 .push(self.current_global_position);
320 let end_of_section = self.current_global_position + requested_section_size;
321 let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];
322
323 let handle_buffer =
325 unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };
326 let storage = MmapSectionStorage::new(handle_buffer, block_size as usize);
327
328 self.current_global_position = end_of_section;
329
330 Section(SectionHandle::create(section_header, storage).expect("Failed to create section"))
331 }
332
333 #[cfg(test)]
334 fn used(&self) -> usize {
335 self.current_global_position
336 }
337}
338
/// Writer side of the unified logger, spreading the log over numbered slab files.
pub struct MmapUnifiedLoggerWrite {
    /// Slab currently receiving new sections.
    front_slab: SlabEntry,
    /// Earlier slabs, kept while they still have sections in flight.
    back_slabs: Vec<SlabEntry>,
    /// Base path from which each slab file name is derived.
    base_file_path: PathBuf,
    /// Size in bytes given to every slab file.
    slab_size: usize,
    /// Index of the slab file behind `front_slab`.
    front_slab_suffix: usize,
}
352
/// Derives the path of slab number `slab_index` from the base log path by
/// inserting `_<index>` between the file stem and its extension
/// (`dir/test.bin` becomes `dir/test_3.bin`).
///
/// # Errors
/// `InvalidInput` when the base path has no file name, no extension, or when
/// either part is not valid UTF-8.
fn build_slab_path(base_file_path: &Path, slab_index: usize) -> io::Result<PathBuf> {
    let invalid = |reason: &'static str| io::Error::new(io::ErrorKind::InvalidInput, reason);

    let stem = base_file_path
        .file_stem()
        .ok_or_else(|| invalid("Base file path has no file name"))?
        .to_str()
        .ok_or_else(|| invalid("Base file name is not valid UTF-8"))?;
    let extension = base_file_path
        .extension()
        .ok_or_else(|| invalid("Base file path has no extension"))?
        .to_str()
        .ok_or_else(|| invalid("Base file extension is not valid UTF-8"))?;

    if stem.is_empty() {
        return Err(invalid("Base file name is empty"));
    }

    Ok(base_file_path.with_file_name(format!("{stem}_{slab_index}.{extension}")))
}
389
390fn make_slab_file(base_file_path: &Path, slab_size: usize, slab_suffix: usize) -> io::Result<File> {
391 let file_path = build_slab_path(base_file_path, slab_suffix)?;
392 let file = OpenOptions::new()
393 .read(true)
394 .write(true)
395 .create(true)
396 .truncate(true)
397 .open(&file_path)
398 .map_err(|e| {
399 io::Error::new(
400 e.kind(),
401 format!("Failed to open file {}: {e}", file_path.display()),
402 )
403 })?;
404 file.set_len(slab_size as u64).map_err(|e| {
405 io::Error::new(
406 e.kind(),
407 format!("Failed to set file length for {}: {e}", file_path.display()),
408 )
409 })?;
410 Ok(file)
411}
412
/// Removes whatever file or link currently sits at `base_file_path`, so that
/// a new alias can be created there.
///
/// A missing path is fine. The path itself is inspected without following
/// symlinks.
///
/// # Errors
/// `AlreadyExists` when the path is a directory; otherwise the inspection or
/// removal error, annotated with the path.
fn remove_existing_alias(base_file_path: &Path) -> io::Result<()> {
    let existing = match std::fs::symlink_metadata(base_file_path) {
        Ok(meta) => meta,
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
        Err(e) => {
            return Err(io::Error::new(
                e.kind(),
                format!(
                    "Failed to inspect existing base log alias {}: {e}",
                    base_file_path.display()
                ),
            ));
        }
    };

    if existing.is_dir() {
        return Err(io::Error::new(
            io::ErrorKind::AlreadyExists,
            format!(
                "Cannot create base log alias at {} because a directory already exists there",
                base_file_path.display()
            ),
        ));
    }

    std::fs::remove_file(base_file_path).map_err(|e| {
        io::Error::new(
            e.kind(),
            format!(
                "Failed to remove existing base log alias {}: {e}",
                base_file_path.display()
            ),
        )
    })
}
445
446fn create_base_alias_link(base_file_path: &Path) -> io::Result<()> {
447 let first_slab_path = build_slab_path(base_file_path, 0)?;
448 remove_existing_alias(base_file_path)?;
449
450 #[cfg(unix)]
451 {
452 use std::os::unix::fs::symlink;
453 let relative_target = Path::new(first_slab_path.file_name().ok_or_else(|| {
454 io::Error::new(
455 io::ErrorKind::InvalidInput,
456 "First slab file has no name component",
457 )
458 })?);
459 symlink(relative_target, base_file_path).map_err(|e| {
460 io::Error::new(
461 e.kind(),
462 format!(
463 "Failed to create base log alias {} -> {}: {e}",
464 base_file_path.display(),
465 first_slab_path.display()
466 ),
467 )
468 })
469 }
470
471 #[cfg(windows)]
472 {
473 use std::os::windows::fs::symlink_file;
474 let relative_target = Path::new(first_slab_path.file_name().ok_or_else(|| {
475 io::Error::new(
476 io::ErrorKind::InvalidInput,
477 "First slab file has no name component",
478 )
479 })?);
480 match symlink_file(relative_target, base_file_path) {
481 Ok(()) => Ok(()),
482 Err(symlink_err) => std::fs::hard_link(&first_slab_path, base_file_path).map_err(
483 |hard_link_err| {
484 io::Error::other(format!(
485 "Failed to create base log alias {}. Symlink error: {symlink_err}. Hard-link fallback error: {hard_link_err}",
486 base_file_path.display()
487 ))
488 },
489 ),
490 }?;
491 Ok(())
492 }
493
494 #[cfg(not(any(unix, windows)))]
495 {
496 std::fs::hard_link(&first_slab_path, base_file_path).map_err(|e| {
497 io::Error::new(
498 e.kind(),
499 format!(
500 "Failed to create base log alias {} -> {}: {e}",
501 base_file_path.display(),
502 first_slab_path.display()
503 ),
504 )
505 })
506 }
507}
508
509impl UnifiedLogWrite<MmapSectionStorage> for MmapUnifiedLoggerWrite {
510 fn add_section(
512 &mut self,
513 entry_type: UnifiedLogType,
514 requested_section_size: usize,
515 ) -> CuResult<SectionHandle<MmapSectionStorage>> {
516 self.garbage_collect_backslabs(); self.front_slab.clear_temporary_end_marker();
518 let maybe_section = self
519 .front_slab
520 .add_section(entry_type, requested_section_size);
521
522 match maybe_section {
523 AllocatedSection::NoMoreSpace => {
524 let new_slab = self.create_slab()?;
526 self.back_slabs
528 .push(mem::replace(&mut self.front_slab, new_slab));
529 match self
530 .front_slab
531 .add_section(entry_type, requested_section_size)
532 {
533 AllocatedSection::NoMoreSpace => Err(CuError::from("out of space")),
534 Section(section) => {
535 self.place_end_marker(true)?;
536 Ok(section)
537 }
538 }
539 }
540 Section(section) => {
541 self.place_end_marker(true)?;
542 Ok(section)
543 }
544 }
545 }
546
547 fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
548 section.mark_closed();
549 for slab in self.back_slabs.iter_mut() {
550 if slab.is_it_my_section(section) {
551 slab.flush_section(section);
552 return;
553 }
554 }
555 self.front_slab.flush_section(section);
556 }
557
558 fn status(&self) -> UnifiedLogStatus {
559 UnifiedLogStatus {
560 total_used_space: self.front_slab.current_global_position,
561 total_allocated_space: self.slab_size * self.front_slab_suffix,
562 }
563 }
564}
565
566impl MmapUnifiedLoggerWrite {
567 fn next_slab(&mut self) -> io::Result<File> {
568 let next_suffix = self.front_slab_suffix + 1;
569 let file = make_slab_file(&self.base_file_path, self.slab_size, next_suffix)?;
570 self.front_slab_suffix = next_suffix;
571 Ok(file)
572 }
573
574 fn new(base_file_path: &Path, slab_size: usize, page_size: usize) -> io::Result<Self> {
575 let file = make_slab_file(base_file_path, slab_size, 0)?;
576 create_base_alias_link(base_file_path)?;
577 let mut front_slab = SlabEntry::new(file, page_size)?;
578
579 let main_header = MainHeader {
581 magic: MAIN_MAGIC,
582 first_section_offset: page_size as u16,
583 page_size: page_size as u16,
584 };
585 let nb_bytes = encode_into_slice(&main_header, &mut front_slab.mmap_buffer[..], standard())
586 .map_err(|e| io::Error::other(format!("Failed to encode main header: {e}")))?;
587 assert!(nb_bytes < page_size);
588 front_slab.current_global_position = page_size; Ok(Self {
591 front_slab,
592 back_slabs: Vec::new(),
593 base_file_path: base_file_path.to_path_buf(),
594 slab_size,
595 front_slab_suffix: 0,
596 })
597 }
598
599 fn garbage_collect_backslabs(&mut self) {
600 self.back_slabs
601 .retain_mut(|slab| !slab.sections_offsets_in_flight.is_empty());
602 }
603
604 fn place_end_marker(&mut self, temporary: bool) -> CuResult<()> {
605 match self.front_slab.write_end_marker(temporary) {
606 Ok(_) => Ok(()),
607 Err(_) => {
608 let new_slab = self.create_slab()?;
610 self.back_slabs
611 .push(mem::replace(&mut self.front_slab, new_slab));
612 self.front_slab.write_end_marker(temporary)
613 }
614 }
615 }
616
617 pub fn stats(&self) -> (usize, Vec<usize>, usize) {
618 (
619 self.front_slab.current_global_position,
620 self.front_slab.sections_offsets_in_flight.clone(),
621 self.back_slabs.len(),
622 )
623 }
624
625 fn create_slab(&mut self) -> CuResult<SlabEntry> {
626 let file = self
627 .next_slab()
628 .map_err(|e| CuError::new_with_cause("Failed to create slab file", e))?;
629 SlabEntry::new(file, self.front_slab.page_size)
630 .map_err(|e| CuError::new_with_cause("Failed to create slab memory map", e))
631 }
632}
633
634impl Drop for MmapUnifiedLoggerWrite {
635 fn drop(&mut self) {
636 #[cfg(debug_assertions)]
637 eprintln!("Flushing the unified Logger ... "); self.front_slab.clear_temporary_end_marker();
640 if let Err(e) = self.place_end_marker(false) {
641 panic!("Failed to flush the unified logger: {}", e);
642 }
643 self.front_slab
644 .flush_until(self.front_slab.current_global_position);
645 self.garbage_collect_backslabs();
646 #[cfg(debug_assertions)]
647 eprintln!("Unified Logger flushed."); }
649}
650
651fn open_slab_index(
652 base_file_path: &Path,
653 slab_index: usize,
654) -> io::Result<(File, Mmap, u16, Option<MainHeader>)> {
655 let mut options = OpenOptions::new();
656 let options = options.read(true);
657
658 let file_path = build_slab_path(base_file_path, slab_index)?;
659 let file = options.open(&file_path).map_err(|e| {
660 io::Error::new(
661 e.kind(),
662 format!("Failed to open slab file {}: {e}", file_path.display()),
663 )
664 })?;
665 let mmap = unsafe { Mmap::map(&file) }
667 .map_err(|e| io::Error::new(e.kind(), format!("Failed to map slab file: {e}")))?;
668 let mut prolog = 0u16;
669 let mut maybe_main_header: Option<MainHeader> = None;
670 if slab_index == 0 {
671 let main_header: MainHeader;
672 let _read: usize;
673 (main_header, _read) = decode_from_slice(&mmap[..], standard()).map_err(|e| {
674 io::Error::new(
675 io::ErrorKind::InvalidData,
676 format!("Failed to decode main header: {e}"),
677 )
678 })?;
679 if main_header.magic != MAIN_MAGIC {
680 return Err(io::Error::new(
681 io::ErrorKind::InvalidData,
682 "Invalid magic number in main header",
683 ));
684 }
685 prolog = main_header.first_section_offset;
686 maybe_main_header = Some(main_header);
687 }
688 Ok((file, mmap, prolog, maybe_main_header))
689}
690
/// Reader side of the unified logger, walking the slab files one at a time.
pub struct MmapUnifiedLoggerRead {
    /// Base path from which each slab file name is derived.
    base_file_path: PathBuf,
    /// Main header decoded from slab 0.
    main_header: MainHeader,
    /// Read-only mapping of the slab currently being read.
    current_mmap_buffer: Mmap,
    /// File behind `current_mmap_buffer`.
    current_file: File,
    /// Index of the slab currently being read.
    current_slab_index: usize,
    /// Offset in the current slab of the next section header to read.
    current_reading_position: usize,
}
700
/// A position in the log, as returned by `MmapUnifiedLoggerRead::position`
/// and accepted by `MmapUnifiedLoggerRead::seek`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LogPosition {
    /// Index of the slab file.
    pub slab_index: usize,
    /// Byte offset inside that slab.
    pub offset: usize,
}
707
708impl UnifiedLogRead for MmapUnifiedLoggerRead {
709 fn read_next_section_type(&mut self, datalogtype: UnifiedLogType) -> CuResult<Option<Vec<u8>>> {
710 loop {
712 if self.current_reading_position >= self.current_mmap_buffer.len() {
713 self.next_slab().map_err(|e| {
714 CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
715 })?;
716 }
717
718 let header_result = self.read_section_header();
719 let header = header_result.map_err(|error| {
720 CuError::new_with_cause(
721 &format!(
722 "Could not read a sections header: {}/{}:{}",
723 self.base_file_path.as_os_str().to_string_lossy(),
724 self.current_slab_index,
725 self.current_reading_position,
726 ),
727 error,
728 )
729 })?;
730
731 if header.entry_type == UnifiedLogType::LastEntry {
733 return Ok(None);
734 }
735
736 if header.entry_type == datalogtype {
738 let result = Some(self.read_section_content(&header)?);
739 self.current_reading_position += header.offset_to_next_section as usize;
740 return Ok(result);
741 }
742
743 self.current_reading_position += header.offset_to_next_section as usize;
745 }
746 }
747
748 fn raw_read_section(&mut self) -> CuResult<(SectionHeader, Vec<u8>)> {
750 if self.current_reading_position >= self.current_mmap_buffer.len() {
751 self.next_slab().map_err(|e| {
752 CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
753 })?;
754 }
755
756 let read_result = self.read_section_header();
757
758 match read_result {
759 Err(error) => Err(CuError::new_with_cause(
760 &format!(
761 "Could not read a sections header: {}/{}:{}",
762 self.base_file_path.as_os_str().to_string_lossy(),
763 self.current_slab_index,
764 self.current_reading_position,
765 ),
766 error,
767 )),
768 Ok(header) => {
769 let data = self.read_section_content(&header)?;
770 self.current_reading_position += header.offset_to_next_section as usize;
771 Ok((header, data))
772 }
773 }
774 }
775}
776
777impl MmapUnifiedLoggerRead {
778 pub fn new(base_file_path: &Path) -> io::Result<Self> {
779 let (file, mmap, prolog, header) = open_slab_index(base_file_path, 0)?;
780 let main_header = header.ok_or_else(|| {
781 io::Error::new(io::ErrorKind::InvalidData, "Missing main header in slab 0")
782 })?;
783
784 Ok(Self {
785 base_file_path: base_file_path.to_path_buf(),
786 main_header,
787 current_file: file,
788 current_mmap_buffer: mmap,
789 current_slab_index: 0,
790 current_reading_position: prolog as usize,
791 })
792 }
793
794 pub fn position(&self) -> LogPosition {
796 LogPosition {
797 slab_index: self.current_slab_index,
798 offset: self.current_reading_position,
799 }
800 }
801
802 pub fn seek(&mut self, pos: LogPosition) -> CuResult<()> {
804 if pos.slab_index != self.current_slab_index {
805 let (file, mmap, _prolog, _header) =
806 open_slab_index(&self.base_file_path, pos.slab_index).map_err(|e| {
807 CuError::new_with_cause(
808 &format!("Failed to open slab {} for seek", pos.slab_index),
809 e,
810 )
811 })?;
812 self.current_file = file;
813 self.current_mmap_buffer = mmap;
814 self.current_slab_index = pos.slab_index;
815 }
816 self.current_reading_position = pos.offset;
817 Ok(())
818 }
819
820 fn next_slab(&mut self) -> io::Result<()> {
821 self.current_slab_index += 1;
822 let (file, mmap, prolog, _) =
823 open_slab_index(&self.base_file_path, self.current_slab_index)?;
824 self.current_file = file;
825 self.current_mmap_buffer = mmap;
826 self.current_reading_position = prolog as usize;
827 Ok(())
828 }
829
830 pub fn raw_main_header(&self) -> &MainHeader {
831 &self.main_header
832 }
833
834 pub fn scan_section_bytes(&mut self, datalogtype: UnifiedLogType) -> CuResult<u64> {
835 let mut total = 0u64;
836
837 loop {
838 if self.current_reading_position >= self.current_mmap_buffer.len() {
839 self.next_slab().map_err(|e| {
840 CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
841 })?;
842 }
843
844 let header = self.read_section_header()?;
845
846 if header.entry_type == UnifiedLogType::LastEntry {
847 return Ok(total);
848 }
849
850 if header.entry_type == datalogtype {
851 total = total.saturating_add(header.used as u64);
852 }
853
854 self.current_reading_position += header.offset_to_next_section as usize;
855 }
856 }
857
858 fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
860 let mut section_data = vec![0; header.used as usize];
862 let start_of_data = self.current_reading_position + header.block_size as usize;
863 section_data.copy_from_slice(
864 &self.current_mmap_buffer[start_of_data..start_of_data + header.used as usize],
865 );
866
867 Ok(section_data)
868 }
869
870 fn read_section_header(&mut self) -> CuResult<SectionHeader> {
871 let section_header: SectionHeader;
872 (section_header, _) = decode_from_slice(
873 &self.current_mmap_buffer[self.current_reading_position..],
874 standard(),
875 )
876 .map_err(|e| {
877 CuError::new_with_cause(
878 &format!(
879 "Could not read a sections header: {}/{}:{}",
880 self.base_file_path.as_os_str().to_string_lossy(),
881 self.current_slab_index,
882 self.current_reading_position,
883 ),
884 e,
885 )
886 })?;
887 if section_header.magic != SECTION_MAGIC {
888 return Err("Invalid magic number in section header".into());
889 }
890
891 Ok(section_header)
892 }
893}
894
/// Adapter exposing the sections of one log type as a `std::io::Read` stream.
pub struct UnifiedLoggerIOReader {
    /// Underlying log reader.
    logger: MmapUnifiedLoggerRead,
    /// Type of the sections served by this reader.
    log_type: UnifiedLogType,
    /// Content of the section currently being served.
    buffer: Vec<u8>,
    /// Number of bytes of `buffer` already handed to the caller.
    buffer_pos: usize,
}
902
903impl UnifiedLoggerIOReader {
904 pub fn new(logger: MmapUnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
905 Self {
906 logger,
907 log_type,
908 buffer: Vec::new(),
909 buffer_pos: 0,
910 }
911 }
912
913 fn fill_buffer(&mut self) -> io::Result<bool> {
915 match self.logger.read_next_section_type(self.log_type) {
916 Ok(Some(section)) => {
917 self.buffer = section;
918 self.buffer_pos = 0;
919 Ok(true)
920 }
921 Ok(None) => Ok(false), Err(e) => Err(io::Error::other(e.to_string())),
923 }
924 }
925}
926
927impl Read for UnifiedLoggerIOReader {
928 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
929 if self.buffer_pos >= self.buffer.len() && !self.fill_buffer()? {
930 return Ok(0);
932 }
933
934 if self.buffer_pos >= self.buffer.len() {
936 return Ok(0);
937 }
938
939 let len = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
941 buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
942 self.buffer_pos += len;
943 Ok(len)
944 }
945}
946
947#[cfg(feature = "std")]
948#[cfg(test)]
949mod tests {
950 use super::*;
951 use crate::stream_write;
952 use bincode::de::read::SliceReader;
953 use bincode::{Decode, Encode, decode_from_reader, decode_from_slice};
954 use cu29_traits::WriteStream;
955 use std::path::PathBuf;
956 use std::sync::{Arc, Mutex};
957 use tempfile::TempDir;
958
959 const LARGE_SLAB: usize = 100 * 1024; const SMALL_SLAB: usize = 16 * 2 * 1024; fn make_a_logger(
963 tmp_dir: &TempDir,
964 slab_size: usize,
965 ) -> (Arc<Mutex<MmapUnifiedLoggerWrite>>, PathBuf) {
966 let file_path = tmp_dir.path().join("test.bin");
967 let MmapUnifiedLogger::Write(data_logger) = MmapUnifiedLoggerBuilder::new()
968 .write(true)
969 .create(true)
970 .file_base_name(&file_path)
971 .preallocated_size(slab_size)
972 .build()
973 .expect("Failed to create logger")
974 else {
975 panic!("Failed to create logger")
976 };
977
978 (Arc::new(Mutex::new(data_logger)), file_path)
979 }
980
981 #[test]
982 fn test_truncation_and_sections_creations() {
983 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
984 let file_path = tmp_dir.path().join("test.bin");
985 let _used = {
986 let MmapUnifiedLogger::Write(mut logger) = MmapUnifiedLoggerBuilder::new()
987 .write(true)
988 .create(true)
989 .file_base_name(&file_path)
990 .preallocated_size(100000)
991 .build()
992 .expect("Failed to create logger")
993 else {
994 panic!("Failed to create logger")
995 };
996 logger
997 .add_section(UnifiedLogType::StructuredLogLine, 1024)
998 .unwrap();
999 logger
1000 .add_section(UnifiedLogType::CopperList, 2048)
1001 .unwrap();
1002 let used = logger.front_slab.used();
1003 assert!(used < 4 * page_size::get()); used
1007 };
1008
1009 let _file = OpenOptions::new()
1010 .read(true)
1011 .open(tmp_dir.path().join("test_0.bin"))
1012 .expect("Could not reopen the file");
1013 }
1020
1021 #[test]
1022 fn test_base_alias_exists_and_matches_first_slab() {
1023 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1024 let file_path = tmp_dir.path().join("test.bin");
1025 let _logger = MmapUnifiedLoggerBuilder::new()
1026 .write(true)
1027 .create(true)
1028 .file_base_name(&file_path)
1029 .preallocated_size(LARGE_SLAB)
1030 .build()
1031 .expect("Failed to create logger");
1032
1033 let first_slab = build_slab_path(&file_path, 0).expect("Failed to build first slab path");
1034 assert!(file_path.exists(), "base alias does not exist");
1035 assert!(first_slab.exists(), "first slab does not exist");
1036
1037 let alias_bytes = std::fs::read(&file_path).expect("Failed to read base alias");
1038 let slab_bytes = std::fs::read(&first_slab).expect("Failed to read first slab");
1039 assert_eq!(alias_bytes, slab_bytes);
1040 }
1041
1042 #[test]
1043 fn test_one_section_self_cleaning() {
1044 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1045 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1046 {
1047 let _stream = stream_write::<(), MmapSectionStorage>(
1048 logger.clone(),
1049 UnifiedLogType::StructuredLogLine,
1050 1024,
1051 );
1052 assert_eq!(
1053 logger
1054 .lock()
1055 .unwrap()
1056 .front_slab
1057 .sections_offsets_in_flight
1058 .len(),
1059 1
1060 );
1061 }
1062 assert_eq!(
1063 logger
1064 .lock()
1065 .unwrap()
1066 .front_slab
1067 .sections_offsets_in_flight
1068 .len(),
1069 0
1070 );
1071 let logger = logger.lock().unwrap();
1072 assert_eq!(
1073 logger.front_slab.flushed_until_offset,
1074 logger.front_slab.current_global_position
1075 );
1076 }
1077
1078 #[test]
1079 fn test_temporary_end_marker_is_created() {
1080 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1081 let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
1082 {
1083 let mut stream = stream_write::<u32, MmapSectionStorage>(
1084 logger.clone(),
1085 UnifiedLogType::StructuredLogLine,
1086 1024,
1087 )
1088 .unwrap();
1089 stream.log(&42u32).unwrap();
1090 }
1091
1092 let logger_guard = logger.lock().unwrap();
1093 let slab = &logger_guard.front_slab;
1094 let marker_start = slab
1095 .temporary_end_marker
1096 .expect("temporary end-of-log marker missing");
1097 let (eof_header, _) =
1098 decode_from_slice::<SectionHeader, _>(&slab.mmap_buffer[marker_start..], standard())
1099 .expect("Could not decode end-of-log marker header");
1100 assert_eq!(eof_header.entry_type, UnifiedLogType::LastEntry);
1101 assert!(eof_header.is_open);
1102 assert_eq!(eof_header.used, 0);
1103 }
1104
1105 #[test]
1106 fn test_final_end_marker_is_not_temporary() {
1107 let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1108 let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
1109 {
1110 let mut stream = stream_write::<u32, MmapSectionStorage>(
1111 logger.clone(),
1112 UnifiedLogType::CopperList,
1113 1024,
1114 )
1115 .unwrap();
1116 stream.log(&1u32).unwrap();
1117 }
1118 drop(logger);
1119
1120 let MmapUnifiedLogger::Read(mut reader) = MmapUnifiedLoggerBuilder::new()
1121 .file_base_name(&f)
1122 .build()
1123 .expect("Failed to build reader")
1124 else {
1125 panic!("Failed to create reader");
1126 };
1127
1128 loop {
1129 let (header, _data) = reader
1130 .raw_read_section()
1131 .expect("Failed to read section while searching for EOF");
1132 if header.entry_type == UnifiedLogType::LastEntry {
1133 assert!(!header.is_open);
1134 break;
1135 }
1136 }
1137 }
1138
/// Opens two streams on the same logger and drops the second one first, then the first.
///
/// After each step the test checks how many section offsets the front slab still tracks
/// as in flight (1, 2, 1, 0), and finally that `flushed_until_offset` has caught up with
/// `current_global_position`.
#[test]
fn test_two_sections_self_cleaning_in_order() {
    let tmp_dir = TempDir::new().expect("could not create a tmp dir");
    let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);

    // Takes the lock only for the duration of one read so no guard lingers between steps.
    let in_flight = || {
        logger
            .lock()
            .unwrap()
            .front_slab
            .sections_offsets_in_flight
            .len()
    };

    // Fail right here if a stream cannot be opened, instead of letting the count
    // assertions below report a confusing mismatch.
    let s1 = stream_write::<(), MmapSectionStorage>(
        logger.clone(),
        UnifiedLogType::StructuredLogLine,
        1024,
    )
    .expect("Failed to open the first stream");
    assert_eq!(in_flight(), 1);

    let s2 = stream_write::<(), MmapSectionStorage>(
        logger.clone(),
        UnifiedLogType::StructuredLogLine,
        1024,
    )
    .expect("Failed to open the second stream");
    assert_eq!(in_flight(), 2);

    drop(s2);
    assert_eq!(in_flight(), 1);

    drop(s1);
    let lg = logger.lock().unwrap();
    assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
    assert_eq!(
        lg.front_slab.flushed_until_offset,
        lg.front_slab.current_global_position
    );
}
1189
/// Opens two streams on the same logger and drops the first one before the second.
///
/// After each step the test checks how many section offsets the front slab still tracks
/// as in flight (1, 2, 1, 0), and finally that `flushed_until_offset` has caught up with
/// `current_global_position`.
#[test]
fn test_two_sections_self_cleaning_out_of_order() {
    let tmp_dir = TempDir::new().expect("could not create a tmp dir");
    let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);

    // Takes the lock only for the duration of one read so no guard lingers between steps.
    let in_flight = || {
        logger
            .lock()
            .unwrap()
            .front_slab
            .sections_offsets_in_flight
            .len()
    };

    // Fail right here if a stream cannot be opened, instead of letting the count
    // assertions below report a confusing mismatch.
    let s1 = stream_write::<(), MmapSectionStorage>(
        logger.clone(),
        UnifiedLogType::StructuredLogLine,
        1024,
    )
    .expect("Failed to open the first stream");
    assert_eq!(in_flight(), 1);

    let s2 = stream_write::<(), MmapSectionStorage>(
        logger.clone(),
        UnifiedLogType::StructuredLogLine,
        1024,
    )
    .expect("Failed to open the second stream");
    assert_eq!(in_flight(), 2);

    // Release the older stream first: this is the out-of-order case.
    drop(s1);
    assert_eq!(in_flight(), 1);

    drop(s2);
    let lg = logger.lock().unwrap();
    assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
    assert_eq!(
        lg.front_slab.flushed_until_offset,
        lg.front_slab.current_global_position
    );
}
1240
/// Logs the values 1, 2 and 3 as `u32` through a single stream, reopens the file with a
/// reader and checks that the first `StructuredLogLine` section decodes back to the same
/// values in the same order.
#[test]
fn test_write_then_read_one_section() {
    let tmp_dir = TempDir::new().expect("could not create a tmp dir");
    let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);

    // The inner scope drops the stream before the logger handle itself is released.
    {
        let mut stream =
            stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024).unwrap();
        for value in 1u32..=3 {
            stream.log(&value).unwrap();
        }
    }
    drop(logger);

    let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
        .file_base_name(&f)
        .build()
        .expect("Failed to build logger")
    else {
        panic!("Failed to build logger");
    };

    // First `expect` covers a read error, the second a log without the wanted section.
    let section = dl
        .read_next_section_type(UnifiedLogType::StructuredLogLine)
        .expect("Failed to read section")
        .expect("No StructuredLogLine section found");

    let mut reader = SliceReader::new(&section[..]);
    for expected in 1u32..=3 {
        let value: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        assert_eq!(value, expected);
    }
}
1273
/// Mock state enum stored in the `state` field of the test-only `CopperList` record.
///
/// It only needs to be encodable and decodable; the variant names presumably mirror the
/// states of a real copper list (confirm against the runtime crate).
#[derive(Debug, Encode, Decode)]
enum CopperListStateMock {
    Free,
    ProcessingTasks,
    BeingSerialized,
}
1282
/// Minimal generic record used as a log payload by the tests in this module: a mock
/// state plus an arbitrary encodable payload `P`.
#[derive(Encode, Decode)]
struct CopperList<P: bincode::enc::Encode> {
    // Mock state of the record.
    state: CopperListStateMock,
    // Caller-chosen payload; the tests in this module use a `(u32, u32, u32)` tuple.
    payload: P,
}
1288
/// Logs two `CopperList` records through one stream, reopens the file with a reader and
/// checks that both records decode from the first `CopperList` section with their payloads
/// intact.
#[test]
fn test_copperlist_list_like_logging() {
    let tmp_dir = TempDir::new().expect("could not create a tmp dir");
    let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);

    // The inner scope drops the stream before the logger handle itself is released.
    {
        let mut stream =
            stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
        let cl0 = CopperList {
            state: CopperListStateMock::Free,
            payload: (1u32, 2u32, 3u32),
        };
        let cl1 = CopperList {
            state: CopperListStateMock::ProcessingTasks,
            payload: (4u32, 5u32, 6u32),
        };
        stream.log(&cl0).unwrap();
        stream.log(&cl1).unwrap();
    }
    drop(logger);

    let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
        .file_base_name(&f)
        .build()
        .expect("Failed to build logger")
    else {
        panic!("Failed to build logger");
    };

    // First `expect` covers a read error, the second a log without the wanted section.
    let section = dl
        .read_next_section_type(UnifiedLogType::CopperList)
        .expect("Failed to read section")
        .expect("No CopperList section found");

    let mut reader = SliceReader::new(&section[..]);
    let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
    let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
    assert_eq!(cl0.payload.1, 2);
    assert_eq!(cl1.payload.2, 6);
}
1328
/// Logs many identical `CopperList` records through a logger built with `SMALL_SLAB`,
/// then reads every `CopperList` section back and checks that the number of decodable
/// records equals the number written.
///
/// NOTE(review): going by the test name, `SMALL_SLAB` is presumably small enough that the
/// records span several slabs. Confirm against the constant's definition.
#[test]
fn test_multi_slab_end2end() {
    // Number of records written and expected back.
    const ENTRY_COUNT: usize = 10_000;

    let tmp_dir = TempDir::new().expect("could not create a tmp dir");
    let (logger, f) = make_a_logger(&tmp_dir, SMALL_SLAB);

    // The inner scope drops the stream before the logger handle itself is released.
    {
        let mut stream =
            stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
        let cl0 = CopperList {
            state: CopperListStateMock::Free,
            payload: (1u32, 2u32, 3u32),
        };
        for _ in 0..ENTRY_COUNT {
            stream.log(&cl0).unwrap();
        }
    }
    drop(logger);

    let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
        .file_base_name(&f)
        .build()
        .expect("Failed to build logger")
    else {
        panic!("Failed to build logger");
    };

    let mut total_readback = 0usize;
    // Stop on the first read error or when no further section of this type exists;
    // the final count assertion catches a premature stop.
    while let Ok(Some(section)) = dl.read_next_section_type(UnifiedLogType::CopperList) {
        let mut reader = SliceReader::new(&section[..]);
        // Count records until the section's bytes no longer decode as a record.
        loop {
            let decoded: Result<CopperList<(u32, u32, u32)>, _> =
                decode_from_reader(&mut reader, standard());
            if decoded.is_err() {
                break;
            }
            total_readback += 1;
        }
    }
    assert_eq!(total_readback, ENTRY_COUNT);
}
1379}