1use crate::{
2 RetryPolicy,
3 config::{self},
4 handlers::NamespaceIdent,
5 sessions::{self},
6};
7use anyhow::{Context, Result};
8use lance::Dataset;
9use lance::dataset::builder::DatasetBuilder;
10use lance::dataset::optimize::{CompactionOptions, compact_files};
11use lance::dataset::write::merge_insert::SourceDedupeBehavior;
12use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode};
13use lance::deps::arrow_array::{RecordBatch, RecordBatchIterator};
14use lance::index::DatasetIndexExt;
15use lance::index::DatasetIndexInternalExt;
16use lance::index::vector::VectorIndexParams;
17use lance::session::Session;
18use lance_index::IndexType;
19use lance_index::optimize::OptimizeOptions;
20use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
21use lance_io::object_store::{
22 ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor,
23};
24use lance_linalg::distance::MetricType;
25use lance_namespace::LanceNamespace;
26use lance_namespace::error::NamespaceError;
27use lance_namespace::models::DescribeTableRequest;
28use lance_namespace_impls::ConnectBuilder;
29use std::{
30 collections::HashMap,
31 sync::Arc,
32 time::{Duration, Instant},
33};
34use tokio::sync::{Mutex, OnceCell};
35use tokio_stream::StreamExt;
36use url::Url;
/// Row count at which vector indexing is meant to become active.
///
/// NOTE(review): not referenced in this part of the file; presumably used as the
/// `threshold` of an `IndexTrigger::OnNonNullCount` vector intent — confirm at call sites.
pub const VECTOR_INDEX_ACTIVATION_ROWS: usize = 100_000;

/// Index lag threshold used when no runtime value has been installed via
/// [`init_index_lag_threshold`].
pub const DEFAULT_INDEX_LAG_THRESHOLD: usize = 4;

/// Process-wide index lag threshold; written at most once.
static INDEX_LAG_THRESHOLD_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Installs the process-wide index lag threshold.
///
/// Only the first call takes effect; every later call is ignored.
pub fn init_index_lag_threshold(value: usize) {
    // `set` only fails when a value is already stored, in which case the first one wins.
    let _ = INDEX_LAG_THRESHOLD_RUNTIME.set(value);
}

/// Returns the installed index lag threshold, or [`DEFAULT_INDEX_LAG_THRESHOLD`]
/// when none has been installed.
///
/// The index optimizer leaves an existing index alone while its number of
/// unindexed fragments is below this value.
pub fn index_lag_threshold() -> usize {
    match INDEX_LAG_THRESHOLD_RUNTIME.get() {
        Some(&configured) => configured,
        None => DEFAULT_INDEX_LAG_THRESHOLD,
    }
}
64
/// Declarative description of one index the optimizer should maintain on a table.
#[derive(Debug, Clone)]
pub struct IndexIntent {
    /// Index name: passed to `create_index` and used to look the index up via
    /// `load_indices` / `unindexed_fragments`.
    pub name: &'static str,
    /// Column the index is built over (the single entry of `create_index`'s column list).
    pub column: &'static str,
    /// Condition that must hold before a missing index is created.
    pub trigger: IndexTrigger,
    /// Index kind and the parameters used to build it.
    pub params: IndexParamsKind,
}

/// Condition deciding whether a not-yet-existing index should be created.
#[derive(Debug, Clone)]
pub enum IndexTrigger {
    /// Create as soon as the dataset contains at least one row.
    OnAnyRows,
    /// Create once enough rows have a non-null value in `column`.
    OnNonNullCount {
        /// Column whose non-null rows are counted (`<column> IS NOT NULL`).
        column: &'static str,
        /// Minimum (inclusive) number of non-null rows required.
        threshold: usize,
    },
}

/// Index kind together with the parameters needed to build it.
#[derive(Debug, Clone)]
pub enum IndexParamsKind {
    /// Built-in scalar index, built from `ScalarIndexParams::for_builtin`.
    Scalar(BuiltinIndexType),
    /// Full-text inverted index using an n-gram tokenizer with gram lengths
    /// `min..=max`, without stemming or stop-word removal.
    InvertedFtsNgram { min: u32, max: u32 },
    /// IVF-PQ vector index using cosine distance; the IVF partition count is
    /// derived from the row count when the index is built.
    IvfPqCosine {
        /// Number of PQ sub-vectors (passed as `num_sub_vectors` to `ivf_pq`).
        sub_vectors: usize,
        /// Bits per PQ code (passed as `num_bits` to `ivf_pq`).
        num_bits: u8,
        /// Maximum training iterations (passed as `max_iterations` to `ivf_pq`).
        max_iters: usize,
    },
}
116
117impl IndexTrigger {
118 async fn should_create(&self, dataset: &Dataset) -> Result<bool> {
119 match self {
120 Self::OnAnyRows => Ok(dataset.count_rows(None).await? > 0),
121 Self::OnNonNullCount { column, threshold } => {
122 let count = dataset
123 .count_rows(Some(format!("{column} IS NOT NULL")))
124 .await?;
125 Ok(count >= *threshold)
126 }
127 }
128 }
129}
130
131impl IndexParamsKind {
132 fn index_type(&self) -> IndexType {
133 match self {
134 Self::Scalar(BuiltinIndexType::Bitmap) => IndexType::Bitmap,
135 Self::Scalar(_) => IndexType::BTree,
136 Self::InvertedFtsNgram { .. } => IndexType::Inverted,
137 Self::IvfPqCosine { .. } => IndexType::Vector,
138 }
139 }
140
141 async fn build(&self, dataset: &Dataset) -> Result<Box<dyn lance::index::IndexParams>> {
142 match self {
143 Self::Scalar(kind) => Ok(Box::new(ScalarIndexParams::for_builtin(kind.clone()))),
144 Self::InvertedFtsNgram { min, max } => Ok(Box::new(
145 InvertedIndexParams::default()
146 .base_tokenizer("ngram".to_owned())
147 .ngram_min_length(*min)
148 .ngram_max_length(*max)
149 .stem(false)
150 .remove_stop_words(false),
151 )),
152 Self::IvfPqCosine {
153 sub_vectors,
154 num_bits,
155 max_iters,
156 } => {
157 let count = dataset
158 .count_rows(Some("vector IS NOT NULL".to_owned()))
159 .await?;
160 let partitions = count.checked_div(4096).unwrap_or(0).max(1);
161 Ok(Box::new(VectorIndexParams::ivf_pq(
162 partitions,
163 *num_bits,
164 *sub_vectors,
165 MetricType::Cosine,
166 *max_iters,
167 )))
168 }
169 }
170 }
171}
172
/// Coverage report for one [`IndexIntent`] on a table, as produced by `index_status`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexStatus {
    /// Table the index belongs to.
    pub table: Table,
    /// `IndexIntent::name` of the reported index.
    pub intent_name: String,
    /// Number of fragments covered by the index (0 when the index does not exist).
    pub fragments_covered: usize,
    /// Rows in fragments the index does not cover; every row when the index does not exist.
    pub unindexed_rows: usize,
    /// Whether an index with this name currently exists.
    pub exists: bool,
}
181
/// Marker attached (as `anyhow` context) by `Handle::retry_lance` when a Lance
/// commit conflict was still failing after the last retry attempt.
#[derive(Debug, Clone, Copy)]
pub struct ConflictExhausted {
    /// Total attempts made, including the first one.
    pub attempts: u8,
}

