1#![doc(html_root_url = "http://docs.rs/tokio-qapi/0.4.0")]
2
3#[cfg(feature = "qapi-qmp")]
4pub use qapi_qmp as qmp;
5
6#[cfg(feature = "qapi-qga")]
7pub use qapi_qga as qga;
8
9pub use qapi_spec::{Any, Dictionary, Empty, Command, Event, Error, ErrorClass, Timestamp};
10
11use std::mem::replace;
12use std::{io, str};
13use tokio_codec::Framed;
14use tokio_io::{AsyncRead, AsyncWrite};
15use futures::{Future, Poll, StartSend, Async, AsyncSink, Sink, Stream, try_ready};
16use futures::sync::BiLock;
17use futures::task::{self, Task};
18use bytes::BytesMut;
19use bytes::buf::FromBuf;
20use log::{trace, debug};
21
22mod codec;
23
24pub struct QapiFuture<C, S> {
25 state: QapiState<C, S>,
26}
27
28impl<C: Command, S> QapiFuture<C, S> {
29 pub fn new(stream: S, command: C) -> Self {
30 QapiFuture {
31 state: QapiState::Queue {
32 inner: stream,
33 value: command,
34 },
35 }
36 }
37}
38
/// Internal state machine: a stream `S` plus, possibly, a value `C` that still
/// has to be sent over it.
enum QapiState<C, S> {
    /// `value` has not been handed to `inner` yet.
    Queue {
        inner: S,
        value: C,
    },
    /// The value has been taken; only the stream remains.
    Waiting {
        inner: S,
    },
    /// Both the stream and the value have been taken.
    None,
}

impl<C, S> QapiState<C, S> {
    /// Borrows the stream mutably, or returns `None` once it has been taken.
    fn inner_mut(&mut self) -> Option<&mut S> {
        match self {
            QapiState::Queue { inner, .. } | QapiState::Waiting { inner } => Some(inner),
            QapiState::None => None,
        }
    }

    /// Removes and returns the queued value, moving `Queue` to `Waiting`.
    ///
    /// `Waiting` and `None` are left as they are and yield `None`.
    fn take_value(&mut self) -> Option<C> {
        let (next, taken) = match replace(self, QapiState::None) {
            QapiState::Queue { inner, value } => (QapiState::Waiting { inner }, Some(value)),
            other => (other, None),
        };
        *self = next;
        taken
    }

    /// Removes and returns the stream, leaving the state as `None`.
    ///
    /// Any value that was still queued is dropped.
    fn take_inner(&mut self) -> Option<S> {
        match replace(self, QapiState::None) {
            QapiState::Queue { inner, .. } | QapiState::Waiting { inner } => Some(inner),
            QapiState::None => None,
        }
    }

