//! cu29_unifiedlog — lib.rs

1use memmap2::{Mmap, MmapMut};
2use std::fmt::{Debug, Formatter};
3use std::fs::{File, OpenOptions};
4use std::io::Read;
5use std::mem::ManuallyDrop;
6use std::path::{Path, PathBuf};
7use std::slice::from_raw_parts_mut;
8use std::sync::{Arc, Mutex};
9use std::{io, mem};
10
11use bincode::config::standard;
12use bincode::decode_from_slice;
13use bincode::encode_into_slice;
14use bincode::error::EncodeError;
15use bincode::{Decode, Encode};
16use cu29_traits::{CuError, CuResult, UnifiedLogType, WriteStream};
17
/// Magic number identifying the main file header of a unified log (slab 0).
const MAIN_MAGIC: [u8; 4] = [0xB4, 0xA5, 0x50, 0xFF];

/// Magic number identifying the start of every section header.
const SECTION_MAGIC: [u8; 2] = [0xFA, 0x57];
21
/// The main file header of the datalogger.
/// Written once at the start of the first slab file; see `UnifiedLoggerWrite::new`.
#[derive(Encode, Decode, Debug)]
struct MainHeader {
    magic: [u8; 4],            // Magic number to identify the file.
    first_section_offset: u16, // This is to align with a page at write time.
    page_size: u16,            // Page size used by the writer (from `page_size::get()`).
}
29
/// Each concurrent sublogger is tracked through a section header.
/// They form a linked list of sections.
/// The entry type is used to identify the type of data in the section.
#[derive(Encode, Decode, Debug)]
pub struct SectionHeader {
    magic: [u8; 2], // Magic number to identify the section.
    entry_type: UnifiedLogType,
    section_size: u32, // offset from the first byte of this header to the first byte of the next header (MAGIC to MAGIC).
    filled_size: u32,  // how much of the section is filled.
}
40
/// Upper bound of the encoded size of a `SectionHeader`; user data always starts
/// at this offset within a section. The `+ 3` is the worst-case extra bytes bincode
/// may need to encode the 3 integer fields.
const MAX_HEADER_SIZE: usize = mem::size_of::<SectionHeader>() + 3usize;
42
impl Default for SectionHeader {
    /// An empty, zero-sized section that still carries the section magic.
    fn default() -> Self {
        Self {
            magic: SECTION_MAGIC,
            entry_type: UnifiedLogType::Empty,
            section_size: 0,
            filled_size: 0,
        }
    }
}
53
/// A wrapper around a memory mapped file to write to.
struct MmapStream {
    entry_type: UnifiedLogType, // Type of entries this stream records.
    parent_logger: Arc<Mutex<UnifiedLoggerWrite>>, // Logger that allocates and flushes our sections.
    current_section: SectionHandle, // Section currently being filled.
    current_position: usize, // Bytes written through this stream; accumulates across sections.
    minimum_allocation_amount: usize, // Size requested for each newly allocated section.
}
62
63impl MmapStream {
64    fn new(
65        entry_type: UnifiedLogType,
66        parent_logger: Arc<Mutex<UnifiedLoggerWrite>>,
67        minimum_allocation_amount: usize,
68    ) -> Self {
69        let section = parent_logger
70            .lock()
71            .unwrap()
72            .add_section(entry_type, minimum_allocation_amount);
73        Self {
74            entry_type,
75            parent_logger,
76            current_section: section,
77            current_position: 0,
78            minimum_allocation_amount,
79        }
80    }
81}
82
83impl Debug for MmapStream {
84    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
85        write!(f, "MmapStream {{ entry_type: {:?}, current_position: {}, minimum_allocation_amount: {} }}", self.entry_type, self.current_position, self.minimum_allocation_amount)
86    }
87}
88
impl<E: Encode> WriteStream<E> for MmapStream {
    /// Encodes `obj` into the current section.
    ///
    /// If the section runs out of space (`EncodeError::UnexpectedEnd`), the current
    /// section is flushed, a fresh one of at least `minimum_allocation_amount` bytes
    /// is allocated from the parent logger, and the encode is retried exactly once.
    /// Any other encoding error is returned to the caller as a `CuError`.
    fn log(&mut self, obj: &E) -> CuResult<()> {
        let dst = self.current_section.get_user_buffer();
        let result = encode_into_slice(obj, dst, standard());
        match result {
            Ok(nb_bytes) => {
                // Account for the bytes consumed in both the stream and the section.
                self.current_position += nb_bytes;
                self.current_section.used += nb_bytes as u32;
                Ok(())
            }
            Err(e) => match e {
                EncodeError::UnexpectedEnd => {
                    // Section full: close it out and allocate a new one, under the logger lock.
                    let mut logger_guard = self.parent_logger.lock().unwrap();
                    logger_guard.flush_section(&mut self.current_section);
                    self.current_section =
                        logger_guard.add_section(self.entry_type, self.minimum_allocation_amount);

                    let result = encode_into_slice(
                        obj,
                        self.current_section.get_user_buffer(),
                        standard(),
                    )
                    .expect(
                        "Failed to encode object in a newly minted section. Unrecoverable failure.",
                    ); // If we fail just after creating a section, there is not much we can do, we need to bail.
                    self.current_position += result;
                    self.current_section.used += result as u32;
                    Ok(())
                }
                _ => {
                    // Non-capacity errors are surfaced with the bincode error as the cause.
                    let err =
                        <&str as Into<CuError>>::into("Unexpected error while encoding object.")
                            .add_cause(e.to_string().as_str());
                    Err(err)
                }
            },
        }
    }
}
128
129impl Drop for MmapStream {
130    fn drop(&mut self) {
131        let mut logger_guard = self.parent_logger.lock().unwrap();
132        logger_guard.flush_section(&mut self.current_section);
133    }
134}
135
136/// Create a new stream to write to the unifiedlogger.
137pub fn stream_write<E: Encode>(
138    logger: Arc<Mutex<UnifiedLoggerWrite>>,
139    entry_type: UnifiedLogType,
140    minimum_allocation_amount: usize,
141) -> impl WriteStream<E> {
142    MmapStream::new(entry_type, logger.clone(), minimum_allocation_amount)
143}
144
/// Holder of the read or write side of the datalogger.
pub enum UnifiedLogger {
    Read(UnifiedLoggerRead),   // Built when not in write+create mode.
    Write(UnifiedLoggerWrite), // Built when both `write` and `create` are requested.
}
150
/// Use this builder to create a new DataLogger.
pub struct UnifiedLoggerBuilder {
    file_base_name: Option<PathBuf>,   // Base path; per-slab suffixes are derived from it.
    preallocated_size: Option<usize>,  // Slab size in bytes (write mode).
    write: bool,                       // Open the write side.
    create: bool,                      // Allow creating the backing files.
}
158
impl Default for UnifiedLoggerBuilder {
    /// Same as [`UnifiedLoggerBuilder::new`]: read-only, non-creating, no paths set.
    fn default() -> Self {
        Self::new()
    }
}
164
165impl UnifiedLoggerBuilder {
166    pub fn new() -> Self {
167        Self {
168            file_base_name: None,
169            preallocated_size: None,
170            write: false,
171            create: false, // This is the safest default
172        }
173    }
174
175    /// If "something/toto.copper" is given, it will find or create "something/toto_0.copper",  "something/toto_1.copper" etc.
176    pub fn file_base_name(mut self, file_path: &Path) -> Self {
177        self.file_base_name = Some(file_path.to_path_buf());
178        self
179    }
180
181    pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
182        self.preallocated_size = Some(preallocated_size);
183        self
184    }
185
186    pub fn write(mut self, write: bool) -> Self {
187        self.write = write;
188        self
189    }
190
191    pub fn create(mut self, create: bool) -> Self {
192        self.create = create;
193        self
194    }
195
196    pub fn build(self) -> io::Result<UnifiedLogger> {
197        let page_size = page_size::get();
198
199        if self.write && self.create {
200            let ulw = UnifiedLoggerWrite::new(
201                &self.file_base_name.unwrap(),
202                self.preallocated_size.unwrap(),
203                page_size,
204            );
205
206            Ok(UnifiedLogger::Write(ulw))
207        } else {
208            let file_path = self.file_base_name.ok_or_else(|| {
209                io::Error::new(io::ErrorKind::InvalidInput, "File path is required")
210            })?;
211            let ulr = UnifiedLoggerRead::new(&file_path)?;
212            Ok(UnifiedLogger::Read(ulr))
213        }
214    }
215}
216
/// A read side of the datalogger.
pub struct UnifiedLoggerRead {
    base_file_path: PathBuf,         // Base path used to derive per-slab file names.
    current_mmap_buffer: Mmap,       // Read-only mapping of the current slab file.
    current_file: File,              // Current slab file, kept open for the mapping.
    current_slab_index: usize,       // File-name suffix of the current slab.
    current_reading_position: usize, // Offset of the next section header in the mapping.
}
225
/// A memory-mapped, preallocated backing file from which sections are carved out.
struct SlabEntry {
    file: File,                             // Backing file; trimmed to the used size on drop.
    mmap_buffer: ManuallyDrop<MmapMut>,     // Writable mapping; dropped manually before the file is resized.
    current_global_position: usize,         // Offset of the next free byte in the slab.
    sections_offsets_in_flight: Vec<usize>, // Start offsets of sections handed out but not yet flushed.
    flushed_until_offset: usize,            // Everything before this offset has been flushed to disk.
    page_size: usize,                       // Alignment unit for section placement.
}
234
235impl Drop for SlabEntry {
236    fn drop(&mut self) {
237        self.flush_until(self.current_global_position);
238        unsafe { ManuallyDrop::drop(&mut self.mmap_buffer) };
239        self.file
240            .set_len(self.current_global_position as u64)
241            .expect("Failed to trim datalogger file");
242
243        if !self.sections_offsets_in_flight.is_empty() {
244            eprintln!("Error: Slab not full flushed.");
245        }
246    }
247}
248
/// Result of a section allocation attempt within a slab.
pub enum AllocatedSection {
    NoMoreSpace,            // The slab cannot fit the requested section.
    Section(SectionHandle), // Successfully carved-out section.
}
253
impl SlabEntry {
    /// Maps `file` fully into memory and starts tracking it as an empty slab.
    /// `page_size` becomes the alignment unit for sections within the slab.
    fn new(file: File, page_size: usize) -> Self {
        let mmap_buffer =
            ManuallyDrop::new(unsafe { MmapMut::map_mut(&file).expect("Failed to map file") });
        Self {
            file,
            mmap_buffer,
            current_global_position: 0,
            sections_offsets_in_flight: Vec::with_capacity(16),
            flushed_until_offset: 0,
            page_size,
        }
    }

