1use crate::{
2 RetryPolicy,
3 config::{self},
4 handlers::NamespaceIdent,
5 sessions::{self},
6};
7use anyhow::{Context, Result};
8use lance::Dataset;
9use lance::dataset::builder::DatasetBuilder;
10use lance::dataset::optimize::{CompactionOptions, compact_files};
11use lance::dataset::write::merge_insert::SourceDedupeBehavior;
12use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode};
13use lance::deps::arrow_array::{RecordBatch, RecordBatchIterator};
14use lance::index::DatasetIndexExt;
15use lance::index::DatasetIndexInternalExt;
16use lance::index::vector::VectorIndexParams;
17use lance::session::Session;
18use lance_index::IndexType;
19use lance_index::optimize::OptimizeOptions;
20use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
21use lance_io::object_store::{
22 ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor,
23};
24use lance_linalg::distance::MetricType;
25use lance_namespace::LanceNamespace;
26use lance_namespace::error::{ErrorCode, NamespaceError};
27use lance_namespace::models::DescribeTableRequest;
28use lance_namespace_impls::ConnectBuilder;
29use std::{
30 collections::HashMap,
31 sync::Arc,
32 time::{Duration, Instant},
33};
34use tokio::sync::{Mutex, OnceCell};
35use tokio_stream::StreamExt;
36use url::Url;
/// Row-count threshold associated with activating the vector index.
///
/// NOTE(review): not referenced in this chunk; presumably used as an
/// `IndexTrigger::OnNonNullCount` threshold elsewhere — confirm at call sites.
pub const VECTOR_INDEX_ACTIVATION_ROWS: usize = 100_000;

/// Fallback lag threshold used when [`init_index_lag_threshold`] was never
/// called. Existing indices are only brought up to date once at least this
/// many fragments are unindexed.
pub const DEFAULT_INDEX_LAG_THRESHOLD: usize = 4;

/// Process-wide, write-once override for the index lag threshold.
static INDEX_LAG_THRESHOLD_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Installs the runtime index lag threshold.
///
/// Only the first call has an effect; later calls are ignored because the
/// value lives in a write-once cell.
pub fn init_index_lag_threshold(value: usize) {
    // `set` reports an error when a value is already stored; "first writer
    // wins" is the intended behavior, so that error is discarded.
    let _ = INDEX_LAG_THRESHOLD_RUNTIME.set(value);
}

/// Returns the installed index lag threshold, or
/// [`DEFAULT_INDEX_LAG_THRESHOLD`] when none was installed.
pub fn index_lag_threshold() -> usize {
    match INDEX_LAG_THRESHOLD_RUNTIME.get() {
        Some(configured) => *configured,
        None => DEFAULT_INDEX_LAG_THRESHOLD,
    }
}
64
65pub const DEFAULT_COMPACTION_FRAGMENT_CAP: usize = 64;
69
70pub fn default_cleanup_older_than() -> chrono::Duration {
76 chrono::Duration::days(1)
77}
78
79#[derive(Debug, Clone, Copy)]
84pub struct MaintenancePolicy {
85 pub compaction_fragment_cap: usize,
88 pub cleanup_older_than: chrono::Duration,
90}
91
92impl MaintenancePolicy {
93 pub fn always_compact() -> Self {
96 Self {
97 compaction_fragment_cap: 0,
98 cleanup_older_than: default_cleanup_older_than(),
99 }
100 }
101}
102
/// Decides whether running compaction is worthwhile.
///
/// Compaction runs when either condition holds:
/// * the number of undersized fragments reached `cap` (so a cap of `0`
///   always compacts), or
/// * some contiguous run of undersized fragments holds at least
///   `target_rows` rows, i.e. enough to fill one target-size fragment.
fn should_compact(
    mergeable_run_rows: usize,
    candidate_count: usize,
    target_rows: usize,
    cap: usize,
) -> bool {
    let run_fills_fragment = mergeable_run_rows >= target_rows;
    let cap_reached = candidate_count >= cap;
    run_fills_fragment || cap_reached
}
114
/// Scans fragment sizes (in physical rows, in fragment order) and reports
/// `(largest_mergeable_run_rows, undersized_fragment_count)`.
///
/// A fragment is "undersized" when it has fewer than `target` rows. The
/// mergeable-run figure is the largest row total over any contiguous run of
/// undersized fragments. A full-size fragment ends the current run.
fn compaction_candidates(
    physical_rows: impl IntoIterator<Item = usize>,
    target: usize,
) -> (usize, usize) {
    let (longest_run, _open_run, undersized) = physical_rows.into_iter().fold(
        (0usize, 0usize, 0usize),
        |(longest, current, undersized), rows| {
            if rows >= target {
                // A full-size fragment breaks the run of mergeable neighbours.
                (longest, 0, undersized)
            } else {
                let current = current + rows;
                (longest.max(current), current, undersized + 1)
            }
        },
    );
    (longest_run, undersized)
}
137
/// Declarative description of a Lance index wanted on a table.
///
/// Index maintenance creates the index once `trigger` fires. Afterwards it
/// brings the index up to date once enough fragments are unindexed (see
/// `index_lag_threshold`).
#[derive(Debug, Clone)]
pub struct IndexIntent {
    /// Lance index name; also compared against existing index names to detect
    /// whether the index already exists.
    pub name: &'static str,
    /// Column the index is built on.
    pub column: &'static str,
    /// Condition that must hold before the index is first created.
    pub trigger: IndexTrigger,
    /// Index kind and its build parameters.
    pub params: IndexParamsKind,
}
153
/// When an [`IndexIntent`] should first be created.
#[derive(Debug, Clone)]
pub enum IndexTrigger {
    /// As soon as the table contains at least one row.
    OnAnyRows,
    /// Once at least `threshold` rows have a non-null value in `column`.
    OnNonNullCount {
        /// Column whose non-null rows are counted. This is independent of
        /// `IndexIntent::column`.
        column: &'static str,
        /// Minimum number of non-null rows (inclusive).
        threshold: usize,
    },
}
167
/// Kind of Lance index to build, together with its build parameters.
#[derive(Debug, Clone)]
pub enum IndexParamsKind {
    /// Built-in scalar index. `Bitmap` is created as `IndexType::Bitmap`;
    /// every other built-in kind is created as `IndexType::BTree`.
    Scalar(BuiltinIndexType),
    /// Full-text inverted index using the `ngram` base tokenizer with the
    /// given minimum/maximum n-gram lengths. Stemming and stop-word removal
    /// are disabled.
    InvertedFtsNgram { min: u32, max: u32 },
    /// IVF-PQ vector index with cosine distance. The IVF partition count is
    /// derived at build time from the non-null `vector` row count (/ 4096,
    /// at least 1).
    IvfPqCosine {
        /// Number of PQ sub-vectors.
        sub_vectors: usize,
        /// Bits per PQ code.
        num_bits: u8,
        /// Maximum training iterations passed to `VectorIndexParams::ivf_pq`.
        max_iters: usize,
    },
}
189
190impl IndexTrigger {
191 async fn should_create(&self, dataset: &Dataset) -> Result<bool> {
192 match self {
193 Self::OnAnyRows => Ok(dataset.count_rows(None).await? > 0),
194 Self::OnNonNullCount { column, threshold } => {
195 let count = dataset
196 .count_rows(Some(format!("{column} IS NOT NULL")))
197 .await?;
198 Ok(count >= *threshold)
199 }
200 }
201 }
202}
203
204impl IndexParamsKind {
205 fn index_type(&self) -> IndexType {
206 match self {
207 Self::Scalar(BuiltinIndexType::Bitmap) => IndexType::Bitmap,
208 Self::Scalar(_) => IndexType::BTree,
209 Self::InvertedFtsNgram { .. } => IndexType::Inverted,
210 Self::IvfPqCosine { .. } => IndexType::Vector,
211 }
212 }
213
214 async fn build(&self, dataset: &Dataset) -> Result<Box<dyn lance::index::IndexParams>> {
215 match self {
216 Self::Scalar(kind) => Ok(Box::new(ScalarIndexParams::for_builtin(kind.clone()))),
217 Self::InvertedFtsNgram { min, max } => Ok(Box::new(
218 InvertedIndexParams::default()
219 .base_tokenizer("ngram".to_owned())
220 .ngram_min_length(*min)
221 .ngram_max_length(*max)
222 .stem(false)
223 .remove_stop_words(false),
224 )),
225 Self::IvfPqCosine {
226 sub_vectors,
227 num_bits,
228 max_iters,
229 } => {
230 let count = dataset
231 .count_rows(Some("vector IS NOT NULL".to_owned()))
232 .await?;
233 let partitions = count.checked_div(4096).unwrap_or(0).max(1);
234 Ok(Box::new(VectorIndexParams::ivf_pq(
235 partitions,
236 *num_bits,
237 *sub_vectors,
238 MetricType::Cosine,
239 *max_iters,
240 )))
241 }
242 }
243 }
244}
245
/// Snapshot of one intended index on one table, as returned by
/// `Handle::index_status`.
///
/// NOTE(review): the values are filled in by the module-level `index_status`
/// helper, which is not visible here. The per-field descriptions follow the
/// field names and should be confirmed against that helper.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexStatus {
    /// Table the index belongs to.
    pub table: Table,
    /// `IndexIntent::name` this status describes.
    pub intent_name: String,
    /// Presumably the number of fragments covered by the index.
    pub fragments_covered: usize,
    /// Presumably the number of fragments not yet covered by the index.
    pub unindexed_fragments: usize,
    /// Presumably the number of rows in the unindexed fragments.
    pub unindexed_rows: usize,
    /// Presumably whether the index exists on the table at all.
    pub exists: bool,
}
255
/// Marker error attached as anyhow context when a Lance commit conflict is
/// still present after the final retry attempt.
#[derive(Debug, Clone, Copy)]
pub struct ConflictExhausted {
    /// Number of attempts made before giving up.
    pub attempts: u8,
}

