1use crate::{
2 RetryPolicy,
3 config::{self},
4 handlers::NamespaceIdent,
5 sessions::{self},
6};
7use anyhow::{Context, Result};
8use lance::Dataset;
9use lance::dataset::builder::DatasetBuilder;
10use lance::dataset::optimize::{CompactionOptions, compact_files};
11use lance::dataset::write::merge_insert::SourceDedupeBehavior;
12use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode};
13use lance::deps::arrow_array::{RecordBatch, RecordBatchIterator};
14use lance::index::DatasetIndexExt;
15use lance::index::DatasetIndexInternalExt;
16use lance::index::vector::VectorIndexParams;
17use lance::session::Session;
18use lance_index::IndexType;
19use lance_index::optimize::OptimizeOptions;
20use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
21use lance_io::object_store::{
22 ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor,
23};
24use lance_linalg::distance::MetricType;
25use lance_namespace::LanceNamespace;
26use lance_namespace::error::{ErrorCode, NamespaceError};
27use lance_namespace::models::DescribeTableRequest;
28use lance_namespace_impls::ConnectBuilder;
29use std::{
30 collections::HashMap,
31 sync::Arc,
32 time::{Duration, Instant},
33};
34use tokio::sync::{Mutex, OnceCell};
35use tokio_stream::StreamExt;
36use url::Url;
/// Row count associated with activating the vector index.
///
/// NOTE(review): not referenced in this section; presumably used as an `OnNonNullCount`
/// threshold for the vector index intent — confirm at the call site.
pub const VECTOR_INDEX_ACTIVATION_ROWS: usize = 100_000;

/// Fallback number of unindexed fragments an existing index may lag behind before the
/// optimizer rebuilds or appends it (used when `init_index_lag_threshold` was never called).
pub const DEFAULT_INDEX_LAG_THRESHOLD: usize = 4;

/// Process-wide override for the index lag threshold; written at most once.
static INDEX_LAG_THRESHOLD_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Installs the process-wide index lag threshold.
///
/// Only the first call takes effect; subsequent calls are silently ignored.
pub fn init_index_lag_threshold(value: usize) {
    // `set` fails once a value is present; first-writer-wins is the intended behaviour.
    let _ = INDEX_LAG_THRESHOLD_RUNTIME.set(value);
}

/// Current index lag threshold: the value from `init_index_lag_threshold`, or
/// `DEFAULT_INDEX_LAG_THRESHOLD` when none was installed.
pub fn index_lag_threshold() -> usize {
    match INDEX_LAG_THRESHOLD_RUNTIME.get() {
        Some(&configured) => configured,
        None => DEFAULT_INDEX_LAG_THRESHOLD,
    }
}
64
/// Declarative description of one Lance index the optimizer should maintain on a table.
#[derive(Debug, Clone)]
pub struct IndexIntent {
    /// Index name passed to Lance; also used to detect whether the index already exists.
    pub name: &'static str,
    /// Column the index is built over.
    pub column: &'static str,
    /// Condition that must hold before the index is first created.
    pub trigger: IndexTrigger,
    /// Kind of index to build and the parameters used to build it.
    pub params: IndexParamsKind,
}

/// Condition gating the first creation of an index (evaluated by `should_create`).
#[derive(Debug, Clone)]
pub enum IndexTrigger {
    /// Create as soon as the table has at least one row.
    OnAnyRows,
    /// Create once `column` has at least `threshold` non-null values.
    OnNonNullCount {
        column: &'static str,
        threshold: usize,
    },
}

/// Index flavour plus the parameters used to build it.
#[derive(Debug, Clone)]
pub enum IndexParamsKind {
    /// A built-in Lance scalar index (e.g. BTree, Bitmap).
    Scalar(BuiltinIndexType),
    /// Inverted full-text index with an n-gram base tokenizer using gram lengths
    /// `min..=max`; stemming and stop-word removal are disabled.
    InvertedFtsNgram { min: u32, max: u32 },
    /// IVF-PQ vector index using cosine distance. The IVF partition count is derived at
    /// build time from the number of non-null rows in the `vector` column.
    IvfPqCosine {
        sub_vectors: usize,
        num_bits: u8,
        max_iters: usize,
    },
}
116
117impl IndexTrigger {
118 async fn should_create(&self, dataset: &Dataset) -> Result<bool> {
119 match self {
120 Self::OnAnyRows => Ok(dataset.count_rows(None).await? > 0),
121 Self::OnNonNullCount { column, threshold } => {
122 let count = dataset
123 .count_rows(Some(format!("{column} IS NOT NULL")))
124 .await?;
125 Ok(count >= *threshold)
126 }
127 }
128 }
129}
130
131impl IndexParamsKind {
132 fn index_type(&self) -> IndexType {
133 match self {
134 Self::Scalar(BuiltinIndexType::Bitmap) => IndexType::Bitmap,
135 Self::Scalar(_) => IndexType::BTree,
136 Self::InvertedFtsNgram { .. } => IndexType::Inverted,
137 Self::IvfPqCosine { .. } => IndexType::Vector,
138 }
139 }
140
141 async fn build(&self, dataset: &Dataset) -> Result<Box<dyn lance::index::IndexParams>> {
142 match self {
143 Self::Scalar(kind) => Ok(Box::new(ScalarIndexParams::for_builtin(kind.clone()))),
144 Self::InvertedFtsNgram { min, max } => Ok(Box::new(
145 InvertedIndexParams::default()
146 .base_tokenizer("ngram".to_owned())
147 .ngram_min_length(*min)
148 .ngram_max_length(*max)
149 .stem(false)
150 .remove_stop_words(false),
151 )),
152 Self::IvfPqCosine {
153 sub_vectors,
154 num_bits,
155 max_iters,
156 } => {
157 let count = dataset
158 .count_rows(Some("vector IS NOT NULL".to_owned()))
159 .await?;
160 let partitions = count.checked_div(4096).unwrap_or(0).max(1);
161 Ok(Box::new(VectorIndexParams::ivf_pq(
162 partitions,
163 *num_bits,
164 *sub_vectors,
165 MetricType::Cosine,
166 *max_iters,
167 )))
168 }
169 }
170 }
171}
172
/// Coverage report for one `IndexIntent` on one table, as produced by `index_status`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexStatus {
    /// Table the intent applies to.
    pub table: Table,
    /// The `IndexIntent::name` this status describes.
    pub intent_name: String,
    /// Fragments covered by the index (0 when the index does not exist yet).
    pub fragments_covered: usize,
    /// Fragments not covered by the index (all fragments when it does not exist yet).
    pub unindexed_fragments: usize,
    /// Rows not covered by the index (all rows when it does not exist yet).
    pub unindexed_rows: usize,
    /// Whether an index with `intent_name` exists on the dataset.
    pub exists: bool,
}
182
/// Marker error attached as context when a Lance operation still fails with a commit
/// conflict after its final retry; detect it with `is_conflict_exhausted`.
#[derive(Debug, Clone, Copy)]
pub struct ConflictExhausted {
    /// Number of attempts made before giving up.
    pub attempts: u8,
}

