1use crate::{
2 RetryPolicy,
3 config::{self},
4 handlers::NamespaceIdent,
5 sessions::{self},
6};
7use anyhow::{Context, Result};
8use lance::Dataset;
9use lance::dataset::builder::DatasetBuilder;
10use lance::dataset::index::DatasetIndexRemapperOptions;
11use lance::dataset::optimize::{CompactionOptions, commit_compaction, plan_compaction};
12use lance::dataset::write::merge_insert::SourceDedupeBehavior;
13use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode};
14use lance::deps::arrow_array::{RecordBatch, RecordBatchIterator};
15use lance::index::DatasetIndexExt;
16use lance::index::DatasetIndexInternalExt;
17use lance::index::vector::VectorIndexParams;
18use lance::session::Session;
19use lance_index::IndexType;
20use lance_index::optimize::OptimizeOptions;
21use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
22use lance_io::object_store::{
23 ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor,
24};
25use lance_linalg::distance::MetricType;
26use lance_namespace::LanceNamespace;
27use lance_namespace::error::{ErrorCode, NamespaceError};
28use lance_namespace::models::DescribeTableRequest;
29use lance_namespace_impls::ConnectBuilder;
30use std::{
31 collections::HashMap,
32 sync::Arc,
33 time::{Duration, Instant},
34};
35use tokio::sync::{Mutex, OnceCell};
36use tokio_stream::StreamExt;
37use url::Url;
/// Row threshold associated with vector-index activation (presumably used as an
/// `IndexTrigger::OnNonNullCount` threshold by callers outside this view — confirm).
pub const VECTOR_INDEX_ACTIVATION_ROWS: usize = 100 * 1000;

/// Lag threshold reported by [`index_lag_threshold`] when no runtime value was installed.
pub const DEFAULT_INDEX_LAG_THRESHOLD: usize = 4;

/// Process-wide lag threshold, installed at most once by [`init_index_lag_threshold`].
static INDEX_LAG_THRESHOLD_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Installs the runtime lag threshold.
///
/// Only the first call has any effect; later calls are silently ignored.
pub fn init_index_lag_threshold(value: usize) {
    // `set` fails only when a value is already present, which is the "ignore" case.
    let _ = INDEX_LAG_THRESHOLD_RUNTIME.set(value);
}

/// Returns the installed lag threshold, or [`DEFAULT_INDEX_LAG_THRESHOLD`] when none was set.
pub fn index_lag_threshold() -> usize {
    *INDEX_LAG_THRESHOLD_RUNTIME
        .get()
        .unwrap_or(&DEFAULT_INDEX_LAG_THRESHOLD)
}
65
/// Default value for `MaintenancePolicy::compaction_fragment_cap`.
pub const DEFAULT_COMPACTION_FRAGMENT_CAP: usize = 64;

/// Desired on-disk size of a compacted fragment / data file (256 MiB).
pub const TARGET_FRAGMENT_BYTES: u64 = 256 << 20;

/// Lower clamp for the derived rows-per-fragment compaction target.
const MIN_TARGET_ROWS_PER_FRAGMENT: u64 = 50_000;
/// Upper clamp for the derived rows-per-fragment compaction target (1 Mi rows).
const MAX_TARGET_ROWS_PER_FRAGMENT: u64 = 1 << 20;

/// A compaction task is kept only if the fragments other than the largest weigh at least
/// `1 / COMPACTION_ABSORB_FACTOR` of the largest one.
pub const COMPACTION_ABSORB_FACTOR: u64 = 4;
83
84pub fn default_cleanup_older_than() -> chrono::Duration {
90 chrono::Duration::days(1)
91}
92
93#[derive(Debug, Clone, Copy)]
98pub struct MaintenancePolicy {
99 pub compaction_fragment_cap: usize,
101 pub cleanup_older_than: chrono::Duration,
103}
104
105impl MaintenancePolicy {
106 pub fn always_compact() -> Self {
108 Self {
109 compaction_fragment_cap: 0,
110 cleanup_older_than: default_cleanup_older_than(),
111 }
112 }
113}
114
/// Size summary of one fragment, as consumed by the compaction heuristics.
struct FragmentStat {
    /// Sum of the fragment's data-file sizes; `None` when any file size is unknown.
    bytes: Option<u64>,
    /// Physical row count (deleted rows included); 0 when unknown.
    rows: u64,
    /// Rows marked deleted by the fragment's deletion file; 0 when absent or unknown.
    deleted_rows: u64,
}
121
122fn fragment_bytes(fragment: &lance::table::format::Fragment) -> Option<u64> {
125 fragment.files.iter().try_fold(0u64, |total, file| {
126 Some(total + file.file_size_bytes.get()?.get())
127 })
128}
129
130fn fragment_stat(fragment: &lance::table::format::Fragment) -> FragmentStat {
131 FragmentStat {
132 bytes: fragment_bytes(fragment),
133 rows: fragment.physical_rows.unwrap_or(0) as u64,
134 deleted_rows: fragment
135 .deletion_file
136 .as_ref()
137 .and_then(|deletions| deletions.num_deleted_rows)
138 .unwrap_or(0) as u64,
139 }
140}
141
142fn derived_target_rows(stats: &[FragmentStat]) -> usize {
144 let (mut bytes, mut rows) = (0u64, 0u64);
145 for stat in stats {
146 if let Some(fragment_bytes) = stat.bytes
147 && stat.rows > 0
148 {
149 bytes += fragment_bytes;
150 rows += stat.rows;
151 }
152 }
153 if bytes == 0 || rows == 0 {
154 return MAX_TARGET_ROWS_PER_FRAGMENT as usize;
155 }
156 let avg_row_bytes = (bytes / rows).max(1);
157 (TARGET_FRAGMENT_BYTES / avg_row_bytes)
158 .clamp(MIN_TARGET_ROWS_PER_FRAGMENT, MAX_TARGET_ROWS_PER_FRAGMENT) as usize
159}
160
161fn keep_task(stats: &[FragmentStat], cap: usize, deletion_threshold: f32) -> bool {
166 if stats.iter().any(|stat| {
167 stat.rows > 0 && (stat.deleted_rows as f32 / stat.rows as f32) > deletion_threshold
168 }) {
169 return true;
170 }
171 if stats.len() >= cap {
172 return true;
173 }
174 let weights: Vec<u64> = if stats.iter().all(|stat| stat.bytes.is_some()) {
175 stats.iter().filter_map(|stat| stat.bytes).collect()
176 } else {
177 stats.iter().map(|stat| stat.rows).collect()
178 };
179 let total: u64 = weights.iter().sum();
180 let largest = weights.iter().copied().max().unwrap_or(0);
181 (total - largest) * COMPACTION_ABSORB_FACTOR >= largest
182}
183
184#[derive(Debug, Clone)]
187pub struct IndexIntent {
188 pub name: &'static str,
191 pub column: &'static str,
193 pub trigger: IndexTrigger,
195 pub params: IndexParamsKind,
198}
199
200#[derive(Debug, Clone)]
202pub enum IndexTrigger {
203 OnAnyRows,
206 OnNonNullCount {
209 column: &'static str,
210 threshold: usize,
211 },
212}
213
214#[derive(Debug, Clone)]
217pub enum IndexParamsKind {
218 Scalar(BuiltinIndexType),
221 InvertedFtsNgram { min: u32, max: u32 },
225 IvfPqCosine {
230 sub_vectors: usize,
231 num_bits: u8,
232 max_iters: usize,
233 },
234}
235
236impl IndexTrigger {
237 async fn should_create(&self, dataset: &Dataset) -> Result<bool> {
238 match self {
239 Self::OnAnyRows => Ok(dataset.count_rows(None).await? > 0),
240 Self::OnNonNullCount { column, threshold } => {
241 let count = dataset
242 .count_rows(Some(format!("{column} IS NOT NULL")))
243 .await?;
244 Ok(count >= *threshold)
245 }
246 }
247 }
248}
249
250impl IndexParamsKind {
251 fn index_type(&self) -> IndexType {
252 match self {
253 Self::Scalar(BuiltinIndexType::Bitmap) => IndexType::Bitmap,
254 Self::Scalar(_) => IndexType::BTree,
255 Self::InvertedFtsNgram { .. } => IndexType::Inverted,
256 Self::IvfPqCosine { .. } => IndexType::Vector,
257 }
258 }
259
260 async fn build(&self, dataset: &Dataset) -> Result<Box<dyn lance::index::IndexParams>> {
261 match self {
262 Self::Scalar(kind) => Ok(Box::new(ScalarIndexParams::for_builtin(kind.clone()))),
263 Self::InvertedFtsNgram { min, max } => Ok(Box::new(
264 InvertedIndexParams::default()
265 .base_tokenizer("ngram".to_owned())
266 .ngram_min_length(*min)
267 .ngram_max_length(*max)
268 .stem(false)
269 .remove_stop_words(false),
270 )),
271 Self::IvfPqCosine {
272 sub_vectors,
273 num_bits,
274 max_iters,
275 } => {
276 let count = dataset
277 .count_rows(Some("vector IS NOT NULL".to_owned()))
278 .await?;
279 let partitions = count.checked_div(4096).unwrap_or(0).max(1);
280 Ok(Box::new(VectorIndexParams::ivf_pq(
281 partitions,
282 *num_bits,
283 *sub_vectors,
284 MetricType::Cosine,
285 *max_iters,
286 )))
287 }
288 }
289 }
290}
291
292#[derive(Debug, Clone, PartialEq, Eq)]
293pub struct IndexStatus {
294 pub table: Table,
295 pub intent_name: String,
296 pub fragments_covered: usize,
297 pub unindexed_fragments: usize,
298 pub unindexed_rows: usize,
299 pub exists: bool,
300}
301
/// Marker error meaning a commit conflict persisted through every retry attempt.
#[derive(Debug, Clone, Copy)]
pub struct ConflictExhausted {
    /// Number of attempts made before giving up.
    pub attempts: u8,
}

