use std::{io, sync::Arc};
use crate::{
storage::StorageProvider,
superfile::{ReadError, SuperfileReader},
supertable::{
manifest::{SubsectionOffsets, SuperfileUri},
reader_cache::{
DiskCacheStore, ReaderCacheError, SuperfileReaderCache, disk::DiskCacheError,
},
},
};
pub async fn superfile_reader(
store: &Arc<dyn SuperfileReaderCache>,
disk_cache: Option<&Arc<DiskCacheStore>>,
storage: Option<&Arc<dyn StorageProvider>>,
uri: &SuperfileUri,
offsets: Option<&SubsectionOffsets>,
) -> Result<Arc<SuperfileReader>, ReaderCacheError> {
match store.reader(uri) {
Ok(r) => return Ok(r),
Err(ReaderCacheError::NotFound { .. }) => {
}
Err(other) => return Err(other),
}
if let Some(cache) = disk_cache {
match cache.reader_with_hints(uri, offsets).await {
Ok(reader) => return Ok(reader),
Err(DiskCacheError::BudgetExceeded) => {
return cache
.open_range_only(uri, offsets)
.await
.map_err(cache_open_failed);
}
Err(e) => return Err(cache_open_failed(e)),
}
}
if let Some(storage) = storage {
let path = uri.storage_path();
let (bytes, _) = storage
.get(&path)
.await
.map_err(|e| ReaderCacheError::OpenFailed {
source: ReadError::Io(io::Error::other(format!("storage fetch {path}: {e}"))),
})?;
let reader = SuperfileReader::open(bytes)
.map_err(|source| ReaderCacheError::OpenFailed { source })?;
return Ok(Arc::new(reader));
}
Err(ReaderCacheError::NotFound { uri: *uri })
}
fn cache_open_failed(e: DiskCacheError) -> ReaderCacheError {
ReaderCacheError::OpenFailed {
source: ReadError::Io(io::Error::other(format!("disk cache fetch: {e}"))),
}
}
// Tests for `superfile_reader`: one test per tier (in-memory, disk cache,
// storage-only) plus the "nothing configured" case and the error-mapping helper.
#[cfg(test)]
mod tests {
use arrow_array::{LargeStringArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use bytes::Bytes;
use tempfile::TempDir;
use super::*;
use crate::{
storage::LocalFsStorageProvider,
superfile::{
ReadError,
builder::{BuilderOptions, SuperfileBuilder},
},
supertable::reader_cache::{InMemoryReaderCache, config::DiskCacheConfig},
test_helpers::{decimal128_id_field, decimal128_ids},
};
/// Number of rows written by `minimal_superfile_bytes`; the tests compare it
/// against `reader.n_docs()`.
const N_DOCS: u64 = 3;
/// Disk budget used by the budget-exceeded test. Presumably small enough that
/// the fixture superfile cannot fit, forcing `DiskCacheError::BudgetExceeded`
/// — verify against `DiskCacheStore`.
const TINY_BUDGET_BYTES: u64 = 4;
/// Builds a small superfile with a `doc_id` column and a non-nullable
/// `LargeUtf8` `title` column, containing three rows, and returns the bytes
/// produced by the builder's `finish`.
fn minimal_superfile_bytes() -> Bytes {
let schema: Arc<Schema> = Arc::new(Schema::new(vec![
decimal128_id_field("doc_id"),
Field::new("title", DataType::LargeUtf8, false),
]));
let opts = BuilderOptions::new(schema.clone(), "doc_id", vec![], vec![], None);
let mut b = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
let ids = decimal128_ids(vec![1u64, 2, 3]);
let title = LargeStringArray::from(vec!["a", "b", "c"]);
let batch = RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(title)])
.expect("build RecordBatch");
b.add_batch(&batch, &[]).expect("add_batch");
Bytes::from(b.finish().expect("finish builder"))
}
/// Returns a freshly constructed `InMemoryReaderCache` with nothing inserted,
/// behind the `SuperfileReaderCache` trait object.
fn empty_store() -> Arc<dyn SuperfileReaderCache> {
Arc::new(InMemoryReaderCache::new())
}
/// Returns a `LocalFsStorageProvider` rooted at the temporary directory.
fn local_storage(dir: &TempDir) -> Arc<dyn StorageProvider> {
Arc::new(LocalFsStorageProvider::new(dir.path()).expect("localfs"))
}
/// Builds a `DiskCacheStore` backed by `storage`, with its cache root at
/// `<dir>/cache` and `mmap_cold_threshold_secs` set to 0. `mutate` lets a test
/// override further config fields before the store is created.
fn disk_cache(
dir: &TempDir,
storage: &Arc<dyn StorageProvider>,
mutate: impl FnOnce(&mut DiskCacheConfig),
) -> Arc<DiskCacheStore> {
let mut cfg = DiskCacheConfig {
cache_root: dir.path().join("cache"),
mmap_cold_threshold_secs: 0,
..Default::default()
};
mutate(&mut cfg);
DiskCacheStore::new_unpinned(Arc::clone(storage), cfg).expect("disk cache store")
}
/// Writes `bytes` to `storage` at `uri.storage_path()`, i.e. the same path
/// `superfile_reader` reads from in its storage tier.
async fn put_at_storage(storage: &Arc<dyn StorageProvider>, uri: &SuperfileUri, bytes: Bytes) {
storage
.put_atomic(&uri.storage_path(), bytes)
.await
.expect("put superfile bytes");
}
/// An in-memory hit must be served with no disk cache and no storage configured.
#[tokio::test]
async fn in_memory_hit_returns_reader_without_touching_fallbacks() {
let store = empty_store();
let uri = SuperfileUri::new_v4();
store
.insert(uri, minimal_superfile_bytes())
.expect("insert into in-memory tier");
// Both fallbacks are `None`, so success can only come from the in-memory tier.
let reader = superfile_reader(&store, None, None, &uri, None)
.await
.expect("in-memory hit");
assert_eq!(reader.n_docs(), N_DOCS);
}
/// Stub in-memory tier whose `reader` always fails with `OpenFailed`.
/// `insert` is a no-op that reports success and `resident_bytes` is always 0.
#[derive(Debug)]
struct AlwaysOpenFailedCache;
impl SuperfileReaderCache for AlwaysOpenFailedCache {
fn reader(&self, _uri: &SuperfileUri) -> Result<Arc<SuperfileReader>, ReaderCacheError> {
Err(ReaderCacheError::OpenFailed {
source: ReadError::Io(io::Error::other("in-memory tier boom")),
})
}
fn insert(&self, _uri: SuperfileUri, _bytes: Bytes) -> Result<(), ReaderCacheError> {
Ok(())
}
fn resident_bytes(&self) -> usize {
0
}
}
/// A non-`NotFound` error from the in-memory tier must be returned as-is
/// instead of falling through to the storage tier.
#[tokio::test]
async fn in_memory_non_not_found_error_short_circuits_before_fallback() {
let store: Arc<dyn SuperfileReaderCache> = Arc::new(AlwaysOpenFailedCache);
let uri = SuperfileUri::new_v4();
let dir = TempDir::new().expect("tempdir");
let storage = local_storage(&dir);
// A valid object exists in storage, so a fallback would succeed; getting an
// error back shows the fallback was never attempted.
put_at_storage(&storage, &uri, minimal_superfile_bytes()).await;
let err = superfile_reader(&store, None, Some(&storage), &uri, None)
.await
.expect_err("in-memory error must propagate");
assert!(
matches!(err, ReaderCacheError::OpenFailed { .. }),
"expected the in-memory OpenFailed to surface, got {err:?}",
);
}
/// With an empty in-memory tier, the disk cache serves an object that exists
/// in its backing storage.
#[tokio::test]
async fn disk_cache_cold_fetch_on_in_memory_miss() {
let dir = TempDir::new().expect("tempdir");
let storage = local_storage(&dir);
let uri = SuperfileUri::new_v4();
put_at_storage(&storage, &uri, minimal_superfile_bytes()).await;
let cache = disk_cache(&dir, &storage, |_| {});
// `storage` is not passed to `superfile_reader`; only the disk cache holds it.
let reader = superfile_reader(&empty_store(), Some(&cache), None, &uri, None)
.await
.expect("disk cache cold fetch");
assert_eq!(reader.n_docs(), N_DOCS);
}
/// With a tiny disk budget the lookup must still succeed; per the test name
/// this is expected to go through the range-only fallback.
#[tokio::test]
async fn disk_cache_budget_exceeded_falls_back_to_range_only() {
let dir = TempDir::new().expect("tempdir");
let storage = local_storage(&dir);
let uri = SuperfileUri::new_v4();
put_at_storage(&storage, &uri, minimal_superfile_bytes()).await;
let cache = disk_cache(&dir, &storage, |cfg| {
cfg.disk_budget_bytes = TINY_BUDGET_BYTES;
});
let reader = superfile_reader(&empty_store(), Some(&cache), None, &uri, None)
.await
.expect("range-only fallback on budget exceeded");
assert_eq!(reader.n_docs(), N_DOCS);
}
/// A disk cache failure is reported to the caller as `OpenFailed`.
#[tokio::test]
async fn disk_cache_open_failure_surfaces_as_open_failed() {
let dir = TempDir::new().expect("tempdir");
let storage = local_storage(&dir);
let cache = disk_cache(&dir, &storage, |_| {});
// Nothing is written to storage for this URI, so the disk cache has no
// object to fetch.
let uri = SuperfileUri::new_v4();
let err = superfile_reader(&empty_store(), Some(&cache), None, &uri, None)
.await
.expect_err("missing storage object must error");
assert!(
matches!(err, ReaderCacheError::OpenFailed { .. }),
"expected OpenFailed, got {err:?}",
);
}
/// Without a disk cache, the object is fetched from storage and opened.
#[tokio::test]
async fn storage_only_fallback_opens_whole_object() {
let dir = TempDir::new().expect("tempdir");
let storage = local_storage(&dir);
let uri = SuperfileUri::new_v4();
put_at_storage(&storage, &uri, minimal_superfile_bytes()).await;
let reader = superfile_reader(&empty_store(), None, Some(&storage), &uri, None)
.await
.expect("storage-only fallback");
assert_eq!(reader.n_docs(), N_DOCS);
}
/// In the storage-only tier, a missing object is reported as `OpenFailed`
/// rather than `NotFound`.
#[tokio::test]
async fn storage_only_fallback_missing_object_is_open_failed() {
let dir = TempDir::new().expect("tempdir");
let storage = local_storage(&dir);
// No `put_at_storage` call: the URI has no backing object.
let uri = SuperfileUri::new_v4();
let err = superfile_reader(&empty_store(), None, Some(&storage), &uri, None)
.await
.expect_err("missing object must error");
assert!(
matches!(err, ReaderCacheError::OpenFailed { .. }),
"expected OpenFailed, got {err:?}",
);
}
/// With neither a disk cache nor storage configured, an in-memory miss yields
/// `NotFound` carrying the requested URI.
#[tokio::test]
async fn no_cache_no_storage_returns_not_found() {
let uri = SuperfileUri::new_v4();
let err = superfile_reader(&empty_store(), None, None, &uri, None)
.await
.expect_err("in-process-only miss must be NotFound");
match err {
ReaderCacheError::NotFound { uri: got } => assert_eq!(got, uri),
other => panic!("expected NotFound, got {other:?}"),
}
}
/// `cache_open_failed` must produce `OpenFailed` whose rendered source contains
/// the "disk cache fetch" prefix. Only the prefix is asserted; the text of the
/// wrapped disk-cache error itself is not checked.
#[test]
fn cache_open_failed_maps_to_open_failed_and_preserves_message() {
let mapped = cache_open_failed(DiskCacheError::BudgetExceeded);
match mapped {
ReaderCacheError::OpenFailed { source } => {
let msg = source.to_string();
assert!(
msg.contains("disk cache fetch"),
"wrapped message should name the disk cache path, got {msg:?}",
);
}
other => panic!("expected OpenFailed, got {other:?}"),
}
}
}