1#[cfg(feature = "qapi-qmp")]
2use qapi_qmp::{QmpMessage, QmpMessageAny, QapiCapabilities, QMPCapability};
3
4use qapi_spec::Response;
5use crate::{Any, Execute, ExecuteResult, Command};
6
7use std::collections::BTreeMap;
8use std::convert::TryInto;
9use std::marker::Unpin;
10use std::sync::{Arc, Mutex as StdMutex, atomic::{AtomicUsize, AtomicBool, Ordering}};
11use std::task::{Context, Poll};
12use std::pin::Pin;
13use std::io;
14use futures::channel::oneshot;
15use futures::task::AtomicWaker;
16use futures::lock::Mutex;
17use futures::{Future, FutureExt, Sink, SinkExt, Stream};
18use serde::Deserialize;
19use log::{trace, info, warn};
20
// The `codec` submodule is only compiled when the `tokio-util` feature is enabled.
#[cfg(feature = "tokio-util")]
mod codec;

// The `tokio` submodule is only compiled when the `tokio` feature is enabled;
// everything public in it is re-exported from this module.
#[cfg(feature = "tokio")]
mod tokio;
#[cfg(feature = "tokio")]
pub use self::tokio::*;

// The `tower` submodule is only compiled when the `tower-service` feature is enabled.
#[cfg(feature = "tower-service")]
mod tower;
31
/// A QAPI connection that keeps the command half ([`QapiService`]) and the
/// incoming half ([`QapiEvents`]) together in one value.
///
/// `R` is the type parameter of the event half and `W` the type parameter of
/// the service half.
pub struct QapiStream<R, W> {
    /// Half used to send commands and await their responses.
    service: QapiService<W>,
    /// Half that must be polled for responses to be routed to pending commands.
    events: QapiEvents<R>,
}
36
37impl<R, W> QapiStream<R, W> {
38 pub fn with_parts(service: QapiService<W>, events: QapiEvents<R>) -> Self {
39 Self {
40 service,
41 events,
42 }
43 }
44
45 pub fn into_parts(self) -> (QapiService<W>, QapiEvents<R>) {
46 (self.service, self.events)
47 }
48
49 #[cfg(feature = "async-tokio-spawn")]
50 pub fn spawn_tokio(self) -> (QapiService<W>, ::tokio::task::JoinHandle<()>) where
51 QapiEvents<R>: Future<Output=io::Result<()>> + Send + 'static,
52 {
53 let handle = self.events.spawn_tokio();
54 (self.service, handle)
55 }
56
57 pub fn execute<'a, C: Command + 'a>(&'a mut self, command: C) -> impl Future<Output=ExecuteResult<C>> + 'a where
58 QapiEvents<R>: Future<Output=io::Result<()>> + Unpin,
59 W: Sink<Execute<C, u32>, Error=io::Error> + Unpin
60 {
61 let execute = self.service.execute(command).fuse();
62
63 async move {
64 futures::pin_mut!(execute);
65
66 futures::select_biased! {
67 res = execute => res,
68 res = (&mut self.events).fuse() => {
69 res?;
70 Err(io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF when executing command").into())
71 },
72 }
73 }
74 }
75}
76
/// A QMP stream on which `qmp_capabilities` has not been executed yet.
///
/// Use `negotiate` or `negotiate_caps` to obtain the underlying
/// [`QapiStream`].
#[cfg(feature = "qapi-qmp")]
pub struct QmpStreamNegotiation<S, W> {
    /// The stream that is returned once negotiation succeeds.
    pub stream: QapiStream<S, W>,
    /// NOTE(review): presumably the capabilities announced by the server in
    /// its greeting — confirm against the code that constructs this value.
    pub capabilities: QapiCapabilities,
}
82
#[cfg(feature = "qapi-qmp")]
impl<S, W> QmpStreamNegotiation<S, W> where
    QapiEvents<S>: Future<Output=io::Result<()>> + Unpin,
    W: Sink<Execute<qapi_qmp::qmp_capabilities, u32>, Error=io::Error> + Unpin,
{
    /// Executes `qmp_capabilities`, enabling every capability yielded by
    /// `caps`, and returns the stream once the command has succeeded.
    ///
    /// # Errors
    ///
    /// Returns the command's failure converted into an `io::Error`.
    pub async fn negotiate_caps<C>(mut self, caps: C) -> io::Result<QapiStream<S, W>> where
        C: IntoIterator<Item=QMPCapability>,
    {
        let request = qapi_qmp::qmp_capabilities {
            enable: Some(caps.into_iter().collect()),
        };

        // Only success matters here; the reply payload is discarded.
        self.stream.execute(request).await?;

        Ok(self.stream)
    }

    /// Same as [`negotiate_caps`](Self::negotiate_caps) with an empty
    /// capability list.
    pub async fn negotiate(self) -> io::Result<QapiStream<S, W>> {
        let no_caps = std::iter::empty::<QMPCapability>();
        self.negotiate_caps(no_caps).await
    }
}
102
// Maps the id of each in-flight command to the oneshot sender through which
// that command's result (or QAPI error) is delivered.
type QapiCommandMap = BTreeMap<u32, oneshot::Sender<Result<Any, qapi_spec::Error>>>;
104
/// Command half of a QAPI connection: serialises commands into the write
/// sink and waits for the matching response.
pub struct QapiService<W> {
    /// Bookkeeping shared with the event half (pending commands, stop flags).
    shared: Arc<QapiShared>,
    /// Write sink behind an async mutex so command futures can take turns.
    write: Arc<Mutex<W>>,
    /// Source of command ids, used only when `shared.supports_oob` is set.
    id_counter: AtomicUsize,
}
110
111impl<W> QapiService<W> {
112 #[cfg(feature = "tokio")]
113 fn new(write: W, shared: Arc<QapiShared>) -> Self {
114 QapiService {
115 shared,
116 write: Mutex::new(write).into(),
117 id_counter: AtomicUsize::new(0),
118 }
119 }
120
121 fn next_oob_id(&self) -> u32 {
122 self.id_counter.fetch_add(1, Ordering::Relaxed) as _
123 }
124
125 fn command_id(&self) -> Option<u32> {
126 if self.shared.supports_oob {
127 Some(self.next_oob_id())
128 } else {
129 None
130 }
131 }
132
133 fn command_response<C: Command>(receiver: oneshot::Receiver<Result<Any, qapi_spec::Error>>) -> impl Future<Output=ExecuteResult<C>> {
134 receiver.map(|res| match res {
135 Ok(Ok(res)) => C::Ok::deserialize(&res)
136 .map_err(io::Error::from).map_err(From::from),
137 Ok(Err(e)) => Err(e.into()),
138 Err(_cancelled) => Err(io::Error::new(io::ErrorKind::UnexpectedEof, "QAPI stream disconnected").into()),
139 })
140 }
141
142 pub fn execute<C: Command>(&self, command: C) -> impl Future<Output=ExecuteResult<C>> where
143 W: Sink<Execute<C, u32>, Error=io::Error> + Unpin
144 {
145 let id = self.command_id();
146 let sink = self.write.clone();
147 let shared = self.shared.clone();
148 let command = Execute::new(command, id);
149
150 async move {
151 let mut sink = sink.lock().await;
152 let receiver = shared.command_insert(id.unwrap_or_default());
153
154 sink.send(command).await?;
155 if id.is_some() {
156 drop(sink)
158 }
159
160 Self::command_response::<C>(receiver).await
161 }
162 }
163
164 #[cfg(feature = "qapi-qga")]
172 pub fn guest_sync(&self, sync_value: i32) -> impl Future<Output=Result<(), crate::ExecuteError>> where
173 W: Sink<Execute<qapi_qga::guest_sync, u32>, Error=io::Error> + Unpin
174 {
175 let id = sync_value.into();
176 self.execute(qapi_qga::guest_sync {
177 id,
178 }).map(move |res| res.and_then(|res| if res == id {
179 Ok(())
180 } else {
181 Err(io::Error::new(io::ErrorKind::InvalidData, "QGA sync failed").into())
182 }))
183 }
184
185 fn stop(&self) {
186 let mut commands = self.shared.commands.lock().unwrap();
187 if self.shared.abandoned.load(Ordering::Relaxed) {
188 self.shared.stop();
189 }
190 commands.abandoned = true;
191 }
192}
193
194impl<W> Drop for QapiService<W> {
195 fn drop(&mut self) {
196 self.stop();
197 }
198}
199
// Command bookkeeping that lives behind the `commands` mutex of `QapiShared`.
#[derive(Default)]
struct QapiSharedCommands {
    // Senders of the commands that are still waiting for a response, by id.
    pending: QapiCommandMap,
    // Set once the service or the event half has been dropped; after that
    // `command_insert` no longer registers senders.
    abandoned: bool,
}
205
// State shared between `QapiService` and `QapiEvents`.
struct QapiShared {
    // Pending commands plus the `abandoned` marker, guarded by a std mutex.
    commands: StdMutex<QapiSharedCommands>,
    // Waker registered by the polling side and woken by `stop()`.
    stop_waker: AtomicWaker,
    // Set when polling should end (explicit `stop()` or end of stream).
    stop: AtomicBool,
    // Set by `QapiEvents::release`; read by `QapiService::stop` to decide
    // whether dropping the service should also stop the event half.
    abandoned: AtomicBool,
    // Whether commands carry ids and responses are expected to echo them.
    supports_oob: bool,
}
213
214impl QapiShared {
215 #[cfg(feature = "tokio")]
216 fn new(supports_oob: bool) -> Self {
217 Self {
218 commands: Default::default(),
219 stop_waker: Default::default(),
220 stop: Default::default(),
221 abandoned: Default::default(),
222 supports_oob,
223 }
224 }
225
226 fn stop(&self) {
227 self.stop.store(true, Ordering::Relaxed);
228 self.stop_waker.wake();
229 }
230
231 fn is_stopped(&self) -> bool {
232 self.stop.load(Ordering::Relaxed)
233 }
234
235 fn poll_next<T, P: FnOnce(&mut Context) -> Poll<Option<T>>>(&self, cx: &mut Context, poll: P) -> Poll<Option<T>> {
236 if self.is_stopped() {
237 return Poll::Ready(None)
238 }
239
240 match poll(cx) {
242 Poll::Ready(res) => {
243 if res.is_none() {
244 self.stop.store(true, Ordering::Relaxed);
245 }
246 Poll::Ready(res)
247 },
248 Poll::Pending => {
249 self.stop_waker.register(cx.waker());
250 if self.is_stopped() {
251 Poll::Ready(None)
252 } else {
253 Poll::Pending
254 }
255 },
256 }
257 }
258
259 fn command_remove(&self, id: u32) -> Option<oneshot::Sender<Result<Any, qapi_spec::Error>>> {
260 let mut commands = self.commands.lock().unwrap();
261 commands.pending.remove(&id)
262 }
263
264 fn command_insert(&self, id: u32) -> oneshot::Receiver<Result<Any, qapi_spec::Error>> {
265 let (sender, receiver) = oneshot::channel();
266 let mut commands = self.commands.lock().unwrap();
267 if !commands.abandoned {
268 if let Some(_prev) = commands.pending.insert(id, sender) {
270 panic!("QAPI duplicate command id {:?}, this should not happen", id);
271 }
272 }
273 receiver
274 }
275}
276
/// Incoming half of a QAPI connection.
///
/// Polling it reads messages from `stream` and routes responses to the
/// commands waiting on them.
#[must_use]
pub struct QapiEvents<S> {
    /// Source of incoming messages.
    stream: S,
    /// Bookkeeping shared with the service half.
    shared: Arc<QapiShared>,
}
282
283impl<S> QapiEvents<S> {
284 pub fn release(&self) -> Result<(), ()> {
285 let commands = self.shared.commands.lock().unwrap();
286 if commands.abandoned {
287 Err(())
288 } else {
289 self.shared.abandoned.store(true, Ordering::Relaxed);
290 Ok(())
291 }
292 }
293
294 pub async fn into_future(self) -> () where
295 Self: Future<Output=io::Result<()>>,
296 {
297 if self.release().is_err() {
298 info!("QAPI service abandoned before spawning");
299 return
300 }
301
302 match self.await {
303 Ok(()) => (),
304 Err(e) =>
305 warn!("QAPI stream closed with error {:?}", e),
306 }
307 }
308
309 pub fn spawn<SP: futures::task::Spawn>(self, spawn: SP) -> Result<(), futures::task::SpawnError> where
310 Self: Future<Output=io::Result<()>> + Send + 'static,
311 S: 'static
312 {
313 use futures::task::SpawnExt;
314
315 spawn.spawn(self.into_future())
316 }
317
318 #[cfg(feature = "async-tokio-spawn")]
319 pub fn spawn_tokio(self) -> ::tokio::task::JoinHandle<()> where
320 Self: Future<Output=io::Result<()>> + Send + 'static,
321 S: 'static
322 {
323 ::tokio::spawn(self.into_future())
324 }
325}
326
327impl<S> Drop for QapiEvents<S> {
328 fn drop(&mut self) {
329 let mut commands = self.shared.commands.lock().unwrap();
330 commands.pending.clear();
331 commands.abandoned = true;
332 }
333}
334
335fn response_id<T>(res: &Response<T>, supports_oob: bool) -> io::Result<u32> {
336 match (res.id().and_then(|id| id.as_u64()), supports_oob) {
337 (Some(id), true) =>
338 id.try_into().map_err(|e|
339 io::Error::new(io::ErrorKind::InvalidData, e)
340 ),
341 (None, false) =>
342 Ok(Default::default()),
343 (None, true) =>
344 Err(io::Error::new(io::ErrorKind::InvalidData, format!("QAPI expected response with numeric ID, got {:?}", res.id()))),
345 (Some(..), false) =>
346 Err(io::Error::new(io::ErrorKind::InvalidData, format!("QAPI expected response without ID, got {:?}", res.id()))),
347 }
348}
349
350fn handle_response(shared: &QapiShared, res: Response<Any>) -> io::Result<()> {
351 let id = response_id(&res, shared.supports_oob)?;
352
353 if let Some(sender) = shared.command_remove(id) {
354 sender.send(res.result()).map_err(|_e|
355 io::Error::new(io::ErrorKind::InvalidData, format!("failed to send response for ID {:?}", id))
356 )
357 } else {
358 Err(io::Error::new(io::ErrorKind::InvalidData, format!("unknown QAPI response with ID {:?}", res.id())))
359 }
360}
361
362impl<M, S> Future for QapiEvents<S> where
363 S: Stream<Item=io::Result<M>>,
364 M: TryInto<Response<Any>>,
365{
366 type Output = io::Result<()>;
367
368 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
369 let this = unsafe { self.get_unchecked_mut() };
370 let stream = unsafe { Pin::new_unchecked(&mut this.stream) };
371 let shared = &this.shared;
372
373 shared.poll_next(cx, |cx| Poll::Ready(Some(match futures::ready!(stream.poll_next(cx)) {
374 None => return Poll::Ready(None),
375 Some(Err(e)) => Err(e),
376 Some(Ok(res)) => match res.try_into() {
377 Ok(res) => match handle_response(shared, res) {
378 Err(e) => Err(e),
379 Ok(()) => {
380 cx.waker().wake_by_ref(); return Poll::Pending
382 },
383 },
384 Err(..) => {
385 trace!("Ignoring QAPI event");
386 cx.waker().wake_by_ref(); return Poll::Pending
388 },
389 },
390 }))).map(|res| res.unwrap_or(Ok(())))
391 }
392}
393
#[cfg(feature = "qapi-qmp")]
impl<S: Stream<Item=io::Result<QmpMessageAny>>> Stream for QapiEvents<S> {
    type Item = io::Result<qapi_qmp::Event>;

    /// Yields QMP events from the underlying stream while routing responses
    /// to their pending commands.
    ///
    /// Read errors and response-handling errors are yielded as items; the
    /// stream ends when the underlying stream ends or the shared stop flag is
    /// set.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // SAFETY: `this` is only used to reach the two fields below, and
        // nothing in this function moves `stream` out of `self`.
        // NOTE(review): soundness also relies on no other code moving `stream`
        // out of a pinned `QapiEvents` — confirm.
        let this = unsafe { self.get_unchecked_mut() };
        // SAFETY: `this.stream` is a field of the pinned `self` and is
        // re-pinned right away.
        let stream = unsafe { Pin::new_unchecked(&mut this.stream) };
        let shared = &this.shared;

        shared.poll_next(cx, |cx| {
            let item = match futures::ready!(stream.poll_next(cx)) {
                Some(Ok(QmpMessage::Response(res))) => match handle_response(shared, res) {
                    Ok(()) => {
                        // The response produced no item: request another poll
                        // instead of looping here.
                        cx.waker().wake_by_ref();
                        return Poll::Pending;
                    },
                    Err(e) => Some(Err(e)),
                },
                Some(Ok(QmpMessage::Event(e))) => Some(Ok(e)),
                Some(Err(e)) => Some(Err(e)),
                None => None,
            };

            Poll::Ready(item)
        })
    }
}