use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use bytes::Bytes;
use super::{ReaderCacheError, SuperfileReaderCache};
use crate::{superfile::SuperfileReader, supertable::manifest::SuperfileUri};
/// Cached state for a single superfile: the raw bytes passed to `insert`
/// together with the reader that was opened from them.
struct Entry {
/// Raw superfile bytes; `resident_bytes` sums the length of this buffer.
bytes: Bytes,
/// Shared reader handle; `reader()` hands out clones of this `Arc`.
reader: Arc<SuperfileReader>,
}
/// In-memory [`SuperfileReaderCache`] implementation: a `HashMap` keyed by
/// [`SuperfileUri`], guarded by a single `RwLock`.
pub struct InMemoryReaderCache {
/// URI -> cached entry map; every access goes through the `RwLock`.
inner: RwLock<HashMap<SuperfileUri, Entry>>,
}
impl InMemoryReaderCache {
    /// Creates a cache with no superfiles registered.
    pub fn new() -> Self {
        let inner = RwLock::new(HashMap::new());
        Self { inner }
    }

    /// Returns how many superfiles are currently held in the cache.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock was poisoned by a thread that panicked
    /// while holding it.
    pub fn n_superfiles(&self) -> usize {
        let guard = self
            .inner
            .read()
            .expect("InMemoryReaderCache rwlock poisoned");
        guard.len()
    }
}
impl Default for InMemoryReaderCache {
fn default() -> Self {
Self::new()
}
}
impl SuperfileReaderCache for InMemoryReaderCache {
    /// Looks up the reader registered under `uri`.
    ///
    /// Returns a clone of the cached `Arc`, so repeated calls for the same
    /// URI share one underlying reader.
    ///
    /// # Errors
    ///
    /// Returns [`ReaderCacheError::NotFound`] carrying `uri` when nothing is
    /// registered under it.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    fn reader(&self, uri: &SuperfileUri) -> Result<Arc<SuperfileReader>, ReaderCacheError> {
        let guard = self
            .inner
            .read()
            .expect("InMemoryReaderCache rwlock poisoned");
        match guard.get(uri) {
            Some(entry) => Ok(Arc::clone(&entry.reader)),
            None => Err(ReaderCacheError::NotFound { uri: *uri }),
        }
    }

    /// Registers `bytes` under `uri` unless an entry already exists.
    ///
    /// A URI that is already present is left untouched and `Ok(())` is
    /// returned without opening `bytes`.
    ///
    /// # Errors
    ///
    /// Returns [`ReaderCacheError::OpenFailed`] when `SuperfileReader::open`
    /// rejects `bytes`; nothing is stored in that case.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    fn insert(&self, uri: SuperfileUri, bytes: Bytes) -> Result<(), ReaderCacheError> {
        // Fast path under the read lock only; the guard is a temporary and is
        // released at the end of this statement.
        let already_present = self
            .inner
            .read()
            .expect("InMemoryReaderCache rwlock poisoned")
            .contains_key(&uri);
        if already_present {
            return Ok(());
        }

        // Open the reader while no lock is held.
        let reader = match SuperfileReader::open(bytes.clone()) {
            Ok(opened) => Arc::new(opened),
            Err(source) => return Err(ReaderCacheError::OpenFailed { source }),
        };

        // Another thread may have inserted the same URI since the check
        // above; the entry API keeps whichever entry landed first.
        self.inner
            .write()
            .expect("InMemoryReaderCache rwlock poisoned")
            .entry(uri)
            .or_insert_with(|| Entry { bytes, reader });
        Ok(())
    }

    /// Returns the summed length of the byte buffers of all cached entries.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    fn resident_bytes(&self) -> usize {
        let guard = self
            .inner
            .read()
            .expect("InMemoryReaderCache rwlock poisoned");
        guard
            .values()
            .fold(0usize, |total, entry| total + entry.bytes.len())
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::{LargeStringArray, RecordBatch};
    use arrow_schema::{Field, Schema};

    use super::*;
    use crate::{
        superfile::builder::{BuilderOptions, SuperfileBuilder},
        test_helpers::{decimal128_id_field, decimal128_ids},
    };

    /// Builds a small superfile with three rows (`doc_id`, `title`) and
    /// returns its serialized bytes.
    fn minimal_superfile_bytes() -> Bytes {
        let schema: Arc<Schema> = Arc::new(Schema::new(vec![
            decimal128_id_field("doc_id"),
            Field::new("title", arrow_schema::DataType::LargeUtf8, false),
        ]));
        let opts = BuilderOptions::new(schema.clone(), "doc_id", vec![], vec![], None);
        let mut b = SuperfileBuilder::new(opts).expect("new SuperfileBuilder");
        let ids = decimal128_ids(vec![1u64, 2, 3]);
        let title = LargeStringArray::from(vec!["a", "b", "c"]);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(ids), Arc::new(title)])
            .expect("build RecordBatch");
        b.add_batch(&batch, &[]).expect("add_batch");
        Bytes::from(b.finish().expect("finish builder"))
    }

    /// Returns a freshly generated URI for use as a cache key.
    fn fresh_uri() -> SuperfileUri {
        SuperfileUri::new_v4()
    }

    /// A new cache holds no entries and no bytes.
    #[test]
    fn new_store_is_empty() {
        let store = InMemoryReaderCache::new();
        assert_eq!(store.n_superfiles(), 0);
        assert_eq!(store.resident_bytes(), 0);
    }

    /// Inserting bytes makes a working reader available under the same URI.
    #[test]
    fn insert_then_reader_round_trips() {
        let store = InMemoryReaderCache::new();
        let bytes = minimal_superfile_bytes();
        let uri = fresh_uri();
        store
            .insert(uri, bytes.clone())
            .expect("insert should succeed");
        let r = store.reader(&uri).expect("reader should find uri");
        assert_eq!(r.n_docs(), 3, "minimal superfile carries 3 docs");
        assert_eq!(store.n_superfiles(), 1);
        assert_eq!(store.resident_bytes(), bytes.len());
    }

    /// Looking up an unregistered URI reports `NotFound` with that URI.
    #[test]
    fn reader_on_unknown_uri_returns_not_found() {
        let store = InMemoryReaderCache::new();
        let unknown = fresh_uri();
        let err = store.reader(&unknown).expect_err("expected error");
        match err {
            ReaderCacheError::NotFound { uri } => {
                assert_eq!(uri, unknown);
            }
            other => panic!("expected NotFound, got {:?}", other),
        }
    }

    /// Bytes that fail to open are rejected and leave the cache untouched.
    #[test]
    fn insert_with_invalid_bytes_returns_open_failed() {
        let store = InMemoryReaderCache::new();
        let uri = fresh_uri();
        let garbage = Bytes::from(vec![0u8; 16]);
        let err = store.insert(uri, garbage).expect_err("expected error");
        assert!(matches!(err, ReaderCacheError::OpenFailed { .. }));
        assert_eq!(store.n_superfiles(), 0);
        assert_eq!(store.resident_bytes(), 0);
        assert!(store.reader(&uri).is_err());
    }

    /// Re-inserting an existing URI keeps the original entry and reader.
    #[test]
    fn insert_is_idempotent_on_duplicate_uri() {
        let store = InMemoryReaderCache::new();
        let bytes = minimal_superfile_bytes();
        let uri = fresh_uri();
        store.insert(uri, bytes.clone()).expect("first insert");
        let bytes_after_first = store.resident_bytes();
        let r1 = store.reader(&uri).expect("first reader");
        store.insert(uri, bytes.clone()).expect("idempotent insert");
        let r2 = store.reader(&uri).expect("second reader");
        assert_eq!(store.n_superfiles(), 1, "still one superfile");
        assert_eq!(
            store.resident_bytes(),
            bytes_after_first,
            "byte accounting unchanged",
        );
        assert!(Arc::ptr_eq(&r1, &r2));
    }

    /// Byte accounting is the sum over every cached superfile.
    #[test]
    fn resident_bytes_sums_across_superfiles() {
        let store = InMemoryReaderCache::new();
        let bytes_a = minimal_superfile_bytes();
        let bytes_b = minimal_superfile_bytes();
        store
            .insert(fresh_uri(), bytes_a.clone())
            .expect("insert a");
        store
            .insert(fresh_uri(), bytes_b.clone())
            .expect("insert b");
        assert_eq!(store.n_superfiles(), 2);
        assert_eq!(store.resident_bytes(), bytes_a.len() + bytes_b.len());
    }

    /// Two lookups of the same URI return handles to the same reader.
    #[test]
    fn concurrent_reader_clones_share_arc() {
        let store = InMemoryReaderCache::new();
        let uri = fresh_uri();
        store
            .insert(uri, minimal_superfile_bytes())
            .expect("insert");
        let a = store.reader(&uri).expect("a");
        let b = store.reader(&uri).expect("b");
        assert!(Arc::ptr_eq(&a, &b));
    }

    /// Compile-time check that the cache can be shared across threads.
    #[test]
    fn store_is_send_and_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<InMemoryReaderCache>();
        assert_send_sync::<Arc<dyn SuperfileReaderCache>>();
    }

    /// Many threads can read previously inserted entries at the same time.
    #[test]
    fn concurrent_reads_after_put_succeed() {
        use std::thread;
        let store: Arc<dyn SuperfileReaderCache> = Arc::new(InMemoryReaderCache::new());
        let bytes = minimal_superfile_bytes();
        let uris: Vec<SuperfileUri> = (0..4).map(|_| fresh_uri()).collect();
        for u in &uris {
            store.insert(*u, bytes.clone()).expect("insert");
        }
        let mut handles = Vec::new();
        for _ in 0..4 {
            let store = Arc::clone(&store);
            let uris = uris.clone();
            handles.push(thread::spawn(move || {
                for _ in 0..100 {
                    for u in &uris {
                        let r = store.reader(u).expect("reader");
                        assert_eq!(r.n_docs(), 3);
                    }
                }
            }));
        }
        for h in handles {
            h.join().expect("thread panicked");
        }
    }

    /// Concurrent inserts under distinct URIs all land in the cache.
    #[test]
    fn concurrent_puts_of_distinct_uris_all_succeed() {
        use std::thread;
        // Keep a concrete handle next to the trait object so the inherent
        // `n_superfiles` accessor stays reachable once the threads are done.
        let concrete = Arc::new(InMemoryReaderCache::new());
        let store: Arc<dyn SuperfileReaderCache> = concrete.clone();
        let n_threads: usize = 4;
        let n_per_thread: usize = 4;
        let mut handles = Vec::new();
        for _ in 0..n_threads {
            let store = Arc::clone(&store);
            handles.push(thread::spawn(move || {
                for _ in 0..n_per_thread {
                    let uri = fresh_uri();
                    store
                        .insert(uri, minimal_superfile_bytes())
                        .expect("insert should succeed");
                }
            }));
        }
        for h in handles {
            h.join().expect("thread panicked");
        }
        let n = n_threads * n_per_thread;
        assert_eq!(concrete.n_superfiles(), n);
        let resident = store.resident_bytes();
        assert!(resident > 0);
        let expected_per_seg = minimal_superfile_bytes().len();
        assert_eq!(resident, expected_per_seg * n);
    }

    /// Racing inserts of one URI leave exactly one entry behind.
    #[test]
    fn concurrent_puts_of_same_uri_resolve_to_one_entry() {
        use std::thread;
        let store: Arc<dyn SuperfileReaderCache> = Arc::new(InMemoryReaderCache::new());
        let uri = fresh_uri();
        let mut handles = Vec::new();
        for _ in 0..8 {
            let store = Arc::clone(&store);
            handles.push(thread::spawn(move || {
                store
                    .insert(uri, minimal_superfile_bytes())
                    .expect("insert should succeed");
            }));
        }
        for h in handles {
            h.join().expect("thread panicked");
        }
        assert_eq!(store.resident_bytes(), minimal_superfile_bytes().len());
        let r1 = store.reader(&uri).expect("r1");
        let r2 = store.reader(&uri).expect("r2");
        assert!(Arc::ptr_eq(&r1, &r2));
    }
}