impl std::fmt::Display for ConflictExhausted {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let attempts = self.attempts;
        write!(f, "commit conflict exhausted after {attempts} attempt(s)")
    }
}

impl std::error::Error for ConflictExhausted {}
202
/// Result of one optimize phase (compaction or index maintenance) for a table.
#[derive(Debug)]
pub enum PhaseOutcome {
    /// The phase completed. For index maintenance this means at least one index
    /// was created, rebuilt or appended to; compaction reports `Ok` on any success.
    Ok,
    /// Index maintenance had nothing to do (no intents, or no index needed work).
    Noop,
    /// The phase gave up because commit conflicts persisted through every retry.
    SkippedConflict,
    /// The phase failed with the contained error.
    Failed(anyhow::Error),
    /// NOTE(review): not produced in this part of the file; presumably used by
    /// callers for phases that were never started — confirm.
    NotAttempted,
}

impl PhaseOutcome {
    /// Returns `true` only for [`PhaseOutcome::Failed`].
    pub fn is_failed(&self) -> bool {
        matches!(self, Self::Failed(_))
    }
}

/// Per-table result of `Handle::optimize_table`, with one outcome per phase.
#[derive(Debug)]
pub struct TableOptimizeOutcome {
    /// Table that was optimized.
    pub table: Table,
    /// Outcome of index maintenance.
    pub indices: PhaseOutcome,
    /// Outcome of compaction plus old-version cleanup.
    pub compaction: PhaseOutcome,
}

/// Progress notification delivered to an `OptimizeProgressFn` callback.
#[derive(Debug, Clone)]
pub enum OptimizeEvent {
    /// A step is about to start.
    PhaseStart {
        table: Table,
        phase: OptimizePhase,
        /// Index name(s) for index steps; `None` for compaction and cleanup.
        detail: Option<String>,
    },
    /// A step finished successfully (failed steps emit no `PhaseDone`).
    PhaseDone {
        table: Table,
        phase: OptimizePhase,
        /// Wall-clock milliseconds spent in the step.
        elapsed_ms: u64,
    },
}
252
/// Individual optimize step reported through `OptimizeEvent` progress callbacks.
#[derive(Debug, Clone, Copy)]
pub enum OptimizePhase {
    /// Fragment compaction (`compact_files`).
    Compact,
    /// Removal of old dataset versions (`cleanup_old_versions`).
    Cleanup,
    /// Creation of an index that did not exist yet.
    IndexCreate,
    /// Re-creation (replacement) of an existing scalar index.
    IndexRebuild,
    /// Appending unindexed fragments into existing indices.
    IndexAppend,
}

impl OptimizePhase {
    /// Short kebab-case name of the phase, for logs and progress output.
    pub fn label(self) -> &'static str {
        use self::OptimizePhase::{Cleanup, Compact, IndexAppend, IndexCreate, IndexRebuild};
        match self {
            Compact => "compact",
            Cleanup => "cleanup",
            IndexCreate => "index-create",
            IndexRebuild => "index-rebuild",
            IndexAppend => "index-append",
        }
    }
}
273
274pub type OptimizeProgressFn = Box<dyn Fn(OptimizeEvent) + Send + Sync>;
275
276fn emit(progress: Option<&OptimizeProgressFn>, event: OptimizeEvent) {
277 if let Some(callback) = progress {
278 callback(event);
279 }
280}
281
282pub fn is_commit_conflict(error: &anyhow::Error) -> bool {
286 error.downcast_ref::<lance::Error>().is_some_and(|err| {
287 matches!(
288 err,
289 lance::Error::CommitConflict { .. }
290 | lance::Error::RetryableCommitConflict { .. }
291 | lance::Error::TooMuchWriteContention { .. }
292 )
293 })
294}
295
296fn is_conflict_exhausted(error: &anyhow::Error) -> bool {
299 error.chain().any(|cause| cause.is::<ConflictExhausted>())
300}
301
/// Storage footprint in bytes, as computed by `Handle::table_sizes`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TableSizes {
    /// Bytes listed under the sessions table location.
    pub sessions: u64,
    /// Bytes listed under the messages table location.
    pub messages: u64,
    /// Bytes listed under the parts table location.
    pub parts: u64,
    /// Bytes under the root location not attributed to the three tables (saturating at 0).
    pub other: u64,
}
312
/// Literal operand used inside a `Predicate`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalarValue {
    /// String literal; rendered through `quoted_string`.
    String(String),
    /// 32-bit integer literal; rendered in decimal.
    Int32(i32),
    /// Pre-rendered filter fragment inserted verbatim (neither quoted nor escaped).
    Raw(String),
}

impl From<&str> for ScalarValue {
    fn from(text: &str) -> Self {
        Self::from(String::from(text))
    }
}

impl From<String> for ScalarValue {
    fn from(text: String) -> Self {
        ScalarValue::String(text)
    }
}

