miden_node_store/server/
mod.rs

1use std::ops::Not;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use miden_node_proto::generated::{block_producer_store, ntx_builder_store, rpc_store};
8use miden_node_proto_build::{
9    store_block_producer_api_descriptor,
10    store_ntx_builder_api_descriptor,
11    store_rpc_api_descriptor,
12    store_shared_api_descriptor,
13};
14use miden_node_utils::panic::{CatchPanicLayer, catch_panic_layer_fn};
15use miden_node_utils::tracing::grpc::{TracedComponent, traced_span_fn};
16use tokio::net::TcpListener;
17use tokio::task::JoinSet;
18use tokio_stream::wrappers::TcpListenerStream;
19use tower_http::trace::TraceLayer;
20use tracing::{info, instrument};
21
22use crate::blocks::BlockStore;
23use crate::db::Db;
24use crate::server::db_maintenance::DbMaintenance;
25use crate::state::State;
26use crate::{COMPONENT, DATABASE_MAINTENANCE_INTERVAL, GenesisState};
27
28mod api;
29mod block_producer;
30mod db_maintenance;
31mod ntx_builder;
32mod rpc_api;
33
34/// The store server.
35pub struct Store {
36    pub rpc_listener: TcpListener,
37    pub ntx_builder_listener: TcpListener,
38    pub block_producer_listener: TcpListener,
39    pub data_directory: PathBuf,
40    /// Server-side timeout for an individual gRPC request.
41    ///
42    /// If the handler takes longer than this duration, the server cancels the call.
43    pub grpc_timeout: Duration,
44}
45
46impl Store {
47    /// Bootstraps the Store, creating the database state and inserting the genesis block data.
48    #[instrument(
49        target = COMPONENT,
50        name = "store.bootstrap",
51        skip_all,
52        err,
53    )]
54    pub fn bootstrap(genesis: GenesisState, data_directory: &Path) -> anyhow::Result<()> {
55        let genesis = genesis
56            .into_block()
57            .context("failed to convert genesis configuration into the genesis block")?;
58
59        let data_directory =
60            DataDirectory::load(data_directory.to_path_buf()).with_context(|| {
61                format!("failed to load data directory at {}", data_directory.display())
62            })?;
63        tracing::info!(target=COMPONENT, path=%data_directory.display(), "Data directory loaded");
64
65        let block_store = data_directory.block_store_dir();
66        let block_store =
67            BlockStore::bootstrap(block_store.clone(), &genesis).with_context(|| {
68                format!("failed to bootstrap block store at {}", block_store.display())
69            })?;
70        tracing::info!(target=COMPONENT, path=%block_store.display(), "Block store created");
71
72        // Create the genesis block and insert it into the database.
73        let database_filepath = data_directory.database_path();
74        Db::bootstrap(database_filepath.clone(), &genesis).with_context(|| {
75            format!("failed to bootstrap database at {}", database_filepath.display())
76        })?;
77        tracing::info!(target=COMPONENT, path=%database_filepath.display(), "Database created");
78
79        Ok(())
80    }
81
82    /// Serves the store APIs (rpc, ntx-builder, block-producer) and DB maintenance background task.
83    ///
84    /// Note: this blocks until the server dies.
85    pub async fn serve(self) -> anyhow::Result<()> {
86        let rpc_address = self.rpc_listener.local_addr()?;
87        let ntx_builder_address = self.ntx_builder_listener.local_addr()?;
88        let block_producer_address = self.block_producer_listener.local_addr()?;
89        info!(target: COMPONENT, rpc_endpoint=?rpc_address, ntx_builder_endpoint=?ntx_builder_address,
90            block_producer_endpoint=?block_producer_address, ?self.data_directory, ?self.grpc_timeout, "Loading database");
91
92        let state =
93            Arc::new(State::load(&self.data_directory).await.context("failed to load state")?);
94
95        let db_maintenance_service =
96            DbMaintenance::new(Arc::clone(&state), DATABASE_MAINTENANCE_INTERVAL);
97
98        let rpc_service =
99            rpc_store::rpc_server::RpcServer::new(api::StoreApi { state: Arc::clone(&state) });
100        let ntx_builder_service =
101            ntx_builder_store::ntx_builder_server::NtxBuilderServer::new(api::StoreApi {
102                state: Arc::clone(&state),
103            });
104        let block_producer_service =
105            block_producer_store::block_producer_server::BlockProducerServer::new(api::StoreApi {
106                state: Arc::clone(&state),
107            });
108        let reflection_service = tonic_reflection::server::Builder::configure()
109            .register_file_descriptor_set(store_rpc_api_descriptor())
110            .register_file_descriptor_set(store_ntx_builder_api_descriptor())
111            .register_file_descriptor_set(store_block_producer_api_descriptor())
112            .register_file_descriptor_set(store_shared_api_descriptor())
113            .build_v1()
114            .context("failed to build reflection service")?;
115
116        // This is currently required for postman to work properly because
117        // it doesn't support the new version yet.
118        //
119        // See: <https://github.com/postmanlabs/postman-app-support/issues/13120>.
120        let reflection_service_alpha = tonic_reflection::server::Builder::configure()
121            .register_file_descriptor_set(store_rpc_api_descriptor())
122            .register_file_descriptor_set(store_ntx_builder_api_descriptor())
123            .register_file_descriptor_set(store_block_producer_api_descriptor())
124            .register_file_descriptor_set(store_shared_api_descriptor())
125            .build_v1alpha()
126            .context("failed to build reflection service")?;
127
128        info!(target: COMPONENT, "Database loaded");
129
130        let mut join_set = JoinSet::new();
131
132        join_set.spawn(async move {
133            db_maintenance_service.run().await;
134            Ok(())
135        });
136
137        // Build the gRPC server with the API services and trace layer.
138        join_set.spawn(
139            tonic::transport::Server::builder()
140                .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
141                .layer(
142                    TraceLayer::new_for_grpc()
143                        .make_span_with(traced_span_fn(TracedComponent::StoreRpc)),
144                )
145                .timeout(self.grpc_timeout)
146                .add_service(rpc_service)
147                .add_service(reflection_service.clone())
148                .add_service(reflection_service_alpha.clone())
149                .serve_with_incoming(TcpListenerStream::new(self.rpc_listener)),
150        );
151
152        join_set.spawn(
153            tonic::transport::Server::builder()
154                .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
155                .layer(
156                    TraceLayer::new_for_grpc()
157                        .make_span_with(traced_span_fn(TracedComponent::StoreNtxBuilder)),
158                )
159                .timeout(self.grpc_timeout)
160                .add_service(ntx_builder_service)
161                .add_service(reflection_service.clone())
162                .add_service(reflection_service_alpha.clone())
163                .serve_with_incoming(TcpListenerStream::new(self.ntx_builder_listener)),
164        );
165
166        join_set.spawn(
167            tonic::transport::Server::builder()
168                .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
169                .layer(
170                    TraceLayer::new_for_grpc()
171                        .make_span_with(traced_span_fn(TracedComponent::BlockProducer)),
172                )
173                .timeout(self.grpc_timeout)
174                .add_service(block_producer_service)
175                .add_service(reflection_service)
176                .add_service(reflection_service_alpha)
177                .serve_with_incoming(TcpListenerStream::new(self.block_producer_listener)),
178        );
179
180        // SAFETY: The joinset is definitely not empty.
181        join_set.join_next().await.unwrap()?.map_err(Into::into)
182    }
183}
184
/// Represents the store's data-directory and its content paths.
///
/// Used to keep our filepath assumptions in one location.
#[derive(Clone)]
pub struct DataDirectory(PathBuf);

