// cu29_unifiedlog / memmap.rs

1//! This is the memory map file implementation for the unified logger for Copper.
2//! It is std only.
3
4use crate::{
5    AllocatedSection, MAIN_MAGIC, MainHeader, SECTION_MAGIC, SectionHandle, SectionHeader,
6    SectionStorage, UnifiedLogRead, UnifiedLogStatus, UnifiedLogWrite,
7};
8
9use crate::SECTION_HEADER_COMPACT_SIZE;
10
11use AllocatedSection::Section;
12use bincode::config::standard;
13use bincode::error::EncodeError;
14use bincode::{Encode, decode_from_slice, encode_into_slice};
15use core::slice::from_raw_parts_mut;
16use cu29_traits::{CuError, CuResult, UnifiedLogType};
17use memmap2::{Mmap, MmapMut};
18use std::fs::{File, OpenOptions};
19use std::io::Read;
20use std::mem::ManuallyDrop;
21use std::path::{Path, PathBuf};
22use std::{io, mem};
23
/// Section storage backed by an exclusive slice of a memory-mapped slab.
pub struct MmapSectionStorage {
    // Exclusive view into the owning slab's mapping for this section.
    // NOTE(review): the 'static lifetime relies on the slab keeping the
    // mapping alive while section handles are in flight — confirm.
    buffer: &'static mut [u8],
    // Next write position within `buffer`.
    offset: usize,
    // Bytes reserved at the start of `buffer` for the section header.
    block_size: usize,
}
29
30impl MmapSectionStorage {
31    pub fn new(buffer: &'static mut [u8], block_size: usize) -> Self {
32        Self {
33            buffer,
34            offset: 0,
35            block_size,
36        }
37    }
38
39    pub fn buffer_ptr(&self) -> *const u8 {
40        &self.buffer[0] as *const u8
41    }
42}
43
impl SectionStorage for MmapSectionStorage {
    /// Writes the section header at offset 0 and positions the write cursor
    /// just past the reserved header block.
    fn initialize<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
        self.post_update_header(header)?;
        // Entries start after the full reserved header block (not right after
        // the encoded bytes) so the header can be rewritten in place later.
        self.offset = self.block_size;
        Ok(self.offset)
    }

    /// Re-encodes the header in place at offset 0 (e.g. to update `used`).
    fn post_update_header<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError> {
        encode_into_slice(header, &mut self.buffer[0..], standard())
    }

    /// Appends one encoded entry at the cursor and advances it by the
    /// encoded size.
    fn append<E: Encode>(&mut self, entry: &E) -> Result<usize, EncodeError> {
        let size = encode_into_slice(entry, &mut self.buffer[self.offset..], standard())?;
        self.offset += size;
        Ok(size)
    }

    /// No-op: returns the current cursor position.
    fn flush(&mut self) -> CuResult<usize> {
        // Flushing is handled at the slab level for mmap-backed storage.
        Ok(self.offset)
    }
}
66
///
/// Holds the read or write side of the datalogger.
pub enum MmapUnifiedLogger {
    /// Read-only side, opened over existing slab files.
    Read(MmapUnifiedLoggerRead),
    /// Write side, which creates and preallocates slab files.
    Write(MmapUnifiedLoggerWrite),
}
73
/// Use this builder to create a new DataLogger.
pub struct MmapUnifiedLoggerBuilder {
    // Base path; slab files are derived as `<stem>_<index>.<ext>`.
    file_base_name: Option<PathBuf>,
    // Size in bytes preallocated for each slab (write mode only).
    preallocated_size: Option<usize>,
    // Open the write side when true.
    write: bool,
    // Allow creating backing files (required together with `write`).
    create: bool,
}
81
82impl Default for MmapUnifiedLoggerBuilder {
83    fn default() -> Self {
84        Self::new()
85    }
86}
87
88impl MmapUnifiedLoggerBuilder {
89    pub fn new() -> Self {
90        Self {
91            file_base_name: None,
92            preallocated_size: None,
93            write: false,
94            create: false, // This is the safest default
95        }
96    }
97
98    /// If "something/toto.copper" is given, it will find or create "something/toto_0.copper",  "something/toto_1.copper" etc.
99    pub fn file_base_name(mut self, file_path: &Path) -> Self {
100        self.file_base_name = Some(file_path.to_path_buf());
101        self
102    }
103
104    pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
105        self.preallocated_size = Some(preallocated_size);
106        self
107    }
108
109    pub fn write(mut self, write: bool) -> Self {
110        self.write = write;
111        self
112    }
113
114    pub fn create(mut self, create: bool) -> Self {
115        self.create = create;
116        self
117    }
118
119    pub fn build(self) -> io::Result<MmapUnifiedLogger> {
120        let page_size = page_size::get();
121
122        if self.write && self.create {
123            let file_path = self.file_base_name.ok_or_else(|| {
124                io::Error::new(
125                    io::ErrorKind::InvalidInput,
126                    "File path is required for write mode",
127                )
128            })?;
129            let preallocated_size = self.preallocated_size.ok_or_else(|| {
130                io::Error::new(
131                    io::ErrorKind::InvalidInput,
132                    "Preallocated size is required for write mode",
133                )
134            })?;
135            let ulw = MmapUnifiedLoggerWrite::new(&file_path, preallocated_size, page_size)?;
136            Ok(MmapUnifiedLogger::Write(ulw))
137        } else {
138            let file_path = self.file_base_name.ok_or_else(|| {
139                io::Error::new(io::ErrorKind::InvalidInput, "File path is required")
140            })?;
141            let ulr = MmapUnifiedLoggerRead::new(&file_path)?;
142            Ok(MmapUnifiedLogger::Read(ulr))
143        }
144    }
145}
146
/// One memory-mapped backing file of the unified log.
struct SlabEntry {
    // Kept open for the lifetime of the mapping; trimmed on drop.
    file: File,
    // ManuallyDrop so `drop` can unmap before truncating the file.
    mmap_buffer: ManuallyDrop<MmapMut>,
    // Next free byte in the mapping.
    current_global_position: usize,
    // Start offsets of sections handed out but not yet flushed/closed.
    sections_offsets_in_flight: Vec<usize>,
    // Everything before this offset has been flushed to disk.
    flushed_until_offset: usize,
    // Page size used for section alignment.
    page_size: usize,
    // Start offset of a provisional end-of-log marker, if one was written.
    temporary_end_marker: Option<usize>,
}
156
157impl Drop for SlabEntry {
158    fn drop(&mut self) {
159        self.flush_until(self.current_global_position);
160        // SAFETY: We own the mapping and must drop it before trimming the file.
161        unsafe { ManuallyDrop::drop(&mut self.mmap_buffer) };
162        if let Err(error) = self.file.set_len(self.current_global_position as u64) {
163            eprintln!("Failed to trim datalogger file: {}", error);
164        }
165
166        if !self.sections_offsets_in_flight.is_empty() {
167            eprintln!("Error: Slab not full flushed.");
168        }
169    }
170}
171
impl SlabEntry {
    /// Maps `file` read/write and wraps it as a fresh, empty slab.
    fn new(file: File, page_size: usize) -> io::Result<Self> {
        let mmap_buffer = ManuallyDrop::new(
            // SAFETY: The file descriptor is valid and mapping is confined to this struct.
            unsafe { MmapMut::map_mut(&file) }
                .map_err(|e| io::Error::new(e.kind(), format!("Failed to map file: {e}")))?,
        );
        Ok(Self {
            file,
            mmap_buffer,
            current_global_position: 0,
            sections_offsets_in_flight: Vec::with_capacity(16),
            flushed_until_offset: 0,
            page_size,
            temporary_end_marker: None,
        })
    }

