qapi/futures/
mod.rs

1#[cfg(feature = "qapi-qmp")]
2use qapi_qmp::{QmpMessage, QmpMessageAny, QapiCapabilities, QMPCapability};
3
4use qapi_spec::Response;
5use crate::{Any, Execute, ExecuteResult, Command};
6
7use std::collections::BTreeMap;
8use std::convert::TryInto;
9use std::marker::Unpin;
10use std::sync::{Arc, Mutex as StdMutex, atomic::{AtomicUsize, AtomicBool, Ordering}};
11use std::task::{Context, Poll};
12use std::pin::Pin;
13use std::io;
14use futures::channel::oneshot;
15use futures::task::AtomicWaker;
16use futures::lock::Mutex;
17use futures::{Future, FutureExt, Sink, SinkExt, Stream};
18use serde::Deserialize;
19use log::{trace, info, warn};
20
21#[cfg(feature = "tokio-util")]
22mod codec;
23
24#[cfg(feature = "tokio")]
25mod tokio;
26#[cfg(feature = "tokio")]
27pub use self::tokio::*;
28
29#[cfg(feature = "tower-service")]
30mod tower;
31
32pub struct QapiStream<R, W> {
33    service: QapiService<W>,
34    events: QapiEvents<R>,
35}
36
37impl<R, W> QapiStream<R, W> {
38    pub fn with_parts(service: QapiService<W>, events: QapiEvents<R>) -> Self {
39        Self {
40            service,
41            events,
42        }
43    }
44
45    pub fn into_parts(self) -> (QapiService<W>, QapiEvents<R>) {
46        (self.service, self.events)
47    }
48
49    #[cfg(feature = "async-tokio-spawn")]
50    pub fn spawn_tokio(self) -> (QapiService<W>, ::tokio::task::JoinHandle<()>) where
51        QapiEvents<R>: Future<Output=io::Result<()>> + Send + 'static,
52    {
53        let handle = self.events.spawn_tokio();
54        (self.service, handle)
55    }
56
57    pub fn execute<'a, C: Command + 'a>(&'a mut self, command: C) -> impl Future<Output=ExecuteResult<C>> + 'a where
58        QapiEvents<R>: Future<Output=io::Result<()>> + Unpin,
59        W: Sink<Execute<C, u32>, Error=io::Error> + Unpin
60    {
61        let execute = self.service.execute(command).fuse();
62
63        async move {
64            futures::pin_mut!(execute);
65
66            futures::select_biased! {
67                res = execute => res,
68                res = (&mut self.events).fuse() => {
69                    res?;
70                    Err(io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF when executing command").into())
71                },
72            }
73        }
74    }
75}
76
/// A QMP connection on which `qmp_capabilities` has not been executed yet.
///
/// Call `negotiate` or `negotiate_caps` to obtain the usable [`QapiStream`].
#[cfg(feature = "qapi-qmp")]
pub struct QmpStreamNegotiation<S, W> {
    /// The underlying stream, returned once negotiation succeeds.
    pub stream: QapiStream<S, W>,
    /// Capability information carried with the stream.
    // NOTE(review): presumably what the server greeting advertised; confirm at the construction site.
    pub capabilities: QapiCapabilities,
}
82
#[cfg(feature = "qapi-qmp")]
impl<S, W> QmpStreamNegotiation<S, W> where
    QapiEvents<S>: Future<Output=io::Result<()>> + Unpin,
    W: Sink<Execute<qapi_qmp::qmp_capabilities, u32>, Error=io::Error> + Unpin,
{
    /// Executes `qmp_capabilities` enabling every capability yielded by `caps`, then hands
    /// back the underlying stream.
    ///
    /// # Errors
    ///
    /// Any failure of the `qmp_capabilities` command is converted into an `io::Error`.
    pub async fn negotiate_caps<C>(mut self, caps: C) -> io::Result<QapiStream<S, W>> where
        C: IntoIterator<Item=QMPCapability>,
    {
        let request = qapi_qmp::qmp_capabilities {
            enable: Some(caps.into_iter().collect()),
        };
        // Only success matters here; the command's return value is discarded.
        let _ = self.stream.execute(request).await?;

        let Self { stream, .. } = self;
        Ok(stream)
    }

    /// Negotiates without enabling any optional capability.
    pub async fn negotiate(self) -> io::Result<QapiStream<S, W>> {
        let no_caps = std::iter::empty::<QMPCapability>();
        self.negotiate_caps(no_caps).await
    }
}
102
103type QapiCommandMap = BTreeMap<u32, oneshot::Sender<Result<Any, qapi_spec::Error>>>;
104
105pub struct QapiService<W> {
106    shared: Arc<QapiShared>,
107    write: Arc<Mutex<W>>,
108    id_counter: AtomicUsize,
109}
110
111impl<W> QapiService<W> {
112    #[cfg(feature = "tokio")]
113    fn new(write: W, shared: Arc<QapiShared>) -> Self {
114        QapiService {
115            shared,
116            write: Mutex::new(write).into(),
117            id_counter: AtomicUsize::new(0),
118        }
119    }
120
121    fn next_oob_id(&self) -> u32 {
122        self.id_counter.fetch_add(1, Ordering::Relaxed) as _
123    }
124
125    fn command_id(&self) -> Option<u32> {
126        if self.shared.supports_oob {
127            Some(self.next_oob_id())
128        } else {
129            None
130        }
131    }
132
133    fn command_response<C: Command>(receiver: oneshot::Receiver<Result<Any, qapi_spec::Error>>) -> impl Future<Output=ExecuteResult<C>> {
134        receiver.map(|res| match res {
135            Ok(Ok(res)) => C::Ok::deserialize(&res)
136                .map_err(io::Error::from).map_err(From::from),
137            Ok(Err(e)) => Err(e.into()),
138            Err(_cancelled) => Err(io::Error::new(io::ErrorKind::UnexpectedEof, "QAPI stream disconnected").into()),
139        })
140    }
141
142    pub fn execute<C: Command>(&self, command: C) -> impl Future<Output=ExecuteResult<C>> where
143        W: Sink<Execute<C, u32>, Error=io::Error> + Unpin
144    {
145        let id = self.command_id();
146        let sink = self.write.clone();
147        let shared = self.shared.clone();
148        let command = Execute::new(command, id);
149
150        async move {
151            let mut sink = sink.lock().await;
152            let receiver = shared.command_insert(id.unwrap_or_default());
153
154            sink.send(command).await?;
155            if id.is_some() {
156                // retain write lock only if id/oob execution isn't supported
157                drop(sink)
158            }
159
160            Self::command_response::<C>(receiver).await
161        }
162    }
163
164    /*pub async fn execute_oob<C: Command>(&self, command: C) -> io::Result<ExecuteResult<C>> {
165        /* TODO: should we assert C::ALLOW_OOB here and/or at the type level?
166         * If oob isn't supported should we fall back to serial execution or error?
167         */
168        self.execute_(command, true).await
169    }*/
170
171    #[cfg(feature = "qapi-qga")]
172    pub fn guest_sync(&self, sync_value: i32) -> impl Future<Output=Result<(), crate::ExecuteError>> where
173        W: Sink<Execute<qapi_qga::guest_sync, u32>, Error=io::Error> + Unpin
174    {
175        let id = sync_value.into();
176        self.execute(qapi_qga::guest_sync {
177            id,
178        }).map(move |res| res.and_then(|res| if res == id {
179            Ok(())
180        } else {
181            Err(io::Error::new(io::ErrorKind::InvalidData, "QGA sync failed").into())
182        }))
183    }
184
185    fn stop(&self) {
186        let mut commands = self.shared.commands.lock().unwrap();
187        if self.shared.abandoned.load(Ordering::Relaxed) {
188            self.shared.stop();
189        }
190        commands.abandoned = true;
191    }
192}
193
194impl<W> Drop for QapiService<W> {
195    fn drop(&mut self) {
196        self.stop();
197    }
198}
199
200#[derive(Default)]
201struct QapiSharedCommands {
202    pending: QapiCommandMap,
203    abandoned: bool,
204}
205
206struct QapiShared {
207    commands: StdMutex<QapiSharedCommands>,
208    stop_waker: AtomicWaker,
209    stop: AtomicBool,
210    abandoned: AtomicBool,
211    supports_oob: bool,
212}
213
214impl QapiShared {
215    #[cfg(feature = "tokio")]
216    fn new(supports_oob: bool) -> Self {
217        Self {
218            commands: Default::default(),
219            stop_waker: Default::default(),
220            stop: Default::default(),
221            abandoned: Default::default(),
222            supports_oob,
223        }
224    }
225
226    fn stop(&self) {
227        self.stop.store(true, Ordering::Relaxed);
228        self.stop_waker.wake();
229    }
230
231    fn is_stopped(&self) -> bool {
232        self.stop.load(Ordering::Relaxed)
233    }
234
235    fn poll_next<T, P: FnOnce(&mut Context) -> Poll<Option<T>>>(&self, cx: &mut Context, poll: P) -> Poll<Option<T>> {
236        if self.is_stopped() {
237            return Poll::Ready(None)
238        }
239
240        // attempt to complete the future
241        match poll(cx) {
242            Poll::Ready(res) => {
243                if res.is_none() {
244                    self.stop.store(true, Ordering::Relaxed);
245                }
246                Poll::Ready(res)
247            },
248            Poll::Pending => {
249                self.stop_waker.register(cx.waker());
250                if self.is_stopped() {
251                    Poll::Ready(None)
252                } else {
253                    Poll::Pending
254                }
255            },
256        }
257    }
258
259    fn command_remove(&self, id: u32) -> Option<oneshot::Sender<Result<Any, qapi_spec::Error>>> {
260        let mut commands = self.commands.lock().unwrap();
261        commands.pending.remove(&id)
262    }
263
264    fn command_insert(&self, id: u32) -> oneshot::Receiver<Result<Any, qapi_spec::Error>> {
265        let (sender, receiver) = oneshot::channel();
266        let mut commands = self.commands.lock().unwrap();
267        if !commands.abandoned {
268            // otherwise sender is dropped immediately
269            if let Some(_prev) = commands.pending.insert(id, sender) {
270                panic!("QAPI duplicate command id {:?}, this should not happen", id);
271            }
272        }
273        receiver
274    }
275}
276
277#[must_use]
278pub struct QapiEvents<S> {
279    stream: S,
280    shared: Arc<QapiShared>,
281}
282
283impl<S> QapiEvents<S> {
284    pub fn release(&self) -> Result<(), ()> {
285        let commands = self.shared.commands.lock().unwrap();
286        if commands.abandoned {
287            Err(())
288        } else {
289            self.shared.abandoned.store(true, Ordering::Relaxed);
290            Ok(())
291        }
292    }
293
294    pub async fn into_future(self) -> () where
295        Self: Future<Output=io::Result<()>>,
296    {
297        if self.release().is_err() {
298            info!("QAPI service abandoned before spawning");
299            return
300        }
301
302        match self.await {
303            Ok(()) => (),
304            Err(e) =>
305                warn!("QAPI stream closed with error {:?}", e),
306        }
307    }
308
309    pub fn spawn<SP: futures::task::Spawn>(self, spawn: SP) -> Result<(), futures::task::SpawnError> where
310        Self: Future<Output=io::Result<()>> + Send + 'static,
311        S: 'static
312    {
313        use futures::task::SpawnExt;
314
315        spawn.spawn(self.into_future())
316    }
317
318    #[cfg(feature = "async-tokio-spawn")]
319    pub fn spawn_tokio(self) -> ::tokio::task::JoinHandle<()> where
320        Self: Future<Output=io::Result<()>> + Send + 'static,
321        S: 'static
322    {
323        ::tokio::spawn(self.into_future())
324    }
325}
326
327impl<S> Drop for QapiEvents<S> {
328    fn drop(&mut self) {
329        let mut commands = self.shared.commands.lock().unwrap();
330        commands.pending.clear();
331        commands.abandoned = true;
332    }
333}
334
335fn response_id<T>(res: &Response<T>, supports_oob: bool) -> io::Result<u32> {
336    match (res.id().and_then(|id| id.as_u64()), supports_oob) {
337        (Some(id), true) =>
338            id.try_into().map_err(|e|
339                io::Error::new(io::ErrorKind::InvalidData, e)
340            ),
341        (None, false) =>
342            Ok(Default::default()),
343        (None, true) =>
344            Err(io::Error::new(io::ErrorKind::InvalidData, format!("QAPI expected response with numeric ID, got {:?}", res.id()))),
345        (Some(..), false) =>
346            Err(io::Error::new(io::ErrorKind::InvalidData, format!("QAPI expected response without ID, got {:?}", res.id()))),
347    }
348}
349
350fn handle_response(shared: &QapiShared, res: Response<Any>) -> io::Result<()> {
351    let id = response_id(&res, shared.supports_oob)?;
352
353    if let Some(sender) = shared.command_remove(id) {
354        sender.send(res.result()).map_err(|_e|
355            io::Error::new(io::ErrorKind::InvalidData, format!("failed to send response for ID {:?}", id))
356        )
357    } else {
358        Err(io::Error::new(io::ErrorKind::InvalidData, format!("unknown QAPI response with ID {:?}", res.id())))
359    }
360}
361
362impl<M, S> Future for QapiEvents<S> where
363    S: Stream<Item=io::Result<M>>,
364    M: TryInto<Response<Any>>,
365{
366    type Output = io::Result<()>;
367
368    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
369        let this = unsafe { self.get_unchecked_mut() };
370        let stream = unsafe { Pin::new_unchecked(&mut this.stream) };
371        let shared = &this.shared;
372
373        shared.poll_next(cx, |cx| Poll::Ready(Some(match futures::ready!(stream.poll_next(cx)) {
374            None => return Poll::Ready(None),
375            Some(Err(e)) => Err(e),
376            Some(Ok(res)) => match res.try_into() {
377                Ok(res) => match handle_response(shared, res) {
378                    Err(e) => Err(e),
379                    Ok(()) => {
380                        cx.waker().wake_by_ref(); // TODO: I've seen this not work with tokio?
381                        return Poll::Pending
382                    },
383                },
384                Err(..) => {
385                    trace!("Ignoring QAPI event");
386                    cx.waker().wake_by_ref(); // TODO: I've seen this not work with tokio?
387                    return Poll::Pending
388                },
389            },
390        }))).map(|res| res.unwrap_or(Ok(())))
391    }
392}
393
/// As a stream, the reader yields QMP events while dispatching responses to the commands
/// waiting for them behind the scenes.
#[cfg(feature = "qapi-qmp")]
impl<S: Stream<Item=io::Result<QmpMessageAny>>> Stream for QapiEvents<S> {
    type Item = io::Result<qapi_qmp::Event>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // SAFETY: `this` is only used to borrow fields; nothing is moved out of the
        // pinned value.
        let this = unsafe { self.get_unchecked_mut() };
        let shared = &this.shared;
        // SAFETY: `stream` is a field of the pinned `Self` and is never moved here, so
        // re-pinning the reference to it is sound.
        let stream = unsafe { Pin::new_unchecked(&mut this.stream) };

        shared.poll_next(cx, |cx| {
            let message = match stream.poll_next(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => return Poll::Ready(None), // eof
                Poll::Ready(Some(message)) => message,
            };

            let response = match message {
                Err(e) => return Poll::Ready(Some(Err(e))),
                Ok(QmpMessage::Event(event)) => return Poll::Ready(Some(Ok(event))),
                Ok(QmpMessage::Response(response)) => response,
            };

            match handle_response(shared, response) {
                Err(e) => Poll::Ready(Some(Err(e))),
                Ok(()) => {
                    // A response produces no stream item: request another poll so the
                    // next message gets looked at.
                    cx.waker().wake_by_ref(); // TODO: I've seen this not work with tokio?
                    Poll::Pending
                },
            }
        })
    }
}
416}