use anyhow::Result;
use std::sync::Arc;
use arrow_array::{Array, RecordBatch};
use arrow_schema::Schema;
use futures::TryStreamExt;
use lancedb::{
index::{scalar::FtsIndexBuilder, Index, IndexType},
query::{ExecutableQuery, QueryBase, Select},
Connection,
};
/// Thin helper over a borrowed LanceDB [`Connection`] that groups common
/// table-maintenance operations: existence checks, clearing/dropping tables,
/// row deletion by path/hash, and vector/FTS index creation.
pub struct TableOperations<'a> {
    // Borrowed connection handle; all operations are issued through it.
    pub db: &'a Connection,
}
impl<'a> TableOperations<'a> {
pub fn new(db: &'a Connection) -> Self {
Self { db }
}
pub async fn table_exists(&self, table_name: &str) -> Result<bool> {
let table_names = self.db.table_names().execute().await?;
Ok(table_names.contains(&table_name.to_string()))
}
pub async fn tables_exist(&self, table_names: &[&str]) -> Result<bool> {
let existing_tables = self.db.table_names().execute().await?;
for &table_name in table_names {
if !existing_tables.contains(&table_name.to_string()) {
return Ok(false);
}
}
Ok(true)
}
pub async fn clear_table(&self, table_name: &str) -> Result<()> {
let table_names = self.db.table_names().execute().await?;
if table_names.contains(&table_name.to_string()) {
if let Err(e) = self.db.drop_table(table_name, &[]).await {
tracing::warn!("Failed to drop {} table: {}", table_name, e);
} else {
tracing::debug!("Dropped table: {}", table_name);
}
} else {
tracing::debug!("Table {} does not exist, skipping.", table_name);
}
Ok(())
}
pub async fn clear_tables(&self, table_names: &[&str]) -> Result<()> {
for &table_name in table_names {
self.clear_table(table_name).await?;
}
Ok(())
}
pub async fn clear_all_tables(&self) -> Result<()> {
let table_names = self.db.table_names().execute().await?;
for table_name in table_names {
if let Err(e) = self.db.drop_table(&table_name, &[]).await {
tracing::warn!("Failed to drop table {}: {}", table_name, e);
} else {
tracing::debug!("Dropped table: {}", table_name);
}
}
Ok(())
}
pub async fn flush_all_tables(&self) -> Result<()> {
let table_names = self.db.table_names().execute().await?;
for table_name in table_names {
let table = self.db.open_table(&table_name).execute().await?;
let row_count = table.count_rows(None).await?;
if row_count > 0 {
let _ = table.schema().await?;
}
if cfg!(debug_assertions) {
tracing::debug!("Flushed table '{}' with {} rows", table_name, row_count);
}
}
Ok(())
}
pub async fn content_exists(&self, hash: &str, collection: &str) -> Result<bool> {
let table = self.db.open_table(collection).execute().await?;
let mut results = table
.query()
.only_if(format!("hash = '{}'", hash))
.limit(1) .select(Select::Columns(vec!["hash".to_string()])) .execute()
.await?;
while let Some(batch) = results.try_next().await? {
if batch.num_rows() > 0 {
return Ok(true);
}
}
Ok(false)
}
pub async fn remove_blocks_by_path(&self, file_path: &str, table_name: &str) -> Result<usize> {
if !self.table_exists(table_name).await? {
return Ok(0);
}
let table = self.db.open_table(table_name).execute().await?;
let before_count = table.count_rows(None).await?;
table
.delete(&format!("path = '{}'", file_path))
.await
.map_err(|e| anyhow::anyhow!("Failed to delete from {}: {}", table_name, e))?;
let after_count = table.count_rows(None).await?;
let deleted_count = before_count.saturating_sub(after_count);
Ok(deleted_count)
}
pub async fn remove_blocks_by_hashes(&self, hashes: &[String], table_name: &str) -> Result<()> {
if hashes.is_empty() {
return Ok(());
}
if !self.table_exists(table_name).await? {
return Ok(());
}
let table = self.db.open_table(table_name).execute().await?;
let hash_filters: Vec<String> = hashes.iter().map(|h| format!("hash = '{}'", h)).collect();
let filter = hash_filters.join(" OR ");
table
.delete(&filter)
.await
.map_err(|e| anyhow::anyhow!("Failed to delete from {}: {}", table_name, e))?;
Ok(())
}
pub async fn get_file_blocks_metadata(
&self,
file_path: &str,
table_name: &str,
) -> Result<Vec<String>> {
let mut hashes = Vec::new();
if !self.table_exists(table_name).await? {
return Ok(hashes);
}
let table = self.db.open_table(table_name).execute().await?;
let mut results = table
.query()
.only_if(format!("path = '{}'", file_path))
.select(Select::Columns(vec!["hash".to_string()]))
.execute()
.await?;
while let Some(batch) = results.try_next().await? {
if batch.num_rows() > 0 {
if let Some(column) = batch.column_by_name("hash") {
if let Some(hash_array) =
column.as_any().downcast_ref::<arrow::array::StringArray>()
{
for i in 0..hash_array.len() {
hashes.push(hash_array.value(i).to_string());
}
}
}
}
}
Ok(hashes)
}
pub async fn get_all_indexed_file_paths(
&self,
table_names: &[&str],
) -> Result<std::collections::HashSet<String>> {
let mut all_paths = std::collections::HashSet::new();
let existing_tables = self.db.table_names().execute().await?;
for &table_name in table_names {
if existing_tables.contains(&table_name.to_string()) {
let table = self.db.open_table(table_name).execute().await?;
let mut results = table
.query()
.select(Select::Columns(vec!["path".to_string()]))
.execute()
.await?;
while let Some(batch) = results.try_next().await? {
if batch.num_rows() > 0 {
if let Some(column) = batch.column_by_name("path") {
if let Some(path_array) =
column.as_any().downcast_ref::<arrow::array::StringArray>()
{
for i in 0..path_array.len() {
all_paths.insert(path_array.value(i).to_string());
}
}
}
}
}
}
}
Ok(all_paths)
}
pub async fn create_table_with_schema(
&self,
table_name: &str,
schema: Arc<Schema>,
) -> Result<()> {
let _table = self
.db
.create_empty_table(table_name, schema)
.execute()
.await?;
Ok(())
}
pub async fn store_batch(&self, table_name: &str, batch: RecordBatch) -> Result<()> {
if self.table_exists(table_name).await? {
let table = self.db.open_table(table_name).execute().await?;
use std::iter::once;
let batches = once(Ok(batch.clone()));
let batch_reader =
arrow::record_batch::RecordBatchIterator::new(batches, batch.schema());
table.add(batch_reader).execute().await?;
} else {
use std::iter::once;
let batches = once(Ok(batch.clone()));
let batch_reader =
arrow::record_batch::RecordBatchIterator::new(batches, batch.schema());
let _table = self
.db
.create_table(table_name, batch_reader)
.execute()
.await?;
}
Ok(())
}
pub async fn create_vector_index_optimized(
&self,
table_name: &str,
column_name: &str,
vector_dimension: usize,
use_quantization: bool,
) -> Result<()> {
if !self.table_exists(table_name).await? {
return Err(anyhow::anyhow!("Table {} does not exist", table_name));
}
let table = self.db.open_table(table_name).execute().await?;
let row_count = table.count_rows(None).await?;
let index_params = super::vector_optimizer::VectorOptimizer::calculate_index_params(
row_count,
vector_dimension,
use_quantization,
);
if !index_params.should_create_index {
tracing::debug!(
"Skipping index creation for table '{}' with {} rows - brute force search will be faster",
table_name, row_count
);
return Ok(());
}
let existing_indices = table.list_indices().await?;
let has_embedding_index = existing_indices
.iter()
.any(|idx| idx.columns == vec![column_name]);
if has_embedding_index {
tracing::debug!(
"Vector index already exists for table '{}' with {} rows. Consider recreating if dataset grew significantly.",
table_name, row_count
);
return Ok(());
}
let index_type_name = match index_params.index_type {
super::vector_optimizer::IndexType::IvfHnswSq => "IVF_HNSW_SQ",
super::vector_optimizer::IndexType::IvfRq => "IVF_RQ",
};
tracing::info!(
"Creating {} vector index for table '{}': {} rows, {} partitions",
index_type_name,
table_name,
row_count,
index_params.num_partitions
);
let start_time = std::time::Instant::now();
match index_params.index_type {
super::vector_optimizer::IndexType::IvfHnswSq => {
table
.create_index(
&[column_name],
lancedb::index::Index::IvfHnswSq(
lancedb::index::vector::IvfHnswSqIndexBuilder::default()
.distance_type(index_params.distance_type)
.num_partitions(index_params.num_partitions)
.num_edges(index_params.num_edges)
.ef_construction(index_params.ef_construction),
),
)
.execute()
.await?;
}
super::vector_optimizer::IndexType::IvfRq => {
table
.create_index(
&[column_name],
lancedb::index::Index::IvfRq(
lancedb::index::vector::IvfRqIndexBuilder::default()
.distance_type(index_params.distance_type)
.num_partitions(index_params.num_partitions),
),
)
.execute()
.await?;
}
}
let duration = start_time.elapsed();
tracing::info!(
"Successfully created {} index for table '{}' in {:.2}s",
index_type_name,
table_name,
duration.as_secs_f64()
);
Ok(())
}
pub async fn recreate_vector_index_optimized(
&self,
table_name: &str,
column_name: &str,
vector_dimension: usize,
use_quantization: bool,
) -> Result<()> {
if !self.table_exists(table_name).await? {
return Err(anyhow::anyhow!("Table {} does not exist", table_name));
}
let table = self.db.open_table(table_name).execute().await?;
let row_count = table.count_rows(None).await?;
tracing::info!(
"Recreating vector index for table '{}' with {} rows for better performance",
table_name,
row_count
);
let existing_indices = table.list_indices().await?;
for index in existing_indices {
if index.columns == vec![column_name] {
tracing::debug!("Dropping existing index: {}", index.name);
break;
}
}
let index_params = super::vector_optimizer::VectorOptimizer::calculate_index_params(
row_count,
vector_dimension,
use_quantization,
);
if !index_params.should_create_index {
tracing::warn!("Dataset size no longer warrants an index, skipping recreation");
return Ok(());
}
let index_type_name = match index_params.index_type {
super::vector_optimizer::IndexType::IvfHnswSq => "IVF_HNSW_SQ",
super::vector_optimizer::IndexType::IvfRq => "IVF_RQ",
};
let start_time = std::time::Instant::now();
match index_params.index_type {
super::vector_optimizer::IndexType::IvfHnswSq => {
table
.create_index(
&[column_name],
lancedb::index::Index::IvfHnswSq(
lancedb::index::vector::IvfHnswSqIndexBuilder::default()
.distance_type(index_params.distance_type)
.num_partitions(index_params.num_partitions)
.num_edges(index_params.num_edges)
.ef_construction(index_params.ef_construction),
),
)
.execute()
.await?;
}
super::vector_optimizer::IndexType::IvfRq => {
table
.create_index(
&[column_name],
lancedb::index::Index::IvfRq(
lancedb::index::vector::IvfRqIndexBuilder::default()
.distance_type(index_params.distance_type)
.num_partitions(index_params.num_partitions),
),
)
.execute()
.await?;
}
}
let duration = start_time.elapsed();
tracing::info!(
"Successfully recreated {} index for table '{}' in {:.2}s - {} partitions",
index_type_name,
table_name,
duration.as_secs_f64(),
index_params.num_partitions
);
Ok(())
}
pub async fn create_fts_index(&self, table_name: &str) -> Result<()> {
if !self.table_exists(table_name).await? {
return Ok(());
}
let table = self.db.open_table(table_name).execute().await?;
let indices = table.list_indices().await?;
if indices.iter().any(|idx| idx.index_type == IndexType::FTS) {
tracing::debug!("FTS index already exists for table '{}'", table_name);
return Ok(());
}
let row_count = table.count_rows(None).await?;
if row_count == 0 {
tracing::debug!(
"Table '{}' is empty, skipping FTS index creation",
table_name
);
return Ok(());
}
tracing::info!(
"Creating FTS index for table '{}' ({} rows)",
table_name,
row_count
);
let start = std::time::Instant::now();
table
.create_index(&["content"], Index::FTS(FtsIndexBuilder::default()))
.execute()
.await
.map_err(|e| {
anyhow::anyhow!("Failed to create FTS index on '{}': {}", table_name, e)
})?;
tracing::info!(
"FTS index created for '{}' in {:.2}s",
table_name,
start.elapsed().as_secs_f64()
);
Ok(())
}
}