rosetta_server/
lib.rs

1use crate::indexer::Indexer;
2use anyhow::Result;
3use clap::Parser;
4use rosetta_core::crypto::address::Address;
5use rosetta_core::crypto::PublicKey;
6use rosetta_core::types::{
7    AccountBalanceRequest, AccountBalanceResponse, AccountCoinsRequest, AccountCoinsResponse,
8    AccountFaucetRequest, Amount, BlockRequest, BlockResponse, BlockTransactionRequest,
9    BlockTransactionResponse, CallRequest, CallResponse, ConstructionMetadataRequest,
10    ConstructionMetadataResponse, ConstructionSubmitRequest, MetadataRequest, NetworkIdentifier,
11    NetworkListResponse, NetworkOptionsResponse, NetworkRequest, NetworkStatusResponse,
12    SearchTransactionsRequest, TransactionIdentifier, TransactionIdentifierResponse, Version,
13};
14use std::net::SocketAddr;
15use std::path::PathBuf;
16use std::sync::Arc;
17use std::time::Duration;
18use tide::http::headers::HeaderValue;
19use tide::security::{CorsMiddleware, Origin};
20use tide::{Body, Request, Response};
21
22pub use rosetta_core::*;
23
24mod indexer;
25
26#[derive(Parser)]
27struct Opts {
28    #[clap(long)]
29    network: String,
30    #[clap(long)]
31    addr: SocketAddr,
32    #[clap(long)]
33    node_addr: String,
34    #[clap(long)]
35    path: PathBuf,
36}
37
38pub async fn main<T: BlockchainClient>() -> Result<()> {
39    femme::start();
40    let opts = Opts::parse();
41
42    log::info!("connecting to {}", &opts.node_addr);
43    let config = T::create_config(&opts.network)?;
44    let client = T::new(config, &opts.node_addr).await?;
45    let indexer = Arc::new(Indexer::new(&opts.path, client)?);
46
47    let cors = CorsMiddleware::new()
48        .allow_methods("POST".parse::<HeaderValue>().unwrap())
49        .allow_origin(Origin::from("*"))
50        .allow_credentials(false);
51    let mut app = tide::new();
52    app.with(tide::log::LogMiddleware::new());
53    app.with(cors);
54    app.at("/").nest(server(indexer.clone()));
55
56    tokio::task::spawn(async move {
57        loop {
58            tokio::time::sleep(Duration::from_secs(10)).await;
59            if let Err(err) = indexer.sync().await {
60                log::error!("{}", err);
61            }
62        }
63    });
64
65    log::info!("listening on {}", &opts.addr);
66    app.listen(opts.addr).await?;
67
68    Ok(())
69}
70
71type State<T> = Arc<Indexer<T>>;
72
73fn server<T: BlockchainClient>(client: State<T>) -> tide::Server<State<T>> {
74    let config = client.config();
75    let utxo = config.utxo;
76    let testnet = config.testnet;
77    let mut app = tide::with_state(client);
78    app.at("/account/balance").post(account_balance);
79    if utxo {
80        app.at("/account/coins").post(account_coins);
81    }
82    if testnet {
83        app.at("/account/faucet").post(account_faucet);
84    }
85    app.at("/block").post(block);
86    app.at("/block/transaction").post(block_transaction);
87    app.at("/call").post(call);
88    app.at("/construction/metadata").post(construction_metadata);
89    app.at("/construction/submit").post(construction_submit);
90    app.at("/network/list").post(network_list);
91    app.at("/network/options").post(network_options);
92    app.at("/network/status").post(network_status);
93    app.at("/search/transactions").post(search_transactions);
94    // unsupported
95    app.at("/mempool").post(unsupported);
96    app.at("/mempool/transaction").post(unsupported);
97    app.at("/construction/combine").post(unsupported);
98    app.at("/construction/derive").post(unsupported);
99    app.at("/construction/hash").post(unsupported);
100    app.at("/construction/parse").post(unsupported);
101    app.at("/construction/payloads").post(unsupported);
102    app.at("/construction/preprocess").post(unsupported);
103    app.at("/events/blocks").post(unsupported);
104    app
105}
106
107fn ok<T: serde::Serialize>(t: &T) -> tide::Result {
108    let r = Response::builder(200)
109        .body(Body::from_json(t).unwrap())
110        .build();
111    Ok(r)
112}
113
114fn is_network_supported(network_identifier: &NetworkIdentifier, config: &BlockchainConfig) -> bool {
115    network_identifier.blockchain == config.blockchain
116        && network_identifier.network == config.network
117        && network_identifier.sub_network_identifier.is_none()
118}
119
120async fn network_list<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
121    let _request: MetadataRequest = req.body_json().await?;
122    let config = req.state().config();
123    let response = NetworkListResponse {
124        network_identifiers: vec![NetworkIdentifier {
125            blockchain: config.blockchain.into(),
126            network: config.network.into(),
127            sub_network_identifier: None,
128        }],
129    };
130    ok(&response)
131}
132
133async fn network_options<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
134    let request: NetworkRequest = req.body_json().await?;
135    let config = req.state().config();
136    if !is_network_supported(&request.network_identifier, config) {
137        return Error::UnsupportedNetwork.to_result();
138    }
139    let node_version = match req.state().node_version().await {
140        Ok(node_version) => node_version,
141        Err(err) => return Error::RpcError(err).to_result(),
142    };
143    let response = NetworkOptionsResponse {
144        version: Version {
145            rosetta_version: "1.4.13".into(),
146            node_version,
147            middleware_version: Some(env!("VERGEN_GIT_DESCRIBE").into()),
148            metadata: None,
149        },
150        allow: None,
151    };
152    ok(&response)
153}
154
155async fn network_status<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
156    let request: NetworkRequest = req.body_json().await?;
157    let config = req.state().config();
158    if !is_network_supported(&request.network_identifier, config) {
159        return Error::UnsupportedNetwork.to_result();
160    }
161    let current_block_identifier = match req.state().current_block().await {
162        Ok(current_block_identifier) => current_block_identifier,
163        Err(err) => return Error::RpcError(err).to_result(),
164    };
165    let response = NetworkStatusResponse {
166        current_block_identifier,
167        current_block_timestamp: 0,
168        genesis_block_identifier: Some(req.state().genesis_block().clone()),
169        peers: None,
170        oldest_block_identifier: None,
171        sync_status: None,
172    };
173    ok(&response)
174}
175
176async fn account_balance<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
177    let request: AccountBalanceRequest = req.body_json().await?;
178    let config = req.state().config();
179    if !is_network_supported(&request.network_identifier, config) {
180        return Error::UnsupportedNetwork.to_result();
181    }
182    let block_identifier = match req.state().current_block().await {
183        Ok(block_identifier) => block_identifier,
184        Err(err) => return Error::RpcError(err).to_result(),
185    };
186    let address = Address::new(config.address_format, request.account_identifier.address);
187    let value = match req.state().balance(&address, &block_identifier).await {
188        Ok(value) => value,
189        Err(err) => return Error::RpcError(err).to_result(),
190    };
191    let response = AccountBalanceResponse {
192        balances: vec![Amount {
193            value: value.to_string(),
194            currency: config.currency(),
195            metadata: None,
196        }],
197        block_identifier,
198        metadata: None,
199    };
200    ok(&response)
201}
202
203async fn account_coins<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
204    let request: AccountCoinsRequest = req.body_json().await?;
205    let config = req.state().config();
206    if !is_network_supported(&request.network_identifier, config) {
207        return Error::UnsupportedNetwork.to_result();
208    }
209    let block_identifier = match req.state().current_block().await {
210        Ok(block_identifier) => block_identifier,
211        Err(err) => return Error::RpcError(err).to_result(),
212    };
213    let address = Address::new(config.address_format, request.account_identifier.address);
214    let coins = match req.state().coins(&address, &block_identifier).await {
215        Ok(coins) => coins,
216        Err(err) => return Error::RpcError(err).to_result(),
217    };
218    let response = AccountCoinsResponse {
219        coins,
220        block_identifier,
221        metadata: None,
222    };
223    ok(&response)
224}
225
226async fn account_faucet<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
227    let request: AccountFaucetRequest = req.body_json().await?;
228    let config = req.state().config();
229    if !is_network_supported(&request.network_identifier, config) {
230        return Error::UnsupportedNetwork.to_result();
231    }
232    let address = Address::new(config.address_format, request.account_identifier.address);
233    let hash = match req.state().faucet(&address, request.faucet_parameter).await {
234        Ok(hash) => hash,
235        Err(err) => return Error::RpcError(err).to_result(),
236    };
237    let response = TransactionIdentifierResponse {
238        transaction_identifier: TransactionIdentifier {
239            hash: hex::encode(hash),
240        },
241        metadata: None,
242    };
243    ok(&response)
244}
245
246async fn construction_metadata<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
247    let request: ConstructionMetadataRequest = req.body_json().await?;
248    let config = req.state().config();
249    if !is_network_supported(&request.network_identifier, config) {
250        return Error::UnsupportedNetwork.to_result();
251    }
252    let options: T::MetadataParams = if let Some(options) = request.options {
253        serde_json::from_value(options)?
254    } else {
255        return Error::UnsupportedOption.to_result();
256    };
257    if request.public_keys.len() != 1 {
258        return Error::MissingPublicKey.to_result();
259    }
260    let public_key = &request.public_keys[0];
261    if public_key.curve_type != config.algorithm.to_curve_type() {
262        return Error::UnsupportedCurveType.to_result();
263    }
264    let public_key_bytes = hex::decode(&public_key.hex_bytes)?;
265    let public_key = PublicKey::from_bytes(config.algorithm, &public_key_bytes)?;
266    let metadata = match req.state().metadata(&public_key, &options).await {
267        Ok(metadata) => metadata,
268        Err(err) => return Error::RpcError(err).to_result(),
269    };
270    let response = ConstructionMetadataResponse {
271        metadata: serde_json::to_value(&metadata)?,
272        suggested_fee: None,
273    };
274    ok(&response)
275}
276
277async fn construction_submit<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
278    let request: ConstructionSubmitRequest = req.body_json().await?;
279    let config = req.state().config();
280    if !is_network_supported(&request.network_identifier, config) {
281        return Error::UnsupportedNetwork.to_result();
282    }
283    let transaction = hex::decode(&request.signed_transaction)?;
284    let hash = match req.state().submit(&transaction).await {
285        Ok(hash) => hash,
286        Err(err) => return Error::RpcError(err).to_result(),
287    };
288    let response = TransactionIdentifierResponse {
289        transaction_identifier: TransactionIdentifier {
290            hash: hex::encode(hash),
291        },
292        metadata: None,
293    };
294    ok(&response)
295}
296
297async fn block<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
298    let request: BlockRequest = req.body_json().await?;
299    let config = req.state().config();
300    if !is_network_supported(&request.network_identifier, config) {
301        return Error::UnsupportedNetwork.to_result();
302    }
303    let block = match req.state().block(&request.block_identifier).await {
304        Ok(block) => block,
305        Err(err) => return Error::RpcError(err).to_result(),
306    };
307    let response = BlockResponse {
308        block: Some(block),
309        other_transactions: None,
310    };
311    ok(&response)
312}
313
314async fn block_transaction<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
315    let request: BlockTransactionRequest = req.body_json().await?;
316    let config = req.state().config();
317    if !is_network_supported(&request.network_identifier, config) {
318        return Error::UnsupportedNetwork.to_result();
319    }
320    let transaction = match req
321        .state()
322        .block_transaction(&request.block_identifier, &request.transaction_identifier)
323        .await
324    {
325        Ok(transaction) => transaction,
326        Err(err) => return Error::RpcError(err).to_result(),
327    };
328    let response = BlockTransactionResponse { transaction };
329    ok(&response)
330}
331
332async fn search_transactions<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
333    let request: SearchTransactionsRequest = req.body_json().await?;
334    let config = req.state().config();
335    if !is_network_supported(&request.network_identifier, config) {
336        return Error::UnsupportedNetwork.to_result();
337    }
338    let response = match req.state().search(&request).await {
339        Ok(response) => response,
340        Err(err) => return Error::RpcError(err).to_result(),
341    };
342    ok(&response)
343}
344
345async fn call<T: BlockchainClient>(mut req: Request<State<T>>) -> tide::Result {
346    let request: CallRequest = req.body_json().await?;
347    let config = req.state().config();
348    if !is_network_supported(&request.network_identifier, config) {
349        return Error::UnsupportedNetwork.to_result();
350    }
351    let call_result = match req.state().call(&request).await {
352        Ok(call_result) => call_result,
353        Err(err) => return Error::RpcError(err).to_result(),
354    };
355    let response = CallResponse {
356        result: call_result,
357        idempotent: false,
358    };
359    ok(&response)
360}
361
362async fn unsupported<T>(_: Request<T>) -> tide::Result {
363    Error::Unsupported.to_result()
364}
365
366#[derive(Debug)]
367pub enum Error {
368    Unimplemented,
369    Unsupported,
370    UnsupportedNetwork,
371    UnsupportedOption,
372    MissingPublicKey,
373    UnsupportedCurveType,
374    MoreThanOneSignature,
375    InvalidSignatureType,
376    RpcError(anyhow::Error),
377}
378
379impl std::fmt::Display for Error {
380    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
381        let msg = match self {
382            Self::Unimplemented => "unimplemented",
383            Self::Unsupported => "unsupported",
384            Self::UnsupportedNetwork => "unsupported network",
385            Self::UnsupportedOption => "unsupported option",
386            Self::MissingPublicKey => "missing public key",
387            Self::UnsupportedCurveType => "unsupported curve type",
388            Self::MoreThanOneSignature => "expected one signature",
389            Self::InvalidSignatureType => "invalid signature type",
390            Self::RpcError(error) => return write!(f, "rpc error: {error}",),
391        };
392        f.write_str(msg)
393    }
394}
395
396impl Error {
397    pub fn error(&self) -> Option<&anyhow::Error> {
398        let error = match self {
399            Self::RpcError(error) => error,
400            _ => return None,
401        };
402        Some(error)
403    }
404
405    pub fn description(&self) -> Option<String> {
406        self.error().map(|error| error.to_string())
407    }
408
409    pub fn to_response(&self) -> Response {
410        let error = rosetta_core::types::Error {
411            code: 500,
412            message: self.to_string(),
413            description: self.description(),
414            retriable: false,
415            details: None,
416        };
417        Response::builder(500)
418            .body(Body::from_json(&error).unwrap())
419            .build()
420    }
421
422    pub fn to_result(&self) -> tide::Result {
423        Ok(self.to_response())
424    }
425}
426
427#[cfg(feature = "tests")]
428pub mod tests {
429    use super::*;
430    use futures::stream::StreamExt;
431    use rosetta_docker::Env;
432
433    pub async fn network_list(config: BlockchainConfig) -> Result<()> {
434        let env = Env::new("network-list", config.clone()).await?;
435
436        let client = env.connector()?;
437        let networks = client.network_list().await?;
438        assert_eq!(networks.len(), 1);
439        assert_eq!(networks[0].blockchain, config.blockchain);
440        assert_eq!(networks[0].network, config.network);
441        assert!(networks[0].sub_network_identifier.is_none());
442
443        env.shutdown().await?;
444        Ok(())
445    }
446
447    pub async fn network_options<T: BlockchainClient>(config: BlockchainConfig) -> Result<()> {
448        let env = Env::new("network-options", config.clone()).await?;
449
450        let client = env.node::<T>().await?;
451        let version = client.node_version().await?;
452
453        let client = env.connector()?;
454        let options = client.network_options(config.network()).await?;
455        assert_eq!(options.version.node_version, version);
456
457        env.shutdown().await?;
458        Ok(())
459    }
460
461    pub async fn network_status<T: BlockchainClient>(config: BlockchainConfig) -> Result<()> {
462        let env = Env::new("network-status", config.clone()).await?;
463
464        let client = env.node::<T>().await?;
465        let genesis = client.genesis_block().clone();
466        let current = client.current_block().await?;
467
468        let client = env.connector()?;
469        let status = client.network_status(config.network()).await?;
470        assert_eq!(status.genesis_block_identifier, Some(genesis));
471        assert_eq!(status.current_block_identifier, current);
472
473        env.shutdown().await?;
474        Ok(())
475    }
476
477    pub async fn account(config: BlockchainConfig) -> Result<()> {
478        let env = Env::new("account", config.clone()).await?;
479
480        let value = 100 * u128::pow(10, config.currency_decimals);
481        let wallet = env.ephemeral_wallet()?;
482        wallet.faucet(value).await?;
483        let amount = wallet.balance().await?;
484        assert_eq!(amount.value, value.to_string());
485        assert_eq!(amount.currency, config.currency());
486        assert!(amount.metadata.is_none());
487
488        env.shutdown().await?;
489        Ok(())
490    }
491
492    pub async fn construction(config: BlockchainConfig) -> Result<()> {
493        let env = Env::new("construction", config.clone()).await?;
494
495        let faucet = 100 * u128::pow(10, config.currency_decimals);
496        let value = u128::pow(10, config.currency_decimals);
497        let alice = env.ephemeral_wallet()?;
498        alice.faucet(faucet).await?;
499
500        let bob = env.ephemeral_wallet()?;
501        alice.transfer(bob.account(), value).await?;
502        let amount = bob.balance().await?;
503        assert_eq!(amount.value, value.to_string());
504
505        env.shutdown().await?;
506        Ok(())
507    }
508
509    pub async fn find_transaction(config: BlockchainConfig) -> Result<()> {
510        let env = Env::new("find-transaction", config.clone()).await?;
511
512        let faucet = 100 * u128::pow(10, config.currency_decimals);
513        let value = u128::pow(10, config.currency_decimals);
514        let alice = env.ephemeral_wallet()?;
515        alice.faucet(faucet).await?;
516
517        let bob = env.ephemeral_wallet()?;
518        let tx_id = alice.transfer(bob.account(), value).await?;
519
520        let tx = alice.transaction(tx_id.clone()).await?;
521        assert_eq!(tx.transaction.transaction_identifier, tx_id);
522
523        env.shutdown().await?;
524        Ok(())
525    }
526
527    pub async fn list_transactions(config: BlockchainConfig) -> Result<()> {
528        let env = Env::new("list-transactions", config.clone()).await?;
529
530        let faucet = 100 * u128::pow(10, config.currency_decimals);
531        let value = u128::pow(10, config.currency_decimals);
532        let alice = env.ephemeral_wallet()?;
533        alice.faucet(faucet).await?;
534
535        let bob = env.ephemeral_wallet()?;
536        alice.transfer(bob.account(), value).await?;
537        alice.transfer(bob.account(), value).await?;
538        alice.transfer(bob.account(), value).await?;
539
540        tokio::time::sleep(Duration::from_secs(1)).await;
541
542        let mut stream = bob.transactions(1);
543        let mut count = 0;
544        while let Some(res) = stream.next().await {
545            let transactions = res?;
546            assert_eq!(transactions.len(), 1);
547            assert_eq!(stream.total_count(), Some(3));
548            count += 1;
549            assert!(count <= 3);
550        }
551        assert_eq!(count, 3);
552
553        let mut stream = bob.transactions(10);
554        let mut count = 0;
555        while let Some(res) = stream.next().await {
556            let transactions = res?;
557            assert_eq!(transactions.len(), 3);
558            assert_eq!(stream.total_count(), Some(3));
559            count += 1;
560            assert!(count <= 1);
561        }
562        assert_eq!(count, 1);
563
564        env.shutdown().await?;
565        Ok(())
566    }
567}