async_abci/serverxx/
smol_impl.rs

1use smol::{
2    io::{AsyncRead, AsyncWrite},
3    net::{AsyncToSocketAddrs, TcpListener},
4};
5
6use tm_abci::{request, response, ApplicationXX, Request, Response};
7
8use crate::{
9    codec::{ICodec, OCodec},
10    state::ConsensusQueue,
11    Error, Result,
12};
13
14/// ACBI Server.
15pub struct ServerXX<App> {
16    tcp: Option<TcpListener>,
17    #[cfg(unix)]
18    unix: Option<smol::net::unix::UnixListener>,
19    app: App,
20}
21
22impl<App> ServerXX<App>
23where
24    App: ApplicationXX + Clone + 'static,
25{
26    pub fn new(app: App) -> Self {
27        Self {
28            tcp: None,
29
30            #[cfg(unix)]
31            unix: None,
32
33            app,
34        }
35    }
36
37    pub async fn bind<A: AsyncToSocketAddrs>(mut self, addr: A) -> Result<Self> {
38        let tcp = TcpListener::bind(addr).await?;
39        self.tcp = Some(tcp);
40
41        Ok(self)
42    }
43
44    #[cfg(unix)]
45    pub async fn bind_unix<P: AsRef<std::path::Path>>(mut self, path: P) -> Result<Self> {
46        let unix = smol::net::unix::UnixListener::bind(path)?;
47        self.unix = Some(unix);
48
49        Ok(self)
50    }
51
52    pub async fn run(self) -> Result<()> {
53        if let Some(tcp) = self.tcp {
54            loop {
55
56            let (socket, addr) = tcp.accept().await?;
57            log::info!("new connect from {:?}", addr);
58
59            smol::spawn(conn_handle(socket.clone(), socket, self.app.clone())).detach();
60            }
61        }
62
63        #[cfg(unix)]
64        if let Some(unix) = self.unix {
65            loop {
66            let (socket, addr) = unix.accept().await?;
67            log::info!("new connect from {:?}", addr);
68
69            smol::spawn(conn_handle(socket.clone(), socket, self.app.clone())).detach();
70
71            }
72        }
73
74        Err(Error::ServerNotBinding)
75    }
76}
77
78async fn send_flush<W>(ocodec: &mut OCodec<W>)
79where
80    W: AsyncWrite + Unpin + Sync + Send + 'static,
81{
82    let flush = Response {
83        value: Some(response::Value::Flush(Default::default())),
84    };
85    log::info!("Send: {:?}", flush);
86    ocodec.send(flush).await.expect("Failed to send data");
87}
88
89async fn send_response<W>(ocodec: &mut OCodec<W>, resp: Response)
90where
91    W: AsyncWrite + Unpin + Sync + Send + 'static,
92{
93    log::info!("Send: {:?}", resp);
94    ocodec.send(resp).await.expect("Failed to send data");
95}
96
97async fn conn_handle<A, R, W>(reader: R, writer: W, app: A)
98where
99    R: AsyncRead + Unpin + Sync + Send + 'static,
100    W: AsyncWrite + Unpin + Sync + Send + 'static,
101    A: ApplicationXX + Clone + 'static,
102{
103    let mut state: Option<ConsensusQueue> = None;
104    let mut icodec = ICodec::new(reader, 4096);
105    let mut ocodec = OCodec::new(writer);
106    loop {
107        let pkt = icodec.next().await;
108
109        match pkt {
110            Some(Ok(p)) => {
111                log::info!("Recv: {:?}", p);
112
113                if state.is_none() && ConsensusQueue::is_consensus(&p) {
114                    state = Some(ConsensusQueue::new(p.clone()).expect("Logic error"));
115                }
116
117                if let Some(st) = &mut state {
118                    // do logic of based on state.
119                    log::info!("State is: {:?}", st.state);
120                    st.add_pkt(p).expect("Error state convert");
121
122                    if st.flushed {
123                        send_flush(&mut ocodec).await;
124                        continue;
125                    }
126
127                    if st.is_deliver_block() {
128                        // do appxx
129                        let fbp = st.to_block().expect("Failed to build block");
130
131                        let tx_len = fbp.transactions.len();
132
133                        let res = app.finalized_block(fbp).await;
134
135                        let filled_tx = tx_len - res.tx_receipt.len();
136
137                        for tx in res.tx_receipt {
138                            let value = Some(response::Value::DeliverTx(tx));
139                            let resp = Response { value };
140
141                            send_response(&mut ocodec, resp).await;
142                        }
143
144                        for _ in 0..filled_tx {
145                            let value = Some(response::Value::DeliverTx(Default::default()));
146                            let resp = Response { value };
147
148                            send_response(&mut ocodec, resp).await;
149                        }
150
151                        let value = Some(response::Value::EndBlock(res.end_recepit));
152                        let resp = Response { value };
153
154                        send_response(&mut ocodec, resp).await;
155
156                        send_flush(&mut ocodec).await;
157                    }
158
159                    if st.is_sendable() {
160                        let request = st.to_packet().expect("Wrong state");
161                        let resp = app.dispatch(request).await;
162
163                        send_response(&mut ocodec, resp).await;
164
165                        send_flush(&mut ocodec).await;
166                    }
167
168                    if st.is_begin_block_flush() {
169                        let resp = Response {
170                            value: Some(response::Value::BeginBlock(Default::default())),
171                        };
172
173                        send_response(&mut ocodec, resp).await;
174
175                        send_flush(&mut ocodec).await;
176                    }
177
178                    if st.is_commit_flush() {
179                        let req = Request {
180                            value: Some(request::Value::Commit(Default::default())),
181                        };
182                        let resp = app.dispatch(req).await;
183
184                        send_response(&mut ocodec, resp).await;
185
186                        send_flush(&mut ocodec).await;
187                    }
188                } else {
189                    // do appxx
190                    let res = app.dispatch(p).await;
191                    log::info!("Send: {:?}", res);
192                    ocodec.send(res).await.expect("Failed to send data");
193                }
194            }
195            Some(Err(e)) => {
196                log::info!("Failed to read incoming request: {:?}", e);
197            }
198            None => {}
199        }
200    }
201}