arti_rpc_client_core/nb_stream.rs
1//! Low-level nonblocking stream implementation.
2//!
3//! This module defines two main types: [`NonblockingStream`]
4//! (a low-level type for use by external tools
5//! that want to implement their own nonblocking IO),
6//! and [`PollingStream`] (a slightly higher-level type
7//! that we use internally when we are asked to provide
8//! our own nonblocking IO loop(s)).
9//!
10//! This module also defines several traits for use by these types.
11//!
12//! Treats messages as unrelated strings, and validates outgoing messages for correctness.
13//!
14//! TODO nb: For now, nothing in this module is actually public; we'll want to expose some of these types.
15
16use mio::Interest;
17
18use crate::{
19 msgs::{request::ValidatedRequest, response::UnparsedResponse},
20 util::define_from_for_arc,
21};
22use std::{
23 io::{self, Read as _, Write as _},
24 mem,
25 sync::{Arc, Mutex},
26};
27
28/// An IO stream to Arti, along with any supporting logic necessary to check it for readiness.
29///
30/// Internally, this uses `mio` along with a [`NonblockingStream`] to check for events.
31///
32/// To use this type, mark the stream as nonblocking
33/// with e.g. [TcpStream::set_nonblocking](std::net::TcpStream::set_nonblocking),
34/// convert it into a [`mio::event::Source`],
35/// and pass it to [`PollingStream::new()`]
36///
37/// At this point, you can read and write messages via nonblocking IO.
38///
39/// The [`PollingStream::writer()`] method will return a handle that you can use from any thread
40/// that you can use to queue an outbound message.
41///
42/// No messages are actually sent or received unless some thread is calling [`PollingStream::interact()`].
43///
44/// ## Concurrency and interior mutability
45///
46/// A `PollingStream` has (limited) interior mutability.
47///
48/// Only a single call to `interact` can be made at the same time.
49/// So only one thread can be waiting for responses, and
50/// the caller of `interact` must demultiplex responses as necessary.
51///
52/// But, one or more [`WriteHandle`]s can be created,
53/// and these are `'static + Send + Sync`.
54/// Using `WriteHandle`, multiple threads can enqueue requests,
55/// with [`send_valid`](WriteHandle::send_valid), concurrently.
56///
57/// (All these restrictions imposed on the caller are enforced by the Rust type system.)
58#[derive(Debug)]
59pub(crate) struct PollingStream {
60 /// The poll object.
61 ///
62 /// (This typically corresponds to a kqueue or epoll handle.)
63 ///
64 /// ## IO Safety
65 ///
66 /// This object (semantically) contains references to the `fd`s or `SOCKETS`
67 /// of any inserted [`mio::event::Source`]. Therefore it must not outlive those sources.
68 /// Further, according to `mio`'s documentation, every Source must be deregistered
69 /// before it can be dropped.
70 ///
71 /// We ensure these properties are obeyed as follows:
72 /// - We hold the stream via `stream`, the NonblockingStream member of this struct.
73 /// We do not let anybody outside this module have the stream or the `Poll`.
74 /// - We declare a Drop implementation that deregisters the stream.
75 /// This method ensures that the stream is dropped before it is closed.
76 poll: mio::Poll,
77
78 /// A small buffer to receive IO readiness events.
79 events: mio::Events,
80
81 /// The underlying stream.
82 ///
83 /// Invariant: `stream.stream` is a [`MioStream`], so [`Stream::as_mio_stream`] will return
84 /// Some when we call it.
85 stream: NonblockingStream,
86}
87
88/// A `mio` token corresponding to the Waker we use to tell the interactor about new writes.
89const WAKE_TOKEN: mio::Token = mio::Token(0);
90
91/// A `mio` token corresponding to the Stream connecting to the RPC
92const STREAM_TOKEN: mio::Token = mio::Token(1);
93
94impl PollingStream {
95 /// Create a new PollingStream.
96 ///
97 /// The `stream` will be set to use nonblocking IO;
98 /// on Unix this will affect the behaviour of other `dup`s of the same fd!
99 pub(crate) fn new(stream: Box<dyn MioStream>) -> io::Result<Self> {
100 let poll = mio::Poll::new()?;
101 let waker = mio::Waker::new(poll.registry(), WAKE_TOKEN)?;
102
103 let stream = NonblockingStream::new(Box::new(waker), stream);
104
105 let mut cio = Self {
106 poll,
107 events: mio::Events::with_capacity(4),
108 stream,
109 };
110
111 // We register the stream here, since we want to use it exclusively with `reregister`
112 // later on. We do not deregister the stream until `Drop::drop` is called.
113 cio.poll.registry().register(
114 cio.stream
115 .stream
116 .as_mio_stream()
117 .expect("logic error: not a mio stream."),
118 STREAM_TOKEN,
119 Interest::READABLE,
120 )?;
121
122 Ok(cio)
123 }
124
125 /// Return a new [`WriteHandle`] that can be used to queue messages to be sent via this stream.
126 pub(crate) fn writer(&self) -> WriteHandle {
127 self.stream.writer()
128 }
129
130 /// Interact with the peer until some response is received.
131 ///
132 /// Sends all requests given to [`WriteHandle::send_valid`]
133 /// (including calls to `send_valid` made while `interact` is running)
134 /// while looking for a response from the server.
135 /// Returns when the first response is received.
136 ///
137 ///
138 /// Returns an error if an IO condition has failed.
139 /// Returns None if the other side has closed the stream.
140 /// Otherwise, returns an unparsed message from the RPC server.
141 ///
142 /// Unless some thread is calling this method, nobody will actually be reading or writing from
143 /// the [`PollingStream`], and so nobody's requests will be sent or answered.
144 pub(crate) fn interact(&mut self) -> io::Result<Option<UnparsedResponse>> {
145 // Should we try to read and write? Start out by assuming "yes".
146 let mut try_writing = true;
147 let mut try_reading = true;
148
149 loop {
150 // Try interacting with the underlying stream.
151 let want_io = match self.stream.interact_once(try_writing, try_reading)? {
152 PollStatus::Closed => return Ok(None),
153 PollStatus::Msg(msg) => return Ok(Some(msg)),
154 PollStatus::WouldBlock(w) => w,
155 };
156
157 // We're blocking on reading and possibly writing. Register our interest,
158 // so that we get woken as appropriate.
159 self.poll.registry().reregister(
160 self.stream
161 .stream
162 .as_mio_stream()
163 .expect("logic error: not a mio stream!"),
164 STREAM_TOKEN,
165 want_io.into(),
166 )?;
167
168 // Poll until the socket is ready to read or write,
169 // _or_ until somebody invokes the Waker because they have queued more to write.
170 let () = retry_eintr(|| self.poll.poll(&mut self.events, None))?;
171
172 // Now that we've been woken, see which events we've been woken with,
173 // and adjust our plans accordingly on the next time through the loop.
174 try_reading = false;
175 try_writing = false;
176 for event in self.events.iter() {
177 if event.token() == STREAM_TOKEN {
178 if event.is_readable() {
179 try_reading = true;
180 }
181 if event.is_writable() {
182 try_writing = true;
183 }
184 } else if event.token() == WAKE_TOKEN {
185 try_writing = true;
186 }
187 }
188 }
189 }
190}
191
192impl Drop for PollingStream {
193 fn drop(&mut self) {
194 // IO SAFETY: See "IO Safety" note in documentation for PollingStream.
195 let s = self
196 .stream
197 .stream
198 .as_mio_stream()
199 .expect("Logic error: Stream was not a MIO stream.");
200 self.poll
201 .registry()
202 .deregister(s)
203 .expect("Deregister operation failed");
204 }
205}
206
207/// A handle that can be used to queue outgoing messages for a nonblocking stream.
208///
209/// Note that queueing a message has no effect unless some party is polling the stream,
210/// either with [`PollingStream::interact()`], or [`NonblockingStream::interact_once()`].
211#[derive(Clone, Debug)]
212pub(crate) struct WriteHandle {
213 /// The actual implementation type for this writer.
214 inner: Arc<Mutex<WriteHandleImpl>>,
215}
216
217impl WriteHandle {
218 /// Queue an outgoing message for a nonblocking stream.
219 pub(crate) fn send_valid(&self, msg: &ValidatedRequest) -> io::Result<()> {
220 let mut w = self.inner.lock().expect("Poisoned lock");
221 w.write_buf.extend_from_slice(msg.as_ref().as_bytes());
222
223 // See TOCTOU note on `WriteHandleImpl`: we need to wake() while we are holding the
224 // above mutex.
225 w.waker.wake()
226 }
227}
228
229/// An error that has occurred while sending a request.
230#[derive(Clone, Debug, thiserror::Error)]
231#[non_exhaustive]
232pub enum SendRequestError {
233 /// An IO error occurred while sending a request.
234 #[error("Unable to wake poling loop")]
235 Io(#[source] Arc<io::Error>),
236 /// We found a problem in the JSON while sending a request.
237 #[error("Invalid Json request")]
238 InvalidRequest(#[from] crate::InvalidRequestError),
239 /// Internal error while re-encoding request. Should be impossible.
240 #[error("Unable to re-encode request after parsing it‽")]
241 ReEncode(#[source] Arc<serde_json::Error>),
242}
243define_from_for_arc!( io::Error => SendRequestError [Io] );
244
245/// The inner implementation for [`WriteHandle`].
246///
247/// NOTE: We need to be careful to avoid TOCTOU problems with this type:
248/// It would be bad if a writing thread called `waker.wake()`, and then the interactor checked the
249/// buffer and found it empty, and only then did the writing thread add to the buffer.
250///
251/// To solve this, we put the write_buf and the waker behind the same lock:
252/// While the interactor is checking the buffer, nobody is able to add to the buffer _or_ wake the
253/// interactor.
254#[derive(derive_more::Debug)]
255struct WriteHandleImpl {
256 /// An underlying buffer holding messages to be sent to the RPC server.
257 //
258 // TODO: Consider using a VecDeque or BytesMut or such.
259 write_buf: Vec<u8>,
260
261 /// The waker to use to wake the polling loop.
262 #[debug(ignore)]
263 waker: Box<dyn Waker>,
264}
265
266/// A lower-level implementation of nonblocking IO for an open stream to the RPC server.
267///
268/// Unlike [`PollingStream`], this type _does not_ handle the IO event polling loops:
269/// the caller is required to provide their own.
270#[derive(derive_more::Debug)]
271pub(crate) struct NonblockingStream {
272 /// A write handle used to write onto this stream.
273 #[debug(ignore)]
274 write_handle: WriteHandle,
275
276 /// A buffer of incoming messages (possibly partial) from the RPC server.
277 //
278 // TODO: Consider using a VecDeque or BytesMut or such.
279 read_buf: Vec<u8>,
280
281 /// The underlying nonblocking stream.
282 #[debug(ignore)]
283 stream: Box<dyn Stream>,
284}
285
/// Describes which IO events a [`NonblockingStream`] wants to be told about.
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct WantIo {
    /// Whether the stream currently wants to write.
    ///
    /// (Reading has no flag: the stream always wants to read.)
    write: bool,
}

#[allow(dead_code)] // TODO nb: remove or expose.
impl WantIo {
    /// Report whether the stream wants to read; this is unconditionally true.
    fn want_read(&self) -> bool {
        true
    }

    /// Report whether the stream wants to write.
    fn want_write(&self) -> bool {
        let Self { write } = *self;
        write
    }
}
307
308impl From<WantIo> for mio::Interest {
309 fn from(value: WantIo) -> Self {
310 if value.write {
311 mio::Interest::WRITABLE | mio::Interest::READABLE
312 } else {
313 mio::Interest::READABLE
314 }
315 }
316}
317
318/// A return value from [`NonblockingStream::interact_once`].
319#[derive(Debug, Clone)]
320pub(crate) enum PollStatus {
321 /// The stream is closed.
322 Closed,
323
324 /// No progress can be made until the stream is available for further IO.
325 WouldBlock(WantIo),
326
327 /// We have received a message.
328 Msg(UnparsedResponse),
329}
330
331impl NonblockingStream {
332 /// Create a new `NonblockingStream` from a provided [`Waker`] and [`Stream`].
333 pub(crate) fn new(waker: Box<dyn Waker>, stream: Box<dyn Stream>) -> Self {
334 Self {
335 write_handle: WriteHandle {
336 inner: Arc::new(Mutex::new(WriteHandleImpl {
337 write_buf: Default::default(),
338 waker,
339 })),
340 },
341 read_buf: Default::default(),
342 stream,
343 }
344 }
345
346 /// Return a new [`WriteHandle`] that can be used to queue messages to be sent via this stream.
347 pub(crate) fn writer(&self) -> WriteHandle {
348 self.write_handle.clone()
349 }
350
351 /// Try to exchange messages with the RPC server.
352 ///
353 /// If `try_reading` is true, then we should try reading from the RPC server.
354 /// If `try_writing` is true, then we should try flushing messages to the RPC server
355 /// (if we have any).
356 ///
357 /// If the stream proves to be closed, returns [`PollStatus::Closed`].
358 ///
359 /// If a message is available, returns [`PollStatus::Msg`].
360 /// (Note that a message may be available in the internal buffer here even if try_reading is false.)
361 ///
362 /// If no message is available, return [`PollStatus::WouldBlock`] with a [`WantIo`]
363 /// describing which IO operations we would like to perform.
364 pub(crate) fn interact_once(
365 &mut self,
366 try_writing: bool,
367 try_reading: bool,
368 ) -> io::Result<PollStatus> {
369 use io::ErrorKind::WouldBlock;
370
371 if let Some(msg) = self.extract_msg()? {
372 return Ok(PollStatus::Msg(msg));
373 }
374
375 let mut want_io = WantIo::default();
376
377 if try_writing {
378 match self.flush_queue() {
379 Ok(()) => {}
380 Err(e) if e.kind() == WouldBlock => want_io.write = true,
381 Err(e) => return Err(e),
382 }
383 }
384 if try_reading {
385 match self.read_msg() {
386 Ok(Some(msg)) => return Ok(PollStatus::Msg(msg)),
387 Ok(None) => return Ok(PollStatus::Closed),
388 Err(e) if e.kind() == WouldBlock => {}
389 Err(e) => return Err(e),
390 }
391 }
392
393 if !want_io.write && self.has_data_to_write() {
394 want_io.write = true;
395 }
396
397 Ok(PollStatus::WouldBlock(want_io))
398 }
399
400 /// Internal helper: Try to get a buffered message out of our `read_buf`.
401 ///
402 /// Returns Ok(None) if there are no complete lines in the buffer.
403 ///
404 /// If there is a line, but it is not valid UTF-8, returns an error and discards the line.
405 fn extract_msg(&mut self) -> io::Result<Option<UnparsedResponse>> {
406 // Look for an eol within the buffer.
407 let Some(eol_pos) = memchr::memchr(b'\n', &self.read_buf[..]) else {
408 return Ok(None);
409 };
410 // Split off the part of the buffer ending with the EOF from the remainder.
411 let mut line = self.read_buf.split_off(eol_pos + 1);
412 // Put the message in "line" and the remainder of the buffer in read_buf.
413 mem::swap(&mut line, &mut self.read_buf);
414 // Try to convert the line to an UnparsedResponse.
415 match String::from_utf8(line) {
416 Ok(s) => Ok(Some(UnparsedResponse::new(s))),
417 Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)),
418 }
419 }
420
421 /// Internal helper: Return true if there is any outgoing data queued to be written.
422 fn has_data_to_write(&self) -> bool {
423 let w = self.write_handle.inner.lock().expect("Lock poisoned");
424 // See TOCTOU note on WriteHandleImpl: Our rule is to check whether we have data to write
425 // within the same lock used to hold the waker, so that we can't lose any data.
426 !w.write_buf.is_empty()
427 }
428
429 /// Helper: Try to get a message, reading into our read_buf as needed.
430 ///
431 /// (We don't use a BufReader here because its behavior with nonblocking IO is kind of underspecified.)
432 fn read_msg(&mut self) -> io::Result<Option<UnparsedResponse>> {
433 const READLEN: usize = 4096;
434 loop {
435 if let Some(msg) = self.extract_msg()? {
436 return Ok(Some(msg));
437 }
438
439 let len_orig = self.read_buf.len();
440 // TODO: Impose a maximum length?
441 self.read_buf.resize(len_orig + READLEN, 0);
442 let result = retry_eintr(|| self.stream.read(&mut self.read_buf[len_orig..]));
443 match result {
444 Ok(0) => return Ok(None),
445 Ok(n) => {
446 self.read_buf.truncate(len_orig + n);
447 }
448 Err(e) => {
449 self.read_buf.truncate(len_orig);
450 return Err(e);
451 }
452 }
453 }
454 }
455
456 /// Try to flush data from the underlying write buffer.
457 ///
458 /// Returns Ok() only if all of the data is flushed, and the write buffer has become empty.
459 fn flush_queue(&mut self) -> io::Result<()> {
460 let mut w = self.write_handle.inner.lock().expect("Poisoned lock.");
461 loop {
462 if w.write_buf.is_empty() {
463 return Ok(());
464 }
465
466 let n = retry_eintr(|| self.stream.write(&w.write_buf[..]))?;
467 vec_pop_from_front(&mut w.write_buf, n);
468
469 // This is a no-op for the streams we support so far, but it could be necessary if
470 // we support more kinds in the future.
471 let () = retry_eintr(|| self.stream.flush())?;
472 }
473 }
474}
475
476/// Any type we can use as a target for [`NonblockingStream`].
477pub(crate) trait Stream: io::Read + io::Write + Send {
478 /// If this Stream object is a [`MioStream`], upcast it to one.
479 ///
480 /// Otherwise return None.
481 fn as_mio_stream(&mut self) -> Option<&mut dyn MioStream>;
482}
483
484/// A [`Stream`] that we can use inside a [`PollingStream`].
485pub(crate) trait MioStream: Stream + mio::event::Source {}
486
/// Something that can wake up a pending IO poller.
///
/// With `mio` as the underlying IO loop, this is a [`mio::Waker`];
/// in every other case it is a type supplied by the user.
pub(crate) trait Waker: Send + Sync {
    /// Notify the polling thread that it should wake up.
    fn wake(&mut self) -> io::Result<()>;
}
495
496impl Waker for mio::Waker {
497 fn wake(&mut self) -> io::Result<()> {
498 mio::Waker::wake(self)
499 }
500}
501
502/// Implement Stream and MioStream for a related pair of types.
503macro_rules! impl_traits {
504 { $stream:ty => $mio_stream:ty } => {
505 impl Stream for $stream {
506 fn as_mio_stream(&mut self) -> Option<&mut dyn MioStream> {
507 None
508 }
509 }
510 impl Stream for $mio_stream {
511 fn as_mio_stream(&mut self) -> Option<&mut dyn MioStream> {
512 Some(self as _)
513 }
514 }
515 impl MioStream for $mio_stream {}
516 }
517}
518
519impl_traits! { std::net::TcpStream => mio::net::TcpStream }
520#[cfg(unix)]
521impl_traits! { std::os::unix::net::UnixStream => mio::net::UnixStream }
522
/// Discard the first `n` elements of `v`, shifting the rest to the front.
///
/// # Panics
///
/// Panics if `n > v.len()`.
fn vec_pop_from_front(v: &mut Vec<u8>, n: usize) {
    // `drain` hands back an iterator over the removed elements. We have no use for them,
    // so the iterator is dropped straight away, which is what removes the range.
    // (Cargo asm indicates that this optimizes down to a memmove.)
    drop(v.drain(..n));
}
534
/// Call `f` repeatedly for as long as it fails with an error of kind `Interrupted`.
///
/// The first result that is either Ok() or a different kind of error is returned unchanged.
fn retry_eintr<F, T>(mut f: F) -> io::Result<T>
where
    F: FnMut() -> io::Result<T>,
{
    loop {
        match f() {
            // Interrupted: just go around again.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
            outcome => return outcome,
        }
    }
}
548
#[cfg(test)]
mod test {
    // @@ begin test lint list maintained by maint/add_warning @@
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->

    use std::cmp::min;

    use super::*;

    impl super::PollStatus {
        // Return the WantIo inside a WouldBlock; panic on any other variant.
        fn unwrap_wantio(self) -> WantIo {
            match self {
                PollStatus::WouldBlock(want_io) => want_io,
                other => panic!("Wanted WantIo; found {other:?}"),
            }
        }

        // Return the message inside a Msg; panic on any other variant.
        fn unwrap_msg(self) -> UnparsedResponse {
            match self {
                PollStatus::Msg(msg) => msg,
                other => panic!("Wanted Msg; found {other:?}"),
            }
        }
    }

    // A Waker that only counts how many times it has been asked to wake.
    #[derive(Default, Debug)]
    struct TestWaker {
        n_wakes: usize,
    }
    impl Waker for TestWaker {
        fn wake(&mut self) -> io::Result<()> {
            self.n_wakes += 1;
            Ok(())
        }
    }

    // Helper: Simulates nonblocking IO.
    //
    // The state lives behind an Arc<Mutex<..>>, so a clone kept by the test
    // can inspect and modify what the stream under test sees.
    #[derive(Default, Debug, Clone)]
    struct TestStream {
        inner: Arc<Mutex<TestStreamInner>>,
    }
    #[derive(Default, Debug, Clone)]
    struct TestStreamInner {
        // Bytes that we have _received_ from the client.
        received: Vec<u8>,
        // Bytes that we are _sending_ to the client.
        sending: Vec<u8>,
        // How many more bytes we will accept from the client; None means "no limit".
        receive_capacity: Option<usize>,
    }
    impl TestStream {
        // Queue `b` to be delivered to the client.
        fn push(&self, b: &[u8]) {
            let mut inner = self.inner.lock().unwrap();
            inner.sending.extend_from_slice(b);
        }
        // Take up to `n` bytes, from the front, out of what the client has sent us.
        fn drain(&self, n: usize) -> Vec<u8> {
            let mut inner = self.inner.lock().unwrap();
            let n = min(n, inner.received.len());
            let taken = inner.received[..n].to_vec();
            vec_pop_from_front(&mut inner.received, n);
            taken
        }
    }

    impl io::Read for TestStream {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            let mut inner = self.inner.lock().unwrap();
            if inner.sending.is_empty() {
                return Err(io::ErrorKind::WouldBlock.into());
            }

            let n = min(inner.sending.len(), buf.len());
            buf[..n].copy_from_slice(&inner.sending[..n]);
            vec_pop_from_front(&mut inner.sending, n);
            Ok(n)
        }
    }

    impl io::Write for TestStream {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            if buf.is_empty() {
                return Ok(0);
            }

            let mut inner = self.inner.lock().unwrap();

            let n = match inner.receive_capacity {
                Some(0) => return Err(io::ErrorKind::WouldBlock.into()),
                Some(capacity) => min(capacity, buf.len()),
                None => buf.len(),
            };

            inner.received.extend_from_slice(&buf[..n]);
            if let Some(capacity) = inner.receive_capacity.as_mut() {
                *capacity -= n;
            }

            Ok(n)
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }
    impl Stream for TestStream {
        fn as_mio_stream(&mut self) -> Option<&mut dyn MioStream> {
            None
        }
    }

    // Build a NonblockingStream on top of a fresh TestStream, and return both.
    fn new_test_pair() -> (TestStream, NonblockingStream) {
        let test_stream = TestStream::default();
        let stream = NonblockingStream::new(
            Box::new(TestWaker::default()),
            Box::new(test_stream.clone()),
        );
        (test_stream, stream)
    }

    #[test]
    fn read_msg() {
        let (test_stream, mut stream) = new_test_pair();

        // Try interacting with nothing to do.
        let status = stream.interact_once(true, true).unwrap();
        assert!(!status.unwrap_wantio().want_write());

        // Give it a partial message.
        test_stream.push(b"Hello world");
        let status = stream.interact_once(true, true).unwrap();
        assert!(!status.unwrap_wantio().want_write());

        // Finish the message.
        test_stream.push(b"\nAnd many happy");
        let status = stream.interact_once(true, true).unwrap();
        assert_eq!(status.unwrap_msg().as_str(), "Hello world\n");

        // Then it should block...
        let status = stream.interact_once(true, true).unwrap();
        assert!(!status.unwrap_wantio().want_write());

        // Finish two more messages, and leave a partial message.
        test_stream.push(b" returns\nof the day\nto you!");
        let status = stream.interact_once(true, true).unwrap();
        assert_eq!(status.unwrap_msg().as_str(), "And many happy returns\n");
        let status = stream.interact_once(true, true).unwrap();
        assert_eq!(status.unwrap_msg().as_str(), "of the day\n");
    }

    #[test]
    fn write_msg() {
        let (test_stream, mut stream) = new_test_pair();
        let writer = stream.writer();

        // Make sure we can write in a nonblocking way...
        let req1 = r#"{"id":7,
 "obj":"foo",
 "method":"arti:x-frob", "params":{},
 "extra": "preserved"
 }"#;
        let v = ValidatedRequest::from_string_strict(req1).unwrap();
        writer.send_valid(&v).unwrap();

        // At this point the above request is queued, but won't be sent until we interact.
        assert!(test_stream.inner.lock().unwrap().received.is_empty());

        // Now interact. This will cause the whole request to get flushed.
        let status = stream.interact_once(true, true).unwrap();
        assert!(!status.unwrap_wantio().want_write());

        let sent = test_stream.drain(v.as_ref().len());
        assert_eq!(sent, v.as_ref().as_bytes());

        // Now try again, but with a blocked stream.
        test_stream.inner.lock().unwrap().receive_capacity = Some(32);
        writer.send_valid(&v).unwrap();

        let status: PollStatus = stream.interact_once(true, true).unwrap();
        assert!(status.unwrap_wantio().want_write());
        {
            let mut inner = test_stream.inner.lock().unwrap();
            assert_eq!(inner.received.len(), 32);
            // Make the capacity unlimited.
            inner.receive_capacity = None;
        }
        let status: PollStatus = stream.interact_once(true, true).unwrap();
        assert!(!status.unwrap_wantio().want_write());
        let sent = test_stream.drain(v.as_ref().len());
        assert_eq!(sent, v.as_ref().as_bytes());
    }

    // TODO nb: It would be good to have additional tests for the MIO code as well.
}