impl std::fmt::Display for ConflictExhausted {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "commit conflict exhausted after {} attempt(s)", self.attempts)
    }
}

/// The marker has no `source()` of its own; the original conflict stays
/// reachable through the anyhow chain it is attached to.
impl std::error::Error for ConflictExhausted {}
276
277#[derive(Debug)]
282pub enum PhaseOutcome {
283 Ok,
285 Noop,
287 SkippedConflict,
290 Failed(anyhow::Error),
292 NotAttempted,
295}
296
297impl PhaseOutcome {
298 pub fn is_failed(&self) -> bool {
299 matches!(self, Self::Failed(_))
300 }
301}
302
/// Result of `Handle::optimize_table` for a single table.
#[derive(Debug)]
pub struct TableOptimizeOutcome {
    /// Table that was maintained.
    pub table: Table,
    /// Outcome of the index-maintenance phase (runs after compaction).
    pub indices: PhaseOutcome,
    /// Outcome of the compaction + old-version cleanup phase.
    pub compaction: PhaseOutcome,
}
310
/// Progress notification delivered to an `OptimizeProgressFn` during table
/// maintenance.
#[derive(Debug, Clone)]
pub enum OptimizeEvent {
    /// Emitted just before a phase starts.
    PhaseStart {
        /// Table being maintained.
        table: Table,
        /// Phase that is starting.
        phase: OptimizePhase,
        /// Extra context. Index create/rebuild phases pass the index name;
        /// compaction and cleanup pass `None`.
        detail: Option<String>,
    },
    /// Emitted after a phase completes. It is not emitted when the phase
    /// returns an error.
    PhaseDone {
        /// Table being maintained.
        table: Table,
        /// Phase that finished.
        phase: OptimizePhase,
        /// Wall-clock duration of the phase in milliseconds.
        elapsed_ms: u64,
    },
}
326
/// Maintenance phases reported through [`OptimizeEvent`].
#[derive(Debug, Clone, Copy)]
pub enum OptimizePhase {
    Compact,
    Cleanup,
    IndexCreate,
    IndexRebuild,
    IndexAppend,
}

