Skip to main content

arti_rpc_client_core/
nb_stream.rs

1//! Low-level nonblocking stream implementation.
2//!
3//! This module defines two main types: [`NonblockingStream`].
4//! (a low-level type for use by external tools
5//! that want to implement their own nonblocking IO),
6//! and [`PollingStream`] (a slightly higher-level type
7//! that we use internally when we are asked to provide
8//! our own nonblocking IO loop(s)).
9//!
10//! This module also defines several traits for use by these types.
11//!
12//! Treats messages as unrelated strings, and validates outgoing messages for correctness.
13//!
14//! TODO nb: For now, nothing in this module is actually public; we'll want to expose some of these types.
15
16use mio::Interest;
17
18use crate::{
19    msgs::{request::ValidatedRequest, response::UnparsedResponse},
20    util::define_from_for_arc,
21};
22use std::{
23    io::{self, Read as _, Write as _},
24    mem,
25    sync::{Arc, Mutex},
26};
27
28/// An IO stream to Arti, along with any supporting logic necessary to check it for readiness.
29///
30/// Internally, this uses `mio` along with a [`NonblockingStream`] to check for events.
31///
32/// To use this type, mark the stream as nonblocking
33/// with e.g. [TcpStream::set_nonblocking](std::net::TcpStream::set_nonblocking),
34/// convert it into a [`mio::event::Source`],
35/// and pass it to [`PollingStream::new()`]
36///
37/// At this point, you can read and write messages via nonblocking IO.
38///
39/// The [`PollingStream::writer()`] method will return a handle that you can use from any thread
40/// that you can use to queue an outbound message.
41///
42/// No messages are actually sent or received unless some thread is calling [`PollingStream::interact()`].
43///
44/// ## Concurrency and interior mutability
45///
46/// A `PollingStream` has (limited) interior mutability.
47///
48/// Only a single call to `interact` can be made at the same time.
49/// So only one thread can be waiting for responses, and
50/// the caller of `interact` must demultiplex responses as necessary.
51///
52/// But, one or more [`WriteHandle`]s can be created,
53/// and these are `'static + Send + Sync`.
54/// Using `WriteHandle`, multiple threads can enqueue requests,
55/// with [`send_valid`](WriteHandle::send_valid), concurrently.
56///
57/// (All these restrictions imposed on the caller are enforced by the Rust type system.)
58#[derive(Debug)]
59pub(crate) struct PollingStream {
60    /// The poll object.
61    ///
62    /// (This typically corresponds to a kqueue or epoll handle.)
63    ///
64    /// ## IO Safety
65    ///
66    /// This object (semantically) contains references to the `fd`s or `SOCKETS`
67    /// of any inserted [`mio::event::Source`].  Therefore it must not outlive those sources.
68    /// Further, according to `mio`'s documentation, every Source must be deregistered
69    /// before it can be dropped.
70    ///
71    /// We ensure these properties are obeyed as follows:
72    ///  - We hold the stream via `stream`, the NonblockingStream member of this struct.
73    ///    We do not let anybody outside this module have the stream or the `Poll`.
74    ///  - We declare a Drop implementation that deregisters the stream.
75    ///    This method ensures that the stream is dropped before it is closed.
76    poll: mio::Poll,
77
78    /// A small buffer to receive IO readiness events.
79    events: mio::Events,
80
81    /// The underlying stream.
82    ///
83    /// Invariant: `stream.stream` is a [`MioStream`], so [`Stream::as_mio_stream`] will return
84    /// Some when we call it.
85    stream: NonblockingStream,
86}
87
88/// A `mio` token corresponding to the Waker we use to tell the interactor about new writes.
89const WAKE_TOKEN: mio::Token = mio::Token(0);
90
91/// A `mio` token corresponding to the Stream connecting to the RPC
92const STREAM_TOKEN: mio::Token = mio::Token(1);
93
94impl PollingStream {
95    /// Create a new PollingStream.
96    ///
97    /// The `stream` will be set to use nonblocking IO;
98    /// on Unix this will affect the behaviour of other `dup`s of the same fd!
99    pub(crate) fn new(stream: Box<dyn MioStream>) -> io::Result<Self> {
100        let poll = mio::Poll::new()?;
101        let waker = mio::Waker::new(poll.registry(), WAKE_TOKEN)?;
102
103        let stream = NonblockingStream::new(Box::new(waker), stream);
104
105        let mut cio = Self {
106            poll,
107            events: mio::Events::with_capacity(4),
108            stream,
109        };
110
111        // We register the stream here, since we want to use it exclusively with `reregister`
112        // later on.  We do not deregister the stream until `Drop::drop` is called.
113        cio.poll.registry().register(
114            cio.stream
115                .stream
116                .as_mio_stream()
117                .expect("logic error: not a mio stream."),
118            STREAM_TOKEN,
119            Interest::READABLE,
120        )?;
121
122        Ok(cio)
123    }
124
125    /// Return a new [`WriteHandle`] that can be used to queue messages to be sent via this stream.
126    pub(crate) fn writer(&self) -> WriteHandle {
127        self.stream.writer()
128    }
129
130    /// Interact with the peer until some response is received.
131    ///
132    /// Sends all requests given to [`WriteHandle::send_valid`]
133    /// (including calls to `send_valid` made while `interact` is running)
134    /// while looking for a response from the server.
135    /// Returns when the first response is received.
136    ///
137    ///
138    /// Returns an error if an IO condition has failed.
139    /// Returns None if the other side has closed the stream.
140    /// Otherwise, returns an unparsed message from the RPC server.
141    ///
142    /// Unless some thread is calling this method, nobody will actually be reading or writing from
143    /// the [`PollingStream`], and so nobody's requests will be sent or answered.
144    pub(crate) fn interact(&mut self) -> io::Result<Option<UnparsedResponse>> {
145        // Should we try to read and write? Start out by assuming "yes".
146        let mut try_writing = true;
147        let mut try_reading = true;
148
149        loop {
150            // Try interacting with the underlying stream.
151            let want_io = match self.stream.interact_once(try_writing, try_reading)? {
152                PollStatus::Closed => return Ok(None),
153                PollStatus::Msg(msg) => return Ok(Some(msg)),
154                PollStatus::WouldBlock(w) => w,
155            };
156
157            // We're blocking on reading and possibly writing.  Register our interest,
158            // so that we get woken as appropriate.
159            self.poll.registry().reregister(
160                self.stream
161                    .stream
162                    .as_mio_stream()
163                    .expect("logic error: not a mio stream!"),
164                STREAM_TOKEN,
165                want_io.into(),
166            )?;
167
168            // Poll until the socket is ready to read or write,
169            // _or_ until somebody invokes the Waker because they have queued more to write.
170            let () = retry_eintr(|| self.poll.poll(&mut self.events, None))?;
171
172            // Now that we've been woken, see which events we've been woken with,
173            // and adjust our plans accordingly on the next time through the loop.
174            try_reading = false;
175            try_writing = false;
176            for event in self.events.iter() {
177                if event.token() == STREAM_TOKEN {
178                    if event.is_readable() {
179                        try_reading = true;
180                    }
181                    if event.is_writable() {
182                        try_writing = true;
183                    }
184                } else if event.token() == WAKE_TOKEN {
185                    try_writing = true;
186                }
187            }
188        }
189    }
190}
191
192impl Drop for PollingStream {
193    fn drop(&mut self) {
194        // IO SAFETY: See "IO Safety" note in documentation for PollingStream.
195        let s = self
196            .stream
197            .stream
198            .as_mio_stream()
199            .expect("Logic error: Stream was not a MIO stream.");
200        self.poll
201            .registry()
202            .deregister(s)
203            .expect("Deregister operation failed");
204    }
205}
206
207/// A handle that can be used to queue outgoing messages for a nonblocking stream.
208///
209/// Note that queueing a message has no effect unless some party is polling the stream,
210/// either with [`PollingStream::interact()`], or [`NonblockingStream::interact_once()`].
211#[derive(Clone, Debug)]
212pub(crate) struct WriteHandle {
213    /// The actual implementation type for this writer.
214    inner: Arc<Mutex<WriteHandleImpl>>,
215}
216
217impl WriteHandle {
218    /// Queue an outgoing message for a nonblocking stream.
219    pub(crate) fn send_valid(&self, msg: &ValidatedRequest) -> io::Result<()> {
220        let mut w = self.inner.lock().expect("Poisoned lock");
221        w.write_buf.extend_from_slice(msg.as_ref().as_bytes());
222
223        // See TOCTOU note on `WriteHandleImpl`: we need to wake() while we are holding the
224        // above mutex.
225        w.waker.wake()
226    }
227}
228
229/// An error that has occurred while sending a request.
230#[derive(Clone, Debug, thiserror::Error)]
231#[non_exhaustive]
232pub enum SendRequestError {
233    /// An IO error occurred while sending a request.
234    #[error("Unable to wake poling loop")]
235    Io(#[source] Arc<io::Error>),
236    /// We found a problem in the JSON while sending a request.
237    #[error("Invalid Json request")]
238    InvalidRequest(#[from] crate::InvalidRequestError),
239    /// Internal error while re-encoding request.  Should be impossible.
240    #[error("Unable to re-encode request after parsing it‽")]
241    ReEncode(#[source] Arc<serde_json::Error>),
242}
243define_from_for_arc!( io::Error => SendRequestError [Io] );
244
245/// The inner implementation for [`WriteHandle`].
246///
247/// NOTE: We need to be careful to avoid TOCTOU problems with this type:
248/// It would be bad if a writing thread called `waker.wake()`, and then the interactor checked the
249/// buffer and found it empty, and only then did the writing thread add to the buffer.
250///
251/// To solve this, we put the write_buf and the waker behind the same lock:
252/// While the interactor is checking the buffer, nobody is able to add to the buffer _or_ wake the
253/// interactor.
254#[derive(derive_more::Debug)]
255struct WriteHandleImpl {
256    /// An underlying buffer holding messages to be sent to the RPC server.
257    //
258    // TODO: Consider using a VecDeque or BytesMut or such.
259    write_buf: Vec<u8>,
260
261    /// The waker to use to wake the polling loop.
262    #[debug(ignore)]
263    waker: Box<dyn Waker>,
264}
265
266/// A lower-level implementation of nonblocking IO for an open stream to the RPC server.
267///
268/// Unlike [`PollingStream`], this type _does not_ handle the IO event polling loops:
269/// the caller is required to provide their own.
270#[derive(derive_more::Debug)]
271pub(crate) struct NonblockingStream {
272    /// A write handle used to write onto this stream.
273    #[debug(ignore)]
274    write_handle: WriteHandle,
275
276    /// A buffer of incoming messages (possibly partial) from the RPC server.
277    //
278    // TODO: Consider using a VecDeque or BytesMut or such.
279    read_buf: Vec<u8>,
280
281    /// The underlying nonblocking stream.
282    #[debug(ignore)]
283    stream: Box<dyn Stream>,
284}
285
286/// Helper to return which events a [`NonblockingStream`] is interested in.
287#[derive(Clone, Debug, Default, Copy)]
288pub(crate) struct WantIo {
289    /// True if the stream is interested in writing.
290    ///
291    /// (It is always interested in reading.)
292    write: bool,
293}
294
295#[allow(dead_code)] // TODO nb: remove or expose.
296impl WantIo {
297    /// Return true if the stream is interested in reading.
298    fn want_read(&self) -> bool {
299        true
300    }
301
302    /// Return true if the stream is interested in writing.
303    fn want_write(&self) -> bool {
304        self.write
305    }
306}
307
308impl From<WantIo> for mio::Interest {
309    fn from(value: WantIo) -> Self {
310        if value.write {
311            mio::Interest::WRITABLE | mio::Interest::READABLE
312        } else {
313            mio::Interest::READABLE
314        }
315    }
316}
317
318/// A return value from [`NonblockingStream::interact_once`].
319#[derive(Debug, Clone)]
320pub(crate) enum PollStatus {
321    /// The stream is closed.
322    Closed,
323
324    /// No progress can be made until the stream is available for further IO.
325    WouldBlock(WantIo),
326
327    /// We have received a message.
328    Msg(UnparsedResponse),
329}
330
331impl NonblockingStream {
332    /// Create a new `NonblockingStream` from a provided [`Waker`] and [`Stream`].
333    pub(crate) fn new(waker: Box<dyn Waker>, stream: Box<dyn Stream>) -> Self {
334        Self {
335            write_handle: WriteHandle {
336                inner: Arc::new(Mutex::new(WriteHandleImpl {
337                    write_buf: Default::default(),
338                    waker,
339                })),
340            },
341            read_buf: Default::default(),
342            stream,
343        }
344    }
345
346    /// Return a new [`WriteHandle`] that can be used to queue messages to be sent via this stream.
347    pub(crate) fn writer(&self) -> WriteHandle {
348        self.write_handle.clone()
349    }
350
351    /// Try to exchange messages with the RPC server.
352    ///
353    /// If `try_reading` is true, then we should try reading from the RPC server.
354    /// If `try_writing` is true, then we should try flushing messages to the RPC server
355    /// (if we have any).
356    ///
357    /// If the stream proves to be closed, returns [`PollStatus::Closed`].
358    ///
359    /// If a message is available, returns [`PollStatus::Msg`].
360    /// (Note that a message may be available in the internal buffer here even if try_reading is false.)
361    ///
362    /// If no message is available, return [`PollStatus::WouldBlock`] with a [`WantIo`]
363    /// describing which IO operations we would like to perform.
364    pub(crate) fn interact_once(
365        &mut self,
366        try_writing: bool,
367        try_reading: bool,
368    ) -> io::Result<PollStatus> {
369        use io::ErrorKind::WouldBlock;
370
371        if let Some(msg) = self.extract_msg()? {
372            return Ok(PollStatus::Msg(msg));
373        }
374
375        let mut want_io = WantIo::default();
376
377        if try_writing {
378            match self.flush_queue() {
379                Ok(()) => {}
380                Err(e) if e.kind() == WouldBlock => want_io.write = true,
381                Err(e) => return Err(e),
382            }
383        }
384        if try_reading {
385            match self.read_msg() {
386                Ok(Some(msg)) => return Ok(PollStatus::Msg(msg)),
387                Ok(None) => return Ok(PollStatus::Closed),
388                Err(e) if e.kind() == WouldBlock => {}
389                Err(e) => return Err(e),
390            }
391        }
392
393        if !want_io.write && self.has_data_to_write() {
394            want_io.write = true;
395        }
396
397        Ok(PollStatus::WouldBlock(want_io))
398    }
399
400    /// Internal helper: Try to get a buffered message out of our `read_buf`.
401    ///
402    /// Returns Ok(None) if there are no complete lines in the buffer.
403    ///
404    /// If there is a line, but it is not valid UTF-8, returns an error and discards the line.
405    fn extract_msg(&mut self) -> io::Result<Option<UnparsedResponse>> {
406        // Look for an eol within the buffer.
407        let Some(eol_pos) = memchr::memchr(b'\n', &self.read_buf[..]) else {
408            return Ok(None);
409        };
410        // Split off the part of the buffer ending with the EOF from the remainder.
411        let mut line = self.read_buf.split_off(eol_pos + 1);
412        // Put the message in "line" and the remainder of the buffer in read_buf.
413        mem::swap(&mut line, &mut self.read_buf);
414        // Try to convert the line to an UnparsedResponse.
415        match String::from_utf8(line) {
416            Ok(s) => Ok(Some(UnparsedResponse::new(s))),
417            Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)),
418        }
419    }
420
421    /// Internal helper: Return true if there is any outgoing data queued to be written.
422    fn has_data_to_write(&self) -> bool {
423        let w = self.write_handle.inner.lock().expect("Lock poisoned");
424        // See TOCTOU note on WriteHandleImpl: Our rule is to check whether we have data to write
425        // within the same lock used to hold the waker, so that we can't lose any data.
426        !w.write_buf.is_empty()
427    }
428
429    /// Helper: Try to get a message, reading into our read_buf as needed.
430    ///
431    /// (We don't use a BufReader here because its behavior with nonblocking IO is kind of underspecified.)
432    fn read_msg(&mut self) -> io::Result<Option<UnparsedResponse>> {
433        const READLEN: usize = 4096;
434        loop {
435            if let Some(msg) = self.extract_msg()? {
436                return Ok(Some(msg));
437            }
438
439            let len_orig = self.read_buf.len();
440            // TODO: Impose a maximum length?
441            self.read_buf.resize(len_orig + READLEN, 0);
442            let result = retry_eintr(|| self.stream.read(&mut self.read_buf[len_orig..]));
443            match result {
444                Ok(0) => return Ok(None),
445                Ok(n) => {
446                    self.read_buf.truncate(len_orig + n);
447                }
448                Err(e) => {
449                    self.read_buf.truncate(len_orig);
450                    return Err(e);
451                }
452            }
453        }
454    }
455
456    /// Try to flush data from the underlying write buffer.
457    ///
458    /// Returns Ok() only if all of the data is flushed, and the write buffer has become empty.
459    fn flush_queue(&mut self) -> io::Result<()> {
460        let mut w = self.write_handle.inner.lock().expect("Poisoned lock.");
461        loop {
462            if w.write_buf.is_empty() {
463                return Ok(());
464            }
465
466            let n = retry_eintr(|| self.stream.write(&w.write_buf[..]))?;
467            vec_pop_from_front(&mut w.write_buf, n);
468
469            // This is a no-op for the streams we support so far, but it could be necessary if
470            // we support more kinds in the future.
471            let () = retry_eintr(|| self.stream.flush())?;
472        }
473    }
474}
475
476/// Any type we can use as a target for [`NonblockingStream`].
477pub(crate) trait Stream: io::Read + io::Write + Send {
478    /// If this Stream object is a [`MioStream`], upcast it to one.
479    ///
480    /// Otherwise return None.
481    fn as_mio_stream(&mut self) -> Option<&mut dyn MioStream>;
482}
483
484/// A [`Stream`] that we can use inside a [`PollingStream`].
485pub(crate) trait MioStream: Stream + mio::event::Source {}
486
487/// An object that can wake a pending IO poller.
488///
489/// When the underlying IO loop is `mio`, this is a [`mio::Waker`];
490/// otherwise, it is some user-provided type.
491pub(crate) trait Waker: Send + Sync {
492    /// Alert the polling thread.
493    fn wake(&mut self) -> io::Result<()>;
494}
495
496impl Waker for mio::Waker {
497    fn wake(&mut self) -> io::Result<()> {
498        mio::Waker::wake(self)
499    }
500}
501
502/// Implement Stream and MioStream for a related pair of types.
503macro_rules! impl_traits {
504    { $stream:ty => $mio_stream:ty } => {
505        impl Stream for $stream {
506            fn as_mio_stream(&mut self) -> Option<&mut dyn MioStream> {
507                None
508            }
509        }
510        impl Stream for $mio_stream {
511            fn as_mio_stream(&mut self) -> Option<&mut dyn MioStream> {
512                Some(self as _)
513            }
514        }
515        impl MioStream for $mio_stream {}
516    }
517}
518
519impl_traits! { std::net::TcpStream => mio::net::TcpStream }
520#[cfg(unix)]
521impl_traits! { std::os::unix::net::UnixStream => mio::net::UnixStream }
522
523/// Remove n elements from the front of v.
524///
525/// # Panics
526///
527/// Panics if `n > v.len()`.
528fn vec_pop_from_front(v: &mut Vec<u8>, n: usize) {
529    // This returns an iterator, but we don't need to actually iterate over the elements.
530    // The compiler appears to be smart enough to optimize it away.
531    // (Cargo asm indicates that this optimizes down to a memmove.)
532    v.drain(0..n);
533}
534
535/// Retry `f` until it returns Ok() or an error whose kind is not `Interrupted`.
536fn retry_eintr<F, T>(mut f: F) -> io::Result<T>
537where
538    F: FnMut() -> io::Result<T>,
539{
540    loop {
541        let r = f();
542        match r {
543            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
544            _ => return r,
545        }
546    }
547}
548
549#[cfg(test)]
550mod test {
551    // @@ begin test lint list maintained by maint/add_warning @@
552    #![allow(clippy::bool_assert_comparison)]
553    #![allow(clippy::clone_on_copy)]
554    #![allow(clippy::dbg_macro)]
555    #![allow(clippy::mixed_attributes_style)]
556    #![allow(clippy::print_stderr)]
557    #![allow(clippy::print_stdout)]
558    #![allow(clippy::single_char_pattern)]
559    #![allow(clippy::unwrap_used)]
560    #![allow(clippy::unchecked_time_subtraction)]
561    #![allow(clippy::useless_vec)]
562    #![allow(clippy::needless_pass_by_value)]
563    //! <!-- @@ end test lint list maintained by maint/add_warning @@ -->
564
565    use std::cmp::min;
566
567    use super::*;
568
569    impl super::PollStatus {
570        fn unwrap_wantio(self) -> WantIo {
571            match self {
572                PollStatus::WouldBlock(want_io) => want_io,
573                other => panic!("Wanted WantIo; found {other:?}"),
574            }
575        }
576
577        fn unwrap_msg(self) -> UnparsedResponse {
578            match self {
579                PollStatus::Msg(msg) => msg,
580                other => panic!("Wanted Msg; found {other:?}"),
581            }
582        }
583    }
584
585    #[derive(Default, Debug)]
586    struct TestWaker {
587        n_wakes: usize,
588    }
589    impl Waker for TestWaker {
590        fn wake(&mut self) -> io::Result<()> {
591            self.n_wakes += 1;
592            Ok(())
593        }
594    }
595
596    // Helper: Simulates nonblocking IO.
597    //
598    // Has interior mutability so we can inspect it.
599    #[derive(Default, Debug, Clone)]
600    struct TestStream {
601        inner: Arc<Mutex<TestStreamInner>>,
602    }
603    #[derive(Default, Debug, Clone)]
604    struct TestStreamInner {
605        // Bytes that we have _received_ from the client.
606        received: Vec<u8>,
607        // Bytes that we are _sending_ to the client.
608        sending: Vec<u8>,
609        receive_capacity: Option<usize>,
610    }
611    impl TestStream {
612        fn push(&self, b: &[u8]) {
613            self.inner.lock().unwrap().sending.extend_from_slice(b);
614        }
615        fn drain(&self, n: usize) -> Vec<u8> {
616            let mut s = self.inner.lock().unwrap();
617            let n = min(n, s.received.len());
618            let mut v = vec![0_u8; n];
619            v[..].copy_from_slice(&s.received[..n]);
620            vec_pop_from_front(&mut s.received, n);
621            v
622        }
623    }
624
625    impl io::Read for TestStream {
626        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
627            let mut s = self.inner.lock().unwrap();
628            if s.sending.is_empty() {
629                return Err(io::ErrorKind::WouldBlock.into());
630            }
631
632            let n_to_copy = min(s.sending.len(), buf.len());
633            buf[..n_to_copy].copy_from_slice(&s.sending[..n_to_copy]);
634            vec_pop_from_front(&mut s.sending, n_to_copy);
635            Ok(n_to_copy)
636        }
637    }
638
639    impl io::Write for TestStream {
640        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
641            if buf.is_empty() {
642                return Ok(0);
643            }
644
645            let mut s = self.inner.lock().unwrap();
646
647            let n_to_copy = match s.receive_capacity {
648                Some(0) => return Err(io::ErrorKind::WouldBlock.into()),
649                Some(n) => min(n, buf.len()),
650                None => buf.len(),
651            };
652
653            s.received.extend_from_slice(&buf[..n_to_copy]);
654            if let Some(ref mut n) = s.receive_capacity {
655                *n -= n_to_copy;
656            }
657
658            Ok(n_to_copy)
659        }
660
661        fn flush(&mut self) -> io::Result<()> {
662            Ok(())
663        }
664    }
665    impl Stream for TestStream {
666        fn as_mio_stream(&mut self) -> Option<&mut dyn MioStream> {
667            None
668        }
669    }
670
671    #[test]
672    fn read_msg() {
673        let test_stream = TestStream::default();
674        let mut stream = NonblockingStream::new(
675            Box::new(TestWaker::default()),
676            Box::new(test_stream.clone()),
677        );
678
679        // Try interacting with nothing to do.
680        let r = stream.interact_once(true, true);
681        assert_eq!(r.unwrap().unwrap_wantio().want_write(), false);
682
683        // Give it a partial message.
684        test_stream.push(b"Hello world");
685        let r = stream.interact_once(true, true);
686        assert_eq!(r.unwrap().unwrap_wantio().want_write(), false);
687
688        // Finish the message.
689        test_stream.push(b"\nAnd many happy");
690        let r = stream.interact_once(true, true);
691        assert_eq!(r.unwrap().unwrap_msg().as_str(), "Hello world\n");
692
693        // Then it should block...
694        let r = stream.interact_once(true, true);
695        assert_eq!(r.unwrap().unwrap_wantio().want_write(), false);
696
697        // Finish two more messages, and leave a partial message.
698        test_stream.push(b" returns\nof the day\nto you!");
699        let r = stream.interact_once(true, true);
700        assert_eq!(r.unwrap().unwrap_msg().as_str(), "And many happy returns\n");
701        let r = stream.interact_once(true, true);
702        assert_eq!(r.unwrap().unwrap_msg().as_str(), "of the day\n");
703    }
704
705    #[test]
706    fn write_msg() {
707        let test_stream = TestStream::default();
708        let mut stream = NonblockingStream::new(
709            Box::new(TestWaker::default()),
710            Box::new(test_stream.clone()),
711        );
712        let writer = stream.writer();
713
714        // Make sure we can write in a nonblocking way...
715        let req1 = r#"{"id":7,
716                 "obj":"foo",
717                 "method":"arti:x-frob", "params":{},
718                 "extra": "preserved"
719            }"#;
720        let v = ValidatedRequest::from_string_strict(req1).unwrap();
721        writer.send_valid(&v).unwrap();
722
723        // At this point the above request is queued, but won't be sent until we interact.
724        {
725            assert!(test_stream.inner.lock().unwrap().received.is_empty());
726        }
727
728        // Now interact. This will cause the whole request to get flushed.
729        let r = stream.interact_once(true, true);
730        assert_eq!(r.unwrap().unwrap_wantio().want_write(), false);
731
732        let m = test_stream.drain(v.as_ref().len());
733        assert_eq!(m, v.as_ref().as_bytes());
734
735        // Now try again, but with a blocked stream.
736        {
737            test_stream.inner.lock().unwrap().receive_capacity = Some(32);
738        }
739        writer.send_valid(&v).unwrap();
740
741        let r: Result<PollStatus, io::Error> = stream.interact_once(true, true);
742        assert_eq!(r.unwrap().unwrap_wantio().want_write(), true);
743        {
744            assert_eq!(test_stream.inner.lock().unwrap().received.len(), 32);
745            // Make the capacity unlimited.
746            test_stream.inner.lock().unwrap().receive_capacity = None;
747        }
748        let r: Result<PollStatus, io::Error> = stream.interact_once(true, true);
749        assert_eq!(r.unwrap().unwrap_wantio().want_write(), false);
750        let m = test_stream.drain(v.as_ref().len());
751        assert_eq!(m, v.as_ref().as_bytes());
752    }
753
754    // TODO nb: It would be good to have additional tests for the MIO code as well.
755}