MioPoll

Struct MioPoll 

pub struct MioPoll { /* private fields */ }

Ref-counting wrapper around a mio Poll instance

After creation, pass cloned copies of this to all interested parties. A MioPoll reference is also available from the associated Stakker instance using cx.anymap_get::<MioPoll>().
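MioPoll's fields are private, so the following is only a std-only sketch of how a ref-counting handle of this kind typically behaves, not the crate's actual implementation. The names Inner, Handle, register, registered_count and handle_count are hypothetical. The sketch assumes an Rc<RefCell<..>> layout, which would be consistent with MioPoll being listed as !Send and !Sync, but this page does not confirm it.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical stand-in for the private state behind a handle like MioPoll.
struct Inner {
    registered: Vec<String>,
}

// A cheaply clonable handle: cloning bumps a reference count, and every
// clone refers to the same Inner.
#[derive(Clone)]
struct Handle {
    inner: Rc<RefCell<Inner>>,
}

impl Handle {
    fn new() -> Self {
        Handle {
            inner: Rc::new(RefCell::new(Inner { registered: Vec::new() })),
        }
    }

    // Takes &self rather than &mut self: interior mutability lets any
    // clone modify the shared state.
    fn register(&self, name: &str) {
        self.inner.borrow_mut().registered.push(name.to_string());
    }

    // Number of registrations visible through this (and every other) clone.
    fn registered_count(&self) -> usize {
        self.inner.borrow().registered.len()
    }

    // Number of live handles sharing the same state.
    fn handle_count(&self) -> usize {
        Rc::strong_count(&self.inner)
    }
}
```

Under these assumptions, a registration made through one clone is visible through all the others, and the shared state is freed when the last clone is dropped.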

Implementations

impl MioPoll

pub fn new( stakker: &mut Stakker, poll: Poll, events: Events, waker_pri: u32, ) -> Result<Self>

Create a new MioPoll instance wrapping the given mio Poll instance and mio Events queue, which the caller should size according to their requirements. waker_pri gives the priority of the waker, in the range 0..=10. This also sets up the Stakker instance to use MioPoll as the poll-waker, and puts a MioPoll clone into the Stakker anymap.

Example from the repository: examples/echo_server.rs (line 46, the MioPoll::new call)

fn main() -> Result<(), Box<dyn Error>> {
    let mut stakker = Stakker::new(Instant::now());
    let s = &mut stakker;
    let miopoll = MioPoll::new(s, Poll::new()?, Events::with_capacity(1024), 0)?;

    let _listener = actor!(s, Listener::init(), ret_shutdown!(s));

    // Don't need `idle!` handling
    s.run(Instant::now(), false);
    while s.not_shutdown() {
        let maxdur = s.next_wait_max(Instant::now(), Duration::from_secs(60), false);
        miopoll.poll(maxdur)?;
        s.run(Instant::now(), false);
    }

    println!("Shutdown: {}", s.shutdown_reason().unwrap());
    Ok(())
}

pub fn add<S: Source>( &self, source: S, ready: Interest, pri: u32, fwd: Fwd<Ready>, ) -> Result<MioSource<S>>

Register a mio Source object with the poll instance. Returns a MioSource which takes care of cleaning up the token and handler when it is dropped.

This uses edge-triggering: whenever one of the Interest flags included in ready changes state, the given Fwd instance will be invoked with the new Ready value. The handler may also be called spuriously, so it must tolerate calls where nothing has actually changed.
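A common way to meet that contract is to read until the source reports WouldBlock on every call. The following std-only sketch illustrates the idea; MockSocket and on_ready are hypothetical names, and the mock stands in for a real non-blocking mio source.

```rust
use std::collections::VecDeque;
use std::io::{self, ErrorKind, Read};

// Hypothetical non-blocking source for illustration: yields queued chunks,
// then reports WouldBlock once nothing is left.
struct MockSocket {
    chunks: VecDeque<Vec<u8>>,
}

impl Read for MockSocket {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self.chunks.pop_front() {
            Some(chunk) => {
                let n = chunk.len().min(buf.len());
                buf[..n].copy_from_slice(&chunk[..n]);
                // Keep whatever did not fit for the next read.
                if n < chunk.len() {
                    self.chunks.push_front(chunk[n..].to_vec());
                }
                Ok(n)
            }
            None => Err(ErrorKind::WouldBlock.into()),
        }
    }
}

// Body of a readiness handler. It reads until WouldBlock so that no data is
// left behind before the next edge. A spurious call hits WouldBlock at once
// and returns zero bytes without any other effect.
fn on_ready<R: Read>(src: &mut R, out: &mut Vec<u8>) -> io::Result<usize> {
    let mut buf = [0u8; 1024];
    let mut total = 0;
    loop {
        match src.read(&mut buf) {
            Ok(0) => return Ok(total), // end of stream
            Ok(n) => {
                out.extend_from_slice(&buf[..n]);
                total += n;
            }
            Err(ref e) if e.kind() == ErrorKind::WouldBlock => return Ok(total),
            Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
}
```

Because the handler never assumes that data is present, calling it a second time with nothing pending is harmless.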

pri gives a priority level: 0..=10. If handlers are registered at different priority levels, then higher priority events get handled before lower priority events. Under constant very heavy load, lower priority events might be delayed indefinitely.

Example from the repository: examples/echo_server.rs (lines 84-89, the first miopoll.add call)

    fn setup(cx: CX![]) -> std::io::Result<Self> {
        let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, PORT);
        let listen = TcpListener::bind(SocketAddr::V4(addr))?;
        let miopoll = cx.anymap_get::<MioPoll>();
        let listener = miopoll.add(
            listen,
            Interest::READABLE,
            10,
            fwd_to!([cx], connect() as (Ready)),
        )?;
        println!("Listening on port 7777 for incoming telnet connections ...");

        let mut this = Self {
            listener,
            children: ActorOwnSlab::new(),
            inactivity: MaxTimerKey::default(),
        };
        this.activity(cx);

        Ok(this)
    }

    // Register activity, pushing back the inactivity timer
    fn activity(&mut self, cx: CX![]) {
        timer_max!(
            &mut self.inactivity,
            cx.now() + Duration::from_secs(60),
            [cx],
            |_this, cx| {
                fail!(cx, "Timed out waiting for connection");
            }
        );
    }

    fn connect(&mut self, cx: CX![], _: Ready) {
        loop {
            match self.listener.accept() {
                Ok((stream, addr)) => {
                    println!("New connection from {}", addr);
                    actor_in_slab!(
                        self.children,
                        cx,
                        Echoer::init(stream),
                        ret_some_to!([cx], |_this, cx, cause: StopCause| {
                            // Mostly just report child failure, but watch out for
                            // AbortError to terminate this actor, which in turn shuts
                            // down the whole process
                            println!("Child actor terminated: {}", cause);

                            if let StopCause::Failed(e) = cause {
                                if e.downcast::<AbortError>().is_ok() {
                                    fail!(cx, "Aborted");
                                }
                            }
                        })
                    );
                    self.activity(cx);
                }
                Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
                Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => {
                    fail!(cx, "TCP listen socket failure on accept: {}", e);
                    return;
                }
            }
        }
    }
}

/// Echoes received data back to sender, with a delay
struct Echoer {
    tcp: TcpStreamBuf,
}

impl Echoer {
    fn init(cx: CX![], stream: TcpStream) -> Option<Self> {
        match Self::setup(cx, stream) {
            Err(e) => {
                fail!(cx, "Failed to set up a new TCP stream: {}", e);
                None
            }
            Ok(this) => Some(this),
        }
    }

    fn setup(cx: CX![], stream: TcpStream) -> std::io::Result<Self> {
        let miopoll = cx.anymap_get::<MioPoll>();
        let source = miopoll.add(
            stream,
            Interest::READABLE | Interest::WRITABLE,
            10,
            fwd_to!([cx], ready() as (Ready)),
        )?;

        let mut tcp = TcpStreamBuf::new();
        tcp.init(source);

        Ok(Self { tcp })
    }

pub fn poll(&self, max_delay: Duration) -> Result<bool>

Poll for new events and queue all the events of the highest available priority level. Events of lower priority levels are queued internally to be used on a future call to this method.

So the expected pattern is that highest-priority handlers get run, and when all the resulting processing has completed in Stakker, then the main loop polls again, and if more high-priority events have occurred, then those too will get processed. Lower-priority handlers will only get a chance to run when nothing higher-priority needs handling.

On success returns Ok(true) if an event was processed, or Ok(false) if there were no new events.
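The queueing behaviour described above can be modelled with the standard library alone. This is a sketch, not the crate's implementation: PriorityQueue, push and poll are hypothetical names, and it assumes that a larger pri value means higher priority, which this page does not state explicitly.

```rust
use std::collections::BTreeMap;

// Hypothetical model: pending events grouped by priority level (0..=10).
struct PriorityQueue {
    pending: BTreeMap<u32, Vec<&'static str>>,
}

impl PriorityQueue {
    fn new() -> Self {
        PriorityQueue { pending: BTreeMap::new() }
    }

    // Stand-in for an event arriving from the OS at a given priority.
    fn push(&mut self, pri: u32, event: &'static str) {
        self.pending.entry(pri).or_default().push(event);
    }

    // Stand-in for one poll call: hand back only the events at the highest
    // priority level that has anything pending. Lower levels stay queued
    // for a later call. Assumes a larger number means higher priority.
    fn poll(&mut self) -> Option<Vec<&'static str>> {
        let highest = *self.pending.keys().next_back()?;
        self.pending.remove(&highest)
    }
}
```

In this model, a priority-0 event pushed first is still returned only after every priority-10 event has been drained, including ones that arrive between calls. That mirrors how lower-priority handlers can be delayed while higher-priority events keep arriving.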

Example from the repository: examples/echo_server.rs (line 54), the miopoll.poll(maxdur)? call in the main loop of the example shown under new above.

pub fn set_wake_fwd(&mut self, fwd: Fwd<Ready>)

Set the handler for “wake” events. There can only be one handler for “wake” events, so setting it here drops the previous handler. Don’t call this unless you wish to override the default wake handling which calls stakker::Stakker::poll_wake.


pub fn waker(&mut self) -> Arc<Waker>

Get a cloned reference to the waker for this MioPoll instance. This can be passed to other threads, which can call wake() on it to cause the wake handler to be run in the main polling thread.
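The cross-thread pattern can be illustrated without mio. The sketch below is a std-only model, not mio's Waker: ModelWaker, wait and wake_from_worker are hypothetical names, and the Mutex/Condvar pair stands in for a blocked poll call being interrupted.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for a waker shared between threads via Arc.
struct ModelWaker {
    woken: Mutex<bool>,
    cond: Condvar,
}

impl ModelWaker {
    fn new() -> Arc<Self> {
        Arc::new(ModelWaker {
            woken: Mutex::new(false),
            cond: Condvar::new(),
        })
    }

    // Called from any thread: mark the wake as pending and notify the waiter.
    fn wake(&self) {
        *self.woken.lock().unwrap() = true;
        self.cond.notify_one();
    }

    // Called on the main thread, standing in for a blocking poll: waits up
    // to max_delay and returns true if a wake arrived, clearing the flag.
    fn wait(&self, max_delay: Duration) -> bool {
        let guard = self.woken.lock().unwrap();
        let (mut guard, _timeout) = self
            .cond
            .wait_timeout_while(guard, max_delay, |woken| !*woken)
            .unwrap();
        std::mem::replace(&mut *guard, false)
    }
}

// Hands a clone of the Arc to a worker thread, which wakes the main thread.
fn wake_from_worker() -> bool {
    let waker = ModelWaker::new();
    let remote = Arc::clone(&waker);
    let worker = thread::spawn(move || remote.wake());
    let woken = waker.wait(Duration::from_secs(5));
    worker.join().unwrap();
    woken
}
```

With the real type, the Arc<Waker> returned by waker() plays the role of remote here, and the wake handler runs in the main polling thread instead of wait returning true.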

Trait Implementations

impl Clone for MioPoll

fn clone(&self) -> Self

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

Auto Trait Implementations

impl Freeze for MioPoll

impl !RefUnwindSafe for MioPoll

impl !Send for MioPoll

impl !Sync for MioPoll

impl Unpin for MioPoll

impl !UnwindSafe for MioPoll

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> CloneToUninit for T
where T: Clone,

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> Any for T
where T: Any,

impl<T> CloneAny for T
where T: Any + Clone,