    /// Ensure the underlying mmap is flushed to disk up to the given position.
    fn flush_until(&mut self, until_position: usize) {
        // Skip no-op flushes: a zero-length flush range is tolerated under linux, but crashes on macos.
        if (self.flushed_until_offset == until_position) || (until_position == 0) {
            return;
        }
        // Only flush the delta since the previous flush.
        self.mmap_buffer
            .flush_async_range(
                self.flushed_until_offset,
                until_position - self.flushed_until_offset,
            )
            .expect("Failed to flush memory map");
        self.flushed_until_offset = until_position;
    }

    /// Returns true if `section`'s buffer points inside this slab's mapping.
    fn is_it_my_section(&self, section: &SectionHandle) -> bool {
        (section.buffer.as_ptr() >= self.mmap_buffer.as_ptr())
            && (section.buffer.as_ptr() as usize)
                < (self.mmap_buffer.as_ref().as_ptr() as usize + self.mmap_buffer.as_ref().len())
    }

    /// Flush the section to disk.
    /// the flushing is permanent and the section is considered closed.
    fn flush_section(&mut self, section: &mut SectionHandle) {
        if section.buffer.as_ptr() < self.mmap_buffer.as_ptr()
            || section.buffer.as_ptr() as usize
                > self.mmap_buffer.as_ptr() as usize + self.mmap_buffer.len()
        {
            panic!("Invalid section buffer, not in the slab");
        }

        // Be sure that the header reflects the actual size of the section.
        section.update_header();

        // Re-encode the now-final header at the start of the section buffer.
        let _sz = encode_into_slice(&section.section_header, section.buffer, standard())
            .expect("Failed to encode section header");

        // Remove this section from the in-flight list (identified by its slab offset).
        let base = self.mmap_buffer.as_ptr() as usize;
        let section_buffer_addr = section.buffer.as_ptr() as usize;
        self.sections_offsets_in_flight
            .retain(|&x| x != section_buffer_addr - base);

        // No section still open: everything written so far can be flushed.
        if self.sections_offsets_in_flight.is_empty() {
            self.flush_until(self.current_global_position);
            return;
        }
        // Otherwise only flush up to the earliest still-open section.
        if self.flushed_until_offset < self.sections_offsets_in_flight[0] {
            self.flush_until(self.sections_offsets_in_flight[0]);
        }
    }

