// pond/substrate.rs

1use crate::{
2    RetryPolicy,
3    config::{self},
4    handlers::NamespaceIdent,
5    sessions::{self},
6};
7use anyhow::{Context, Result};
8use lance::Dataset;
9use lance::dataset::builder::DatasetBuilder;
10use lance::dataset::optimize::{CompactionOptions, compact_files};
11use lance::dataset::write::merge_insert::SourceDedupeBehavior;
12use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode};
13use lance::deps::arrow_array::{RecordBatch, RecordBatchIterator};
14use lance::index::DatasetIndexExt;
15use lance::index::DatasetIndexInternalExt;
16use lance::index::vector::VectorIndexParams;
17use lance::session::Session;
18use lance_index::IndexType;
19use lance_index::optimize::OptimizeOptions;
20use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
21use lance_io::object_store::{
22    ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor,
23};
24use lance_linalg::distance::MetricType;
25use lance_namespace::LanceNamespace;
26use lance_namespace::error::{ErrorCode, NamespaceError};
27use lance_namespace::models::DescribeTableRequest;
28use lance_namespace_impls::ConnectBuilder;
29use std::{
30    collections::HashMap,
31    sync::Arc,
32    time::{Duration, Instant},
33};
34use tokio::sync::{Mutex, OnceCell};
35use tokio_stream::StreamExt;
36use url::Url;
/// Number of embedded rows in `messages.vector` at which pond starts building
/// the IVF_PQ vector index (spec.md#search). Under this threshold vector
/// search uses a brute-force flat scan, which is exact and quick at small and
/// medium scale; IVF_PQ also trains poorly on that few vectors.
pub const VECTOR_INDEX_ACTIVATION_ROWS: usize = 100_000;
42
/// Fallback for the minimum number of unindexed fragments that must exist
/// before `optimize_table_indices` admits a per-intent append/rebuild step.
/// Smaller values produce smaller, more frequent commits (costly on remote
/// stores). Larger values leave more fragments on the brute-force fallback
/// path. 4 is the bottom of the documented 4-8 band.
pub const DEFAULT_INDEX_LAG_THRESHOLD: usize = 4;

static INDEX_LAG_THRESHOLD_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Records the process-wide index-lag threshold from
/// `[maintenance].index_lag_threshold`. Only the first call takes effect and
/// later calls are silently ignored. This is the same contract as
/// `embed::init_model_id` / `sessions::init_embedding_dim`.
pub fn init_index_lag_threshold(value: usize) {
    // `set` fails once a value is stored; "first call wins" is intended.
    let _ = INDEX_LAG_THRESHOLD_RUNTIME.set(value);
}

/// Returns the threshold stored by [`init_index_lag_threshold`], or
/// [`DEFAULT_INDEX_LAG_THRESHOLD`] if it was never initialised.
pub fn index_lag_threshold() -> usize {
    match INDEX_LAG_THRESHOLD_RUNTIME.get() {
        Some(&threshold) => threshold,
        None => DEFAULT_INDEX_LAG_THRESHOLD,
    }
}
64
/// Number of sub-target fragments at which compaction is forced. Below it,
/// compaction waits for a mergeable run that can fill a whole target
/// fragment. This stops the 5-minute sync from re-Rewriting the trailing
/// fragment on every pass (spec.md#lance-index-maintenance). A value of 0
/// disables the gate, so compaction always runs.
pub const DEFAULT_COMPACTION_FRAGMENT_CAP: usize = 64;
69
70/// Default manifest-retention window for the safe cleanup pass. Matches
71/// LanceDB's recommended OSS-operator practice (lancedb docs: performance.mdx,
72/// tables/update.mdx). With `delete_unverified=false`, Lance's 7-day
73/// in-progress guard still protects unverified files regardless of this value
74/// (`UNVERIFIED_THRESHOLD_DAYS` in lance/dataset/cleanup.rs).
75pub fn default_cleanup_older_than() -> chrono::Duration {
76    chrono::Duration::days(1)
77}
78
79/// Resolved per-call inputs to the storage-maintenance pass. Built from
80/// `[maintenance]` (and any per-invocation CLI override) at the entry point;
81/// threaded down to `optimize_table_compact` so the substrate never re-reads
82/// `Config` itself.
83#[derive(Debug, Clone, Copy)]
84pub struct MaintenancePolicy {
85    /// Compaction gate: see [`DEFAULT_COMPACTION_FRAGMENT_CAP`]. `0` always
86    /// compacts (preserves the pre-gate test behavior).
87    pub compaction_fragment_cap: usize,
88    /// Manifest-retention window handed to `cleanup_old_versions`.
89    pub cleanup_older_than: chrono::Duration,
90}
91
92impl MaintenancePolicy {
93    /// Preserves the pre-gate compaction-always behavior that the existing
94    /// optimize tests assume.
95    pub fn always_compact() -> Self {
96        Self {
97            compaction_fragment_cap: 0,
98            cleanup_older_than: default_cleanup_older_than(),
99        }
100    }
101}
102
/// Decides whether a compaction pass should run.
///
/// Returns `true` when either condition holds:
/// - the largest mergeable run of sub-target fragments holds at least
///   `target_rows` rows, so merging it produces a full (frozen) fragment;
/// - the number of sub-target fragments has reached `cap`.
///
/// Because `candidate_count >= 0` always holds, `cap == 0` always compacts.
fn should_compact(
    mergeable_run_rows: usize,
    candidate_count: usize,
    target_rows: usize,
    cap: usize,
) -> bool {
    let run_fills_target = mergeable_run_rows >= target_rows;
    let candidates_at_cap = candidate_count >= cap;
    run_fills_target || candidates_at_cap
}
114
/// Scans fragment row counts in dataset order and returns
/// `(largest_run_rows, below_target_count)`.
///
/// - `largest_run_rows` is the biggest row total over a contiguous stretch of
///   fragments that are each below `target`. It approximates the largest bin
///   Lance could merge. A below-target fragment stranded between at-target
///   fragments (which Lance will not merge) only contributes its own rows, so
///   it cannot trigger endless re-compaction.
/// - `below_target_count` counts every fragment with fewer than `target` rows.
fn compaction_candidates(
    physical_rows: impl IntoIterator<Item = usize>,
    target: usize,
) -> (usize, usize) {
    let (_current_run, largest_run, below_target) = physical_rows.into_iter().fold(
        (0usize, 0usize, 0usize),
        |(current_run, largest_run, below_target), rows| {
            if rows >= target {
                // An at-target fragment breaks the mergeable run.
                (0, largest_run, below_target)
            } else {
                let current_run = current_run + rows;
                (current_run, largest_run.max(current_run), below_target + 1)
            }
        },
    );
    (largest_run, below_target)
}
137
138/// Declarative description of one index pond keeps on a table. Created when
139/// its trigger fires; folded forward by `pond index optimize`.
140#[derive(Debug, Clone)]
141pub struct IndexIntent {
142    /// Stable on-disk name. Must match across runs so existence checks
143    /// resolve.
144    pub name: &'static str,
145    /// Column the index covers.
146    pub column: &'static str,
147    /// Condition evaluated against the live dataset before each cycle.
148    pub trigger: IndexTrigger,
149    /// How the params are built at create time. Some intents have static
150    /// params (FTS, scalars); IVF_PQ needs the row count to size partitions.
151    pub params: IndexParamsKind,
152}
153
/// Condition under which an [`IndexIntent`] should be present on disk.
#[derive(Debug, Clone)]
pub enum IndexTrigger {
    /// Build as soon as the table holds at least one row. Used for FTS and
    /// scalar indices, where training cost is not worth deferring.
    OnAnyRows,
    /// Build once `count(<column> IS NOT NULL) >= threshold`. Used for the
    /// IVF_PQ vector index, which trains poorly on too few vectors.
    OnNonNullCount {
        column: &'static str,
        threshold: usize,
    },
}
167
168/// The lance-native shape of an [`IndexIntent`]'s params, dispatched to the
169/// right `IndexParams` at create time.
170#[derive(Debug, Clone)]
171pub enum IndexParamsKind {
172    /// `BuiltinIndexType::BTree` -> [`IndexType::BTree`];
173    /// `BuiltinIndexType::Bitmap` -> [`IndexType::Bitmap`]; etc.
174    Scalar(BuiltinIndexType),
175    /// `InvertedIndexParams` with a character `ngram` tokenizer in the
176    /// `[min, max]` range and stemming / stop-words off
177    /// (spec.md#search-language-neutral-index).
178    InvertedFtsNgram { min: u32, max: u32 },
179    /// `VectorIndexParams::ivf_pq` with cosine metric (e5 vectors are
180    /// L2-normalized). `sub_vectors = embedding_dim / 8` and `num_bits = 8`
181    /// are pond's conventions; `max_iters` caps kmeans. Partitions follow
182    /// LanceDB's documented `num_rows // 4096` guidance, floored at one.
183    IvfPqCosine {
184        sub_vectors: usize,
185        num_bits: u8,
186        max_iters: usize,
187    },
188}
189
190impl IndexTrigger {
191    async fn should_create(&self, dataset: &Dataset) -> Result<bool> {
192        match self {
193            Self::OnAnyRows => Ok(dataset.count_rows(None).await? > 0),
194            Self::OnNonNullCount { column, threshold } => {
195                let count = dataset
196                    .count_rows(Some(format!("{column} IS NOT NULL")))
197                    .await?;
198                Ok(count >= *threshold)
199            }
200        }
201    }
202}
203
204impl IndexParamsKind {
205    fn index_type(&self) -> IndexType {
206        match self {
207            Self::Scalar(BuiltinIndexType::Bitmap) => IndexType::Bitmap,
208            Self::Scalar(_) => IndexType::BTree,
209            Self::InvertedFtsNgram { .. } => IndexType::Inverted,
210            Self::IvfPqCosine { .. } => IndexType::Vector,
211        }
212    }
213
214    async fn build(&self, dataset: &Dataset) -> Result<Box<dyn lance::index::IndexParams>> {
215        match self {
216            Self::Scalar(kind) => Ok(Box::new(ScalarIndexParams::for_builtin(kind.clone()))),
217            Self::InvertedFtsNgram { min, max } => Ok(Box::new(
218                InvertedIndexParams::default()
219                    .base_tokenizer("ngram".to_owned())
220                    .ngram_min_length(*min)
221                    .ngram_max_length(*max)
222                    .stem(false)
223                    .remove_stop_words(false),
224            )),
225            Self::IvfPqCosine {
226                sub_vectors,
227                num_bits,
228                max_iters,
229            } => {
230                let count = dataset
231                    .count_rows(Some("vector IS NOT NULL".to_owned()))
232                    .await?;
233                let partitions = count.checked_div(4096).unwrap_or(0).max(1);
234                Ok(Box::new(VectorIndexParams::ivf_pq(
235                    partitions,
236                    *num_bits,
237                    *sub_vectors,
238                    MetricType::Cosine,
239                    *max_iters,
240                )))
241            }
242        }
243    }
244}
245
246#[derive(Debug, Clone, PartialEq, Eq)]
247pub struct IndexStatus {
248    pub table: Table,
249    pub intent_name: String,
250    pub fragments_covered: usize,
251    pub unindexed_fragments: usize,
252    pub unindexed_rows: usize,
253    pub exists: bool,
254}
255
/// Marker error that `retry_lance` attaches to the anyhow chain when it runs
/// out of attempts on an OCC commit conflict (spec.md#protocol).
///
/// The wire layer downcasts to this type to report the outcome as `conflict`
/// instead of the generic `storage_unavailable`.
#[derive(Debug, Clone, Copy)]
pub struct ConflictExhausted {
    /// How many attempts were made before giving up.
    pub attempts: u8,
}

