tendermint_abci/
server.rs

1//! ABCI application server interface.
2
3use std::{
4    net::{TcpListener, TcpStream, ToSocketAddrs},
5    thread,
6};
7
8use tracing::{error, info};
9
10use crate::{application::RequestDispatcher, codec::ServerCodec, error::Error, Application};
11
/// Default read buffer size used for every incoming connection to the ABCI
/// server: `1024 * 1024` (1 MiB).
///
/// This is the value [`ServerBuilder::default`] configures.
pub const DEFAULT_SERVER_READ_BUF_SIZE: usize = 1 << 20;
15
/// Configuration holder from which an ABCI [`Server`] is constructed.
///
/// Create one with [`ServerBuilder::new`] (explicit read buffer size) or
/// [`Default::default`], then call [`ServerBuilder::bind`] to obtain a
/// server bound to an address.
pub struct ServerBuilder {
    /// Read buffer size handed to the codec of each client connection.
    read_buf_size: usize,
}
20
21impl ServerBuilder {
22    /// Builder constructor.
23    ///
24    /// Allows you to specify the read buffer size used when reading chunks of
25    /// incoming data from the client. This needs to be tuned for your
26    /// application.
27    pub fn new(read_buf_size: usize) -> Self {
28        Self { read_buf_size }
29    }
30
31    /// Constructor for an ABCI server.
32    ///
33    /// Binds the server to the given address. You must subsequently call the
34    /// [`Server::listen`] method in order for incoming connections' requests
35    /// to be routed to the specified ABCI application.
36    pub fn bind<Addr, App>(self, addr: Addr, app: App) -> Result<Server<App>, Error>
37    where
38        Addr: ToSocketAddrs,
39        App: Application,
40    {
41        let listener = TcpListener::bind(addr).map_err(Error::io)?;
42        let local_addr = listener.local_addr().map_err(Error::io)?.to_string();
43        info!("ABCI server running at {}", local_addr);
44        Ok(Server {
45            app,
46            listener,
47            local_addr,
48            read_buf_size: self.read_buf_size,
49        })
50    }
51}
52
53impl Default for ServerBuilder {
54    fn default() -> Self {
55        Self {
56            read_buf_size: DEFAULT_SERVER_READ_BUF_SIZE,
57        }
58    }
59}
60
/// TCP server that serves one specific ABCI application.
///
/// A dedicated thread is spawned for every accepted connection, and each of
/// those threads works on its own clone of the application. Coordinating any
/// state shared between those clones is the responsibility of the
/// application developer.
pub struct Server<App> {
    /// Application instance that is cloned for every client thread.
    app: App,
    /// Bound listener from which connections are accepted.
    listener: TcpListener,
    /// Textual form of the listener's local address, captured at bind time.
    local_addr: String,
    /// Read buffer size handed to the codec of each client connection.
    read_buf_size: usize,
}
73
74impl<App: Application> Server<App> {
75    /// Initiate a blocking listener for incoming connections.
76    pub fn listen(self) -> Result<(), Error> {
77        loop {
78            let (stream, addr) = self.listener.accept().map_err(Error::io)?;
79            let addr = addr.to_string();
80            info!("Incoming connection from: {}", addr);
81            self.spawn_client_handler(stream, addr);
82        }
83    }
84
85    /// Getter for this server's local address.
86    pub fn local_addr(&self) -> String {
87        self.local_addr.clone()
88    }
89
90    fn spawn_client_handler(&self, stream: TcpStream, addr: String) {
91        let app = self.app.clone();
92        let read_buf_size = self.read_buf_size;
93        let _ = thread::spawn(move || Self::handle_client(stream, addr, app, read_buf_size));
94    }
95
96    fn handle_client(stream: TcpStream, addr: String, app: App, read_buf_size: usize) {
97        let mut codec = ServerCodec::new(stream, read_buf_size);
98        info!("Listening for incoming requests from {}", addr);
99        loop {
100            let request = match codec.next() {
101                Some(result) => match result {
102                    Ok(r) => r,
103                    Err(e) => {
104                        error!(
105                            "Failed to read incoming request from client {}: {:?}",
106                            addr, e
107                        );
108                        return;
109                    },
110                },
111                None => {
112                    info!("Client {} terminated stream", addr);
113                    return;
114                },
115            };
116            let response = app.handle(request);
117            if let Err(e) = codec.send(response) {
118                error!("Failed sending response to client {}: {:?}", addr, e);
119                return;
120            }
121        }
122    }
123}