1use crate::{
2 RetryPolicy,
3 config::{self},
4 handlers::NamespaceIdent,
5 sessions::{self},
6};
7use anyhow::{Context, Result};
8use lance::Dataset;
9use lance::dataset::builder::DatasetBuilder;
10use lance::dataset::optimize::{CompactionOptions, compact_files};
11use lance::dataset::write::merge_insert::SourceDedupeBehavior;
12use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode};
13use lance::deps::arrow_array::{RecordBatch, RecordBatchIterator};
14use lance::index::DatasetIndexExt;
15use lance::index::DatasetIndexInternalExt;
16use lance::index::vector::VectorIndexParams;
17use lance::session::Session;
18use lance_index::IndexType;
19use lance_index::optimize::OptimizeOptions;
20use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
21use lance_io::object_store::{
22 ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor,
23};
24use lance_linalg::distance::MetricType;
25use lance_namespace::LanceNamespace;
26use lance_namespace::error::{ErrorCode, NamespaceError};
27use lance_namespace::models::DescribeTableRequest;
28use lance_namespace_impls::ConnectBuilder;
29use std::{
30 collections::HashMap,
31 sync::Arc,
32 time::{Duration, Instant},
33};
34use tokio::sync::{Mutex, OnceCell};
35use tokio_stream::StreamExt;
36use url::Url;
/// Row count at which a vector index becomes worth building.
///
/// NOTE(review): only the value is defined here; presumably this is the
/// `IndexTrigger::OnNonNullCount` threshold of the vector index intent — confirm.
pub const VECTOR_INDEX_ACTIVATION_ROWS: usize = 100_000;

/// Fallback returned by [`index_lag_threshold`] when
/// [`init_index_lag_threshold`] has never been called.
pub const DEFAULT_INDEX_LAG_THRESHOLD: usize = 4;

/// Process-wide override for the index lag threshold; written at most once.
static INDEX_LAG_THRESHOLD_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Installs the process-wide index lag threshold.
///
/// Only the first call takes effect; later calls are silently ignored.
pub fn init_index_lag_threshold(value: usize) {
    // `set` refuses to overwrite an existing value, so the first writer wins.
    let _ = INDEX_LAG_THRESHOLD_RUNTIME.set(value);
}

/// Number of unindexed fragments an existing index may lag behind before the
/// optimize pass refreshes it.
///
/// Returns the installed value, or [`DEFAULT_INDEX_LAG_THRESHOLD`] if none was set.
pub fn index_lag_threshold() -> usize {
    match INDEX_LAG_THRESHOLD_RUNTIME.get() {
        Some(&configured) => configured,
        None => DEFAULT_INDEX_LAG_THRESHOLD,
    }
}
64
65pub const DEFAULT_COMPACTION_FRAGMENT_CAP: usize = 64;
69
70pub fn default_cleanup_older_than() -> chrono::Duration {
76 chrono::Duration::days(1)
77}
78
79#[derive(Debug, Clone, Copy)]
84pub struct MaintenancePolicy {
85 pub compaction_fragment_cap: usize,
88 pub cleanup_older_than: chrono::Duration,
90}
91
92impl MaintenancePolicy {
93 pub fn always_compact() -> Self {
96 Self {
97 compaction_fragment_cap: 0,
98 cleanup_older_than: default_cleanup_older_than(),
99 }
100 }
101}
102
/// Decides whether the compaction step should run.
///
/// Compaction runs when either:
/// * some run of adjacent sub-target fragments holds at least `target_rows` rows
///   (merging it would yield at least one full fragment), or
/// * the number of sub-target fragments has reached `cap` (a cap of `0` always compacts).
fn should_compact(
    mergeable_run_rows: usize,
    candidate_count: usize,
    target_rows: usize,
    cap: usize,
) -> bool {
    let run_fills_target = mergeable_run_rows >= target_rows;
    let too_many_small_fragments = candidate_count >= cap;
    run_fills_target || too_many_small_fragments
}
114
/// Scans fragment sizes (in fragment order) for compaction candidates.
///
/// A fragment is a candidate when its physical row count is strictly below `target`.
/// Returns `(max_run_rows, candidate_count)`:
/// * `max_run_rows` is the largest row total over any run of *adjacent* candidates
///   (a fragment at or above `target` breaks the run);
/// * `candidate_count` is the total number of candidates.
fn compaction_candidates(
    physical_rows: impl IntoIterator<Item = usize>,
    target: usize,
) -> (usize, usize) {
    let (_current_run, longest_run, small_fragments) = physical_rows.into_iter().fold(
        (0usize, 0usize, 0usize),
        |(current_run, longest_run, small_fragments), rows| {
            if rows >= target {
                // A full-size fragment ends the current run of small ones.
                (0, longest_run, small_fragments)
            } else {
                let current_run = current_run + rows;
                (current_run, longest_run.max(current_run), small_fragments + 1)
            }
        },
    );
    (longest_run, small_fragments)
}
137
/// Declarative description of one Lance index that the optimize pass should
/// create (once its trigger fires) and keep up to date.
#[derive(Debug, Clone)]
pub struct IndexIntent {
    /// Index name passed to `create_index` and matched against `load_indices`.
    pub name: &'static str,
    /// Column the index is built on.
    pub column: &'static str,
    /// Condition that must hold before a missing index is first created.
    pub trigger: IndexTrigger,
    /// Index family and build parameters.
    pub params: IndexParamsKind,
}

/// When an [`IndexIntent`] whose index does not exist yet should be created.
#[derive(Debug, Clone)]
pub enum IndexTrigger {
    /// Create as soon as the table has at least one row.
    OnAnyRows,
    /// Create once at least `threshold` rows have a non-null value in `column`
    /// (which may differ from the intent's indexed column).
    OnNonNullCount {
        column: &'static str,
        threshold: usize,
    },
}

/// Index family and build parameters for an [`IndexIntent`].
#[derive(Debug, Clone)]
pub enum IndexParamsKind {
    /// A Lance built-in scalar index (BTree, Bitmap, ...).
    Scalar(BuiltinIndexType),
    /// Full-text inverted index using the `ngram` tokenizer with n-gram lengths
    /// `min` to `max`; stemming and stop-word removal are disabled.
    InvertedFtsNgram { min: u32, max: u32 },
    /// IVF-PQ vector index using cosine distance. The IVF partition count is
    /// derived at build time from the number of non-null rows in the `vector` column.
    IvfPqCosine {
        sub_vectors: usize,
        num_bits: u8,
        max_iters: usize,
    },
}
189
190impl IndexTrigger {
191 async fn should_create(&self, dataset: &Dataset) -> Result<bool> {
192 match self {
193 Self::OnAnyRows => Ok(dataset.count_rows(None).await? > 0),
194 Self::OnNonNullCount { column, threshold } => {
195 let count = dataset
196 .count_rows(Some(format!("{column} IS NOT NULL")))
197 .await?;
198 Ok(count >= *threshold)
199 }
200 }
201 }
202}
203
204impl IndexParamsKind {
205 fn index_type(&self) -> IndexType {
206 match self {
207 Self::Scalar(BuiltinIndexType::Bitmap) => IndexType::Bitmap,
208 Self::Scalar(_) => IndexType::BTree,
209 Self::InvertedFtsNgram { .. } => IndexType::Inverted,
210 Self::IvfPqCosine { .. } => IndexType::Vector,
211 }
212 }
213
214 async fn build(&self, dataset: &Dataset) -> Result<Box<dyn lance::index::IndexParams>> {
215 match self {
216 Self::Scalar(kind) => Ok(Box::new(ScalarIndexParams::for_builtin(kind.clone()))),
217 Self::InvertedFtsNgram { min, max } => Ok(Box::new(
218 InvertedIndexParams::default()
219 .base_tokenizer("ngram".to_owned())
220 .ngram_min_length(*min)
221 .ngram_max_length(*max)
222 .stem(false)
223 .remove_stop_words(false),
224 )),
225 Self::IvfPqCosine {
226 sub_vectors,
227 num_bits,
228 max_iters,
229 } => {
230 let count = dataset
231 .count_rows(Some("vector IS NOT NULL".to_owned()))
232 .await?;
233 let partitions = count.checked_div(4096).unwrap_or(0).max(1);
234 Ok(Box::new(VectorIndexParams::ivf_pq(
235 partitions,
236 *num_bits,
237 *sub_vectors,
238 MetricType::Cosine,
239 *max_iters,
240 )))
241 }
242 }
243 }
244}
245
/// Snapshot of one index intent's state on a table.
///
/// NOTE(review): populated by the free function `index_status`, which is not
/// visible here; the field notes below are inferred from names — confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexStatus {
    /// Table the index belongs to.
    pub table: Table,
    /// `IndexIntent::name` this status describes.
    pub intent_name: String,
    // presumably the number of fragments already covered by the index
    pub fragments_covered: usize,
    // presumably the number of fragments not yet covered by the index
    pub unindexed_fragments: usize,
    // presumably the row total of those unindexed fragments
    pub unindexed_rows: usize,
    // presumably whether an index with `intent_name` exists on the table
    pub exists: bool,
}
255
/// Marker error attached (as anyhow context) when a Lance commit conflict
/// persisted through every retry attempt.
#[derive(Debug, Clone, Copy)]
pub struct ConflictExhausted {
    /// Number of attempts made before giving up.
    pub attempts: u8,
}

