1use std::io::Cursor;
31use std::path::Path;
32use ::xdr_codec::{Pack,Unpack};
33use ::bytes::{BufMut, BytesMut};
34use ::tokio_proto::multiplex::{self};
35use ::tokio_service::Service;
36use ::request;
37use ::LibvirtError;
38use ::futures::{Future, future};
39use ::futures::sync::mpsc::{Sender,Receiver};
40use ::proto::{LibvirtProto, LibvirtRequest, LibvirtResponse};
41pub use ::proto::{LibvirtSink, LibvirtStream, EventStream};
42
43pub type LibvirtFuture<T> = Box<Future<Item = T, Error = LibvirtError>>;
44
45#[derive(Clone)]
47pub struct Client {
48 inner: multiplex::ClientService<::tokio_uds::UnixStream, LibvirtProto>,
49}
50
51impl Client {
52 pub fn connect<P: AsRef<Path>>(path: P, handle: &::tokio_core::reactor::Handle) -> Result<Client, ::std::io::Error> {
54 use ::tokio_uds_proto::UnixClient;
55 UnixClient::new(LibvirtProto)
56 .connect(path, handle)
57 .map(|inner| Client {
58 inner: inner,
59 })
60 }
61
62 fn pack<P: Pack<::bytes::Writer<::bytes::BytesMut>>>(procedure: request::remote_procedure,
63 payload: P,
64 stream: Option<Sender<LibvirtResponse>>,
65 sink: Option<Receiver<BytesMut>>,
66 event: Option<request::remote_procedure>) -> Result<LibvirtRequest, ::xdr_codec::Error> {
67 let buf = BytesMut::with_capacity(4096);
68 let buf = {
69 let mut writer = buf.writer();
70 try!(payload.pack(&mut writer));
71 writer.into_inner()
72 };
73 let req = LibvirtRequest {
74 stream: stream,
75 sink: sink,
76 event: event,
77 header: request::virNetMessageHeader {
78 proc_: procedure as i32,
79 ..Default::default()
80 },
81 payload: buf,
82 };
83 Ok(req)
84 }
85
86 fn handle_response<'a, P: Unpack<Cursor<::bytes::BytesMut>>>(resp: LibvirtResponse) -> Result<P, LibvirtError> {
87 let mut reader = Cursor::new(resp.payload);
88 if resp.header.status == request::virNetMessageStatus::VIR_NET_OK {
89 let (pkt, _) = try!(P::unpack(&mut reader));
90 Ok(pkt)
91 } else {
92 let (err, _) = try!(request::virNetMessageError::unpack(&mut reader));
93 Err(err.into())
94 }
95 }
96
97 fn request<P>(&self, payload: P) ->
98 LibvirtFuture<<P as request::LibvirtRpc<Cursor<::bytes::BytesMut>>>::Response>
99 where P: Pack<::bytes::Writer<::bytes::BytesMut>> + request::LibvirtRpc<Cursor<::bytes::BytesMut>>,
100 <P as request::LibvirtRpc<Cursor<::bytes::BytesMut>>>::Response: 'static
101 {
102 self.request_stream(payload, None, None)
103 }
104
105 fn request_stream<P>(&self, payload: P, stream: Option<Sender<LibvirtResponse>>, event: Option<request::remote_procedure>) ->
106 Box<Future<Item = <P as request::LibvirtRpc<Cursor<::bytes::BytesMut>>>::Response, Error = LibvirtError>>
107 where P: Pack<::bytes::Writer<::bytes::BytesMut>> + request::LibvirtRpc<Cursor<::bytes::BytesMut>>,
108 <P as request::LibvirtRpc<Cursor<::bytes::BytesMut>>>::Response: 'static
109 {
110 self.request_sink_stream(payload, stream, None, event)
111 }
112
113 fn request_sink<P>(&self, payload: P, sink: Option<Receiver<BytesMut>>) ->
114 Box<Future<Item = <P as request::LibvirtRpc<Cursor<::bytes::BytesMut>>>::Response, Error = LibvirtError>>
115 where P: Pack<::bytes::Writer<::bytes::BytesMut>> + request::LibvirtRpc<Cursor<::bytes::BytesMut>>,
116 <P as request::LibvirtRpc<Cursor<::bytes::BytesMut>>>::Response: 'static
117 {
118 self.request_sink_stream(payload, None, sink, None)
119 }
120
121 fn request_sink_stream<P>(&self, payload: P,
122 stream: Option<Sender<LibvirtResponse>>,
123 sink: Option<Receiver<BytesMut>>,
124 event: Option<request::remote_procedure>) ->
125 Box<Future<Item = <P as request::LibvirtRpc<Cursor<::bytes::BytesMut>>>::Response, Error = LibvirtError>>
126 where P: Pack<::bytes::Writer<::bytes::BytesMut>> + request::LibvirtRpc<Cursor<::bytes::BytesMut>>,
127 <P as request::LibvirtRpc<Cursor<::bytes::BytesMut>>>::Response: 'static
128 {
129 let req = Self::pack(P::PROCEDURE, payload, stream, sink, event);
130 match req {
131 Err(e) => {
132 Box::new(future::err(e.into()))
133 },
134 Ok(req) => Box::new(self.call(req)
135 .map_err(|e| e.into())
136 .and_then(Self::handle_response))
137 }
138 }
139
140 pub fn auth(&self) -> LibvirtFuture<request::AuthListResponse> {
142 let pl = request::AuthListRequest::new();
143 self.request(pl)
144 }
145
146 pub fn open(&self) -> LibvirtFuture<()> {
148 let pl = request::ConnectOpenRequest::new();
149 Box::new(self.request(pl).map(|_| ()))
150 }
151
152 pub fn version(&self) -> LibvirtFuture<(u32, u32, u32)> {
154 let pl = request::GetLibVersionRequest::new();
155 Box::new(self.request(pl).map(|resp| resp.version()))
156 }
157
158 pub fn node_info(&self) -> LibvirtFuture<request::NodeInfo> {
159 let pl = request::NodeGetInfoRequest::new();
160 Box::new(self.request(pl).map(|resp| resp.into()))
161 }
162
163 pub fn domain(&self) -> DomainOperations {
164 DomainOperations{client: self}
165 }
166
167 pub fn pool(&self) -> PoolOperations {
168 PoolOperations{client: self}
169 }
170
171 pub fn volume(&self) -> VolumeOperations {
172 VolumeOperations{client: self}
173 }
174}
175
176pub struct VolumeOperations<'a> {
178 client: &'a Client,
179}
180
181impl<'a> VolumeOperations<'a> {
182 pub fn create(&self, pool: &request::StoragePool, xml: &str,
184 flags: request::StorageVolCreateXmlFlags::StorageVolCreateXmlFlags) -> LibvirtFuture<request::Volume> {
185 let payload = request::StorageVolCreateXmlRequest::new(pool, xml, flags);
186 Box::new(self.client.request(payload).map(|resp| resp.into()))
187 }
188
189 pub fn create_from(&self, pool: &request::StoragePool, xml: &str, vol: &request::Volume,
192 flags: request::StorageVolCreateXmlFlags::StorageVolCreateXmlFlags) -> LibvirtFuture<request::Volume> {
193 let payload = request::StorageVolCreateXmlFromRequest::new(pool, xml, vol, flags);
194 Box::new(self.client.request(payload).map(|resp| resp.into()))
195 }
196
197 pub fn delete(&self, vol: request::Volume) -> LibvirtFuture<()> {
199 let payload = request::StorageVolDeleteRequest::new(vol, 0);
200 Box::new(self.client.request(payload).map(|resp| resp.into()))
201 }
202
203 pub fn wipe(&self, vol: &request::Volume) -> LibvirtFuture<()> {
209 let payload = request::StorageVolWipeRequest::new(vol, 0);
210 Box::new(self.client.request(payload).map(|resp| resp.into()))
211 }
212
213 pub fn lookup_by_name(&self, pool: &request::StoragePool, name: &str) -> LibvirtFuture<request::Volume> {
214 let payload = request::StorageVolLookupByNameRequest::new(pool, name);
215 Box::new(self.client.request(payload).map(|resp| resp.into()))
216 }
217
218 pub fn resize(&self, vol: &request::Volume, capacity: u64, flags: request::StorageVolResizeFlags::StorageVolResizeFlags) -> LibvirtFuture<()> {
235 let payload = request::StorageVolResizeRequest::new(vol, capacity, flags);
236 Box::new(self.client.request(payload).map(|resp| resp.into()))
237 }
238
239 pub fn download(&self, vol: &request::Volume, offset: u64, length: u64) -> LibvirtFuture<LibvirtStream> {
244 let pl = request::StorageVolDownloadRequest::new(vol, offset, length, 0);
245 let (sender, receiver) = ::futures::sync::mpsc::channel(0);
246
247 Box::new(self.client.request_stream(pl, Some(sender), None).map(move |_| {
248 LibvirtStream::from(receiver)
249 }))
250 }
251
252 pub fn upload(&self, vol: &request::Volume, offset: u64, length: u64) -> LibvirtFuture<LibvirtSink> {
261 let pl = request::StorageVolUploadRequest::new(vol, offset, length, 0);
262 let (sender, receiver) = ::futures::sync::mpsc::channel(64);
263
264 Box::new(self.client.request_sink(pl, Some(receiver)).map(move |_| {
265 LibvirtSink { inner: sender }
266 }))
267 }
268
269 pub fn upload_with<F, R>(&self, vol: &request::Volume, offset: u64, length: u64, uploader: F) -> Box<Future<Item = (), Error = R::Error>>
271 where F: FnOnce(LibvirtSink) -> R + Send + 'static,
272 R: ::futures::IntoFuture + 'static,
273 R::Future: Send + 'static,
274 R::Item: Send + 'static,
275 R::Error: Send + 'static + From<LibvirtError>,
276 {
277 use futures::{Future, Stream};
278 let pl = request::StorageVolUploadRequest::new(vol, offset, length, 0);
279 let (sink_sender, sink_receiver) = ::futures::sync::mpsc::channel(64);
280 let (stream_sender, stream_receiver) = ::futures::sync::mpsc::channel(64);
281
282 Box::new(self.client.request_sink_stream(pl, Some(stream_sender), Some(sink_receiver), None)
283 .map_err(|e| e.into())
284 .and_then(move |_| uploader(LibvirtSink { inner: sink_sender }).into_future())
285 .and_then(|_| stream_receiver.into_future().map_err(|e| panic!("Unexpected error in mpsc receiver: {:?}", e)))
286 .and_then(|(ev, _)| {
287 Client::handle_response(ev.unwrap()).map_err(|e| e.into())
288 }))
289 }
290
291 pub fn info(&self, vol: &request::Volume) -> LibvirtFuture<request::VolumeInfo> {
293 let pl = request::StorageVolGetInfoRequest::new(vol);
294 Box::new(self.client.request(pl).map(|resp| resp.into()))
295 }
296}
297
298pub struct PoolOperations<'a> {
300 client: &'a Client,
301}
302
303impl<'a> PoolOperations<'a> {
304 pub fn list(&self, flags: request::ListAllStoragePoolsFlags::ListAllStoragePoolsFlags) -> LibvirtFuture<Vec<request::StoragePool>> {
306 let payload = request::ListAllStoragePoolsRequest::new(flags);
307 Box::new(self.client.request(payload).map(|resp| resp.into()))
308 }
309
310 pub fn define(&self, xml: &str) -> LibvirtFuture<request::StoragePool> {
312 let payload = request::StoragePoolDefineXmlRequest::new(xml, 0);
313 Box::new(self.client.request(payload).map(|resp| resp.into()))
314 }
315
316 pub fn lookup_by_uuid(&self, uuid: &::uuid::Uuid) -> LibvirtFuture<request::StoragePool> {
318 let payload = request::StoragePoolLookupByUuidRequest::new(uuid);
319 Box::new(self.client.request(payload).map(|resp| resp.into()))
320 }
321
322 pub fn lookup_by_name(&self, name: &str) -> LibvirtFuture<request::StoragePool> {
324 let payload = request::StoragePoolLookupByNameRequest::new(name);
325 Box::new(self.client.request(payload).map(|resp| resp.into()))
326 }
327
328 pub fn start(&self, pool: &request::StoragePool) -> LibvirtFuture<()> {
330 let payload = request::StoragePoolCreateRequest::new(pool, 0);
331 Box::new(self.client.request(payload).map(|resp| resp.into()))
332 }
333
334 pub fn destroy(&self, pool: &request::StoragePool) -> LibvirtFuture<()> {
337 let payload = request::StoragePoolDestroyRequest::new(pool);
338 Box::new(self.client.request(payload).map(|resp| resp.into()))
339 }
340
341 pub fn undefine(&self, pool: request::StoragePool) -> LibvirtFuture<()> {
343 let payload = request::StoragePoolUndefineRequest::new(pool);
344 Box::new(self.client.request(payload).map(|resp| resp.into()))
345 }
346
347 pub fn list_volume_names(&self, pool: &request::StoragePool) -> LibvirtFuture<Vec<String>> {
349 let payload = request::StoragePoolListVolumesRequest::new(pool, request::generated::REMOTE_STORAGE_VOL_LIST_MAX as i32);
350 Box::new(self.client.request(payload).map(|resp| resp.into()))
351 }
352
353 pub fn list_volumes(&self, pool: &request::StoragePool) -> LibvirtFuture<Vec<request::Volume>> {
355 let payload = request::StoragePoolListAllVolumesRequest::new(pool, 1, 0);
356 Box::new(self.client.request(payload).map(|resp| resp.into()))
357 }
358
359 pub fn info(&self, pool: &request::StoragePool) -> LibvirtFuture<request::StoragePoolInfo> {
360 let payload = request::StoragePoolGetInfoRequest::new(pool);
361 Box::new(self.client.request(payload).map(|resp| resp.into()))
362 }
363}
364
365pub struct DomainOperations<'a> {
367 client: &'a Client,
368}
369
370impl<'a> DomainOperations<'a> {
371 pub fn info(&self, dom: &request::Domain) -> LibvirtFuture<request::DomainInfo> {
372 let payload = request::DomainGetInfoRequest::new(dom);
373 Box::new(self.client.request(payload).map(|resp| resp.into()))
374 }
375
376 pub fn list(&self, flags: request::ListAllDomainsFlags::ListAllDomainsFlags) -> LibvirtFuture<Vec<request::Domain>> {
378 let payload = request::ListAllDomainsRequest::new(flags);
379 Box::new(self.client.request(payload).map(|resp| resp.into()))
380 }
381
382 pub fn lookup_by_uuid(&self, uuid: &::uuid::Uuid) -> LibvirtFuture<request::Domain> {
384 let pl = request::DomainLookupByUuidRequest::new(uuid);
385 Box::new(self.client.request(pl).map(|resp| resp.domain()))
386 }
387
388 fn register_event<T: request::DomainEvent>(&self, dom: Option<&request::Domain>, event: request::DomainEventId) -> LibvirtFuture<EventStream<T>> {
389 let pl = request::DomainEventCallbackRegisterAnyRequest::new(event as i32, dom);
390 let (sender, receiver) = ::futures::sync::mpsc::channel(1024);
391 let event_procedure = event.to_procedure();
392 Box::new(self.client.request_stream(pl, Some(sender), Some(event_procedure))
393 .map(move |resp| {
394 let id = resp.callback_id();
395 debug!("REGISTERED CALLBACK ID {}", id);
396 {
397 EventStream::new(receiver, Client::handle_response)
398 }
399 }))
400 }
401
402 pub fn register_lifecycle_event(&self, dom: Option<&request::Domain>) -> LibvirtFuture<EventStream<request::DomainLifecycleEvent>> {
403 self.register_event(dom, request::DomainEventId::Lifecycle)
404 }
405
406 pub fn register_reboot_event(&self, dom: Option<&request::Domain>) -> LibvirtFuture<EventStream<request::DomainRebootEvent>> {
407 self.register_event(dom, request::DomainEventId::Reboot)
408 }
409
410 pub fn register_block_job_event(&self, dom: Option<&request::Domain>) -> LibvirtFuture<EventStream<request::DomainBlockJobEvent>> {
411 self.register_event(dom, request::DomainEventId::BlockJob)
412 }
413 pub fn start(&self, dom: request::Domain, flags: request::DomainCreateFlags::DomainCreateFlags) -> LibvirtFuture<request::Domain> {
417 let pl = request::DomainCreateRequest::new(dom, flags);
418 Box::new(self.client.request(pl).map(|resp| resp.into()))
419 }
420
421 pub fn destroy(&self, dom: &request::Domain, flags: request::DomainDestroyFlags::DomainDestroyFlags) -> LibvirtFuture<()> {
423 let pl = request::DomainDestroyRequest::new(dom, flags);
424 Box::new(self.client.request(pl).map(|_| ()))
425 }
426
427 pub fn define(&self, xml: &str) -> LibvirtFuture<request::Domain> {
430 let pl = request::DomainDefineXMLRequest::new(xml, 1); Box::new(self.client.request(pl).map(|resp| resp.into()))
432 }
433
434 pub fn undefine(&self, dom: request::Domain) -> LibvirtFuture<()> {
437 let pl = request::DomainUndefineRequest::new(dom, 0); Box::new(self.client.request(pl).map(|resp| resp.into()))
439 }
440
441 pub fn shutdown(&self, dom: &request::Domain) -> LibvirtFuture<()> {
450 let pl = request::DomainShutdownRequest::new(dom);
451 Box::new(self.client.request(pl).map(|resp| resp.into()))
452 }
453
454 pub fn reboot(&self, dom: &request::Domain) -> LibvirtFuture<()> {
459 let pl = request::DomainRebootRequest::new(dom, 0);
460 Box::new(self.client.request(pl).map(|resp| resp.into()))
461 }
462
463 pub fn reset(&self, dom: &request::Domain) -> LibvirtFuture<()> {
468 let pl = request::DomainResetRequest::new(dom, 0);
469 Box::new(self.client.request(pl).map(|resp| resp.into()))
470 }
471
472 pub fn screenshot(&self, dom: &request::Domain, screen: u32) -> LibvirtFuture<(Option<String>, LibvirtStream)> {
481 let pl = request::DomainScreenshotRequest::new(dom, screen, 0);
482 let (sender, receiver) = ::futures::sync::mpsc::channel(0);
483
484 Box::new(self.client.request_stream(pl, Some(sender), None).map(move |resp|{
485 (resp.into(), LibvirtStream::from(receiver))
486 }))
487 }
488
489
490 pub fn attach_device(&self, dom: &request::Domain, xml: &str, flags: request::DomainModificationImpact::DomainModificationImpact) -> LibvirtFuture<()> {
504 let pl = request::DomainAttachDeviceRequest::new(dom, xml, flags);
505 Box::new(self.client.request(pl).map(|resp| resp.into()))
506 }
507
508 pub fn detach_device(&self, dom: &request::Domain, xml: &str, flags: request::DomainModificationImpact::DomainModificationImpact) -> LibvirtFuture<()> {
534 let pl = request::DomainDetachDeviceRequest::new(dom, xml, flags);
535 Box::new(self.client.request(pl).map(|resp| resp.into()))
536 }
537
538 pub fn update_device(&self, dom: &request::Domain, xml: &str, flags: request::DomainModificationImpact::DomainModificationImpact) -> LibvirtFuture<()> {
548 let pl = request::DomainUpdateDeviceRequest::new(dom, xml, flags);
549 Box::new(self.client.request(pl).map(|resp| resp.into()))
550 }
551
552 pub fn set_memory(&self, dom: &request::Domain, size: u64, flags: request::DomainModificationImpact::MemoryModificationImpact) -> LibvirtFuture<()> {
554 let pl = request::DomainSetMemoryRequest::new(dom, size, flags);
555 Box::new(self.client.request(pl).map(|resp| resp.into()))
556 }
557
558 pub fn set_vcpus(&self, dom: &request::Domain, count: u32, flags: request::DomainModificationImpact::VcpuModificationImpact) -> LibvirtFuture<()> {
559 let pl = request::DomainSetVcpusRequest::new(dom, count, flags);
560 Box::new(self.client.request(pl).map(|resp| resp.into()))
561 }
562
563 pub fn get_vcpus(&self, dom: &request::Domain, flags: request::DomainModificationImpact::VcpuModificationImpact) -> LibvirtFuture<u32> {
564 let pl = request::DomainGetVcpusRequest::new(dom, flags);
565 Box::new(self.client.request(pl).map(|resp| resp.into()))
566 }
567
568 pub fn get_memory_params(&self, dom: &request::Domain, flags: request::DomainModificationImpact::DomainModificationImpact) -> LibvirtFuture<()> {
569 let pl = request::DomainGetMemoryParametersRequest::new(dom, 8 , flags);
570 Box::new(self.client.request(pl).map(|resp| println!("DEBUG RESP {:?}", resp)))
571 }
572
573 pub fn get_xml(&self, dom: &request::Domain, flags: request::DomainXmlFlags::DomainXmlFlags) -> LibvirtFuture<String> {
581 let pl = request::DomainGetXmlDescRequest::new(dom, flags);
582 Box::new(self.client.request(pl).map(|resp| resp.into()))
583 }
584
585 pub fn set_autostart(&self, dom: &request::Domain, enable: bool) -> LibvirtFuture<()> {
587 let pl = request::DomainSetAutoStartRequest::new(dom, enable);
588 Box::new(self.client.request(pl).map(|resp| resp.into()))
589 }
590
591 pub fn get_autostart(&self, dom: &request::Domain) -> LibvirtFuture<bool> {
593 let pl = request::DomainGetAutoStartRequest::new(dom);
594 Box::new(self.client.request(pl).map(|resp| resp.into()))
595 }
596
597 pub fn send_key(&self, dom: &request::Domain, codeset: u32, holdtime: u32, keycodes: Vec<u32>) -> LibvirtFuture<()> {
599 let pl = request::DomainSendKeyRequest::new(dom, codeset, holdtime, keycodes, 0);
600 Box::new(self.client.request(pl).map(|resp| resp.into()))
601 }
602
603pub fn migrate(&self, dom: &request::Domain, uri: &str, params: Vec<request::MigrationParam>, flags: request::DomainMigrateFlags::DomainMigrateFlags) -> LibvirtFuture<()> {
614 let pl = request::MigratePerformRequest::new(dom, Some(uri), params, vec![], flags);
615 Box::new(self.client.request(pl).map(|resp| {
616 println!("DEBUG RESP: {:?}", resp);
617 }))
618 }
619}
620
621impl Service for Client {
622 type Request = LibvirtRequest;
623 type Response = LibvirtResponse;
624 type Error = ::std::io::Error;
625 type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
626
627 fn call(&self, req: Self::Request) -> Self::Future {
628 Box::new(self.inner.call(req))
629 }
630}
631
632#[cfg(test)]
633mod tests {
634 use std::fmt::Debug;
635 use ::tokio_core::reactor::Core;
636 use ::async::Client;
637 use futures::{Future,IntoFuture,Stream};
638
639 fn connect() -> (Client, Core) {
640 let core = Core::new().unwrap();
641 let handle = core.handle();
642 let client = Client::connect("/var/run/libvirt/libvirt-sock", &handle).unwrap();
643 (client, core)
644 }
645
646 fn run_connected<'a, P, F, I>(f: P)
647 where P: FnOnce(Client) -> F,
648 I: Debug,
649 F: IntoFuture<Item=I, Error=::LibvirtError> + 'static {
650 let (client, mut core) = connect();
651 let result = core.run({
652 client.auth()
653 .and_then(|_| client.open())
654 .and_then(|_| {
655 let c = client.clone();
656 f(c)
657 })
658 }).unwrap();
659 println!("{:?}", result);
660 }
661
662 #[test]
663 fn test_version() {
664 run_connected(|client| client.version())
665 }
666}