roplat 0.2.0

roplat: just a robot operation system
Documentation
use std::future::Future;
use std::io;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;

use futures::future::Either;
use serde::Serialize;

use crate::rhythm::Rhythm;

use super::frame::RhythmMeta;
use super::orchestrator::ReplayOrchestrator;
use super::recording::RecordingRhythm;
use super::replay::{ReplayMode, ReplayRhythm, ReplayTiming};
use super::rlog::{FrameWriter, RlogReader};

// ============================================================================
// SessionRecorder
// ============================================================================

/// 录制会话:简化多节律域录制的设置�?
///
/// 封装 `FrameWriter` 和全局序号计数器,提供便捷的节律包�?API�?
///
/// # 示例
///
/// ```no_run
/// use roplat::rhythm::replay::*;
/// use roplat::rhythm::*;
/// use std::time::Duration;
///
/// # async fn example() {
/// let recorder = SessionRecorder::create("session.rlog", &[
///     RhythmMeta {
///         rhythm_id: 0,
///         name: "timer".into(),
///         yield_type_name: "()".into(),
///         feed_type_name: "()".into(),
///     },
/// ]).unwrap();
///
/// let timer = SysTimer::new(Duration::from_millis(10));
/// let mut recording_timer = recorder.wrap(timer, 0);
/// // recording_timer.drive(...).await;
/// recorder.flush();
/// # }
/// ```
pub struct SessionRecorder {
    writer: Arc<FrameWriter>,
    global_seq: Arc<AtomicU64>,
}

impl SessionRecorder {
    /// 创建录制会话,写�?.rlog 头部
    pub fn create(path: impl AsRef<Path>, rhythms: &[RhythmMeta]) -> io::Result<Self> {
        let writer = FrameWriter::create(path, rhythms)?;
        let global_seq = Arc::new(AtomicU64::new(0));
        Ok(Self { writer, global_seq })
    }

    /// �?`ROPLAT_RECORD` 环境变量创建录制会话�?
    ///
    /// 若环境变量未设置则返�?`None`�?
    pub fn from_env(rhythms: &[RhythmMeta]) -> Option<Self> {
        let path = std::env::var("ROPLAT_RECORD").ok()?;
        Some(Self::create(path, rhythms).expect("SessionRecorder: 无法创建录制文件"))
    }

    /// 包装节律为录制节�?
    pub fn wrap<R>(&self, rhythm: R, rhythm_id: u16) -> RecordingRhythm<R> {
        RecordingRhythm::new(
            rhythm,
            rhythm_id,
            self.writer.clone(),
            self.global_seq.clone(),
        )
    }

    /// 条件包装:若提供 recorder 则录制,否则直通�?
    ///
    /// 用于在同一份代码中同时支持录制和非录制模式�?
    ///
    /// ```no_run
    /// # use roplat::rhythm::replay::*;
    /// # use roplat::rhythm::*;
    /// # use std::time::Duration;
    /// let recorder = SessionRecorder::from_env(&[/* rhythms */]);
    /// let timer = SysTimer::new(Duration::from_millis(10));
    /// let mut timer = SessionRecorder::maybe_wrap(recorder.as_ref(), timer, 0);
    /// // timer 实现 Rhythm,无论是否在录制
    /// ```
    pub fn maybe_wrap<R>(
        recorder: Option<&Self>,
        rhythm: R,
        rhythm_id: u16,
    ) -> MaybeRecordingRhythm<R> {
        match recorder {
            Some(rec) => MaybeRecordingRhythm::Recording(rec.wrap(rhythm, rhythm_id)),
            None => MaybeRecordingRhythm::Passthrough(rhythm),
        }
    }

    /// 显式 flush(`SessionRecorder` �?drop 时也会自�?flush�?
    pub fn flush(&self) {
        self.writer.flush();
    }
}

// ============================================================================
// SessionPlayer
// ============================================================================