impl std::fmt::Display for ConflictExhausted {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { attempts } = *self;
        write!(formatter, "commit conflict exhausted after {attempts} attempt(s)")
    }
}

impl std::error::Error for ConflictExhausted {}
276
/// Result of one maintenance phase (index optimisation or compaction) on a table.
#[derive(Debug)]
pub enum PhaseOutcome {
    /// The phase completed. The index phase reports this only when it did work;
    /// the compaction phase reports it whenever it finishes, even if only the
    /// version cleanup ran.
    Ok,
    /// Nothing to do: no index intents were given, or no index needed work.
    Noop,
    /// Commit conflicts persisted through every retry (the error carried a
    /// [`ConflictExhausted`] marker), so the phase was abandoned.
    SkippedConflict,
    /// The phase failed with any other error.
    Failed(anyhow::Error),
    /// The phase was not run. NOTE(review): not produced by the code visible
    /// here; presumably set by callers that skip a phase — confirm.
    NotAttempted,
}

impl PhaseOutcome {
    /// `true` only for [`PhaseOutcome::Failed`].
    pub fn is_failed(&self) -> bool {
        matches!(self, Self::Failed(_))
    }
}

/// Per-table outcome of `Handle::optimize_table`, one entry per phase.
#[derive(Debug)]
pub struct TableOptimizeOutcome {
    pub table: Table,
    pub indices: PhaseOutcome,
    pub compaction: PhaseOutcome,
}
310
/// Progress notification delivered to an [`OptimizeProgressFn`] during maintenance.
#[derive(Debug, Clone)]
pub enum OptimizeEvent {
    /// A phase is about to run. `detail` is the index name for index create or
    /// rebuild, the comma-separated index names for an append, and `None` for
    /// compaction and cleanup.
    PhaseStart {
        table: Table,
        phase: OptimizePhase,
        detail: Option<String>,
    },
    /// A phase finished without error after `elapsed_ms` milliseconds of wall time.
    PhaseDone {
        table: Table,
        phase: OptimizePhase,
        elapsed_ms: u64,
    },
}
326
/// Individual steps of the maintenance pass, as reported in [`OptimizeEvent`]s.
#[derive(Debug, Clone, Copy)]
pub enum OptimizePhase {
    Compact,
    Cleanup,
    IndexCreate,
    IndexRebuild,
    IndexAppend,
}

