1use memmap2::{Mmap, MmapMut};
2use std::fmt::{Debug, Formatter};
3use std::fs::{File, OpenOptions};
4use std::io::Read;
5use std::mem::ManuallyDrop;
6use std::path::{Path, PathBuf};
7use std::slice::from_raw_parts_mut;
8use std::sync::{Arc, Mutex};
9use std::{io, mem};
10
11use bincode::config::standard;
12use bincode::decode_from_slice;
13use bincode::encode_into_slice;
14use bincode::error::EncodeError;
15use bincode::{Decode, Encode};
16use cu29_traits::{CuError, CuResult, UnifiedLogType, WriteStream};
17
/// Magic bytes stored in the `magic` field of the `MainHeader` written at the
/// start of the first slab and checked when that slab is opened for reading.
const MAIN_MAGIC: [u8; 4] = [0xB4, 0xA5, 0x50, 0xFF];

/// Magic bytes stored in the `magic` field of every `SectionHeader`.
const SECTION_MAGIC: [u8; 2] = [0xFA, 0x57];
21
/// Header encoded at the very beginning of the first slab (slab index 0).
///
/// The field order is part of the encoded layout; do not reorder.
#[derive(Encode, Decode, Debug)]
struct MainHeader {
    /// Must equal `MAIN_MAGIC`; the reader rejects the file otherwise.
    magic: [u8; 4],
    /// Offset in slab 0 at which the reader starts looking for section headers.
    first_section_offset: u16,
    /// Page size recorded by the writer when the log was created.
    page_size: u16,
}
29
/// Header encoded at the start of every section of the log.
///
/// The field order is part of the encoded layout; do not reorder.
#[derive(Encode, Decode, Debug)]
pub struct SectionHeader {
    /// Must equal `SECTION_MAGIC`; the reader reports an error otherwise.
    magic: [u8; 2],
    /// Kind of entries stored in this section.
    entry_type: UnifiedLogType,
    /// Page-aligned size of the section; the reader advances by this many
    /// bytes to reach the next section header.
    section_size: u32,
    /// Number of user-data bytes written after the reserved header area.
    filled_size: u32,
}
40
41const MAX_HEADER_SIZE: usize = mem::size_of::<SectionHeader>() + 3usize; impl Default for SectionHeader {
44 fn default() -> Self {
45 Self {
46 magic: SECTION_MAGIC,
47 entry_type: UnifiedLogType::Empty,
48 section_size: 0,
49 filled_size: 0,
50 }
51 }
52}
53
/// Write stream that encodes objects into sections obtained from a shared
/// `UnifiedLoggerWrite`.
struct MmapStream {
    /// Entry type given to every section this stream requests.
    entry_type: UnifiedLogType,
    /// Logger the sections are requested from and flushed to.
    parent_logger: Arc<Mutex<UnifiedLoggerWrite>>,
    /// Section currently being filled.
    current_section: SectionHandle,
    /// Total number of bytes encoded by this stream so far (never reset).
    current_position: usize,
    /// Size requested for each new section.
    minimum_allocation_amount: usize,
}
62
63impl MmapStream {
64 fn new(
65 entry_type: UnifiedLogType,
66 parent_logger: Arc<Mutex<UnifiedLoggerWrite>>,
67 minimum_allocation_amount: usize,
68 ) -> Self {
69 let section = parent_logger
70 .lock()
71 .unwrap()
72 .add_section(entry_type, minimum_allocation_amount);
73 Self {
74 entry_type,
75 parent_logger,
76 current_section: section,
77 current_position: 0,
78 minimum_allocation_amount,
79 }
80 }
81}
82
83impl Debug for MmapStream {
84 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
85 write!(f, "MmapStream {{ entry_type: {:?}, current_position: {}, minimum_allocation_amount: {} }}", self.entry_type, self.current_position, self.minimum_allocation_amount)
86 }
87}
88
89impl<E: Encode> WriteStream<E> for MmapStream {
90 fn log(&mut self, obj: &E) -> CuResult<()> {
91 let dst = self.current_section.get_user_buffer();
92 let result = encode_into_slice(obj, dst, standard());
93 match result {
94 Ok(nb_bytes) => {
95 self.current_position += nb_bytes;
96 self.current_section.used += nb_bytes as u32;
97 Ok(())
98 }
99 Err(e) => match e {
100 EncodeError::UnexpectedEnd => {
101 let mut logger_guard = self.parent_logger.lock().unwrap();
102 logger_guard.flush_section(&mut self.current_section);
103 self.current_section =
104 logger_guard.add_section(self.entry_type, self.minimum_allocation_amount);
105
106 let result = encode_into_slice(
107 obj,
108 self.current_section.get_user_buffer(),
109 standard(),
110 )
111 .expect(
112 "Failed to encode object in a newly minted section. Unrecoverable failure.",
113 ); self.current_position += result;
115 self.current_section.used += result as u32;
116 Ok(())
117 }
118 _ => {
119 let err =
120 <&str as Into<CuError>>::into("Unexpected error while encoding object.")
121 .add_cause(e.to_string().as_str());
122 Err(err)
123 }
124 },
125 }
126 }
127}
128
129impl Drop for MmapStream {
130 fn drop(&mut self) {
131 let mut logger_guard = self.parent_logger.lock().unwrap();
132 logger_guard.flush_section(&mut self.current_section);
133 }
134}
135
136pub fn stream_write<E: Encode>(
138 logger: Arc<Mutex<UnifiedLoggerWrite>>,
139 entry_type: UnifiedLogType,
140 minimum_allocation_amount: usize,
141) -> impl WriteStream<E> {
142 MmapStream::new(entry_type, logger.clone(), minimum_allocation_amount)
143}
144
/// Result of `UnifiedLoggerBuilder::build`: either a reader or a writer.
pub enum UnifiedLogger {
    /// Logger opened for reading an existing log.
    Read(UnifiedLoggerRead),
    /// Logger created for writing a new log.
    Write(UnifiedLoggerWrite),
}
150
/// Builder collecting the options used to create a `UnifiedLogger`.
pub struct UnifiedLoggerBuilder {
    /// Base path from which the per-slab file names are derived.
    file_base_name: Option<PathBuf>,
    /// Size given to each slab file in write mode.
    preallocated_size: Option<usize>,
    /// Together with `create`, selects the writer when both are `true`.
    write: bool,
    /// Together with `write`, selects the writer when both are `true`.
    create: bool,
}
158
159impl Default for UnifiedLoggerBuilder {
160 fn default() -> Self {
161 Self::new()
162 }
163}
164
165impl UnifiedLoggerBuilder {
166 pub fn new() -> Self {
167 Self {
168 file_base_name: None,
169 preallocated_size: None,
170 write: false,
171 create: false, }
173 }
174
175 pub fn file_base_name(mut self, file_path: &Path) -> Self {
177 self.file_base_name = Some(file_path.to_path_buf());
178 self
179 }
180
181 pub fn preallocated_size(mut self, preallocated_size: usize) -> Self {
182 self.preallocated_size = Some(preallocated_size);
183 self
184 }
185
186 pub fn write(mut self, write: bool) -> Self {
187 self.write = write;
188 self
189 }
190
191 pub fn create(mut self, create: bool) -> Self {
192 self.create = create;
193 self
194 }
195
196 pub fn build(self) -> io::Result<UnifiedLogger> {
197 let page_size = page_size::get();
198
199 if self.write && self.create {
200 let ulw = UnifiedLoggerWrite::new(
201 &self.file_base_name.unwrap(),
202 self.preallocated_size.unwrap(),
203 page_size,
204 );
205
206 Ok(UnifiedLogger::Write(ulw))
207 } else {
208 let file_path = self.file_base_name.ok_or_else(|| {
209 io::Error::new(io::ErrorKind::InvalidInput, "File path is required")
210 })?;
211 let ulr = UnifiedLoggerRead::new(&file_path)?;
212 Ok(UnifiedLogger::Read(ulr))
213 }
214 }
215}
216
/// Reader walking the sections of a log, one slab file at a time.
pub struct UnifiedLoggerRead {
    /// Base path from which each slab file name is derived.
    base_file_path: PathBuf,
    /// Read-only mapping of the slab currently being read.
    current_mmap_buffer: Mmap,
    /// File handle of the slab currently being read.
    current_file: File,
    /// Index of the slab currently being read.
    current_slab_index: usize,
    /// Offset, within the current slab, of the next section header to read.
    current_reading_position: usize,
}
225
/// One memory-mapped slab file of the log being written.
struct SlabEntry {
    /// Backing file; trimmed to `current_global_position` when the slab is dropped.
    file: File,
    /// Writable mapping of `file`. Wrapped in `ManuallyDrop` so that `Drop`
    /// can release the mapping explicitly before trimming the file.
    mmap_buffer: ManuallyDrop<MmapMut>,
    /// Offset just past the last allocated section.
    current_global_position: usize,
    /// Start offsets of the sections handed out and not flushed yet.
    sections_offsets_in_flight: Vec<usize>,
    /// Offset up to which a flush of the mapping has been requested.
    flushed_until_offset: usize,
    /// Page size used to align the sections.
    page_size: usize,
}
234
235impl Drop for SlabEntry {
236 fn drop(&mut self) {
237 self.flush_until(self.current_global_position);
238 unsafe { ManuallyDrop::drop(&mut self.mmap_buffer) };
239 self.file
240 .set_len(self.current_global_position as u64)
241 .expect("Failed to trim datalogger file");
242
243 if !self.sections_offsets_in_flight.is_empty() {
244 eprintln!("Error: Slab not full flushed.");
245 }
246 }
247}
248
/// Outcome of asking a slab for a new section.
pub enum AllocatedSection {
    /// The slab does not have enough room left for the requested section.
    NoMoreSpace,
    /// The section was allocated; the handle gives access to its buffer.
    Section(SectionHandle),
}
253
254impl SlabEntry {
255 fn new(file: File, page_size: usize) -> Self {
256 let mmap_buffer =
257 ManuallyDrop::new(unsafe { MmapMut::map_mut(&file).expect("Failed to map file") });
258 Self {
259 file,
260 mmap_buffer,
261 current_global_position: 0,
262 sections_offsets_in_flight: Vec::with_capacity(16),
263 flushed_until_offset: 0,
264 page_size,
265 }
266 }
267
268 fn flush_until(&mut self, until_position: usize) {
270 if (self.flushed_until_offset == until_position) || (until_position == 0) {
272 return;
273 }
274 self.mmap_buffer
275 .flush_async_range(
276 self.flushed_until_offset,
277 until_position - self.flushed_until_offset,
278 )
279 .expect("Failed to flush memory map");
280 self.flushed_until_offset = until_position;
281 }
282
283 fn is_it_my_section(&self, section: &SectionHandle) -> bool {
284 (section.buffer.as_ptr() >= self.mmap_buffer.as_ptr())
285 && (section.buffer.as_ptr() as usize)
286 < (self.mmap_buffer.as_ref().as_ptr() as usize + self.mmap_buffer.as_ref().len())
287 }
288
289 fn flush_section(&mut self, section: &mut SectionHandle) {
292 if section.buffer.as_ptr() < self.mmap_buffer.as_ptr()
293 || section.buffer.as_ptr() as usize
294 > self.mmap_buffer.as_ptr() as usize + self.mmap_buffer.len()
295 {
296 panic!("Invalid section buffer, not in the slab");
297 }
298
299 section.update_header();
301
302 let _sz = encode_into_slice(§ion.section_header, section.buffer, standard())
303 .expect("Failed to encode section header");
304
305 let base = self.mmap_buffer.as_ptr() as usize;
306 let section_buffer_addr = section.buffer.as_ptr() as usize;
307 self.sections_offsets_in_flight
308 .retain(|&x| x != section_buffer_addr - base);
309
310 if self.sections_offsets_in_flight.is_empty() {
311 self.flush_until(self.current_global_position);
312 return;
313 }
314 if self.flushed_until_offset < self.sections_offsets_in_flight[0] {
315 self.flush_until(self.sections_offsets_in_flight[0]);
316 }
317 }
318
319 #[inline]
320 fn align_to_next_page(&self, ptr: usize) -> usize {
321 (ptr + self.page_size - 1) & !(self.page_size - 1)
322 }
323
324 fn add_section(
326 &mut self,
327 entry_type: UnifiedLogType,
328 requested_section_size: usize,
329 ) -> AllocatedSection {
330 self.current_global_position = self.align_to_next_page(self.current_global_position);
332 let section_size = self.align_to_next_page(requested_section_size) as u32;
333
334 if self.current_global_position + section_size as usize > self.mmap_buffer.len() {
336 return AllocatedSection::NoMoreSpace;
337 }
338
339 let section_header = SectionHeader {
340 magic: SECTION_MAGIC,
341 entry_type,
342 section_size,
343 filled_size: 0u32,
344 };
345
346 let nb_bytes = encode_into_slice(
347 §ion_header,
348 &mut self.mmap_buffer[self.current_global_position..],
349 standard(),
350 )
351 .expect("Failed to encode section header");
352 assert!(nb_bytes < self.page_size);
353
354 self.sections_offsets_in_flight
356 .push(self.current_global_position);
357 let end_of_section = self.current_global_position + requested_section_size;
358 let user_buffer = &mut self.mmap_buffer[self.current_global_position..end_of_section];
359
360 let handle_buffer =
362 unsafe { from_raw_parts_mut(user_buffer.as_mut_ptr(), user_buffer.len()) };
363
364 self.current_global_position = end_of_section;
365
366 AllocatedSection::Section(SectionHandle::create(section_header, handle_buffer))
367 }
368
369 #[cfg(test)]
370 fn used(&self) -> usize {
371 self.current_global_position
372 }
373}
374
/// Handle on one section of a slab, used to fill it and to write its final header.
#[derive(Default)]
pub struct SectionHandle {
    /// In-memory copy of the section header; encoded into `buffer` on flush.
    section_header: SectionHeader,
    /// The section's bytes, reserved header area included. The `'static`
    /// lifetime is not real: the slice points into the slab's memory mapping.
    buffer: &'static mut [u8],
    /// Number of user-data bytes written so far (header area excluded).
    used: u32,
}
383
384impl SectionHandle {
387 pub fn create(section_header: SectionHeader, buffer: &'static mut [u8]) -> Self {
389 if buffer[0] != SECTION_MAGIC[0] || buffer[1] != SECTION_MAGIC[1] {
391 panic!("Invalid section buffer, magic number not found");
392 }
393
394 if buffer.len() < MAX_HEADER_SIZE {
395 panic!(
396 "Invalid section buffer, too small: {}, it needs to be > {}",
397 buffer.len(),
398 MAX_HEADER_SIZE
399 );
400 }
401
402 Self {
403 section_header,
404 buffer,
405 used: 0,
406 }
407 }
408 pub fn get_user_buffer(&mut self) -> &mut [u8] {
409 &mut self.buffer[MAX_HEADER_SIZE + self.used as usize..]
410 }
411
412 pub fn update_header(&mut self) {
413 if self.section_header.entry_type == UnifiedLogType::Empty || self.used == 0 {
415 return;
416 }
417 self.section_header.filled_size = self.used;
418
419 }
423}
424
/// Writer side of the unified logger, spreading the log over one or more slab files.
pub struct UnifiedLoggerWrite {
    /// Slab from which new sections are allocated.
    front_slab: SlabEntry,
    /// Previous slabs, kept while they still have sections in flight.
    back_slabs: Vec<SlabEntry>,
    /// Base path from which each slab file name is derived.
    base_file_path: PathBuf,
    /// Size given to each slab file.
    slab_size: usize,
    /// Index of the front slab, used as the suffix of its file name.
    front_slab_suffix: usize,
}
438
/// Derives the path of slab number `slab_index` from `base_file_path` by
/// inserting `_<slab_index>` between the file stem and its extension.
///
/// `dir/test.bin` with index 0 gives `dir/test_0.bin`. A name without an
/// extension simply gets the suffix appended (`dir/log` gives `dir/log_0`).
/// Names that are not valid UTF-8 are handled without conversion.
///
/// # Panics
/// Panics if `base_file_path` has no file name (for example when it ends
/// with `..`).
fn build_slab_path(base_file_path: &Path, slab_index: usize) -> PathBuf {
    let stem = base_file_path
        .file_stem()
        .expect("The base file path must have a file name");

    let mut file_name = stem.to_os_string();
    file_name.push(format!("_{slab_index}"));
    if let Some(extension) = base_file_path.extension() {
        file_name.push(".");
        file_name.push(extension);
    }

    base_file_path.with_file_name(file_name)
}
449
450fn make_slab_file(base_file_path: &Path, slab_size: usize, slab_suffix: usize) -> File {
451 let file_path = build_slab_path(base_file_path, slab_suffix);
452 let file = OpenOptions::new()
453 .read(true)
454 .write(true)
455 .create(true)
456 .truncate(true)
457 .open(&file_path)
458 .unwrap_or_else(|_| panic!("Failed to open file: {}", file_path.display()));
459 file.set_len(slab_size as u64)
460 .expect("Failed to set file length");
461 file
462}
463
464impl UnifiedLoggerWrite {
465 fn next_slab(&mut self) -> File {
466 self.front_slab_suffix += 1;
467
468 make_slab_file(&self.base_file_path, self.slab_size, self.front_slab_suffix)
469 }
470
471 fn new(base_file_path: &Path, slab_size: usize, page_size: usize) -> Self {
472 let file = make_slab_file(base_file_path, slab_size, 0);
473 let mut front_slab = SlabEntry::new(file, page_size);
474
475 let main_header = MainHeader {
477 magic: MAIN_MAGIC,
478 first_section_offset: page_size as u16,
479 page_size: page_size as u16,
480 };
481 let nb_bytes = encode_into_slice(&main_header, &mut front_slab.mmap_buffer[..], standard())
482 .expect("Failed to encode main header");
483 assert!(nb_bytes < page_size);
484 front_slab.current_global_position = page_size; Self {
487 front_slab,
488 back_slabs: Vec::new(),
489 base_file_path: base_file_path.to_path_buf(),
490 slab_size,
491 front_slab_suffix: 0,
492 }
493 }
494
495 pub fn flush_section(&mut self, section: &mut SectionHandle) {
496 for slab in self.back_slabs.iter_mut() {
497 if slab.is_it_my_section(section) {
498 slab.flush_section(section);
499 return;
500 }
501 }
502 self.front_slab.flush_section(section);
503 }
504
505 fn garbage_collect_backslabs(&mut self) {
506 self.back_slabs
507 .retain_mut(|slab| !slab.sections_offsets_in_flight.is_empty());
508 }
509
510 fn add_section(
512 &mut self,
513 entry_type: UnifiedLogType,
514 requested_section_size: usize,
515 ) -> SectionHandle {
516 self.garbage_collect_backslabs(); let maybe_section = self
519 .front_slab
520 .add_section(entry_type, requested_section_size);
521
522 match maybe_section {
523 AllocatedSection::NoMoreSpace => {
524 let new_slab = SlabEntry::new(self.next_slab(), self.front_slab.page_size);
526 self.back_slabs
528 .push(mem::replace(&mut self.front_slab, new_slab));
529 match self
530 .front_slab
531 .add_section(entry_type, requested_section_size)
532 {
533 AllocatedSection::NoMoreSpace => {
534 panic!("Failed to allocate a section in a new slab");
535 }
536 AllocatedSection::Section(section) => section,
537 }
538 }
539 AllocatedSection::Section(section) => section,
540 }
541 }
542
543 pub fn stats(&self) -> (usize, Vec<usize>, usize) {
544 (
545 self.front_slab.current_global_position,
546 self.front_slab.sections_offsets_in_flight.clone(),
547 self.back_slabs.len(),
548 )
549 }
550}
551
552impl Drop for UnifiedLoggerWrite {
553 fn drop(&mut self) {
554 let mut section = self.add_section(UnifiedLogType::LastEntry, 80); self.front_slab.flush_section(&mut section);
556 self.garbage_collect_backslabs();
557 }
558}
559
560fn open_slab_index(base_file_path: &Path, slab_index: usize) -> io::Result<(File, Mmap, u16)> {
561 let mut options = OpenOptions::new();
562 let options = options.read(true);
563
564 let file_path = build_slab_path(base_file_path, slab_index);
565 let file = options.open(file_path)?;
566 let mmap = unsafe { Mmap::map(&file) }?;
567 let mut prolog = 0u16;
568 if slab_index == 0 {
569 let main_header: MainHeader;
570 let _read: usize;
571 (main_header, _read) =
572 decode_from_slice(&mmap[..], standard()).expect("Failed to decode main header");
573 if main_header.magic != MAIN_MAGIC {
574 return Err(io::Error::new(
575 io::ErrorKind::InvalidData,
576 "Invalid magic number in main header",
577 ));
578 }
579 prolog = main_header.first_section_offset;
580 }
581 Ok((file, mmap, prolog))
582}
583
584impl UnifiedLoggerRead {
585 pub fn new(base_file_path: &Path) -> io::Result<Self> {
586 let (file, mmap, prolog) = open_slab_index(base_file_path, 0)?;
587
588 Ok(Self {
589 base_file_path: base_file_path.to_path_buf(),
590 current_file: file,
591 current_mmap_buffer: mmap,
592 current_slab_index: 0,
593 current_reading_position: prolog as usize,
594 })
595 }
596
597 fn next_slab(&mut self) -> io::Result<()> {
598 self.current_slab_index += 1;
599 let (file, mmap, prolog) = open_slab_index(&self.base_file_path, self.current_slab_index)?;
600 self.current_file = file;
601 self.current_mmap_buffer = mmap;
602 self.current_reading_position = prolog as usize;
603 Ok(())
604 }
605
606 pub fn read_next_section_type(
607 &mut self,
608 datalogtype: UnifiedLogType,
609 ) -> CuResult<Option<Vec<u8>>> {
610 loop {
612 if self.current_reading_position >= self.current_mmap_buffer.len() {
613 self.next_slab().map_err(|e| {
614 CuError::new_with_cause("Failed to read next slab, is the log complete?", e)
615 })?;
616 }
617
618 let header_result = self.read_section_header();
619 if let Err(error) = header_result {
620 return Err(CuError::new_with_cause(
621 "Could not read a sections header",
622 error,
623 ));
624 };
625 let header = header_result.unwrap();
626
627 if header.entry_type == UnifiedLogType::LastEntry {
629 return Ok(None);
630 }
631
632 if header.entry_type == datalogtype {
634 let result = Some(self.read_section_content(&header)?);
635 self.current_reading_position += header.section_size as usize;
636 return Ok(result);
637 }
638
639 self.current_reading_position += header.section_size as usize;
641 }
642 }
643
644 pub fn read_section(&mut self) -> CuResult<Vec<u8>> {
646 let read_result = self.read_section_header();
647 if let Err(error) = read_result {
648 return Err(CuError::new_with_cause(
649 "Could not read a sections header",
650 error,
651 ));
652 };
653
654 self.read_section_content(&read_result.unwrap())
655 }
656
657 fn read_section_content(&mut self, header: &SectionHeader) -> CuResult<Vec<u8>> {
659 if header.filled_size == 0 {
661 eprintln!("Warning: read an empty section");
662 }
663 let mut section = vec![0; header.filled_size as usize];
664 let start_of_data = self.current_reading_position + MAX_HEADER_SIZE;
665 section.copy_from_slice(
666 &self.current_mmap_buffer[start_of_data..start_of_data + header.filled_size as usize],
667 );
668
669 Ok(section)
670 }
671
672 fn read_section_header(&mut self) -> CuResult<SectionHeader> {
673 let section_header: SectionHeader;
674 (section_header, _) = decode_from_slice(
675 &self.current_mmap_buffer[self.current_reading_position..],
676 standard(),
677 )
678 .expect("Failed to decode section header");
679 if section_header.magic != SECTION_MAGIC {
680 return Err("Invalid magic number in section header".into());
681 }
682 Ok(section_header)
683 }
684}
685
/// Adapter exposing the sections of one log type as a continuous `std::io::Read` stream.
pub struct UnifiedLoggerIOReader {
    /// Reader the sections are pulled from.
    logger: UnifiedLoggerRead,
    /// Type of the sections to read; sections of other types are skipped.
    log_type: UnifiedLogType,
    /// Content of the section currently being served.
    buffer: Vec<u8>,
    /// Number of bytes of `buffer` already handed out.
    buffer_pos: usize,
}
693
694impl UnifiedLoggerIOReader {
695 pub fn new(logger: UnifiedLoggerRead, log_type: UnifiedLogType) -> Self {
696 Self {
697 logger,
698 log_type,
699 buffer: Vec::new(),
700 buffer_pos: 0,
701 }
702 }
703
704 fn fill_buffer(&mut self) -> io::Result<bool> {
706 match self.logger.read_next_section_type(self.log_type) {
707 Ok(Some(section)) => {
708 self.buffer = section;
709 self.buffer_pos = 0;
710 Ok(true)
711 }
712 Ok(None) => Ok(false), Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
714 }
715 }
716}
717
718impl Read for UnifiedLoggerIOReader {
719 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
720 if self.buffer_pos >= self.buffer.len() && !self.fill_buffer()? {
721 return Ok(0);
723 }
724
725 if self.buffer_pos >= self.buffer.len() {
727 return Ok(0);
728 }
729
730 let len = std::cmp::min(buf.len(), self.buffer.len() - self.buffer_pos);
732 buf[..len].copy_from_slice(&self.buffer[self.buffer_pos..self.buffer_pos + len]);
733 self.buffer_pos += len;
734 Ok(len)
735 }
736}
737
#[cfg(test)]
mod tests {
    use super::*;
    use bincode::decode_from_reader;
    use std::io::BufReader;
    use std::path::PathBuf;
    use tempfile::TempDir;

