Skip to main content

pond/
substrate.rs

1use crate::{
2    RetryPolicy,
3    config::{self},
4    handlers::NamespaceIdent,
5    sessions::{self},
6};
7use anyhow::{Context, Result};
8use lance::Dataset;
9use lance::dataset::builder::DatasetBuilder;
10use lance::dataset::optimize::{CompactionOptions, compact_files};
11use lance::dataset::write::merge_insert::SourceDedupeBehavior;
12use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode};
13use lance::deps::arrow_array::{RecordBatch, RecordBatchIterator};
14use lance::index::DatasetIndexExt;
15use lance::index::DatasetIndexInternalExt;
16use lance::index::vector::VectorIndexParams;
17use lance::session::Session;
18use lance_index::IndexType;
19use lance_index::optimize::OptimizeOptions;
20use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
21use lance_io::object_store::{
22    ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor,
23};
24use lance_linalg::distance::MetricType;
25use lance_namespace::LanceNamespace;
26use lance_namespace::error::{ErrorCode, NamespaceError};
27use lance_namespace::models::DescribeTableRequest;
28use lance_namespace_impls::ConnectBuilder;
29use std::{
30    collections::HashMap,
31    sync::Arc,
32    time::{Duration, Instant},
33};
34use tokio::sync::{Mutex, OnceCell};
35use tokio_stream::StreamExt;
36use url::Url;
/// Embedded-row count at which pond builds the IVF_PQ vector index on
/// `messages.vector` (spec.md#search). Below this count, vector search runs a
/// brute-force flat scan, which is exact and fast at small and medium scale.
/// IVF_PQ also cannot train well on fewer vectors, so there is nothing to gain
/// by indexing earlier.
pub const VECTOR_INDEX_ACTIVATION_ROWS: usize = 100_000;
42
/// Default minimum unindexed-fragment count a per-intent append/rebuild step
/// must reach before `optimize_table_indices` admits it.
///
/// Smaller values produce smaller, more frequent commits, which are costly on
/// remote stores. Larger values let fragments pile up behind the brute-force
/// fallback. 4 is the floor of the documented 4-8 band.
pub const DEFAULT_INDEX_LAG_THRESHOLD: usize = 4;

/// Process-wide override of [`DEFAULT_INDEX_LAG_THRESHOLD`]. Empty until
/// [`init_index_lag_threshold`] stores a value.
static INDEX_LAG_THRESHOLD_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Record the process-wide index-lag threshold from
/// `[maintenance].index_lag_threshold`.
///
/// Only the first call takes effect; later calls are ignored (the same
/// contract as `embed::init_model_id` / `sessions::init_embedding_dim`).
pub fn init_index_lag_threshold(value: usize) {
    // `set` returns Err once the cell is already filled; dropping that Err is
    // what makes the first caller win.
    let _ = INDEX_LAG_THRESHOLD_RUNTIME.set(value);
}

/// The configured index-lag threshold, or [`DEFAULT_INDEX_LAG_THRESHOLD`]
/// when [`init_index_lag_threshold`] has not run.
pub fn index_lag_threshold() -> usize {
    INDEX_LAG_THRESHOLD_RUNTIME
        .get()
        .map_or(DEFAULT_INDEX_LAG_THRESHOLD, |configured| *configured)
}
64
/// Sub-target fragment count at which compaction runs even when no mergeable
/// run of small fragments can fill a whole target fragment (the test is
/// `candidate_count >= cap` in `should_compact`).
///
/// Without this gate, the 5-min sync would re-Rewrite the trailing fragment
/// every pass (spec.md#lance-index-maintenance). 0 disables the gate, so
/// compaction always runs.
pub const DEFAULT_COMPACTION_FRAGMENT_CAP: usize = 64;
69
70/// Default manifest-retention window for the safe cleanup pass. Matches
71/// LanceDB's recommended OSS-operator practice (lancedb docs: performance.mdx,
72/// tables/update.mdx). With `delete_unverified=false`, Lance's 7-day
73/// in-progress guard still protects unverified files regardless of this value
74/// (`UNVERIFIED_THRESHOLD_DAYS` in lance/dataset/cleanup.rs).
75pub fn default_cleanup_older_than() -> chrono::Duration {
76    chrono::Duration::days(1)
77}
78
79/// Resolved per-call inputs to the storage-maintenance pass. Built from
80/// `[maintenance]` (and any per-invocation CLI override) at the entry point;
81/// threaded down to `optimize_table_compact` so the substrate never re-reads
82/// `Config` itself.
83#[derive(Debug, Clone, Copy)]
84pub struct MaintenancePolicy {
85    /// Compaction gate: see [`DEFAULT_COMPACTION_FRAGMENT_CAP`]. `0` always
86    /// compacts (preserves the pre-gate test behavior).
87    pub compaction_fragment_cap: usize,
88    /// Manifest-retention window handed to `cleanup_old_versions`.
89    pub cleanup_older_than: chrono::Duration,
90}
91
92impl MaintenancePolicy {
93    /// Preserves the pre-gate compaction-always behavior that the existing
94    /// optimize tests assume.
95    pub fn always_compact() -> Self {
96        Self {
97            compaction_fragment_cap: 0,
98            cleanup_older_than: default_cleanup_older_than(),
99        }
100    }
101}
102
/// Decide whether a compaction pass is worth running.
///
/// Returns `true` in either of two cases:
/// - the largest mergeable run of sub-target fragments holds at least
///   `target_rows` rows, so compacting it produces (freezes) a whole
///   target-sized fragment;
/// - at least `cap` sub-target fragments have accumulated.
///
/// `cap == 0` always compacts, because `candidate_count >= 0` is trivially true.
fn should_compact(
    mergeable_run_rows: usize,
    candidate_count: usize,
    target_rows: usize,
    cap: usize,
) -> bool {
    let run_fills_target = mergeable_run_rows >= target_rows;
    let backlog_hits_cap = candidate_count >= cap;
    run_fills_target || backlog_hits_cap
}
114
/// Scan fragment row counts in dataset order and return
/// `(largest contiguous below-target run in rows, total below-target fragments)`.
///
/// Any fragment with `rows >= target` ends the current run. The run therefore
/// approximates Lance's largest mergeable bin: a small fragment stranded
/// between at-target fragments, which Lance will not merge, cannot inflate the
/// run total or cause perpetual re-compaction. Such a fragment still counts
/// toward the second element.
fn compaction_candidates(
    physical_rows: impl IntoIterator<Item = usize>,
    target: usize,
) -> (usize, usize) {
    let (largest_run, _open_run, below_target) = physical_rows.into_iter().fold(
        (0usize, 0usize, 0usize),
        |(largest, open, below), rows| {
            if rows >= target {
                // An at-target fragment closes the open run.
                (largest, 0, below)
            } else {
                let extended = open + rows;
                (largest.max(extended), extended, below + 1)
            }
        },
    );
    (largest_run, below_target)
}
137
/// Declarative description of one index pond keeps on a table. The index is
/// created when its trigger fires and folded forward by `pond index optimize`.
#[derive(Debug, Clone)]
pub struct IndexIntent {
    /// Stable on-disk name. It must be identical across runs so that
    /// existence checks resolve.
    pub name: &'static str,
    /// Column the index covers.
    pub column: &'static str,
    /// Condition evaluated against the live dataset before each cycle
    /// (see `IndexTrigger::should_create`).
    pub trigger: IndexTrigger,
    /// How the params are built at create time (see `IndexParamsKind::build`).
    /// Some intents have static params (FTS, scalars); IVF_PQ needs the row
    /// count to size its partitions.
    pub params: IndexParamsKind,
}
153
/// When an [`IndexIntent`] should exist on disk. Evaluated against the live
/// dataset by `IndexTrigger::should_create`.
#[derive(Debug, Clone)]
pub enum IndexTrigger {
    /// Build whenever the table has at least one row. Used for FTS and scalar
    /// indices, where there is no training cost worth delaying.
    OnAnyRows,
    /// Build once `count(<column> IS NOT NULL) >= threshold`. Used for the
    /// IVF_PQ vector index, which trains poorly on too few vectors.
    OnNonNullCount {
        column: &'static str,
        threshold: usize,
    },
}
167
/// The lance-native shape of an [`IndexIntent`]'s params. At create time
/// `IndexParamsKind::build` turns it into the matching `IndexParams`, and
/// `IndexParamsKind::index_type` reports the resulting [`IndexType`].
#[derive(Debug, Clone)]
pub enum IndexParamsKind {
    /// `BuiltinIndexType::BTree` -> [`IndexType::BTree`];
    /// `BuiltinIndexType::Bitmap` -> [`IndexType::Bitmap`]; etc.
    Scalar(BuiltinIndexType),
    /// `InvertedIndexParams` with a character `ngram` tokenizer over the
    /// `[min, max]` length range, with stemming and stop-word removal off
    /// (spec.md#search-language-neutral-index).
    InvertedFtsNgram { min: u32, max: u32 },
    /// `VectorIndexParams::ivf_pq` with the cosine metric (e5 vectors are
    /// L2-normalized). `sub_vectors = embedding_dim / 8` and `num_bits = 8`
    /// are pond's conventions; `max_iters` caps kmeans. Partitions follow
    /// LanceDB's documented `num_rows // 4096` guidance, floored at one.
    /// `build` counts non-null rows of the `vector` column for this.
    IvfPqCosine {
        sub_vectors: usize,
        num_bits: u8,
        max_iters: usize,
    },
}
189
190impl IndexTrigger {
191    async fn should_create(&self, dataset: &Dataset) -> Result<bool> {
192        match self {
193            Self::OnAnyRows => Ok(dataset.count_rows(None).await? > 0),
194            Self::OnNonNullCount { column, threshold } => {
195                let count = dataset
196                    .count_rows(Some(format!("{column} IS NOT NULL")))
197                    .await?;
198                Ok(count >= *threshold)
199            }
200        }
201    }
202}
203
204impl IndexParamsKind {
205    fn index_type(&self) -> IndexType {
206        match self {
207            Self::Scalar(BuiltinIndexType::Bitmap) => IndexType::Bitmap,
208            Self::Scalar(_) => IndexType::BTree,
209            Self::InvertedFtsNgram { .. } => IndexType::Inverted,
210            Self::IvfPqCosine { .. } => IndexType::Vector,
211        }
212    }
213
214    async fn build(&self, dataset: &Dataset) -> Result<Box<dyn lance::index::IndexParams>> {
215        match self {
216            Self::Scalar(kind) => Ok(Box::new(ScalarIndexParams::for_builtin(kind.clone()))),
217            Self::InvertedFtsNgram { min, max } => Ok(Box::new(
218                InvertedIndexParams::default()
219                    .base_tokenizer("ngram".to_owned())
220                    .ngram_min_length(*min)
221                    .ngram_max_length(*max)
222                    .stem(false)
223                    .remove_stop_words(false),
224            )),
225            Self::IvfPqCosine {
226                sub_vectors,
227                num_bits,
228                max_iters,
229            } => {
230                let count = dataset
231                    .count_rows(Some("vector IS NOT NULL".to_owned()))
232                    .await?;
233                let partitions = count.checked_div(4096).unwrap_or(0).max(1);
234                Ok(Box::new(VectorIndexParams::ivf_pq(
235                    partitions,
236                    *num_bits,
237                    *sub_vectors,
238                    MetricType::Cosine,
239                    *max_iters,
240                )))
241            }
242        }
243    }
244}
245
/// Status of one [`IndexIntent`] on one table: whether the index exists and
/// how much data it does not yet cover.
/// NOTE(review): the producer of this struct is outside this view; the field
/// descriptions below come from the field names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexStatus {
    /// Table the index belongs to.
    pub table: Table,
    /// Name of the intent described (presumably `IndexIntent::name`).
    pub intent_name: String,
    /// Fragments the existing index covers.
    pub fragments_covered: usize,
    /// Fragments not yet covered by the index.
    pub unindexed_fragments: usize,
    /// Rows in the uncovered fragments.
    pub unindexed_rows: usize,
    /// Whether the index currently exists on disk.
    pub exists: bool,
}
255
/// Marker error that pond adds to an anyhow chain when `retry_lance` runs out
/// of attempts against an OCC commit conflict (spec.md#protocol).
///
/// The wire layer downcasts to this type so it can report the outcome as
/// `conflict` instead of the generic `storage_unavailable`.
#[derive(Debug, Clone, Copy)]
pub struct ConflictExhausted {
    /// Number of attempts made before giving up.
    pub attempts: u8,
}