impl DataDirectory {
    /// Creates a new [`DataDirectory`], ensuring that the directory exists and is accessible
    /// insofar as is possible.
    ///
    /// # Errors
    ///
    /// - Propagates the error from [`std::fs::metadata`] when the path cannot be inspected
    ///   (e.g. it does not exist).
    /// - Returns [`std::io::ErrorKind::NotADirectory`] when the path exists but is not a
    ///   directory.
    pub fn load(path: PathBuf) -> std::io::Result<Self> {
        let meta = std::fs::metadata(&path)?;
        if meta.is_dir().not() {
            // Report the actual problem, including the path, so that callers which do not add
            // their own context still get an actionable message.
            return Err(std::io::Error::new(
                std::io::ErrorKind::NotADirectory,
                format!("{} is not a directory", path.display()),
            ));
        }

        Ok(Self(path))
    }

    /// Returns the path of the block store directory: `<data-directory>/blocks`.
    pub fn block_store_dir(&self) -> PathBuf {
        self.0.join("blocks")
    }

    /// Returns the path of the database file: `<data-directory>/miden-store.sqlite3`.
    pub fn database_path(&self) -> PathBuf {
        self.0.join("miden-store.sqlite3")
    }

    /// Returns a displayable form of the data directory's path, suitable for logs and errors.
    pub fn display(&self) -> std::path::Display<'_> {
        self.0.display()
    }
}