1use futures::io;
2use log::{error, info, warn};
3use thiserror::Error;
4use tokio::{
5 io::AsyncWriteExt,
6 net::{TcpListener, TcpStream},
7};
8
9use crate::{
10 api::requests::{Request, Response},
11 blockchain_data_provider::BlockchainDataProvider,
12 core::{
13 difficulty::calculate_live_transaction_difficulty, transaction::TransactionError,
14 utils::slice_vec,
15 },
16 economics::get_block_reward,
17 full_node::{SharedBlockchain, accept_block, accept_transaction, node_state::SharedNodeState},
18};
19
/// Number of items in one page of a paginated API response.
///
/// The paginated request handlers (`TransactionsOfAddress`, `AvailableUTXOs`,
/// `Mempool`) multiply the requested page number by this value to find the
/// first item of the page, and treat a page holding exactly this many items
/// as a sign that a following page may exist.
pub const PAGE_SIZE: u32 = 200;
21
/// Error type returned by [`Server::listen`].
#[derive(Error, Debug)]
pub enum ApiError {
    /// An I/O operation failed. The display text is that of the wrapped
    /// error, and `?` converts an `io::Error` into this variant via `#[from]`.
    #[error("{0}")]
    IOError(#[from] io::Error),
}
27
/// TCP server that answers API [`Request`]s using the shared blockchain and
/// node state it was constructed with.
pub struct Server {
    /// Port that [`Server::listen`] tries to bind first.
    port: u32,
    /// Blockchain handle; a clone is handed to every accepted connection.
    blockchain: SharedBlockchain,
    /// Node state handle; a clone is handed to every accepted connection.
    node_state: SharedNodeState,
}
34
35impl Server {
36 pub fn new(port: u32, blockchain: SharedBlockchain, node_state: SharedNodeState) -> Self {
38 Server {
39 port,
40 blockchain,
41 node_state,
42 }
43 }
44
45 async fn connection(
47 mut stream: TcpStream,
48 blockchain: SharedBlockchain,
49 node_state: SharedNodeState,
50 ) {
51 loop {
52 if let Err(e) = async {
53 let request = Request::decode_from_stream(&mut stream).await?;
54 let response = match request {
55 Request::Height => Response::Height {
56 height: blockchain.block_store().get_height() as u64,
57 },
58 Request::Block { block_hash } => Response::Block {
59 block: blockchain.block_store().get_block_by_hash(block_hash),
60 },
61 Request::BlockHash { height } => Response::BlockHash {
62 hash: blockchain
63 .block_store()
64 .get_block_hash_by_height(height as usize),
65 },
66 Request::Transaction { transaction_id } => Response::Transaction {
67 transaction: blockchain.block_store().get_transaction(transaction_id),
68 },
69 Request::TransactionAndInfo { transaction_id } => {
70 Response::TransactionAndInfo {
71 transaction_and_info: blockchain
72 .block_store()
73 .get_transaction_and_info(transaction_id),
74 }
75 }
76 Request::TransactionsOfAddress {
77 address,
78 page: requested_page,
79 } => {
80 let start = (requested_page * PAGE_SIZE) as usize;
81 let end = start + PAGE_SIZE as usize;
82
83 let mut transactions = Vec::with_capacity(PAGE_SIZE as usize);
84 let mut seen = 0usize;
85 let mut has_more = false;
86
87 'outer: for block in blockchain.block_store().iter_blocks().rev() {
88 let block = block?;
89
90 for tx in block.transactions {
91 if tx.contains_address(address) {
92 if seen >= start && seen < end {
93 transactions.push(
94 tx.transaction_id.ok_or(TransactionError::MissingId)?,
95 );
96 }
97
98 seen += 1;
99
100 if seen >= end {
101 has_more = true;
102 break 'outer;
103 }
104 }
105 }
106 }
107
108 let next_page = if has_more {
109 Some(requested_page + 1)
110 } else {
111 None
112 };
113
114 Response::TransactionsOfAddress {
115 transactions,
116 next_page,
117 }
118 }
119
120 Request::AvailableUTXOs {
121 address,
122 page: requested_page,
123 } => {
124 let available = blockchain
125 .get_available_transaction_outputs(address)
126 .await?;
127 let page = slice_vec(
128 &available,
129 (requested_page * PAGE_SIZE) as usize,
130 ((requested_page + 1) * PAGE_SIZE) as usize,
131 );
132 let next_page = if page.len() != PAGE_SIZE as usize {
133 None
134 } else {
135 Some(requested_page + 1)
136 };
137 Response::AvailableUTXOs {
138 available_inputs: page.to_vec(),
139 next_page,
140 }
141 }
142 Request::Balance { address } => Response::Balance {
143 balance: blockchain.get_utxos().calculate_confirmed_balance(address),
144 },
145 Request::Reward => Response::Reward {
146 reward: get_block_reward(blockchain.block_store().get_height()),
147 },
148 Request::Peers => {
149 let peers = node_state
150 .connected_peers
151 .read()
152 .await
153 .iter()
154 .map(|peer| *peer.0)
155 .collect();
156 Response::Peers { peers }
157 }
158 Request::Mempool {
159 page: requested_page,
160 } => {
161 let mempool = node_state.mempool.get_mempool().await;
162 let page = slice_vec(
163 &mempool,
164 (requested_page * PAGE_SIZE) as usize,
165 ((requested_page + 1) * PAGE_SIZE) as usize,
166 );
167 let next_page = if page.len() != PAGE_SIZE as usize {
168 None
169 } else {
170 Some(requested_page + 1)
171 };
172
173 Response::Mempool {
174 mempool: page.to_vec(),
175 next_page,
176 }
177 }
178 Request::NewBlock { new_block } => Response::NewBlock {
179 status: accept_block(&blockchain, &node_state, new_block).await,
180 },
181 Request::NewTransaction { new_transaction } => Response::NewTransaction {
182 status: accept_transaction(&blockchain, &node_state, new_transaction).await,
183 },
184 Request::Difficulty => Response::Difficulty {
185 transaction_difficulty: blockchain.get_transaction_difficulty(),
186 block_difficulty: blockchain.get_block_difficulty(),
187 },
188 Request::BlockHeight { hash } => Response::BlockHeight {
189 height: blockchain.block_store().get_block_height_by_hash(hash),
190 },
191 Request::LiveTransactionDifficulty => Response::LiveTransactionDifficulty {
192 live_difficulty: calculate_live_transaction_difficulty(
193 &blockchain.get_transaction_difficulty(),
194 node_state.mempool.mempool_size().await,
195 ),
196 },
197 Request::SubscribeToChainEvents => {
198 let mut rx = node_state.chain_events.subscribe();
199 loop {
201 match rx.recv().await {
202 Ok(event) => {
203 let response = Response::ChainEvent { event };
204 stream.write_all(&response.encode()?).await?;
205 }
206 Err(_) => break,
207 }
208 }
209
210 return Ok(());
212 }
213 };
214 let response_buf = response.encode()?;
215
216 stream.write_all(&response_buf).await?;
217
218 Ok::<(), anyhow::Error>(())
219 }
220 .await
221 {
222 warn!("API client error: {}", e);
223 break;
224 }
225 }
226 }
227
228 pub async fn listen(self) -> Result<(), ApiError> {
230 let listener = match TcpListener::bind(format!("0.0.0.0:{}", self.port)).await {
231 Ok(l) => l,
232 Err(_) => TcpListener::bind("0.0.0.0:0").await?,
233 };
234 info!(
235 "API Server listening on 0.0.0.0:{}",
236 listener.local_addr()?.port()
237 );
238
239 tokio::spawn(async move {
240 loop {
241 if let Err(e) = async {
242 let (stream, _) = listener.accept().await?;
243
244 tokio::spawn(Self::connection(
245 stream,
246 self.blockchain.clone(),
247 self.node_state.clone(),
248 ));
249
250 Ok::<(), ApiError>(())
251 }
252 .await
253 {
254 error!("API client failed to connect: {e}")
255 }
256 }
257 });
258
259 Ok(())
260 }
261}