dbus_bytestream/
connection.rs

//! Deals with creating and using connections to dbus-daemon.  The primary
//! type of interest is the `Connection` struct.
//!
//! # Examples
//! ```
//! use dbus_bytestream::connection::Connection;
//! use dbus_bytestream::message;
//!
//! let conn = Connection::connect_system().unwrap();
//! let mut msg = message::create_method_call(
//!     "org.freedesktop.DBus", // destination
//!     "/org/freedesktop/DBus", // path
//!     "org.freedesktop.DBus", // interface
//!     "RequestName" // method
//! );
//! msg = msg.add_arg(&"com.test.foobar")
//!          .add_arg(&(0 as u32));
//! let reply = conn.call_sync(msg);
//! println!("{:?}", reply);
//! ```
//!
22
23use std::collections::VecDeque;
24use std::env;
25use std::error;
26use std::fmt;
27use std::net::{TcpStream,ToSocketAddrs};
28use std::io;
29use std::io::{Read,Write};
30use std::fs::File;
31use std::ops::Deref;
32use std::path::Path;
33use std::cell::RefCell;
34use std::str::FromStr;
35use std::string;
36use std::num::ParseIntError;
37use rand::Rand;
38use rand;
39use libc;
40use crypto::digest::Digest;
41use crypto;
42
43use unix_socket::UnixStream;
44use rustc_serialize::hex::{ToHex,FromHex,FromHexError};
45use dbus_serialize::types::Value;
46use dbus_serialize::decoder::DBusDecoder;
47
48use address;
49use address::ServerAddress;
50use message;
51use message::{Message,HeaderField};
52use demarshal::{demarshal,DemarshalError};
53use marshal::Marshal;
54
/// Umbrella trait for any byte stream that can be both read and written, so
/// the different transports can be driven through a single trait object.
trait StreamSocket: Read + Write {}