impl std::fmt::Display for ConflictExhausted {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let attempts = self.attempts;
        write!(formatter, "commit conflict exhausted after {attempts} attempt(s)")
    }
}

impl std::error::Error for ConflictExhausted {}
276
/// Per-phase result for one table's pass through `Handle::optimize_table`.
/// spec.md#substrate 3.7 (`lance-index-maintenance`): the indices phase and the
/// compaction phase have independent retry budgets and independent commits,
/// so a hot writer that starves the Rewrite cannot abort the index Update.
#[derive(Debug)]
pub enum PhaseOutcome {
    /// Phase ran and committed work.
    Ok,
    /// Phase ran; no work was needed.
    Noop,
    /// Phase ran, but the OCC retry budget ran out on conflict. The operator
    /// can rerun once the hot writer quiesces.
    SkippedConflict,
    /// Phase failed with a non-conflict error.
    Failed(anyhow::Error),
    /// The caller did not request this phase (e.g. compaction is skipped
    /// under `Store::build_indices_only`).
    NotAttempted,
}

impl PhaseOutcome {
    /// True only for [`PhaseOutcome::Failed`]. A skipped conflict does not
    /// count as a failure.
    pub fn is_failed(&self) -> bool {
        matches!(self, Self::Failed(_))
    }
}
302
/// What `Handle::optimize_table` did for one table, with a separate outcome
/// for each phase.
#[derive(Debug)]
pub struct TableOptimizeOutcome {
    /// Table this outcome describes.
    pub table: Table,
    /// Outcome of the index phase.
    pub indices: PhaseOutcome,
    /// Outcome of the compaction phase.
    pub compaction: PhaseOutcome,
}
310
/// Boundary event during one `Handle::optimize_table` pass. Delivered through
/// `emit` to an optional [`OptimizeProgressFn`]: the CLI binds one to render a
/// live spinner, and library callers pass `None`.
#[derive(Debug, Clone)]
pub enum OptimizeEvent {
    /// `phase` is starting on `table`. `detail` is optional extra text for display.
    PhaseStart {
        table: Table,
        phase: OptimizePhase,
        detail: Option<String>,
    },
    /// `phase` finished on `table`; `elapsed_ms` is its reported duration in milliseconds.
    PhaseDone {
        table: Table,
        phase: OptimizePhase,
        elapsed_ms: u64,
    },
}
326
/// A step within an optimize pass, as reported in [`OptimizeEvent`]s.
#[derive(Debug, Clone, Copy)]
pub enum OptimizePhase {
    Compact,
    Cleanup,
    IndexCreate,
    IndexRebuild,
    IndexAppend,
}

impl OptimizePhase {
    /// Short kebab-case name for progress output.
    pub fn label(self) -> &'static str {
        use OptimizePhase::{Cleanup, Compact, IndexAppend, IndexCreate, IndexRebuild};
        match self {
            Compact => "compact",
            Cleanup => "cleanup",
            IndexCreate => "index-create",
            IndexRebuild => "index-rebuild",
            IndexAppend => "index-append",
        }
    }
}
347
348pub type OptimizeProgressFn = Box<dyn Fn(OptimizeEvent) + Send + Sync>;
349
350fn emit(progress: Option<&OptimizeProgressFn>, event: OptimizeEvent) {
351    if let Some(callback) = progress {
352        callback(event);
353    }
354}
355
356/// True when the chain root is one of Lance's commit-conflict variants
357/// (`CommitConflict`, `RetryableCommitConflict`, `TooMuchWriteContention`).
358/// Everything else (timeouts, IAM denials, disk errors) is not a conflict.
359pub fn is_commit_conflict(error: &anyhow::Error) -> bool {
360    error.downcast_ref::<lance::Error>().is_some_and(|err| {
361        matches!(
362            err,
363            lance::Error::CommitConflict { .. }
364                | lance::Error::RetryableCommitConflict { .. }
365                | lance::Error::TooMuchWriteContention { .. }
366        )
367    })
368}
369
370/// True when `retry_lance` exhausted retries against an OCC conflict and
371/// attached `ConflictExhausted` to the chain head.
372fn is_conflict_exhausted(error: &anyhow::Error) -> bool {
373    error.chain().any(|cause| cause.is::<ConflictExhausted>())
374}
375
/// On-disk byte totals for the three session datasets, plus everything else
/// under the data-dir root. Sizes come from listing through Lance's
/// object-store layer (spec.md#lance-chokepoints-storage), so `file://` and
/// `s3://` behave alike.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TableSizes {
    /// Bytes in the sessions dataset.
    pub sessions: u64,
    /// Bytes in the messages dataset.
    pub messages: u64,
    /// Bytes in the parts dataset.
    pub parts: u64,
    /// Bytes under the data-dir root that belong to none of the three datasets.
    pub other: u64,
}
386
/// A literal operand in a [`Predicate`], rendered by `ScalarValue::to_lance`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalarValue {
    /// Text value, emitted as a quoted SQL string.
    String(String),
    /// 32-bit integer, emitted as its decimal form.
    Int32(i32),
    /// Pre-rendered SQL fragment, emitted verbatim.
    Raw(String),
}

impl From<&str> for ScalarValue {
    fn from(value: &str) -> Self {
        Self::from(String::from(value))
    }
}

impl From<String> for ScalarValue {
    fn from(value: String) -> Self {
        ScalarValue::String(value)
    }
}

