use log::*;
use std::time::Instant;
use crate::connection::path::PathMap;
use crate::connection::space::BufferType;
use crate::connection::space::PacketNumSpaceMap;
use crate::connection::space::SentPacket;
use crate::connection::stream::StreamMap;
use crate::frame::Frame;
use crate::multipath_scheduler::MultipathScheduler;
use crate::Error;
use crate::MultipathConfig;
use crate::Result;
pub struct RedundantScheduler {}
impl RedundantScheduler {
pub fn new(_conf: &MultipathConfig) -> RedundantScheduler {
RedundantScheduler {}
}
}
impl MultipathScheduler for RedundantScheduler {
fn on_select(
&mut self,
paths: &mut PathMap,
spaces: &mut PacketNumSpaceMap,
streams: &mut StreamMap,
) -> Result<usize> {
for (pid, path) in paths.iter_mut() {
if !path.active() || !path.recovery.can_send() {
continue;
}
return Ok(pid);
}
Err(Error::Done)
}
fn on_sent(
&mut self,
packet: &SentPacket,
now: Instant,
path_id: usize,
paths: &mut PathMap,
spaces: &mut PacketNumSpaceMap,
streams: &mut StreamMap,
) {
if packet.buffer_flags.has_buffered() {
return;
}
for (pid, path) in paths.iter() {
if pid == path_id || !path.active() {
continue;
}
let space = match spaces.get_mut(path.space_id) {
Some(space) => space,
None => return,
};
for frame in &packet.frames {
if let Frame::Stream { .. } = frame {
debug!("RedundantScheduler: inject {:?} on path {:?}", frame, pid);
space.buffered.push_back(frame.clone(), BufferType::High);
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::multipath_scheduler::tests::MultipathTester;
#[test]
fn redundant_select() -> Result<()> {
let mut t = MultipathTester::new()?;
t.add_path("127.0.0.1:443", "127.0.0.2:8443", 50)?;
let mut s = RedundantScheduler {};
assert_eq!(s.on_select(&mut t.paths, &mut t.spaces, &mut t.streams)?, 0);
t.set_path_active(0, false)?;
assert_eq!(s.on_select(&mut t.paths, &mut t.spaces, &mut t.streams)?, 1);
t.set_path_active(1, false)?;
assert_eq!(
s.on_select(&mut t.paths, &mut t.spaces, &mut t.streams),
Err(Error::Done)
);
Ok(())
}
}