    /// Rounds `ptr` up to the next page boundary (page_size is assumed a power of two).
    #[inline]
    fn align_to_next_page(&self, ptr: usize) -> usize {
        (ptr + self.page_size - 1) & !(self.page_size - 1)
    }

    /// The returned slice is section_size or greater.
    fn add_section(
        &mut self,
        entry_type: UnifiedLogType,
        requested_section_size: usize,
    ) -> AllocatedSection {
        // align current_position to the next page
        self.current_global_position = self.align_to_next_page(self.current_global_position);
        let section_size = self.align_to_next_page(requested_section_size) as u32;

        // We need to have enough space to store the section in that slab
        if self.current_global_position + section_size as usize > self.mmap_buffer.len() {
            return AllocatedSection::NoMoreSpace;
        }

        let section_header = SectionHeader {
            magic: SECTION_MAGIC,
            entry_type,
            section_size,
            filled_size: 0u32,
        };

        // Write the header in place at the start of the section.
        let nb_bytes = encode_into_slice(
            &section_header,
            &mut self.mmap_buffer[self.current_global_position..],
            standard(),
        )
        .expect("Failed to encode section header");
        assert!(nb_bytes < self.page_size);

        // save the position to keep track for in flight sections
        self.sections_offsets_in_flight
            .push(self.current_global_position);
        let end_of_section = self.current_global_position + requested_section_size;
        let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];

