pub struct MioPoll { /* private fields */ }Expand description
Ref-counting wrapper around a mio Poll instance
After creation, pass cloned copies of this to all interested
parties. A MioPoll reference is also available from the
associated Stakker instance using
cx.anymap_get::<MioPoll>().
Implementations§
Source§impl MioPoll
impl MioPoll
Sourcepub fn new(
stakker: &mut Stakker,
poll: Poll,
events: Events,
waker_pri: u32,
) -> Result<Self>
pub fn new( stakker: &mut Stakker, poll: Poll, events: Events, waker_pri: u32, ) -> Result<Self>
Create a new MioPoll instance wrapping the given mio Poll
instance and mio Events queue (which the caller should size
according to their requirements). The waker priority should
also be provided, in the range 0..=10. Sets up the
Stakker instance to use MioPoll as the poll-waker, and
puts a MioPoll clone into the Stakker anymap.
Examples found in repository?
43fn main() -> Result<(), Box<dyn Error>> {
44 let mut stakker = Stakker::new(Instant::now());
45 let s = &mut stakker;
46 let miopoll = MioPoll::new(s, Poll::new()?, Events::with_capacity(1024), 0)?;
47
48 let _listener = actor!(s, Listener::init(), ret_shutdown!(s));
49
50 // Don't need `idle!` handling
51 s.run(Instant::now(), false);
52 while s.not_shutdown() {
53 let maxdur = s.next_wait_max(Instant::now(), Duration::from_secs(60), false);
54 miopoll.poll(maxdur)?;
55 s.run(Instant::now(), false);
56 }
57
58 println!("Shutdown: {}", s.shutdown_reason().unwrap());
59 Ok(())
60}Sourcepub fn add<S: Source>(
&self,
source: S,
ready: Interest,
pri: u32,
fwd: Fwd<Ready>,
) -> Result<MioSource<S>>
pub fn add<S: Source>( &self, source: S, ready: Interest, pri: u32, fwd: Fwd<Ready>, ) -> Result<MioSource<S>>
Register a mio Source object with the poll instance.
Returns a MioSource which takes care of cleaning up the
token and handler when it is dropped.
This uses edge-triggering: whenever one of the Interest flags
included in ready changes state, the given Fwd instance
will be invoked with the new Ready value. The contract with
the handler is that there may be spurious calls to it, so it
must be ready for that.
pri gives a priority level: 0..=10. If handlers are
registered at different priority levels, then higher priority
events get handled before lower priority events. Under
constant very heavy load, lower priority events might be
delayed indefinitely.
Examples found in repository?
80 fn setup(cx: CX![]) -> std::io::Result<Self> {
81 let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, PORT);
82 let listen = TcpListener::bind(SocketAddr::V4(addr))?;
83 let miopoll = cx.anymap_get::<MioPoll>();
84 let listener = miopoll.add(
85 listen,
86 Interest::READABLE,
87 10,
88 fwd_to!([cx], connect() as (Ready)),
89 )?;
90 println!("Listening on port 7777 for incoming telnet connections ...");
91
92 let mut this = Self {
93 listener,
94 children: ActorOwnSlab::new(),
95 inactivity: MaxTimerKey::default(),
96 };
97 this.activity(cx);
98
99 Ok(this)
100 }
101
102 // Register activity, pushing back the inactivity timer
103 fn activity(&mut self, cx: CX![]) {
104 timer_max!(
105 &mut self.inactivity,
106 cx.now() + Duration::from_secs(60),
107 [cx],
108 |_this, cx| {
109 fail!(cx, "Timed out waiting for connection");
110 }
111 );
112 }
113
114 fn connect(&mut self, cx: CX![], _: Ready) {
115 loop {
116 match self.listener.accept() {
117 Ok((stream, addr)) => {
118 println!("New connection from {}", addr);
119 actor_in_slab!(
120 self.children,
121 cx,
122 Echoer::init(stream),
123 ret_some_to!([cx], |_this, cx, cause: StopCause| {
124 // Mostly just report child failure, but watch out for
125 // AbortError to terminate this actor, which in turn shuts
126 // down the whole process
127 println!("Child actor terminated: {}", cause);
128
129 if let StopCause::Failed(e) = cause {
130 if e.downcast::<AbortError>().is_ok() {
131 fail!(cx, "Aborted");
132 }
133 }
134 })
135 );
136 self.activity(cx);
137 }
138 Err(ref e) if e.kind() == ErrorKind::WouldBlock => break,
139 Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
140 Err(e) => {
141 fail!(cx, "TCP listen socket failure on accept: {}", e);
142 return;
143 }
144 }
145 }
146 }
147}
148
149/// Echoes received data back to sender, with a delay
150struct Echoer {
151 tcp: TcpStreamBuf,
152}
153
154impl Echoer {
155 fn init(cx: CX![], stream: TcpStream) -> Option<Self> {
156 match Self::setup(cx, stream) {
157 Err(e) => {
158 fail!(cx, "Failed to set up a new TCP stream: {}", e);
159 None
160 }
161 Ok(this) => Some(this),
162 }
163 }
164
165 fn setup(cx: CX![], stream: TcpStream) -> std::io::Result<Self> {
166 let miopoll = cx.anymap_get::<MioPoll>();
167 let source = miopoll.add(
168 stream,
169 Interest::READABLE | Interest::WRITABLE,
170 10,
171 fwd_to!([cx], ready() as (Ready)),
172 )?;
173
174 let mut tcp = TcpStreamBuf::new();
175 tcp.init(source);
176
177 Ok(Self { tcp })
178 }Sourcepub fn poll(&self, max_delay: Duration) -> Result<bool>
pub fn poll(&self, max_delay: Duration) -> Result<bool>
Poll for new events and queue all the events of the highest available priority level. Events of lower priority levels are queued internally to be used on a future call to this method.
So the expected pattern is that highest-priority handlers get run, and when all the resulting processing has completed in Stakker, then the main loop polls again, and if more high-priority events have occurred, then those too will get processed. Lower-priority handlers will only get a chance to run when nothing higher-priority needs handling.
On success returns Ok(true) if an event was processed, or
Ok(false) if there were no new events.
Examples found in repository?
43fn main() -> Result<(), Box<dyn Error>> {
44 let mut stakker = Stakker::new(Instant::now());
45 let s = &mut stakker;
46 let miopoll = MioPoll::new(s, Poll::new()?, Events::with_capacity(1024), 0)?;
47
48 let _listener = actor!(s, Listener::init(), ret_shutdown!(s));
49
50 // Don't need `idle!` handling
51 s.run(Instant::now(), false);
52 while s.not_shutdown() {
53 let maxdur = s.next_wait_max(Instant::now(), Duration::from_secs(60), false);
54 miopoll.poll(maxdur)?;
55 s.run(Instant::now(), false);
56 }
57
58 println!("Shutdown: {}", s.shutdown_reason().unwrap());
59 Ok(())
60}Sourcepub fn set_wake_fwd(&mut self, fwd: Fwd<Ready>)
pub fn set_wake_fwd(&mut self, fwd: Fwd<Ready>)
Set the handler for “wake” events. There can only be one
handler for “wake” events, so setting it here drops the
previous handler. Don’t call this unless you wish to override
the default wake handling which calls
stakker::Stakker::poll_wake.