mod index_spec;
mod manifest;
mod options;
mod search_tvf;
mod uri;
use std::{
collections::{HashMap, HashSet},
sync::{
Arc, Mutex, OnceLock,
atomic::{AtomicU64, Ordering},
},
time::{SystemTime, UNIX_EPOCH},
};
use arrow::record_batch::RecordBatch;
use arrow_schema::SchemaRef;
use datafusion::{config::Dialect, execution::context::SessionContext};
use futures::future::try_join_all;
pub use index_spec::IndexSpec;
use manifest::{
TableEntry, VectorEntry, commit_catalog, read_catalog, schema_from_ipc, schema_to_ipc,
};
pub use options::{ColdFetchMode, ConnectOptions};
use tokio::runtime::Runtime;
use uri::{Backend, parse_uri};
use crate::{
InfinoError,
runtime_bridge::{bridge_on_runtime, bridge_sync_to_async, build_query_runtime},
storage::{StorageError, StorageProvider},
superfile::{
builder::FtsConfig,
fts::tokenize::{AsciiLowerTokenizer, Tokenizer},
vector::{builder::VectorConfig, distance::Metric},
},
supertable::{
Supertable,
options::SupertableOptions,
reader_cache::{DiskCacheConfig, DiskCacheStore},
},
};
/// Opens a catalog connection at `uri` using [`ConnectOptions::default`].
///
/// This is shorthand for [`connect_with`] with default options; see that
/// function for the accepted URIs and the errors it can return.
pub fn connect(uri: impl AsRef<str>) -> Result<Connection, InfinoError> {
    let defaults = ConnectOptions::default();
    connect_with(uri, defaults)
}
/// Opens a catalog connection at `uri` with explicit `options`.
///
/// `memory://` yields an in-process catalog. Every other backend yields a
/// catalog persisted through a storage provider rooted at the URI.
///
/// # Errors
///
/// Returns an error if `uri` cannot be parsed or if the storage provider for
/// the backend cannot be constructed.
pub fn connect_with(
    uri: impl AsRef<str>,
    options: ConnectOptions,
) -> Result<Connection, InfinoError> {
    let backend = parse_uri(uri.as_ref())?;
    // `backend_to_provider` yields `None` exactly for the in-memory backend,
    // so the presence of a provider selects the catalog flavour.
    let store = match backend_to_provider(&backend, &options)? {
        Some(root) => CatalogStore::Storage(root),
        None => CatalogStore::Memory(Mutex::new(HashMap::new())),
    };
    let inner = ConnectionInner {
        backend,
        options,
        store,
        query_runtime: OnceLock::new(),
    };
    Ok(Connection {
        inner: Arc::new(inner),
    })
}
/// Handle to a table catalog, obtained from [`connect`] or [`connect_with`].
///
/// Cloning is cheap: all clones share one `Arc`'d inner state. A table
/// created through one clone is therefore listed by every other clone.
#[derive(Clone)]
pub struct Connection {
    inner: Arc<ConnectionInner>,
}
/// State shared by every clone of a [`Connection`].
struct ConnectionInner {
    // Parsed backend the connection was opened against; per-table storage is
    // derived from it with `Backend::join(location)`.
    backend: Backend,
    // Options supplied at connect time (S3 endpoint/credentials, disk-cache
    // directory, cold-fetch mode, cache budget).
    options: ConnectOptions,
    // Where table registrations live: an in-process map or a catalog object
    // in storage.
    store: CatalogStore,
    // Runtime for queries that reference no catalog table; built lazily by
    // `Connection::query_runtime` and shut down in `Drop`.
    query_runtime: OnceLock<Arc<Runtime>>,
}
impl Drop for ConnectionInner {
fn drop(&mut self) {
if let Some(rt) = self.query_runtime.take()
&& let Ok(rt) = Arc::try_unwrap(rt)
{
rt.shutdown_background();
}
}
}
/// Backing store for the table catalog, chosen from the URI at connect time.
enum CatalogStore {
    /// `memory://`: table handles kept in-process, keyed by table name. The map
    /// itself is dropped together with the last connection clone.
    Memory(Mutex<HashMap<String, Supertable>>),
    /// Any storage backend: the root provider that holds the catalog object
    /// and the per-table location subtrees.
    Storage(Arc<dyn StorageProvider>),
}
impl Connection {
pub fn create_table(
&self,
name: &str,
schema: SchemaRef,
indexes: IndexSpec,
) -> Result<Supertable, InfinoError> {
validate_name(name)?;
let (fts_cfg, vec_cfg) = indexes.to_configs();
let tokenizer = table_tokenizer(&indexes);
match &self.inner.store {
CatalogStore::Memory(map) => {
let opts = build_options(schema, fts_cfg, vec_cfg, tokenizer, None)?;
let handle = Supertable::create(opts)?;
let mut map = map.lock().expect("catalog mutex poisoned");
if map.contains_key(name) {
return Err(InfinoError::AlreadyExists(name.to_string()));
}
map.insert(name.to_string(), handle.clone());
Ok(handle)
}
CatalogStore::Storage(root) => {
let vectors: Vec<VectorEntry> = vec_cfg
.iter()
.map(|vc| VectorEntry {
column: vc.column.clone(),
dim: vc.dim,
n_cent: vc.n_cent,
metric: metric_to_str(vc.metric).to_string(),
})
.collect();
let location = unique_location(name);
let entry = TableEntry {
location: location.clone(),
schema_ipc: schema_to_ipc(&schema)?,
fts: indexes.fts_columns().to_vec(),
vectors,
created_at_unix: now_unix(),
};
let table_storage =
backend_to_provider(&self.inner.backend.join(&location), &self.inner.options)?
.expect("non-memory backend yields a storage provider");
let disk_cache = build_disk_cache(&self.inner.options, &table_storage, name)?;
let mut opts =
build_options(schema, fts_cfg, vec_cfg, tokenizer, Some(table_storage))?;
if let Some(cache) = disk_cache {
opts = opts.with_disk_cache(cache);
}
let handle = Supertable::create(opts)?;
let name = name.to_string();
bridge_sync_to_async(commit_catalog(root.as_ref(), move |body| {
if body.tables.contains_key(&name) {
return Err(InfinoError::AlreadyExists(name.clone()));
}
body.tables.insert(name.clone(), entry.clone());
Ok(())
}))?;
Ok(handle)
}
}
}
pub fn open_table(&self, name: &str) -> Result<Supertable, InfinoError> {
match &self.inner.store {
CatalogStore::Memory(map) => map
.lock()
.expect("catalog mutex poisoned")
.get(name)
.cloned()
.ok_or_else(|| InfinoError::NotFound(name.to_string())),
CatalogStore::Storage(root) => {
let (body, _etag) = bridge_sync_to_async(read_catalog(root.as_ref()))?;
let entry = body
.tables
.get(name)
.ok_or_else(|| InfinoError::NotFound(name.to_string()))?;
let schema = schema_from_ipc(&entry.schema_ipc)?;
let mut spec = IndexSpec::new();
for column in &entry.fts {
spec = spec.fts(column.clone());
}
for v in &entry.vectors {
spec = spec.vector(
v.column.clone(),
v.dim,
v.n_cent,
metric_from_str(&v.metric)?,
);
}
let (fts_cfg, vec_cfg) = spec.to_configs();
let tokenizer = table_tokenizer(&spec);
let table_storage = backend_to_provider(
&self.inner.backend.join(&entry.location),
&self.inner.options,
)?
.expect("non-memory backend yields a storage provider");
let disk_cache = build_disk_cache(&self.inner.options, &table_storage, name)?;
let mut opts =
build_options(schema, fts_cfg, vec_cfg, tokenizer, Some(table_storage))?;
if let Some(cache) = disk_cache {
opts = opts.with_disk_cache(cache);
}
Ok(Supertable::open(opts)?)
}
}
}
pub fn drop_table(&self, name: &str, purge: bool) -> Result<(), InfinoError> {
match &self.inner.store {
CatalogStore::Memory(map) => map
.lock()
.expect("catalog mutex poisoned")
.remove(name)
.map(|_| ())
.ok_or_else(|| InfinoError::NotFound(name.to_string())),
CatalogStore::Storage(root) => {
let mut location: Option<String> = None;
bridge_sync_to_async(commit_catalog(root.as_ref(), |body| {
match body.tables.remove(name) {
Some(entry) => {
location = Some(entry.location);
Ok(())
}
None => Err(InfinoError::NotFound(name.to_string())),
}
}))?;
if purge {
let location =
location.expect("catalog commit succeeded => an entry was removed");
bridge_sync_to_async(async {
let objects = root.list_with_prefix(&location).await?;
try_join_all(objects.iter().map(|uri| root.delete(uri))).await?;
Ok::<(), StorageError>(())
})?;
}
Ok(())
}
}
}
pub fn list_tables(&self) -> Result<Vec<String>, InfinoError> {
match &self.inner.store {
CatalogStore::Memory(map) => {
let mut names: Vec<String> = map
.lock()
.expect("catalog mutex poisoned")
.keys()
.cloned()
.collect();
names.sort();
Ok(names)
}
CatalogStore::Storage(root) => {
let (body, _etag) = bridge_sync_to_async(read_catalog(root.as_ref()))?;
Ok(body.tables.into_keys().collect())
}
}
}
pub fn query_sql(&self, sql: &str) -> Result<Vec<RecordBatch>, InfinoError> {
let ctx = SessionContext::new();
let statement = ctx
.state()
.sql_to_statement(sql, &Dialect::Generic)
.map_err(|e| InfinoError::Query(e.to_string()))?;
let refs = ctx
.state()
.resolve_table_references(&statement)
.map_err(|e| InfinoError::Query(e.to_string()))?;
let mut seen = HashSet::new();
let mut handles: Vec<Supertable> = Vec::new();
for r in &refs {
let name = r.table().to_string();
if !seen.insert(name.clone()) {
continue;
}
match self.open_table(&name) {
Ok(table) => {
table
.register_into(&ctx, &name)
.map_err(|e| InfinoError::Query(e.to_string()))?;
handles.push(table);
}
Err(InfinoError::NotFound(_)) => {}
Err(e) => return Err(e),
}
}
search_tvf::register_search_tvfs(&ctx, self.clone());
let sql = sql.to_owned();
let drive = async move {
let df = ctx
.sql(&sql)
.await
.map_err(|e| InfinoError::Query(e.to_string()))?;
df.collect()
.await
.map_err(|e| InfinoError::Query(e.to_string()))
};
match handles.first() {
Some(table) => table.block_on_query(drive),
None => bridge_on_runtime(drive, &self.query_runtime()),
}
}
fn query_runtime(&self) -> Arc<Runtime> {
Arc::clone(
self.inner
.query_runtime
.get_or_init(|| build_query_runtime("catalog-query")),
)
}
}
/// Assembles [`SupertableOptions`] from index configs and an optional tokenizer.
///
/// When a storage provider is supplied, it is attached with `with_storage`.
/// Otherwise the options are returned as constructed (in-memory table).
fn build_options(
    schema: SchemaRef,
    fts: Vec<FtsConfig>,
    vectors: Vec<VectorConfig>,
    tokenizer: Option<Arc<dyn Tokenizer>>,
    storage: Option<Arc<dyn StorageProvider>>,
) -> Result<SupertableOptions, InfinoError> {
    let base = SupertableOptions::new(schema, fts, vectors, tokenizer)?;
    Ok(match storage {
        Some(provider) => base.with_storage(provider),
        None => base,
    })
}
/// Returns the ASCII-lowercasing tokenizer when `indexes` declares any
/// full-text column, and `None` otherwise.
fn table_tokenizer(indexes: &IndexSpec) -> Option<Arc<dyn Tokenizer>> {
    indexes
        .has_fts()
        .then(|| Arc::new(AsciiLowerTokenizer) as Arc<dyn Tokenizer>)
}
/// Builds the storage provider for `backend`.
///
/// Returns `Ok(None)` for the in-memory backend, which has no storage. S3 uses
/// the explicit endpoint/credentials from `options.s3` when present, and the
/// provider's default construction otherwise.
///
/// # Errors
///
/// Propagates any error from constructing the concrete provider.
fn backend_to_provider(
    backend: &Backend,
    options: &ConnectOptions,
) -> Result<Option<Arc<dyn StorageProvider>>, InfinoError> {
    use crate::storage::{AzureStorageProvider, LocalFsStorageProvider, S3StorageProvider};
    let provider: Arc<dyn StorageProvider> = match backend {
        Backend::Memory => return Ok(None),
        Backend::LocalFs { root } => Arc::new(LocalFsStorageProvider::new(root.clone())?),
        Backend::S3 { bucket, prefix } => {
            let s3 = if let Some(cfg) = options.s3.as_ref() {
                S3StorageProvider::new_with_endpoint_and_prefix(
                    &cfg.endpoint,
                    bucket,
                    &cfg.access_key,
                    &cfg.secret_key,
                    &cfg.region,
                    prefix,
                )?
            } else {
                S3StorageProvider::new_with_prefix(bucket, prefix)?
            };
            Arc::new(s3)
        }
        Backend::Azure { container, prefix } => {
            Arc::new(AzureStorageProvider::new_with_prefix(container, prefix)?)
        }
    };
    Ok(Some(provider))
}
fn build_disk_cache(
options: &ConnectOptions,
storage: &Arc<dyn StorageProvider>,
name: &str,
) -> Result<Option<Arc<DiskCacheStore>>, InfinoError> {
let Some(cache_root) = options.cache_dir.as_ref() else {
return Ok(None);
};
let mut cfg = DiskCacheConfig {
cache_root: cache_root.join(name),
cold_fetch_mode: options.cold_fetch_mode.to_internal(),
..Default::default()
};
if let Some(budget) = options.cache_budget_bytes {
cfg.disk_budget_bytes = budget;
}
let cache = DiskCacheStore::new_unpinned(Arc::clone(storage), cfg)
.map_err(|e| InfinoError::Io(e.to_string()))?;
Ok(Some(cache))
}
/// Checks that `name` is a legal table name.
///
/// A legal name is non-empty, uses only ASCII letters, digits, `_` and `-`,
/// and does not start with `_` (underscore-prefixed names such as `_catalog`
/// are rejected).
///
/// # Errors
///
/// [`InfinoError::Backend`] describing the naming rule.
fn validate_name(name: &str) -> Result<(), InfinoError> {
    let allowed = |c: char| c.is_ascii_alphanumeric() || matches!(c, '_' | '-');
    let valid = match name.chars().next() {
        None | Some('_') => false,
        Some(_) => name.chars().all(allowed),
    };
    if !valid {
        return Err(InfinoError::Backend(format!(
            "invalid table name {name:?}: use non-empty [A-Za-z0-9_-], not starting with '_'"
        )));
    }
    Ok(())
}
/// Produces a fresh storage location for a table named `name`.
///
/// The format is `{name}-{nanos:x}-{seq:x}`. `nanos` is the current time since
/// the Unix epoch in nanoseconds (0 if the clock is before the epoch), and
/// `seq` is a process-wide counter, so two calls in this process never return
/// the same string.
fn unique_location(name: &str) -> String {
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let stamp: u128 = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    };
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    format!("{name}-{stamp:x}-{seq:x}")
}
/// Encodes a vector distance metric as the string stored in the catalog.
///
/// These spellings are persisted in catalog entries and must stay in
/// lock-step with `metric_from_str`.
fn metric_to_str(m: Metric) -> &'static str {
    match m {
        Metric::L2Sq => "l2sq",
        Metric::NegDot => "negdot",
        Metric::Cosine => "cosine",
    }
}
/// Decodes a catalog metric string produced by `metric_to_str`.
///
/// # Errors
///
/// [`InfinoError::Backend`] for any spelling other than `cosine`, `l2sq` or
/// `negdot`.
fn metric_from_str(s: &str) -> Result<Metric, InfinoError> {
    let metric = match s {
        "l2sq" => Metric::L2Sq,
        "negdot" => Metric::NegDot,
        "cosine" => Metric::Cosine,
        other => {
            return Err(InfinoError::Backend(format!(
                "unknown vector metric {other:?}"
            )));
        }
    };
    Ok(metric)
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch.
fn now_unix() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs())
}
// Unit tests for the catalog: in-memory and local-filesystem backends, table
// lifecycle (create/open/drop/list), SQL resolution of catalog tables, and the
// search table functions.
#[cfg(test)]
mod tests {
    use std::{fs, path::Path};