    /// Ensure the underlying mmap is flushed to disk up to the given position.
    fn flush_until(&mut self, until_position: usize) {
        // Skip degenerate ranges: flushing a zero-length range is tolerated
        // under Linux, but crashes on macOS.
        if (self.flushed_until_offset == until_position) || (until_position == 0) {
            return;
        }
        self.mmap_buffer
            .flush_async_range(
                self.flushed_until_offset,
                until_position - self.flushed_until_offset,
            )
            .expect("Failed to flush memory map");
        self.flushed_until_offset = until_position;
    }

    /// Rolls back a provisional end-of-log marker so the space it occupied
    /// can be reused by the next section.
    fn clear_temporary_end_marker(&mut self) {
        if let Some(marker_start) = self.temporary_end_marker.take() {
            self.current_global_position = marker_start;
            // The marker bytes may already have hit the disk; rewind the
            // flush watermark so the region is re-flushed once overwritten.
            if self.flushed_until_offset > marker_start {
                self.flushed_until_offset = marker_start;
            }
        }
    }

    /// Writes a `LastEntry` section header at the next page boundary.
    ///
    /// A `temporary` marker (`is_open == true`) keeps a crash-interrupted log
    /// readable; `clear_temporary_end_marker` can roll it back.
    /// NOTE(review): the marker position is recorded in `temporary_end_marker`
    /// even when `temporary` is false — confirm intended.
    fn write_end_marker(&mut self, temporary: bool) -> CuResult<()> {
        let block_size = SECTION_HEADER_COMPACT_SIZE as usize;
        let marker_start = self.align_to_next_page(self.current_global_position);
        let total_marker_size = block_size; // header only
        let marker_end = marker_start + total_marker_size;
        if marker_end > self.mmap_buffer.len() {
            return Err("Not enough space to write end-of-log marker".into());
        }

        let header = SectionHeader {
            magic: SECTION_MAGIC,
            block_size: SECTION_HEADER_COMPACT_SIZE,
            entry_type: UnifiedLogType::LastEntry,
            offset_to_next_section: total_marker_size as u32,
            used: 0,
            is_open: temporary,
        };

        encode_into_slice(
            &header,
            &mut self.mmap_buffer
                [marker_start..marker_start + SECTION_HEADER_COMPACT_SIZE as usize],
            standard(),
        )
        .map_err(|e| CuError::new_with_cause("Failed to encode end-of-log header", e))?;

        self.temporary_end_marker = Some(marker_start);
        self.current_global_position = marker_end;
        Ok(())
    }

    /// Returns true when `section`'s buffer points inside this slab's mapping.
    fn is_it_my_section(&self, section: &SectionHandle<MmapSectionStorage>) -> bool {
        let storage = section.get_storage();
        let ptr = storage.buffer_ptr();
        (ptr >= self.mmap_buffer.as_ptr())
            && (ptr as usize)
                < (self.mmap_buffer.as_ref().as_ptr() as usize + self.mmap_buffer.as_ref().len())
    }

    /// Flush the section to disk.
    /// the flushing is permanent and the section is considered closed.
    fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
        // Persist the final header (e.g. the used byte count) before flushing.
        section
            .post_update_header()
            .expect("Failed to update section header");

        let storage = section.get_storage();
        let ptr = storage.buffer_ptr();

        if ptr < self.mmap_buffer.as_ptr()
            || ptr as usize > self.mmap_buffer.as_ptr() as usize + self.mmap_buffer.len()
        {
            panic!("Invalid section buffer, not in the slab");
        }

        let base = self.mmap_buffer.as_ptr() as usize;
        // This section is no longer in flight.
        self.sections_offsets_in_flight
            .retain(|&x| x != ptr as usize - base);

