async_abci/serverxx/
smol_impl.rs1use smol::{
2 io::{AsyncRead, AsyncWrite},
3 net::{AsyncToSocketAddrs, TcpListener},
4};
5
6use tm_abci::{request, response, ApplicationXX, Request, Response};
7
8use crate::{
9 codec::{ICodec, OCodec},
10 state::ConsensusQueue,
11 Error, Result,
12};
13
14pub struct ServerXX<App> {
16 tcp: Option<TcpListener>,
17 #[cfg(unix)]
18 unix: Option<smol::net::unix::UnixListener>,
19 app: App,
20}
21
22impl<App> ServerXX<App>
23where
24 App: ApplicationXX + Clone + 'static,
25{
26 pub fn new(app: App) -> Self {
27 Self {
28 tcp: None,
29
30 #[cfg(unix)]
31 unix: None,
32
33 app,
34 }
35 }
36
37 pub async fn bind<A: AsyncToSocketAddrs>(mut self, addr: A) -> Result<Self> {
38 let tcp = TcpListener::bind(addr).await?;
39 self.tcp = Some(tcp);
40
41 Ok(self)
42 }
43
44 #[cfg(unix)]
45 pub async fn bind_unix<P: AsRef<std::path::Path>>(mut self, path: P) -> Result<Self> {
46 let unix = smol::net::unix::UnixListener::bind(path)?;
47 self.unix = Some(unix);
48
49 Ok(self)
50 }
51
52 pub async fn run(self) -> Result<()> {
53 if let Some(tcp) = self.tcp {
54 loop {
55
56 let (socket, addr) = tcp.accept().await?;
57 log::info!("new connect from {:?}", addr);
58
59 smol::spawn(conn_handle(socket.clone(), socket, self.app.clone())).detach();
60 }
61 }
62
63 #[cfg(unix)]
64 if let Some(unix) = self.unix {
65 loop {
66 let (socket, addr) = unix.accept().await?;
67 log::info!("new connect from {:?}", addr);
68
69 smol::spawn(conn_handle(socket.clone(), socket, self.app.clone())).detach();
70
71 }
72 }
73
74 Err(Error::ServerNotBinding)
75 }
76}
77
78async fn send_flush<W>(ocodec: &mut OCodec<W>)
79where
80 W: AsyncWrite + Unpin + Sync + Send + 'static,
81{
82 let flush = Response {
83 value: Some(response::Value::Flush(Default::default())),
84 };
85 log::info!("Send: {:?}", flush);
86 ocodec.send(flush).await.expect("Failed to send data");
87}
88
89async fn send_response<W>(ocodec: &mut OCodec<W>, resp: Response)
90where
91 W: AsyncWrite + Unpin + Sync + Send + 'static,
92{
93 log::info!("Send: {:?}", resp);
94 ocodec.send(resp).await.expect("Failed to send data");
95}
96
97async fn conn_handle<A, R, W>(reader: R, writer: W, app: A)
98where
99 R: AsyncRead + Unpin + Sync + Send + 'static,
100 W: AsyncWrite + Unpin + Sync + Send + 'static,
101 A: ApplicationXX + Clone + 'static,
102{
103 let mut state: Option<ConsensusQueue> = None;
104 let mut icodec = ICodec::new(reader, 4096);
105 let mut ocodec = OCodec::new(writer);
106 loop {
107 let pkt = icodec.next().await;
108
109 match pkt {
110 Some(Ok(p)) => {
111 log::info!("Recv: {:?}", p);
112
113 if state.is_none() && ConsensusQueue::is_consensus(&p) {
114 state = Some(ConsensusQueue::new(p.clone()).expect("Logic error"));
115 }
116
117 if let Some(st) = &mut state {
118 log::info!("State is: {:?}", st.state);
120 st.add_pkt(p).expect("Error state convert");
121
122 if st.flushed {
123 send_flush(&mut ocodec).await;
124 continue;
125 }
126
127 if st.is_deliver_block() {
128 let fbp = st.to_block().expect("Failed to build block");
130
131 let tx_len = fbp.transactions.len();
132
133 let res = app.finalized_block(fbp).await;
134
135 let filled_tx = tx_len - res.tx_receipt.len();
136
137 for tx in res.tx_receipt {
138 let value = Some(response::Value::DeliverTx(tx));
139 let resp = Response { value };
140
141 send_response(&mut ocodec, resp).await;
142 }
143
144 for _ in 0..filled_tx {
145 let value = Some(response::Value::DeliverTx(Default::default()));
146 let resp = Response { value };
147
148 send_response(&mut ocodec, resp).await;
149 }
150
151 let value = Some(response::Value::EndBlock(res.end_recepit));
152 let resp = Response { value };
153
154 send_response(&mut ocodec, resp).await;
155
156 send_flush(&mut ocodec).await;
157 }
158
159 if st.is_sendable() {
160 let request = st.to_packet().expect("Wrong state");
161 let resp = app.dispatch(request).await;
162
163 send_response(&mut ocodec, resp).await;
164
165 send_flush(&mut ocodec).await;
166 }
167
168 if st.is_begin_block_flush() {
169 let resp = Response {
170 value: Some(response::Value::BeginBlock(Default::default())),
171 };
172
173 send_response(&mut ocodec, resp).await;
174
175 send_flush(&mut ocodec).await;
176 }
177
178 if st.is_commit_flush() {
179 let req = Request {
180 value: Some(request::Value::Commit(Default::default())),
181 };
182 let resp = app.dispatch(req).await;
183
184 send_response(&mut ocodec, resp).await;
185
186 send_flush(&mut ocodec).await;
187 }
188 } else {
189 let res = app.dispatch(p).await;
191 log::info!("Send: {:?}", res);
192 ocodec.send(res).await.expect("Failed to send data");
193 }
194 }
195 Some(Err(e)) => {
196 log::info!("Failed to read incoming request: {:?}", e);
197 }
198 None => {}
199 }
200 }
201}