impl std::fmt::Display for ConflictExhausted {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let attempts = self.attempts;
        write!(formatter, "commit conflict exhausted after {attempts} attempt(s)")
    }
}

impl std::error::Error for ConflictExhausted {}
322
323#[derive(Debug)]
328pub enum PhaseOutcome {
329 Ok,
331 Noop,
333 SkippedConflict,
336 Failed(anyhow::Error),
338 NotAttempted,
341}
342
343impl PhaseOutcome {
344 pub fn is_failed(&self) -> bool {
345 matches!(self, Self::Failed(_))
346 }
347}
348
349#[derive(Debug)]
351pub struct TableOptimizeOutcome {
352 pub table: Table,
353 pub indices: PhaseOutcome,
354 pub compaction: PhaseOutcome,
355}
356
357#[derive(Debug, Clone)]
360pub enum OptimizeEvent {
361 PhaseStart {
362 table: Table,
363 phase: OptimizePhase,
364 detail: Option<String>,
365 },
366 PhaseDone {
367 table: Table,
368 phase: OptimizePhase,
369 elapsed_ms: u64,
370 },
371}
372
/// Individual steps reported through [`OptimizeEvent`].
#[derive(Debug, Clone, Copy)]
pub enum OptimizePhase {
    Compact,
    Cleanup,
    IndexCreate,
    IndexRebuild,
    IndexAppend,
}

impl OptimizePhase {
    /// Short kebab-case label for logs and progress output.
    pub fn label(self) -> &'static str {
        use OptimizePhase::*;
        match self {
            Compact => "compact",
            Cleanup => "cleanup",
            IndexCreate => "index-create",
            IndexRebuild => "index-rebuild",
            IndexAppend => "index-append",
        }
    }
}
393
394pub type OptimizeProgressFn = Box<dyn Fn(OptimizeEvent) + Send + Sync>;
395
396fn emit(progress: Option<&OptimizeProgressFn>, event: OptimizeEvent) {
397 if let Some(callback) = progress {
398 callback(event);
399 }
400}
401
402pub fn is_commit_conflict(error: &anyhow::Error) -> bool {
406 error.downcast_ref::<lance::Error>().is_some_and(|err| {
407 matches!(
408 err,
409 lance::Error::CommitConflict { .. }
410 | lance::Error::RetryableCommitConflict { .. }
411 | lance::Error::TooMuchWriteContention { .. }
412 )
413 })
414}
415
416fn is_conflict_exhausted(error: &anyhow::Error) -> bool {
419 error.chain().any(|cause| cause.is::<ConflictExhausted>())
420}
421
/// On-disk byte totals per table, plus everything else under the root.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TableSizes {
    pub sessions: u64,
    pub messages: u64,
    pub parts: u64,
    /// Bytes under the root not attributed to any of the three tables.
    pub other: u64,
    pub sessions_data: DataLiveness,
    pub messages_data: DataLiveness,
    pub parts_data: DataLiveness,
}

/// Bytes in a table's data directory versus bytes referenced by the current version.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DataLiveness {
    /// Bytes physically present.
    pub on_disk: u64,
    /// Bytes referenced by live fragments; `None` when some file size is unknown.
    pub live: Option<u64>,
}

impl DataLiveness {
    /// Bytes on disk that are not live (saturating at 0), or `None` when `live` is unknown.
    pub fn dead(&self) -> Option<u64> {
        let live = self.live?;
        Some(self.on_disk.saturating_sub(live))
    }
}
450
/// Literal operand of a [`Predicate`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalarValue {
    /// Text literal; rendered quoted.
    String(String),
    /// 32-bit integer literal.
    Int32(i32),
    /// Pre-rendered expression text, inserted verbatim.
    Raw(String),
}

impl From<&str> for ScalarValue {
    fn from(value: &str) -> Self {
        ScalarValue::String(String::from(value))
    }
}

impl From<String> for ScalarValue {
    fn from(value: String) -> Self {
        ScalarValue::String(value)
    }
}