impl From<i32> for ScalarValue {
    fn from(number: i32) -> Self {
        ScalarValue::Int32(number)
    }
}
334#[derive(Debug, Clone, PartialEq, Eq)]
335pub enum Predicate {
336 Eq(&'static str, ScalarValue),
337 Ne(&'static str, ScalarValue),
338 IsNull(&'static str),
339 IsNotNull(&'static str),
340 In(&'static str, Vec<ScalarValue>),
341 LikeContains(&'static str, String),
342 Regex(&'static str, String),
347 Gte(&'static str, ScalarValue),
348 Lte(&'static str, ScalarValue),
349 And(Vec<Predicate>),
350 Or(Vec<Predicate>),
351}
352impl Predicate {
353 pub fn to_lance(&self) -> String {
354 match self {
355 Self::Eq(column, value) => format!("{column} = {}", value.to_lance()),
356 Self::Ne(column, value) => format!("{column} <> {}", value.to_lance()),
357 Self::IsNull(column) => format!("{column} IS NULL"),
358 Self::IsNotNull(column) => format!("{column} IS NOT NULL"),
359 Self::In(column, values) => {
360 let values = values
361 .iter()
362 .map(ScalarValue::to_lance)
363 .collect::<Vec<_>>()
364 .join(", ");
365 format!("{column} IN ({values})")
366 }
367 Self::LikeContains(column, value) => {
368 format!("{column} LIKE {} ESCAPE '\\'", like_contains(value))
369 }
370 Self::Regex(column, pattern) => {
371 format!("regexp_like({column}, {})", quoted_string(pattern))
372 }
373 Self::Gte(column, value) => format!("{column} >= {}", value.to_lance()),
374 Self::Lte(column, value) => format!("{column} <= {}", value.to_lance()),
375 Self::And(predicates) => predicates
376 .iter()
377 .map(Self::to_lance)
378 .filter(|predicate| !predicate.is_empty())
379 .collect::<Vec<_>>()
380 .join(" AND "),
381 Self::Or(predicates) => {
382 let body = predicates
385 .iter()
386 .map(Self::to_lance)
387 .filter(|predicate| !predicate.is_empty())
388 .collect::<Vec<_>>()
389 .join(" OR ");
390 if body.is_empty() {
391 String::new()
392 } else {
393 format!("({body})")
394 }
395 }
396 }
397 }
398}
399#[derive(Default)]
402pub struct ScanOpts<'a> {
403 pub predicate: Option<&'a Predicate>,
404 pub projection: Option<&'a [&'a str]>,
405}
406
407impl<'a> ScanOpts<'a> {
408 pub fn project_only(projection: &'a [&'a str]) -> Self {
409 Self {
410 predicate: None,
411 projection: Some(projection),
412 }
413 }
414 pub fn with_predicate_and_projection(
415 predicate: &'a Predicate,
416 projection: &'a [&'a str],
417 ) -> Self {
418 Self {
419 predicate: Some(predicate),
420 projection: Some(projection),
421 }
422 }
423}
424
425impl ScalarValue {
426 fn to_lance(&self) -> String {
427 match self {
428 Self::String(value) => quoted_string(value),
429 Self::Int32(value) => value.to_string(),
430 Self::Raw(value) => value.clone(),
431 }
432 }
433}
434#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
438pub struct RuntimeCaps {
439 pub index_cache_bytes: Option<usize>,
440 pub metadata_cache_bytes: Option<usize>,
441}
442
443impl RuntimeCaps {
444 pub fn from_config(config: &crate::config::RuntimeConfig) -> Self {
445 Self {
446 index_cache_bytes: config.index_cache_bytes,
447 metadata_cache_bytes: config.metadata_cache_bytes,
448 }
449 }
450}
451
452const LOCAL_INDEX_CACHE_BYTES: usize = 256 * 1024 * 1024;
456const LOCAL_METADATA_CACHE_BYTES: usize = 128 * 1024 * 1024;
457const REMOTE_INDEX_CACHE_BYTES: usize = 2 * 1024 * 1024 * 1024;
459const REMOTE_METADATA_CACHE_BYTES: usize = 512 * 1024 * 1024;
460
461fn resolve_cache_caps(location: &Url, caps: RuntimeCaps) -> (usize, usize) {
462 let (index_default, metadata_default) = if config::is_local(location) {
463 (LOCAL_INDEX_CACHE_BYTES, LOCAL_METADATA_CACHE_BYTES)
464 } else {
465 (REMOTE_INDEX_CACHE_BYTES, REMOTE_METADATA_CACHE_BYTES)
466 };
467 (
468 caps.index_cache_bytes.unwrap_or(index_default),
469 caps.metadata_cache_bytes.unwrap_or(metadata_default),
470 )
471}
472
473pub struct Handle {
474 datasets: DatasetSet,
475 retry: RetryPolicy,
476 #[allow(dead_code)]
484 session: Arc<Session>,
485 nm: Arc<dyn LanceNamespace>,
489 nm_ident: NamespaceIdent,
493 storage_options: HashMap<String, String>,
498 location: Url,
502 parts_refresh_after: Duration,
506}
507
508impl std::fmt::Debug for Handle {
509 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
510 formatter
511 .debug_struct("Handle")
512 .field("datasets", &self.datasets)
513 .field("retry", &self.retry)
514 .field("nm_ident", &self.nm_ident)
515 .field("storage_options", &self.storage_options)
516 .field("location", &self.location)
517 .finish()
518 }
519}
520
/// Tables managed by a `Handle`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Table {
    Sessions,
    Messages,
    Parts,
}

impl Table {
    /// Table name: `"sessions"`, `"messages"` or `"parts"`.
    pub fn as_str(self) -> &'static str {
        match self {
            Table::Sessions => "sessions",
            Table::Messages => "messages",
            Table::Parts => "parts",
        }
    }

    /// Label used in logs, error contexts and retry bookkeeping; identical to [`Table::as_str`].
    fn label(self) -> &'static str {
        self.as_str()
    }
}
540#[derive(Debug)]
541struct DatasetSet {
542 sessions: Mutex<CachedDataset>,
543 messages: Mutex<CachedDataset>,
544 parts: OnceCell<Mutex<CachedDataset>>,
552}
553#[derive(Debug)]
554struct CachedDataset {
555 dataset: Dataset,
556 last_refresh: Instant,
557 refresh_after: Duration,
558}
559impl CachedDataset {
560 async fn latest(&mut self) -> Result<Dataset> {
561 if self.last_refresh.elapsed() >= self.refresh_after {
562 self.dataset.checkout_latest().await?;
563 self.last_refresh = Instant::now();
564 }
565 Ok(self.dataset.clone())
566 }
567 fn replace(&mut self, dataset: Dataset) {
568 self.dataset = dataset;
569 self.last_refresh = Instant::now();
570 }
571}
572impl Handle {
573 pub async fn open(location: &Url) -> Result<Self> {
576 Self::open_with_options(location, HashMap::new(), RuntimeCaps::default()).await
577 }
578
579 pub async fn open_with_options(
585 location: &Url,
586 mut storage_options: HashMap<String, String>,
587 caps: RuntimeCaps,
588 ) -> Result<Self> {
589 if let Some(path) = config::local_path(location) {
590 tokio::fs::create_dir_all(&path)
591 .await
592 .with_context(|| format!("failed to create data dir {}", path.display()))?;
593 } else {
594 apply_remote_storage_defaults(&mut storage_options);
595 }
596 let (index_cache_bytes, metadata_cache_bytes) = resolve_cache_caps(location, caps);
602 let session = Arc::new(Session::new(
603 index_cache_bytes,
604 metadata_cache_bytes,
605 Arc::new(ObjectStoreRegistry::default()),
606 ));
607 let root = location.as_str().trim_end_matches('/').to_string();
613 let mut connect = ConnectBuilder::new("dir")
614 .property("root", root)
615 .session(session.clone());
616 for (key, value) in &storage_options {
620 connect = connect.property(format!("storage.{key}"), value.clone());
621 }
622 let nm: Arc<dyn LanceNamespace> = connect
623 .connect()
624 .await
625 .context("failed to connect lance Directory namespace")?;
626 let nm_ident = NamespaceIdent::root();
627 let refresh_after = if config::is_local(location) {
633 Duration::ZERO
634 } else {
635 Duration::from_secs(5)
636 };
637 let handle = Self {
638 datasets: DatasetSet {
639 sessions: Mutex::new(CachedDataset {
640 dataset: open_or_create_via_ns(
641 &nm,
642 &nm_ident,
643 sessions::SESSIONS,
644 sessions::session_schema(),
645 &session,
646 &storage_options,
647 )
648 .await?,
649 last_refresh: Instant::now(),
650 refresh_after,
651 }),
652 messages: Mutex::new(CachedDataset {
653 dataset: open_or_create_via_ns(
654 &nm,
655 &nm_ident,
656 sessions::MESSAGES,
657 sessions::message_schema(),
658 &session,
659 &storage_options,
660 )
661 .await?,
662 last_refresh: Instant::now(),
663 refresh_after,
664 }),
665 parts: OnceCell::new(),
666 },
667 retry: RetryPolicy::default(),
668 session,
669 nm,
670 nm_ident,
671 storage_options,
672 location: location.clone(),
673 parts_refresh_after: refresh_after,
674 };
675 Ok(handle)
676 }
677
678 pub fn location(&self) -> &Url {
679 &self.location
680 }
681
682 pub fn storage_options(&self) -> &HashMap<String, String> {
686 &self.storage_options
687 }
688
689 pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
690 Ok((
691 self.count_rows(Table::Sessions).await?,
692 self.count_rows(Table::Messages).await?,
693 self.count_rows(Table::Parts).await?,
694 ))
695 }
696
697 pub(crate) async fn merge_insert(
701 &self,
702 table: Table,
703 batch: RecordBatch,
704 row_count: usize,
705 ) -> Result<u64> {
706 self.merge(
707 table,
708 batch,
709 row_count,
710 "merge_insert",
711 WhenMatched::DoNothing,
712 WhenNotMatched::InsertAll,
713 )
714 .await
715 }
716
717 pub(crate) async fn merge_update(
720 &self,
721 table: Table,
722 batch: RecordBatch,
723 row_count: usize,
724 ) -> Result<u64> {
725 self.merge(
726 table,
727 batch,
728 row_count,
729 "merge_update",
730 WhenMatched::UpdateAll,
731 WhenNotMatched::DoNothing,
732 )
733 .await
734 }
735
736 async fn merge(
740 &self,
741 table: Table,
742 batch: RecordBatch,
743 row_count: usize,
744 op: &'static str,
745 when_matched: WhenMatched,
746 when_not_matched: WhenNotMatched,
747 ) -> Result<u64> {
748 if row_count == 0 {
749 return Ok(0);
750 }
751 let started = Instant::now();
752 let result = self
753 .retry_lance(table.label(), || async {
754 let mut cached = self.cached(table).await?.lock().await;
755 let existing = cached.latest().await?;
756 let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
757 let mut builder = MergeInsertBuilder::try_new(Arc::new(existing), Vec::new())?;
758 builder.when_matched(when_matched.clone());
759 builder.when_not_matched(when_not_matched.clone());
760 builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
763 builder.skip_auto_cleanup(true);
767 let (dataset, stats) = builder
768 .try_build()?
769 .execute_reader(Box::new(reader))
770 .await?;
771 cached.replace(dataset.as_ref().clone());
772 Ok((
773 stats.num_inserted_rows + stats.num_updated_rows,
774 stats.num_skipped_duplicates,
775 ))
776 })
777 .await;
778 let skipped = result.as_ref().map(|(_, s)| *s).unwrap_or(0);
779 tracing::info!(
780 target: "pond::perf",
781 op,
782 table = %table.label(),
783 rows = row_count,
784 elapsed_ms = started.elapsed().as_millis() as u64,
785 skipped,
786 "merge",
787 );
788 result.map(|(affected, _)| affected)
789 }
790
791 pub async fn optimize_table(
800 &self,
801 table: Table,
802 intents: &[IndexIntent],
803 progress: Option<&OptimizeProgressFn>,
804 cleanup: crate::sessions::CleanupConfig,
805 ) -> TableOptimizeOutcome {
806 let compaction = self
807 .run_optimize_compact_phase(table, progress, cleanup)
808 .await;
809 let indices = self
810 .run_optimize_indices_phase(table, intents, progress)
811 .await;
812 TableOptimizeOutcome {
813 table,
814 indices,
815 compaction,
816 }
817 }
818
819 pub async fn optimize_table_indices_only(
823 &self,
824 table: Table,
825 intents: &[IndexIntent],
826 progress: Option<&OptimizeProgressFn>,
827 ) -> PhaseOutcome {
828 self.run_optimize_indices_phase(table, intents, progress)
829 .await
830 }
831
832 async fn run_optimize_indices_phase(
833 &self,
834 table: Table,
835 intents: &[IndexIntent],
836 progress: Option<&OptimizeProgressFn>,
837 ) -> PhaseOutcome {
838 if intents.is_empty() {
839 return PhaseOutcome::Noop;
840 }
841 let result = self
842 .retry_lance(table.label(), || async {
843 let mut guard = self.cached(table).await?.lock().await;
844 let mut dataset = guard.latest().await?;
845 let did_work =
846 optimize_table_indices(&mut dataset, intents, table, progress).await?;
847 guard.replace(dataset);
848 Ok::<_, anyhow::Error>(did_work)
849 })
850 .await;
851 match result {
852 Ok(true) => PhaseOutcome::Ok,
853 Ok(false) => PhaseOutcome::Noop,
854 Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
855 Err(error) => PhaseOutcome::Failed(error),
856 }
857 }
858
859 async fn run_optimize_compact_phase(
860 &self,
861 table: Table,
862 progress: Option<&OptimizeProgressFn>,
863 cleanup: crate::sessions::CleanupConfig,
864 ) -> PhaseOutcome {
865 let result = self
866 .retry_lance(table.label(), || async {
867 let mut guard = self.cached(table).await?.lock().await;
868 let mut dataset = guard.latest().await?;
869 optimize_table_compact(&mut dataset, table, progress, cleanup).await?;
870 guard.replace(dataset);
871 Ok::<_, anyhow::Error>(())
872 })
873 .await;
874 match result {
875 Ok(()) => PhaseOutcome::Ok,
876 Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
877 Err(error) => PhaseOutcome::Failed(error),
878 }
879 }
880
881 pub async fn rebuild_index(&self, table: Table, intent: &IndexIntent) -> Result<()> {
882 self.retry_lance(table.label(), || async {
883 let mut guard = self.cached(table).await?.lock().await;
884 let mut dataset = guard.latest().await?;
885 rebuild_index(&mut dataset, intent).await?;
886 guard.replace(dataset);
887 Ok(())
888 })
889 .await
890 }
891
892 pub async fn index_status(
893 &self,
894 table: Table,
895 intents: &[IndexIntent],
896 ) -> Result<Vec<IndexStatus>> {
897 let dataset = self.dataset(table).await?;
898 index_status(table, &dataset, intents).await
899 }
900
901 pub(crate) async fn dataset(&self, table: Table) -> Result<Dataset> {
902 let mut cached = self.cached(table).await?.lock().await;
903 cached.latest().await
904 }
905 pub(crate) async fn scanner(
910 &self,
911 table: Table,
912 predicate: Option<&Predicate>,
913 ) -> Result<lance::dataset::scanner::Scanner> {
914 let dataset = self.dataset(table).await?;
915 scanner_with_prefilter(&dataset, predicate)
916 }
917 pub async fn scan(
920 &self,
921 table: Table,
922 opts: ScanOpts<'_>,
923 ) -> Result<lance::dataset::scanner::Scanner> {
924 let mut scanner = self.scanner(table, opts.predicate).await?;
925 if let Some(projection) = opts.projection {
926 scanner.project(projection)?;
927 }
928 Ok(scanner)
929 }
930 pub(crate) async fn scan_batch(
931 &self,
932 table: Table,
933 predicate: Option<&Predicate>,
934 projection: &[&str],
935 ) -> Result<RecordBatch> {
936 let opts = ScanOpts {
937 predicate,
938 projection: (!projection.is_empty()).then_some(projection),
939 };
940 self.scan(table, opts)
941 .await?
942 .try_into_batch()
943 .await
944 .context("scan failed")
945 }
946 pub async fn count_rows(&self, table: Table) -> Result<usize> {
947 self.dataset(table)
948 .await?
949 .count_rows(None)
950 .await
951 .map_err(Into::into)
952 }
953 #[cfg(test)]
955 pub(crate) async fn messages_index_names(&self) -> Result<Vec<String>> {
956 let dataset = self.dataset(Table::Messages).await?;
957 let indices = dataset.load_indices().await?;
958 Ok(indices.iter().map(|index| index.name.clone()).collect())
959 }
960
961 pub(crate) async fn unindexed_row_count(
964 &self,
965 table: Table,
966 index_name: &str,
967 ) -> Result<usize> {
968 let dataset = self.dataset(table).await?;
969 let fragments = dataset
970 .unindexed_fragments(index_name)
971 .await
972 .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
973 Ok(fragments
974 .iter()
975 .map(|fragment| fragment.num_rows().unwrap_or(0))
976 .sum())
977 }
978
979 pub(crate) async fn drop_index(&self, table: Table, name: &str) -> Result<()> {
985 let mut guard = self.cached(table).await?.lock().await;
986 let mut dataset = guard.latest().await?;
987 dataset
988 .drop_index(name)
989 .await
990 .with_context(|| format!("drop_index({name}) failed for {}", table.label()))?;
991 guard.replace(dataset);
992 Ok(())
993 }
994
995 async fn table_location(&self, table_name: &str) -> Result<String> {
998 let request = DescribeTableRequest {
999 id: Some(self.nm_ident.as_table_id(table_name)),
1000 ..Default::default()
1001 };
1002 let response = self
1003 .nm
1004 .describe_table(request)
1005 .await
1006 .with_context(|| format!("failed to describe table {table_name}"))?;
1007 response
1008 .location
1009 .with_context(|| format!("namespace returned no location for table {table_name}"))
1010 }
1011
1012 pub async fn table_sizes(&self) -> Result<TableSizes> {
1016 let registry = Arc::new(ObjectStoreRegistry::default());
1017 let params = ObjectStoreParams {
1018 storage_options_accessor: (!self.storage_options.is_empty()).then(|| {
1019 Arc::new(StorageOptionsAccessor::with_static_options(
1020 self.storage_options.clone(),
1021 ))
1022 }),
1023 ..Default::default()
1024 };
1025
1026 let sessions = self
1027 .listed_size(
1028 ®istry,
1029 ¶ms,
1030 &self.table_location(sessions::SESSIONS).await?,
1031 )
1032 .await?;
1033 let messages = self
1034 .listed_size(
1035 ®istry,
1036 ¶ms,
1037 &self.table_location(sessions::MESSAGES).await?,
1038 )
1039 .await?;
1040 let parts = self
1041 .listed_size(
1042 ®istry,
1043 ¶ms,
1044 &self.table_location(sessions::PARTS).await?,
1045 )
1046 .await?;
1047 let root_total = self
1050 .listed_size(®istry, ¶ms, self.location.as_str())
1051 .await?;
1052 let other = root_total.saturating_sub(sessions + messages + parts);
1053 Ok(TableSizes {
1054 sessions,
1055 messages,
1056 parts,
1057 other,
1058 })
1059 }
1060
1061 async fn listed_size(
1063 &self,
1064 registry: &Arc<ObjectStoreRegistry>,
1065 params: &ObjectStoreParams,
1066 uri: &str,
1067 ) -> Result<u64> {
1068 let (store, base) = ObjectStore::from_uri_and_params(registry.clone(), uri, params)
1069 .await
1070 .with_context(|| format!("failed to open object store for {uri}"))?;
1071 let mut listing = store.list(Some(base));
1072 let mut total = 0u64;
1073 while let Some(meta) = listing.next().await {
1074 let meta = meta.with_context(|| format!("listing {uri} failed"))?;
1075 total += meta.size;
1076 }
1077 Ok(total)
1078 }
1079 async fn cached(&self, table: Table) -> Result<&Mutex<CachedDataset>> {
1080 match table {
1081 Table::Sessions => Ok(&self.datasets.sessions),
1082 Table::Messages => Ok(&self.datasets.messages),
1083 Table::Parts => self.parts_cached().await,
1084 }
1085 }
1086
1087 async fn parts_cached(&self) -> Result<&Mutex<CachedDataset>> {
1090 self.datasets
1091 .parts
1092 .get_or_try_init(|| async {
1093 let dataset = open_or_create_via_ns(
1094 &self.nm,
1095 &self.nm_ident,
1096 sessions::PARTS,
1097 sessions::part_schema(),
1098 &self.session,
1099 &self.storage_options,
1100 )
1101 .await?;
1102 Ok::<_, anyhow::Error>(Mutex::new(CachedDataset {
1103 dataset,
1104 last_refresh: Instant::now(),
1105 refresh_after: self.parts_refresh_after,
1106 }))
1107 })
1108 .await
1109 }
1110 async fn retry_lance<T, Fut, Op>(&self, label: &str, mut operation: Op) -> Result<T>
1111 where
1112 Fut: std::future::Future<Output = Result<T>>,
1113 Op: FnMut() -> Fut,
1114 {
1115 let mut attempt = 0u8;
1116 loop {
1117 attempt = attempt.saturating_add(1);
1118 match operation().await {
1119 Ok(value) => return Ok(value),
1120 Err(error) if attempt < self.retry.attempts => {
1121 let backoff = self.backoff(attempt);
1122 let error_chain = format!("{error:#}");
1125 tracing::warn!(
1126 label,
1127 attempt,
1128 ?backoff,
1129 error = %error_chain,
1130 "retrying Lance operation"
1131 );
1132 tokio::time::sleep(backoff).await;
1133 }
1134 Err(error) => {
1135 let error_chain = format!("{error:#}");
1136 tracing::warn!(
1137 label,
1138 attempt,
1139 error = %error_chain,
1140 "Lance operation exhausted retries"
1141 );
1142 if is_commit_conflict(&error) {
1149 return Err(error.context(ConflictExhausted { attempts: attempt }));
1150 }
1151 return Err(error);
1152 }
1153 }
1154 }
1155 }
1156 fn backoff(&self, attempt: u8) -> Duration {
1157 let shift = u32::from(attempt.saturating_sub(1));
1158 let multiplier = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
1159 let base = self.retry.initial_backoff.saturating_mul(multiplier);
1160 let factor = (1.0 + self.retry.jitter * (fastrand::f64() * 2.0 - 1.0)).max(0.0);
1163 base.mul_f64(factor).min(self.retry.max_backoff)
1164 }
1165}
1166async fn optimize_table_compact(
1181 dataset: &mut Dataset,
1182 table: Table,
1183 progress: Option<&OptimizeProgressFn>,
1184 cleanup: crate::sessions::CleanupConfig,
1185) -> Result<()> {
1186 let compaction = CompactionOptions {
1187 defer_index_remap: false,
1188 ..CompactionOptions::default()
1189 };
1190
1191 emit(
1192 progress,
1193 OptimizeEvent::PhaseStart {
1194 table,
1195 phase: OptimizePhase::Compact,
1196 detail: None,
1197 },
1198 );
1199 let started = Instant::now();
1200 compact_files(dataset, compaction, None).await?;
1201 emit(
1202 progress,
1203 OptimizeEvent::PhaseDone {
1204 table,
1205 phase: OptimizePhase::Compact,
1206 elapsed_ms: started.elapsed().as_millis() as u64,
1207 },
1208 );
1209
1210 emit(
1211 progress,
1212 OptimizeEvent::PhaseStart {
1213 table,
1214 phase: OptimizePhase::Cleanup,
1215 detail: None,
1216 },
1217 );
1218 let started = Instant::now();
1219 dataset
1224 .cleanup_old_versions(
1225 cleanup.older_than,
1226 Some(cleanup.delete_unverified),
1227 Some(false),
1228 )
1229 .await
1230 .context("cleanup_old_versions failed during index optimize")?;
1231 emit(
1232 progress,
1233 OptimizeEvent::PhaseDone {
1234 table,
1235 phase: OptimizePhase::Cleanup,
1236 elapsed_ms: started.elapsed().as_millis() as u64,
1237 },
1238 );
1239
1240 Ok(())
1241}
1242
1243async fn optimize_table_indices(
1246 dataset: &mut Dataset,
1247 intents: &[IndexIntent],
1248 table: Table,
1249 progress: Option<&OptimizeProgressFn>,
1250) -> Result<bool> {
1251 let existing = dataset.load_indices().await?;
1252 let existing_names: std::collections::HashSet<String> =
1253 existing.iter().map(|index| index.name.clone()).collect();
1254
1255 let mut append_indices: Vec<String> = Vec::new();
1256 let mut did_work = false;
1257
1258 for intent in intents {
1259 let exists = existing_names.contains(intent.name);
1260
1261 if !exists {
1262 if !intent.trigger.should_create(dataset).await? {
1263 continue;
1264 }
1265 let params = intent.params.build(dataset).await?;
1266 let index_type = intent.params.index_type();
1267 tracing::info!(
1268 index = intent.name,
1269 column = intent.column,
1270 "creating Lance index (trigger fired)",
1271 );
1272 emit(
1273 progress,
1274 OptimizeEvent::PhaseStart {
1275 table,
1276 phase: OptimizePhase::IndexCreate,
1277 detail: Some(intent.name.to_owned()),
1278 },
1279 );
1280 let started = Instant::now();
1281 dataset
1282 .create_index(
1283 &[intent.column],
1284 index_type,
1285 Some(intent.name.to_owned()),
1286 params.as_ref(),
1287 false,
1288 )
1289 .await
1290 .with_context(|| format!("failed to create index {}", intent.name))?;
1291 emit(
1292 progress,
1293 OptimizeEvent::PhaseDone {
1294 table,
1295 phase: OptimizePhase::IndexCreate,
1296 elapsed_ms: started.elapsed().as_millis() as u64,
1297 },
1298 );
1299 did_work = true;
1300 continue;
1301 }
1302
1303 let unindexed = dataset.unindexed_fragments(intent.name).await?;
1304 if unindexed.is_empty() {
1305 continue;
1306 }
1307 if unindexed.len() < index_lag_threshold() {
1311 continue;
1312 }
1313 match intent.params {
1314 IndexParamsKind::Scalar(BuiltinIndexType::BTree) => {
1315 let params = intent.params.build(dataset).await?;
1316 let index_type = intent.params.index_type();
1317 tracing::debug!(
1318 target: "pond::perf",
1319 index = intent.name,
1320 column = intent.column,
1321 "rebuilding Lance BTree index",
1322 );
1323 emit(
1324 progress,
1325 OptimizeEvent::PhaseStart {
1326 table,
1327 phase: OptimizePhase::IndexRebuild,
1328 detail: Some(intent.name.to_owned()),
1329 },
1330 );
1331 let started = Instant::now();
1332 dataset
1333 .create_index(
1334 &[intent.column],
1335 index_type,
1336 Some(intent.name.to_owned()),
1337 params.as_ref(),
1338 true,
1339 )
1340 .await
1341 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1342 emit(
1343 progress,
1344 OptimizeEvent::PhaseDone {
1345 table,
1346 phase: OptimizePhase::IndexRebuild,
1347 elapsed_ms: started.elapsed().as_millis() as u64,
1348 },
1349 );
1350 did_work = true;
1351 }
1352 IndexParamsKind::Scalar(BuiltinIndexType::Bitmap)
1353 | IndexParamsKind::InvertedFtsNgram { .. }
1354 | IndexParamsKind::IvfPqCosine { .. } => {
1355 append_indices.push(intent.name.to_owned());
1356 }
1357 IndexParamsKind::Scalar(_) => {
1358 let params = intent.params.build(dataset).await?;
1359 emit(
1360 progress,
1361 OptimizeEvent::PhaseStart {
1362 table,
1363 phase: OptimizePhase::IndexRebuild,
1364 detail: Some(intent.name.to_owned()),
1365 },
1366 );
1367 let started = Instant::now();
1368 dataset
1369 .create_index(
1370 &[intent.column],
1371 intent.params.index_type(),
1372 Some(intent.name.to_owned()),
1373 params.as_ref(),
1374 true,
1375 )
1376 .await
1377 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1378 emit(
1379 progress,
1380 OptimizeEvent::PhaseDone {
1381 table,
1382 phase: OptimizePhase::IndexRebuild,
1383 elapsed_ms: started.elapsed().as_millis() as u64,
1384 },
1385 );
1386 did_work = true;
1387 }
1388 }
1389 }
1390
1391 if !append_indices.is_empty() {
1392 let to_append = append_indices.clone();
1393 emit(
1394 progress,
1395 OptimizeEvent::PhaseStart {
1396 table,
1397 phase: OptimizePhase::IndexAppend,
1398 detail: Some(append_indices.join(", ")),
1399 },
1400 );
1401 let started = Instant::now();
1402 dataset
1403 .optimize_indices(&OptimizeOptions::append().index_names(to_append))
1404 .await
1405 .context("optimize_indices(append) failed during index optimize")?;
1406 emit(
1407 progress,
1408 OptimizeEvent::PhaseDone {
1409 table,
1410 phase: OptimizePhase::IndexAppend,
1411 elapsed_ms: started.elapsed().as_millis() as u64,
1412 },
1413 );
1414 tracing::debug!(
1415 target: "pond::perf",
1416 indices = ?append_indices,
1417 "appended trailing fragments into indices",
1418 );
1419 did_work = true;
1420 }
1421
1422 Ok(did_work)
1423}
1424
1425async fn rebuild_index(dataset: &mut Dataset, intent: &IndexIntent) -> Result<()> {
1426 if !intent.trigger.should_create(dataset).await? {
1427 return Ok(());
1428 }
1429 let params = intent.params.build(dataset).await?;
1430 dataset
1431 .create_index(
1432 &[intent.column],
1433 intent.params.index_type(),
1434 Some(intent.name.to_owned()),
1435 params.as_ref(),
1436 true,
1437 )
1438 .await
1439 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1440 Ok(())
1441}
1442
1443async fn index_status(
1444 table: Table,
1445 dataset: &Dataset,
1446 intents: &[IndexIntent],
1447) -> Result<Vec<IndexStatus>> {
1448 let existing = dataset.load_indices().await?;
1449 let existing_names: std::collections::HashSet<String> =
1450 existing.iter().map(|index| index.name.clone()).collect();
1451 let total_fragments = dataset.get_fragments().len();
1452 let total_rows = dataset.count_rows(None).await?;
1453 let mut statuses = Vec::with_capacity(intents.len());
1454 for intent in intents {
1455 let exists = existing_names.contains(intent.name);
1456 if !exists {
1457 statuses.push(IndexStatus {
1458 table,
1459 intent_name: intent.name.to_owned(),
1460 fragments_covered: 0,
1461 unindexed_rows: total_rows,
1462 exists,
1463 });
1464 continue;
1465 }
1466 let unindexed = dataset
1467 .unindexed_fragments(intent.name)
1468 .await
1469 .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1470 let unindexed_rows = unindexed
1471 .iter()
1472 .map(|fragment| fragment.num_rows().unwrap_or(0))
1473 .sum();
1474 statuses.push(IndexStatus {
1475 table,
1476 intent_name: intent.name.to_owned(),
1477 fragments_covered: total_fragments.saturating_sub(unindexed.len()),
1478 unindexed_rows,
1479 exists,
1480 });
1481 }
1482 Ok(statuses)
1483}
1484
1485async fn open_or_create_via_ns(
1497 nm: &Arc<dyn LanceNamespace>,
1498 nm_ident: &NamespaceIdent,
1499 table_name: &str,
1500 schema: lance::deps::arrow_schema::SchemaRef,
1501 session: &Arc<Session>,
1502 storage_options: &HashMap<String, String>,
1503) -> Result<Dataset> {
1504 let table_id = nm_ident.as_table_id(table_name);
1505
1506 let request = DescribeTableRequest {
1507 id: Some(table_id.clone()),
1508 ..Default::default()
1509 };
1510 match nm.describe_table(request).await {
1511 Ok(response) => {
1512 let location = response.location.with_context(|| {
1513 format!("namespace returned no location for table {table_name}")
1514 })?;
1515 let mut builder = DatasetBuilder::from_uri(&location).with_session(session.clone());
1516 if !storage_options.is_empty() {
1517 builder = builder.with_storage_options(storage_options.clone());
1518 }
1519 let dataset = builder
1520 .load()
1521 .await
1522 .with_context(|| format!("failed to open table {table_name}"))?;
1523 ensure_schema_matches(&dataset, schema.as_ref(), table_name)?;
1524 return Ok(dataset);
1525 }
1526 Err(error) => match &error {
1527 lance::Error::Namespace { source, .. }
1528 if matches!(
1529 source.downcast_ref::<NamespaceError>(),
1530 Some(NamespaceError::TableNotFound { .. })
1531 ) =>
1532 {
1533 }
1535 _ => {
1536 return Err(anyhow::Error::from(error))
1537 .with_context(|| format!("failed to describe table {table_name}"));
1538 }
1539 },
1540 }
1541
1542 let mut write_params = sessions::write_params_for_create();
1545 write_params.session = Some(session.clone());
1546 write_params.mode = WriteMode::Create;
1547 if !storage_options.is_empty() {
1548 write_params.store_params = Some(ObjectStoreParams {
1549 storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
1550 storage_options.clone(),
1551 ))),
1552 ..Default::default()
1553 });
1554 }
1555 let reader = sessions::empty_reader(schema)?;
1556 Dataset::write_into_namespace(reader, nm.clone(), table_id, Some(write_params))
1557 .await
1558 .with_context(|| format!("failed to create table {table_name}"))
1559}
1560
1561fn scanner_with_prefilter(
1562 dataset: &Dataset,
1563 predicate: Option<&Predicate>,
1564) -> Result<lance::dataset::scanner::Scanner> {
1565 let mut scanner = dataset.scan();
1566 scanner.prefilter(true);
1567 if let Some(predicate) = predicate {
1568 let filter = predicate.to_lance();
1569 if !filter.is_empty() {
1570 scanner.filter(&filter)?;
1571 }
1572 }
1573 Ok(scanner)
1574}
1575fn ensure_schema_matches(
1576 dataset: &Dataset,
1577 expected: &lance::deps::arrow_schema::Schema,
1578 table_name: &str,
1579) -> Result<()> {
1580 use lance::deps::arrow_schema::DataType;
1581 use std::collections::BTreeSet;
1582 let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
1583 let actual_names: BTreeSet<&str> = actual.fields().iter().map(|f| f.name().as_str()).collect();
1584 let expected_names: BTreeSet<&str> = expected
1585 .fields()
1586 .iter()
1587 .map(|f| f.name().as_str())
1588 .collect();
1589 if actual_names != expected_names {
1590 anyhow::bail!(
1591 "table {table_name} has columns {actual_names:?} but this pond build expects \
1592 {expected_names:?} - the on-disk store predates a schema change; delete the \
1593 data directory and re-run `pond ingest`",
1594 );
1595 }
1596 for actual_field in actual.fields() {
1601 let Some(expected_field) = expected.field_with_name(actual_field.name()).ok() else {
1602 continue;
1603 };
1604 if let (DataType::FixedSizeList(_, actual_dim), DataType::FixedSizeList(_, expected_dim)) =
1605 (actual_field.data_type(), expected_field.data_type())
1606 && actual_dim != expected_dim
1607 {
1608 tracing::warn!(
1609 table = table_name,
1610 column = actual_field.name(),
1611 actual_dim,
1612 expected_dim,
1613 "embedding dimension differs from config; open proceeds because model swaps are operator-driven",
1614 );
1615 }
1616 }
1617 Ok(())
1618}
/// Fills in object-store client defaults for remote storage without overriding the caller.
///
/// Each default is inserted only when none of the option's accepted spellings is already
/// present. Key matching is ASCII case-insensitive, so `CONNECT_TIMEOUT` counts as
/// `connect_timeout`.
///
/// Defaults applied:
/// - `pool_idle_timeout = "300 seconds"`
/// - `connect_timeout = "10 seconds"`
/// - `aws_unsigned_payload = "true"`, only when a custom endpoint is configured under any of
///   the spellings object_store accepts for the S3 endpoint (`aws_endpoint`,
///   `aws_endpoint_url`, `endpoint`, `endpoint_url`). NOTE(review): presumably intended for
///   S3-compatible endpoints — confirm.
fn apply_remote_storage_defaults(options: &mut HashMap<String, String>) {
    // Every spelling object_store accepts for the S3 endpoint option. Previously only the
    // first and third were recognised, so `aws_endpoint_url`/`endpoint_url` users silently
    // missed the unsigned-payload default.
    const ENDPOINT_ALIASES: &[&str] = &["aws_endpoint", "aws_endpoint_url", "endpoint", "endpoint_url"];

    // True when any key in `options` equals one of `aliases`, ignoring ASCII case.
    fn has_any(options: &HashMap<String, String>, aliases: &[&str]) -> bool {
        options
            .keys()
            .any(|key| aliases.iter().any(|alias| key.eq_ignore_ascii_case(alias)))
    }

    // Inserts `value` under the first (canonical) alias unless any alias is already set.
    fn set_default(options: &mut HashMap<String, String>, aliases: &[&str], value: &str) {
        if has_any(options, aliases) {
            return;
        }
        if let Some(canonical) = aliases.first() {
            options.insert((*canonical).to_owned(), value.to_owned());
        }
    }

    set_default(options, &["pool_idle_timeout"], "300 seconds");
    set_default(options, &["connect_timeout"], "10 seconds");
    if has_any(options, ENDPOINT_ALIASES) {
        set_default(
            options,
            &["aws_unsigned_payload", "unsigned_payload"],
            "true",
        );
    }
}
1648
/// Renders `value` as a single-quoted SQL string literal, doubling any embedded `'`.
fn quoted_string(value: &str) -> String {
    let mut literal = String::with_capacity(value.len() + 2);
    literal.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            literal.push_str("''");
        } else {
            literal.push(ch);
        }
    }
    literal.push('\'');
    literal
}
/// Builds a quoted `LIKE` pattern matching any string that contains `value` literally.
///
/// The LIKE metacharacters `%` and `_` and the escape character `\` are backslash-escaped.
/// Single quotes are doubled for the SQL literal, and the result is wrapped as `'%…%'`.
fn like_contains(value: &str) -> String {
    let mut pattern = String::with_capacity(value.len() + 4);
    pattern.push_str("'%");
    for ch in value.chars() {
        match ch {
            '\\' => pattern.push_str("\\\\"),
            '%' => pattern.push_str("\\%"),
            '_' => pattern.push_str("\\_"),
            '\'' => pattern.push_str("''"),
            other => pattern.push(other),
        }
    }
    pattern.push_str("%'");
    pattern
}
1660
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Opening a store on an empty directory must make every table reachable through the
    /// namespace, and a projected scan of each one must succeed and return no rows.
    #[tokio::test]
    async fn store_opens_via_namespace_and_scan_works() -> Result<()> {
        let scratch = TempDir::new()?;
        let root = Url::from_directory_path(scratch.path())
            .map_err(|()| anyhow::anyhow!("temp path is not absolute"))?;
        let handle = Handle::open(&root).await?;

        // Columns projected from each table during the scan.
        let cases: [(Table, &[&str]); 3] = [
            (Table::Sessions, &["id"]),
            (Table::Messages, &["id"]),
            (Table::Parts, &["id"]),
        ];
        for (table, columns) in cases {
            let batch = handle
                .scan(table, ScanOpts::project_only(columns))
                .await?
                .try_into_batch()
                .await?;
            assert_eq!(batch.num_rows(), 0, "fresh table should be empty");
        }
        Ok(())
    }
}