Skip to main content

snap_coin/api/
api_server.rs

1use futures::io;
2use log::{error, info, warn};
3use thiserror::Error;
4use tokio::{
5    io::AsyncWriteExt,
6    net::{TcpListener, TcpStream},
7};
8
9use crate::{
10    api::requests::{Request, Response},
11    blockchain_data_provider::BlockchainDataProvider,
12    core::{
13        difficulty::calculate_live_transaction_difficulty, transaction::TransactionError,
14        utils::slice_vec,
15    },
16    economics::get_block_reward,
17    full_node::{SharedBlockchain, accept_block, accept_transaction, node_state::SharedNodeState},
18};
19
20pub const PAGE_SIZE: u32 = 200;
21
22#[derive(Error, Debug)]
23pub enum ApiError {
24    #[error("{0}")]
25    IOError(#[from] io::Error),
26}
27
28/// Server for hosting a Snap Coin API
29pub struct Server {
30    port: u32,
31    blockchain: SharedBlockchain,
32    node_state: SharedNodeState,
33}
34
35impl Server {
36    /// Create a new server, do not listen for connections yet
37    pub fn new(port: u32, blockchain: SharedBlockchain, node_state: SharedNodeState) -> Self {
38        Server {
39            port,
40            blockchain,
41            node_state,
42        }
43    }
44
45    /// Handle a incoming connection
46    async fn connection(
47        mut stream: TcpStream,
48        blockchain: SharedBlockchain,
49        node_state: SharedNodeState,
50    ) {
51        loop {
52            if let Err(e) = async {
53                let request = Request::decode_from_stream(&mut stream).await?;
54                let response = match request {
55                    Request::Height => Response::Height {
56                        height: blockchain.block_store().get_height() as u64,
57                    },
58                    Request::Block { block_hash } => Response::Block {
59                        block: blockchain.block_store().get_block_by_hash(block_hash),
60                    },
61                    Request::BlockHash { height } => Response::BlockHash {
62                        hash: blockchain
63                            .block_store()
64                            .get_block_hash_by_height(height as usize),
65                    },
66                    Request::Transaction { transaction_id } => Response::Transaction {
67                        transaction: blockchain.block_store().get_transaction(transaction_id),
68                    },
69                    Request::TransactionAndInfo { transaction_id } => {
70                        Response::TransactionAndInfo {
71                            transaction_and_info: blockchain
72                                .block_store()
73                                .get_transaction_and_info(transaction_id),
74                        }
75                    }
76                    Request::TransactionsOfAddress {
77                        address,
78                        page: requested_page,
79                    } => {
80                        let start = (requested_page * PAGE_SIZE) as usize;
81                        let end = start + PAGE_SIZE as usize;
82
83                        let mut transactions = Vec::with_capacity(PAGE_SIZE as usize);
84                        let mut seen = 0usize;
85                        let mut has_more = false;
86
87                        'outer: for block in blockchain.block_store().iter_blocks().rev() {
88                            let block = block?;
89
90                            for tx in block.transactions {
91                                if tx.contains_address(address) {
92                                    if seen >= start && seen < end {
93                                        transactions.push(
94                                            tx.transaction_id.ok_or(TransactionError::MissingId)?,
95                                        );
96                                    }
97
98                                    seen += 1;
99
100                                    if seen >= end {
101                                        has_more = true;
102                                        break 'outer;
103                                    }
104                                }
105                            }
106                        }
107
108                        let next_page = if has_more {
109                            Some(requested_page + 1)
110                        } else {
111                            None
112                        };
113
114                        Response::TransactionsOfAddress {
115                            transactions,
116                            next_page,
117                        }
118                    }
119
120                    Request::AvailableUTXOs {
121                        address,
122                        page: requested_page,
123                    } => {
124                        let available = blockchain
125                            .get_available_transaction_outputs(address)
126                            .await?;
127                        let page = slice_vec(
128                            &available,
129                            (requested_page * PAGE_SIZE) as usize,
130                            ((requested_page + 1) * PAGE_SIZE) as usize,
131                        );
132                        let next_page = if page.len() != PAGE_SIZE as usize {
133                            None
134                        } else {
135                            Some(requested_page + 1)
136                        };
137                        Response::AvailableUTXOs {
138                            available_inputs: page.to_vec(),
139                            next_page,
140                        }
141                    }
142                    Request::Balance { address } => Response::Balance {
143                        balance: blockchain.get_utxos().calculate_confirmed_balance(address),
144                    },
145                    Request::Reward => Response::Reward {
146                        reward: get_block_reward(blockchain.block_store().get_height()),
147                    },
148                    Request::Peers => {
149                        let peers = node_state
150                            .connected_peers
151                            .read()
152                            .await
153                            .iter()
154                            .map(|peer| *peer.0)
155                            .collect();
156                        Response::Peers { peers }
157                    }
158                    Request::Mempool {
159                        page: requested_page,
160                    } => {
161                        let mempool = node_state.mempool.get_mempool().await;
162                        let page = slice_vec(
163                            &mempool,
164                            (requested_page * PAGE_SIZE) as usize,
165                            ((requested_page + 1) * PAGE_SIZE) as usize,
166                        );
167                        let next_page = if page.len() != PAGE_SIZE as usize {
168                            None
169                        } else {
170                            Some(requested_page + 1)
171                        };
172
173                        Response::Mempool {
174                            mempool: page.to_vec(),
175                            next_page,
176                        }
177                    }
178                    Request::NewBlock { new_block } => Response::NewBlock {
179                        status: accept_block(&blockchain, &node_state, new_block).await,
180                    },
181                    Request::NewTransaction { new_transaction } => Response::NewTransaction {
182                        status: accept_transaction(&blockchain, &node_state, new_transaction).await,
183                    },
184                    Request::Difficulty => Response::Difficulty {
185                        transaction_difficulty: blockchain.get_transaction_difficulty(),
186                        block_difficulty: blockchain.get_block_difficulty(),
187                    },
188                    Request::BlockHeight { hash } => Response::BlockHeight {
189                        height: blockchain.block_store().get_block_height_by_hash(hash),
190                    },
191                    Request::LiveTransactionDifficulty => Response::LiveTransactionDifficulty {
192                        live_difficulty: calculate_live_transaction_difficulty(
193                            &blockchain.get_transaction_difficulty(),
194                            node_state.mempool.mempool_size().await,
195                        ),
196                    },
197                    Request::SubscribeToChainEvents => {
198                        let mut rx = node_state.chain_events.subscribe();
199                        // Start event stream task
200                        loop {
201                            match rx.recv().await {
202                                Ok(event) => {
203                                    let response = Response::ChainEvent { event };
204                                    stream.write_all(&response.encode()?).await?;
205                                }
206                                Err(_) => break,
207                            }
208                        }
209
210                        // Stop request response task
211                        return Ok(());
212                    }
213                };
214                let response_buf = response.encode()?;
215
216                stream.write_all(&response_buf).await?;
217
218                Ok::<(), anyhow::Error>(())
219            }
220            .await
221            {
222                warn!("API client error: {}", e);
223                break;
224            }
225        }
226    }
227
228    /// Start listening for clients
229    pub async fn listen(self) -> Result<(), ApiError> {
230        let listener = match TcpListener::bind(format!("0.0.0.0:{}", self.port)).await {
231            Ok(l) => l,
232            Err(_) => TcpListener::bind("0.0.0.0:0").await?,
233        };
234        info!(
235            "API Server listening on 0.0.0.0:{}",
236            listener.local_addr()?.port()
237        );
238
239        tokio::spawn(async move {
240            loop {
241                if let Err(e) = async {
242                    let (stream, _) = listener.accept().await?;
243
244                    tokio::spawn(Self::connection(
245                        stream,
246                        self.blockchain.clone(),
247                        self.node_state.clone(),
248                    ));
249
250                    Ok::<(), ApiError>(())
251                }
252                .await
253                {
254                    error!("API client failed to connect: {e}")
255                }
256            }
257        });
258
259        Ok(())
260    }
261}