impl From<i32> for ScalarValue {
    fn from(value: i32) -> Self {
        ScalarValue::Int32(value)
    }
}
/// Filter expression tree, rendered to a Lance SQL filter string by
/// `Predicate::to_lance`. Column names are `&'static str` and are emitted
/// unquoted; operand values are rendered by `ScalarValue::to_lance`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Predicate {
    /// `<col> = <value>`
    Eq(&'static str, ScalarValue),
    /// `<col> <> <value>`
    Ne(&'static str, ScalarValue),
    /// `<col> IS NULL`
    IsNull(&'static str),
    /// `<col> IS NOT NULL`
    IsNotNull(&'static str),
    /// `<col> IN (<v1>, <v2>, ...)`
    In(&'static str, Vec<ScalarValue>),
    /// `<col> LIKE <pattern> ESCAPE '\'`. The pattern is built by
    /// `like_contains` (presumably a substring match).
    LikeContains(&'static str, String),
    /// Regex match. Emitted as `regexp_like(<col>, '<pat>')`. Never pushes
    /// down to BTREE indexes (Lance's scalar-index-expr parser ignores it),
    /// so the filter is a full-scan-with-predicate. That is acceptable for
    /// human-driven `--project re:...` queries, not for hot paths.
    Regex(&'static str, String),
    /// `<col> >= <value>`
    Gte(&'static str, ScalarValue),
    /// `<col> <= <value>`
    Lte(&'static str, ScalarValue),
    /// Conjunction. Children that render empty are dropped.
    And(Vec<Predicate>),
    /// Parenthesized disjunction. Children that render empty are dropped.
    Or(Vec<Predicate>),
    /// `NOT (<inner>)`, or empty when the inner predicate renders empty.
    Not(Box<Predicate>),
}
427impl Predicate {
428    pub fn to_lance(&self) -> String {
429        match self {
430            Self::Eq(column, value) => format!("{column} = {}", value.to_lance()),
431            Self::Ne(column, value) => format!("{column} <> {}", value.to_lance()),
432            Self::IsNull(column) => format!("{column} IS NULL"),
433            Self::IsNotNull(column) => format!("{column} IS NOT NULL"),
434            Self::In(column, values) => {
435                let values = values
436                    .iter()
437                    .map(ScalarValue::to_lance)
438                    .collect::<Vec<_>>()
439                    .join(", ");
440                format!("{column} IN ({values})")
441            }
442            Self::LikeContains(column, value) => {
443                format!("{column} LIKE {} ESCAPE '\\'", like_contains(value))
444            }
445            Self::Regex(column, pattern) => {
446                format!("regexp_like({column}, {})", quoted_string(pattern))
447            }
448            Self::Gte(column, value) => format!("{column} >= {}", value.to_lance()),
449            Self::Lte(column, value) => format!("{column} <= {}", value.to_lance()),
450            Self::And(predicates) => predicates
451                .iter()
452                .map(Self::to_lance)
453                .filter(|predicate| !predicate.is_empty())
454                .collect::<Vec<_>>()
455                .join(" AND "),
456            Self::Or(predicates) => {
457                // Wrap in parens so the disjunction composes safely as a child
458                // of an outer `And` (SQL `OR` binds looser than `AND`).
459                let body = predicates
460                    .iter()
461                    .map(Self::to_lance)
462                    .filter(|predicate| !predicate.is_empty())
463                    .collect::<Vec<_>>()
464                    .join(" OR ");
465                if body.is_empty() {
466                    String::new()
467                } else {
468                    format!("({body})")
469                }
470            }
471            Self::Not(inner) => {
472                let body = inner.to_lance();
473                if body.is_empty() {
474                    String::new()
475                } else {
476                    format!("NOT ({body})")
477                }
478            }
479        }
480    }
481}
482/// Read-side options for `Handle::scan`: optional prefilter predicate and
483/// optional projection. Default = no filter, all columns.
484#[derive(Default)]
485pub struct ScanOpts<'a> {
486    pub predicate: Option<&'a Predicate>,
487    pub projection: Option<&'a [&'a str]>,
488}
489
490impl<'a> ScanOpts<'a> {
491    pub fn project_only(projection: &'a [&'a str]) -> Self {
492        Self {
493            predicate: None,
494            projection: Some(projection),
495        }
496    }
497    pub fn with_predicate_and_projection(
498        predicate: &'a Predicate,
499        projection: &'a [&'a str],
500    ) -> Self {
501        Self {
502            predicate: Some(predicate),
503            projection: Some(projection),
504        }
505    }
506}
507
508impl ScalarValue {
509    fn to_lance(&self) -> String {
510        match self {
511            Self::String(value) => quoted_string(value),
512            Self::Int32(value) => value.to_string(),
513            Self::Raw(value) => value.clone(),
514        }
515    }
516}
517/// Lance cache caps in bytes. `None` lets the substrate pick the backend-aware
518/// default (local FS gets a tighter cap; object stores stay near Lance's
519/// defaults). Wired through `Store::open_with_options` from `[runtime]`.
520#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
521pub struct RuntimeCaps {
522    pub index_cache_bytes: Option<usize>,
523    pub metadata_cache_bytes: Option<usize>,
524}
525
526impl RuntimeCaps {
527    pub fn from_config(config: &crate::config::RuntimeConfig) -> Self {
528        Self {
529            index_cache_bytes: config.index_cache_bytes,
530            metadata_cache_bytes: config.metadata_cache_bytes,
531        }
532    }
533}
534
/// Local-FS index-cache default (256 MiB). Tight enough that a long-lived
/// `pond mcp` lands well under the 500 MiB target, with no measurable latency
/// cost compared with Lance's 6 GiB default (see
/// `benches/serve_mem_bench.rs --cap-sweep`).
const LOCAL_INDEX_CACHE_BYTES: usize = 256 * 1024 * 1024;
/// Local-FS metadata-cache default (128 MiB).
const LOCAL_METADATA_CACHE_BYTES: usize = 128 * 1024 * 1024;
/// Object-store index-cache default (2 GiB). Refilling costs latency per
/// page, so keep more in cache than on local FS.
const REMOTE_INDEX_CACHE_BYTES: usize = 2 * 1024 * 1024 * 1024;
/// Object-store metadata-cache default (512 MiB).
const REMOTE_METADATA_CACHE_BYTES: usize = 512 * 1024 * 1024;
543
544fn resolve_cache_caps(location: &Url, caps: RuntimeCaps) -> (usize, usize) {
545    let (index_default, metadata_default) = if config::is_local(location) {
546        (LOCAL_INDEX_CACHE_BYTES, LOCAL_METADATA_CACHE_BYTES)
547    } else {
548        (REMOTE_INDEX_CACHE_BYTES, REMOTE_METADATA_CACHE_BYTES)
549    };
550    (
551        caps.index_cache_bytes.unwrap_or(index_default),
552        caps.metadata_cache_bytes.unwrap_or(metadata_default),
553    )
554}
555
/// Open handle on pond's Lance tables for one data dir. It holds:
/// - the cached `sessions`/`messages` datasets and the lazily opened `parts`;
/// - the shared Lance session;
/// - the namespace catalog;
/// - the object-store settings used to open them.
pub struct Handle {
    /// The three table datasets (`parts` opens lazily).
    datasets: DatasetSet,
    /// Retry policy; `open_with_options` sets it to `RetryPolicy::default()`.
    retry: RetryPolicy,
    /// One `lance::Session` shared across all three datasets. Carries the
    /// metadata + index caches and the `ObjectStoreRegistry` (which holds
    /// the underlying object_store / S3 client). Sharing the session means
    /// one cache pool covers all three tables and one S3 client serves all
    /// three datasets. This matters on object-store backends, where a
    /// per-dataset client would mean 3x the connection pools and 3x the
    /// credential refreshes (lance/src/dataset/builder.rs:509-517).
    // NOTE(review): the field is not read in the code visible here; it is
    // presumably kept so the handle owns the shared session. Confirm before
    // removing the allow.
    #[allow(dead_code)]
    session: Arc<Session>,
    /// The `lance-namespace` catalog seam. v1 uses the Directory impl; a
    /// future hosted pond can swap to "rest" without touching read/write paths
    /// (spec.md#lance-chokepoints-catalog).
    nm: Arc<dyn LanceNamespace>,
    /// Namespace identifier this handle binds to. v1 always uses `root()`.
    /// The typed seam matches `resolve_namespace`'s return type, so
    /// multi-namespace routing can land without churning call sites
    /// (spec.md#wire-namespace-resolution).
    nm_ident: NamespaceIdent,
    /// Object-store options passed to every `DatasetBuilder` and
    /// `Dataset::write` call, so refresh and index-creation paths inherit the
    /// same credentials and region as the initial open. May hold secrets.
    /// Empty unless the caller supplies options; remote installs also get
    /// `apply_remote_storage_defaults` applied at open.
    storage_options: HashMap<String, String>,
    /// Data-dir URL the handle was opened against. `pond status` reads this
    /// to display where the bytes live and to decide whether to walk a local
    /// directory or issue a remote `LIST` for sizing.
    location: Url,
    /// Refresh window for the lazily opened `parts` dataset. It is set to the
    /// same scheme-keyed value as `sessions`/`messages` in `open_with_options`
    /// (zero on local FS, 5s on object stores), presumably for use when
    /// `parts` is first opened.
    parts_refresh_after: Duration,
}
590
591impl std::fmt::Debug for Handle {
592    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
593        formatter
594            .debug_struct("Handle")
595            .field("datasets", &self.datasets)
596            .field("retry", &self.retry)
597            .field("nm_ident", &self.nm_ident)
598            .field("storage_options", &self.storage_options)
599            .field("location", &self.location)
600            .finish()
601    }
602}
603
/// The three session datasets pond manages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Table {
    Sessions,
    Messages,
    Parts,
}

impl Table {
    /// Stable lowercase table name.
    pub fn as_str(self) -> &'static str {
        match self {
            Table::Sessions => "sessions",
            Table::Messages => "messages",
            Table::Parts => "parts",
        }
    }

    /// Display label; always identical to [`Table::as_str`].
    fn label(self) -> &'static str {
        self.as_str()
    }
}
/// The per-table dataset caches owned by a [`Handle`]. `sessions` and
/// `messages` are opened eagerly in `Handle::open_with_options`; `parts`
/// starts empty.
#[derive(Debug)]
struct DatasetSet {
    /// Cached `sessions` dataset.
    sessions: Mutex<CachedDataset>,
    /// Cached `messages` dataset.
    messages: Mutex<CachedDataset>,
    /// `parts.lance` opens lazily on the first read or write that needs it:
    /// any `pond_get` (every mode reads parts to build summaries), grouped
    /// search hydrating user-hit summaries, or ingest with Part events. A
    /// process that does none of those never opens the file, saving its
    /// metadata pages and file handle at cold-open. The OnceCell makes
    /// initialization single-flight; after that, the inner
    /// `Mutex<CachedDataset>` behaves identically to the other two.
    parts: OnceCell<Mutex<CachedDataset>>,
}
636#[derive(Debug)]
637struct CachedDataset {
638    dataset: Dataset,
639    last_refresh: Instant,
640    refresh_after: Duration,
641}
642impl CachedDataset {
643    async fn latest(&mut self) -> Result<Dataset> {
644        if self.last_refresh.elapsed() >= self.refresh_after {
645            self.dataset.checkout_latest().await?;
646            self.last_refresh = Instant::now();
647        }
648        Ok(self.dataset.clone())
649    }
650    fn replace(&mut self, dataset: Dataset) {
651        self.dataset = dataset;
652        self.last_refresh = Instant::now();
653    }
654}
655impl Handle {
656    /// Open without storage options or explicit cache caps. Backend-aware
657    /// defaults from `[runtime]` apply.
658    pub async fn open(location: &Url) -> Result<Self> {
659        Self::open_with_options(location, HashMap::new(), RuntimeCaps::default()).await
660    }
661
662    /// Open with object-store options handed through to Lance verbatim, plus
663    /// the resolved `[runtime]` cache caps. Object-store keys are the
664    /// `object_store` crate's standard config names; pond does not parse them.
665    /// Opening datasets never performs index work; index lifecycle lives under
666    /// `Handle::optimize_table`. `parts.lance` opens lazily on first use.
667    pub async fn open_with_options(
668        location: &Url,
669        mut storage_options: HashMap<String, String>,
670        caps: RuntimeCaps,
671    ) -> Result<Self> {
672        if let Some(path) = config::local_path(location) {
673            tokio::fs::create_dir_all(&path)
674                .await
675                .with_context(|| format!("failed to create data dir {}", path.display()))?;
676        } else {
677            apply_remote_storage_defaults(&mut storage_options);
678        }
679        // One Session shared across all three datasets so metadata/index
680        // caches and the object_store registry (and thus any S3 client) are
681        // pooled rather than duplicated three times. Caps are sized by the
682        // `[runtime]` block; explicit values from `caps` win, otherwise the
683        // local/remote backend default kicks in.
684        let (index_cache_bytes, metadata_cache_bytes) = resolve_cache_caps(location, caps);
685        let session = Arc::new(Session::new(
686            index_cache_bytes,
687            metadata_cache_bytes,
688            Arc::new(ObjectStoreRegistry::default()),
689        ));
690        // Build the lance-namespace catalog seam once (spec.md#lance-chokepoints-catalog).
691        // The `root` property is whatever URL the Directory impl understands;
692        // `uri_to_url` (lance-io/object_store.rs) accepts both bare paths and
693        // URLs, so passing the scheme-qualified URL for local FS works the
694        // same as the bare-path form. Trailing slash stripped for clean logs.
695        let root = location.as_str().trim_end_matches('/').to_string();
696        let mut connect = ConnectBuilder::new("dir")
697            .property("root", root)
698            .session(session.clone());
699        // Object-store credentials/region/endpoint flow into the namespace
700        // via the `storage.<key>` property convention (lance-namespace-impls
701        // dir.rs from_properties: lines 423-436).
702        for (key, value) in &storage_options {
703            connect = connect.property(format!("storage.{key}"), value.clone());
704        }
705        let nm: Arc<dyn LanceNamespace> = connect
706            .connect()
707            .await
708            .context("failed to connect lance Directory namespace")?;
709        let nm_ident = NamespaceIdent::root();
710        // spec.md#lance-handle-freshness: refresh window is scheme-keyed. Local-FS
711        // manifest reads are microsecond-cheap, so `0` (always-refresh) is
712        // essentially free and removes the stale-read window entirely. Object
713        // stores have real per-call cost; `5s` caps manifest fetch overhead at
714        // acceptable lag for human-driven queries.
715        let refresh_after = if config::is_local(location) {
716            Duration::ZERO
717        } else {
718            Duration::from_secs(5)
719        };
720        let handle = Self {
721            datasets: DatasetSet {
722                sessions: Mutex::new(CachedDataset {
723                    dataset: open_or_create_via_ns(
724                        &nm,
725                        &nm_ident,
726                        sessions::SESSIONS,
727                        sessions::session_schema(),
728                        &session,
729                        &storage_options,
730                    )
731                    .await?,
732                    last_refresh: Instant::now(),
733                    refresh_after,
734                }),
735                messages: Mutex::new(CachedDataset {
736                    dataset: open_or_create_via_ns(
737                        &nm,
738                        &nm_ident,
739                        sessions::MESSAGES,
740                        sessions::message_schema(),
741                        &session,
742                        &storage_options,
743                    )
744                    .await?,
745                    last_refresh: Instant::now(),
746                    refresh_after,
747                }),
748                parts: OnceCell::new(),
749            },
750            retry: RetryPolicy::default(),
751            session,
752            nm,
753            nm_ident,
754            storage_options,
755            location: location.clone(),
756            parts_refresh_after: refresh_after,
757        };
758        Ok(handle)
759    }
760
761    pub fn location(&self) -> &Url {
762        &self.location
763    }
764
765    /// Read-only view of the `storage_options` the handle was opened with.
766    /// `pond status` needs them to instantiate a raw `object_store` client
767    /// that can `LIST` the remote bucket for sizing.
768    pub fn storage_options(&self) -> &HashMap<String, String> {
769        &self.storage_options
770    }
771
772    /// Object-store URI for a `pond_sql_query` export artifact:
773    /// `<location>/exports/<name>`. A sibling of the `*.lance` table dirs;
774    /// the Directory namespace tracks tables in its `__manifest` table rather
775    /// than by listing prefixes, so this prefix is never seen as a table
776    /// (lance-namespace-impls dir/manifest.rs). Never `register_table`'d.
777    fn export_uri(&self, name: &str) -> String {
778        format!(
779            "{}/exports/{name}",
780            self.location.as_str().trim_end_matches('/')
781        )
782    }
783
784    /// `ObjectStoreParams` carrying the handle's `storage_options` so raw
785    /// object-store opens (export I/O, `table_sizes` listing) inherit the same
786    /// credentials/region as the dataset opens. Empty options -> no accessor.
787    fn object_store_params(&self) -> ObjectStoreParams {
788        ObjectStoreParams {
789            storage_options_accessor: (!self.storage_options.is_empty()).then(|| {
790                Arc::new(StorageOptionsAccessor::with_static_options(
791                    self.storage_options.clone(),
792                ))
793            }),
794            ..Default::default()
795        }
796    }
797
798    /// Write a `pond_sql_query` export artifact, reusing the handle's
799    /// storage_options so S3 installs inherit the same credentials.
800    pub(crate) async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
801        let uri = self.export_uri(name);
802        let registry = Arc::new(ObjectStoreRegistry::default());
803        let (store, path) =
804            ObjectStore::from_uri_and_params(registry, &uri, &self.object_store_params())
805                .await
806                .with_context(|| format!("failed to open object store for {uri}"))?;
807        store
808            .put(&path, bytes)
809            .await
810            .with_context(|| format!("failed to write export {uri}"))?;
811        Ok(())
812    }
813
814    /// Read a `pond_sql_query` export artifact back (for the
815    /// `pond-sql-export://` MCP resource).
816    pub(crate) async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
817        let uri = self.export_uri(name);
818        let registry = Arc::new(ObjectStoreRegistry::default());
819        let (store, path) =
820            ObjectStore::from_uri_and_params(registry, &uri, &self.object_store_params())
821                .await
822                .with_context(|| format!("failed to open object store for {uri}"))?;
823        let bytes = store
824            .read_one_all(&path)
825            .await
826            .with_context(|| format!("failed to read export {uri}"))?;
827        Ok(bytes.to_vec())
828    }
829
830    /// Local filesystem path of an export artifact, when the data dir is
831    /// `file://`. The stdio MCP client shares this filesystem, so it can read
832    /// the file directly (e.g. duckdb/polars) instead of pulling base64 via
833    /// `resources/read`. `None` on object-store installs.
834    pub(crate) fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
835        if self.location.scheme() != "file" {
836            return None;
837        }
838        let dir = self.location.to_file_path().ok()?;
839        Some(dir.join("exports").join(name))
840    }
841
842    pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
843        Ok((
844            self.count_rows(Table::Sessions).await?,
845            self.count_rows(Table::Messages).await?,
846            self.count_rows(Table::Parts).await?,
847        ))
848    }
849
850    /// Insert-only merge: append new rows, never overwrite a matched PK.
851    /// Returns rows inserted. The fold lives separately under
852    /// `Handle::optimize_table` (spec.md#lance-index-maintenance).
853    pub(crate) async fn merge_insert(
854        &self,
855        table: Table,
856        batch: RecordBatch,
857        row_count: usize,
858    ) -> Result<u64> {
859        self.merge(
860            table,
861            batch,
862            row_count,
863            "merge_insert",
864            WhenMatched::DoNothing,
865            WhenNotMatched::InsertAll,
866        )
867        .await
868    }
869
870    /// Update-only merge: `WhenMatched::UpdateAll` on matched PKs; unmatched
871    /// rows dropped. The fold lives separately under `Handle::optimize_table`.
872    pub(crate) async fn merge_update(
873        &self,
874        table: Table,
875        batch: RecordBatch,
876        row_count: usize,
877    ) -> Result<u64> {
878        self.merge(
879            table,
880            batch,
881            row_count,
882            "merge_update",
883            WhenMatched::UpdateAll,
884            WhenNotMatched::DoNothing,
885        )
886        .await
887    }
888
889    /// Shared merge path for [`Self::merge_insert`] and [`Self::merge_update`].
890    /// Returns the number of rows affected (inserted or updated, whichever the
891    /// behaviors produce).
892    async fn merge(
893        &self,
894        table: Table,
895        batch: RecordBatch,
896        row_count: usize,
897        op: &'static str,
898        when_matched: WhenMatched,
899        when_not_matched: WhenNotMatched,
900    ) -> Result<u64> {
901        if row_count == 0 {
902            return Ok(0);
903        }
904        let started = Instant::now();
905        let result = self
906            .retry_lance(table.label(), || async {
907                let mut cached = self.cached(table).await?.lock().await;
908                let existing = cached.latest().await?;
909                let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
910                let mut builder = MergeInsertBuilder::try_new(Arc::new(existing), Vec::new())?;
911                builder.when_matched(when_matched.clone());
912                builder.when_not_matched(when_not_matched.clone());
913                // pond presents each PK at most once per batch; FirstSeen keeps
914                // the first occurrence rather than failing (Lance's default).
915                builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
916                // Cleanup is operator-driven via `pond index optimize`; the
917                // per-commit auto hook would add a LIST per write on remote
918                // backends without changing the steady-state retention.
919                builder.skip_auto_cleanup(true);
920                let (dataset, stats) = builder
921                    .try_build()?
922                    .execute_reader(Box::new(reader))
923                    .await?;
924                cached.replace(dataset.as_ref().clone());
925                Ok((
926                    stats.num_inserted_rows + stats.num_updated_rows,
927                    stats.num_skipped_duplicates,
928                ))
929            })
930            .await;
931        let skipped = result.as_ref().map(|(_, s)| *s).unwrap_or(0);
932        tracing::info!(
933            target: "pond::perf",
934            op,
935            table = %table.label(),
936            rows = row_count,
937            elapsed_ms = started.elapsed().as_millis() as u64,
938            skipped,
939            "merge",
940        );
941        result.map(|(affected, _)| affected)
942    }
943
944    /// Run the table-local maintenance cycle for the supplied index intents.
945    /// BTree is rebuilt from scratch to dodge Lance v7.0.0-beta.16's flat
946    /// BTree combine bug; Bitmap, FTS, and IVF_PQ fold via append.
947    ///
948    /// spec.md#substrate 3.7 (`lance-index-maintenance`): indices and compaction
949    /// commit independently and use independent retry budgets, so a hot writer
950    /// that starves compaction (Rewrite) does not abort the index build
951    /// (Update) the operator actually asked for.
952    pub async fn optimize_table(
953        &self,
954        table: Table,
955        intents: &[IndexIntent],
956        progress: Option<&OptimizeProgressFn>,
957        policy: &MaintenancePolicy,
958    ) -> TableOptimizeOutcome {
959        let compaction = self
960            .run_optimize_compact_phase(table, progress, policy)
961            .await;
962        let indices = self
963            .run_optimize_indices_phase(table, intents, progress)
964            .await;
965        TableOptimizeOutcome {
966            table,
967            indices,
968            compaction,
969        }
970    }
971
972    /// Run only the indices phase for one table. Used by `pond embed`'s tail
973    /// to fold newly written vectors into the indices without paying the
974    /// compaction retry budget while embed itself may still be writing.
975    pub async fn optimize_table_indices_only(
976        &self,
977        table: Table,
978        intents: &[IndexIntent],
979        progress: Option<&OptimizeProgressFn>,
980    ) -> PhaseOutcome {
981        self.run_optimize_indices_phase(table, intents, progress)
982            .await
983    }
984
985    async fn run_optimize_indices_phase(
986        &self,
987        table: Table,
988        intents: &[IndexIntent],
989        progress: Option<&OptimizeProgressFn>,
990    ) -> PhaseOutcome {
991        if intents.is_empty() {
992            return PhaseOutcome::Noop;
993        }
994        let result = self
995            .retry_lance(table.label(), || async {
996                let mut guard = self.cached(table).await?.lock().await;
997                let mut dataset = guard.latest().await?;
998                let did_work =
999                    optimize_table_indices(&mut dataset, intents, table, progress).await?;
1000                guard.replace(dataset);
1001                Ok::<_, anyhow::Error>(did_work)
1002            })
1003            .await;
1004        match result {
1005            Ok(true) => PhaseOutcome::Ok,
1006            Ok(false) => PhaseOutcome::Noop,
1007            Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
1008            Err(error) => PhaseOutcome::Failed(error),
1009        }
1010    }
1011
1012    async fn run_optimize_compact_phase(
1013        &self,
1014        table: Table,
1015        progress: Option<&OptimizeProgressFn>,
1016        policy: &MaintenancePolicy,
1017    ) -> PhaseOutcome {
1018        let result = self
1019            .retry_lance(table.label(), || async {
1020                let mut guard = self.cached(table).await?.lock().await;
1021                let mut dataset = guard.latest().await?;
1022                optimize_table_compact(&mut dataset, table, progress, policy).await?;
1023                guard.replace(dataset);
1024                Ok::<_, anyhow::Error>(())
1025            })
1026            .await;
1027        match result {
1028            Ok(()) => PhaseOutcome::Ok,
1029            Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
1030            Err(error) => PhaseOutcome::Failed(error),
1031        }
1032    }
1033
1034    pub async fn rebuild_index(&self, table: Table, intent: &IndexIntent) -> Result<()> {
1035        self.retry_lance(table.label(), || async {
1036            let mut guard = self.cached(table).await?.lock().await;
1037            let mut dataset = guard.latest().await?;
1038            rebuild_index(&mut dataset, intent).await?;
1039            guard.replace(dataset);
1040            Ok(())
1041        })
1042        .await
1043    }
1044
1045    pub async fn index_status(
1046        &self,
1047        table: Table,
1048        intents: &[IndexIntent],
1049    ) -> Result<Vec<IndexStatus>> {
1050        let dataset = self.dataset(table).await?;
1051        index_status(table, &dataset, intents).await
1052    }
1053
1054    pub(crate) async fn dataset(&self, table: Table) -> Result<Dataset> {
1055        let mut cached = self.cached(table).await?.lock().await;
1056        cached.latest().await
1057    }
1058    /// Build a prefiltered `Scanner` for `table`. Composable read entry
1059    /// point for callers that need to layer extra builder calls
1060    /// (`full_text_search`, `nearest`) on top of pond's predicate seam.
1061    /// Routine scans should prefer `Handle::scan`.
1062    pub(crate) async fn scanner(
1063        &self,
1064        table: Table,
1065        predicate: Option<&Predicate>,
1066    ) -> Result<lance::dataset::scanner::Scanner> {
1067        let dataset = self.dataset(table).await?;
1068        scanner_with_prefilter(&dataset, predicate)
1069    }
1070    /// Single read entry point: prefilter via `predicate`, optionally
1071    /// project, return the prepared `Scanner` (spec.md#lance-chokepoints-read).
1072    pub async fn scan(
1073        &self,
1074        table: Table,
1075        opts: ScanOpts<'_>,
1076    ) -> Result<lance::dataset::scanner::Scanner> {
1077        let mut scanner = self.scanner(table, opts.predicate).await?;
1078        if let Some(projection) = opts.projection {
1079            scanner.project(projection)?;
1080        }
1081        Ok(scanner)
1082    }
1083    pub(crate) async fn scan_batch(
1084        &self,
1085        table: Table,
1086        predicate: Option<&Predicate>,
1087        projection: &[&str],
1088    ) -> Result<RecordBatch> {
1089        let opts = ScanOpts {
1090            predicate,
1091            projection: (!projection.is_empty()).then_some(projection),
1092        };
1093        self.scan(table, opts)
1094            .await?
1095            .try_into_batch()
1096            .await
1097            .context("scan failed")
1098    }
1099    pub async fn count_rows(&self, table: Table) -> Result<usize> {
1100        self.dataset(table)
1101            .await?
1102            .count_rows(None)
1103            .await
1104            .map_err(Into::into)
1105    }
1106    /// Names of every index on `messages` - the vector-index tests read this.
1107    #[cfg(test)]
1108    pub(crate) async fn messages_index_names(&self) -> Result<Vec<String>> {
1109        let dataset = self.dataset(Table::Messages).await?;
1110        let indices = dataset.load_indices().await?;
1111        Ok(indices.iter().map(|index| index.name.clone()).collect())
1112    }
1113
1114    /// Count rows in `table` not yet covered by `index_name`. Manifest-only;
1115    /// a missing index reports the whole table. Powers `pond index status`.
1116    pub(crate) async fn unindexed_row_count(
1117        &self,
1118        table: Table,
1119        index_name: &str,
1120    ) -> Result<usize> {
1121        let dataset = self.dataset(table).await?;
1122        let fragments = dataset
1123            .unindexed_fragments(index_name)
1124            .await
1125            .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1126        Ok(fragments
1127            .iter()
1128            .map(|fragment| fragment.num_rows().unwrap_or(0))
1129            .sum())
1130    }
1131
1132    /// Drop the named index. Used by the `pond embed --force` model-swap path
1133    /// to retire an IVF_PQ whose centroids belong to the old distance
1134    /// space, before the next write re-bootstraps it over the new model's
1135    /// vectors. Errors when the index does not exist; callers may swallow
1136    /// that.
1137    pub(crate) async fn drop_index(&self, table: Table, name: &str) -> Result<()> {
1138        let mut guard = self.cached(table).await?.lock().await;
1139        let mut dataset = guard.latest().await?;
1140        dataset
1141            .drop_index(name)
1142            .await
1143            .with_context(|| format!("drop_index({name}) failed for {}", table.label()))?;
1144        guard.replace(dataset);
1145        Ok(())
1146    }
1147
1148    /// Resolve each table's stored location through the namespace catalog
1149    /// (spec.md#lance-chokepoints-catalog) - no hardcoded `.lance` suffix.
1150    async fn table_location(&self, table_name: &str) -> Result<String> {
1151        let request = DescribeTableRequest {
1152            id: Some(self.nm_ident.as_table_id(table_name)),
1153            ..Default::default()
1154        };
1155        let response = self
1156            .nm
1157            .describe_table(request)
1158            .await
1159            .with_context(|| format!("failed to describe table {table_name}"))?;
1160        response
1161            .location
1162            .with_context(|| format!("namespace returned no location for table {table_name}"))
1163    }
1164
1165    /// On-disk byte totals for the three datasets plus the data-dir remainder.
1166    /// Every byte is sized by listing through Lance's object store
1167    /// (spec.md#lance-chokepoints-storage), identical for `file://` and `s3://`.
1168    pub async fn table_sizes(&self) -> Result<TableSizes> {
1169        let registry = Arc::new(ObjectStoreRegistry::default());
1170        let params = self.object_store_params();
1171
1172        let sessions = self
1173            .listed_size(
1174                &registry,
1175                &params,
1176                &self.table_location(sessions::SESSIONS).await?,
1177            )
1178            .await?;
1179        let messages = self
1180            .listed_size(
1181                &registry,
1182                &params,
1183                &self.table_location(sessions::MESSAGES).await?,
1184            )
1185            .await?;
1186        let parts = self
1187            .listed_size(
1188                &registry,
1189                &params,
1190                &self.table_location(sessions::PARTS).await?,
1191            )
1192            .await?;
1193        // `other` is whatever sits under the data-dir root but not in the three
1194        // tables (config.toml, stray index temp files): root total minus them.
1195        let root_total = self
1196            .listed_size(&registry, &params, self.location.as_str())
1197            .await?;
1198        let other = root_total.saturating_sub(sessions + messages + parts);
1199        Ok(TableSizes {
1200            sessions,
1201            messages,
1202            parts,
1203            other,
1204        })
1205    }
1206
1207    /// Sum `ObjectMeta.size` for every object recursively under `uri`.
1208    async fn listed_size(
1209        &self,
1210        registry: &Arc<ObjectStoreRegistry>,
1211        params: &ObjectStoreParams,
1212        uri: &str,
1213    ) -> Result<u64> {
1214        let (store, base) = ObjectStore::from_uri_and_params(registry.clone(), uri, params)
1215            .await
1216            .with_context(|| format!("failed to open object store for {uri}"))?;
1217        let mut listing = store.list(Some(base));
1218        let mut total = 0u64;
1219        while let Some(meta) = listing.next().await {
1220            let meta = meta.with_context(|| format!("listing {uri} failed"))?;
1221            total += meta.size;
1222        }
1223        Ok(total)
1224    }
1225    async fn cached(&self, table: Table) -> Result<&Mutex<CachedDataset>> {
1226        match table {
1227            Table::Sessions => Ok(&self.datasets.sessions),
1228            Table::Messages => Ok(&self.datasets.messages),
1229            Table::Parts => self.parts_cached().await,
1230        }
1231    }
1232
1233    /// Open `parts.lance` on first use (spec.md#datasets). Single-flight via
1234    /// `OnceCell`; once initialized, behaves identically to the other two.
1235    async fn parts_cached(&self) -> Result<&Mutex<CachedDataset>> {
1236        self.datasets
1237            .parts
1238            .get_or_try_init(|| async {
1239                let dataset = open_or_create_via_ns(
1240                    &self.nm,
1241                    &self.nm_ident,
1242                    sessions::PARTS,
1243                    sessions::part_schema(),
1244                    &self.session,
1245                    &self.storage_options,
1246                )
1247                .await?;
1248                Ok::<_, anyhow::Error>(Mutex::new(CachedDataset {
1249                    dataset,
1250                    last_refresh: Instant::now(),
1251                    refresh_after: self.parts_refresh_after,
1252                }))
1253            })
1254            .await
1255    }
1256    async fn retry_lance<T, Fut, Op>(&self, label: &str, mut operation: Op) -> Result<T>
1257    where
1258        Fut: std::future::Future<Output = Result<T>>,
1259        Op: FnMut() -> Fut,
1260    {
1261        let mut attempt = 0u8;
1262        loop {
1263            attempt = attempt.saturating_add(1);
1264            match operation().await {
1265                Ok(value) => return Ok(value),
1266                Err(error) if attempt < self.retry.attempts => {
1267                    let backoff = self.backoff(attempt);
1268                    // `{:#}` walks anyhow's cause chain inline; `%error` (Display)
1269                    // drops everything below the top-level message.
1270                    let error_chain = format!("{error:#}");
1271                    tracing::warn!(
1272                        label,
1273                        attempt,
1274                        ?backoff,
1275                        error = %error_chain,
1276                        "retrying Lance operation"
1277                    );
1278                    tokio::time::sleep(backoff).await;
1279                }
1280                Err(error) => {
1281                    let error_chain = format!("{error:#}");
1282                    tracing::warn!(
1283                        label,
1284                        attempt,
1285                        error = %error_chain,
1286                        "Lance operation exhausted retries"
1287                    );
1288                    // spec.md#protocol: surface OCC failures as a typed `conflict`
1289                    // rather than the generic `storage_unavailable` bucket. The
1290                    // chain root is a `lance::Error` (commit-conflict family) when
1291                    // pond's retry layer exhausted because the manifest could not
1292                    // be advanced; everything else (timeouts, IAM, disk) stays
1293                    // `storage_unavailable`.
1294                    if is_commit_conflict(&error) {
1295                        return Err(error.context(ConflictExhausted { attempts: attempt }));
1296                    }
1297                    return Err(error);
1298                }
1299            }
1300        }
1301    }
1302    fn backoff(&self, attempt: u8) -> Duration {
1303        let shift = u32::from(attempt.saturating_sub(1));
1304        let multiplier = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
1305        let base = self.retry.initial_backoff.saturating_mul(multiplier);
1306        // Symmetric +/- `jitter` factor de-correlates concurrent retriers on
1307        // a contended manifest (spec.md#lance-retry-jitter); clamped to `max_backoff`.
1308        let factor = (1.0 + self.retry.jitter * (fastrand::f64() * 2.0 - 1.0)).max(0.0);
1309        base.mul_f64(factor).min(self.retry.max_backoff)
1310    }
1311}
1312/// Compaction phase: `compact_files` + `cleanup_old_versions`, both inside one
1313/// retry block. Distinct from the indices phase so a hot writer that loses the
1314/// Rewrite race here does not abort index work the operator actually asked for.
1315///
1316/// spec.md#lance-index-maintenance mandates FRI on by default, but at
1317/// v7.0.0-beta.16 `defer_index_remap=true` together with `stable-row-ids`
1318/// panics in `optimize.rs::commit_compaction` with "defer_index_remap
1319/// requires row_addrs but none were provided": `rewrite_files` skips
1320/// row_addrs when stable row ids are on, then the FRI builder demands
1321/// them. With stable_row_ids the remap step is already a no-op
1322/// (`optimize.rs:1490`: `needs_remapping = !uses_stable_row_ids() &&
1323/// !defer_index_remap`), so running without FRI is correct - we only
1324/// lose the documented concurrency-with-index-build benefit. Flip to
1325/// `true` once upstream fixes the conflict.
1326async fn optimize_table_compact(
1327    dataset: &mut Dataset,
1328    table: Table,
1329    progress: Option<&OptimizeProgressFn>,
1330    policy: &MaintenancePolicy,
1331) -> Result<()> {
1332    let compaction = CompactionOptions {
1333        defer_index_remap: false,
1334        ..CompactionOptions::default()
1335    };
1336
1337    // Candidacy mirrors Lance's planner: a fragment is compactable iff it holds
1338    // fewer than target_rows_per_fragment rows (optimize.rs).
1339    let target = compaction.target_rows_per_fragment;
1340    let fragments = dataset.get_fragments();
1341    let (mergeable_run_rows, candidate_count) = compaction_candidates(
1342        fragments
1343            .iter()
1344            .map(|fragment| fragment.metadata().physical_rows.unwrap_or(0)),
1345        target,
1346    );
1347    if should_compact(
1348        mergeable_run_rows,
1349        candidate_count,
1350        target,
1351        policy.compaction_fragment_cap,
1352    ) {
1353        emit(
1354            progress,
1355            OptimizeEvent::PhaseStart {
1356                table,
1357                phase: OptimizePhase::Compact,
1358                detail: None,
1359            },
1360        );
1361        let started = Instant::now();
1362        compact_files(dataset, compaction, None).await?;
1363        emit(
1364            progress,
1365            OptimizeEvent::PhaseDone {
1366                table,
1367                phase: OptimizePhase::Compact,
1368                elapsed_ms: started.elapsed().as_millis() as u64,
1369            },
1370        );
1371    } else {
1372        tracing::debug!(
1373            target: "pond::perf",
1374            table = table.as_str(),
1375            mergeable_run_rows,
1376            candidate_count,
1377            cap = policy.compaction_fragment_cap,
1378            "compaction skipped: sub-target fragments under threshold",
1379        );
1380    }
1381
1382    // Safe GC only. delete_unverified=false keeps Lance's 7-day in-progress
1383    // guard, so this never races a concurrent writer (spec.md#concurrency); GC
1384    // runs outside OCC, so the guard is what makes it safe on any backend.
1385    emit(
1386        progress,
1387        OptimizeEvent::PhaseStart {
1388            table,
1389            phase: OptimizePhase::Cleanup,
1390            detail: None,
1391        },
1392    );
1393    let started = Instant::now();
1394    dataset
1395        .cleanup_old_versions(policy.cleanup_older_than, Some(false), Some(false))
1396        .await
1397        .context("cleanup_old_versions failed during index optimize")?;
1398    emit(
1399        progress,
1400        OptimizeEvent::PhaseDone {
1401            table,
1402            phase: OptimizePhase::Cleanup,
1403            elapsed_ms: started.elapsed().as_millis() as u64,
1404        },
1405    );
1406
1407    Ok(())
1408}
1409
1410/// Indices phase: per-intent create/rebuild + batched `optimize_indices(append)`
1411/// for incremental families. Returns `true` if anything committed.
1412async fn optimize_table_indices(
1413    dataset: &mut Dataset,
1414    intents: &[IndexIntent],
1415    table: Table,
1416    progress: Option<&OptimizeProgressFn>,
1417) -> Result<bool> {
1418    let existing = dataset.load_indices().await?;
1419    let existing_names: std::collections::HashSet<String> =
1420        existing.iter().map(|index| index.name.clone()).collect();
1421
1422    let mut append_indices: Vec<String> = Vec::new();
1423    let mut did_work = false;
1424
1425    for intent in intents {
1426        let exists = existing_names.contains(intent.name);
1427
1428        if !exists {
1429            if !intent.trigger.should_create(dataset).await? {
1430                continue;
1431            }
1432            let params = intent.params.build(dataset).await?;
1433            let index_type = intent.params.index_type();
1434            tracing::info!(
1435                index = intent.name,
1436                column = intent.column,
1437                "creating Lance index (trigger fired)",
1438            );
1439            emit(
1440                progress,
1441                OptimizeEvent::PhaseStart {
1442                    table,
1443                    phase: OptimizePhase::IndexCreate,
1444                    detail: Some(intent.name.to_owned()),
1445                },
1446            );
1447            let started = Instant::now();
1448            dataset
1449                .create_index(
1450                    &[intent.column],
1451                    index_type,
1452                    Some(intent.name.to_owned()),
1453                    params.as_ref(),
1454                    false,
1455                )
1456                .await
1457                .with_context(|| format!("failed to create index {}", intent.name))?;
1458            emit(
1459                progress,
1460                OptimizeEvent::PhaseDone {
1461                    table,
1462                    phase: OptimizePhase::IndexCreate,
1463                    elapsed_ms: started.elapsed().as_millis() as u64,
1464                },
1465            );
1466            did_work = true;
1467            continue;
1468        }
1469
1470        let unindexed = dataset.unindexed_fragments(intent.name).await?;
1471        if unindexed.is_empty() {
1472            continue;
1473        }
1474        // Lag guard: let fragments accumulate behind the brute-force fallback
1475        // rather than firing a commit per tiny append. Threshold is operator-
1476        // tunable via `[maintenance].index_lag_threshold`.
1477        if unindexed.len() < index_lag_threshold() {
1478            continue;
1479        }
1480        match intent.params {
1481            IndexParamsKind::Scalar(BuiltinIndexType::BTree) => {
1482                let params = intent.params.build(dataset).await?;
1483                let index_type = intent.params.index_type();
1484                tracing::debug!(
1485                    target: "pond::perf",
1486                    index = intent.name,
1487                    column = intent.column,
1488                    "rebuilding Lance BTree index",
1489                );
1490                emit(
1491                    progress,
1492                    OptimizeEvent::PhaseStart {
1493                        table,
1494                        phase: OptimizePhase::IndexRebuild,
1495                        detail: Some(intent.name.to_owned()),
1496                    },
1497                );
1498                let started = Instant::now();
1499                dataset
1500                    .create_index(
1501                        &[intent.column],
1502                        index_type,
1503                        Some(intent.name.to_owned()),
1504                        params.as_ref(),
1505                        true,
1506                    )
1507                    .await
1508                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1509                emit(
1510                    progress,
1511                    OptimizeEvent::PhaseDone {
1512                        table,
1513                        phase: OptimizePhase::IndexRebuild,
1514                        elapsed_ms: started.elapsed().as_millis() as u64,
1515                    },
1516                );
1517                did_work = true;
1518            }
1519            IndexParamsKind::Scalar(BuiltinIndexType::Bitmap)
1520            | IndexParamsKind::InvertedFtsNgram { .. }
1521            | IndexParamsKind::IvfPqCosine { .. } => {
1522                append_indices.push(intent.name.to_owned());
1523            }
1524            IndexParamsKind::Scalar(_) => {
1525                let params = intent.params.build(dataset).await?;
1526                emit(
1527                    progress,
1528                    OptimizeEvent::PhaseStart {
1529                        table,
1530                        phase: OptimizePhase::IndexRebuild,
1531                        detail: Some(intent.name.to_owned()),
1532                    },
1533                );
1534                let started = Instant::now();
1535                dataset
1536                    .create_index(
1537                        &[intent.column],
1538                        intent.params.index_type(),
1539                        Some(intent.name.to_owned()),
1540                        params.as_ref(),
1541                        true,
1542                    )
1543                    .await
1544                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1545                emit(
1546                    progress,
1547                    OptimizeEvent::PhaseDone {
1548                        table,
1549                        phase: OptimizePhase::IndexRebuild,
1550                        elapsed_ms: started.elapsed().as_millis() as u64,
1551                    },
1552                );
1553                did_work = true;
1554            }
1555        }
1556    }
1557
1558    if !append_indices.is_empty() {
1559        let to_append = append_indices.clone();
1560        emit(
1561            progress,
1562            OptimizeEvent::PhaseStart {
1563                table,
1564                phase: OptimizePhase::IndexAppend,
1565                detail: Some(append_indices.join(", ")),
1566            },
1567        );
1568        let started = Instant::now();
1569        dataset
1570            .optimize_indices(&OptimizeOptions::append().index_names(to_append))
1571            .await
1572            .context("optimize_indices(append) failed during index optimize")?;
1573        emit(
1574            progress,
1575            OptimizeEvent::PhaseDone {
1576                table,
1577                phase: OptimizePhase::IndexAppend,
1578                elapsed_ms: started.elapsed().as_millis() as u64,
1579            },
1580        );
1581        tracing::debug!(
1582            target: "pond::perf",
1583            indices = ?append_indices,
1584            "appended trailing fragments into indices",
1585        );
1586        did_work = true;
1587    }
1588
1589    Ok(did_work)
1590}
1591
1592async fn rebuild_index(dataset: &mut Dataset, intent: &IndexIntent) -> Result<()> {
1593    if !intent.trigger.should_create(dataset).await? {
1594        return Ok(());
1595    }
1596    let params = intent.params.build(dataset).await?;
1597    dataset
1598        .create_index(
1599            &[intent.column],
1600            intent.params.index_type(),
1601            Some(intent.name.to_owned()),
1602            params.as_ref(),
1603            true,
1604        )
1605        .await
1606        .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1607    Ok(())
1608}
1609
1610async fn index_status(
1611    table: Table,
1612    dataset: &Dataset,
1613    intents: &[IndexIntent],
1614) -> Result<Vec<IndexStatus>> {
1615    let existing = dataset.load_indices().await?;
1616    let existing_names: std::collections::HashSet<String> =
1617        existing.iter().map(|index| index.name.clone()).collect();
1618    let total_fragments = dataset.get_fragments().len();
1619    let total_rows = dataset.count_rows(None).await?;
1620    let mut statuses = Vec::with_capacity(intents.len());
1621    for intent in intents {
1622        let exists = existing_names.contains(intent.name);
1623        if !exists {
1624            statuses.push(IndexStatus {
1625                table,
1626                intent_name: intent.name.to_owned(),
1627                fragments_covered: 0,
1628                unindexed_fragments: total_fragments,
1629                unindexed_rows: total_rows,
1630                exists,
1631            });
1632            continue;
1633        }
1634        let unindexed = dataset
1635            .unindexed_fragments(intent.name)
1636            .await
1637            .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1638        let unindexed_fragments = unindexed.len();
1639        let unindexed_rows = unindexed
1640            .iter()
1641            .map(|fragment| fragment.num_rows().unwrap_or(0))
1642            .sum();
1643        statuses.push(IndexStatus {
1644            table,
1645            intent_name: intent.name.to_owned(),
1646            fragments_covered: total_fragments.saturating_sub(unindexed_fragments),
1647            unindexed_fragments,
1648            unindexed_rows,
1649            exists,
1650        });
1651    }
1652    Ok(statuses)
1653}
1654
1655/// Open the table at `table_name` via the namespace; create + initialize on
1656/// `TableNotFound`. Schema-checks the on-disk dataset against pond's
1657/// expectation so a stale data dir surfaces early.
1658///
1659/// Probes via `nm.describe_table` directly rather than `DatasetBuilder::from_namespace`:
1660/// the builder re-wraps an already-`Namespace`-wrapped error
1661/// (lance/src/dataset/builder.rs:142), so going through it would force a
1662/// chain-walk to classify `TableNotFound`. The direct probe stays at one
1663/// wrap level and downcasts cleanly. Managed-versioning hookup (REST
1664/// namespace external-manifest commits) is not wired here; v1 ships
1665/// Directory v2 only.
1666async fn open_or_create_via_ns(
1667    nm: &Arc<dyn LanceNamespace>,
1668    nm_ident: &NamespaceIdent,
1669    table_name: &str,
1670    schema: lance::deps::arrow_schema::SchemaRef,
1671    session: &Arc<Session>,
1672    storage_options: &HashMap<String, String>,
1673) -> Result<Dataset> {
1674    let table_id = nm_ident.as_table_id(table_name);
1675
1676    let request = DescribeTableRequest {
1677        id: Some(table_id.clone()),
1678        ..Default::default()
1679    };
1680    match nm.describe_table(request).await {
1681        Ok(response) => {
1682            let location = response.location.with_context(|| {
1683                format!("namespace returned no location for table {table_name}")
1684            })?;
1685            let mut builder = DatasetBuilder::from_uri(&location).with_session(session.clone());
1686            if !storage_options.is_empty() {
1687                builder = builder.with_storage_options(storage_options.clone());
1688            }
1689            let dataset = builder
1690                .load()
1691                .await
1692                .with_context(|| format!("failed to open table {table_name}"))?;
1693            ensure_schema_matches(&dataset, schema.as_ref(), table_name)?;
1694            return Ok(dataset);
1695        }
1696        Err(error) => match &error {
1697            error if is_namespace_error_code(error, ErrorCode::TableNotFound) => {
1698                // fall through to create
1699            }
1700            _ => {
1701                return Err(anyhow::Error::from(error))
1702                    .with_context(|| format!("failed to describe table {table_name}"));
1703            }
1704        },
1705    }
1706
1707    // Create path: pond seeds an empty dataset with the canonical schema so
1708    // every subsequent open lands on a real Lance dataset, not a phantom.
1709    let mut write_params = sessions::write_params_for_create();
1710    write_params.session = Some(session.clone());
1711    write_params.mode = WriteMode::Create;
1712    if !storage_options.is_empty() {
1713        write_params.store_params = Some(ObjectStoreParams {
1714            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
1715                storage_options.clone(),
1716            ))),
1717            ..Default::default()
1718        });
1719    }
1720    let reader = sessions::empty_reader(schema)?;
1721    Dataset::write_into_namespace(reader, nm.clone(), table_id, Some(write_params))
1722        .await
1723        .with_context(|| format!("failed to create table {table_name}"))
1724}
1725
1726// lance-namespace sometimes nests one `lance::Error::Namespace` inside another
1727// before the underlying `NamespaceError`; walk the whole `.source()` chain
1728// rather than only matching the outer variant.
1729fn is_namespace_error_code(error: &lance::Error, code: ErrorCode) -> bool {
1730    if !matches!(error, lance::Error::Namespace { .. }) {
1731        return false;
1732    }
1733    std::iter::successors(Some(error as &(dyn std::error::Error + 'static)), |link| {
1734        link.source()
1735    })
1736    .filter_map(|link| link.downcast_ref::<NamespaceError>())
1737    .any(|inner| inner.code() == code)
1738}
1739
1740fn scanner_with_prefilter(
1741    dataset: &Dataset,
1742    predicate: Option<&Predicate>,
1743) -> Result<lance::dataset::scanner::Scanner> {
1744    let mut scanner = dataset.scan();
1745    scanner.prefilter(true);
1746    if let Some(predicate) = predicate {
1747        let filter = predicate.to_lance();
1748        if !filter.is_empty() {
1749            scanner.filter(&filter)?;
1750        }
1751    }
1752    Ok(scanner)
1753}
1754fn ensure_schema_matches(
1755    dataset: &Dataset,
1756    expected: &lance::deps::arrow_schema::Schema,
1757    table_name: &str,
1758) -> Result<()> {
1759    use lance::deps::arrow_schema::DataType;
1760    use std::collections::BTreeSet;
1761    let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
1762    let actual_names: BTreeSet<&str> = actual.fields().iter().map(|f| f.name().as_str()).collect();
1763    let expected_names: BTreeSet<&str> = expected
1764        .fields()
1765        .iter()
1766        .map(|f| f.name().as_str())
1767        .collect();
1768    if actual_names != expected_names {
1769        anyhow::bail!(
1770            "table {table_name} has columns {actual_names:?} but this pond build expects \
1771             {expected_names:?} - the on-disk store predates a schema change; delete the \
1772             data directory and re-run `pond ingest`",
1773        );
1774    }
1775    // Catch a vector-dim change (configured `[embeddings].dim` differs from
1776    // the on-disk vector column width) early with a friendly message. Lance
1777    // would otherwise reject the next write with an opaque schema-mismatch
1778    // error inside the `merge_update` path.
1779    for actual_field in actual.fields() {
1780        let Some(expected_field) = expected.field_with_name(actual_field.name()).ok() else {
1781            continue;
1782        };
1783        if let (DataType::FixedSizeList(_, actual_dim), DataType::FixedSizeList(_, expected_dim)) =
1784            (actual_field.data_type(), expected_field.data_type())
1785            && actual_dim != expected_dim
1786        {
1787            tracing::warn!(
1788                table = table_name,
1789                column = actual_field.name(),
1790                actual_dim,
1791                expected_dim,
1792                "embedding dimension differs from config; open proceeds because model swaps are operator-driven",
1793            );
1794        }
1795    }
1796    Ok(())
1797}
/// Object-store defaults injected for any non-local pond location.
///
/// Each default is written only when none of its accepted spellings is
/// already present. Keys are compared ASCII-case-insensitively, so an
/// env-var-style key such as `AWS_ENDPOINT` matches `aws_endpoint`. Explicit
/// overrides in `[storage]` therefore always win.
///
/// `aws_unsigned_payload` is gated on a custom endpoint, the marker for
/// S3-compatible stores like Hetzner, MinIO and R2. For those stores the
/// SHA256 payload signature is wasted work the server does not validate.
/// Every endpoint spelling that `object_store` accepts is recognised,
/// including the `_url` forms used by the AWS SDK's `AWS_ENDPOINT_URL`.
fn apply_remote_storage_defaults(options: &mut HashMap<String, String>) {
    // All spellings object_store's S3 config accepts for the endpoint key.
    const ENDPOINT_ALIASES: &[&str] = &["aws_endpoint_url", "aws_endpoint", "endpoint_url", "endpoint"];

    // True when `options` already holds any of `aliases`, ignoring ASCII case.
    fn has_any(options: &HashMap<String, String>, aliases: &[&str]) -> bool {
        options
            .keys()
            .any(|key| aliases.iter().any(|alias| key.eq_ignore_ascii_case(alias)))
    }

    // Insert `value` under the first (canonical) alias unless any alias is
    // already set. An empty alias list is a no-op rather than a panic.
    fn set_default(options: &mut HashMap<String, String>, aliases: &[&str], value: &str) {
        let Some(canonical) = aliases.first() else {
            return;
        };
        if !has_any(options, aliases) {
            options.insert((*canonical).to_owned(), value.to_owned());
        }
    }

    set_default(options, &["pool_idle_timeout"], "300 seconds");
    set_default(options, &["connect_timeout"], "10 seconds");
    if has_any(options, ENDPOINT_ALIASES) {
        set_default(
            options,
            &["aws_unsigned_payload", "unsigned_payload"],
            "true",
        );
    }
}
1827
/// Render `value` as a single-quoted SQL string literal. Each embedded `'`
/// is doubled so the literal cannot be closed early.
fn quoted_string(value: &str) -> String {
    let mut literal = String::with_capacity(value.len() + 2);
    literal.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            literal.push_str("''");
        } else {
            literal.push(ch);
        }
    }
    literal.push('\'');
    literal
}
/// Build a quoted `LIKE` pattern matching any string that contains `value`.
///
/// The LIKE metacharacters `%` and `_` and the escape character `\` are
/// backslash-escaped. Embedded `'` is doubled for the surrounding SQL literal.
fn like_contains(value: &str) -> String {
    let mut pattern = String::with_capacity(value.len() + 4);
    pattern.push_str("'%");
    for ch in value.chars() {
        match ch {
            '\\' => pattern.push_str("\\\\"),
            '%' => pattern.push_str("\\%"),
            '_' => pattern.push_str("\\_"),
            '\'' => pattern.push_str("''"),
            other => pattern.push(other),
        }
    }
    pattern.push_str("%'");
    pattern
}
1839
// Unit and round-trip tests for the storage substrate: the compaction gate,
// namespace error classification, and opening a fresh store through
// lance-namespace.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    // `should_compact(run_rows, run_fragments, target, cap)` (argument roles
    // inferred from the inline comments below - confirm against the helper):
    // pins when the compaction gate fires and when it skips.
    #[test]
    fn compaction_gate_skips_subtarget_trickle_compacts_on_progress() {
        let target = 1_048_576;
        // Bloat case: a large trailing fragment plus a tiny new one - under a
        // full target fragment and under the cap -> skip (don't re-Rewrite).
        assert!(!should_compact(510_000 + 30, 2, target, 64));
        // A run that fills a whole target fragment -> compact (and freeze it).
        assert!(should_compact(target, 3, target, 64));
        // Many tiny fragments past the cap -> compact to bound fragment count.
        assert!(should_compact(5_000, 64, target, 64));
        // cap == 0 always compacts (preserves pre-gate behavior for tests).
        assert!(should_compact(0, 0, target, 0));
    }

    // `compaction_candidates` returns (run rows, fragment count) for the
    // mergeable trailing run; the asserted values pin that a sub-target
    // fragment sandwiched between at-target ones is not counted.
    #[test]
    fn compaction_candidates_strands_isolated_subtarget_fragment() {
        let target = 1_048_576;
        // [at-target, isolated 256K, at-target, tail 510K, tiny 30]: the only
        // mergeable run is tail+tiny; the 256K between at-target frags is stranded,
        // so even though sub-target rows total 766K it never re-fires the gate.
        let (run, count) =
            compaction_candidates([1_048_576, 256_000, 1_048_576, 510_000, 30], target);
        assert_eq!(count, 3);
        assert_eq!(run, 510_030);
        assert!(!should_compact(run, count, target, 64));
    }

    // Covers the four cases `is_namespace_error_code` must distinguish:
    // a direct match, a match nested one `Namespace` level deeper, a
    // namespace error with a different code, and a non-namespace error.
    #[test]
    fn namespace_error_code_walks_wrapped_chain() {
        let direct = lance::Error::namespace_source(Box::new(NamespaceError::TableNotFound {
            message: "missing".into(),
        }));
        assert!(is_namespace_error_code(&direct, ErrorCode::TableNotFound));

        let wrapped = lance::Error::namespace_source(Box::new(direct));
        assert!(is_namespace_error_code(&wrapped, ErrorCode::TableNotFound));

        let other_code =
            lance::Error::namespace_source(Box::new(NamespaceError::NamespaceNotFound {
                message: "nope".into(),
            }));
        assert!(!is_namespace_error_code(
            &other_code,
            ErrorCode::TableNotFound
        ));

        let not_namespace = lance::Error::internal("unrelated");
        assert!(!is_namespace_error_code(
            &not_namespace,
            ErrorCode::TableNotFound
        ));
    }

    /// Round-trip: opening a fresh data dir through `lance-namespace`
    /// produces all three tables, and `Handle::scan` returns an empty batch
    /// for each (no spurious schema mismatch, no namespace error).
    #[tokio::test]
    async fn store_opens_via_namespace_and_scan_works() -> Result<()> {
        let temp = TempDir::new()?;
        // `from_directory_path` only fails for non-absolute paths.
        let url = Url::from_directory_path(temp.path())
            .map_err(|()| anyhow::anyhow!("temp path is not absolute"))?;
        let handle = Handle::open(&url).await?;
        // Each table has its own PK column; project the canonical one so the
        // scan is exercised end-to-end (catalog -> dataset -> scanner -> batch).
        let cases: [(Table, &[&str]); 3] = [
            (Table::Sessions, &["id"]),
            (Table::Messages, &["id"]),
            (Table::Parts, &["id"]),
        ];
        for (table, projection) in cases {
            let scanner = handle
                .scan(table, ScanOpts::project_only(projection))
                .await?;
            let batch = scanner.try_into_batch().await?;
            assert_eq!(batch.num_rows(), 0, "fresh table should be empty");
        }
        Ok(())
    }
}