Struct stakker_mio::MioPoll

source ·
pub struct MioPoll { /* private fields */ }
Expand description

Ref-counting wrapper around a mio Poll instance

After creation, pass cloned copies of this to all interested parties. A MioPoll reference is also available from the associated Stakker instance using cx.anymap_get::<MioPoll>().

Implementations§

source§

impl MioPoll

source

pub fn new( stakker: &mut Stakker, poll: Poll, events: Events, waker_pri: u32 ) -> Result<Self>

Create a new MioPoll instance wrapping the given mio Poll instance and mio Events queue (which the caller should size according to their requirements). The waker priority should also be provided, in the range 0..=10. Sets up the Stakker instance to use MioPoll as the poll-waker, and puts a MioPoll clone into the Stakker anymap.

Examples found in repository?
examples/echo_server.rs (line 46)
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/// Entry point of the echo server example.
///
/// Creates the `Stakker` instance, installs a `MioPoll` on it, starts the
/// `Listener` actor and then alternates between polling mio and running
/// Stakker until shutdown. Any I/O error from creating or polling the mio
/// instance is propagated to the caller.
fn main() -> Result<(), Box<dyn Error>> {
    let mut stakker = Stakker::new(Instant::now());
    let s = &mut stakker;

    // Wrap a fresh mio `Poll` and an event queue of capacity 1024; waker priority is 0
    let poll = Poll::new()?;
    let events = Events::with_capacity(1024);
    let miopoll = MioPoll::new(s, poll, events, 0)?;

    let _listener = actor!(s, Listener::init(), ret_shutdown!(s));

    // `idle!` handling is not needed in this example
    s.run(Instant::now(), false);
    while s.not_shutdown() {
        // Wait for I/O for at most 60 seconds, or less if Stakker needs to run sooner
        let now = Instant::now();
        let max_wait = s.next_wait_max(now, Duration::from_secs(60), false);
        miopoll.poll(max_wait)?;
        s.run(Instant::now(), false);
    }

    let reason = s.shutdown_reason().unwrap();
    println!("Shutdown: {}", reason);
    Ok(())
}
source

pub fn add<S: Source>( &self, source: S, ready: Interest, pri: u32, fwd: Fwd<Ready> ) -> Result<MioSource<S>>

Register a mio Source object with the poll instance. Returns a MioSource which takes care of cleaning up the token and handler when it is dropped.

This uses edge-triggering: whenever one of the Interest flags included in ready changes state, the given Fwd instance will be invoked with the new Ready value. The contract with the handler is that there may be spurious calls to it, so it must be ready for that.

pri gives a priority level: 0..=10. If handlers are registered at different priority levels, then higher priority events get handled before lower priority events. Under constant very heavy load, lower priority events might be delayed indefinitely.

Examples found in repository?
examples/echo_server.rs (lines 84-89)
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
    /// Bind the listening socket and register it with the shared `MioPoll`.
    ///
    /// The socket is bound to `Ipv4Addr::LOCALHOST` on `PORT` and registered
    /// for `Interest::READABLE` at priority 10, with readiness notifications
    /// forwarded to `connect`. The inactivity timer is started before
    /// returning.
    ///
    /// Returns any I/O error raised while binding the socket or registering
    /// it with the poll instance.
    fn setup(cx: CX![]) -> std::io::Result<Self> {
        let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, PORT);
        let listen = TcpListener::bind(SocketAddr::V4(addr))?;
        let miopoll = cx.anymap_get::<MioPoll>();
        let listener = miopoll.add(
            listen,
            Interest::READABLE,
            10,
            fwd_to!([cx], connect() as (Ready)),
        )?;
        // Report the port that was actually bound, so the message cannot
        // drift out of sync with the `PORT` constant
        println!(
            "Listening on port {} for incoming telnet connections ...",
            PORT
        );

        let mut this = Self {
            listener,
            children: ActorOwnSlab::new(),
            inactivity: MaxTimerKey::default(),
        };
        // Start the inactivity countdown straight away
        this.activity(cx);

        Ok(this)
    }

    // Register activity, pushing back the inactivity timer
    fn activity(&mut self, cx: CX![]) {
        timer_max!(
            &mut self.inactivity,
            cx.now() + Duration::from_secs(60),
            [cx],
            |_this, cx| {
                fail!(cx, "Timed out waiting for connection");
            }
        );
    }

    fn connect(&mut self, cx: CX![], _: Ready) {
        loop {
            match self.listener.accept() {
                Ok((stream, addr)) => {
                    println!("New connection from {}", addr);
                    actor_in_slab!(
                        self.children,
                        cx,
                        Echoer::init(stream),
                        ret_some_to!([cx], |_this, cx, cause: StopCause| {
                            // Mostly just report child failure, but watch out for
                            // AbortError to terminate this actor, which in turn shuts
                            // down the whole process
                            println!("Child actor terminated: {}", cause);

                            if let StopCause::Failed(e) = cause {
                                if e.downcast::<AbortError>().is_ok() {
                                    fail!(cx, "Aborted");
                                }
                            }
                        })
                    );
                    self.activity(cx);
                }
                Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
                Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => {
                    fail!(cx, "TCP listen socket failure on accept: {}", e);
                    return;
                }
            }
        }
    }
}

