Struct stakker_mio::MioPoll
source · pub struct MioPoll { /* private fields */ }
Expand description
Ref-counting wrapper around a mio Poll
instance
After creation, pass cloned copies of this to all interested
parties. A MioPoll
reference is also available from the
associated Stakker instance using
cx.anymap_get::<MioPoll>()
.
Implementations§
source§impl MioPoll
impl MioPoll
sourcepub fn new(
stakker: &mut Stakker,
poll: Poll,
events: Events,
waker_pri: u32
) -> Result<Self>
pub fn new( stakker: &mut Stakker, poll: Poll, events: Events, waker_pri: u32 ) -> Result<Self>
Create a new MioPoll instance wrapping the given mio Poll
instance and mio Events
queue (which the caller should size
according to their requirements). The waker priority should
also be provided, in the range 0..=10
. Sets up the
Stakker instance to use MioPoll
as the poll-waker, and
puts a MioPoll
clone into the Stakker anymap.
Examples found in repository?
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
fn main() -> Result<(), Box<dyn Error>> {
let mut stakker = Stakker::new(Instant::now());
let s = &mut stakker;
let miopoll = MioPoll::new(s, Poll::new()?, Events::with_capacity(1024), 0)?;
let _listener = actor!(s, Listener::init(), ret_shutdown!(s));
// Don't need `idle!` handling
s.run(Instant::now(), false);
while s.not_shutdown() {
let maxdur = s.next_wait_max(Instant::now(), Duration::from_secs(60), false);
miopoll.poll(maxdur)?;
s.run(Instant::now(), false);
}
println!("Shutdown: {}", s.shutdown_reason().unwrap());
Ok(())
}
sourcepub fn add<S: Source>(
&self,
source: S,
ready: Interest,
pri: u32,
fwd: Fwd<Ready>
) -> Result<MioSource<S>>
pub fn add<S: Source>( &self, source: S, ready: Interest, pri: u32, fwd: Fwd<Ready> ) -> Result<MioSource<S>>
Register a mio Source
object with the poll instance.
Returns a MioSource
which takes care of cleaning up the
token and handler when it is dropped.
This uses edge-triggering: whenever one of the Interest flags
included in ready
changes state, the given Fwd
instance
will be invoked with the new Ready
value. The contract with
the handler is that there may be spurious calls to it, so it
must be ready for that.
pri
gives a priority level: 0..=10
. If handlers are
registered at different priority levels, then higher priority
events get handled before lower priority events. Under
constant very heavy load, lower priority events might be
delayed indefinitely.
Examples found in repository?
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
fn setup(cx: CX![]) -> std::io::Result<Self> {
let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, PORT);
let listen = TcpListener::bind(SocketAddr::V4(addr))?;
let miopoll = cx.anymap_get::<MioPoll>();
let listener = miopoll.add(
listen,
Interest::READABLE,
10,
fwd_to!([cx], connect() as (Ready)),
)?;
println!("Listening on port 7777 for incoming telnet connections ...");
let mut this = Self {
listener,
children: ActorOwnSlab::new(),
inactivity: MaxTimerKey::default(),
};
this.activity(cx);
Ok(this)
}
// Register activity, pushing back the inactivity timer
fn activity(&mut self, cx: CX![]) {
timer_max!(
&mut self.inactivity,
cx.now() + Duration::from_secs(60),
[cx],
|_this, cx| {
fail!(cx, "Timed out waiting for connection");
}
);
}
fn connect(&mut self, cx: CX![], _: Ready) {
loop {
match self.listener.accept() {
Ok((stream, addr)) => {
println!("New connection from {}", addr);
actor_in_slab!(
self.children,
cx,
Echoer::init(stream),
ret_some_to!([cx], |_this, cx, cause: StopCause| {
// Mostly just report child failure, but watch out for
// AbortError to terminate this actor, which in turn shuts
// down the whole process
println!("Child actor terminated: {}", cause);
if let StopCause::Failed(e) = cause {
if e.downcast::<AbortError>().is_ok() {
fail!(cx, "Aborted");
}
}
})
);
self.activity(cx);
}
Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => {
fail!(cx, "TCP listen socket failure on accept: {}", e);
return;
}
}
}
}
}
/// Echoes received data back to sender, with a delay
struct Echoer {
tcp: TcpStreamBuf,
}
impl Echoer {
fn init(cx: CX![], stream: TcpStream) -> Option<Self> {
match Self::setup(cx, stream) {
Err(e) => {
fail!(cx, "Failed to set up a new TCP stream: {}", e);
None
}
Ok(this) => Some(this),
}
}
fn setup(cx: CX![], stream: TcpStream) -> std::io::Result<Self> {
let miopoll = cx.anymap_get::<MioPoll>();
let source = miopoll.add(
stream,
Interest::READABLE | Interest::WRITABLE,
10,
fwd_to!([cx], ready() as (Ready)),
)?;
let mut tcp = TcpStreamBuf::new();
tcp.init(source);
Ok(Self { tcp })
}
sourcepub fn poll(&self, max_delay: Duration) -> Result<bool>
pub fn poll(&self, max_delay: Duration) -> Result<bool>
Poll for new events and queue all the events of the highest available priority level. Events of lower priority levels are queued internally to be used on a future call to this method.
So the expected pattern is that highest-priority handlers get run, and when all the resulting processing has completed in Stakker, then the main loop polls again, and if more high-priority events have occurred, then those too will get processed. Lower-priority handlers will only get a chance to run when nothing higher-priority needs handling.
On success returns Ok(true)
if an event was processed, or
Ok(false)
if there were no new events.
Examples found in repository?
43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
fn main() -> Result<(), Box<dyn Error>> {
let mut stakker = Stakker::new(Instant::now());
let s = &mut stakker;
let miopoll = MioPoll::new(s, Poll::new()?, Events::with_capacity(1024), 0)?;
let _listener = actor!(s, Listener::init(), ret_shutdown!(s));
// Don't need `idle!` handling
s.run(Instant::now(), false);
while s.not_shutdown() {
let maxdur = s.next_wait_max(Instant::now(), Duration::from_secs(60), false);
miopoll.poll(maxdur)?;
s.run(Instant::now(), false);
}
println!("Shutdown: {}", s.shutdown_reason().unwrap());
Ok(())
}
sourcepub fn set_wake_fwd(&mut self, fwd: Fwd<Ready>)
pub fn set_wake_fwd(&mut self, fwd: Fwd<Ready>)
Set the handler for “wake” events. There can only be one
handler for “wake” events, so setting it here drops the
previous handler. Don’t call this unless you wish to override
the default wake handling which calls
stakker::Stakker::poll_wake
.