smux_rust 0.2.1

A simple multiplexing library for Rust, inspired by xtaci/smux
use crate::frame::Frame;
use tokio::sync::Mutex;
use std::collections::{BinaryHeap, HashMap, VecDeque};

/// 帧类别
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ClassId {
    /// 控制信号(优先)
    Ctrl = 0,
    /// 数据
    Data = 1,
}

/// 写请求
#[derive(Debug, Clone)]
pub struct WriteRequest {
    pub class: ClassId,
    pub frame: Frame,
    pub seq: u32,
}

/// 写结果
#[derive(Debug)]
pub struct WriteResult {
    pub n: usize,
    pub err: Option<String>,
}

/// 流量整形队列
/// 使用轮询调度算法在多个流之间公平分配带宽
pub struct ShaperQueue {
    /// 每个流的请求堆(按类别和序列号排序)
    streams: HashMap<u32, BinaryHeap<WriteRequest>>,
    /// 轮询列表(流ID的循环队列)
    rr_list: VecDeque<u32>,
    /// 下一个要处理的流索引
    next: usize,
    /// 总请求数
    count: usize,
    mu: Mutex<()>,
}

impl ShaperQueue {
    /// 创建新的整形队列
    pub fn new() -> Self {
        Self {
            streams: HashMap::new(),
            rr_list: VecDeque::new(),
            next: 0,
            count: 0,
            mu: Mutex::new(()),
        }
    }

    /// 添加写请求到队列
    pub async fn push(&mut self, req: WriteRequest) {
        let _guard = self.mu.lock().await;
        let sid = req.frame.sid;

        // 如果流不存在,创建新的堆
        if !self.streams.contains_key(&sid) {
            self.streams.insert(sid, BinaryHeap::new());
            self.rr_list.push_back(sid);
            if self.rr_list.len() == 1 {
                self.next = 0;
            }
        }

        // 将请求推入对应流的堆
        if let Some(heap) = self.streams.get_mut(&sid) {
            heap.push(req);
            self.count += 1;
        }
    }

    /// 使用轮询算法从队列中弹出请求
    /// 首先检查所有流中的控制帧,然后按轮询顺序处理数据帧
    pub async fn pop(&mut self) -> Option<WriteRequest> {
        let _guard = self.mu.lock().await;

        if self.rr_list.is_empty() || self.count == 0 {
            return None;
        }

        // 首先查找所有流中的控制帧(全局优先级)
        let start = self.next;
        let mut current = start;
        loop {
            let sid = self.rr_list[current];
            if let Some(heap) = self.streams.get_mut(&sid) {
                // 检查堆顶是否是控制帧
                // BinaryHeap 是最大堆,堆顶是优先级最高的元素
                if !heap.is_empty() {
                    // 创建一个临时副本来检查顶部元素(不修改原堆)
                    let top_req = heap.peek().unwrap();
                    if top_req.class == ClassId::Ctrl {
                        let req = heap.pop().unwrap();
                        self.count -= 1;

                        // 更新下一个索引
                        self.next = (current + 1) % self.rr_list.len();

                        // 如果堆为空,删除流
                        if heap.is_empty() {
                            self.streams.remove(&sid);
                            self.rr_list.remove(current);
                            if self.rr_list.is_empty() {
                                self.next = 0;
                            } else {
                                // 调整索引
                                if self.next >= self.rr_list.len() {
                                    self.next = 0;
                                }
                            }
                        }

                        return Some(req);
                    }
                }
            }

            // 移动到下一个流
            current = (current + 1) % self.rr_list.len();
            if current == start {
                break;
            }
        }

        // 如果没有控制帧,按轮询顺序处理数据帧
        let start = self.next;
        let mut current = start;
        loop {
            let sid = self.rr_list[current];
            if let Some(heap) = self.streams.get_mut(&sid) {
                if let Some(req) = heap.pop() {
                    self.count -= 1;

                    // 更新下一个索引
                    self.next = (current + 1) % self.rr_list.len();

                    // 如果堆为空,删除流
                    if heap.is_empty() {
                        self.streams.remove(&sid);
                        self.rr_list.remove(current);
                        if self.rr_list.is_empty() {
                            self.next = 0;
                        } else {
                            // 调整索引
                            if self.next >= self.rr_list.len() {
                                self.next = 0;
                            }
                        }
                    }

                    return Some(req);
                }
            }

            // 移动到下一个流
            current = (current + 1) % self.rr_list.len();
            if current == start {
                // 完成一轮,没有找到请求
                break;
            }
        }

        None
    }

    /// 检查队列是否为空
    pub async fn is_empty(&self) -> bool {
        let _guard = self.mu.lock().await;
        self.count == 0
    }

    /// 返回队列中的总请求数
    pub async fn len(&self) -> usize {
        let _guard = self.mu.lock().await;
        self.count
    }
}

impl Default for ShaperQueue {
    fn default() -> Self {
        Self::new()
    }
}

// 为 WriteRequest 实现 Ord,以便在 BinaryHeap 中使用
impl Ord for WriteRequest {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // BinaryHeap 是最大堆,所以我们需要反转比较
        // 控制帧(Ctrl=0)应该比数据帧(Data=1)优先
        // 所以我们反转类别比较,使得 Ctrl > Data
        match other.class.cmp(&self.class) {  // 注意这里反转了 self 和 other
            std::cmp::Ordering::Equal => {
                // 相同类别时,按序列号排序(较小的序列号优先)
                other.seq.cmp(&self.seq)
            }
            ord => ord,
        }
    }
}

impl PartialOrd for WriteRequest {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for WriteRequest {
    fn eq(&self, other: &Self) -> bool {
        self.class == other.class && self.seq == other.seq
    }
}

impl Eq for WriteRequest {}