cu29-runtime 0.15.0

Copper Runtime Runtime crate. Copper is an engine for robotics.
Documentation
#![cfg(all(test, feature = "std"))]

use bincode::{Decode, Encode, config::standard, encode_to_vec};
use cu29::bincode::de::Decoder;
use cu29::bincode::enc::Encoder;
use cu29::bincode::error::{DecodeError, EncodeError};
use cu29::prelude::copper_runtime;
use cu29::prelude::*;
use cu29::simulation::recorded_copperlist_timestamp;
use cu29_export::{copperlists_reader, keyframes_reader};
use cu29_unifiedlog::memmap::{MmapSectionStorage, MmapUnifiedLoggerWrite};
use cu29_unifiedlog::{UnifiedLogger, UnifiedLoggerBuilder, UnifiedLoggerIOReader};
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::Path;
use std::sync::{Arc, Mutex};

#[derive(Default, Debug, Clone, Encode, Decode, Serialize, Deserialize, Reflect)]
struct CounterMsg {
    value: u32,
}

#[derive(Default, Debug, Clone, Encode, Decode, Serialize, Deserialize, Reflect)]
struct AccumMsg {
    sum: u32,
}

#[derive(Reflect)]
struct CounterSrc {
    next: u32,
}

impl Freezable for CounterSrc {
    fn freeze<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
        Encode::encode(&self.next, encoder)
    }

    fn thaw<D: Decoder>(&mut self, decoder: &mut D) -> Result<(), DecodeError> {
        self.next = Decode::decode(decoder)?;
        Ok(())
    }
}

impl CuSrcTask for CounterSrc {
    type Resources<'r> = ();
    type Output<'m> = output_msg!(CounterMsg);

    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self> {
        Ok(Self { next: 0 })
    }

    fn process(&mut self, _ctx: &CuContext, output: &mut Self::Output<'_>) -> CuResult<()> {
        self.next += 1;
        output.set_payload(CounterMsg { value: self.next });
        Ok(())
    }
}

#[derive(Reflect)]
struct Accumulator {
    sum: u32,
}

impl Freezable for Accumulator {
    fn freeze<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
        Encode::encode(&self.sum, encoder)
    }

    fn thaw<D: Decoder>(&mut self, decoder: &mut D) -> Result<(), DecodeError> {
        self.sum = Decode::decode(decoder)?;
        Ok(())
    }
}

impl CuTask for Accumulator {
    type Resources<'r> = ();
    type Input<'m> = input_msg!(CounterMsg);
    type Output<'m> = output_msg!(AccumMsg);

    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self> {
        Ok(Self { sum: 0 })
    }

    fn process(
        &mut self,
        _ctx: &CuContext,
        input: &Self::Input<'_>,
        output: &mut Self::Output<'_>,
    ) -> CuResult<()> {
        if let Some(msg) = input.payload() {
            self.sum += msg.value;
            output.set_payload(AccumMsg { sum: self.sum });
        } else {
            output.clear_payload();
        }
        Ok(())
    }
}

#[derive(Reflect)]
struct SpySink {
    last: Option<u32>,
}

impl Freezable for SpySink {
    fn freeze<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
        Encode::encode(&self.last, encoder)
    }

    fn thaw<D: Decoder>(&mut self, decoder: &mut D) -> Result<(), DecodeError> {
        self.last = Decode::decode(decoder)?;
        Ok(())
    }
}

impl CuSinkTask for SpySink {
    type Resources<'r> = ();
    type Input<'m> = input_msg!(AccumMsg);

    fn new(_config: Option<&ComponentConfig>, _resources: Self::Resources<'_>) -> CuResult<Self> {
        Ok(Self { last: None })
    }

    fn process(&mut self, _ctx: &CuContext, input: &Self::Input<'_>) -> CuResult<()> {
        self.last = input.payload().map(|payload| payload.sum);
        Ok(())
    }
}

#[copper_runtime(config = "tests/replay_primitives_config.ron", sim_mode = true)]
struct ReplayApp {}

fn build_logger(path: &Path) -> CuResult<Arc<Mutex<MmapUnifiedLoggerWrite>>> {
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)
            .map_err(|e| cu29::CuError::new_with_cause("create log dir failed", e))?;
    }
    let UnifiedLogger::Write(writer) = UnifiedLoggerBuilder::new()
        .write(true)
        .create(true)
        .preallocated_size(16 * 1024 * 1024)
        .file_base_name(path)
        .build()
        .map_err(|e| cu29::CuError::new_with_cause("logger init failed", e))?
    else {
        return Err(cu29::CuError::from("logger builder did not return writer"));
    };
    Ok(Arc::new(Mutex::new(writer)))
}

fn read_first_copperlist<P: CopperListTuple>(path: &Path) -> CuResult<CopperList<P>> {
    let UnifiedLogger::Read(read_logger) = UnifiedLoggerBuilder::new()
        .file_base_name(path)
        .build()
        .map_err(|e| cu29::CuError::new_with_cause("open copperlist log failed", e))?
    else {
        return Err(cu29::CuError::from("logger builder did not return reader"));
    };
    let mut reader = UnifiedLoggerIOReader::new(read_logger, UnifiedLogType::CopperList);
    copperlists_reader::<P>(&mut reader)
        .next()
        .ok_or_else(|| cu29::CuError::from("recorded log did not contain a copperlist"))
}