impl OptimizePhase {
    /// Short kebab-case label for logs and progress output.
    pub fn label(self) -> &'static str {
        match self {
            OptimizePhase::IndexAppend => "index-append",
            OptimizePhase::IndexRebuild => "index-rebuild",
            OptimizePhase::IndexCreate => "index-create",
            OptimizePhase::Cleanup => "cleanup",
            OptimizePhase::Compact => "compact",
        }
    }
}
347
348pub type OptimizeProgressFn = Box<dyn Fn(OptimizeEvent) + Send + Sync>;
349
350fn emit(progress: Option<&OptimizeProgressFn>, event: OptimizeEvent) {
351 if let Some(callback) = progress {
352 callback(event);
353 }
354}
355
356pub fn is_commit_conflict(error: &anyhow::Error) -> bool {
360 error.downcast_ref::<lance::Error>().is_some_and(|err| {
361 matches!(
362 err,
363 lance::Error::CommitConflict { .. }
364 | lance::Error::RetryableCommitConflict { .. }
365 | lance::Error::TooMuchWriteContention { .. }
366 )
367 })
368}
369
370fn is_conflict_exhausted(error: &anyhow::Error) -> bool {
373 error.chain().any(|cause| cause.is::<ConflictExhausted>())
374}
375
/// Storage footprint in bytes, as computed by `Handle::table_sizes` from the
/// summed sizes of listed objects.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TableSizes {
    /// Bytes under the `sessions` table location.
    pub sessions: u64,
    /// Bytes under the `messages` table location.
    pub messages: u64,
    /// Bytes under the `parts` table location.
    pub parts: u64,
    /// Bytes under the root location not attributed to the three tables
    /// (clamped at zero).
    pub other: u64,
}
386
/// Right-hand operand of a [`Predicate`] comparison.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalarValue {
    /// Rendered as a quoted SQL string literal.
    String(String),
    /// Rendered as a decimal integer literal.
    Int32(i32),
    /// Emitted verbatim into the filter expression (no quoting or escaping).
    Raw(String),
}
impl From<&str> for ScalarValue {
    fn from(value: &str) -> Self {
        // Delegate to the owned-string conversion.
        Self::from(value.to_owned())
    }
}
impl From<String> for ScalarValue {
    fn from(value: String) -> Self {
        ScalarValue::String(value)
    }
}
impl From<i32> for ScalarValue {
    fn from(value: i32) -> Self {
        ScalarValue::Int32(value)
    }
}
/// Typed filter expression, rendered into a Lance SQL filter string by
/// `Predicate::to_lance`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Predicate {
    /// `column = value`
    Eq(&'static str, ScalarValue),
    /// `column <> value`
    Ne(&'static str, ScalarValue),
    /// `column IS NULL`
    IsNull(&'static str),
    /// `column IS NOT NULL`
    IsNotNull(&'static str),
    /// `column IN (values...)`
    In(&'static str, Vec<ScalarValue>),
    /// `column LIKE <pattern> ESCAPE '\'`; the pattern is built from the text by the
    /// `like_contains` helper (presumably a `%...%` substring match — confirm).
    LikeContains(&'static str, String),
    /// `regexp_like(column, '<pattern>')`.
    Regex(&'static str, String),
    /// `column >= value`
    Gte(&'static str, ScalarValue),
    /// `column <= value`
    Lte(&'static str, ScalarValue),
    /// Conjunction; children that render empty are dropped.
    And(Vec<Predicate>),
    /// Parenthesised disjunction; children that render empty are dropped.
    Or(Vec<Predicate>),
    /// `NOT (inner)`, or empty when the inner predicate renders empty.
    Not(Box<Predicate>),
}
427impl Predicate {
428 pub fn to_lance(&self) -> String {
429 match self {
430 Self::Eq(column, value) => format!("{column} = {}", value.to_lance()),
431 Self::Ne(column, value) => format!("{column} <> {}", value.to_lance()),
432 Self::IsNull(column) => format!("{column} IS NULL"),
433 Self::IsNotNull(column) => format!("{column} IS NOT NULL"),
434 Self::In(column, values) => {
435 let values = values
436 .iter()
437 .map(ScalarValue::to_lance)
438 .collect::<Vec<_>>()
439 .join(", ");
440 format!("{column} IN ({values})")
441 }
442 Self::LikeContains(column, value) => {
443 format!("{column} LIKE {} ESCAPE '\\'", like_contains(value))
444 }
445 Self::Regex(column, pattern) => {
446 format!("regexp_like({column}, {})", quoted_string(pattern))
447 }
448 Self::Gte(column, value) => format!("{column} >= {}", value.to_lance()),
449 Self::Lte(column, value) => format!("{column} <= {}", value.to_lance()),
450 Self::And(predicates) => predicates
451 .iter()
452 .map(Self::to_lance)
453 .filter(|predicate| !predicate.is_empty())
454 .collect::<Vec<_>>()
455 .join(" AND "),
456 Self::Or(predicates) => {
457 let body = predicates
460 .iter()
461 .map(Self::to_lance)
462 .filter(|predicate| !predicate.is_empty())
463 .collect::<Vec<_>>()
464 .join(" OR ");
465 if body.is_empty() {
466 String::new()
467 } else {
468 format!("({body})")
469 }
470 }
471 Self::Not(inner) => {
472 let body = inner.to_lance();
473 if body.is_empty() {
474 String::new()
475 } else {
476 format!("NOT ({body})")
477 }
478 }
479 }
480 }
481}
482#[derive(Default)]
485pub struct ScanOpts<'a> {
486 pub predicate: Option<&'a Predicate>,
487 pub projection: Option<&'a [&'a str]>,
488}
489
490impl<'a> ScanOpts<'a> {
491 pub fn project_only(projection: &'a [&'a str]) -> Self {
492 Self {
493 predicate: None,
494 projection: Some(projection),
495 }
496 }
497 pub fn with_predicate_and_projection(
498 predicate: &'a Predicate,
499 projection: &'a [&'a str],
500 ) -> Self {
501 Self {
502 predicate: Some(predicate),
503 projection: Some(projection),
504 }
505 }
506}
507
508impl ScalarValue {
509 fn to_lance(&self) -> String {
510 match self {
511 Self::String(value) => quoted_string(value),
512 Self::Int32(value) => value.to_string(),
513 Self::Raw(value) => value.clone(),
514 }
515 }
516}
517#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
521pub struct RuntimeCaps {
522 pub index_cache_bytes: Option<usize>,
523 pub metadata_cache_bytes: Option<usize>,
524}
525
526impl RuntimeCaps {
527 pub fn from_config(config: &crate::config::RuntimeConfig) -> Self {
528 Self {
529 index_cache_bytes: config.index_cache_bytes,
530 metadata_cache_bytes: config.metadata_cache_bytes,
531 }
532 }
533}
534
535const LOCAL_INDEX_CACHE_BYTES: usize = 256 * 1024 * 1024;
539const LOCAL_METADATA_CACHE_BYTES: usize = 128 * 1024 * 1024;
540const REMOTE_INDEX_CACHE_BYTES: usize = 2 * 1024 * 1024 * 1024;
542const REMOTE_METADATA_CACHE_BYTES: usize = 512 * 1024 * 1024;
543
544fn resolve_cache_caps(location: &Url, caps: RuntimeCaps) -> (usize, usize) {
545 let (index_default, metadata_default) = if config::is_local(location) {
546 (LOCAL_INDEX_CACHE_BYTES, LOCAL_METADATA_CACHE_BYTES)
547 } else {
548 (REMOTE_INDEX_CACHE_BYTES, REMOTE_METADATA_CACHE_BYTES)
549 };
550 (
551 caps.index_cache_bytes.unwrap_or(index_default),
552 caps.metadata_cache_bytes.unwrap_or(metadata_default),
553 )
554}
555
556pub struct Handle {
557 datasets: DatasetSet,
558 retry: RetryPolicy,
559 #[allow(dead_code)]
567 session: Arc<Session>,
568 nm: Arc<dyn LanceNamespace>,
572 nm_ident: NamespaceIdent,
576 storage_options: HashMap<String, String>,
581 location: Url,
585 parts_refresh_after: Duration,
589}
590
591impl std::fmt::Debug for Handle {
592 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
593 formatter
594 .debug_struct("Handle")
595 .field("datasets", &self.datasets)
596 .field("retry", &self.retry)
597 .field("nm_ident", &self.nm_ident)
598 .field("storage_options", &self.storage_options)
599 .field("location", &self.location)
600 .finish()
601 }
602}
603
/// The tables managed by a [`Handle`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Table {
    Sessions,
    Messages,
    Parts,
}
impl Table {
    /// Lower-case table name, e.g. `"sessions"`.
    pub fn as_str(self) -> &'static str {
        Self::label(self)
    }

    /// Lower-case table name used in logs and progress events.
    fn label(self) -> &'static str {
        match self {
            Table::Parts => "parts",
            Table::Messages => "messages",
            Table::Sessions => "sessions",
        }
    }
}
/// Cached dataset per table, each behind its own async mutex.
///
/// `sessions` and `messages` are opened eagerly in `Handle::open_with_options`.
/// `parts` is opened on first access by `Handle::parts_cached`.
#[derive(Debug)]
struct DatasetSet {
    sessions: Mutex<CachedDataset>,
    messages: Mutex<CachedDataset>,
    parts: OnceCell<Mutex<CachedDataset>>,
}
636#[derive(Debug)]
637struct CachedDataset {
638 dataset: Dataset,
639 last_refresh: Instant,
640 refresh_after: Duration,
641}
642impl CachedDataset {
643 async fn latest(&mut self) -> Result<Dataset> {
644 if self.last_refresh.elapsed() >= self.refresh_after {
645 self.dataset.checkout_latest().await?;
646 self.last_refresh = Instant::now();
647 }
648 Ok(self.dataset.clone())
649 }
650 fn replace(&mut self, dataset: Dataset) {
651 self.dataset = dataset;
652 self.last_refresh = Instant::now();
653 }
654}
655impl Handle {
656 pub async fn open(location: &Url) -> Result<Self> {
659 Self::open_with_options(location, HashMap::new(), RuntimeCaps::default()).await
660 }
661
662 pub async fn open_with_options(
668 location: &Url,
669 mut storage_options: HashMap<String, String>,
670 caps: RuntimeCaps,
671 ) -> Result<Self> {
672 if let Some(path) = config::local_path(location) {
673 tokio::fs::create_dir_all(&path)
674 .await
675 .with_context(|| format!("failed to create data dir {}", path.display()))?;
676 } else {
677 apply_remote_storage_defaults(&mut storage_options);
678 }
679 let (index_cache_bytes, metadata_cache_bytes) = resolve_cache_caps(location, caps);
685 let session = Arc::new(Session::new(
686 index_cache_bytes,
687 metadata_cache_bytes,
688 Arc::new(ObjectStoreRegistry::default()),
689 ));
690 let root = location.as_str().trim_end_matches('/').to_string();
696 let mut connect = ConnectBuilder::new("dir")
697 .property("root", root)
698 .session(session.clone());
699 for (key, value) in &storage_options {
703 connect = connect.property(format!("storage.{key}"), value.clone());
704 }
705 let nm: Arc<dyn LanceNamespace> = connect
706 .connect()
707 .await
708 .context("failed to connect lance Directory namespace")?;
709 let nm_ident = NamespaceIdent::root();
710 let refresh_after = if config::is_local(location) {
716 Duration::ZERO
717 } else {
718 Duration::from_secs(5)
719 };
720 let handle = Self {
721 datasets: DatasetSet {
722 sessions: Mutex::new(CachedDataset {
723 dataset: open_or_create_via_ns(
724 &nm,
725 &nm_ident,
726 sessions::SESSIONS,
727 sessions::session_schema(),
728 &session,
729 &storage_options,
730 )
731 .await?,
732 last_refresh: Instant::now(),
733 refresh_after,
734 }),
735 messages: Mutex::new(CachedDataset {
736 dataset: open_or_create_via_ns(
737 &nm,
738 &nm_ident,
739 sessions::MESSAGES,
740 sessions::message_schema(),
741 &session,
742 &storage_options,
743 )
744 .await?,
745 last_refresh: Instant::now(),
746 refresh_after,
747 }),
748 parts: OnceCell::new(),
749 },
750 retry: RetryPolicy::default(),
751 session,
752 nm,
753 nm_ident,
754 storage_options,
755 location: location.clone(),
756 parts_refresh_after: refresh_after,
757 };
758 Ok(handle)
759 }
760
761 pub fn location(&self) -> &Url {
762 &self.location
763 }
764
765 pub fn storage_options(&self) -> &HashMap<String, String> {
769 &self.storage_options
770 }
771
772 pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
773 Ok((
774 self.count_rows(Table::Sessions).await?,
775 self.count_rows(Table::Messages).await?,
776 self.count_rows(Table::Parts).await?,
777 ))
778 }
779
780 pub(crate) async fn merge_insert(
784 &self,
785 table: Table,
786 batch: RecordBatch,
787 row_count: usize,
788 ) -> Result<u64> {
789 self.merge(
790 table,
791 batch,
792 row_count,
793 "merge_insert",
794 WhenMatched::DoNothing,
795 WhenNotMatched::InsertAll,
796 )
797 .await
798 }
799
800 pub(crate) async fn merge_update(
803 &self,
804 table: Table,
805 batch: RecordBatch,
806 row_count: usize,
807 ) -> Result<u64> {
808 self.merge(
809 table,
810 batch,
811 row_count,
812 "merge_update",
813 WhenMatched::UpdateAll,
814 WhenNotMatched::DoNothing,
815 )
816 .await
817 }
818
    /// Shared implementation of `merge_insert` / `merge_update`: a Lance merge-insert
    /// of `batch` into `table`, run under `retry_lance`.
    ///
    /// Returns inserted + updated row counts. Always emits one `pond::perf` log line
    /// tagged with `op`, including rows skipped as duplicates within the batch (0 on
    /// error).
    ///
    /// # Errors
    /// Returns the last error once retries are exhausted (see `retry_lance`).
    async fn merge(
        &self,
        table: Table,
        batch: RecordBatch,
        row_count: usize,
        op: &'static str,
        when_matched: WhenMatched,
        when_not_matched: WhenNotMatched,
    ) -> Result<u64> {
        if row_count == 0 {
            return Ok(0);
        }
        let started = Instant::now();
        let result = self
            .retry_lance(table.label(), || async {
                // The table's cache lock is held for the whole attempt, so merges on the
                // same table through this handle run one at a time.
                let mut cached = self.cached(table).await?.lock().await;
                let existing = cached.latest().await?;
                let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
                // NOTE(review): an empty `on` list presumably makes Lance match on the
                // schema's primary key — confirm against MergeInsertBuilder docs.
                let mut builder = MergeInsertBuilder::try_new(Arc::new(existing), Vec::new())?;
                builder.when_matched(when_matched.clone());
                builder.when_not_matched(when_not_matched.clone());
                // Duplicate keys inside `batch`: keep the first occurrence.
                builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
                // Presumably leaves version cleanup to the maintenance pass
                // (`optimize_table_compact` calls `cleanup_old_versions`).
                builder.skip_auto_cleanup(true);
                let (dataset, stats) = builder
                    .try_build()?
                    .execute_reader(Box::new(reader))
                    .await?;
                cached.replace(dataset.as_ref().clone());
                Ok((
                    stats.num_inserted_rows + stats.num_updated_rows,
                    stats.num_skipped_duplicates,
                ))
            })
            .await;
        let skipped = result.as_ref().map(|(_, s)| *s).unwrap_or(0);
        tracing::info!(
            target: "pond::perf",
            op,
            table = %table.label(),
            rows = row_count,
            elapsed_ms = started.elapsed().as_millis() as u64,
            skipped,
            "merge",
        );
        result.map(|(affected, _)| affected)
    }
