miden_node_store/server/
mod.rs1use std::ops::Not;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Duration;
5
6use anyhow::Context;
7use miden_node_proto::generated::store;
8use miden_node_proto_build::{
9 store_block_producer_api_descriptor,
10 store_ntx_builder_api_descriptor,
11 store_rpc_api_descriptor,
12};
13use miden_node_utils::clap::GrpcOptionsInternal;
14use miden_node_utils::panic::{CatchPanicLayer, catch_panic_layer_fn};
15use miden_node_utils::tracing::grpc::grpc_trace_fn;
16use miden_protocol::block::BlockSigner;
17use tokio::net::TcpListener;
18use tokio::task::JoinSet;
19use tokio_stream::wrappers::TcpListenerStream;
20use tower_http::trace::TraceLayer;
21use tracing::{info, instrument};
22
23use crate::blocks::BlockStore;
24use crate::db::Db;
25use crate::errors::ApplyBlockError;
26use crate::state::State;
27use crate::{COMPONENT, GenesisState};
28
29mod api;
30mod block_producer;
31mod ntx_builder;
32mod rpc_api;
33
34pub struct Store {
36 pub rpc_listener: TcpListener,
37 pub ntx_builder_listener: TcpListener,
38 pub block_producer_listener: TcpListener,
39 pub data_directory: PathBuf,
40 pub grpc_options: GrpcOptionsInternal,
41}
42
43impl Store {
44 #[instrument(
46 target = COMPONENT,
47 name = "store.bootstrap",
48 skip_all,
49 err,
50 )]
51 pub fn bootstrap<S: BlockSigner>(
52 genesis: GenesisState<S>,
53 data_directory: &Path,
54 ) -> anyhow::Result<()> {
55 let genesis = genesis
56 .into_block()
57 .context("failed to convert genesis configuration into the genesis block")?;
58
59 let data_directory =
60 DataDirectory::load(data_directory.to_path_buf()).with_context(|| {
61 format!("failed to load data directory at {}", data_directory.display())
62 })?;
63 tracing::info!(target=COMPONENT, path=%data_directory.display(), "Data directory loaded");
64
65 let block_store = data_directory.block_store_dir();
66 let block_store =
67 BlockStore::bootstrap(block_store.clone(), &genesis).with_context(|| {
68 format!("failed to bootstrap block store at {}", block_store.display())
69 })?;
70 tracing::info!(target=COMPONENT, path=%block_store.display(), "Block store created");
71
72 let database_filepath = data_directory.database_path();
74 Db::bootstrap(database_filepath.clone(), &genesis).with_context(|| {
75 format!("failed to bootstrap database at {}", database_filepath.display())
76 })?;
77 tracing::info!(target=COMPONENT, path=%database_filepath.display(), "Database created");
78
79 Ok(())
80 }
81
82 pub async fn serve(self) -> anyhow::Result<()> {
86 let rpc_address = self.rpc_listener.local_addr()?;
87 let ntx_builder_address = self.ntx_builder_listener.local_addr()?;
88 let block_producer_address = self.block_producer_listener.local_addr()?;
89 info!(target: COMPONENT, rpc_endpoint=?rpc_address, ntx_builder_endpoint=?ntx_builder_address,
90 block_producer_endpoint=?block_producer_address, ?self.data_directory, ?self.grpc_options.request_timeout,
91 "Loading database");
92
93 let (termination_ask, mut termination_signal) =
94 tokio::sync::mpsc::channel::<ApplyBlockError>(1);
95 let state = Arc::new(
96 State::load(&self.data_directory, termination_ask)
97 .await
98 .context("failed to load state")?,
99 );
100
101 let rpc_service =
102 store::rpc_server::RpcServer::new(api::StoreApi { state: Arc::clone(&state) });
103 let ntx_builder_service = store::ntx_builder_server::NtxBuilderServer::new(api::StoreApi {
104 state: Arc::clone(&state),
105 });
106 let block_producer_service =
107 store::block_producer_server::BlockProducerServer::new(api::StoreApi {
108 state: Arc::clone(&state),
109 });
110 let reflection_service = tonic_reflection::server::Builder::configure()
111 .register_file_descriptor_set(store_rpc_api_descriptor())
112 .register_file_descriptor_set(store_ntx_builder_api_descriptor())
113 .register_file_descriptor_set(store_block_producer_api_descriptor())
114 .build_v1()
115 .context("failed to build reflection service")?;
116
117 let reflection_service_alpha = tonic_reflection::server::Builder::configure()
122 .register_file_descriptor_set(store_rpc_api_descriptor())
123 .register_file_descriptor_set(store_ntx_builder_api_descriptor())
124 .register_file_descriptor_set(store_block_producer_api_descriptor())
125 .build_v1alpha()
126 .context("failed to build reflection service")?;
127
128 info!(target: COMPONENT, "Database loaded");
129
130 let mut join_set = JoinSet::new();
131
132 join_set.spawn(async move {
133 let mut interval = tokio::time::interval(Duration::from_secs(5 * 60));
138 let database = Arc::clone(&state);
139 loop {
140 interval.tick().await;
141 let _ = database.analyze_table_sizes().await;
142 }
143 });
144
145 join_set.spawn(
147 tonic::transport::Server::builder()
148 .timeout(self.grpc_options.request_timeout)
149 .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
150 .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
151 .add_service(rpc_service)
152 .add_service(reflection_service.clone())
153 .add_service(reflection_service_alpha.clone())
154 .serve_with_incoming(TcpListenerStream::new(self.rpc_listener)),
155 );
156
157 join_set.spawn(
158 tonic::transport::Server::builder()
159 .timeout(self.grpc_options.request_timeout)
160 .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
161 .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
162 .add_service(ntx_builder_service)
163 .add_service(reflection_service.clone())
164 .add_service(reflection_service_alpha.clone())
165 .serve_with_incoming(TcpListenerStream::new(self.ntx_builder_listener)),
166 );
167
168 join_set.spawn(
169 tonic::transport::Server::builder()
170 .accept_http1(true)
171 .timeout(self.grpc_options.request_timeout)
172 .layer(CatchPanicLayer::custom(catch_panic_layer_fn))
173 .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
174 .add_service(block_producer_service)
175 .add_service(reflection_service)
176 .add_service(reflection_service_alpha)
177 .serve_with_incoming(TcpListenerStream::new(self.block_producer_listener)),
178 );
179
180 let service = async move { join_set.join_next().await.unwrap()?.map_err(Into::into) };
182 tokio::select! {
183 result = service => result,
184 Some(err) = termination_signal.recv() => {
185 Err(anyhow::anyhow!("received termination signal").context(err))
186 }
187 }
188 }
189}
190
/// Wrapper around the path of the store's data directory.
///
/// Instances are created via [`DataDirectory::load`]; the remaining methods derive the
/// locations of the individual store components from the wrapped path.
#[derive(Clone)]
pub struct DataDirectory(PathBuf);
196
197impl DataDirectory {
198 pub fn load(path: PathBuf) -> std::io::Result<Self> {
201 let meta = fs_err::metadata(&path)?;
202 if meta.is_dir().not() {
203 return Err(std::io::ErrorKind::NotConnected.into());
204 }
205
206 Ok(Self(path))
207 }
208
209 pub fn block_store_dir(&self) -> PathBuf {
210 self.0.join("blocks")
211 }
212
213 pub fn database_path(&self) -> PathBuf {
214 self.0.join("miden-store.sqlite3")
215 }
216
217 pub fn display(&self) -> std::path::Display<'_> {
218 self.0.display()
219 }
220}