1use crate::{
2 RetryPolicy,
3 config::{self},
4 handlers::NamespaceIdent,
5 sessions::{self},
6};
7use anyhow::{Context, Result};
8use lance::Dataset;
9use lance::dataset::builder::DatasetBuilder;
10use lance::dataset::optimize::{CompactionOptions, compact_files};
11use lance::dataset::write::merge_insert::SourceDedupeBehavior;
12use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode};
13use lance::deps::arrow_array::{RecordBatch, RecordBatchIterator};
14use lance::index::DatasetIndexExt;
15use lance::index::DatasetIndexInternalExt;
16use lance::index::vector::VectorIndexParams;
17use lance::session::Session;
18use lance_index::IndexType;
19use lance_index::optimize::OptimizeOptions;
20use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
21use lance_io::object_store::{
22 ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor,
23};
24use lance_linalg::distance::MetricType;
25use lance_namespace::LanceNamespace;
26use lance_namespace::error::{ErrorCode, NamespaceError};
27use lance_namespace::models::DescribeTableRequest;
28use lance_namespace_impls::ConnectBuilder;
29use std::{
30 collections::HashMap,
31 sync::Arc,
32 time::{Duration, Instant},
33};
34use tokio::sync::{Mutex, OnceCell};
35use tokio_stream::StreamExt;
36use url::Url;
/// Row count associated with activating the vector index.
/// NOTE(review): not referenced in the visible part of this file — confirm how callers use it.
pub const VECTOR_INDEX_ACTIVATION_ROWS: usize = 100_000;

/// Value returned by [`index_lag_threshold`] when no runtime override was installed.
pub const DEFAULT_INDEX_LAG_THRESHOLD: usize = 4;

/// Process-wide override for the index lag threshold; it can be written at most once.
static INDEX_LAG_THRESHOLD_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Installs the runtime index lag threshold.
///
/// Only the first call stores a value; every later call is a no-op.
pub fn init_index_lag_threshold(value: usize) {
    // `set` refuses to overwrite an existing value, which is the "first writer wins" rule.
    let _ = INDEX_LAG_THRESHOLD_RUNTIME.set(value);
}

/// Returns the installed index lag threshold, or [`DEFAULT_INDEX_LAG_THRESHOLD`] when
/// [`init_index_lag_threshold`] has not been called yet.
pub fn index_lag_threshold() -> usize {
    match INDEX_LAG_THRESHOLD_RUNTIME.get() {
        Some(&configured) => configured,
        None => DEFAULT_INDEX_LAG_THRESHOLD,
    }
}
64
/// Declarative description of one index: its name, the column it covers, the condition
/// for creating it, and the parameters used to build it.
#[derive(Debug, Clone)]
pub struct IndexIntent {
    /// Index name; used to look the index up among the dataset's existing indices.
    pub name: &'static str,
    /// Column the index is built on.
    pub column: &'static str,
    /// Condition that must hold before the index is created.
    pub trigger: IndexTrigger,
    /// Index family together with its tuning parameters.
    pub params: IndexParamsKind,
}
80
/// Condition deciding whether an index that does not exist yet should be created.
#[derive(Debug, Clone)]
pub enum IndexTrigger {
    /// Fires as soon as the dataset holds at least one row.
    OnAnyRows,
    /// Fires once `column` has at least `threshold` non-null values.
    OnNonNullCount {
        column: &'static str,
        threshold: usize,
    },
}
94
/// Index family plus the knobs needed to build its Lance index parameters.
#[derive(Debug, Clone)]
pub enum IndexParamsKind {
    /// Built-in scalar index of the given type.
    Scalar(BuiltinIndexType),
    /// Inverted full-text index using an n-gram tokenizer; `min` / `max` are the minimum
    /// and maximum n-gram lengths.
    InvertedFtsNgram { min: u32, max: u32 },
    /// IVF-PQ vector index using the cosine metric.
    IvfPqCosine {
        sub_vectors: usize,
        num_bits: u8,
        max_iters: usize,
    },
}
116
117impl IndexTrigger {
118 async fn should_create(&self, dataset: &Dataset) -> Result<bool> {
119 match self {
120 Self::OnAnyRows => Ok(dataset.count_rows(None).await? > 0),
121 Self::OnNonNullCount { column, threshold } => {
122 let count = dataset
123 .count_rows(Some(format!("{column} IS NOT NULL")))
124 .await?;
125 Ok(count >= *threshold)
126 }
127 }
128 }
129}
130
131impl IndexParamsKind {
132 fn index_type(&self) -> IndexType {
133 match self {
134 Self::Scalar(BuiltinIndexType::Bitmap) => IndexType::Bitmap,
135 Self::Scalar(_) => IndexType::BTree,
136 Self::InvertedFtsNgram { .. } => IndexType::Inverted,
137 Self::IvfPqCosine { .. } => IndexType::Vector,
138 }
139 }
140
141 async fn build(&self, dataset: &Dataset) -> Result<Box<dyn lance::index::IndexParams>> {
142 match self {
143 Self::Scalar(kind) => Ok(Box::new(ScalarIndexParams::for_builtin(kind.clone()))),
144 Self::InvertedFtsNgram { min, max } => Ok(Box::new(
145 InvertedIndexParams::default()
146 .base_tokenizer("ngram".to_owned())
147 .ngram_min_length(*min)
148 .ngram_max_length(*max)
149 .stem(false)
150 .remove_stop_words(false),
151 )),
152 Self::IvfPqCosine {
153 sub_vectors,
154 num_bits,
155 max_iters,
156 } => {
157 let count = dataset
158 .count_rows(Some("vector IS NOT NULL".to_owned()))
159 .await?;
160 let partitions = count.checked_div(4096).unwrap_or(0).max(1);
161 Ok(Box::new(VectorIndexParams::ivf_pq(
162 partitions,
163 *num_bits,
164 *sub_vectors,
165 MetricType::Cosine,
166 *max_iters,
167 )))
168 }
169 }
170 }
171}
172
/// Coverage report for one [`IndexIntent`] on one table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexStatus {
    /// Table the report refers to.
    pub table: Table,
    /// Name of the intent (and of the index) being reported.
    pub intent_name: String,
    /// Number of fragments covered by the index; `0` when the index does not exist.
    pub fragments_covered: usize,
    /// Number of fragments not covered by the index; every fragment when the index does
    /// not exist.
    pub unindexed_fragments: usize,
    /// Rows in the uncovered fragments; the table's full row count when the index does
    /// not exist.
    pub unindexed_rows: usize,
    /// Whether an index with this name is present on the dataset.
    pub exists: bool,
}
182
/// Marker attached to an error when a retried operation ran out of attempts while the
/// final failure was a commit conflict.
#[derive(Debug, Clone, Copy)]
pub struct ConflictExhausted {
    /// Number of attempts that were made before giving up.
    pub attempts: u8,
}

