chainseeker_server/
http_server.rs

1use std::str::FromStr;
2use std::time::Instant;
3use std::convert::Infallible;
4use std::net::SocketAddr;
5use std::sync::Arc;
6use std::collections::HashMap;
7use tokio::sync::RwLock;
8use hyper::{Body, Request, Response, Server, StatusCode};
9use routerify::prelude::*;
10use routerify::{Middleware, Router, RouterService};
11use bitcoin::hashes::hex::{FromHex, ToHex};
12use bitcoin::{Script, Address};
13use bitcoincore_rpc::{Auth, Client, RpcApi};
14
15use super::*;
16
17#[derive(Debug, Clone)]
18pub struct HttpServer {
19    coin: String,
20    config: Config,
21    // (height, RestBlockSummary)
22    block_summary_cache: Arc<RwLock<HashMap<u32, RestBlockSummary>>>,
23    pub synced_height_db: Arc<RwLock<SyncedHeightDB>>,
24    pub block_db: Arc<RwLock<BlockDB>>,
25    pub tx_db: Arc<RwLock<TxDB>>,
26    pub addr_index_db: Arc<RwLock<AddressIndexDB>>,
27    pub utxo_server: Arc<RwLock<UtxoServer>>,
28    pub rich_list: Arc<RwLock<RichList>>,
29}
30
31impl HttpServer {
32    pub fn new(coin: &str, config: &Config) -> Self {
33        Self{
34            coin: coin.to_string(),
35            config: (*config).clone(),
36            block_summary_cache: Arc::new(RwLock::new(HashMap::new())),
37            synced_height_db: Arc::new(RwLock::new(SyncedHeightDB::new(coin))),
38            block_db: Arc::new(RwLock::new(BlockDB::new(coin, false))),
39            tx_db: Arc::new(RwLock::new(TxDB::new(coin, false))),
40            addr_index_db: Arc::new(RwLock::new(AddressIndexDB::new(coin, false))),
41            utxo_server: Arc::new(RwLock::new(UtxoServer::new())),
42            rich_list: Arc::new(RwLock::new(RichList::new())),
43        }
44    }
45    fn response(status: &StatusCode, body: String, cacheable: bool) -> Response<Body> {
46        let builder = Response::builder();
47        let builder = if cacheable {
48            builder
49                .header("Cache-Control", "public, max-age=60, s-maxage=86400")
50                .header("CDN-Cache-Control", "max-age=86400")
51        } else {
52            builder
53                .header("Cache-Control", "no-store")
54                .header("CDN-Cache-Control", "no-store")
55        };
56        builder
57            .header("Content-Type", "application/json")
58            .header(hyper::header::ACCESS_CONTROL_ALLOW_ORIGIN, "*")
59            .status(status)
60            .body(body.into())
61            .unwrap()
62    }
63    fn error(status: &StatusCode, msg: &str) -> Response<Body> {
64        Self::response(status, format!("{{\"error\":\"{}\"}}", msg), false)
65    }
66    pub fn not_found(msg: &str) -> Response<Body> {
67        Self::error(&StatusCode::NOT_FOUND, msg)
68    }
69    pub fn bad_request(msg: &str) -> Response<Body> {
70        Self::error(&StatusCode::BAD_REQUEST, msg)
71    }
72    pub fn internal_error(msg: &str) -> Response<Body> {
73        Self::error(&StatusCode::INTERNAL_SERVER_ERROR, msg)
74    }
75    pub fn ok(json: String, cacheable: bool) -> Response<Body> {
76        Self::response(&StatusCode::OK, json, cacheable)
77    }
78    pub fn json<S>(object: S, cacheable: bool) -> Response<Body>
79        where S: serde::ser::Serialize,
80    {
81        let json = serde_json::to_string(&object);
82        match json {
83            Ok(json) => Self::ok(json, cacheable),
84            Err(_) => Self::internal_error("Failed to encode to JSON."),
85        }
86    }
87    /// `/status` endpoint.
88    async fn status_handler(req: Request<Body>) -> Result<Response<Body>, Infallible> {
89        let server = req.data::<HttpServer>().unwrap();
90        Ok(Self::json(RestStatus { blocks: server.synced_height_db.read().await.get().map_or(-1, |h| h as i32) }, false))
91    }
92    /// `/tx/:txid` endpoint.
93    async fn tx_handler(req: Request<Body>) -> Result<Response<Body>, Infallible> {
94        let server = req.data::<HttpServer>().unwrap();
95        let txid = match req.param("txid").unwrap().parse() {
96            Ok(txid) => txid,
97            Err(_) => return Ok(Self::not_found("Failed to decode txid.")),
98        };
99        match server.tx_db.read().await.get_as_rest(&txid, &server.config) {
100            Some(tx) => {
101                let cacheable = tx.confirmed_height.is_some();
102                Ok(Self::json(tx, cacheable))
103            },
104            None => Ok(Self::not_found("Transaction not found.")),
105        }
106    }
107    /// `/tx/broadcast` endpoint.
108    async fn tx_broadcast_handler(req: Request<Body>) -> Result<Response<Body>, Infallible> {
109        let server = req.data::<HttpServer>().unwrap();
110        let auth = Auth::UserPass(server.config.rpc_user.clone(), server.config.rpc_pass.clone());
111        let rpc = Client::new(server.config.rpc_endpoint.clone(), auth).unwrap();
112        let hex = hyper::body::to_bytes(req.into_body()).await.unwrap();
113        let hex = String::from_utf8(hex.to_vec());
114        if hex.is_err() {
115            return Ok(Self::bad_request("Failed to parse input."));
116        }
117        match rpc.send_raw_transaction(hex.unwrap()) {
118            Ok(txid) => Ok(Self::ok(format!("{{\"txid\":\"{}\"}}", txid), false)),
119            Err(_) => Ok(Self::bad_request("Failed to broadcast transaction.")),
120        }
121    }
122    /// `/block_summary/:offset/:limit` endpoint.
123    async fn block_summary_handler(req: Request<Body>) -> Result<Response<Body>, Infallible> {
124        let offset: u32 = match req.param("offset").unwrap().parse() {
125            Ok(offset) => offset,
126            Err(_) => return Ok(Self::bad_request("Cannot parse \"offset\" as an integer.")),
127        };
128        let limit: u32 = match req.param("limit").unwrap().parse() {
129            Ok(limit) => limit,
130            Err(_) => return Ok(Self::bad_request("Cannot parse \"limit\" as an integer.")),
131        };
132        let server = req.data::<HttpServer>().unwrap();
133        let mut ret = Vec::new();
134        for height in offset..offset+limit {
135            {
136                let block_summary_cache = server.block_summary_cache.read().await;
137                let summary = block_summary_cache.get(&height);
138                if summary.is_some() {
139                    ret.push((*summary.unwrap()).clone());
140                    continue;
141                }
142            }
143            let block = server.block_db.read().await.get(height);
144            if block.is_none() {
145                break;
146            }
147            let summary = RestBlockSummary::new(&block.unwrap());
148            server.block_summary_cache.write().await.insert(height, summary.clone());
149            ret.push(summary);
150        }
151        Ok(Self::json(&ret, true))
152    }
153    /// Helper function for `/block*` APIs.
154    async fn block_content(req: &Request<Body>) -> Result<BlockContentDBValue, Response<Body>> {
155        let server = req.data::<HttpServer>().unwrap();
156        let hash_or_height = req.param("hash_or_height").unwrap();
157        let block_content = if hash_or_height.len() == 64 {
158            let block_hash = BlockHash::from_hex(hash_or_height);
159            if block_hash.is_err() {
160                return Err(Self::not_found("Failed to decode input block hash."));
161            }
162            server.block_db.read().await.get_by_hash(&block_hash.unwrap())
163        } else {
164            let height = hash_or_height.parse();
165            if height.is_err() {
166                return Err(Self::not_found("Failed to decode input block height."));
167            }
168            server.block_db.read().await.get(height.unwrap())
169        };
170        match block_content {
171            Some(block_content) => Ok(block_content),
172            None => Err(Self::not_found("Block not found.")),
173        }
174    }
175    /// `/block_with_txids/:hash_or_height` endpoint.
176    async fn block_with_txids_handler(req: Request<Body>) -> Result<Response<Body>, Infallible> {
177        let server = req.data::<HttpServer>().unwrap();
178        match Self::block_content(&req).await {
179            Ok(block_content) => Ok(Self::json(RestBlockWithTxids::from_block_content(&block_content, &server.config), true)),
180            Err(res) => Ok(res),
181        }
182    }
183    /// `/block_with_txs/:hash_or_height` endpoint.
184    async fn block_with_txs_handler(req: Request<Body>) -> Result<Response<Body>, Infallible> {
185        let server = req.data::<HttpServer>().unwrap();
186        let tx_db = server.tx_db.read().await;
187        match Self::block_content(&req).await {
188            Ok(block_content) => Ok(Self::json(RestBlockWithTxs::from_block_content(&tx_db, &block_content, &server.config), true)),
189            Err(res) => Ok(res),
190        }
191    }
192    /// `/block/:hash_or_height` endpoint.
193    async fn block_handler(req: Request<Body>) -> Result<Response<Body>, Infallible> {
194        let server = req.data::<HttpServer>().unwrap();
195        match Self::block_content(&req).await {
196            Ok(block_content) => Ok(Self::json(RestBlockHeader::from_block_content(&block_content, &server.config), true)),
197            Err(res) => Ok(res),
198        }
199    }
200    fn decode_script_or_address(script_or_address: &str) -> Option<Script> {
201        match Address::from_str(script_or_address) {
202            Ok(addr) => return Some(addr.script_pubkey()),
203            Err(err) => {
204                println!("Failed to decode address: {}.", err);
205            }
206        }
207        Script::from_hex(script_or_address).ok()
208    }
209    /// `/txids/:script_or_address` endpoint.
210    async fn txids_handler(req: Request<Body>) -> Result<Response<Body>, Infallible> {
211        let server = req.data::<HttpServer>().unwrap();
212        let script = Self::decode_script_or_address(req.param("script_or_address").unwrap());
213        if script.is_none() {
214            return Ok(Self::not_found("Failed to decode input script or address."));
215        }
216        let txids = server.addr_index_db.read().await.get(&script.unwrap());
217        let txids = txids.iter().map(|txid| txid.to_hex()).collect::<Vec<String>>();
218        Ok(Self::json(&txids, false))
219    }
220    /// `/txs/:script_or_address` endpoint.
221    async fn txs_handler(req: Request<Body>) -> Result<Response<Body>, Infallible> {
222        let server = req.data::<HttpServer>().unwrap();
223        let script = Self::decode_script_or_address(req.param("script_or_address").unwrap());
224        if script.is_none() {
225            return Ok(Self::not_found("Failed to decode input script or address."));
226        }
227        let txids = server.addr_index_db.read().await.get(&script.unwrap());
228        let tx_db = server.tx_db.read().await;
229        let mut txids_not_found = Vec::new();
230        let txs = txids.iter().map(|txid| {
231            match tx_db.get_as_rest(txid, &server.config) {
232                Some(tx) => Some(tx),
233                None => {
234                    txids_not_found.push(txid.to_string());
235                    None
236                },
237            }
238        }).collect::<Vec<Option<RestTx>>>();
239        if !txids_not_found.is_empty() {
240            return Ok(Self::internal_error(&format!("Failed to resolve transactions: {}.", txids_not_found.join(", "))));
241        }
242        let txs: Vec<RestTx> = txs.into_iter().map(|x| x.unwrap()).collect();
243        Ok(Self::json(&txs, false))
244    }
245    /// `/utxos/:script_or_address` endpoint.
246    async fn utxos_handler(req: Request<Body>) -> Result<Response<Body>, Infallible> {
247        let server = req.data::<HttpServer>().unwrap();
248        let script = Self::decode_script_or_address(req.param("script_or_address").unwrap());
249        if script.is_none() {
250            return Ok(Self::not_found("Failed to decode input script or address."));
251        }
252        let tx_db = server.tx_db.read().await;
253        let values = server.utxo_server.read().await.get(&script.unwrap());
254        let mut utxos: Vec<RestUtxo> = Vec::new();
255        for utxo in values.iter() {
256            match tx_db.get(&utxo.txid) {
257                Some(tx_db_value) => utxos.push(RestUtxo::new(&utxo, &tx_db_value.tx, &server.config)),
258                None => return Ok(Self::internal_error(&format!("Failed to resolve previous txid: {}", utxo.txid))),
259            }
260        };
261        Ok(Self::json(&utxos, false))
262    }
263    /// `/rich_list_count` endpoint.
264    async fn rich_list_count_handler(req: Request<Body>) -> Result<Response<Body>, Infallible> {
265        let server = req.data::<HttpServer>().unwrap();
266        let json = format!("{{\"count\":{}}}", server.rich_list.read().await.len());
267        Ok(Self::ok(json, false))
268    }
269    /// `/rich_list_addr_rank/:script_or_address` endpoint.
270    async fn rich_list_addr_rank_handler(req: Request<Body>) -> Result<Response<Body>, Infallible> {
271        let server = req.data::<HttpServer>().unwrap();
272        let script = Self::decode_script_or_address(req.param("script_or_address").unwrap());
273        if script.is_none() {
274            return Ok(Self::not_found("Failed to decode input script or address."));
275        }
276        match server.rich_list.read().await.get_index_of(&script.unwrap()) {
277            Some(rank) => Ok(Self::ok(format!("{{\"rank\":{}}}", rank + 1), false)),
278            None => Ok(Self::ok("{\"rank\":null}".to_string(), false)),
279        }
280    }
281    /// `/rich_list/:offset/:limit` endpoint.
282    async fn rich_list_handler(req: Request<Body>) -> Result<Response<Body>, Infallible> {
283        let offset: usize = match req.param("offset").unwrap().parse() {
284            Ok(offset) => offset,
285            Err(_) => return Ok(Self::bad_request("Cannot parse \"offset\" as an integer.")),
286        };
287        let limit: usize = match req.param("limit").unwrap().parse() {
288            Ok(limit) => limit,
289            Err(_) => return Ok(Self::bad_request("Cannot parse \"limit\" as an integer.")),
290        };
291        let server = req.data::<HttpServer>().unwrap();
292        let rich_list = server.rich_list.read().await;
293        Ok(Self::json(&rich_list.get_in_range_as_rest(offset..offset+limit, &server.config), false))
294    }
295    pub async fn run(&self, ip: &str, port: u16) {
296        let addr = SocketAddr::from((
297            ip.parse::<std::net::IpAddr>().expect("Failed to parse HTTP IP address."),
298            port));
299        let router = Router::builder()
300            .data((*self).clone())
301            .middleware(Middleware::pre(|req| async move {
302                req.set_context(Instant::now());
303                Ok(req)
304            }))
305            .get("/api/v1/status", Self::status_handler)
306            .get("/api/v1/tx/:txid", Self::tx_handler)
307            .put("/api/v1/tx/broadcast", Self::tx_broadcast_handler)
308            .get("/api/v1/block_summary/:offset/:limit", Self::block_summary_handler)
309            .get("/api/v1/block_with_txids/:hash_or_height", Self::block_with_txids_handler)
310            .get("/api/v1/block_with_txs/:hash_or_height", Self::block_with_txs_handler)
311            .get("/api/v1/block/:hash_or_height", Self::block_handler)
312            .get("/api/v1/txids/:script_or_address", Self::txids_handler)
313            .get("/api/v1/txs/:script_or_address", Self::txs_handler)
314            .get("/api/v1/utxos/:script_or_address", Self::utxos_handler)
315            .get("/api/v1/rich_list_count", Self::rich_list_count_handler)
316            .get("/api/v1/rich_list_addr_rank/:script_or_address", Self::rich_list_addr_rank_handler)
317            .get("/api/v1/rich_list/:offset/:limit", Self::rich_list_handler)
318            .any(|_req| async {
319                Ok(Self::not_found("invalid URL."))
320            })
321            .middleware(Middleware::post_with_info(|res, req_info| async move {
322                let begin = req_info.context::<Instant>().unwrap();
323                println!("HTTP: {} {} {} ({}ms)",
324                    req_info.method(), req_info.uri().path(), res.status(), begin.elapsed().as_millis());
325                Ok(res)
326            }))
327            .err_handler_with_info(|err, _| async move {
328                eprintln!("{}", err);
329                Self::internal_error(&format!("Something went wrong: {}", err))
330            })
331            .build()
332            .unwrap();
333        let service = RouterService::new(router).unwrap();
334        let server = Server::bind(&addr).serve(service);
335        println!("HTTP server is listening on http://{}:{}/", ip, port);
336        // Fill BlockSummary cache.
337        /*
338        let mut height = 0;
339        loop {
340            let block = self.block_db.read().await.get(height);
341            if block.is_none() {
342                break;
343            }
344            let summary = RestBlockSummary::new(&block.unwrap());
345            self.block_summary_cache.write().await.insert(height, summary);
346            height += 1;
347        }
348        */
349        let graceful = server.with_graceful_shutdown(async {
350            tokio::signal::ctrl_c().await.expect("Failed to install Ctrl+C signal handler.");
351        });
352        if let Err(e) = graceful.await {
353            panic!("HttpServer failed: {}", e);
354        }
355        println!("HTTP server stopped.");
356    }
357}