/// Actor handling a single accepted connection: echoes received data back
/// to the sender, with a delay
struct Echoer {
    // Stream buffer for the connection; `setup` initialises it with the
    // source returned from registering the stream with `MioPoll`
    tcp: TcpStreamBuf,
}

impl Echoer {
    /// Actor initialiser for a newly accepted connection.
    ///
    /// Delegates to `setup`. If that fails, the actor is failed with a
    /// message containing the error and `None` is returned; otherwise the
    /// new instance is returned in `Some`.
    fn init(cx: CX![], stream: TcpStream) -> Option<Self> {
        let this = match Self::setup(cx, stream) {
            Ok(this) => this,
            Err(e) => {
                fail!(cx, "Failed to set up a new TCP stream: {}", e);
                return None;
            }
        };
        Some(this)
    }

    /// Register the stream with the shared `MioPoll` and wrap it in a
    /// `TcpStreamBuf`.
    ///
    /// The stream is registered for both readable and writable readiness at
    /// priority 10, with notifications forwarded to `ready`. Returns the I/O
    /// error if registration fails.
    fn setup(cx: CX![], stream: TcpStream) -> std::io::Result<Self> {
        let miopoll = cx.anymap_get::<MioPoll>();
        let interest = Interest::READABLE | Interest::WRITABLE;
        let on_ready = fwd_to!([cx], ready() as (Ready));
        let source = miopoll.add(stream, interest, 10, on_ready)?;

        // Hand the registered source over to the stream buffer
        let mut tcp = TcpStreamBuf::new();
        tcp.init(source);

        Ok(Self { tcp })
    }
source

pub fn poll(&self, max_delay: Duration) -> Result<bool>

Poll for new events and queue all the events of the highest available priority level. Events of lower priority levels are queued internally to be used on a future call to this method.

So the expected pattern is that highest-priority handlers get run, and when all the resulting processing has completed in Stakker, then the main loop polls again, and if more high-priority events have occurred, then those too will get processed. Lower-priority handlers will only get a chance to run when nothing higher-priority needs handling.

On success returns Ok(true) if an event was processed, or Ok(false) if there were no new events.

Examples found in repository?
examples/echo_server.rs (line 54)
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/// Entry point of the echo server example.
///
/// Creates the `Stakker` instance, installs a `MioPoll` on it, starts the
/// `Listener` actor and then alternates between polling mio and running
/// Stakker until shutdown. Any I/O error from creating or polling the mio
/// instance is propagated to the caller.
fn main() -> Result<(), Box<dyn Error>> {
    let mut stakker = Stakker::new(Instant::now());
    let s = &mut stakker;

    // Wrap a fresh mio `Poll` and an event queue of capacity 1024; waker priority is 0
    let poll = Poll::new()?;
    let events = Events::with_capacity(1024);
    let miopoll = MioPoll::new(s, poll, events, 0)?;

    let _listener = actor!(s, Listener::init(), ret_shutdown!(s));

    // `idle!` handling is not needed in this example
    s.run(Instant::now(), false);
    while s.not_shutdown() {
        // Wait for I/O for at most 60 seconds, or less if Stakker needs to run sooner
        let now = Instant::now();
        let max_wait = s.next_wait_max(now, Duration::from_secs(60), false);
        miopoll.poll(max_wait)?;
        s.run(Instant::now(), false);
    }

    let reason = s.shutdown_reason().unwrap();
    println!("Shutdown: {}", reason);
    Ok(())
}
source

pub fn set_wake_fwd(&mut self, fwd: Fwd<Ready>)

Set the handler for “wake” events. There can only be one handler for “wake” events, so setting it here drops the previous handler. Don’t call this unless you wish to override the default wake handling which calls stakker::Stakker::poll_wake.

source

pub fn waker(&mut self) -> Arc<Waker>

Get a cloned reference to the waker for this MioPoll instance. This can be passed to other threads, which can call wake() on it to cause the wake handler to be run in the main polling thread.

Trait Implementations§

source§

impl Clone for MioPoll

source§

fn clone(&self) -> Self

Returns a copy of the value. Read more
1.0.0 · source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more

Auto Trait Implementations§

§

impl !RefUnwindSafe for MioPoll

§

impl !Send for MioPoll

§

impl !Sync for MioPoll

§

impl Unpin for MioPoll

§

impl !UnwindSafe for MioPoll

Blanket Implementations§

source§

impl<T> Any for T where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T where T: ?Sized,

const: unstable · source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T where T: ?Sized,

const: unstable · source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

const: unstable · source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T where U: From<T>,

const: unstable · source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> ToOwned for T where T: Clone,

§

type Owned = T

The resulting type after obtaining ownership.
source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
source§

impl<T, U> TryFrom<U> for T where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
const: unstable · source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
const: unstable · source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<T> Any for T where T: Any,

§

impl<T> CloneAny for T where T: Any + Clone,