qrpc_sdk/socket.rs
1//! Internal abstraction over the Rpc socket
2//!
3//! The protocol uses TCP as a transport, meaning that when sending
4//! messages, they need to be framed. The `builder` abstraction takes
5//! care of this! Do not manually frame your messages!
6
7use crate::{
8 error::{RpcError, RpcResult},
9 io::{self, Message},
10};
11use async_std::{
12 channel::{bounded, Receiver, Sender},
13 future,
14 net::{TcpListener, TcpStream},
15 stream::StreamExt,
16 sync::{Arc, Mutex},
17 task,
18};
19use identity::Identity;
20use std::{
21 collections::BTreeMap,
22 net::Shutdown,
23 sync::atomic::{AtomicBool, Ordering},
24 time::Duration,
25};
26
/// Shorthand for a value shared behind an `Arc` and guarded by a `Mutex`
/// (both taken from `async_std::sync`, see the imports above).
27type Lock<T> = Arc<Mutex<T>>;
28
/// Default `(host, port)` pair at which the qrpc broker socket is bound
pub fn default_socket_path() -> (&'static str, u16) {
    const HOST: &str = "localhost";
    const PORT: u16 = 10222;
    (HOST, PORT)
}
33
34/// Bi-directional socket connection to a qrpc bus system
35///
36/// A connection is always between a component on the bus, and the
37/// broker. The broker listens to incoming connections, and relays
38/// them. A component (service, or utility library) can either
39/// operate only in sending mode, or listen as well, so that it can be
40/// used as a dependency by other services. The sending socket is
41/// used as a listener, meaning that no specific port needs to be
42/// bound for a service.
43///
44/// When using the `server(...)` constructor you bind a port, when
45/// attaching a lambda via `listen(...)` you use the established
46/// connection. In your service code there is no reason to ever use
47/// `server(...)`!
48///
49/// When sending a message, the socket will listen for a reply from
50/// the broker on the sending stream, to make sure that return data is
51/// properly associated. You can control the timeout via the
52/// `connect_timeout` function.
53pub struct RpcSocket {
 /// Stream to the remote side: `Some` for sockets created by
 /// `connect_timeout(...)`, `None` for sockets created by `server(...)`
54 stream: Option<TcpStream>,
 /// Bound listener: `Some` for sockets created by `server(...)`,
 /// `None` for client sockets
55 listen: Option<Arc<TcpListener>>,
 /// Set to `true` on construction and to `false` by `shutdown()`;
 /// polled by the accept loop and by the incoming-message loop
56 running: AtomicBool,
 /// `true` for server sockets, and for client sockets once
 /// `listen(...)` has been called
57 listening: AtomicBool,
 /// Reply hooks keyed by message ID: `send(...)` inserts a sender
 /// here and the incoming-message loop uses it to hand the matching
 /// reply back (name presumably "waiting for message" -- TODO confirm)
58 wfm: Lock<BTreeMap<Identity, Sender<Message>>>,
 /// Channel (capacity 4) carrying incoming messages that matched no
 /// reply hook; filled by the incoming-message loop and drained by
 /// the task spawned in `listen(...)`
59 inc_io: (Sender<Message>, Receiver<Message>),
 /// How long `send(...)` waits for a reply before giving up
60 timeout: Duration,
61}
62
63impl RpcSocket {
64 /// Create a client socket that connects to a remote broker
65 pub async fn connect(addr: &str, port: u16) -> RpcResult<Arc<Self>> {
66 Self::connect_timeout(addr, port, Duration::from_secs(5)).await
67 }
68
69 /// Create a client socket with an explicit timeout
70 pub async fn connect_timeout(addr: &str, port: u16, timeout: Duration) -> RpcResult<Arc<Self>> {
71 let stream = TcpStream::connect(&format!("{}:{}", addr, port)).await?;
72
73 let _self = Arc::new(Self {
74 stream: Some(stream),
75 listen: None,
76 running: true.into(),
77 listening: false.into(),
78 wfm: Default::default(),
79 inc_io: bounded(4),
80 timeout,
81 });
82
83 _self.spawn_incoming();
84 Ok(_self)
85 }
86
87 /// Attach a permanent listener to the sending stream
88 pub async fn listen<F: Fn(Message) + Send + 'static>(self: &Arc<Self>, cb: F) {
89 let _self = Arc::clone(self);
90 _self.listening.swap(true, Ordering::Relaxed);
91 task::spawn(async move {
92 while let Ok(msg) = _self.inc_io.1.recv().await {
93 cb(msg);
94 }
95 });
96 }
97
98 /// Bind a socket to listen for connections
99 ///
100 /// This function is primarily used by the qrpc-broker and should
101 /// not be used in your service code. To listen for incoming
102 /// connections on the outgoing stream (meaning client side), use
103 /// `listen(...)`
104 pub async fn server<F, D>(addr: &str, port: u16, cb: F, data: D) -> RpcResult<Arc<Self>>
105 where
106 F: Fn(TcpStream, D) + Send + Copy + 'static,
107 D: Send + Sync + Clone + 'static,
108 {
109 info!("Opening qrpc socket on {}:{}", addr, port);
110 let listen = Arc::new(TcpListener::bind(format!("{}:{}", addr, port)).await?);
111 let _self = Arc::new(Self {
112 stream: None,
113 listen: Some(listen),
114 running: true.into(),
115 listening: true.into(),
116 wfm: Default::default(),
117 inc_io: bounded(4),
118 timeout: Duration::from_secs(5),
119 });
120
121 let s = Arc::clone(&_self);
122 task::spawn(async move {
123 let mut inc = s.listen.as_ref().unwrap().incoming();
124 while let Some(Ok(stream)) = inc.next().await {
125 if !s.running() {
126 break;
127 }
128
129 debug!("New incoming qrpc connection! ({:?})", stream.peer_addr());
130 let d = data.clone();
131 task::spawn(async move { cb(stream, d) });
132 }
133
134 info!("Terminating rpc accept loop...");
135 });
136
137 Ok(_self)
138 }
139
140 /// Handle the incoming side of the stream connection
141 ///
142 /// When acting as a server this is simple: all messages can be
143 /// received at the same point, spawning tasks for each connection
144 /// to not mix things up. On the client side this is harder. We
145 /// need to listen for incoming messages after sending one, so
146 /// that we can handle the return data. But we also need to
147 /// generally handle incoming messages. To avoid having to peek
148 /// into the socket periodically to check if a message has
149 /// arrived, this mechanism uses boundeds, and an enum type to
150 /// associate message IDs.
151 fn spawn_incoming(self: &Arc<Self>) {
152 let _self = Arc::clone(self);
153 task::spawn(async move {
154 let mut sock = _self.stream.clone().unwrap();
155 while _self.running.load(Ordering::Relaxed) {
156 let msg = match io::recv(&mut sock).await {
157 Ok(msg) => msg,
158 Err(e) => {
159 task::sleep(std::time::Duration::from_millis(10)).await;
160 error!("Failed reading message: {}", e.to_string());
161 continue;
162 }
163 };
164
165 let id = msg.id;
166 let mut wfm = _self.wfm.lock().await;
167 match wfm.get(&id) {
168 Some(sender) => sender.send(msg).await.unwrap(),
169 None => _self.inc_io.0.send(msg).await.unwrap(),
170 }
171
172 wfm.remove(&id);
173 }
174 });
175 }
176
177 /// Send a message as a reply to a recipient
178 pub async fn reply(self: &Arc<Self>, msg: Message) -> RpcResult<()> {
179 let mut s = self.stream.clone().unwrap();
180 io::send(&mut s, msg).await
181 }
182
183 /// Send a message to the other side of this stream
184 ///
185 /// This function is meant to be used by qrpc clients that only
186 /// have a single connection stream to the broker. If you wanted
187 /// to write an alternative message broker, you have to use the
188 /// [`io`] utilities directly (as the `qrpc-broker` crate does)!
189 ///
190 /// After sending a message this function will wait for a reply
191 /// and parse the message for you. You must provide a conversion
192 /// lambda so that the types can be extracted from the message
193 /// type that the SDK receives.
194 ///
195 /// [`io`]: ./io/index.html
196 pub async fn send<T, F>(self: &Arc<Self>, msg: Message, convert: F) -> RpcResult<T>
197 where
198 F: Fn(Message) -> RpcResult<T>,
199 {
200 // Insert a receive hook for the message we are about to send
201 let id = msg.id;
202 let (tx, rx) = bounded(1);
203 self.wfm.lock().await.insert(id, tx);
204
205 // Send off the message...
206 let mut s = self.stream.clone().unwrap();
207 io::send(&mut s, msg).await?;
208
209 // Wait for a reply
210 future::timeout(self.timeout, async move {
211 match rx.recv().await {
212 Ok(msg) => convert(msg),
213 Err(_) => Err(RpcError::ConnectionFault(
214 "No message with matching ID received!".into(),
215 )),
216 }
217 })
218 .await?
219 }
220
221 /// Terminate all workers associated with this socket
222 pub fn shutdown(self: &Arc<Self>) {
223 self.running.swap(false, Ordering::Relaxed);
224 if let Some(ref s) = self.stream {
225 s.shutdown(Shutdown::Both).unwrap();
226 }
227 }
228
229 /// Get the current running state
230 pub fn running(&self) -> bool {
231 self.running.load(Ordering::Relaxed)
232 }
233
234 /// Get the current listening state
235 pub fn listening(&self) -> bool {
236 self.listening.load(Ordering::Relaxed)
237 }
238}