impl std::fmt::Display for ConflictExhausted {
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let tries = self.attempts;
        write!(out, "commit conflict exhausted after {} attempt(s)", tries)
    }
}

impl std::error::Error for ConflictExhausted {}
276
277/// Per-phase result for one table's pass through `Handle::optimize_table`.
278/// spec.md#substrate 3.7 (`lance-index-maintenance`): the indices phase and the
279/// compaction phase get independent retry budgets and independent commits,
280/// so a hot writer that starves the Rewrite cannot abort the index Update.
281#[derive(Debug)]
282pub enum PhaseOutcome {
283    /// Phase attempted and committed work.
284    Ok,
285    /// Phase attempted; no work was needed.
286    Noop,
287    /// Phase attempted; OCC retry budget exhausted on conflict (the operator
288    /// can rerun later once the hot writer quiesces).
289    SkippedConflict,
290    /// Phase failed with a non-conflict error.
291    Failed(anyhow::Error),
292    /// Phase not requested by the caller (e.g. compaction skipped under
293    /// `Store::build_indices_only`).
294    NotAttempted,
295}
296
297impl PhaseOutcome {
298    pub fn is_failed(&self) -> bool {
299        matches!(self, Self::Failed(_))
300    }
301}
302
303/// What `Handle::optimize_table` did for one table.
304#[derive(Debug)]
305pub struct TableOptimizeOutcome {
306    pub table: Table,
307    pub indices: PhaseOutcome,
308    pub compaction: PhaseOutcome,
309}
310
311/// Boundary event during one `Handle::optimize_table` pass. The CLI binds a
312/// progress callback to render a live spinner; library callers pass `None`.
313#[derive(Debug, Clone)]
314pub enum OptimizeEvent {
315    PhaseStart {
316        table: Table,
317        phase: OptimizePhase,
318        detail: Option<String>,
319    },
320    PhaseDone {
321        table: Table,
322        phase: OptimizePhase,
323        elapsed_ms: u64,
324    },
325}
326
/// Unit of work an [`OptimizeEvent`] refers to.
#[derive(Debug, Clone, Copy)]
pub enum OptimizePhase {
    Compact,
    Cleanup,
    IndexCreate,
    IndexRebuild,
    IndexAppend,
}