    /// Queues `v` (replacing any previously queued value) and moves to `Queue`.
    ///
    /// # Panics
    ///
    /// Panics if the stream has already been taken (state `None`).
    fn set_value(&mut self, v: C) {
        let inner = match replace(self, QapiState::None) {
            QapiState::Queue { inner, .. } | QapiState::Waiting { inner } => inner,
            QapiState::None => unreachable!(),
        };
        *self = QapiState::Queue { inner, value: v };
    }
}
97
98impl<C, S, E> Future for QapiFuture<C, S>
99 where
100 S: Sink<SinkItem=Box<[u8]>, SinkError=E> + Stream<Error=E>,
101 S::Item: AsRef<[u8]>,
102 C: Command,
103 io::Error: From<E>,
104{
105 type Item = (Result<C::Ok, Error>, S);
106 type Error = io::Error;
107
108 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
109 trace!("QapiFuture::poll()");
110 match self.state.take_value() {
111 Some(v) => {
112 let encoded = encode_command(&v)?;
113 debug!("-> {}", str::from_utf8(&encoded).unwrap_or("utf8 decoding failed"));
114 match self.state.inner_mut().unwrap().start_send(encoded) {
116 Ok(AsyncSink::Ready) => self.poll(),
117 Ok(AsyncSink::NotReady(..)) => {
118 trace!("Failed to start_send, try later");
119 self.state.set_value(v);
120 Ok(Async::NotReady)
121 },
122 Err(e) => Err(e.into()),
123 }
124 },
125 None => {
126 let poll = if let Some(inner) = self.state.inner_mut() {
127 trace!("QapiFuture::poll_complete");
128 try_ready!(inner.poll_complete());
129
130 trace!("QapiFuture::poll for data");
131 try_ready!(inner.poll())
132 } else {
133 panic!("future polled after returning value")
134 };
135
136 match poll {
137 Some(t) => {
138 let t = t.as_ref();
139 let t: qapi_spec::Response<C::Ok> = serde_json::from_slice(&t)?;
140 Ok(Async::Ready((t.result(), self.state.take_inner().unwrap())))
141 },
142 None => Err(io::Error::new(io::ErrorKind::UnexpectedEof, "expected command response, got eof")),
143 }
144 },
145 }
146 }
147}
148
149struct QapiStreamInner<S> {
150 stream: S,
151 #[cfg(feature = "qapi-qmp")]
152 events: Vec<Box<[u8]>>,
153 response: Option<BytesMut>,
154 #[cfg(feature = "qapi-qmp")]
155 greeting: Option<Box<[u8]>>,
156 fused: bool,
157 fused_events: bool,
158 #[cfg(feature = "qapi-qmp")]
159 task_events: Option<Task>,
160 task_response: Option<Task>,
161}
162
163impl<S> QapiStreamInner<S> {
164 fn new(s: S) -> Self {
165 QapiStreamInner {
166 stream: s,
167 #[cfg(feature = "qapi-qmp")]
168 events: Default::default(),
169 response: Default::default(),
170 #[cfg(feature = "qapi-qmp")]
171 greeting: Default::default(),
172 fused: false,
173 fused_events: false,
174 #[cfg(feature = "qapi-qmp")]
175 task_events: None,
176 task_response: None,
177 }
178 }
179}
180
181impl<S> QapiStreamInner<S> where
182 S: Stream,
183 S::Item: AsRef<[u8]>,
184 S::Error: From<io::Error>,
185{
186 #[cfg(feature = "qapi-qmp")]
187 fn push_event(&mut self, e: &[u8]) {
188 if self.fused_events {
189 return
190 }
191
192 if let Some(ref task) = self.task_events {
193 self.events.push(e.to_owned().into_boxed_slice());
194
195 task.notify()
196 }
197 }
198
199 #[cfg(feature = "qapi-qmp")]
200 fn push_greeting(&mut self, g: &[u8]) {
201 self.greeting = Some(g.to_owned().into_boxed_slice());
202 if let Some(ref task) = self.task_response {
203 task.notify()
204 }
205 }
206
207 #[cfg(not(feature = "qapi-qmp"))]
208 fn push_event(&mut self, _: &[u8]) { }
209
210 #[cfg(not(feature = "qapi-qmp"))]
211 fn push_greeting(&mut self, _: &[u8]) { }
212
213 fn poll(&mut self) -> Poll<(), S::Error> {
214 match try_ready!(self.stream.poll()) {
215 Some(v) => {
216 let v = v.as_ref();
217 debug!("<- {}", str::from_utf8(v).unwrap_or("utf8 decoding failed"));
218 if v.starts_with(b"{\"QMP\":") {
219 self.push_greeting(v);
220 } else if v.starts_with(b"{\"timestamp\":") || v.starts_with(b"{\"event\":") {
221 self.push_event(v);
222 } else {
223 self.response = Some(BytesMut::from_buf(v));
224 if let Some(ref task) = self.task_response {
225 task.notify()
226 }
227 }
228
229 Ok(Async::Ready(()))
230 },
231 None => {
232 self.fused = true;
233 Ok(Async::Ready(()))
234 },
235 }
236 }
237
238 #[cfg(feature = "qapi-qmp")]
239 fn greeting(&mut self) -> Poll<Option<qmp::QapiCapabilities>, S::Error> {
240 match self.greeting.take() {
241 Some(g) => {
242 serde_json::from_slice(&g)
243 .map_err(io::Error::from).map_err(From::from)
244 .map(Async::Ready)
245 },
246 None => if self.fused {
247 Ok(Async::Ready(None))
248 } else {
249 Ok(Async::NotReady)
250 },
251 }
252 }
253
254 #[cfg(feature = "qapi-qmp")]
255 fn event(&mut self) -> Poll<Option<qmp::Event>, S::Error> {
256 match self.events.pop() {
257 Some(v) => {
258 let v = serde_json::from_slice(v.as_ref()).map_err(io::Error::from)?;
259 Ok(Async::Ready(Some(v)))
260 },
261 None => if self.fused || self.fused_events {
262 Ok(Async::Ready(None))
263 } else {
264 Ok(Async::NotReady)
265 },
266 }
267 }
268
269 fn response(&mut self) -> Poll<Option<BytesMut>, S::Error> {
270 match self.response.take() {
271 Some(v) => {
272 Ok(Async::Ready(Some(v)))
273 },
274 None => if self.fused {
275 Ok(Async::Ready(None))
276 } else {
277 Ok(Async::NotReady)
278 },
279 }
280 }
281}
282
283pub struct QapiStream<S> {
284 inner: BiLock<QapiStreamInner<S>>,
285}
286
287impl<S> QapiStream<S> {
288 pub fn new(stream: S) -> Self {
289 let mut inner = QapiStreamInner::new(stream);
290 inner.fused_events = true;
291 let (inner, _) = BiLock::new(inner);
293
294 QapiStream {
295 inner: inner,
296 }
297 }
298
299 pub fn execute<C: Command>(self, command: C) -> QapiFuture<C, Self> {
300 QapiFuture::new(self, command)
301 }
302}
303
/// Event half of a connection; yields parsed QMP events.
#[cfg(feature = "qapi-qmp")]
pub struct QapiEventStream<S> {
    inner: BiLock<QapiStreamInner<S>>,
}

