1use crate::indexer::Indexer;
2use anyhow::Result;
3use clap::Parser;
4use rosetta_core::crypto::address::Address;
5use rosetta_core::crypto::PublicKey;
6use rosetta_core::types::{
7 AccountBalanceRequest, AccountBalanceResponse, AccountCoinsRequest, AccountCoinsResponse,
8 AccountFaucetRequest, Amount, BlockRequest, BlockResponse, BlockTransactionRequest,
9 BlockTransactionResponse, CallRequest, CallResponse, ConstructionMetadataRequest,
10 ConstructionMetadataResponse, ConstructionSubmitRequest, MetadataRequest, NetworkIdentifier,
11 NetworkListResponse, NetworkOptionsResponse, NetworkRequest, NetworkStatusResponse,
12 SearchTransactionsRequest, TransactionIdentifier, TransactionIdentifierResponse, Version,
13};
14use std::net::SocketAddr;
15use std::path::PathBuf;
16use std::sync::Arc;
17use std::time::Duration;
18use tide::http::headers::HeaderValue;
19use tide::security::{CorsMiddleware, Origin};
20use tide::{Body, Request, Response};
21
22pub use rosetta_core::*;
23
24mod indexer;
25
26#[derive(Parser)]
27struct Opts {
28 #[clap(long)]
29 network: String,
30 #[clap(long)]
31 addr: SocketAddr,
32 #[clap(long)]
33 node_addr: String,
34 #[clap(long)]
35 path: PathBuf,
36}
37
38pub async fn main<T: BlockchainClient>() -> Result<()> {
39 femme::start();
40 let opts = Opts::parse();
41
42 log::info!("connecting to {}", &opts.node_addr);
43 let config = T::create_config(&opts.network)?;
44 let client = T::new(config, &opts.node_addr).await?;
45 let indexer = Arc::new(Indexer::new(&opts.path, client)?);
46
47 let cors = CorsMiddleware::new()
48 .allow_methods("POST".parse::<HeaderValue>().unwrap())
49 .allow_origin(Origin::from("*"))
50 .allow_credentials(false);
51 let mut app = tide::new();
52 app.with(tide::log::LogMiddleware::new());
53 app.with(cors);
54 app.at("/").nest(server(indexer.clone()));
55
56 tokio::task::spawn(async move {
57 loop {
58 tokio::time::sleep(Duration::from_secs(10)).await;
59 if let Err(err) = indexer.sync().await {
60 log::error!("{}", err);
61 }
62 }
63 });
64
65 log::info!("listening on {}", &opts.addr);
66 app.listen(opts.addr).await?;
67
68 Ok(())
69}
70
71type State<T> = Arc<Indexer<T>>;
72
73fn server<T: BlockchainClient>(client: State<T>) -> tide::Server<State<T>> {
74 let config = client.config();
75 let utxo = config.utxo;
76 let testnet = config.testnet;
77 let mut app = tide::with_state(client);
78 app.at("/account/balance").post(account_balance);
79 if utxo {
80 app.at("/account/coins").post(account_coins);
81 }
82 if testnet {
83 app.at("/account/faucet").post(account_faucet);
84 }
85 app.at("/block").post(block);
86 app.at("/block/transaction").post(block_transaction);
87 app.at("/call").post(call);
88 app.at("/construction/metadata").post(construction_metadata);
89 app.at("/construction/submit").post(construction_submit);
90 app.at("/network/list").post(network_list);
91 app.at("/network/options").post(network_options);
92 app.at("/network/status").post(network_status);
93 app.at("/search/transactions").post(search_transactions);
94 app.at("/mempool").post(unsupported);
96 app.at("/mempool/transaction").post(unsupported);
97 app.at("/construction/combine").post(unsupported);
98 app.at("/construction/derive").post(unsupported);
99 app.at("/construction/hash").post(unsupported);
100 app.at("/construction/parse").post(unsupported);
101 app.at("/construction/payloads").post(unsupported);
102 app.at("/construction/preprocess").post(unsupported);
103 app.at("/events/blocks").post(unsupported);
104 app
105}
106
107fn ok<T: serde::Serialize>(t: &T) -> tide::Result {
108 let r = Response::builder(200)
109 .body(Body::from_json(t).unwrap())
110 .build();
111 Ok(r)
112}
113
114fn is_network_supported(network_identifier: &NetworkIdentifier, config: &BlockchainConfig) -> bool {
115 network_identifier.blockchain == config.blockchain
116 && network_identifier.network == config.network
117 && network_identifier.sub_network_identifier.is_none()
118}
119
120async fn network_list<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
121 let _request: MetadataRequest = req.body_json().await?;
122 let config = req.state().config();
123 let response = NetworkListResponse {
124 network_identifiers: vec![NetworkIdentifier {
125 blockchain: config.blockchain.into(),
126 network: config.network.into(),
127 sub_network_identifier: None,
128 }],
129 };
130 ok(&response)
131}
132
133async fn network_options<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
134 let request: NetworkRequest = req.body_json().await?;
135 let config = req.state().config();
136 if !is_network_supported(&request.network_identifier, config) {
137 return Error::UnsupportedNetwork.to_result();
138 }
139 let node_version = match req.state().node_version().await {
140 Ok(node_version) => node_version,
141 Err(err) => return Error::RpcError(err).to_result(),
142 };
143 let response = NetworkOptionsResponse {
144 version: Version {
145 rosetta_version: "1.4.13".into(),
146 node_version,
147 middleware_version: Some(env!("VERGEN_GIT_DESCRIBE").into()),
148 metadata: None,
149 },
150 allow: None,
151 };
152 ok(&response)
153}
154
155async fn network_status<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
156 let request: NetworkRequest = req.body_json().await?;
157 let config = req.state().config();
158 if !is_network_supported(&request.network_identifier, config) {
159 return Error::UnsupportedNetwork.to_result();
160 }
161 let current_block_identifier = match req.state().current_block().await {
162 Ok(current_block_identifier) => current_block_identifier,
163 Err(err) => return Error::RpcError(err).to_result(),
164 };
165 let response = NetworkStatusResponse {
166 current_block_identifier,
167 current_block_timestamp: 0,
168 genesis_block_identifier: Some(req.state().genesis_block().clone()),
169 peers: None,
170 oldest_block_identifier: None,
171 sync_status: None,
172 };
173 ok(&response)
174}
175
176async fn account_balance<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
177 let request: AccountBalanceRequest = req.body_json().await?;
178 let config = req.state().config();
179 if !is_network_supported(&request.network_identifier, config) {
180 return Error::UnsupportedNetwork.to_result();
181 }
182 let block_identifier = match req.state().current_block().await {
183 Ok(block_identifier) => block_identifier,
184 Err(err) => return Error::RpcError(err).to_result(),
185 };
186 let address = Address::new(config.address_format, request.account_identifier.address);
187 let value = match req.state().balance(&address, &block_identifier).await {
188 Ok(value) => value,
189 Err(err) => return Error::RpcError(err).to_result(),
190 };
191 let response = AccountBalanceResponse {
192 balances: vec![Amount {
193 value: value.to_string(),
194 currency: config.currency(),
195 metadata: None,
196 }],
197 block_identifier,
198 metadata: None,
199 };
200 ok(&response)
201}
202
203async fn account_coins<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
204 let request: AccountCoinsRequest = req.body_json().await?;
205 let config = req.state().config();
206 if !is_network_supported(&request.network_identifier, config) {
207 return Error::UnsupportedNetwork.to_result();
208 }
209 let block_identifier = match req.state().current_block().await {
210 Ok(block_identifier) => block_identifier,
211 Err(err) => return Error::RpcError(err).to_result(),
212 };
213 let address = Address::new(config.address_format, request.account_identifier.address);
214 let coins = match req.state().coins(&address, &block_identifier).await {
215 Ok(coins) => coins,
216 Err(err) => return Error::RpcError(err).to_result(),
217 };
218 let response = AccountCoinsResponse {
219 coins,
220 block_identifier,
221 metadata: None,
222 };
223 ok(&response)
224}
225
226async fn account_faucet<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
227 let request: AccountFaucetRequest = req.body_json().await?;
228 let config = req.state().config();
229 if !is_network_supported(&request.network_identifier, config) {
230 return Error::UnsupportedNetwork.to_result();
231 }
232 let address = Address::new(config.address_format, request.account_identifier.address);
233 let hash = match req.state().faucet(&address, request.faucet_parameter).await {
234 Ok(hash) => hash,
235 Err(err) => return Error::RpcError(err).to_result(),
236 };
237 let response = TransactionIdentifierResponse {
238 transaction_identifier: TransactionIdentifier {
239 hash: hex::encode(hash),
240 },
241 metadata: None,
242 };
243 ok(&response)
244}
245
246async fn construction_metadata<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
247 let request: ConstructionMetadataRequest = req.body_json().await?;
248 let config = req.state().config();
249 if !is_network_supported(&request.network_identifier, config) {
250 return Error::UnsupportedNetwork.to_result();
251 }
252 let options: T::MetadataParams = if let Some(options) = request.options {
253 serde_json::from_value(options)?
254 } else {
255 return Error::UnsupportedOption.to_result();
256 };
257 if request.public_keys.len() != 1 {
258 return Error::MissingPublicKey.to_result();
259 }
260 let public_key = &request.public_keys[0];
261 if public_key.curve_type != config.algorithm.to_curve_type() {
262 return Error::UnsupportedCurveType.to_result();
263 }
264 let public_key_bytes = hex::decode(&public_key.hex_bytes)?;
265 let public_key = PublicKey::from_bytes(config.algorithm, &public_key_bytes)?;
266 let metadata = match req.state().metadata(&public_key, &options).await {
267 Ok(metadata) => metadata,
268 Err(err) => return Error::RpcError(err).to_result(),
269 };
270 let response = ConstructionMetadataResponse {
271 metadata: serde_json::to_value(&metadata)?,
272 suggested_fee: None,
273 };
274 ok(&response)
275}
276
277async fn construction_submit<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
278 let request: ConstructionSubmitRequest = req.body_json().await?;
279 let config = req.state().config();
280 if !is_network_supported(&request.network_identifier, config) {
281 return Error::UnsupportedNetwork.to_result();
282 }
283 let transaction = hex::decode(&request.signed_transaction)?;
284 let hash = match req.state().submit(&transaction).await {
285 Ok(hash) => hash,
286 Err(err) => return Error::RpcError(err).to_result(),
287 };
288 let response = TransactionIdentifierResponse {
289 transaction_identifier: TransactionIdentifier {
290 hash: hex::encode(hash),
291 },
292 metadata: None,
293 };
294 ok(&response)
295}
296
297async fn block<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
298 let request: BlockRequest = req.body_json().await?;
299 let config = req.state().config();
300 if !is_network_supported(&request.network_identifier, config) {
301 return Error::UnsupportedNetwork.to_result();
302 }
303 let block = match req.state().block(&request.block_identifier).await {
304 Ok(block) => block,
305 Err(err) => return Error::RpcError(err).to_result(),
306 };
307 let response = BlockResponse {
308 block: Some(block),
309 other_transactions: None,
310 };
311 ok(&response)
312}
313
314async fn block_transaction<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
315 let request: BlockTransactionRequest = req.body_json().await?;
316 let config = req.state().config();
317 if !is_network_supported(&request.network_identifier, config) {
318 return Error::UnsupportedNetwork.to_result();
319 }
320 let transaction = match req
321 .state()
322 .block_transaction(&request.block_identifier, &request.transaction_identifier)
323 .await
324 {
325 Ok(transaction) => transaction,
326 Err(err) => return Error::RpcError(err).to_result(),
327 };
328 let response = BlockTransactionResponse { transaction };
329 ok(&response)
330}
331
332async fn search_transactions<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
333 let request: SearchTransactionsRequest = req.body_json().await?;
334 let config = req.state().config();
335 if !is_network_supported(&request.network_identifier, config) {
336 return Error::UnsupportedNetwork.to_result();
337 }
338 let response = match req.state().search(&request).await {
339 Ok(response) => response,
340 Err(err) => return Error::RpcError(err).to_result(),
341 };
342 ok(&response)
343}
344
345async fn call<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
346 let request: CallRequest = req.body_json().await?;
347 let config = req.state().config();
348 if !is_network_supported(&request.network_identifier, config) {
349 return Error::UnsupportedNetwork.to_result();
350 }
351 let call_result = match req.state().call(&request).await {
352 Ok(call_result) => call_result,
353 Err(err) => return Error::RpcError(err).to_result(),
354 };
355 let response = CallResponse {
356 result: call_result,
357 idempotent: false,
358 };
359 ok(&response)
360}
361
362async fn unsupported<T>(_: Request<T>) -> tide::Result {
363 Error::Unsupported.to_result()
364}
365
366#[derive(Debug)]
367pub enum Error {
368 Unimplemented,
369 Unsupported,
370 UnsupportedNetwork,
371 UnsupportedOption,
372 MissingPublicKey,
373 UnsupportedCurveType,
374 MoreThanOneSignature,
375 InvalidSignatureType,
376 RpcError(anyhow::Error),
377}
378
379impl std::fmt::Display for Error {
380 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
381 let msg = match self {
382 Self::Unimplemented => "unimplemented",
383 Self::Unsupported => "unsupported",
384 Self::UnsupportedNetwork => "unsupported network",
385 Self::UnsupportedOption => "unsupported option",
386 Self::MissingPublicKey => "missing public key",
387 Self::UnsupportedCurveType => "unsupported curve type",
388 Self::MoreThanOneSignature => "expected one signature",
389 Self::InvalidSignatureType => "invalid signature type",
390 Self::RpcError(error) => return write!(f, "rpc error: {error}",),
391 };
392 f.write_str(msg)
393 }
394}
395
396impl Error {
397 pub fn error(&self) -> Option<&anyhow::Error> {
398 let error = match self {
399 Self::RpcError(error) => error,
400 _ => return None,
401 };
402 Some(error)
403 }
404
405 pub fn description(&self) -> Option<String> {
406 self.error().map(|error| error.to_string())
407 }
408
409 pub fn to_response(&self) -> Response {
410 let error = rosetta_core::types::Error {
411 code: 500,
412 message: self.to_string(),
413 description: self.description(),
414 retriable: false,
415 details: None,
416 };
417 Response::builder(500)
418 .body(Body::from_json(&error).unwrap())
419 .build()
420 }
421
422 pub fn to_result(&self) -> tide::Result {
423 Ok(self.to_response())
424 }
425}
426
427#[cfg(feature = "tests")]
428pub mod tests {
429 use super::*;
430 use futures::stream::StreamExt;
431 use rosetta_docker::Env;
432
433 pub async fn network_list(config: BlockchainConfig) -> Result<()> {
434 let env = Env::new("network-list", config.clone()).await?;
435
436 let client = env.connector()?;
437 let networks = client.network_list().await?;
438 assert_eq!(networks.len(), 1);
439 assert_eq!(networks[0].blockchain, config.blockchain);
440 assert_eq!(networks[0].network, config.network);
441 assert!(networks[0].sub_network_identifier.is_none());
442
443 env.shutdown().await?;
444 Ok(())
445 }
446
447 pub async fn network_options<T: BlockchainClient>(config: BlockchainConfig) -> Result<()> {
448 let env = Env::new("network-options", config.clone()).await?;
449
450 let client = env.node::<T>().await?;
451 let version = client.node_version().await?;
452
453 let client = env.connector()?;
454 let options = client.network_options(config.network()).await?;
455 assert_eq!(options.version.node_version, version);
456
457 env.shutdown().await?;
458 Ok(())
459 }
460
461 pub async fn network_status<T: BlockchainClient>(config: BlockchainConfig) -> Result<()> {
462 let env = Env::new("network-status", config.clone()).await?;
463
464 let client = env.node::<T>().await?;
465 let genesis = client.genesis_block().clone();
466 let current = client.current_block().await?;
467
468 let client = env.connector()?;
469 let status = client.network_status(config.network()).await?;
470 assert_eq!(status.genesis_block_identifier, Some(genesis));
471 assert_eq!(status.current_block_identifier, current);
472
473 env.shutdown().await?;
474 Ok(())
475 }
476
477 pub async fn account(config: BlockchainConfig) -> Result<()> {
478 let env = Env::new("account", config.clone()).await?;
479
480 let value = 100 * u128::pow(10, config.currency_decimals);
481 let wallet = env.ephemeral_wallet()?;
482 wallet.faucet(value).await?;
483 let amount = wallet.balance().await?;
484 assert_eq!(amount.value, value.to_string());
485 assert_eq!(amount.currency, config.currency());
486 assert!(amount.metadata.is_none());
487
488 env.shutdown().await?;
489 Ok(())
490 }
491
492 pub async fn construction(config: BlockchainConfig) -> Result<()> {
493 let env = Env::new("construction", config.clone()).await?;
494
495 let faucet = 100 * u128::pow(10, config.currency_decimals);
496 let value = u128::pow(10, config.currency_decimals);
497 let alice = env.ephemeral_wallet()?;
498 alice.faucet(faucet).await?;
499
500 let bob = env.ephemeral_wallet()?;
501 alice.transfer(bob.account(), value).await?;
502 let amount = bob.balance().await?;
503 assert_eq!(amount.value, value.to_string());
504
505 env.shutdown().await?;
506 Ok(())
507 }
508
509 pub async fn find_transaction(config: BlockchainConfig) -> Result<()> {
510 let env = Env::new("find-transaction", config.clone()).await?;
511
512 let faucet = 100 * u128::pow(10, config.currency_decimals);
513 let value = u128::pow(10, config.currency_decimals);
514 let alice = env.ephemeral_wallet()?;
515 alice.faucet(faucet).await?;
516
517 let bob = env.ephemeral_wallet()?;
518 let tx_id = alice.transfer(bob.account(), value).await?;
519
520 let tx = alice.transaction(tx_id.clone()).await?;
521 assert_eq!(tx.transaction.transaction_identifier, tx_id);
522
523 env.shutdown().await?;
524 Ok(())
525 }
526
527 pub async fn list_transactions(config: BlockchainConfig) -> Result<()> {
528 let env = Env::new("list-transactions", config.clone()).await?;
529
530 let faucet = 100 * u128::pow(10, config.currency_decimals);
531 let value = u128::pow(10, config.currency_decimals);
532 let alice = env.ephemeral_wallet()?;
533 alice.faucet(faucet).await?;
534
535 let bob = env.ephemeral_wallet()?;
536 alice.transfer(bob.account(), value).await?;
537 alice.transfer(bob.account(), value).await?;
538 alice.transfer(bob.account(), value).await?;
539
540 tokio::time::sleep(Duration::from_secs(1)).await;
541
542 let mut stream = bob.transactions(1);
543 let mut count = 0;
544 while let Some(res) = stream.next().await {
545 let transactions = res?;
546 assert_eq!(transactions.len(), 1);
547 assert_eq!(stream.total_count(), Some(3));
548 count += 1;
549 assert!(count <= 3);
550 }
551 assert_eq!(count, 3);
552
553 let mut stream = bob.transactions(10);
554 let mut count = 0;
555 while let Some(res) = stream.next().await {
556 let transactions = res?;
557 assert_eq!(transactions.len(), 3);
558 assert_eq!(stream.total_count(), Some(3));
559 count += 1;
560 assert!(count <= 1);
561 }
562 assert_eq!(count, 1);
563
564 env.shutdown().await?;
565 Ok(())
566 }
567}