impl OptimizePhase {
    /// Stable kebab-case label for logs and progress output.
    pub fn label(self) -> &'static str {
        use OptimizePhase::{Cleanup, Compact, IndexAppend, IndexCreate, IndexRebuild};
        match self {
            Compact => "compact",
            Cleanup => "cleanup",
            IndexCreate => "index-create",
            IndexRebuild => "index-rebuild",
            IndexAppend => "index-append",
        }
    }
}
347
348pub type OptimizeProgressFn = Box<dyn Fn(OptimizeEvent) + Send + Sync>;
349
350fn emit(progress: Option<&OptimizeProgressFn>, event: OptimizeEvent) {
351 if let Some(callback) = progress {
352 callback(event);
353 }
354}
355
356pub fn is_commit_conflict(error: &anyhow::Error) -> bool {
360 error.downcast_ref::<lance::Error>().is_some_and(|err| {
361 matches!(
362 err,
363 lance::Error::CommitConflict { .. }
364 | lance::Error::RetryableCommitConflict { .. }
365 | lance::Error::TooMuchWriteContention { .. }
366 )
367 })
368}
369
370fn is_conflict_exhausted(error: &anyhow::Error) -> bool {
373 error.chain().any(|cause| cause.is::<ConflictExhausted>())
374}
375
/// On-storage byte totals produced by `Handle::table_sizes`, summed from
/// object-store listings.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TableSizes {
    /// Bytes listed under the `sessions` table location.
    pub sessions: u64,
    /// Bytes listed under the `messages` table location.
    pub messages: u64,
    /// Bytes listed under the `parts` table location.
    pub parts: u64,
    /// Bytes under the store root not attributed to the three tables
    /// (root total minus the table totals, saturating at zero).
    pub other: u64,
}
386
/// A literal operand of a filter predicate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalarValue {
    /// Rendered through `quoted_string` as a string literal.
    String(String),
    /// Rendered as a decimal integer.
    Int32(i32),
    /// Inserted into the filter text verbatim, with no quoting or escaping.
    Raw(String),
}
impl From<&str> for ScalarValue {
    fn from(value: &str) -> Self {
        Self::from(String::from(value))
    }
}
impl From<String> for ScalarValue {
    fn from(value: String) -> Self {
        ScalarValue::String(value)
    }
}
impl From<i32> for ScalarValue {
    fn from(value: i32) -> Self {
        ScalarValue::Int32(value)
    }
}
408#[derive(Debug, Clone, PartialEq, Eq)]
409pub enum Predicate {
410 Eq(&'static str, ScalarValue),
411 Ne(&'static str, ScalarValue),
412 IsNull(&'static str),
413 IsNotNull(&'static str),
414 In(&'static str, Vec<ScalarValue>),
415 LikeContains(&'static str, String),
416 Regex(&'static str, String),
421 Gte(&'static str, ScalarValue),
422 Lte(&'static str, ScalarValue),
423 And(Vec<Predicate>),
424 Or(Vec<Predicate>),
425 Not(Box<Predicate>),
426}
427impl Predicate {
428 pub fn to_lance(&self) -> String {
429 match self {
430 Self::Eq(column, value) => format!("{column} = {}", value.to_lance()),
431 Self::Ne(column, value) => format!("{column} <> {}", value.to_lance()),
432 Self::IsNull(column) => format!("{column} IS NULL"),
433 Self::IsNotNull(column) => format!("{column} IS NOT NULL"),
434 Self::In(column, values) => {
435 let values = values
436 .iter()
437 .map(ScalarValue::to_lance)
438 .collect::<Vec<_>>()
439 .join(", ");
440 format!("{column} IN ({values})")
441 }
442 Self::LikeContains(column, value) => {
443 format!("{column} LIKE {} ESCAPE '\\'", like_contains(value))
444 }
445 Self::Regex(column, pattern) => {
446 format!("regexp_like({column}, {})", quoted_string(pattern))
447 }
448 Self::Gte(column, value) => format!("{column} >= {}", value.to_lance()),
449 Self::Lte(column, value) => format!("{column} <= {}", value.to_lance()),
450 Self::And(predicates) => predicates
451 .iter()
452 .map(Self::to_lance)
453 .filter(|predicate| !predicate.is_empty())
454 .collect::<Vec<_>>()
455 .join(" AND "),
456 Self::Or(predicates) => {
457 let body = predicates
460 .iter()
461 .map(Self::to_lance)
462 .filter(|predicate| !predicate.is_empty())
463 .collect::<Vec<_>>()
464 .join(" OR ");
465 if body.is_empty() {
466 String::new()
467 } else {
468 format!("({body})")
469 }
470 }
471 Self::Not(inner) => {
472 let body = inner.to_lance();
473 if body.is_empty() {
474 String::new()
475 } else {
476 format!("NOT ({body})")
477 }
478 }
479 }
480 }
481}
482#[derive(Default)]
485pub struct ScanOpts<'a> {
486 pub predicate: Option<&'a Predicate>,
487 pub projection: Option<&'a [&'a str]>,
488}
489
490impl<'a> ScanOpts<'a> {
491 pub fn project_only(projection: &'a [&'a str]) -> Self {
492 Self {
493 predicate: None,
494 projection: Some(projection),
495 }
496 }
497 pub fn with_predicate_and_projection(
498 predicate: &'a Predicate,
499 projection: &'a [&'a str],
500 ) -> Self {
501 Self {
502 predicate: Some(predicate),
503 projection: Some(projection),
504 }
505 }
506}
507
508impl ScalarValue {
509 fn to_lance(&self) -> String {
510 match self {
511 Self::String(value) => quoted_string(value),
512 Self::Int32(value) => value.to_string(),
513 Self::Raw(value) => value.clone(),
514 }
515 }
516}
517#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
521pub struct RuntimeCaps {
522 pub index_cache_bytes: Option<usize>,
523 pub metadata_cache_bytes: Option<usize>,
524}
525
526impl RuntimeCaps {
527 pub fn from_config(config: &crate::config::RuntimeConfig) -> Self {
528 Self {
529 index_cache_bytes: config.index_cache_bytes,
530 metadata_cache_bytes: config.metadata_cache_bytes,
531 }
532 }
533}
534
535const LOCAL_INDEX_CACHE_BYTES: usize = 256 * 1024 * 1024;
539const LOCAL_METADATA_CACHE_BYTES: usize = 128 * 1024 * 1024;
540const REMOTE_INDEX_CACHE_BYTES: usize = 2 * 1024 * 1024 * 1024;
542const REMOTE_METADATA_CACHE_BYTES: usize = 512 * 1024 * 1024;
543
544fn resolve_cache_caps(location: &Url, caps: RuntimeCaps) -> (usize, usize) {
545 let (index_default, metadata_default) = if config::is_local(location) {
546 (LOCAL_INDEX_CACHE_BYTES, LOCAL_METADATA_CACHE_BYTES)
547 } else {
548 (REMOTE_INDEX_CACHE_BYTES, REMOTE_METADATA_CACHE_BYTES)
549 };
550 (
551 caps.index_cache_bytes.unwrap_or(index_default),
552 caps.metadata_cache_bytes.unwrap_or(metadata_default),
553 )
554}
555
/// Handle to the Lance-backed store.
///
/// Holds the cached datasets for the `sessions`, `messages` and (lazily)
/// `parts` tables, plus the namespace client and storage configuration used
/// to reach them.
pub struct Handle {
    /// Per-table cached datasets, each behind an async mutex.
    datasets: DatasetSet,
    /// Retry/backoff settings used by `retry_lance`.
    retry: RetryPolicy,
    /// Shared Lance session whose index/metadata caches are sized by
    /// `resolve_cache_caps`.
    // NOTE(review): `session` is read by `parts_cached` when lazily opening the
    // parts table, so this `allow(dead_code)` looks stale — confirm and remove.
    #[allow(dead_code)]
    session: Arc<Session>,
    /// Lance namespace client (the `dir` implementation rooted at `location`).
    nm: Arc<dyn LanceNamespace>,
    /// Namespace the tables live under (the root namespace).
    nm_ident: NamespaceIdent,
    /// Storage options forwarded to the namespace and to ad-hoc object-store
    /// access (exports, size listings).
    storage_options: HashMap<String, String>,
    /// Root URL of the store.
    location: Url,
    /// Refresh interval applied to the lazily opened `parts` dataset.
    parts_refresh_after: Duration,
}
590
591impl std::fmt::Debug for Handle {
592 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
593 formatter
594 .debug_struct("Handle")
595 .field("datasets", &self.datasets)
596 .field("retry", &self.retry)
597 .field("nm_ident", &self.nm_ident)
598 .field("storage_options", &self.storage_options)
599 .field("location", &self.location)
600 .finish()
601 }
602}
603
/// The tables managed by a `Handle`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Table {
    Sessions,
    Messages,
    Parts,
}
impl Table {
    /// Short lowercase name used in logs, retry labels and error messages.
    pub fn as_str(self) -> &'static str {
        match self {
            Table::Sessions => "sessions",
            Table::Messages => "messages",
            Table::Parts => "parts",
        }
    }

    /// Crate-internal alias of [`Table::as_str`].
    fn label(self) -> &'static str {
        self.as_str()
    }
}
/// Cached datasets for each table.
///
/// `sessions` and `messages` are opened when the handle is created; `parts`
/// is opened on first use.
#[derive(Debug)]
struct DatasetSet {
    sessions: Mutex<CachedDataset>,
    messages: Mutex<CachedDataset>,
    /// Initialized on first access by `Handle::parts_cached`.
    parts: OnceCell<Mutex<CachedDataset>>,
}
636#[derive(Debug)]
637struct CachedDataset {
638 dataset: Dataset,
639 last_refresh: Instant,
640 refresh_after: Duration,
641}
642impl CachedDataset {
643 async fn latest(&mut self) -> Result<Dataset> {
644 if self.last_refresh.elapsed() >= self.refresh_after {
645 self.dataset.checkout_latest().await?;
646 self.last_refresh = Instant::now();
647 }
648 Ok(self.dataset.clone())
649 }
650 fn replace(&mut self, dataset: Dataset) {
651 self.dataset = dataset;
652 self.last_refresh = Instant::now();
653 }
654}
655impl Handle {
656 pub async fn open(location: &Url) -> Result<Self> {
659 Self::open_with_options(location, HashMap::new(), RuntimeCaps::default()).await
660 }
661
662 pub async fn open_with_options(
668 location: &Url,
669 mut storage_options: HashMap<String, String>,
670 caps: RuntimeCaps,
671 ) -> Result<Self> {
672 if let Some(path) = config::local_path(location) {
673 tokio::fs::create_dir_all(&path)
674 .await
675 .with_context(|| format!("failed to create data dir {}", path.display()))?;
676 } else {
677 apply_remote_storage_defaults(&mut storage_options);
678 }
679 let (index_cache_bytes, metadata_cache_bytes) = resolve_cache_caps(location, caps);
685 let session = Arc::new(Session::new(
686 index_cache_bytes,
687 metadata_cache_bytes,
688 Arc::new(ObjectStoreRegistry::default()),
689 ));
690 let root = location.as_str().trim_end_matches('/').to_string();
696 let mut connect = ConnectBuilder::new("dir")
697 .property("root", root)
698 .session(session.clone());
699 for (key, value) in &storage_options {
703 connect = connect.property(format!("storage.{key}"), value.clone());
704 }
705 let nm: Arc<dyn LanceNamespace> = connect
706 .connect()
707 .await
708 .context("failed to connect lance Directory namespace")?;
709 let nm_ident = NamespaceIdent::root();
710 let refresh_after = if config::is_local(location) {
716 Duration::ZERO
717 } else {
718 Duration::from_secs(5)
719 };
720 let handle = Self {
721 datasets: DatasetSet {
722 sessions: Mutex::new(CachedDataset {
723 dataset: open_or_create_via_ns(
724 &nm,
725 &nm_ident,
726 sessions::SESSIONS,
727 sessions::session_schema(),
728 &session,
729 &storage_options,
730 )
731 .await?,
732 last_refresh: Instant::now(),
733 refresh_after,
734 }),
735 messages: Mutex::new(CachedDataset {
736 dataset: open_or_create_via_ns(
737 &nm,
738 &nm_ident,
739 sessions::MESSAGES,
740 sessions::message_schema(),
741 &session,
742 &storage_options,
743 )
744 .await?,
745 last_refresh: Instant::now(),
746 refresh_after,
747 }),
748 parts: OnceCell::new(),
749 },
750 retry: RetryPolicy::default(),
751 session,
752 nm,
753 nm_ident,
754 storage_options,
755 location: location.clone(),
756 parts_refresh_after: refresh_after,
757 };
758 Ok(handle)
759 }
760
761 pub fn location(&self) -> &Url {
762 &self.location
763 }
764
765 pub fn storage_options(&self) -> &HashMap<String, String> {
769 &self.storage_options
770 }
771
772 fn export_uri(&self, name: &str) -> String {
778 format!(
779 "{}/exports/{name}",
780 self.location.as_str().trim_end_matches('/')
781 )
782 }
783
784 fn object_store_params(&self) -> ObjectStoreParams {
788 ObjectStoreParams {
789 storage_options_accessor: (!self.storage_options.is_empty()).then(|| {
790 Arc::new(StorageOptionsAccessor::with_static_options(
791 self.storage_options.clone(),
792 ))
793 }),
794 ..Default::default()
795 }
796 }
797
798 pub(crate) async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
801 let uri = self.export_uri(name);
802 let registry = Arc::new(ObjectStoreRegistry::default());
803 let (store, path) =
804 ObjectStore::from_uri_and_params(registry, &uri, &self.object_store_params())
805 .await
806 .with_context(|| format!("failed to open object store for {uri}"))?;
807 store
808 .put(&path, bytes)
809 .await
810 .with_context(|| format!("failed to write export {uri}"))?;
811 Ok(())
812 }
813
814 pub(crate) async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
817 let uri = self.export_uri(name);
818 let registry = Arc::new(ObjectStoreRegistry::default());
819 let (store, path) =
820 ObjectStore::from_uri_and_params(registry, &uri, &self.object_store_params())
821 .await
822 .with_context(|| format!("failed to open object store for {uri}"))?;
823 let bytes = store
824 .read_one_all(&path)
825 .await
826 .with_context(|| format!("failed to read export {uri}"))?;
827 Ok(bytes.to_vec())
828 }
829
830 pub(crate) fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
835 if self.location.scheme() != "file" {
836 return None;
837 }
838 let dir = self.location.to_file_path().ok()?;
839 Some(dir.join("exports").join(name))
840 }
841
842 pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
843 Ok((
844 self.count_rows(Table::Sessions).await?,
845 self.count_rows(Table::Messages).await?,
846 self.count_rows(Table::Parts).await?,
847 ))
848 }
849
850 pub(crate) async fn merge_insert(
854 &self,
855 table: Table,
856 batch: RecordBatch,
857 row_count: usize,
858 ) -> Result<u64> {
859 self.merge(
860 table,
861 batch,
862 row_count,
863 "merge_insert",
864 WhenMatched::DoNothing,
865 WhenNotMatched::InsertAll,
866 )
867 .await
868 }
869
870 pub(crate) async fn merge_update(
873 &self,
874 table: Table,
875 batch: RecordBatch,
876 row_count: usize,
877 ) -> Result<u64> {
878 self.merge(
879 table,
880 batch,
881 row_count,
882 "merge_update",
883 WhenMatched::UpdateAll,
884 WhenNotMatched::DoNothing,
885 )
886 .await
887 }
888
    /// Shared implementation of `merge_insert` / `merge_update`: runs a Lance
    /// merge-insert of `batch` into `table` with the given match behavior.
    ///
    /// Returns `Ok(0)` immediately when `row_count` is zero. Otherwise, under
    /// `retry_lance`, each attempt:
    /// * locks the table's cached dataset,
    /// * builds a `MergeInsertBuilder` over the latest version,
    /// * executes it,
    /// * stores the resulting dataset back in the cache.
    ///
    /// A `pond::perf` trace is emitted whether or not the merge succeeds. It
    /// records the operation name, table, row count, elapsed time and skipped
    /// duplicates (0 on failure).
    ///
    /// Returns the number of inserted plus updated rows.
    ///
    /// # Errors
    /// Propagates the final error from `retry_lance`.
    async fn merge(
        &self,
        table: Table,
        batch: RecordBatch,
        row_count: usize,
        op: &'static str,
        when_matched: WhenMatched,
        when_not_matched: WhenNotMatched,
    ) -> Result<u64> {
        if row_count == 0 {
            return Ok(0);
        }
        let started = Instant::now();
        let result = self
            .retry_lance(table.label(), || async {
                // The table's mutex stays locked for the whole merge, so writes
                // through this handle are serialized per table.
                let mut cached = self.cached(table).await?.lock().await;
                let existing = cached.latest().await?;
                // Each attempt builds a fresh reader over a clone of `batch`.
                let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
                // No explicit `on` columns are passed.
                // NOTE(review): presumably Lance falls back to the schema's
                // primary-key metadata when `on` is empty — confirm.
                let mut builder = MergeInsertBuilder::try_new(Arc::new(existing), Vec::new())?;
                builder.when_matched(when_matched.clone());
                builder.when_not_matched(when_not_matched.clone());
                // Duplicate keys inside `batch`: keep the first one seen.
                builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
                // Presumably because old-version cleanup is done explicitly by
                // the maintenance phase (`optimize_table_compact`).
                builder.skip_auto_cleanup(true);
                let (dataset, stats) = builder
                    .try_build()?
                    .execute_reader(Box::new(reader))
                    .await?;
                cached.replace(dataset.as_ref().clone());
                Ok((
                    stats.num_inserted_rows + stats.num_updated_rows,
                    stats.num_skipped_duplicates,
                ))
            })
            .await;
        let skipped = result.as_ref().map(|(_, s)| *s).unwrap_or(0);
        tracing::info!(
            target: "pond::perf",
            op,
            table = %table.label(),
            rows = row_count,
            elapsed_ms = started.elapsed().as_millis() as u64,
            skipped,
            "merge",
        );
        result.map(|(affected, _)| affected)
    }
