use std::any::Any;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread::{self, JoinHandle as ThreadJoinHandle};
use crate::block_journal::{FileBlockJournal, JournalOptions, SyncPolicy};
use crate::error::{StoreError, StoreResult};
use crate::metadata::LmdbMetadataStore;
use crate::net::{RemoteServerHandle, RemoteStoreServer, ServerError, ServerMetricsSnapshot};
use crate::orchestrator::{BlockOrchestrator, DefaultBlockOrchestrator, DurabilityMode};
use crate::snapshot::MmapSnapshotter;
use crate::state_engine::ShardedStateEngine;
use crate::state_shard::{RawTableShard, StateShard};
use crate::store_lock::StoreLockGuard;
use crate::types::{BlockId, Operation, StoreKey as Key, Value};
use parking_lot::RwLock;
use tokio::runtime::Builder as TokioRuntimeBuilder;
use tokio::sync::oneshot;
use super::config::{RemoteServerSettings, StoreConfig};
use super::recovery::{
reconcile_metadata_with_journal, replay_committed_blocks, resolve_shard_layout,
restore_existing_state,
};
use super::StoreFacade;
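/// Store handle backed by a [`BlockOrchestrator`].
///
/// A facade owns (directly or through shared `Arc`s) the orchestrator, the
/// optional LMDB metadata store, the on-disk store lock, and an optional
/// remote server. Clones are cheap handles onto the same store; the shared
/// handle count and shutdown flag ensure the store is shut down exactly once.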
pub struct SimpleStoreFacade {
orchestrator: Arc<dyn BlockOrchestrator>,
metadata: Option<Arc<LmdbMetadataStore>>,
durability_mode: Arc<RwLock<DurabilityMode>>,
lock: Option<Arc<SharedStoreLock>>,
shutdown_state: Arc<AtomicBool>,
handle_count: Arc<AtomicUsize>,
remote_server: Option<RemoteServerController>,
}
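// Every clone shares the orchestrator, lock, and shutdown state; the handle
// count lets `Drop` defer shutdown until the last handle disappears.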
impl Clone for SimpleStoreFacade {
fn clone(&self) -> Self {
self.handle_count.fetch_add(1, Ordering::AcqRel);
Self {
orchestrator: Arc::clone(&self.orchestrator),
metadata: self.metadata.clone(),
durability_mode: Arc::clone(&self.durability_mode),
lock: self.lock.clone(),
shutdown_state: Arc::clone(&self.shutdown_state),
handle_count: Arc::clone(&self.handle_count),
remote_server: self.remote_server.clone(),
}
}
}
impl SimpleStoreFacade {
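    /// Acquires the exclusive store lock for `config.data_dir` and opens (or
    /// creates) the store underneath it.
    ///
    /// Minimal usage sketch. It assumes `StoreConfig` can be built via
    /// `Default`, which is not shown in this module; adapt to the real
    /// configuration API.
    ///
    /// ```rust,ignore
    /// let store = SimpleStoreFacade::new(StoreConfig::default())?;
    /// let applied = store.applied_block()?;
    /// store.close()?;
    /// ```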
pub fn new(config: StoreConfig) -> StoreResult<Self> {
let lock = StoreLockGuard::acquire(&config.data_dir)?;
let lock = Arc::new(SharedStoreLock::new(lock));
Self::build(config, lock)
}
fn build(config: StoreConfig, lock: Arc<SharedStoreLock>) -> StoreResult<Self> {
std::fs::create_dir_all(&config.data_dir)?;
let metadata = Arc::new(LmdbMetadataStore::new_with_map_size(
config.metadata_dir(),
config.lmdb_map_size,
)?);
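        // Reconcile persisted settings with the configuration: an explicitly
        // configured value always wins and is re-persisted; missing values are
        // stored for the first time.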
match metadata.load_durability_mode()? {
Some(stored) if stored != config.durability_mode => {
tracing::info!(
stored_mode = ?stored,
configured_mode = ?config.durability_mode,
"Configured durability mode overrides stored value; persisting new setting"
);
metadata.store_durability_mode(&config.durability_mode)?;
}
Some(_) => {}
None => {
metadata.store_durability_mode(&config.durability_mode)?;
}
}
match metadata.load_journal_chunk_size()? {
Some(stored) if stored != config.journal_chunk_size_bytes => {
tracing::info!(
stored_chunk_size = stored,
configured_chunk_size = config.journal_chunk_size_bytes,
"Configured journal chunk size overrides stored value; persisting new setting"
);
metadata.store_journal_chunk_size(config.journal_chunk_size_bytes)?;
}
Some(_) => {}
None => {
metadata.store_journal_chunk_size(config.journal_chunk_size_bytes)?;
}
}
match metadata.load_min_rollback_window()? {
Some(stored) if stored != config.min_rollback_window => {
tracing::info!(
stored_window = stored,
configured_window = config.min_rollback_window,
"Configured rollback window overrides stored value; persisting new setting"
);
metadata.store_min_rollback_window(config.min_rollback_window)?;
}
Some(_) => {}
None => {
metadata.store_min_rollback_window(config.min_rollback_window)?;
}
}
match metadata.load_prune_interval()? {
Some(stored) if stored != config.prune_interval => {
tracing::info!(
stored_interval_ms = stored.as_millis(),
configured_interval_ms = config.prune_interval.as_millis(),
"Configured prune interval overrides stored value; persisting new setting"
);
metadata.store_prune_interval(config.prune_interval)?;
}
Some(_) => {}
None => {
metadata.store_prune_interval(config.prune_interval)?;
}
}
match metadata.load_bootstrap_block_profile()? {
Some(stored) if stored != config.bootstrap_block_profile => {
tracing::info!(
stored_profile = stored,
configured_profile = config.bootstrap_block_profile,
"Configured bootstrap block profile overrides stored value; persisting new setting"
);
metadata.store_bootstrap_block_profile(config.bootstrap_block_profile)?;
}
Some(_) => {}
None => {
metadata.store_bootstrap_block_profile(config.bootstrap_block_profile)?;
}
}
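        // Relaxed durability modes sync the journal only every N blocks; all
        // other modes use the journal's default sync policy.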
        let sync_policy = match &config.durability_mode {
            DurabilityMode::AsyncRelaxed {
                sync_every_n_blocks,
                ..
            }
            | DurabilityMode::SynchronousRelaxed {
                sync_every_n_blocks,
            } => SyncPolicy::every_n_blocks(*sync_every_n_blocks),
            _ => SyncPolicy::default(),
        };
let journal_options = JournalOptions {
compress: config.compress_journal,
compression_level: config.journal_compression_level,
sync_policy,
max_chunk_size_bytes: config.journal_chunk_size_bytes,
};
let journal = Arc::new(FileBlockJournal::with_options(
config.journal_dir(),
journal_options,
)?);
let snapshotter = Arc::new(MmapSnapshotter::new(config.snapshots_dir())?);
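        // Recovery: reconcile metadata with the journal, rebuild the shard
        // layout, and restore state from the most recent snapshot before
        // replaying committed blocks below.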
reconcile_metadata_with_journal(journal.as_ref(), metadata.as_ref())?;
let shard_layout = resolve_shard_layout(metadata.as_ref(), &config, true)?;
let shards: Vec<Arc<dyn StateShard>> = (0..shard_layout.shards_count)
.map(|i| {
Arc::new(RawTableShard::new(i, shard_layout.initial_capacity))
as Arc<dyn StateShard>
})
.collect();
let restored_block =
restore_existing_state(snapshotter.as_ref(), metadata.as_ref(), &shards)?;
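        // A shared rayon pool is only built when more than one worker thread
        // is configured; otherwise the engine is constructed without one.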
let thread_pool = if config.thread_count > 1 {
Some(Arc::new(
rayon::ThreadPoolBuilder::new()
.num_threads(config.thread_count)
.build()?,
))
} else {
None
};
let engine = match &thread_pool {
Some(pool) => Arc::new(ShardedStateEngine::with_thread_pool(
shards.clone(),
metadata.clone(),
Some(Arc::clone(pool)),
)),
None => Arc::new(ShardedStateEngine::new(shards.clone(), metadata.clone())),
};
replay_committed_blocks(
journal.as_ref(),
metadata.as_ref(),
engine.as_ref(),
restored_block,
)?;
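        // Pruning heuristics are advisory: they only produce log output, and
        // the sole enforced constraint is a rollback window greater than zero.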
let sizing_snapshot = journal.sizing_snapshot(StoreConfig::prune_sample_window())?;
let prune_diagnostics = config.pruning_diagnostics(&sizing_snapshot)?;
if prune_diagnostics.pruning_disabled {
tracing::info!("Journal pruning disabled via configuration");
} else if prune_diagnostics.waiting_for_history {
tracing::info!(
sample_size = prune_diagnostics.sample_size,
required_window = prune_diagnostics.required_window,
min_window = prune_diagnostics.min_window,
"Journal pruning deferred until sufficient history accumulates"
);
} else if prune_diagnostics.used_bootstrap {
tracing::info!(
bootstrap_profile = config.bootstrap_block_profile,
sample_size = prune_diagnostics.sample_size,
"Using bootstrap block profile for pruning validation until history accumulates"
);
} else if !prune_diagnostics.window_satisfied {
tracing::info!(
min_window = prune_diagnostics.min_window,
required_window = prune_diagnostics.required_window,
blocks_per_chunk = prune_diagnostics.blocks_per_chunk,
safety_chunk_span = prune_diagnostics.safety_chunk_span,
observed_block_bytes = prune_diagnostics.observed_block_bytes,
"Advisory: heuristics recommend a larger rollback window. Pruning still honors the configured minimum (>0 is the only enforced constraint)."
);
} else {
tracing::info!(
min_window = prune_diagnostics.min_window,
required_window = prune_diagnostics.required_window,
blocks_per_chunk = prune_diagnostics.blocks_per_chunk,
safety_chunk_span = prune_diagnostics.safety_chunk_span,
observed_block_bytes = prune_diagnostics.observed_block_bytes,
"Journal pruning heuristics satisfied (advisory only)"
);
}
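        // Hand the validated pruning window and snapshot cadence to the
        // orchestrator, warning when async durability is combined with
        // disabled snapshots (recovery would then rely on journal replay only).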
let persistence_settings = crate::orchestrator::PersistenceSettings {
durability_mode: config.durability_mode.clone(),
snapshot_interval: config.snapshot_interval,
max_snapshot_interval: config.max_snapshot_interval,
min_rollback_window: prune_diagnostics.min_window,
prune_interval: prune_diagnostics.prune_interval,
};
if config.snapshot_interval.is_zero()
&& config.max_snapshot_interval.is_zero()
&& matches!(
config.durability_mode,
DurabilityMode::Async { .. } | DurabilityMode::AsyncRelaxed { .. }
)
{
tracing::warn!(
durability_mode = ?config.durability_mode,
"Asynchronous durability enabled but automatic snapshots are disabled; crash recovery will rely solely on journal replay"
);
}
let orchestrator: Arc<dyn BlockOrchestrator> = Arc::new(DefaultBlockOrchestrator::new(
Arc::clone(&engine),
journal,
snapshotter,
Arc::clone(&metadata),
persistence_settings,
)?);
let durability_mode = Arc::new(RwLock::new(config.durability_mode.clone()));
let mut store = Self {
orchestrator,
metadata: Some(metadata),
durability_mode,
lock: Some(lock),
shutdown_state: Arc::new(AtomicBool::new(false)),
handle_count: Arc::new(AtomicUsize::new(1)),
remote_server: None,
};
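        // Optionally start the remote server; a spawn failure (including
        // missing credentials) aborts construction.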
if config.enable_server {
if let Some(settings) = config.remote_server.clone() {
store.remote_server = Some(RemoteServerController::spawn(&store, settings)?);
}
}
Ok(store)
}
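    /// Wraps an existing orchestrator directly. No metadata store, store lock,
    /// or remote server is attached, so durability-mode changes made through
    /// this handle are not persisted.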
pub fn from_orchestrator(orchestrator: Arc<dyn BlockOrchestrator>) -> Self {
Self {
orchestrator,
metadata: None,
durability_mode: Arc::new(RwLock::new(DurabilityMode::default())),
lock: None,
shutdown_state: Arc::new(AtomicBool::new(false)),
handle_count: Arc::new(AtomicUsize::new(1)),
remote_server: None,
}
}
pub fn orchestrator(&self) -> &Arc<dyn BlockOrchestrator> {
&self.orchestrator
}
pub fn key_bytes(&self) -> usize {
Key::BYTES
}
pub fn metrics(&self) -> Option<&crate::metrics::StoreMetrics> {
self.orchestrator.metrics()
}
pub fn health(&self) -> Option<crate::metrics::HealthStatus> {
self.orchestrator.metrics().map(|m| m.health())
}
pub fn remote_server_metrics(&self) -> Option<ServerMetricsSnapshot> {
self.remote_server
.as_ref()
.map(|controller| controller.snapshot())
}
pub fn current_block(&self) -> StoreResult<BlockId> {
self.orchestrator.current_block()
}
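    /// Height of the most recently applied block; fails if the orchestrator
    /// reports itself unhealthy.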
pub fn applied_block(&self) -> StoreResult<BlockId> {
self.orchestrator.ensure_healthy()?;
Ok(self.orchestrator.applied_block_height())
}
pub fn durable_block(&self) -> StoreResult<BlockId> {
self.orchestrator.durable_block_height()
}
pub fn pop(&self, block_height: BlockId, key: Key) -> StoreResult<Value> {
self.orchestrator.pop(block_height, key)
}
pub fn relaxed_mode_enabled(&self) -> bool {
self.durability_mode.read().is_relaxed()
}
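    /// Restores strict durability: the journal syncs on every block, the
    /// metadata sync interval is reset to zero, and a previously relaxed mode
    /// is rewritten and persisted as its strict counterpart.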
pub fn disable_relaxed_mode(&self) -> StoreResult<()> {
self.orchestrator.set_sync_policy(SyncPolicy::EveryBlock);
self.orchestrator.flush()?;
self.orchestrator.set_metadata_sync_interval(0)?;
let updated_mode = {
let mut guard = self.durability_mode.write();
match &*guard {
DurabilityMode::SynchronousRelaxed { .. } => {
*guard = DurabilityMode::Synchronous;
Some(DurabilityMode::Synchronous)
}
DurabilityMode::AsyncRelaxed {
max_pending_blocks, ..
} => {
let updated = DurabilityMode::Async {
max_pending_blocks: (*max_pending_blocks).max(1),
};
*guard = updated.clone();
Some(updated)
}
_ => None,
}
};
if let Some(mode) = updated_mode.as_ref() {
self.persist_durability_mode(mode)?;
}
tracing::info!(
relaxed_mode_previously_enabled = updated_mode.is_some(),
"Relaxed durability disabled; syncing every block"
);
Ok(())
}
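    /// Enables relaxed durability: the journal and metadata are synced every
    /// `sync_every_n_blocks` blocks (clamped to at least 1) and the resulting
    /// mode is persisted.
    ///
    /// Sketch of a typical bulk-load pattern (illustrative only):
    ///
    /// ```rust,ignore
    /// store.enable_relaxed_mode(64)?; // sync every 64 blocks while loading
    /// // ... apply many blocks ...
    /// store.disable_relaxed_mode()?; // return to per-block syncing
    /// ```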
pub fn enable_relaxed_mode(&self, sync_every_n_blocks: usize) -> StoreResult<()> {
let n = sync_every_n_blocks.max(1);
self.orchestrator.set_metadata_sync_interval(n)?;
self.orchestrator
.set_sync_policy(SyncPolicy::every_n_blocks(n));
let new_mode = {
let mut guard = self.durability_mode.write();
let updated = match &*guard {
DurabilityMode::Synchronous | DurabilityMode::SynchronousRelaxed { .. } => {
DurabilityMode::SynchronousRelaxed {
sync_every_n_blocks: n,
}
}
DurabilityMode::Async { max_pending_blocks }
| DurabilityMode::AsyncRelaxed {
max_pending_blocks, ..
} => DurabilityMode::AsyncRelaxed {
max_pending_blocks: (*max_pending_blocks).max(1),
sync_every_n_blocks: n,
},
};
*guard = updated.clone();
updated
};
self.persist_durability_mode(&new_mode)?;
tracing::info!(
sync_every_n_blocks = n,
"Switched to relaxed durability mode"
);
Ok(())
}
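    /// Persists the durability mode when a metadata store is attached; a
    /// persistence failure is recorded as a fatal orchestrator error before
    /// being returned.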
fn persist_durability_mode(&self, mode: &DurabilityMode) -> StoreResult<()> {
if let Some(metadata) = &self.metadata {
if let Err(err) = metadata.store_durability_mode(mode) {
let block = self.orchestrator.applied_block_height();
let reason = format!("failed to persist durability mode {:?}: {}", mode, err);
self.orchestrator.record_fatal_error(block, reason);
return Err(err);
}
}
Ok(())
}
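    /// Shuts the store down at most once: stops the remote server, shuts down
    /// the orchestrator, and releases the store lock. On failure the shutdown
    /// flag is reset so the call can be retried; calling `close` again after a
    /// successful shutdown is a no-op.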
pub fn close(&self) -> StoreResult<()> {
if self
.shutdown_state
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
return Ok(());
}
if let Some(controller) = &self.remote_server {
if let Err(err) = controller.shutdown() {
self.shutdown_state.store(false, Ordering::Release);
return Err(err);
}
}
match self.orchestrator.shutdown() {
Ok(()) => {
if let Some(lock) = &self.lock {
lock.release();
}
Ok(())
}
Err(err) => {
self.shutdown_state.store(false, Ordering::Release);
Err(err)
}
}
}
#[cfg(test)]
pub(crate) fn new_for_testing(
orchestrator: Arc<dyn BlockOrchestrator>,
shutdown_state: Arc<AtomicBool>,
handle_count: Arc<AtomicUsize>,
) -> Self {
Self {
orchestrator,
metadata: None,
durability_mode: Arc::new(RwLock::new(DurabilityMode::default())),
lock: None,
shutdown_state,
handle_count,
remote_server: None,
}
}
}
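// The trait implementation forwards to the inherent methods; `set`
// additionally enforces the per-value size limit before handing operations to
// the orchestrator.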
impl StoreFacade for SimpleStoreFacade {
fn set(&self, block_height: BlockId, operations: Vec<Operation>) -> StoreResult<()> {
for op in operations.iter() {
op.value.ensure_within_limit()?;
}
self.orchestrator.apply_operations(block_height, operations)
}
fn rollback(&self, target: BlockId) -> StoreResult<()> {
self.orchestrator.revert_to(target)
}
fn get(&self, key: Key) -> StoreResult<Value> {
self.orchestrator.fetch(key)
}
fn multi_get(&self, keys: &[Key]) -> StoreResult<Vec<Value>> {
self.orchestrator.fetch_many(keys)
}
fn pop(&self, block_height: BlockId, key: Key) -> StoreResult<Value> {
self.orchestrator.pop(block_height, key)
}
fn enable_relaxed_mode(&self, sync_every_n_blocks: usize) -> StoreResult<()> {
SimpleStoreFacade::enable_relaxed_mode(self, sync_every_n_blocks)
}
fn relaxed_mode_enabled(&self) -> bool {
SimpleStoreFacade::relaxed_mode_enabled(self)
}
fn disable_relaxed_mode(&self) -> StoreResult<()> {
SimpleStoreFacade::disable_relaxed_mode(self)
}
fn close(&self) -> StoreResult<()> {
SimpleStoreFacade::close(self)
}
fn current_block(&self) -> StoreResult<BlockId> {
self.orchestrator.current_block()
}
fn applied_block(&self) -> StoreResult<BlockId> {
SimpleStoreFacade::applied_block(self)
}
fn durable_block(&self) -> StoreResult<BlockId> {
self.orchestrator.durable_block_height()
}
fn ensure_healthy(&self) -> StoreResult<()> {
self.orchestrator.ensure_healthy()
}
}
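// The last surviving handle performs a best-effort shutdown; errors are logged
// because they cannot be propagated from `drop`.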
impl Drop for SimpleStoreFacade {
fn drop(&mut self) {
if self.handle_count.fetch_sub(1, Ordering::AcqRel) != 1 {
return;
}
if self
.shutdown_state
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
return;
}
if let Some(controller) = &self.remote_server {
if let Err(err) = controller.shutdown() {
tracing::warn!(error = ?err, "Failed to shutdown remote server during drop");
}
}
if let Err(err) = self.orchestrator.shutdown() {
tracing::warn!(
error = ?err,
"Failed to shutdown store during drop; persistence thread may remain active"
);
}
if let Some(lock) = &self.lock {
lock.release();
}
}
}
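/// Cloneable handle to the remote server thread; clones share the join handle,
/// shutdown channel, and metrics handle.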
#[derive(Clone)]
struct RemoteServerController {
shared: Arc<RemoteServerShared>,
}
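/// Store lock shared by all facade handles; `release` drops the underlying
/// guard at most once.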
struct SharedStoreLock {
guard: Mutex<Option<StoreLockGuard>>,
}
impl SharedStoreLock {
fn new(guard: StoreLockGuard) -> Self {
Self {
guard: Mutex::new(Some(guard)),
}
}
fn release(&self) {
let mut guard = self.guard.lock().unwrap();
guard.take();
}
}
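/// State shared by [`RemoteServerController`] clones: the server thread's join
/// handle, its shutdown sender, and a handle for metrics snapshots.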
struct RemoteServerShared {
thread: Mutex<Option<ThreadJoinHandle<Result<(), ServerError>>>>,
shutdown_tx: Mutex<Option<oneshot::Sender<()>>>,
metrics: RemoteServerHandle,
}
impl RemoteServerController {
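    /// Validates credentials, builds a dedicated multi-threaded Tokio runtime
    /// on a named background thread, and waits until the server has bound its
    /// listener before returning; bind failures, early exits, and panics are
    /// surfaced as `StoreError`s.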
fn spawn(store: &SimpleStoreFacade, settings: RemoteServerSettings) -> StoreResult<Self> {
let username_missing = settings.auth.username.trim().is_empty();
let password_missing = settings.auth.password.trim().is_empty();
if username_missing || password_missing || settings.uses_default_auth() {
return Err(StoreError::RemoteServerCredentialsMissing);
}
let worker_threads = settings.worker_threads.max(1);
let runtime = TokioRuntimeBuilder::new_multi_thread()
.worker_threads(worker_threads)
.enable_all()
.build()?;
let server = RemoteStoreServer::new(store.clone(), settings.to_server_config())?;
let metrics = server.handle();
let (shutdown_tx, shutdown_rx) = oneshot::channel();
let (startup_tx, startup_rx) = mpsc::sync_channel(1);
let thread = thread::Builder::new()
.name("rollblock-remote-server".into())
.spawn(move || {
let shutdown = async move {
let _ = shutdown_rx.await;
};
let listener = match runtime.block_on(server.bind_listener()) {
Ok(listener) => {
let _ = startup_tx.send(Ok(()));
listener
}
Err(err) => {
let _ = startup_tx.send(Err(()));
return Err(err);
}
};
runtime.block_on(async move {
server
.run_until_shutdown_with_listener(listener, shutdown)
.await
})
})
.map_err(|err| StoreError::RemoteServerTaskFailure {
reason: format!("failed to spawn remote server thread: {err}"),
})?;
        if !matches!(startup_rx.recv(), Ok(Ok(()))) {
let err = match thread.join() {
Ok(Ok(())) => StoreError::RemoteServerTaskFailure {
reason: "remote server thread exited before reporting status".into(),
},
Ok(Err(server_err)) => StoreError::from(server_err),
Err(panic) => StoreError::RemoteServerTaskFailure {
reason: describe_panic(panic),
},
};
return Err(err);
}
Ok(Self {
shared: Arc::new(RemoteServerShared {
thread: Mutex::new(Some(thread)),
shutdown_tx: Mutex::new(Some(shutdown_tx)),
metrics,
}),
})
}
fn shutdown(&self) -> StoreResult<()> {
self.shared.shutdown()
}
fn snapshot(&self) -> ServerMetricsSnapshot {
self.shared.metrics.snapshot()
}
}
impl RemoteServerShared {
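    /// Signals the server to stop and joins its thread, translating server
    /// errors and panics into `StoreError`s.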
fn shutdown(&self) -> StoreResult<()> {
if let Some(tx) = self.shutdown_tx.lock().unwrap().take() {
let _ = tx.send(());
}
if let Some(thread) = self.thread.lock().unwrap().take() {
match thread.join() {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(StoreError::from(err)),
Err(panic) => Err(StoreError::RemoteServerTaskFailure {
reason: describe_panic(panic),
}),
}
} else {
Ok(())
}
}
}
impl Drop for RemoteServerShared {
fn drop(&mut self) {
let _ = self.shutdown();
}
}
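/// Renders a panic payload as a human-readable string, falling back to a
/// generic message for non-string payloads.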
fn describe_panic(payload: Box<dyn Any + Send + 'static>) -> String {
if let Some(msg) = payload.downcast_ref::<&str>() {
msg.to_string()
} else if let Some(msg) = payload.downcast_ref::<String>() {
msg.clone()
} else {
"remote server thread panicked".into()
}
}