fn read_first_keyframe(path: &Path) -> CuResult<KeyFrame> {
    let UnifiedLogger::Read(read_logger) = UnifiedLoggerBuilder::new()
        .file_base_name(path)
        .build()
        .map_err(|e| cu29::CuError::new_with_cause("open keyframe log failed", e))?
    else {
        return Err(cu29::CuError::from("logger builder did not return reader"));
    };
    let mut reader = UnifiedLoggerIOReader::new(read_logger, UnifiedLogType::FrozenTasks);
    keyframes_reader(&mut reader)
        .next()
        .ok_or_else(|| cu29::CuError::from("recorded log did not contain a keyframe"))
}

fn encode_bytes<T: Encode>(value: &T) -> Vec<u8> {
    encode_to_vec(value, standard()).expect("encode value for deterministic comparison")
}

fn record_reference_run(
    log_path: &Path,
) -> CuResult<(CopperList<default::CuStampedDataSet>, KeyFrame)> {
    let logger = build_logger(log_path)?;
    let (clock, _clock_mock) = RobotClock::mock();
    let mut noop = |_step: default::SimStep<'_>| SimOverride::ExecuteByRuntime;

    let mut app = ReplayApp::builder()
        .with_clock(clock)
        .with_logger::<MmapSectionStorage, MmapUnifiedLoggerWrite>(logger)
        .with_sim_callback(&mut noop)
        .build()?;

    app.start_all_tasks(&mut noop)?;
    app.run_one_iteration(&mut noop)?;
    app.stop_all_tasks(&mut noop)?;

    drop(app);

    Ok((
        read_first_copperlist::<default::CuStampedDataSet>(log_path)?,
        read_first_keyframe(log_path)?,
    ))
}

fn replay_run(
    log_path: &Path,
    recorded_cl: &CopperList<default::CuStampedDataSet>,
    keyframe: Option<&KeyFrame>,
) -> CuResult<(CopperList<default::CuStampedDataSet>, KeyFrame, u64)> {
    let logger = build_logger(log_path)?;
    let (clock, clock_mock) = RobotClock::mock();
    let mut noop = |_step: default::SimStep<'_>| SimOverride::ExecuteByRuntime;

    let mut app = ReplayApp::builder()
        .with_clock(clock)
        .with_logger::<MmapSectionStorage, MmapUnifiedLoggerWrite>(logger)
        .with_sim_callback(&mut noop)
        .build()?;

    app.start_all_tasks(&mut noop)?;
    app.replay_recorded_copperlist(&clock_mock, recorded_cl, keyframe)?;
    app.stop_all_tasks(&mut noop)?;

    let clock_value = clock_mock.value();
    drop(app);

    Ok((
        read_first_copperlist::<default::CuStampedDataSet>(log_path)?,
        read_first_keyframe(log_path)?,
        clock_value,
    ))
}

#[test]
fn replay_recorded_copperlist_reproduces_copperlist_and_keyframe() -> CuResult<()> {
    let temp_dir = tempfile::tempdir()
        .map_err(|e| cu29::CuError::new_with_cause("create temp dir failed", e))?;
    let record_path = temp_dir.path().join("recorded.copper");
    let replay_path = temp_dir.path().join("replayed.copper");

    let (recorded_cl, recorded_kf) = record_reference_run(&record_path)?;
    let (replayed_cl, replayed_kf, replay_clock) =
        replay_run(&replay_path, &recorded_cl, Some(&recorded_kf))?;

    assert_eq!(encode_bytes(&replayed_cl), encode_bytes(&recorded_cl));
    assert_eq!(encode_bytes(&replayed_kf), encode_bytes(&recorded_kf));
    assert_eq!(replay_clock, recorded_kf.timestamp.as_nanos());

    Ok(())
}

#[test]
fn replay_recorded_copperlist_without_keyframe_uses_recorded_timestamp() -> CuResult<()> {
    let temp_dir = tempfile::tempdir()
        .map_err(|e| cu29::CuError::new_with_cause("create temp dir failed", e))?;
    let record_path = temp_dir.path().join("recorded_no_kf.copper");
    let replay_path = temp_dir.path().join("replayed_no_kf.copper");

    let (recorded_cl, _recorded_kf) = record_reference_run(&record_path)?;
    let expected_timestamp = recorded_copperlist_timestamp(&recorded_cl)
        .expect("reference copperlist should contain a process timestamp")
        .as_nanos();

    let (replayed_cl, _replayed_kf, replay_clock) = replay_run(&replay_path, &recorded_cl, None)?;

    assert_eq!(encode_bytes(&replayed_cl), encode_bytes(&recorded_cl));
    assert_eq!(replay_clock, expected_timestamp);

    Ok(())
}

#[test]
fn builder_propagates_instance_id_into_runtime() -> CuResult<()> {
    let temp_dir = tempfile::tempdir()
        .map_err(|e| cu29::CuError::new_with_cause("create temp dir failed", e))?;
    let log_path = temp_dir.path().join("instance_id_runtime.copper");
    let logger = build_logger(&log_path)?;
    let (clock, _clock_mock) = RobotClock::mock();
    let mut noop = |_step: default::SimStep<'_>| SimOverride::ExecuteByRuntime;

    let mut app = ReplayApp::builder()
        .with_clock(clock)
        .with_logger::<MmapSectionStorage, MmapUnifiedLoggerWrite>(logger)
        .with_instance_id(42)
        .with_sim_callback(&mut noop)
        .build()?;

    assert_eq!(app.copper_runtime_mut().instance_id(), 42);
    Ok(())
}