impl std::fmt::Display for ConflictExhausted {
    /// Renders the marker as a human-readable message that includes the attempt count.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { attempts } = *self;
        write!(
            formatter,
            "commit conflict exhausted after {} attempt(s)",
            attempts
        )
    }
}

// No source: the marker carries no underlying cause of its own.
impl std::error::Error for ConflictExhausted {}
203
204#[derive(Debug)]
209pub enum PhaseOutcome {
210 Ok,
212 Noop,
214 SkippedConflict,
217 Failed(anyhow::Error),
219 NotAttempted,
222}
223
224impl PhaseOutcome {
225 pub fn is_failed(&self) -> bool {
226 matches!(self, Self::Failed(_))
227 }
228}
229
/// Per-table result of an optimize run, with one outcome per phase.
#[derive(Debug)]
pub struct TableOptimizeOutcome {
    /// Table that was optimized.
    pub table: Table,
    /// Outcome of the index maintenance phase.
    pub indices: PhaseOutcome,
    /// Outcome of the compaction (and version cleanup) phase.
    pub compaction: PhaseOutcome,
}
237
/// Progress notification handed to an [`OptimizeProgressFn`] while optimizing a table.
#[derive(Debug, Clone)]
pub enum OptimizeEvent {
    /// A phase is about to start.
    PhaseStart {
        table: Table,
        phase: OptimizePhase,
        /// Optional detail; the index phases put the affected index name(s) here.
        detail: Option<String>,
    },
    /// A phase finished successfully.
    PhaseDone {
        table: Table,
        phase: OptimizePhase,
        /// Wall-clock duration of the phase in milliseconds.
        elapsed_ms: u64,
    },
}
253
/// The individual steps an optimize run can report progress for.
#[derive(Debug, Clone, Copy)]
pub enum OptimizePhase {
    Compact,
    Cleanup,
    IndexCreate,
    IndexRebuild,
    IndexAppend,
}

impl OptimizePhase {
    /// Stable lowercase, dash-separated name of the phase.
    pub fn label(self) -> &'static str {
        let text: &'static str = match self {
            Self::IndexAppend => "index-append",
            Self::IndexRebuild => "index-rebuild",
            Self::IndexCreate => "index-create",
            Self::Cleanup => "cleanup",
            Self::Compact => "compact",
        };
        text
    }
}
274
275pub type OptimizeProgressFn = Box<dyn Fn(OptimizeEvent) + Send + Sync>;
276
277fn emit(progress: Option<&OptimizeProgressFn>, event: OptimizeEvent) {
278 if let Some(callback) = progress {
279 callback(event);
280 }
281}
282
283pub fn is_commit_conflict(error: &anyhow::Error) -> bool {
287 error.downcast_ref::<lance::Error>().is_some_and(|err| {
288 matches!(
289 err,
290 lance::Error::CommitConflict { .. }
291 | lance::Error::RetryableCommitConflict { .. }
292 | lance::Error::TooMuchWriteContention { .. }
293 )
294 })
295}
296
297fn is_conflict_exhausted(error: &anyhow::Error) -> bool {
300 error.chain().any(|cause| cause.is::<ConflictExhausted>())
301}
302
/// Storage footprint in bytes, as summed from object-store listings.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TableSizes {
    /// Bytes listed under the sessions table location.
    pub sessions: u64,
    /// Bytes listed under the messages table location.
    pub messages: u64,
    /// Bytes listed under the parts table location.
    pub parts: u64,
    /// Bytes under the store root that are not attributed to the three tables.
    pub other: u64,
}
313
/// Literal value usable on the right-hand side of a [`Predicate`] comparison.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalarValue {
    /// String literal; quoted when rendered.
    String(String),
    /// 32-bit integer literal.
    Int32(i32),
    /// Text inserted into the filter verbatim, with no quoting or escaping.
    Raw(String),
}

/// Borrowed text becomes an owned string literal.
impl From<&str> for ScalarValue {
    fn from(value: &str) -> Self {
        ScalarValue::String(String::from(value))
    }
}

/// Owned text becomes a string literal without copying.
impl From<String> for ScalarValue {
    fn from(value: String) -> Self {
        ScalarValue::String(value)
    }
}