#[cfg(feature = "qapi-qmp")]
impl<S> QapiEventStream<S> {
    /// Splits `stream` into a command half and an event half that share one
    /// underlying state through a `BiLock`.
    pub fn new(stream: S) -> (QapiStream<S>, QapiEventStream<S>) {
        let (command_half, event_half) = BiLock::new(QapiStreamInner::new(stream));

        let commands = QapiStream { inner: command_half };
        let events = QapiEventStream { inner: event_half };
        (commands, events)
    }
}
324
#[cfg(feature = "qapi-qmp")]
impl<S> Stream for QapiEventStream<S> where
    S: Stream,
    S::Item: AsRef<[u8]>,
    S::Error: From<io::Error>,
{
    type Item = qmp::Event;
    type Error = S::Error;

    /// Yields the next event, reading from the shared transport as needed.
    ///
    /// Returns `NotReady` if the shared state is locked by the other half, or if
    /// no event is buffered and the transport has nothing more to read right now.
    /// Returns `Ready(None)` once the transport has ended or events are disabled.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let mut inner = try_ready!(Ok::<_, Self::Error>(self.inner.poll_lock()));
        inner.task_events = Some(task::current());

        // Hand out an already-buffered event first, then keep pulling messages
        // until one is an event or the transport itself reports `NotReady`.
        // Returning `NotReady` after the transport said `Ready` (because the
        // messages read were not events) would leave this task with no wakeup
        // registered on the transport.
        loop {
            if let Async::Ready(event) = inner.event()? {
                return Ok(Async::Ready(event));
            }
            try_ready!(inner.poll());
        }
    }
}
347
#[cfg(feature = "qapi-qmp")]
impl<S> QapiStream<S> where
    S: Stream,
    S::Item: AsRef<[u8]>,
    S::Error: From<io::Error>,
{
    /// Polls for the QMP greeting, reading from the shared transport as needed.
    ///
    /// Returns `NotReady` if the shared state is locked by the other half, or if
    /// no greeting is stored and the transport has nothing more to read right now.
    /// Returns `Ready(None)` if the transport ended without a greeting.
    pub fn poll_greeting(&mut self) -> Poll<Option<qmp::QapiCapabilities>, S::Error> {
        let mut inner = try_ready!(Ok::<_, S::Error>(self.inner.poll_lock()));
        inner.task_response = Some(task::current());

        // Check for a stored greeting first, then keep pulling messages until one
        // is a greeting or the transport itself reports `NotReady`. Returning
        // `NotReady` after the transport said `Ready` (because the messages read
        // were something else) would leave this task with no wakeup registered on
        // the transport.
        loop {
            if let Async::Ready(greeting) = inner.greeting()? {
                return Ok(Async::Ready(greeting));
            }
            try_ready!(inner.poll());
        }
    }
}
367
368impl<S> Stream for QapiStream<S> where
369 S: Stream,
370 S::Item: AsRef<[u8]>,
371 S::Error: From<io::Error>,
372{
373 type Item = BytesMut;
374 type Error = S::Error;
375
376 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
377 trace!("QapiStream::poll()");
378 let mut inner = try_ready!(Ok::<_, Self::Error>(self.inner.poll_lock()));
379 inner.task_response = Some(task::current());
380 let _ = inner.poll()?;
381 match inner.response() {
382 Ok(Async::NotReady) => {
383 try_ready!(inner.poll());
384 inner.response()
385 },
386 v => v,
387 }
388 }
389}
390
391impl<S> Sink for QapiStream<S> where
392 S: Sink<SinkItem = Box<[u8]>>,
393 S::SinkError: From<io::Error>,
394{
395 type SinkItem = Box<[u8]>;
396 type SinkError = S::SinkError;
397
398 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
399 trace!("QapiStream::start_send()");
400 let mut inner = match self.inner.poll_lock() {
401 Async::Ready(inner) => inner,
402 Async::NotReady => return Ok(AsyncSink::NotReady(item)),
403 };
404
405 inner.stream.start_send(item)
406 }
407
408 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
409 trace!("QapiStream::poll_complete()");
410 let mut inner = try_ready!(Ok::<_, Self::SinkError>(self.inner.poll_lock()));
411 inner.stream.poll_complete()
412 }
413}
414
/// A transport `S` wrapped in [`Framed`] with this crate's `codec::LineCodec`.
pub type QapiDataStream<S> = Framed<S, codec::LineCodec>;
416
417pub fn data_stream<S: AsyncRead + AsyncWrite>(stream: S) -> QapiDataStream<S> {
418 Framed::new(stream, codec::LineCodec)
419}
420
/// Frames `stream` with [`data_stream`] and splits it into a command half and an
/// event half.
#[cfg(feature = "qapi-qmp")]
pub fn event_stream<S>(stream: S) -> (QapiStream<QapiDataStream<S>>, QapiEventStream<QapiDataStream<S>>)
where
    S: AsyncRead + AsyncWrite,
{
    let framed = data_stream(stream);
    QapiEventStream::new(framed)
}
425
426pub fn stream<S: AsyncRead + AsyncWrite>(stream: S) -> QapiStream<QapiDataStream<S>> {
427 QapiStream::new(data_stream(stream))
428}
429
/// Starts a QMP handshake on `stream`; the returned future begins by waiting for
/// the greeting.
#[cfg(feature = "qapi-qmp")]
pub fn qmp_handshake<S>(stream: QapiStream<S>) -> QmpHandshake<S> {
    let state = QmpHandshakeState::Greeting { stream };
    QmpHandshake { state }
}
434
/// Future performing the QMP handshake: it waits for the greeting, then issues
/// `qmp_capabilities`.
#[cfg(feature = "qapi-qmp")]
pub struct QmpHandshake<S> {
    state: QmpHandshakeState<S>,
}