    use arrow_schema::{DataType, Field, Schema};

    use super::*;
    use crate::{
        BoolMode,
        test_helpers::{build_title_batch, schema_id_title},
    };

    // Result limit used by the BM25 searches in these tests.
    const TOP_K: usize = 10;

    /// Total row count across `batches`.
    fn n_rows(batches: &[RecordBatch]) -> usize {
        batches.iter().map(|b| b.num_rows()).sum()
    }

    /// Full lifecycle on the in-memory catalog: create, append, list, reopen,
    /// search, drop, and then `NotFound` on reopen.
    #[test]
    fn memory_create_open_search_drop() {
        let conn = connect("memory://").expect("connect");
        let table = conn
            .create_table("docs", schema_id_title(), IndexSpec::new().fts("title"))
            .expect("create_table");
        table
            .append(&build_title_batch(&["the quick brown fox"]))
            .expect("append");
        assert_eq!(conn.list_tables().expect("list"), vec!["docs".to_string()]);
        let reopened = conn.open_table("docs").expect("open_table");
        let hits = reopened
            .bm25_search("title", "fox", TOP_K, BoolMode::Or, None)
            .expect("bm25_search");
        assert_eq!(n_rows(&hits), 1, "expected one hit for 'fox'");
        conn.drop_table("docs", false).expect("drop_table");
        assert!(conn.list_tables().expect("list").is_empty());
        assert!(matches!(
            conn.open_table("docs"),
            Err(InfinoError::NotFound(_))
        ));
    }