/// Integers become `Int32` literals.
impl From<i32> for ScalarValue {
    fn from(value: i32) -> Self {
        ScalarValue::Int32(value)
    }
}
/// Filter expression tree that renders to a Lance filter string via `to_lance`.
///
/// Column names are `&'static str` and are inserted into the filter text as-is.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Predicate {
    /// `column = value`
    Eq(&'static str, ScalarValue),
    /// `column <> value`
    Ne(&'static str, ScalarValue),
    /// `column IS NULL`
    IsNull(&'static str),
    /// `column IS NOT NULL`
    IsNotNull(&'static str),
    /// `column IN (v1, v2, ...)`
    In(&'static str, Vec<ScalarValue>),
    /// `column LIKE <pattern> ESCAPE '\'`, with the pattern built from the given text.
    LikeContains(&'static str, String),
    /// `regexp_like(column, <pattern>)`
    Regex(&'static str, String),
    /// `column >= value`
    Gte(&'static str, ScalarValue),
    /// `column <= value`
    Lte(&'static str, ScalarValue),
    /// Conjunction of the children; children that render to nothing are skipped.
    And(Vec<Predicate>),
    /// Parenthesised disjunction of the children; children that render to nothing are skipped.
    Or(Vec<Predicate>),
}
353impl Predicate {
354 pub fn to_lance(&self) -> String {
355 match self {
356 Self::Eq(column, value) => format!("{column} = {}", value.to_lance()),
357 Self::Ne(column, value) => format!("{column} <> {}", value.to_lance()),
358 Self::IsNull(column) => format!("{column} IS NULL"),
359 Self::IsNotNull(column) => format!("{column} IS NOT NULL"),
360 Self::In(column, values) => {
361 let values = values
362 .iter()
363 .map(ScalarValue::to_lance)
364 .collect::<Vec<_>>()
365 .join(", ");
366 format!("{column} IN ({values})")
367 }
368 Self::LikeContains(column, value) => {
369 format!("{column} LIKE {} ESCAPE '\\'", like_contains(value))
370 }
371 Self::Regex(column, pattern) => {
372 format!("regexp_like({column}, {})", quoted_string(pattern))
373 }
374 Self::Gte(column, value) => format!("{column} >= {}", value.to_lance()),
375 Self::Lte(column, value) => format!("{column} <= {}", value.to_lance()),
376 Self::And(predicates) => predicates
377 .iter()
378 .map(Self::to_lance)
379 .filter(|predicate| !predicate.is_empty())
380 .collect::<Vec<_>>()
381 .join(" AND "),
382 Self::Or(predicates) => {
383 let body = predicates
386 .iter()
387 .map(Self::to_lance)
388 .filter(|predicate| !predicate.is_empty())
389 .collect::<Vec<_>>()
390 .join(" OR ");
391 if body.is_empty() {
392 String::new()
393 } else {
394 format!("({body})")
395 }
396 }
397 }
398 }
399}
400#[derive(Default)]
403pub struct ScanOpts<'a> {
404 pub predicate: Option<&'a Predicate>,
405 pub projection: Option<&'a [&'a str]>,
406}
407
408impl<'a> ScanOpts<'a> {
409 pub fn project_only(projection: &'a [&'a str]) -> Self {
410 Self {
411 predicate: None,
412 projection: Some(projection),
413 }
414 }
415 pub fn with_predicate_and_projection(
416 predicate: &'a Predicate,
417 projection: &'a [&'a str],
418 ) -> Self {
419 Self {
420 predicate: Some(predicate),
421 projection: Some(projection),
422 }
423 }
424}
425
426impl ScalarValue {
427 fn to_lance(&self) -> String {
428 match self {
429 Self::String(value) => quoted_string(value),
430 Self::Int32(value) => value.to_string(),
431 Self::Raw(value) => value.clone(),
432 }
433 }
434}
435#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
439pub struct RuntimeCaps {
440 pub index_cache_bytes: Option<usize>,
441 pub metadata_cache_bytes: Option<usize>,
442}
443
444impl RuntimeCaps {
445 pub fn from_config(config: &crate::config::RuntimeConfig) -> Self {
446 Self {
447 index_cache_bytes: config.index_cache_bytes,
448 metadata_cache_bytes: config.metadata_cache_bytes,
449 }
450 }
451}
452
453const LOCAL_INDEX_CACHE_BYTES: usize = 256 * 1024 * 1024;
457const LOCAL_METADATA_CACHE_BYTES: usize = 128 * 1024 * 1024;
458const REMOTE_INDEX_CACHE_BYTES: usize = 2 * 1024 * 1024 * 1024;
460const REMOTE_METADATA_CACHE_BYTES: usize = 512 * 1024 * 1024;
461
462fn resolve_cache_caps(location: &Url, caps: RuntimeCaps) -> (usize, usize) {
463 let (index_default, metadata_default) = if config::is_local(location) {
464 (LOCAL_INDEX_CACHE_BYTES, LOCAL_METADATA_CACHE_BYTES)
465 } else {
466 (REMOTE_INDEX_CACHE_BYTES, REMOTE_METADATA_CACHE_BYTES)
467 };
468 (
469 caps.index_cache_bytes.unwrap_or(index_default),
470 caps.metadata_cache_bytes.unwrap_or(metadata_default),
471 )
472}
473
/// Entry point to the store: owns the cached Lance datasets for the sessions, messages
/// and parts tables plus everything needed to (re)open them.
pub struct Handle {
    /// Cached datasets, one per table.
    datasets: DatasetSet,
    /// Retry policy applied by `retry_lance`.
    retry: RetryPolicy,
    /// Shared Lance session (index and metadata caches) used when opening tables.
    #[allow(dead_code)]
    session: Arc<Session>,
    /// Namespace connection used to resolve and create tables.
    nm: Arc<dyn LanceNamespace>,
    /// Namespace identifier under which the tables live (the root namespace).
    nm_ident: NamespaceIdent,
    /// Storage options forwarded to the namespace connection and object stores.
    storage_options: HashMap<String, String>,
    /// Root location of the store.
    location: Url,
    /// Refresh interval applied to the lazily opened parts table.
    parts_refresh_after: Duration,
}
508
509impl std::fmt::Debug for Handle {
510 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
511 formatter
512 .debug_struct("Handle")
513 .field("datasets", &self.datasets)
514 .field("retry", &self.retry)
515 .field("nm_ident", &self.nm_ident)
516 .field("storage_options", &self.storage_options)
517 .field("location", &self.location)
518 .finish()
519 }
520}
521
/// The tables managed by a [`Handle`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Table {
    Sessions,
    Messages,
    Parts,
}

impl Table {
    /// Lowercase table name.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Parts => "parts",
            Self::Messages => "messages",
            Self::Sessions => "sessions",
        }
    }

    /// Name used in logs and retry labels; identical to [`Table::as_str`].
    fn label(self) -> &'static str {
        self.as_str()
    }
}
/// The per-table dataset caches owned by a `Handle`.
#[derive(Debug)]
struct DatasetSet {
    /// Sessions table; opened when the handle is created.
    sessions: Mutex<CachedDataset>,
    /// Messages table; opened when the handle is created.
    messages: Mutex<CachedDataset>,
    /// Parts table; opened (or created) on first access.
    parts: OnceCell<Mutex<CachedDataset>>,
}
554#[derive(Debug)]
555struct CachedDataset {
556 dataset: Dataset,
557 last_refresh: Instant,
558 refresh_after: Duration,
559}
560impl CachedDataset {
561 async fn latest(&mut self) -> Result<Dataset> {
562 if self.last_refresh.elapsed() >= self.refresh_after {
563 self.dataset.checkout_latest().await?;
564 self.last_refresh = Instant::now();
565 }
566 Ok(self.dataset.clone())
567 }
568 fn replace(&mut self, dataset: Dataset) {
569 self.dataset = dataset;
570 self.last_refresh = Instant::now();
571 }
572}
573impl Handle {
574 pub async fn open(location: &Url) -> Result<Self> {
577 Self::open_with_options(location, HashMap::new(), RuntimeCaps::default()).await
578 }
579
580 pub async fn open_with_options(
586 location: &Url,
587 mut storage_options: HashMap<String, String>,
588 caps: RuntimeCaps,
589 ) -> Result<Self> {
590 if let Some(path) = config::local_path(location) {
591 tokio::fs::create_dir_all(&path)
592 .await
593 .with_context(|| format!("failed to create data dir {}", path.display()))?;
594 } else {
595 apply_remote_storage_defaults(&mut storage_options);
596 }
597 let (index_cache_bytes, metadata_cache_bytes) = resolve_cache_caps(location, caps);
603 let session = Arc::new(Session::new(
604 index_cache_bytes,
605 metadata_cache_bytes,
606 Arc::new(ObjectStoreRegistry::default()),
607 ));
608 let root = location.as_str().trim_end_matches('/').to_string();
614 let mut connect = ConnectBuilder::new("dir")
615 .property("root", root)
616 .session(session.clone());
617 for (key, value) in &storage_options {
621 connect = connect.property(format!("storage.{key}"), value.clone());
622 }
623 let nm: Arc<dyn LanceNamespace> = connect
624 .connect()
625 .await
626 .context("failed to connect lance Directory namespace")?;
627 let nm_ident = NamespaceIdent::root();
628 let refresh_after = if config::is_local(location) {
634 Duration::ZERO
635 } else {
636 Duration::from_secs(5)
637 };
638 let handle = Self {
639 datasets: DatasetSet {
640 sessions: Mutex::new(CachedDataset {
641 dataset: open_or_create_via_ns(
642 &nm,
643 &nm_ident,
644 sessions::SESSIONS,
645 sessions::session_schema(),
646 &session,
647 &storage_options,
648 )
649 .await?,
650 last_refresh: Instant::now(),
651 refresh_after,
652 }),
653 messages: Mutex::new(CachedDataset {
654 dataset: open_or_create_via_ns(
655 &nm,
656 &nm_ident,
657 sessions::MESSAGES,
658 sessions::message_schema(),
659 &session,
660 &storage_options,
661 )
662 .await?,
663 last_refresh: Instant::now(),
664 refresh_after,
665 }),
666 parts: OnceCell::new(),
667 },
668 retry: RetryPolicy::default(),
669 session,
670 nm,
671 nm_ident,
672 storage_options,
673 location: location.clone(),
674 parts_refresh_after: refresh_after,
675 };
676 Ok(handle)
677 }
678
    /// Root location this handle was opened with.
    pub fn location(&self) -> &Url {
        &self.location
    }
682
    /// Storage options held by this handle (for remote locations these include the
    /// defaults applied while opening).
    pub fn storage_options(&self) -> &HashMap<String, String> {
        &self.storage_options
    }
689
690 pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
691 Ok((
692 self.count_rows(Table::Sessions).await?,
693 self.count_rows(Table::Messages).await?,
694 self.count_rows(Table::Parts).await?,
695 ))
696 }
697
698 pub(crate) async fn merge_insert(
702 &self,
703 table: Table,
704 batch: RecordBatch,
705 row_count: usize,
706 ) -> Result<u64> {
707 self.merge(
708 table,
709 batch,
710 row_count,
711 "merge_insert",
712 WhenMatched::DoNothing,
713 WhenNotMatched::InsertAll,
714 )
715 .await
716 }
717
718 pub(crate) async fn merge_update(
721 &self,
722 table: Table,
723 batch: RecordBatch,
724 row_count: usize,
725 ) -> Result<u64> {
726 self.merge(
727 table,
728 batch,
729 row_count,
730 "merge_update",
731 WhenMatched::UpdateAll,
732 WhenNotMatched::DoNothing,
733 )
734 .await
735 }
736
737 async fn merge(
741 &self,
742 table: Table,
743 batch: RecordBatch,
744 row_count: usize,
745 op: &'static str,
746 when_matched: WhenMatched,
747 when_not_matched: WhenNotMatched,
748 ) -> Result<u64> {
749 if row_count == 0 {
750 return Ok(0);
751 }
752 let started = Instant::now();
753 let result = self
754 .retry_lance(table.label(), || async {
755 let mut cached = self.cached(table).await?.lock().await;
756 let existing = cached.latest().await?;
757 let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
758 let mut builder = MergeInsertBuilder::try_new(Arc::new(existing), Vec::new())?;
759 builder.when_matched(when_matched.clone());
760 builder.when_not_matched(when_not_matched.clone());
761 builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
764 builder.skip_auto_cleanup(true);
768 let (dataset, stats) = builder
769 .try_build()?
770 .execute_reader(Box::new(reader))
771 .await?;
772 cached.replace(dataset.as_ref().clone());
773 Ok((
774 stats.num_inserted_rows + stats.num_updated_rows,
775 stats.num_skipped_duplicates,
776 ))
777 })
778 .await;
779 let skipped = result.as_ref().map(|(_, s)| *s).unwrap_or(0);
780 tracing::info!(
781 target: "pond::perf",
782 op,
783 table = %table.label(),
784 rows = row_count,
785 elapsed_ms = started.elapsed().as_millis() as u64,
786 skipped,
787 "merge",
788 );
789 result.map(|(affected, _)| affected)
790 }
791
    /// Runs the full optimize sequence for `table`: compaction (with version cleanup)
    /// first, then index maintenance for `intents`.
    ///
    /// The two phases are reported independently; the index phase still runs when
    /// compaction failed or was skipped.
    pub async fn optimize_table(
        &self,
        table: Table,
        intents: &[IndexIntent],
        progress: Option<&OptimizeProgressFn>,
        cleanup: crate::sessions::CleanupConfig,
    ) -> TableOptimizeOutcome {
        let compaction = self
            .run_optimize_compact_phase(table, progress, cleanup)
            .await;
        let indices = self
            .run_optimize_indices_phase(table, intents, progress)
            .await;
        TableOptimizeOutcome {
            table,
            indices,
            compaction,
        }
    }