        // here we have the guarantee for exclusive access to that memory for the lifetime of the handle, the borrow checker cannot understand that ever.
        let handle_buffer =
            unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };

        self.current_global_position = end_of_section;

        AllocatedSection::Section(SectionHandle::create(section_header, handle_buffer))
    }

    /// Bytes consumed in this slab so far (test-only helper).
    #[cfg(test)]
    fn used(&self) -> usize {
        self.current_global_position
    }
}
374
/// A SectionHandle is a handle to a section in the datalogger.
/// It allows to track the lifecycle of a section of the datalogger.
#[derive(Default)]
pub struct SectionHandle {
    section_header: SectionHeader, // Decoded header; re-encoded into `buffer` on flush.
    buffer: &'static mut [u8], // This includes the encoded header for end of section patching.
    used: u32,                 // this is the size of the used part of the buffer.
}
383
// This is a placeholder to ensure an orderly cleanup as we dodge the borrow checker.
385
386impl SectionHandle {
387    // The buffer is considered static as it is a dedicated piece for the section.
388    pub fn create(section_header: SectionHeader, buffer: &'static mut [u8]) -> Self {
389        // here we assume with are passed a valid section.
390        if buffer[0] != SECTION_MAGIC[0] || buffer[1] != SECTION_MAGIC[1] {
391            panic!("Invalid section buffer, magic number not found");
392        }
393
394        if buffer.len() < MAX_HEADER_SIZE {
395            panic!(
396                "Invalid section buffer, too small: {}, it needs to be > {}",
397                buffer.len(),
398                MAX_HEADER_SIZE
399            );
400        }
401
402        Self {
403            section_header,
404            buffer,
405            used: 0,
406        }
407    }
408    pub fn get_user_buffer(&mut self) -> &mut [u8] {
409        &mut self.buffer[MAX_HEADER_SIZE + self.used as usize..]
410    }
411
412    pub fn update_header(&mut self) {
413        // no need to do anything if we never used the section.
414        if self.section_header.entry_type == UnifiedLogType::Empty || self.used == 0 {
415            return;
416        }
417        self.section_header.filled_size = self.used;
418
419        // FIX ME: This was flushed before and cannot be written back to.
420        // let _sz = encode_into_slice(&self.section_header, &mut self.buffer, standard())
421        //     .expect("Failed to encode section header");
422    }
423}
424
/// A write side of the datalogger.
pub struct UnifiedLoggerWrite {
    /// the front slab is the current active slab for any new section.
    front_slab: SlabEntry,
    /// the back slab is the previous slab that is being flushed.
    back_slabs: Vec<SlabEntry>,
    /// base file path to create the backing files from.
    base_file_path: PathBuf,
    /// allocation size for the backing files.
    slab_size: usize,
    /// current suffix for the backing files.
    front_slab_suffix: usize,
}
438
/// Derives the path of slab `slab_index` from the base path by inserting the
/// index before the extension: "dir/toto.copper" -> "dir/toto_3.copper".
///
/// Uses `file_stem`/`extension` so a base path WITHOUT an extension maps to
/// "name_3" — the previous split-on-'.' implementation popped the whole file
/// name as the "extension" and produced the broken "_3.name".
fn build_slab_path(base_file_path: &Path, slab_index: usize) -> PathBuf {
    let mut file_path = base_file_path.to_path_buf();
    let stem = base_file_path
        .file_stem()
        .and_then(|s| s.to_str())
        .expect("slab base path must have a valid UTF-8 file name");
    let new_name = match base_file_path.extension().and_then(|e| e.to_str()) {
        Some(extension) => format!("{stem}_{slab_index}.{extension}"),
        None => format!("{stem}_{slab_index}"),
    };
    file_path.set_file_name(new_name);
    file_path
}
449
450fn make_slab_file(base_file_path: &Path, slab_size: usize, slab_suffix: usize) -> File {
451    let file_path = build_slab_path(base_file_path, slab_suffix);
452    let file = OpenOptions::new()
453        .read(true)
454        .write(true)
455        .create(true)
456        .truncate(true)
457        .open(&file_path)
458        .unwrap_or_else(|_| panic!("Failed to open file: {}", file_path.display()));
459    file.set_len(slab_size as u64)
460        .expect("Failed to set file length");
461    file
462}
463
impl UnifiedLoggerWrite {
    /// Creates the backing file for the next slab, bumping the file-name suffix.
    fn next_slab(&mut self) -> File {
        self.front_slab_suffix += 1;

        make_slab_file(&self.base_file_path, self.slab_size, self.front_slab_suffix)
    }

