// roplat 0.2.0
//
// roplat: just a robot operation system
// Documentation
use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use serde::Serialize;

use crate::rhythm::Rhythm;

use super::frame::FrameRecord;
use super::rlog::FrameWriter;

/// Recording wrapper: transparently wraps any `Rhythm` and, inside `drive`, intercepts the
/// Yield and Feed values of every frame.
///
/// Each intercepted frame is serialized with `bincode` and handed to the shared
/// [`FrameWriter`] as a [`FrameRecord`].
///
/// # Usage
///
/// ```no_run
/// # use roplat::rhythm::replay::*;
/// # use roplat::rhythm::*;
/// # use std::time::Duration;
/// // 1. Create the FrameWriter (several RecordingRhythm instances share the same writer).
/// // NOTE(review): `RecordingRhythm::new` takes an `Arc<FrameWriter>`; confirm that
/// // `FrameWriter::create` returns one, otherwise wrap the writer in `Arc::new`.
/// let writer = FrameWriter::create("session.rlog", &[
///     RhythmMeta {
///         rhythm_id: 0,
///         name: "control_timer".into(),
///         yield_type_name: "EventMeta".into(),
///         feed_type_name: "()".into(),
///     },
/// ]).unwrap();
///
/// let global_seq = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
///
/// // 2. Wrap the real rhythm.
/// let timer = SysTimer::new(Duration::from_millis(10));
/// let mut recording = RecordingRhythm::new(timer, 0, writer, global_seq);
///
/// // 3. Use it like an ordinary Rhythm; the nodes are unaware of the recording.
/// // recording.drive(nodes, op).await;
/// ```
pub struct RecordingRhythm<R> {
    /// The wrapped rhythm; `drive` is delegated to it.
    inner: R,
    /// Identifier stored in the `rhythm_id` field of every recorded frame.
    rhythm_id: u16,
    /// Shared writer that receives one `FrameRecord` per frame.
    writer: Arc<FrameWriter>,
    /// Shared counter that supplies the `global_seq` of every recorded frame.
    seq: Arc<AtomicU64>,
}

impl<R> RecordingRhythm<R> {
    /// 构造录制包装节律。
    pub fn new(inner: R, rhythm_id: u16, writer: Arc<FrameWriter>, seq: Arc<AtomicU64>) -> Self {
        Self { inner, rhythm_id, writer, seq }
    }

    /// 获取内部节律的可变引�?
    pub fn inner_mut(&mut self) -> &mut R {
        &mut self.inner
    }
}

impl<R> Rhythm for RecordingRhythm<R>
where
    R: Rhythm + Send,
    R::Yield: Serialize + Clone + Send,
    R::Feed: Serialize + Send,
    R::Output: Send,
    R::Error: Send,
{
    type Yield = R::Yield;
    type Feed = R::Feed;
    type Output = R::Output;
    type Error = R::Error;

    fn drive<N, F, Fut>(
        &mut self,
        nodes: N,
        mut op_domain: F,
    ) -> impl Future<Output = (Self::Output, N)> + Send
    where
        N: Send,
        F: FnMut(N, Self::Yield) -> Fut + Send,
        Fut: Future<Output = (Self::Feed, N)> + Send,
    {
        let writer = self.writer.clone();
        let seq = self.seq.clone();
        let rid = self.rhythm_id;

        // 用拦截闭包包�?op_domain
        let wrapped = move |nodes: N, yield_val: Self::Yield| {
            let yield_bytes =
                bincode::serialize(&yield_val).expect("RecordingRhythm: yield serialize failed");
            let t = writer.elapsed_ns();

            let fut = op_domain(nodes, yield_val);

            let writer = writer.clone();
            let seq = seq.clone();

            async move {
                let (feed_val, returned_nodes) = fut.await;

                let feed_bytes =
                    bincode::serialize(&feed_val).expect("RecordingRhythm: feed serialize failed");

                writer.write_frame(&FrameRecord {
                    global_seq: seq.fetch_add(1, Ordering::SeqCst),
                    rhythm_id: rid,
                    timestamp_ns: t,
                    yield_data: yield_bytes,
                    feed_data: feed_bytes,
                });

                (feed_val, returned_nodes)
            }
        };

        self.inner.drive(nodes, wrapped)
    }
}