qapi/
lib.rs

1#![doc(html_root_url = "https://docs.rs/qapi/0.15.0")]
2
3#[cfg(feature = "qapi-qmp")]
4pub use qapi_qmp as qmp;
5
6#[cfg(feature = "qapi-qga")]
7pub use qapi_qga as qga;
8
9pub use qapi_spec::{Any, Dictionary, Empty, Never, Execute, ExecuteOob, Command, CommandResult, Event, Enum, Error, ErrorClass, Timestamp};
10
11pub use self::stream::Stream;
12
13#[cfg(feature = "qapi-qmp")]
14pub use self::qmp_impl::*;
15
16#[cfg(feature = "qapi-qga")]
17pub use self::qga_impl::*;
18
19use std::{error, fmt, io};
20
21#[cfg(feature = "async")]
22pub mod futures;
23
24#[derive(Debug)]
25pub enum ExecuteError {
26    Qapi(Error),
27    Io(io::Error),
28}
29
30pub type ExecuteResult<C> = Result<<C as Command>::Ok, ExecuteError>;
31
32impl fmt::Display for ExecuteError {
33    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
34        match self {
35            ExecuteError::Qapi(e) => fmt::Display::fmt(e, f),
36            ExecuteError::Io(e) => fmt::Display::fmt(e, f),
37        }
38    }
39}
40
41impl error::Error for ExecuteError {
42    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
43        match self {
44            ExecuteError::Qapi(e) => Some(e),
45            ExecuteError::Io(e) => Some(e),
46        }
47    }
48}
49
50impl From<io::Error> for ExecuteError {
51    fn from(e: io::Error) -> Self {
52        ExecuteError::Io(e)
53    }
54}
55
56impl From<Error> for ExecuteError {
57    fn from(e: Error) -> Self {
58        ExecuteError::Qapi(e)
59    }
60}
61
62impl From<ExecuteError> for io::Error {
63    fn from(e: ExecuteError) -> Self {
64        match e {
65            ExecuteError::Qapi(e) => e.into(),
66            ExecuteError::Io(e) => e,
67        }
68    }
69}
70
/// Newline-delimited JSON framing shared by the QMP and QGA clients.
#[cfg(any(feature = "qapi-qmp", feature = "qapi-qga"))]
mod qapi {
    use serde::{Serialize, Deserialize};
    use std::io::{self, BufRead, Write};
    use crate::{Command, Execute};
    use log::trace;

    /// A stream together with the scratch buffer used to read lines from it.
    pub struct Qapi<S> {
        /// Underlying transport.
        pub stream: S,
        /// Bytes of the line most recently read by `decode_line`; it is
        /// cleared at the start of every read.
        pub buffer: Vec<u8>,
    }

    impl<S> Qapi<S> {
        /// Wraps `s` with an empty line buffer.
        pub fn new(s: S) -> Self {
            Qapi {
                stream: s,
                buffer: Vec::new(),
            }
        }
    }

    impl<S: BufRead> Qapi<S> {
        /// Reads up to and including the next `\n` and deserializes it as JSON.
        ///
        /// Returns `Ok(None)` when the stream yields no bytes at all (end of
        /// stream). JSON errors are converted into `io::Error`.
        ///
        /// `self` is borrowed for `'de`, which allows `D` to borrow from the
        /// internal line buffer.
        pub fn decode_line<'de, D: Deserialize<'de>>(&'de mut self) -> io::Result<Option<D>> {
            self.buffer.clear();
            let len = self.stream.read_until(b'\n', &mut self.buffer)?;
            let line = &self.buffer[..len];
            trace!("<- {}", String::from_utf8_lossy(line));

            if line.is_empty() {
                Ok(None)
            } else {
                serde_json::from_slice(line).map(Some).map_err(From::from)
            }
        }
    }

    impl<S: Write> Qapi<S> {
        /// Serializes `command` as JSON, appends `\n`, and flushes the stream.
        ///
        /// JSON errors are converted into `io::Error`.
        pub fn encode_line<C: Serialize>(&mut self, command: &C) -> io::Result<()> {
            serde_json::to_writer(&mut self.stream, command)?;

            // `write` may legally write zero bytes and return `Ok(0)`, which
            // would silently drop the line terminator; `write_all` either
            // writes the newline or reports an error.
            self.stream.write_all(b"\n")?;

            self.stream.flush()
        }

        /// Wraps `command` in an `Execute` envelope and sends it as one line.
        pub fn write_command<C: Command>(&mut self, command: &C) -> io::Result<()> {
            self.encode_line(&Execute::<&C>::from(command))?;

            trace!("-> execute {}: {}", C::NAME, serde_json::to_string_pretty(command).unwrap());

            Ok(())
        }
    }
}
129
mod stream {
    use std::io::{self, BufRead, Read, Write};