    /// Creating the same name twice in memory yields `AlreadyExists`.
    #[test]
    fn duplicate_create_is_already_exists() {
        let conn = connect("memory://").expect("connect");
        conn.create_table("docs", schema_id_title(), IndexSpec::new().fts("title"))
            .expect("first create");
        let again = conn.create_table("docs", schema_id_title(), IndexSpec::new().fts("title"));
        assert!(matches!(again, Err(InfinoError::AlreadyExists(_))));
    }

    /// Opening an unregistered name yields `NotFound`.
    #[test]
    fn open_missing_is_not_found() {
        let conn = connect("memory://").expect("connect");
        assert!(matches!(
            conn.open_table("nope"),
            Err(InfinoError::NotFound(_))
        ));
    }

    /// Names containing characters outside `[A-Za-z0-9_-]` are rejected.
    #[test]
    fn invalid_table_name_rejected() {
        let conn = connect("memory://").expect("connect");
        let bad = conn.create_table("has space", schema_id_title(), IndexSpec::new());
        assert!(bad.is_err());
    }

    /// Names starting with `_` (e.g. `_catalog`) are rejected.
    #[test]
    fn underscore_prefixed_name_rejected() {
        let conn = connect("memory://").expect("connect");
        assert!(
            conn.create_table("_catalog", schema_id_title(), IndexSpec::new())
                .is_err()
        );
        assert!(
            conn.create_table("_hidden", schema_id_title(), IndexSpec::new())
                .is_err()
        );
    }

