libvirt_rpc/
proto.rs

1use std::io::Cursor;
2use std::collections::HashMap;
3use ::xdr_codec::{Pack,Unpack};
4use ::bytes::{BufMut, BytesMut};
5use ::tokio_io::codec;
6use ::tokio_io::{AsyncRead, AsyncWrite};
7use ::tokio_io::codec::length_delimited;
8use ::tokio_proto::multiplex::{self, RequestId};
9use ::request;
10use ::futures::{Stream, Sink, Poll, StartSend};
11use ::futures::sync::mpsc::{Sender,Receiver};
12
13struct LibvirtCodec;
14
15#[derive(Debug)]
16pub struct LibvirtRequest {
17    pub stream: Option<Sender<LibvirtResponse>>,
18    pub sink: Option<Receiver<BytesMut>>,
19    pub event: Option<request::remote_procedure>,
20    pub header: request::virNetMessageHeader,
21    pub payload: BytesMut,
22}
23
24#[derive(Debug,Clone)]
25pub struct LibvirtResponse {
26    pub header: request::virNetMessageHeader,
27    pub payload: BytesMut,
28}
29
30impl codec::Encoder for LibvirtCodec {
31    type Item = (RequestId, LibvirtRequest);
32    type Error = ::std::io::Error;
33
34    fn encode(&mut self, msg: (RequestId, LibvirtRequest), buf: &mut BytesMut) -> Result<(), Self::Error> {
35        use ::std::io::ErrorKind;
36        let mut req = msg.1;
37        let buf = {
38            let mut writer = buf.writer();
39            req.header.serial = msg.0 as u32;
40            try!(req.header.pack(&mut writer).map_err(|e| ::std::io::Error::new(ErrorKind::InvalidInput, e.to_string())));
41            writer.into_inner()
42        };
43        buf.reserve(req.payload.len());
44        buf.put(req.payload);
45        Ok(())
46    }
47}
48
49impl codec::Decoder for LibvirtCodec {
50    type Item = (RequestId, LibvirtResponse);
51    type Error = ::std::io::Error;
52
53    fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
54        use ::std::io::ErrorKind;
55        let (header, hlen, buf) = {
56            let mut reader = Cursor::new(buf);
57            let (header, hlen) = try!(request::virNetMessageHeader::unpack(&mut reader)
58                                        .map_err(|e| ::std::io::Error::new(ErrorKind::InvalidInput, e.to_string())));
59            (header, hlen, reader.into_inner())
60        };
61        let payload = buf.split_off(hlen);
62        Ok(Some((header.serial as RequestId, LibvirtResponse {
63            header: header,
64            payload: payload,
65        })))
66    }
67}
68
69fn framed_delimited<T, C>(framed: length_delimited::Framed<T>, codec: C) -> FramedTransport<T, C>
70    where T: AsyncRead + AsyncWrite, C: codec::Encoder + codec::Decoder
71 {
72    FramedTransport{ inner: framed, codec: codec }
73}
74
75struct FramedTransport<T, C> where T: AsyncRead + AsyncWrite + 'static {
76    inner: length_delimited::Framed<T>,
77    codec: C,
78}
79
80impl<T, C> Stream for FramedTransport<T, C> where
81                T: AsyncRead + AsyncWrite, C: codec::Decoder,
82                ::std::io::Error: ::std::convert::From<<C as ::tokio_io::codec::Decoder>::Error> {
83    type Item = <C as codec::Decoder>::Item;
84    type Error = <C as codec::Decoder>::Error;
85
86    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
87        use futures::Async;
88        let codec = &mut self.codec;
89        self.inner.poll().and_then(|async| {
90            match async {
91                Async::Ready(Some(mut buf)) => {
92                    let pkt = try!(codec.decode(&mut buf));
93                    Ok(Async::Ready(pkt))
94                },
95                Async::Ready(None) => {
96                    Ok(Async::Ready(None))
97                },
98                Async::NotReady => {
99                    Ok(Async::NotReady)
100                }
101            }
102        }).map_err(|e| e.into())
103    }
104}
105
106impl<T, C> Sink for FramedTransport<T, C> where
107        T: AsyncRead + AsyncWrite + 'static,
108        C: codec::Encoder + codec::Decoder,
109        ::std::io::Error: ::std::convert::From<<C as ::tokio_io::codec::Encoder>::Error> {
110    type SinkItem = <C as codec::Encoder>::Item;
111    type SinkError = <C as codec::Encoder>::Error;
112
113    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
114        use futures::{Async,AsyncSink};
115
116        if let Ok(Async::NotReady) = self.poll_complete() {
117            return Ok(AsyncSink::NotReady(item))
118        }
119
120        let codec = &mut self.codec;
121        let mut buf = BytesMut::with_capacity(64);
122        try!(codec.encode(item, &mut buf));
123        assert!(try!(self.inner.start_send(buf)).is_ready());
124        Ok(AsyncSink::Ready)
125    }
126
127    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
128        self.inner.poll_complete().map_err(|e| e.into())
129    }
130
131    fn close(&mut self) -> Poll<(), Self::SinkError> {
132        try_ready!(self.poll_complete().map_err(|e| e.into()));
133        self.inner.close().map_err(|e| e.into())
134    }
135}
136
137pub struct LibvirtTransport<T> where T: AsyncRead + AsyncWrite + 'static {
138    /* store here if underlying transport is not ready */
139    buffer: Option<(RequestId, LibvirtRequest)>,
140    inner: FramedTransport<T, LibvirtCodec>,
141    /* procedure -> event stream */
142    events: HashMap<u16, ::futures::sync::mpsc::Sender<LibvirtResponse>>,
143    /* req.id -> stream */
144    streams: HashMap<u64, ::futures::sync::mpsc::Sender<LibvirtResponse>>,
145    /* req.id -> (stream, procedure) */
146    sinks: HashMap<u64, (::futures::sync::mpsc::Receiver<BytesMut>, i32, bool)>,
147}
148
149impl<T> LibvirtTransport<T> where T: AsyncRead + AsyncWrite + 'static {
150    fn is_event(&self, procedure: request::generated::remote_procedure) -> bool {
151        if request::DomainEventId::from_procedure(procedure).is_some() {
152            return true;
153        }
154        debug!("not event: procedure {:?}", procedure);
155        false
156    }
157
158    fn poll_sinks(&mut self) -> Poll<Option<(RequestId, LibvirtRequest)>, <LibvirtSink as Sink>::SinkError> {
159        use futures::Async;
160        let mut result = Ok(Async::NotReady);
161
162        debug!("POLL SINKS");
163        for (req_id, &mut (ref mut sink, proc_, ref mut complete)) in self.sinks.iter_mut() {
164            debug!("Processing sink {} proc: {} complete: {}", req_id, proc_, complete);
165            match sink.poll() {
166                Ok(Async::Ready(Some(buf))) => {
167                    let req = LibvirtRequest {
168                                stream: None,
169                                sink: None,
170                                event: None,
171                                header: request::virNetMessageHeader {
172                                    type_: ::request::generated::virNetMessageType::VIR_NET_STREAM,
173                                    status: request::virNetMessageStatus::VIR_NET_CONTINUE,
174                                    proc_: proc_,
175                                    ..Default::default()
176                                },
177                                payload: buf,
178                    };
179                    return Ok(Async::Ready(Some((*req_id, req))));
180                }
181
182                Ok(Async::Ready(None)) => {
183                    if *complete {
184                        /* skip completed sinks */
185                        continue;
186                    }
187                    let req = LibvirtRequest {
188                        stream: None,
189                        sink: None,
190                        event: None,
191                        header: request::virNetMessageHeader {
192                            type_: ::request::generated::virNetMessageType::VIR_NET_STREAM,
193                            status: request::virNetMessageStatus::VIR_NET_OK,
194                            proc_: proc_,
195                            ..Default::default()
196                        },
197                        payload: BytesMut::new(),
198                    };
199                    debug!("Empty sink {}, sending empty msg", req_id);
200                    *complete = true;
201                    result = Ok(Async::Ready(Some((*req_id, req))));
202                    break;
203                }
204
205                Ok(Async::NotReady) => {
206                    /* try next */
207                }
208
209                Err(e) => {
210                    error!("Error in sink {}: {:?}", req_id, e);
211                }
212            }
213        }
214
215        result
216    }
217
218    fn process_event(&mut self, resp: LibvirtResponse) {
219        let proc_ = resp.header.proc_ as u16;
220
221        if let Some(ref mut stream) = self.events.get_mut(&proc_) {
222            debug!("Event: found event stream for proc {}", proc_);
223            let sender = stream;
224            let _ = sender.start_send(resp);
225            let _ = sender.poll_complete();
226            return
227        }
228        debug!("Event: can't find event stream id for proc {}", proc_);
229    }
230
231    fn process_stream(&mut self, resp: LibvirtResponse) {
232        debug!("incoming stream: {:?}", resp.header);
233        {
234            let req_id = resp.header.serial as u64;
235            let mut remove_stream = false;
236
237            if let Some(ref mut stream) = self.streams.get_mut(&req_id) {
238                debug!("found stream for request id {}: {:?}", req_id, resp.header);
239                let sender = stream;
240                if resp.payload.len() != 0 {
241                    if resp.header.status == request::generated::virNetMessageStatus::VIR_NET_ERROR {
242                        debug!("got error from stream, should drop sink");
243                        self.sinks.remove(&req_id);
244                    }
245                    let _ = sender.start_send(resp);
246                    let _ = sender.poll_complete();
247                } else {
248                    debug!("closing stream {}", req_id);
249
250                    debug!("got something from stream, should drop sink!");
251                    self.sinks.remove(&req_id);
252
253                    let _ = sender.start_send(resp);
254                    let _ = sender.close();
255                    let _ = sender.poll_complete();
256                    remove_stream = true;
257                }
258            } else {
259                error!("can't find stream for request id {}: {:?}", req_id, resp.header);
260                if resp.header.status == request::generated::virNetMessageStatus::VIR_NET_ERROR {
261                    let mut reader = Cursor::new(resp.payload);
262                    let (err, _) = request::virNetMessageError::unpack(&mut reader).unwrap();
263                    println!("ERROR: {:?}", err);
264                }
265            }
266            if remove_stream {
267                debug!("Droppping stream ID {}", req_id);
268                self.streams.remove(&req_id);
269            }
270        }
271    }
272}
273
274impl<T> Stream for LibvirtTransport<T> where
275    T: AsyncRead + AsyncWrite + 'static,
276 {
277    type Item = (RequestId, LibvirtResponse);
278    type Error = ::std::io::Error;
279
280    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
281        use futures::Async;
282
283        debug!("POLL CALLED");
284
285        match self.inner.poll() {
286            Ok(async) => {
287                match async {
288                Async::Ready(Some((id, resp))) => {
289                    debug!("FRAME READY ID: {} RESP: {:?}", id, resp);
290
291                    let procedure = unsafe { ::std::mem::transmute(resp.header.proc_ as u16) };
292                    if self.is_event(procedure) {
293                        debug!("event received!");
294                        self.process_event(resp);
295                        debug!("processed event msg, get next packet");
296                        return self.poll();
297                    }
298
299                    if resp.header.type_ == request::generated::virNetMessageType::VIR_NET_STREAM {
300                        self.process_stream(resp);
301                        debug!("processed stream msg, get next packet");
302                        return self.poll();
303                    }
304
305                    return Ok(Async::Ready(Some((id, resp))));
306                },
307                _ => debug!("{:?}", async),
308                }
309                debug!("RETURNING {:?}", async);
310                Ok(async)
311            },
312            Err(e) => Err(e),
313        }
314    }
315}
316
317impl<T> Sink for LibvirtTransport<T> where
318    T: AsyncRead + AsyncWrite + 'static,
319 {
320    type SinkItem = (RequestId, LibvirtRequest);
321    type SinkError = ::std::io::Error;
322
323    fn start_send(&mut self, mut item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
324        use ::std::mem;
325        use futures::{AsyncSink};
326
327        /*
328        if self.buffer.is_some() {
329            debug!("Found something in sink_buffer: NOT READY");
330            return Ok(AsyncSink::NotReady(item));
331        }
332        */
333
334        if let Some(event_proc) = mem::replace(&mut item.1.event, None) {
335            debug!("Sending event request {:?}", event_proc);
336            if let Some(stream) = mem::replace(&mut item.1.stream, None) {
337                self.events.insert(event_proc as u16, stream);
338            }
339        }
340
341        if let Some(stream) = mem::replace(&mut item.1.stream, None) {
342            debug!("SENDING REQ ID = {} {:?} WITH STREAM", item.0, item.1.header);
343            self.streams.insert(item.0, stream);
344        }
345
346        let mut new_sink = false;
347        if let Some(sink) = mem::replace(&mut item.1.sink, None) {
348            debug!("SENDING REQ ID = {} {:?} WITH SINK", item.0, item.1.header);
349            {
350                self.sinks.insert(item.0, (sink, item.1.header.proc_, false));
351                new_sink = true;
352            }
353        }
354
355        {
356            debug!("Have {} sinks", self.sinks.len());
357            if !new_sink && self.sinks.len() > 0 {
358                return Ok(AsyncSink::NotReady(item));
359            }
360        }
361        self.inner.start_send(item)
362    }
363
364    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
365        use futures::{Async,AsyncSink};
366        use std::mem;
367        debug!("POLL COMPLETE CALLED");
368
369        if let Some(req) = mem::replace(&mut self.buffer, None) {
370            debug!("Sending buffered msg");
371            match try!(self.inner.start_send(req)) {
372                AsyncSink::NotReady(item) => {
373                    debug!("Inner not ready, putting buffered msg back");
374                    mem::replace(&mut self.buffer, Some(item));
375                    return self.inner.poll_complete();
376                },
377                AsyncSink::Ready => {
378                    debug!("Sent buffered msg");
379                },
380            }
381        }
382
383        loop {
384            match self.poll_sinks() {
385                Ok(Async::Ready(Some(req))) => {
386                    debug!("SEND: some sinks are ready, try processing");
387
388                    debug!("Sending sink msg: {} {:?} pl: {}", req.0, req.1.header, req.1.payload.len());
389                    match try!(self.inner.start_send(req)) {
390                        AsyncSink::NotReady(item) => {
391                            debug!("Inner not ready, putting sink msg back");
392                            mem::replace(&mut self.buffer, Some(item));
393                            return self.inner.poll_complete();
394                        },
395                        AsyncSink::Ready => {
396                            debug!("Sent sink msg");
397                        },
398                    }
399                }
400                ret => {
401                    debug!("POLL_SINKS: {:?}", ret);
402                    break;
403                }
404            }
405        }
406
407        self.inner.poll_complete()
408    }
409
410    fn close(&mut self) -> Poll<(), Self::SinkError> {
411        self.inner.close()
412    }
413}
414
415#[derive(Debug, Clone)]
416pub struct LibvirtProto;
417
418impl<T> multiplex::ClientProto<T> for LibvirtProto where T: AsyncRead + AsyncWrite + 'static {
419    type Request = LibvirtRequest;
420    type Response = LibvirtResponse;
421    type Transport = LibvirtTransport<T>;
422    type BindTransport = Result<Self::Transport, ::std::io::Error>;
423
424    fn bind_transport(&self, io: T) -> Self::BindTransport {
425        let framed = length_delimited::Builder::new()
426                        .big_endian()
427                        .length_field_offset(0)
428                        .length_field_length(4)
429                        .length_adjustment(-4)
430                        .new_framed(io);
431        Ok(LibvirtTransport{ 
432            buffer: None,
433            inner: framed_delimited(framed, LibvirtCodec),
434            events: HashMap::new(),
435            streams: HashMap::new(),
436            sinks: HashMap::new(),
437        })
438    }
439}
440
441pub struct EventStream<E> where E: request::DomainEvent {
442    inner: ::futures::sync::mpsc::Receiver<LibvirtResponse>,
443    handle_resp: fn(LibvirtResponse) -> Result<<E as request::DomainEvent>::From, ::LibvirtError>,
444}
445
446impl<E> EventStream<E> where E: request::DomainEvent {
447    pub fn new(inner: ::futures::sync::mpsc::Receiver<LibvirtResponse>,
448           handler: fn(LibvirtResponse) -> Result<<E as request::DomainEvent>::From, ::LibvirtError>) -> Self {
449               EventStream { inner: inner, handle_resp: handler }
450    }
451
452}
453
454impl<E> Stream for EventStream<E> where E: ::std::fmt::Debug + request::DomainEvent {
455    type Item = E;
456    type Error = ::LibvirtError;
457
458    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
459        use futures::Async;
460        match self.inner.poll() {
461            Ok(Async::Ready(Some(resp))) => {
462                match (self.handle_resp)(resp) {
463                    Ok(msg) => {
464                        let msg = msg.into();
465                        debug!("EVENT (CALLBACK) {:?}", msg);
466                        Ok(Async::Ready(Some(msg)))
467                    },
468                    Err(e) => return Err(e),
469                }
470            },
471            Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
472            Ok(Async::NotReady) => Ok(Async::NotReady),
473            Err(e) => panic!(e),
474        }
475    }
476}
477
478pub struct LibvirtStream {
479    inner: ::futures::sync::mpsc::Receiver<LibvirtResponse>,
480}
481
482impl From<::futures::sync::mpsc::Receiver<LibvirtResponse>> for LibvirtStream {
483    fn from(f: ::futures::sync::mpsc::Receiver<LibvirtResponse>) -> Self {
484        LibvirtStream{ inner: f }
485    }
486}
487
488impl Stream for LibvirtStream {
489    type Item = BytesMut;
490    type Error = ::LibvirtError;
491
492    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
493        use futures::Async;
494        match self.inner.poll() {
495            Ok(Async::Ready(Some(resp))) => {
496                if resp.header.status == request::generated::virNetMessageStatus::VIR_NET_ERROR {
497                    let mut reader = Cursor::new(resp.payload);
498                    let (err, _) = request::virNetMessageError::unpack(&mut reader).unwrap();
499                    return Err(::LibvirtError::from(err));
500                }
501                Ok(Async::Ready(Some(resp.payload)))
502            },
503            Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
504            Ok(Async::NotReady) => Ok(Async::NotReady),
505            Err(e) => panic!("LibvirtStream: unexpected error from mpsc::receiver: {:?}", e),
506        }
507    }
508}
509
510pub struct LibvirtSink {
511    pub inner: ::futures::sync::mpsc::Sender<BytesMut>,
512}
513
514impl Sink for LibvirtSink {
515    type SinkItem = BytesMut;
516    type SinkError = ::futures::sync::mpsc::SendError<Self::SinkItem>;
517
518    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
519        self.inner.start_send(item)
520    }
521
522    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
523        self.inner.poll_complete()
524    }
525
526    fn close(&mut self) -> Poll<(), Self::SinkError> {
527        self.inner.close()
528    }
529}
530
531impl Drop for LibvirtSink {
532    fn drop(&mut self) {
533        debug!("LibvirtSink dropping");
534        let _ = self.close();
535        let _ = self.poll_complete();
536    }
537}