roplat 0.2.0

roplat: just a robot operation system
Documentation
use std::fmt::Debug;
use std::future::Future;
use std::marker::PhantomData;
use std::time::Duration;

use serde::Serialize;
use serde::de::DeserializeOwned;

use crate::RoplatError;
use crate::rhythm::Rhythm;

use super::frame::FrameRecord;

// ============================================================================
// 回放模式配置
// ============================================================================

/// Feed 校验策略
#[derive(Debug, Clone)]
pub enum FeedCheck {
    /// 不校�?
    None,
    /// 严格字节级比�?
    Exact,
    /// 浮点容差比较
    Tolerance(f64),
}

/// 回放模式
#[derive(Debug, Clone)]
pub enum ReplayMode {
    /// 全量回放:喂录制�?Yield,执行节点,校验 Feed
    Full {
        /// Feed validation strategy.
        tolerance: FeedCheck,
    },
    /// 仅输入回放:喂录制的 Yield,执行节点,不校�?Feed(允许节点逻辑已修改)
    InputOnly,
    /// 输出回放:不执行节点,直接产出录制的 Feed(用于给下游 live 域提供数据)
    OutputOnly,
}

/// 回放时序控制
#[derive(Debug)]
pub enum ReplayTiming {
    /// 全速回放(测试/调试用)
    AsFast,
    /// 按录制时的时间间隔回放(�?live 域同步)
    Realtime,
    /// 倍速回�?
    Scaled(f64),
}

// ============================================================================
// ReplayRhythm
// ============================================================================

/// 回放节律:用录制的帧数据替代真实节律源,驱动节点执行�?
///
/// # 类型参数
///
/// - `Y`: Yield 类型(录制时节律给节点的输入�?
/// - `D`: Feed 类型(录制时节点返回给节律的输出�?
///
/// # 示例
///
/// ```no_run
/// # use roplat::rhythm::replay::*;
/// # use roplat::rhythm::Rhythm;
/// # async fn example() {
/// let rlog = RlogReader::open("session.rlog").unwrap();
/// let frames = rlog.frames_for(0); // 获取节律�?0 的帧
///
/// let mut replay = ReplayRhythm::<f64, ()>::new(
///     frames,
///     ReplayMode::InputOnly,
///     ReplayTiming::AsFast,
/// );
///
/// // 像普�?Rhythm 一样使�?
/// // replay.drive(nodes, op).await;
/// # }
/// ```
pub struct ReplayRhythm<Y, D> {
    frames: Vec<FrameRecord>,
    cursor: usize,
    mode: ReplayMode,
    timing: ReplayTiming,
    last_timestamp_ns: Option<u64>,
    _phantom: PhantomData<(Y, D)>,
}

impl<Y, D> ReplayRhythm<Y, D> {
    /// 构造回放节律。
    pub fn new(frames: Vec<FrameRecord>, mode: ReplayMode, timing: ReplayTiming) -> Self {
        Self {
            frames,
            cursor: 0,
            mode,
            timing,
            last_timestamp_ns: None,
            _phantom: PhantomData,
        }
    }

    /// 剩余帧数
    pub fn remaining(&self) -> usize {
        self.frames.len().saturating_sub(self.cursor)
    }

    /// 是否已结�?
    pub fn is_finished(&self) -> bool {
        self.cursor >= self.frames.len()
    }

    /// 重置到开�?
    pub fn rewind(&mut self) {
        self.cursor = 0;
        self.last_timestamp_ns = None;
    }

    /// 计算帧间等待时间
    fn frame_delay(&mut self, timestamp_ns: u64) -> Option<Duration> {
        let delay = match &self.timing {
            ReplayTiming::AsFast => None,
            ReplayTiming::Realtime => {
                if let Some(last) = self.last_timestamp_ns {
                    let diff = timestamp_ns.saturating_sub(last);
                    Some(Duration::from_nanos(diff))
                } else {
                    None
                }
            }
            ReplayTiming::Scaled(factor) => {
                if let Some(last) = self.last_timestamp_ns {
                    let diff = timestamp_ns.saturating_sub(last);
                    let scaled = (diff as f64 / factor) as u64;
                    Some(Duration::from_nanos(scaled))
                } else {
                    None
                }
            }
        };
        self.last_timestamp_ns = Some(timestamp_ns);
        delay
    }
}

