Skip to main content

pond/
substrate.rs

1use crate::{
2    RetryPolicy,
3    config::{self},
4    handlers::NamespaceIdent,
5    sessions::{self},
6};
7use anyhow::{Context, Result};
8use lance::Dataset;
9use lance::dataset::builder::DatasetBuilder;
10use lance::dataset::optimize::{CompactionOptions, compact_files};
11use lance::dataset::write::merge_insert::SourceDedupeBehavior;
12use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode};
13use lance::deps::arrow_array::{RecordBatch, RecordBatchIterator};
14use lance::index::DatasetIndexExt;
15use lance::index::DatasetIndexInternalExt;
16use lance::index::vector::VectorIndexParams;
17use lance::session::Session;
18use lance_index::IndexType;
19use lance_index::optimize::OptimizeOptions;
20use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
21use lance_io::object_store::{
22    ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor,
23};
24use lance_linalg::distance::MetricType;
25use lance_namespace::LanceNamespace;
26use lance_namespace::error::NamespaceError;
27use lance_namespace::models::DescribeTableRequest;
28use lance_namespace_impls::ConnectBuilder;
29use std::{
30    collections::HashMap,
31    sync::Arc,
32    time::{Duration, Instant},
33};
34use tokio::sync::{Mutex, OnceCell};
35use tokio_stream::StreamExt;
36use url::Url;
/// Embedded-row count at which pond builds the IVF_PQ vector index on
/// `messages.vector` (spec.md#search). Below it, vector search runs a
/// brute-force flat scan - exact and fast at small and medium scale, and
/// IVF_PQ cannot train well on fewer vectors anyway.
pub const VECTOR_INDEX_ACTIVATION_ROWS: usize = 100_000;

/// Default minimum unindexed-fragment count required before a per-intent
/// append/rebuild step is admitted into `optimize_table_indices`. Lower
/// values make each commit smaller and more frequent (bad on remote
/// stores); higher values let fragments accumulate behind the brute-force
/// fallback. 4 is the floor of the documented 4-8 band.
///
/// `index_lag_threshold` returns this value whenever no runtime value has
/// been installed.
pub const DEFAULT_INDEX_LAG_THRESHOLD: usize = 4;

/// Process-wide runtime value of the index-lag threshold. Written at most
/// once by `init_index_lag_threshold` and read by `index_lag_threshold`;
/// stays empty until the first `init_index_lag_threshold` call.
static INDEX_LAG_THRESHOLD_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
51
52/// Seed the process-wide index-lag threshold from `[search].index_lag_threshold`.
53/// First call wins (mirrors `embed::init_model_id` / `sessions::init_embedding_dim`).
54pub fn init_index_lag_threshold(value: usize) {
55    INDEX_LAG_THRESHOLD_RUNTIME.get_or_init(|| value);
56}
57
58pub fn index_lag_threshold() -> usize {
59    INDEX_LAG_THRESHOLD_RUNTIME
60        .get()
61        .copied()
62        .unwrap_or(DEFAULT_INDEX_LAG_THRESHOLD)
63}
64
/// Declarative description of one index pond keeps on a table. Created when
/// its trigger fires; folded forward by `pond index optimize`.
#[derive(Debug, Clone)]
pub struct IndexIntent {
    /// Stable on-disk name. Must match across runs so existence checks
    /// resolve.
    pub name: &'static str,
    /// Column the index covers.
    pub column: &'static str,
    /// Condition evaluated against the live dataset before each cycle.
    pub trigger: IndexTrigger,
    /// How the params are built at create time. Some intents have static
    /// params (FTS, scalars); IVF_PQ needs the row count to size partitions.
    ///
    /// Note: `IndexParamsKind::IvfPqCosine` takes that row count from the
    /// non-null rows of the column literally named `vector`, independent of
    /// the `column` field above.
    pub params: IndexParamsKind,
}
80
/// When an [`IndexIntent`] should exist on disk.
#[derive(Debug, Clone)]
pub enum IndexTrigger {
    /// Build whenever the table has any rows. Used for FTS and scalar
    /// indices: there is no training cost worth delaying.
    OnAnyRows,
    /// Build when `count(<column> IS NOT NULL) >= threshold`. Used for the
    /// IVF_PQ vector index, which trains poorly on too few vectors.
    OnNonNullCount {
        /// Column whose non-null rows are counted.
        column: &'static str,
        /// Minimum non-null row count; the comparison is inclusive (`>=`).
        threshold: usize,
    },
}
94
/// The lance-native shape of an [`IndexIntent`]'s params, dispatched to the
/// right `IndexParams` at create time.
#[derive(Debug, Clone)]
pub enum IndexParamsKind {
    /// Builtin scalar index, built with `ScalarIndexParams::for_builtin`.
    /// For the reported index type, `BuiltinIndexType::Bitmap` maps to
    /// [`IndexType::Bitmap`]; every other builtin kind is reported as
    /// [`IndexType::BTree`].
    Scalar(BuiltinIndexType),
    /// `InvertedIndexParams` with a character `ngram` tokenizer in the
    /// `[min, max]` range and stemming / stop-words off
    /// (spec.md#search-language-neutral-index).
    InvertedFtsNgram { min: u32, max: u32 },
    /// `VectorIndexParams::ivf_pq` with cosine metric (e5 vectors are
    /// L2-normalized). `sub_vectors = embedding_dim / 8` and `num_bits = 8`
    /// are pond's conventions; `max_iters` caps kmeans. Partitions follow
    /// LanceDB's documented `num_rows // 4096` guidance, floored at one.
    IvfPqCosine {
        /// Number of PQ sub-vectors handed to `VectorIndexParams::ivf_pq`.
        sub_vectors: usize,
        /// Bits per PQ code handed to `VectorIndexParams::ivf_pq`.
        num_bits: u8,
        /// Iteration cap handed to `VectorIndexParams::ivf_pq`.
        max_iters: usize,
    },
}
116
117impl IndexTrigger {
118    async fn should_create(&self, dataset: &Dataset) -> Result<bool> {
119        match self {
120            Self::OnAnyRows => Ok(dataset.count_rows(None).await? > 0),
121            Self::OnNonNullCount { column, threshold } => {
122                let count = dataset
123                    .count_rows(Some(format!("{column} IS NOT NULL")))
124                    .await?;
125                Ok(count >= *threshold)
126            }
127        }
128    }
129}
130
131impl IndexParamsKind {
132    fn index_type(&self) -> IndexType {
133        match self {
134            Self::Scalar(BuiltinIndexType::Bitmap) => IndexType::Bitmap,
135            Self::Scalar(_) => IndexType::BTree,
136            Self::InvertedFtsNgram { .. } => IndexType::Inverted,
137            Self::IvfPqCosine { .. } => IndexType::Vector,
138        }
139    }
140
141    async fn build(&self, dataset: &Dataset) -> Result<Box<dyn lance::index::IndexParams>> {
142        match self {
143            Self::Scalar(kind) => Ok(Box::new(ScalarIndexParams::for_builtin(kind.clone()))),
144            Self::InvertedFtsNgram { min, max } => Ok(Box::new(
145                InvertedIndexParams::default()
146                    .base_tokenizer("ngram".to_owned())
147                    .ngram_min_length(*min)
148                    .ngram_max_length(*max)
149                    .stem(false)
150                    .remove_stop_words(false),
151            )),
152            Self::IvfPqCosine {
153                sub_vectors,
154                num_bits,
155                max_iters,
156            } => {
157                let count = dataset
158                    .count_rows(Some("vector IS NOT NULL".to_owned()))
159                    .await?;
160                let partitions = count.checked_div(4096).unwrap_or(0).max(1);
161                Ok(Box::new(VectorIndexParams::ivf_pq(
162                    partitions,
163                    *num_bits,
164                    *sub_vectors,
165                    MetricType::Cosine,
166                    *max_iters,
167                )))
168            }
169        }
170    }
171}
172
/// Status report for one index intent on one table.
///
/// NOTE(review): the code that fills this struct is outside this part of the
/// file; the field notes below are read off the field names and types only -
/// confirm against the producer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexStatus {
    /// Table the index belongs to.
    pub table: Table,
    /// Presumably the `IndexIntent::name` this status describes - confirm.
    pub intent_name: String,
    /// Presumably the number of fragments the index covers - confirm.
    pub fragments_covered: usize,
    /// Presumably the number of rows not yet covered by the index - confirm.
    pub unindexed_rows: usize,
    /// Presumably whether the index exists on disk - confirm.
    pub exists: bool,
}
181
/// Marker error pond places in the anyhow chain when `retry_lance` runs out
/// of attempts against an OCC commit-conflict failure (spec.md#protocol).
/// The wire layer downcasts to this type so the outcome is classified as
/// `conflict` instead of the generic `storage_unavailable`.
#[derive(Debug, Clone, Copy)]
pub struct ConflictExhausted {
    /// Attempt count reported in the `Display` message.
    pub attempts: u8,
}

