1use std::thread;
2use std::sync::Arc;
3use std::time::Duration;
4
5use libc;
6
7use ::Result;
8
9use bluez::constants::*;
10use bluez::util::handle_error;
11
12use std::fmt;
13use std::fmt::{Debug, Formatter};
14use std::sync::mpsc::{channel, Sender, Receiver};
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Mutex;
17use bluez::protocol::hci::ACLData;
18use bluez::protocol::att;
19
20use self::StreamMessage::*;
21use api::BDAddr;
22use Error;
23use api::CommandCallback;
24use api::RequestCallback;
25use bluez::adapter::Adapter;
26use bytes::BytesMut;
27use bytes::BufMut;
28use api::NotificationHandler;
29
30enum StreamMessage {
31 Command(Vec<u8>, Option<CommandCallback>),
32 Request(Vec<u8>, Option<RequestCallback>),
33 Data(Vec<u8>),
34}
35
36impl Debug for StreamMessage {
37 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
38 match self {
39 &Command(ref data, ref _cb) => write!(f, "Command({:?})", data),
40 &Request(ref data, ref cb) => write!(f, "Request({:?}, cb: {})", data, cb.is_some()),
41 &Data(ref data) => write!(f, "Data({:?})", data),
42 }
43 }
44}
45
46#[derive(Clone)]
47pub struct ACLStream {
48 adapter: Adapter,
49 pub address: BDAddr,
50 pub handle: u16,
51 fd: i32,
52 should_stop: Arc<AtomicBool>,
53 sender: Arc<Mutex<Sender<StreamMessage>>>,
54 notification_handlers: Arc<Mutex<Vec<NotificationHandler>>>,
55}
56
57impl ACLStream {
58 pub fn new(adapter: Adapter, address: BDAddr, handle: u16, fd: i32) -> ACLStream {
59 info!("Creating new ACLStream for {}, {}, {}", address, handle, fd);
60 let (tx, rx) = channel();
61 let acl_stream = ACLStream {
62 adapter,
63 address,
64 handle,
65 fd,
66 should_stop: Arc::new(AtomicBool::new(false)),
67 sender: Arc::new(Mutex::new(tx)),
68 notification_handlers: Arc::new(Mutex::new(vec![])),
69 };
70
71 {
72 let should_stop = acl_stream.should_stop.clone();
73 let stream = acl_stream.clone();
74 thread::spawn(move || {
75 let mut msg = rx.recv().unwrap();
76 while !should_stop.load(Ordering::Relaxed) {
77 match stream.handle_iteration(&mut msg, &rx) {
78 Ok(_) => msg = rx.recv().unwrap(),
79 Err(Error::NotConnected) => {
80 thread::sleep(Duration::from_millis(50));
82 continue;
83 }
84 Err(e) => {
85 error!("Unhandled error {}", e);
86 }
87 }
88 }
89
90 if let Err(err) = handle_error(unsafe { libc::close(fd) }) {
91 warn!("Failed to close socket {}: {}", fd, err);
92 };
93 });
94 }
95
96 acl_stream
97 }
98
99 fn write_socket(&self, value: &mut [u8], command: bool,
100 receiver: &Receiver<StreamMessage>) -> Result<Vec<u8>> {
101 debug!("writing {:?}", value);
102 handle_error(unsafe {
103 libc::write(self.fd, value.as_mut_ptr() as *mut libc::c_void, value.len()) as i32
104 })?;
105
106 let mut skipped = vec![];
107 loop {
108 let message = receiver.recv().unwrap();
109 debug!("waiting for confirmation... {:?}", message);
110 if let Data(rec) = message {
111 if rec != value {
112 skipped.into_iter().for_each(|m|
113 self.send(m));
114 return Ok(rec);
115 } else if command {
116 return Ok(vec![]);
117 }
118 } else {
119 skipped.push(message);
120 }
121 }
122 }
123
124 fn handle_iteration(&self, msg: &mut StreamMessage,
125 receiver: &Receiver<StreamMessage>) -> Result<()> {
126 match *msg {
127 Command(ref mut value, ref handler) => {
128 debug!("sending command {:?} to {}", value, self.fd);
129
130 let result = self.write_socket(value, true, receiver)
131 .map(|_v| ());
132 if let &Some(ref f) = handler {
133 f(result);
134 }
135 },
136 Request(ref mut value, ref handler) => {
137 debug!("sending request {:?} to {}", value, self.fd);
138
139 let result = self.write_socket(value, false, receiver);
140 if let &Some(ref f) = handler {
141 f(result);
142 }
143 },
144 Data(ref value) => {
145 debug!("Received data {:?}", value);
146 }
147 }
148
149 Ok(())
150 }
151
152 fn send(&self, message: StreamMessage) {
153 let l = self.sender.lock().unwrap();
154 l.send(message).unwrap();
155 }
156
157 pub fn write(&self, data: &mut [u8], handler: Option<RequestCallback>) {
158 self.send(Request(data.to_owned(), handler));
160 }
161
162 pub fn write_cmd(&self, data: &mut [u8], on_done: Option<CommandCallback>) {
163 self.send(Command(data.to_owned(), on_done));
164 }
165
166 pub fn on_notification(&self, handler: NotificationHandler) {
167 let mut list = self.notification_handlers.lock().unwrap();
168 list.push(handler);
169 }
170
171 pub fn receive(&self, message: &ACLData) {
172 debug!("receive message: {:?}", message);
173 if message.cid == ATT_CID {
176 let value = message.data.to_vec();
177 if !value.is_empty() {
178 match value[0] {
179 ATT_OP_EXCHANGE_MTU_REQ => {
180 let request = att::mtu_request(&value).unwrap().1;
181 if request.client_rx_mtu <= self.adapter.info.acl_mtu {
183 debug!("sending MTU: {}", self.adapter.info.acl_mtu);
184 let mut buf = BytesMut::with_capacity(3);
186 buf.put_u8(ATT_OP_EXCHANGE_MTU_RESP);
187 buf.put_u16_le(self.adapter.info.acl_mtu);
188 self.write_cmd(&mut buf, None);
189 } else {
190 error!("client's MTU is larger than ours");
192 self.write_cmd(&mut [0x01, 0x02, 0x00, 0x00, 0x06], None);
193 }
194 }
195 ATT_OP_VALUE_NOTIFICATION => {
196 debug!("value notification: {:?}", value);
197 match att::value_notification(&value) {
198 Ok(notification) => {
199 let handlers = self.notification_handlers.lock().unwrap();
200 handlers.iter().for_each(|h| h(notification.1.clone()));
201 }
202 Err(err) => {
203 error!("failed to parse notification: {:?}", err);
204 }
205 }
206 }
207 _ => {
208 self.send(Data(value));
209 }
210 }
211 }
212 }
213 }
214}
215
216impl Drop for ACLStream {
217 fn drop(&mut self) {
218 self.should_stop.clone().store(true, Ordering::Relaxed);
219 }
220}