use std::sync::Arc;
use async_trait::async_trait;
use datafusion::execution::SendableRecordBatchStream;
use lance_index::{IndexParams, IndexType, PrewarmOptions, optimize::OptimizeOptions};
use lance_table::format::IndexMetadata;
use roaring::RoaringBitmap;
use uuid::Uuid;
use crate::{Error, Result};
/// Description of a single index segment: its identifier, the fragments it
/// covers, its serialized details, and a version number.
///
/// Fields are private; read them through the accessor methods or take
/// ownership of all of them at once with `into_parts`.
#[derive(Debug, Clone, PartialEq)]
pub struct IndexSegment {
/// Identifier of this segment.
uuid: Uuid,
/// Fragment coverage of this segment, stored as a bitmap of `u32` values
/// (presumably fragment ids; confirm against callers).
fragment_bitmap: RoaringBitmap,
/// Protobuf `Any` payload carrying the index details, shared via `Arc`.
index_details: Arc<prost_types::Any>,
/// Version number attached to the index.
/// NOTE(review): presumably the on-disk index format version; confirm.
index_version: i32,
}
impl IndexSegment {
pub fn new<I>(
uuid: Uuid,
fragment_bitmap: I,
index_details: Arc<prost_types::Any>,
index_version: i32,
) -> Self
where
I: IntoIterator<Item = u32>,
{
Self {
uuid,
fragment_bitmap: fragment_bitmap.into_iter().collect(),
index_details,
index_version,
}
}
pub fn uuid(&self) -> Uuid {
self.uuid
}
pub fn fragment_bitmap(&self) -> &RoaringBitmap {
&self.fragment_bitmap
}
pub fn index_details(&self) -> &Arc<prost_types::Any> {
&self.index_details
}
pub fn index_version(&self) -> i32 {
self.index_version
}
pub fn into_parts(self) -> (Uuid, RoaringBitmap, Arc<prost_types::Any>, i32) {
(
self.uuid,
self.fragment_bitmap,
self.index_details,
self.index_version,
)
}
}
/// Fallible, consuming conversion into an [`IndexSegment`].
///
/// Used as the element bound for the `segments` argument of
/// `DatasetIndexExt::commit_existing_index_segments`.
pub trait IntoIndexSegment {
/// Consumes `self` and produces an [`IndexSegment`], or an error when the
/// value cannot be represented as one.
fn into_index_segment(self) -> Result<IndexSegment>;
}
impl IntoIndexSegment for IndexSegment {
    /// Identity conversion: an `IndexSegment` is already in its final form,
    /// so this always succeeds and returns `self` unchanged.
    fn into_index_segment(self) -> Result<Self> {
        Ok(self)
    }
}
impl IntoIndexSegment for IndexMetadata {
fn into_index_segment(self) -> Result<IndexSegment> {
let fragment_bitmap = self.fragment_bitmap.ok_or_else(|| {
Error::invalid_input(format!(
"CreateIndex: segment {} is missing fragment coverage",
self.uuid
))
})?;
let index_details = self.index_details.ok_or_else(|| {
Error::invalid_input(format!(
"CreateIndex: segment {} is missing index details",
self.uuid
))
})?;
Ok(IndexSegment::new(
self.uuid,
fragment_bitmap.iter(),
index_details,
self.index_version,
))
}
}
#[async_trait]
pub trait DatasetIndexExt {
type IndexBuilder<'a>
where
Self: 'a;
fn create_index_builder<'a>(
&'a mut self,
columns: &'a [&'a str],
index_type: IndexType,
params: &'a dyn IndexParams,
) -> Self::IndexBuilder<'a>;
async fn create_index(
&mut self,
columns: &[&str],
index_type: IndexType,
name: Option<String>,
params: &dyn IndexParams,
replace: bool,
) -> Result<IndexMetadata>;
async fn drop_index(&mut self, name: &str) -> Result<()>;
async fn prewarm_index(&self, name: &str) -> Result<()>;
async fn prewarm_index_with_options(
&self,
_name: &str,
_options: &PrewarmOptions,
) -> Result<()> {
Err(Error::not_supported(
"prewarm options are not supported by this dataset implementation".to_owned(),
))
}
async fn load_indices(&self) -> Result<Arc<Vec<IndexMetadata>>>;
async fn load_index(&self, uuid: &Uuid) -> Result<Option<IndexMetadata>> {
self.load_indices()
.await
.map(|indices| indices.iter().find(|idx| idx.uuid == *uuid).cloned())
}
async fn load_indices_by_name(&self, name: &str) -> Result<Vec<IndexMetadata>> {
self.load_indices().await.map(|indices| {
indices
.iter()
.filter(|idx| idx.name == name)
.cloned()
.collect()
})
}
async fn load_index_by_name(&self, name: &str) -> Result<Option<IndexMetadata>> {
let indices = self.load_indices_by_name(name).await?;
if indices.is_empty() {
Ok(None)
} else if indices.len() == 1 {
Ok(Some(indices[0].clone()))
} else {
Err(Error::index(format!(
"Found multiple indices of the same name: {:?}, please use load_indices_by_name",
indices.iter().map(|idx| &idx.name).collect::<Vec<_>>()
)))
}
}
async fn describe_indices<'a, 'b>(
&'a self,
criteria: Option<lance_index::IndexCriteria<'b>>,
) -> Result<Vec<Arc<dyn lance_index::IndexDescription>>>;
async fn load_scalar_index<'a, 'b>(
&'a self,
criteria: lance_index::IndexCriteria<'b>,
) -> Result<Option<IndexMetadata>>;
async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()>;
async fn index_statistics(&self, index_name: &str) -> Result<String>;
async fn merge_existing_index_segments(
&self,
source_segments: Vec<IndexMetadata>,
) -> Result<IndexMetadata>;
async fn commit_existing_index_segments(
&mut self,
index_name: &str,
column: &str,
segments: Vec<impl IntoIndexSegment + Send>,
) -> Result<()>;
async fn read_index_partition(
&self,
index_name: &str,
partition_id: usize,
with_vector: bool,
) -> Result<SendableRecordBatchStream>;
}