tokio_qapi/
lib.rs

1#![doc(html_root_url = "http://docs.rs/tokio-qapi/0.4.0")]
2
3#[cfg(feature = "qapi-qmp")]
4pub use qapi_qmp as qmp;
5
6#[cfg(feature = "qapi-qga")]
7pub use qapi_qga as qga;
8
9pub use qapi_spec::{Any, Dictionary, Empty, Command, Event, Error, ErrorClass, Timestamp};
10
11use std::mem::replace;
12use std::{io, str};
13use tokio_codec::Framed;
14use tokio_io::{AsyncRead, AsyncWrite};
15use futures::{Future, Poll, StartSend, Async, AsyncSink, Sink, Stream, try_ready};
16use futures::sync::BiLock;
17use futures::task::{self, Task};
18use bytes::BytesMut;
19use bytes::buf::FromBuf;
20use log::{trace, debug};
21
22mod codec;
23
24pub struct QapiFuture<C, S> {
25    state: QapiState<C, S>,
26}
27
28impl<C: Command, S> QapiFuture<C, S> {
29    pub fn new(stream: S, command: C) -> Self {
30        QapiFuture {
31            state: QapiState::Queue {
32                inner: stream,
33                value: command,
34            },
35        }
36    }
37}
38
39enum QapiState<C, S> {
40    Queue {
41        inner: S,
42        value: C,
43    },
44    Waiting {
45        inner: S,
46    },
47    None,
48}
49
50impl<C, S> QapiState<C, S> {
51    fn inner_mut(&mut self) -> Option<&mut S> {
52        match *self {
53            QapiState::Queue { ref mut inner, .. } => Some(inner),
54            QapiState::Waiting { ref mut inner } => Some(inner),
55            QapiState::None => None,
56        }
57    }
58
59    fn take_value(&mut self) -> Option<C> {
60        match replace(self, QapiState::None) {
61            QapiState::Queue { inner, value } => {
62                *self = QapiState::Waiting { inner: inner };
63                Some(value)
64            },
65            v @ QapiState::Waiting { .. } => {
66                *self = v;
67                None
68            },
69            QapiState::None => None,
70        }
71    }
72
73    fn take_inner(&mut self) -> Option<S> {
74        match replace(self, QapiState::None) {
75            QapiState::Queue { inner, .. } => {
76                Some(inner)
77            },
78            QapiState::Waiting { inner, .. } => {
79                Some(inner)
80            },
81            QapiState::None => None,
82        }
83    }
84
85    fn set_value(&mut self, v: C) {
86        match replace(self, QapiState::None) {
87            QapiState::Queue { inner, .. } => {
88                *self = QapiState::Queue { inner: inner, value: v };
89            },
90            QapiState::Waiting { inner } => {
91                *self = QapiState::Queue { inner: inner, value: v };
92            },
93            QapiState::None => unreachable!(),
94        }
95    }
96}
97
98impl<C, S, E> Future for QapiFuture<C, S>
99    where
100        S: Sink<SinkItem=Box<[u8]>, SinkError=E> + Stream<Error=E>,
101        S::Item: AsRef<[u8]>,
102        C: Command,
103        io::Error: From<E>,
104{
105    type Item = (Result<C::Ok, Error>, S);
106    type Error = io::Error;
107
108    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
109        trace!("QapiFuture::poll()");
110        match self.state.take_value() {
111            Some(v) => {
112                let encoded = encode_command(&v)?;
113                debug!("-> {}", str::from_utf8(&encoded).unwrap_or("utf8 decoding failed"));
114                // TODO: queue the vec instead of the value?
115                match self.state.inner_mut().unwrap().start_send(encoded) {
116                    Ok(AsyncSink::Ready) => self.poll(),
117                    Ok(AsyncSink::NotReady(..)) => {
118                        trace!("Failed to start_send, try later");
119                        self.state.set_value(v);
120                        Ok(Async::NotReady)
121                    },
122                    Err(e) => Err(e.into()),
123                }
124            },
125            None => {
126                let poll = if let Some(inner) = self.state.inner_mut() {
127                    trace!("QapiFuture::poll_complete");
128                    try_ready!(inner.poll_complete());
129
130                    trace!("QapiFuture::poll for data");
131                    try_ready!(inner.poll())
132                } else {
133                    panic!("future polled after returning value")
134                };
135
136                match poll {
137                    Some(t) => {
138                        let t = t.as_ref();
139                        let t: qapi_spec::Response<C::Ok> = serde_json::from_slice(&t)?;
140                        Ok(Async::Ready((t.result(), self.state.take_inner().unwrap())))
141                    },
142                    None => Err(io::Error::new(io::ErrorKind::UnexpectedEof, "expected command response, got eof")),
143                }
144            },
145        }
146    }
147}
148
149struct QapiStreamInner<S> {
150    stream: S,
151    #[cfg(feature = "qapi-qmp")]
152    events: Vec<Box<[u8]>>,
153    response: Option<BytesMut>,
154    #[cfg(feature = "qapi-qmp")]
155    greeting: Option<Box<[u8]>>,
156    fused: bool,
157    fused_events: bool,
158    #[cfg(feature = "qapi-qmp")]
159    task_events: Option<Task>,
160    task_response: Option<Task>,
161}
162
163impl<S> QapiStreamInner<S> {
164    fn new(s: S) -> Self {
165        QapiStreamInner {
166            stream: s,
167            #[cfg(feature = "qapi-qmp")]
168            events: Default::default(),
169            response: Default::default(),
170            #[cfg(feature = "qapi-qmp")]
171            greeting: Default::default(),
172            fused: false,
173            fused_events: false,
174            #[cfg(feature = "qapi-qmp")]
175            task_events: None,
176            task_response: None,
177        }
178    }
179}
180
181impl<S> QapiStreamInner<S> where
182    S: Stream,
183    S::Item: AsRef<[u8]>,
184    S::Error: From<io::Error>,
185{
186    #[cfg(feature = "qapi-qmp")]
187    fn push_event(&mut self, e: &[u8]) {
188        if self.fused_events {
189            return
190        }
191
192        if let Some(ref task) = self.task_events {
193            self.events.push(e.to_owned().into_boxed_slice());
194
195            task.notify()
196        }
197    }
198
199    #[cfg(feature = "qapi-qmp")]
200    fn push_greeting(&mut self, g: &[u8]) {
201        self.greeting = Some(g.to_owned().into_boxed_slice());
202        if let Some(ref task) = self.task_response {
203            task.notify()
204        }
205    }
206
207    #[cfg(not(feature = "qapi-qmp"))]
208    fn push_event(&mut self, _: &[u8]) { }
209
210    #[cfg(not(feature = "qapi-qmp"))]
211    fn push_greeting(&mut self, _: &[u8]) { }
212
213    fn poll(&mut self) -> Poll<(), S::Error> {
214        match try_ready!(self.stream.poll()) {
215            Some(v) => {
216                let v = v.as_ref();
217                debug!("<- {}", str::from_utf8(v).unwrap_or("utf8 decoding failed"));
218                if v.starts_with(b"{\"QMP\":") {
219                    self.push_greeting(v);
220                } else if v.starts_with(b"{\"timestamp\":") || v.starts_with(b"{\"event\":") {
221                    self.push_event(v);
222                } else {
223                    self.response = Some(BytesMut::from_buf(v));
224                    if let Some(ref task) = self.task_response {
225                        task.notify()
226                    }
227                }
228
229                Ok(Async::Ready(()))
230            },
231            None => {
232                self.fused = true;
233                Ok(Async::Ready(()))
234            },
235        }
236    }
237
238    #[cfg(feature = "qapi-qmp")]
239    fn greeting(&mut self) -> Poll<Option<qmp::QapiCapabilities>, S::Error> {
240        match self.greeting.take() {
241            Some(g) => {
242                serde_json::from_slice(&g)
243                    .map_err(io::Error::from).map_err(From::from)
244                    .map(Async::Ready)
245            },
246            None => if self.fused {
247                Ok(Async::Ready(None))
248            } else {
249                Ok(Async::NotReady)
250            },
251        }
252    }
253
254    #[cfg(feature = "qapi-qmp")]
255    fn event(&mut self) -> Poll<Option<qmp::Event>, S::Error> {
256        match self.events.pop() {
257            Some(v) => {
258                let v = serde_json::from_slice(v.as_ref()).map_err(io::Error::from)?;
259                Ok(Async::Ready(Some(v)))
260            },
261            None => if self.fused || self.fused_events {
262                Ok(Async::Ready(None))
263            } else {
264                Ok(Async::NotReady)
265            },
266        }
267    }
268
269    fn response(&mut self) -> Poll<Option<BytesMut>, S::Error> {
270        match self.response.take() {
271            Some(v) => {
272                Ok(Async::Ready(Some(v)))
273            },
274            None => if self.fused {
275                Ok(Async::Ready(None))
276            } else {
277                Ok(Async::NotReady)
278            },
279        }
280    }
281}
282
283pub struct QapiStream<S> {
284    inner: BiLock<QapiStreamInner<S>>,
285}
286
287impl<S> QapiStream<S> {
288    pub fn new(stream: S) -> Self {
289        let mut inner = QapiStreamInner::new(stream);
290        inner.fused_events = true;
291        // TODO: why bother with a lock here, make it generic instead
292        let (inner, _) = BiLock::new(inner);
293
294        QapiStream {
295            inner: inner,
296        }
297    }
298
299    pub fn execute<C: Command>(self, command: C) -> QapiFuture<C, Self> {
300        QapiFuture::new(self, command)
301    }
302}
303
304#[cfg(feature = "qapi-qmp")]
305pub struct QapiEventStream<S> {
306    inner: BiLock<QapiStreamInner<S>>,
307}
308
309#[cfg(feature = "qapi-qmp")]
310impl<S> QapiEventStream<S> {
311    pub fn new(stream: S) -> (QapiStream<S>, QapiEventStream<S>) {
312        let inner = QapiStreamInner::new(stream);
313        let (inner0, inner1) = BiLock::new(inner);
314        (
315            QapiStream {
316                inner: inner0,
317            },
318            QapiEventStream {
319                inner: inner1,
320            }
321        )
322    }
323}
324
325#[cfg(feature = "qapi-qmp")]
326impl<S> Stream for QapiEventStream<S> where
327    S: Stream,
328    S::Item: AsRef<[u8]>,
329    S::Error: From<io::Error>,
330{
331    type Item = qmp::Event;
332    type Error = S::Error;
333
334    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
335        let mut inner = try_ready!(Ok::<_, Self::Error>(self.inner.poll_lock()));
336        inner.task_events = Some(task::current());
337        let _ = inner.poll()?;
338        match inner.event() {
339            Ok(Async::NotReady) => {
340                try_ready!(inner.poll());
341                inner.event()
342            },
343            v => v,
344        }
345    }
346}
347
348#[cfg(feature = "qapi-qmp")]
349impl<S> QapiStream<S> where
350    S: Stream,
351    S::Item: AsRef<[u8]>,
352    S::Error: From<io::Error>,
353{
354    pub fn poll_greeting(&mut self) -> Poll<Option<qmp::QapiCapabilities>, S::Error> {
355        let mut inner = try_ready!(Ok::<_, S::Error>(self.inner.poll_lock()));
356        inner.task_response = Some(task::current());
357        let _ = inner.poll()?;
358        match inner.greeting() {
359            Ok(Async::NotReady) => {
360                try_ready!(inner.poll());
361                inner.greeting()
362            },
363            v => v,
364        }
365    }
366}
367
368impl<S> Stream for QapiStream<S> where
369    S: Stream,
370    S::Item: AsRef<[u8]>,
371    S::Error: From<io::Error>,
372{
373    type Item = BytesMut;
374    type Error = S::Error;
375
376    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
377        trace!("QapiStream::poll()");
378        let mut inner = try_ready!(Ok::<_, Self::Error>(self.inner.poll_lock()));
379        inner.task_response = Some(task::current());
380        let _ = inner.poll()?;
381        match inner.response() {
382            Ok(Async::NotReady) => {
383                try_ready!(inner.poll());
384                inner.response()
385            },
386            v => v,
387        }
388    }
389}
390
391impl<S> Sink for QapiStream<S> where
392    S: Sink<SinkItem = Box<[u8]>>,
393    S::SinkError: From<io::Error>,
394{
395    type SinkItem = Box<[u8]>;
396    type SinkError = S::SinkError;
397
398    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
399        trace!("QapiStream::start_send()");
400        let mut inner = match self.inner.poll_lock() {
401            Async::Ready(inner) => inner,
402            Async::NotReady => return Ok(AsyncSink::NotReady(item)),
403        };
404
405        inner.stream.start_send(item)
406    }
407
408    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
409        trace!("QapiStream::poll_complete()");
410        let mut inner = try_ready!(Ok::<_, Self::SinkError>(self.inner.poll_lock()));
411        inner.stream.poll_complete()
412    }
413}
414
415pub type QapiDataStream<S> = Framed<S, codec::LineCodec>;
416
417pub fn data_stream<S: AsyncRead + AsyncWrite>(stream: S) -> QapiDataStream<S> {
418    Framed::new(stream, codec::LineCodec)
419}
420
421#[cfg(feature = "qapi-qmp")]
422pub fn event_stream<S: AsyncRead + AsyncWrite>(stream: S) -> (QapiStream<QapiDataStream<S>>, QapiEventStream<QapiDataStream<S>>) {
423    QapiEventStream::new(data_stream(stream))
424}
425
426pub fn stream<S: AsyncRead + AsyncWrite>(stream: S) -> QapiStream<QapiDataStream<S>> {
427    QapiStream::new(data_stream(stream))
428}
429
430#[cfg(feature = "qapi-qmp")]
431pub fn qmp_handshake<S>(stream: QapiStream<S>) -> QmpHandshake<S> {
432    QmpHandshake::new(stream)
433}
434
435#[cfg(feature = "qapi-qmp")]
436pub struct QmpHandshake<S> {
437    state: QmpHandshakeState<S>,
438}
439
440#[cfg(feature = "qapi-qmp")]
441impl<S> QmpHandshake<S> {
442    pub fn new(stream: QapiStream<S>) -> Self {
443        QmpHandshake {
444            state: QmpHandshakeState::Greeting {
445                stream: stream,
446            },
447        }
448    }
449}
450
451#[cfg(feature = "qapi-qmp")]
452enum QmpHandshakeState<S> {
453    None,
454    Greeting {
455        stream: QapiStream<S>,
456    },
457    Future {
458        greeting: Option<qmp::QMP>,
459        future: QapiFuture<qmp::qmp_capabilities, QapiStream<S>>,
460    },
461}
462
463#[cfg(feature = "qapi-qmp")]
464impl<S, E> Future for QmpHandshake<S> where
465    S: Stream<Error=E> + Sink<SinkItem=Box<[u8]>, SinkError=E>,
466    S::Item: AsRef<[u8]>,
467    S::Error: From<io::Error>,
468    io::Error: From<S::Error>,
469{
470    type Item = (qmp::QMP, QapiStream<S>);
471    type Error = S::Error;
472
473    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
474        let g = match self.state {
475            QmpHandshakeState::Greeting { ref mut stream } => {
476                try_ready!(stream.poll_greeting())
477            },
478            QmpHandshakeState::Future { ref mut future, ref mut greeting } => {
479                let (res, stream) = try_ready!(future.poll());
480                if let Err(e) = res { // weird type gymnastics here ._.
481                    let err: io::Error = From::from(e);
482                    return Err(err.into())
483                }
484                let greeting = greeting.take().unwrap();
485                return Ok(Async::Ready((greeting, stream)))
486            },
487            QmpHandshakeState::None => unreachable!(),
488        };
489
490        let g = match g {
491            Some(g) => g,
492            None => return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "expected handshake greeting").into()),
493        };
494
495        let stream = match replace(&mut self.state, QmpHandshakeState::None) {
496            QmpHandshakeState::Greeting { stream } => stream,
497            _ => unreachable!(),
498        };
499
500        self.state = QmpHandshakeState::Future {
501            greeting: Some(g.QMP),
502            future: QapiFuture::new(stream, qmp::qmp_capabilities { enable: None }),
503        };
504
505        self.poll()
506    }
507}
508
509#[cfg(feature = "qapi-qga")]
510pub fn qga_handshake<S>(stream: QapiStream<S>) -> QgaHandshake<S> {
511    let sync = &stream as *const _ as usize as _;
512    QgaHandshake::new(stream, sync)
513}
514
515#[cfg(feature = "qapi-qga")]
516pub struct QgaHandshake<S> {
517    expected: isize,
518    future: QapiFuture<qga::guest_sync, QapiStream<S>>,
519}
520
521#[cfg(feature = "qapi-qga")]
522impl<S> QgaHandshake<S> {
523    pub fn new(stream: QapiStream<S>, sync_value: isize) -> Self {
524        QgaHandshake {
525            expected: sync_value,
526            future: QapiFuture::new(stream, qga::guest_sync { id: sync_value }),
527        }
528    }
529}
530
531#[cfg(feature = "qapi-qga")]
532impl<S, E> Future for QgaHandshake<S> where
533    S: Stream<Error=E> + Sink<SinkItem=Box<[u8]>, SinkError=E>,
534    S::Item: AsRef<[u8]>,
535    S::Error: From<io::Error>,
536    io::Error: From<S::Error>,
537{
538    type Item = QapiStream<S>;
539    type Error = S::Error;
540
541    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
542        let (r, stream) = try_ready!(self.future.poll());
543        match r {
544            Ok(r) if r == self.expected => Ok(Async::Ready(stream)),
545            Ok(..) => Err(io::Error::new(io::ErrorKind::InvalidData, "guest-sync handshake failed").into()),
546            Err(e) => { // weird type gymnastics here ._.
547                let err: io::Error = From::from(e);
548                Err(err.into())
549            },
550        }
551    }
552}
553
554pub fn encode_command<C: Command>(c: &C) -> io::Result<Box<[u8]>> {
555    let mut encoded = serde_json::to_vec(&qapi_spec::CommandSerializerRef(c))?;
556    encoded.push(b'\n');
557    Ok(encoded.into_boxed_slice())
558}