miden_node_store/server/
mod.rs

1use std::{
2    ops::Not,
3    path::{Path, PathBuf},
4    sync::Arc,
5};
6
7use anyhow::Context;
8use miden_node_proto::generated::store;
9use miden_node_proto_build::store_api_descriptor;
10use miden_node_utils::tracing::grpc::{TracedComponent, traced_span_fn};
11use tokio::{net::TcpListener, task::JoinSet};
12use tokio_stream::wrappers::TcpListenerStream;
13use tower_http::trace::TraceLayer;
14use tracing::{info, instrument};
15
16use crate::{
17    COMPONENT, DATABASE_MAINTENANCE_INTERVAL, GenesisState, blocks::BlockStore, db::Db,
18    server::db_maintenance::DbMaintenance, state::State,
19};
20
21mod api;
22mod block_producer;
23mod db_maintenance;
24mod ntx_builder;
25mod rpc_api;
26
/// The store server.
///
/// Holds the already-bound listeners for each of the store's gRPC APIs together with the
/// location of the on-disk data. Call [`Store::serve`] to start serving.
pub struct Store {
    /// Listener on which the RPC API (plus gRPC reflection) is served.
    pub rpc_listener: TcpListener,
    /// Listener on which the ntx-builder API (plus gRPC reflection) is served.
    pub ntx_builder_listener: TcpListener,
    /// Listener on which the block-producer API (plus gRPC reflection) is served.
    pub block_producer_listener: TcpListener,
    /// Directory from which the block store and the database are loaded when serving.
    pub data_directory: PathBuf,
}
34
35impl Store {
36    /// Bootstraps the Store, creating the database state and inserting the genesis block data.
37    #[instrument(
38        target = COMPONENT,
39        name = "store.bootstrap",
40        skip_all,
41        err,
42    )]
43    pub fn bootstrap(genesis: GenesisState, data_directory: &Path) -> anyhow::Result<()> {
44        let genesis = genesis
45            .into_block()
46            .context("failed to convert genesis configuration into the genesis block")?;
47
48        let data_directory =
49            DataDirectory::load(data_directory.to_path_buf()).with_context(|| {
50                format!("failed to load data directory at {}", data_directory.display())
51            })?;
52        tracing::info!(target=COMPONENT, path=%data_directory.display(), "Data directory loaded");
53
54        let block_store = data_directory.block_store_dir();
55        let block_store =
56            BlockStore::bootstrap(block_store.clone(), &genesis).with_context(|| {
57                format!("failed to bootstrap block store at {}", block_store.display())
58            })?;
59        tracing::info!(target=COMPONENT, path=%block_store.display(), "Block store created");
60
61        // Create the genesis block and insert it into the database.
62        let database_filepath = data_directory.database_path();
63        Db::bootstrap(database_filepath.clone(), &genesis).with_context(|| {
64            format!("failed to bootstrap database at {}", database_filepath.display())
65        })?;
66        tracing::info!(target=COMPONENT, path=%database_filepath.display(), "Database created");
67
68        Ok(())
69    }
70
71    /// Serves the store APIs (rpc, ntx-builder, block-producer) and DB maintenance background task.
72    ///
73    /// Note: this blocks until the server dies.
74    pub async fn serve(self) -> anyhow::Result<()> {
75        let rpc_address = self.rpc_listener.local_addr()?;
76        let ntx_builder_address = self.ntx_builder_listener.local_addr()?;
77        let block_producer_address = self.block_producer_listener.local_addr()?;
78        info!(target: COMPONENT, rpc_endpoint=?rpc_address, ntx_builder_endpoint=?ntx_builder_address, block_producer_endpoint=?block_producer_address, ?self.data_directory, "Loading database");
79
80        let data_directory =
81            DataDirectory::load(self.data_directory.clone()).with_context(|| {
82                format!("failed to load data directory at {}", self.data_directory.display())
83            })?;
84
85        let block_store =
86            Arc::new(BlockStore::load(data_directory.block_store_dir()).with_context(|| {
87                format!("failed to load block store at {}", self.data_directory.display())
88            })?);
89
90        let database_filepath = data_directory.database_path();
91        let db = Db::load(database_filepath.clone()).await.with_context(|| {
92            format!("failed to load database at {}", database_filepath.display())
93        })?;
94
95        let state = Arc::new(State::load(db, block_store).await.context("failed to load state")?);
96
97        let db_maintenance_service =
98            DbMaintenance::new(Arc::clone(&state), DATABASE_MAINTENANCE_INTERVAL);
99
100        let rpc_service =
101            store::rpc_server::RpcServer::new(api::StoreApi { state: Arc::clone(&state) });
102        let ntx_builder_service = store::ntx_builder_server::NtxBuilderServer::new(api::StoreApi {
103            state: Arc::clone(&state),
104        });
105        let block_producer_service =
106            store::block_producer_server::BlockProducerServer::new(api::StoreApi {
107                state: Arc::clone(&state),
108            });
109        let reflection_service = tonic_reflection::server::Builder::configure()
110            .register_file_descriptor_set(store_api_descriptor())
111            .build_v1()
112            .context("failed to build reflection service")?;
113
114        // This is currently required for postman to work properly because
115        // it doesn't support the new version yet.
116        //
117        // See: <https://github.com/postmanlabs/postman-app-support/issues/13120>.
118        let reflection_service_alpha = tonic_reflection::server::Builder::configure()
119            .register_file_descriptor_set(store_api_descriptor())
120            .build_v1alpha()
121            .context("failed to build reflection service")?;
122
123        info!(target: COMPONENT, "Database loaded");
124
125        let mut join_set = JoinSet::new();
126
127        join_set.spawn(async move {
128            db_maintenance_service.run().await;
129            Ok(())
130        });
131
132        // Build the gRPC server with the API services and trace layer.
133        join_set.spawn(
134            tonic::transport::Server::builder()
135                .layer(
136                    TraceLayer::new_for_grpc()
137                        .make_span_with(traced_span_fn(TracedComponent::StoreRpc)),
138                )
139                .add_service(rpc_service)
140                .add_service(reflection_service.clone())
141                .add_service(reflection_service_alpha.clone())
142                .serve_with_incoming(TcpListenerStream::new(self.rpc_listener)),
143        );
144
145        join_set.spawn(
146            tonic::transport::Server::builder()
147                .layer(
148                    TraceLayer::new_for_grpc()
149                        .make_span_with(traced_span_fn(TracedComponent::StoreNtxBuilder)),
150                )
151                .add_service(ntx_builder_service)
152                .add_service(reflection_service.clone())
153                .add_service(reflection_service_alpha.clone())
154                .serve_with_incoming(TcpListenerStream::new(self.ntx_builder_listener)),
155        );
156
157        join_set.spawn(
158            tonic::transport::Server::builder()
159                .layer(
160                    TraceLayer::new_for_grpc()
161                        .make_span_with(traced_span_fn(TracedComponent::BlockProducer)),
162                )
163                .add_service(block_producer_service)
164                .add_service(reflection_service)
165                .add_service(reflection_service_alpha)
166                .serve_with_incoming(TcpListenerStream::new(self.block_producer_listener)),
167        );
168
169        // SAFETY: The joinset is definitely not empty.
170        join_set.join_next().await.unwrap()?.map_err(Into::into)
171    }
172}
173
/// Represents the store's data-directory and its content paths.
///
/// Used to keep our filepath assumptions in one location.
#[derive(Clone, Debug)]
pub struct DataDirectory(PathBuf);

