use std::sync::Arc;
use async_trait::async_trait;
use datafusion::execution::SendableRecordBatchStream;
use lance_index::{IndexParams, IndexType, PrewarmOptions, optimize::OptimizeOptions};
use lance_table::format::IndexMetadata;
use roaring::RoaringBitmap;
use uuid::Uuid;
use crate::{Error, Result};
/// Descriptor of a single index segment: its identifier, the dataset fragments it
/// covers, opaque index details and an index version number.
#[derive(Debug, Clone, PartialEq)]
pub struct IndexSegment {
    /// Identifier of this segment.
    uuid: Uuid,
    /// Ids of the fragments this segment covers (its "fragment coverage").
    fragment_bitmap: RoaringBitmap,
    /// Index-specific details carried as a protobuf `Any`; interpretation is left to
    /// the index implementation.
    index_details: Arc<prost_types::Any>,
    /// Index version number. NOTE(review): the meaning of the value is defined outside
    /// this file — confirm before relying on it.
    index_version: i32,
}
impl IndexSegment {
pub fn new<I>(
uuid: Uuid,
fragment_bitmap: I,
index_details: Arc<prost_types::Any>,
index_version: i32,
) -> Self
where
I: IntoIterator<Item = u32>,
{
Self {
uuid,
fragment_bitmap: fragment_bitmap.into_iter().collect(),
index_details,
index_version,
}
}
pub fn uuid(&self) -> Uuid {
self.uuid
}
pub fn fragment_bitmap(&self) -> &RoaringBitmap {
&self.fragment_bitmap
}
pub fn index_details(&self) -> &Arc<prost_types::Any> {
&self.index_details
}
pub fn index_version(&self) -> i32 {
self.index_version
}
pub fn into_parts(self) -> (Uuid, RoaringBitmap, Arc<prost_types::Any>, i32) {
(
self.uuid,
self.fragment_bitmap,
self.index_details,
self.index_version,
)
}
}
/// Conversion of a value into an [`IndexSegment`].
///
/// The conversion is fallible: implementors may reject values that lack the
/// information a segment needs.
pub trait IntoIndexSegment {
    /// Converts `self` into an [`IndexSegment`].
    ///
    /// # Errors
    ///
    /// Implementation-specific; returned when `self` cannot describe a segment.
    fn into_index_segment(self) -> Result<IndexSegment>;
}
/// Identity conversion: an `IndexSegment` is returned unchanged and never fails.
impl IntoIndexSegment for IndexSegment {
    fn into_index_segment(self) -> Result<IndexSegment> {
        Ok(self)
    }
}
impl IntoIndexSegment for IndexMetadata {
fn into_index_segment(self) -> Result<IndexSegment> {
let fragment_bitmap = self.fragment_bitmap.ok_or_else(|| {
Error::invalid_input(format!(
"CreateIndex: segment {} is missing fragment coverage",
self.uuid
))
})?;
let index_details = self.index_details.ok_or_else(|| {
Error::invalid_input(format!(
"CreateIndex: segment {} is missing index details",
self.uuid
))
})?;
Ok(IndexSegment::new(
self.uuid,
fragment_bitmap.iter(),
index_details,
self.index_version,
))
}
}
/// A plan entry pairing an [`IndexSegment`] with associated index metadata, a byte
/// estimate and an optional requested index type.
#[derive(Debug, Clone, PartialEq)]
pub struct IndexSegmentPlan {
    /// The segment this plan describes.
    segment: IndexSegment,
    /// Index metadata associated with the plan. NOTE(review): presumably the existing
    /// segments this plan is built from — TODO confirm against the planner.
    segments: Vec<IndexMetadata>,
    /// Estimated number of bytes (per the field name). NOTE(review): confirm what the
    /// estimate measures.
    estimated_bytes: u64,
    /// Index type requested for this plan, if one was specified.
    requested_index_type: Option<IndexType>,
}
impl IndexSegmentPlan {
    /// Assembles a plan from its parts; every argument is stored as given.
    pub fn new(
        segment: IndexSegment,
        segments: Vec<IndexMetadata>,
        estimated_bytes: u64,
        requested_index_type: Option<IndexType>,
    ) -> Self {
        let plan = Self {
            segment,
            segments,
            estimated_bytes,
            requested_index_type,
        };
        plan
    }

    /// Borrows the planned segment.
    pub fn segment(&self) -> &IndexSegment {
        &self.segment
    }

    /// Borrows the index metadata associated with the plan as a slice.
    pub fn segments(&self) -> &[IndexMetadata] {
        self.segments.as_slice()
    }

    /// Returns the stored byte estimate.
    pub fn estimated_bytes(&self) -> u64 {
        self.estimated_bytes
    }