819
    /// Runs only the index maintenance phase for `table`, without compaction or cleanup.
    pub async fn optimize_table_indices_only(
        &self,
        table: Table,
        intents: &[IndexIntent],
        progress: Option<&OptimizeProgressFn>,
    ) -> PhaseOutcome {
        self.run_optimize_indices_phase(table, intents, progress)
            .await
    }
832
833 async fn run_optimize_indices_phase(
834 &self,
835 table: Table,
836 intents: &[IndexIntent],
837 progress: Option<&OptimizeProgressFn>,
838 ) -> PhaseOutcome {
839 if intents.is_empty() {
840 return PhaseOutcome::Noop;
841 }
842 let result = self
843 .retry_lance(table.label(), || async {
844 let mut guard = self.cached(table).await?.lock().await;
845 let mut dataset = guard.latest().await?;
846 let did_work =
847 optimize_table_indices(&mut dataset, intents, table, progress).await?;
848 guard.replace(dataset);
849 Ok::<_, anyhow::Error>(did_work)
850 })
851 .await;
852 match result {
853 Ok(true) => PhaseOutcome::Ok,
854 Ok(false) => PhaseOutcome::Noop,
855 Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
856 Err(error) => PhaseOutcome::Failed(error),
857 }
858 }
859
860 async fn run_optimize_compact_phase(
861 &self,
862 table: Table,
863 progress: Option<&OptimizeProgressFn>,
864 cleanup: crate::sessions::CleanupConfig,
865 ) -> PhaseOutcome {
866 let result = self
867 .retry_lance(table.label(), || async {
868 let mut guard = self.cached(table).await?.lock().await;
869 let mut dataset = guard.latest().await?;
870 optimize_table_compact(&mut dataset, table, progress, cleanup).await?;
871 guard.replace(dataset);
872 Ok::<_, anyhow::Error>(())
873 })
874 .await;
875 match result {
876 Ok(()) => PhaseOutcome::Ok,
877 Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
878 Err(error) => PhaseOutcome::Failed(error),
879 }
880 }
881
    /// Rebuilds (replaces) the index described by `intent` on `table`, under the retry
    /// policy, and stores the updated dataset in the cache.
    ///
    /// Nothing is built when the intent's trigger does not fire.
    pub async fn rebuild_index(&self, table: Table, intent: &IndexIntent) -> Result<()> {
        self.retry_lance(table.label(), || async {
            let mut guard = self.cached(table).await?.lock().await;
            let mut dataset = guard.latest().await?;
            // Calls the free function of the same name, not this method.
            rebuild_index(&mut dataset, intent).await?;
            guard.replace(dataset);
            Ok(())
        })
        .await
    }