    /// Joins an independent reader and writer into one duplex value.
    ///
    /// `Read` and `BufRead` calls are forwarded to the reader half, `Write`
    /// calls to the writer half.
    pub struct Stream<R, W> {
        r: R,
        w: W,
    }

    impl<R, W> Stream<R, W> {
        /// Builds a stream that reads from `r` and writes to `w`.
        pub fn new(r: R, w: W) -> Self {
            Self { r, w }
        }

        /// Splits the stream back into its `(reader, writer)` halves.
        pub fn into_inner(self) -> (R, W) {
            let Self { r, w } = self;
            (r, w)
        }

        /// Shared access to the reader half.
        pub fn get_ref_read(&self) -> &R {
            &self.r
        }

        /// Exclusive access to the reader half.
        pub fn get_mut_read(&mut self) -> &mut R {
            &mut self.r
        }

        /// Shared access to the writer half.
        pub fn get_ref_write(&self) -> &W {
            &self.w
        }

        /// Exclusive access to the writer half.
        pub fn get_mut_write(&mut self) -> &mut W {
            &mut self.w
        }
    }

    impl<R: Read, W> Read for Stream<R, W> {
        /// Reads from the reader half.
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            R::read(&mut self.r, buf)
        }
    }

    impl<R: BufRead, W> BufRead for Stream<R, W> {
        /// Exposes the reader half's internal buffer.
        fn fill_buf(&mut self) -> io::Result<&[u8]> {
            R::fill_buf(&mut self.r)
        }

        /// Marks `amt` bytes of the reader half's buffer as consumed.
        fn consume(&mut self, amt: usize) {
            R::consume(&mut self.r, amt)
        }
    }

    impl<R, W: Write> Write for Stream<R, W> {
        /// Writes to the writer half.
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            W::write(&mut self.w, buf)
        }

        /// Flushes the writer half.
        fn flush(&mut self) -> io::Result<()> {
            W::flush(&mut self.w)
        }
    }
}
182
/// Blocking QMP client built on the shared line-oriented JSON framing.
#[cfg(feature = "qapi-qmp")]
mod qmp_impl {
    use std::io::{self, BufRead, Read, Write, BufReader};
    use std::vec::Drain;
    use qapi_qmp::{QMP, QapiCapabilities, QmpMessage, Event, qmp_capabilities, query_version};
    use crate::{qapi::Qapi, Stream, ExecuteResult, ExecuteError, Command};

    /// A QMP connection over stream `S`.
    ///
    /// Events that arrive while waiting for a command response are queued and
    /// can be collected with [`Qmp::events`].
    pub struct Qmp<S> {
        inner: Qapi<S>,
        event_queue: Vec<Event>,
    }

    impl<S: Read + Write + Clone> Qmp<Stream<BufReader<S>, S>> {
        /// Builds a client from a single cloneable duplex stream.
        ///
        /// A clone of `s` is wrapped in a `BufReader` and used for reading;
        /// `s` itself is used for writing.
        pub fn from_stream(s: S) -> Self {
            Self::new(Stream::new(BufReader::new(s.clone()), s))
        }
    }

    impl<S> Qmp<S> {
        /// Wraps `stream` with an empty event queue.
        pub fn new(stream: S) -> Self {
            Qmp {
                inner: Qapi::new(stream),
                event_queue: Vec::new(),
            }
        }

        /// Consumes the client and returns the underlying stream.
        pub fn into_inner(self) -> S {
            self.inner.stream
        }

        /// Shared access to the underlying stream.
        pub fn inner(&self) -> &S {
            &self.inner.stream
        }

        /// Exclusive access to the underlying stream.
        pub fn inner_mut(&mut self) -> &mut S {
            &mut self.inner.stream
        }

