Struct portus::RunBuilder

source ·
pub struct RunBuilder<I: Ipc, U, Spawnness> { /* private fields */ }
Expand description

Main execution loop of CCP for the static pipeline use case. The run method blocks ‘forever’; it only returns in two cases:

  1. The IPC socket is closed.
  2. An invalid message is received.

Callers must construct a BackendBuilder. Algorithm implementations should:

  1. Initialize an IPC BackendBuilder (depending on the datapath).
  2. Call run() (or spawn_thread() and then run()) on a RunBuilder constructed from the BackendBuilder b. Both paths create Arc objects, which are passed into run_inner to build the backend, so that the spawned variant can create a CCPHandle that references the shared AtomicBool stop flag to kill the thread.

Example

Configuration:

use std::collections::HashMap;
use portus::{CongAlg, Flow, Datapath, DatapathInfo, DatapathTrait, Report};
use portus::ipc::Ipc;
use portus::lang::Scope;
use portus::lang::Bin;
use portus::RunBuilder;
use portus::ipc::{BackendBuilder};

// Datapath program text that both example algorithms below register under the
// name "MyProgram". Reading the program as written: it declares a `Report`
// with one volatile field, `minrtt`, starting at +infinity; on every
// evaluation (`when true`) it sets `Report.minrtt` to the minimum of itself
// and `Flow.rtt_sample_us`; and once `Micros` exceeds 42000 it issues
// `(report)` followed by `(reset)`.
// NOTE(review): the exact semantics of `volatile`, `report` and `reset` are
// defined by the datapath language and are not visible here — confirm there.
const PROG: &str = "
      (def (Report
          (volatile minrtt +infinity)
      ))
      (when true
          (:= Report.minrtt (min Report.minrtt Flow.rtt_sample_us))
      )
      (when (> Micros 42000)
          (report)
          (reset)
      )";

/// First example algorithm: a newtype around the `Scope` obtained when its
/// datapath program is installed on a flow.
#[derive(Clone, Default)]
struct AlgOne(Scope);

impl<I: Ipc> CongAlg<I> for AlgOne {
    type Flow = Self;

    /// Name identifying this algorithm.
    fn name() -> &'static str {
        "Default Alg"
    }

    /// Supplies the single datapath program, keyed by `"MyProgram"`.
    fn datapath_programs(&self) -> HashMap<&'static str, String> {
        HashMap::from([("MyProgram", PROG.to_owned())])
    }

    /// Installs `"MyProgram"` on the new flow and wraps the resulting scope.
    ///
    /// Panics if `set_program` returns an error.
    fn new_flow(&self, mut control: Datapath<I>, _info: DatapathInfo) -> Self::Flow {
        AlgOne(control.set_program("MyProgram", None).unwrap())
    }
}

impl Flow for AlgOne {
    /// Prints the `Report.minrtt` field of the incoming report, looked up
    /// with the scope held in `self.0`. Panics if the lookup fails.
    fn on_report(&mut self, _sock_id: u32, m: Report) {
        let minrtt = m.get_field("Report.minrtt", &self.0).unwrap();
        println!("alg1 minrtt: {:?}", minrtt);
    }
}

/// Second example algorithm: a newtype around the `Scope` obtained when its
/// datapath program is installed on a flow.
#[derive(Clone, Default)]
struct AlgTwo(Scope);

impl<I: Ipc> CongAlg<I> for AlgTwo {
    type Flow = Self;

    /// Name identifying this algorithm.
    fn name() -> &'static str {
        "Alg2"
    }

    /// Supplies the single datapath program, keyed by `"MyProgram"`.
    fn datapath_programs(&self) -> HashMap<&'static str, String> {
        HashMap::from([("MyProgram", PROG.to_owned())])
    }

    /// Installs `"MyProgram"` on the new flow and wraps the resulting scope.
    ///
    /// Panics if `set_program` returns an error.
    fn new_flow(&self, mut control: Datapath<I>, _info: DatapathInfo) -> Self::Flow {
        AlgTwo(control.set_program("MyProgram", None).unwrap())
    }
}

impl Flow for AlgTwo {
    /// Prints the `Report.minrtt` field of the incoming report, looked up
    /// with the scope held in `self.0`. Panics if the lookup fails.
    fn on_report(&mut self, _sock_id: u32, m: Report) {
        let minrtt = m.get_field("Report.minrtt", &self.0).unwrap();
        println!("alg2 minrtt: {:?}", minrtt);
    }
}