    /// On local storage, a table re-created after a drop must start empty.
    /// Each create gets a fresh location, so the dropped table's rows stay
    /// invisible.
    #[test]
    fn drop_then_recreate_same_name_is_empty() {
        let dir = tempfile::tempdir().expect("tempdir");
        let uri = dir.path().to_str().expect("utf8 path").to_string();
        let conn = connect(&uri).expect("connect");
        let first = conn
            .create_table("docs", schema_id_title(), IndexSpec::new().fts("title"))
            .expect("create");
        first
            .append(&build_title_batch(&["a lazy sleeping fox"]))
            .expect("append");
        assert_eq!(
            n_rows(
                &first
                    .bm25_search("title", "fox", TOP_K, BoolMode::Or, None)
                    .expect("search")
            ),
            1
        );
        conn.drop_table("docs", false).expect("drop");
        let second = conn
            .create_table("docs", schema_id_title(), IndexSpec::new().fts("title"))
            .expect("recreate");
        assert_eq!(
            n_rows(
                &second
                    .bm25_search("title", "fox", TOP_K, BoolMode::Or, None)
                    .expect("search")
            ),
            0,
            "re-created table must not resurrect the dropped table's rows"
        );
    }

    /// `drop_table(.., true)` deletes every file whose path contains a
    /// component starting with the table's location prefix.
    #[test]
    fn drop_with_purge_reclaims_the_storage_subtree() {
        /// Counts regular files under `dir` (recursively) that have some path
        /// component starting with `prefix`.
        fn files_under_location(dir: &Path, prefix: &str) -> usize {
            let mut n = 0;
            let mut stack = vec![dir.to_path_buf()];
            while let Some(d) = stack.pop() {
                // Unreadable directories are skipped rather than failing the count.
                let Ok(entries) = fs::read_dir(&d) else {
                    continue;
                };
                for entry in entries.flatten() {
                    let path = entry.path();
                    if path.is_dir() {
                        stack.push(path);
                    } else if path
                        .components()
                        .any(|c| c.as_os_str().to_string_lossy().starts_with(prefix))
                    {
                        n += 1;
                    }
                }
            }
            n
        }
        let dir = tempfile::tempdir().expect("tempdir");
        let uri = dir.path().to_str().expect("utf8 path").to_string();
        let conn = connect(&uri).expect("connect");
        let table = conn
            .create_table("docs", schema_id_title(), IndexSpec::new().fts("title"))
            .expect("create");
        table
            .append(&build_title_batch(&["a lazy sleeping fox"]))
            .expect("append");
        assert!(
            files_under_location(dir.path(), "docs-") > 0,
            "committed table must have bytes under its unique location"
        );
        conn.drop_table("docs", true).expect("drop with purge");
        assert!(conn.list_tables().expect("list").is_empty());
        assert_eq!(
            files_under_location(dir.path(), "docs-"),
            0,
            "purge must delete every object under the dropped table's location"
        );
    }

