mod ancestry;
mod append;
pub mod backup_envelope;
mod chain_walk;
pub mod cold_start;
mod compaction_report;
mod config;
pub mod delivery;
mod diagnostics_api;
mod dir_lock;
mod error;
#[cfg(feature = "dangerous-test-hooks")]
#[cfg_attr(
all(docsrs, not(batpak_stable_docs)),
doc(cfg(feature = "dangerous-test-hooks"))
)]
pub mod fault;
mod file_classification;
mod frontier_api;
mod gate;
mod hidden_ranges;
pub mod index;
mod lifecycle;
mod lifecycle_api;
mod open;
mod platform;
pub mod projection;
mod projection_run;
pub mod reaction;
pub mod reactor_typed;
mod read_api;
mod read_walk;
mod receipt_verification;
#[cfg(test)]
mod runtime_contracts;
pub mod segment;
mod signing;
mod snapshot_report;
pub mod stats;
mod store_resource_report;
mod subscriber_frontier;
#[cfg(feature = "dangerous-test-hooks")]
mod test_support;
mod watch_api;
pub(crate) mod write;
mod write_api;
pub use append::{
AppendOptions, AppendPositionHint, AppendReceipt, BatchAppendItem, CausationRef,
CompactionConfig, CompactionStrategy, DenialReceipt, EncodedBytes, ExtensionKey,
ExtensionKeyError, ReceiptExtensionKey, ReceiptExtensionNamespace, ReceiptExtensionValue,
RetentionPredicate, SigningDowngradeBody, SigningDowngradeReason, SigningExtensionNamespace,
SIGNING_DOWNGRADE_SCHEMA_VERSION,
};
pub use chain_walk::{
ChainWalkEvidenceReport, ChainWalkFinding, ChainWalkHash, ChainWalkMode, ChainWalkReportBody,
ChainWalkReportError, ChainWalkRequest, ChainWalkStartRef, CHAIN_WALK_REPORT_SCHEMA_VERSION,
};
pub use compaction_report::{
compaction_strategy_shape, report_for_run, report_skipped, CompactionEvidenceHash,
CompactionEvidenceReport, CompactionReportBody, CompactionReportFinding,
CompactionStrategyShape, COMPACTION_REPORT_SCHEMA_VERSION,
};
pub use config::{
BatchConfig, IndexConfig, IndexTopology, OpenReportObserver, StoreConfig, SyncConfig, SyncMode,
WriterConfig,
};
pub use delivery::canal::{Canal, CanalBatch, CanalClosed, CanalHandle, CanalItem, ReactorCanal};
pub use delivery::cursor::{
Cursor, CursorGapConfig, CursorWorkerAction, CursorWorkerConfig, CursorWorkerHandle,
GapObservation,
};
pub use delivery::observation::{
AtLeastOnce, CheckpointId, CheckpointIdError, IdempotencyKey, ObservedOnce,
MAX_CHECKPOINT_ID_LEN,
};
pub use delivery::subscription::Subscription;
pub use error::{
HiddenRangesCorruption, ProfileInvalidKind, StoreError, StoreInvariant, StoreLockMode,
};
#[cfg(feature = "dangerous-test-hooks")]
#[cfg_attr(
all(docsrs, not(batpak_stable_docs)),
doc(cfg(feature = "dangerous-test-hooks"))
)]
pub use fault::{
CountdownAction, CountdownInjector, FaultInjector, InjectionPoint, ProbabilisticInjector,
};
pub use gate::DurabilityGate;
pub use platform::clock::{Clock, SystemClock};
pub use projection::watch::{CursorWatcherError, ProjectionWatcher, WatcherError};
pub use projection::{
CacheCapabilities, CacheMeta, Freshness, NativeCache, NoCache, ProjectionCache,
};
pub use projection_run::{
ProjectionRunCacheStatus, ProjectionRunCheckpointRef, ProjectionRunEvidenceReport,
ProjectionRunFinding, ProjectionRunFreshnessStatus, ProjectionRunFrontierKind,
ProjectionRunHash, ProjectionRunInputFrontier, ProjectionRunOutputHash,
ProjectionRunReplayMode, ProjectionRunReportBody, ProjectionRunReportError,
ProjectionRunRequestedFreshness, ProjectionSourceRef, PROJECTION_RUN_REPORT_SCHEMA_VERSION,
};
pub use reaction::ReactionBatch;
pub use reactor_typed::{ReactorConfig, ReactorError, TypedReactorHandle};
pub use read_walk::{
ReadWalkDroppedCount, ReadWalkEvidenceReport, ReadWalkFinding, ReadWalkFreshnessIntent,
ReadWalkFrontierKind, ReadWalkHash, ReadWalkInputFrontier, ReadWalkProofRef, ReadWalkProofRefs,
ReadWalkReplayMode, ReadWalkReportBody, ReadWalkReportError, ReadWalkRequest,
ReadWalkSourceRef, READ_WALK_REPORT_SCHEMA_VERSION,
};
pub use receipt_verification::{ReceiptVerification, ReceiptVerificationError};
pub use signing::SigningKey;
pub use snapshot_report::{
snapshot_report_body_hash, SnapshotEvidenceHash, SnapshotEvidenceReport, SnapshotFenceTokenRef,
SnapshotFileKind, SnapshotFinding, SnapshotReportBody, SnapshotWatermarkRef,
SNAPSHOT_EVIDENCE_REPORT_SCHEMA_VERSION,
};
pub use stats::{
ActiveSegmentReadEvidence, ClockEvidence, FrontierView, HlcPoint, HostEvidenceSummary,
LockLeafSymlinkProtection, MmapAdmissionSummary, MmapEvidence, ParentDirSyncAdmissionSummary,
ParentDirSyncEvidence, PlatformAdmissionSummary, PlatformEvidenceSummary, StoreDiagnostics,
StoreLockAdmissionSummary, StorePathEvidenceSummary, StorePathStatusEvidence, StoreStats,
WatermarkKind, WriterPressure,
};
pub use store_resource_report::{
store_data_dir_identity_hash, store_resource_evidence_report_from_diagnostics,
store_resource_report_body_from_diagnostics, store_resource_report_body_hash,
StoreResourceEvidenceReport, StoreResourceFrontierBody, StoreResourceHash,
StoreResourceReportBody, StoreResourceReportError, StoreResourceRestartPolicyShape,
STORE_RESOURCE_REPORT_SCHEMA_VERSION,
};
pub use subscriber_frontier::{
LossPrecision, SubscriberDeliveryState, SubscriberFrontierEvidenceReport,
SubscriberFrontierFinding, SubscriberFrontierHash, SubscriberFrontierReportBody,
SubscriberFrontierReportError, SubscriberFrontierRequest, SubscriberFrontierSource,
SUBSCRIBER_FRONTIER_REPORT_SCHEMA_VERSION,
};
pub use write::control::{AppendTicket, BatchAppendTicket, Outbox, VisibilityFence};
pub use write::writer::{Notification, RestartPolicy};
use crate::coordinate::{Coordinate, KindFilter, Region};
use crate::event::{
self, EventKind, EventPayload, EventPayloadValidation, EventSourced, StoredEvent,
};
use crate::guard::{Denial, GateSet};
use index::StoreIndex;
use open::timestamp_us_for_hlc;
use parking_lot::Mutex;
use projection::registry::ProjectionRegistry;
use segment::scan::Reader;
use serde::Serialize;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::Arc;
use write::control::AppendSubmission;
use write::fanout::{ReactorSubscriberList, SubscriberList};
use write::writer::{WatermarkAdvanceHandle, WatermarkState, WriterCommand, WriterHandle};
/// Upper bound (10 seconds) on how long `recv_writer_reply` waits for a writer reply
/// in test builds. Only compiled under `cfg(test)`; non-test builds wait without a
/// timeout.
#[cfg(test)]
const TEST_WRITER_REPLY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
/// Waits for the writer's reply on `rx` and flattens it into a single `Result`.
///
/// Test builds bound the wait with `TEST_WRITER_REPLY_TIMEOUT`; other builds block
/// until a reply arrives or the channel can no longer deliver one.
///
/// # Errors
///
/// Returns [`StoreError::WriterCrashed`] when the receive itself fails (including the
/// test-build timeout). Otherwise the reply received from the channel is returned
/// as-is, so an `Err` sent by the writer is passed through unchanged.
pub(crate) fn recv_writer_reply<T>(
    rx: &flume::Receiver<Result<T, StoreError>>,
) -> Result<T, StoreError> {
    #[cfg(test)]
    let reply = rx.recv_timeout(TEST_WRITER_REPLY_TIMEOUT);
    #[cfg(not(test))]
    let reply = rx.recv();

    // The receive error type differs between the two cfg branches, so it is matched
    // with a wildcard and collapsed into the single crash variant.
    match reply {
        Ok(outcome) => outcome,
        Err(_) => Err(StoreError::WriterCrashed),
    }
}
// Build-time guard: enabling an `async-store` feature aborts compilation with the
// message below, pointing callers at `spawn_blocking` / flume's `recv_async` instead.
// NOTE(review): `unexpected_cfgs` is presumably silenced because `async-store` is not a
// declared Cargo feature — confirm against Cargo.toml.
#[allow(unexpected_cfgs)]
#[cfg(feature = "async-store")]
compile_error!("INVARIANT 2: Store API is sync. Use spawn_blocking or flume recv_async.");
/// Marker type used as the default `State` parameter of [`Store`].
pub struct Open;
/// Marker type with no fields.
///
/// NOTE(review): presumably a `State` marker for [`Store`] describing a closed store —
/// confirm against the lifecycle code.
pub struct Closed;
/// Marker type with no fields.
///
/// NOTE(review): presumably a `State` marker for [`Store`] describing a read-only
/// store — confirm against the open/lifecycle code.
pub struct ReadOnly;
/// Store handle parameterised by a marker `State` type (defaults to [`Open`]).
///
/// `State` is only carried through `PhantomData`, so it adds no runtime data. Dropping a
/// store runs the `Drop` impl for `Store<State>`, which may shut the writer down before
/// the fields themselves (including the directory lock) are released.
pub struct Store<State = Open> {
/// Shared (`Arc`) handle to the store index.
pub(crate) index: Arc<StoreIndex>,
/// Shared (`Arc`) handle to the segment scan reader.
pub(crate) reader: Arc<Reader>,
/// Projection cache, held as a boxed trait object.
pub(crate) cache: Box<dyn ProjectionCache>,
/// Writer handle. The `Drop` impl takes it out of the `Option` to send a shutdown
/// command and join the writer; `None` means there is nothing to shut down on drop.
pub(crate) writer: Option<WriterHandle>,
/// NOTE(review): presumably used to advance the writer's watermark — confirm against
/// the writer module.
pub(crate) watermark_handle: WatermarkAdvanceHandle,
/// Projection registry for this store.
pub(crate) projection_registry: ProjectionRegistry,
/// Payload-free mutex. NOTE(review): presumably serialises lifecycle operations —
/// confirm against its users.
pub(crate) lifecycle_gate: Mutex<()>,
/// Shared store configuration.
pub(crate) config: Arc<StoreConfig>,
/// Shared validated counterpart of the configuration.
pub(crate) runtime: Arc<config::ValidatedStoreConfig>,
/// When `false`, the `Drop` impl returns immediately and leaves the writer untouched.
pub(crate) should_shutdown_on_drop: bool,
/// Optional index report from cold-start rebuild. NOTE(review): presumably captured
/// when the store was opened — confirm.
pub(crate) open_report: Option<cold_start::rebuild::OpenIndexReport>,
/// Reserved-kind fallback statistics. NOTE(review): the name suggests a running total
/// across operations — confirm where it is updated.
pub(crate) cumulative_reserved_kind_fallbacks: segment::sidx::ReservedKindFallbackStats,
/// Zero-sized marker tying this handle to its `State` type.
pub(crate) _state: std::marker::PhantomData<State>,
/// Store directory lock. Declared last, so it is dropped after every other field, and
/// only after the `Drop` impl has finished draining the writer.
pub(crate) _store_lock: dir_lock::StoreDirLock,
}
impl<State> Drop for Store<State> {
    /// Best-effort writer shutdown for a store that is dropped while still holding its
    /// writer handle.
    ///
    /// Nothing happens when `should_shutdown_on_drop` is `false` or when no writer handle
    /// is present. Otherwise a warning is logged, a `Shutdown` command is sent to the
    /// writer, the acknowledgement is awaited if the command was delivered, and the
    /// writer is joined. All of this runs before the struct's fields are released.
    fn drop(&mut self) {
        // The handle is only taken out of the store when drop-time shutdown is enabled.
        let pending = if self.should_shutdown_on_drop {
            self.writer.take()
        } else {
            None
        };
        let Some(mut handle) = pending else {
            return;
        };

        tracing::warn!(
            "Store dropped without explicit close(); draining writer before releasing store lock"
        );

        let (ack_tx, ack_rx) = flume::bounded(1);
        let shutdown = WriterCommand::Shutdown { respond: ack_tx };
        // A failed send means the command was not delivered, so there is no
        // acknowledgement to wait for; the join below still runs either way.
        let delivered = handle.tx.send(shutdown).is_ok();
        if delivered {
            wait_for_drop_shutdown_ack(&ack_rx);
        }
        join_drop_shutdown_writer(&mut handle);
    }
}
/// Blocks until the shutdown acknowledgement arrives on `rx` or the receive fails.
///
/// The outcome is deliberately discarded: this runs from `Drop`, which has no way to
/// report an error to the caller.
fn wait_for_drop_shutdown_ack(rx: &flume::Receiver<Result<(), StoreError>>) {
    // Bound to a named placeholder so the must-use result is discarded explicitly.
    let _shutdown_reply: Result<Result<(), StoreError>, flume::RecvError> = rx.recv();
}
/// Calls `join` on the writer handle and discards whatever it returns.
///
/// NOTE(review): presumably this waits for the writer thread to exit — confirm against
/// `WriterHandle::join`. The result is ignored because this runs from `Drop`.
fn join_drop_shutdown_writer(writer: &mut WriterHandle) {
    // Bound to a named placeholder so the returned value is discarded explicitly.
    let _writer_exit = writer.join();
}