abci/async_api/
server.rs

1use std::{io::Result, sync::Arc};
2
3#[cfg(all(unix, feature = "use-async-std"))]
4use async_std::os::unix::net::UnixListener;
5#[cfg(feature = "use-async-std")]
6use async_std::{
7    io::{Read, Write},
8    net::TcpListener,
9    sync::Mutex,
10    task::spawn,
11};
12#[cfg(all(unix, feature = "use-smol"))]
13use smol::net::unix::UnixListener;
14#[cfg(feature = "use-smol")]
15use smol::{
16    io::{AsyncRead as Read, AsyncWrite as Write},
17    lock::Mutex,
18    net::TcpListener,
19    spawn,
20};
21use tendermint_proto::abci::{Request, Response};
22#[cfg(all(unix, feature = "use-tokio"))]
23use tokio::net::UnixListener;
24#[cfg(feature = "use-tokio")]
25use tokio::{
26    io::{AsyncRead as Read, AsyncWrite as Write},
27    net::TcpListener,
28    spawn,
29    sync::Mutex,
30};
31use tracing::{debug, error, info, instrument};
32
33use crate::{
34    address::Address,
35    async_api::{Consensus, Info, Mempool, Snapshot},
36    handler::*,
37    state::ConsensusStateValidator,
38    stream_split::StreamSplit,
39    tasks::*,
40    utils::{get_stream_pair, ConnectionType, StreamReader, StreamWriter},
41};
42
43macro_rules! spawn {
44    ($expr: expr) => {
45        cfg_if::cfg_if! {
46            if #[cfg(any(feature = "use-async-std", feature = "use-tokio"))] {
47                spawn($expr)
48            } else if #[cfg(feature = "use-smol")] {
49                spawn($expr)
50                    .detach()
51            } else {
52                unreachable!()
53            }
54        }
55    };
56}
57
58/// ABCI Server
59pub struct Server<C, M, I, S>
60where
61    C: Consensus + 'static,
62    M: Mempool + 'static,
63    I: Info + 'static,
64    S: Snapshot + 'static,
65{
66    /// Wrapping inner type in `Arc` so that it becomes clonable and can be shared between multiple
67    /// async tasks
68    inner: Arc<Inner<C, M, I, S>>,
69}
70
71impl<C, M, I, S> Server<C, M, I, S>
72where
73    C: Consensus + 'static,
74    M: Mempool + 'static,
75    I: Info + 'static,
76    S: Snapshot + 'static,
77{
78    /// Creates a new instance of [`Server`](self::Server)
79    pub fn new(consensus: C, mempool: M, info: I, snapshot: S) -> Self {
80        Self {
81            inner: Arc::new(Inner::new(consensus, mempool, info, snapshot)),
82        }
83    }
84
85    /// Starts ABCI server
86    ///
87    /// # Note
88    ///
89    /// This is an `async` function and returns a `Future`. So, you'll need an executor to drive the `Future` returned
90    /// from this function. `async-std` and `tokio` are two popular options.
91    pub async fn run<T>(&self, addr: T) -> Result<()>
92    where
93        T: Into<Address>,
94    {
95        let addr = addr.into();
96
97        match addr {
98            Address::Tcp(addr) => {
99                let listener = TcpListener::bind(addr).await?;
100                info!(message = "Started ABCI server at", %addr);
101
102                loop {
103                    let (stream, peer_addr) = listener.accept().await?;
104                    self.handle_connection(stream, peer_addr.to_string());
105                }
106            }
107            #[cfg(unix)]
108            Address::Uds(path) => {
109                cfg_if::cfg_if! {
110                    if #[cfg(feature = "use-async-std")] {
111                        let listener = UnixListener::bind(&path).await?;
112                    } else if #[cfg(any(feature = "use-smol", feature = "use-tokio"))] {
113                        let listener = UnixListener::bind(&path)?;
114                    } else {
115                        unreachable!()
116                    }
117                }
118
119                info!(message = "Started ABCI server at", path = %path.display());
120
121                loop {
122                    let (stream, peer_addr) = listener.accept().await?;
123                    self.handle_connection(stream, format!("{:?}", peer_addr));
124                }
125            }
126            #[cfg(test)]
127            Address::Mock(mut listener) => {
128                while let Ok(stream) = listener.accept().await {
129                    self.handle_connection(stream, "test_peer".to_string());
130                }
131
132                Ok(())
133            }
134        }
135    }
136
137    #[instrument(skip(self, stream))]
138    pub(crate) fn handle_connection<D>(&self, stream: D, peer_addr: String)
139    where
140        D: StreamSplit,
141    {
142        info!("New peer connection");
143
144        let inner = self.inner.clone();
145        let (stream_reader, stream_writer) = get_stream_pair(stream);
146
147        spawn!(async move {
148            inner
149                .handle_connection(stream_reader, stream_writer, peer_addr)
150                .await
151        });
152    }
153}
154
155/// Inner type that contains all the trait implementations
156struct Inner<C, M, I, S>
157where
158    C: Consensus + 'static,
159    M: Mempool + 'static,
160    I: Info + 'static,
161    S: Snapshot + 'static,
162{
163    consensus: Arc<C>,
164    mempool: Arc<M>,
165    info: Arc<I>,
166    snapshot: Arc<S>,
167    validator: Arc<Mutex<ConsensusStateValidator>>,
168}
169
170impl<C, M, I, S> Inner<C, M, I, S>
171where
172    C: Consensus + 'static,
173    M: Mempool + 'static,
174    I: Info + 'static,
175    S: Snapshot + 'static,
176{
177    pub fn new(consensus: C, mempool: M, info: I, snapshot: S) -> Self {
178        Self {
179            consensus: Arc::new(consensus),
180            mempool: Arc::new(mempool),
181            info: Arc::new(info),
182            snapshot: Arc::new(snapshot),
183            validator: Default::default(),
184        }
185    }
186
187    #[instrument(skip(self, stream_reader, stream_writer))]
188    async fn handle_connection<R, W>(
189        self: Arc<Self>,
190        mut stream_reader: StreamReader<R>,
191        mut stream_writer: StreamWriter<W>,
192        peer_addr: String,
193    ) where
194        R: Read + Unpin + Send + 'static,
195        W: Write + Unpin + Send + 'static,
196    {
197        info!(message = "In handle_connection");
198
199        loop {
200            match stream_reader.read().await {
201                Ok(request) => match request {
202                    Some(request) => {
203                        let (response, connection_type) = self.process(request).await;
204
205                        if let Err(err) = stream_writer.write(response).await {
206                            error!(message = "Error while writing to stream", %err);
207                        }
208
209                        if !matches!(connection_type, ConnectionType::Unknown) {
210                            self.spawn_connection(
211                                stream_reader,
212                                stream_writer,
213                                peer_addr,
214                                connection_type,
215                            );
216                            break;
217                        }
218                    }
219                    None => debug!(message = "Received empty request"),
220                },
221                Err(err) => {
222                    error!(message = "Error while receiving ABCI request from socket", %err);
223                    break;
224                }
225            }
226        }
227    }
228
229    #[instrument(skip(self, stream_reader, stream_writer))]
230    fn spawn_connection<R, W>(
231        &self,
232        stream_reader: StreamReader<R>,
233        stream_writer: StreamWriter<W>,
234        peer_addr: String,
235        connection_type: ConnectionType,
236    ) where
237        R: Read + Unpin + Send + 'static,
238        W: Write + Unpin + Send + 'static,
239    {
240        debug!("Spawning a new connection task");
241
242        match connection_type {
243            ConnectionType::Unknown => unreachable!(
244                "Connection type cannot be unknown when spawning a task for a connection type"
245            ),
246            ConnectionType::Consensus => spawn_consensus_task(
247                stream_reader,
248                stream_writer,
249                peer_addr,
250                self.consensus.clone(),
251                self.validator.clone(),
252            ),
253            ConnectionType::Mempool => spawn_mempool_task(
254                stream_reader,
255                stream_writer,
256                peer_addr,
257                self.mempool.clone(),
258            ),
259            ConnectionType::Info => spawn_info_task(
260                stream_reader,
261                stream_writer,
262                peer_addr,
263                self.info.clone(),
264                self.validator.clone(),
265            ),
266            ConnectionType::Snapshot => spawn_snapshot_task(
267                stream_reader,
268                stream_writer,
269                peer_addr,
270                self.snapshot.clone(),
271            ),
272        }
273    }
274
275    #[instrument(skip(self))]
276    async fn process(&self, request: Request) -> (Response, ConnectionType) {
277        match request.value {
278            None => {
279                debug!(message = "Received empty value in request", ?request);
280
281                (Response::default(), ConnectionType::default())
282            }
283            Some(request_value) => {
284                let connection_type = ConnectionType::from(&request_value);
285
286                let response = match connection_type {
287                    ConnectionType::Unknown => handle_unknown_request(request_value),
288                    ConnectionType::Consensus => {
289                        handle_consensus_request(
290                            self.consensus.as_ref(),
291                            self.validator.clone(),
292                            request_value,
293                        )
294                        .await
295                    }
296                    ConnectionType::Mempool => {
297                        handle_mempool_request(self.mempool.as_ref(), request_value).await
298                    }
299                    ConnectionType::Info => {
300                        handle_info_request(
301                            self.info.as_ref(),
302                            self.validator.clone(),
303                            request_value,
304                        )
305                        .await
306                    }
307                    ConnectionType::Snapshot => {
308                        handle_snapshot_request(self.snapshot.as_ref(), request_value).await
309                    }
310                };
311
312                (response, connection_type)
313            }
314        }
315    }
316}