#[cfg(feature = "qapi-qmp")]
impl<S> QmpHandshake<S> {
    /// Creates a handshake future that starts out waiting for the greeting on
    /// `stream`.
    pub fn new(stream: QapiStream<S>) -> Self {
        let state = QmpHandshakeState::Greeting { stream };
        QmpHandshake { state }
    }
}
450
/// Progress of a QMP handshake.
#[cfg(feature = "qapi-qmp")]
enum QmpHandshakeState<S> {
    /// Placeholder left behind while the state is being swapped out.
    None,
    /// Waiting for the greeting to arrive on `stream`.
    Greeting {
        stream: QapiStream<S>,
    },
    /// The greeting has been received and the `qmp_capabilities` command is
    /// in flight.
    Future {
        /// Greeting data held until the command completes; taken on completion.
        greeting: Option<qmp::QMP>,
        /// The in-flight `qmp_capabilities` command.
        future: QapiFuture<qmp::qmp_capabilities, QapiStream<S>>,
    },
}
462
#[cfg(feature = "qapi-qmp")]
impl<S, E> Future for QmpHandshake<S> where
    S: Stream<Error=E> + Sink<SinkItem=Box<[u8]>, SinkError=E>,
    S::Item: AsRef<[u8]>,
    S::Error: From<io::Error>,
    io::Error: From<S::Error>,
{
    type Item = (qmp::QMP, QapiStream<S>);
    type Error = S::Error;

    /// Advances the handshake.
    ///
    /// While in the greeting phase it waits for the greeting, then switches to
    /// sending `qmp_capabilities` and polls again right away. In the command phase
    /// it resolves with the stored greeting data and the stream once the command
    /// succeeds.
    ///
    /// # Errors
    ///
    /// Fails with `UnexpectedEof` if the stream ends before a greeting arrives,
    /// with the command's error (converted through `io::Error`) if
    /// `qmp_capabilities` is rejected, or with any transport error.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let capabilities = match self.state {
            QmpHandshakeState::None => unreachable!(),
            QmpHandshakeState::Future { ref mut future, ref mut greeting } => {
                let (reply, stream) = try_ready!(future.poll());
                return match reply {
                    Ok(_) => Ok(Async::Ready((greeting.take().unwrap(), stream))),
                    Err(e) => {
                        let err: io::Error = From::from(e);
                        Err(err.into())
                    },
                };
            },
            QmpHandshakeState::Greeting { ref mut stream } => {
                try_ready!(stream.poll_greeting())
            },
        };

        // Only the greeting phase reaches this point.
        let capabilities = capabilities
            .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "expected handshake greeting"))?;

        let stream = if let QmpHandshakeState::Greeting { stream } = replace(&mut self.state, QmpHandshakeState::None) {
            stream
        } else {
            unreachable!()
        };

        self.state = QmpHandshakeState::Future {
            greeting: Some(capabilities.QMP),
            future: QapiFuture::new(stream, qmp::qmp_capabilities { enable: None }),
        };

        // Poll the freshly created command future immediately.
        self.poll()
    }
}
508
/// Starts a guest-agent sync handshake on `stream`.
///
/// The sync id is derived from the address of the local `stream` value, cast to
/// `isize`.
#[cfg(feature = "qapi-qga")]
pub fn qga_handshake<S>(stream: QapiStream<S>) -> QgaHandshake<S> {
    let address = &stream as *const QapiStream<S> as usize;
    QgaHandshake::new(stream, address as isize)
}
514
/// Future performing a `guest_sync` exchange and checking the echoed id.
#[cfg(feature = "qapi-qga")]
pub struct QgaHandshake<S> {
    /// Id the reply has to match.
    expected: isize,
    /// The in-flight `guest_sync` command.
    future: QapiFuture<qga::guest_sync, QapiStream<S>>,
}