        /// Drains every event queued so far by [`Qmp::read_response`].
        pub fn events(&mut self) -> Drain<'_, Event> {
            self.event_queue.drain(..)
        }
    }

    impl<S: BufRead> Qmp<S> {
        /// Reads the capabilities greeting line from the peer.
        ///
        /// # Errors
        ///
        /// Returns `io::ErrorKind::UnexpectedEof` if the stream ends before a
        /// greeting is received, and any read or JSON decoding error.
        pub fn read_capabilities(&mut self) -> io::Result<QMP> {
            let greeting: Option<QapiCapabilities> = self.inner.decode_line()?;

            // A closed connection is an ordinary I/O condition, not a bug in
            // the caller, so report it as an error instead of panicking.
            greeting
                .map(|caps| caps.QMP)
                .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "expected capabilities"))
        }

        /// Reads lines until a command response arrives and returns its result.
        ///
        /// Events received in the meantime are appended to the event queue.
        ///
        /// # Errors
        ///
        /// Returns an `UnexpectedEof` I/O error if the stream ends first, any
        /// read/decoding error, or the error carried by the response itself.
        pub fn read_response<C: Command>(&mut self) -> ExecuteResult<C> {
            loop {
                match self.inner.decode_line()? {
                    None => return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "expected command response").into()),
                    Some(QmpMessage::Response(res)) => return res.result().map_err(From::from),
                    Some(QmpMessage::Event(e)) => self.event_queue.push(e),
                }
            }
        }
    }

    impl<S: BufRead + Write> Qmp<S> {
        /// Sends `command` without waiting for its response.
        pub fn write_command<C: Command>(&mut self, command: &C) -> io::Result<()> {
            self.inner.write_command(command)
        }

        /// Sends `command` and blocks until its response has been read.
        pub fn execute<C: Command>(&mut self, command: &C) -> ExecuteResult<C> {
            self.write_command(command)?;
            self.read_response::<C>()
        }

        /// Reads the capabilities greeting, then issues `qmp_capabilities`
        /// with no capabilities enabled, and returns the greeting.
        pub fn handshake(&mut self) -> Result<QMP, ExecuteError> {
            let caps = self.read_capabilities()?;
            self.execute(&qmp_capabilities { enable: None })?;
            Ok(caps)
        }

        /// Can be used to poll the socket for pending events.
        ///
        /// Executes `query_version` and discards the result; any events read
        /// before the response end up in the event queue.
        pub fn nop(&mut self) -> io::Result<()> {
            self.execute(&query_version { })?;
            Ok(())
        }
    }
}
268
/// Blocking QEMU guest agent client built on the shared line-oriented JSON framing.
#[cfg(feature = "qapi-qga")]
mod qga_impl {
    use std::io::{self, BufRead, Read, Write, BufReader};
    use qapi_qga::guest_sync;
    use qapi_spec::Response;
    use crate::{qapi::Qapi, Stream, Command, ExecuteResult, ExecuteError};

    /// A guest agent connection over stream `S`.
    pub struct Qga<S> {
        inner: Qapi<S>,
    }

    impl<S: Read + Write + Clone> Qga<Stream<BufReader<S>, S>> {
        /// Builds a client from a single cloneable duplex stream.
        ///
        /// A clone of `s` is wrapped in a `BufReader` and used for reading;
        /// `s` itself is used for writing.
        pub fn from_stream(s: S) -> Self {
            Self::new(Stream::new(BufReader::new(s.clone()), s))
        }
    }

    impl<S> Qga<S> {
        /// Wraps `stream` in a client.
        pub fn new(stream: S) -> Self {
            Qga {
                inner: Qapi::new(stream),
            }
        }

        /// Consumes the client and returns the underlying stream.
        pub fn into_inner(self) -> S {
            self.inner.stream
        }

        /// Shared access to the underlying stream.
        pub fn inner(&self) -> &S {
            &self.inner.stream
        }

        /// Exclusive access to the underlying stream.
        pub fn inner_mut(&mut self) -> &mut S {
            &mut self.inner.stream
        }
    }

    impl<S: BufRead> Qga<S> {
        /// Reads one line and interprets it as the response to command `C`.
        ///
        /// # Errors
        ///
        /// Returns an `UnexpectedEof` I/O error if the stream has ended, any
        /// read/decoding error, or the error carried by the response itself.
        pub fn read_response<C: Command>(&mut self) -> ExecuteResult<C> {
            // Exactly one line is read per response, so no loop is needed.
            let response: Response<_> = self.inner.decode_line()?
                .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "expected command response"))?;

            response.result().map_err(From::from)
        }
    }

    impl<S: BufRead + Write> Qga<S> {
        /// Sends `command` without waiting for its response.
        pub fn write_command<C: Command>(&mut self, command: &C) -> io::Result<()> {
            self.inner.write_command(command)
        }

        /// Sends `command` and blocks until its response has been read.
        pub fn execute<C: Command>(&mut self, command: &C) -> ExecuteResult<C> {
            self.write_command(command)?;
            self.read_response::<C>()
        }

        /// Executes `guest_sync` with `sync_value` as its id and checks that
        /// the same id comes back.
        ///
        /// # Errors
        ///
        /// Returns an `InvalidData` I/O error when the returned id differs
        /// from the one sent, or whatever error `execute` reported.
        pub fn guest_sync(&mut self, sync_value: i32) -> Result<(), ExecuteError> {
            let sync = guest_sync {
                id: sync_value.into(),
            };

            let echoed = self.execute(&sync)?;
            if echoed == sync.id {
                Ok(())
            } else {
                Err(io::Error::new(io::ErrorKind::InvalidData, "guest-sync handshake failed").into())
            }
        }
    }
}
341}