impl From<i32> for ScalarValue {
    fn from(value: i32) -> Self {
        ScalarValue::Int32(value)
    }
}
472#[derive(Debug, Clone, PartialEq, Eq)]
473pub enum Predicate {
474 Eq(&'static str, ScalarValue),
475 Ne(&'static str, ScalarValue),
476 IsNull(&'static str),
477 IsNotNull(&'static str),
478 In(&'static str, Vec<ScalarValue>),
479 LikeContains(&'static str, String),
480 Regex(&'static str, String),
485 Gte(&'static str, ScalarValue),
486 Lte(&'static str, ScalarValue),
487 And(Vec<Predicate>),
488 Or(Vec<Predicate>),
489 Not(Box<Predicate>),
490}
491impl Predicate {
492 pub fn to_lance(&self) -> String {
493 match self {
494 Self::Eq(column, value) => format!("{column} = {}", value.to_lance()),
495 Self::Ne(column, value) => format!("{column} <> {}", value.to_lance()),
496 Self::IsNull(column) => format!("{column} IS NULL"),
497 Self::IsNotNull(column) => format!("{column} IS NOT NULL"),
498 Self::In(column, values) => {
499 let values = values
500 .iter()
501 .map(ScalarValue::to_lance)
502 .collect::<Vec<_>>()
503 .join(", ");
504 format!("{column} IN ({values})")
505 }
506 Self::LikeContains(column, value) => {
507 format!("{column} LIKE {} ESCAPE '\\'", like_contains(value))
508 }
509 Self::Regex(column, pattern) => {
510 format!("regexp_like({column}, {})", quoted_string(pattern))
511 }
512 Self::Gte(column, value) => format!("{column} >= {}", value.to_lance()),
513 Self::Lte(column, value) => format!("{column} <= {}", value.to_lance()),
514 Self::And(predicates) => predicates
515 .iter()
516 .map(Self::to_lance)
517 .filter(|predicate| !predicate.is_empty())
518 .collect::<Vec<_>>()
519 .join(" AND "),
520 Self::Or(predicates) => {
521 let body = predicates
524 .iter()
525 .map(Self::to_lance)
526 .filter(|predicate| !predicate.is_empty())
527 .collect::<Vec<_>>()
528 .join(" OR ");
529 if body.is_empty() {
530 String::new()
531 } else {
532 format!("({body})")
533 }
534 }
535 Self::Not(inner) => {
536 let body = inner.to_lance();
537 if body.is_empty() {
538 String::new()
539 } else {
540 format!("NOT ({body})")
541 }
542 }
543 }
544 }
545}
546#[derive(Default)]
549pub struct ScanOpts<'a> {
550 pub predicate: Option<&'a Predicate>,
551 pub projection: Option<&'a [&'a str]>,
552}
553
554impl<'a> ScanOpts<'a> {
555 pub fn project_only(projection: &'a [&'a str]) -> Self {
556 Self {
557 predicate: None,
558 projection: Some(projection),
559 }
560 }
561 pub fn with_predicate_and_projection(
562 predicate: &'a Predicate,
563 projection: &'a [&'a str],
564 ) -> Self {
565 Self {
566 predicate: Some(predicate),
567 projection: Some(projection),
568 }
569 }
570}
571
572impl ScalarValue {
573 fn to_lance(&self) -> String {
574 match self {
575 Self::String(value) => quoted_string(value),
576 Self::Int32(value) => value.to_string(),
577 Self::Raw(value) => value.clone(),
578 }
579 }
580}
581#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
585pub struct RuntimeCaps {
586 pub index_cache_bytes: Option<usize>,
587 pub metadata_cache_bytes: Option<usize>,
588}
589
590impl RuntimeCaps {
591 pub fn from_config(config: &crate::config::RuntimeConfig) -> Self {
592 Self {
593 index_cache_bytes: config.index_cache_bytes,
594 metadata_cache_bytes: config.metadata_cache_bytes,
595 }
596 }
597}
598
599const LOCAL_INDEX_CACHE_BYTES: usize = 256 * 1024 * 1024;
603const LOCAL_METADATA_CACHE_BYTES: usize = 128 * 1024 * 1024;
604const REMOTE_INDEX_CACHE_BYTES: usize = 2 * 1024 * 1024 * 1024;
606const REMOTE_METADATA_CACHE_BYTES: usize = 512 * 1024 * 1024;
607
608fn resolve_cache_caps(location: &Url, caps: RuntimeCaps) -> (usize, usize) {
609 let (index_default, metadata_default) = if config::is_local(location) {
610 (LOCAL_INDEX_CACHE_BYTES, LOCAL_METADATA_CACHE_BYTES)
611 } else {
612 (REMOTE_INDEX_CACHE_BYTES, REMOTE_METADATA_CACHE_BYTES)
613 };
614 (
615 caps.index_cache_bytes.unwrap_or(index_default),
616 caps.metadata_cache_bytes.unwrap_or(metadata_default),
617 )
618}
619
620pub struct Handle {
621 datasets: DatasetSet,
622 retry: RetryPolicy,
623 #[allow(dead_code)]
631 session: Arc<Session>,
632 nm: Arc<dyn LanceNamespace>,
636 nm_ident: NamespaceIdent,
640 storage_options: HashMap<String, String>,
645 location: Url,
649 parts_refresh_after: Duration,
653}
654
655impl std::fmt::Debug for Handle {
656 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
657 formatter
658 .debug_struct("Handle")
659 .field("datasets", &self.datasets)
660 .field("retry", &self.retry)
661 .field("nm_ident", &self.nm_ident)
662 .field("storage_options", &self.storage_options)
663 .field("location", &self.location)
664 .finish()
665 }
666}
667
/// The tables managed by a [`Handle`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Table {
    Sessions,
    Messages,
    Parts,
}

impl Table {
    /// Public name of the table, used in logs and reports.
    pub fn as_str(self) -> &'static str {
        self.label()
    }

    /// Lowercase label for the table.
    fn label(self) -> &'static str {
        match self {
            Table::Sessions => "sessions",
            Table::Messages => "messages",
            Table::Parts => "parts",
        }
    }
}
687#[derive(Debug)]
688struct DatasetSet {
689 sessions: Mutex<CachedDataset>,
690 messages: Mutex<CachedDataset>,
691 parts: OnceCell<Mutex<CachedDataset>>,
699}
700#[derive(Debug)]
701struct CachedDataset {
702 dataset: Dataset,
703 last_refresh: Instant,
704 refresh_after: Duration,
705}
706impl CachedDataset {
707 async fn latest(&mut self) -> Result<Dataset> {
708 if self.last_refresh.elapsed() >= self.refresh_after {
709 self.dataset.checkout_latest().await?;
710 self.last_refresh = Instant::now();
711 }
712 Ok(self.dataset.clone())
713 }
714 fn replace(&mut self, dataset: Dataset) {
715 self.dataset = dataset;
716 self.last_refresh = Instant::now();
717 }
718}
719impl Handle {
720 pub async fn open(location: &Url) -> Result<Self> {
723 Self::open_with_options(location, HashMap::new(), RuntimeCaps::default()).await
724 }
725
726 pub async fn open_with_options(
732 location: &Url,
733 mut storage_options: HashMap<String, String>,
734 caps: RuntimeCaps,
735 ) -> Result<Self> {
736 if let Some(path) = config::local_path(location) {
737 tokio::fs::create_dir_all(&path)
738 .await
739 .with_context(|| format!("failed to create data dir {}", path.display()))?;
740 } else {
741 apply_remote_storage_defaults(&mut storage_options);
742 }
743 let (index_cache_bytes, metadata_cache_bytes) = resolve_cache_caps(location, caps);
749 let session = Arc::new(Session::new(
750 index_cache_bytes,
751 metadata_cache_bytes,
752 Arc::new(ObjectStoreRegistry::default()),
753 ));
754 let root = location.as_str().trim_end_matches('/').to_string();
760 let mut connect = ConnectBuilder::new("dir")
761 .property("root", root)
762 .session(session.clone());
763 for (key, value) in &storage_options {
767 connect = connect.property(format!("storage.{key}"), value.clone());
768 }
769 let nm: Arc<dyn LanceNamespace> = connect
770 .connect()
771 .await
772 .context("failed to connect lance Directory namespace")?;
773 let nm_ident = NamespaceIdent::root();
774 let refresh_after = if config::is_local(location) {
780 Duration::ZERO
781 } else {
782 Duration::from_secs(5)
783 };
784 let handle = Self {
785 datasets: DatasetSet {
786 sessions: Mutex::new(CachedDataset {
787 dataset: open_or_create_via_ns(
788 &nm,
789 &nm_ident,
790 sessions::SESSIONS,
791 sessions::session_schema(),
792 &session,
793 &storage_options,
794 )
795 .await?,
796 last_refresh: Instant::now(),
797 refresh_after,
798 }),
799 messages: Mutex::new(CachedDataset {
800 dataset: open_or_create_via_ns(
801 &nm,
802 &nm_ident,
803 sessions::MESSAGES,
804 sessions::message_schema(),
805 &session,
806 &storage_options,
807 )
808 .await?,
809 last_refresh: Instant::now(),
810 refresh_after,
811 }),
812 parts: OnceCell::new(),
813 },
814 retry: RetryPolicy::default(),
815 session,
816 nm,
817 nm_ident,
818 storage_options,
819 location: location.clone(),
820 parts_refresh_after: refresh_after,
821 };
822 Ok(handle)
823 }
824
825 pub fn location(&self) -> &Url {
826 &self.location
827 }
828
829 pub fn storage_options(&self) -> &HashMap<String, String> {
833 &self.storage_options
834 }
835
836 fn export_uri(&self, name: &str) -> String {
842 format!(
843 "{}/exports/{name}",
844 self.location.as_str().trim_end_matches('/')
845 )
846 }
847
848 fn object_store_params(&self) -> ObjectStoreParams {
852 ObjectStoreParams {
853 storage_options_accessor: (!self.storage_options.is_empty()).then(|| {
854 Arc::new(StorageOptionsAccessor::with_static_options(
855 self.storage_options.clone(),
856 ))
857 }),
858 ..Default::default()
859 }
860 }
861
862 pub(crate) async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
865 let uri = self.export_uri(name);
866 let registry = Arc::new(ObjectStoreRegistry::default());
867 let (store, path) =
868 ObjectStore::from_uri_and_params(registry, &uri, &self.object_store_params())
869 .await
870 .with_context(|| format!("failed to open object store for {uri}"))?;
871 store
872 .put(&path, bytes)
873 .await
874 .with_context(|| format!("failed to write export {uri}"))?;
875 Ok(())
876 }
877
878 pub(crate) async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
881 let uri = self.export_uri(name);
882 let registry = Arc::new(ObjectStoreRegistry::default());
883 let (store, path) =
884 ObjectStore::from_uri_and_params(registry, &uri, &self.object_store_params())
885 .await
886 .with_context(|| format!("failed to open object store for {uri}"))?;
887 let bytes = store
888 .read_one_all(&path)
889 .await
890 .with_context(|| format!("failed to read export {uri}"))?;
891 Ok(bytes.to_vec())
892 }
893
894 pub(crate) fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
899 if self.location.scheme() != "file" {
900 return None;
901 }
902 let dir = self.location.to_file_path().ok()?;
903 Some(dir.join("exports").join(name))
904 }
905
906 pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
907 Ok((
908 self.count_rows(Table::Sessions).await?,
909 self.count_rows(Table::Messages).await?,
910 self.count_rows(Table::Parts).await?,
911 ))
912 }
913
914 pub(crate) async fn merge_insert(
918 &self,
919 table: Table,
920 batch: RecordBatch,
921 row_count: usize,
922 ) -> Result<u64> {
923 self.merge(
924 table,
925 batch,
926 row_count,
927 "merge_insert",
928 WhenMatched::DoNothing,
929 WhenNotMatched::InsertAll,
930 )
931 .await
932 }
933
934 pub(crate) async fn merge_update(
937 &self,
938 table: Table,
939 batch: RecordBatch,
940 row_count: usize,
941 ) -> Result<u64> {
942 self.merge(
943 table,
944 batch,
945 row_count,
946 "merge_update",
947 WhenMatched::UpdateAll,
948 WhenNotMatched::DoNothing,
949 )
950 .await
951 }
952
953 async fn merge(
957 &self,
958 table: Table,
959 batch: RecordBatch,
960 row_count: usize,
961 op: &'static str,
962 when_matched: WhenMatched,
963 when_not_matched: WhenNotMatched,
964 ) -> Result<u64> {
965 if row_count == 0 {
966 return Ok(0);
967 }
968 let started = Instant::now();
969 let result = self
970 .retry_lance(table.label(), || async {
971 let mut cached = self.cached(table).await?.lock().await;
972 let existing = cached.latest().await?;
973 let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
974 let mut builder = MergeInsertBuilder::try_new(Arc::new(existing), Vec::new())?;
975 builder.when_matched(when_matched.clone());
976 builder.when_not_matched(when_not_matched.clone());
977 builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
980 builder.skip_auto_cleanup(true);
984 let (dataset, stats) = builder
985 .try_build()?
986 .execute_reader(Box::new(reader))
987 .await?;
988 cached.replace(dataset.as_ref().clone());
989 Ok((
990 stats.num_inserted_rows + stats.num_updated_rows,
991 stats.num_skipped_duplicates,
992 ))
993 })
994 .await;
995 let skipped = result.as_ref().map(|(_, s)| *s).unwrap_or(0);
996 tracing::info!(
997 target: "pond::perf",
998 op,
999 table = %table.label(),
1000 rows = row_count,
1001 elapsed_ms = started.elapsed().as_millis() as u64,
1002 skipped,
1003 "merge",
1004 );
1005 result.map(|(affected, _)| affected)
1006 }
1007
1008 pub async fn optimize_table(
1017 &self,
1018 table: Table,
1019 intents: &[IndexIntent],
1020 progress: Option<&OptimizeProgressFn>,
1021 policy: &MaintenancePolicy,
1022 ) -> TableOptimizeOutcome {
1023 let compaction = self
1024 .run_optimize_compact_phase(table, progress, policy)
1025 .await;
1026 let indices = self
1027 .run_optimize_indices_phase(table, intents, progress)
1028 .await;
1029 TableOptimizeOutcome {
1030 table,
1031 indices,
1032 compaction,
1033 }
1034 }
1035
1036 pub async fn optimize_table_indices_only(
1040 &self,
1041 table: Table,
1042 intents: &[IndexIntent],
1043 progress: Option<&OptimizeProgressFn>,
1044 ) -> PhaseOutcome {
1045 self.run_optimize_indices_phase(table, intents, progress)
1046 .await
1047 }
1048
1049 async fn run_optimize_indices_phase(
1050 &self,
1051 table: Table,
1052 intents: &[IndexIntent],
1053 progress: Option<&OptimizeProgressFn>,
1054 ) -> PhaseOutcome {
1055 if intents.is_empty() {
1056 return PhaseOutcome::Noop;
1057 }
1058 let result = self
1059 .retry_lance(table.label(), || async {
1060 let mut guard = self.cached(table).await?.lock().await;
1061 let mut dataset = guard.latest().await?;
1062 let did_work =
1063 optimize_table_indices(&mut dataset, intents, table, progress).await?;
1064 guard.replace(dataset);
1065 Ok::<_, anyhow::Error>(did_work)
1066 })
1067 .await;
1068 match result {
1069 Ok(true) => PhaseOutcome::Ok,
1070 Ok(false) => PhaseOutcome::Noop,
1071 Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
1072 Err(error) => PhaseOutcome::Failed(error),
1073 }
1074 }
1075
1076 async fn run_optimize_compact_phase(
1077 &self,
1078 table: Table,
1079 progress: Option<&OptimizeProgressFn>,
1080 policy: &MaintenancePolicy,
1081 ) -> PhaseOutcome {
1082 let result = self
1083 .retry_lance(table.label(), || async {
1084 let mut guard = self.cached(table).await?.lock().await;
1085 let mut dataset = guard.latest().await?;
1086 optimize_table_compact(&mut dataset, table, progress, policy).await?;
1087 guard.replace(dataset);
1088 Ok::<_, anyhow::Error>(())
1089 })
1090 .await;
1091 match result {
1092 Ok(()) => PhaseOutcome::Ok,
1093 Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
1094 Err(error) => PhaseOutcome::Failed(error),
1095 }
1096 }
1097
1098 pub async fn rebuild_index(&self, table: Table, intent: &IndexIntent) -> Result<()> {
1099 self.retry_lance(table.label(), || async {
1100 let mut guard = self.cached(table).await?.lock().await;
1101 let mut dataset = guard.latest().await?;
1102 rebuild_index(&mut dataset, intent).await?;
1103 guard.replace(dataset);
1104 Ok(())
1105 })
1106 .await
1107 }
1108
1109 pub async fn index_status(
1110 &self,
1111 table: Table,
1112 intents: &[IndexIntent],
1113 ) -> Result<Vec<IndexStatus>> {
1114 let dataset = self.dataset(table).await?;
1115 index_status(table, &dataset, intents).await
1116 }
1117
1118 pub(crate) async fn dataset(&self, table: Table) -> Result<Dataset> {
1119 let mut cached = self.cached(table).await?.lock().await;
1120 cached.latest().await
1121 }
1122 pub(crate) async fn scanner(
1127 &self,
1128 table: Table,
1129 predicate: Option<&Predicate>,
1130 ) -> Result<lance::dataset::scanner::Scanner> {
1131 let dataset = self.dataset(table).await?;
1132 scanner_with_prefilter(&dataset, predicate)
1133 }
1134 pub async fn scan(
1137 &self,
1138 table: Table,
1139 opts: ScanOpts<'_>,
1140 ) -> Result<lance::dataset::scanner::Scanner> {
1141 let mut scanner = self.scanner(table, opts.predicate).await?;
1142 if let Some(projection) = opts.projection {
1143 scanner.project(projection)?;
1144 }
1145 Ok(scanner)
1146 }
1147 pub(crate) async fn scan_batch(
1148 &self,
1149 table: Table,
1150 predicate: Option<&Predicate>,
1151 projection: &[&str],
1152 ) -> Result<RecordBatch> {
1153 let opts = ScanOpts {
1154 predicate,
1155 projection: (!projection.is_empty()).then_some(projection),
1156 };
1157 self.scan(table, opts)
1158 .await?
1159 .try_into_batch()
1160 .await
1161 .context("scan failed")
1162 }
1163 pub async fn count_rows(&self, table: Table) -> Result<usize> {
1164 self.dataset(table)
1165 .await?
1166 .count_rows(None)
1167 .await
1168 .map_err(Into::into)
1169 }
1170 #[cfg(test)]
1172 pub(crate) async fn messages_index_names(&self) -> Result<Vec<String>> {
1173 let dataset = self.dataset(Table::Messages).await?;
1174 let indices = dataset.load_indices().await?;
1175 Ok(indices.iter().map(|index| index.name.clone()).collect())
1176 }
1177
1178 pub(crate) async fn unindexed_row_count(
1181 &self,
1182 table: Table,
1183 index_name: &str,
1184 ) -> Result<usize> {
1185 let dataset = self.dataset(table).await?;
1186 let fragments = dataset
1187 .unindexed_fragments(index_name)
1188 .await
1189 .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1190 Ok(fragments
1191 .iter()
1192 .map(|fragment| fragment.num_rows().unwrap_or(0))
1193 .sum())
1194 }
1195
1196 pub(crate) async fn drop_index(&self, table: Table, name: &str) -> Result<()> {
1202 let mut guard = self.cached(table).await?.lock().await;
1203 let mut dataset = guard.latest().await?;
1204 dataset
1205 .drop_index(name)
1206 .await
1207 .with_context(|| format!("drop_index({name}) failed for {}", table.label()))?;
1208 guard.replace(dataset);
1209 Ok(())
1210 }
1211
1212 async fn table_location(&self, table_name: &str) -> Result<String> {
1215 let request = DescribeTableRequest {
1216 id: Some(self.nm_ident.as_table_id(table_name)),
1217 ..Default::default()
1218 };
1219 let response = self
1220 .nm
1221 .describe_table(request)
1222 .await
1223 .with_context(|| format!("failed to describe table {table_name}"))?;
1224 response
1225 .location
1226 .with_context(|| format!("namespace returned no location for table {table_name}"))
1227 }
1228
1229 pub async fn table_sizes(&self) -> Result<TableSizes> {
1233 let registry = Arc::new(ObjectStoreRegistry::default());
1234 let params = self.object_store_params();
1235
1236 let sessions = self
1237 .listed_size(
1238 ®istry,
1239 ¶ms,
1240 &self.table_location(sessions::SESSIONS).await?,
1241 )
1242 .await?;
1243 let messages = self
1244 .listed_size(
1245 ®istry,
1246 ¶ms,
1247 &self.table_location(sessions::MESSAGES).await?,
1248 )
1249 .await?;
1250 let parts = self
1251 .listed_size(
1252 ®istry,
1253 ¶ms,
1254 &self.table_location(sessions::PARTS).await?,
1255 )
1256 .await?;
1257 let root_total = self
1260 .listed_size(®istry, ¶ms, self.location.as_str())
1261 .await?;
1262 let other = root_total.saturating_sub(sessions + messages + parts);
1263 let sessions_data = self
1264 .data_liveness(®istry, ¶ms, Table::Sessions, sessions::SESSIONS)
1265 .await?;
1266 let messages_data = self
1267 .data_liveness(®istry, ¶ms, Table::Messages, sessions::MESSAGES)
1268 .await?;
1269 let parts_data = self
1270 .data_liveness(®istry, ¶ms, Table::Parts, sessions::PARTS)
1271 .await?;
1272 Ok(TableSizes {
1273 sessions,
1274 messages,
1275 parts,
1276 other,
1277 sessions_data,
1278 messages_data,
1279 parts_data,
1280 })
1281 }
1282
1283 async fn data_liveness(
1284 &self,
1285 registry: &Arc<ObjectStoreRegistry>,
1286 params: &ObjectStoreParams,
1287 table: Table,
1288 table_name: &str,
1289 ) -> Result<DataLiveness> {
1290 let location = self.table_location(table_name).await?;
1291 let data_dir = format!("{}/data", location.trim_end_matches('/'));
1292 let on_disk = self.listed_size(registry, params, &data_dir).await?;
1293 let dataset = self.dataset(table).await?;
1294 let live = dataset
1295 .get_fragments()
1296 .iter()
1297 .try_fold(0u64, |total, fragment| {
1298 Some(total + fragment_bytes(fragment.metadata())?)
1299 });
1300 Ok(DataLiveness { on_disk, live })
1301 }
1302
1303 async fn listed_size(
1305 &self,
1306 registry: &Arc<ObjectStoreRegistry>,
1307 params: &ObjectStoreParams,
1308 uri: &str,
1309 ) -> Result<u64> {
1310 let (store, base) = ObjectStore::from_uri_and_params(registry.clone(), uri, params)
1311 .await
1312 .with_context(|| format!("failed to open object store for {uri}"))?;
1313 let mut listing = store.list(Some(base));
1314 let mut total = 0u64;
1315 while let Some(meta) = listing.next().await {
1316 let meta = meta.with_context(|| format!("listing {uri} failed"))?;
1317 total += meta.size;
1318 }
1319 Ok(total)
1320 }
1321 async fn cached(&self, table: Table) -> Result<&Mutex<CachedDataset>> {
1322 match table {
1323 Table::Sessions => Ok(&self.datasets.sessions),
1324 Table::Messages => Ok(&self.datasets.messages),
1325 Table::Parts => self.parts_cached().await,
1326 }
1327 }
1328
1329 async fn parts_cached(&self) -> Result<&Mutex<CachedDataset>> {
1332 self.datasets
1333 .parts
1334 .get_or_try_init(|| async {
1335 let dataset = open_or_create_via_ns(
1336 &self.nm,
1337 &self.nm_ident,
1338 sessions::PARTS,
1339 sessions::part_schema(),
1340 &self.session,
1341 &self.storage_options,
1342 )
1343 .await?;
1344 Ok::<_, anyhow::Error>(Mutex::new(CachedDataset {
1345 dataset,
1346 last_refresh: Instant::now(),
1347 refresh_after: self.parts_refresh_after,
1348 }))
1349 })
1350 .await
1351 }
1352 async fn retry_lance<T, Fut, Op>(&self, label: &str, mut operation: Op) -> Result<T>
1353 where
1354 Fut: std::future::Future<Output = Result<T>>,
1355 Op: FnMut() -> Fut,
1356 {
1357 let mut attempt = 0u8;
1358 loop {
1359 attempt = attempt.saturating_add(1);
1360 match operation().await {
1361 Ok(value) => return Ok(value),
1362 Err(error) if attempt < self.retry.attempts => {
1363 let backoff = self.backoff(attempt);
1364 let error_chain = format!("{error:#}");
1367 tracing::warn!(
1368 label,
1369 attempt,
1370 ?backoff,
1371 error = %error_chain,
1372 "retrying Lance operation"
1373 );
1374 tokio::time::sleep(backoff).await;
1375 }
1376 Err(error) => {
1377 let error_chain = format!("{error:#}");
1378 tracing::warn!(
1379 label,
1380 attempt,
1381 error = %error_chain,
1382 "Lance operation exhausted retries"
1383 );
1384 if is_commit_conflict(&error) {
1391 return Err(error.context(ConflictExhausted { attempts: attempt }));
1392 }
1393 return Err(error);
1394 }
1395 }
1396 }
1397 }
1398 fn backoff(&self, attempt: u8) -> Duration {
1399 let shift = u32::from(attempt.saturating_sub(1));
1400 let multiplier = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
1401 let base = self.retry.initial_backoff.saturating_mul(multiplier);
1402 let factor = (1.0 + self.retry.jitter * (fastrand::f64() * 2.0 - 1.0)).max(0.0);
1405 base.mul_f64(factor).min(self.retry.max_backoff)
1406 }
1407}
1408async fn optimize_table_compact(
1429 dataset: &mut Dataset,
1430 table: Table,
1431 progress: Option<&OptimizeProgressFn>,
1432 policy: &MaintenancePolicy,
1433) -> Result<()> {
1434 let stats: Vec<FragmentStat> = dataset
1435 .get_fragments()
1436 .iter()
1437 .map(|fragment| fragment_stat(fragment.metadata()))
1438 .collect();
1439 let compaction = CompactionOptions {
1440 target_rows_per_fragment: derived_target_rows(&stats),
1441 max_bytes_per_file: Some(TARGET_FRAGMENT_BYTES as usize),
1442 defer_index_remap: false,
1443 ..CompactionOptions::default()
1444 };
1445
1446 let mut plan = plan_compaction(dataset, &compaction).await?;
1447 if policy.compaction_fragment_cap > 0 {
1448 plan.tasks.retain(|task| {
1449 let task_stats: Vec<FragmentStat> = task.fragments.iter().map(fragment_stat).collect();
1450 let keep = keep_task(
1451 &task_stats,
1452 policy.compaction_fragment_cap,
1453 compaction.materialize_deletions_threshold,
1454 );
1455 if !keep {
1456 tracing::debug!(
1457 target: "pond::perf",
1458 table = table.as_str(),
1459 fragments = task_stats.len(),
1460 "compaction task vetoed: merge dominated by one large fragment",
1461 );
1462 }
1463 keep
1464 });
1465 }
1466 if plan.tasks.is_empty() {
1467 tracing::debug!(
1468 target: "pond::perf",
1469 table = table.as_str(),
1470 "compaction skipped: no task to run",
1471 );
1472 } else {
1473 emit(
1474 progress,
1475 OptimizeEvent::PhaseStart {
1476 table,
1477 phase: OptimizePhase::Compact,
1478 detail: None,
1479 },
1480 );
1481 let started = Instant::now();
1482 let mut completed = Vec::with_capacity(plan.tasks.len());
1483 for task in plan.compaction_tasks() {
1484 completed.push(task.execute(dataset).await?);
1485 }
1486 commit_compaction(
1487 dataset,
1488 completed,
1489 Arc::new(DatasetIndexRemapperOptions::default()),
1490 &compaction,
1491 )
1492 .await?;
1493 emit(
1494 progress,
1495 OptimizeEvent::PhaseDone {
1496 table,
1497 phase: OptimizePhase::Compact,
1498 elapsed_ms: started.elapsed().as_millis() as u64,
1499 },
1500 );
1501 }
1502
1503 emit(
1507 progress,
1508 OptimizeEvent::PhaseStart {
1509 table,
1510 phase: OptimizePhase::Cleanup,
1511 detail: None,
1512 },
1513 );
1514 let started = Instant::now();
1515 dataset
1516 .cleanup_old_versions(policy.cleanup_older_than, Some(false), Some(false))
1517 .await
1518 .context("cleanup_old_versions failed during index optimize")?;
1519 emit(
1520 progress,
1521 OptimizeEvent::PhaseDone {
1522 table,
1523 phase: OptimizePhase::Cleanup,
1524 elapsed_ms: started.elapsed().as_millis() as u64,
1525 },
1526 );
1527
1528 Ok(())
1529}
1530
1531async fn optimize_table_indices(
1534 dataset: &mut Dataset,
1535 intents: &[IndexIntent],
1536 table: Table,
1537 progress: Option<&OptimizeProgressFn>,
1538) -> Result<bool> {
1539 let existing = dataset.load_indices().await?;
1540 let existing_names: std::collections::HashSet<String> =
1541 existing.iter().map(|index| index.name.clone()).collect();
1542
1543 let mut append_indices: Vec<String> = Vec::new();
1544 let mut did_work = false;
1545
1546 for intent in intents {
1547 let exists = existing_names.contains(intent.name);
1548
1549 if !exists {
1550 if !intent.trigger.should_create(dataset).await? {
1551 continue;
1552 }
1553 let params = intent.params.build(dataset).await?;
1554 let index_type = intent.params.index_type();
1555 tracing::info!(
1556 index = intent.name,
1557 column = intent.column,
1558 "creating Lance index (trigger fired)",
1559 );
1560 emit(
1561 progress,
1562 OptimizeEvent::PhaseStart {
1563 table,
1564 phase: OptimizePhase::IndexCreate,
1565 detail: Some(intent.name.to_owned()),
1566 },
1567 );
1568 let started = Instant::now();
1569 dataset
1570 .create_index(
1571 &[intent.column],
1572 index_type,
1573 Some(intent.name.to_owned()),
1574 params.as_ref(),
1575 false,
1576 )
1577 .await
1578 .with_context(|| format!("failed to create index {}", intent.name))?;
1579 emit(
1580 progress,
1581 OptimizeEvent::PhaseDone {
1582 table,
1583 phase: OptimizePhase::IndexCreate,
1584 elapsed_ms: started.elapsed().as_millis() as u64,
1585 },
1586 );
1587 did_work = true;
1588 continue;
1589 }
1590
1591 let unindexed = dataset.unindexed_fragments(intent.name).await?;
1592 if unindexed.is_empty() {
1593 continue;
1594 }
1595 if unindexed.len() < index_lag_threshold() {
1599 continue;
1600 }
1601 match intent.params {
1602 IndexParamsKind::Scalar(BuiltinIndexType::BTree) => {
1603 let params = intent.params.build(dataset).await?;
1604 let index_type = intent.params.index_type();
1605 tracing::debug!(
1606 target: "pond::perf",
1607 index = intent.name,
1608 column = intent.column,
1609 "rebuilding Lance BTree index",
1610 );
1611 emit(
1612 progress,
1613 OptimizeEvent::PhaseStart {
1614 table,
1615 phase: OptimizePhase::IndexRebuild,
1616 detail: Some(intent.name.to_owned()),
1617 },
1618 );
1619 let started = Instant::now();
1620 dataset
1621 .create_index(
1622 &[intent.column],
1623 index_type,
1624 Some(intent.name.to_owned()),
1625 params.as_ref(),
1626 true,
1627 )
1628 .await
1629 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1630 emit(
1631 progress,
1632 OptimizeEvent::PhaseDone {
1633 table,
1634 phase: OptimizePhase::IndexRebuild,
1635 elapsed_ms: started.elapsed().as_millis() as u64,
1636 },
1637 );
1638 did_work = true;
1639 }
1640 IndexParamsKind::Scalar(BuiltinIndexType::Bitmap)
1641 | IndexParamsKind::InvertedFtsNgram { .. }
1642 | IndexParamsKind::IvfPqCosine { .. } => {
1643 append_indices.push(intent.name.to_owned());
1644 }
1645 IndexParamsKind::Scalar(_) => {
1646 let params = intent.params.build(dataset).await?;
1647 emit(
1648 progress,
1649 OptimizeEvent::PhaseStart {
1650 table,
1651 phase: OptimizePhase::IndexRebuild,
1652 detail: Some(intent.name.to_owned()),
1653 },
1654 );
1655 let started = Instant::now();
1656 dataset
1657 .create_index(
1658 &[intent.column],
1659 intent.params.index_type(),
1660 Some(intent.name.to_owned()),
1661 params.as_ref(),
1662 true,
1663 )
1664 .await
1665 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1666 emit(
1667 progress,
1668 OptimizeEvent::PhaseDone {
1669 table,
1670 phase: OptimizePhase::IndexRebuild,
1671 elapsed_ms: started.elapsed().as_millis() as u64,
1672 },
1673 );
1674 did_work = true;
1675 }
1676 }
1677 }
1678
1679 if !append_indices.is_empty() {
1680 let to_append = append_indices.clone();
1681 emit(
1682 progress,
1683 OptimizeEvent::PhaseStart {
1684 table,
1685 phase: OptimizePhase::IndexAppend,
1686 detail: Some(append_indices.join(", ")),
1687 },
1688 );
1689 let started = Instant::now();
1690 dataset
1691 .optimize_indices(&OptimizeOptions::append().index_names(to_append))
1692 .await
1693 .context("optimize_indices(append) failed during index optimize")?;
1694 emit(
1695 progress,
1696 OptimizeEvent::PhaseDone {
1697 table,
1698 phase: OptimizePhase::IndexAppend,
1699 elapsed_ms: started.elapsed().as_millis() as u64,
1700 },
1701 );
1702 tracing::debug!(
1703 target: "pond::perf",
1704 indices = ?append_indices,
1705 "appended trailing fragments into indices",
1706 );
1707 did_work = true;
1708 }
1709
1710 Ok(did_work)
1711}
1712
1713async fn rebuild_index(dataset: &mut Dataset, intent: &IndexIntent) -> Result<()> {
1714 if !intent.trigger.should_create(dataset).await? {
1715 return Ok(());
1716 }
1717 let params = intent.params.build(dataset).await?;
1718 dataset
1719 .create_index(
1720 &[intent.column],
1721 intent.params.index_type(),
1722 Some(intent.name.to_owned()),
1723 params.as_ref(),
1724 true,
1725 )
1726 .await
1727 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1728 Ok(())
1729}
1730
1731async fn index_status(
1732 table: Table,
1733 dataset: &Dataset,
1734 intents: &[IndexIntent],
1735) -> Result<Vec<IndexStatus>> {
1736 let existing = dataset.load_indices().await?;
1737 let existing_names: std::collections::HashSet<String> =
1738 existing.iter().map(|index| index.name.clone()).collect();
1739 let total_fragments = dataset.get_fragments().len();
1740 let total_rows = dataset.count_rows(None).await?;
1741 let mut statuses = Vec::with_capacity(intents.len());
1742 for intent in intents {
1743 let exists = existing_names.contains(intent.name);
1744 if !exists {
1745 statuses.push(IndexStatus {
1746 table,
1747 intent_name: intent.name.to_owned(),
1748 fragments_covered: 0,
1749 unindexed_fragments: total_fragments,
1750 unindexed_rows: total_rows,
1751 exists,
1752 });
1753 continue;
1754 }
1755 let unindexed = dataset
1756 .unindexed_fragments(intent.name)
1757 .await
1758 .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1759 let unindexed_fragments = unindexed.len();
1760 let unindexed_rows = unindexed
1761 .iter()
1762 .map(|fragment| fragment.num_rows().unwrap_or(0))
1763 .sum();
1764 statuses.push(IndexStatus {
1765 table,
1766 intent_name: intent.name.to_owned(),
1767 fragments_covered: total_fragments.saturating_sub(unindexed_fragments),
1768 unindexed_fragments,
1769 unindexed_rows,
1770 exists,
1771 });
1772 }
1773 Ok(statuses)
1774}
1775
1776async fn open_or_create_via_ns(
1788 nm: &Arc<dyn LanceNamespace>,
1789 nm_ident: &NamespaceIdent,
1790 table_name: &str,
1791 schema: lance::deps::arrow_schema::SchemaRef,
1792 session: &Arc<Session>,
1793 storage_options: &HashMap<String, String>,
1794) -> Result<Dataset> {
1795 let table_id = nm_ident.as_table_id(table_name);
1796
1797 let request = DescribeTableRequest {
1798 id: Some(table_id.clone()),
1799 ..Default::default()
1800 };
1801 match nm.describe_table(request).await {
1802 Ok(response) => {
1803 let location = response.location.with_context(|| {
1804 format!("namespace returned no location for table {table_name}")
1805 })?;
1806 let mut builder = DatasetBuilder::from_uri(&location).with_session(session.clone());
1807 if !storage_options.is_empty() {
1808 builder = builder.with_storage_options(storage_options.clone());
1809 }
1810 let dataset = builder
1811 .load()
1812 .await
1813 .with_context(|| format!("failed to open table {table_name}"))?;
1814 ensure_schema_matches(&dataset, schema.as_ref(), table_name)?;
1815 return Ok(dataset);
1816 }
1817 Err(error) => match &error {
1818 error if is_namespace_error_code(error, ErrorCode::TableNotFound) => {
1819 }
1821 _ => {
1822 return Err(anyhow::Error::from(error))
1823 .with_context(|| format!("failed to describe table {table_name}"));
1824 }
1825 },
1826 }
1827
1828 let mut write_params = sessions::write_params_for_create();
1831 write_params.session = Some(session.clone());
1832 write_params.mode = WriteMode::Create;
1833 if !storage_options.is_empty() {
1834 write_params.store_params = Some(ObjectStoreParams {
1835 storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
1836 storage_options.clone(),
1837 ))),
1838 ..Default::default()
1839 });
1840 }
1841 let reader = sessions::empty_reader(schema)?;
1842 Dataset::write_into_namespace(reader, nm.clone(), table_id, Some(write_params))
1843 .await
1844 .with_context(|| format!("failed to create table {table_name}"))
1845}
1846
1847fn is_namespace_error_code(error: &lance::Error, code: ErrorCode) -> bool {
1851 if !matches!(error, lance::Error::Namespace { .. }) {
1852 return false;
1853 }
1854 std::iter::successors(Some(error as &(dyn std::error::Error + 'static)), |link| {
1855 link.source()
1856 })
1857 .filter_map(|link| link.downcast_ref::<NamespaceError>())
1858 .any(|inner| inner.code() == code)
1859}
1860
1861fn scanner_with_prefilter(
1862 dataset: &Dataset,
1863 predicate: Option<&Predicate>,
1864) -> Result<lance::dataset::scanner::Scanner> {
1865 let mut scanner = dataset.scan();
1866 scanner.prefilter(true);
1867 if let Some(predicate) = predicate {
1868 let filter = predicate.to_lance();
1869 if !filter.is_empty() {
1870 scanner.filter(&filter)?;
1871 }
1872 }
1873 Ok(scanner)
1874}
1875fn ensure_schema_matches(
1876 dataset: &Dataset,
1877 expected: &lance::deps::arrow_schema::Schema,
1878 table_name: &str,
1879) -> Result<()> {
1880 use lance::deps::arrow_schema::DataType;
1881 use std::collections::BTreeSet;
1882 let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
1883 let actual_names: BTreeSet<&str> = actual.fields().iter().map(|f| f.name().as_str()).collect();
1884 let expected_names: BTreeSet<&str> = expected
1885 .fields()
1886 .iter()
1887 .map(|f| f.name().as_str())
1888 .collect();
1889 if actual_names != expected_names {
1890 anyhow::bail!(
1891 "table {table_name} has columns {actual_names:?} but this pond build expects \
1892 {expected_names:?} - the on-disk store predates a schema change; delete the \
1893 data directory and re-run `pond ingest`",
1894 );
1895 }
1896 for actual_field in actual.fields() {
1901 let Some(expected_field) = expected.field_with_name(actual_field.name()).ok() else {
1902 continue;
1903 };
1904 if let (DataType::FixedSizeList(_, actual_dim), DataType::FixedSizeList(_, expected_dim)) =
1905 (actual_field.data_type(), expected_field.data_type())
1906 && actual_dim != expected_dim
1907 {
1908 tracing::warn!(
1909 table = table_name,
1910 column = actual_field.name(),
1911 actual_dim,
1912 expected_dim,
1913 "embedding dimension differs from config; open proceeds because model swaps are operator-driven",
1914 );
1915 }
1916 }
1917 Ok(())
1918}
/// Fills in connection-tuning defaults for remote object stores without overriding caller choices.
///
/// Every accepted spelling (alias) of an option is checked case-insensitively (ASCII). Only when
/// none is present is the default inserted under the first alias. Defaults applied:
/// `pool_idle_timeout = "300 seconds"`, `connect_timeout = "10 seconds"`, and, only when a custom
/// endpoint (`aws_endpoint` / `endpoint`) is configured, `aws_unsigned_payload = "true"` (also
/// suppressed by an existing `unsigned_payload`).
fn apply_remote_storage_defaults(options: &mut HashMap<String, String>) {
    // True when `options` already holds a key equal to any alias, ignoring ASCII case.
    fn has_any_key(options: &HashMap<String, String>, aliases: &[&str]) -> bool {
        options
            .keys()
            .any(|key| aliases.iter().any(|alias| key.eq_ignore_ascii_case(alias)))
    }

    // Inserts `value` under the first (canonical) alias unless some alias is already set.
    fn insert_unless_present(
        options: &mut HashMap<String, String>,
        aliases: &[&str],
        value: &str,
    ) {
        if !has_any_key(options, aliases) {
            options.insert(aliases[0].to_owned(), value.to_owned());
        }
    }

    insert_unless_present(options, &["pool_idle_timeout"], "300 seconds");
    insert_unless_present(options, &["connect_timeout"], "10 seconds");
    if has_any_key(options, &["aws_endpoint", "endpoint"]) {
        insert_unless_present(
            options,
            &["aws_unsigned_payload", "unsigned_payload"],
            "true",
        );
    }
}
1948
/// Renders `value` as a single-quoted SQL string literal, doubling each embedded `'`.
fn quoted_string(value: &str) -> String {
    let mut literal = String::with_capacity(value.len() + 2);
    literal.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            literal.push('\'');
        }
        literal.push(ch);
    }
    literal.push('\'');
    literal
}
/// Builds a quoted SQL `LIKE` pattern meant to match any string containing `value` literally.
///
/// `\`, `%` and `_` are prefixed with a backslash, `'` is doubled for the string literal, and
/// the result is wrapped as `'%…%'`. This assumes the filter engine treats `\` as the `LIKE`
/// escape character; confirm against the Lance/DataFusion filter parser in use.
fn like_contains(value: &str) -> String {
    let mut pattern = String::with_capacity(value.len() + 4);
    pattern.push_str("'%");
    for ch in value.chars() {
        match ch {
            '\\' => pattern.push_str("\\\\"),
            '%' => pattern.push_str("\\%"),
            '_' => pattern.push_str("\\_"),
            '\'' => pattern.push_str("''"),
            other => pattern.push(other),
        }
    }
    pattern.push_str("%'");
    pattern
}
1960
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a fragment stat of `bytes` bytes at 1 000 bytes per row, with no deletions.
    fn stat(bytes: u64) -> FragmentStat {
        FragmentStat {
            bytes: Some(bytes),
            rows: bytes / 1_000,
            deleted_rows: 0,
        }
    }

    #[test]
    fn compaction_veto_blocks_absorb_keeps_peers() {
        // One 665 MB fragment plus two tiny ones: the merge would be dominated by the big one.
        let absorb = [stat(665_000_000), stat(1_000_000), stat(2_000_000)];
        assert!(!keep_task(&absorb, 64, 0.1));
        // Two equally sized fragments are allowed to merge.
        let peers = [stat(300_000_000), stat(300_000_000)];
        assert!(keep_task(&peers, 64, 0.1));
        // Small fragments of descending size are also kept.
        let tiered = [stat(400_000), stat(60_000), stat(40_000)];
        assert!(keep_task(&tiered, 64, 0.1));
    }

    #[test]
    fn compaction_veto_passes_deletions_and_cap() {
        // 20% deleted rows exceeds the 0.1 deletion threshold, so the task is kept.
        let mut deleting = stat(665_000_000);
        deleting.deleted_rows = deleting.rows / 5;
        assert!(keep_task(&[deleting, stat(1_000)], 64, 0.1));

        // A task of 64 fragments (one large + 63 tiny) reaches the cap of 64 and is kept.
        let wide: Vec<FragmentStat> = std::iter::once(stat(665_000_000))
            .chain(std::iter::repeat_with(|| stat(1_000)).take(63))
            .collect();
        assert!(keep_task(&wide, 64, 0.1));
    }

    #[test]
    fn compaction_veto_falls_back_to_rows_on_unknown_sizes() {
        // With the large fragment's byte size unknown, the veto still fires (judged on rows).
        let mut unknown = stat(665_000_000);
        unknown.bytes = None;
        assert!(!keep_task(
            &[unknown, stat(1_000_000), stat(2_000_000)],
            64,
            0.1
        ));
    }

    #[test]
    fn derived_target_rows_tracks_row_size_and_clamps() {
        // ~1.3 KB per row: the target lands strictly between the clamps.
        let parts_like = [FragmentStat {
            bytes: Some(665_000_000),
            rows: 511_000,
            deleted_rows: 0,
        }];
        let target = derived_target_rows(&parts_like);
        assert!((150_000..300_000).contains(&target), "{target}");
        // Unknown byte size falls back to the maximum target.
        let unknown = [FragmentStat {
            bytes: None,
            rows: 511_000,
            deleted_rows: 0,
        }];
        assert_eq!(
            derived_target_rows(&unknown),
            MAX_TARGET_ROWS_PER_FRAGMENT as usize
        );
        // Very small rows (10 bytes each) clamp to the maximum target.
        let tiny = [FragmentStat {
            bytes: Some(1_000_000),
            rows: 100_000,
            deleted_rows: 0,
        }];
        assert_eq!(
            derived_target_rows(&tiny),
            MAX_TARGET_ROWS_PER_FRAGMENT as usize
        );
        // Very large rows (10 MB each) clamp to the minimum target.
        let huge = [FragmentStat {
            bytes: Some(1_000_000_000),
            rows: 100,
            deleted_rows: 0,
        }];
        assert_eq!(
            derived_target_rows(&huge),
            MIN_TARGET_ROWS_PER_FRAGMENT as usize
        );
    }

    #[test]
    fn namespace_error_code_walks_wrapped_chain() {
        let direct = lance::Error::namespace_source(Box::new(NamespaceError::TableNotFound {
            message: "missing".into(),
        }));
        assert!(is_namespace_error_code(&direct, ErrorCode::TableNotFound));

        // The code is still found one level deeper in the source chain.
        let wrapped = lance::Error::namespace_source(Box::new(direct));
        assert!(is_namespace_error_code(&wrapped, ErrorCode::TableNotFound));

        let other_code =
            lance::Error::namespace_source(Box::new(NamespaceError::NamespaceNotFound {
                message: "nope".into(),
            }));
        assert!(!is_namespace_error_code(
            &other_code,
            ErrorCode::TableNotFound
        ));

        let not_namespace = lance::Error::internal("unrelated");
        assert!(!is_namespace_error_code(
            &not_namespace,
            ErrorCode::TableNotFound
        ));
    }

    #[tokio::test]
    async fn store_opens_via_namespace_and_scan_works() -> Result<()> {
        let temp = TempDir::new()?;
        let url = Url::from_directory_path(temp.path())
            .map_err(|()| anyhow::anyhow!("temp path is not absolute"))?;
        let handle = Handle::open(&url).await?;
        // Every table, including the lazily opened parts table, should scan as empty.
        let cases: [(Table, &[&str]); 3] = [
            (Table::Sessions, &["id"]),
            (Table::Messages, &["id"]),
            (Table::Parts, &["id"]),
        ];
        for (table, projection) in cases {
            let scanner = handle
                .scan(table, ScanOpts::project_only(projection))
                .await?;
            let batch = scanner.try_into_batch().await?;
            assert_eq!(batch.num_rows(), 0, "fresh table should be empty");
        }
        Ok(())
    }
}