Skip to main content

tycho_rpc/
node.rs

1use anyhow::{Context, Result};
2use tycho_types::models::BlockId;
3
4use crate::config::RpcConfig;
5use crate::state::{RpcBlockSubscriber, RpcState, RpcStateSubscriber};
6
7pub trait NodeBaseInitRpc: Send + Sync {
8    fn init_simple_rpc_raw(
9        &self,
10        last_block_id: &BlockId,
11        config: &RpcConfig,
12    ) -> impl Future<Output = Result<RpcState>> + Send;
13
14    fn init_simple_rpc(
15        &self,
16        last_block_id: &BlockId,
17        config: &RpcConfig,
18    ) -> impl Future<Output = Result<(RpcBlockSubscriber, RpcStateSubscriber)>> + Send {
19        async move {
20            self.init_simple_rpc_raw(last_block_id, config)
21                .await
22                .map(RpcState::split)
23        }
24    }
25
26    fn init_simple_rpc_opt(
27        &self,
28        last_block_id: &BlockId,
29        config: Option<&RpcConfig>,
30    ) -> impl Future<Output = Result<(Option<RpcBlockSubscriber>, Option<RpcStateSubscriber>)>> + Send
31    {
32        async move {
33            Ok(match config {
34                Some(config) => Some(self.init_simple_rpc(last_block_id, config).await?).unzip(),
35                None => (None, None),
36            })
37        }
38    }
39}
40
41impl NodeBaseInitRpc for tycho_core::node::NodeBase {
42    async fn init_simple_rpc_raw(
43        &self,
44        last_block_id: &BlockId,
45        config: &RpcConfig,
46    ) -> Result<RpcState> {
47        let rpc_state = RpcState::builder()
48            .with_config(config.clone())
49            .with_storage(self.core_storage.clone())
50            .with_blockchain_rpc_client(self.blockchain_rpc_client.clone())
51            .with_zerostate_id(self.global_config.zerostate)
52            .build()?;
53
54        rpc_state.init(last_block_id).await?;
55
56        let endpoint = rpc_state
57            .bind_endpoint()
58            .await
59            .context("failed to setup RPC server endpoint")?;
60
61        tracing::info!(listen_addr = %config.listen_addr, "RPC server started");
62        tokio::task::spawn(async move {
63            if let Err(e) = endpoint.serve().await {
64                tracing::error!("RPC server failed: {e:?}");
65            }
66            tracing::info!("RPC server stopped");
67        });
68
69        Ok(rpc_state)
70    }
71}