impl DataDirectory {
    /// Creates a new [`DataDirectory`], ensuring that the directory exists and is accessible
    /// insofar as is possible.
    ///
    /// # Errors
    ///
    /// Returns the underlying I/O error if the metadata of `path` cannot be read (for example
    /// because the path does not exist), or an error of kind
    /// [`std::io::ErrorKind::NotADirectory`] if `path` exists but is not a directory.
    pub fn load(path: PathBuf) -> std::io::Result<Self> {
        let meta = std::fs::metadata(&path)?;
        if meta.is_dir().not() {
            // Use the error kind that describes the actual problem so that the rendered error
            // reads "not a directory" rather than an unrelated network error.
            return Err(std::io::ErrorKind::NotADirectory.into());
        }

        Ok(Self(path))
    }

    /// Returns the path of the block store directory: `<data-directory>/blocks`.
    pub fn block_store_dir(&self) -> PathBuf {
        self.0.join("blocks")
    }

    /// Returns the path of the database file: `<data-directory>/miden-store.sqlite3`.
    pub fn database_path(&self) -> PathBuf {
        self.0.join("miden-store.sqlite3")
    }

    /// Returns a displayable form of the data directory path, e.g. for logs and error messages.
    pub fn display(&self) -> std::path::Display<'_> {
        self.0.display()
    }
}
203}