    /// Slab size large enough for the single-slab tests.
    const LARGE_SLAB: usize = 100 * 1024;
    /// Slab size small enough to force the multi-slab test over several files.
    const SMALL_SLAB: usize = 16 * 2 * 1024;

    /// Creates a writer logging to `test.bin` inside `tmp_dir`.
    fn make_a_logger(
        tmp_dir: &TempDir,
        slab_size: usize,
    ) -> (Arc<Mutex<UnifiedLoggerWrite>>, PathBuf) {
        let file_path = tmp_dir.path().join("test.bin");
        let UnifiedLogger::Write(data_logger) = UnifiedLoggerBuilder::new()
            .write(true)
            .create(true)
            .file_base_name(&file_path)
            .preallocated_size(slab_size)
            .build()
            .expect("Failed to create logger")
        else {
            panic!("Failed to create logger")
        };

        (Arc::new(Mutex::new(data_logger)), file_path)
    }

    /// Opens the log at `file_path` for reading.
    fn open_reader(file_path: &Path) -> UnifiedLoggerRead {
        let UnifiedLogger::Read(dl) = UnifiedLoggerBuilder::new()
            .file_base_name(file_path)
            .build()
            .expect("Failed to build logger")
        else {
            panic!("Failed to build logger");
        };
        dl
    }

