1use std::io::Cursor;
2use std::collections::HashMap;
3use ::xdr_codec::{Pack,Unpack};
4use ::bytes::{BufMut, BytesMut};
5use ::tokio_io::codec;
6use ::tokio_io::{AsyncRead, AsyncWrite};
7use ::tokio_io::codec::length_delimited;
8use ::tokio_proto::multiplex::{self, RequestId};
9use ::request;
10use ::futures::{Stream, Sink, Poll, StartSend};
11use ::futures::sync::mpsc::{Sender,Receiver};
12
/// Stateless codec for libvirt RPC messages.
///
/// Its `Encoder` implementation writes a request header followed by the raw
/// payload; its `Decoder` implementation splits an incoming frame back into a
/// header and a payload.
struct LibvirtCodec;
14
15#[derive(Debug)]
16pub struct LibvirtRequest {
17 pub stream: Option<Sender<LibvirtResponse>>,
18 pub sink: Option<Receiver<BytesMut>>,
19 pub event: Option<request::remote_procedure>,
20 pub header: request::virNetMessageHeader,
21 pub payload: BytesMut,
22}
23
24#[derive(Debug,Clone)]
25pub struct LibvirtResponse {
26 pub header: request::virNetMessageHeader,
27 pub payload: BytesMut,
28}
29
30impl codec::Encoder for LibvirtCodec {
31 type Item = (RequestId, LibvirtRequest);
32 type Error = ::std::io::Error;
33
34 fn encode(&mut self, msg: (RequestId, LibvirtRequest), buf: &mut BytesMut) -> Result<(), Self::Error> {
35 use ::std::io::ErrorKind;
36 let mut req = msg.1;
37 let buf = {
38 let mut writer = buf.writer();
39 req.header.serial = msg.0 as u32;
40 try!(req.header.pack(&mut writer).map_err(|e| ::std::io::Error::new(ErrorKind::InvalidInput, e.to_string())));
41 writer.into_inner()
42 };
43 buf.reserve(req.payload.len());
44 buf.put(req.payload);
45 Ok(())
46 }
47}
48
49impl codec::Decoder for LibvirtCodec {
50 type Item = (RequestId, LibvirtResponse);
51 type Error = ::std::io::Error;
52
53 fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
54 use ::std::io::ErrorKind;
55 let (header, hlen, buf) = {
56 let mut reader = Cursor::new(buf);
57 let (header, hlen) = try!(request::virNetMessageHeader::unpack(&mut reader)
58 .map_err(|e| ::std::io::Error::new(ErrorKind::InvalidInput, e.to_string())));
59 (header, hlen, reader.into_inner())
60 };
61 let payload = buf.split_off(hlen);
62 Ok(Some((header.serial as RequestId, LibvirtResponse {
63 header: header,
64 payload: payload,
65 })))
66 }
67}
68
69fn framed_delimited<T, C>(framed: length_delimited::Framed<T>, codec: C) -> FramedTransport<T, C>
70 where T: AsyncRead + AsyncWrite, C: codec::Encoder + codec::Decoder
71 {
72 FramedTransport{ inner: framed, codec: codec }
73}
74
75struct FramedTransport<T, C> where T: AsyncRead + AsyncWrite + 'static {
76 inner: length_delimited::Framed<T>,
77 codec: C,
78}
79
80impl<T, C> Stream for FramedTransport<T, C> where
81 T: AsyncRead + AsyncWrite, C: codec::Decoder,
82 ::std::io::Error: ::std::convert::From<<C as ::tokio_io::codec::Decoder>::Error> {
83 type Item = <C as codec::Decoder>::Item;
84 type Error = <C as codec::Decoder>::Error;
85
86 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
87 use futures::Async;
88 let codec = &mut self.codec;
89 self.inner.poll().and_then(|async| {
90 match async {
91 Async::Ready(Some(mut buf)) => {
92 let pkt = try!(codec.decode(&mut buf));
93 Ok(Async::Ready(pkt))
94 },
95 Async::Ready(None) => {
96 Ok(Async::Ready(None))
97 },
98 Async::NotReady => {
99 Ok(Async::NotReady)
100 }
101 }
102 }).map_err(|e| e.into())
103 }
104}
105
106impl<T, C> Sink for FramedTransport<T, C> where
107 T: AsyncRead + AsyncWrite + 'static,
108 C: codec::Encoder + codec::Decoder,
109 ::std::io::Error: ::std::convert::From<<C as ::tokio_io::codec::Encoder>::Error> {
110 type SinkItem = <C as codec::Encoder>::Item;
111 type SinkError = <C as codec::Encoder>::Error;
112
113 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
114 use futures::{Async,AsyncSink};
115
116 if let Ok(Async::NotReady) = self.poll_complete() {
117 return Ok(AsyncSink::NotReady(item))
118 }
119
120 let codec = &mut self.codec;
121 let mut buf = BytesMut::with_capacity(64);
122 try!(codec.encode(item, &mut buf));
123 assert!(try!(self.inner.start_send(buf)).is_ready());
124 Ok(AsyncSink::Ready)
125 }
126
127 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
128 self.inner.poll_complete().map_err(|e| e.into())
129 }
130
131 fn close(&mut self) -> Poll<(), Self::SinkError> {
132 try_ready!(self.poll_complete().map_err(|e| e.into()));
133 self.inner.close().map_err(|e| e.into())
134 }
135}
136
137pub struct LibvirtTransport<T> where T: AsyncRead + AsyncWrite + 'static {
138 buffer: Option<(RequestId, LibvirtRequest)>,
140 inner: FramedTransport<T, LibvirtCodec>,
141 events: HashMap<u16, ::futures::sync::mpsc::Sender<LibvirtResponse>>,
143 streams: HashMap<u64, ::futures::sync::mpsc::Sender<LibvirtResponse>>,
145 sinks: HashMap<u64, (::futures::sync::mpsc::Receiver<BytesMut>, i32, bool)>,
147}
148
149impl<T> LibvirtTransport<T> where T: AsyncRead + AsyncWrite + 'static {
150 fn is_event(&self, procedure: request::generated::remote_procedure) -> bool {
151 if request::DomainEventId::from_procedure(procedure).is_some() {
152 return true;
153 }
154 debug!("not event: procedure {:?}", procedure);
155 false
156 }
157
158 fn poll_sinks(&mut self) -> Poll<Option<(RequestId, LibvirtRequest)>, <LibvirtSink as Sink>::SinkError> {
159 use futures::Async;
160 let mut result = Ok(Async::NotReady);
161
162 debug!("POLL SINKS");
163 for (req_id, &mut (ref mut sink, proc_, ref mut complete)) in self.sinks.iter_mut() {
164 debug!("Processing sink {} proc: {} complete: {}", req_id, proc_, complete);
165 match sink.poll() {
166 Ok(Async::Ready(Some(buf))) => {
167 let req = LibvirtRequest {
168 stream: None,
169 sink: None,
170 event: None,
171 header: request::virNetMessageHeader {
172 type_: ::request::generated::virNetMessageType::VIR_NET_STREAM,
173 status: request::virNetMessageStatus::VIR_NET_CONTINUE,
174 proc_: proc_,
175 ..Default::default()
176 },
177 payload: buf,
178 };
179 return Ok(Async::Ready(Some((*req_id, req))));
180 }
181
182 Ok(Async::Ready(None)) => {
183 if *complete {
184 continue;
186 }
187 let req = LibvirtRequest {
188 stream: None,
189 sink: None,
190 event: None,
191 header: request::virNetMessageHeader {
192 type_: ::request::generated::virNetMessageType::VIR_NET_STREAM,
193 status: request::virNetMessageStatus::VIR_NET_OK,
194 proc_: proc_,
195 ..Default::default()
196 },
197 payload: BytesMut::new(),
198 };
199 debug!("Empty sink {}, sending empty msg", req_id);
200 *complete = true;
201 result = Ok(Async::Ready(Some((*req_id, req))));
202 break;
203 }
204
205 Ok(Async::NotReady) => {
206 }
208
209 Err(e) => {
210 error!("Error in sink {}: {:?}", req_id, e);
211 }
212 }
213 }
214
215 result
216 }
217
218 fn process_event(&mut self, resp: LibvirtResponse) {
219 let proc_ = resp.header.proc_ as u16;
220
221 if let Some(ref mut stream) = self.events.get_mut(&proc_) {
222 debug!("Event: found event stream for proc {}", proc_);
223 let sender = stream;
224 let _ = sender.start_send(resp);
225 let _ = sender.poll_complete();
226 return
227 }
228 debug!("Event: can't find event stream id for proc {}", proc_);
229 }
230
231 fn process_stream(&mut self, resp: LibvirtResponse) {
232 debug!("incoming stream: {:?}", resp.header);
233 {
234 let req_id = resp.header.serial as u64;
235 let mut remove_stream = false;
236
237 if let Some(ref mut stream) = self.streams.get_mut(&req_id) {
238 debug!("found stream for request id {}: {:?}", req_id, resp.header);
239 let sender = stream;
240 if resp.payload.len() != 0 {
241 if resp.header.status == request::generated::virNetMessageStatus::VIR_NET_ERROR {
242 debug!("got error from stream, should drop sink");
243 self.sinks.remove(&req_id);
244 }
245 let _ = sender.start_send(resp);
246 let _ = sender.poll_complete();
247 } else {
248 debug!("closing stream {}", req_id);
249
250 debug!("got something from stream, should drop sink!");
251 self.sinks.remove(&req_id);
252
253 let _ = sender.start_send(resp);
254 let _ = sender.close();
255 let _ = sender.poll_complete();
256 remove_stream = true;
257 }
258 } else {
259 error!("can't find stream for request id {}: {:?}", req_id, resp.header);
260 if resp.header.status == request::generated::virNetMessageStatus::VIR_NET_ERROR {
261 let mut reader = Cursor::new(resp.payload);
262 let (err, _) = request::virNetMessageError::unpack(&mut reader).unwrap();
263 println!("ERROR: {:?}", err);
264 }
265 }
266 if remove_stream {
267 debug!("Droppping stream ID {}", req_id);
268 self.streams.remove(&req_id);
269 }
270 }
271 }
272}
273
274impl<T> Stream for LibvirtTransport<T> where
275 T: AsyncRead + AsyncWrite + 'static,
276 {
277 type Item = (RequestId, LibvirtResponse);
278 type Error = ::std::io::Error;
279
280 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
281 use futures::Async;
282
283 debug!("POLL CALLED");
284
285 match self.inner.poll() {
286 Ok(async) => {
287 match async {
288 Async::Ready(Some((id, resp))) => {
289 debug!("FRAME READY ID: {} RESP: {:?}", id, resp);
290
291 let procedure = unsafe { ::std::mem::transmute(resp.header.proc_ as u16) };
292 if self.is_event(procedure) {
293 debug!("event received!");
294 self.process_event(resp);
295 debug!("processed event msg, get next packet");
296 return self.poll();
297 }
298
299 if resp.header.type_ == request::generated::virNetMessageType::VIR_NET_STREAM {
300 self.process_stream(resp);
301 debug!("processed stream msg, get next packet");
302 return self.poll();
303 }
304
305 return Ok(Async::Ready(Some((id, resp))));
306 },
307 _ => debug!("{:?}", async),
308 }
309 debug!("RETURNING {:?}", async);
310 Ok(async)
311 },
312 Err(e) => Err(e),
313 }
314 }
315}
316
317impl<T> Sink for LibvirtTransport<T> where
318 T: AsyncRead + AsyncWrite + 'static,
319 {
320 type SinkItem = (RequestId, LibvirtRequest);
321 type SinkError = ::std::io::Error;
322
323 fn start_send(&mut self, mut item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
324 use ::std::mem;
325 use futures::{AsyncSink};
326
327 if let Some(event_proc) = mem::replace(&mut item.1.event, None) {
335 debug!("Sending event request {:?}", event_proc);
336 if let Some(stream) = mem::replace(&mut item.1.stream, None) {
337 self.events.insert(event_proc as u16, stream);
338 }
339 }
340
341 if let Some(stream) = mem::replace(&mut item.1.stream, None) {
342 debug!("SENDING REQ ID = {} {:?} WITH STREAM", item.0, item.1.header);
343 self.streams.insert(item.0, stream);
344 }
345
346 let mut new_sink = false;
347 if let Some(sink) = mem::replace(&mut item.1.sink, None) {
348 debug!("SENDING REQ ID = {} {:?} WITH SINK", item.0, item.1.header);
349 {
350 self.sinks.insert(item.0, (sink, item.1.header.proc_, false));
351 new_sink = true;
352 }
353 }
354
355 {
356 debug!("Have {} sinks", self.sinks.len());
357 if !new_sink && self.sinks.len() > 0 {
358 return Ok(AsyncSink::NotReady(item));
359 }
360 }
361 self.inner.start_send(item)
362 }
363
364 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
365 use futures::{Async,AsyncSink};
366 use std::mem;
367 debug!("POLL COMPLETE CALLED");
368
369 if let Some(req) = mem::replace(&mut self.buffer, None) {
370 debug!("Sending buffered msg");
371 match try!(self.inner.start_send(req)) {
372 AsyncSink::NotReady(item) => {
373 debug!("Inner not ready, putting buffered msg back");
374 mem::replace(&mut self.buffer, Some(item));
375 return self.inner.poll_complete();
376 },
377 AsyncSink::Ready => {
378 debug!("Sent buffered msg");
379 },
380 }
381 }
382
383 loop {
384 match self.poll_sinks() {
385 Ok(Async::Ready(Some(req))) => {
386 debug!("SEND: some sinks are ready, try processing");
387
388 debug!("Sending sink msg: {} {:?} pl: {}", req.0, req.1.header, req.1.payload.len());
389 match try!(self.inner.start_send(req)) {
390 AsyncSink::NotReady(item) => {
391 debug!("Inner not ready, putting sink msg back");
392 mem::replace(&mut self.buffer, Some(item));
393 return self.inner.poll_complete();
394 },
395 AsyncSink::Ready => {
396 debug!("Sent sink msg");
397 },
398 }
399 }
400 ret => {
401 debug!("POLL_SINKS: {:?}", ret);
402 break;
403 }
404 }
405 }
406
407 self.inner.poll_complete()
408 }
409
410 fn close(&mut self) -> Poll<(), Self::SinkError> {
411 self.inner.close()
412 }
413}
414
/// Marker type whose `ClientProto` implementation builds a
/// `LibvirtTransport` over an I/O object.
#[derive(Debug, Clone)]
pub struct LibvirtProto;
417
418impl<T> multiplex::ClientProto<T> for LibvirtProto where T: AsyncRead + AsyncWrite + 'static {
419 type Request = LibvirtRequest;
420 type Response = LibvirtResponse;
421 type Transport = LibvirtTransport<T>;
422 type BindTransport = Result<Self::Transport, ::std::io::Error>;
423
424 fn bind_transport(&self, io: T) -> Self::BindTransport {
425 let framed = length_delimited::Builder::new()
426 .big_endian()
427 .length_field_offset(0)
428 .length_field_length(4)
429 .length_adjustment(-4)
430 .new_framed(io);
431 Ok(LibvirtTransport{
432 buffer: None,
433 inner: framed_delimited(framed, LibvirtCodec),
434 events: HashMap::new(),
435 streams: HashMap::new(),
436 sinks: HashMap::new(),
437 })
438 }
439}
440
441pub struct EventStream<E> where E: request::DomainEvent {
442 inner: ::futures::sync::mpsc::Receiver<LibvirtResponse>,
443 handle_resp: fn(LibvirtResponse) -> Result<<E as request::DomainEvent>::From, ::LibvirtError>,
444}
445
446impl<E> EventStream<E> where E: request::DomainEvent {
447 pub fn new(inner: ::futures::sync::mpsc::Receiver<LibvirtResponse>,
448 handler: fn(LibvirtResponse) -> Result<<E as request::DomainEvent>::From, ::LibvirtError>) -> Self {
449 EventStream { inner: inner, handle_resp: handler }
450 }
451
452}
453
454impl<E> Stream for EventStream<E> where E: ::std::fmt::Debug + request::DomainEvent {
455 type Item = E;
456 type Error = ::LibvirtError;
457
458 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
459 use futures::Async;
460 match self.inner.poll() {
461 Ok(Async::Ready(Some(resp))) => {
462 match (self.handle_resp)(resp) {
463 Ok(msg) => {
464 let msg = msg.into();
465 debug!("EVENT (CALLBACK) {:?}", msg);
466 Ok(Async::Ready(Some(msg)))
467 },
468 Err(e) => return Err(e),
469 }
470 },
471 Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
472 Ok(Async::NotReady) => Ok(Async::NotReady),
473 Err(e) => panic!(e),
474 }
475 }
476}
477
478pub struct LibvirtStream {
479 inner: ::futures::sync::mpsc::Receiver<LibvirtResponse>,
480}
481
482impl From<::futures::sync::mpsc::Receiver<LibvirtResponse>> for LibvirtStream {
483 fn from(f: ::futures::sync::mpsc::Receiver<LibvirtResponse>) -> Self {
484 LibvirtStream{ inner: f }
485 }
486}
487
488impl Stream for LibvirtStream {
489 type Item = BytesMut;
490 type Error = ::LibvirtError;
491
492 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
493 use futures::Async;
494 match self.inner.poll() {
495 Ok(Async::Ready(Some(resp))) => {
496 if resp.header.status == request::generated::virNetMessageStatus::VIR_NET_ERROR {
497 let mut reader = Cursor::new(resp.payload);
498 let (err, _) = request::virNetMessageError::unpack(&mut reader).unwrap();
499 return Err(::LibvirtError::from(err));
500 }
501 Ok(Async::Ready(Some(resp.payload)))
502 },
503 Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
504 Ok(Async::NotReady) => Ok(Async::NotReady),
505 Err(e) => panic!("LibvirtStream: unexpected error from mpsc::receiver: {:?}", e),
506 }
507 }
508}
509
510pub struct LibvirtSink {
511 pub inner: ::futures::sync::mpsc::Sender<BytesMut>,
512}
513
514impl Sink for LibvirtSink {
515 type SinkItem = BytesMut;
516 type SinkError = ::futures::sync::mpsc::SendError<Self::SinkItem>;
517
518 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
519 self.inner.start_send(item)
520 }
521
522 fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
523 self.inner.poll_complete()
524 }
525
526 fn close(&mut self) -> Poll<(), Self::SinkError> {
527 self.inner.close()
528 }
529}
530
531impl Drop for LibvirtSink {
532 fn drop(&mut self) {
533 debug!("LibvirtSink dropping");
534 let _ = self.close();
535 let _ = self.poll_complete();
536 }
537}