    /// Creates the first slab (suffix 0) and writes the main file header at its start.
    fn new(base_file_path: &Path, slab_size: usize, page_size: usize) -> Self {
        let file = make_slab_file(base_file_path, slab_size, 0);
        let mut front_slab = SlabEntry::new(file, page_size);

        // This is the first slab so add the main header.
        let main_header = MainHeader {
            magic: MAIN_MAGIC,
            first_section_offset: page_size as u16,
            page_size: page_size as u16,
        };
        let nb_bytes = encode_into_slice(&main_header, &mut front_slab.mmap_buffer[..], standard())
            .expect("Failed to encode main header");
        assert!(nb_bytes < page_size);
        front_slab.current_global_position = page_size; // align to the next page

        Self {
            front_slab,
            back_slabs: Vec::new(),
            base_file_path: base_file_path.to_path_buf(),
            slab_size,
            front_slab_suffix: 0,
        }
    }

    /// Flushes `section`, finding its owning slab among the back slabs first,
    /// falling back to the front slab.
    pub fn flush_section(&mut self, section: &mut SectionHandle) {
        for slab in self.back_slabs.iter_mut() {
            if slab.is_it_my_section(section) {
                slab.flush_section(section);
                return;
            }
        }
        self.front_slab.flush_section(section);
    }

    /// Drops back slabs with no sections in flight; their Drop impl flushes,
    /// unmaps and trims the backing file.
    fn garbage_collect_backslabs(&mut self) {
        self.back_slabs
            .retain_mut(|slab| !slab.sections_offsets_in_flight.is_empty());
    }

    /// The returned slice is section_size or greater.
    fn add_section(
        &mut self,
        entry_type: UnifiedLogType,
        requested_section_size: usize,
    ) -> SectionHandle {
        self.garbage_collect_backslabs(); // Take the opportunity to keep up and close stale back slabs.

        let maybe_section = self
            .front_slab
            .add_section(entry_type, requested_section_size);

        match maybe_section {
            AllocatedSection::NoMoreSpace => {
                // move the front slab to the back slab.
                let new_slab = SlabEntry::new(self.next_slab(), self.front_slab.page_size);
                // keep the slab until all its sections has been flushed.
                self.back_slabs
                    .push(mem::replace(&mut self.front_slab, new_slab));
                // Retry once on the brand-new (empty) slab; failure here is unrecoverable.
                match self
                    .front_slab
                    .add_section(entry_type, requested_section_size)
                {
                    AllocatedSection::NoMoreSpace => {
                        panic!("Failed to allocate a section in a new slab");
                    }
                    AllocatedSection::Section(section) => section,
                }
            }
            AllocatedSection::Section(section) => section,
        }
    }

    /// Returns (front slab bytes used, in-flight section offsets in the front slab,
    /// number of back slabs still alive).
    pub fn stats(&self) -> (usize, Vec<usize>, usize) {
        (
            self.front_slab.current_global_position,
            self.front_slab.sections_offsets_in_flight.clone(),
            self.back_slabs.len(),
        )
    }
}
551
impl Drop for UnifiedLoggerWrite {
    /// Appends a `LastEntry` marker section (so readers can detect a clean end of
    /// log), flushes it, and reclaims any fully-flushed back slabs.
    fn drop(&mut self) {
        let mut section = self.add_section(UnifiedLogType::LastEntry, 80); // TODO: determine that exactly
        self.front_slab.flush_section(&mut section);
        self.garbage_collect_backslabs();
    }
}
559
560fn open_slab_index(base_file_path: &Path, slab_index: usize) -> io::Result<(File, Mmap, u16)> {
561    let mut options = OpenOptions::new();
562    let options = options.read(true);
563
564    let file_path = build_slab_path(base_file_path, slab_index);
565    let file = options.open(file_path)?;
566    let mmap = unsafe { Mmap::map(&file) }?;
567    let mut prolog = 0u16;
568    if slab_index == 0 {
569        let main_header: MainHeader;
570        let _read: usize;
571        (main_header, _read) =
572            decode_from_slice(&mmap[..], standard()).expect("Failed to decode main header");
573        if main_header.magic != MAIN_MAGIC {
574            return Err(io::Error::new(
575                io::ErrorKind::InvalidData,
576                "Invalid magic number in main header",
577            ));
578        }
579        prolog = main_header.first_section_offset;
580    }
581    Ok((file, mmap, prolog))
582}
583
impl UnifiedLoggerRead {
    /// Opens slab 0 of the log at `base_file_path` and positions the reader just
    /// after the main header.
    pub fn new(base_file_path: &Path) -> io::Result<Self> {
        let (file, mmap, prolog) = open_slab_index(base_file_path, 0)?;

        Ok(Self {
            base_file_path: base_file_path.to_path_buf(),
            current_file: file,
            current_mmap_buffer: mmap,
            current_slab_index: 0,
            current_reading_position: prolog as usize,
        })
    }

