miden_node_store/server/
mod.rs

1use std::ops::Not;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use miden_node_proto::generated::{block_producer_store, ntx_builder_store, rpc_store};
8use miden_node_proto_build::{
9    store_block_producer_api_descriptor,
10    store_ntx_builder_api_descriptor,
11    store_rpc_api_descriptor,
12    store_shared_api_descriptor,
13};
14use miden_node_utils::panic::{CatchPanicLayer, catch_panic_layer_fn};
15use miden_node_utils::tracing::grpc::grpc_trace_fn;
16use tokio::net::TcpListener;
17use tokio::task::JoinSet;
18use tokio_stream::wrappers::TcpListenerStream;
19use tower_http::trace::TraceLayer;
20use tracing::{info, instrument};
21
22use crate::blocks::BlockStore;
23use crate::db::Db;
24use crate::server::db_maintenance::DbMaintenance;
25use crate::state::State;
26use crate::{COMPONENT, DATABASE_MAINTENANCE_INTERVAL, GenesisState};
27
28mod api;
29mod block_producer;
30mod db_maintenance;
31mod ntx_builder;
32mod rpc_api;
33
/// The store server.
///
/// Holds everything [`Store::serve`] needs: one already-bound TCP listener per gRPC API, the
/// location of the on-disk data, and the per-request timeout.
pub struct Store {
    /// Listener on which the RPC store API (plus the reflection services) is served.
    pub rpc_listener: TcpListener,
    /// Listener on which the ntx-builder store API (plus the reflection services) is served.
    pub ntx_builder_listener: TcpListener,
    /// Listener on which the block-producer store API (plus the reflection services) is served.
    pub block_producer_listener: TcpListener,
    /// Directory the store state is loaded from when serving.
    pub data_directory: PathBuf,
    /// Server-side timeout for an individual gRPC request.
    ///
    /// If the handler takes longer than this duration, the server cancels the call.
    /// The same value is applied to all three gRPC servers.
    pub grpc_timeout: Duration,
}
45
46impl Store {
47    /// Bootstraps the Store, creating the database state and inserting the genesis block data.
48    #[instrument(
49        target = COMPONENT,
50        name = "store.bootstrap",
51        skip_all,
52        err,
53    )]
54    pub fn bootstrap(genesis: GenesisState, data_directory: &Path) -> anyhow::Result<()> {
55        let genesis = genesis
56            .into_block()
57            .context("failed to convert genesis configuration into the genesis block")?;
58
59        let data_directory =
60            DataDirectory::load(data_directory.to_path_buf()).with_context(|| {
61                format!("failed to load data directory at {}", data_directory.display())
62            })?;
63        tracing::info!(target=COMPONENT, path=%data_directory.display(), "Data directory loaded");
64
65        let block_store = data_directory.block_store_dir();
66        let block_store =
67            BlockStore::bootstrap(block_store.clone(), &genesis).with_context(|| {
68                format!("failed to bootstrap block store at {}", block_store.display())
69            })?;
70        tracing::info!(target=COMPONENT, path=%block_store.display(), "Block store created");
71
72        // Create the genesis block and insert it into the database.
73        let database_filepath = data_directory.database_path();
74        Db::bootstrap(database_filepath.clone(), &genesis).with_context(|| {
75            format!("failed to bootstrap database at {}", database_filepath.display())
76        })?;
77        tracing::info!(target=COMPONENT, path=%database_filepath.display(), "Database created");
78
79        Ok(())
80    }
81
82    /// Serves the store APIs (rpc, ntx-builder, block-producer) and DB maintenance background task.
83    ///
84    /// Note: this blocks until the server dies.
85    pub async fn serve(self) -> anyhow::Result<()> {
86        let rpc_address = self.rpc_listener.local_addr()?;
87        let ntx_builder_address = self.ntx_builder_listener.local_addr()?;
88        let block_producer_address = self.block_producer_listener.local_addr()?;
89        info!(target: COMPONENT, rpc_endpoint=?rpc_address, ntx_builder_endpoint=?ntx_builder_address,
90            block_producer_endpoint=?block_producer_address, ?self.data_directory, ?self.grpc_timeout, "Loading database");
91
92        let state =
93            Arc::new(State::load(&self.data_directory).await.context("failed to load state")?);
94
95        let db_maintenance_service =
96            DbMaintenance::new(Arc::clone(&state), DATABASE_MAINTENANCE_INTERVAL);
97
98        let rpc_service =
99            rpc_store::rpc_server::RpcServer::new(api::StoreApi { state: Arc::clone(&state) });
100        let ntx_builder_service =
101            ntx_builder_store::ntx_builder_server::NtxBuilderServer::new(api::StoreApi {
102                state: Arc::clone(&state),
103            });
104        let block_producer_service =
105            block_producer_store::block_producer_server::BlockProducerServer::new(api::StoreApi {
106                state: Arc::clone(&state),
107            });
108        let reflection_service = tonic_reflection::server::Builder::configure()
109            .register_file_descriptor_set(store_rpc_api_descriptor())
110            .register_file_descriptor_set(store_ntx_builder_api_descriptor())
111            .register_file_descriptor_set(store_block_producer_api_descriptor())
112            .register_file_descriptor_set(store_shared_api_descriptor())
113            .build_v1()
114            .context("failed to build reflection service")?;
115
116        // This is currently required for postman to work properly because
117        // it doesn't support the new version yet.
118        //
119        // See: <https://github.com/postmanlabs/postman-app-support/issues/13120>.
120        let reflection_service_alpha = tonic_reflection::server::Builder::configure()
121            .register_file_descriptor_set(store_rpc_api_descriptor())
122            .register_file_descriptor_set(store_ntx_builder_api_descriptor())
123            .register_file_descriptor_set(store_block_producer_api_descriptor())
124            .register_file_descriptor_set(store_shared_api_descriptor())
125            .build_v1alpha()
126            .context("failed to build reflection service")?;
127
128        info!(target: COMPONENT, "Database loaded");
129
130        let mut join_set = JoinSet::new();
131
132        join_set.spawn(async move {
133            db_maintenance_service.run().await;
134            Ok(())
135        });
136
137        // Build the gRPC server with the API services and trace layer.
138        join_set.spawn(
139            tonic::transport::Server::builder()
140                .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
141                .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
142                .timeout(self.grpc_timeout)
143                .add_service(rpc_service)
144                .add_service(reflection_service.clone())
145                .add_service(reflection_service_alpha.clone())
146                .serve_with_incoming(TcpListenerStream::new(self.rpc_listener)),
147        );
148
149        join_set.spawn(
150            tonic::transport::Server::builder()
151                .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
152                .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
153                .timeout(self.grpc_timeout)
154                .add_service(ntx_builder_service)
155                .add_service(reflection_service.clone())
156                .add_service(reflection_service_alpha.clone())
157                .serve_with_incoming(TcpListenerStream::new(self.ntx_builder_listener)),
158        );
159
160        join_set.spawn(
161            tonic::transport::Server::builder()
162                .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
163                .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
164                .timeout(self.grpc_timeout)
165                .add_service(block_producer_service)
166                .add_service(reflection_service)
167                .add_service(reflection_service_alpha)
168                .serve_with_incoming(TcpListenerStream::new(self.block_producer_listener)),
169        );
170
171        // SAFETY: The joinset is definitely not empty.
172        join_set.join_next().await.unwrap()?.map_err(Into::into)
173    }
174}
175
/// Represents the store's data-directory and its content paths.
///
/// Used to keep our filepath assumptions in one location.
#[derive(Clone)]
pub struct DataDirectory(PathBuf);

