Skip to main content

miden_node_store/server/
mod.rs

1use std::ops::Not;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use miden_node_proto::generated::store;
8use miden_node_proto_build::{
9    store_block_producer_api_descriptor,
10    store_ntx_builder_api_descriptor,
11    store_rpc_api_descriptor,
12};
13use miden_node_utils::clap::GrpcOptionsInternal;
14use miden_node_utils::panic::{CatchPanicLayer, catch_panic_layer_fn};
15use miden_node_utils::tracing::grpc::grpc_trace_fn;
16use miden_protocol::block::BlockSigner;
17use tokio::net::TcpListener;
18use tokio::task::JoinSet;
19use tokio_stream::wrappers::TcpListenerStream;
20use tower_http::trace::TraceLayer;
21use tracing::{info, instrument};
22
23use crate::blocks::BlockStore;
24use crate::db::Db;
25use crate::errors::ApplyBlockError;
26use crate::state::State;
27use crate::{COMPONENT, GenesisState};
28
29mod api;
30mod block_producer;
31mod ntx_builder;
32mod rpc_api;
33
34/// The store server.
35pub struct Store {
36    pub rpc_listener: TcpListener,
37    pub ntx_builder_listener: TcpListener,
38    pub block_producer_listener: TcpListener,
39    pub data_directory: PathBuf,
40    pub grpc_options: GrpcOptionsInternal,
41}
42
43impl Store {
44    /// Bootstraps the Store, creating the database state and inserting the genesis block data.
45    #[instrument(
46        target = COMPONENT,
47        name = "store.bootstrap",
48        skip_all,
49        err,
50    )]
51    pub fn bootstrap<S: BlockSigner>(
52        genesis: GenesisState<S>,
53        data_directory: &Path,
54    ) -> anyhow::Result<()> {
55        let genesis = genesis
56            .into_block()
57            .context("failed to convert genesis configuration into the genesis block")?;
58
59        let data_directory =
60            DataDirectory::load(data_directory.to_path_buf()).with_context(|| {
61                format!("failed to load data directory at {}", data_directory.display())
62            })?;
63        tracing::info!(target=COMPONENT, path=%data_directory.display(), "Data directory loaded");
64
65        let block_store = data_directory.block_store_dir();
66        let block_store =
67            BlockStore::bootstrap(block_store.clone(), &genesis).with_context(|| {
68                format!("failed to bootstrap block store at {}", block_store.display())
69            })?;
70        tracing::info!(target=COMPONENT, path=%block_store.display(), "Block store created");
71
72        // Create the genesis block and insert it into the database.
73        let database_filepath = data_directory.database_path();
74        Db::bootstrap(database_filepath.clone(), &genesis).with_context(|| {
75            format!("failed to bootstrap database at {}", database_filepath.display())
76        })?;
77        tracing::info!(target=COMPONENT, path=%database_filepath.display(), "Database created");
78
79        Ok(())
80    }
81
82    /// Serves the store APIs (rpc, ntx-builder, block-producer) and DB maintenance background task.
83    ///
84    /// Note: this blocks until the server dies.
85    pub async fn serve(self) -> anyhow::Result<()> {
86        let rpc_address = self.rpc_listener.local_addr()?;
87        let ntx_builder_address = self.ntx_builder_listener.local_addr()?;
88        let block_producer_address = self.block_producer_listener.local_addr()?;
89        info!(target: COMPONENT, rpc_endpoint=?rpc_address, ntx_builder_endpoint=?ntx_builder_address,
90            block_producer_endpoint=?block_producer_address, ?self.data_directory, ?self.grpc_options.request_timeout,
91            "Loading database");
92
93        let (termination_ask, mut termination_signal) =
94            tokio::sync::mpsc::channel::<ApplyBlockError>(1);
95        let state = Arc::new(
96            State::load(&self.data_directory, termination_ask)
97                .await
98                .context("failed to load state")?,
99        );
100
101        let rpc_service =
102            store::rpc_server::RpcServer::new(api::StoreApi { state: Arc::clone(&state) });
103        let ntx_builder_service = store::ntx_builder_server::NtxBuilderServer::new(api::StoreApi {
104            state: Arc::clone(&state),
105        });
106        let block_producer_service =
107            store::block_producer_server::BlockProducerServer::new(api::StoreApi {
108                state: Arc::clone(&state),
109            });
110        let reflection_service = tonic_reflection::server::Builder::configure()
111            .register_file_descriptor_set(store_rpc_api_descriptor())
112            .register_file_descriptor_set(store_ntx_builder_api_descriptor())
113            .register_file_descriptor_set(store_block_producer_api_descriptor())
114            .build_v1()
115            .context("failed to build reflection service")?;
116
117        // This is currently required for postman to work properly because
118        // it doesn't support the new version yet.
119        //
120        // See: <https://github.com/postmanlabs/postman-app-support/issues/13120>.
121        let reflection_service_alpha = tonic_reflection::server::Builder::configure()
122            .register_file_descriptor_set(store_rpc_api_descriptor())
123            .register_file_descriptor_set(store_ntx_builder_api_descriptor())
124            .register_file_descriptor_set(store_block_producer_api_descriptor())
125            .build_v1alpha()
126            .context("failed to build reflection service")?;
127
128        info!(target: COMPONENT, "Database loaded");
129
130        let mut join_set = JoinSet::new();
131
132        join_set.spawn(async move {
133            // Manual tests on testnet indicate each iteration takes ~2s once things are OS cached.
134            //
135            // 5 minutes seems like a reasonable interval, where this should have minimal database
136            // IO impact while providing a decent view into table growth over time.
137            let mut interval = tokio::time::interval(Duration::from_secs(5 * 60));
138            let database = Arc::clone(&state);
139            loop {
140                interval.tick().await;
141                let _ = database.analyze_table_sizes().await;
142            }
143        });
144
145        // Build the gRPC server with the API services and trace layer.
146        join_set.spawn(
147            tonic::transport::Server::builder()
148                .timeout(self.grpc_options.request_timeout)
149                .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
150                .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
151                .add_service(rpc_service)
152                .add_service(reflection_service.clone())
153                .add_service(reflection_service_alpha.clone())
154                .serve_with_incoming(TcpListenerStream::new(self.rpc_listener)),
155        );
156
157        join_set.spawn(
158            tonic::transport::Server::builder()
159                .timeout(self.grpc_options.request_timeout)
160                .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
161                .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
162                .add_service(ntx_builder_service)
163                .add_service(reflection_service.clone())
164                .add_service(reflection_service_alpha.clone())
165                .serve_with_incoming(TcpListenerStream::new(self.ntx_builder_listener)),
166        );
167
168        join_set.spawn(
169            tonic::transport::Server::builder()
170                .accept_http1(true)
171                .timeout(self.grpc_options.request_timeout)
172                .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
173                .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
174                .add_service(block_producer_service)
175                .add_service(reflection_service)
176                .add_service(reflection_service_alpha)
177                .serve_with_incoming(TcpListenerStream::new(self.block_producer_listener)),
178        );
179
180        // SAFETY: The joinset is definitely not empty.
181        let service = async move { join_set.join_next().await.unwrap()?.map_err(Into::into) };
182        tokio::select! {
183            result = service => result,
184            Some(err) = termination_signal.recv() => {
185                Err(anyhow::anyhow!("received termination signal").context(err))
186            }
187        }
188    }
189}
190
/// Represents the store's data-directory and its content paths.
///
/// Used to keep our filepath assumptions in one location.
///
/// This is a thin wrapper around the directory's [`PathBuf`]; it derives `Debug` so that it can
/// be used in diagnostics and in `Debug`-deriving types that embed it.
#[derive(Clone, Debug)]
pub struct DataDirectory(PathBuf);
196
197impl DataDirectory {
198    /// Creates a new [`DataDirectory`], ensuring that the directory exists and is accessible
199    /// insofar as is possible.
200    pub fn load(path: PathBuf) -> std::io::Result<Self> {
201        let meta = fs_err::metadata(&path)?;
202        if meta.is_dir().not() {
203            return Err(std::io::ErrorKind::NotConnected.into());
204        }
205
206        Ok(Self(path))
207    }
208
209    pub fn block_store_dir(&self) -> PathBuf {
210        self.0.join("blocks")
211    }
212
213    pub fn database_path(&self) -> PathBuf {
214        self.0.join("miden-store.sqlite3")
215    }
216
217    pub fn display(&self) -> std::path::Display<'_> {
218        self.0.display()
219    }
220}