Skip to main content

cu29_runtime/
logcodec.rs

1#[cfg(not(feature = "std"))]
2extern crate alloc;
3
4use crate::config::{ComponentConfig, CuConfig, LoggingCodecSpec};
5use crate::cutask::{CuMsg, CuMsgMetadata, CuMsgPayload};
6use alloc::boxed::Box;
7use alloc::format;
8use alloc::string::{String, ToString};
9#[cfg(feature = "std")]
10use bincode::config::standard;
11use bincode::de::{Decode, Decoder};
12#[cfg(feature = "std")]
13use bincode::decode_from_std_read;
14use bincode::enc::{Encode, Encoder};
15use bincode::error::{DecodeError, EncodeError};
16use core::any::TypeId;
17use cu29_clock::Tov;
18use cu29_traits::{CuError, CuResult, observed_encode_bytes};
19use hashbrown::HashMap;
20use portable_atomic::{AtomicU64, Ordering};
21use serde::de::DeserializeOwned;
22#[cfg(feature = "std")]
23use std::io::Read;
24#[cfg(feature = "std")]
25use std::path::Path;
26
27#[cfg(not(feature = "std"))]
28mod imp {
29    pub use spin::Mutex;
30    pub use spin::once::Once as OnceLock;
31}
32
33#[cfg(feature = "std")]
34mod imp {
35    pub use std::sync::{Mutex, OnceLock};
36}
37
38use imp::*;
39
40#[cfg(feature = "std")]
41use crate::curuntime::{RuntimeLifecycleEvent, RuntimeLifecycleRecord};
42#[cfg(feature = "std")]
43use cu29_unifiedlog::{UnifiedLogger, UnifiedLoggerBuilder, UnifiedLoggerIOReader};
44
45#[cfg(feature = "std")]
/// Locks `m` and returns its guard.
///
/// A poisoned mutex is not treated as an error: the guard carried by the
/// poison error is extracted and returned, so callers always get access to
/// the protected value.
fn lock_mutex<T>(m: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
    match m.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
49
50#[cfg(not(feature = "std"))]
51fn lock_mutex<T>(m: &Mutex<T>) -> spin::MutexGuard<'_, T> {
52    m.lock()
53}
54
55pub trait CuLogCodec<P: CuMsgPayload>: 'static {
56    type Config: DeserializeOwned + Default;
57
58    fn new(config: Self::Config) -> CuResult<Self>
59    where
60        Self: Sized;
61
62    /// Returns handle-backed source bytes read directly by the codec.
63    ///
64    /// This reports only extra handle-backed residency beyond the payload's
65    /// fixed `size_of::<P>()` footprint already accounted by runtime
66    /// monitoring. Codecs must implement this explicitly so they opt into the
67    /// correct accounting model for their payload type.
68    fn source_payload_handle_bytes(&self, payload: &P) -> usize;
69
70    fn encode_payload<E: Encoder>(
71        &mut self,
72        payload: &P,
73        encoder: &mut E,
74    ) -> Result<(), EncodeError>;
75
76    fn decode_payload<D: Decoder<Context = ()>>(
77        &mut self,
78        decoder: &mut D,
79    ) -> Result<P, DecodeError>;
80}
81
/// Mutex-guarded slot caching a codec of type `C`.
///
/// The slot is either empty or holds a `(version, codec)` pair, where the
/// `u64` is the config version the codec was stored under.
pub struct CodecState<C> {
    inner: Mutex<Option<(u64, C)>>,
}

impl<C> CodecState<C> {
    /// Creates an empty state (no codec cached). Usable in `const`/`static` context.
    pub const fn new() -> Self {
        let inner = Mutex::new(None);
        Self { inner }
    }
}