impl std::fmt::Display for ConflictExhausted {
    /// Writes `commit conflict exhausted after <attempts> attempt(s)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { attempts } = self;
        f.write_fmt(format_args!(
            "commit conflict exhausted after {} attempt(s)",
            attempts
        ))
    }
}

// No `source()`: the sentinel carries no underlying cause of its own.
impl std::error::Error for ConflictExhausted {}
202
/// Per-phase result for one table's pass through `Handle::optimize_table`.
/// spec.md#substrate 3.7 (`lance-index-maintenance`): the indices phase and the
/// compaction phase get independent retry budgets and independent commits,
/// so a hot writer that starves the Rewrite cannot abort the index Update.
///
/// Only `Failed` counts as a failure for `PhaseOutcome::is_failed`;
/// `SkippedConflict` does not.
#[derive(Debug)]
pub enum PhaseOutcome {
    /// Phase attempted and committed work.
    Ok,
    /// Phase attempted; no work was needed.
    Noop,
    /// Phase attempted; OCC retry budget exhausted on conflict (the operator
    /// can rerun later once the hot writer quiesces).
    SkippedConflict,
    /// Phase failed with a non-conflict error. Carries that error.
    Failed(anyhow::Error),
    /// Phase not requested by the caller (e.g. compaction skipped under
    /// `Store::build_indices_only`).
    NotAttempted,
}
222
223impl PhaseOutcome {
224    pub fn is_failed(&self) -> bool {
225        matches!(self, Self::Failed(_))
226    }
227}
228
/// What `Handle::optimize_table` did for one table.
#[derive(Debug)]
pub struct TableOptimizeOutcome {
    /// Table the outcome belongs to.
    pub table: Table,
    /// Result of the indices phase.
    pub indices: PhaseOutcome,
    /// Result of the compaction phase.
    pub compaction: PhaseOutcome,
}
236
/// Boundary event during one `Handle::optimize_table` pass. The CLI binds a
/// progress callback to render a live spinner; library callers pass `None`.
#[derive(Debug, Clone)]
pub enum OptimizeEvent {
    /// Start-of-phase boundary for `phase` on `table`.
    PhaseStart {
        table: Table,
        phase: OptimizePhase,
        /// Optional free-form text attached to the start event.
        /// NOTE(review): its content is chosen by emitters outside this
        /// part of the file - confirm what it carries.
        detail: Option<String>,
    },
    /// End-of-phase boundary for `phase` on `table`.
    PhaseDone {
        table: Table,
        phase: OptimizePhase,
        /// Presumably the phase's elapsed time in milliseconds - confirm
        /// against the emitter.
        elapsed_ms: u64,
    },
}
252
/// The kinds of step an optimize pass reports through [`OptimizeEvent`].
#[derive(Debug, Clone, Copy)]
pub enum OptimizePhase {
    Compact,
    Cleanup,
    IndexCreate,
    IndexRebuild,
    IndexAppend,
}

impl OptimizePhase {
    /// Short kebab-case name of the phase.
    pub fn label(self) -> &'static str {
        use OptimizePhase::{Cleanup, Compact, IndexAppend, IndexCreate, IndexRebuild};

        match self {
            Compact => "compact",
            Cleanup => "cleanup",
            IndexCreate => "index-create",
            IndexRebuild => "index-rebuild",
            IndexAppend => "index-append",
        }
    }
}
273
/// Boxed progress callback that receives each [`OptimizeEvent`]. The
/// `Send + Sync` bounds let it be shared across threads.
pub type OptimizeProgressFn = Box<dyn Fn(OptimizeEvent) + Send + Sync>;
275
276fn emit(progress: Option<&OptimizeProgressFn>, event: OptimizeEvent) {
277    if let Some(callback) = progress {
278        callback(event);
279    }
280}
281
282/// True when the chain root is one of Lance's commit-conflict variants
283/// (`CommitConflict`, `RetryableCommitConflict`, `TooMuchWriteContention`).
284/// Everything else (timeouts, IAM denials, disk errors) is not a conflict.
285pub fn is_commit_conflict(error: &anyhow::Error) -> bool {
286    error.downcast_ref::<lance::Error>().is_some_and(|err| {
287        matches!(
288            err,
289            lance::Error::CommitConflict { .. }
290                | lance::Error::RetryableCommitConflict { .. }
291                | lance::Error::TooMuchWriteContention { .. }
292        )
293    })
294}
295
296/// True when `retry_lance` exhausted retries against an OCC conflict and
297/// attached `ConflictExhausted` to the chain head.
298fn is_conflict_exhausted(error: &anyhow::Error) -> bool {
299    error.chain().any(|cause| cause.is::<ConflictExhausted>())
300}
301
/// On-disk byte totals for the three session datasets, plus everything else
/// under the data-dir root. Sized by listing through Lance's object-store
/// layer (spec.md#lance-chokepoints-storage) so `file://` and `s3://` behave alike.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TableSizes {
    /// Bytes attributed to the sessions dataset.
    pub sessions: u64,
    /// Bytes attributed to the messages dataset.
    pub messages: u64,
    /// Bytes attributed to the parts dataset.
    pub parts: u64,
    /// Bytes under the data-dir root that belong to none of the three
    /// datasets.
    pub other: u64,
}
312
/// Literal operand of a filter comparison.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalarValue {
    /// Text value; rendered as a quoted string literal.
    String(String),
    /// 32-bit integer; rendered in decimal.
    Int32(i32),
    /// Pre-rendered SQL fragment; emitted verbatim with no quoting or
    /// escaping, so it must never carry untrusted input.
    Raw(String),
}

impl From<String> for ScalarValue {
    /// An owned string becomes a `String` literal without copying.
    fn from(value: String) -> Self {
        ScalarValue::String(value)
    }
}

impl From<&str> for ScalarValue {
    /// A borrowed string is copied into a `String` literal.
    fn from(value: &str) -> Self {
        String::from(value).into()
    }
}