    /// `query_sql` registers referenced catalog tables by name, including
    /// across a `UNION ALL` of two tables.
    #[test]
    fn query_sql_resolves_tables_by_catalog_name() {
        use arrow_array::Int64Array;
        let conn = connect("memory://").expect("connect");
        let docs = conn
            .create_table("docs", schema_id_title(), IndexSpec::new().fts("title"))
            .expect("create docs");
        docs.append(&build_title_batch(&["the quick brown fox", "a lazy dog"]))
            .expect("append docs");
        let more = conn
            .create_table("more", schema_id_title(), IndexSpec::new().fts("title"))
            .expect("create more");
        more.append(&build_title_batch(&["hello world"]))
            .expect("append more");
        let batches = conn
            .query_sql("SELECT COUNT(*) AS n FROM docs")
            .expect("count docs");
        let n = batches[0]
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("Int64 count")
            .value(0);
        assert_eq!(n, 2, "docs has two rows");
        let rows: usize = conn
            .query_sql("SELECT title FROM docs UNION ALL SELECT title FROM more")
            .expect("union across tables")
            .iter()
            .map(|b| b.num_rows())
            .sum();
        assert_eq!(rows, 3, "2 from docs + 1 from more");
    }

    /// The `bm25_search` table function resolves a catalog table by name and
    /// errors for an unknown table.
    #[test]
    fn query_sql_bm25_search_tvf_resolves_table() {
        let conn = connect("memory://").expect("connect");
        let docs = conn
            .create_table("docs", schema_id_title(), IndexSpec::new().fts("title"))
            .expect("create docs");
        docs.append(&build_title_batch(&["the quick brown fox", "a lazy dog"]))
            .expect("append");
        let rows: usize = conn
            .query_sql("SELECT _id, score FROM bm25_search('docs', 'title', 'fox', 10)")
            .expect("bm25_search tvf")
            .iter()
            .map(|b| b.num_rows())
            .sum();
        assert_eq!(rows, 1, "one doc matches 'fox'");
        assert!(
            conn.query_sql("SELECT _id FROM bm25_search('nope', 'title', 'fox', 10)")
                .is_err()
        );
    }

