qrpc_sdk/
socket.rs

1//! Internal abstraction over the Rpc socket
2//!
3//! The protocol uses TCP as a transport, meaning that when sending
4//! messages, they need to be framed.  The `builder` abstraction takes
5//! care of this!  Do not manually frame your messages!
6
7use crate::{
8    error::{RpcError, RpcResult},
9    io::{self, Message},
10};
11use async_std::{
12    channel::{bounded, Receiver, Sender},
13    future,
14    net::{TcpListener, TcpStream},
15    stream::StreamExt,
16    sync::{Arc, Mutex},
17    task,
18};
19use identity::Identity;
20use std::{
21    collections::BTreeMap,
22    net::Shutdown,
23    sync::atomic::{AtomicBool, Ordering},
24    time::Duration,
25};
26
/// Shorthand for a value shared between tasks behind an async-std mutex
type Lock<T> = Arc<Mutex<T>>;
28
/// Default `(host, port)` pair on which the qrpc broker socket is bound
pub fn default_socket_path() -> (&'static str, u16) {
    const HOST: &str = "localhost";
    const PORT: u16 = 10222;

    (HOST, PORT)
}
33
/// Bi-directional socket connection to a qrpc bus system
///
/// A connection is always between a component on the bus, and the
/// broker.  The broker listens to incoming connections, and relays
/// them.  A component (service, or utility library) can either
/// operate only in sending mode, or listen as well, so that it can be
/// used as a dependency by other services.  The sending socket is
/// used as a listener, meaning that no specific port needs to be
/// bound for a service.
///
/// When using the `server(...)` constructor you bind a port, when
/// attaching a lambda via `listen(...)` you use the established
/// connection.  In your service code there is no reason to ever use
/// `server(...)`!
///
/// When sending a message, the socket will listen for a reply from
/// the broker on the sending stream, to make sure that return data is
/// properly associated.  You can control the timeout via the
/// `connect_timeout` function.
pub struct RpcSocket {
    /// Outgoing connection; set by `connect_timeout(...)`, `None` for
    /// sockets created via `server(...)`
    stream: Option<TcpStream>,
    /// Bound listener; set by `server(...)`, `None` for client sockets
    listen: Option<Arc<TcpListener>>,
    /// Set to `true` on construction and cleared by `shutdown()`; the
    /// accept loop and the receive loop check it to know when to stop
    running: AtomicBool,
    /// `true` for server sockets, and for client sockets once
    /// `listen(...)` has been called
    listening: AtomicBool,
    /// Reply hooks registered by `send(...)`, keyed by message ID
    /// (the name presumably abbreviates "waiting for message")
    wfm: Lock<BTreeMap<Identity, Sender<Message>>>,
    /// Bounded channel carrying incoming messages that matched no
    /// reply hook; drained by the task spawned in `listen(...)`
    inc_io: (Sender<Message>, Receiver<Message>),
    /// How long `send(...)` waits for a reply before giving up
    timeout: Duration,
}
62
63impl RpcSocket {
64    /// Create a client socket that connects to a remote broker
65    pub async fn connect(addr: &str, port: u16) -> RpcResult<Arc<Self>> {
66        Self::connect_timeout(addr, port, Duration::from_secs(5)).await
67    }
68
69    /// Create a client socket with an explicit timeout
70    pub async fn connect_timeout(addr: &str, port: u16, timeout: Duration) -> RpcResult<Arc<Self>> {
71        let stream = TcpStream::connect(&format!("{}:{}", addr, port)).await?;
72
73        let _self = Arc::new(Self {
74            stream: Some(stream),
75            listen: None,
76            running: true.into(),
77            listening: false.into(),
78            wfm: Default::default(),
79            inc_io: bounded(4),
80            timeout,
81        });
82
83        _self.spawn_incoming();
84        Ok(_self)
85    }
86
87    /// Attach a permanent listener to the sending stream
88    pub async fn listen<F: Fn(Message) + Send + 'static>(self: &Arc<Self>, cb: F) {
89        let _self = Arc::clone(self);
90        _self.listening.swap(true, Ordering::Relaxed);
91        task::spawn(async move {
92            while let Ok(msg) = _self.inc_io.1.recv().await {
93                cb(msg);
94            }
95        });
96    }
97
98    /// Bind a socket to listen for connections
99    ///
100    /// This function is primarily used by the qrpc-broker and should
101    /// not be used in your service code.  To listen for incoming
102    /// connections on the outgoing stream (meaning client side), use
103    /// `listen(...)`
104    pub async fn server<F, D>(addr: &str, port: u16, cb: F, data: D) -> RpcResult<Arc<Self>>
105    where
106        F: Fn(TcpStream, D) + Send + Copy + 'static,
107        D: Send + Sync + Clone + 'static,
108    {
109        info!("Opening qrpc socket on {}:{}", addr, port);
110        let listen = Arc::new(TcpListener::bind(format!("{}:{}", addr, port)).await?);
111        let _self = Arc::new(Self {
112            stream: None,
113            listen: Some(listen),
114            running: true.into(),
115            listening: true.into(),
116            wfm: Default::default(),
117            inc_io: bounded(4),
118            timeout: Duration::from_secs(5),
119        });
120
121        let s = Arc::clone(&_self);
122        task::spawn(async move {
123            let mut inc = s.listen.as_ref().unwrap().incoming();
124            while let Some(Ok(stream)) = inc.next().await {
125                if !s.running() {
126                    break;
127                }
128
129                debug!("New incoming qrpc connection! ({:?})", stream.peer_addr());
130                let d = data.clone();
131                task::spawn(async move { cb(stream, d) });
132            }
133
134            info!("Terminating rpc accept loop...");
135        });
136
137        Ok(_self)
138    }
139
140    /// Handle the incoming side of the stream connection
141    ///
142    /// When acting as a server this is simple: all messages can be
143    /// received at the same point, spawning tasks for each connection
144    /// to not mix things up.  On the client side this is harder.  We
145    /// need to listen for incoming messages after sending one, so
146    /// that we can handle the return data.  But we also need to
147    /// generally handle incoming messages.  To avoid having to peek
148    /// into the socket periodically to check if a message has
149    /// arrived, this mechanism uses boundeds, and an enum type to
150    /// associate message IDs.
151    fn spawn_incoming(self: &Arc<Self>) {
152        let _self = Arc::clone(self);
153        task::spawn(async move {
154            let mut sock = _self.stream.clone().unwrap();
155            while _self.running.load(Ordering::Relaxed) {
156                let msg = match io::recv(&mut sock).await {
157                    Ok(msg) => msg,
158                    Err(e) => {
159                        task::sleep(std::time::Duration::from_millis(10)).await;
160                        error!("Failed reading message: {}", e.to_string());
161                        continue;
162                    }
163                };
164
165                let id = msg.id;
166                let mut wfm = _self.wfm.lock().await;
167                match wfm.get(&id) {
168                    Some(sender) => sender.send(msg).await.unwrap(),
169                    None => _self.inc_io.0.send(msg).await.unwrap(),
170                }
171
172                wfm.remove(&id);
173            }
174        });
175    }
176
177    /// Send a message as a reply to a recipient
178    pub async fn reply(self: &Arc<Self>, msg: Message) -> RpcResult<()> {
179        let mut s = self.stream.clone().unwrap();
180        io::send(&mut s, msg).await
181    }
182
183    /// Send a message to the other side of this stream
184    ///
185    /// This function is meant to be used by qrpc clients that only
186    /// have a single connection stream to the broker.  If you wanted
187    /// to write an alternative message broker, you have to use the
188    /// [`io`] utilities directly (as the `qrpc-broker` crate does)!
189    ///
190    /// After sending a message this function will wait for a reply
191    /// and parse the message for you.  You must provide a conversion
192    /// lambda so that the types can be extracted from the message
193    /// type that the SDK receives.
194    ///
195    /// [`io`]: ./io/index.html
196    pub async fn send<T, F>(self: &Arc<Self>, msg: Message, convert: F) -> RpcResult<T>
197    where
198        F: Fn(Message) -> RpcResult<T>,
199    {
200        // Insert a receive hook for the message we are about to send
201        let id = msg.id;
202        let (tx, rx) = bounded(1);
203        self.wfm.lock().await.insert(id, tx);
204
205        // Send off the message...
206        let mut s = self.stream.clone().unwrap();
207        io::send(&mut s, msg).await?;
208
209        // Wait for a reply
210        future::timeout(self.timeout, async move {
211            match rx.recv().await {
212                Ok(msg) => convert(msg),
213                Err(_) => Err(RpcError::ConnectionFault(
214                    "No message with matching ID received!".into(),
215                )),
216            }
217        })
218        .await?
219    }
220
221    /// Terminate all workers associated with this socket
222    pub fn shutdown(self: &Arc<Self>) {
223        self.running.swap(false, Ordering::Relaxed);
224        if let Some(ref s) = self.stream {
225            s.shutdown(Shutdown::Both).unwrap();
226        }
227    }
228
229    /// Get the current running state
230    pub fn running(&self) -> bool {
231        self.running.load(Ordering::Relaxed)
232    }
233
234    /// Get the current listening state
235    pub fn listening(&self) -> bool {
236        self.listening.load(Ordering::Relaxed)
237    }
238}