    /// Advances to the next slab file and resets the reading position.
    fn next_slab(&mut self) -> io::Result<()> {
        self.current_slab_index += 1;
        let (file, mmap, prolog) = open_slab_index(&self.base_file_path, self.current_slab_index)?;
        self.current_file = file;
        self.current_mmap_buffer = mmap;
        self.current_reading_position = prolog as usize;
        Ok(())
    }

    /// Scans forward for the next section of type `datalogtype` and returns a
    /// copy of its content, or `None` once the `LastEntry` marker is reached.
    /// Sections of other types are skipped; slab boundaries are crossed transparently.
    pub fn read_next_section_type(
        &mut self,
        datalogtype: UnifiedLogType,
    ) -> CuResult<Option<Vec<u8>>> {
        // TODO: eventually implement a 0 copy of this too.
        loop {
            // Past the end of the current slab: hop to the next slab file.
            if self.current_reading_position >= self.current_mmap_buffer.len() {
                self.next_slab().map_err(|e| {
                    CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
                })?;
            }

            let header_result = self.read_section_header();
            if let Err(error) = header_result {
                return Err(CuError::new_with_cause(
                    "Could not read a sections header",
                    error,
                ));
            };
            let header = header_result.unwrap();

            // Reached the end of file
            if header.entry_type == UnifiedLogType::LastEntry {
                return Ok(None);
            }

            // Found a section of the requested type
            if header.entry_type == datalogtype {
                let result = Some(self.read_section_content(&header)?);
                self.current_reading_position += header.section_size as usize;
                return Ok(result);
            }

            // Keep reading until we find the requested type
            self.current_reading_position += header.section_size as usize;
        }
    }

    /// Reads the section from the section header pos.
    /// Note: does NOT advance the reading position.
    pub fn read_section(&mut self) -> CuResult<Vec<u8>> {
        let read_result = self.read_section_header();
        if let Err(error) = read_result {
            return Err(CuError::new_with_cause(
                "Could not read a sections header",
                error,
            ));
        };

        self.read_section_content(&read_result.unwrap())
    }

    /// Reads the section content from the section header pos.
    /// Copies `filled_size` bytes starting `MAX_HEADER_SIZE` past the section start
    /// (user data always begins at that fixed offset; see `get_user_buffer`).
    fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
        // TODO: we could optimize by asking the buffer to fill
        if header.filled_size == 0 {
            eprintln!("Warning: read an empty section");
        }
        let mut section = vec![0; header.filled_size as usize];
        let start_of_data = self.current_reading_position + MAX_HEADER_SIZE;
        section.copy_from_slice(
            &self.current_mmap_buffer[start_of_data..start_of_data + header.filled_size as usize],
        );

