Struct portus::RunBuilder
source · pub struct RunBuilder<I: Ipc, U, Spawnness> { /* private fields */ }
Expand description
Main execution loop of CCP for the static pipeline use case.
The `run` method blocks ‘forever’; it only returns in two cases:
- The IPC socket is closed.
- An invalid message is received.
Callers must construct a `BackendBuilder`.
Algorithm implementations should:
- Initialize an IPC `BackendBuilder` (depending on the datapath).
- Call `run()` or `spawn()`, passing the `BackendBuilder` `b`. `run()` or `spawn()` create `Arc<AtomicBool>` objects, which are passed into `run_inner` to build the backend, so `spawn()` can create a `CCPHandle` that references this boolean to kill the thread.
Example
Configuration:
use std::collections::HashMap;
use portus::{CongAlg, Flow, Datapath, DatapathInfo, DatapathTrait, Report};
use portus::ipc::Ipc;
use portus::lang::Scope;
use portus::lang::Bin;
use portus::RunBuilder;
use portus::ipc::{BackendBuilder};
// Datapath program source that both example algorithms register under the name "MyProgram".
// As written, it declares a `Report` with a volatile `minrtt` field starting at +infinity,
// folds every `Flow.rtt_sample_us` into `Report.minrtt` with `min`, and issues `(report)`
// followed by `(reset)` once `Micros` exceeds 42000.
const PROG: &str = "
(def (Report
(volatile minrtt +infinity)
))
(when true
(:= Report.minrtt (min Report.minrtt Flow.rtt_sample_us))
)
(when (> Micros 42000)
(report)
(reset)
)";
/// First example algorithm. The wrapped `Scope` is the value returned by
/// `set_program` in `new_flow`, and is later used to look up report fields.
#[derive(Clone, Default)]
struct AlgOne(Scope);
/// Registers `AlgOne` under the name "Default Alg" with a single datapath program.
impl<I: Ipc> CongAlg<I> for AlgOne {
    type Flow = Self;

    /// Name this algorithm is registered under.
    fn name() -> &'static str {
        "Default Alg"
    }

    /// Returns a one-entry map from the program name "MyProgram" to an owned copy of `PROG`.
    fn datapath_programs(&self) -> HashMap<&'static str, String> {
        std::iter::once(("MyProgram", PROG.to_owned())).collect()
    }

    /// Selects "MyProgram" on the new flow's datapath handle and keeps the resulting scope.
    /// Panics if `set_program` returns an error.
    fn new_flow(&self, mut control: Datapath<I>, info: DatapathInfo) -> Self::Flow {
        AlgOne(control.set_program("MyProgram", None).unwrap())
    }
}
/// Per-flow report handling for `AlgOne`.
impl Flow for AlgOne {
    /// Looks up "Report.minrtt" in the report using the stored scope and prints it.
    /// Panics if the field lookup fails.
    fn on_report(&mut self, sock_id: u32, m: Report) {
        let scope = &self.0;
        let minrtt = m.get_field("Report.minrtt", scope).unwrap();
        println!("alg1 minrtt: {:?}", minrtt);
    }
}
/// Second example algorithm. The wrapped `Scope` is the value returned by
/// `set_program` in `new_flow`, and is later used to look up report fields.
#[derive(Clone, Default)]
struct AlgTwo(Scope);
/// Registers `AlgTwo` under the name "Alg2" with a single datapath program.
impl<I: Ipc> CongAlg<I> for AlgTwo {
    type Flow = Self;