/// Every `Read + Write` type is a `StreamSocket` automatically.
impl<S> StreamSocket for S where S: Read + Write {}
57
58enum Socket {
59    Tcp(TcpStream),
60    Uds(UnixStream)
61}
62
63pub struct Connection {
64    sock: RefCell<Socket>,
65    serial: RefCell<u32>,
66    queue: RefCell<VecDeque<Message>>,
67}
68
69#[derive(Debug)]
70pub enum Error {
71    Disconnected,
72    IOError(io::Error),
73    DemarshalError(DemarshalError),
74    AddressError(address::ServerAddressError),
75    BadData,
76    AuthFailed,
77    NoEnvironment,
78}
79
80impl From<io::Error> for Error {
81    fn from(x: io::Error) -> Self {
82        Error::IOError(x)
83    }
84}
85
86impl From<DemarshalError> for Error {
87    fn from(x: DemarshalError) -> Self {
88        Error::DemarshalError(x)
89    }
90}
91
92impl From<address::ServerAddressError> for Error {
93    fn from(x: address::ServerAddressError) -> Self {
94        Error::AddressError(x)
95    }
96}
97
98impl From<FromHexError> for Error {
99    fn from(_x: FromHexError) -> Self {
100        Error::AuthFailed
101    }
102}
103
104impl From<string::FromUtf8Error> for Error {
105    fn from(_x: string::FromUtf8Error) -> Self {
106        Error::AuthFailed
107    }
108}
109
110impl From<ParseIntError> for Error {
111    fn from(_x: ParseIntError) -> Self {
112        Error::AuthFailed
113    }
114}
115
116impl fmt::Display for Error {
117    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
118        match *self {
119            Error::Disconnected              => write!(f, "disconnected"),
120            Error::IOError(ref ioerr)        => write!(f, "i/o error: {}", ioerr),
121            Error::DemarshalError(ref dmerr) => write!(f, "demarshall error: {}", dmerr),
122            Error::AddressError(ref addrerr) => write!(f, "address error: {:?}", addrerr),
123            Error::BadData                   => write!(f, "bad data"),
124            Error::AuthFailed                => write!(f, "authentication failed"),
125            Error::NoEnvironment             => write!(f, "no environment"),
126        }
127    }
128}
129
130impl error::Error for Error {
131    fn description(&self) -> &str {
132        "D-Bus error"
133    }
134
135    fn cause(&self) -> Option<&error::Error> {
136        match *self {
137            Error::IOError(ref ioerr) => Some(ioerr),
138            _                         => None,
139        }
140    }
141}
142
143fn read_exactly(sock: &mut StreamSocket, buf: &mut Vec<u8>, len: usize) -> Result<(),Error> {
144    buf.truncate(0);
145    buf.reserve(len);
146    if try!(sock.take(len as u64).read_to_end(buf)) != len {
147        return Err(Error::Disconnected);
148    }
149    Ok(())
150}
151
152fn read_line(sock: &mut StreamSocket) -> Result<String,Error> {
153    let mut line = "".to_owned();
154    let mut last = '\0';
155
156    loop {
157        let mut buf = vec![0];
158        match sock.read(&mut buf) {
159            Ok(x) if x > 0 => (),
160            _ => return Err(Error::Disconnected)
161        };
162        let chr = buf[0] as char;
163        line.push(chr);
164        if chr == '\n' && last == '\r' {
165            break;
166        }
167        last = chr;
168    }
169    Ok(line)
170}
171
172fn get_cookie(context: &str, cookie_id: &str) -> Result<String,Error> {
173    let hd = match env::home_dir() {
174        Some(x) => x,
175        None => return Err(Error::AuthFailed)
176    };
177    let filename = hd.join(".dbus-keyrings").join(context);
178    let mut f = try!(File::open(filename));
179    let mut contents = String::new();
180    try!(f.read_to_string(&mut contents));
181    let lines : Vec<&str> = contents.split('\n').collect();
182    for line in lines {
183        if !line.starts_with(cookie_id) {
184            continue;
185        }
186        let words : Vec<&str> = line.split(' ').collect();
187        if words.len() != 3 {
188            break;
189        }
190        return Ok(words[2].to_owned());
191    }
192
193    Err(Error::AuthFailed)
194}
195
196impl Connection {
197    fn run_sock<F, T>(&self, f: F) -> T
198        where F: FnOnce(&mut StreamSocket) -> T {
199        let mut sock = self.sock.borrow_mut();
200
201        match *sock {
202            Socket::Tcp(ref mut x) => f(x),
203            Socket::Uds(ref mut x) => f(x),
204        }
205    }
206
207    fn sock_send_nul_byte(sock: &mut StreamSocket) -> Result<(),Error> {
208        // Send NUL byte
209        let buf = vec![0];
210        try!(sock.write_all(&buf));
211        Ok(())
212    }
213
214    fn send_nul_byte(&self) -> Result<(),Error> {
215        self.run_sock(Self::sock_send_nul_byte)
216    }
217
218    fn sock_auth_anonymous(sock: &mut StreamSocket) -> Result<(),Error> {
219        try!(sock.write_all(b"AUTH ANONYMOUS 6c69626462757320312e382e3132\r\n"));
220
221        // Read response
222        let resp = try!(read_line(sock));
223        if !resp.starts_with("OK ") {
224            return Err(Error::AuthFailed);
225        }
226
227        // Ready for action
228        try!(sock.write_all(b"BEGIN\r\n"));
229        Ok(())
230    }
231
232    fn auth_anonymous(&self) -> Result<(),Error> {
233        self.run_sock(Self::sock_auth_anonymous)
234    }
235
236    fn sock_auth_external(sock: &mut StreamSocket) -> Result<(),Error> {
237        let uid = unsafe {
238            libc::funcs::posix88::unistd::getuid()
239        };
240        let uid_str = uid.to_string();
241        let uid_hex = uid_str.into_bytes().to_hex();
242        let cmd = "AUTH EXTERNAL ".to_owned() + &uid_hex + "\r\n";
243        try!(sock.write_all(&cmd.into_bytes()));
244
245        // Read response
246        let resp = try!(read_line(sock));
247        if !resp.starts_with("OK ") {
248            return Err(Error::AuthFailed);
249        }
250
251        // Ready for action
252        try!(sock.write_all(b"BEGIN\r\n"));
253        Ok(())
254    }
255
256    fn auth_external(&self) -> Result<(),Error> {
257        self.run_sock(Self::sock_auth_external)
258    }
259
260    fn sock_auth_cookie(sock: &mut StreamSocket) -> Result<(),Error> {
261        let uid = unsafe {
262            libc::funcs::posix88::unistd::getuid()
263        };
264        let uid_str = uid.to_string();
265        let uid_hex = uid_str.into_bytes().to_hex();
266        let cmd = "AUTH DBUS_COOKIE_SHA1 ".to_owned() + &uid_hex + "\r\n";
267        try!(sock.write_all(&cmd.into_bytes()));
268
269        // Read response
270        let resp = try!(read_line(sock));
271        let words : Vec<&str> = resp.split(' ').collect();
272        if words.len() != 2 {
273            return Err(Error::AuthFailed);
274        }
275        if words[0] != "DATA" {
276            return Err(Error::AuthFailed);
277        }
278
279        let bytes = try!(words[1].from_hex());
280        let challenge = try!(String::from_utf8(bytes));
281        let words : Vec<&str> = challenge.split(' ').collect();
282        if words.len() != 3 {
283            return Err(Error::AuthFailed);
284        }
285
286        let cookie = try!(get_cookie(words[0], words[1]));
287
288        let mut my_challenge = Vec::new();
289        for _ in 0..16 {
290            my_challenge.push(u8::rand(&mut rand::thread_rng()));
291        }
292        let hex_challenge = my_challenge.to_hex();
293
294        let my_cookie = words[2].to_owned() + ":" + &hex_challenge + ":" + &cookie;
295        let mut hasher = crypto::sha1::Sha1::new();
296        hasher.input_str(&my_cookie);
297        let hash = hasher.result_str();
298
299        let my_resp = hex_challenge + " " + &hash;
300        let hex_resp = my_resp.into_bytes().to_hex();
301        let buf = "DATA ".to_owned() + &hex_resp + "\r\n";
302        try!(sock.write_all(&buf.into_bytes()));
303
304        // Read response
305        let resp = try!(read_line(sock));
306        if !resp.starts_with("OK ") {
307            return Err(Error::AuthFailed);
308        }
309
310        // Ready for action
311        try!(sock.write_all(b"BEGIN\r\n"));
312        Ok(())
313    }
314
315    fn auth_cookie(&self) -> Result<(),Error> {
316        self.run_sock(Self::sock_auth_cookie)
317    }
318
319    fn authenticate(&self) -> Result<(),Error> {
320        try!(self.send_nul_byte());
321        try!(self.auth_external()
322              .or_else(|_x| { self.auth_cookie() })
323              .or_else(|_x| { self.auth_anonymous() }));
324        self.say_hello()
325    }
326
327    fn say_hello(&self) -> Result<(),Error> {
328        let msg = message::create_method_call("org.freedesktop.DBus",
329                                              "/org/freedesktop/DBus",
330                                              "org.freedesktop.DBus",
331                                              "Hello");
332        try!(self.call_sync(msg));
333        Ok(())
334    }
335
336    fn connect_addr(addr: ServerAddress) -> Result<Connection,Error> {
337        match addr {
338            ServerAddress::Unix(unix) => Self::connect_uds(unix.path()),
339            ServerAddress::Tcp(tcp) => Self::connect_tcp(tcp),
340        }
341    }
342
343    /// Connects to a DBus address string.
344    pub fn connect(addr: &str) -> Result<Connection, Error> {
345        Self::connect_addr(try!(ServerAddress::from_str(addr)))
346    }
347
348    /// Connects to the system bus.
349    ///
350    /// The address is specified by the environment variable
351    /// DBUS_SYSTEM_BUS_ADDRESS or "unix:path=/var/run/dbus/system_bus_socket" if unset.
352    pub fn connect_system() -> Result<Connection, Error> {
353        let default = "unix:path=/var/run/dbus/system_bus_socket";
354        if let Ok(e) = env::var("DBUS_SYSTEM_BUS_ADDRESS") {
355            Self::connect(&e)
356        } else {
357            Self::connect(default)
358        }
359    }
360
361    /// Connects to the session bus.
362    ///
363    /// The address is specified by the environment variable DBUS_SESSION_BUS_ADDRESS.
364    pub fn connect_session() -> Result<Connection, Error> {
365        if let Ok(e) = env::var("DBUS_SESSION_BUS_ADDRESS") {
366            Self::connect(&e)
367        } else {
368            Err(Error::NoEnvironment)
369        }
370    }
371
372    /// Creates a Connection object using a UNIX domain socket as the transport.  The addr is the
373    /// path to connect to.  Abstract paths can be used by passing a NUL byte as the first byte of
374    /// addr.
375    pub fn connect_uds<P: AsRef<Path>>(addr: P) -> Result<Connection,Error> {
376        let sock = try!(UnixStream::connect(addr));
377        let conn = Connection {
378            sock: RefCell::new(Socket::Uds(sock)),
379            queue: RefCell::new(VecDeque::new()),
380            serial: RefCell::new(1)
381        };
382
383        try!(conn.authenticate());
384        Ok(conn)
385    }
386
387    /// Creates a Connection object using a TCP socket as the transport.  The addr is the host and
388    /// port to connect to.
389    pub fn connect_tcp<T: ToSocketAddrs>(addr: T) -> Result<Connection,Error> {
390        let sock = try!(TcpStream::connect(addr));
391        let conn = Connection {
392            sock: RefCell::new(Socket::Tcp(sock)),
393            queue: RefCell::new(VecDeque::new()),
394            serial: RefCell::new(1)
395        };
396
397        try!(conn.authenticate());
398        Ok(conn)
399    }
400
401    fn next_serial(&self) -> u32 {
402        let mut serial = self.serial.borrow_mut();
403        let current_serial = *serial;
404        *serial = current_serial + 1;
405        current_serial
406    }
407
408    fn sock_send(sock: &mut StreamSocket, mbuf: Message) -> Result<(), Error> {
409        let mut msg = Vec::new();
410        mbuf.dbus_encode(&mut msg);
411
412        try!(sock.write_all(&msg));
413        try!(sock.write_all(&mbuf.body));
414        Ok(())
415    }
416
417    /// Sends a message over the connection.  The Message can be created by one of the functions
418    /// from the message module, such as message::create_method_call .  On success, returns the
419    /// serial number of the outgoing message so that the reply can be identified.
420    pub fn send(&self, mut mbuf: Message) -> Result<u32, Error> {
421        let this_serial = self.next_serial();
422        mbuf.serial = this_serial;
423
424        try!(self.run_sock(move |sock| {
425            Self::sock_send(sock, mbuf)
426        }));
427        Ok(this_serial)
428    }
429
430    fn push_queue(&self, queue: &mut VecDeque<Message>) {
431        let mut master_queue = self.queue.borrow_mut();
432
433        while !queue.is_empty() {
434            master_queue.push_front(queue.pop_front().unwrap());
435        }
436        //#![feature(append)]
437        //self.queue.borrow_mut().append(queue)
438    }
439
440    /// Sends a message over a connection and block until a reply is received.  This is only valid
441    /// for method calls.  Returns the sequence of Value objects that is the body of the method
442    /// return.
443    ///
444    /// # Panics
445    /// Calling this function with a Message for other than METHOD_CALL or with the
446    /// NO_REPLY_EXPECTED flag set is a programming error and will panic.
447    pub fn call_sync(&self, mbuf: Message) -> Result<Option<Vec<Value>>,Error> {
448        assert_eq!(mbuf.message_type, message::MESSAGE_TYPE_METHOD_CALL);
449        assert_eq!(mbuf.flags & message::FLAGS_NO_REPLY_EXPECTED, 0);
450        let serial = try!(self.send(mbuf));
451        // We need a local queue so that read_msg doesn't just give us
452        // the same one over and over
453        let mut queue = VecDeque::new();
454        loop {
455            let msg = try!(self.read_msg());
456            if let Some(idx) = msg.headers.iter().position(|x| { x.0 == message::HEADER_FIELD_REPLY_SERIAL }) {
457                let obj = {
458                    let x = &msg.headers[idx].1;
459                    x.object.deref().clone()
460                };
461                let reply_serial : u32 = DBusDecoder::decode(obj).unwrap();
462                if reply_serial == serial {
463                    // Move our queued messages into the Connection's queue
464                    self.push_queue(&mut queue);
465                    return Ok(try!(msg.get_body()))
466                };
467            };
468            queue.push_back(msg);
469        }
470    }
471
472    fn pop_message(&self) -> Option<Message> {
473        self.queue.borrow_mut().pop_front()
474    }
475
476    fn sock_read_msg(sock: &mut StreamSocket) -> Result<Message,Error> {
477        let mut buf = Vec::new();
478
479        // Read and demarshal the fixed portion of the header
480        try!(read_exactly(sock, &mut buf, 12));
481        let mut offset = 0;
482        let mut sig = "(yyyyuu)".to_owned();
483        let header = match try!(demarshal(&mut buf, &mut offset, &mut sig)) {
484            Value::Struct(x) => x,
485            x => panic!("Demarshal didn't return what we asked for: {:?}", x)
486        };
487
488        let mut v = header.objects;
489        let mut msg : Message = Default::default();
490        let endian : u8 = DBusDecoder::decode(v.remove(0)).unwrap();
491        if endian == 'B' as u8 {
492            msg.big_endian = true;
493        }
494        msg.message_type = message::MessageType(DBusDecoder::decode(v.remove(0)).unwrap());
495        msg.flags = DBusDecoder::decode::<u8>(v.remove(0)).unwrap();
496        msg.version = DBusDecoder::decode::<u8>(v.remove(0)).unwrap();
497        let body_len = DBusDecoder::decode::<u32>(v.remove(0)).unwrap();
498        msg.serial = DBusDecoder::decode::<u32>(v.remove(0)).unwrap();
499
500        // Read array length
501        try!(read_exactly(sock, &mut buf, 4));
502        // demarshal consumes the buf, so save a copy for when we demarshal the entire array
503        let mut buf_copy = buf.clone();
504        offset = 12;
505        sig = "u".to_owned();
506        let data = demarshal(&mut buf, &mut offset, &mut sig).ok().unwrap();
507        let arr_len = DBusDecoder::decode::<u32>(data).unwrap() as usize;
508
509        // Make buf_copy big enough for the entire array, and fill it
510        buf_copy.reserve(arr_len);
511        if try!(sock.take(arr_len as u64).read_to_end(&mut buf_copy)) != arr_len {
512            return Err(Error::Disconnected);
513        };
514
515        offset = 12;
516        sig = "a(yv)".to_owned();
517        let header_fields = match try!(demarshal(&mut buf_copy, &mut offset, &mut sig)) {
518            Value::Array(x) => x,
519            x => panic!("Demarshal didn't return what we asked for: {:?}", x)
520        };
521
522        msg.headers = Vec::new();
523        for i in header_fields.objects {
524            let mut st = match i {
525                Value::Struct(x) => x,
526                x => panic!("Demarshal didn't return what we asked for: {:?}", x)
527            };
528            let val = st.objects.remove(1);
529            let code = DBusDecoder::decode::<u8>(st.objects.remove(0)).unwrap();
530            let variant = match val {
531                Value::Variant(x) => x,
532                x => panic!("Demarshal didn't return what we asked for: {:?}", x)
533            };
534            msg.headers.push(HeaderField(code, variant));
535        }
536
537        // Read the padding, if any
538        let trailing_pad = 8 - (offset % 8);
539        if trailing_pad % 8 != 0 {
540            try!(read_exactly(sock, &mut buf, trailing_pad));
541        }
542
543        // Finally, read the entire body
544        if body_len > 0 {
545            try!(read_exactly(sock, &mut msg.body, body_len as usize));
546        }
547
548        Ok(msg)
549    }
550
551    /// Blocks until a message comes in from the message bus.  The received message is returned.
552    pub fn read_msg(&self) -> Result<Message,Error> {
553        match self.pop_message() {
554            Some(m) => Ok(m),
555            _ => self.run_sock(Self::sock_read_msg)
556        }
557    }
558}
559
/// Test helper: checks that `conn` is usable by calling `ListNames` on the bus
/// and printing the reply.  Panics if the call fails.
#[cfg(test)]
fn validate_connection(conn: &mut Connection) {
    let list_names = message::create_method_call(
        "org.freedesktop.DBus",
        "/org/freedesktop/DBus",
        "org.freedesktop.DBus",
        "ListNames",
    );
    let names = conn.call_sync(list_names).unwrap();
    println!("ListNames: {:?}", names);
}
567
/// Connects to the system bus and makes one round trip over it.
#[test]
fn test_connect_system() {
    let connected = Connection::connect_system();
    let mut conn = connected.unwrap();
    validate_connection(&mut conn);
}
573
/// Connects to the session bus, then requests a bus name and expects the
/// reply's first value to be `1`.
#[test]
fn test_connect_session() {
    let mut conn = Connection::connect_session().unwrap();
    validate_connection(&mut conn);

    let request = message::create_method_call(
        "org.freedesktop.DBus",
        "/org/freedesktop/DBus",
        "org.freedesktop.DBus",
        "RequestName",
    )
        .add_arg(&"com.test.foobar")
        .add_arg(&(0 as u32));
    println!("{:?}", request);

    let mut replies = conn.call_sync(request).unwrap().unwrap();
    println!("RequestName: {:?}", replies);
    let first = replies.remove(0);
    assert_eq!(first, Value::from(1 as u32));
}
588
/// Connects to the address in DBUS_TCP_BUS_ADDRESS, sends `ListNames` without
/// waiting for its reply, and prints the body of the next message read.
#[test]
fn test_tcp() {
    let addr = env::var("DBUS_TCP_BUS_ADDRESS").unwrap();
    let conn = Connection::connect(&addr).unwrap();

    let list_names = message::create_method_call(
        "org.freedesktop.DBus",
        "/org/freedesktop/DBus",
        "org.freedesktop.DBus",
        "ListNames",
    );
    // A failed send is deliberately ignored; the read below will fail instead
    conn.send(list_names).ok();

    let incoming = conn.read_msg().unwrap();
    println!("{:?}", incoming.body);
}