    /// Number of sections of the front slab that are still in flight.
    fn in_flight_count(logger: &Arc<Mutex<UnifiedLoggerWrite>>) -> usize {
        logger
            .lock()
            .unwrap()
            .front_slab
            .sections_offsets_in_flight
            .len()
    }

    /// Asserts that the front slab has nothing in flight and is fully flushed.
    fn assert_fully_flushed(logger: &Arc<Mutex<UnifiedLoggerWrite>>) {
        let lg = logger.lock().unwrap();
        assert_eq!(lg.front_slab.sections_offsets_in_flight.len(), 0);
        assert_eq!(
            lg.front_slab.flushed_until_offset,
            lg.front_slab.current_global_position
        );
    }

    #[test]
    fn test_truncation_and_sections_creations() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let file_path = tmp_dir.path().join("test.bin");
        let _used = {
            let UnifiedLogger::Write(mut logger) = UnifiedLoggerBuilder::new()
                .write(true)
                .create(true)
                .file_base_name(&file_path)
                .preallocated_size(100000)
                .build()
                .expect("Failed to create logger")
            else {
                panic!("Failed to create logger")
            };
            logger.add_section(UnifiedLogType::StructuredLogLine, 1024);
            logger.add_section(UnifiedLogType::CopperList, 2048);
            let used = logger.front_slab.used();
            assert!(used < 4 * page_size::get());
            used
        };

