use std::{
collections::HashSet,
fmt,
future::Future,
io,
sync::{Arc, Mutex, OnceLock, Weak, atomic::AtomicBool},
time::{Duration, Instant},
};
use arc_swap::ArcSwap;
use arrow_schema::SchemaRef;
use chrono::Utc;
use datafusion::execution::context::SessionContext;
use tokio::runtime::Runtime;
use super::{
error::{BuildError, OpenError},
manifest::ManifestSnapshot,
options::SupertableOptions,
};
use crate::{
runtime_bridge::{bridge_on_runtime, bridge_sync_to_async, build_query_runtime},
storage::StorageError,
supertable::{
ManifestLoadError, SuperfileUri, SupertableStats,
manifest::commit::read_pointer,
options::Consistency,
reader_cache::disk::{DiskCacheError, skip_background_fill},
stats::process_rss_bytes,
tombstones::{SidecarCache, cache::DEFAULT_REFRESH_TTL},
utils::idgen::IdGenerator,
wal::{
WalStore, gc,
lease::DEFAULT_LEASE_DURATION,
recovery::{RecoveryError, RecoveryReport, scan_and_recover},
state_doc::SupertableHandleId,
},
},
};
/// Handle to a supertable.
///
/// The handle is a thin wrapper around an `Arc<SupertableInner>`, so cloning
/// it yields another handle onto the same shared state.
#[derive(Clone)]
pub struct Supertable {
// Shared state; every clone of the handle points at the same allocation.
inner: Arc<SupertableInner>,
}
/// State shared by every clone of a `Supertable` handle and by the readers
/// derived from it.
pub(super) struct SupertableInner {
/// Options the table was created/opened with.
pub(super) options: Arc<SupertableOptions>,
/// Currently published manifest snapshot; replaced wholesale via `ArcSwap`
/// (e.g. by `Supertable::refresh`).
pub(super) manifest: ArcSwap<ManifestSnapshot>,
/// Initialised to `false` at construction; not read in this file.
// NOTE(review): presumably marks a live writer session — confirm against the writer module.
pub(super) writer_outstanding: AtomicBool,
/// Initialised to `false` at construction; not read in this file.
// NOTE(review): presumably marks an in-flight compaction — confirm against the compaction module.
pub(super) compaction_outstanding: AtomicBool,
/// Id generator; the handle id is drawn from it at construction.
pub(super) id_generator: Mutex<IdGenerator>,
/// Query runtime, built on first use by `SupertableInner::query_runtime`.
pub(super) query_runtime: OnceLock<Arc<Runtime>>,
/// Optional cached SQL session, stored together with a manifest snapshot.
// NOTE(review): the snapshot is presumably the one the session was built for — confirm in the SQL module.
pub(super) sql_session_cache: Mutex<Option<(Arc<ManifestSnapshot>, SessionContext)>>,
/// Tombstone sidecar cache; `None` when no storage is configured.
pub(super) tombstone_cache: Option<Arc<SidecarCache>>,
/// Identifier of this handle; passed to the recovery sweep.
pub(super) handle_id: SupertableHandleId,
/// Instant of the last bounded-staleness check made by `Supertable::ensure_fresh`.
pub(super) last_pointer_check: Mutex<Option<Instant>>,
}
impl Drop for SupertableInner {
fn drop(&mut self) {
if let Some(rt) = self.query_runtime.take()
&& let Ok(rt) = Arc::try_unwrap(rt)
{
rt.shutdown_background();
}
}
}
impl SupertableInner {
    /// Returns the shared query runtime, building it on first use.
    pub(super) fn query_runtime(&self) -> Arc<Runtime> {
        let runtime = self
            .query_runtime
            .get_or_init(|| build_query_runtime("supertable-query"));
        Arc::clone(runtime)
    }
}
impl Supertable {
// Creates a supertable handle.
//
// When storage is configured and already holds a manifest pointer, this
// delegates to `Self::open`. A failed pointer probe is reported as a
// permanent storage error for `_supertable/current`. Otherwise the handle
// starts from an empty manifest; with storage configured, one recovery sweep
// and one GC sweep are attempted and their outcomes are ignored.
test_visible! {
    fn create(options: SupertableOptions) -> Result<Self, OpenError> {
        if let Some(storage) = options.storage.as_ref() {
            let probe = Arc::clone(storage);
            let pointer = bridge_sync_to_async(async move { read_pointer(&*probe).await })
                .map_err(|e| {
                    OpenError::Storage(StorageError::Permanent {
                        uri: "_supertable/current".into(),
                        source: Box::new(io::Error::other(e.to_string())),
                    })
                })?;
            // An existing pointer means the table already exists: open it instead.
            if pointer.is_some() {
                return Self::open(options);
            }
        }

        let options = Arc::new(options);
        let empty_manifest = ManifestSnapshot::empty(Arc::clone(&options));
        let tombstone_cache = build_tombstone_cache(&options);
        let id_generator = IdGenerator::new();
        let handle_id = SupertableHandleId(id_generator.next_id());

        let inner = Arc::new(SupertableInner {
            options,
            manifest: ArcSwap::new(Arc::new(empty_manifest)),
            writer_outstanding: AtomicBool::new(false),
            compaction_outstanding: AtomicBool::new(false),
            id_generator: Mutex::new(id_generator),
            query_runtime: OnceLock::new(),
            sql_session_cache: Mutex::new(None),
            tombstone_cache,
            handle_id,
            last_pointer_check: Mutex::new(None),
        });
        install_disk_cache_pinning(&inner);

        let table = Self { inner };
        if table.inner.options.storage.is_some() {
            // Best effort: sweep failures must not fail creation.
            let _ = table.run_recovery_sweep_once_blocking();
            let _ = bridge_sync_to_async(async {
                table.run_gc_sweep_once().await.map_err(|_| ())
            });
        }
        Ok(table)
    }
}
// Blocking wrapper around `Self::open_async`.
test_visible! {
    fn open(options: SupertableOptions) -> Result<Self, OpenError> {
        let opening = Self::open_async(options);
        bridge_sync_to_async(opening)
    }
}
/// Opens an existing supertable from the storage in `options`.
///
/// # Errors
///
/// Returns `OpenError::Build(BuildError::Store(..))` when `options.storage`
/// is unset, and propagates any error from loading the manifest.
///
/// After the handle is built, one recovery sweep and one GC sweep are
/// attempted; their results are ignored.
pub(crate) async fn open_async(options: SupertableOptions) -> Result<Self, OpenError> {
    let Some(storage) = options.storage.clone() else {
        return Err(OpenError::Build(BuildError::Store(
            "Supertable::open requires options.storage; \
             attach via .with_storage(...) before calling open"
                .into(),
        )));
    };

    let options = Arc::new(options);
    let loaded = ManifestSnapshot::load(None, storage, Some(Arc::clone(&options))).await?;
    let tombstone_cache = build_tombstone_cache(&options);
    let id_generator = IdGenerator::new();
    let handle_id = SupertableHandleId(id_generator.next_id());

    let inner = Arc::new(SupertableInner {
        options,
        manifest: ArcSwap::new(loaded),
        writer_outstanding: AtomicBool::new(false),
        compaction_outstanding: AtomicBool::new(false),
        id_generator: Mutex::new(id_generator),
        query_runtime: OnceLock::new(),
        sql_session_cache: Mutex::new(None),
        tombstone_cache,
        handle_id,
        last_pointer_check: Mutex::new(None),
    });
    install_disk_cache_pinning(&inner);

    let table = Self { inner };
    // Best effort: sweep failures must not fail the open.
    let _ = table.run_recovery_sweep_once().await;
    let _ = table.run_gc_sweep_once().await;
    Ok(table)
}
/// Reloads the manifest from storage and publishes it on this handle.
///
/// Returns `Ok(true)` when a manifest was loaded and installed, and
/// `Ok(false)` when the load reports `PointerNotFound` or `AlreadyLoaded`.
///
/// # Errors
///
/// Returns `OpenError::Build(BuildError::Store(..))` when no storage is
/// configured, and wraps any other load failure in
/// `OpenError::ManifestLoadError`.
pub(crate) async fn refresh(&self) -> Result<bool, OpenError> {
    let Some(storage) = self.inner.options.storage.clone() else {
        return Err(OpenError::Build(BuildError::Store(
            "Supertable::refresh requires options.storage".into(),
        )));
    };

    let published = self.inner.manifest.load_full();
    match ManifestSnapshot::load(Some(published), storage, None).await {
        Ok(newer) => {
            self.inner.manifest.store(newer);
            Ok(true)
        }
        Err(ManifestLoadError::PointerNotFound | ManifestLoadError::AlreadyLoaded) => Ok(false),
        Err(err) => Err(OpenError::ManifestLoadError(err)),
    }
}
/// Id of the manifest currently published on this handle.
#[cfg(any(test, feature = "test-helpers"))]
pub fn manifest_id(&self) -> u64 {
    let snapshot = self.inner.manifest.load();
    snapshot.manifest_id
}
// Returns a reader pinned to the manifest that is current after applying the
// configured read-consistency policy (see `ensure_fresh`).
test_visible! {
    fn reader(&self) -> SupertableReader {
        self.ensure_fresh();
        let inner = Arc::clone(&self.inner);
        let manifest = inner.manifest.load_full();
        let tombstone_cache = inner.tombstone_cache.clone();
        SupertableReader {
            manifest,
            tombstone_cache,
            inner,
        }
    }
}
/// Applies the configured read-consistency policy before a read.
///
/// * No storage configured: nothing to do.
/// * `Snapshot`: never refreshes.
/// * `Strong`: refreshes on every call.
/// * `BoundedStaleness(window)`: refreshes when no check has been recorded
///   yet or the last one is at least `window` old. The check time is
///   recorded before the refresh runs.
///
/// Refresh errors are ignored.
///
/// # Panics
///
/// Panics if the `last_pointer_check` mutex is poisoned.
pub(crate) fn ensure_fresh(&self) {
    if self.inner.options.storage.is_none() {
        return;
    }

    let needs_refresh = match self.inner.options.read_consistency {
        Consistency::Snapshot => false,
        Consistency::Strong => true,
        Consistency::BoundedStaleness(window) => {
            // The guard is released at the end of this arm, before any refresh.
            let mut last_check = self
                .inner
                .last_pointer_check
                .lock()
                .expect("last_pointer_check mutex poisoned");
            let due = match *last_check {
                Some(checked_at) => checked_at.elapsed() >= window,
                None => true,
            };
            if due {
                *last_check = Some(Instant::now());
            }
            due
        }
    };

    if needs_refresh {
        let _ = bridge_sync_to_async(self.refresh());
    }
}
// Borrow of the shared options this handle was built with.
test_visible! {
    fn options(&self) -> &Arc<SupertableOptions> {
        let Self { inner } = self;
        &inner.options
    }
}
/// Returns the user schema as reported by the table options.
pub fn schema(&self) -> SchemaRef {
    let options = &self.inner.options;
    options.user_schema()
}
/// Drives `fut` to completion through `bridge_on_runtime` on this handle's
/// query runtime and returns its output.
pub(crate) fn block_on_query<F: Future>(&self, fut: F) -> F::Output {
    let runtime = self.query_runtime();
    bridge_on_runtime(fut, &runtime)
}
/// Waits until every superfile of the current manifest has been promoted to
/// an mmap by the disk cache.
///
/// Returns `Ok(())` immediately when no disk cache is configured or when
/// `skip_background_fill()` is true.
///
/// `timeout` is handed to the cache once per superfile, so it bounds each
/// individual wait rather than the call as a whole.
///
/// # Errors
///
/// Returns the first error reported by the cache while waiting.
#[cfg(any(test, feature = "test-helpers"))]
pub fn wait_until_warm(&self, timeout: Duration) -> Result<(), DiskCacheError> {
    let cache = match self.inner.options.disk_cache.as_ref() {
        Some(cache) if !skip_background_fill() => Arc::clone(cache),
        _ => return Ok(()),
    };
    let snapshot = self.inner.manifest.load_full();
    self.block_on_query(async move {
        for entry in snapshot.superfiles.iter() {
            cache.wait_until_mmap_promoted(&entry.uri, timeout).await?;
        }
        Ok(())
    })
}
/// Identifier assigned to this handle at construction.
#[cfg(test)]
pub(crate) fn handle_id(&self) -> SupertableHandleId {
    let Self { inner } = self;
    inner.handle_id
}
pub(super) fn from_inner(inner: Arc<SupertableInner>) -> Self {
Self { inner }
}
/// Runs one recovery scan for this handle using the default lease duration.
pub(crate) async fn run_recovery_sweep_once(&self) -> Result<RecoveryReport, RecoveryError> {
    let handle_id = self.inner.handle_id;
    scan_and_recover(self, handle_id, DEFAULT_LEASE_DURATION).await
}
/// Blocking variant of `run_recovery_sweep_once`, driven on the query runtime.
pub(crate) fn run_recovery_sweep_once_blocking(&self) -> Result<RecoveryReport, RecoveryError> {
    let sweep = self.run_recovery_sweep_once();
    let runtime = self.inner.query_runtime();
    bridge_on_runtime(sweep, &runtime)
}
/// Runs one GC sweep as of the current UTC time with the default WAL and
/// sidecar grace periods.
pub(crate) async fn run_gc_sweep_once(&self) -> Result<gc::GcReport, gc::GcError> {
    let now = Utc::now();
    gc::run_sweep(self, now, gc::DEFAULT_WAL_GRACE, gc::DEFAULT_SIDECAR_GRACE).await
}
/// Collects a point-in-time statistics snapshot for this handle.
///
/// All cache-derived fields are `None` when no disk cache is configured.
#[cfg(any(test, feature = "test-helpers"))]
pub fn stats(&self) -> SupertableStats {
    let snapshot = self.inner.manifest.load();
    let options = &self.inner.options;
    let disk_cache = options.disk_cache.as_ref();

    let n_manifest_parts = snapshot.get_num_parts();
    let mmap_resident_bytes = disk_cache.map(|cache| cache.current_mmap_size_bytes());
    let cache_stats = disk_cache.map(|cache| cache.stats());
    let (n_cold_fetches, n_cache_evictions, n_cache_madvise_calls, n_cache_entries) =
        match cache_stats.as_ref() {
            Some(stats) => (
                Some(stats.n_cold_fetches),
                Some(stats.n_evictions),
                Some(stats.n_madvise_calls),
                Some(stats.n_entries),
            ),
            None => (None, None, None, None),
        };

    SupertableStats {
        manifest_id: snapshot.get_manifest_id(),
        n_superfiles: snapshot.get_all_superfiles().len(),
        n_manifest_parts,
        n_manifest_parts_loaded: snapshot.get_num_parts_loaded(),
        process_rss_bytes: process_rss_bytes(),
        mmap_resident_bytes,
        memory_budget_bytes: options.memory_budget_bytes,
        n_cold_fetches,
        n_cache_evictions,
        n_cache_madvise_calls,
        n_cache_entries,
    }
}
pub(super) fn inner(&self) -> &Arc<SupertableInner> {
&self.inner
}
pub(crate) fn query_runtime(&self) -> Arc<Runtime> {
self.inner.query_runtime()
}
pub(crate) fn sql_session_cache(
&self,
) -> &Mutex<Option<(Arc<ManifestSnapshot>, SessionContext)>> {
&self.inner.sql_session_cache
}
/// Returns a clone of the cached SQL session after issuing a throw-away
/// query whose result is ignored.
///
/// # Panics
///
/// Panics if the session-cache mutex is poisoned or the cache is still
/// empty after the warm-up query.
#[doc(hidden)]
#[cfg(any(test, feature = "test-helpers"))]
pub fn __debug_cached_session(&self) -> SessionContext {
    // Warm-up call; its outcome does not matter.
    let _ = self.reader().query_sql("SELECT 1 WHERE 1=0");
    let cached = self
        .sql_session_cache()
        .lock()
        .expect("sql_session_cache mutex poisoned");
    let (_, ctx) = cached
        .as_ref()
        .expect("session cache must be populated after warm-up call");
    ctx.clone()
}
}
/// Registers the pinned-URI callback on the disk cache, when one is configured.
///
/// The callback installed here always returns an empty set.
fn install_disk_cache_pinning(inner: &Arc<SupertableInner>) {
    let Some(cache) = inner.options.disk_cache.as_ref() else {
        return;
    };
    let nothing_pinned: Arc<dyn Fn() -> HashSet<SuperfileUri> + Send + Sync> =
        Arc::new(HashSet::new);
    cache.set_pinned_fn(nothing_pinned);
}
/// Builds the tombstone sidecar cache on top of a WAL store over the
/// configured storage; returns `None` when no storage is configured.
fn build_tombstone_cache(options: &Arc<SupertableOptions>) -> Option<Arc<SidecarCache>> {
    options.storage.as_ref().map(|storage| {
        let wal_store = WalStore::new(Arc::clone(storage));
        Arc::new(SidecarCache::new(wal_store, DEFAULT_REFRESH_TTL))
    })
}
impl fmt::Debug for Supertable {
    /// Prints the current manifest id, the superfile count and the id column.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let snapshot = self.inner.manifest.load();
        let mut out = f.debug_struct("Supertable");
        out.field("manifest_id", &snapshot.manifest_id);
        out.field("n_superfiles", &snapshot.superfiles.len());
        out.field("id_column", &self.inner.options.id_column);
        out.finish()
    }
}
/// Read view pinned to a single manifest snapshot.
///
/// The reader keeps the snapshot it was created with, regardless of what the
/// owning handle publishes afterwards.
#[derive(Clone)]
pub struct SupertableReader {
// Manifest snapshot captured when the reader was created.
manifest: Arc<ManifestSnapshot>,
// Tombstone sidecar cache copied from the handle; `None` without storage.
pub(crate) tombstone_cache: Option<Arc<SidecarCache>>,
// Strong reference to the handle's shared state (options, runtime, session cache).
inner: Arc<SupertableInner>,
}
/// Reader that holds the shared handle state only weakly.
///
/// The manifest snapshot and tombstone cache are held strongly; `upgrade`
/// yields a full reader only while the inner state is still alive.
#[derive(Clone)]
pub(crate) struct WeakReader {
// Weak reference to the handle's shared state.
inner: Weak<SupertableInner>,
// Pinned manifest snapshot, carried over from the source reader.
manifest: Arc<ManifestSnapshot>,
// Tombstone sidecar cache, carried over from the source reader.
tombstone_cache: Option<Arc<SidecarCache>>,
}
impl fmt::Debug for WeakReader {
    /// Prints only the type name; all fields are elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("WeakReader");
        out.finish_non_exhaustive()
    }
}
impl WeakReader {
    /// Captures `reader`'s manifest and tombstone cache while downgrading its
    /// reference to the shared handle state.
    pub(crate) fn from_reader(reader: &SupertableReader) -> Self {
        let inner = Arc::downgrade(reader.inner_arc());
        let manifest = Arc::clone(reader.manifest());
        let tombstone_cache = reader.tombstone_cache.clone();
        Self {
            inner,
            manifest,
            tombstone_cache,
        }
    }