943
944 pub async fn optimize_table(
953 &self,
954 table: Table,
955 intents: &[IndexIntent],
956 progress: Option<&OptimizeProgressFn>,
957 policy: &MaintenancePolicy,
958 ) -> TableOptimizeOutcome {
959 let compaction = self
960 .run_optimize_compact_phase(table, progress, policy)
961 .await;
962 let indices = self
963 .run_optimize_indices_phase(table, intents, progress)
964 .await;
965 TableOptimizeOutcome {
966 table,
967 indices,
968 compaction,
969 }
970 }
971
972 pub async fn optimize_table_indices_only(
976 &self,
977 table: Table,
978 intents: &[IndexIntent],
979 progress: Option<&OptimizeProgressFn>,
980 ) -> PhaseOutcome {
981 self.run_optimize_indices_phase(table, intents, progress)
982 .await
983 }
984
985 async fn run_optimize_indices_phase(
986 &self,
987 table: Table,
988 intents: &[IndexIntent],
989 progress: Option<&OptimizeProgressFn>,
990 ) -> PhaseOutcome {
991 if intents.is_empty() {
992 return PhaseOutcome::Noop;
993 }
994 let result = self
995 .retry_lance(table.label(), || async {
996 let mut guard = self.cached(table).await?.lock().await;
997 let mut dataset = guard.latest().await?;
998 let did_work =
999 optimize_table_indices(&mut dataset, intents, table, progress).await?;
1000 guard.replace(dataset);
1001 Ok::<_, anyhow::Error>(did_work)
1002 })
1003 .await;
1004 match result {
1005 Ok(true) => PhaseOutcome::Ok,
1006 Ok(false) => PhaseOutcome::Noop,
1007 Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
1008 Err(error) => PhaseOutcome::Failed(error),
1009 }
1010 }
1011
1012 async fn run_optimize_compact_phase(
1013 &self,
1014 table: Table,
1015 progress: Option<&OptimizeProgressFn>,
1016 policy: &MaintenancePolicy,
1017 ) -> PhaseOutcome {
1018 let result = self
1019 .retry_lance(table.label(), || async {
1020 let mut guard = self.cached(table).await?.lock().await;
1021 let mut dataset = guard.latest().await?;
1022 optimize_table_compact(&mut dataset, table, progress, policy).await?;
1023 guard.replace(dataset);
1024 Ok::<_, anyhow::Error>(())
1025 })
1026 .await;
1027 match result {
1028 Ok(()) => PhaseOutcome::Ok,
1029 Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
1030 Err(error) => PhaseOutcome::Failed(error),
1031 }
1032 }
1033
1034 pub async fn rebuild_index(&self, table: Table, intent: &IndexIntent) -> Result<()> {
1035 self.retry_lance(table.label(), || async {
1036 let mut guard = self.cached(table).await?.lock().await;
1037 let mut dataset = guard.latest().await?;
1038 rebuild_index(&mut dataset, intent).await?;
1039 guard.replace(dataset);
1040 Ok(())
1041 })
1042 .await
1043 }
1044
1045 pub async fn index_status(
1046 &self,
1047 table: Table,
1048 intents: &[IndexIntent],
1049 ) -> Result<Vec<IndexStatus>> {
1050 let dataset = self.dataset(table).await?;
1051 index_status(table, &dataset, intents).await
1052 }
1053
1054 pub(crate) async fn dataset(&self, table: Table) -> Result<Dataset> {
1055 let mut cached = self.cached(table).await?.lock().await;
1056 cached.latest().await
1057 }
1058 pub(crate) async fn scanner(
1063 &self,
1064 table: Table,
1065 predicate: Option<&Predicate>,
1066 ) -> Result<lance::dataset::scanner::Scanner> {
1067 let dataset = self.dataset(table).await?;
1068 scanner_with_prefilter(&dataset, predicate)
1069 }
1070 pub async fn scan(
1073 &self,
1074 table: Table,
1075 opts: ScanOpts<'_>,
1076 ) -> Result<lance::dataset::scanner::Scanner> {
1077 let mut scanner = self.scanner(table, opts.predicate).await?;
1078 if let Some(projection) = opts.projection {
1079 scanner.project(projection)?;
1080 }
1081 Ok(scanner)
1082 }
1083 pub(crate) async fn scan_batch(
1084 &self,
1085 table: Table,
1086 predicate: Option<&Predicate>,
1087 projection: &[&str],
1088 ) -> Result<RecordBatch> {
1089 let opts = ScanOpts {
1090 predicate,
1091 projection: (!projection.is_empty()).then_some(projection),
1092 };
1093 self.scan(table, opts)
1094 .await?
1095 .try_into_batch()
1096 .await
1097 .context("scan failed")
1098 }
1099 pub async fn count_rows(&self, table: Table) -> Result<usize> {
1100 self.dataset(table)
1101 .await?
1102 .count_rows(None)
1103 .await
1104 .map_err(Into::into)
1105 }
1106 #[cfg(test)]
1108 pub(crate) async fn messages_index_names(&self) -> Result<Vec<String>> {
1109 let dataset = self.dataset(Table::Messages).await?;
1110 let indices = dataset.load_indices().await?;
1111 Ok(indices.iter().map(|index| index.name.clone()).collect())
1112 }
1113
1114 pub(crate) async fn unindexed_row_count(
1117 &self,
1118 table: Table,
1119 index_name: &str,
1120 ) -> Result<usize> {
1121 let dataset = self.dataset(table).await?;
1122 let fragments = dataset
1123 .unindexed_fragments(index_name)
1124 .await
1125 .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1126 Ok(fragments
1127 .iter()
1128 .map(|fragment| fragment.num_rows().unwrap_or(0))
1129 .sum())
1130 }
1131
1132 pub(crate) async fn drop_index(&self, table: Table, name: &str) -> Result<()> {
1138 let mut guard = self.cached(table).await?.lock().await;
1139 let mut dataset = guard.latest().await?;
1140 dataset
1141 .drop_index(name)
1142 .await
1143 .with_context(|| format!("drop_index({name}) failed for {}", table.label()))?;
1144 guard.replace(dataset);
1145 Ok(())
1146 }
1147
1148 async fn table_location(&self, table_name: &str) -> Result<String> {
1151 let request = DescribeTableRequest {
1152 id: Some(self.nm_ident.as_table_id(table_name)),
1153 ..Default::default()
1154 };
1155 let response = self
1156 .nm
1157 .describe_table(request)
1158 .await
1159 .with_context(|| format!("failed to describe table {table_name}"))?;
1160 response
1161 .location
1162 .with_context(|| format!("namespace returned no location for table {table_name}"))
1163 }
1164
1165 pub async fn table_sizes(&self) -> Result<TableSizes> {
1169 let registry = Arc::new(ObjectStoreRegistry::default());
1170 let params = self.object_store_params();
1171
1172 let sessions = self
1173 .listed_size(
1174 ®istry,
1175 ¶ms,
1176 &self.table_location(sessions::SESSIONS).await?,
1177 )
1178 .await?;
1179 let messages = self
1180 .listed_size(
1181 ®istry,
1182 ¶ms,
1183 &self.table_location(sessions::MESSAGES).await?,
1184 )
1185 .await?;
1186 let parts = self
1187 .listed_size(
1188 ®istry,
1189 ¶ms,
1190 &self.table_location(sessions::PARTS).await?,
1191 )
1192 .await?;
1193 let root_total = self
1196 .listed_size(®istry, ¶ms, self.location.as_str())
1197 .await?;
1198 let other = root_total.saturating_sub(sessions + messages + parts);
1199 Ok(TableSizes {
1200 sessions,
1201 messages,
1202 parts,
1203 other,
1204 })
1205 }
1206
1207 async fn listed_size(
1209 &self,
1210 registry: &Arc<ObjectStoreRegistry>,
1211 params: &ObjectStoreParams,
1212 uri: &str,
1213 ) -> Result<u64> {
1214 let (store, base) = ObjectStore::from_uri_and_params(registry.clone(), uri, params)
1215 .await
1216 .with_context(|| format!("failed to open object store for {uri}"))?;
1217 let mut listing = store.list(Some(base));
1218 let mut total = 0u64;
1219 while let Some(meta) = listing.next().await {
1220 let meta = meta.with_context(|| format!("listing {uri} failed"))?;
1221 total += meta.size;
1222 }
1223 Ok(total)
1224 }
1225 async fn cached(&self, table: Table) -> Result<&Mutex<CachedDataset>> {
1226 match table {
1227 Table::Sessions => Ok(&self.datasets.sessions),
1228 Table::Messages => Ok(&self.datasets.messages),
1229 Table::Parts => self.parts_cached().await,
1230 }
1231 }
1232
1233 async fn parts_cached(&self) -> Result<&Mutex<CachedDataset>> {
1236 self.datasets
1237 .parts
1238 .get_or_try_init(|| async {
1239 let dataset = open_or_create_via_ns(
1240 &self.nm,
1241 &self.nm_ident,
1242 sessions::PARTS,
1243 sessions::part_schema(),
1244 &self.session,
1245 &self.storage_options,
1246 )
1247 .await?;
1248 Ok::<_, anyhow::Error>(Mutex::new(CachedDataset {
1249 dataset,
1250 last_refresh: Instant::now(),
1251 refresh_after: self.parts_refresh_after,
1252 }))
1253 })
1254 .await
1255 }
    /// Runs `operation`, retrying failures with jittered exponential backoff
    /// (see `backoff`) up to `self.retry.attempts` total attempts.
    ///
    /// At least one attempt is always made. Every error is retried, not only
    /// commit conflicts. Each retry and the final failure are logged at warn
    /// level with `label`.
    ///
    /// When the final error is a Lance commit conflict (per
    /// `is_commit_conflict`), it is returned wrapped in a [`ConflictExhausted`]
    /// context so callers can classify it with `is_conflict_exhausted`.
    ///
    /// `operation` is invoked afresh for every attempt, so it must rebuild any
    /// state it consumes.
    async fn retry_lance<T, Fut, Op>(&self, label: &str, mut operation: Op) -> Result<T>
    where
        Fut: std::future::Future<Output = Result<T>>,
        Op: FnMut() -> Fut,
    {
        // 1-based attempt counter; saturating so it can never wrap.
        let mut attempt = 0u8;
        loop {
            attempt = attempt.saturating_add(1);
            match operation().await {
                Ok(value) => return Ok(value),
                Err(error) if attempt < self.retry.attempts => {
                    let backoff = self.backoff(attempt);
                    // `{:#}` renders the whole anyhow context chain on one line.
                    let error_chain = format!("{error:#}");
                    tracing::warn!(
                        label,
                        attempt,
                        ?backoff,
                        error = %error_chain,
                        "retrying Lance operation"
                    );
                    tokio::time::sleep(backoff).await;
                }
                Err(error) => {
                    let error_chain = format!("{error:#}");
                    tracing::warn!(
                        label,
                        attempt,
                        error = %error_chain,
                        "Lance operation exhausted retries"
                    );
                    // Tag exhausted conflicts so maintenance phases can report
                    // them as skipped rather than failed.
                    if is_commit_conflict(&error) {
                        return Err(error.context(ConflictExhausted { attempts: attempt }));
                    }
                    return Err(error);
                }
            }
        }
    }