        // The first slab must exist under its suffixed name once the logger is dropped.
        let _file = OpenOptions::new()
            .read(true)
            .open(tmp_dir.path().join("test_0.bin"))
            .expect("Could not reopen the file");
    }

    #[test]
    fn test_one_section_self_cleaning() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let _stream =
                stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
            assert_eq!(in_flight_count(&logger), 1);
        }
        assert_eq!(in_flight_count(&logger), 0);
        assert_fully_flushed(&logger);
    }

    #[test]
    fn test_two_sections_self_cleaning_in_order() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        let s1 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(in_flight_count(&logger), 1);
        let s2 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(in_flight_count(&logger), 2);
        drop(s2);
        assert_eq!(in_flight_count(&logger), 1);
        drop(s1);
        assert_fully_flushed(&logger);
    }

    #[test]
    fn test_two_sections_self_cleaning_out_of_order() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, _) = make_a_logger(&tmp_dir, LARGE_SLAB);
        let s1 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(in_flight_count(&logger), 1);
        let s2 = stream_write::<()>(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
        assert_eq!(in_flight_count(&logger), 2);
        drop(s1);
        assert_eq!(in_flight_count(&logger), 1);
        drop(s2);
        assert_fully_flushed(&logger);
    }

    #[test]
    fn test_write_then_read_one_section() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let mut stream = stream_write(logger.clone(), UnifiedLogType::StructuredLogLine, 1024);
            stream.log(&1u32).unwrap();
            stream.log(&2u32).unwrap();
            stream.log(&3u32).unwrap();
        }
        drop(logger);

        let mut dl = open_reader(&f);
        let section = dl
            .read_next_section_type(UnifiedLogType::StructuredLogLine)
            .expect("Failed to read section");
        assert!(section.is_some());
        let section = section.unwrap();

        let mut reader = BufReader::new(&section[..]);
        let v1: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        let v2: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        let v3: u32 = decode_from_reader(&mut reader, standard()).unwrap();
        assert_eq!(v1, 1);
        assert_eq!(v2, 2);
        assert_eq!(v3, 3);
    }

    /// Stand-in for the state carried by a copper list.
    #[derive(Debug, Encode, Decode)]
    enum CopperListStateMock {
        Free,
        ProcessingTasks,
        BeingSerialized,
    }

    /// Stand-in for a copper list: a state plus an arbitrary payload.
    #[derive(Encode, Decode)]
    struct CopperList<P: bincode::enc::Encode> {
        state: CopperListStateMock,
        payload: P,
    }

    #[test]
    fn test_copperlist_list_like_logging() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, LARGE_SLAB);
        {
            let mut stream = stream_write(logger.clone(), UnifiedLogType::CopperList, 1024);
            let cl0 = CopperList {
                state: CopperListStateMock::Free,
                payload: (1u32, 2u32, 3u32),
            };
            let cl1 = CopperList {
                state: CopperListStateMock::ProcessingTasks,
                payload: (4u32, 5u32, 6u32),
            };
            stream.log(&cl0).unwrap();
            stream.log(&cl1).unwrap();
        }
        drop(logger);

        let mut dl = open_reader(&f);
        let section = dl
            .read_next_section_type(UnifiedLogType::CopperList)
            .expect("Failed to read section");
        assert!(section.is_some());
        let section = section.unwrap();

        let mut reader = BufReader::new(&section[..]);
        let cl0: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
        let cl1: CopperList<(u32, u32, u32)> = decode_from_reader(&mut reader, standard()).unwrap();
        assert_eq!(cl0.payload.1, 2);
        assert_eq!(cl1.payload.2, 6);
    }

    #[test]
    fn test_multi_slab_end2end() {
        let tmp_dir = TempDir::new().expect("could not create a tmp dir");
        let (logger, f) = make_a_logger(&tmp_dir, SMALL_SLAB);
        {
            let mut stream = stream_write(logger.clone(), UnifiedLogType::CopperList, 1024);
            let cl0 = CopperList {
                state: CopperListStateMock::Free,
                payload: (1u32, 2u32, 3u32),
            };
            // Enough entries to overflow several small slabs.
            for _ in 0..10000 {
                stream.log(&cl0).unwrap();
            }
        }
        drop(logger);

        let mut dl = open_reader(&f);
        let mut total_readback = 0;
        // Stop on the end-of-log marker or on the first read error.
        while let Ok(Some(section)) = dl.read_next_section_type(UnifiedLogType::CopperList) {
            let mut reader = BufReader::new(&section[..]);
            loop {
                let maybe_cl: Result<CopperList<(u32, u32, u32)>, _> =
                    decode_from_reader(&mut reader, standard());
                if maybe_cl.is_err() {
                    break;
                }
                total_readback += 1;
            }
        }
        assert_eq!(total_readback, 10000);
    }
}