/// 回放会话:简�?.rlog 文件的加载和回放设置�?
///
/// # 示例
///
/// ```no_run
/// use roplat::rhythm::replay::*;
/// use roplat::rhythm::Rhythm;
///
/// # async fn example() {
/// let player = SessionPlayer::open("session.rlog").unwrap();
///
/// // 查看录制信息
/// println!("帧数: {}", player.frame_count());
/// for id in player.rhythm_ids() {
///     let meta = player.rhythm_meta(id).unwrap();
///     println!("  [{}] {}", id, meta.name);
/// }
///
/// // 创建回放节律
/// let mut replay: ReplayRhythm<usize, ()> = player.replay_rhythm(
///     0,
///     ReplayMode::InputOnly,
///     ReplayTiming::AsFast,
/// );
/// // replay.drive(...).await;
/// # }
/// ```
pub struct SessionPlayer {
    rlog: RlogReader,
}

impl SessionPlayer {
    /// 打开 .rlog 文件
    pub fn open(path: impl AsRef<Path>) -> io::Result<Self> {
        let rlog = RlogReader::open(path)?;
        Ok(Self { rlog })
    }

    /// �?`ROPLAT_REPLAY` 环境变量打开回放会话�?
    ///
    /// 若环境变量未设置则返�?`None`�?
    pub fn from_env() -> Option<Self> {
        let path = std::env::var("ROPLAT_REPLAY").ok()?;
        Some(Self::open(path).expect("SessionPlayer: 无法打开 .rlog 文件"))
    }

    /// 创建指定节律域的回放节律
    pub fn replay_rhythm<Y, D>(
        &self,
        rhythm_id: u16,
        mode: ReplayMode,
        timing: ReplayTiming,
    ) -> ReplayRhythm<Y, D> {
        let frames = self.rlog.frames_for(rhythm_id);
        ReplayRhythm::new(frames, mode, timing)
    }

    /// 创建全局回放编排器(多域同步回放�?
    pub fn orchestrator(&self) -> ReplayOrchestrator {
        ReplayOrchestrator::from_rlog(&self.rlog)
    }

    /// 获取所有节律域 ID
    pub fn rhythm_ids(&self) -> Vec<u16> {
        let mut ids: Vec<u16> = self.rlog.frames.iter().map(|f| f.rhythm_id).collect();
        ids.sort_unstable();
        ids.dedup();
        ids
    }

    /// 获取节律域元信息
    pub fn rhythm_meta(&self, rhythm_id: u16) -> Option<&RhythmMeta> {
        self.rlog.rhythm_meta(rhythm_id)
    }

    /// 帧总数
    pub fn frame_count(&self) -> usize {
        self.rlog.frame_count()
    }

    /// 获取底层 RlogReader
    pub fn rlog(&self) -> &RlogReader {
        &self.rlog
    }
}

// ============================================================================
// MaybeRecordingRhythm
// ============================================================================

/// 条件录制节律:运行时决定是直通还是录制�?
///
/// 两个变体都实现相同的 `Rhythm` 关联类型,因此可以在不改�?
/// 下游调度代码的前提下透明切换录制/非录制模式�?
///
/// �?[`SessionRecorder::maybe_wrap()`] 创建�?
pub enum MaybeRecordingRhythm<R> {
    /// 直通:不录制,行为与原始节律完全一�?
    Passthrough(R),
    /// 录制:透明拦截每帧�?Yield/Feed 并写�?.rlog
    Recording(RecordingRhythm<R>),
}

impl<R> Rhythm for MaybeRecordingRhythm<R>
where
    R: Rhythm + Send,
    R::Yield: Serialize + Clone + Send,
    R::Feed: Serialize + Send,
    R::Output: Send,
    R::Error: Send,
{
    type Yield = R::Yield;
    type Feed = R::Feed;
    type Output = R::Output;
    type Error = R::Error;

    fn drive<N, F, Fut>(
        &mut self,
        nodes: N,
        op_domain: F,
    ) -> impl Future<Output = (Self::Output, N)> + Send
    where
        N: Send,
        F: FnMut(N, Self::Yield) -> Fut + Send,
        Fut: Future<Output = (Self::Feed, N)> + Send,
    {
        match self {
            Self::Passthrough(r) => Either::Left(r.drive(nodes, op_domain)),
            Self::Recording(r) => Either::Right(r.drive(nodes, op_domain)),
        }
    }
}