miden_node_store/server/
mod.rs1use std::ops::Not;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use miden_node_proto::generated::store;
8use miden_node_proto_build::{
9 store_block_producer_api_descriptor,
10 store_ntx_builder_api_descriptor,
11 store_rpc_api_descriptor,
12};
13use miden_node_utils::panic::{CatchPanicLayer, catch_panic_layer_fn};
14use miden_node_utils::tracing::grpc::grpc_trace_fn;
15use miden_protocol::block::BlockSigner;
16use tokio::net::TcpListener;
17use tokio::task::JoinSet;
18use tokio_stream::wrappers::TcpListenerStream;
19use tower_http::trace::TraceLayer;
20use tracing::{info, instrument};
21
22use crate::blocks::BlockStore;
23use crate::db::Db;
24use crate::errors::ApplyBlockError;
25use crate::state::State;
26use crate::{COMPONENT, GenesisState};
27
28mod api;
29mod block_producer;
30mod ntx_builder;
31mod rpc_api;
32
33pub struct Store {
35 pub rpc_listener: TcpListener,
36 pub ntx_builder_listener: TcpListener,
37 pub block_producer_listener: TcpListener,
38 pub data_directory: PathBuf,
39 pub grpc_timeout: Duration,
43}
44
45impl Store {
46 #[instrument(
48 target = COMPONENT,
49 name = "store.bootstrap",
50 skip_all,
51 err,
52 )]
53 pub fn bootstrap<S: BlockSigner>(
54 genesis: GenesisState<S>,
55 data_directory: &Path,
56 ) -> anyhow::Result<()> {
57 let genesis = genesis
58 .into_block()
59 .context("failed to convert genesis configuration into the genesis block")?;
60
61 let data_directory =
62 DataDirectory::load(data_directory.to_path_buf()).with_context(|| {
63 format!("failed to load data directory at {}", data_directory.display())
64 })?;
65 tracing::info!(target=COMPONENT, path=%data_directory.display(), "Data directory loaded");
66
67 let block_store = data_directory.block_store_dir();
68 let block_store =
69 BlockStore::bootstrap(block_store.clone(), &genesis).with_context(|| {
70 format!("failed to bootstrap block store at {}", block_store.display())
71 })?;
72 tracing::info!(target=COMPONENT, path=%block_store.display(), "Block store created");
73
74 let database_filepath = data_directory.database_path();
76 Db::bootstrap(database_filepath.clone(), &genesis).with_context(|| {
77 format!("failed to bootstrap database at {}", database_filepath.display())
78 })?;
79 tracing::info!(target=COMPONENT, path=%database_filepath.display(), "Database created");
80
81 Ok(())
82 }
83
84 pub async fn serve(self) -> anyhow::Result<()> {
88 let rpc_address = self.rpc_listener.local_addr()?;
89 let ntx_builder_address = self.ntx_builder_listener.local_addr()?;
90 let block_producer_address = self.block_producer_listener.local_addr()?;
91 info!(target: COMPONENT, rpc_endpoint=?rpc_address, ntx_builder_endpoint=?ntx_builder_address,
92 block_producer_endpoint=?block_producer_address, ?self.data_directory, ?self.grpc_timeout,
93 "Loading database");
94
95 let (termination_ask, mut termination_signal) =
96 tokio::sync::mpsc::channel::<ApplyBlockError>(1);
97 let state = Arc::new(
98 State::load(&self.data_directory, termination_ask)
99 .await
100 .context("failed to load state")?,
101 );
102
103 let rpc_service =
104 store::rpc_server::RpcServer::new(api::StoreApi { state: Arc::clone(&state) });
105 let ntx_builder_service = store::ntx_builder_server::NtxBuilderServer::new(api::StoreApi {
106 state: Arc::clone(&state),
107 });
108 let block_producer_service =
109 store::block_producer_server::BlockProducerServer::new(api::StoreApi {
110 state: Arc::clone(&state),
111 });
112 let reflection_service = tonic_reflection::server::Builder::configure()
113 .register_file_descriptor_set(store_rpc_api_descriptor())
114 .register_file_descriptor_set(store_ntx_builder_api_descriptor())
115 .register_file_descriptor_set(store_block_producer_api_descriptor())
116 .build_v1()
117 .context("failed to build reflection service")?;
118
119 let reflection_service_alpha = tonic_reflection::server::Builder::configure()
124 .register_file_descriptor_set(store_rpc_api_descriptor())
125 .register_file_descriptor_set(store_ntx_builder_api_descriptor())
126 .register_file_descriptor_set(store_block_producer_api_descriptor())
127 .build_v1alpha()
128 .context("failed to build reflection service")?;
129
130 info!(target: COMPONENT, "Database loaded");
131
132 let mut join_set = JoinSet::new();
133
134 join_set.spawn(async move {
135 let mut interval = tokio::time::interval(Duration::from_secs(5 * 60));
140 let database = Arc::clone(&state);
141 loop {
142 interval.tick().await;
143 let _ = database.analyze_table_sizes().await;
144 }
145 });
146
147 join_set.spawn(
149 tonic::transport::Server::builder()
150 .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
151 .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
152 .timeout(self.grpc_timeout)
153 .add_service(rpc_service)
154 .add_service(reflection_service.clone())
155 .add_service(reflection_service_alpha.clone())
156 .serve_with_incoming(TcpListenerStream::new(self.rpc_listener)),
157 );
158
159 join_set.spawn(
160 tonic::transport::Server::builder()
161 .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
162 .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
163 .timeout(self.grpc_timeout)
164 .add_service(ntx_builder_service)
165 .add_service(reflection_service.clone())
166 .add_service(reflection_service_alpha.clone())
167 .serve_with_incoming(TcpListenerStream::new(self.ntx_builder_listener)),
168 );
169
170 join_set.spawn(
171 tonic::transport::Server::builder()
172 .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
173 .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
174 .timeout(self.grpc_timeout)
175 .add_service(block_producer_service)
176 .add_service(reflection_service)
177 .add_service(reflection_service_alpha)
178 .serve_with_incoming(TcpListenerStream::new(self.block_producer_listener)),
179 );
180
181 let service = async move { join_set.join_next().await.unwrap()?.map_err(Into::into) };
183 tokio::select! {
184 result = service => result,
185 Some(err) = termination_signal.recv() => {
186 Err(anyhow::anyhow!("received termination signal").context(err))
187 }
188 }
189 }
190}
191
192#[derive(Clone)]
196pub struct DataDirectory(PathBuf);
197
198impl DataDirectory {
199 pub fn load(path: PathBuf) -> std::io::Result<Self> {
202 let meta = fs_err::metadata(&path)?;
203 if meta.is_dir().not() {
204 return Err(std::io::ErrorKind::NotConnected.into());
205 }
206
207 Ok(Self(path))
208 }
209
210 pub fn block_store_dir(&self) -> PathBuf {
211 self.0.join("blocks")
212 }
213
214 pub fn database_path(&self) -> PathBuf {
215 self.0.join("miden-store.sqlite3")
216 }
217
218 pub fn display(&self) -> std::path::Display<'_> {
219 self.0.display()
220 }
221}