roplat 0.2.0

roplat: just a robot operation system
Documentation
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

use tokio::sync::Notify;

use super::frame::FrameRecord;
use super::rlog::RlogReader;

/// 全量重放编排器:协调多个节律域按全局序号顺序执行。
///
/// 在全量重放模式下,所有节律域都是 ReplayRhythm。每个域在各自的 tokio task 中运行,
/// 但必须按照录制时的全局序号交错执行,以保证确定性。
///
/// # 工作方式
///
/// 1. 编排器根据 .rlog 中的帧数据,为每个域分配帧序列
/// 2. 维护一个全局 `current_seq` 计数器
/// 3. 每个域在执行前检查:当前帧的 `global_seq` 是否等于 `current_seq`
/// 4. 如果不等,等待 Notify 信号
/// 5. 执行完成后递增 `current_seq` 并通知所有域
pub struct ReplayOrchestrator {
    /// 当前应执行的全局序号
    current_seq: Arc<AtomicU64>,
    /// 通知所有域"序号已更新"
    advance_notify: Arc<Notify>,
    /// 各域的帧序列(按全局序号排序)
    domain_frames: HashMap<u16, Vec<FrameRecord>>,
}

impl ReplayOrchestrator {
    /// 从 RlogReader 构建编排器
    pub fn from_rlog(rlog: &RlogReader) -> Self {
        let mut domain_frames: HashMap<u16, Vec<FrameRecord>> = HashMap::new();
        for frame in &rlog.frames {
            domain_frames
                .entry(frame.rhythm_id)
                .or_default()
                .push(frame.clone());
        }
        // 各域内部按全局序号排序
        for frames in domain_frames.values_mut() {
            frames.sort_by_key(|f| f.global_seq);
        }

        let start_seq = rlog.frames.iter().map(|f| f.global_seq).min().unwrap_or(0);

        Self {
            current_seq: Arc::new(AtomicU64::new(start_seq)),
            advance_notify: Arc::new(Notify::new()),
            domain_frames,
        }
    }

    /// 获取指定域的 TurnToken(传给该域的 ReplayRhythm 使用)
    pub fn turn_token(&self, rhythm_id: u16) -> TurnToken {
        TurnToken {
            current_seq: self.current_seq.clone(),
            advance_notify: self.advance_notify.clone(),
            rhythm_id,
        }
    }

    /// 获取指定域的帧序列
    pub fn take_frames(&mut self, rhythm_id: u16) -> Vec<FrameRecord> {
        self.domain_frames.remove(&rhythm_id).unwrap_or_default()
    }

    /// 所有域 ID
    pub fn rhythm_ids(&self) -> Vec<u16> {
        self.domain_frames.keys().copied().collect()
    }
}

/// 域的执行令牌:在每帧执行前 await,等待自己的全局序号轮次。
#[derive(Clone)]
pub struct TurnToken {
    current_seq: Arc<AtomicU64>,
    advance_notify: Arc<Notify>,
    /// 所属域 ID(调试用)
    #[allow(dead_code)]
    rhythm_id: u16,
}

impl TurnToken {
    /// 等待全局序号到达 `expected_seq`,然后执行并推进
    pub async fn wait_turn(&self, expected_seq: u64) {
        loop {
            let current = self.current_seq.load(Ordering::SeqCst);
            if current == expected_seq {
                return;
            }
            // 还没轮到,等通知
            self.advance_notify.notified().await;
        }
    }

    /// 当前帧执行完毕,推进全局序号
    pub fn advance(&self) {
        self.current_seq.fetch_add(1, Ordering::SeqCst);
        self.advance_notify.notify_waiters();
    }
}