    /// Rebuilds a full reader pinned to the same manifest, or returns `None`
    /// once the shared handle state has been dropped.
    pub(crate) fn upgrade(&self) -> Option<Arc<SupertableReader>> {
        self.inner.upgrade().map(|inner| {
            let reader = SupertableReader::from_inner_pinned(
                inner,
                Arc::clone(&self.manifest),
                self.tombstone_cache.clone(),
            );
            Arc::new(reader)
        })
    }
}
impl SupertableReader {
pub fn manifest_id(&self) -> u64 {
self.manifest.manifest_id
}
pub(crate) fn block_on<F: Future>(&self, fut: F) -> F::Output {
bridge_on_runtime(fut, &self.inner.query_runtime())
}
pub fn n_superfiles(&self) -> usize {
self.manifest.superfiles.len()
}
pub fn n_docs_total(&self) -> u64 {
self.manifest.n_docs_total()
}
pub fn manifest(&self) -> &Arc<ManifestSnapshot> {
&self.manifest
}
fn inner_arc(&self) -> &Arc<SupertableInner> {
&self.inner
}
fn from_inner_pinned(
inner: Arc<SupertableInner>,
manifest: Arc<ManifestSnapshot>,
tombstone_cache: Option<Arc<SidecarCache>>,
) -> Self {
Self {
manifest,
tombstone_cache,
inner,
}
}
pub(crate) fn options(&self) -> &Arc<SupertableOptions> {
&self.inner.options
}
pub(crate) fn sql_session_cache(
&self,
) -> &Mutex<Option<(Arc<ManifestSnapshot>, SessionContext)>> {
&self.inner.sql_session_cache
}
}
impl fmt::Debug for SupertableReader {
    /// Prints the pinned manifest id and its superfile count.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let snapshot = &self.manifest;
        let mut out = f.debug_struct("SupertableReader");
        out.field("manifest_id", &snapshot.manifest_id);
        out.field("n_superfiles", &snapshot.superfiles.len());
        out.finish()
    }
}
#[cfg(test)]
mod tests {
use std::{collections::HashMap, sync::Arc};
use arrow_schema::{DataType, Field, Schema};
use tempfile::TempDir;
use uuid::Uuid;
use super::*;
use crate::{
storage::{LocalFsStorageProvider, StorageProvider},
superfile::builder::FtsConfig,
supertable::{
manifest::{SuperfileEntry, SuperfileUri},
options::Consistency,
},
test_helpers::default_tokenizer,
};
/// Single-column test schema: a non-nullable `LargeUtf8` column named `title`.
fn schema() -> Arc<Schema> {
    let title = Field::new("title", DataType::LargeUtf8, false);
    Arc::new(Schema::new(vec![title]))
}
/// Test options: FTS on `title`, the default tokenizer, and no storage attached.
fn opts() -> SupertableOptions {
    let tokenizer = default_tokenizer();
    let title_fts = FtsConfig {
        column: "title".into(),
    };
    SupertableOptions::new(schema(), vec![title_fts], vec![], Some(tokenizer))
        .expect("valid options")
}
/// Builds a superfile entry with a fresh random id, `n_docs` documents and
/// empty statistics/summaries.
fn entry(n_docs: u64) -> Arc<SuperfileEntry> {
    let superfile_id = Uuid::new_v4();
    let last_doc = n_docs.saturating_sub(1);
    let built = SuperfileEntry {
        superfile_id,
        uri: SuperfileUri(superfile_id),
        n_docs,
        id_min: 0,
        id_max: i128::from(last_doc),
        scalar_stats: HashMap::new(),
        fts_summary: HashMap::new(),
        vector_summary: HashMap::new(),
        partition_key: Vec::new(),
        partition_hint: None,
        subsection_offsets: None,
    };
    Arc::new(built)
}
/// Publishes a new in-memory manifest made of the current one plus `entries`.
fn publish_appended(st: &Supertable, entries: Vec<Arc<SuperfileEntry>>) {
    let manifest = &st.inner.manifest;
    let next = manifest.load().with_appended(entries);
    manifest.store(Arc::new(next));
}
/// A freshly created table starts at manifest 0 with no superfiles or docs.
#[test]
fn create_returns_handle_with_empty_initial_manifest() {
    let table = Supertable::create(opts()).expect("create");
    assert_eq!(table.manifest_id(), 0);

    let reader = table.reader();
    assert_eq!(
        (reader.manifest_id(), reader.n_superfiles(), reader.n_docs_total()),
        (0, 0, 0)
    );
}
/// A publish through one handle is visible through its clone.
#[test]
fn supertable_clone_shares_inner_state() {
    let original = Supertable::create(opts()).expect("create");
    let cloned = original.clone();
    publish_appended(&original, vec![entry(50)]);
    assert_eq!(cloned.manifest_id(), 1);
}
/// `options()` exposes the validated options (default id column, one FTS column).
#[test]
fn options_accessor_returns_arc_to_validated_options() {
    let table = Supertable::create(opts()).expect("create");
    let shared = table.options();
    assert_eq!(shared.id_column, "_id");
    assert_eq!(shared.fts_columns.len(), 1);
}
/// A reader keeps its snapshot while later readers see the new manifest.
#[test]
fn reader_pins_manifest_across_subsequent_commits() {
    let table = Supertable::create(opts()).expect("create");
    let pinned = table.reader();
    assert_eq!((pinned.manifest_id(), pinned.n_superfiles()), (0, 0));

    publish_appended(&table, vec![entry(10), entry(20)]);
    assert_eq!(table.manifest_id(), 1);

    // The earlier reader is unaffected by the publish.
    assert_eq!((pinned.manifest_id(), pinned.n_superfiles()), (0, 0));

    let fresh = table.reader();
    assert_eq!(
        (fresh.manifest_id(), fresh.n_superfiles(), fresh.n_docs_total()),
        (1, 2, 30)
    );
}
/// Readers taken between successive publishes each keep their own snapshot.
#[test]
fn manifest_immutability_property() {
    let table = Supertable::create(opts()).expect("create");

    // One reader before any publish, then one after each of three publishes.
    let mut readers = vec![table.reader()];
    for n_docs in 1..=3_u64 {
        publish_appended(&table, vec![entry(n_docs)]);
        readers.push(table.reader());
    }

    // (manifest_id, n_superfiles, n_docs_total) expected per reader.
    let expected: [(u64, usize, u64); 4] =
        [(0, 0, 0), (1, 1, 1), (2, 2, 1 + 2), (3, 3, 1 + 2 + 3)];
    assert_eq!(readers.len(), expected.len());
    for (reader, want) in readers.iter().zip(expected) {
        assert_eq!(
            (reader.manifest_id(), reader.n_superfiles(), reader.n_docs_total()),
            want
        );
    }
}
/// A reader stays usable after the handle that produced it is dropped.
#[test]
fn reader_manifest_arc_outlives_supertable_drop() {
    let reader = {
        let table = Supertable::create(opts()).expect("create");
        publish_appended(&table, vec![entry(5)]);
        table.reader()
    };
    assert_eq!(
        (reader.manifest_id(), reader.n_superfiles(), reader.n_docs_total()),
        (1, 1, 5)
    );
}
/// Two readers taken with no publish in between share the same manifest `Arc`.
#[test]
fn many_concurrent_readers_share_one_manifest() {
    let table = Supertable::create(opts()).expect("create");
    publish_appended(&table, vec![entry(7)]);
    let (first, second) = (table.reader(), table.reader());
    assert!(Arc::ptr_eq(first.manifest(), second.manifest()));
}
/// `Debug` output of handle and reader contains the respective type name.
#[test]
fn debug_format_doesnt_explode() {
    let table = Supertable::create(opts()).expect("create");
    let table_dbg = format!("{table:?}");
    assert!(table_dbg.contains("Supertable"));

    let reader = table.reader();
    let reader_dbg = format!("{reader:?}");
    assert!(reader_dbg.contains("SupertableReader"));
}
/// `schema()` reports only the user-declared `title` column.
#[test]
fn schema_returns_user_schema_without_injected_id() {
    let table = Supertable::create(opts()).expect("create");
    let user_schema = table.schema();
    assert_eq!(user_schema.fields().len(), 1);
    assert_eq!(user_schema.field(0).name(), "title");
}
/// Handle and reader agree on the manifest id before and after a publish.
#[test]
fn manifest_accessor_matches_reader_manifest_id() {
    let table = Supertable::create(opts()).expect("create");
    assert_eq!(table.manifest_id(), 0);

    publish_appended(&table, vec![entry(3)]);
    assert_eq!(table.manifest_id(), 1);
    let reader = table.reader();
    assert_eq!(reader.manifest_id(), 1);
}
/// Clones keep the handle id; independently created handles get distinct ids.
#[test]
fn handle_id_is_stable_for_a_handle_and_distinct_across_handles() {
    let first = Supertable::create(opts()).expect("create");
    let second = Supertable::create(opts()).expect("create");
    let first_clone = first.clone();
    assert_eq!(first.handle_id(), first_clone.handle_id());
    assert_ne!(first.handle_id(), second.handle_id());
}
/// Repeated `query_runtime()` calls hand back the same runtime instance.
#[test]
fn query_runtime_is_lazily_built_and_cached() {
    let table = Supertable::create(opts()).expect("create");
    let first = table.query_runtime();
    let second = table.query_runtime();
    assert!(Arc::ptr_eq(&first, &second));
}
/// `block_on_query` returns the output of the future it drives.
#[test]
fn block_on_query_drives_a_future_to_completion() {
    let table = Supertable::create(opts()).expect("create");
    let answer = table.block_on_query(async { 7_u32 + 35 });
    assert_eq!(answer, 42);
}
/// Stats mirror the in-memory manifest; cache fields are `None` without a disk cache.
#[test]
fn stats_reports_in_memory_snapshot() {
    let table = Supertable::create(opts()).expect("create");
    publish_appended(&table, vec![entry(10), entry(20)]);

    let stats = table.stats();
    assert_eq!(stats.manifest_id, 1);
    assert_eq!(stats.n_superfiles, 2);
    assert_eq!(stats.n_manifest_parts, 0);
    // No disk cache is configured by `opts()`.
    assert_eq!(stats.mmap_resident_bytes, None);
    assert_eq!(stats.n_cold_fetches, None);
}
/// Without a disk cache, `wait_until_warm` succeeds right away.
#[test]
fn wait_until_warm_is_noop_without_disk_cache() {
    let table = Supertable::create(opts()).expect("create");
    let outcome = table.wait_until_warm(Duration::from_millis(1));
    outcome.expect("warm no-op");
}
/// After `__debug_cached_session`, the session cache holds an entry.
#[test]
fn debug_cached_session_populates_the_session_cache() {
    let table = Supertable::create(opts()).expect("create");
    let _ctx = table.__debug_cached_session();

    let cached = table
        .sql_session_cache()
        .lock()
        .expect("sql_session_cache mutex");
    assert!(cached.is_some(), "session cache populated after warm-up");
}
/// A weak reader upgrades to a reader pinned to the same manifest, and its
/// `Debug` output names the type.
#[test]
fn weak_reader_round_trips_and_debug() {
    let table = Supertable::create(opts()).expect("create");
    publish_appended(&table, vec![entry(4)]);
    let strong = table.reader();

    let weak = WeakReader::from_reader(&strong);
    let weak_dbg = format!("{weak:?}");
    assert!(weak_dbg.contains("WeakReader"));

    let upgraded = weak.upgrade().expect("upgrade while inner alive");
    assert_eq!(upgraded.manifest_id(), strong.manifest_id());
    assert_eq!(upgraded.n_superfiles(), 1);
}
/// Once every strong owner of the inner state is gone, `upgrade` yields `None`.
#[test]
fn weak_reader_upgrade_fails_after_inner_dropped() {
    let table = Supertable::create(opts()).expect("create");
    let strong = table.reader();
    let weak = WeakReader::from_reader(&strong);

    // Drop the reader first, then the handle — the last strong owners.
    drop(strong);
    drop(table);

    assert!(weak.upgrade().is_none());
}
/// Reader and handle expose the same options.
#[test]
fn reader_options_match_handle_options() {
    let table = Supertable::create(opts()).expect("create");
    let reader = table.reader();
    let (reader_opts, table_opts) = (reader.options(), table.options());
    assert_eq!(reader_opts.id_column, table_opts.id_column);
    assert_eq!(reader_opts.fts_columns.len(), 1);
}
/// With `Strong` consistency over an empty local store, taking a reader works
/// and an explicit refresh reports that nothing newer exists.
#[test]
fn ensure_fresh_under_strong_consistency_refreshes_against_storage() {
    let tmp = TempDir::new().expect("tempdir");
    let provider = LocalFsStorageProvider::new(tmp.path()).expect("provider");
    let storage: Arc<dyn StorageProvider> = Arc::new(provider);

    let options = opts()
        .with_storage(storage)
        .with_read_consistency(Consistency::Strong);
    let table = Supertable::create(options).expect("create storage-backed handle");

    let reader = table.reader();
    assert_eq!(reader.n_superfiles(), 0);

    let advanced = bridge_sync_to_async(table.refresh()).expect("refresh against empty store");
    assert!(!advanced, "no commit yet ⇒ refresh finds nothing newer");
}
}