viral32111_stomp/
lib.rs

1use self::frame::Frame;
2use std::error::Error;
3use std::io::{Read, Write};
4use std::net::{Shutdown, TcpStream, ToSocketAddrs};
5use std::sync::mpsc::{channel, Receiver, Sender};
6use std::thread::{spawn, JoinHandle};
7use std::time::Duration;
8
9pub mod frame;
10pub mod header;
11
12const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
13
14/// Represents a connection to a STOMP server.
15pub struct Connection {
16	tcp_stream: TcpStream,
17	receive_thread: Option<JoinHandle<()>>,
18	host_header: String,
19	pub frame_receiver: Receiver<Result<Frame, String>>, // String instead of Box<dyn Error> as the latter doesn't implement Send trait
20}
21
22impl Connection {
23	// Sends the CONNECT frame to the STOMP server.
24	pub fn authenticate(&mut self, username: &str, password: &str) -> Result<(), Box<dyn Error>> {
25		let headers = vec![
26			("accept-version", "1.2"),
27			("host", self.host_header.as_str()),
28			("heart-beat", "0,0"), // TODO: Implement heart-beating
29			("login", username),
30			("passcode", password),
31		];
32
33		let frame = frame::create("CONNECT", Some(headers), None);
34
35		self.tcp_stream.write_all(frame.as_bytes())?;
36
37		Ok(())
38	}
39
40	/// Subscribes to a topic on the STOMP server.
41	pub fn subscribe(&mut self, identifier: u32, topic: &str) -> Result<(), Box<dyn Error>> {
42		let id = identifier.to_string();
43
44		let headers = vec![
45			("id", id.as_str()),
46			("destination", topic),
47			("ack", "auto"), // TODO: Implement acknowledgements
48		];
49
50		let frame = frame::create("SUBSCRIBE", Some(headers), None);
51
52		self.tcp_stream.write_all(frame.as_bytes())?;
53
54		Ok(())
55	}
56
57	/// Waits for the connection to close.
58	pub fn wait(&mut self) -> Result<(), Box<dyn Error>> {
59		// Don't bother if the thread no longer exists
60		if self.receive_thread.is_none() {
61			return Ok(());
62		}
63
64		// Yoink the thread handle & wait for it to finish
65		let result = self.receive_thread.take().unwrap().join();
66		if result.is_err() {
67			return Err("Unable to join receive thread".into());
68		}
69
70		Ok(())
71	}
72
73	/// Closes the connection to the STOMP server.
74	pub fn close(&mut self) -> Result<(), Box<dyn Error>> {
75		self.tcp_stream.shutdown(Shutdown::Both)?;
76
77		self.wait()?;
78
79		Ok(())
80	}
81}
82
83/// Establishes a connection to a STOMP server.
84pub fn open(
85	host: &str,
86	port: u16,
87	timeout: Option<Duration>,
88) -> Result<Connection, Box<dyn Error>> {
89	// Convert the host name & port number into a usable socket address
90	let address = format!("{}:{}", host, port)
91		.to_socket_addrs()?
92		.last()
93		.expect(format!("Unable to convert '{}:{}' to socket address", host, port).as_str());
94
95	// Open a TCP stream to the this address
96	let tcp_stream = TcpStream::connect_timeout(&address, timeout.unwrap_or(DEFAULT_TIMEOUT))?;
97
98	// Configure this stream
99	tcp_stream.set_nodelay(true)?;
100	tcp_stream.set_write_timeout(timeout.or(Some(DEFAULT_TIMEOUT)))?;
101
102	let (frame_sender, frame_receiver) = channel();
103
104	// Spawn a thread to listen for incoming bytes
105	let tcp_stream_clone = tcp_stream.try_clone()?;
106	let receive_thread = spawn(move || {
107		let result = receive_bytes(tcp_stream_clone, frame_sender); // Blocks until the TCP stream is closed
108
109		if result.is_err() {
110			let reason = result.err().unwrap_or("Unknown error".into()).to_string();
111			panic!("Unable to receive bytes: {}", reason);
112		}
113	});
114
115	// Give the caller a handle to this connection
116	Ok(Connection {
117		tcp_stream,
118		receive_thread: Some(receive_thread),
119		host_header: host.to_string(),
120		frame_receiver,
121	})
122}
123
124/// Continuously waits for bytes from the STOMP server.
125fn receive_bytes(
126	mut tcp_stream: TcpStream,
127	frame_sender: Sender<Result<Frame, String>>,
128) -> Result<(), Box<dyn Error>> {
129	let mut receive_buffer = [0; 4096]; // 4 KiB
130	let mut pending_data: Vec<u8> = Vec::new(); // Infinite
131
132	loop {
133		// Try to receive some bytes
134		let received_byte_count = tcp_stream.read(&mut receive_buffer)?;
135		if received_byte_count == 0 {
136			return Ok(()); // Give up, there's nothing left to receive
137		}
138
139		// Append the received bytes to the unprocessed data
140		pending_data.extend_from_slice(&receive_buffer[..received_byte_count]);
141
142		// Remove any complete frames from the unprocessed data
143		while let Some((frame, end_position)) = frame::parse(&mut pending_data)? {
144			pending_data.drain(..end_position + 1);
145			frame_sender.send(Ok(frame))?;
146		}
147	}
148}
149
150/*
151#[cfg(test)]
152mod tests {
153	use super::*;
154
155	#[test]
156	fn it_works() {
157		let result = add(2, 2);
158		assert_eq!(result, 4);
159	}
160}
161*/