impl From<i32> for ScalarValue {
    /// An `i32` becomes an `Int32` literal.
    fn from(value: i32) -> Self {
        ScalarValue::Int32(value)
    }
}
/// Structured filter expression, rendered to a Lance SQL filter string by
/// `Predicate::to_lance`. Column names are `&'static str` and are written
/// into the rendered filter verbatim.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Predicate {
    /// `<col> = <value>`.
    Eq(&'static str, ScalarValue),
    /// `<col> <> <value>`.
    Ne(&'static str, ScalarValue),
    /// `<col> IS NULL`.
    IsNull(&'static str),
    /// `<col> IS NOT NULL`.
    IsNotNull(&'static str),
    /// `<col> IN (<v1>, <v2>, ...)`.
    In(&'static str, Vec<ScalarValue>),
    /// `<col> LIKE <pattern> ESCAPE '\'`, with the pattern produced by
    /// `like_contains` from the given text.
    LikeContains(&'static str, String),
    /// Regex match. Emitted as `regexp_like(<col>, '<pat>')`. Never pushes
    /// down to BTREE indexes (Lance's scalar-index-expr parser ignores it),
    /// so the filter is a full-scan-with-predicate - acceptable for
    /// human-driven `--project re:...` queries, not for hot paths.
    Regex(&'static str, String),
    /// `<col> >= <value>`.
    Gte(&'static str, ScalarValue),
    /// `<col> <= <value>`.
    Lte(&'static str, ScalarValue),
    /// Conjunction: children joined with ` AND `; children that render
    /// empty are skipped.
    And(Vec<Predicate>),
    /// Disjunction: children joined with ` OR ` and wrapped in parentheses;
    /// children that render empty are skipped.
    Or(Vec<Predicate>),
}
352impl Predicate {
353    pub fn to_lance(&self) -> String {
354        match self {
355            Self::Eq(column, value) => format!("{column} = {}", value.to_lance()),
356            Self::Ne(column, value) => format!("{column} <> {}", value.to_lance()),
357            Self::IsNull(column) => format!("{column} IS NULL"),
358            Self::IsNotNull(column) => format!("{column} IS NOT NULL"),
359            Self::In(column, values) => {
360                let values = values
361                    .iter()
362                    .map(ScalarValue::to_lance)
363                    .collect::<Vec<_>>()
364                    .join(", ");
365                format!("{column} IN ({values})")
366            }
367            Self::LikeContains(column, value) => {
368                format!("{column} LIKE {} ESCAPE '\\'", like_contains(value))
369            }
370            Self::Regex(column, pattern) => {
371                format!("regexp_like({column}, {})", quoted_string(pattern))
372            }
373            Self::Gte(column, value) => format!("{column} >= {}", value.to_lance()),
374            Self::Lte(column, value) => format!("{column} <= {}", value.to_lance()),
375            Self::And(predicates) => predicates
376                .iter()
377                .map(Self::to_lance)
378                .filter(|predicate| !predicate.is_empty())
379                .collect::<Vec<_>>()
380                .join(" AND "),
381            Self::Or(predicates) => {
382                // Wrap in parens so the disjunction composes safely as a child
383                // of an outer `And` (SQL `OR` binds looser than `AND`).
384                let body = predicates
385                    .iter()
386                    .map(Self::to_lance)
387                    .filter(|predicate| !predicate.is_empty())
388                    .collect::<Vec<_>>()
389                    .join(" OR ");
390                if body.is_empty() {
391                    String::new()
392                } else {
393                    format!("({body})")
394                }
395            }
396        }
397    }
398}
/// Read-side options for `Handle::scan`: optional prefilter predicate and
/// optional projection. Default = no filter, all columns.
#[derive(Default)]
pub struct ScanOpts<'a> {
    /// Prefilter to apply; `None` means no filter.
    pub predicate: Option<&'a Predicate>,
    /// Column names to read; `None` means all columns.
    pub projection: Option<&'a [&'a str]>,
}
406
407impl<'a> ScanOpts<'a> {
408    pub fn project_only(projection: &'a [&'a str]) -> Self {
409        Self {
410            predicate: None,
411            projection: Some(projection),
412        }
413    }
414    pub fn with_predicate_and_projection(
415        predicate: &'a Predicate,
416        projection: &'a [&'a str],
417    ) -> Self {
418        Self {
419            predicate: Some(predicate),
420            projection: Some(projection),
421        }
422    }
423}
424
425impl ScalarValue {
426    fn to_lance(&self) -> String {
427        match self {
428            Self::String(value) => quoted_string(value),
429            Self::Int32(value) => value.to_string(),
430            Self::Raw(value) => value.clone(),
431        }
432    }
433}
/// Lance cache caps in bytes. `None` lets the substrate pick the backend-aware
/// default (local FS gets a tighter cap; object stores stay near Lance's
/// defaults). Wired through `Store::open_with_options` from `[runtime]`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct RuntimeCaps {
    /// Explicit index-cache cap in bytes; `None` selects the backend default.
    pub index_cache_bytes: Option<usize>,
    /// Explicit metadata-cache cap in bytes; `None` selects the backend
    /// default.
    pub metadata_cache_bytes: Option<usize>,
}
442
443impl RuntimeCaps {
444    pub fn from_config(config: &crate::config::RuntimeConfig) -> Self {
445        Self {
446            index_cache_bytes: config.index_cache_bytes,
447            metadata_cache_bytes: config.metadata_cache_bytes,
448        }
449    }
450}
451
/// Local-FS default index-cache cap (256 MiB): tight enough that a
/// long-lived `pond mcp` lands well under the 500 MiB target without
/// measurable latency cost vs Lance's 6 GiB default (see
/// `benches/serve_mem_bench.rs --cap-sweep`).
const LOCAL_INDEX_CACHE_BYTES: usize = 256 * 1024 * 1024;
/// Local-FS default metadata-cache cap (128 MiB).
const LOCAL_METADATA_CACHE_BYTES: usize = 128 * 1024 * 1024;
/// Object-store default index-cache cap (2 GiB): latency to refill is
/// per-page, so keep more in cache.
const REMOTE_INDEX_CACHE_BYTES: usize = 2 * 1024 * 1024 * 1024;
/// Object-store default metadata-cache cap (512 MiB).
const REMOTE_METADATA_CACHE_BYTES: usize = 512 * 1024 * 1024;
460
461fn resolve_cache_caps(location: &Url, caps: RuntimeCaps) -> (usize, usize) {
462    let (index_default, metadata_default) = if config::is_local(location) {
463        (LOCAL_INDEX_CACHE_BYTES, LOCAL_METADATA_CACHE_BYTES)
464    } else {
465        (REMOTE_INDEX_CACHE_BYTES, REMOTE_METADATA_CACHE_BYTES)
466    };
467    (
468        caps.index_cache_bytes.unwrap_or(index_default),
469        caps.metadata_cache_bytes.unwrap_or(metadata_default),
470    )
471}
472
/// Open handle on one pond data dir: the cached Lance datasets plus the
/// session, namespace, and storage settings they were opened with.
pub struct Handle {
    /// Cached dataset handles: sessions and messages opened eagerly, parts
    /// opened lazily.
    datasets: DatasetSet,
    /// Retry policy; set to `RetryPolicy::default()` when the handle is
    /// opened.
    retry: RetryPolicy,
    /// One `lance::Session` shared across all three datasets. Carries the
    /// metadata + index caches and the `ObjectStoreRegistry` (which holds
    /// the underlying object_store / S3 client). Sharing the session means
    /// one cache pool covers all three tables and one S3 client serves all
    /// three datasets - load-bearing on object-store backends where a
    /// per-dataset client would mean 3x the connection pools and 3x the
    /// credential refreshes (lance/src/dataset/builder.rs:509-517).
    #[allow(dead_code)]
    session: Arc<Session>,
    /// The `lance-namespace` catalog seam. v1 uses the Directory impl;
    /// future hosted pond swaps to "rest" without touching read/write paths
    /// (spec.md#lance-chokepoints-catalog).
    nm: Arc<dyn LanceNamespace>,
    /// Namespace identifier this handle binds to. v1 is always `root()`; the
    /// typed seam matches `resolve_namespace`'s return so multi-namespace
    /// routing can land without churning call sites (spec.md#wire-namespace-resolution).
    nm_ident: NamespaceIdent,
    /// Object-store options threaded through every `DatasetBuilder` and
    /// `Dataset::write` call so refresh / index-creation paths inherit the
    /// same credentials and region as the initial open. Empty on local-FS
    /// installs.
    storage_options: HashMap<String, String>,
    /// Data-dir URL the handle was opened against. `pond status` reads this
    /// to display where the bytes live and to decide whether to walk a local
    /// directory or issue a remote `LIST` for sizing.
    location: Url,
    /// Refresh window computed at open time: zero for local-FS locations,
    /// five seconds otherwise - the same value given to the eagerly opened
    /// sessions/messages datasets. Presumably applied to `parts.lance` when
    /// it is opened lazily; that path is outside this part of the file -
    /// confirm.
    parts_refresh_after: Duration,
}
507
508impl std::fmt::Debug for Handle {
509    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
510        formatter
511            .debug_struct("Handle")
512            .field("datasets", &self.datasets)
513            .field("retry", &self.retry)
514            .field("nm_ident", &self.nm_ident)
515            .field("storage_options", &self.storage_options)
516            .field("location", &self.location)
517            .finish()
518    }
519}
520
/// The three Lance tables a handle works with.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Table {
    Sessions,
    Messages,
    Parts,
}

impl Table {
    /// Lower-case name of the table: `"sessions"`, `"messages"`, or
    /// `"parts"`.
    pub fn as_str(self) -> &'static str {
        match self {
            Table::Sessions => "sessions",
            Table::Messages => "messages",
            Table::Parts => "parts",
        }
    }

    /// Module-private spelling of [`Table::as_str`]; returns the same
    /// string.
    fn label(self) -> &'static str {
        self.as_str()
    }
}
/// The per-table cached datasets held by a `Handle`. Each one sits behind
/// its own async `Mutex`.
#[derive(Debug)]
struct DatasetSet {
    /// `sessions` dataset; opened when the handle is opened.
    sessions: Mutex<CachedDataset>,
    /// `messages` dataset; opened when the handle is opened.
    messages: Mutex<CachedDataset>,
    /// `parts.lance` opens lazily on the first read or write that needs it:
    /// any `pond_get` (every mode reads parts to build summaries), grouped
    /// search hydrating user-hit summaries, or ingest with Part events. A
    /// process that does none of those skips the file, saving its metadata
    /// pages and file handle at cold-open. The OnceCell makes init
    /// single-flight; the inner `Mutex<CachedDataset>` then behaves identically
    /// to the other two.
    parts: OnceCell<Mutex<CachedDataset>>,
}
/// A `Dataset` handle paired with the bookkeeping that decides when it is
/// refreshed to the latest version.
#[derive(Debug)]
struct CachedDataset {
    /// The cached dataset handle.
    dataset: Dataset,
    /// When the handle was last refreshed or replaced.
    last_refresh: Instant,
    /// Minimum age of `last_refresh` before `latest` refreshes again. Zero
    /// means every `latest` call refreshes.
    refresh_after: Duration,
}
559impl CachedDataset {
560    async fn latest(&mut self) -> Result<Dataset> {
561        if self.last_refresh.elapsed() >= self.refresh_after {
562            self.dataset.checkout_latest().await?;
563            self.last_refresh = Instant::now();
564        }
565        Ok(self.dataset.clone())
566    }
567    fn replace(&mut self, dataset: Dataset) {
568        self.dataset = dataset;
569        self.last_refresh = Instant::now();
570    }
571}
572impl Handle {
573    /// Open without storage options or explicit cache caps. Backend-aware
574    /// defaults from `[runtime]` apply.
575    pub async fn open(location: &Url) -> Result<Self> {
576        Self::open_with_options(location, HashMap::new(), RuntimeCaps::default()).await
577    }
578
579    /// Open with object-store options handed through to Lance verbatim, plus
580    /// the resolved `[runtime]` cache caps. Object-store keys are the
581    /// `object_store` crate's standard config names; pond does not parse them.
582    /// Opening datasets never performs index work; index lifecycle lives under
583    /// `Handle::optimize_table`. `parts.lance` opens lazily on first use.
584    pub async fn open_with_options(
585        location: &Url,
586        mut storage_options: HashMap<String, String>,
587        caps: RuntimeCaps,
588    ) -> Result<Self> {
589        if let Some(path) = config::local_path(location) {
590            tokio::fs::create_dir_all(&path)
591                .await
592                .with_context(|| format!("failed to create data dir {}", path.display()))?;
593        } else {
594            apply_remote_storage_defaults(&mut storage_options);
595        }
596        // One Session shared across all three datasets so metadata/index
597        // caches and the object_store registry (and thus any S3 client) are
598        // pooled rather than duplicated three times. Caps are sized by the
599        // `[runtime]` block; explicit values from `caps` win, otherwise the
600        // local/remote backend default kicks in.
601        let (index_cache_bytes, metadata_cache_bytes) = resolve_cache_caps(location, caps);
602        let session = Arc::new(Session::new(
603            index_cache_bytes,
604            metadata_cache_bytes,
605            Arc::new(ObjectStoreRegistry::default()),
606        ));
607        // Build the lance-namespace catalog seam once (spec.md#lance-chokepoints-catalog).
608        // The `root` property is whatever URL the Directory impl understands;
609        // `uri_to_url` (lance-io/object_store.rs) accepts both bare paths and
610        // URLs, so passing the scheme-qualified URL for local FS works the
611        // same as the bare-path form. Trailing slash stripped for clean logs.
612        let root = location.as_str().trim_end_matches('/').to_string();
613        let mut connect = ConnectBuilder::new("dir")
614            .property("root", root)
615            .session(session.clone());
616        // Object-store credentials/region/endpoint flow into the namespace
617        // via the `storage.<key>` property convention (lance-namespace-impls
618        // dir.rs from_properties: lines 423-436).
619        for (key, value) in &storage_options {
620            connect = connect.property(format!("storage.{key}"), value.clone());
621        }
622        let nm: Arc<dyn LanceNamespace> = connect
623            .connect()
624            .await
625            .context("failed to connect lance Directory namespace")?;
626        let nm_ident = NamespaceIdent::root();
627        // spec.md#lance-handle-freshness: refresh window is scheme-keyed. Local-FS
628        // manifest reads are microsecond-cheap, so `0` (always-refresh) is
629        // essentially free and removes the stale-read window entirely. Object
630        // stores have real per-call cost; `5s` caps manifest fetch overhead at
631        // acceptable lag for human-driven queries.
632        let refresh_after = if config::is_local(location) {
633            Duration::ZERO
634        } else {
635            Duration::from_secs(5)
636        };
637        let handle = Self {
638            datasets: DatasetSet {
639                sessions: Mutex::new(CachedDataset {
640                    dataset: open_or_create_via_ns(
641                        &nm,
642                        &nm_ident,
643                        sessions::SESSIONS,
644                        sessions::session_schema(),
645                        &session,
646                        &storage_options,
647                    )
648                    .await?,
649                    last_refresh: Instant::now(),
650                    refresh_after,
651                }),
652                messages: Mutex::new(CachedDataset {
653                    dataset: open_or_create_via_ns(
654                        &nm,
655                        &nm_ident,
656                        sessions::MESSAGES,
657                        sessions::message_schema(),
658                        &session,
659                        &storage_options,
660                    )
661                    .await?,
662                    last_refresh: Instant::now(),
663                    refresh_after,
664                }),
665                parts: OnceCell::new(),
666            },
667            retry: RetryPolicy::default(),
668            session,
669            nm,
670            nm_ident,
671            storage_options,
672            location: location.clone(),
673            parts_refresh_after: refresh_after,
674        };
675        Ok(handle)
676    }
677
678    pub fn location(&self) -> &Url {
679        &self.location
680    }
681
682    /// Read-only view of the `storage_options` the handle was opened with.
683    /// `pond status` needs them to instantiate a raw `object_store` client
684    /// that can `LIST` the remote bucket for sizing.
685    pub fn storage_options(&self) -> &HashMap<String, String> {
686        &self.storage_options
687    }
688
689    pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
690        Ok((
691            self.count_rows(Table::Sessions).await?,
692            self.count_rows(Table::Messages).await?,
693            self.count_rows(Table::Parts).await?,
694        ))
695    }
696
697    /// Insert-only merge: append new rows, never overwrite a matched PK.
698    /// Returns rows inserted. The fold lives separately under
699    /// `Handle::optimize_table` (spec.md#lance-index-maintenance).
700    pub(crate) async fn merge_insert(
701        &self,
702        table: Table,
703        batch: RecordBatch,
704        row_count: usize,
705    ) -> Result<u64> {
706        self.merge(
707            table,
708            batch,
709            row_count,
710            "merge_insert",
711            WhenMatched::DoNothing,
712            WhenNotMatched::InsertAll,
713        )
714        .await
715    }
716
717    /// Update-only merge: `WhenMatched::UpdateAll` on matched PKs; unmatched
718    /// rows dropped. The fold lives separately under `Handle::optimize_table`.
719    pub(crate) async fn merge_update(
720        &self,
721        table: Table,
722        batch: RecordBatch,
723        row_count: usize,
724    ) -> Result<u64> {
725        self.merge(
726            table,
727            batch,
728            row_count,
729            "merge_update",
730            WhenMatched::UpdateAll,
731            WhenNotMatched::DoNothing,
732        )
733        .await
734    }
735
736    /// Shared merge path for [`Self::merge_insert`] and [`Self::merge_update`].
737    /// Returns the number of rows affected (inserted or updated, whichever the
738    /// behaviors produce).
739    async fn merge(
740        &self,
741        table: Table,
742        batch: RecordBatch,
743        row_count: usize,
744        op: &'static str,
745        when_matched: WhenMatched,
746        when_not_matched: WhenNotMatched,
747    ) -> Result<u64> {
748        if row_count == 0 {
749            return Ok(0);
750        }
751        let started = Instant::now();
752        let result = self
753            .retry_lance(table.label(), || async {
754                let mut cached = self.cached(table).await?.lock().await;
755                let existing = cached.latest().await?;
756                let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
757                let mut builder = MergeInsertBuilder::try_new(Arc::new(existing), Vec::new())?;
758                builder.when_matched(when_matched.clone());
759                builder.when_not_matched(when_not_matched.clone());
760                // pond presents each PK at most once per batch; FirstSeen keeps
761                // the first occurrence rather than failing (Lance's default).
762                builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
763                // Cleanup is operator-driven via `pond index optimize`; the
764                // per-commit auto hook would add a LIST per write on remote
765                // backends without changing the steady-state retention.
766                builder.skip_auto_cleanup(true);
767                let (dataset, stats) = builder
768                    .try_build()?
769                    .execute_reader(Box::new(reader))
770                    .await?;
771                cached.replace(dataset.as_ref().clone());
772                Ok((
773                    stats.num_inserted_rows + stats.num_updated_rows,
774                    stats.num_skipped_duplicates,
775                ))
776            })
777            .await;
778        let skipped = result.as_ref().map(|(_, s)| *s).unwrap_or(0);
779        tracing::info!(
780            target: "pond::perf",
781            op,
782            table = %table.label(),
783            rows = row_count,
784            elapsed_ms = started.elapsed().as_millis() as u64,
785            skipped,
786            "merge",
787        );
788        result.map(|(affected, _)| affected)
789    }
790
791    /// Run the table-local maintenance cycle for the supplied index intents.
792    /// BTree is rebuilt from scratch to dodge Lance v7.0.0-beta.16's flat
793    /// BTree combine bug; Bitmap, FTS, and IVF_PQ fold via append.
794    ///
795    /// spec.md#substrate 3.7 (`lance-index-maintenance`): indices and compaction
796    /// commit independently and use independent retry budgets, so a hot writer
797    /// that starves compaction (Rewrite) does not abort the index build
798    /// (Update) the operator actually asked for.
799    pub async fn optimize_table(
800        &self,
801        table: Table,
802        intents: &[IndexIntent],
803        progress: Option<&OptimizeProgressFn>,
804        cleanup: crate::sessions::CleanupConfig,
805    ) -> TableOptimizeOutcome {
806        let compaction = self
807            .run_optimize_compact_phase(table, progress, cleanup)
808            .await;
809        let indices = self
810            .run_optimize_indices_phase(table, intents, progress)
811            .await;
812        TableOptimizeOutcome {
813            table,
814            indices,
815            compaction,
816        }
817    }
818
819    /// Run only the indices phase for one table. Used by `pond embed`'s tail
820    /// to fold newly written vectors into the indices without paying the
821    /// compaction retry budget while embed itself may still be writing.
822    pub async fn optimize_table_indices_only(
823        &self,
824        table: Table,
825        intents: &[IndexIntent],
826        progress: Option<&OptimizeProgressFn>,
827    ) -> PhaseOutcome {
828        self.run_optimize_indices_phase(table, intents, progress)
829            .await
830    }
831
832    async fn run_optimize_indices_phase(
833        &self,
834        table: Table,
835        intents: &[IndexIntent],
836        progress: Option<&OptimizeProgressFn>,
837    ) -> PhaseOutcome {
838        if intents.is_empty() {
839            return PhaseOutcome::Noop;
840        }
841        let result = self
842            .retry_lance(table.label(), || async {
843                let mut guard = self.cached(table).await?.lock().await;
844                let mut dataset = guard.latest().await?;
845                let did_work =
846                    optimize_table_indices(&mut dataset, intents, table, progress).await?;
847                guard.replace(dataset);
848                Ok::<_, anyhow::Error>(did_work)
849            })
850            .await;
851        match result {
852            Ok(true) => PhaseOutcome::Ok,
853            Ok(false) => PhaseOutcome::Noop,
854            Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
855            Err(error) => PhaseOutcome::Failed(error),
856        }
857    }
858
859    async fn run_optimize_compact_phase(
860        &self,
861        table: Table,
862        progress: Option<&OptimizeProgressFn>,
863        cleanup: crate::sessions::CleanupConfig,
864    ) -> PhaseOutcome {
865        let result = self
866            .retry_lance(table.label(), || async {
867                let mut guard = self.cached(table).await?.lock().await;
868                let mut dataset = guard.latest().await?;
869                optimize_table_compact(&mut dataset, table, progress, cleanup).await?;
870                guard.replace(dataset);
871                Ok::<_, anyhow::Error>(())
872            })
873            .await;
874        match result {
875            Ok(()) => PhaseOutcome::Ok,
876            Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
877            Err(error) => PhaseOutcome::Failed(error),
878        }
879    }
880
881    pub async fn rebuild_index(&self, table: Table, intent: &IndexIntent) -> Result<()> {
882        self.retry_lance(table.label(), || async {
883            let mut guard = self.cached(table).await?.lock().await;
884            let mut dataset = guard.latest().await?;
885            rebuild_index(&mut dataset, intent).await?;
886            guard.replace(dataset);
887            Ok(())
888        })
889        .await
890    }
891
892    pub async fn index_status(
893        &self,
894        table: Table,
895        intents: &[IndexIntent],
896    ) -> Result<Vec<IndexStatus>> {
897        let dataset = self.dataset(table).await?;
898        index_status(table, &dataset, intents).await
899    }
900
901    pub(crate) async fn dataset(&self, table: Table) -> Result<Dataset> {
902        let mut cached = self.cached(table).await?.lock().await;
903        cached.latest().await
904    }
905    /// Build a prefiltered `Scanner` for `table`. Composable read entry
906    /// point for callers that need to layer extra builder calls
907    /// (`full_text_search`, `nearest`) on top of pond's predicate seam.
908    /// Routine scans should prefer `Handle::scan`.
909    pub(crate) async fn scanner(
910        &self,
911        table: Table,
912        predicate: Option<&Predicate>,
913    ) -> Result<lance::dataset::scanner::Scanner> {
914        let dataset = self.dataset(table).await?;
915        scanner_with_prefilter(&dataset, predicate)
916    }
917    /// Single read entry point: prefilter via `predicate`, optionally
918    /// project, return the prepared `Scanner` (spec.md#lance-chokepoints-read).
919    pub async fn scan(
920        &self,
921        table: Table,
922        opts: ScanOpts<'_>,
923    ) -> Result<lance::dataset::scanner::Scanner> {
924        let mut scanner = self.scanner(table, opts.predicate).await?;
925        if let Some(projection) = opts.projection {
926            scanner.project(projection)?;
927        }
928        Ok(scanner)
929    }
930    pub(crate) async fn scan_batch(
931        &self,
932        table: Table,
933        predicate: Option<&Predicate>,
934        projection: &[&str],
935    ) -> Result<RecordBatch> {
936        let opts = ScanOpts {
937            predicate,
938            projection: (!projection.is_empty()).then_some(projection),
939        };
940        self.scan(table, opts)
941            .await?
942            .try_into_batch()
943            .await
944            .context("scan failed")
945    }
946    pub async fn count_rows(&self, table: Table) -> Result<usize> {
947        self.dataset(table)
948            .await?
949            .count_rows(None)
950            .await
951            .map_err(Into::into)
952    }
953    /// Names of every index on `messages` - the vector-index tests read this.
954    #[cfg(test)]
955    pub(crate) async fn messages_index_names(&self) -> Result<Vec<String>> {
956        let dataset = self.dataset(Table::Messages).await?;
957        let indices = dataset.load_indices().await?;
958        Ok(indices.iter().map(|index| index.name.clone()).collect())
959    }
960
961    /// Count rows in `table` not yet covered by `index_name`. Manifest-only;
962    /// a missing index reports the whole table. Powers `pond index status`.
963    pub(crate) async fn unindexed_row_count(
964        &self,
965        table: Table,
966        index_name: &str,
967    ) -> Result<usize> {
968        let dataset = self.dataset(table).await?;
969        let fragments = dataset
970            .unindexed_fragments(index_name)
971            .await
972            .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
973        Ok(fragments
974            .iter()
975            .map(|fragment| fragment.num_rows().unwrap_or(0))
976            .sum())
977    }
978
979    /// Drop the named index. Used by the `pond embed --force` model-swap path
980    /// to retire an IVF_PQ whose centroids belong to the old distance
981    /// space, before the next write re-bootstraps it over the new model's
982    /// vectors. Errors when the index does not exist; callers may swallow
983    /// that.
984    pub(crate) async fn drop_index(&self, table: Table, name: &str) -> Result<()> {
985        let mut guard = self.cached(table).await?.lock().await;
986        let mut dataset = guard.latest().await?;
987        dataset
988            .drop_index(name)
989            .await
990            .with_context(|| format!("drop_index({name}) failed for {}", table.label()))?;
991        guard.replace(dataset);
992        Ok(())
993    }
994
995    /// Resolve each table's stored location through the namespace catalog
996    /// (spec.md#lance-chokepoints-catalog) - no hardcoded `.lance` suffix.
997    async fn table_location(&self, table_name: &str) -> Result<String> {
998        let request = DescribeTableRequest {
999            id: Some(self.nm_ident.as_table_id(table_name)),
1000            ..Default::default()
1001        };
1002        let response = self
1003            .nm
1004            .describe_table(request)
1005            .await
1006            .with_context(|| format!("failed to describe table {table_name}"))?;
1007        response
1008            .location
1009            .with_context(|| format!("namespace returned no location for table {table_name}"))
1010    }
1011
1012    /// On-disk byte totals for the three datasets plus the data-dir remainder.
1013    /// Every byte is sized by listing through Lance's object store
1014    /// (spec.md#lance-chokepoints-storage), identical for `file://` and `s3://`.
1015    pub async fn table_sizes(&self) -> Result<TableSizes> {
1016        let registry = Arc::new(ObjectStoreRegistry::default());
1017        let params = ObjectStoreParams {
1018            storage_options_accessor: (!self.storage_options.is_empty()).then(|| {
1019                Arc::new(StorageOptionsAccessor::with_static_options(
1020                    self.storage_options.clone(),
1021                ))
1022            }),
1023            ..Default::default()
1024        };
1025
1026        let sessions = self
1027            .listed_size(
1028                &registry,
1029                &params,
1030                &self.table_location(sessions::SESSIONS).await?,
1031            )
1032            .await?;
1033        let messages = self
1034            .listed_size(
1035                &registry,
1036                &params,
1037                &self.table_location(sessions::MESSAGES).await?,
1038            )
1039            .await?;
1040        let parts = self
1041            .listed_size(
1042                &registry,
1043                &params,
1044                &self.table_location(sessions::PARTS).await?,
1045            )
1046            .await?;
1047        // `other` is whatever sits under the data-dir root but not in the three
1048        // tables (config.toml, stray index temp files): root total minus them.
1049        let root_total = self
1050            .listed_size(&registry, &params, self.location.as_str())
1051            .await?;
1052        let other = root_total.saturating_sub(sessions + messages + parts);
1053        Ok(TableSizes {
1054            sessions,
1055            messages,
1056            parts,
1057            other,
1058        })
1059    }
1060
1061    /// Sum `ObjectMeta.size` for every object recursively under `uri`.
1062    async fn listed_size(
1063        &self,
1064        registry: &Arc<ObjectStoreRegistry>,
1065        params: &ObjectStoreParams,
1066        uri: &str,
1067    ) -> Result<u64> {
1068        let (store, base) = ObjectStore::from_uri_and_params(registry.clone(), uri, params)
1069            .await
1070            .with_context(|| format!("failed to open object store for {uri}"))?;
1071        let mut listing = store.list(Some(base));
1072        let mut total = 0u64;
1073        while let Some(meta) = listing.next().await {
1074            let meta = meta.with_context(|| format!("listing {uri} failed"))?;
1075            total += meta.size;
1076        }
1077        Ok(total)
1078    }
1079    async fn cached(&self, table: Table) -> Result<&Mutex<CachedDataset>> {
1080        match table {
1081            Table::Sessions => Ok(&self.datasets.sessions),
1082            Table::Messages => Ok(&self.datasets.messages),
1083            Table::Parts => self.parts_cached().await,
1084        }
1085    }
1086
1087    /// Open `parts.lance` on first use (spec.md#datasets). Single-flight via
1088    /// `OnceCell`; once initialized, behaves identically to the other two.
1089    async fn parts_cached(&self) -> Result<&Mutex<CachedDataset>> {
1090        self.datasets
1091            .parts
1092            .get_or_try_init(|| async {
1093                let dataset = open_or_create_via_ns(
1094                    &self.nm,
1095                    &self.nm_ident,
1096                    sessions::PARTS,
1097                    sessions::part_schema(),
1098                    &self.session,
1099                    &self.storage_options,
1100                )
1101                .await?;
1102                Ok::<_, anyhow::Error>(Mutex::new(CachedDataset {
1103                    dataset,
1104                    last_refresh: Instant::now(),
1105                    refresh_after: self.parts_refresh_after,
1106                }))
1107            })
1108            .await
1109    }
1110    async fn retry_lance<T, Fut, Op>(&self, label: &str, mut operation: Op) -> Result<T>
1111    where
1112        Fut: std::future::Future<Output = Result<T>>,
1113        Op: FnMut() -> Fut,
1114    {
1115        let mut attempt = 0u8;
1116        loop {
1117            attempt = attempt.saturating_add(1);
1118            match operation().await {
1119                Ok(value) => return Ok(value),
1120                Err(error) if attempt < self.retry.attempts => {
1121                    let backoff = self.backoff(attempt);
1122                    // `{:#}` walks anyhow's cause chain inline; `%error` (Display)
1123                    // drops everything below the top-level message.
1124                    let error_chain = format!("{error:#}");
1125                    tracing::warn!(
1126                        label,
1127                        attempt,
1128                        ?backoff,
1129                        error = %error_chain,
1130                        "retrying Lance operation"
1131                    );
1132                    tokio::time::sleep(backoff).await;
1133                }
1134                Err(error) => {
1135                    let error_chain = format!("{error:#}");
1136                    tracing::warn!(
1137                        label,
1138                        attempt,
1139                        error = %error_chain,
1140                        "Lance operation exhausted retries"
1141                    );
1142                    // spec.md#protocol: surface OCC failures as a typed `conflict`
1143                    // rather than the generic `storage_unavailable` bucket. The
1144                    // chain root is a `lance::Error` (commit-conflict family) when
1145                    // pond's retry layer exhausted because the manifest could not
1146                    // be advanced; everything else (timeouts, IAM, disk) stays
1147                    // `storage_unavailable`.
1148                    if is_commit_conflict(&error) {
1149                        return Err(error.context(ConflictExhausted { attempts: attempt }));
1150                    }
1151                    return Err(error);
1152                }
1153            }
1154        }
1155    }
1156    fn backoff(&self, attempt: u8) -> Duration {
1157        let shift = u32::from(attempt.saturating_sub(1));
1158        let multiplier = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
1159        let base = self.retry.initial_backoff.saturating_mul(multiplier);
1160        // Symmetric +/- `jitter` factor de-correlates concurrent retriers on
1161        // a contended manifest (spec.md#lance-retry-jitter); clamped to `max_backoff`.
1162        let factor = (1.0 + self.retry.jitter * (fastrand::f64() * 2.0 - 1.0)).max(0.0);
1163        base.mul_f64(factor).min(self.retry.max_backoff)
1164    }
1165}
1166/// Compaction phase: `compact_files` + `cleanup_old_versions`, both inside one
1167/// retry block. Distinct from the indices phase so a hot writer that loses the
1168/// Rewrite race here does not abort index work the operator actually asked for.
1169///
1170/// spec.md#lance-index-maintenance mandates FRI on by default, but at
1171/// v7.0.0-beta.16 `defer_index_remap=true` together with `stable-row-ids`
1172/// panics in `optimize.rs::commit_compaction` with "defer_index_remap
1173/// requires row_addrs but none were provided": `rewrite_files` skips
1174/// row_addrs when stable row ids are on, then the FRI builder demands
1175/// them. With stable_row_ids the remap step is already a no-op
1176/// (`optimize.rs:1490`: `needs_remapping = !uses_stable_row_ids() &&
1177/// !defer_index_remap`), so running without FRI is correct - we only
1178/// lose the documented concurrency-with-index-build benefit. Flip to
1179/// `true` once upstream fixes the conflict.
1180async fn optimize_table_compact(
1181    dataset: &mut Dataset,
1182    table: Table,
1183    progress: Option<&OptimizeProgressFn>,
1184    cleanup: crate::sessions::CleanupConfig,
1185) -> Result<()> {
1186    let compaction = CompactionOptions {
1187        defer_index_remap: false,
1188        ..CompactionOptions::default()
1189    };
1190
1191    emit(
1192        progress,
1193        OptimizeEvent::PhaseStart {
1194            table,
1195            phase: OptimizePhase::Compact,
1196            detail: None,
1197        },
1198    );
1199    let started = Instant::now();
1200    compact_files(dataset, compaction, None).await?;
1201    emit(
1202        progress,
1203        OptimizeEvent::PhaseDone {
1204            table,
1205            phase: OptimizePhase::Compact,
1206            elapsed_ms: started.elapsed().as_millis() as u64,
1207        },
1208    );
1209
1210    emit(
1211        progress,
1212        OptimizeEvent::PhaseStart {
1213            table,
1214            phase: OptimizePhase::Cleanup,
1215            detail: None,
1216        },
1217    );
1218    let started = Instant::now();
1219    // delete_unverified=true is the operator opt-in via `--cleanup-older-than`
1220    // / `--vacuum` that bypasses Lance's 7-day in-progress safety guard
1221    // (UNVERIFIED_THRESHOLD_DAYS in lance/dataset/cleanup.rs). Required to
1222    // reclaim files younger than 7 days; unsafe under concurrent writers.
1223    dataset
1224        .cleanup_old_versions(
1225            cleanup.older_than,
1226            Some(cleanup.delete_unverified),
1227            Some(false),
1228        )
1229        .await
1230        .context("cleanup_old_versions failed during index optimize")?;
1231    emit(
1232        progress,
1233        OptimizeEvent::PhaseDone {
1234            table,
1235            phase: OptimizePhase::Cleanup,
1236            elapsed_ms: started.elapsed().as_millis() as u64,
1237        },
1238    );
1239
1240    Ok(())
1241}
1242
1243/// Indices phase: per-intent create/rebuild + batched `optimize_indices(append)`
1244/// for incremental families. Returns `true` if anything committed.
1245async fn optimize_table_indices(
1246    dataset: &mut Dataset,
1247    intents: &[IndexIntent],
1248    table: Table,
1249    progress: Option<&OptimizeProgressFn>,
1250) -> Result<bool> {
1251    let existing = dataset.load_indices().await?;
1252    let existing_names: std::collections::HashSet<String> =
1253        existing.iter().map(|index| index.name.clone()).collect();
1254
1255    let mut append_indices: Vec<String> = Vec::new();
1256    let mut did_work = false;
1257
1258    for intent in intents {
1259        let exists = existing_names.contains(intent.name);
1260
1261        if !exists {
1262            if !intent.trigger.should_create(dataset).await? {
1263                continue;
1264            }
1265            let params = intent.params.build(dataset).await?;
1266            let index_type = intent.params.index_type();
1267            tracing::info!(
1268                index = intent.name,
1269                column = intent.column,
1270                "creating Lance index (trigger fired)",
1271            );
1272            emit(
1273                progress,
1274                OptimizeEvent::PhaseStart {
1275                    table,
1276                    phase: OptimizePhase::IndexCreate,
1277                    detail: Some(intent.name.to_owned()),
1278                },
1279            );
1280            let started = Instant::now();
1281            dataset
1282                .create_index(
1283                    &[intent.column],
1284                    index_type,
1285                    Some(intent.name.to_owned()),
1286                    params.as_ref(),
1287                    false,
1288                )
1289                .await
1290                .with_context(|| format!("failed to create index {}", intent.name))?;
1291            emit(
1292                progress,
1293                OptimizeEvent::PhaseDone {
1294                    table,
1295                    phase: OptimizePhase::IndexCreate,
1296                    elapsed_ms: started.elapsed().as_millis() as u64,
1297                },
1298            );
1299            did_work = true;
1300            continue;
1301        }
1302
1303        let unindexed = dataset.unindexed_fragments(intent.name).await?;
1304        if unindexed.is_empty() {
1305            continue;
1306        }
1307        // Lag guard: let fragments accumulate behind the brute-force fallback
1308        // rather than firing a commit per tiny append. Threshold is operator-
1309        // tunable via `[search].index_lag_threshold`.
1310        if unindexed.len() < index_lag_threshold() {
1311            continue;
1312        }
1313        match intent.params {
1314            IndexParamsKind::Scalar(BuiltinIndexType::BTree) => {
1315                let params = intent.params.build(dataset).await?;
1316                let index_type = intent.params.index_type();
1317                tracing::debug!(
1318                    target: "pond::perf",
1319                    index = intent.name,
1320                    column = intent.column,
1321                    "rebuilding Lance BTree index",
1322                );
1323                emit(
1324                    progress,
1325                    OptimizeEvent::PhaseStart {
1326                        table,
1327                        phase: OptimizePhase::IndexRebuild,
1328                        detail: Some(intent.name.to_owned()),
1329                    },
1330                );
1331                let started = Instant::now();
1332                dataset
1333                    .create_index(
1334                        &[intent.column],
1335                        index_type,
1336                        Some(intent.name.to_owned()),
1337                        params.as_ref(),
1338                        true,
1339                    )
1340                    .await
1341                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1342                emit(
1343                    progress,
1344                    OptimizeEvent::PhaseDone {
1345                        table,
1346                        phase: OptimizePhase::IndexRebuild,
1347                        elapsed_ms: started.elapsed().as_millis() as u64,
1348                    },
1349                );
1350                did_work = true;
1351            }
1352            IndexParamsKind::Scalar(BuiltinIndexType::Bitmap)
1353            | IndexParamsKind::InvertedFtsNgram { .. }
1354            | IndexParamsKind::IvfPqCosine { .. } => {
1355                append_indices.push(intent.name.to_owned());
1356            }
1357            IndexParamsKind::Scalar(_) => {
1358                let params = intent.params.build(dataset).await?;
1359                emit(
1360                    progress,
1361                    OptimizeEvent::PhaseStart {
1362                        table,
1363                        phase: OptimizePhase::IndexRebuild,
1364                        detail: Some(intent.name.to_owned()),
1365                    },
1366                );
1367                let started = Instant::now();
1368                dataset
1369                    .create_index(
1370                        &[intent.column],
1371                        intent.params.index_type(),
1372                        Some(intent.name.to_owned()),
1373                        params.as_ref(),
1374                        true,
1375                    )
1376                    .await
1377                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1378                emit(
1379                    progress,
1380                    OptimizeEvent::PhaseDone {
1381                        table,
1382                        phase: OptimizePhase::IndexRebuild,
1383                        elapsed_ms: started.elapsed().as_millis() as u64,
1384                    },
1385                );
1386                did_work = true;
1387            }
1388        }
1389    }
1390
1391    if !append_indices.is_empty() {
1392        let to_append = append_indices.clone();
1393        emit(
1394            progress,
1395            OptimizeEvent::PhaseStart {
1396                table,
1397                phase: OptimizePhase::IndexAppend,
1398                detail: Some(append_indices.join(", ")),
1399            },
1400        );
1401        let started = Instant::now();
1402        dataset
1403            .optimize_indices(&OptimizeOptions::append().index_names(to_append))
1404            .await
1405            .context("optimize_indices(append) failed during index optimize")?;
1406        emit(
1407            progress,
1408            OptimizeEvent::PhaseDone {
1409                table,
1410                phase: OptimizePhase::IndexAppend,
1411                elapsed_ms: started.elapsed().as_millis() as u64,
1412            },
1413        );
1414        tracing::debug!(
1415            target: "pond::perf",
1416            indices = ?append_indices,
1417            "appended trailing fragments into indices",
1418        );
1419        did_work = true;
1420    }
1421
1422    Ok(did_work)
1423}
1424
1425async fn rebuild_index(dataset: &mut Dataset, intent: &IndexIntent) -> Result<()> {
1426    if !intent.trigger.should_create(dataset).await? {
1427        return Ok(());
1428    }
1429    let params = intent.params.build(dataset).await?;
1430    dataset
1431        .create_index(
1432            &[intent.column],
1433            intent.params.index_type(),
1434            Some(intent.name.to_owned()),
1435            params.as_ref(),
1436            true,
1437        )
1438        .await
1439        .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1440    Ok(())
1441}
1442
1443async fn index_status(
1444    table: Table,
1445    dataset: &Dataset,
1446    intents: &[IndexIntent],
1447) -> Result<Vec<IndexStatus>> {
1448    let existing = dataset.load_indices().await?;
1449    let existing_names: std::collections::HashSet<String> =
1450        existing.iter().map(|index| index.name.clone()).collect();
1451    let total_fragments = dataset.get_fragments().len();
1452    let total_rows = dataset.count_rows(None).await?;
1453    let mut statuses = Vec::with_capacity(intents.len());
1454    for intent in intents {
1455        let exists = existing_names.contains(intent.name);
1456        if !exists {
1457            statuses.push(IndexStatus {
1458                table,
1459                intent_name: intent.name.to_owned(),
1460                fragments_covered: 0,
1461                unindexed_rows: total_rows,
1462                exists,
1463            });
1464            continue;
1465        }
1466        let unindexed = dataset
1467            .unindexed_fragments(intent.name)
1468            .await
1469            .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1470        let unindexed_rows = unindexed
1471            .iter()
1472            .map(|fragment| fragment.num_rows().unwrap_or(0))
1473            .sum();
1474        statuses.push(IndexStatus {
1475            table,
1476            intent_name: intent.name.to_owned(),
1477            fragments_covered: total_fragments.saturating_sub(unindexed.len()),
1478            unindexed_rows,
1479            exists,
1480        });
1481    }
1482    Ok(statuses)
1483}
1484
1485/// Open the table at `table_name` via the namespace; create + initialize on
1486/// `TableNotFound`. Schema-checks the on-disk dataset against pond's
1487/// expectation so a stale data dir surfaces early.
1488///
1489/// Probes via `nm.describe_table` directly rather than `DatasetBuilder::from_namespace`:
1490/// the builder re-wraps an already-`Namespace`-wrapped error
1491/// (lance/src/dataset/builder.rs:142), so going through it would force a
1492/// chain-walk to classify `TableNotFound`. The direct probe stays at one
1493/// wrap level and downcasts cleanly. Managed-versioning hookup (REST
1494/// namespace external-manifest commits) is not wired here; v1 ships
1495/// Directory v2 only.
1496async fn open_or_create_via_ns(
1497    nm: &Arc<dyn LanceNamespace>,
1498    nm_ident: &NamespaceIdent,
1499    table_name: &str,
1500    schema: lance::deps::arrow_schema::SchemaRef,
1501    session: &Arc<Session>,
1502    storage_options: &HashMap<String, String>,
1503) -> Result<Dataset> {
1504    let table_id = nm_ident.as_table_id(table_name);
1505
1506    let request = DescribeTableRequest {
1507        id: Some(table_id.clone()),
1508        ..Default::default()
1509    };
1510    match nm.describe_table(request).await {
1511        Ok(response) => {
1512            let location = response.location.with_context(|| {
1513                format!("namespace returned no location for table {table_name}")
1514            })?;
1515            let mut builder = DatasetBuilder::from_uri(&location).with_session(session.clone());
1516            if !storage_options.is_empty() {
1517                builder = builder.with_storage_options(storage_options.clone());
1518            }
1519            let dataset = builder
1520                .load()
1521                .await
1522                .with_context(|| format!("failed to open table {table_name}"))?;
1523            ensure_schema_matches(&dataset, schema.as_ref(), table_name)?;
1524            return Ok(dataset);
1525        }
1526        Err(error) => match &error {
1527            lance::Error::Namespace { source, .. }
1528                if matches!(
1529                    source.downcast_ref::<NamespaceError>(),
1530                    Some(NamespaceError::TableNotFound { .. })
1531                ) =>
1532            {
1533                // fall through to create
1534            }
1535            _ => {
1536                return Err(anyhow::Error::from(error))
1537                    .with_context(|| format!("failed to describe table {table_name}"));
1538            }
1539        },
1540    }
1541
1542    // Create path: pond seeds an empty dataset with the canonical schema so
1543    // every subsequent open lands on a real Lance dataset, not a phantom.
1544    let mut write_params = sessions::write_params_for_create();
1545    write_params.session = Some(session.clone());
1546    write_params.mode = WriteMode::Create;
1547    if !storage_options.is_empty() {
1548        write_params.store_params = Some(ObjectStoreParams {
1549            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
1550                storage_options.clone(),
1551            ))),
1552            ..Default::default()
1553        });
1554    }
1555    let reader = sessions::empty_reader(schema)?;
1556    Dataset::write_into_namespace(reader, nm.clone(), table_id, Some(write_params))
1557        .await
1558        .with_context(|| format!("failed to create table {table_name}"))
1559}
1560
1561fn scanner_with_prefilter(
1562    dataset: &Dataset,
1563    predicate: Option<&Predicate>,
1564) -> Result<lance::dataset::scanner::Scanner> {
1565    let mut scanner = dataset.scan();
1566    scanner.prefilter(true);
1567    if let Some(predicate) = predicate {
1568        let filter = predicate.to_lance();
1569        if !filter.is_empty() {
1570            scanner.filter(&filter)?;
1571        }
1572    }
1573    Ok(scanner)
1574}
1575fn ensure_schema_matches(
1576    dataset: &Dataset,
1577    expected: &lance::deps::arrow_schema::Schema,
1578    table_name: &str,
1579) -> Result<()> {
1580    use lance::deps::arrow_schema::DataType;
1581    use std::collections::BTreeSet;
1582    let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
1583    let actual_names: BTreeSet<&str> = actual.fields().iter().map(|f| f.name().as_str()).collect();
1584    let expected_names: BTreeSet<&str> = expected
1585        .fields()
1586        .iter()
1587        .map(|f| f.name().as_str())
1588        .collect();
1589    if actual_names != expected_names {
1590        anyhow::bail!(
1591            "table {table_name} has columns {actual_names:?} but this pond build expects \
1592             {expected_names:?} - the on-disk store predates a schema change; delete the \
1593             data directory and re-run `pond ingest`",
1594        );
1595    }
1596    // Catch a vector-dim change (configured `[embeddings].dim` differs from
1597    // the on-disk vector column width) early with a friendly message. Lance
1598    // would otherwise reject the next write with an opaque schema-mismatch
1599    // error inside the `merge_update` path.
1600    for actual_field in actual.fields() {
1601        let Some(expected_field) = expected.field_with_name(actual_field.name()).ok() else {
1602            continue;
1603        };
1604        if let (DataType::FixedSizeList(_, actual_dim), DataType::FixedSizeList(_, expected_dim)) =
1605            (actual_field.data_type(), expected_field.data_type())
1606            && actual_dim != expected_dim
1607        {
1608            tracing::warn!(
1609                table = table_name,
1610                column = actual_field.name(),
1611                actual_dim,
1612                expected_dim,
1613                "embedding dimension differs from config; open proceeds because model swaps are operator-driven",
1614            );
1615        }
1616    }
1617    Ok(())
1618}
/// Object-store defaults injected for any non-local pond location. Each key
/// is only set when neither the user-provided key nor any of its aliases
/// is already present (compared ASCII-case-insensitively), so explicit
/// overrides in `[storage]` always win.
///
/// Defaults applied:
/// - `pool_idle_timeout` = `"300 seconds"`
/// - `connect_timeout` = `"10 seconds"`
/// - `aws_unsigned_payload` = `"true"`, gated on a custom endpoint (the
///   marker for S3-compatible stores like Hetzner, MinIO, R2), where the
///   SHA256 payload signature is wasted work the server does not validate.
///
/// A custom endpoint is recognised under every spelling `object_store`
/// accepts for the S3 endpoint key: `aws_endpoint`, `aws_endpoint_url`,
/// `endpoint` and `endpoint_url`.
fn apply_remote_storage_defaults(options: &mut HashMap<String, String>) {
    /// Every key spelling that configures the S3 endpoint.
    const ENDPOINT_ALIASES: &[&str] = &[
        "aws_endpoint",
        "aws_endpoint_url",
        "endpoint",
        "endpoint_url",
    ];

    /// True when `options` already holds any of `aliases`, ignoring ASCII case.
    fn has_any(options: &HashMap<String, String>, aliases: &[&str]) -> bool {
        options
            .keys()
            .any(|key| aliases.iter().any(|alias| key.eq_ignore_ascii_case(alias)))
    }

    /// Inserts `value` under the first alias unless some alias is already set.
    fn set_default(options: &mut HashMap<String, String>, aliases: &[&str], value: &str) {
        if has_any(options, aliases) {
            return;
        }
        if let Some(canonical) = aliases.first() {
            options.insert((*canonical).to_owned(), value.to_owned());
        }
    }

    set_default(options, &["pool_idle_timeout"], "300 seconds");
    set_default(options, &["connect_timeout"], "10 seconds");
    if has_any(options, ENDPOINT_ALIASES) {
        set_default(
            options,
            &["aws_unsigned_payload", "unsigned_payload"],
            "true",
        );
    }
}
1648
/// Wraps `value` in single quotes, doubling every embedded single quote so
/// the result reads as one SQL-style string literal.
fn quoted_string(value: &str) -> String {
    let mut literal = String::with_capacity(value.len() + 2);
    literal.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            // A quote inside the literal is escaped by writing it twice.
            literal.push('\'');
        }
        literal.push(ch);
    }
    literal.push('\'');
    literal
}
/// Renders `value` as a quoted `'%…%'` substring pattern.
///
/// Backslash, `%` and `_` are each prefixed with a backslash, and single
/// quotes are doubled, before the text is wrapped in `'%` … `%'`.
fn like_contains(value: &str) -> String {
    let mut pattern = String::with_capacity(value.len() + 4);
    pattern.push_str("'%");
    for ch in value.chars() {
        match ch {
            '\\' => pattern.push_str("\\\\"),
            '%' => pattern.push_str("\\%"),
            '_' => pattern.push_str("\\_"),
            '\'' => pattern.push_str("''"),
            other => pattern.push(other),
        }
    }
    pattern.push_str("%'");
    pattern
}
1660
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Smoke test: a store opened on an empty temp directory exposes the
    /// sessions, messages and parts tables, and a projected scan of each
    /// one succeeds and yields zero rows.
    #[tokio::test]
    async fn store_opens_via_namespace_and_scan_works() -> Result<()> {
        /// Column projected from every table under test.
        const ID_ONLY: &[&str] = &["id"];

        let data_dir = TempDir::new()?;
        let Ok(location) = Url::from_directory_path(data_dir.path()) else {
            anyhow::bail!("temp path is not absolute");
        };
        let store = Handle::open(&location).await?;

        // Table/projection pairs stay explicit so a table can be given a
        // different projection without reshaping the loop below.
        let tables: [(Table, &[&str]); 3] = [
            (Table::Sessions, ID_ONLY),
            (Table::Messages, ID_ONLY),
            (Table::Parts, ID_ONLY),
        ];
        for (table, columns) in tables {
            let scan = store.scan(table, ScanOpts::project_only(columns)).await?;
            let rows = scan.try_into_batch().await?;
            assert_eq!(rows.num_rows(), 0, "fresh table should be empty");
        }
        Ok(())
    }
}