    /// Name this algorithm is registered under.
    fn name() -> &'static str {
        "Alg2"
    }

    /// Returns a one-entry map from the program name "MyProgram" to an owned copy of `PROG`.
    fn datapath_programs(&self) -> HashMap<&'static str, String> {
        std::iter::once(("MyProgram", PROG.to_owned())).collect()
    }

    /// Selects "MyProgram" on the new flow's datapath handle and keeps the resulting scope.
    /// Panics if `set_program` returns an error.
    fn new_flow(&self, mut control: Datapath<I>, info: DatapathInfo) -> Self::Flow {
        AlgTwo(control.set_program("MyProgram", None).unwrap())
    }
}
/// Per-flow report handling for `AlgTwo`.
impl Flow for AlgTwo {
    /// Looks up "Report.minrtt" in the report using the stored scope and prints it.
    /// Panics if the field lookup fails.
    fn on_report(&mut self, sock_id: u32, m: Report) {
        let scope = &self.0;
        let minrtt = m.get_field("Report.minrtt", scope).unwrap();
        println!("alg2 minrtt: {:?}", minrtt);
    }
}
// Opens the "portus" IPC socket, wraps it in a `BackendBuilder`, registers the
// example algorithms on a `RunBuilder`, and calls `run()`.
fn main() {
    // Panics with "ipc initialization" if the socket cannot be created.
    let sk = portus::ipc::unix::Socket::<portus::ipc::Blocking>::new("portus")
        .expect("ipc initialization");
    let backend = BackendBuilder { sock: sk };

    let builder = RunBuilder::new(backend)
        .default_alg(AlgOne::default())
        .additional_alg(AlgTwo::default())
        .additional_alg::<AlgOne, _>(None);

    // Optional builder steps before `run()`:
    //   .spawn_thread()     to spawn runtime in a thread
    //   .with_stop_handle() to pass in an Arc<AtomicBool> that will stop the runtime
    builder.run();
}
Implementations§
source§impl<I: Ipc> RunBuilder<I, (), NoSpawn>
impl<I: Ipc> RunBuilder<I, (), NoSpawn>
pub fn new(backend_builder: BackendBuilder<I>) -> Self
source§impl<I: Ipc, S> RunBuilder<I, (), S>
impl<I: Ipc, S> RunBuilder<I, (), S>
sourcepub fn default_alg<A>(self, alg: A) -> RunBuilder<I, AlgListNil<A>, S>
pub fn default_alg<A>(self, alg: A) -> RunBuilder<I, AlgListNil<A>, S>
Set the default congestion control algorithm. This is required to run or spawn anything.
This is the algorithm that will be used if the name the datapath requests doesn’t match anything.
source§impl<I: Ipc, U, S> RunBuilder<I, U, S>
impl<I: Ipc, U, S> RunBuilder<I, U, S>
sourcepub fn additional_alg<A: CongAlg<I>, O: Into<Option<A>>>(
self,
alg: O
) -> RunBuilder<I, AlgList<Option<A>, U>, S>
pub fn additional_alg<A: CongAlg<I>, O: Into<Option<A>>>( self, alg: O ) -> RunBuilder<I, AlgList<Option<A>, U>, S>
Set an additional congestion control algorithm.
If the name duplicates one already given, the later one will win.
pub fn try_additional_alg<A: CongAlg<I>>( self, alg: Option<A> ) -> RunBuilder<I, AlgList<Option<A>, U>, S>
sourcepub fn with_stop_handle(self, handle: Arc<AtomicBool>) -> Self
pub fn with_stop_handle(self, handle: Arc<AtomicBool>) -> Self
Pass an `Arc<AtomicBool>` stop handle.
sourcepub unsafe fn with_raw_stop_handle(self, handle_ptr: *const AtomicBool) -> Self
pub unsafe fn with_raw_stop_handle(self, handle_ptr: *const AtomicBool) -> Self
source§impl<I: Ipc, U> RunBuilder<I, U, NoSpawn>
impl<I: Ipc, U> RunBuilder<I, U, NoSpawn>
sourcepub fn spawn_thread(self) -> RunBuilder<I, U, Spawn>
pub fn spawn_thread(self) -> RunBuilder<I, U, Spawn>
Spawn a thread which will perform the CCP execution loop. Returns a `CCPHandle`, which the caller can use to cause the execution loop to stop.
The `run` method blocks ‘forever’; it only returns in three cases:
- The IPC socket is closed.
- An invalid message is received.
- The caller calls `CCPHandle::kill()`.
See `run` for more information.