873
874 pub async fn optimize_table(
883 &self,
884 table: Table,
885 intents: &[IndexIntent],
886 progress: Option<&OptimizeProgressFn>,
887 policy: &MaintenancePolicy,
888 ) -> TableOptimizeOutcome {
889 let compaction = self
890 .run_optimize_compact_phase(table, progress, policy)
891 .await;
892 let indices = self
893 .run_optimize_indices_phase(table, intents, progress)
894 .await;
895 TableOptimizeOutcome {
896 table,
897 indices,
898 compaction,
899 }
900 }
901
902 pub async fn optimize_table_indices_only(
906 &self,
907 table: Table,
908 intents: &[IndexIntent],
909 progress: Option<&OptimizeProgressFn>,
910 ) -> PhaseOutcome {
911 self.run_optimize_indices_phase(table, intents, progress)
912 .await
913 }
914
915 async fn run_optimize_indices_phase(
916 &self,
917 table: Table,
918 intents: &[IndexIntent],
919 progress: Option<&OptimizeProgressFn>,
920 ) -> PhaseOutcome {
921 if intents.is_empty() {
922 return PhaseOutcome::Noop;
923 }
924 let result = self
925 .retry_lance(table.label(), || async {
926 let mut guard = self.cached(table).await?.lock().await;
927 let mut dataset = guard.latest().await?;
928 let did_work =
929 optimize_table_indices(&mut dataset, intents, table, progress).await?;
930 guard.replace(dataset);
931 Ok::<_, anyhow::Error>(did_work)
932 })
933 .await;
934 match result {
935 Ok(true) => PhaseOutcome::Ok,
936 Ok(false) => PhaseOutcome::Noop,
937 Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
938 Err(error) => PhaseOutcome::Failed(error),
939 }
940 }
941
942 async fn run_optimize_compact_phase(
943 &self,
944 table: Table,
945 progress: Option<&OptimizeProgressFn>,
946 policy: &MaintenancePolicy,
947 ) -> PhaseOutcome {
948 let result = self
949 .retry_lance(table.label(), || async {
950 let mut guard = self.cached(table).await?.lock().await;
951 let mut dataset = guard.latest().await?;
952 optimize_table_compact(&mut dataset, table, progress, policy).await?;
953 guard.replace(dataset);
954 Ok::<_, anyhow::Error>(())
955 })
956 .await;
957 match result {
958 Ok(()) => PhaseOutcome::Ok,
959 Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
960 Err(error) => PhaseOutcome::Failed(error),
961 }
962 }
963
964 pub async fn rebuild_index(&self, table: Table, intent: &IndexIntent) -> Result<()> {
965 self.retry_lance(table.label(), || async {
966 let mut guard = self.cached(table).await?.lock().await;
967 let mut dataset = guard.latest().await?;
968 rebuild_index(&mut dataset, intent).await?;
969 guard.replace(dataset);
970 Ok(())
971 })
972 .await
973 }
974
975 pub async fn index_status(
976 &self,
977 table: Table,
978 intents: &[IndexIntent],
979 ) -> Result<Vec<IndexStatus>> {
980 let dataset = self.dataset(table).await?;
981 index_status(table, &dataset, intents).await
982 }
983
984 pub(crate) async fn dataset(&self, table: Table) -> Result<Dataset> {
985 let mut cached = self.cached(table).await?.lock().await;
986 cached.latest().await
987 }
988 pub(crate) async fn scanner(
993 &self,
994 table: Table,
995 predicate: Option<&Predicate>,
996 ) -> Result<lance::dataset::scanner::Scanner> {
997 let dataset = self.dataset(table).await?;
998 scanner_with_prefilter(&dataset, predicate)
999 }
1000 pub async fn scan(
1003 &self,
1004 table: Table,
1005 opts: ScanOpts<'_>,
1006 ) -> Result<lance::dataset::scanner::Scanner> {
1007 let mut scanner = self.scanner(table, opts.predicate).await?;
1008 if let Some(projection) = opts.projection {
1009 scanner.project(projection)?;
1010 }
1011 Ok(scanner)
1012 }
1013 pub(crate) async fn scan_batch(
1014 &self,
1015 table: Table,
1016 predicate: Option<&Predicate>,
1017 projection: &[&str],
1018 ) -> Result<RecordBatch> {
1019 let opts = ScanOpts {
1020 predicate,
1021 projection: (!projection.is_empty()).then_some(projection),
1022 };
1023 self.scan(table, opts)
1024 .await?
1025 .try_into_batch()
1026 .await
1027 .context("scan failed")
1028 }
1029 pub async fn count_rows(&self, table: Table) -> Result<usize> {
1030 self.dataset(table)
1031 .await?
1032 .count_rows(None)
1033 .await
1034 .map_err(Into::into)
1035 }
1036 #[cfg(test)]
1038 pub(crate) async fn messages_index_names(&self) -> Result<Vec<String>> {
1039 let dataset = self.dataset(Table::Messages).await?;
1040 let indices = dataset.load_indices().await?;
1041 Ok(indices.iter().map(|index| index.name.clone()).collect())
1042 }
1043
1044 pub(crate) async fn unindexed_row_count(
1047 &self,
1048 table: Table,
1049 index_name: &str,
1050 ) -> Result<usize> {
1051 let dataset = self.dataset(table).await?;
1052 let fragments = dataset
1053 .unindexed_fragments(index_name)
1054 .await
1055 .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1056 Ok(fragments
1057 .iter()
1058 .map(|fragment| fragment.num_rows().unwrap_or(0))
1059 .sum())
1060 }
1061
1062 pub(crate) async fn drop_index(&self, table: Table, name: &str) -> Result<()> {
1068 let mut guard = self.cached(table).await?.lock().await;
1069 let mut dataset = guard.latest().await?;
1070 dataset
1071 .drop_index(name)
1072 .await
1073 .with_context(|| format!("drop_index({name}) failed for {}", table.label()))?;
1074 guard.replace(dataset);
1075 Ok(())
1076 }
1077
1078 async fn table_location(&self, table_name: &str) -> Result<String> {
1081 let request = DescribeTableRequest {
1082 id: Some(self.nm_ident.as_table_id(table_name)),
1083 ..Default::default()
1084 };
1085 let response = self
1086 .nm
1087 .describe_table(request)
1088 .await
1089 .with_context(|| format!("failed to describe table {table_name}"))?;
1090 response
1091 .location
1092 .with_context(|| format!("namespace returned no location for table {table_name}"))
1093 }
1094
1095 pub async fn table_sizes(&self) -> Result<TableSizes> {
1099 let registry = Arc::new(ObjectStoreRegistry::default());
1100 let params = ObjectStoreParams {
1101 storage_options_accessor: (!self.storage_options.is_empty()).then(|| {
1102 Arc::new(StorageOptionsAccessor::with_static_options(
1103 self.storage_options.clone(),
1104 ))
1105 }),
1106 ..Default::default()
1107 };
1108
1109 let sessions = self
1110 .listed_size(
1111 ®istry,
1112 ¶ms,
1113 &self.table_location(sessions::SESSIONS).await?,
1114 )
1115 .await?;
1116 let messages = self
1117 .listed_size(
1118 ®istry,
1119 ¶ms,
1120 &self.table_location(sessions::MESSAGES).await?,
1121 )
1122 .await?;
1123 let parts = self
1124 .listed_size(
1125 ®istry,
1126 ¶ms,
1127 &self.table_location(sessions::PARTS).await?,
1128 )
1129 .await?;
1130 let root_total = self
1133 .listed_size(®istry, ¶ms, self.location.as_str())
1134 .await?;
1135 let other = root_total.saturating_sub(sessions + messages + parts);
1136 Ok(TableSizes {
1137 sessions,
1138 messages,
1139 parts,
1140 other,
1141 })
1142 }
1143
1144 async fn listed_size(
1146 &self,
1147 registry: &Arc<ObjectStoreRegistry>,
1148 params: &ObjectStoreParams,
1149 uri: &str,
1150 ) -> Result<u64> {
1151 let (store, base) = ObjectStore::from_uri_and_params(registry.clone(), uri, params)
1152 .await
1153 .with_context(|| format!("failed to open object store for {uri}"))?;
1154 let mut listing = store.list(Some(base));
1155 let mut total = 0u64;
1156 while let Some(meta) = listing.next().await {
1157 let meta = meta.with_context(|| format!("listing {uri} failed"))?;
1158 total += meta.size;
1159 }
1160 Ok(total)
1161 }
1162 async fn cached(&self, table: Table) -> Result<&Mutex<CachedDataset>> {
1163 match table {
1164 Table::Sessions => Ok(&self.datasets.sessions),
1165 Table::Messages => Ok(&self.datasets.messages),
1166 Table::Parts => self.parts_cached().await,
1167 }
1168 }
1169
1170 async fn parts_cached(&self) -> Result<&Mutex<CachedDataset>> {
1173 self.datasets
1174 .parts
1175 .get_or_try_init(|| async {
1176 let dataset = open_or_create_via_ns(
1177 &self.nm,
1178 &self.nm_ident,
1179 sessions::PARTS,
1180 sessions::part_schema(),
1181 &self.session,
1182 &self.storage_options,
1183 )
1184 .await?;
1185 Ok::<_, anyhow::Error>(Mutex::new(CachedDataset {
1186 dataset,
1187 last_refresh: Instant::now(),
1188 refresh_after: self.parts_refresh_after,
1189 }))
1190 })
1191 .await
1192 }
1193 async fn retry_lance<T, Fut, Op>(&self, label: &str, mut operation: Op) -> Result<T>
1194 where
1195 Fut: std::future::Future<Output = Result<T>>,
1196 Op: FnMut() -> Fut,
1197 {
1198 let mut attempt = 0u8;
1199 loop {
1200 attempt = attempt.saturating_add(1);
1201 match operation().await {
1202 Ok(value) => return Ok(value),
1203 Err(error) if attempt < self.retry.attempts => {
1204 let backoff = self.backoff(attempt);
1205 let error_chain = format!("{error:#}");
1208 tracing::warn!(
1209 label,
1210 attempt,
1211 ?backoff,
1212 error = %error_chain,
1213 "retrying Lance operation"
1214 );
1215 tokio::time::sleep(backoff).await;
1216 }
1217 Err(error) => {
1218 let error_chain = format!("{error:#}");
1219 tracing::warn!(
1220 label,
1221 attempt,
1222 error = %error_chain,
1223 "Lance operation exhausted retries"
1224 );
1225 if is_commit_conflict(&error) {
1232 return Err(error.context(ConflictExhausted { attempts: attempt }));
1233 }
1234 return Err(error);
1235 }
1236 }
1237 }
1238 }
1239 fn backoff(&self, attempt: u8) -> Duration {
1240 let shift = u32::from(attempt.saturating_sub(1));
1241 let multiplier = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
1242 let base = self.retry.initial_backoff.saturating_mul(multiplier);
1243 let factor = (1.0 + self.retry.jitter * (fastrand::f64() * 2.0 - 1.0)).max(0.0);
1246 base.mul_f64(factor).min(self.retry.max_backoff)
1247 }
1248}
1249async fn optimize_table_compact(
1264 dataset: &mut Dataset,
1265 table: Table,
1266 progress: Option<&OptimizeProgressFn>,
1267 policy: &MaintenancePolicy,
1268) -> Result<()> {
1269 let compaction = CompactionOptions {
1270 defer_index_remap: false,
1271 ..CompactionOptions::default()
1272 };
1273
1274 let target = compaction.target_rows_per_fragment;
1277 let fragments = dataset.get_fragments();
1278 let (mergeable_run_rows, candidate_count) = compaction_candidates(
1279 fragments
1280 .iter()
1281 .map(|fragment| fragment.metadata().physical_rows.unwrap_or(0)),
1282 target,
1283 );
1284 if should_compact(
1285 mergeable_run_rows,
1286 candidate_count,
1287 target,
1288 policy.compaction_fragment_cap,
1289 ) {
1290 emit(
1291 progress,
1292 OptimizeEvent::PhaseStart {
1293 table,
1294 phase: OptimizePhase::Compact,
1295 detail: None,
1296 },
1297 );
1298 let started = Instant::now();
1299 compact_files(dataset, compaction, None).await?;
1300 emit(
1301 progress,
1302 OptimizeEvent::PhaseDone {
1303 table,
1304 phase: OptimizePhase::Compact,
1305 elapsed_ms: started.elapsed().as_millis() as u64,
1306 },
1307 );
1308 } else {
1309 tracing::debug!(
1310 target: "pond::perf",
1311 table = table.as_str(),
1312 mergeable_run_rows,
1313 candidate_count,
1314 cap = policy.compaction_fragment_cap,
1315 "compaction skipped: sub-target fragments under threshold",
1316 );
1317 }
1318
1319 emit(
1323 progress,
1324 OptimizeEvent::PhaseStart {
1325 table,
1326 phase: OptimizePhase::Cleanup,
1327 detail: None,
1328 },
1329 );
1330 let started = Instant::now();
1331 dataset
1332 .cleanup_old_versions(policy.cleanup_older_than, Some(false), Some(false))
1333 .await
1334 .context("cleanup_old_versions failed during index optimize")?;
1335 emit(
1336 progress,
1337 OptimizeEvent::PhaseDone {
1338 table,
1339 phase: OptimizePhase::Cleanup,
1340 elapsed_ms: started.elapsed().as_millis() as u64,
1341 },
1342 );
1343
1344 Ok(())
1345}
1346
1347async fn optimize_table_indices(
1350 dataset: &mut Dataset,
1351 intents: &[IndexIntent],
1352 table: Table,
1353 progress: Option<&OptimizeProgressFn>,
1354) -> Result<bool> {
1355 let existing = dataset.load_indices().await?;
1356 let existing_names: std::collections::HashSet<String> =
1357 existing.iter().map(|index| index.name.clone()).collect();
1358
1359 let mut append_indices: Vec<String> = Vec::new();
1360 let mut did_work = false;
1361
1362 for intent in intents {
1363 let exists = existing_names.contains(intent.name);
1364
1365 if !exists {
1366 if !intent.trigger.should_create(dataset).await? {
1367 continue;
1368 }
1369 let params = intent.params.build(dataset).await?;
1370 let index_type = intent.params.index_type();
1371 tracing::info!(
1372 index = intent.name,
1373 column = intent.column,
1374 "creating Lance index (trigger fired)",
1375 );
1376 emit(
1377 progress,
1378 OptimizeEvent::PhaseStart {
1379 table,
1380 phase: OptimizePhase::IndexCreate,
1381 detail: Some(intent.name.to_owned()),
1382 },
1383 );
1384 let started = Instant::now();
1385 dataset
1386 .create_index(
1387 &[intent.column],
1388 index_type,
1389 Some(intent.name.to_owned()),
1390 params.as_ref(),
1391 false,
1392 )
1393 .await
1394 .with_context(|| format!("failed to create index {}", intent.name))?;
1395 emit(
1396 progress,
1397 OptimizeEvent::PhaseDone {
1398 table,
1399 phase: OptimizePhase::IndexCreate,
1400 elapsed_ms: started.elapsed().as_millis() as u64,
1401 },
1402 );
1403 did_work = true;
1404 continue;
1405 }
1406
1407 let unindexed = dataset.unindexed_fragments(intent.name).await?;
1408 if unindexed.is_empty() {
1409 continue;
1410 }
1411 if unindexed.len() < index_lag_threshold() {
1415 continue;
1416 }
1417 match intent.params {
1418 IndexParamsKind::Scalar(BuiltinIndexType::BTree) => {
1419 let params = intent.params.build(dataset).await?;
1420 let index_type = intent.params.index_type();
1421 tracing::debug!(
1422 target: "pond::perf",
1423 index = intent.name,
1424 column = intent.column,
1425 "rebuilding Lance BTree index",
1426 );
1427 emit(
1428 progress,
1429 OptimizeEvent::PhaseStart {
1430 table,
1431 phase: OptimizePhase::IndexRebuild,
1432 detail: Some(intent.name.to_owned()),
1433 },
1434 );
1435 let started = Instant::now();
1436 dataset
1437 .create_index(
1438 &[intent.column],
1439 index_type,
1440 Some(intent.name.to_owned()),
1441 params.as_ref(),
1442 true,
1443 )
1444 .await
1445 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1446 emit(
1447 progress,
1448 OptimizeEvent::PhaseDone {
1449 table,
1450 phase: OptimizePhase::IndexRebuild,
1451 elapsed_ms: started.elapsed().as_millis() as u64,
1452 },
1453 );
1454 did_work = true;
1455 }
1456 IndexParamsKind::Scalar(BuiltinIndexType::Bitmap)
1457 | IndexParamsKind::InvertedFtsNgram { .. }
1458 | IndexParamsKind::IvfPqCosine { .. } => {
1459 append_indices.push(intent.name.to_owned());
1460 }
1461 IndexParamsKind::Scalar(_) => {
1462 let params = intent.params.build(dataset).await?;
1463 emit(
1464 progress,
1465 OptimizeEvent::PhaseStart {
1466 table,
1467 phase: OptimizePhase::IndexRebuild,
1468 detail: Some(intent.name.to_owned()),
1469 },
1470 );
1471 let started = Instant::now();
1472 dataset
1473 .create_index(
1474 &[intent.column],
1475 intent.params.index_type(),
1476 Some(intent.name.to_owned()),
1477 params.as_ref(),
1478 true,
1479 )
1480 .await
1481 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1482 emit(
1483 progress,
1484 OptimizeEvent::PhaseDone {
1485 table,
1486 phase: OptimizePhase::IndexRebuild,
1487 elapsed_ms: started.elapsed().as_millis() as u64,
1488 },
1489 );
1490 did_work = true;
1491 }
1492 }
1493 }
1494
1495 if !append_indices.is_empty() {
1496 let to_append = append_indices.clone();
1497 emit(
1498 progress,
1499 OptimizeEvent::PhaseStart {
1500 table,
1501 phase: OptimizePhase::IndexAppend,
1502 detail: Some(append_indices.join(", ")),
1503 },
1504 );
1505 let started = Instant::now();
1506 dataset
1507 .optimize_indices(&OptimizeOptions::append().index_names(to_append))
1508 .await
1509 .context("optimize_indices(append) failed during index optimize")?;
1510 emit(
1511 progress,
1512 OptimizeEvent::PhaseDone {
1513 table,
1514 phase: OptimizePhase::IndexAppend,
1515 elapsed_ms: started.elapsed().as_millis() as u64,
1516 },
1517 );
1518 tracing::debug!(
1519 target: "pond::perf",
1520 indices = ?append_indices,
1521 "appended trailing fragments into indices",
1522 );
1523 did_work = true;
1524 }
1525
1526 Ok(did_work)
1527}
1528
1529async fn rebuild_index(dataset: &mut Dataset, intent: &IndexIntent) -> Result<()> {
1530 if !intent.trigger.should_create(dataset).await? {
1531 return Ok(());
1532 }
1533 let params = intent.params.build(dataset).await?;
1534 dataset
1535 .create_index(
1536 &[intent.column],
1537 intent.params.index_type(),
1538 Some(intent.name.to_owned()),
1539 params.as_ref(),
1540 true,
1541 )
1542 .await
1543 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1544 Ok(())
1545}
1546
1547async fn index_status(
1548 table: Table,
1549 dataset: &Dataset,
1550 intents: &[IndexIntent],
1551) -> Result<Vec<IndexStatus>> {
1552 let existing = dataset.load_indices().await?;
1553 let existing_names: std::collections::HashSet<String> =
1554 existing.iter().map(|index| index.name.clone()).collect();
1555 let total_fragments = dataset.get_fragments().len();
1556 let total_rows = dataset.count_rows(None).await?;
1557 let mut statuses = Vec::with_capacity(intents.len());
1558 for intent in intents {
1559 let exists = existing_names.contains(intent.name);
1560 if !exists {
1561 statuses.push(IndexStatus {
1562 table,
1563 intent_name: intent.name.to_owned(),
1564 fragments_covered: 0,
1565 unindexed_fragments: total_fragments,
1566 unindexed_rows: total_rows,
1567 exists,
1568 });
1569 continue;
1570 }
1571 let unindexed = dataset
1572 .unindexed_fragments(intent.name)
1573 .await
1574 .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1575 let unindexed_fragments = unindexed.len();
1576 let unindexed_rows = unindexed
1577 .iter()
1578 .map(|fragment| fragment.num_rows().unwrap_or(0))
1579 .sum();
1580 statuses.push(IndexStatus {
1581 table,
1582 intent_name: intent.name.to_owned(),
1583 fragments_covered: total_fragments.saturating_sub(unindexed_fragments),
1584 unindexed_fragments,
1585 unindexed_rows,
1586 exists,
1587 });
1588 }
1589 Ok(statuses)
1590}
1591
1592async fn open_or_create_via_ns(
1604 nm: &Arc<dyn LanceNamespace>,
1605 nm_ident: &NamespaceIdent,
1606 table_name: &str,
1607 schema: lance::deps::arrow_schema::SchemaRef,
1608 session: &Arc<Session>,
1609 storage_options: &HashMap<String, String>,
1610) -> Result<Dataset> {
1611 let table_id = nm_ident.as_table_id(table_name);
1612
1613 let request = DescribeTableRequest {
1614 id: Some(table_id.clone()),
1615 ..Default::default()
1616 };
1617 match nm.describe_table(request).await {
1618 Ok(response) => {
1619 let location = response.location.with_context(|| {
1620 format!("namespace returned no location for table {table_name}")
1621 })?;
1622 let mut builder = DatasetBuilder::from_uri(&location).with_session(session.clone());
1623 if !storage_options.is_empty() {
1624 builder = builder.with_storage_options(storage_options.clone());
1625 }
1626 let dataset = builder
1627 .load()
1628 .await
1629 .with_context(|| format!("failed to open table {table_name}"))?;
1630 ensure_schema_matches(&dataset, schema.as_ref(), table_name)?;
1631 return Ok(dataset);
1632 }
1633 Err(error) => match &error {
1634 error if is_namespace_error_code(error, ErrorCode::TableNotFound) => {
1635 }
1637 _ => {
1638 return Err(anyhow::Error::from(error))
1639 .with_context(|| format!("failed to describe table {table_name}"));
1640 }
1641 },
1642 }
1643
1644 let mut write_params = sessions::write_params_for_create();
1647 write_params.session = Some(session.clone());
1648 write_params.mode = WriteMode::Create;
1649 if !storage_options.is_empty() {
1650 write_params.store_params = Some(ObjectStoreParams {
1651 storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
1652 storage_options.clone(),
1653 ))),
1654 ..Default::default()
1655 });
1656 }
1657 let reader = sessions::empty_reader(schema)?;
1658 Dataset::write_into_namespace(reader, nm.clone(), table_id, Some(write_params))
1659 .await
1660 .with_context(|| format!("failed to create table {table_name}"))
1661}
1662
1663fn is_namespace_error_code(error: &lance::Error, code: ErrorCode) -> bool {
1667 if !matches!(error, lance::Error::Namespace { .. }) {
1668 return false;
1669 }
1670 std::iter::successors(Some(error as &(dyn std::error::Error + 'static)), |link| {
1671 link.source()
1672 })
1673 .filter_map(|link| link.downcast_ref::<NamespaceError>())
1674 .any(|inner| inner.code() == code)
1675}
1676
1677fn scanner_with_prefilter(
1678 dataset: &Dataset,
1679 predicate: Option<&Predicate>,
1680) -> Result<lance::dataset::scanner::Scanner> {
1681 let mut scanner = dataset.scan();
1682 scanner.prefilter(true);
1683 if let Some(predicate) = predicate {
1684 let filter = predicate.to_lance();
1685 if !filter.is_empty() {
1686 scanner.filter(&filter)?;
1687 }
1688 }
1689 Ok(scanner)
1690}
1691fn ensure_schema_matches(
1692 dataset: &Dataset,
1693 expected: &lance::deps::arrow_schema::Schema,
1694 table_name: &str,
1695) -> Result<()> {
1696 use lance::deps::arrow_schema::DataType;
1697 use std::collections::BTreeSet;
1698 let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
1699 let actual_names: BTreeSet<&str> = actual.fields().iter().map(|f| f.name().as_str()).collect();
1700 let expected_names: BTreeSet<&str> = expected
1701 .fields()
1702 .iter()
1703 .map(|f| f.name().as_str())
1704 .collect();
1705 if actual_names != expected_names {
1706 anyhow::bail!(
1707 "table {table_name} has columns {actual_names:?} but this pond build expects \
1708 {expected_names:?} - the on-disk store predates a schema change; delete the \
1709 data directory and re-run `pond ingest`",
1710 );
1711 }
1712 for actual_field in actual.fields() {
1717 let Some(expected_field) = expected.field_with_name(actual_field.name()).ok() else {
1718 continue;
1719 };
1720 if let (DataType::FixedSizeList(_, actual_dim), DataType::FixedSizeList(_, expected_dim)) =
1721 (actual_field.data_type(), expected_field.data_type())
1722 && actual_dim != expected_dim
1723 {
1724 tracing::warn!(
1725 table = table_name,
1726 column = actual_field.name(),
1727 actual_dim,
1728 expected_dim,
1729 "embedding dimension differs from config; open proceeds because model swaps are operator-driven",
1730 );
1731 }
1732 }
1733 Ok(())
1734}
/// Fills in object-store client defaults for remote storage without overriding the caller.
///
/// Each default is written only when no key matching one of its aliases is present.
/// Matching is ASCII case-insensitive, so explicit configuration always wins. A default
/// is stored under the first alias in its list.
///
/// - `pool_idle_timeout` = `300 seconds`
/// - `connect_timeout` = `10 seconds`
/// - `aws_unsigned_payload` = `true`, only when a custom S3 endpoint is configured under
///   any of the aliases object_store's S3 builder accepts (`aws_endpoint`,
///   `aws_endpoint_url`, `endpoint`, `endpoint_url`).
fn apply_remote_storage_defaults(options: &mut HashMap<String, String>) {
    // Keys object_store's S3 builder maps to a custom endpoint. The `*_url` spellings
    // were previously missed, so e.g. `AWS_ENDPOINT_URL` skipped the unsigned default.
    const ENDPOINT_ALIASES: &[&str] = &["aws_endpoint", "aws_endpoint_url", "endpoint", "endpoint_url"];

    // True when any key in `options` equals one of `aliases`, ignoring ASCII case.
    fn has_any(options: &HashMap<String, String>, aliases: &[&str]) -> bool {
        options
            .keys()
            .any(|key| aliases.iter().any(|alias| key.eq_ignore_ascii_case(alias)))
    }

    // Inserts `value` under the primary (first) alias unless any alias is already set.
    fn set_default(options: &mut HashMap<String, String>, aliases: &[&str], value: &str) {
        let Some(&primary) = aliases.first() else {
            return;
        };
        if !has_any(options, aliases) {
            options.insert(primary.to_owned(), value.to_owned());
        }
    }

    set_default(options, &["pool_idle_timeout"], "300 seconds");
    set_default(options, &["connect_timeout"], "10 seconds");
    // NOTE(review): unsigned payloads are defaulted only for custom (presumably
    // S3-compatible) endpoints; the rationale is not visible here - confirm.
    if has_any(options, ENDPOINT_ALIASES) {
        set_default(
            options,
            &["aws_unsigned_payload", "unsigned_payload"],
            "true",
        );
    }
}
1764
/// Renders `value` as a single-quoted SQL string literal, doubling every embedded `'`.
fn quoted_string(value: &str) -> String {
    let mut literal = String::with_capacity(value.len() + 2);
    literal.push('\'');
    for ch in value.chars() {
        // A quote is emitted twice: once here, once by the push below.
        if ch == '\'' {
            literal.push('\'');
        }
        literal.push(ch);
    }
    literal.push('\'');
    literal
}
/// Builds a quoted `LIKE` pattern literal matching any string that contains `value`.
///
/// The following characters in `value` are escaped:
/// - backslash becomes `\\`;
/// - `%` becomes `\%` and `_` becomes `\_`, so they match literally rather than as
///   wildcards;
/// - single quotes are doubled for the SQL string literal.
///
/// The result is wrapped as `'%…%'`.
fn like_contains(value: &str) -> String {
    let mut pattern = String::with_capacity(value.len() + 4);
    pattern.push_str("'%");
    for ch in value.chars() {
        match ch {
            '\\' => pattern.push_str("\\\\"),
            '%' => pattern.push_str("\\%"),
            '_' => pattern.push_str("\\_"),
            '\'' => pattern.push_str("''"),
            other => pattern.push(other),
        }
    }
    pattern.push_str("%'");
    pattern
}
1776
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn compaction_gate_skips_subtarget_trickle_compacts_on_progress() {
        let target = 1_048_576;
        assert!(!should_compact(510_000 + 30, 2, target, 64));
        assert!(should_compact(target, 3, target, 64));
        assert!(should_compact(5_000, 64, target, 64));
        assert!(should_compact(0, 0, target, 0));
    }

    #[test]
    fn compaction_candidates_strands_isolated_subtarget_fragment() {
        let target = 1_048_576;
        let (run, count) =
            compaction_candidates([1_048_576, 256_000, 1_048_576, 510_000, 30], target);
        assert_eq!(count, 3);
        assert_eq!(run, 510_030);
        assert!(!should_compact(run, count, target, 64));
    }

    #[test]
    fn namespace_error_code_walks_wrapped_chain() {
        let direct = lance::Error::namespace_source(Box::new(NamespaceError::TableNotFound {
            message: "missing".into(),
        }));
        assert!(is_namespace_error_code(&direct, ErrorCode::TableNotFound));

        let wrapped = lance::Error::namespace_source(Box::new(direct));
        assert!(is_namespace_error_code(&wrapped, ErrorCode::TableNotFound));

        let other_code =
            lance::Error::namespace_source(Box::new(NamespaceError::NamespaceNotFound {
                message: "nope".into(),
            }));
        assert!(!is_namespace_error_code(
            &other_code,
            ErrorCode::TableNotFound
        ));

        let not_namespace = lance::Error::internal("unrelated");
        assert!(!is_namespace_error_code(
            &not_namespace,
            ErrorCode::TableNotFound
        ));
    }

    #[test]
    fn quoted_string_doubles_single_quotes() {
        assert_eq!(quoted_string(""), "''");
        assert_eq!(quoted_string("plain"), "'plain'");
        assert_eq!(quoted_string("o'brien"), "'o''brien'");
    }

    #[test]
    fn like_contains_escapes_wildcards_backslashes_and_quotes() {
        assert_eq!(like_contains("abc"), "'%abc%'");
        assert_eq!(like_contains("50%_off"), "'%50\\%\\_off%'");
        assert_eq!(like_contains("a\\b"), "'%a\\\\b%'");
        assert_eq!(like_contains("it's"), "'%it''s%'");
    }

    #[test]
    fn remote_storage_defaults_respect_existing_keys() {
        let mut options: HashMap<String, String> = HashMap::new();
        apply_remote_storage_defaults(&mut options);
        assert_eq!(
            options.get("pool_idle_timeout").map(String::as_str),
            Some("300 seconds")
        );
        assert_eq!(
            options.get("connect_timeout").map(String::as_str),
            Some("10 seconds")
        );
        assert!(!options.contains_key("aws_unsigned_payload"));

        let mut options: HashMap<String, String> = HashMap::from([
            ("AWS_ENDPOINT".to_owned(), "http://localhost:9000".to_owned()),
            ("Connect_Timeout".to_owned(), "2 seconds".to_owned()),
        ]);
        apply_remote_storage_defaults(&mut options);
        assert_eq!(
            options.get("aws_unsigned_payload").map(String::as_str),
            Some("true")
        );
        assert!(!options.contains_key("connect_timeout"));
        assert_eq!(
            options.get("Connect_Timeout").map(String::as_str),
            Some("2 seconds")
        );
    }

    #[tokio::test]
    async fn store_opens_via_namespace_and_scan_works() -> Result<()> {
        let temp = TempDir::new()?;
        let url = Url::from_directory_path(temp.path())
            .map_err(|()| anyhow::anyhow!("temp path is not absolute"))?;
        let handle = Handle::open(&url).await?;
        let cases: [(Table, &[&str]); 3] = [
            (Table::Sessions, &["id"]),
            (Table::Messages, &["id"]),
            (Table::Parts, &["id"]),
        ];
        for (table, projection) in cases {
            let scanner = handle
                .scan(table, ScanOpts::project_only(projection))
                .await?;
            let batch = scanner.try_into_batch().await?;
            assert_eq!(batch.num_rows(), 0, "fresh table should be empty");
        }
        Ok(())
    }
}