#[cfg(feature = "qapi-qga")]
impl<S> QgaHandshake<S> {
    /// Creates a handshake that sends `guest_sync` with `sync_value` over
    /// `stream` and expects the same value back.
    pub fn new(stream: QapiStream<S>, sync_value: isize) -> Self {
        let command = qga::guest_sync { id: sync_value };

        QgaHandshake {
            expected: sync_value,
            future: QapiFuture::new(stream, command),
        }
    }
}
530
#[cfg(feature = "qapi-qga")]
impl<S, E> Future for QgaHandshake<S> where
    S: Stream<Error=E> + Sink<SinkItem=Box<[u8]>, SinkError=E>,
    S::Item: AsRef<[u8]>,
    S::Error: From<io::Error>,
    io::Error: From<S::Error>,
{
    type Item = QapiStream<S>;
    type Error = S::Error;

    /// Resolves with the stream once `guest_sync` has echoed the expected id.
    ///
    /// # Errors
    ///
    /// Fails with `InvalidData` if a different id comes back, with the command's
    /// error (converted through `io::Error`) if the command is rejected, or with
    /// any transport error.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let (reply, stream) = try_ready!(self.future.poll());

        let id = match reply {
            Ok(id) => id,
            Err(e) => {
                let err: io::Error = From::from(e);
                return Err(err.into())
            },
        };

        if id == self.expected {
            Ok(Async::Ready(stream))
        } else {
            Err(io::Error::new(io::ErrorKind::InvalidData, "guest-sync handshake failed").into())
        }
    }
}
553
554pub fn encode_command<C: Command>(c: &C) -> io::Result<Box<[u8]>> {
555 let mut encoded = serde_json::to_vec(&qapi_spec::CommandSerializerRef(c))?;
556 encoded.push(b'\n');
557 Ok(encoded.into_boxed_slice())
558}