1302 fn backoff(&self, attempt: u8) -> Duration {
1303 let shift = u32::from(attempt.saturating_sub(1));
1304 let multiplier = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
1305 let base = self.retry.initial_backoff.saturating_mul(multiplier);
1306 let factor = (1.0 + self.retry.jitter * (fastrand::f64() * 2.0 - 1.0)).max(0.0);
1309 base.mul_f64(factor).min(self.retry.max_backoff)
1310 }
1311}
1312async fn optimize_table_compact(
1327 dataset: &mut Dataset,
1328 table: Table,
1329 progress: Option<&OptimizeProgressFn>,
1330 policy: &MaintenancePolicy,
1331) -> Result<()> {
1332 let compaction = CompactionOptions {
1333 defer_index_remap: false,
1334 ..CompactionOptions::default()
1335 };
1336
1337 let target = compaction.target_rows_per_fragment;
1340 let fragments = dataset.get_fragments();
1341 let (mergeable_run_rows, candidate_count) = compaction_candidates(
1342 fragments
1343 .iter()
1344 .map(|fragment| fragment.metadata().physical_rows.unwrap_or(0)),
1345 target,
1346 );
1347 if should_compact(
1348 mergeable_run_rows,
1349 candidate_count,
1350 target,
1351 policy.compaction_fragment_cap,
1352 ) {
1353 emit(
1354 progress,
1355 OptimizeEvent::PhaseStart {
1356 table,
1357 phase: OptimizePhase::Compact,
1358 detail: None,
1359 },
1360 );
1361 let started = Instant::now();
1362 compact_files(dataset, compaction, None).await?;
1363 emit(
1364 progress,
1365 OptimizeEvent::PhaseDone {
1366 table,
1367 phase: OptimizePhase::Compact,
1368 elapsed_ms: started.elapsed().as_millis() as u64,
1369 },
1370 );
1371 } else {
1372 tracing::debug!(
1373 target: "pond::perf",
1374 table = table.as_str(),
1375 mergeable_run_rows,
1376 candidate_count,
1377 cap = policy.compaction_fragment_cap,
1378 "compaction skipped: sub-target fragments under threshold",
1379 );
1380 }
1381
1382 emit(
1386 progress,
1387 OptimizeEvent::PhaseStart {
1388 table,
1389 phase: OptimizePhase::Cleanup,
1390 detail: None,
1391 },
1392 );
1393 let started = Instant::now();
1394 dataset
1395 .cleanup_old_versions(policy.cleanup_older_than, Some(false), Some(false))
1396 .await
1397 .context("cleanup_old_versions failed during index optimize")?;
1398 emit(
1399 progress,
1400 OptimizeEvent::PhaseDone {
1401 table,
1402 phase: OptimizePhase::Cleanup,
1403 elapsed_ms: started.elapsed().as_millis() as u64,
1404 },
1405 );
1406
1407 Ok(())
1408}
1409
1410async fn optimize_table_indices(
1413 dataset: &mut Dataset,
1414 intents: &[IndexIntent],
1415 table: Table,
1416 progress: Option<&OptimizeProgressFn>,
1417) -> Result<bool> {
1418 let existing = dataset.load_indices().await?;
1419 let existing_names: std::collections::HashSet<String> =
1420 existing.iter().map(|index| index.name.clone()).collect();
1421
1422 let mut append_indices: Vec<String> = Vec::new();
1423 let mut did_work = false;
1424
1425 for intent in intents {
1426 let exists = existing_names.contains(intent.name);
1427
1428 if !exists {
1429 if !intent.trigger.should_create(dataset).await? {
1430 continue;
1431 }
1432 let params = intent.params.build(dataset).await?;
1433 let index_type = intent.params.index_type();
1434 tracing::info!(
1435 index = intent.name,
1436 column = intent.column,
1437 "creating Lance index (trigger fired)",
1438 );
1439 emit(
1440 progress,
1441 OptimizeEvent::PhaseStart {
1442 table,
1443 phase: OptimizePhase::IndexCreate,
1444 detail: Some(intent.name.to_owned()),
1445 },
1446 );
1447 let started = Instant::now();
1448 dataset
1449 .create_index(
1450 &[intent.column],
1451 index_type,
1452 Some(intent.name.to_owned()),
1453 params.as_ref(),
1454 false,
1455 )
1456 .await
1457 .with_context(|| format!("failed to create index {}", intent.name))?;
1458 emit(
1459 progress,
1460 OptimizeEvent::PhaseDone {
1461 table,
1462 phase: OptimizePhase::IndexCreate,
1463 elapsed_ms: started.elapsed().as_millis() as u64,
1464 },
1465 );
1466 did_work = true;
1467 continue;
1468 }
1469
1470 let unindexed = dataset.unindexed_fragments(intent.name).await?;
1471 if unindexed.is_empty() {
1472 continue;
1473 }
1474 if unindexed.len() < index_lag_threshold() {
1478 continue;
1479 }
1480 match intent.params {
1481 IndexParamsKind::Scalar(BuiltinIndexType::BTree) => {
1482 let params = intent.params.build(dataset).await?;
1483 let index_type = intent.params.index_type();
1484 tracing::debug!(
1485 target: "pond::perf",
1486 index = intent.name,
1487 column = intent.column,
1488 "rebuilding Lance BTree index",
1489 );
1490 emit(
1491 progress,
1492 OptimizeEvent::PhaseStart {
1493 table,
1494 phase: OptimizePhase::IndexRebuild,
1495 detail: Some(intent.name.to_owned()),
1496 },
1497 );
1498 let started = Instant::now();
1499 dataset
1500 .create_index(
1501 &[intent.column],
1502 index_type,
1503 Some(intent.name.to_owned()),
1504 params.as_ref(),
1505 true,
1506 )
1507 .await
1508 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1509 emit(
1510 progress,
1511 OptimizeEvent::PhaseDone {
1512 table,
1513 phase: OptimizePhase::IndexRebuild,
1514 elapsed_ms: started.elapsed().as_millis() as u64,
1515 },
1516 );
1517 did_work = true;
1518 }
1519 IndexParamsKind::Scalar(BuiltinIndexType::Bitmap)
1520 | IndexParamsKind::InvertedFtsNgram { .. }
1521 | IndexParamsKind::IvfPqCosine { .. } => {
1522 append_indices.push(intent.name.to_owned());
1523 }
1524 IndexParamsKind::Scalar(_) => {
1525 let params = intent.params.build(dataset).await?;
1526 emit(
1527 progress,
1528 OptimizeEvent::PhaseStart {
1529 table,
1530 phase: OptimizePhase::IndexRebuild,
1531 detail: Some(intent.name.to_owned()),
1532 },
1533 );
1534 let started = Instant::now();
1535 dataset
1536 .create_index(
1537 &[intent.column],
1538 intent.params.index_type(),
1539 Some(intent.name.to_owned()),
1540 params.as_ref(),
1541 true,
1542 )
1543 .await
1544 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1545 emit(
1546 progress,
1547 OptimizeEvent::PhaseDone {
1548 table,
1549 phase: OptimizePhase::IndexRebuild,
1550 elapsed_ms: started.elapsed().as_millis() as u64,
1551 },
1552 );
1553 did_work = true;
1554 }
1555 }
1556 }
1557
1558 if !append_indices.is_empty() {
1559 let to_append = append_indices.clone();
1560 emit(
1561 progress,
1562 OptimizeEvent::PhaseStart {
1563 table,
1564 phase: OptimizePhase::IndexAppend,
1565 detail: Some(append_indices.join(", ")),
1566 },
1567 );
1568 let started = Instant::now();
1569 dataset
1570 .optimize_indices(&OptimizeOptions::append().index_names(to_append))
1571 .await
1572 .context("optimize_indices(append) failed during index optimize")?;
1573 emit(
1574 progress,
1575 OptimizeEvent::PhaseDone {
1576 table,
1577 phase: OptimizePhase::IndexAppend,
1578 elapsed_ms: started.elapsed().as_millis() as u64,
1579 },
1580 );
1581 tracing::debug!(
1582 target: "pond::perf",
1583 indices = ?append_indices,
1584 "appended trailing fragments into indices",
1585 );
1586 did_work = true;
1587 }
1588
1589 Ok(did_work)
1590}
1591
1592async fn rebuild_index(dataset: &mut Dataset, intent: &IndexIntent) -> Result<()> {
1593 if !intent.trigger.should_create(dataset).await? {
1594 return Ok(());
1595 }
1596 let params = intent.params.build(dataset).await?;
1597 dataset
1598 .create_index(
1599 &[intent.column],
1600 intent.params.index_type(),
1601 Some(intent.name.to_owned()),
1602 params.as_ref(),
1603 true,
1604 )
1605 .await
1606 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1607 Ok(())
1608}
1609
1610async fn index_status(
1611 table: Table,
1612 dataset: &Dataset,
1613 intents: &[IndexIntent],
1614) -> Result<Vec<IndexStatus>> {
1615 let existing = dataset.load_indices().await?;
1616 let existing_names: std::collections::HashSet<String> =
1617 existing.iter().map(|index| index.name.clone()).collect();
1618 let total_fragments = dataset.get_fragments().len();
1619 let total_rows = dataset.count_rows(None).await?;
1620 let mut statuses = Vec::with_capacity(intents.len());
1621 for intent in intents {
1622 let exists = existing_names.contains(intent.name);
1623 if !exists {
1624 statuses.push(IndexStatus {
1625 table,
1626 intent_name: intent.name.to_owned(),
1627 fragments_covered: 0,
1628 unindexed_fragments: total_fragments,
1629 unindexed_rows: total_rows,
1630 exists,
1631 });
1632 continue;
1633 }
1634 let unindexed = dataset
1635 .unindexed_fragments(intent.name)
1636 .await
1637 .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1638 let unindexed_fragments = unindexed.len();
1639 let unindexed_rows = unindexed
1640 .iter()
1641 .map(|fragment| fragment.num_rows().unwrap_or(0))
1642 .sum();
1643 statuses.push(IndexStatus {
1644 table,
1645 intent_name: intent.name.to_owned(),
1646 fragments_covered: total_fragments.saturating_sub(unindexed_fragments),
1647 unindexed_fragments,
1648 unindexed_rows,
1649 exists,
1650 });
1651 }
1652 Ok(statuses)
1653}
1654
1655async fn open_or_create_via_ns(
1667 nm: &Arc<dyn LanceNamespace>,
1668 nm_ident: &NamespaceIdent,
1669 table_name: &str,
1670 schema: lance::deps::arrow_schema::SchemaRef,
1671 session: &Arc<Session>,
1672 storage_options: &HashMap<String, String>,
1673) -> Result<Dataset> {
1674 let table_id = nm_ident.as_table_id(table_name);
1675
1676 let request = DescribeTableRequest {
1677 id: Some(table_id.clone()),
1678 ..Default::default()
1679 };
1680 match nm.describe_table(request).await {
1681 Ok(response) => {
1682 let location = response.location.with_context(|| {
1683 format!("namespace returned no location for table {table_name}")
1684 })?;
1685 let mut builder = DatasetBuilder::from_uri(&location).with_session(session.clone());
1686 if !storage_options.is_empty() {
1687 builder = builder.with_storage_options(storage_options.clone());
1688 }
1689 let dataset = builder
1690 .load()
1691 .await
1692 .with_context(|| format!("failed to open table {table_name}"))?;
1693 ensure_schema_matches(&dataset, schema.as_ref(), table_name)?;
1694 return Ok(dataset);
1695 }
1696 Err(error) => match &error {
1697 error if is_namespace_error_code(error, ErrorCode::TableNotFound) => {
1698 }
1700 _ => {
1701 return Err(anyhow::Error::from(error))
1702 .with_context(|| format!("failed to describe table {table_name}"));
1703 }
1704 },
1705 }
1706
1707 let mut write_params = sessions::write_params_for_create();
1710 write_params.session = Some(session.clone());
1711 write_params.mode = WriteMode::Create;
1712 if !storage_options.is_empty() {
1713 write_params.store_params = Some(ObjectStoreParams {
1714 storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
1715 storage_options.clone(),
1716 ))),
1717 ..Default::default()
1718 });
1719 }
1720 let reader = sessions::empty_reader(schema)?;
1721 Dataset::write_into_namespace(reader, nm.clone(), table_id, Some(write_params))
1722 .await
1723 .with_context(|| format!("failed to create table {table_name}"))
1724}
1725
1726fn is_namespace_error_code(error: &lance::Error, code: ErrorCode) -> bool {
1730 if !matches!(error, lance::Error::Namespace { .. }) {
1731 return false;
1732 }
1733 std::iter::successors(Some(error as &(dyn std::error::Error + 'static)), |link| {
1734 link.source()
1735 })
1736 .filter_map(|link| link.downcast_ref::<NamespaceError>())
1737 .any(|inner| inner.code() == code)
1738}
1739
1740fn scanner_with_prefilter(
1741 dataset: &Dataset,
1742 predicate: Option<&Predicate>,
1743) -> Result<lance::dataset::scanner::Scanner> {
1744 let mut scanner = dataset.scan();
1745 scanner.prefilter(true);
1746 if let Some(predicate) = predicate {
1747 let filter = predicate.to_lance();
1748 if !filter.is_empty() {
1749 scanner.filter(&filter)?;
1750 }
1751 }
1752 Ok(scanner)
1753}
1754fn ensure_schema_matches(
1755 dataset: &Dataset,
1756 expected: &lance::deps::arrow_schema::Schema,
1757 table_name: &str,
1758) -> Result<()> {
1759 use lance::deps::arrow_schema::DataType;
1760 use std::collections::BTreeSet;
1761 let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
1762 let actual_names: BTreeSet<&str> = actual.fields().iter().map(|f| f.name().as_str()).collect();
1763 let expected_names: BTreeSet<&str> = expected
1764 .fields()
1765 .iter()
1766 .map(|f| f.name().as_str())
1767 .collect();
1768 if actual_names != expected_names {
1769 anyhow::bail!(
1770 "table {table_name} has columns {actual_names:?} but this pond build expects \
1771 {expected_names:?} - the on-disk store predates a schema change; delete the \
1772 data directory and re-run `pond ingest`",
1773 );
1774 }
1775 for actual_field in actual.fields() {
1780 let Some(expected_field) = expected.field_with_name(actual_field.name()).ok() else {
1781 continue;
1782 };
1783 if let (DataType::FixedSizeList(_, actual_dim), DataType::FixedSizeList(_, expected_dim)) =
1784 (actual_field.data_type(), expected_field.data_type())
1785 && actual_dim != expected_dim
1786 {
1787 tracing::warn!(
1788 table = table_name,
1789 column = actual_field.name(),
1790 actual_dim,
1791 expected_dim,
1792 "embedding dimension differs from config; open proceeds because model swaps are operator-driven",
1793 );
1794 }
1795 }
1796 Ok(())
1797}
/// Fills in connection defaults for remote object stores. A value the caller
/// already supplied is never overridden.
///
/// Keys are compared against every alias of a setting, ignoring ASCII case, so
/// `CONNECT_TIMEOUT` counts as `connect_timeout` already being set. Missing
/// defaults are inserted under the first alias listed:
///
/// - `pool_idle_timeout` = `"300 seconds"`
/// - `connect_timeout` = `"10 seconds"`
/// - `aws_unsigned_payload` = `"true"`, only when a custom endpoint is present
///   under any spelling `object_store` accepts for the S3 endpoint
///   (`aws_endpoint_url`, `aws_endpoint`, `endpoint_url`, `endpoint`).
fn apply_remote_storage_defaults(options: &mut HashMap<String, String>) {
    // All keys object_store's S3 builder maps to the endpoint override; the
    // `*_url` spellings must count too, or e.g. `AWS_ENDPOINT_URL` is missed.
    const ENDPOINT_ALIASES: [&str; 4] = ["aws_endpoint_url", "aws_endpoint", "endpoint_url", "endpoint"];

    // Whether any key in `options` equals one of `aliases`, ignoring ASCII case.
    fn has_any_alias(options: &HashMap<String, String>, aliases: &[&str]) -> bool {
        options
            .keys()
            .any(|key| aliases.iter().any(|alias| key.eq_ignore_ascii_case(alias)))
    }

    // Inserts `value` under `aliases[0]` unless one of the aliases is present.
    fn set_default(options: &mut HashMap<String, String>, aliases: &[&str], value: &str) {
        if !has_any_alias(options, aliases) {
            options.insert(aliases[0].to_owned(), value.to_owned());
        }
    }

    set_default(options, &["pool_idle_timeout"], "300 seconds");
    set_default(options, &["connect_timeout"], "10 seconds");
    if has_any_alias(options, &ENDPOINT_ALIASES) {
        set_default(
            options,
            &["aws_unsigned_payload", "unsigned_payload"],
            "true",
        );
    }
}
1827
/// Renders `value` as a single-quoted SQL string literal, doubling any embedded
/// single quotes.
fn quoted_string(value: &str) -> String {
    let mut literal = String::with_capacity(value.len() + 2);
    literal.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            literal.push_str("''");
        } else {
            literal.push(ch);
        }
    }
    literal.push('\'');
    literal
}
/// Builds a quoted `LIKE` pattern matching any string that contains `value`.
///
/// Backslash, `%` and `_` are escaped with a backslash so they match literally,
/// single quotes are doubled for the SQL literal, and the result is wrapped as
/// `'%…%'`.
fn like_contains(value: &str) -> String {
    let mut pattern = String::with_capacity(value.len() + 4);
    pattern.push_str("'%");
    for ch in value.chars() {
        match ch {
            '\\' => pattern.push_str("\\\\"),
            '%' => pattern.push_str("\\%"),
            '_' => pattern.push_str("\\_"),
            '\'' => pattern.push_str("''"),
            other => pattern.push(other),
        }
    }
    pattern.push_str("%'");
    pattern
}
1839
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Pins the `should_compact` gate for a 1 MiB row target and a fragment cap of 64.
    #[test]
    fn compaction_gate_skips_subtarget_trickle_compacts_on_progress() {
        let target = 1_048_576;
        // 2 candidates, 510_030 mergeable rows (below target and cap): skip.
        assert!(!should_compact(510_000 + 30, 2, target, 64));
        // Mergeable rows reach the target: compact.
        assert!(should_compact(target, 3, target, 64));
        // Candidate count reaches the cap even though the run is small: compact.
        assert!(should_compact(5_000, 64, target, 64));
        // Degenerate zero cap with zero candidates: compact.
        assert!(should_compact(0, 0, target, 0));
    }

    /// The 256_000-row fragment sits between two full-size fragments; the
    /// expected run (510_030) covers only the trailing 510_000 + 30 fragments.
    #[test]
    fn compaction_candidates_strands_isolated_subtarget_fragment() {
        let target = 1_048_576;
        let (run, count) =
            compaction_candidates([1_048_576, 256_000, 1_048_576, 510_000, 30], target);
        assert_eq!(count, 3);
        assert_eq!(run, 510_030);
        assert!(!should_compact(run, count, target, 64));
    }

    /// `is_namespace_error_code` must find the code directly and through an
    /// extra layer of `namespace_source` wrapping, and reject other codes and
    /// non-namespace errors.
    #[test]
    fn namespace_error_code_walks_wrapped_chain() {
        let direct = lance::Error::namespace_source(Box::new(NamespaceError::TableNotFound {
            message: "missing".into(),
        }));
        assert!(is_namespace_error_code(&direct, ErrorCode::TableNotFound));

        let wrapped = lance::Error::namespace_source(Box::new(direct));
        assert!(is_namespace_error_code(&wrapped, ErrorCode::TableNotFound));

        let other_code =
            lance::Error::namespace_source(Box::new(NamespaceError::NamespaceNotFound {
                message: "nope".into(),
            }));
        assert!(!is_namespace_error_code(
            &other_code,
            ErrorCode::TableNotFound
        ));

        let not_namespace = lance::Error::internal("unrelated");
        assert!(!is_namespace_error_code(
            &not_namespace,
            ErrorCode::TableNotFound
        ));
    }

    /// A freshly opened store in an empty directory exposes empty, scannable
    /// sessions / messages / parts tables.
    #[tokio::test]
    async fn store_opens_via_namespace_and_scan_works() -> Result<()> {
        let temp = TempDir::new()?;
        let url = Url::from_directory_path(temp.path())
            .map_err(|()| anyhow::anyhow!("temp path is not absolute"))?;
        let handle = Handle::open(&url).await?;
        let cases: [(Table, &[&str]); 3] = [
            (Table::Sessions, &["id"]),
            (Table::Messages, &["id"]),
            (Table::Parts, &["id"]),
        ];
        for (table, projection) in cases {
            let scanner = handle
                .scan(table, ScanOpts::project_only(projection))
                .await?;
            let batch = scanner.try_into_batch().await?;
            assert_eq!(batch.num_rows(), 0, "fresh table should be empty");
        }
        Ok(())
    }
}