        // Flush as far as possible without passing a still-open section:
        // everything if none remain, otherwise up to the earliest one.
        if self.sections_offsets_in_flight.is_empty() {
            self.flush_until(self.current_global_position);
            return;
        }
        if self.flushed_until_offset < self.sections_offsets_in_flight[0] {
            self.flush_until(self.sections_offsets_in_flight[0]);
        }
    }

    /// Rounds `ptr` up to the next multiple of the page size.
    /// Correct because the page size is a power of two.
    #[inline]
    fn align_to_next_page(&self, ptr: usize) -> usize {
        (ptr + self.page_size - 1) & !(self.page_size - 1)
    }

    /// The returned slice is section_size or greater.
    ///
    /// Allocates a page-aligned section; the slab advances by the requested
    /// size but records the page-aligned size in the header so the reader can
    /// skip to the next (aligned) section. Returns `NoMoreSpace` when the
    /// slab cannot hold the section.
    fn add_section(
        &mut self,
        entry_type: UnifiedLogType,
        requested_section_size: usize,
    ) -> AllocatedSection<MmapSectionStorage> {
        // align current_position to the next page
        self.current_global_position = self.align_to_next_page(self.current_global_position);
        let section_size = self.align_to_next_page(requested_section_size) as u32;

        // We need to have enough space to store the section in that slab
        if self.current_global_position + section_size as usize > self.mmap_buffer.len() {
            return AllocatedSection::NoMoreSpace;
        }

        // Header block size depends on the build: compact headers, or a full
        // page so the payload starts page-aligned.
        #[cfg(feature = "compact")]
        let block_size = SECTION_HEADER_COMPACT_SIZE;

        #[cfg(not(feature = "compact"))]
        let block_size = self.page_size as u16;

        let section_header = SectionHeader {
            magic: SECTION_MAGIC,
            block_size,
            entry_type,
            offset_to_next_section: section_size,
            used: 0u32,
            is_open: true,
        };

        // save the position to keep track for in flight sections
        self.sections_offsets_in_flight
            .push(self.current_global_position);
        let end_of_section = self.current_global_position + requested_section_size;
        let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];

        // SAFETY: We have exclusive access to user_buffer for the handle's lifetime.
        let handle_buffer =
            unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };
        let storage = MmapSectionStorage::new(handle_buffer, block_size as usize);

        self.current_global_position = end_of_section;

        Section(SectionHandle::create(section_header, storage).expect("Failed to create section"))
    }

    /// Bytes consumed in this slab so far (test helper).
    #[cfg(test)]
    fn used(&self) -> usize {
        self.current_global_position
    }
}
338
/// A write side of the datalogger.
pub struct MmapUnifiedLoggerWrite {
    /// the front slab is the current active slab for any new section.
    front_slab: SlabEntry,
    /// previous slabs, kept alive until all their sections are flushed.
    back_slabs: Vec<SlabEntry>,
    /// base file path to create the backing files from.
    base_file_path: PathBuf,
    /// allocation size for the backing files.
    slab_size: usize,
    /// current suffix for the backing files.
    front_slab_suffix: usize,
}
352
/// Derives the path of slab `slab_index` from `base_file_path`.
///
/// `something/toto.copper` with index 3 becomes `something/toto_3.copper`.
/// Fails with `InvalidInput` when the base path has no stem or extension, an
/// empty stem, or non-UTF-8 components.
fn build_slab_path(base_file_path: &Path, slab_index: usize) -> io::Result<PathBuf> {
    let invalid = |msg: &str| io::Error::new(io::ErrorKind::InvalidInput, msg);

    let stem = base_file_path
        .file_stem()
        .ok_or_else(|| invalid("Base file path has no file name"))?
        .to_str()
        .ok_or_else(|| invalid("Base file name is not valid UTF-8"))?;
    let extension = base_file_path
        .extension()
        .ok_or_else(|| invalid("Base file path has no extension"))?
        .to_str()
        .ok_or_else(|| invalid("Base file extension is not valid UTF-8"))?;
    if stem.is_empty() {
        return Err(invalid("Base file name is empty"));
    }

    let mut slab_path = base_file_path.to_path_buf();
    slab_path.set_file_name(format!("{stem}_{slab_index}.{extension}"));
    Ok(slab_path)
}
389
390fn make_slab_file(base_file_path: &Path, slab_size: usize, slab_suffix: usize) -> io::Result<File> {
391    let file_path = build_slab_path(base_file_path, slab_suffix)?;
392    let file = OpenOptions::new()
393        .read(true)
394        .write(true)
395        .create(true)
396        .truncate(true)
397        .open(&file_path)
398        .map_err(|e| {
399            io::Error::new(
400                e.kind(),
401                format!("Failed to open file {}: {e}", file_path.display()),
402            )
403        })?;
404    file.set_len(slab_size as u64).map_err(|e| {
405        io::Error::new(
406            e.kind(),
407            format!("Failed to set file length for {}: {e}", file_path.display()),
408        )
409    })?;
410    Ok(file)
411}
412
/// Removes a stale base-path alias (symlink or regular file) if present.
///
/// Succeeds when nothing exists at `base_file_path`; refuses to touch a
/// directory at that location.
fn remove_existing_alias(base_file_path: &Path) -> io::Result<()> {
    // `symlink_metadata` inspects the link itself, not its target.
    let meta = match std::fs::symlink_metadata(base_file_path) {
        Ok(meta) => meta,
        // Nothing there: nothing to remove.
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
        Err(e) => {
            return Err(io::Error::new(
                e.kind(),
                format!(
                    "Failed to inspect existing base log alias {}: {e}",
                    base_file_path.display()
                ),
            ));
        }
    };

    if meta.is_dir() {
        return Err(io::Error::new(
            io::ErrorKind::AlreadyExists,
            format!(
                "Cannot create base log alias at {} because a directory already exists there",
                base_file_path.display()
            ),
        ));
    }

    std::fs::remove_file(base_file_path).map_err(|e| {
        io::Error::new(
            e.kind(),
            format!(
                "Failed to remove existing base log alias {}: {e}",
                base_file_path.display()
            ),
        )
    })
}
445
/// Creates an alias at `base_file_path` pointing at slab 0.
///
/// Unix: relative symlink. Windows: symlink with a hard-link fallback.
/// Other targets: hard link only.
fn create_base_alias_link(base_file_path: &Path) -> io::Result<()> {
    let first_slab_path = build_slab_path(base_file_path, 0)?;
    remove_existing_alias(base_file_path)?;

    #[cfg(unix)]
    {
        use std::os::unix::fs::symlink;
        // Link to the file name only, so the symlink stays valid if the
        // containing directory is moved.
        let relative_target = Path::new(first_slab_path.file_name().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "First slab file has no name component",
            )
        })?);
        symlink(relative_target, base_file_path).map_err(|e| {
            io::Error::new(
                e.kind(),
                format!(
                    "Failed to create base log alias {} -> {}: {e}",
                    base_file_path.display(),
                    first_slab_path.display()
                ),
            )
        })
    }

    #[cfg(windows)]
    {
        use std::os::windows::fs::symlink_file;
        let relative_target = Path::new(first_slab_path.file_name().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "First slab file has no name component",
            )
        })?);
        // Symlink creation may fail on Windows (e.g. insufficient privileges);
        // fall back to a hard link before giving up.
        match symlink_file(relative_target, base_file_path) {
            Ok(()) => Ok(()),
            Err(symlink_err) => std::fs::hard_link(&first_slab_path, base_file_path).map_err(
                |hard_link_err| {
                    io::Error::other(format!(
                        "Failed to create base log alias {}. Symlink error: {symlink_err}. Hard-link fallback error: {hard_link_err}",
                        base_file_path.display()
                    ))
                },
            ),
        }?;
        Ok(())
    }

    #[cfg(not(any(unix, windows)))]
    {
        std::fs::hard_link(&first_slab_path, base_file_path).map_err(|e| {
            io::Error::new(
                e.kind(),
                format!(
                    "Failed to create base log alias {} -> {}: {e}",
                    base_file_path.display(),
                    first_slab_path.display()
                ),
            )
        })
    }
}
508
509impl UnifiedLogWrite<MmapSectionStorage> for MmapUnifiedLoggerWrite {
510    /// The returned slice is section_size or greater.
511    fn add_section(
512        &mut self,
513        entry_type: UnifiedLogType,
514        requested_section_size: usize,
515    ) -> CuResult<SectionHandle<MmapSectionStorage>> {
516        self.garbage_collect_backslabs(); // Take the opportunity to keep up and close stale back slabs.
517        self.front_slab.clear_temporary_end_marker();
518        let maybe_section = self
519            .front_slab
520            .add_section(entry_type, requested_section_size);
521
522        match maybe_section {
523            AllocatedSection::NoMoreSpace => {
524                // move the front slab to the back slab.
525                let new_slab = self.create_slab()?;
526                // keep the slab until all its sections has been flushed.
527                self.back_slabs
528                    .push(mem::replace(&mut self.front_slab, new_slab));
529                match self
530                    .front_slab
531                    .add_section(entry_type, requested_section_size)
532                {
533                    AllocatedSection::NoMoreSpace => Err(CuError::from("out of space")),
534                    Section(section) => {
535                        self.place_end_marker(true)?;
536                        Ok(section)
537                    }
538                }
539            }
540            Section(section) => {
541                self.place_end_marker(true)?;
542                Ok(section)
543            }
544        }
545    }
546
547    fn flush_section(&mut self, section: &mut SectionHandle<MmapSectionStorage>) {
548        section.mark_closed();
549        for slab in self.back_slabs.iter_mut() {
550            if slab.is_it_my_section(section) {
551                slab.flush_section(section);
552                return;
553            }
554        }
555        self.front_slab.flush_section(section);
556    }
557
558    fn status(&self) -> UnifiedLogStatus {
559        UnifiedLogStatus {
560            total_used_space: self.front_slab.current_global_position,
561            total_allocated_space: self.slab_size * self.front_slab_suffix,
562        }
563    }
564}
565
566impl MmapUnifiedLoggerWrite {
567    fn next_slab(&mut self) -> io::Result<File> {
568        let next_suffix = self.front_slab_suffix + 1;
569        let file = make_slab_file(&self.base_file_path, self.slab_size, next_suffix)?;
570        self.front_slab_suffix = next_suffix;
571        Ok(file)
572    }
573
574    fn new(base_file_path: &Path, slab_size: usize, page_size: usize) -> io::Result<Self> {
575        let file = make_slab_file(base_file_path, slab_size, 0)?;
576        create_base_alias_link(base_file_path)?;
577        let mut front_slab = SlabEntry::new(file, page_size)?;
578
579        // This is the first slab so add the main header.
580        let main_header = MainHeader {
581            magic: MAIN_MAGIC,
582            first_section_offset: page_size as u16,
583            page_size: page_size as u16,
584        };
585        let nb_bytes = encode_into_slice(&main_header, &mut front_slab.mmap_buffer[..], standard())
586            .map_err(|e| io::Error::other(format!("Failed to encode main header: {e}")))?;
587        assert!(nb_bytes < page_size);
588        front_slab.current_global_position = page_size; // align to the next page
589
590        Ok(Self {
591            front_slab,
592            back_slabs: Vec::new(),
593            base_file_path: base_file_path.to_path_buf(),
594            slab_size,
595            front_slab_suffix: 0,
596        })
597    }
598
599    fn garbage_collect_backslabs(&mut self) {
600        self.back_slabs
601            .retain_mut(|slab| !slab.sections_offsets_in_flight.is_empty());
602    }
603
604    fn place_end_marker(&mut self, temporary: bool) -> CuResult<()> {
605        match self.front_slab.write_end_marker(temporary) {
606            Ok(_) => Ok(()),
607            Err(_) => {
608                // Not enough space in the current slab, roll to a new one.
609                let new_slab = self.create_slab()?;
610                self.back_slabs
611                    .push(mem::replace(&mut self.front_slab, new_slab));
612                self.front_slab.write_end_marker(temporary)
613            }
614        }
615    }
616
617    pub fn stats(&self) -> (usize, Vec<usize>, usize) {
618        (
619            self.front_slab.current_global_position,
620            self.front_slab.sections_offsets_in_flight.clone(),
621            self.back_slabs.len(),
622        )
623    }
624
625    fn create_slab(&mut self) -> CuResult<SlabEntry> {
626        let file = self
627            .next_slab()
628            .map_err(|e| CuError::new_with_cause("Failed to create slab file", e))?;
629        SlabEntry::new(file, self.front_slab.page_size)
630            .map_err(|e| CuError::new_with_cause("Failed to create slab memory map", e))
631    }
632}
633
impl Drop for MmapUnifiedLoggerWrite {
    /// Finalizes the log: replaces any temporary marker with a permanent
    /// end-of-log marker and flushes the front slab.
    fn drop(&mut self) {
        #[cfg(debug_assertions)]
        eprintln!("Flushing the unified Logger ... "); // Note this cannot be a structured log writing in this log.

        self.front_slab.clear_temporary_end_marker();
        // NOTE(review): panicking in `drop` aborts the process if we are
        // already unwinding — confirm this is the intended failure mode.
        if let Err(e) = self.place_end_marker(false) {
            panic!("Failed to flush the unified logger: {}", e);
        }
        self.front_slab
            .flush_until(self.front_slab.current_global_position);
        self.garbage_collect_backslabs();
        #[cfg(debug_assertions)]
        eprintln!("Unified Logger flushed."); // Note this cannot be a structured log writing in this log.
    }
}
650
651fn open_slab_index(
652    base_file_path: &Path,
653    slab_index: usize,
654) -> io::Result<(File, Mmap, u16, Option<MainHeader>)> {
655    let mut options = OpenOptions::new();
656    let options = options.read(true);
657
658    let file_path = build_slab_path(base_file_path, slab_index)?;
659    let file = options.open(&file_path).map_err(|e| {
660        io::Error::new(
661            e.kind(),
662            format!("Failed to open slab file {}: {e}", file_path.display()),
663        )
664    })?;
665    // SAFETY: The file is kept open for the lifetime of the mapping.
666    let mmap = unsafe { Mmap::map(&file) }
667        .map_err(|e| io::Error::new(e.kind(), format!("Failed to map slab file: {e}")))?;
668    let mut prolog = 0u16;
669    let mut maybe_main_header: Option<MainHeader> = None;
670    if slab_index == 0 {
671        let main_header: MainHeader;
672        let _read: usize;
673        (main_header, _read) = decode_from_slice(&mmap[..], standard()).map_err(|e| {
674            io::Error::new(
675                io::ErrorKind::InvalidData,
676                format!("Failed to decode main header: {e}"),
677            )
678        })?;
679        if main_header.magic != MAIN_MAGIC {
680            return Err(io::Error::new(
681                io::ErrorKind::InvalidData,
682                "Invalid magic number in main header",
683            ));
684        }
685        prolog = main_header.first_section_offset;
686        maybe_main_header = Some(main_header);
687    }
688    Ok((file, mmap, prolog, maybe_main_header))
689}
690
/// A read side of the memory map based unified logger.
pub struct MmapUnifiedLoggerRead {
    // Base path from which slab file names are derived.
    base_file_path: PathBuf,
    // Header decoded from slab 0.
    main_header: MainHeader,
    // Read-only mapping of the current slab.
    current_mmap_buffer: Mmap,
    // Kept open for the lifetime of the mapping.
    current_file: File,
    // Index of the slab currently mapped.
    current_slab_index: usize,
    // Byte offset of the next section header within the current slab.
    current_reading_position: usize,
}
700
/// Absolute position inside a unified log (slab index + byte offset).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct LogPosition {
    /// 0-based index of the slab file.
    pub slab_index: usize,
    /// Byte offset of a section header within that slab.
    pub offset: usize,
}
707
708impl UnifiedLogRead for MmapUnifiedLoggerRead {
709    fn read_next_section_type(&mut self, datalogtype: UnifiedLogType) -> CuResult<Option<Vec<u8>>> {
710        // TODO: eventually implement a 0 copy of this too.
711        loop {
712            if self.current_reading_position >= self.current_mmap_buffer.len() {
713                self.next_slab().map_err(|e| {
714                    CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
715                })?;
716            }
717
718            let header_result = self.read_section_header();
719            let header = header_result.map_err(|error| {
720                CuError::new_with_cause(
721                    &format!(
722                        "Could not read a sections header: {}/{}:{}",
723                        self.base_file_path.as_os_str().to_string_lossy(),
724                        self.current_slab_index,
725                        self.current_reading_position,
726                    ),
727                    error,
728                )
729            })?;
730
731            // Reached the end of file
732            if header.entry_type == UnifiedLogType::LastEntry {
733                return Ok(None);
734            }
735
736            // Found a section of the requested type
737            if header.entry_type == datalogtype {
738                let result = Some(self.read_section_content(&header)?);
739                self.current_reading_position += header.offset_to_next_section as usize;
740                return Ok(result);
741            }
742
743            // Keep reading until we find the requested type
744            self.current_reading_position += header.offset_to_next_section as usize;
745        }
746    }
747
748    /// Reads the section from the section header pos.
749    fn raw_read_section(&mut self) -> CuResult<(SectionHeader, Vec<u8>)> {
750        if self.current_reading_position >= self.current_mmap_buffer.len() {
751            self.next_slab().map_err(|e| {
752                CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
753            })?;
754        }
755
756        let read_result = self.read_section_header();
757
758        match read_result {
759            Err(error) => Err(CuError::new_with_cause(
760                &format!(
761                    "Could not read a sections header: {}/{}:{}",
762                    self.base_file_path.as_os_str().to_string_lossy(),
763                    self.current_slab_index,
764                    self.current_reading_position,
765                ),
766                error,
767            )),
768            Ok(header) => {
769                let data = self.read_section_content(&header)?;
770                self.current_reading_position += header.offset_to_next_section as usize;
771                Ok((header, data))
772            }
773        }
774    }
775}
776
777impl MmapUnifiedLoggerRead {
778    pub fn new(base_file_path: &Path) -> io::Result<Self> {
779        let (file, mmap, prolog, header) = open_slab_index(base_file_path, 0)?;
780        let main_header = header.ok_or_else(|| {
781            io::Error::new(io::ErrorKind::InvalidData, "Missing main header in slab 0")
782        })?;
783
784        Ok(Self {
785            base_file_path: base_file_path.to_path_buf(),
786            main_header,
787            current_file: file,
788            current_mmap_buffer: mmap,
789            current_slab_index: 0,
790            current_reading_position: prolog as usize,
791        })
792    }
793
794    /// Current cursor position (start of next section header).
795    pub fn position(&self) -> LogPosition {
796        LogPosition {
797            slab_index: self.current_slab_index,
798            offset: self.current_reading_position,
799        }
800    }
801
802    /// Seek to an absolute position (start of a section header).
803    pub fn seek(&mut self, pos: LogPosition) -> CuResult<()> {
804        if pos.slab_index != self.current_slab_index {
805            let (file, mmap, _prolog, _header) =
806                open_slab_index(&self.base_file_path, pos.slab_index).map_err(|e| {
807                    CuError::new_with_cause(
808                        &format!("Failed to open slab {} for seek", pos.slab_index),
809                        e,
810                    )
811                })?;
812            self.current_file = file;
813            self.current_mmap_buffer = mmap;
814            self.current_slab_index = pos.slab_index;
815        }
816        self.current_reading_position = pos.offset;
817        Ok(())
818    }
819
820    fn next_slab(&mut self) -> io::Result<()> {
821        self.current_slab_index += 1;
822        let (file, mmap, prolog, _) =
823            open_slab_index(&self.base_file_path, self.current_slab_index)?;
824        self.current_file = file;
825        self.current_mmap_buffer = mmap;
826        self.current_reading_position = prolog as usize;
827        Ok(())
828    }
829
830    pub fn raw_main_header(&self) -> &MainHeader {
831        &self.main_header
832    }
833
834    pub fn scan_section_bytes(&mut self, datalogtype: UnifiedLogType) -> CuResult<u64> {
835        let mut total = 0u64;
836
837        loop {
838            if self.current_reading_position >= self.current_mmap_buffer.len() {
839                self.next_slab().map_err(|e| {
840                    CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
841                })?;
842            }
843
844            let header = self.read_section_header()?;
845
846            if header.entry_type == UnifiedLogType::LastEntry {
847                return Ok(total);
848            }
849
850            if header.entry_type == datalogtype {
851                total = total.saturating_add(header.used as u64);
852            }
853
854            self.current_reading_position += header.offset_to_next_section as usize;
855        }
856    }
857
858    /// Reads the section content from the section header pos.
859    fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
860        // TODO: we could optimize by asking the buffer to fill
861        let mut section_data = vec![0; header.used as usize];
862        let start_of_data = self.current_reading_position + header.block_size as usize;
863        section_data.copy_from_slice(
864            &self.current_mmap_buffer[start_of_data..start_of_data + header.used as usize],
865        );
866
867        Ok(section_data)
868    }
869
870    fn read_section_header(&mut self) -> CuResult<SectionHeader> {
871        let section_header: SectionHeader;
872        (section_header, _) = decode_from_slice(
873            &self.current_mmap_buffer[self.current_reading_position..],
874            standard(),
875        )
876        .map_err(|e| {
877            CuError::new_with_cause(
878                &format!(
879                    "Could not read a sections header: {}/{}:{}",
880                    self.base_file_path.as_os_str().to_string_lossy(),
881                    self.current_slab_index,
882                    self.current_reading_position,
883                ),
884                e,
885            )
886        })?;
887        if section_header.magic != SECTION_MAGIC {
888            return Err("Invalid magic number in section header".into());
889        }
890
891        Ok(section_header)
892    }
893}
894
/// This is a convenience wrapper around the UnifiedLoggerRead to implement the Read trait.
pub struct UnifiedLoggerIOReader {
    logger: MmapUnifiedLoggerRead,
    // Only sections of this type are surfaced through `read`.
    log_type: UnifiedLogType,
    // Bytes of the most recently fetched section.
    buffer: Vec<u8>,
    // Read cursor within `buffer`.
    buffer_pos: usize,
}
902
903impl UnifiedLoggerIOReader {
904    pub fn new(logger: MmapUnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
905        Self {
906            logger,
907            log_type,
908            buffer: Vec::new(),
909            buffer_pos: 0,
910        }
911    }
912
913    /// returns true if there is more data to read.
914    fn fill_buffer(&mut self) -> io::Result<bool> {
915        match self.logger.read_next_section_type(self.log_type) {
916            Ok(Some(section)) => {
917                self.buffer = section;
918                self.buffer_pos = 0;
919                Ok(true)
920            }
921            Ok(None) => Ok(false), // No more sections of this type
922            Err(e) => Err(io::Error::other(e.to_string())),
923        }
924    }
925}
926
927impl Read for UnifiedLoggerIOReader {
928    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
929        if self.buffer_pos >= self.buffer.len() && !self.fill_buffer()? {
930            // This means we hit the last section.
931            return Ok(0);
932        }
933
934        // If we still have no data after trying to fill the buffer, we're at EOF
935        if self.buffer_pos >= self.buffer.len() {
936            return Ok(0);
937        }
938
939        // Copy as much as we can from the buffer to `buf`
940        let len = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
941        buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
942        self.buffer_pos += len;
943        Ok(len)
944    }
945}
946
#[cfg(feature = "std")]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream_write;
    use bincode::de::read::SliceReader;
    use bincode::{Decode, Encode, decode_from_reader, decode_from_slice};
    use cu29_traits::WriteStream;
    use std::path::PathBuf;
    use std::sync::{Arc, Mutex};
    use tempfile::TempDir;

    const LARGE_SLAB: usize = 100 * 1024; // 100KB
    // 32KB, i.e. two 16KB pages (16KB is the page size on MacOS for example).
    const SMALL_SLAB: usize = 16 * 2 * 1024;

    // Builds a write-mode logger backed by a temp file, wrapped for sharing
    // with stream_write(); also returns the base file path for readback.
    fn make_a_logger(
        tmp_dir: &TempDir,
        slab_size: usize,
    ) -> (Arc<Mutex<MmapUnifiedLoggerWrite>>, PathBuf) {
        let file_path = tmp_dir.path().join("test.bin");
        let MmapUnifiedLogger::Write(data_logger) = MmapUnifiedLoggerBuilder::new()
            .write(true)
            .create(true)
            .file_base_name(&file_path)
            .preallocated_size(slab_size)
            .build()
            .expect("Failed to create logger")
        else {
            panic!("Failed to create logger")
        };

        (Arc::new(Mutex::new(data_logger)), file_path)
    }

    // Adding sections should keep the slab usage bounded, and dropping the
    // logger should truncate the slab file on disk.
    #[test]
    fn test_truncation_and_sections_creations() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let file_path = tmp_dir.path().join("test.bin");
        let _used = {
            let MmapUnifiedLogger::Write(mut logger) = MmapUnifiedLoggerBuilder::new()
                .write(true)
                .create(true)
                .file_base_name(&file_path)
                .preallocated_size(100000)
                .build()
                .expect("Failed to create logger")
            else {
                panic!("Failed to create logger")
            };
            logger
                .add_section(UnifiedLogType::StructuredLogLine, 1024)
                .unwrap();
            logger
                .add_section(UnifiedLogType::CopperList, 2048)
                .unwrap();
            let used = logger.front_slab.used();
            assert!(used < 4 * page_size::get()); // ie. 3 headers, 1 page max per
            // logger drops

            used
        };

        let _file = OpenOptions::new()
            .read(true)
            .open(tmp_dir.path().join("test_0.bin"))
            .expect("Could not reopen the file");
        // Check if we have correctly truncated the file
        // TODO: recompute this math
        //assert_eq!(
        //    file.metadata().unwrap().len(),
        //    (used + size_of::<SectionHeader>()) as u64
        //);
    }

    // The base file name should exist as an alias whose content matches the
    // first numbered slab (test_0.bin).
    #[test]
    fn test_base_alias_exists_and_matches_first_slab() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let file_path = tmp_dir.path().join("test.bin");
        let _logger = MmapUnifiedLoggerBuilder::new()
            .write(true)
            .create(true)
            .file_base_name(&file_path)
            .preallocated_size(LARGE_SLAB)
            .build()
            .expect("Failed to create logger");

        let first_slab = build_slab_path(&file_path, 0).expect("Failed to build first slab path");
        assert!(file_path.exists(), "base alias does not exist");
        assert!(first_slab.exists(), "first slab does not exist");

        let alias_bytes = std::fs::read(&file_path).expect("Failed to read base alias");
        let slab_bytes = std::fs::read(&first_slab).expect("Failed to read first slab");
        assert_eq!(alias_bytes, slab_bytes);
    }

    // Dropping a stream must close its section and flush the slab up to the
    // current global position.
    #[test]
    fn test_one_section_self_cleaning() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let _stream = stream_write::<(), MmapSectionStorage>(
                logger.clone(),
                UnifiedLogType::StructuredLogLine,
                1024,
            );
            assert_eq!(
                logger
                    .lock()
                    .unwrap()
                    .front_slab
                    .sections_offsets_in_flight
                    .len(),
                1
            );
        }
        // After the stream drops, no section should remain in flight.
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            0
        );
        let logger = logger.lock().unwrap();
        assert_eq!(
            logger.front_slab.flushed_until_offset,
            logger.front_slab.current_global_position
        );
    }

    // While the log is still open, an "open" LastEntry marker should exist so
    // a crashed log is still scannable.
    #[test]
    fn test_temporary_end_marker_is_created() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let mut stream = stream_write::<u32, MmapSectionStorage>(
                logger.clone(),
                UnifiedLogType::StructuredLogLine,
                1024,
            )
            .unwrap();
            stream.log(&42u32).unwrap();
        }

        let logger_guard = logger.lock().unwrap();
        let slab = &logger_guard.front_slab;
        let marker_start = slab
            .temporary_end_marker
            .expect("temporary end-of-log marker missing");
        let (eof_header, _) =
            decode_from_slice::<SectionHeader, _>(&slab.mmap_buffer[marker_start..], standard())
                .expect("Could not decode end-of-log marker header");
        assert_eq!(eof_header.entry_type, UnifiedLogType::LastEntry);
        assert!(eof_header.is_open);
        assert_eq!(eof_header.used, 0);
    }

    // After a clean shutdown, the LastEntry marker must be finalized
    // (is_open == false).
    #[test]
    fn test_final_end_marker_is_not_temporary() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let mut stream = stream_write::<u32, MmapSectionStorage>(
                logger.clone(),
                UnifiedLogType::CopperList,
                1024,
            )
            .unwrap();
            stream.log(&1u32).unwrap();
        }
        drop(logger);

        let MmapUnifiedLogger::Read(mut reader) = MmapUnifiedLoggerBuilder::new()
            .file_base_name(&f)
            .build()
            .expect("Failed to build reader")
        else {
            panic!("Failed to create reader");
        };

        loop {
            let (header, _data) = reader
                .raw_read_section()
                .expect("Failed to read section while searching for EOF");
            if header.entry_type == UnifiedLogType::LastEntry {
                assert!(!header.is_open);
                break;
            }
        }
    }

    // Two sections dropped in creation order must both clean up.
    #[test]
    fn test_two_sections_self_cleaning_in_order() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        let s1 = stream_write::<(), MmapSectionStorage>(
            logger.clone(),
            UnifiedLogType::StructuredLogLine,
            1024,
        );
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            1
        );
        let s2 = stream_write::<(), MmapSectionStorage>(
            logger.clone(),
            UnifiedLogType::StructuredLogLine,
            1024,
        );
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            2
        );
        drop(s2);
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            1
        );
        drop(s1);
        let lg = logger.lock().unwrap();
        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
        assert_eq!(
            lg.front_slab.flushed_until_offset,
            lg.front_slab.current_global_position
        );
    }

    // Same as above but sections dropped out of creation order: the flush
    // watermark must still catch up once both are gone.
    #[test]
    fn test_two_sections_self_cleaning_out_of_order() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        let s1 = stream_write::<(), MmapSectionStorage>(
            logger.clone(),
            UnifiedLogType::StructuredLogLine,
            1024,
        );
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            1
        );
        let s2 = stream_write::<(), MmapSectionStorage>(
            logger.clone(),
            UnifiedLogType::StructuredLogLine,
            1024,
        );
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            2
        );
        drop(s1);
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            1
        );
        drop(s2);
        let lg = logger.lock().unwrap();
        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
        assert_eq!(
            lg.front_slab.flushed_until_offset,
            lg.front_slab.current_global_position
        );
    }

    // Round-trip: three u32s written to one section decode back in order.
    #[test]
    fn test_write_then_read_one_section() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let mut stream =
                stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024).unwrap();
            stream.log(&1u32).unwrap();
            stream.log(&2u32).unwrap();
            stream.log(&3u32).unwrap();
        }
        drop(logger);
        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
            .file_base_name(&f)
            .build()
            .expect("Failed to build logger")
        else {
            panic!("Failed to build logger");
        };
        let section = dl
            .read_next_section_type(UnifiedLogType::StructuredLogLine)
            .expect("Failed to read section");
        assert!(section.is_some());
        let section = section.unwrap();
        let mut reader = SliceReader::new(&section[..]);
        let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        let v2: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        let v3: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        assert_eq!(v1, 1);
        assert_eq!(v2, 2);
        assert_eq!(v3, 3);
    }

    /// Mimic a basic CopperList implementation.

    #[derive(Debug, Encode, Decode)]
    enum CopperListStateMock {
        Free,
        ProcessingTasks,
        BeingSerialized,
    }

    #[derive(Encode, Decode)]
    struct CopperList<P: bincode::enc::Encode> {
        state: CopperListStateMock,
        payload: P, // This is generated from the runtime.
    }

    // Round-trip of structured CopperList payloads through one section.
    #[test]
    fn test_copperlist_list_like_logging() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let mut stream =
                stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
            let cl0 = CopperList {
                state: CopperListStateMock::Free,
                payload: (1u32, 2u32, 3u32),
            };
            let cl1 = CopperList {
                state: CopperListStateMock::ProcessingTasks,
                payload: (4u32, 5u32, 6u32),
            };
            stream.log(&cl0).unwrap();
            stream.log(&cl1).unwrap();
        }
        drop(logger);

        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
            .file_base_name(&f)
            .build()
            .expect("Failed to build logger")
        else {
            panic!("Failed to build logger");
        };
        let section = dl
            .read_next_section_type(UnifiedLogType::CopperList)
            .expect("Failed to read section");
        assert!(section.is_some());
        let section = section.unwrap();

        let mut reader = SliceReader::new(&section[..]);
        let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
        let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
        assert_eq!(cl0.payload.1, 2);
        assert_eq!(cl1.payload.2, 6);
    }

    // End-to-end across slab rollover: 10000 entries into SMALL_SLAB slabs
    // must all read back, proving multi-slab write + read work together.
    #[test]
    fn test_multi_slab_end2end() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, SMALL_SLAB);
        {
            let mut stream =
                stream_write(logger.clone(), UnifiedLogType::CopperList, 1024).unwrap();
            let cl0 = CopperList {
                state: CopperListStateMock::Free,
                payload: (1u32, 2u32, 3u32),
            };
            // large enough so we are sure to create a few slabs
            for _ in 0..10000 {
                stream.log(&cl0).unwrap();
            }
        }
        drop(logger);

        let MmapUnifiedLogger::Read(mut dl) = MmapUnifiedLoggerBuilder::new()
            .file_base_name(&f)
            .build()
            .expect("Failed to build logger")
        else {
            panic!("Failed to build logger");
        };
        let mut total_readback = 0;
        loop {
            let section = dl.read_next_section_type(UnifiedLogType::CopperList);
            if section.is_err() {
                break;
            }
            let section = section.unwrap();
            if section.is_none() {
                break;
            }
            let section = section.unwrap();

            let mut reader = SliceReader::new(&section[..]);
            loop {
                let maybe_cl: Result<CopperList<(u32, u32, u32)>, _> =
                    decode_from_reader(&mut reader, standard());
                if maybe_cl.is_ok() {
                    total_readback += 1;
                } else {
                    break;
                }
            }
        }
        assert_eq!(total_readback, 10000);
    }
}