        Ok(section)
    }

    /// Decodes the section header at the current reading position and validates its magic.
    fn read_section_header(&mut self) -> CuResult<SectionHeader> {
        let section_header: SectionHeader;
        (section_header, _) = decode_from_slice(
            &self.current_mmap_buffer[self.current_reading_position..],
            standard(),
        )
        .expect("Failed to decode section header");
        if section_header.magic != SECTION_MAGIC {
            return Err("Invalid magic number in section header".into());
        }
        Ok(section_header)
    }
}
685
/// This a convenience wrapper around the UnifiedLoggerRead to implement the Read trait.
pub struct UnifiedLoggerIOReader {
    logger: UnifiedLoggerRead, // Underlying section-level reader.
    log_type: UnifiedLogType,  // Only sections of this type are surfaced.
    buffer: Vec<u8>,           // Content of the current section.
    buffer_pos: usize,         // Read cursor within `buffer`.
}
693
694impl UnifiedLoggerIOReader {
695    pub fn new(logger: UnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
696        Self {
697            logger,
698            log_type,
699            buffer: Vec::new(),
700            buffer_pos: 0,
701        }
702    }
703
704    /// returns true if there is more data to read.
705    fn fill_buffer(&mut self) -> io::Result<bool> {
706        match self.logger.read_next_section_type(self.log_type) {
707            Ok(Some(section)) => {
708                self.buffer = section;
709                self.buffer_pos = 0;
710                Ok(true)
711            }
712            Ok(None) => Ok(false), // No more sections of this type
713            Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
714        }
715    }
716}
717
impl Read for UnifiedLoggerIOReader {
    /// Serves bytes from the current section buffer, pulling the next matching
    /// section from the underlying logger whenever the buffer is exhausted.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.buffer_pos >= self.buffer.len() && !self.fill_buffer()? {
            // This means we hit the last section.
            return Ok(0);
        }

        // If we still have no data after trying to fill the buffer, we're at EOF
        // (fill_buffer can succeed with an empty section).
        if self.buffer_pos >= self.buffer.len() {
            return Ok(0);
        }

        // Copy as much as we can from the buffer to `buf`
        let len = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
        buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
        self.buffer_pos += len;
        Ok(len)
    }
}
737
738#[cfg(test)]
739mod tests {
740    use super::*;
741    use bincode::decode_from_reader;
742    use std::io::BufReader;
743    use std::path::PathBuf;
744    use tempfile::TempDir;
745
    const LARGE_SLAB: usize = 100 * 1024; // 100KB
    const SMALL_SLAB: usize = 16 * 2 * 1024; // 32KB, i.e. twice the 16KB page size found on MacOS for example
748
    /// Builds a write-mode logger backed by `slab_size`-byte slabs in `tmp_dir`
    /// and returns it together with the base file path used to create it.
    fn make_a_logger(
        tmp_dir: &TempDir,
        slab_size: usize,
    ) -> (Arc<Mutex<UnifiedLoggerWrite>>, PathBuf) {
        let file_path = tmp_dir.path().join("test.bin");
        let UnifiedLogger::Write(data_logger) = UnifiedLoggerBuilder::new()
            .write(true)
            .create(true)
            .file_base_name(&file_path)
            .preallocated_size(slab_size)
            .build()
            .expect("Failed to create logger")
        else {
            panic!("Failed to create logger")
        };

        (Arc::new(Mutex::new(data_logger)), file_path)
    }
767
    /// Allocates two sections, drops the logger, and checks the backing slab file
    /// ("test_0.bin") exists and can be reopened after the drop-time truncation.
    #[test]
    fn test_truncation_and_sections_creations() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let file_path = tmp_dir.path().join("test.bin");
        let _used = {
            let UnifiedLogger::Write(mut logger) = UnifiedLoggerBuilder::new()
                .write(true)
                .create(true)
                .file_base_name(&file_path)
                .preallocated_size(100000)
                .build()
                .expect("Failed to create logger")
            else {
                panic!("Failed to create logger")
            };
            logger.add_section(UnifiedLogType::StructuredLogLine, 1024);
            logger.add_section(UnifiedLogType::CopperList, 2048);
            let used = logger.front_slab.used();
            assert!(used < 4 * page_size::get()); // ie. 3 headers, 1 page max per
                                                  // logger drops

            used
        };

        let _file = OpenOptions::new()
            .read(true)
            .open(tmp_dir.path().join("test_0.bin"))
            .expect("Could not reopen the file");
        // Check if we have correctly truncated the file
        // TODO: recompute this math
        //assert_eq!(
        //    file.metadata().unwrap().len(),
        //    (used + size_of::<SectionHeader>()) as u64
        //);
    }
803
    /// A single stream registers one in-flight section and, on drop, the section
    /// is flushed and the slab's flushed offset catches up to the used position.
    #[test]
    fn test_one_section_self_cleaning() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let _stream =
                stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
            assert_eq!(
                logger
                    .lock()
                    .unwrap()
                    .front_slab
                    .sections_offsets_in_flight
                    .len(),
                1
            );
        }
        // Stream dropped: no section should remain in flight.
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            0
        );
        let logger = logger.lock().unwrap();
        assert_eq!(
            logger.front_slab.flushed_until_offset,
            logger.front_slab.current_global_position
        );
    }
836
    /// Two concurrent streams dropped in reverse creation order: the in-flight
    /// count decrements one by one and the slab ends up fully flushed.
    #[test]
    fn test_two_sections_self_cleaning_in_order() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        let s1 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            1
        );
        let s2 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            2
        );
        drop(s2);
        assert_eq!(
            logger
                .lock()
                .unwrap()
                .front_slab
                .sections_offsets_in_flight
                .len(),
            1
        );
        drop(s1);
        let lg = logger.lock().unwrap();
        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
        assert_eq!(
            lg.front_slab.flushed_until_offset,
            lg.front_slab.current_global_position
        );
    }