/// Feed 校验失败时的信息
#[derive(Debug)]
pub struct FeedMismatch {
    /// 发生校验失败的全局序号。
    pub global_seq: u64,
    /// 对应节律域 ID。
    pub rhythm_id: u16,
}

impl<Y, D> Rhythm for ReplayRhythm<Y, D>
where
    Y: DeserializeOwned + Send,
    D: DeserializeOwned + Serialize + Send,
{
    type Yield = Y;
    type Feed = D;
    type Output = Vec<FeedMismatch>;
    type Error = RoplatError;

    async fn drive<N, F, Fut>(
        &mut self,
        mut nodes: N,
        mut op_domain: F,
    ) -> (Self::Output, N)
    where
        N: Send,
        F: FnMut(N, Self::Yield) -> Fut + Send,
        Fut: Future<Output = (Self::Feed, N)> + Send,
    {
        let mut mismatches = Vec::new();

        while self.cursor < self.frames.len() {
            // 预先拷贝帧数据,避免不可变借用与可变借用冲突
            let frame_seq = self.frames[self.cursor].global_seq;
            let frame_rid = self.frames[self.cursor].rhythm_id;
            let frame_ts = self.frames[self.cursor].timestamp_ns;
            let yield_data = self.frames[self.cursor].yield_data.clone();
            let feed_data = self.frames[self.cursor].feed_data.clone();
            self.cursor += 1;

            // 时序控制
            if let Some(delay) = self.frame_delay(frame_ts)
                && !delay.is_zero()
            {
                tokio::time::sleep(delay).await;
            }

            // 反序列化 Yield
            let yield_val: Y = bincode::deserialize(&yield_data)
                .expect("ReplayRhythm: failed to deserialize yield data; type mismatch?");

            if let ReplayMode::OutputOnly = &self.mode {
                // 不执行节点,跳过
                // 注意:OutputOnly 模式�?Feed 通过其他途径传递给下游
                continue;
            }

            // 执行节点
            let (feed_val, returned) = op_domain(nodes, yield_val).await;
            nodes = returned;

            // 校验 Feed
            if let ReplayMode::Full { tolerance } = &self.mode {
                let matched = match tolerance {
                    FeedCheck::None => true,
                    FeedCheck::Exact => {
                        let actual_bytes = bincode::serialize(&feed_val)
                            .expect("ReplayRhythm: feed serialize failed");
                        actual_bytes == feed_data
                    }
                    FeedCheck::Tolerance(eps) => {
                        let actual_bytes = bincode::serialize(&feed_val)
                            .expect("ReplayRhythm: feed serialize failed");
                        if actual_bytes.len() != feed_data.len() {
                            false
                        } else {
                            tolerance_check(&actual_bytes, &feed_data, *eps)
                        }
                    }
                };

                if !matched {
                    mismatches.push(FeedMismatch { global_seq: frame_seq, rhythm_id: frame_rid });
                }
            }
        }

        (mismatches, nodes)
    }
}

/// 简易浮点容差检查:将字节序列视�?f64 数组逐一比较
fn tolerance_check(a: &[u8], b: &[u8], eps: f64) -> bool {
    if a.len() != b.len() {
        return false;
    }

    // 如果长度不是 8 的整数倍,做字节级比较
    if !a.len().is_multiple_of(8) {
        return a == b;
    }

    for chunk_idx in (0..a.len()).step_by(8) {
        let va = f64::from_le_bytes(a[chunk_idx..chunk_idx + 8].try_into().unwrap());
        let vb = f64::from_le_bytes(b[chunk_idx..chunk_idx + 8].try_into().unwrap());
        if (va - vb).abs() > eps {
            return false;
        }
    }
    true
}