impl std::fmt::Display for ConflictExhausted {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { attempts } = *self;
        formatter.write_fmt(format_args!(
            "commit conflict exhausted after {attempts} attempt(s)"
        ))
    }
}

impl std::error::Error for ConflictExhausted {}
203
204#[derive(Debug)]
209pub enum PhaseOutcome {
210 Ok,
212 Noop,
214 SkippedConflict,
217 Failed(anyhow::Error),
219 NotAttempted,
222}
223
224impl PhaseOutcome {
225 pub fn is_failed(&self) -> bool {
226 matches!(self, Self::Failed(_))
227 }
228}
229
/// Per-table result of `Handle::optimize_table`, with one outcome per phase.
#[derive(Debug)]
pub struct TableOptimizeOutcome {
    /// Table that was optimized.
    pub table: Table,
    /// Outcome of the index create/rebuild/append phase.
    pub indices: PhaseOutcome,
    /// Outcome of the compaction + old-version cleanup phase.
    pub compaction: PhaseOutcome,
}

/// Progress notification delivered to an `OptimizeProgressFn` while optimizing.
#[derive(Debug, Clone)]
pub enum OptimizeEvent {
    /// A phase is about to start; `detail` names the index(es) involved, when applicable.
    PhaseStart {
        table: Table,
        phase: OptimizePhase,
        detail: Option<String>,
    },
    /// A phase finished successfully after `elapsed_ms` milliseconds (not emitted on error).
    PhaseDone {
        table: Table,
        phase: OptimizePhase,
        elapsed_ms: u64,
    },
}
253
/// Named step of table optimization, reported through `OptimizeEvent`.
#[derive(Debug, Clone, Copy)]
pub enum OptimizePhase {
    /// Fragment compaction.
    Compact,
    /// Removal of old dataset versions.
    Cleanup,
    /// First-time creation of an index.
    IndexCreate,
    /// Full rebuild (replace) of an existing index.
    IndexRebuild,
    /// Incremental append of unindexed fragments into existing indices.
    IndexAppend,
}

impl OptimizePhase {
    /// Short kebab-case label for logs and progress output.
    pub fn label(self) -> &'static str {
        match self {
            OptimizePhase::Compact => "compact",
            OptimizePhase::Cleanup => "cleanup",
            OptimizePhase::IndexCreate => "index-create",
            OptimizePhase::IndexRebuild => "index-rebuild",
            OptimizePhase::IndexAppend => "index-append",
        }
    }
}
274
275pub type OptimizeProgressFn = Box<dyn Fn(OptimizeEvent) + Send + Sync>;
276
277fn emit(progress: Option<&OptimizeProgressFn>, event: OptimizeEvent) {
278 if let Some(callback) = progress {
279 callback(event);
280 }
281}
282
283pub fn is_commit_conflict(error: &anyhow::Error) -> bool {
287 error.downcast_ref::<lance::Error>().is_some_and(|err| {
288 matches!(
289 err,
290 lance::Error::CommitConflict { .. }
291 | lance::Error::RetryableCommitConflict { .. }
292 | lance::Error::TooMuchWriteContention { .. }
293 )
294 })
295}
296
297fn is_conflict_exhausted(error: &anyhow::Error) -> bool {
300 error.chain().any(|cause| cause.is::<ConflictExhausted>())
301}
302
/// Byte totals of the objects stored for each table, as computed by `Handle::table_sizes`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TableSizes {
    /// Bytes listed under the sessions table location.
    pub sessions: u64,
    /// Bytes listed under the messages table location.
    pub messages: u64,
    /// Bytes listed under the parts table location.
    pub parts: u64,
    /// Bytes under the store root not attributed to the three tables (saturates at 0).
    pub other: u64,
}
313
/// Literal operand of a `Predicate`, rendered into Lance filter SQL by `to_lance`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalarValue {
    /// String literal; quoted when rendered.
    String(String),
    /// 32-bit integer literal.
    Int32(i32),
    /// Pre-rendered SQL spliced in verbatim; never build this from untrusted input.
    Raw(String),
}

impl From<&str> for ScalarValue {
    fn from(value: &str) -> Self {
        // Delegate to the owned-string conversion.
        Self::from(String::from(value))
    }
}

impl From<String> for ScalarValue {
    fn from(value: String) -> Self {
        ScalarValue::String(value)
    }
}