impl OptimizePhase {
    /// Stable kebab-case name of the phase.
    pub fn label(self) -> &'static str {
        match self {
            OptimizePhase::IndexCreate => "index-create",
            OptimizePhase::IndexRebuild => "index-rebuild",
            OptimizePhase::IndexAppend => "index-append",
            OptimizePhase::Compact => "compact",
            OptimizePhase::Cleanup => "cleanup",
        }
    }
}
347
348pub type OptimizeProgressFn = Box<dyn Fn(OptimizeEvent) + Send + Sync>;
349
350fn emit(progress: Option<&OptimizeProgressFn>, event: OptimizeEvent) {
351    if let Some(callback) = progress {
352        callback(event);
353    }
354}
355
356/// True when the chain root is one of Lance's commit-conflict variants
357/// (`CommitConflict`, `RetryableCommitConflict`, `TooMuchWriteContention`).
358/// Everything else (timeouts, IAM denials, disk errors) is not a conflict.
359pub fn is_commit_conflict(error: &anyhow::Error) -> bool {
360    error.downcast_ref::<lance::Error>().is_some_and(|err| {
361        matches!(
362            err,
363            lance::Error::CommitConflict { .. }
364                | lance::Error::RetryableCommitConflict { .. }
365                | lance::Error::TooMuchWriteContention { .. }
366        )
367    })
368}
369
370/// True when `retry_lance` exhausted retries against an OCC conflict and
371/// attached `ConflictExhausted` to the chain head.
372fn is_conflict_exhausted(error: &anyhow::Error) -> bool {
373    error.chain().any(|cause| cause.is::<ConflictExhausted>())
374}
375
/// Bytes on disk for each of the three session datasets, plus everything else
/// under the data-dir root. Sizes are obtained by listing through Lance's
/// object-store layer (spec.md#lance-chokepoints-storage), so `file://` and
/// `s3://` locations are measured the same way.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TableSizes {
    pub sessions: u64,
    pub messages: u64,
    pub parts: u64,
    /// Bytes under the data-dir root outside the three datasets.
    pub other: u64,
}
386
/// Literal operand of a [`Predicate`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalarValue {
    /// String literal; rendered quoted.
    String(String),
    /// 32-bit integer literal.
    Int32(i32),
    /// Pre-rendered filter fragment, emitted verbatim. Never build this from
    /// untrusted input.
    Raw(String),
}
impl From<&str> for ScalarValue {
    fn from(text: &str) -> Self {
        // Reuse the owned-string conversion.
        Self::from(text.to_owned())
    }
}
impl From<String> for ScalarValue {
    fn from(text: String) -> Self {
        ScalarValue::String(text)
    }
}
impl From<i32> for ScalarValue {
    fn from(number: i32) -> Self {
        ScalarValue::Int32(number)
    }
}
408#[derive(Debug, Clone, PartialEq, Eq)]
409pub enum Predicate {
410    Eq(&'static str, ScalarValue),
411    Ne(&'static str, ScalarValue),
412    IsNull(&'static str),
413    IsNotNull(&'static str),
414    In(&'static str, Vec<ScalarValue>),
415    LikeContains(&'static str, String),
416    /// Regex match. Emitted as `regexp_like(<col>, '<pat>')`. Never pushes
417    /// down to BTREE indexes (Lance's scalar-index-expr parser ignores it),
418    /// so the filter is a full-scan-with-predicate - acceptable for
419    /// human-driven `--project re:...` queries, not for hot paths.
420    Regex(&'static str, String),
421    Gte(&'static str, ScalarValue),
422    Lte(&'static str, ScalarValue),
423    And(Vec<Predicate>),
424    Or(Vec<Predicate>),
425    Not(Box<Predicate>),
426}
427impl Predicate {
428    pub fn to_lance(&self) -> String {
429        match self {
430            Self::Eq(column, value) => format!("{column} = {}", value.to_lance()),
431            Self::Ne(column, value) => format!("{column} <> {}", value.to_lance()),
432            Self::IsNull(column) => format!("{column} IS NULL"),
433            Self::IsNotNull(column) => format!("{column} IS NOT NULL"),
434            Self::In(column, values) => {
435                let values = values
436                    .iter()
437                    .map(ScalarValue::to_lance)
438                    .collect::<Vec<_>>()
439                    .join(", ");
440                format!("{column} IN ({values})")
441            }
442            Self::LikeContains(column, value) => {
443                format!("{column} LIKE {} ESCAPE '\\'", like_contains(value))
444            }
445            Self::Regex(column, pattern) => {
446                format!("regexp_like({column}, {})", quoted_string(pattern))
447            }
448            Self::Gte(column, value) => format!("{column} >= {}", value.to_lance()),
449            Self::Lte(column, value) => format!("{column} <= {}", value.to_lance()),
450            Self::And(predicates) => predicates
451                .iter()
452                .map(Self::to_lance)
453                .filter(|predicate| !predicate.is_empty())
454                .collect::<Vec<_>>()
455                .join(" AND "),
456            Self::Or(predicates) => {
457                // Wrap in parens so the disjunction composes safely as a child
458                // of an outer `And` (SQL `OR` binds looser than `AND`).
459                let body = predicates
460                    .iter()
461                    .map(Self::to_lance)
462                    .filter(|predicate| !predicate.is_empty())
463                    .collect::<Vec<_>>()
464                    .join(" OR ");
465                if body.is_empty() {
466                    String::new()
467                } else {
468                    format!("({body})")
469                }
470            }
471            Self::Not(inner) => {
472                let body = inner.to_lance();
473                if body.is_empty() {
474                    String::new()
475                } else {
476                    format!("NOT ({body})")
477                }
478            }
479        }
480    }
481}
482/// Read-side options for `Handle::scan`: optional prefilter predicate and
483/// optional projection. Default = no filter, all columns.
484#[derive(Default)]
485pub struct ScanOpts<'a> {
486    pub predicate: Option<&'a Predicate>,
487    pub projection: Option<&'a [&'a str]>,
488}
489
490impl<'a> ScanOpts<'a> {
491    pub fn project_only(projection: &'a [&'a str]) -> Self {
492        Self {
493            predicate: None,
494            projection: Some(projection),
495        }
496    }
497    pub fn with_predicate_and_projection(
498        predicate: &'a Predicate,
499        projection: &'a [&'a str],
500    ) -> Self {
501        Self {
502            predicate: Some(predicate),
503            projection: Some(projection),
504        }
505    }
506}
507
508impl ScalarValue {
509    fn to_lance(&self) -> String {
510        match self {
511            Self::String(value) => quoted_string(value),
512            Self::Int32(value) => value.to_string(),
513            Self::Raw(value) => value.clone(),
514        }
515    }
516}
517/// Lance cache caps in bytes. `None` lets the substrate pick the backend-aware
518/// default (local FS gets a tighter cap; object stores stay near Lance's
519/// defaults). Wired through `Store::open_with_options` from `[runtime]`.
520#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
521pub struct RuntimeCaps {
522    pub index_cache_bytes: Option<usize>,
523    pub metadata_cache_bytes: Option<usize>,
524}
525
526impl RuntimeCaps {
527    pub fn from_config(config: &crate::config::RuntimeConfig) -> Self {
528        Self {
529            index_cache_bytes: config.index_cache_bytes,
530            metadata_cache_bytes: config.metadata_cache_bytes,
531        }
532    }
533}
534
535/// Local-FS default: tight enough that a long-lived `pond mcp` lands well
536/// under the 500 MiB target without measurable latency cost vs Lance's 6 GiB
537/// default (see `benches/serve_mem_bench.rs --cap-sweep`).
538const LOCAL_INDEX_CACHE_BYTES: usize = 256 * 1024 * 1024;
539const LOCAL_METADATA_CACHE_BYTES: usize = 128 * 1024 * 1024;
540/// Object-store defaults: latency to refill is per-page, so keep more in cache.
541const REMOTE_INDEX_CACHE_BYTES: usize = 2 * 1024 * 1024 * 1024;
542const REMOTE_METADATA_CACHE_BYTES: usize = 512 * 1024 * 1024;
543
544fn resolve_cache_caps(location: &Url, caps: RuntimeCaps) -> (usize, usize) {
545    let (index_default, metadata_default) = if config::is_local(location) {
546        (LOCAL_INDEX_CACHE_BYTES, LOCAL_METADATA_CACHE_BYTES)
547    } else {
548        (REMOTE_INDEX_CACHE_BYTES, REMOTE_METADATA_CACHE_BYTES)
549    };
550    (
551        caps.index_cache_bytes.unwrap_or(index_default),
552        caps.metadata_cache_bytes.unwrap_or(metadata_default),
553    )
554}
555
556pub struct Handle {
557    datasets: DatasetSet,
558    retry: RetryPolicy,
559    /// One `lance::Session` shared across all three datasets. Carries the
560    /// metadata + index caches and the `ObjectStoreRegistry` (which holds
561    /// the underlying object_store / S3 client). Sharing the session means
562    /// one cache pool covers all three tables and one S3 client serves all
563    /// three datasets - load-bearing on object-store backends where a
564    /// per-dataset client would mean 3x the connection pools and 3x the
565    /// credential refreshes (lance/src/dataset/builder.rs:509-517).
566    #[allow(dead_code)]
567    session: Arc<Session>,
568    /// The `lance-namespace` catalog seam. v1 uses the Directory impl;
569    /// future hosted pond swaps to "rest" without touching read/write paths
570    /// (spec.md#lance-chokepoints-catalog).
571    nm: Arc<dyn LanceNamespace>,
572    /// Namespace identifier this handle binds to. v1 is always `root()`; the
573    /// typed seam matches `resolve_namespace`'s return so multi-namespace
574    /// routing can land without churning call sites (spec.md#wire-namespace-resolution).
575    nm_ident: NamespaceIdent,
576    /// Object-store options threaded through every `DatasetBuilder` and
577    /// `Dataset::write` call so refresh / index-creation paths inherit the
578    /// same credentials and region as the initial open. Empty on local-FS
579    /// installs.
580    storage_options: HashMap<String, String>,
581    /// Data-dir URL the handle was opened against. `pond status` reads this
582    /// to display where the bytes live and to decide whether to walk a local
583    /// directory or issue a remote `LIST` for sizing.
584    location: Url,
585    /// Cached `parts.lance` open metadata, used the first time a caller asks
586    /// for parts. Holds the namespace probe shape so the lazy open re-uses the
587    /// same `lance-chokepoints-catalog` path as the eager opens for sessions/messages.
588    parts_refresh_after: Duration,
589}
590
591impl std::fmt::Debug for Handle {
592    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
593        formatter
594            .debug_struct("Handle")
595            .field("datasets", &self.datasets)
596            .field("retry", &self.retry)
597            .field("nm_ident", &self.nm_ident)
598            .field("storage_options", &self.storage_options)
599            .field("location", &self.location)
600            .finish()
601    }
602}
603
/// The three Lance tables pond manages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Table {
    Sessions,
    Messages,
    Parts,
}
impl Table {
    /// Lowercase table name; identical to the internal label.
    pub fn as_str(self) -> &'static str {
        self.label()
    }

    /// Lowercase table name, also passed to `retry_lance` as its label.
    fn label(self) -> &'static str {
        match self {
            Table::Messages => "messages",
            Table::Parts => "parts",
            Table::Sessions => "sessions",
        }
    }
}
623#[derive(Debug)]
624struct DatasetSet {
625    sessions: Mutex<CachedDataset>,
626    messages: Mutex<CachedDataset>,
627    /// `parts.lance` opens lazily on the first read or write that needs it:
628    /// any `pond_get` (every mode reads parts to build summaries), grouped
629    /// search hydrating user-hit summaries, or ingest with Part events. A
630    /// process that does none of those skips the file, saving its metadata
631    /// pages and file handle at cold-open. The OnceCell makes init
632    /// single-flight; the inner `Mutex<CachedDataset>` then behaves identically
633    /// to the other two.
634    parts: OnceCell<Mutex<CachedDataset>>,
635}
636#[derive(Debug)]
637struct CachedDataset {
638    dataset: Dataset,
639    last_refresh: Instant,
640    refresh_after: Duration,
641}
642impl CachedDataset {
643    async fn latest(&mut self) -> Result<Dataset> {
644        if self.last_refresh.elapsed() >= self.refresh_after {
645            self.dataset.checkout_latest().await?;
646            self.last_refresh = Instant::now();
647        }
648        Ok(self.dataset.clone())
649    }
650    fn replace(&mut self, dataset: Dataset) {
651        self.dataset = dataset;
652        self.last_refresh = Instant::now();
653    }
654}
655impl Handle {
656    /// Open without storage options or explicit cache caps. Backend-aware
657    /// defaults from `[runtime]` apply.
658    pub async fn open(location: &Url) -> Result<Self> {
659        Self::open_with_options(location, HashMap::new(), RuntimeCaps::default()).await
660    }
661
662    /// Open with object-store options handed through to Lance verbatim, plus
663    /// the resolved `[runtime]` cache caps. Object-store keys are the
664    /// `object_store` crate's standard config names; pond does not parse them.
665    /// Opening datasets never performs index work; index lifecycle lives under
666    /// `Handle::optimize_table`. `parts.lance` opens lazily on first use.
667    pub async fn open_with_options(
668        location: &Url,
669        mut storage_options: HashMap<String, String>,
670        caps: RuntimeCaps,
671    ) -> Result<Self> {
672        if let Some(path) = config::local_path(location) {
673            tokio::fs::create_dir_all(&path)
674                .await
675                .with_context(|| format!("failed to create data dir {}", path.display()))?;
676        } else {
677            apply_remote_storage_defaults(&mut storage_options);
678        }
679        // One Session shared across all three datasets so metadata/index
680        // caches and the object_store registry (and thus any S3 client) are
681        // pooled rather than duplicated three times. Caps are sized by the
682        // `[runtime]` block; explicit values from `caps` win, otherwise the
683        // local/remote backend default kicks in.
684        let (index_cache_bytes, metadata_cache_bytes) = resolve_cache_caps(location, caps);
685        let session = Arc::new(Session::new(
686            index_cache_bytes,
687            metadata_cache_bytes,
688            Arc::new(ObjectStoreRegistry::default()),
689        ));
690        // Build the lance-namespace catalog seam once (spec.md#lance-chokepoints-catalog).
691        // The `root` property is whatever URL the Directory impl understands;
692        // `uri_to_url` (lance-io/object_store.rs) accepts both bare paths and
693        // URLs, so passing the scheme-qualified URL for local FS works the
694        // same as the bare-path form. Trailing slash stripped for clean logs.
695        let root = location.as_str().trim_end_matches('/').to_string();
696        let mut connect = ConnectBuilder::new("dir")
697            .property("root", root)
698            .session(session.clone());
699        // Object-store credentials/region/endpoint flow into the namespace
700        // via the `storage.<key>` property convention (lance-namespace-impls
701        // dir.rs from_properties: lines 423-436).
702        for (key, value) in &storage_options {
703            connect = connect.property(format!("storage.{key}"), value.clone());
704        }
705        let nm: Arc<dyn LanceNamespace> = connect
706            .connect()
707            .await
708            .context("failed to connect lance Directory namespace")?;
709        let nm_ident = NamespaceIdent::root();
710        // spec.md#lance-handle-freshness: refresh window is scheme-keyed. Local-FS
711        // manifest reads are microsecond-cheap, so `0` (always-refresh) is
712        // essentially free and removes the stale-read window entirely. Object
713        // stores have real per-call cost; `5s` caps manifest fetch overhead at
714        // acceptable lag for human-driven queries.
715        let refresh_after = if config::is_local(location) {
716            Duration::ZERO
717        } else {
718            Duration::from_secs(5)
719        };
720        let handle = Self {
721            datasets: DatasetSet {
722                sessions: Mutex::new(CachedDataset {
723                    dataset: open_or_create_via_ns(
724                        &nm,
725                        &nm_ident,
726                        sessions::SESSIONS,
727                        sessions::session_schema(),
728                        &session,
729                        &storage_options,
730                    )
731                    .await?,
732                    last_refresh: Instant::now(),
733                    refresh_after,
734                }),
735                messages: Mutex::new(CachedDataset {
736                    dataset: open_or_create_via_ns(
737                        &nm,
738                        &nm_ident,
739                        sessions::MESSAGES,
740                        sessions::message_schema(),
741                        &session,
742                        &storage_options,
743                    )
744                    .await?,
745                    last_refresh: Instant::now(),
746                    refresh_after,
747                }),
748                parts: OnceCell::new(),
749            },
750            retry: RetryPolicy::default(),
751            session,
752            nm,
753            nm_ident,
754            storage_options,
755            location: location.clone(),
756            parts_refresh_after: refresh_after,
757        };
758        Ok(handle)
759    }
760
761    pub fn location(&self) -> &Url {
762        &self.location
763    }
764
765    /// Read-only view of the `storage_options` the handle was opened with.
766    /// `pond status` needs them to instantiate a raw `object_store` client
767    /// that can `LIST` the remote bucket for sizing.
768    pub fn storage_options(&self) -> &HashMap<String, String> {
769        &self.storage_options
770    }
771
772    pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
773        Ok((
774            self.count_rows(Table::Sessions).await?,
775            self.count_rows(Table::Messages).await?,
776            self.count_rows(Table::Parts).await?,
777        ))
778    }
779
780    /// Insert-only merge: append new rows, never overwrite a matched PK.
781    /// Returns rows inserted. The fold lives separately under
782    /// `Handle::optimize_table` (spec.md#lance-index-maintenance).
783    pub(crate) async fn merge_insert(
784        &self,
785        table: Table,
786        batch: RecordBatch,
787        row_count: usize,
788    ) -> Result<u64> {
789        self.merge(
790            table,
791            batch,
792            row_count,
793            "merge_insert",
794            WhenMatched::DoNothing,
795            WhenNotMatched::InsertAll,
796        )
797        .await
798    }
799
800    /// Update-only merge: `WhenMatched::UpdateAll` on matched PKs; unmatched
801    /// rows dropped. The fold lives separately under `Handle::optimize_table`.
802    pub(crate) async fn merge_update(
803        &self,
804        table: Table,
805        batch: RecordBatch,
806        row_count: usize,
807    ) -> Result<u64> {
808        self.merge(
809            table,
810            batch,
811            row_count,
812            "merge_update",
813            WhenMatched::UpdateAll,
814            WhenNotMatched::DoNothing,
815        )
816        .await
817    }
818
819    /// Shared merge path for [`Self::merge_insert`] and [`Self::merge_update`].
820    /// Returns the number of rows affected (inserted or updated, whichever the
821    /// behaviors produce).
822    async fn merge(
823        &self,
824        table: Table,
825        batch: RecordBatch,
826        row_count: usize,
827        op: &'static str,
828        when_matched: WhenMatched,
829        when_not_matched: WhenNotMatched,
830    ) -> Result<u64> {
831        if row_count == 0 {
832            return Ok(0);
833        }
834        let started = Instant::now();
835        let result = self
836            .retry_lance(table.label(), || async {
837                let mut cached = self.cached(table).await?.lock().await;
838                let existing = cached.latest().await?;
839                let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
840                let mut builder = MergeInsertBuilder::try_new(Arc::new(existing), Vec::new())?;
841                builder.when_matched(when_matched.clone());
842                builder.when_not_matched(when_not_matched.clone());
843                // pond presents each PK at most once per batch; FirstSeen keeps
844                // the first occurrence rather than failing (Lance's default).
845                builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
846                // Cleanup is operator-driven via `pond index optimize`; the
847                // per-commit auto hook would add a LIST per write on remote
848                // backends without changing the steady-state retention.
849                builder.skip_auto_cleanup(true);
850                let (dataset, stats) = builder
851                    .try_build()?
852                    .execute_reader(Box::new(reader))
853                    .await?;
854                cached.replace(dataset.as_ref().clone());
855                Ok((
856                    stats.num_inserted_rows + stats.num_updated_rows,
857                    stats.num_skipped_duplicates,
858                ))
859            })
860            .await;
861        let skipped = result.as_ref().map(|(_, s)| *s).unwrap_or(0);
862        tracing::info!(
863            target: "pond::perf",
864            op,
865            table = %table.label(),
866            rows = row_count,
867            elapsed_ms = started.elapsed().as_millis() as u64,
868            skipped,
869            "merge",
870        );
871        result.map(|(affected, _)| affected)
872    }
873
874    /// Run the table-local maintenance cycle for the supplied index intents.
875    /// BTree is rebuilt from scratch to dodge Lance v7.0.0-beta.16's flat
876    /// BTree combine bug; Bitmap, FTS, and IVF_PQ fold via append.
877    ///
878    /// spec.md#substrate 3.7 (`lance-index-maintenance`): indices and compaction
879    /// commit independently and use independent retry budgets, so a hot writer
880    /// that starves compaction (Rewrite) does not abort the index build
881    /// (Update) the operator actually asked for.
882    pub async fn optimize_table(
883        &self,
884        table: Table,
885        intents: &[IndexIntent],
886        progress: Option<&OptimizeProgressFn>,
887        policy: &MaintenancePolicy,
888    ) -> TableOptimizeOutcome {
889        let compaction = self
890            .run_optimize_compact_phase(table, progress, policy)
891            .await;
892        let indices = self
893            .run_optimize_indices_phase(table, intents, progress)
894            .await;
895        TableOptimizeOutcome {
896            table,
897            indices,
898            compaction,
899        }
900    }
901
902    /// Run only the indices phase for one table. Used by `pond embed`'s tail
903    /// to fold newly written vectors into the indices without paying the
904    /// compaction retry budget while embed itself may still be writing.
905    pub async fn optimize_table_indices_only(
906        &self,
907        table: Table,
908        intents: &[IndexIntent],
909        progress: Option<&OptimizeProgressFn>,
910    ) -> PhaseOutcome {
911        self.run_optimize_indices_phase(table, intents, progress)
912            .await
913    }
914
915    async fn run_optimize_indices_phase(
916        &self,
917        table: Table,
918        intents: &[IndexIntent],
919        progress: Option<&OptimizeProgressFn>,
920    ) -> PhaseOutcome {
921        if intents.is_empty() {
922            return PhaseOutcome::Noop;
923        }
924        let result = self
925            .retry_lance(table.label(), || async {
926                let mut guard = self.cached(table).await?.lock().await;
927                let mut dataset = guard.latest().await?;
928                let did_work =
929                    optimize_table_indices(&mut dataset, intents, table, progress).await?;
930                guard.replace(dataset);
931                Ok::<_, anyhow::Error>(did_work)
932            })
933            .await;
934        match result {
935            Ok(true) => PhaseOutcome::Ok,
936            Ok(false) => PhaseOutcome::Noop,
937            Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
938            Err(error) => PhaseOutcome::Failed(error),
939        }
940    }
941
942    async fn run_optimize_compact_phase(
943        &self,
944        table: Table,
945        progress: Option<&OptimizeProgressFn>,
946        policy: &MaintenancePolicy,
947    ) -> PhaseOutcome {
948        let result = self
949            .retry_lance(table.label(), || async {
950                let mut guard = self.cached(table).await?.lock().await;
951                let mut dataset = guard.latest().await?;
952                optimize_table_compact(&mut dataset, table, progress, policy).await?;
953                guard.replace(dataset);
954                Ok::<_, anyhow::Error>(())
955            })
956            .await;
957        match result {
958            Ok(()) => PhaseOutcome::Ok,
959            Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
960            Err(error) => PhaseOutcome::Failed(error),
961        }
962    }
963
964    pub async fn rebuild_index(&self, table: Table, intent: &IndexIntent) -> Result<()> {
965        self.retry_lance(table.label(), || async {
966            let mut guard = self.cached(table).await?.lock().await;
967            let mut dataset = guard.latest().await?;
968            rebuild_index(&mut dataset, intent).await?;
969            guard.replace(dataset);
970            Ok(())
971        })
972        .await
973    }
974
975    pub async fn index_status(
976        &self,
977        table: Table,
978        intents: &[IndexIntent],
979    ) -> Result<Vec<IndexStatus>> {
980        let dataset = self.dataset(table).await?;
981        index_status(table, &dataset, intents).await
982    }
983
984    pub(crate) async fn dataset(&self, table: Table) -> Result<Dataset> {
985        let mut cached = self.cached(table).await?.lock().await;
986        cached.latest().await
987    }
988    /// Build a prefiltered `Scanner` for `table`. Composable read entry
989    /// point for callers that need to layer extra builder calls
990    /// (`full_text_search`, `nearest`) on top of pond's predicate seam.
991    /// Routine scans should prefer `Handle::scan`.
992    pub(crate) async fn scanner(
993        &self,
994        table: Table,
995        predicate: Option<&Predicate>,
996    ) -> Result<lance::dataset::scanner::Scanner> {
997        let dataset = self.dataset(table).await?;
998        scanner_with_prefilter(&dataset, predicate)
999    }
1000    /// Single read entry point: prefilter via `predicate`, optionally
1001    /// project, return the prepared `Scanner` (spec.md#lance-chokepoints-read).
1002    pub async fn scan(
1003        &self,
1004        table: Table,
1005        opts: ScanOpts<'_>,
1006    ) -> Result<lance::dataset::scanner::Scanner> {
1007        let mut scanner = self.scanner(table, opts.predicate).await?;
1008        if let Some(projection) = opts.projection {
1009            scanner.project(projection)?;
1010        }
1011        Ok(scanner)
1012    }
1013    pub(crate) async fn scan_batch(
1014        &self,
1015        table: Table,
1016        predicate: Option<&Predicate>,
1017        projection: &[&str],
1018    ) -> Result<RecordBatch> {
1019        let opts = ScanOpts {
1020            predicate,
1021            projection: (!projection.is_empty()).then_some(projection),
1022        };
1023        self.scan(table, opts)
1024            .await?
1025            .try_into_batch()
1026            .await
1027            .context("scan failed")
1028    }
1029    pub async fn count_rows(&self, table: Table) -> Result<usize> {
1030        self.dataset(table)
1031            .await?
1032            .count_rows(None)
1033            .await
1034            .map_err(Into::into)
1035    }
1036    /// Names of every index on `messages` - the vector-index tests read this.
1037    #[cfg(test)]
1038    pub(crate) async fn messages_index_names(&self) -> Result<Vec<String>> {
1039        let dataset = self.dataset(Table::Messages).await?;
1040        let indices = dataset.load_indices().await?;
1041        Ok(indices.iter().map(|index| index.name.clone()).collect())
1042    }
1043
1044    /// Count rows in `table` not yet covered by `index_name`. Manifest-only;
1045    /// a missing index reports the whole table. Powers `pond index status`.
1046    pub(crate) async fn unindexed_row_count(
1047        &self,
1048        table: Table,
1049        index_name: &str,
1050    ) -> Result<usize> {
1051        let dataset = self.dataset(table).await?;
1052        let fragments = dataset
1053            .unindexed_fragments(index_name)
1054            .await
1055            .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1056        Ok(fragments
1057            .iter()
1058            .map(|fragment| fragment.num_rows().unwrap_or(0))
1059            .sum())
1060    }
1061
1062    /// Drop the named index. Used by the `pond embed --force` model-swap path
1063    /// to retire an IVF_PQ whose centroids belong to the old distance
1064    /// space, before the next write re-bootstraps it over the new model's
1065    /// vectors. Errors when the index does not exist; callers may swallow
1066    /// that.
1067    pub(crate) async fn drop_index(&self, table: Table, name: &str) -> Result<()> {
1068        let mut guard = self.cached(table).await?.lock().await;
1069        let mut dataset = guard.latest().await?;
1070        dataset
1071            .drop_index(name)
1072            .await
1073            .with_context(|| format!("drop_index({name}) failed for {}", table.label()))?;
1074        guard.replace(dataset);
1075        Ok(())
1076    }
1077
1078    /// Resolve each table's stored location through the namespace catalog
1079    /// (spec.md#lance-chokepoints-catalog) - no hardcoded `.lance` suffix.
1080    async fn table_location(&self, table_name: &str) -> Result<String> {
1081        let request = DescribeTableRequest {
1082            id: Some(self.nm_ident.as_table_id(table_name)),
1083            ..Default::default()
1084        };
1085        let response = self
1086            .nm
1087            .describe_table(request)
1088            .await
1089            .with_context(|| format!("failed to describe table {table_name}"))?;
1090        response
1091            .location
1092            .with_context(|| format!("namespace returned no location for table {table_name}"))
1093    }
1094
1095    /// On-disk byte totals for the three datasets plus the data-dir remainder.
1096    /// Every byte is sized by listing through Lance's object store
1097    /// (spec.md#lance-chokepoints-storage), identical for `file://` and `s3://`.
1098    pub async fn table_sizes(&self) -> Result<TableSizes> {
1099        let registry = Arc::new(ObjectStoreRegistry::default());
1100        let params = ObjectStoreParams {
1101            storage_options_accessor: (!self.storage_options.is_empty()).then(|| {
1102                Arc::new(StorageOptionsAccessor::with_static_options(
1103                    self.storage_options.clone(),
1104                ))
1105            }),
1106            ..Default::default()
1107        };
1108
1109        let sessions = self
1110            .listed_size(
1111                &registry,
1112                &params,
1113                &self.table_location(sessions::SESSIONS).await?,
1114            )
1115            .await?;
1116        let messages = self
1117            .listed_size(
1118                &registry,
1119                &params,
1120                &self.table_location(sessions::MESSAGES).await?,
1121            )
1122            .await?;
1123        let parts = self
1124            .listed_size(
1125                &registry,
1126                &params,
1127                &self.table_location(sessions::PARTS).await?,
1128            )
1129            .await?;
1130        // `other` is whatever sits under the data-dir root but not in the three
1131        // tables (config.toml, stray index temp files): root total minus them.
1132        let root_total = self
1133            .listed_size(&registry, &params, self.location.as_str())
1134            .await?;
1135        let other = root_total.saturating_sub(sessions + messages + parts);
1136        Ok(TableSizes {
1137            sessions,
1138            messages,
1139            parts,
1140            other,
1141        })
1142    }
1143
1144    /// Sum `ObjectMeta.size` for every object recursively under `uri`.
1145    async fn listed_size(
1146        &self,
1147        registry: &Arc<ObjectStoreRegistry>,
1148        params: &ObjectStoreParams,
1149        uri: &str,
1150    ) -> Result<u64> {
1151        let (store, base) = ObjectStore::from_uri_and_params(registry.clone(), uri, params)
1152            .await
1153            .with_context(|| format!("failed to open object store for {uri}"))?;
1154        let mut listing = store.list(Some(base));
1155        let mut total = 0u64;
1156        while let Some(meta) = listing.next().await {
1157            let meta = meta.with_context(|| format!("listing {uri} failed"))?;
1158            total += meta.size;
1159        }
1160        Ok(total)
1161    }
1162    async fn cached(&self, table: Table) -> Result<&Mutex<CachedDataset>> {
1163        match table {
1164            Table::Sessions => Ok(&self.datasets.sessions),
1165            Table::Messages => Ok(&self.datasets.messages),
1166            Table::Parts => self.parts_cached().await,
1167        }
1168    }
1169
1170    /// Open `parts.lance` on first use (spec.md#datasets). Single-flight via
1171    /// `OnceCell`; once initialized, behaves identically to the other two.
1172    async fn parts_cached(&self) -> Result<&Mutex<CachedDataset>> {
1173        self.datasets
1174            .parts
1175            .get_or_try_init(|| async {
1176                let dataset = open_or_create_via_ns(
1177                    &self.nm,
1178                    &self.nm_ident,
1179                    sessions::PARTS,
1180                    sessions::part_schema(),
1181                    &self.session,
1182                    &self.storage_options,
1183                )
1184                .await?;
1185                Ok::<_, anyhow::Error>(Mutex::new(CachedDataset {
1186                    dataset,
1187                    last_refresh: Instant::now(),
1188                    refresh_after: self.parts_refresh_after,
1189                }))
1190            })
1191            .await
1192    }
1193    async fn retry_lance<T, Fut, Op>(&self, label: &str, mut operation: Op) -> Result<T>
1194    where
1195        Fut: std::future::Future<Output = Result<T>>,
1196        Op: FnMut() -> Fut,
1197    {
1198        let mut attempt = 0u8;
1199        loop {
1200            attempt = attempt.saturating_add(1);
1201            match operation().await {
1202                Ok(value) => return Ok(value),
1203                Err(error) if attempt < self.retry.attempts => {
1204                    let backoff = self.backoff(attempt);
1205                    // `{:#}` walks anyhow's cause chain inline; `%error` (Display)
1206                    // drops everything below the top-level message.
1207                    let error_chain = format!("{error:#}");
1208                    tracing::warn!(
1209                        label,
1210                        attempt,
1211                        ?backoff,
1212                        error = %error_chain,
1213                        "retrying Lance operation"
1214                    );
1215                    tokio::time::sleep(backoff).await;
1216                }
1217                Err(error) => {
1218                    let error_chain = format!("{error:#}");
1219                    tracing::warn!(
1220                        label,
1221                        attempt,
1222                        error = %error_chain,
1223                        "Lance operation exhausted retries"
1224                    );
1225                    // spec.md#protocol: surface OCC failures as a typed `conflict`
1226                    // rather than the generic `storage_unavailable` bucket. The
1227                    // chain root is a `lance::Error` (commit-conflict family) when
1228                    // pond's retry layer exhausted because the manifest could not
1229                    // be advanced; everything else (timeouts, IAM, disk) stays
1230                    // `storage_unavailable`.
1231                    if is_commit_conflict(&error) {
1232                        return Err(error.context(ConflictExhausted { attempts: attempt }));
1233                    }
1234                    return Err(error);
1235                }
1236            }
1237        }
1238    }
1239    fn backoff(&self, attempt: u8) -> Duration {
1240        let shift = u32::from(attempt.saturating_sub(1));
1241        let multiplier = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
1242        let base = self.retry.initial_backoff.saturating_mul(multiplier);
1243        // Symmetric +/- `jitter` factor de-correlates concurrent retriers on
1244        // a contended manifest (spec.md#lance-retry-jitter); clamped to `max_backoff`.
1245        let factor = (1.0 + self.retry.jitter * (fastrand::f64() * 2.0 - 1.0)).max(0.0);
1246        base.mul_f64(factor).min(self.retry.max_backoff)
1247    }
1248}
1249/// Compaction phase: `compact_files` + `cleanup_old_versions`, both inside one
1250/// retry block. Distinct from the indices phase so a hot writer that loses the
1251/// Rewrite race here does not abort index work the operator actually asked for.
1252///
1253/// spec.md#lance-index-maintenance mandates FRI on by default, but at
1254/// v7.0.0-beta.16 `defer_index_remap=true` together with `stable-row-ids`
1255/// panics in `optimize.rs::commit_compaction` with "defer_index_remap
1256/// requires row_addrs but none were provided": `rewrite_files` skips
1257/// row_addrs when stable row ids are on, then the FRI builder demands
1258/// them. With stable_row_ids the remap step is already a no-op
1259/// (`optimize.rs:1490`: `needs_remapping = !uses_stable_row_ids() &&
1260/// !defer_index_remap`), so running without FRI is correct - we only
1261/// lose the documented concurrency-with-index-build benefit. Flip to
1262/// `true` once upstream fixes the conflict.
1263async fn optimize_table_compact(
1264    dataset: &mut Dataset,
1265    table: Table,
1266    progress: Option<&OptimizeProgressFn>,
1267    policy: &MaintenancePolicy,
1268) -> Result<()> {
1269    let compaction = CompactionOptions {
1270        defer_index_remap: false,
1271        ..CompactionOptions::default()
1272    };
1273
1274    // Candidacy mirrors Lance's planner: a fragment is compactable iff it holds
1275    // fewer than target_rows_per_fragment rows (optimize.rs).
1276    let target = compaction.target_rows_per_fragment;
1277    let fragments = dataset.get_fragments();
1278    let (mergeable_run_rows, candidate_count) = compaction_candidates(
1279        fragments
1280            .iter()
1281            .map(|fragment| fragment.metadata().physical_rows.unwrap_or(0)),
1282        target,
1283    );
1284    if should_compact(
1285        mergeable_run_rows,
1286        candidate_count,
1287        target,
1288        policy.compaction_fragment_cap,
1289    ) {
1290        emit(
1291            progress,
1292            OptimizeEvent::PhaseStart {
1293                table,
1294                phase: OptimizePhase::Compact,
1295                detail: None,
1296            },
1297        );
1298        let started = Instant::now();
1299        compact_files(dataset, compaction, None).await?;
1300        emit(
1301            progress,
1302            OptimizeEvent::PhaseDone {
1303                table,
1304                phase: OptimizePhase::Compact,
1305                elapsed_ms: started.elapsed().as_millis() as u64,
1306            },
1307        );
1308    } else {
1309        tracing::debug!(
1310            target: "pond::perf",
1311            table = table.as_str(),
1312            mergeable_run_rows,
1313            candidate_count,
1314            cap = policy.compaction_fragment_cap,
1315            "compaction skipped: sub-target fragments under threshold",
1316        );
1317    }
1318
1319    // Safe GC only. delete_unverified=false keeps Lance's 7-day in-progress
1320    // guard, so this never races a concurrent writer (spec.md#concurrency); GC
1321    // runs outside OCC, so the guard is what makes it safe on any backend.
1322    emit(
1323        progress,
1324        OptimizeEvent::PhaseStart {
1325            table,
1326            phase: OptimizePhase::Cleanup,
1327            detail: None,
1328        },
1329    );
1330    let started = Instant::now();
1331    dataset
1332        .cleanup_old_versions(policy.cleanup_older_than, Some(false), Some(false))
1333        .await
1334        .context("cleanup_old_versions failed during index optimize")?;
1335    emit(
1336        progress,
1337        OptimizeEvent::PhaseDone {
1338            table,
1339            phase: OptimizePhase::Cleanup,
1340            elapsed_ms: started.elapsed().as_millis() as u64,
1341        },
1342    );
1343
1344    Ok(())
1345}
1346
1347/// Indices phase: per-intent create/rebuild + batched `optimize_indices(append)`
1348/// for incremental families. Returns `true` if anything committed.
1349async fn optimize_table_indices(
1350    dataset: &mut Dataset,
1351    intents: &[IndexIntent],
1352    table: Table,
1353    progress: Option<&OptimizeProgressFn>,
1354) -> Result<bool> {
1355    let existing = dataset.load_indices().await?;
1356    let existing_names: std::collections::HashSet<String> =
1357        existing.iter().map(|index| index.name.clone()).collect();
1358
1359    let mut append_indices: Vec<String> = Vec::new();
1360    let mut did_work = false;
1361
1362    for intent in intents {
1363        let exists = existing_names.contains(intent.name);
1364
1365        if !exists {
1366            if !intent.trigger.should_create(dataset).await? {
1367                continue;
1368            }
1369            let params = intent.params.build(dataset).await?;
1370            let index_type = intent.params.index_type();
1371            tracing::info!(
1372                index = intent.name,
1373                column = intent.column,
1374                "creating Lance index (trigger fired)",
1375            );
1376            emit(
1377                progress,
1378                OptimizeEvent::PhaseStart {
1379                    table,
1380                    phase: OptimizePhase::IndexCreate,
1381                    detail: Some(intent.name.to_owned()),
1382                },
1383            );
1384            let started = Instant::now();
1385            dataset
1386                .create_index(
1387                    &[intent.column],
1388                    index_type,
1389                    Some(intent.name.to_owned()),
1390                    params.as_ref(),
1391                    false,
1392                )
1393                .await
1394                .with_context(|| format!("failed to create index {}", intent.name))?;
1395            emit(
1396                progress,
1397                OptimizeEvent::PhaseDone {
1398                    table,
1399                    phase: OptimizePhase::IndexCreate,
1400                    elapsed_ms: started.elapsed().as_millis() as u64,
1401                },
1402            );
1403            did_work = true;
1404            continue;
1405        }
1406
1407        let unindexed = dataset.unindexed_fragments(intent.name).await?;
1408        if unindexed.is_empty() {
1409            continue;
1410        }
1411        // Lag guard: let fragments accumulate behind the brute-force fallback
1412        // rather than firing a commit per tiny append. Threshold is operator-
1413        // tunable via `[maintenance].index_lag_threshold`.
1414        if unindexed.len() < index_lag_threshold() {
1415            continue;
1416        }
1417        match intent.params {
1418            IndexParamsKind::Scalar(BuiltinIndexType::BTree) => {
1419                let params = intent.params.build(dataset).await?;
1420                let index_type = intent.params.index_type();
1421                tracing::debug!(
1422                    target: "pond::perf",
1423                    index = intent.name,
1424                    column = intent.column,
1425                    "rebuilding Lance BTree index",
1426                );
1427                emit(
1428                    progress,
1429                    OptimizeEvent::PhaseStart {
1430                        table,
1431                        phase: OptimizePhase::IndexRebuild,
1432                        detail: Some(intent.name.to_owned()),
1433                    },
1434                );
1435                let started = Instant::now();
1436                dataset
1437                    .create_index(
1438                        &[intent.column],
1439                        index_type,
1440                        Some(intent.name.to_owned()),
1441                        params.as_ref(),
1442                        true,
1443                    )
1444                    .await
1445                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1446                emit(
1447                    progress,
1448                    OptimizeEvent::PhaseDone {
1449                        table,
1450                        phase: OptimizePhase::IndexRebuild,
1451                        elapsed_ms: started.elapsed().as_millis() as u64,
1452                    },
1453                );
1454                did_work = true;
1455            }
1456            IndexParamsKind::Scalar(BuiltinIndexType::Bitmap)
1457            | IndexParamsKind::InvertedFtsNgram { .. }
1458            | IndexParamsKind::IvfPqCosine { .. } => {
1459                append_indices.push(intent.name.to_owned());
1460            }
1461            IndexParamsKind::Scalar(_) => {
1462                let params = intent.params.build(dataset).await?;
1463                emit(
1464                    progress,
1465                    OptimizeEvent::PhaseStart {
1466                        table,
1467                        phase: OptimizePhase::IndexRebuild,
1468                        detail: Some(intent.name.to_owned()),
1469                    },
1470                );
1471                let started = Instant::now();
1472                dataset
1473                    .create_index(
1474                        &[intent.column],
1475                        intent.params.index_type(),
1476                        Some(intent.name.to_owned()),
1477                        params.as_ref(),
1478                        true,
1479                    )
1480                    .await
1481                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1482                emit(
1483                    progress,
1484                    OptimizeEvent::PhaseDone {
1485                        table,
1486                        phase: OptimizePhase::IndexRebuild,
1487                        elapsed_ms: started.elapsed().as_millis() as u64,
1488                    },
1489                );
1490                did_work = true;
1491            }
1492        }
1493    }
1494
1495    if !append_indices.is_empty() {
1496        let to_append = append_indices.clone();
1497        emit(
1498            progress,
1499            OptimizeEvent::PhaseStart {
1500                table,
1501                phase: OptimizePhase::IndexAppend,
1502                detail: Some(append_indices.join(", ")),
1503            },
1504        );
1505        let started = Instant::now();
1506        dataset
1507            .optimize_indices(&OptimizeOptions::append().index_names(to_append))
1508            .await
1509            .context("optimize_indices(append) failed during index optimize")?;
1510        emit(
1511            progress,
1512            OptimizeEvent::PhaseDone {
1513                table,
1514                phase: OptimizePhase::IndexAppend,
1515                elapsed_ms: started.elapsed().as_millis() as u64,
1516            },
1517        );
1518        tracing::debug!(
1519            target: "pond::perf",
1520            indices = ?append_indices,
1521            "appended trailing fragments into indices",
1522        );
1523        did_work = true;
1524    }
1525
1526    Ok(did_work)
1527}
1528
1529async fn rebuild_index(dataset: &mut Dataset, intent: &IndexIntent) -> Result<()> {
1530    if !intent.trigger.should_create(dataset).await? {
1531        return Ok(());
1532    }
1533    let params = intent.params.build(dataset).await?;
1534    dataset
1535        .create_index(
1536            &[intent.column],
1537            intent.params.index_type(),
1538            Some(intent.name.to_owned()),
1539            params.as_ref(),
1540            true,
1541        )
1542        .await
1543        .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1544    Ok(())
1545}
1546
1547async fn index_status(
1548    table: Table,
1549    dataset: &Dataset,
1550    intents: &[IndexIntent],
1551) -> Result<Vec<IndexStatus>> {
1552    let existing = dataset.load_indices().await?;
1553    let existing_names: std::collections::HashSet<String> =
1554        existing.iter().map(|index| index.name.clone()).collect();
1555    let total_fragments = dataset.get_fragments().len();
1556    let total_rows = dataset.count_rows(None).await?;
1557    let mut statuses = Vec::with_capacity(intents.len());
1558    for intent in intents {
1559        let exists = existing_names.contains(intent.name);
1560        if !exists {
1561            statuses.push(IndexStatus {
1562                table,
1563                intent_name: intent.name.to_owned(),
1564                fragments_covered: 0,
1565                unindexed_fragments: total_fragments,
1566                unindexed_rows: total_rows,
1567                exists,
1568            });
1569            continue;
1570        }
1571        let unindexed = dataset
1572            .unindexed_fragments(intent.name)
1573            .await
1574            .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1575        let unindexed_fragments = unindexed.len();
1576        let unindexed_rows = unindexed
1577            .iter()
1578            .map(|fragment| fragment.num_rows().unwrap_or(0))
1579            .sum();
1580        statuses.push(IndexStatus {
1581            table,
1582            intent_name: intent.name.to_owned(),
1583            fragments_covered: total_fragments.saturating_sub(unindexed_fragments),
1584            unindexed_fragments,
1585            unindexed_rows,
1586            exists,
1587        });
1588    }
1589    Ok(statuses)
1590}
1591
1592/// Open the table at `table_name` via the namespace; create + initialize on
1593/// `TableNotFound`. Schema-checks the on-disk dataset against pond's
1594/// expectation so a stale data dir surfaces early.
1595///
1596/// Probes via `nm.describe_table` directly rather than `DatasetBuilder::from_namespace`:
1597/// the builder re-wraps an already-`Namespace`-wrapped error
1598/// (lance/src/dataset/builder.rs:142), so going through it would force a
1599/// chain-walk to classify `TableNotFound`. The direct probe stays at one
1600/// wrap level and downcasts cleanly. Managed-versioning hookup (REST
1601/// namespace external-manifest commits) is not wired here; v1 ships
1602/// Directory v2 only.
1603async fn open_or_create_via_ns(
1604    nm: &Arc<dyn LanceNamespace>,
1605    nm_ident: &NamespaceIdent,
1606    table_name: &str,
1607    schema: lance::deps::arrow_schema::SchemaRef,
1608    session: &Arc<Session>,
1609    storage_options: &HashMap<String, String>,
1610) -> Result<Dataset> {
1611    let table_id = nm_ident.as_table_id(table_name);
1612
1613    let request = DescribeTableRequest {
1614        id: Some(table_id.clone()),
1615        ..Default::default()
1616    };
1617    match nm.describe_table(request).await {
1618        Ok(response) => {
1619            let location = response.location.with_context(|| {
1620                format!("namespace returned no location for table {table_name}")
1621            })?;
1622            let mut builder = DatasetBuilder::from_uri(&location).with_session(session.clone());
1623            if !storage_options.is_empty() {
1624                builder = builder.with_storage_options(storage_options.clone());
1625            }
1626            let dataset = builder
1627                .load()
1628                .await
1629                .with_context(|| format!("failed to open table {table_name}"))?;
1630            ensure_schema_matches(&dataset, schema.as_ref(), table_name)?;
1631            return Ok(dataset);
1632        }
1633        Err(error) => match &error {
1634            error if is_namespace_error_code(error, ErrorCode::TableNotFound) => {
1635                // fall through to create
1636            }
1637            _ => {
1638                return Err(anyhow::Error::from(error))
1639                    .with_context(|| format!("failed to describe table {table_name}"));
1640            }
1641        },
1642    }
1643
1644    // Create path: pond seeds an empty dataset with the canonical schema so
1645    // every subsequent open lands on a real Lance dataset, not a phantom.
1646    let mut write_params = sessions::write_params_for_create();
1647    write_params.session = Some(session.clone());
1648    write_params.mode = WriteMode::Create;
1649    if !storage_options.is_empty() {
1650        write_params.store_params = Some(ObjectStoreParams {
1651            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
1652                storage_options.clone(),
1653            ))),
1654            ..Default::default()
1655        });
1656    }
1657    let reader = sessions::empty_reader(schema)?;
1658    Dataset::write_into_namespace(reader, nm.clone(), table_id, Some(write_params))
1659        .await
1660        .with_context(|| format!("failed to create table {table_name}"))
1661}
1662
1663// lance-namespace sometimes nests one `lance::Error::Namespace` inside another
1664// before the underlying `NamespaceError`; walk the whole `.source()` chain
1665// rather than only matching the outer variant.
1666fn is_namespace_error_code(error: &lance::Error, code: ErrorCode) -> bool {
1667    if !matches!(error, lance::Error::Namespace { .. }) {
1668        return false;
1669    }
1670    std::iter::successors(Some(error as &(dyn std::error::Error + 'static)), |link| {
1671        link.source()
1672    })
1673    .filter_map(|link| link.downcast_ref::<NamespaceError>())
1674    .any(|inner| inner.code() == code)
1675}
1676
1677fn scanner_with_prefilter(
1678    dataset: &Dataset,
1679    predicate: Option<&Predicate>,
1680) -> Result<lance::dataset::scanner::Scanner> {
1681    let mut scanner = dataset.scan();
1682    scanner.prefilter(true);
1683    if let Some(predicate) = predicate {
1684        let filter = predicate.to_lance();
1685        if !filter.is_empty() {
1686            scanner.filter(&filter)?;
1687        }
1688    }
1689    Ok(scanner)
1690}
1691fn ensure_schema_matches(
1692    dataset: &Dataset,
1693    expected: &lance::deps::arrow_schema::Schema,
1694    table_name: &str,
1695) -> Result<()> {
1696    use lance::deps::arrow_schema::DataType;
1697    use std::collections::BTreeSet;
1698    let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
1699    let actual_names: BTreeSet<&str> = actual.fields().iter().map(|f| f.name().as_str()).collect();
1700    let expected_names: BTreeSet<&str> = expected
1701        .fields()
1702        .iter()
1703        .map(|f| f.name().as_str())
1704        .collect();
1705    if actual_names != expected_names {
1706        anyhow::bail!(
1707            "table {table_name} has columns {actual_names:?} but this pond build expects \
1708             {expected_names:?} - the on-disk store predates a schema change; delete the \
1709             data directory and re-run `pond ingest`",
1710        );
1711    }
1712    // Catch a vector-dim change (configured `[embeddings].dim` differs from
1713    // the on-disk vector column width) early with a friendly message. Lance
1714    // would otherwise reject the next write with an opaque schema-mismatch
1715    // error inside the `merge_update` path.
1716    for actual_field in actual.fields() {
1717        let Some(expected_field) = expected.field_with_name(actual_field.name()).ok() else {
1718            continue;
1719        };
1720        if let (DataType::FixedSizeList(_, actual_dim), DataType::FixedSizeList(_, expected_dim)) =
1721            (actual_field.data_type(), expected_field.data_type())
1722            && actual_dim != expected_dim
1723        {
1724            tracing::warn!(
1725                table = table_name,
1726                column = actual_field.name(),
1727                actual_dim,
1728                expected_dim,
1729                "embedding dimension differs from config; open proceeds because model swaps are operator-driven",
1730            );
1731        }
1732    }
1733    Ok(())
1734}
/// Inject object-store defaults for any non-local pond location.
///
/// Each default is written only when none of its accepted spellings is
/// already present (keys compared ASCII-case-insensitively). Explicit
/// overrides in `[storage]` therefore always win, including upper-case
/// env-var-style keys.
///
/// `aws_unsigned_payload` is only defaulted when a custom endpoint is
/// configured, since that marks an S3-compatible store such as Hetzner,
/// MinIO or R2, where the SHA256 payload signature is wasted work the
/// server does not validate. The endpoint may be spelled `aws_endpoint`,
/// `aws_endpoint_url`, `endpoint` or `endpoint_url`; object_store accepts
/// all four for its S3 endpoint key.
fn apply_remote_storage_defaults(options: &mut HashMap<String, String>) {
    /// True when any key in `options` equals one of `aliases`, ignoring ASCII case.
    fn has_any(options: &HashMap<String, String>, aliases: &[&str]) -> bool {
        options
            .keys()
            .any(|key| aliases.iter().any(|alias| key.eq_ignore_ascii_case(alias)))
    }
    /// Insert `value` under `aliases[0]` unless some alias is already set.
    fn set_default(options: &mut HashMap<String, String>, aliases: &[&str], value: &str) {
        if !has_any(options, aliases) {
            options.insert(aliases[0].to_owned(), value.to_owned());
        }
    }
    const ENDPOINT_ALIASES: [&str; 4] =
        ["aws_endpoint", "aws_endpoint_url", "endpoint", "endpoint_url"];

    set_default(options, &["pool_idle_timeout"], "300 seconds");
    set_default(options, &["connect_timeout"], "10 seconds");
    if has_any(options, &ENDPOINT_ALIASES) {
        set_default(
            options,
            &["aws_unsigned_payload", "unsigned_payload"],
            "true",
        );
    }
}
1764
/// Render `value` as a single-quoted SQL string literal, doubling every
/// embedded single quote.
fn quoted_string(value: &str) -> String {
    let mut literal = String::with_capacity(value.len() + 2);
    literal.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            literal.push_str("''");
        } else {
            literal.push(ch);
        }
    }
    literal.push('\'');
    literal
}
/// Build a quoted `LIKE` pattern that matches any string containing `value`
/// literally.
///
/// Backslash, `%` and `_` are escaped with a leading backslash, so they
/// match themselves rather than acting as wildcards. Single quotes are
/// doubled for the SQL string literal, and the result is wrapped as
/// `'%…%'`.
fn like_contains(value: &str) -> String {
    let mut pattern = String::with_capacity(value.len() + 4);
    pattern.push_str("'%");
    for ch in value.chars() {
        match ch {
            '\\' => pattern.push_str("\\\\"),
            '%' => pattern.push_str("\\%"),
            '_' => pattern.push_str("\\_"),
            '\'' => pattern.push_str("''"),
            other => pattern.push(other),
        }
    }
    pattern.push_str("%'");
    pattern
}
1776
// Unit tests for the compaction gate, namespace error classification, and an
// end-to-end open + scan through lance-namespace on a temp directory.
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Pins the compaction gate's decisions for the bloat, full-run,
    /// fragment-cap and cap == 0 cases.
    ///
    /// NOTE(review): from the call sites here, `should_compact`'s arguments
    /// look like (run rows, fragment count, target rows per fragment,
    /// fragment cap) - confirm against its definition.
    #[test]
    fn compaction_gate_skips_subtarget_trickle_compacts_on_progress() {
        let target = 1_048_576;
        // Bloat case: a large trailing fragment plus a tiny new one - under a
        // full target fragment and under the cap -> skip (don't re-Rewrite).
        assert!(!should_compact(510_000 + 30, 2, target, 64));
        // A run that fills a whole target fragment -> compact (and freeze it).
        assert!(should_compact(target, 3, target, 64));
        // Many tiny fragments past the cap -> compact to bound fragment count.
        assert!(should_compact(5_000, 64, target, 64));
        // cap == 0 always compacts (preserves pre-gate behavior for tests).
        assert!(should_compact(0, 0, target, 0));
    }

    /// A sub-target fragment sandwiched between at-target fragments is not
    /// merged into the trailing run, so it cannot push the gate over.
    ///
    /// NOTE(review): the asserted values suggest `count` is the number of
    /// sub-target fragments overall (256K, 510K, 30 -> 3), while `run` is the
    /// row total of the trailing mergeable run only - confirm.
    #[test]
    fn compaction_candidates_strands_isolated_subtarget_fragment() {
        let target = 1_048_576;
        // [at-target, isolated 256K, at-target, tail 510K, tiny 30]: the only
        // mergeable run is tail+tiny; the 256K between at-target frags is stranded,
        // so even though sub-target rows total 766K it never re-fires the gate.
        let (run, count) =
            compaction_candidates([1_048_576, 256_000, 1_048_576, 510_000, 30], target);
        assert_eq!(count, 3);
        assert_eq!(run, 510_030);
        assert!(!should_compact(run, count, target, 64));
    }

    /// `is_namespace_error_code` matches a `TableNotFound` that is direct or
    /// wrapped one extra level. It rejects a different namespace code and
    /// non-namespace lance errors.
    #[test]
    fn namespace_error_code_walks_wrapped_chain() {
        let direct = lance::Error::namespace_source(Box::new(NamespaceError::TableNotFound {
            message: "missing".into(),
        }));
        assert!(is_namespace_error_code(&direct, ErrorCode::TableNotFound));

        // Namespace error nested inside another namespace error.
        let wrapped = lance::Error::namespace_source(Box::new(direct));
        assert!(is_namespace_error_code(&wrapped, ErrorCode::TableNotFound));

        let other_code =
            lance::Error::namespace_source(Box::new(NamespaceError::NamespaceNotFound {
                message: "nope".into(),
            }));
        assert!(!is_namespace_error_code(
            &other_code,
            ErrorCode::TableNotFound
        ));

        let not_namespace = lance::Error::internal("unrelated");
        assert!(!is_namespace_error_code(
            &not_namespace,
            ErrorCode::TableNotFound
        ));
    }

    /// Round-trip: opening a fresh data dir through `lance-namespace`
    /// produces all three tables, and `Handle::scan` returns an empty batch
    /// for each (no spurious schema mismatch, no namespace error).
    #[tokio::test]
    async fn store_opens_via_namespace_and_scan_works() -> Result<()> {
        let temp = TempDir::new()?;
        let url = Url::from_directory_path(temp.path())
            .map_err(|()| anyhow::anyhow!("temp path is not absolute"))?;
        let handle = Handle::open(&url).await?;
        // Each table has its own PK column; project the canonical one so the
        // scan is exercised end-to-end (catalog -> dataset -> scanner -> batch).
        let cases: [(Table, &[&str]); 3] = [
            (Table::Sessions, &["id"]),
            (Table::Messages, &["id"]),
            (Table::Parts, &["id"]),
        ];
        for (table, projection) in cases {
            let scanner = handle
                .scan(table, ScanOpts::project_only(projection))
                .await?;
            let batch = scanner.try_into_batch().await?;
            assert_eq!(batch.num_rows(), 0, "fresh table should be empty");
        }
        Ok(())
    }
}