miden_node_store/server/
mod.rs1use std::ops::Not;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use miden_node_proto::generated::{block_producer_store, ntx_builder_store, rpc_store};
8use miden_node_proto_build::{
9 store_block_producer_api_descriptor,
10 store_ntx_builder_api_descriptor,
11 store_rpc_api_descriptor,
12 store_shared_api_descriptor,
13};
14use miden_node_utils::panic::{CatchPanicLayer, catch_panic_layer_fn};
15use miden_node_utils::tracing::grpc::{TracedComponent, traced_span_fn};
16use tokio::net::TcpListener;
17use tokio::task::JoinSet;
18use tokio_stream::wrappers::TcpListenerStream;
19use tower_http::trace::TraceLayer;
20use tracing::{info, instrument};
21
22use crate::blocks::BlockStore;
23use crate::db::Db;
24use crate::server::db_maintenance::DbMaintenance;
25use crate::state::State;
26use crate::{COMPONENT, DATABASE_MAINTENANCE_INTERVAL, GenesisState};
27
28mod api;
29mod block_producer;
30mod db_maintenance;
31mod ntx_builder;
32mod rpc_api;
33
34pub struct Store {
36 pub rpc_listener: TcpListener,
37 pub ntx_builder_listener: TcpListener,
38 pub block_producer_listener: TcpListener,
39 pub data_directory: PathBuf,
40 pub grpc_timeout: Duration,
44}
45
46impl Store {
47 #[instrument(
49 target = COMPONENT,
50 name = "store.bootstrap",
51 skip_all,
52 err,
53 )]
54 pub fn bootstrap(genesis: GenesisState, data_directory: &Path) -> anyhow::Result<()> {
55 let genesis = genesis
56 .into_block()
57 .context("failed to convert genesis configuration into the genesis block")?;
58
59 let data_directory =
60 DataDirectory::load(data_directory.to_path_buf()).with_context(|| {
61 format!("failed to load data directory at {}", data_directory.display())
62 })?;
63 tracing::info!(target=COMPONENT, path=%data_directory.display(), "Data directory loaded");
64
65 let block_store = data_directory.block_store_dir();
66 let block_store =
67 BlockStore::bootstrap(block_store.clone(), &genesis).with_context(|| {
68 format!("failed to bootstrap block store at {}", block_store.display())
69 })?;
70 tracing::info!(target=COMPONENT, path=%block_store.display(), "Block store created");
71
72 let database_filepath = data_directory.database_path();
74 Db::bootstrap(database_filepath.clone(), &genesis).with_context(|| {
75 format!("failed to bootstrap database at {}", database_filepath.display())
76 })?;
77 tracing::info!(target=COMPONENT, path=%database_filepath.display(), "Database created");
78
79 Ok(())
80 }
81
82 pub async fn serve(self) -> anyhow::Result<()> {
86 let rpc_address = self.rpc_listener.local_addr()?;
87 let ntx_builder_address = self.ntx_builder_listener.local_addr()?;
88 let block_producer_address = self.block_producer_listener.local_addr()?;
89 info!(target: COMPONENT, rpc_endpoint=?rpc_address, ntx_builder_endpoint=?ntx_builder_address,
90 block_producer_endpoint=?block_producer_address, ?self.data_directory, ?self.grpc_timeout, "Loading database");
91
92 let state =
93 Arc::new(State::load(&self.data_directory).await.context("failed to load state")?);
94
95 let db_maintenance_service =
96 DbMaintenance::new(Arc::clone(&state), DATABASE_MAINTENANCE_INTERVAL);
97
98 let rpc_service =
99 rpc_store::rpc_server::RpcServer::new(api::StoreApi { state: Arc::clone(&state) });
100 let ntx_builder_service =
101 ntx_builder_store::ntx_builder_server::NtxBuilderServer::new(api::StoreApi {
102 state: Arc::clone(&state),
103 });
104 let block_producer_service =
105 block_producer_store::block_producer_server::BlockProducerServer::new(api::StoreApi {
106 state: Arc::clone(&state),
107 });
108 let reflection_service = tonic_reflection::server::Builder::configure()
109 .register_file_descriptor_set(store_rpc_api_descriptor())
110 .register_file_descriptor_set(store_ntx_builder_api_descriptor())
111 .register_file_descriptor_set(store_block_producer_api_descriptor())
112 .register_file_descriptor_set(store_shared_api_descriptor())
113 .build_v1()
114 .context("failed to build reflection service")?;
115
116 let reflection_service_alpha = tonic_reflection::server::Builder::configure()
121 .register_file_descriptor_set(store_rpc_api_descriptor())
122 .register_file_descriptor_set(store_ntx_builder_api_descriptor())
123 .register_file_descriptor_set(store_block_producer_api_descriptor())
124 .register_file_descriptor_set(store_shared_api_descriptor())
125 .build_v1alpha()
126 .context("failed to build reflection service")?;
127
128 info!(target: COMPONENT, "Database loaded");
129
130 let mut join_set = JoinSet::new();
131
132 join_set.spawn(async move {
133 db_maintenance_service.run().await;
134 Ok(())
135 });
136
137 join_set.spawn(
139 tonic::transport::Server::builder()
140 .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
141 .layer(
142 TraceLayer::new_for_grpc()
143 .make_span_with(traced_span_fn(TracedComponent::StoreRpc)),
144 )
145 .timeout(self.grpc_timeout)
146 .add_service(rpc_service)
147 .add_service(reflection_service.clone())
148 .add_service(reflection_service_alpha.clone())
149 .serve_with_incoming(TcpListenerStream::new(self.rpc_listener)),
150 );
151
152 join_set.spawn(
153 tonic::transport::Server::builder()
154 .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
155 .layer(
156 TraceLayer::new_for_grpc()
157 .make_span_with(traced_span_fn(TracedComponent::StoreNtxBuilder)),
158 )
159 .timeout(self.grpc_timeout)
160 .add_service(ntx_builder_service)
161 .add_service(reflection_service.clone())
162 .add_service(reflection_service_alpha.clone())
163 .serve_with_incoming(TcpListenerStream::new(self.ntx_builder_listener)),
164 );
165
166 join_set.spawn(
167 tonic::transport::Server::builder()
168 .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
169 .layer(
170 TraceLayer::new_for_grpc()
171 .make_span_with(traced_span_fn(TracedComponent::BlockProducer)),
172 )
173 .timeout(self.grpc_timeout)
174 .add_service(block_producer_service)
175 .add_service(reflection_service)
176 .add_service(reflection_service_alpha)
177 .serve_with_incoming(TcpListenerStream::new(self.block_producer_listener)),
178 );
179
180 join_set.join_next().await.unwrap()?.map_err(Into::into)
182 }
183}
184
185#[derive(Clone)]
189pub struct DataDirectory(PathBuf);
190
191impl DataDirectory {
192 pub fn load(path: PathBuf) -> std::io::Result<Self> {
195 let meta = std::fs::metadata(&path)?;
196 if meta.is_dir().not() {
197 return Err(std::io::ErrorKind::NotConnected.into());
198 }
199
200 Ok(Self(path))
201 }
202
203 pub fn block_store_dir(&self) -> PathBuf {
204 self.0.join("blocks")
205 }
206
207 pub fn database_path(&self) -> PathBuf {
208 self.0.join("miden-store.sqlite3")
209 }
210
211 pub fn display(&self) -> std::path::Display<'_> {
212 self.0.display()
213 }
214}