892
893 pub async fn index_status(
894 &self,
895 table: Table,
896 intents: &[IndexIntent],
897 ) -> Result<Vec<IndexStatus>> {
898 let dataset = self.dataset(table).await?;
899 index_status(table, &dataset, intents).await
900 }
901
902 pub(crate) async fn dataset(&self, table: Table) -> Result<Dataset> {
903 let mut cached = self.cached(table).await?.lock().await;
904 cached.latest().await
905 }
906 pub(crate) async fn scanner(
911 &self,
912 table: Table,
913 predicate: Option<&Predicate>,
914 ) -> Result<lance::dataset::scanner::Scanner> {
915 let dataset = self.dataset(table).await?;
916 scanner_with_prefilter(&dataset, predicate)
917 }
918 pub async fn scan(
921 &self,
922 table: Table,
923 opts: ScanOpts<'_>,
924 ) -> Result<lance::dataset::scanner::Scanner> {
925 let mut scanner = self.scanner(table, opts.predicate).await?;
926 if let Some(projection) = opts.projection {
927 scanner.project(projection)?;
928 }
929 Ok(scanner)
930 }
931 pub(crate) async fn scan_batch(
932 &self,
933 table: Table,
934 predicate: Option<&Predicate>,
935 projection: &[&str],
936 ) -> Result<RecordBatch> {
937 let opts = ScanOpts {
938 predicate,
939 projection: (!projection.is_empty()).then_some(projection),
940 };
941 self.scan(table, opts)
942 .await?
943 .try_into_batch()
944 .await
945 .context("scan failed")
946 }
947 pub async fn count_rows(&self, table: Table) -> Result<usize> {
948 self.dataset(table)
949 .await?
950 .count_rows(None)
951 .await
952 .map_err(Into::into)
953 }
954 #[cfg(test)]
956 pub(crate) async fn messages_index_names(&self) -> Result<Vec<String>> {
957 let dataset = self.dataset(Table::Messages).await?;
958 let indices = dataset.load_indices().await?;
959 Ok(indices.iter().map(|index| index.name.clone()).collect())
960 }
961
962 pub(crate) async fn unindexed_row_count(
965 &self,
966 table: Table,
967 index_name: &str,
968 ) -> Result<usize> {
969 let dataset = self.dataset(table).await?;
970 let fragments = dataset
971 .unindexed_fragments(index_name)
972 .await
973 .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
974 Ok(fragments
975 .iter()
976 .map(|fragment| fragment.num_rows().unwrap_or(0))
977 .sum())
978 }
979
    /// Drops the index `name` from `table` and stores the updated dataset in the cache.
    ///
    /// NOTE(review): unlike the other mutating operations in this impl, this one is not
    /// wrapped in `retry_lance`, so a commit conflict surfaces on the first failure —
    /// confirm that this is intentional.
    pub(crate) async fn drop_index(&self, table: Table, name: &str) -> Result<()> {
        // Hold the table lock across the drop so the cache swap is consistent.
        let mut guard = self.cached(table).await?.lock().await;
        let mut dataset = guard.latest().await?;
        dataset
            .drop_index(name)
            .await
            .with_context(|| format!("drop_index({name}) failed for {}", table.label()))?;
        guard.replace(dataset);
        Ok(())
    }
995
996 async fn table_location(&self, table_name: &str) -> Result<String> {
999 let request = DescribeTableRequest {
1000 id: Some(self.nm_ident.as_table_id(table_name)),
1001 ..Default::default()
1002 };
1003 let response = self
1004 .nm
1005 .describe_table(request)
1006 .await
1007 .with_context(|| format!("failed to describe table {table_name}"))?;
1008 response
1009 .location
1010 .with_context(|| format!("namespace returned no location for table {table_name}"))
1011 }
1012
1013 pub async fn table_sizes(&self) -> Result<TableSizes> {
1017 let registry = Arc::new(ObjectStoreRegistry::default());
1018 let params = ObjectStoreParams {
1019 storage_options_accessor: (!self.storage_options.is_empty()).then(|| {
1020 Arc::new(StorageOptionsAccessor::with_static_options(
1021 self.storage_options.clone(),
1022 ))
1023 }),
1024 ..Default::default()
1025 };
1026
1027 let sessions = self
1028 .listed_size(
1029 ®istry,
1030 ¶ms,
1031 &self.table_location(sessions::SESSIONS).await?,
1032 )
1033 .await?;
1034 let messages = self
1035 .listed_size(
1036 ®istry,
1037 ¶ms,
1038 &self.table_location(sessions::MESSAGES).await?,
1039 )
1040 .await?;
1041 let parts = self
1042 .listed_size(
1043 ®istry,
1044 ¶ms,
1045 &self.table_location(sessions::PARTS).await?,
1046 )
1047 .await?;
1048 let root_total = self
1051 .listed_size(®istry, ¶ms, self.location.as_str())
1052 .await?;
1053 let other = root_total.saturating_sub(sessions + messages + parts);
1054 Ok(TableSizes {
1055 sessions,
1056 messages,
1057 parts,
1058 other,
1059 })
1060 }
1061
1062 async fn listed_size(
1064 &self,
1065 registry: &Arc<ObjectStoreRegistry>,
1066 params: &ObjectStoreParams,
1067 uri: &str,
1068 ) -> Result<u64> {
1069 let (store, base) = ObjectStore::from_uri_and_params(registry.clone(), uri, params)
1070 .await
1071 .with_context(|| format!("failed to open object store for {uri}"))?;
1072 let mut listing = store.list(Some(base));
1073 let mut total = 0u64;
1074 while let Some(meta) = listing.next().await {
1075 let meta = meta.with_context(|| format!("listing {uri} failed"))?;
1076 total += meta.size;
1077 }
1078 Ok(total)
1079 }
    /// Returns the cache slot for `table`. Sessions and messages are always available;
    /// the parts slot is initialised on first access, which is why this is async and
    /// fallible.
    async fn cached(&self, table: Table) -> Result<&Mutex<CachedDataset>> {
        match table {
            Table::Sessions => Ok(&self.datasets.sessions),
            Table::Messages => Ok(&self.datasets.messages),
            Table::Parts => self.parts_cached().await,
        }
    }
1087
    /// Returns the parts table cache slot, opening or creating the table through the
    /// namespace the first time it is requested.
    ///
    /// A failed initialisation leaves the cell empty, so the next call tries again.
    async fn parts_cached(&self) -> Result<&Mutex<CachedDataset>> {
        self.datasets
            .parts
            .get_or_try_init(|| async {
                let dataset = open_or_create_via_ns(
                    &self.nm,
                    &self.nm_ident,
                    sessions::PARTS,
                    sessions::part_schema(),
                    &self.session,
                    &self.storage_options,
                )
                .await?;
                Ok::<_, anyhow::Error>(Mutex::new(CachedDataset {
                    dataset,
                    last_refresh: Instant::now(),
                    // Same refresh interval as the eagerly opened tables.
                    refresh_after: self.parts_refresh_after,
                }))
            })
            .await
    }
