1use std::{io::Result, sync::Arc};
2
3#[cfg(all(unix, feature = "use-async-std"))]
4use async_std::os::unix::net::UnixListener;
5#[cfg(feature = "use-async-std")]
6use async_std::{
7 io::{Read, Write},
8 net::TcpListener,
9 sync::Mutex,
10 task::spawn,
11};
12#[cfg(all(unix, feature = "use-smol"))]
13use smol::net::unix::UnixListener;
14#[cfg(feature = "use-smol")]
15use smol::{
16 io::{AsyncRead as Read, AsyncWrite as Write},
17 lock::Mutex,
18 net::TcpListener,
19 spawn,
20};
21use tendermint_proto::abci::{Request, Response};
22#[cfg(all(unix, feature = "use-tokio"))]
23use tokio::net::UnixListener;
24#[cfg(feature = "use-tokio")]
25use tokio::{
26 io::{AsyncRead as Read, AsyncWrite as Write},
27 net::TcpListener,
28 spawn,
29 sync::Mutex,
30};
31use tracing::{debug, error, info, instrument};
32
33use crate::{
34 address::Address,
35 async_api::{Consensus, Info, Mempool, Snapshot},
36 handler::*,
37 state::ConsensusStateValidator,
38 stream_split::StreamSplit,
39 tasks::*,
40 utils::{get_stream_pair, ConnectionType, StreamReader, StreamWriter},
41};
42
43macro_rules! spawn {
44 ($expr: expr) => {
45 cfg_if::cfg_if! {
46 if #[cfg(any(feature = "use-async-std", feature = "use-tokio"))] {
47 spawn($expr)
48 } else if #[cfg(feature = "use-smol")] {
49 spawn($expr)
50 .detach()
51 } else {
52 unreachable!()
53 }
54 }
55 };
56}
57
58pub struct Server<C, M, I, S>
60where
61 C: Consensus + 'static,
62 M: Mempool + 'static,
63 I: Info + 'static,
64 S: Snapshot + 'static,
65{
66 inner: Arc<Inner<C, M, I, S>>,
69}
70
71impl<C, M, I, S> Server<C, M, I, S>
72where
73 C: Consensus + 'static,
74 M: Mempool + 'static,
75 I: Info + 'static,
76 S: Snapshot + 'static,
77{
78 pub fn new(consensus: C, mempool: M, info: I, snapshot: S) -> Self {
80 Self {
81 inner: Arc::new(Inner::new(consensus, mempool, info, snapshot)),
82 }
83 }
84
85 pub async fn run<T>(&self, addr: T) -> Result<()>
92 where
93 T: Into<Address>,
94 {
95 let addr = addr.into();
96
97 match addr {
98 Address::Tcp(addr) => {
99 let listener = TcpListener::bind(addr).await?;
100 info!(message = "Started ABCI server at", %addr);
101
102 loop {
103 let (stream, peer_addr) = listener.accept().await?;
104 self.handle_connection(stream, peer_addr.to_string());
105 }
106 }
107 #[cfg(unix)]
108 Address::Uds(path) => {
109 cfg_if::cfg_if! {
110 if #[cfg(feature = "use-async-std")] {
111 let listener = UnixListener::bind(&path).await?;
112 } else if #[cfg(any(feature = "use-smol", feature = "use-tokio"))] {
113 let listener = UnixListener::bind(&path)?;
114 } else {
115 unreachable!()
116 }
117 }
118
119 info!(message = "Started ABCI server at", path = %path.display());
120
121 loop {
122 let (stream, peer_addr) = listener.accept().await?;
123 self.handle_connection(stream, format!("{:?}", peer_addr));
124 }
125 }
126 #[cfg(test)]
127 Address::Mock(mut listener) => {
128 while let Ok(stream) = listener.accept().await {
129 self.handle_connection(stream, "test_peer".to_string());
130 }
131
132 Ok(())
133 }
134 }
135 }
136
137 #[instrument(skip(self, stream))]
138 pub(crate) fn handle_connection<D>(&self, stream: D, peer_addr: String)
139 where
140 D: StreamSplit,
141 {
142 info!("New peer connection");
143
144 let inner = self.inner.clone();
145 let (stream_reader, stream_writer) = get_stream_pair(stream);
146
147 spawn!(async move {
148 inner
149 .handle_connection(stream_reader, stream_writer, peer_addr)
150 .await
151 });
152 }
153}
154
155struct Inner<C, M, I, S>
157where
158 C: Consensus + 'static,
159 M: Mempool + 'static,
160 I: Info + 'static,
161 S: Snapshot + 'static,
162{
163 consensus: Arc<C>,
164 mempool: Arc<M>,
165 info: Arc<I>,
166 snapshot: Arc<S>,
167 validator: Arc<Mutex<ConsensusStateValidator>>,
168}
169
170impl<C, M, I, S> Inner<C, M, I, S>
171where
172 C: Consensus + 'static,
173 M: Mempool + 'static,
174 I: Info + 'static,
175 S: Snapshot + 'static,
176{
177 pub fn new(consensus: C, mempool: M, info: I, snapshot: S) -> Self {
178 Self {
179 consensus: Arc::new(consensus),
180 mempool: Arc::new(mempool),
181 info: Arc::new(info),
182 snapshot: Arc::new(snapshot),
183 validator: Default::default(),
184 }
185 }
186
187 #[instrument(skip(self, stream_reader, stream_writer))]
188 async fn handle_connection<R, W>(
189 self: Arc<Self>,
190 mut stream_reader: StreamReader<R>,
191 mut stream_writer: StreamWriter<W>,
192 peer_addr: String,
193 ) where
194 R: Read + Unpin + Send + 'static,
195 W: Write + Unpin + Send + 'static,
196 {
197 info!(message = "In handle_connection");
198
199 loop {
200 match stream_reader.read().await {
201 Ok(request) => match request {
202 Some(request) => {
203 let (response, connection_type) = self.process(request).await;
204
205 if let Err(err) = stream_writer.write(response).await {
206 error!(message = "Error while writing to stream", %err);
207 }
208
209 if !matches!(connection_type, ConnectionType::Unknown) {
210 self.spawn_connection(
211 stream_reader,
212 stream_writer,
213 peer_addr,
214 connection_type,
215 );
216 break;
217 }
218 }
219 None => debug!(message = "Received empty request"),
220 },
221 Err(err) => {
222 error!(message = "Error while receiving ABCI request from socket", %err);
223 break;
224 }
225 }
226 }
227 }
228
229 #[instrument(skip(self, stream_reader, stream_writer))]
230 fn spawn_connection<R, W>(
231 &self,
232 stream_reader: StreamReader<R>,
233 stream_writer: StreamWriter<W>,
234 peer_addr: String,
235 connection_type: ConnectionType,
236 ) where
237 R: Read + Unpin + Send + 'static,
238 W: Write + Unpin + Send + 'static,
239 {
240 debug!("Spawning a new connection task");
241
242 match connection_type {
243 ConnectionType::Unknown => unreachable!(
244 "Connection type cannot be unknown when spawning a task for a connection type"
245 ),
246 ConnectionType::Consensus => spawn_consensus_task(
247 stream_reader,
248 stream_writer,
249 peer_addr,
250 self.consensus.clone(),
251 self.validator.clone(),
252 ),
253 ConnectionType::Mempool => spawn_mempool_task(
254 stream_reader,
255 stream_writer,
256 peer_addr,
257 self.mempool.clone(),
258 ),
259 ConnectionType::Info => spawn_info_task(
260 stream_reader,
261 stream_writer,
262 peer_addr,
263 self.info.clone(),
264 self.validator.clone(),
265 ),
266 ConnectionType::Snapshot => spawn_snapshot_task(
267 stream_reader,
268 stream_writer,
269 peer_addr,
270 self.snapshot.clone(),
271 ),
272 }
273 }
274
275 #[instrument(skip(self))]
276 async fn process(&self, request: Request) -> (Response, ConnectionType) {
277 match request.value {
278 None => {
279 debug!(message = "Received empty value in request", ?request);
280
281 (Response::default(), ConnectionType::default())
282 }
283 Some(request_value) => {
284 let connection_type = ConnectionType::from(&request_value);
285
286 let response = match connection_type {
287 ConnectionType::Unknown => handle_unknown_request(request_value),
288 ConnectionType::Consensus => {
289 handle_consensus_request(
290 self.consensus.as_ref(),
291 self.validator.clone(),
292 request_value,
293 )
294 .await
295 }
296 ConnectionType::Mempool => {
297 handle_mempool_request(self.mempool.as_ref(), request_value).await
298 }
299 ConnectionType::Info => {
300 handle_info_request(
301 self.info.as_ref(),
302 self.validator.clone(),
303 request_value,
304 )
305 .await
306 }
307 ConnectionType::Snapshot => {
308 handle_snapshot_request(self.snapshot.as_ref(), request_value).await
309 }
310 };
311
312 (response, connection_type)
313 }
314 }
315 }
316}