Skip to main content

cu29_unifiedlog/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2
3extern crate alloc;
4extern crate core;
5
6#[cfg(feature = "std")]
7pub mod memmap;
8pub mod noop;
9
10#[cfg(feature = "std")]
11mod compat {
12    // backward compatibility for the std implementation
13    pub use crate::memmap::LogPosition;
14    pub use crate::memmap::MmapUnifiedLogger as UnifiedLogger;
15    pub use crate::memmap::MmapUnifiedLoggerBuilder as UnifiedLoggerBuilder;
16    pub use crate::memmap::MmapUnifiedLoggerRead as UnifiedLoggerRead;
17    pub use crate::memmap::MmapUnifiedLoggerWrite as UnifiedLoggerWrite;
18    pub use crate::memmap::UnifiedLoggerIOReader;
19}
20
21#[cfg(feature = "std")]
22pub use compat::*;
23pub use noop::{NoopLogger, NoopSectionStorage};
24
25use alloc::string::ToString;
26use alloc::sync::Arc;
27use alloc::vec::Vec;
28use core::fmt::{Debug, Display, Formatter, Result as FmtResult};
29#[cfg(not(feature = "std"))]
30use spin::Mutex;
31#[cfg(feature = "std")]
32use std::sync::Mutex;
33
34use bincode::error::EncodeError;
35use bincode::{Decode, Encode};
36use cu29_traits::{CuError, CuResult, UnifiedLogType, WriteStream};
37
38/// ID to spot the beginning of a Copper Log
39#[allow(dead_code)]
40pub const MAIN_MAGIC: [u8; 4] = [0xB4, 0xA5, 0x50, 0xFF]; // BRASS OFF
41
42/// ID to spot a section of Copper Log
43pub const SECTION_MAGIC: [u8; 2] = [0xFA, 0x57]; // FAST
44
45pub const SECTION_HEADER_COMPACT_SIZE: u16 = 512; // Usual minimum size for a disk sector.
46
47/// The main file header of the datalogger.
48#[derive(Encode, Decode, Debug)]
49pub struct MainHeader {
50    pub magic: [u8; 4],            // Magic number to identify the file.
51    pub first_section_offset: u16, // This is to align with a page at write time.
52    pub page_size: u16,
53}
54
55impl Display for MainHeader {
56    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
57        writeln!(
58            f,
59            "  Magic -> {:2x}{:2x}{:2x}{:2x}",
60            self.magic[0], self.magic[1], self.magic[2], self.magic[3]
61        )?;
62        writeln!(f, "  first_section_offset -> {}", self.first_section_offset)?;
63        writeln!(f, "  page_size -> {}", self.page_size)
64    }
65}
66
67/// Each concurrent sublogger is tracked through a section header.
68/// They form a linked list of sections.
69/// The entry type is used to identify the type of data in the section.
70#[derive(Encode, Decode, Debug)]
71pub struct SectionHeader {
72    pub magic: [u8; 2],  // Magic number to identify the section.
73    pub block_size: u16, // IMPORTANT: we assume this header fits in this block size.
74    pub entry_type: UnifiedLogType,
75    pub offset_to_next_section: u32, // offset from the first byte of this header to the first byte of the next header (MAGIC to MAGIC).
76    pub used: u32,                   // how much of the section is filled.
77    pub is_open: bool,               // true while being written, false once closed.
78}
79
80impl Display for SectionHeader {
81    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
82        writeln!(f, "    Magic -> {:2x}{:2x}", self.magic[0], self.magic[1])?;
83        writeln!(f, "    type -> {:?}", self.entry_type)?;
84        write!(
85            f,
86            "    use  -> {} / {} (open: {})",
87            self.used, self.offset_to_next_section, self.is_open
88        )
89    }
90}
91
92impl Default for SectionHeader {
93    fn default() -> Self {
94        Self {
95            magic: SECTION_MAGIC,
96            block_size: 512,
97            entry_type: UnifiedLogType::Empty,
98            offset_to_next_section: 0,
99            used: 0,
100            is_open: true,
101        }
102    }
103}
104
105pub enum AllocatedSection<S: SectionStorage> {
106    NoMoreSpace,
107    Section(SectionHandle<S>),
108}
109
110/// A Storage is an append-only structure that can update a header section.
111pub trait SectionStorage: Send + Sync {
112    /// This rewinds the storage, serialize the header and jumps to the beginning of the user data storage.
113    fn initialize<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError>;
114    /// This updates the header leaving the position to the end of the user data storage.
115    fn post_update_header<E: Encode>(&mut self, header: &E) -> Result<usize, EncodeError>;
116    /// Appends the entry to the user data storage.
117    fn append<E: Encode>(&mut self, entry: &E) -> Result<usize, EncodeError>;
118    /// Flushes the section to the underlying storage
119    fn flush(&mut self) -> CuResult<usize>;
120}
121
122/// A SectionHandle is a handle to a section in the datalogger.
123/// It allows tracking the lifecycle of the section.
124#[derive(Default)]
125pub struct SectionHandle<S: SectionStorage> {
126    header: SectionHeader, // keep a copy of the header as metadata
127    storage: S,
128}
129
130impl<S: SectionStorage> SectionHandle<S> {
131    pub fn create(header: SectionHeader, mut storage: S) -> CuResult<Self> {
132        // Write the first version of the header.
133        let _ = storage.initialize(&header).map_err(|e| e.to_string())?;
134        Ok(Self { header, storage })
135    }
136
137    pub fn mark_closed(&mut self) {
138        self.header.is_open = false;
139    }
140    pub fn append<E: Encode>(&mut self, entry: E) -> Result<usize, EncodeError> {
141        self.storage.append(&entry)
142    }
143
144    pub fn get_storage(&self) -> &S {
145        &self.storage
146    }
147
148    pub fn get_storage_mut(&mut self) -> &mut S {
149        &mut self.storage
150    }
151
152    pub fn post_update_header(&mut self) -> Result<usize, EncodeError> {
153        self.storage.post_update_header(&self.header)
154    }
155}
156
157/// Basic statistics for the unified logger.
158/// Note: the total_allocated_space might grow for the std implementation
159pub struct UnifiedLogStatus {
160    pub total_used_space: usize,
161    pub total_allocated_space: usize,
162}
163
164/// Payload stored in the end-of-log section to signal whether the log was cleanly closed.
165#[derive(Encode, Decode, Debug, Clone)]
166pub struct EndOfLogMarker {
167    pub temporary: bool,
168}
169
170/// The writing interface to the unified logger.
171/// Writing is "almost" linear as various streams can allocate sections and track them until
172/// they drop them.
173pub trait UnifiedLogWrite<S: SectionStorage>: Send + Sync {
174    /// A section is a contiguous chunk of memory that can be used to write data.
175    /// It can store various types of data as specified by the entry_type.
176    /// The requested_section_size is the size of the section to allocate.
177    /// It returns a handle to the section that can be used to write data until
178    /// it is flushed with flush_section, it is then considered unmutable.
179    fn add_section(
180        &mut self,
181        entry_type: UnifiedLogType,
182        requested_section_size: usize,
183    ) -> CuResult<SectionHandle<S>>;
184
185    /// Flush the given section to the underlying storage.
186    fn flush_section(&mut self, section: &mut SectionHandle<S>);
187
188    /// Returns the current status of the unified logger.
189    fn status(&self) -> UnifiedLogStatus;
190}
191
192/// Read back a unified log linearly.
193pub trait UnifiedLogRead {
194    /// Read through the unified logger until it reaches the UnifiedLogType given in datalogtype.
195    /// It will return the byte array of the section if found.
196    fn read_next_section_type(&mut self, datalogtype: UnifiedLogType) -> CuResult<Option<Vec<u8>>>;
197
198    /// Read through the next section entry regardless of its type.
199    /// It will return the header and the byte array of the section.
200    /// Note the last Entry should be of UnifiedLogType::LastEntry if the log is not corrupted.
201    fn raw_read_section(&mut self) -> CuResult<(SectionHeader, Vec<u8>)>;
202}
203
204/// Create a new stream to write to the unifiedlogger.
205pub fn stream_write<E: Encode, S: SectionStorage>(
206    logger: Arc<Mutex<impl UnifiedLogWrite<S>>>,
207    entry_type: UnifiedLogType,
208    minimum_allocation_amount: usize,
209) -> CuResult<impl WriteStream<E>> {
210    LogStream::new(entry_type, logger, minimum_allocation_amount)
211}
212
213/// A wrapper around the unifiedlogger that implements the Write trait.
214pub struct LogStream<S: SectionStorage, L: UnifiedLogWrite<S>> {
215    entry_type: UnifiedLogType,
216    parent_logger: Arc<Mutex<L>>,
217    current_section: SectionHandle<S>,
218    current_position: usize,
219    minimum_allocation_amount: usize,
220    last_log_bytes: usize,
221}
222
223impl<S: SectionStorage, L: UnifiedLogWrite<S>> LogStream<S, L> {
224    fn new(
225        entry_type: UnifiedLogType,
226        parent_logger: Arc<Mutex<L>>,
227        minimum_allocation_amount: usize,
228    ) -> CuResult<Self> {
229        #[cfg(feature = "std")]
230        let section = parent_logger
231            .lock()
232            .map_err(|e| {
233                CuError::from("Could not lock a section at LogStream creation")
234                    .add_cause(e.to_string().as_str())
235            })?
236            .add_section(entry_type, minimum_allocation_amount)?;
237
238        #[cfg(not(feature = "std"))]
239        let section = parent_logger
240            .lock()
241            .add_section(entry_type, minimum_allocation_amount)?;
242
243        Ok(Self {
244            entry_type,
245            parent_logger,
246            current_section: section,
247            current_position: 0,
248            minimum_allocation_amount,
249            last_log_bytes: 0,
250        })
251    }
252}
253
254impl<S: SectionStorage, L: UnifiedLogWrite<S>> Debug for LogStream<S, L> {
255    fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
256        write!(
257            f,
258            "MmapStream {{ entry_type: {:?}, current_position: {}, minimum_allocation_amount: {} }}",
259            self.entry_type, self.current_position, self.minimum_allocation_amount
260        )
261    }
262}
263
264impl<E: Encode, S: SectionStorage, L: UnifiedLogWrite<S>> WriteStream<E> for LogStream<S, L> {
265    fn log(&mut self, obj: &E) -> CuResult<()> {
266        //let dst = self.current_section.get_user_buffer();
267        // let result = encode_into_slice(obj, dst, standard());
268        let result = self.current_section.append(obj);
269        match result {
270            Ok(nb_bytes) => {
271                self.current_position += nb_bytes;
272                self.current_section.header.used += nb_bytes as u32;
273                self.last_log_bytes = nb_bytes;
274                // Track encoded bytes so monitoring can compute actual bytes written.
275                Ok(())
276            }
277            Err(e) => match e {
278                EncodeError::UnexpectedEnd => {
279                    #[cfg(feature = "std")]
280                    let logger_guard = self.parent_logger.lock();
281
282                    #[cfg(not(feature = "std"))]
283                    let mut logger_guard = self.parent_logger.lock();
284
285                    #[cfg(feature = "std")]
286                    let mut logger_guard =
287                        match logger_guard {
288                            Ok(g) => g,
289                            Err(_) => return Err(
290                                "Logger mutex poisoned while reporting EncodeError::UnexpectedEnd"
291                                    .into(),
292                            ), // It will retry but at least not completely crash.
293                        };
294
295                    logger_guard.flush_section(&mut self.current_section);
296                    self.current_section = logger_guard
297                        .add_section(self.entry_type, self.minimum_allocation_amount)?;
298
299                    let result = self
300                        .current_section
301                        .append(obj)
302                        .map_err(|e| {
303                            CuError::from(
304                                "Failed to encode object in a newly minted section. Unrecoverable failure.",
305                            )
306                            .add_cause(e.to_string().as_str())
307                        })?; // If we fail just after creating a section, there is not much we can do.
308
309                    self.current_position += result;
310                    self.current_section.header.used += result as u32;
311                    self.last_log_bytes = result;
312                    Ok(())
313                }
314                _ => {
315                    let err =
316                        <&str as Into<CuError>>::into("Unexpected error while encoding object.")
317                            .add_cause(e.to_string().as_str());
318                    Err(err)
319                }
320            },
321        }
322    }
323
324    fn last_log_bytes(&self) -> Option<usize> {
325        Some(self.last_log_bytes)
326    }
327}
328
329impl<S: SectionStorage, L: UnifiedLogWrite<S>> Drop for LogStream<S, L> {
330    fn drop(&mut self) {
331        #[cfg(feature = "std")]
332        match self.parent_logger.lock() {
333            Ok(mut logger_guard) => {
334                logger_guard.flush_section(&mut self.current_section);
335            }
336            Err(_) => {
337                // Only surface the warning when a real poisoning occurred.
338                if !std::thread::panicking() {
339                    eprintln!("⚠️ MmapStream::drop: logger mutex poisoned");
340                }
341            }
342        }
343
344        #[cfg(not(feature = "std"))]
345        {
346            let mut logger_guard = self.parent_logger.lock();
347            logger_guard.flush_section(&mut self.current_section);
348        }
349    }
350}