1111 async fn retry_lance<T, Fut, Op>(&self, label: &str, mut operation: Op) -> Result<T>
1112 where
1113 Fut: std::future::Future<Output = Result<T>>,
1114 Op: FnMut() -> Fut,
1115 {
1116 let mut attempt = 0u8;
1117 loop {
1118 attempt = attempt.saturating_add(1);
1119 match operation().await {
1120 Ok(value) => return Ok(value),
1121 Err(error) if attempt < self.retry.attempts => {
1122 let backoff = self.backoff(attempt);
1123 let error_chain = format!("{error:#}");
1126 tracing::warn!(
1127 label,
1128 attempt,
1129 ?backoff,
1130 error = %error_chain,
1131 "retrying Lance operation"
1132 );
1133 tokio::time::sleep(backoff).await;
1134 }
1135 Err(error) => {
1136 let error_chain = format!("{error:#}");
1137 tracing::warn!(
1138 label,
1139 attempt,
1140 error = %error_chain,
1141 "Lance operation exhausted retries"
1142 );
1143 if is_commit_conflict(&error) {
1150 return Err(error.context(ConflictExhausted { attempts: attempt }));
1151 }
1152 return Err(error);
1153 }
1154 }
1155 }
1156 }
1157 fn backoff(&self, attempt: u8) -> Duration {
1158 let shift = u32::from(attempt.saturating_sub(1));
1159 let multiplier = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
1160 let base = self.retry.initial_backoff.saturating_mul(multiplier);
1161 let factor = (1.0 + self.retry.jitter * (fastrand::f64() * 2.0 - 1.0)).max(0.0);
1164 base.mul_f64(factor).min(self.retry.max_backoff)
1165 }
1166}
1167async fn optimize_table_compact(
1182 dataset: &mut Dataset,
1183 table: Table,
1184 progress: Option<&OptimizeProgressFn>,
1185 cleanup: crate::sessions::CleanupConfig,
1186) -> Result<()> {
1187 let compaction = CompactionOptions {
1188 defer_index_remap: false,
1189 ..CompactionOptions::default()
1190 };
1191
1192 emit(
1193 progress,
1194 OptimizeEvent::PhaseStart {
1195 table,
1196 phase: OptimizePhase::Compact,
1197 detail: None,
1198 },
1199 );
1200 let started = Instant::now();
1201 compact_files(dataset, compaction, None).await?;
1202 emit(
1203 progress,
1204 OptimizeEvent::PhaseDone {
1205 table,
1206 phase: OptimizePhase::Compact,
1207 elapsed_ms: started.elapsed().as_millis() as u64,
1208 },
1209 );
1210
1211 emit(
1212 progress,
1213 OptimizeEvent::PhaseStart {
1214 table,
1215 phase: OptimizePhase::Cleanup,
1216 detail: None,
1217 },
1218 );
1219 let started = Instant::now();
1220 dataset
1225 .cleanup_old_versions(
1226 cleanup.older_than,
1227 Some(cleanup.delete_unverified),
1228 Some(false),
1229 )
1230 .await
1231 .context("cleanup_old_versions failed during index optimize")?;
1232 emit(
1233 progress,
1234 OptimizeEvent::PhaseDone {
1235 table,
1236 phase: OptimizePhase::Cleanup,
1237 elapsed_ms: started.elapsed().as_millis() as u64,
1238 },
1239 );
1240
1241 Ok(())
1242}
1243
/// Brings the indices of `dataset` in line with `intents`.
///
/// For every intent:
/// * index missing: create it if the intent's trigger fires, otherwise skip;
/// * index present but fewer than `index_lag_threshold()` fragments are unindexed
///   (or none at all): skip;
/// * otherwise catch up. BTree and the remaining scalar kinds are rebuilt with
///   `replace = true`; Bitmap, inverted and vector indices are collected and appended to
///   in a single `optimize_indices` call after the loop.
///
/// Returns `Ok(true)` when any index was created, rebuilt or appended to. Progress events
/// are emitted around each create, rebuild and append step; `PhaseDone` is only emitted
/// when the step succeeded.
async fn optimize_table_indices(
    dataset: &mut Dataset,
    intents: &[IndexIntent],
    table: Table,
    progress: Option<&OptimizeProgressFn>,
) -> Result<bool> {
    // Snapshot of the index names present before this pass makes any changes.
    let existing = dataset.load_indices().await?;
    let existing_names: std::collections::HashSet<String> =
        existing.iter().map(|index| index.name.clone()).collect();

    // Names of indices that will be caught up by one append call after the loop.
    let mut append_indices: Vec<String> = Vec::new();
    let mut did_work = false;

    for intent in intents {
        let exists = existing_names.contains(intent.name);

        if !exists {
            // Missing index: create it only once its trigger fires.
            if !intent.trigger.should_create(dataset).await? {
                continue;
            }
            let params = intent.params.build(dataset).await?;
            let index_type = intent.params.index_type();
            tracing::info!(
                index = intent.name,
                column = intent.column,
                "creating Lance index (trigger fired)",
            );
            emit(
                progress,
                OptimizeEvent::PhaseStart {
                    table,
                    phase: OptimizePhase::IndexCreate,
                    detail: Some(intent.name.to_owned()),
                },
            );
            let started = Instant::now();
            dataset
                .create_index(
                    &[intent.column],
                    index_type,
                    Some(intent.name.to_owned()),
                    params.as_ref(),
                    // replace = false: the index is not expected to exist yet.
                    false,
                )
                .await
                .with_context(|| format!("failed to create index {}", intent.name))?;
            emit(
                progress,
                OptimizeEvent::PhaseDone {
                    table,
                    phase: OptimizePhase::IndexCreate,
                    elapsed_ms: started.elapsed().as_millis() as u64,
                },
            );
            did_work = true;
            continue;
        }

        // Existing index: only act once enough fragments have piled up outside it.
        let unindexed = dataset.unindexed_fragments(intent.name).await?;
        if unindexed.is_empty() {
            continue;
        }
        if unindexed.len() < index_lag_threshold() {
            continue;
        }
        match intent.params {
            IndexParamsKind::Scalar(BuiltinIndexType::BTree) => {
                let params = intent.params.build(dataset).await?;
                let index_type = intent.params.index_type();
                tracing::debug!(
                    target: "pond::perf",
                    index = intent.name,
                    column = intent.column,
                    "rebuilding Lance BTree index",
                );
                emit(
                    progress,
                    OptimizeEvent::PhaseStart {
                        table,
                        phase: OptimizePhase::IndexRebuild,
                        detail: Some(intent.name.to_owned()),
                    },
                );
                let started = Instant::now();
                dataset
                    .create_index(
                        &[intent.column],
                        index_type,
                        Some(intent.name.to_owned()),
                        params.as_ref(),
                        // replace = true: overwrite the existing index of the same name.
                        true,
                    )
                    .await
                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
                emit(
                    progress,
                    OptimizeEvent::PhaseDone {
                        table,
                        phase: OptimizePhase::IndexRebuild,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    },
                );
                did_work = true;
            }
            // These kinds are caught up incrementally by the append step below.
            IndexParamsKind::Scalar(BuiltinIndexType::Bitmap)
            | IndexParamsKind::InvertedFtsNgram { .. }
            | IndexParamsKind::IvfPqCosine { .. } => {
                append_indices.push(intent.name.to_owned());
            }
            // Any other scalar kind is rebuilt like BTree, without the perf trace line.
            IndexParamsKind::Scalar(_) => {
                let params = intent.params.build(dataset).await?;
                emit(
                    progress,
                    OptimizeEvent::PhaseStart {
                        table,
                        phase: OptimizePhase::IndexRebuild,
                        detail: Some(intent.name.to_owned()),
                    },
                );
                let started = Instant::now();
                dataset
                    .create_index(
                        &[intent.column],
                        intent.params.index_type(),
                        Some(intent.name.to_owned()),
                        params.as_ref(),
                        true,
                    )
                    .await
                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
                emit(
                    progress,
                    OptimizeEvent::PhaseDone {
                        table,
                        phase: OptimizePhase::IndexRebuild,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    },
                );
                did_work = true;
            }
        }
    }

    if !append_indices.is_empty() {
        // `index_names` takes ownership, while the list is still needed for the events
        // and the trace line below.
        let to_append = append_indices.clone();
        emit(
            progress,
            OptimizeEvent::PhaseStart {
                table,
                phase: OptimizePhase::IndexAppend,
                detail: Some(append_indices.join(", ")),
            },
        );
        let started = Instant::now();
        dataset
            .optimize_indices(&OptimizeOptions::append().index_names(to_append))
            .await
            .context("optimize_indices(append) failed during index optimize")?;
        emit(
            progress,
            OptimizeEvent::PhaseDone {
                table,
                phase: OptimizePhase::IndexAppend,
                elapsed_ms: started.elapsed().as_millis() as u64,
            },
        );
        tracing::debug!(
            target: "pond::perf",
            indices = ?append_indices,
            "appended trailing fragments into indices",
        );
        did_work = true;
    }

    Ok(did_work)
}
1425
1426async fn rebuild_index(dataset: &mut Dataset, intent: &IndexIntent) -> Result<()> {
1427 if !intent.trigger.should_create(dataset).await? {
1428 return Ok(());
1429 }
1430 let params = intent.params.build(dataset).await?;
1431 dataset
1432 .create_index(
1433 &[intent.column],
1434 intent.params.index_type(),
1435 Some(intent.name.to_owned()),
1436 params.as_ref(),
1437 true,
1438 )
1439 .await
1440 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1441 Ok(())
1442}
1443
1444async fn index_status(
1445 table: Table,
1446 dataset: &Dataset,
1447 intents: &[IndexIntent],
1448) -> Result<Vec<IndexStatus>> {
1449 let existing = dataset.load_indices().await?;
1450 let existing_names: std::collections::HashSet<String> =
1451 existing.iter().map(|index| index.name.clone()).collect();
1452 let total_fragments = dataset.get_fragments().len();
1453 let total_rows = dataset.count_rows(None).await?;
1454 let mut statuses = Vec::with_capacity(intents.len());
1455 for intent in intents {
1456 let exists = existing_names.contains(intent.name);
1457 if !exists {
1458 statuses.push(IndexStatus {
1459 table,
1460 intent_name: intent.name.to_owned(),
1461 fragments_covered: 0,
1462 unindexed_fragments: total_fragments,
1463 unindexed_rows: total_rows,
1464 exists,
1465 });
1466 continue;
1467 }
1468 let unindexed = dataset
1469 .unindexed_fragments(intent.name)
1470 .await
1471 .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1472 let unindexed_fragments = unindexed.len();
1473 let unindexed_rows = unindexed
1474 .iter()
1475 .map(|fragment| fragment.num_rows().unwrap_or(0))
1476 .sum();
1477 statuses.push(IndexStatus {
1478 table,
1479 intent_name: intent.name.to_owned(),
1480 fragments_covered: total_fragments.saturating_sub(unindexed_fragments),
1481 unindexed_fragments,
1482 unindexed_rows,
1483 exists,
1484 });
1485 }
1486 Ok(statuses)
1487}
1488
1489async fn open_or_create_via_ns(
1501 nm: &Arc<dyn LanceNamespace>,
1502 nm_ident: &NamespaceIdent,
1503 table_name: &str,
1504 schema: lance::deps::arrow_schema::SchemaRef,
1505 session: &Arc<Session>,
1506 storage_options: &HashMap<String, String>,
1507) -> Result<Dataset> {
1508 let table_id = nm_ident.as_table_id(table_name);
1509
1510 let request = DescribeTableRequest {
1511 id: Some(table_id.clone()),
1512 ..Default::default()
1513 };
1514 match nm.describe_table(request).await {
1515 Ok(response) => {
1516 let location = response.location.with_context(|| {
1517 format!("namespace returned no location for table {table_name}")
1518 })?;
1519 let mut builder = DatasetBuilder::from_uri(&location).with_session(session.clone());
1520 if !storage_options.is_empty() {
1521 builder = builder.with_storage_options(storage_options.clone());
1522 }
1523 let dataset = builder
1524 .load()
1525 .await
1526 .with_context(|| format!("failed to open table {table_name}"))?;
1527 ensure_schema_matches(&dataset, schema.as_ref(), table_name)?;
1528 return Ok(dataset);
1529 }
1530 Err(error) => match &error {
1531 error if is_namespace_error_code(error, ErrorCode::TableNotFound) => {
1532 }
1534 _ => {
1535 return Err(anyhow::Error::from(error))
1536 .with_context(|| format!("failed to describe table {table_name}"));
1537 }
1538 },
1539 }
1540
1541 let mut write_params = sessions::write_params_for_create();
1544 write_params.session = Some(session.clone());
1545 write_params.mode = WriteMode::Create;
1546 if !storage_options.is_empty() {
1547 write_params.store_params = Some(ObjectStoreParams {
1548 storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
1549 storage_options.clone(),
1550 ))),
1551 ..Default::default()
1552 });
1553 }
1554 let reader = sessions::empty_reader(schema)?;
1555 Dataset::write_into_namespace(reader, nm.clone(), table_id, Some(write_params))
1556 .await
1557 .with_context(|| format!("failed to create table {table_name}"))
1558}
1559
1560fn is_namespace_error_code(error: &lance::Error, code: ErrorCode) -> bool {
1564 if !matches!(error, lance::Error::Namespace { .. }) {
1565 return false;
1566 }
1567 std::iter::successors(Some(error as &(dyn std::error::Error + 'static)), |link| {
1568 link.source()
1569 })
1570 .filter_map(|link| link.downcast_ref::<NamespaceError>())
1571 .any(|inner| inner.code() == code)
1572}
1573
1574fn scanner_with_prefilter(
1575 dataset: &Dataset,
1576 predicate: Option<&Predicate>,
1577) -> Result<lance::dataset::scanner::Scanner> {
1578 let mut scanner = dataset.scan();
1579 scanner.prefilter(true);
1580 if let Some(predicate) = predicate {
1581 let filter = predicate.to_lance();
1582 if !filter.is_empty() {
1583 scanner.filter(&filter)?;
1584 }
1585 }
1586 Ok(scanner)
1587}
1588fn ensure_schema_matches(
1589 dataset: &Dataset,
1590 expected: &lance::deps::arrow_schema::Schema,
1591 table_name: &str,
1592) -> Result<()> {
1593 use lance::deps::arrow_schema::DataType;
1594 use std::collections::BTreeSet;
1595 let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
1596 let actual_names: BTreeSet<&str> = actual.fields().iter().map(|f| f.name().as_str()).collect();
1597 let expected_names: BTreeSet<&str> = expected
1598 .fields()
1599 .iter()
1600 .map(|f| f.name().as_str())
1601 .collect();
1602 if actual_names != expected_names {
1603 anyhow::bail!(
1604 "table {table_name} has columns {actual_names:?} but this pond build expects \
1605 {expected_names:?} - the on-disk store predates a schema change; delete the \
1606 data directory and re-run `pond ingest`",
1607 );
1608 }
1609 for actual_field in actual.fields() {
1614 let Some(expected_field) = expected.field_with_name(actual_field.name()).ok() else {
1615 continue;
1616 };
1617 if let (DataType::FixedSizeList(_, actual_dim), DataType::FixedSizeList(_, expected_dim)) =
1618 (actual_field.data_type(), expected_field.data_type())
1619 && actual_dim != expected_dim
1620 {
1621 tracing::warn!(
1622 table = table_name,
1623 column = actual_field.name(),
1624 actual_dim,
1625 expected_dim,
1626 "embedding dimension differs from config; open proceeds because model swaps are operator-driven",
1627 );
1628 }
1629 }
1630 Ok(())
1631}
/// Fills in default object-store options without overriding caller choices.
///
/// * `pool_idle_timeout` defaults to `"300 seconds"`.
/// * `connect_timeout` defaults to `"10 seconds"`.
/// * When a custom endpoint is configured, `aws_unsigned_payload` defaults to
///   `"true"`.
///
/// A default is skipped whenever the map already holds the option under any of
/// its aliases, compared ASCII-case-insensitively. The custom-endpoint check
/// recognises every spelling the object store accepts for the endpoint key
/// (`aws_endpoint`, `aws_endpoint_url`, `endpoint`, `endpoint_url`), so the
/// `*_url` forms get the unsigned-payload default as well.
fn apply_remote_storage_defaults(options: &mut HashMap<String, String>) {
    /// All key spellings that configure a custom S3-compatible endpoint.
    const ENDPOINT_ALIASES: [&str; 4] = [
        "aws_endpoint",
        "aws_endpoint_url",
        "endpoint",
        "endpoint_url",
    ];

    /// Returns `true` when any key of `options` matches one of `aliases`,
    /// ignoring ASCII case.
    fn has_any(options: &HashMap<String, String>, aliases: &[&str]) -> bool {
        options
            .keys()
            .any(|key| aliases.iter().any(|alias| key.eq_ignore_ascii_case(alias)))
    }

    /// Inserts `value` under the first alias unless some alias is already set.
    fn set_default(options: &mut HashMap<String, String>, aliases: &[&str], value: &str) {
        // An empty alias list has no canonical key to insert; do nothing
        // rather than panic on indexing.
        let Some(canonical) = aliases.first() else {
            return;
        };
        if !has_any(options, aliases) {
            options.insert((*canonical).to_owned(), value.to_owned());
        }
    }

    set_default(options, &["pool_idle_timeout"], "300 seconds");
    set_default(options, &["connect_timeout"], "10 seconds");
    if has_any(options, &ENDPOINT_ALIASES) {
        set_default(
            options,
            &["aws_unsigned_payload", "unsigned_payload"],
            "true",
        );
    }
}
1661
/// Renders `value` as a single-quoted literal, doubling every embedded
/// single quote so it cannot terminate the literal early.
fn quoted_string(value: &str) -> String {
    let mut literal = String::with_capacity(value.len() + 2);
    literal.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            // A quote inside the literal is written as two quotes.
            literal.push('\'');
        }
        literal.push(ch);
    }
    literal.push('\'');
    literal
}
/// Builds a quoted `LIKE` pattern that matches any string containing `value`.
///
/// Backslashes, `%` and `_` in `value` are prefixed with a backslash so they
/// are matched literally, single quotes are doubled, and the result is wrapped
/// as `'%…%'`.
fn like_contains(value: &str) -> String {
    let mut pattern = String::with_capacity(value.len() + 4);
    pattern.push_str("'%");
    for ch in value.chars() {
        match ch {
            // Wildcards and the escape character itself get a backslash.
            '\\' | '%' | '_' => {
                pattern.push('\\');
                pattern.push(ch);
            }
            '\'' => pattern.push_str("''"),
            other => pattern.push(other),
        }
    }
    pattern.push_str("%'");
    pattern
}
1673
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// `is_namespace_error_code` must find the code on a direct namespace
    /// error and on one wrapped inside another namespace error, and must
    /// reject both a different code and a non-namespace error.
    #[test]
    fn namespace_error_code_walks_wrapped_chain() {
        let direct = lance::Error::namespace_source(Box::new(NamespaceError::TableNotFound {
            message: "missing".into(),
        }));
        assert!(is_namespace_error_code(&direct, ErrorCode::TableNotFound));

        // Wrapping moves the code one level down the `source()` chain.
        let wrapped = lance::Error::namespace_source(Box::new(direct));
        assert!(is_namespace_error_code(&wrapped, ErrorCode::TableNotFound));

        let other_code =
            lance::Error::namespace_source(Box::new(NamespaceError::NamespaceNotFound {
                message: "nope".into(),
            }));
        assert!(!is_namespace_error_code(
            &other_code,
            ErrorCode::TableNotFound
        ));

        let not_namespace = lance::Error::internal("unrelated");
        assert!(!is_namespace_error_code(
            &not_namespace,
            ErrorCode::TableNotFound
        ));
    }

    /// Opening a handle on a fresh temporary directory must yield tables that
    /// can be scanned and that contain no rows.
    #[tokio::test]
    async fn store_opens_via_namespace_and_scan_works() -> Result<()> {
        let temp = TempDir::new()?;
        let url = Url::from_directory_path(temp.path())
            .map_err(|()| anyhow::anyhow!("temp path is not absolute"))?;
        let handle = Handle::open(&url).await?;
        let cases: [(Table, &[&str]); 3] = [
            (Table::Sessions, &["id"]),
            (Table::Messages, &["id"]),
            (Table::Parts, &["id"]),
        ];
        for (table, projection) in cases {
            let scanner = handle
                .scan(table, ScanOpts::project_only(projection))
                .await?;
            let batch = scanner.try_into_batch().await?;
            assert_eq!(batch.num_rows(), 0, "fresh table should be empty");
        }
        Ok(())
    }
}