    /// The `bm25_search` table function also works (without panicking) on a
    /// local-storage catalog.
    #[test]
    fn query_sql_search_tvf_over_storage_does_not_panic() {
        let dir = tempfile::tempdir().expect("tempdir");
        let uri = dir.path().to_str().expect("utf8 path").to_string();
        let conn = connect(&uri).expect("connect");
        let docs = conn
            .create_table("docs", schema_id_title(), IndexSpec::new().fts("title"))
            .expect("create docs");
        docs.append(&build_title_batch(&["the quick brown fox", "a lazy dog"]))
            .expect("append");
        let rows: usize = conn
            .query_sql("SELECT _id, score FROM bm25_search('docs', 'title', 'fox', 10)")
            .expect("bm25_search tvf over storage")
            .iter()
            .map(|b| b.num_rows())
            .sum();
        assert_eq!(rows, 1, "one doc matches 'fox'");
    }

    /// Dropping a connection (and its tables) from inside a Tokio runtime must
    /// not panic. This exercises the `Drop for ConnectionInner` path.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn connection_drops_cleanly_inside_async_runtime() {
        let dir = tempfile::tempdir().expect("tempdir");
        let uri = dir.path().to_str().expect("utf8 path").to_string();
        let conn = connect(&uri).expect("connect");
        let docs = conn
            .create_table("docs", schema_id_title(), IndexSpec::new().fts("title"))
            .expect("create docs");
        docs.append(&build_title_batch(&["the quick brown fox"]))
            .expect("append");
        conn.query_sql("SELECT _id FROM bm25_search('docs', 'title', 'fox', 10)")
            .expect("query");
        drop(docs);
        // The assertion is that this drop completes without panicking.
        drop(conn);
    }

    /// `token_match` and `exact_match` table functions resolve catalog tables
    /// and compose with set operations such as `EXCEPT`.
    #[test]
    fn query_sql_match_tvfs_resolve_table() {
        let conn = connect("memory://").expect("connect");
        let docs = conn
            .create_table("docs", schema_id_title(), IndexSpec::new().fts("title"))
            .expect("create docs");
        docs.append(&build_title_batch(&[
            "the quick brown fox",
            "a lazy dog",
            "quick thinking",
        ]))
        .expect("append");
        let rows: usize = conn
            .query_sql("SELECT _id FROM token_match('docs', 'title', 'quick')")
            .expect("token_match tvf")
            .iter()
            .map(|b| b.num_rows())
            .sum();
        assert_eq!(rows, 2, "two docs contain 'quick'");
        let rows: usize = conn
            .query_sql(
                "SELECT _id FROM token_match('docs', 'title', 'quick') \
                 EXCEPT \
                 SELECT _id FROM token_match('docs', 'title', 'fox')",
            )
            .expect("EXCEPT over token_match")
            .iter()
            .map(|b| b.num_rows())
            .sum();
        assert_eq!(rows, 1, "'quick thinking' has quick but not fox");
        let rows: usize = conn
            .query_sql("SELECT _id FROM exact_match('docs', 'title', 'a lazy dog')")
            .expect("exact_match tvf")
            .iter()
            .map(|b| b.num_rows())
            .sum();
        assert_eq!(rows, 1, "one doc equals the raw string exactly");
    }

    /// Prefix, vector and hybrid search table functions resolve a table that
    /// has both a full-text and a vector index.
    #[test]
    fn query_sql_prefix_vector_and_hybrid_tvfs_resolve_table() {
        use crate::Metric;
        const DIM: usize = 16;
        const N_CENT: usize = 4;
        const ROWS: usize = 4;
        // Shadows the module-level TOP_K for this test only.
        const TOP_K: usize = 4;
        let schema = Arc::new(Schema::new(vec![
            Field::new("title", DataType::LargeUtf8, false),
            Field::new(
                "emb",
                DataType::FixedSizeList(
                    Arc::new(Field::new("item", DataType::Float32, true)),
                    DIM as i32,
                ),
                false,
            ),
        ]));
        // Row i's embedding is the one-hot vector e_i.
        let batch = {
            use arrow_array::{FixedSizeListArray, Float32Array, LargeStringArray};
            let titles = ["rust async", "python data", "rust systems", "go rust"];
            let mut flat = Vec::<f32>::with_capacity(ROWS * DIM);
            for i in 0..ROWS {
                for d in 0..DIM {
                    flat.push(if d == i { 1.0 } else { 0.0 });
                }
            }
            let field = Arc::new(Field::new("item", DataType::Float32, true));
            let list = FixedSizeListArray::new(
                field,
                DIM as i32,
                Arc::new(Float32Array::from(flat)),
                None,
            );
            RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(LargeStringArray::from(titles.to_vec())),
                    Arc::new(list),
                ],
            )
            .expect("vector batch")
        };
        let conn = connect("memory://").expect("connect");
        let table = conn
            .create_table(
                "vecs",
                schema,
                IndexSpec::new()
                    .fts("title")
                    .vector("emb", DIM, N_CENT, Metric::L2Sq),
            )
            .expect("create table");
        table.append(&batch).expect("append");
        // Comma-separated literal of e_0, passed as the query vector string.
        let one_hot_0 = (0..DIM)
            .map(|d| if d == 0 { "1" } else { "0" })
            .collect::<Vec<_>>()
            .join(",");
        let prefix_rows: usize = conn
            .query_sql(&format!(
                "SELECT _id FROM bm25_search_prefix('vecs', 'title', 'rus', {TOP_K})"
            ))
            .expect("bm25_search_prefix tvf")
            .iter()
            .map(|b| b.num_rows())
            .sum();
        assert!(prefix_rows >= 1, "'rus' prefix should match 'rust' docs");
        let vec_rows: usize = conn
            .query_sql(&format!(
                "SELECT _id FROM vector_search('vecs', 'emb', '{one_hot_0}', {TOP_K})"
            ))
            .expect("vector_search tvf")
            .iter()
            .map(|b| b.num_rows())
            .sum();
        assert!(vec_rows >= 1, "vector_search should return neighbours");
        let hybrid_rows: usize = conn
            .query_sql(&format!(
                "SELECT _id FROM hybrid_search('vecs', 'title', 'rust', 'emb', '{one_hot_0}', {TOP_K})"
            ))
            .expect("hybrid_search tvf")
            .iter()
            .map(|b| b.num_rows())
            .sum();
        assert!(
            hybrid_rows >= 1,
            "hybrid_search should fuse and return hits"
        );
    }

    /// With `cache_dir` configured, a local-storage table stays searchable and
    /// its cache directory is created at `cache_dir/<table name>`.
    #[test]
    fn localfs_with_disk_cache() {
        let root = tempfile::tempdir().expect("tempdir");
        let cache = tempfile::tempdir().expect("cache tempdir");
        let opts = ConnectOptions::new()
            .with_cache_dir(cache.path())
            .with_cold_fetch_mode(ColdFetchMode::HybridWithPrefetch)
            .with_cache_budget_bytes(64 * 1024 * 1024);
        let conn = connect_with(root.path().to_str().expect("utf8"), opts).expect("connect");
        let table = conn
            .create_table("docs", schema_id_title(), IndexSpec::new().fts("title"))
            .expect("create");
        table
            .append(&build_title_batch(&["the quick brown fox"]))
            .expect("append");
        let hits = table
            .bm25_search("title", "fox", TOP_K, BoolMode::Or, None)
            .expect("search");
        assert_eq!(n_rows(&hits), 1);
        assert!(cache.path().join("docs").exists());
    }

    /// A fresh in-memory connection lists no tables.
    #[test]
    fn connect_with_default_options_yields_empty_memory_catalog() {
        let db = connect_with("memory://", ConnectOptions::new()).expect("connect_with");
        assert!(db.list_tables().expect("list").is_empty());
    }

    /// Clones of a `Connection` share one catalog.
    #[test]
    fn connection_clone_shares_one_catalog() {
        let conn = connect("memory://").expect("connect");
        let clone = conn.clone();
        conn.create_table("docs", schema_id_title(), IndexSpec::new().fts("title"))
            .expect("create on original");
        assert_eq!(clone.list_tables().expect("list"), vec!["docs".to_string()]);
    }

    /// A query referencing no catalog table runs on the connection-level
    /// query runtime.
    #[test]
    fn query_sql_table_free_select_uses_shared_bridge() {
        let conn = connect("memory://").expect("connect");
        let batches = conn
            .query_sql("SELECT 1 AS one")
            .expect("table-free select");
        assert_eq!(n_rows(&batches), 1);
    }

    /// Unparseable SQL surfaces as `InfinoError::Query`.
    #[test]
    fn query_sql_invalid_sql_is_query_error() {
        let conn = connect("memory://").expect("connect");
        let err = conn.query_sql("NOT VALID SQL @@@");
        assert!(matches!(err, Err(InfinoError::Query(_))), "got {err:?}");
    }

    /// Dropping an unregistered name yields `NotFound`.
    #[test]
    fn drop_missing_is_not_found() {
        let conn = connect("memory://").expect("connect");
        assert!(matches!(
            conn.drop_table("nope", false),
            Err(InfinoError::NotFound(_))
        ));
    }

    /// The empty string is not a valid table name.
    #[test]
    fn empty_table_name_rejected() {
        let conn = connect("memory://").expect("connect");
        assert!(
            conn.create_table("", schema_id_title(), IndexSpec::new())
                .is_err()
        );
    }

    /// A vector-indexed table written through one connection can be listed and
    /// reopened through a new connection. This means the metric survives the
    /// storage catalog round-trip.
    #[test]
    fn vector_index_round_trips_metric_through_storage_catalog() {
        use crate::Metric;
        let dir = tempfile::tempdir().expect("tempdir");
        let uri = dir.path().to_str().expect("utf8 path").to_string();
        let schema = Arc::new(Schema::new(vec![
            Field::new("title", DataType::LargeUtf8, false),
            Field::new(
                "embedding",
                DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 16),
                false,
            ),
        ]));
        // Builds a single-row batch with an all-zero 16-dim embedding.
        let one_vector = || -> RecordBatch {
            use arrow_array::{FixedSizeListArray, Float32Array, LargeStringArray};
            let values = Float32Array::from(vec![0.0_f32; 16]);
            let field = Arc::new(Field::new("item", DataType::Float32, true));
            let list = FixedSizeListArray::new(field, 16, Arc::new(values), None);
            RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(LargeStringArray::from(vec!["hello"])),
                    Arc::new(list),
                ],
            )
            .expect("vector batch")
        };
        // Scope the first connection so it is dropped before reconnecting.
        {
            let conn = connect(&uri).expect("connect");
            let table = conn
                .create_table(
                    "vecs",
                    schema.clone(),
                    IndexSpec::new()
                        .fts("title")
                        .vector("embedding", 16, 4, Metric::L2Sq),
                )
                .expect("create vector table");
            table.append(&one_vector()).expect("append vector row");
        }
        let conn = connect(&uri).expect("reconnect");
        assert_eq!(conn.list_tables().expect("list"), vec!["vecs".to_string()]);
        conn.open_table("vecs").expect("open vector table");
    }

    /// Every metric survives `metric_to_str` -> `metric_from_str`. The on-disk
    /// spellings are pinned, and unknown spellings are rejected with `Backend`.
    #[test]
    fn metric_str_round_trips_all_variants_and_rejects_unknown() {
        for m in [Metric::Cosine, Metric::L2Sq, Metric::NegDot] {
            let s = metric_to_str(m);
            let back = metric_from_str(s).expect("known metric round-trips");
            assert_eq!(back, m, "{m:?} did not survive the string round-trip");
        }
        assert_eq!(metric_to_str(Metric::Cosine), "cosine");
        assert_eq!(metric_to_str(Metric::L2Sq), "l2sq");
        assert_eq!(metric_to_str(Metric::NegDot), "negdot");
        assert!(matches!(
            metric_from_str("euclidean"),
            Err(InfinoError::Backend(_))
        ));
    }

    /// Creating the same name twice on local storage yields `AlreadyExists`.
    #[test]
    fn storage_duplicate_create_is_already_exists() {
        let dir = tempfile::tempdir().expect("tempdir");
        let uri = dir.path().to_str().expect("utf8 path").to_string();
        let conn = connect(&uri).expect("connect");
        conn.create_table("docs", schema_id_title(), IndexSpec::new().fts("title"))
            .expect("first create");
        let again = conn.create_table("docs", schema_id_title(), IndexSpec::new().fts("title"));
        assert!(matches!(again, Err(InfinoError::AlreadyExists(_))));
    }

    /// A self-join references `docs` twice. `query_sql` registers it once and
    /// the join still runs.
    #[test]
    fn query_sql_dedups_repeated_table_reference() {
        let conn = connect("memory://").expect("connect");
        let docs = conn
            .create_table("docs", schema_id_title(), IndexSpec::new().fts("title"))
            .expect("create docs");
        docs.append(&build_title_batch(&["alpha", "beta"]))
            .expect("append");
        let rows: usize = conn
            .query_sql("SELECT a.title FROM docs a JOIN docs b ON a._id = b._id")
            .expect("self-join resolves the repeated reference once")
            .iter()
            .map(|b| b.num_rows())
            .sum();
        assert_eq!(rows, 2, "self-join on _id pairs each row with itself");
    }

    /// A table written through one local-storage connection is listed,
    /// reopened and searchable through a fresh connection.
    #[test]
    fn localfs_persists_across_reconnect() {
        let dir = tempfile::tempdir().expect("tempdir");
        let uri = dir.path().to_str().expect("utf8 path").to_string();
        // Scope the first connection so it is dropped before reconnecting.
        {
            let conn = connect(&uri).expect("connect");
            let table = conn
                .create_table("docs", schema_id_title(), IndexSpec::new().fts("title"))
                .expect("create_table");
            table
                .append(&build_title_batch(&["a lazy sleeping fox"]))
                .expect("append");
        }
        let conn = connect(&uri).expect("reconnect");
        assert_eq!(conn.list_tables().expect("list"), vec!["docs".to_string()]);
        let table = conn.open_table("docs").expect("open_table");
        let hits = table
            .bm25_search("title", "fox", TOP_K, BoolMode::Or, None)
            .expect("bm25_search");
        assert_eq!(
            n_rows(&hits),
            1,
            "expected the persisted doc to be searchable"
        );
    }
}