879
880    #[test]
881    fn test_two_sections_self_cleaning_out_of_order() {
882        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
883        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
884        let s1 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
885        assert_eq!(
886            logger
887                .lock()
888                .unwrap()
889                .front_slab
890                .sections_offsets_in_flight
891                .len(),
892            1
893        );
894        let s2 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
895        assert_eq!(
896            logger
897                .lock()
898                .unwrap()
899                .front_slab
900                .sections_offsets_in_flight
901                .len(),
902            2
903        );
904        drop(s1);
905        assert_eq!(
906            logger
907                .lock()
908                .unwrap()
909                .front_slab
910                .sections_offsets_in_flight
911                .len(),
912            1
913        );
914        drop(s2);
915        let lg = logger.lock().unwrap();
916        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
917        assert_eq!(
918            lg.front_slab.flushed_until_offset,
919            lg.front_slab.current_global_position
920        );
921    }
922
923    #[test]
924    fn test_write_then_read_one_section() {
925        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
926        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
927        {
928            let mut stream = stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
929            stream.log(&1u32).unwrap();
930            stream.log(&2u32).unwrap();
931            stream.log(&3u32).unwrap();
932        }
933        drop(logger);
934        let UnifiedLogger::Read(mut dl) = UnifiedLoggerBuilder::new()
935            .file_base_name(&f)
936            .build()
937            .expect("Failed to build logger")
938        else {
939            panic!("Failed to build logger");
940        };
941        let section = dl
942            .read_next_section_type(UnifiedLogType::StructuredLogLine)
943            .expect("Failed to read section");
944        assert!(section.is_some());
945        let section = section.unwrap();
946
947        let mut reader = BufReader::new(&section[..]);
948        let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
949        let v2: u32 = decode_from_reader(&mut reader, standard()).unwrap();
950        let v3: u32 = decode_from_reader(&mut reader, standard()).unwrap();
951        assert_eq!(v1, 1);
952        assert_eq!(v2, 2);
953        assert_eq!(v3, 3);
954    }
955
    // Mimic a basic CopperList implementation (state enum + payload struct below).

    /// Lifecycle states a mock CopperList can be in.
    // NOTE: variant order matters — bincode encodes the discriminant, so
    // reordering variants would change the serialized representation.
    #[derive(Debug, Encode, Decode)]
    enum CopperListStateMock {
        Free,
        ProcessingTasks,
        BeingSerialized,
    }
964
    /// Minimal stand-in for a runtime CopperList: a state tag plus a generic payload.
    #[derive(Encode, Decode)]
    struct CopperList<P: bincode::enc::Encode> {
        state: CopperListStateMock,
        payload: P, // This is generated from the runtime.
    }
970
971    #[test]
972    fn test_copperlist_list_like_logging() {
973        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
974        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
975        {
976            let mut stream = stream_write(logger.clone(), UnifiedLogType::CopperList, 1024);
977            let cl0 = CopperList {
978                state: CopperListStateMock::Free,
979                payload: (1u32, 2u32, 3u32),
980            };
981            let cl1 = CopperList {
982                state: CopperListStateMock::ProcessingTasks,
983                payload: (4u32, 5u32, 6u32),
984            };
985            stream.log(&cl0).unwrap();
986            stream.log(&cl1).unwrap();
987        }
988        drop(logger);
989
990        let UnifiedLogger::Read(mut dl) = UnifiedLoggerBuilder::new()
991            .file_base_name(&f)
992            .build()
993            .expect("Failed to build logger")
994        else {
995            panic!("Failed to build logger");
996        };
997        let section = dl
998            .read_next_section_type(UnifiedLogType::CopperList)
999            .expect("Failed to read section");
1000        assert!(section.is_some());
1001        let section = section.unwrap();
1002
1003        let mut reader = BufReader::new(&section[..]);
1004        let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
1005        let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
1006        assert_eq!(cl0.payload.1, 2);
1007        assert_eq!(cl1.payload.2, 6);
1008    }
1009
1010    #[test]
1011    fn test_multi_slab_end2end() {
1012        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
1013        let (logger, f) = make_a_logger(&tmp_dir, SMALL_SLAB);
1014        {
1015            let mut stream = stream_write(logger.clone(), UnifiedLogType::CopperList, 1024);
1016            let cl0 = CopperList {
1017                state: CopperListStateMock::Free,
1018                payload: (1u32, 2u32, 3u32),
1019            };
1020            // large enough so we are sure to create a few slabs
1021            for _ in 0..10000 {
1022                stream.log(&cl0).unwrap();
1023            }
1024        }
1025        drop(logger);
1026
1027        let UnifiedLogger::Read(mut dl) = UnifiedLoggerBuilder::new()
1028            .file_base_name(&f)
1029            .build()
1030            .expect("Failed to build logger")
1031        else {
1032            panic!("Failed to build logger");
1033        };
1034        let mut total_readback = 0;
1035        loop {
1036            let section = dl.read_next_section_type(UnifiedLogType::CopperList);
1037            if section.is_err() {
1038                break;
1039            }
1040            let section = section.unwrap();
1041            if section.is_none() {
1042                break;
1043            }
1044            let section = section.unwrap();
1045
1046            let mut reader = BufReader::new(&section[..]);
1047            loop {
1048                let maybe_cl: Result<CopperList<(u32, u32, u32)>, _> =
1049                    decode_from_reader(&mut reader, standard());
1050                if maybe_cl.is_ok() {
1051                    total_readback += 1;
1052                } else {
1053                    break;
1054                }
1055            }
1056        }
1057        assert_eq!(total_readback, 10000);
1058    }
1059}