/// Builds the IPC backend, registers the example algorithms, and runs the CCP
/// execution loop on the current thread.
fn main() {
    // Create the `unix::Socket` IPC (parameterised with `Blocking`) and wrap it
    // in a BackendBuilder; abort with a clear message if that fails.
    let b = portus::ipc::unix::Socket::<portus::ipc::Blocking>::new("portus")
        .map(|sk| BackendBuilder { sock: sk })
        .expect("ipc initialization");
    let rb = RunBuilder::new(b)
        .default_alg(AlgOne::default())
        .additional_alg(AlgTwo::default())
        // `additional_alg` accepts anything convertible into `Option<A>`, so an
        // algorithm can be registered conditionally; here `None` is passed.
        .additional_alg::<AlgOne, _>(None);
    // .spawn_thread() to spawn runtime in a thread
    // .with_stop_handle() to pass in an Arc<AtomicBool> that will stop the runtime

    // `run` returns a `Result`; report a failure instead of silently dropping it.
    if let Err(e) = rb.run() {
        eprintln!("ccp run loop exited with error: {:?}", e);
    }
}

Implementations§

source§

impl<I: Ipc> RunBuilder<I, (), NoSpawn>

source

pub fn new(backend_builder: BackendBuilder<I>) -> Self

source§

impl<I: Ipc, S> RunBuilder<I, (), S>

source

pub fn default_alg<A>(self, alg: A) -> RunBuilder<I, AlgListNil<A>, S>

Set the default congestion control algorithm. This is required to run or spawn anything.

This is the algorithm that will be used if the name the datapath requests doesn’t match anything.

source§

impl<I: Ipc, U, S> RunBuilder<I, U, S>

source

pub fn additional_alg<A: CongAlg<I>, O: Into<Option<A>>>( self, alg: O ) -> RunBuilder<I, AlgList<Option<A>, U>, S>

Set an additional congestion control algorithm.

If the name duplicates one already given, the later one will win.

source

pub fn try_additional_alg<A: CongAlg<I>>( self, alg: Option<A> ) -> RunBuilder<I, AlgList<Option<A>, U>, S>

source

pub fn with_stop_handle(self, handle: Arc<AtomicBool>) -> Self

Pass an AtomicBool stop handle.

source

pub unsafe fn with_raw_stop_handle(self, handle_ptr: *const AtomicBool) -> Self

Pass a raw pointer to an AtomicBool stop handle.

Safety

handle_ptr must be from Arc::into_raw().

source§

impl<I: Ipc, U> RunBuilder<I, U, NoSpawn>

source

pub fn spawn_thread(self) -> RunBuilder<I, U, Spawn>

Spawn a thread which will perform the CCP execution loop. This method returns a builder; calling run() on it returns a CCPHandle, which the caller can use to cause the execution loop to stop. The execution loop runs ‘forever’; it only stops in three cases:

  1. The IPC socket is closed.
  2. An invalid message is received.
  3. The caller calls CCPHandle::kill()

See run for more information.

source§

impl<I, U> RunBuilder<I, U, NoSpawn> where I: Ipc, for<'a> &'a U: Pick<'a, I> + CollectDps<I>,

source

pub fn run(self) -> Result<()>

source§

impl<I, U> RunBuilder<I, U, Spawn> where I: Ipc, U: Send + 'static, for<'a> &'a U: Pick<'a, I> + CollectDps<I>,

source

pub fn run(self) -> Result<CCPHandle>

Auto Trait Implementations§

§

impl<I, U, Spawnness> RefUnwindSafe for RunBuilder<I, U, Spawnness> where I: RefUnwindSafe, Spawnness: RefUnwindSafe, U: RefUnwindSafe,

§

impl<I, U, Spawnness> !Send for RunBuilder<I, U, Spawnness>

§

impl<I, U, Spawnness> !Sync for RunBuilder<I, U, Spawnness>

§

impl<I, U, Spawnness> Unpin for RunBuilder<I, U, Spawnness> where I: Unpin, Spawnness: Unpin, U: Unpin,

§

impl<I, U, Spawnness> UnwindSafe for RunBuilder<I, U, Spawnness> where I: UnwindSafe, Spawnness: UnwindSafe, U: UnwindSafe,

Blanket Implementations§

source§

impl<T> Any for T where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

§

impl<T> Pointable for T

§

const ALIGN: usize = mem::align_of::<T>()

The alignment of pointer.
§

type Init = T

The type for initializers.
§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
source§

impl<T, U> TryFrom<U> for T where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more