Skip to main content

miden_node_store/server/
mod.rs

1use std::ops::Not;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use miden_node_proto::generated::store;
8use miden_node_proto_build::{
9    store_block_producer_api_descriptor,
10    store_ntx_builder_api_descriptor,
11    store_rpc_api_descriptor,
12};
13use miden_node_utils::panic::{CatchPanicLayer, catch_panic_layer_fn};
14use miden_node_utils::tracing::grpc::grpc_trace_fn;
15use miden_protocol::block::BlockSigner;
16use tokio::net::TcpListener;
17use tokio::task::JoinSet;
18use tokio_stream::wrappers::TcpListenerStream;
19use tower_http::trace::TraceLayer;
20use tracing::{info, instrument};
21
22use crate::blocks::BlockStore;
23use crate::db::Db;
24use crate::errors::ApplyBlockError;
25use crate::state::State;
26use crate::{COMPONENT, GenesisState};
27
28mod api;
29mod block_producer;
30mod ntx_builder;
31mod rpc_api;
32
33/// The store server.
34pub struct Store {
35    pub rpc_listener: TcpListener,
36    pub ntx_builder_listener: TcpListener,
37    pub block_producer_listener: TcpListener,
38    pub data_directory: PathBuf,
39    /// Server-side timeout for an individual gRPC request.
40    ///
41    /// If the handler takes longer than this duration, the server cancels the call.
42    pub grpc_timeout: Duration,
43}
44
45impl Store {
46    /// Bootstraps the Store, creating the database state and inserting the genesis block data.
47    #[instrument(
48        target = COMPONENT,
49        name = "store.bootstrap",
50        skip_all,
51        err,
52    )]
53    pub fn bootstrap<S: BlockSigner>(
54        genesis: GenesisState<S>,
55        data_directory: &Path,
56    ) -> anyhow::Result<()> {
57        let genesis = genesis
58            .into_block()
59            .context("failed to convert genesis configuration into the genesis block")?;
60
61        let data_directory =
62            DataDirectory::load(data_directory.to_path_buf()).with_context(|| {
63                format!("failed to load data directory at {}", data_directory.display())
64            })?;
65        tracing::info!(target=COMPONENT, path=%data_directory.display(), "Data directory loaded");
66
67        let block_store = data_directory.block_store_dir();
68        let block_store =
69            BlockStore::bootstrap(block_store.clone(), &genesis).with_context(|| {
70                format!("failed to bootstrap block store at {}", block_store.display())
71            })?;
72        tracing::info!(target=COMPONENT, path=%block_store.display(), "Block store created");
73
74        // Create the genesis block and insert it into the database.
75        let database_filepath = data_directory.database_path();
76        Db::bootstrap(database_filepath.clone(), &genesis).with_context(|| {
77            format!("failed to bootstrap database at {}", database_filepath.display())
78        })?;
79        tracing::info!(target=COMPONENT, path=%database_filepath.display(), "Database created");
80
81        Ok(())
82    }
83
84    /// Serves the store APIs (rpc, ntx-builder, block-producer) and DB maintenance background task.
85    ///
86    /// Note: this blocks until the server dies.
87    pub async fn serve(self) -> anyhow::Result<()> {
88        let rpc_address = self.rpc_listener.local_addr()?;
89        let ntx_builder_address = self.ntx_builder_listener.local_addr()?;
90        let block_producer_address = self.block_producer_listener.local_addr()?;
91        info!(target: COMPONENT, rpc_endpoint=?rpc_address, ntx_builder_endpoint=?ntx_builder_address,
92            block_producer_endpoint=?block_producer_address, ?self.data_directory, ?self.grpc_timeout,
93            "Loading database");
94
95        let (termination_ask, mut termination_signal) =
96            tokio::sync::mpsc::channel::<ApplyBlockError>(1);
97        let state = Arc::new(
98            State::load(&self.data_directory, termination_ask)
99                .await
100                .context("failed to load state")?,
101        );
102
103        let rpc_service =
104            store::rpc_server::RpcServer::new(api::StoreApi { state: Arc::clone(&state) });
105        let ntx_builder_service = store::ntx_builder_server::NtxBuilderServer::new(api::StoreApi {
106            state: Arc::clone(&state),
107        });
108        let block_producer_service =
109            store::block_producer_server::BlockProducerServer::new(api::StoreApi {
110                state: Arc::clone(&state),
111            });
112        let reflection_service = tonic_reflection::server::Builder::configure()
113            .register_file_descriptor_set(store_rpc_api_descriptor())
114            .register_file_descriptor_set(store_ntx_builder_api_descriptor())
115            .register_file_descriptor_set(store_block_producer_api_descriptor())
116            .build_v1()
117            .context("failed to build reflection service")?;
118
119        // This is currently required for postman to work properly because
120        // it doesn't support the new version yet.
121        //
122        // See: <https://github.com/postmanlabs/postman-app-support/issues/13120>.
123        let reflection_service_alpha = tonic_reflection::server::Builder::configure()
124            .register_file_descriptor_set(store_rpc_api_descriptor())
125            .register_file_descriptor_set(store_ntx_builder_api_descriptor())
126            .register_file_descriptor_set(store_block_producer_api_descriptor())
127            .build_v1alpha()
128            .context("failed to build reflection service")?;
129
130        info!(target: COMPONENT, "Database loaded");
131
132        let mut join_set = JoinSet::new();
133
134        join_set.spawn(async move {
135            // Manual tests on testnet indicate each iteration takes ~2s once things are OS cached.
136            //
137            // 5 minutes seems like a reasonable interval, where this should have minimal database
138            // IO impact while providing a decent view into table growth over time.
139            let mut interval = tokio::time::interval(Duration::from_secs(5 * 60));
140            let database = Arc::clone(&state);
141            loop {
142                interval.tick().await;
143                let _ = database.analyze_table_sizes().await;
144            }
145        });
146
147        // Build the gRPC server with the API services and trace layer.
148        join_set.spawn(
149            tonic::transport::Server::builder()
150                .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
151                .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
152                .timeout(self.grpc_timeout)
153                .add_service(rpc_service)
154                .add_service(reflection_service.clone())
155                .add_service(reflection_service_alpha.clone())
156                .serve_with_incoming(TcpListenerStream::new(self.rpc_listener)),
157        );
158
159        join_set.spawn(
160            tonic::transport::Server::builder()
161                .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
162                .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
163                .timeout(self.grpc_timeout)
164                .add_service(ntx_builder_service)
165                .add_service(reflection_service.clone())
166                .add_service(reflection_service_alpha.clone())
167                .serve_with_incoming(TcpListenerStream::new(self.ntx_builder_listener)),
168        );
169
170        join_set.spawn(
171            tonic::transport::Server::builder()
172                .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
173                .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
174                .timeout(self.grpc_timeout)
175                .add_service(block_producer_service)
176                .add_service(reflection_service)
177                .add_service(reflection_service_alpha)
178                .serve_with_incoming(TcpListenerStream::new(self.block_producer_listener)),
179        );
180
181        // SAFETY: The joinset is definitely not empty.
182        let service = async move { join_set.join_next().await.unwrap()?.map_err(Into::into) };
183        tokio::select! {
184            result = service => result,
185            Some(err) = termination_signal.recv() => {
186                Err(anyhow::anyhow!("received termination signal").context(err))
187            }
188        }
189    }
190}
191
/// Represents the store's data-directory and its content paths.
///
/// Used to keep our filepath assumptions in one location: the wrapped path is the directory
/// root, and the locations of its contents are derived from it by this type's methods.
#[derive(Clone, Debug)]
pub struct DataDirectory(PathBuf);
197
198impl DataDirectory {
199    /// Creates a new [`DataDirectory`], ensuring that the directory exists and is accessible
200    /// insofar as is possible.
201    pub fn load(path: PathBuf) -> std::io::Result<Self> {
202        let meta = fs_err::metadata(&path)?;
203        if meta.is_dir().not() {
204            return Err(std::io::ErrorKind::NotConnected.into());
205        }
206
207        Ok(Self(path))
208    }
209
210    pub fn block_store_dir(&self) -> PathBuf {
211        self.0.join("blocks")
212    }
213
214    pub fn database_path(&self) -> PathBuf {
215        self.0.join("miden-store.sqlite3")
216    }
217
218    pub fn display(&self) -> std::path::Display<'_> {
219        self.0.display()
220    }
221}