    /// Returns the requested index type, or `None` if none was specified.
    pub fn requested_index_type(&self) -> Option<IndexType> {
        self.requested_index_type
    }
}
#[async_trait]
pub trait DatasetIndexExt {
type IndexBuilder<'a>
where
Self: 'a;
type IndexSegmentBuilder<'a>
where
Self: 'a;
fn create_index_builder<'a>(
&'a mut self,
columns: &'a [&'a str],
index_type: IndexType,
params: &'a dyn IndexParams,
) -> Self::IndexBuilder<'a>;
fn create_index_segment_builder<'a>(&'a self) -> Self::IndexSegmentBuilder<'a>;
async fn create_index(
&mut self,
columns: &[&str],
index_type: IndexType,
name: Option<String>,
params: &dyn IndexParams,
replace: bool,
) -> Result<IndexMetadata>;
async fn drop_index(&mut self, name: &str) -> Result<()>;
async fn prewarm_index(&self, name: &str) -> Result<()>;
async fn prewarm_index_with_options(
&self,
_name: &str,
_options: &PrewarmOptions,
) -> Result<()> {
Err(Error::not_supported(
"prewarm options are not supported by this dataset implementation".to_owned(),
))
}
async fn load_indices(&self) -> Result<Arc<Vec<IndexMetadata>>>;
async fn load_index(&self, uuid: &str) -> Result<Option<IndexMetadata>> {
self.load_indices().await.map(|indices| {
indices
.iter()
.find(|idx| idx.uuid.to_string() == uuid)
.cloned()
})
}
async fn load_indices_by_name(&self, name: &str) -> Result<Vec<IndexMetadata>> {
self.load_indices().await.map(|indices| {
indices
.iter()
.filter(|idx| idx.name == name)
.cloned()
.collect()
})
}
async fn load_index_by_name(&self, name: &str) -> Result<Option<IndexMetadata>> {
let indices = self.load_indices_by_name(name).await?;
if indices.is_empty() {
Ok(None)
} else if indices.len() == 1 {
Ok(Some(indices[0].clone()))
} else {
Err(Error::index(format!(
"Found multiple indices of the same name: {:?}, please use load_indices_by_name",
indices.iter().map(|idx| &idx.name).collect::<Vec<_>>()
)))
}
}
async fn describe_indices<'a, 'b>(
&'a self,
criteria: Option<lance_index::IndexCriteria<'b>>,
) -> Result<Vec<Arc<dyn lance_index::IndexDescription>>>;
async fn load_scalar_index<'a, 'b>(
&'a self,
criteria: lance_index::IndexCriteria<'b>,
) -> Result<Option<IndexMetadata>>;
async fn optimize_indices(&mut self, options: &OptimizeOptions) -> Result<()>;
async fn index_statistics(&self, index_name: &str) -> Result<String>;
async fn merge_existing_index_segments(
&self,
source_segments: Vec<IndexMetadata>,
) -> Result<IndexMetadata>;
async fn commit_existing_index_segments(
&mut self,
index_name: &str,
column: &str,
segments: Vec<impl IntoIndexSegment + Send>,
) -> Result<()>;
async fn read_index_partition(
&self,
index_name: &str,
partition_id: usize,
with_vector: bool,
) -> Result<SendableRecordBatchStream>;
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::{IndexSegment, IndexSegmentPlan, IntoIndexSegment};
    use lance_index::IndexType;
    use uuid::Uuid;

    /// Accessors on `IndexSegment` and `IndexSegmentPlan` return what was passed in.
    #[test]
    fn test_index_segment_plan_accessors() {
        let uuid = Uuid::new_v4();
        let segment = IndexSegment::new(uuid, [1_u32, 3], Arc::new(prost_types::Any::default()), 7);
        let plan = IndexSegmentPlan::new(segment.clone(), vec![], 128, Some(IndexType::BTree));
        assert_eq!(segment.uuid(), uuid);
        assert_eq!(
            segment.fragment_bitmap().iter().collect::<Vec<_>>(),
            vec![1, 3]
        );
        assert_eq!(segment.index_version(), 7);
        assert_eq!(plan.segment().uuid(), uuid);
        assert_eq!(plan.segment(), &segment);
        assert!(plan.segments().is_empty());
        assert_eq!(plan.estimated_bytes(), 128);
        assert_eq!(plan.requested_index_type(), Some(IndexType::BTree));
    }

    /// `IndexSegment::new` collects ids into a bitmap: duplicates collapse and
    /// iteration is in ascending order regardless of input order.
    #[test]
    fn test_index_segment_new_normalizes_fragments() {
        let segment = IndexSegment::new(
            Uuid::new_v4(),
            [5_u32, 2, 5, 0],
            Arc::new(prost_types::Any::default()),
            1,
        );
        assert_eq!(
            segment.fragment_bitmap().iter().collect::<Vec<_>>(),
            vec![0, 2, 5]
        );
    }

    /// The identity conversion is lossless and `into_parts` returns every field.
    #[test]
    fn test_index_segment_into_parts_and_identity_conversion() {
        let uuid = Uuid::new_v4();
        let details = Arc::new(prost_types::Any {
            type_url: "test/details".to_owned(),
            value: vec![1, 2, 3],
        });
        let segment = IndexSegment::new(uuid, [4_u32, 9], details.clone(), 3);

        let converted = segment.clone().into_index_segment().unwrap();
        assert_eq!(converted, segment);
        assert!(Arc::ptr_eq(segment.index_details(), &details));

        let (parts_uuid, parts_bitmap, parts_details, parts_version) = segment.into_parts();
        assert_eq!(parts_uuid, uuid);
        assert_eq!(parts_bitmap.iter().collect::<Vec<_>>(), vec![4, 9]);
        assert!(Arc::ptr_eq(&parts_details, &details));
        assert_eq!(parts_version, 3);
    }
}