impl<C> Default for CodecState<C> {
    /// Same as [`CodecState::new`]: an empty state.
    fn default() -> Self {
        CodecState::new()
    }
}
99
100pub struct EffectiveConfigEntry {
101    version: AtomicU64,
102    ron: Mutex<String>,
103}
104
105impl EffectiveConfigEntry {
106    fn new(ron: &str) -> Self {
107        Self {
108            version: AtomicU64::new(1),
109            ron: Mutex::new(ron.to_string()),
110        }
111    }
112
113    pub fn version(&self) -> u64 {
114        self.version.load(Ordering::Acquire)
115    }
116
117    pub fn ron(&self) -> String {
118        lock_mutex(&self.ron).clone()
119    }
120
121    fn set(&self, ron: &str) {
122        *lock_mutex(&self.ron) = ron.to_string();
123        self.version.fetch_add(1, Ordering::AcqRel);
124    }
125}
126
127type EffectiveConfigRegistry = HashMap<TypeId, &'static EffectiveConfigEntry>;
128
129static EFFECTIVE_CONFIGS: OnceLock<Mutex<EffectiveConfigRegistry>> = OnceLock::new();
130
131#[cfg(feature = "std")]
132fn effective_config_registry() -> &'static Mutex<EffectiveConfigRegistry> {
133    EFFECTIVE_CONFIGS.get_or_init(|| Mutex::new(HashMap::new()))
134}
135
136#[cfg(not(feature = "std"))]
137fn effective_config_registry() -> &'static Mutex<EffectiveConfigRegistry> {
138    EFFECTIVE_CONFIGS.call_once(|| Mutex::new(HashMap::new()))
139}
140
141pub fn effective_config_entry<T: 'static>(default_ron: &str) -> &'static EffectiveConfigEntry {
142    let registry = effective_config_registry();
143    let mut registry = lock_mutex(registry);
144    if let Some(entry) = registry.get(&TypeId::of::<T>()) {
145        return entry;
146    }
147
148    let entry = Box::leak(Box::new(EffectiveConfigEntry::new(default_ron)));
149    registry.insert(TypeId::of::<T>(), entry);
150    entry
151}
152
153pub fn set_effective_config_ron<T: 'static>(ron: &str) {
154    effective_config_entry::<T>(ron).set(ron);
155}
156
157pub fn with_codec_for_encode<C, R, B, F>(
158    state: &'static CodecState<C>,
159    config_entry: &EffectiveConfigEntry,
160    build: B,
161    f: F,
162) -> Result<R, EncodeError>
163where
164    B: FnOnce(&str) -> CuResult<C>,
165    F: FnOnce(&mut C) -> Result<R, EncodeError>,
166{
167    let version = config_entry.version();
168    let mut guard = lock_mutex(&state.inner);
169    if guard
170        .as_ref()
171        .is_none_or(|(cached_version, _)| *cached_version != version)
172    {
173        let effective_config_ron = config_entry.ron();
174        let codec = build(&effective_config_ron)
175            .map_err(|err| EncodeError::OtherString(err.to_string()))?;
176        *guard = Some((version, codec));
177    }
178    let (_, codec) = guard
179        .as_mut()
180        .expect("codec state must be initialized after build");
181    f(codec)
182}
183
184pub fn with_codec_for_decode<C, R, B, F>(
185    state: &'static CodecState<C>,
186    config_entry: &EffectiveConfigEntry,
187    build: B,
188    f: F,
189) -> Result<R, DecodeError>
190where
191    B: FnOnce(&str) -> CuResult<C>,
192    F: FnOnce(&mut C) -> Result<R, DecodeError>,
193{
194    let version = config_entry.version();
195    let mut guard = lock_mutex(&state.inner);
196    if guard
197        .as_ref()
198        .is_none_or(|(cached_version, _)| *cached_version != version)
199    {
200        let effective_config_ron = config_entry.ron();
201        let codec = build(&effective_config_ron)
202            .map_err(|err| DecodeError::OtherString(err.to_string()))?;
203        *guard = Some((version, codec));
204    }
205    let (_, codec) = guard
206        .as_mut()
207        .expect("codec state must be initialized after build");
208    f(codec)
209}
210
211pub fn resolve_task_output_codec<'a>(
212    config: &'a CuConfig,
213    mission_id: Option<&str>,
214    task_id: &str,
215    msg_type: &str,
216) -> CuResult<Option<&'a LoggingCodecSpec>> {
217    let node = config.find_task_node(mission_id, task_id).ok_or_else(|| {
218        CuError::from(format!(
219            "Could not find task '{task_id}' while resolving log codec for '{msg_type}'."
220        ))
221    })?;
222
223    let codec_id = node
224        .get_logging()
225        .and_then(|logging| logging.codec_for_msg_type(msg_type));
226    let Some(codec_id) = codec_id else {
227        return Ok(None);
228    };
229
230    config
231        .find_logging_codec_spec(codec_id)
232        .map(Some)
233        .ok_or_else(|| {
234            CuError::from(format!(
235                "Task '{task_id}' binds output '{msg_type}' to unknown logging codec '{codec_id}'."
236            ))
237        })
238}
239
240pub fn instantiate_codec<C, P>(
241    effective_config_ron: &str,
242    mission_id: Option<&str>,
243    task_id: &str,
244    msg_type: &str,
245    expected_type_path: &str,
246) -> CuResult<C>
247where
248    C: CuLogCodec<P>,
249    P: CuMsgPayload,
250{
251    let config = CuConfig::deserialize_ron(effective_config_ron)?;
252    let spec = resolve_task_output_codec(&config, mission_id, task_id, msg_type)?.ok_or_else(
253        || {
254            CuError::from(format!(
255                "Task '{task_id}' output '{msg_type}' has no configured logging codec in the effective config."
256            ))
257        },
258    )?;
259
260    if spec.type_ != expected_type_path {
261        return Err(CuError::from(format!(
262            "Task '{task_id}' output '{msg_type}' resolved logging codec type '{}' but '{}' was compiled for this slot.",
263            spec.type_, expected_type_path
264        )));
265    }
266
267    let codec_config = deserialize_codec_config::<C, P>(spec.config.as_ref())?;
268    C::new(codec_config)
269}
270
271pub fn deserialize_codec_config<C, P>(config: Option<&ComponentConfig>) -> CuResult<C::Config>
272where
273    C: CuLogCodec<P>,
274    P: CuMsgPayload,
275{
276    match config {
277        Some(config) => config.deserialize_into::<C::Config>().map_err(|err| {
278            CuError::from(format!(
279                "Failed to deserialize logging codec config for payload '{}': {err}",
280                core::any::type_name::<P>()
281            ))
282        }),
283        None => Ok(C::Config::default()),
284    }
285}
286
287pub fn encode_msg_with_codec<T, C, E>(
288    msg: &CuMsg<T>,
289    codec: &mut C,
290    encoder: &mut E,
291) -> Result<(), EncodeError>
292where
293    T: CuMsgPayload,
294    C: CuLogCodec<T>,
295    E: Encoder,
296{
297    match msg.payload() {
298        None => {
299            0u8.encode(encoder)?;
300        }
301        Some(payload) => {
302            1u8.encode(encoder)?;
303            let encoded_start = observed_encode_bytes();
304            let handle_start = crate::monitoring::current_payload_handle_bytes();
305            let source_handle_bytes = codec.source_payload_handle_bytes(payload);
306            if source_handle_bytes > 0 {
307                crate::monitoring::record_payload_handle_bytes(source_handle_bytes);
308            }
309            codec.encode_payload(payload, encoder)?;
310            let encoded_bytes = observed_encode_bytes().saturating_sub(encoded_start);
311            let handle_bytes =
312                crate::monitoring::current_payload_handle_bytes().saturating_sub(handle_start);
313            crate::monitoring::record_current_slot_payload_io_stats(
314                core::mem::size_of::<T>(),
315                encoded_bytes,
316                handle_bytes,
317            );
318        }
319    }
320    msg.tov.encode(encoder)?;
321    msg.metadata.encode(encoder)?;
322    Ok(())
323}
324
325pub fn decode_msg_with_codec<T, C, D>(
326    decoder: &mut D,
327    codec: &mut C,
328) -> Result<CuMsg<T>, DecodeError>
329where
330    T: CuMsgPayload,
331    C: CuLogCodec<T>,
332    D: Decoder<Context = ()>,
333{
334    let present: u8 = Decode::decode(decoder)?;
335    let payload = match present {
336        0 => None,
337        1 => Some(codec.decode_payload(decoder)?),
338        value => {
339            return Err(DecodeError::OtherString(format!(
340                "Invalid CuMsg presence tag {value} for payload '{}'",
341                core::any::type_name::<T>()
342            )));
343        }
344    };
345    let tov: Tov = Decode::decode(decoder)?;
346    let metadata: CuMsgMetadata = Decode::decode(decoder)?;
347    Ok(CuMsg::from_parts(payload, tov, metadata))
348}
349
/// Decodes the next entry of type `T` from `src`.
///
/// Returns `Ok(None)` when decoding stops because the input ended: either the
/// decoder reports `UnexpectedEnd`, or the underlying I/O error kind is
/// `UnexpectedEof`.
///
/// # Errors
/// Any other decode failure is wrapped in a `CuError`.
#[cfg(feature = "std")]
fn read_next_entry<T: Decode<()>>(src: &mut impl Read) -> CuResult<Option<T>> {
    let failure = match decode_from_std_read::<T, _, _>(src, standard()) {
        Ok(entry) => return Ok(Some(entry)),
        Err(failure) => failure,
    };

    let reached_end = match &failure {
        DecodeError::UnexpectedEnd { .. } => true,
        DecodeError::Io { inner, .. } => inner.kind() == std::io::ErrorKind::UnexpectedEof,
        _ => false,
    };

    if reached_end {
        Ok(None)
    } else {
        Err(CuError::new_with_cause(
            "Failed to decode runtime lifecycle entry while loading effective log config",
            failure,
        ))
    }
}
364
/// Reads the effective config RON recorded in the Copper log at `log_base`.
///
/// Scans the runtime lifecycle records in order and returns the
/// `effective_config_ron` of the first `Instantiated` event. Returns
/// `Ok(None)` when the records end without such an event.
///
/// # Errors
/// Fails when the log cannot be opened, when the built logger is not the
/// readable variant, or when a lifecycle record cannot be decoded.
#[cfg(feature = "std")]
pub fn read_effective_config_ron_from_log(log_base: &Path) -> CuResult<Option<String>> {
    let opened = UnifiedLoggerBuilder::new().file_base_name(log_base).build();
    let logger = match opened {
        Ok(logger) => logger,
        Err(err) => {
            return Err(CuError::new_with_cause(
                &format!(
                    "Failed to open Copper log '{}' while loading effective log config",
                    log_base.display()
                ),
                err,
            ));
        }
    };

    let read_logger = match logger {
        UnifiedLogger::Read(read_logger) => read_logger,
        _ => {
            return Err(CuError::from(
                "Expected readable unified logger while loading effective log config",
            ));
        }
    };

    let mut reader =
        UnifiedLoggerIOReader::new(read_logger, cu29_traits::UnifiedLogType::RuntimeLifecycle);
    loop {
        let Some(record) = read_next_entry::<RuntimeLifecycleRecord>(&mut reader)? else {
            return Ok(None);
        };
        if let RuntimeLifecycleEvent::Instantiated {
            effective_config_ron,
            ..
        } = record.event
        {
            return Ok(Some(effective_config_ron));
        }
    }
}
399
/// Loads the effective config RON from the log at `log_base` and, when one is
/// found, installs it as the effective config for type `T`.
///
/// Returns the RON that was read, or `None` when the log held none (in which
/// case nothing is installed).
///
/// # Errors
/// Propagates any failure from reading the log.
#[cfg(feature = "std")]
pub fn seed_effective_config_from_log<T: 'static>(log_base: &Path) -> CuResult<Option<String>> {
    let seeded = read_effective_config_ron_from_log(log_base)?;
    if let Some(ron) = seeded.as_deref() {
        set_effective_config_ron::<T>(ron);
    }
    Ok(seeded)
}