rumble/bluez/adapter/
acl_stream.rs

1use std::thread;
2use std::sync::Arc;
3use std::time::Duration;
4
5use libc;
6
7use ::Result;
8
9use bluez::constants::*;
10use bluez::util::handle_error;
11
12use std::fmt;
13use std::fmt::{Debug, Formatter};
14use std::sync::mpsc::{channel, Sender, Receiver};
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Mutex;
17use bluez::protocol::hci::ACLData;
18use bluez::protocol::att;
19
20use self::StreamMessage::*;
21use api::BDAddr;
22use Error;
23use api::CommandCallback;
24use api::RequestCallback;
25use bluez::adapter::Adapter;
26use bytes::BytesMut;
27use bytes::BufMut;
28use api::NotificationHandler;
29
30enum StreamMessage  {
31    Command(Vec<u8>, Option<CommandCallback>),
32    Request(Vec<u8>, Option<RequestCallback>),
33    Data(Vec<u8>),
34}
35
36impl Debug for StreamMessage {
37    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
38        match self {
39            &Command(ref data, ref _cb) => write!(f, "Command({:?})", data),
40            &Request(ref data, ref cb) => write!(f, "Request({:?}, cb: {})", data, cb.is_some()),
41            &Data(ref data) => write!(f, "Data({:?})", data),
42        }
43    }
44}
45
46#[derive(Clone)]
47pub struct ACLStream {
48    adapter: Adapter,
49    pub address: BDAddr,
50    pub handle: u16,
51    fd: i32,
52    should_stop: Arc<AtomicBool>,
53    sender: Arc<Mutex<Sender<StreamMessage>>>,
54    notification_handlers: Arc<Mutex<Vec<NotificationHandler>>>,
55}
56
57impl ACLStream {
58    pub fn new(adapter: Adapter, address: BDAddr, handle: u16, fd: i32) -> ACLStream {
59        info!("Creating new ACLStream for {}, {}, {}", address, handle, fd);
60        let (tx, rx) = channel();
61        let acl_stream = ACLStream {
62            adapter,
63            address,
64            handle,
65            fd,
66            should_stop: Arc::new(AtomicBool::new(false)),
67            sender: Arc::new(Mutex::new(tx)),
68            notification_handlers: Arc::new(Mutex::new(vec![])),
69        };
70
71        {
72            let should_stop = acl_stream.should_stop.clone();
73            let stream = acl_stream.clone();
74            thread::spawn(move || {
75                let mut msg = rx.recv().unwrap();
76                while !should_stop.load(Ordering::Relaxed) {
77                    match stream.handle_iteration(&mut msg, &rx) {
78                        Ok(_) => msg = rx.recv().unwrap(),
79                        Err(Error::NotConnected) => {
80                            // retry message
81                            thread::sleep(Duration::from_millis(50));
82                            continue;
83                        }
84                        Err(e) => {
85                            error!("Unhandled error {}", e);
86                        }
87                    }
88                }
89
90                if let Err(err) = handle_error(unsafe { libc::close(fd) }) {
91                    warn!("Failed to close socket {}: {}", fd, err);
92                };
93            });
94        }
95
96        acl_stream
97    }
98
99    fn write_socket(&self, value: &mut [u8], command: bool,
100                    receiver: &Receiver<StreamMessage>) -> Result<Vec<u8>> {
101        debug!("writing {:?}", value);
102        handle_error(unsafe {
103            libc::write(self.fd, value.as_mut_ptr() as *mut libc::c_void, value.len()) as i32
104        })?;
105
106        let mut skipped = vec![];
107        loop {
108            let message = receiver.recv().unwrap();
109            debug!("waiting for confirmation... {:?}", message);
110            if let Data(rec) = message {
111                if rec != value {
112                    skipped.into_iter().for_each(|m|
113                        self.send(m));
114                    return Ok(rec);
115                } else if command {
116                    return Ok(vec![]);
117                }
118            } else {
119                skipped.push(message);
120            }
121        }
122    }
123
124    fn handle_iteration(&self, msg: &mut StreamMessage,
125                        receiver: &Receiver<StreamMessage>) -> Result<()> {
126        match *msg {
127            Command(ref mut value, ref handler) => {
128                debug!("sending command {:?} to {}", value, self.fd);
129
130                let result = self.write_socket(value, true, receiver)
131                    .map(|_v| ());
132                if let &Some(ref f) = handler {
133                    f(result);
134                }
135            },
136            Request(ref mut value, ref handler) => {
137                debug!("sending request {:?} to {}", value, self.fd);
138
139                let result = self.write_socket(value, false, receiver);
140                if let &Some(ref f) = handler {
141                    f(result);
142                }
143            },
144            Data(ref value) => {
145                debug!("Received data {:?}", value);
146            }
147        }
148
149        Ok(())
150    }
151
152    fn send(&self, message: StreamMessage) {
153        let l = self.sender.lock().unwrap();
154        l.send(message).unwrap();
155    }
156
157    pub fn write(&self, data: &mut [u8], handler: Option<RequestCallback>) {
158        // let mut packet = Protocol::acl(self.handle, ATT_CID, data);
159        self.send(Request(data.to_owned(), handler));
160    }
161
162    pub fn write_cmd(&self, data: &mut [u8], on_done: Option<CommandCallback>) {
163        self.send(Command(data.to_owned(), on_done));
164    }
165
166    pub fn on_notification(&self, handler: NotificationHandler) {
167        let mut list = self.notification_handlers.lock().unwrap();
168        list.push(handler);
169    }
170
171    pub fn receive(&self, message: &ACLData) {
172        debug!("receive message: {:?}", message);
173        // message.data
174        // TODO: handle partial packets
175        if message.cid == ATT_CID {
176            let value = message.data.to_vec();
177            if !value.is_empty() {
178                match value[0] {
179                    ATT_OP_EXCHANGE_MTU_REQ => {
180                        let request = att::mtu_request(&value).unwrap().1;
181                        // is the client MTU smaller than ours?
182                        if request.client_rx_mtu <= self.adapter.info.acl_mtu {
183                            debug!("sending MTU: {}", self.adapter.info.acl_mtu);
184                            // it is, send confirmation
185                            let mut buf = BytesMut::with_capacity(3);
186                            buf.put_u8(ATT_OP_EXCHANGE_MTU_RESP);
187                            buf.put_u16_le(self.adapter.info.acl_mtu);
188                            self.write_cmd(&mut buf, None);
189                        } else {
190                            // TODO: reduce our MTU to client's
191                            error!("client's MTU is larger than ours");
192                            self.write_cmd(&mut [0x01, 0x02, 0x00, 0x00, 0x06], None);
193                        }
194                    }
195                    ATT_OP_VALUE_NOTIFICATION => {
196                        debug!("value notification: {:?}", value);
197                        match att::value_notification(&value) {
198                            Ok(notification) => {
199                                let handlers = self.notification_handlers.lock().unwrap();
200                                handlers.iter().for_each(|h| h(notification.1.clone()));
201                            }
202                            Err(err) => {
203                                error!("failed to parse notification: {:?}", err);
204                            }
205                        }
206                    }
207                    _ => {
208                        self.send(Data(value));
209                    }
210                }
211            }
212        }
213    }
214}
215
216impl Drop for ACLStream {
217    fn drop(&mut self) {
218        self.should_stop.clone().store(true, Ordering::Relaxed);
219    }
220}