1use self::frame::Frame;
2use std::error::Error;
3use std::io::{Read, Write};
4use std::net::{Shutdown, TcpStream, ToSocketAddrs};
5use std::sync::mpsc::{channel, Receiver, Sender};
6use std::thread::{spawn, JoinHandle};
7use std::time::Duration;
8
9pub mod frame;
10pub mod header;
11
/// Fallback timeout applied by `open` to both the TCP connect and socket
/// writes when the caller does not supply one.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
13
14pub struct Connection {
16 tcp_stream: TcpStream,
17 receive_thread: Option<JoinHandle<()>>,
18 host_header: String,
19 pub frame_receiver: Receiver<Result<Frame, String>>, }
21
22impl Connection {
23 pub fn authenticate(&mut self, username: &str, password: &str) -> Result<(), Box<dyn Error>> {
25 let headers = vec![
26 ("accept-version", "1.2"),
27 ("host", self.host_header.as_str()),
28 ("heart-beat", "0,0"), ("login", username),
30 ("passcode", password),
31 ];
32
33 let frame = frame::create("CONNECT", Some(headers), None);
34
35 self.tcp_stream.write_all(frame.as_bytes())?;
36
37 Ok(())
38 }
39
40 pub fn subscribe(&mut self, identifier: u32, topic: &str) -> Result<(), Box<dyn Error>> {
42 let id = identifier.to_string();
43
44 let headers = vec![
45 ("id", id.as_str()),
46 ("destination", topic),
47 ("ack", "auto"), ];
49
50 let frame = frame::create("SUBSCRIBE", Some(headers), None);
51
52 self.tcp_stream.write_all(frame.as_bytes())?;
53
54 Ok(())
55 }
56
57 pub fn wait(&mut self) -> Result<(), Box<dyn Error>> {
59 if self.receive_thread.is_none() {
61 return Ok(());
62 }
63
64 let result = self.receive_thread.take().unwrap().join();
66 if result.is_err() {
67 return Err("Unable to join receive thread".into());
68 }
69
70 Ok(())
71 }
72
73 pub fn close(&mut self) -> Result<(), Box<dyn Error>> {
75 self.tcp_stream.shutdown(Shutdown::Both)?;
76
77 self.wait()?;
78
79 Ok(())
80 }
81}
82
83pub fn open(
85 host: &str,
86 port: u16,
87 timeout: Option<Duration>,
88) -> Result<Connection, Box<dyn Error>> {
89 let address = format!("{}:{}", host, port)
91 .to_socket_addrs()?
92 .last()
93 .expect(format!("Unable to convert '{}:{}' to socket address", host, port).as_str());
94
95 let tcp_stream = TcpStream::connect_timeout(&address, timeout.unwrap_or(DEFAULT_TIMEOUT))?;
97
98 tcp_stream.set_nodelay(true)?;
100 tcp_stream.set_write_timeout(timeout.or(Some(DEFAULT_TIMEOUT)))?;
101
102 let (frame_sender, frame_receiver) = channel();
103
104 let tcp_stream_clone = tcp_stream.try_clone()?;
106 let receive_thread = spawn(move || {
107 let result = receive_bytes(tcp_stream_clone, frame_sender); if result.is_err() {
110 let reason = result.err().unwrap_or("Unknown error".into()).to_string();
111 panic!("Unable to receive bytes: {}", reason);
112 }
113 });
114
115 Ok(Connection {
117 tcp_stream,
118 receive_thread: Some(receive_thread),
119 host_header: host.to_string(),
120 frame_receiver,
121 })
122}
123
124fn receive_bytes(
126 mut tcp_stream: TcpStream,
127 frame_sender: Sender<Result<Frame, String>>,
128) -> Result<(), Box<dyn Error>> {
129 let mut receive_buffer = [0; 4096]; let mut pending_data: Vec<u8> = Vec::new(); loop {
133 let received_byte_count = tcp_stream.read(&mut receive_buffer)?;
135 if received_byte_count == 0 {
136 return Ok(()); }
138
139 pending_data.extend_from_slice(&receive_buffer[..received_byte_count]);
141
142 while let Some((frame, end_position)) = frame::parse(&mut pending_data)? {
144 pending_data.drain(..end_position + 1);
145 frame_sender.send(Ok(frame))?;
146 }
147 }
148}
149
150