tendermint_abci/
server.rs1use std::{
4 net::{TcpListener, TcpStream, ToSocketAddrs},
5 thread,
6};
7
8use tracing::{error, info};
9
10use crate::{application::RequestDispatcher, codec::ServerCodec, error::Error, Application};
11
/// Read buffer size that [`ServerBuilder::default`] configures for each
/// client connection: 2^20, i.e. 1 MiB assuming the codec counts in bytes.
pub const DEFAULT_SERVER_READ_BUF_SIZE: usize = 1 << 20;
15
/// Configuration holder used to construct a [`Server`].
///
/// The only setting is the read buffer size, which `bind` copies into the
/// server it creates. `Debug` and `Clone` are derived so the builder can be
/// logged and reused by callers.
#[derive(Debug, Clone)]
pub struct ServerBuilder {
    /// Read buffer size passed on to every server built from this builder.
    read_buf_size: usize,
}
20
21impl ServerBuilder {
22 pub fn new(read_buf_size: usize) -> Self {
28 Self { read_buf_size }
29 }
30
31 pub fn bind<Addr, App>(self, addr: Addr, app: App) -> Result<Server<App>, Error>
37 where
38 Addr: ToSocketAddrs,
39 App: Application,
40 {
41 let listener = TcpListener::bind(addr).map_err(Error::io)?;
42 let local_addr = listener.local_addr().map_err(Error::io)?.to_string();
43 info!("ABCI server running at {}", local_addr);
44 Ok(Server {
45 app,
46 listener,
47 local_addr,
48 read_buf_size: self.read_buf_size,
49 })
50 }
51}
52
53impl Default for ServerBuilder {
54 fn default() -> Self {
55 Self {
56 read_buf_size: DEFAULT_SERVER_READ_BUF_SIZE,
57 }
58 }
59}
60
/// A TCP server that hands each accepted connection to its own thread and
/// answers incoming requests using the wrapped application.
///
/// Instances are created through `ServerBuilder::bind`.
pub struct Server<App> {
    /// Application that produces responses; cloned once per client connection.
    app: App,
    /// Bound listener from which client connections are accepted.
    listener: TcpListener,
    /// String form of the listener's local address, captured at bind time.
    local_addr: String,
    /// Read buffer size handed to the codec of every client connection.
    read_buf_size: usize,
}
73
74impl<App: Application> Server<App> {
75 pub fn listen(self) -> Result<(), Error> {
77 loop {
78 let (stream, addr) = self.listener.accept().map_err(Error::io)?;
79 let addr = addr.to_string();
80 info!("Incoming connection from: {}", addr);
81 self.spawn_client_handler(stream, addr);
82 }
83 }
84
85 pub fn local_addr(&self) -> String {
87 self.local_addr.clone()
88 }
89
90 fn spawn_client_handler(&self, stream: TcpStream, addr: String) {
91 let app = self.app.clone();
92 let read_buf_size = self.read_buf_size;
93 let _ = thread::spawn(move || Self::handle_client(stream, addr, app, read_buf_size));
94 }
95
96 fn handle_client(stream: TcpStream, addr: String, app: App, read_buf_size: usize) {
97 let mut codec = ServerCodec::new(stream, read_buf_size);
98 info!("Listening for incoming requests from {}", addr);
99 loop {
100 let request = match codec.next() {
101 Some(result) => match result {
102 Ok(r) => r,
103 Err(e) => {
104 error!(
105 "Failed to read incoming request from client {}: {:?}",
106 addr, e
107 );
108 return;
109 },
110 },
111 None => {
112 info!("Client {} terminated stream", addr);
113 return;
114 },
115 };
116 let response = app.handle(request);
117 if let Err(e) = codec.send(response) {
118 error!("Failed sending response to client {}: {:?}", addr, e);
119 return;
120 }
121 }
122 }
123}