impl From<i32> for ScalarValue {
    fn from(value: i32) -> Self {
        ScalarValue::Int32(value)
    }
}
335#[derive(Debug, Clone, PartialEq, Eq)]
336pub enum Predicate {
337 Eq(&'static str, ScalarValue),
338 Ne(&'static str, ScalarValue),
339 IsNull(&'static str),
340 IsNotNull(&'static str),
341 In(&'static str, Vec<ScalarValue>),
342 LikeContains(&'static str, String),
343 Regex(&'static str, String),
348 Gte(&'static str, ScalarValue),
349 Lte(&'static str, ScalarValue),
350 And(Vec<Predicate>),
351 Or(Vec<Predicate>),
352 Not(Box<Predicate>),
353}
354impl Predicate {
355 pub fn to_lance(&self) -> String {
356 match self {
357 Self::Eq(column, value) => format!("{column} = {}", value.to_lance()),
358 Self::Ne(column, value) => format!("{column} <> {}", value.to_lance()),
359 Self::IsNull(column) => format!("{column} IS NULL"),
360 Self::IsNotNull(column) => format!("{column} IS NOT NULL"),
361 Self::In(column, values) => {
362 let values = values
363 .iter()
364 .map(ScalarValue::to_lance)
365 .collect::<Vec<_>>()
366 .join(", ");
367 format!("{column} IN ({values})")
368 }
369 Self::LikeContains(column, value) => {
370 format!("{column} LIKE {} ESCAPE '\\'", like_contains(value))
371 }
372 Self::Regex(column, pattern) => {
373 format!("regexp_like({column}, {})", quoted_string(pattern))
374 }
375 Self::Gte(column, value) => format!("{column} >= {}", value.to_lance()),
376 Self::Lte(column, value) => format!("{column} <= {}", value.to_lance()),
377 Self::And(predicates) => predicates
378 .iter()
379 .map(Self::to_lance)
380 .filter(|predicate| !predicate.is_empty())
381 .collect::<Vec<_>>()
382 .join(" AND "),
383 Self::Or(predicates) => {
384 let body = predicates
387 .iter()
388 .map(Self::to_lance)
389 .filter(|predicate| !predicate.is_empty())
390 .collect::<Vec<_>>()
391 .join(" OR ");
392 if body.is_empty() {
393 String::new()
394 } else {
395 format!("({body})")
396 }
397 }
398 Self::Not(inner) => {
399 let body = inner.to_lance();
400 if body.is_empty() {
401 String::new()
402 } else {
403 format!("NOT ({body})")
404 }
405 }
406 }
407 }
408}
409#[derive(Default)]
412pub struct ScanOpts<'a> {
413 pub predicate: Option<&'a Predicate>,
414 pub projection: Option<&'a [&'a str]>,
415}
416
417impl<'a> ScanOpts<'a> {
418 pub fn project_only(projection: &'a [&'a str]) -> Self {
419 Self {
420 predicate: None,
421 projection: Some(projection),
422 }
423 }
424 pub fn with_predicate_and_projection(
425 predicate: &'a Predicate,
426 projection: &'a [&'a str],
427 ) -> Self {
428 Self {
429 predicate: Some(predicate),
430 projection: Some(projection),
431 }
432 }
433}
434
435impl ScalarValue {
436 fn to_lance(&self) -> String {
437 match self {
438 Self::String(value) => quoted_string(value),
439 Self::Int32(value) => value.to_string(),
440 Self::Raw(value) => value.clone(),
441 }
442 }
443}
444#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
448pub struct RuntimeCaps {
449 pub index_cache_bytes: Option<usize>,
450 pub metadata_cache_bytes: Option<usize>,
451}
452
453impl RuntimeCaps {
454 pub fn from_config(config: &crate::config::RuntimeConfig) -> Self {
455 Self {
456 index_cache_bytes: config.index_cache_bytes,
457 metadata_cache_bytes: config.metadata_cache_bytes,
458 }
459 }
460}
461
462const LOCAL_INDEX_CACHE_BYTES: usize = 256 * 1024 * 1024;
466const LOCAL_METADATA_CACHE_BYTES: usize = 128 * 1024 * 1024;
467const REMOTE_INDEX_CACHE_BYTES: usize = 2 * 1024 * 1024 * 1024;
469const REMOTE_METADATA_CACHE_BYTES: usize = 512 * 1024 * 1024;
470
471fn resolve_cache_caps(location: &Url, caps: RuntimeCaps) -> (usize, usize) {
472 let (index_default, metadata_default) = if config::is_local(location) {
473 (LOCAL_INDEX_CACHE_BYTES, LOCAL_METADATA_CACHE_BYTES)
474 } else {
475 (REMOTE_INDEX_CACHE_BYTES, REMOTE_METADATA_CACHE_BYTES)
476 };
477 (
478 caps.index_cache_bytes.unwrap_or(index_default),
479 caps.metadata_cache_bytes.unwrap_or(metadata_default),
480 )
481}
482
/// Open connection to the Lance store. It holds cached datasets for each table, the
/// shared Lance session, and the directory namespace used to resolve table locations.
pub struct Handle {
    /// Per-table cached datasets (`parts` is opened lazily).
    datasets: DatasetSet,
    /// Retry/backoff policy used by `retry_lance` and `backoff`.
    retry: RetryPolicy,
    /// Lance session shared by every dataset opened through this handle.
    // NOTE(review): `session` is read by `parts_cached`; confirm whether this allow is still needed.
    #[allow(dead_code)]
    session: Arc<Session>,
    /// Directory namespace client rooted at `location`.
    nm: Arc<dyn LanceNamespace>,
    /// Namespace identifier under which the tables live (the root namespace).
    nm_ident: NamespaceIdent,
    /// Object-store options in effect, including remote defaults applied at open time.
    storage_options: HashMap<String, String>,
    /// Root URL of the store.
    location: Url,
    /// Refresh interval used when the parts dataset is lazily opened.
    parts_refresh_after: Duration,
}
517
518impl std::fmt::Debug for Handle {
519 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
520 formatter
521 .debug_struct("Handle")
522 .field("datasets", &self.datasets)
523 .field("retry", &self.retry)
524 .field("nm_ident", &self.nm_ident)
525 .field("storage_options", &self.storage_options)
526 .field("location", &self.location)
527 .finish()
528 }
529}
530
/// The three tables managed by a `Handle`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Table {
    Sessions,
    Messages,
    Parts,
}

impl Table {
    /// Lowercase table label.
    pub fn as_str(self) -> &'static str {
        match self {
            Table::Sessions => "sessions",
            Table::Messages => "messages",
            Table::Parts => "parts",
        }
    }

    /// Label used in logs, error context and retry labels; identical to `as_str`.
    fn label(self) -> &'static str {
        self.as_str()
    }
}
/// Per-table dataset caches owned by a `Handle`.
///
/// Each cache sits behind an async mutex that `Handle` holds across a whole
/// read-modify-commit sequence. That serialises writers to one table within this process.
#[derive(Debug)]
struct DatasetSet {
    sessions: Mutex<CachedDataset>,
    messages: Mutex<CachedDataset>,
    /// Opened (or created) on first use by `Handle::parts_cached`.
    parts: OnceCell<Mutex<CachedDataset>>,
}

/// A dataset handle plus the bookkeeping that decides when to check for newer versions.
#[derive(Debug)]
struct CachedDataset {
    dataset: Dataset,
    /// When `dataset` was last refreshed or replaced.
    last_refresh: Instant,
    /// Minimum age before `latest` calls `checkout_latest` again (zero = on every call).
    refresh_after: Duration,
}
569impl CachedDataset {
570 async fn latest(&mut self) -> Result<Dataset> {
571 if self.last_refresh.elapsed() >= self.refresh_after {
572 self.dataset.checkout_latest().await?;
573 self.last_refresh = Instant::now();
574 }
575 Ok(self.dataset.clone())
576 }
577 fn replace(&mut self, dataset: Dataset) {
578 self.dataset = dataset;
579 self.last_refresh = Instant::now();
580 }
581}
582impl Handle {
583 pub async fn open(location: &Url) -> Result<Self> {
586 Self::open_with_options(location, HashMap::new(), RuntimeCaps::default()).await
587 }
588
589 pub async fn open_with_options(
595 location: &Url,
596 mut storage_options: HashMap<String, String>,
597 caps: RuntimeCaps,
598 ) -> Result<Self> {
599 if let Some(path) = config::local_path(location) {
600 tokio::fs::create_dir_all(&path)
601 .await
602 .with_context(|| format!("failed to create data dir {}", path.display()))?;
603 } else {
604 apply_remote_storage_defaults(&mut storage_options);
605 }
606 let (index_cache_bytes, metadata_cache_bytes) = resolve_cache_caps(location, caps);
612 let session = Arc::new(Session::new(
613 index_cache_bytes,
614 metadata_cache_bytes,
615 Arc::new(ObjectStoreRegistry::default()),
616 ));
617 let root = location.as_str().trim_end_matches('/').to_string();
623 let mut connect = ConnectBuilder::new("dir")
624 .property("root", root)
625 .session(session.clone());
626 for (key, value) in &storage_options {
630 connect = connect.property(format!("storage.{key}"), value.clone());
631 }
632 let nm: Arc<dyn LanceNamespace> = connect
633 .connect()
634 .await
635 .context("failed to connect lance Directory namespace")?;
636 let nm_ident = NamespaceIdent::root();
637 let refresh_after = if config::is_local(location) {
643 Duration::ZERO
644 } else {
645 Duration::from_secs(5)
646 };
647 let handle = Self {
648 datasets: DatasetSet {
649 sessions: Mutex::new(CachedDataset {
650 dataset: open_or_create_via_ns(
651 &nm,
652 &nm_ident,
653 sessions::SESSIONS,
654 sessions::session_schema(),
655 &session,
656 &storage_options,
657 )
658 .await?,
659 last_refresh: Instant::now(),
660 refresh_after,
661 }),
662 messages: Mutex::new(CachedDataset {
663 dataset: open_or_create_via_ns(
664 &nm,
665 &nm_ident,
666 sessions::MESSAGES,
667 sessions::message_schema(),
668 &session,
669 &storage_options,
670 )
671 .await?,
672 last_refresh: Instant::now(),
673 refresh_after,
674 }),
675 parts: OnceCell::new(),
676 },
677 retry: RetryPolicy::default(),
678 session,
679 nm,
680 nm_ident,
681 storage_options,
682 location: location.clone(),
683 parts_refresh_after: refresh_after,
684 };
685 Ok(handle)
686 }
687
688 pub fn location(&self) -> &Url {
689 &self.location
690 }
691
692 pub fn storage_options(&self) -> &HashMap<String, String> {
696 &self.storage_options
697 }
698
699 pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
700 Ok((
701 self.count_rows(Table::Sessions).await?,
702 self.count_rows(Table::Messages).await?,
703 self.count_rows(Table::Parts).await?,
704 ))
705 }
706
707 pub(crate) async fn merge_insert(
711 &self,
712 table: Table,
713 batch: RecordBatch,
714 row_count: usize,
715 ) -> Result<u64> {
716 self.merge(
717 table,
718 batch,
719 row_count,
720 "merge_insert",
721 WhenMatched::DoNothing,
722 WhenNotMatched::InsertAll,
723 )
724 .await
725 }
726
727 pub(crate) async fn merge_update(
730 &self,
731 table: Table,
732 batch: RecordBatch,
733 row_count: usize,
734 ) -> Result<u64> {
735 self.merge(
736 table,
737 batch,
738 row_count,
739 "merge_update",
740 WhenMatched::UpdateAll,
741 WhenNotMatched::DoNothing,
742 )
743 .await
744 }
745
746 async fn merge(
750 &self,
751 table: Table,
752 batch: RecordBatch,
753 row_count: usize,
754 op: &'static str,
755 when_matched: WhenMatched,
756 when_not_matched: WhenNotMatched,
757 ) -> Result<u64> {
758 if row_count == 0 {
759 return Ok(0);
760 }
761 let started = Instant::now();
762 let result = self
763 .retry_lance(table.label(), || async {
764 let mut cached = self.cached(table).await?.lock().await;
765 let existing = cached.latest().await?;
766 let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
767 let mut builder = MergeInsertBuilder::try_new(Arc::new(existing), Vec::new())?;
768 builder.when_matched(when_matched.clone());
769 builder.when_not_matched(when_not_matched.clone());
770 builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
773 builder.skip_auto_cleanup(true);
777 let (dataset, stats) = builder
778 .try_build()?
779 .execute_reader(Box::new(reader))
780 .await?;
781 cached.replace(dataset.as_ref().clone());
782 Ok((
783 stats.num_inserted_rows + stats.num_updated_rows,
784 stats.num_skipped_duplicates,
785 ))
786 })
787 .await;
788 let skipped = result.as_ref().map(|(_, s)| *s).unwrap_or(0);
789 tracing::info!(
790 target: "pond::perf",
791 op,
792 table = %table.label(),
793 rows = row_count,
794 elapsed_ms = started.elapsed().as_millis() as u64,
795 skipped,
796 "merge",
797 );
798 result.map(|(affected, _)| affected)
799 }
800
801 pub async fn optimize_table(
810 &self,
811 table: Table,
812 intents: &[IndexIntent],
813 progress: Option<&OptimizeProgressFn>,
814 cleanup: crate::sessions::CleanupConfig,
815 ) -> TableOptimizeOutcome {
816 let compaction = self
817 .run_optimize_compact_phase(table, progress, cleanup)
818 .await;
819 let indices = self
820 .run_optimize_indices_phase(table, intents, progress)
821 .await;
822 TableOptimizeOutcome {
823 table,
824 indices,
825 compaction,
826 }
827 }
828
829 pub async fn optimize_table_indices_only(
833 &self,
834 table: Table,
835 intents: &[IndexIntent],
836 progress: Option<&OptimizeProgressFn>,
837 ) -> PhaseOutcome {
838 self.run_optimize_indices_phase(table, intents, progress)
839 .await
840 }
841
842 async fn run_optimize_indices_phase(
843 &self,
844 table: Table,
845 intents: &[IndexIntent],
846 progress: Option<&OptimizeProgressFn>,
847 ) -> PhaseOutcome {
848 if intents.is_empty() {
849 return PhaseOutcome::Noop;
850 }
851 let result = self
852 .retry_lance(table.label(), || async {
853 let mut guard = self.cached(table).await?.lock().await;
854 let mut dataset = guard.latest().await?;
855 let did_work =
856 optimize_table_indices(&mut dataset, intents, table, progress).await?;
857 guard.replace(dataset);
858 Ok::<_, anyhow::Error>(did_work)
859 })
860 .await;
861 match result {
862 Ok(true) => PhaseOutcome::Ok,
863 Ok(false) => PhaseOutcome::Noop,
864 Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
865 Err(error) => PhaseOutcome::Failed(error),
866 }
867 }
868
869 async fn run_optimize_compact_phase(
870 &self,
871 table: Table,
872 progress: Option<&OptimizeProgressFn>,
873 cleanup: crate::sessions::CleanupConfig,
874 ) -> PhaseOutcome {
875 let result = self
876 .retry_lance(table.label(), || async {
877 let mut guard = self.cached(table).await?.lock().await;
878 let mut dataset = guard.latest().await?;
879 optimize_table_compact(&mut dataset, table, progress, cleanup).await?;
880 guard.replace(dataset);
881 Ok::<_, anyhow::Error>(())
882 })
883 .await;
884 match result {
885 Ok(()) => PhaseOutcome::Ok,
886 Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
887 Err(error) => PhaseOutcome::Failed(error),
888 }
889 }
890
891 pub async fn rebuild_index(&self, table: Table, intent: &IndexIntent) -> Result<()> {
892 self.retry_lance(table.label(), || async {
893 let mut guard = self.cached(table).await?.lock().await;
894 let mut dataset = guard.latest().await?;
895 rebuild_index(&mut dataset, intent).await?;
896 guard.replace(dataset);
897 Ok(())
898 })
899 .await
900 }
901
902 pub async fn index_status(
903 &self,
904 table: Table,
905 intents: &[IndexIntent],
906 ) -> Result<Vec<IndexStatus>> {
907 let dataset = self.dataset(table).await?;
908 index_status(table, &dataset, intents).await
909 }
910
911 pub(crate) async fn dataset(&self, table: Table) -> Result<Dataset> {
912 let mut cached = self.cached(table).await?.lock().await;
913 cached.latest().await
914 }
915 pub(crate) async fn scanner(
920 &self,
921 table: Table,
922 predicate: Option<&Predicate>,
923 ) -> Result<lance::dataset::scanner::Scanner> {
924 let dataset = self.dataset(table).await?;
925 scanner_with_prefilter(&dataset, predicate)
926 }
927 pub async fn scan(
930 &self,
931 table: Table,
932 opts: ScanOpts<'_>,
933 ) -> Result<lance::dataset::scanner::Scanner> {
934 let mut scanner = self.scanner(table, opts.predicate).await?;
935 if let Some(projection) = opts.projection {
936 scanner.project(projection)?;
937 }
938 Ok(scanner)
939 }
940 pub(crate) async fn scan_batch(
941 &self,
942 table: Table,
943 predicate: Option<&Predicate>,
944 projection: &[&str],
945 ) -> Result<RecordBatch> {
946 let opts = ScanOpts {
947 predicate,
948 projection: (!projection.is_empty()).then_some(projection),
949 };
950 self.scan(table, opts)
951 .await?
952 .try_into_batch()
953 .await
954 .context("scan failed")
955 }
956 pub async fn count_rows(&self, table: Table) -> Result<usize> {
957 self.dataset(table)
958 .await?
959 .count_rows(None)
960 .await
961 .map_err(Into::into)
962 }
963 #[cfg(test)]
965 pub(crate) async fn messages_index_names(&self) -> Result<Vec<String>> {
966 let dataset = self.dataset(Table::Messages).await?;
967 let indices = dataset.load_indices().await?;
968 Ok(indices.iter().map(|index| index.name.clone()).collect())
969 }
970
971 pub(crate) async fn unindexed_row_count(
974 &self,
975 table: Table,
976 index_name: &str,
977 ) -> Result<usize> {
978 let dataset = self.dataset(table).await?;
979 let fragments = dataset
980 .unindexed_fragments(index_name)
981 .await
982 .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
983 Ok(fragments
984 .iter()
985 .map(|fragment| fragment.num_rows().unwrap_or(0))
986 .sum())
987 }
988
989 pub(crate) async fn drop_index(&self, table: Table, name: &str) -> Result<()> {
995 let mut guard = self.cached(table).await?.lock().await;
996 let mut dataset = guard.latest().await?;
997 dataset
998 .drop_index(name)
999 .await
1000 .with_context(|| format!("drop_index({name}) failed for {}", table.label()))?;
1001 guard.replace(dataset);
1002 Ok(())
1003 }
1004
1005 async fn table_location(&self, table_name: &str) -> Result<String> {
1008 let request = DescribeTableRequest {
1009 id: Some(self.nm_ident.as_table_id(table_name)),
1010 ..Default::default()
1011 };
1012 let response = self
1013 .nm
1014 .describe_table(request)
1015 .await
1016 .with_context(|| format!("failed to describe table {table_name}"))?;
1017 response
1018 .location
1019 .with_context(|| format!("namespace returned no location for table {table_name}"))
1020 }
1021
1022 pub async fn table_sizes(&self) -> Result<TableSizes> {
1026 let registry = Arc::new(ObjectStoreRegistry::default());
1027 let params = ObjectStoreParams {
1028 storage_options_accessor: (!self.storage_options.is_empty()).then(|| {
1029 Arc::new(StorageOptionsAccessor::with_static_options(
1030 self.storage_options.clone(),
1031 ))
1032 }),
1033 ..Default::default()
1034 };
1035
1036 let sessions = self
1037 .listed_size(
1038 ®istry,
1039 ¶ms,
1040 &self.table_location(sessions::SESSIONS).await?,
1041 )
1042 .await?;
1043 let messages = self
1044 .listed_size(
1045 ®istry,
1046 ¶ms,
1047 &self.table_location(sessions::MESSAGES).await?,
1048 )
1049 .await?;
1050 let parts = self
1051 .listed_size(
1052 ®istry,
1053 ¶ms,
1054 &self.table_location(sessions::PARTS).await?,
1055 )
1056 .await?;
1057 let root_total = self
1060 .listed_size(®istry, ¶ms, self.location.as_str())
1061 .await?;
1062 let other = root_total.saturating_sub(sessions + messages + parts);
1063 Ok(TableSizes {
1064 sessions,
1065 messages,
1066 parts,
1067 other,
1068 })
1069 }
1070
1071 async fn listed_size(
1073 &self,
1074 registry: &Arc<ObjectStoreRegistry>,
1075 params: &ObjectStoreParams,
1076 uri: &str,
1077 ) -> Result<u64> {
1078 let (store, base) = ObjectStore::from_uri_and_params(registry.clone(), uri, params)
1079 .await
1080 .with_context(|| format!("failed to open object store for {uri}"))?;
1081 let mut listing = store.list(Some(base));
1082 let mut total = 0u64;
1083 while let Some(meta) = listing.next().await {
1084 let meta = meta.with_context(|| format!("listing {uri} failed"))?;
1085 total += meta.size;
1086 }
1087 Ok(total)
1088 }
1089 async fn cached(&self, table: Table) -> Result<&Mutex<CachedDataset>> {
1090 match table {
1091 Table::Sessions => Ok(&self.datasets.sessions),
1092 Table::Messages => Ok(&self.datasets.messages),
1093 Table::Parts => self.parts_cached().await,
1094 }
1095 }
1096
1097 async fn parts_cached(&self) -> Result<&Mutex<CachedDataset>> {
1100 self.datasets
1101 .parts
1102 .get_or_try_init(|| async {
1103 let dataset = open_or_create_via_ns(
1104 &self.nm,
1105 &self.nm_ident,
1106 sessions::PARTS,
1107 sessions::part_schema(),
1108 &self.session,
1109 &self.storage_options,
1110 )
1111 .await?;
1112 Ok::<_, anyhow::Error>(Mutex::new(CachedDataset {
1113 dataset,
1114 last_refresh: Instant::now(),
1115 refresh_after: self.parts_refresh_after,
1116 }))
1117 })
1118 .await
1119 }
1120 async fn retry_lance<T, Fut, Op>(&self, label: &str, mut operation: Op) -> Result<T>
1121 where
1122 Fut: std::future::Future<Output = Result<T>>,
1123 Op: FnMut() -> Fut,
1124 {
1125 let mut attempt = 0u8;
1126 loop {
1127 attempt = attempt.saturating_add(1);
1128 match operation().await {
1129 Ok(value) => return Ok(value),
1130 Err(error) if attempt < self.retry.attempts => {
1131 let backoff = self.backoff(attempt);
1132 let error_chain = format!("{error:#}");
1135 tracing::warn!(
1136 label,
1137 attempt,
1138 ?backoff,
1139 error = %error_chain,
1140 "retrying Lance operation"
1141 );
1142 tokio::time::sleep(backoff).await;
1143 }
1144 Err(error) => {
1145 let error_chain = format!("{error:#}");
1146 tracing::warn!(
1147 label,
1148 attempt,
1149 error = %error_chain,
1150 "Lance operation exhausted retries"
1151 );
1152 if is_commit_conflict(&error) {
1159 return Err(error.context(ConflictExhausted { attempts: attempt }));
1160 }
1161 return Err(error);
1162 }
1163 }
1164 }
1165 }
1166 fn backoff(&self, attempt: u8) -> Duration {
1167 let shift = u32::from(attempt.saturating_sub(1));
1168 let multiplier = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
1169 let base = self.retry.initial_backoff.saturating_mul(multiplier);
1170 let factor = (1.0 + self.retry.jitter * (fastrand::f64() * 2.0 - 1.0)).max(0.0);
1173 base.mul_f64(factor).min(self.retry.max_backoff)
1174 }
1175}
1176async fn optimize_table_compact(
1191 dataset: &mut Dataset,
1192 table: Table,
1193 progress: Option<&OptimizeProgressFn>,
1194 cleanup: crate::sessions::CleanupConfig,
1195) -> Result<()> {
1196 let compaction = CompactionOptions {
1197 defer_index_remap: false,
1198 ..CompactionOptions::default()
1199 };
1200
1201 emit(
1202 progress,
1203 OptimizeEvent::PhaseStart {
1204 table,
1205 phase: OptimizePhase::Compact,
1206 detail: None,
1207 },
1208 );
1209 let started = Instant::now();
1210 compact_files(dataset, compaction, None).await?;
1211 emit(
1212 progress,
1213 OptimizeEvent::PhaseDone {
1214 table,
1215 phase: OptimizePhase::Compact,
1216 elapsed_ms: started.elapsed().as_millis() as u64,
1217 },
1218 );
1219
1220 emit(
1221 progress,
1222 OptimizeEvent::PhaseStart {
1223 table,
1224 phase: OptimizePhase::Cleanup,
1225 detail: None,
1226 },
1227 );
1228 let started = Instant::now();
1229 dataset
1234 .cleanup_old_versions(
1235 cleanup.older_than,
1236 Some(cleanup.delete_unverified),
1237 Some(false),
1238 )
1239 .await
1240 .context("cleanup_old_versions failed during index optimize")?;
1241 emit(
1242 progress,
1243 OptimizeEvent::PhaseDone {
1244 table,
1245 phase: OptimizePhase::Cleanup,
1246 elapsed_ms: started.elapsed().as_millis() as u64,
1247 },
1248 );
1249
1250 Ok(())
1251}
1252
1253async fn optimize_table_indices(
1256 dataset: &mut Dataset,
1257 intents: &[IndexIntent],
1258 table: Table,
1259 progress: Option<&OptimizeProgressFn>,
1260) -> Result<bool> {
1261 let existing = dataset.load_indices().await?;
1262 let existing_names: std::collections::HashSet<String> =
1263 existing.iter().map(|index| index.name.clone()).collect();
1264
1265 let mut append_indices: Vec<String> = Vec::new();
1266 let mut did_work = false;
1267
1268 for intent in intents {
1269 let exists = existing_names.contains(intent.name);
1270
1271 if !exists {
1272 if !intent.trigger.should_create(dataset).await? {
1273 continue;
1274 }
1275 let params = intent.params.build(dataset).await?;
1276 let index_type = intent.params.index_type();
1277 tracing::info!(
1278 index = intent.name,
1279 column = intent.column,
1280 "creating Lance index (trigger fired)",
1281 );
1282 emit(
1283 progress,
1284 OptimizeEvent::PhaseStart {
1285 table,
1286 phase: OptimizePhase::IndexCreate,
1287 detail: Some(intent.name.to_owned()),
1288 },
1289 );
1290 let started = Instant::now();
1291 dataset
1292 .create_index(
1293 &[intent.column],
1294 index_type,
1295 Some(intent.name.to_owned()),
1296 params.as_ref(),
1297 false,
1298 )
1299 .await
1300 .with_context(|| format!("failed to create index {}", intent.name))?;
1301 emit(
1302 progress,
1303 OptimizeEvent::PhaseDone {
1304 table,
1305 phase: OptimizePhase::IndexCreate,
1306 elapsed_ms: started.elapsed().as_millis() as u64,
1307 },
1308 );
1309 did_work = true;
1310 continue;
1311 }
1312
1313 let unindexed = dataset.unindexed_fragments(intent.name).await?;
1314 if unindexed.is_empty() {
1315 continue;
1316 }
1317 if unindexed.len() < index_lag_threshold() {
1321 continue;
1322 }
1323 match intent.params {
1324 IndexParamsKind::Scalar(BuiltinIndexType::BTree) => {
1325 let params = intent.params.build(dataset).await?;
1326 let index_type = intent.params.index_type();
1327 tracing::debug!(
1328 target: "pond::perf",
1329 index = intent.name,
1330 column = intent.column,
1331 "rebuilding Lance BTree index",
1332 );
1333 emit(
1334 progress,
1335 OptimizeEvent::PhaseStart {
1336 table,
1337 phase: OptimizePhase::IndexRebuild,
1338 detail: Some(intent.name.to_owned()),
1339 },
1340 );
1341 let started = Instant::now();
1342 dataset
1343 .create_index(
1344 &[intent.column],
1345 index_type,
1346 Some(intent.name.to_owned()),
1347 params.as_ref(),
1348 true,
1349 )
1350 .await
1351 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1352 emit(
1353 progress,
1354 OptimizeEvent::PhaseDone {
1355 table,
1356 phase: OptimizePhase::IndexRebuild,
1357 elapsed_ms: started.elapsed().as_millis() as u64,
1358 },
1359 );
1360 did_work = true;
1361 }
1362 IndexParamsKind::Scalar(BuiltinIndexType::Bitmap)
1363 | IndexParamsKind::InvertedFtsNgram { .. }
1364 | IndexParamsKind::IvfPqCosine { .. } => {
1365 append_indices.push(intent.name.to_owned());
1366 }
1367 IndexParamsKind::Scalar(_) => {
1368 let params = intent.params.build(dataset).await?;
1369 emit(
1370 progress,
1371 OptimizeEvent::PhaseStart {
1372 table,
1373 phase: OptimizePhase::IndexRebuild,
1374 detail: Some(intent.name.to_owned()),
1375 },
1376 );
1377 let started = Instant::now();
1378 dataset
1379 .create_index(
1380 &[intent.column],
1381 intent.params.index_type(),
1382 Some(intent.name.to_owned()),
1383 params.as_ref(),
1384 true,
1385 )
1386 .await
1387 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1388 emit(
1389 progress,
1390 OptimizeEvent::PhaseDone {
1391 table,
1392 phase: OptimizePhase::IndexRebuild,
1393 elapsed_ms: started.elapsed().as_millis() as u64,
1394 },
1395 );
1396 did_work = true;
1397 }
1398 }
1399 }
1400
1401 if !append_indices.is_empty() {
1402 let to_append = append_indices.clone();
1403 emit(
1404 progress,
1405 OptimizeEvent::PhaseStart {
1406 table,
1407 phase: OptimizePhase::IndexAppend,
1408 detail: Some(append_indices.join(", ")),
1409 },
1410 );
1411 let started = Instant::now();
1412 dataset
1413 .optimize_indices(&OptimizeOptions::append().index_names(to_append))
1414 .await
1415 .context("optimize_indices(append) failed during index optimize")?;
1416 emit(
1417 progress,
1418 OptimizeEvent::PhaseDone {
1419 table,
1420 phase: OptimizePhase::IndexAppend,
1421 elapsed_ms: started.elapsed().as_millis() as u64,
1422 },
1423 );
1424 tracing::debug!(
1425 target: "pond::perf",
1426 indices = ?append_indices,
1427 "appended trailing fragments into indices",
1428 );
1429 did_work = true;
1430 }
1431
1432 Ok(did_work)
1433}
1434
1435async fn rebuild_index(dataset: &mut Dataset, intent: &IndexIntent) -> Result<()> {
1436 if !intent.trigger.should_create(dataset).await? {
1437 return Ok(());
1438 }
1439 let params = intent.params.build(dataset).await?;
1440 dataset
1441 .create_index(
1442 &[intent.column],
1443 intent.params.index_type(),
1444 Some(intent.name.to_owned()),
1445 params.as_ref(),
1446 true,
1447 )
1448 .await
1449 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1450 Ok(())
1451}
1452
1453async fn index_status(
1454 table: Table,
1455 dataset: &Dataset,
1456 intents: &[IndexIntent],
1457) -> Result<Vec<IndexStatus>> {
1458 let existing = dataset.load_indices().await?;
1459 let existing_names: std::collections::HashSet<String> =
1460 existing.iter().map(|index| index.name.clone()).collect();
1461 let total_fragments = dataset.get_fragments().len();
1462 let total_rows = dataset.count_rows(None).await?;
1463 let mut statuses = Vec::with_capacity(intents.len());
1464 for intent in intents {
1465 let exists = existing_names.contains(intent.name);
1466 if !exists {
1467 statuses.push(IndexStatus {
1468 table,
1469 intent_name: intent.name.to_owned(),
1470 fragments_covered: 0,
1471 unindexed_fragments: total_fragments,
1472 unindexed_rows: total_rows,
1473 exists,
1474 });
1475 continue;
1476 }
1477 let unindexed = dataset
1478 .unindexed_fragments(intent.name)
1479 .await
1480 .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1481 let unindexed_fragments = unindexed.len();
1482 let unindexed_rows = unindexed
1483 .iter()
1484 .map(|fragment| fragment.num_rows().unwrap_or(0))
1485 .sum();
1486 statuses.push(IndexStatus {
1487 table,
1488 intent_name: intent.name.to_owned(),
1489 fragments_covered: total_fragments.saturating_sub(unindexed_fragments),
1490 unindexed_fragments,
1491 unindexed_rows,
1492 exists,
1493 });
1494 }
1495 Ok(statuses)
1496}
1497
1498async fn open_or_create_via_ns(
1510 nm: &Arc<dyn LanceNamespace>,
1511 nm_ident: &NamespaceIdent,
1512 table_name: &str,
1513 schema: lance::deps::arrow_schema::SchemaRef,
1514 session: &Arc<Session>,
1515 storage_options: &HashMap<String, String>,
1516) -> Result<Dataset> {
1517 let table_id = nm_ident.as_table_id(table_name);
1518
1519 let request = DescribeTableRequest {
1520 id: Some(table_id.clone()),
1521 ..Default::default()
1522 };
1523 match nm.describe_table(request).await {
1524 Ok(response) => {
1525 let location = response.location.with_context(|| {
1526 format!("namespace returned no location for table {table_name}")
1527 })?;
1528 let mut builder = DatasetBuilder::from_uri(&location).with_session(session.clone());
1529 if !storage_options.is_empty() {
1530 builder = builder.with_storage_options(storage_options.clone());
1531 }
1532 let dataset = builder
1533 .load()
1534 .await
1535 .with_context(|| format!("failed to open table {table_name}"))?;
1536 ensure_schema_matches(&dataset, schema.as_ref(), table_name)?;
1537 return Ok(dataset);
1538 }
1539 Err(error) => match &error {
1540 error if is_namespace_error_code(error, ErrorCode::TableNotFound) => {
1541 }
1543 _ => {
1544 return Err(anyhow::Error::from(error))
1545 .with_context(|| format!("failed to describe table {table_name}"));
1546 }
1547 },
1548 }
1549
1550 let mut write_params = sessions::write_params_for_create();
1553 write_params.session = Some(session.clone());
1554 write_params.mode = WriteMode::Create;
1555 if !storage_options.is_empty() {
1556 write_params.store_params = Some(ObjectStoreParams {
1557 storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
1558 storage_options.clone(),
1559 ))),
1560 ..Default::default()
1561 });
1562 }
1563 let reader = sessions::empty_reader(schema)?;
1564 Dataset::write_into_namespace(reader, nm.clone(), table_id, Some(write_params))
1565 .await
1566 .with_context(|| format!("failed to create table {table_name}"))
1567}
1568
1569fn is_namespace_error_code(error: &lance::Error, code: ErrorCode) -> bool {
1573 if !matches!(error, lance::Error::Namespace { .. }) {
1574 return false;
1575 }
1576 std::iter::successors(Some(error as &(dyn std::error::Error + 'static)), |link| {
1577 link.source()
1578 })
1579 .filter_map(|link| link.downcast_ref::<NamespaceError>())
1580 .any(|inner| inner.code() == code)
1581}
1582
1583fn scanner_with_prefilter(
1584 dataset: &Dataset,
1585 predicate: Option<&Predicate>,
1586) -> Result<lance::dataset::scanner::Scanner> {
1587 let mut scanner = dataset.scan();
1588 scanner.prefilter(true);
1589 if let Some(predicate) = predicate {
1590 let filter = predicate.to_lance();
1591 if !filter.is_empty() {
1592 scanner.filter(&filter)?;
1593 }
1594 }
1595 Ok(scanner)
1596}
1597fn ensure_schema_matches(
1598 dataset: &Dataset,
1599 expected: &lance::deps::arrow_schema::Schema,
1600 table_name: &str,
1601) -> Result<()> {
1602 use lance::deps::arrow_schema::DataType;
1603 use std::collections::BTreeSet;
1604 let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
1605 let actual_names: BTreeSet<&str> = actual.fields().iter().map(|f| f.name().as_str()).collect();
1606 let expected_names: BTreeSet<&str> = expected
1607 .fields()
1608 .iter()
1609 .map(|f| f.name().as_str())
1610 .collect();
1611 if actual_names != expected_names {
1612 anyhow::bail!(
1613 "table {table_name} has columns {actual_names:?} but this pond build expects \
1614 {expected_names:?} - the on-disk store predates a schema change; delete the \
1615 data directory and re-run `pond ingest`",
1616 );
1617 }
1618 for actual_field in actual.fields() {
1623 let Some(expected_field) = expected.field_with_name(actual_field.name()).ok() else {
1624 continue;
1625 };
1626 if let (DataType::FixedSizeList(_, actual_dim), DataType::FixedSizeList(_, expected_dim)) =
1627 (actual_field.data_type(), expected_field.data_type())
1628 && actual_dim != expected_dim
1629 {
1630 tracing::warn!(
1631 table = table_name,
1632 column = actual_field.name(),
1633 actual_dim,
1634 expected_dim,
1635 "embedding dimension differs from config; open proceeds because model swaps are operator-driven",
1636 );
1637 }
1638 }
1639 Ok(())
1640}
/// Fills in object-store client defaults for remote storage without
/// overriding anything the caller configured.
///
/// An option counts as already configured when any of its accepted aliases
/// is present as a key, compared ASCII-case-insensitively (so
/// `AWS_ENDPOINT_URL` matches `aws_endpoint_url`). Defaults are inserted
/// under the first alias listed:
///
/// * `pool_idle_timeout` = `300 seconds`
/// * `connect_timeout` = `10 seconds`
/// * `aws_unsigned_payload` = `true`, only when a custom S3 endpoint is set
///   under any of `aws_endpoint`, `aws_endpoint_url`, `endpoint` or
///   `endpoint_url`.
fn apply_remote_storage_defaults(options: &mut HashMap<String, String>) {
    // Every spelling `object_store` accepts for the S3 endpoint. Checking only
    // `aws_endpoint`/`endpoint` missed endpoints configured as
    // `aws_endpoint_url` or `endpoint_url`, which then silently skipped the
    // unsigned-payload default below.
    const ENDPOINT_ALIASES: &[&str] = &["aws_endpoint", "aws_endpoint_url", "endpoint", "endpoint_url"];

    /// True when any key in `options` equals one of `aliases`, ignoring ASCII case.
    fn has_any_alias(options: &HashMap<String, String>, aliases: &[&str]) -> bool {
        options
            .keys()
            .any(|key| aliases.iter().any(|alias| key.eq_ignore_ascii_case(alias)))
    }

    /// Inserts `value` under the first alias unless some alias is already set.
    fn set_default(options: &mut HashMap<String, String>, aliases: &[&str], value: &str) {
        if has_any_alias(options, aliases) {
            return;
        }
        if let Some(canonical) = aliases.first() {
            options.insert((*canonical).to_owned(), value.to_owned());
        }
    }

    set_default(options, &["pool_idle_timeout"], "300 seconds");
    set_default(options, &["connect_timeout"], "10 seconds");
    if has_any_alias(options, ENDPOINT_ALIASES) {
        set_default(
            options,
            &["aws_unsigned_payload", "unsigned_payload"],
            "true",
        );
    }
}
1670
/// Renders `value` as a single-quoted SQL string literal, doubling any
/// embedded single quotes (`it's` -> `'it''s'`).
fn quoted_string(value: &str) -> String {
    let mut literal = String::with_capacity(value.len() + 2);
    literal.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            literal.push_str("''");
        } else {
            literal.push(ch);
        }
    }
    literal.push('\'');
    literal
}
/// Builds a quoted `LIKE` pattern matching any string that contains `value`.
///
/// `\`, `%` and `_` are backslash-escaped so they match literally inside the
/// pattern, and single quotes are doubled for the surrounding SQL literal.
/// The result is `'%<escaped>%'`.
fn like_contains(value: &str) -> String {
    let mut pattern = String::with_capacity(value.len() + 4);
    pattern.push_str("'%");
    for ch in value.chars() {
        match ch {
            '\\' => pattern.push_str("\\\\"),
            '%' => pattern.push_str("\\%"),
            '_' => pattern.push_str("\\_"),
            '\'' => pattern.push_str("''"),
            other => pattern.push(other),
        }
    }
    pattern.push_str("%'");
    pattern
}
1682
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// `is_namespace_error_code` must find a `NamespaceError` whether it is the
    /// direct source or nested one wrapper deeper. It must also reject a
    /// different code and errors that are not `lance::Error::Namespace`.
    #[test]
    fn namespace_error_code_walks_wrapped_chain() {
        let direct = lance::Error::namespace_source(Box::new(NamespaceError::TableNotFound {
            message: "missing".into(),
        }));
        assert!(is_namespace_error_code(&direct, ErrorCode::TableNotFound));

        let wrapped = lance::Error::namespace_source(Box::new(direct));
        assert!(is_namespace_error_code(&wrapped, ErrorCode::TableNotFound));

        let other_code =
            lance::Error::namespace_source(Box::new(NamespaceError::NamespaceNotFound {
                message: "nope".into(),
            }));
        assert!(!is_namespace_error_code(
            &other_code,
            ErrorCode::TableNotFound
        ));

        let not_namespace = lance::Error::internal("unrelated");
        assert!(!is_namespace_error_code(
            &not_namespace,
            ErrorCode::TableNotFound
        ));
    }

    /// Opening a handle on an empty temp directory yields tables that can be
    /// scanned (projecting `id`) and contain no rows.
    #[tokio::test]
    async fn store_opens_via_namespace_and_scan_works() -> Result<()> {
        let temp = TempDir::new()?;
        let url = Url::from_directory_path(temp.path())
            .map_err(|()| anyhow::anyhow!("temp path is not absolute"))?;
        let handle = Handle::open(&url).await?;
        let cases: [(Table, &[&str]); 3] = [
            (Table::Sessions, &["id"]),
            (Table::Messages, &["id"]),
            (Table::Parts, &["id"]),
        ];
        for (table, projection) in cases {
            let scanner = handle
                .scan(table, ScanOpts::project_only(projection))
                .await?;
            let batch = scanner.try_into_batch().await?;
            assert_eq!(batch.num_rows(), 0, "fresh table should be empty");
        }
        Ok(())
    }
}