impl DataDirectory {
    /// Creates a new [`DataDirectory`], ensuring that the directory exists and is accessible
    /// insofar as is possible.
    ///
    /// # Errors
    ///
    /// - Propagates the error from querying the path's metadata (e.g. the path does not exist).
    /// - Returns [`std::io::ErrorKind::NotADirectory`] if the path exists but is not a directory.
    pub fn load(path: PathBuf) -> std::io::Result<Self> {
        let meta = std::fs::metadata(&path)?;
        if meta.is_dir().not() {
            // Report the actual problem and name the offending path, so the error is
            // understandable on its own.
            return Err(std::io::Error::new(
                std::io::ErrorKind::NotADirectory,
                format!("{} exists but is not a directory", path.display()),
            ));
        }

        Ok(Self(path))
    }

    /// Returns the path of the block store directory (`blocks`) inside the data directory.
    pub fn block_store_dir(&self) -> PathBuf {
        self.0.join("blocks")
    }

    /// Returns the path of the database file (`miden-store.sqlite3`) inside the data directory.
    pub fn database_path(&self) -> PathBuf {
        self.0.join("miden-store.sqlite3")
    }

    /// Returns a displayable view of the data directory path, e.g. for log messages.
    pub fn display(&self) -> std::path::Display<'_> {
        self.0.display()
    }
}