Skip to main content

pond/
substrate.rs

1use crate::{
2    RetryPolicy,
3    config::{self},
4    handlers::NamespaceIdent,
5    sessions::{self},
6};
7use anyhow::{Context, Result};
8use lance::Dataset;
9use lance::dataset::builder::DatasetBuilder;
10use lance::dataset::optimize::{CompactionOptions, compact_files};
11use lance::dataset::write::merge_insert::SourceDedupeBehavior;
12use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode};
13use lance::deps::arrow_array::{RecordBatch, RecordBatchIterator};
14use lance::index::DatasetIndexExt;
15use lance::index::DatasetIndexInternalExt;
16use lance::index::vector::VectorIndexParams;
17use lance::session::Session;
18use lance_index::IndexType;
19use lance_index::optimize::OptimizeOptions;
20use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
21use lance_io::object_store::{
22    ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor,
23};
24use lance_linalg::distance::MetricType;
25use lance_namespace::LanceNamespace;
26use lance_namespace::error::{ErrorCode, NamespaceError};
27use lance_namespace::models::DescribeTableRequest;
28use lance_namespace_impls::ConnectBuilder;
29use std::{
30    collections::HashMap,
31    sync::Arc,
32    time::{Duration, Instant},
33};
34use tokio::sync::{Mutex, OnceCell};
35use tokio_stream::StreamExt;
36use url::Url;
/// Number of embedded rows at which pond trains the IVF_PQ vector index on
/// `messages.vector` (spec.md#search). Until then, vector search falls back
/// to an exhaustive flat scan. That scan is exact and fast at small and medium
/// scale, and IVF_PQ cannot train well on fewer vectors anyway.
pub const VECTOR_INDEX_ACTIVATION_ROWS: usize = 100_000;
42
/// Fallback for the minimum number of unindexed fragments that must pile up
/// before a per-intent append/rebuild step is admitted into
/// `optimize_table_indices`. Smaller values yield smaller, more frequent
/// commits (costly on remote stores). Larger values leave more fragments
/// behind the brute-force fallback. 4 is the bottom of the documented 4-8 band.
pub const DEFAULT_INDEX_LAG_THRESHOLD: usize = 4;

/// Process-wide threshold seeded once from config. Unset means
/// [`DEFAULT_INDEX_LAG_THRESHOLD`] applies.
static INDEX_LAG_THRESHOLD_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Seed the process-wide index-lag threshold from `[search].index_lag_threshold`.
///
/// Only the first call takes effect and later calls are ignored. This is the
/// same contract as `embed::init_model_id` / `sessions::init_embedding_dim`.
pub fn init_index_lag_threshold(value: usize) {
    // `set` fails once a value is present; that is the intended first-wins rule.
    let _ = INDEX_LAG_THRESHOLD_RUNTIME.set(value);
}

/// Current index-lag threshold. Returns the seeded value if
/// `init_index_lag_threshold` has run, otherwise [`DEFAULT_INDEX_LAG_THRESHOLD`].
pub fn index_lag_threshold() -> usize {
    match INDEX_LAG_THRESHOLD_RUNTIME.get() {
        Some(&seeded) => seeded,
        None => DEFAULT_INDEX_LAG_THRESHOLD,
    }
}
64
65/// Declarative description of one index pond keeps on a table. Created when
66/// its trigger fires; folded forward by `pond index optimize`.
67#[derive(Debug, Clone)]
68pub struct IndexIntent {
69    /// Stable on-disk name. Must match across runs so existence checks
70    /// resolve.
71    pub name: &'static str,
72    /// Column the index covers.
73    pub column: &'static str,
74    /// Condition evaluated against the live dataset before each cycle.
75    pub trigger: IndexTrigger,
76    /// How the params are built at create time. Some intents have static
77    /// params (FTS, scalars); IVF_PQ needs the row count to size partitions.
78    pub params: IndexParamsKind,
79}
80
/// Condition under which an [`IndexIntent`] should exist on disk.
#[derive(Debug, Clone)]
pub enum IndexTrigger {
    /// Fire as soon as the table holds any row. FTS and scalar indices use
    /// this because they have no training cost worth postponing.
    OnAnyRows,
    /// Fire once `count(<column> IS NOT NULL) >= threshold`. The IVF_PQ
    /// vector index uses this because it trains poorly on too few vectors.
    OnNonNullCount {
        /// Column whose non-null values are counted.
        column: &'static str,
        /// Inclusive lower bound on that count.
        threshold: usize,
    },
}
94
95/// The lance-native shape of an [`IndexIntent`]'s params, dispatched to the
96/// right `IndexParams` at create time.
97#[derive(Debug, Clone)]
98pub enum IndexParamsKind {
99    /// `BuiltinIndexType::BTree` -> [`IndexType::BTree`];
100    /// `BuiltinIndexType::Bitmap` -> [`IndexType::Bitmap`]; etc.
101    Scalar(BuiltinIndexType),
102    /// `InvertedIndexParams` with a character `ngram` tokenizer in the
103    /// `[min, max]` range and stemming / stop-words off
104    /// (spec.md#search-language-neutral-index).
105    InvertedFtsNgram { min: u32, max: u32 },
106    /// `VectorIndexParams::ivf_pq` with cosine metric (e5 vectors are
107    /// L2-normalized). `sub_vectors = embedding_dim / 8` and `num_bits = 8`
108    /// are pond's conventions; `max_iters` caps kmeans. Partitions follow
109    /// LanceDB's documented `num_rows // 4096` guidance, floored at one.
110    IvfPqCosine {
111        sub_vectors: usize,
112        num_bits: u8,
113        max_iters: usize,
114    },
115}
116
117impl IndexTrigger {
118    async fn should_create(&self, dataset: &Dataset) -> Result<bool> {
119        match self {
120            Self::OnAnyRows => Ok(dataset.count_rows(None).await? > 0),
121            Self::OnNonNullCount { column, threshold } => {
122                let count = dataset
123                    .count_rows(Some(format!("{column} IS NOT NULL")))
124                    .await?;
125                Ok(count >= *threshold)
126            }
127        }
128    }
129}
130
131impl IndexParamsKind {
132    fn index_type(&self) -> IndexType {
133        match self {
134            Self::Scalar(BuiltinIndexType::Bitmap) => IndexType::Bitmap,
135            Self::Scalar(_) => IndexType::BTree,
136            Self::InvertedFtsNgram { .. } => IndexType::Inverted,
137            Self::IvfPqCosine { .. } => IndexType::Vector,
138        }
139    }
140
141    async fn build(&self, dataset: &Dataset) -> Result<Box<dyn lance::index::IndexParams>> {
142        match self {
143            Self::Scalar(kind) => Ok(Box::new(ScalarIndexParams::for_builtin(kind.clone()))),
144            Self::InvertedFtsNgram { min, max } => Ok(Box::new(
145                InvertedIndexParams::default()
146                    .base_tokenizer("ngram".to_owned())
147                    .ngram_min_length(*min)
148                    .ngram_max_length(*max)
149                    .stem(false)
150                    .remove_stop_words(false),
151            )),
152            Self::IvfPqCosine {
153                sub_vectors,
154                num_bits,
155                max_iters,
156            } => {
157                let count = dataset
158                    .count_rows(Some("vector IS NOT NULL".to_owned()))
159                    .await?;
160                let partitions = count.checked_div(4096).unwrap_or(0).max(1);
161                Ok(Box::new(VectorIndexParams::ivf_pq(
162                    partitions,
163                    *num_bits,
164                    *sub_vectors,
165                    MetricType::Cosine,
166                    *max_iters,
167                )))
168            }
169        }
170    }
171}
172
173#[derive(Debug, Clone, PartialEq, Eq)]
174pub struct IndexStatus {
175    pub table: Table,
176    pub intent_name: String,
177    pub fragments_covered: usize,
178    pub unindexed_fragments: usize,
179    pub unindexed_rows: usize,
180    pub exists: bool,
181}
182
/// Marker error that `retry_lance` attaches to the anyhow chain once it has
/// spent every attempt on an OCC commit conflict (spec.md#protocol). The wire
/// layer finds it by downcasting and reports the outcome as `conflict`
/// instead of the generic `storage_unavailable`.
#[derive(Debug, Clone, Copy)]
pub struct ConflictExhausted {
    /// Attempts made before giving up.
    pub attempts: u8,
}

impl std::fmt::Display for ConflictExhausted {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let attempts = self.attempts;
        write!(f, "commit conflict exhausted after {attempts} attempt(s)")
    }
}

impl std::error::Error for ConflictExhausted {}
203
204/// Per-phase result for one table's pass through `Handle::optimize_table`.
205/// spec.md#substrate 3.7 (`lance-index-maintenance`): the indices phase and the
206/// compaction phase get independent retry budgets and independent commits,
207/// so a hot writer that starves the Rewrite cannot abort the index Update.
208#[derive(Debug)]
209pub enum PhaseOutcome {
210    /// Phase attempted and committed work.
211    Ok,
212    /// Phase attempted; no work was needed.
213    Noop,
214    /// Phase attempted; OCC retry budget exhausted on conflict (the operator
215    /// can rerun later once the hot writer quiesces).
216    SkippedConflict,
217    /// Phase failed with a non-conflict error.
218    Failed(anyhow::Error),
219    /// Phase not requested by the caller (e.g. compaction skipped under
220    /// `Store::build_indices_only`).
221    NotAttempted,
222}
223
224impl PhaseOutcome {
225    pub fn is_failed(&self) -> bool {
226        matches!(self, Self::Failed(_))
227    }
228}
229
230/// What `Handle::optimize_table` did for one table.
231#[derive(Debug)]
232pub struct TableOptimizeOutcome {
233    pub table: Table,
234    pub indices: PhaseOutcome,
235    pub compaction: PhaseOutcome,
236}
237
238/// Boundary event during one `Handle::optimize_table` pass. The CLI binds a
239/// progress callback to render a live spinner; library callers pass `None`.
240#[derive(Debug, Clone)]
241pub enum OptimizeEvent {
242    PhaseStart {
243        table: Table,
244        phase: OptimizePhase,
245        detail: Option<String>,
246    },
247    PhaseDone {
248        table: Table,
249        phase: OptimizePhase,
250        elapsed_ms: u64,
251    },
252}
253
/// Step of an optimize pass, as reported in [`OptimizeEvent`]s.
#[derive(Debug, Clone, Copy)]
pub enum OptimizePhase {
    Compact,
    Cleanup,
    IndexCreate,
    IndexRebuild,
    IndexAppend,
}

impl OptimizePhase {
    /// Short kebab-case name of the phase, suitable for progress output.
    pub fn label(self) -> &'static str {
        use OptimizePhase as Phase;
        match self {
            Phase::Compact => "compact",
            Phase::Cleanup => "cleanup",
            Phase::IndexCreate => "index-create",
            Phase::IndexRebuild => "index-rebuild",
            Phase::IndexAppend => "index-append",
        }
    }
}
274
275pub type OptimizeProgressFn = Box<dyn Fn(OptimizeEvent) + Send + Sync>;
276
277fn emit(progress: Option<&OptimizeProgressFn>, event: OptimizeEvent) {
278    if let Some(callback) = progress {
279        callback(event);
280    }
281}
282
283/// True when the chain root is one of Lance's commit-conflict variants
284/// (`CommitConflict`, `RetryableCommitConflict`, `TooMuchWriteContention`).
285/// Everything else (timeouts, IAM denials, disk errors) is not a conflict.
286pub fn is_commit_conflict(error: &anyhow::Error) -> bool {
287    error.downcast_ref::<lance::Error>().is_some_and(|err| {
288        matches!(
289            err,
290            lance::Error::CommitConflict { .. }
291                | lance::Error::RetryableCommitConflict { .. }
292                | lance::Error::TooMuchWriteContention { .. }
293        )
294    })
295}
296
297/// True when `retry_lance` exhausted retries against an OCC conflict and
298/// attached `ConflictExhausted` to the chain head.
299fn is_conflict_exhausted(error: &anyhow::Error) -> bool {
300    error.chain().any(|cause| cause.is::<ConflictExhausted>())
301}
302
/// On-disk byte totals for the three session datasets, plus everything else
/// under the data-dir root. Sizes come from listing through Lance's
/// object-store layer (spec.md#lance-chokepoints-storage), so `file://` and
/// `s3://` are measured the same way.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TableSizes {
    /// Bytes under the sessions dataset.
    pub sessions: u64,
    /// Bytes under the messages dataset.
    pub messages: u64,
    /// Bytes under the parts dataset.
    pub parts: u64,
    /// Every other byte under the data-dir root.
    pub other: u64,
}
313
/// Literal operand used inside a [`Predicate`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalarValue {
    /// Text, rendered as a quoted string literal.
    String(String),
    /// 32-bit integer, rendered in decimal.
    Int32(i32),
    /// Pre-rendered expression text, spliced into the filter unchanged.
    Raw(String),
}

impl From<String> for ScalarValue {
    fn from(value: String) -> Self {
        Self::String(value)
    }
}

impl From<&str> for ScalarValue {
    fn from(value: &str) -> Self {
        String::from(value).into()
    }
}

impl From<i32> for ScalarValue {
    fn from(value: i32) -> Self {
        Self::Int32(value)
    }
}
335#[derive(Debug, Clone, PartialEq, Eq)]
336pub enum Predicate {
337    Eq(&'static str, ScalarValue),
338    Ne(&'static str, ScalarValue),
339    IsNull(&'static str),
340    IsNotNull(&'static str),
341    In(&'static str, Vec<ScalarValue>),
342    LikeContains(&'static str, String),
343    /// Regex match. Emitted as `regexp_like(<col>, '<pat>')`. Never pushes
344    /// down to BTREE indexes (Lance's scalar-index-expr parser ignores it),
345    /// so the filter is a full-scan-with-predicate - acceptable for
346    /// human-driven `--project re:...` queries, not for hot paths.
347    Regex(&'static str, String),
348    Gte(&'static str, ScalarValue),
349    Lte(&'static str, ScalarValue),
350    And(Vec<Predicate>),
351    Or(Vec<Predicate>),
352    Not(Box<Predicate>),
353}
354impl Predicate {
355    pub fn to_lance(&self) -> String {
356        match self {
357            Self::Eq(column, value) => format!("{column} = {}", value.to_lance()),
358            Self::Ne(column, value) => format!("{column} <> {}", value.to_lance()),
359            Self::IsNull(column) => format!("{column} IS NULL"),
360            Self::IsNotNull(column) => format!("{column} IS NOT NULL"),
361            Self::In(column, values) => {
362                let values = values
363                    .iter()
364                    .map(ScalarValue::to_lance)
365                    .collect::<Vec<_>>()
366                    .join(", ");
367                format!("{column} IN ({values})")
368            }
369            Self::LikeContains(column, value) => {
370                format!("{column} LIKE {} ESCAPE '\\'", like_contains(value))
371            }
372            Self::Regex(column, pattern) => {
373                format!("regexp_like({column}, {})", quoted_string(pattern))
374            }
375            Self::Gte(column, value) => format!("{column} >= {}", value.to_lance()),
376            Self::Lte(column, value) => format!("{column} <= {}", value.to_lance()),
377            Self::And(predicates) => predicates
378                .iter()
379                .map(Self::to_lance)
380                .filter(|predicate| !predicate.is_empty())
381                .collect::<Vec<_>>()
382                .join(" AND "),
383            Self::Or(predicates) => {
384                // Wrap in parens so the disjunction composes safely as a child
385                // of an outer `And` (SQL `OR` binds looser than `AND`).
386                let body = predicates
387                    .iter()
388                    .map(Self::to_lance)
389                    .filter(|predicate| !predicate.is_empty())
390                    .collect::<Vec<_>>()
391                    .join(" OR ");
392                if body.is_empty() {
393                    String::new()
394                } else {
395                    format!("({body})")
396                }
397            }
398            Self::Not(inner) => {
399                let body = inner.to_lance();
400                if body.is_empty() {
401                    String::new()
402                } else {
403                    format!("NOT ({body})")
404                }
405            }
406        }
407    }
408}
409/// Read-side options for `Handle::scan`: optional prefilter predicate and
410/// optional projection. Default = no filter, all columns.
411#[derive(Default)]
412pub struct ScanOpts<'a> {
413    pub predicate: Option<&'a Predicate>,
414    pub projection: Option<&'a [&'a str]>,
415}
416
417impl<'a> ScanOpts<'a> {
418    pub fn project_only(projection: &'a [&'a str]) -> Self {
419        Self {
420            predicate: None,
421            projection: Some(projection),
422        }
423    }
424    pub fn with_predicate_and_projection(
425        predicate: &'a Predicate,
426        projection: &'a [&'a str],
427    ) -> Self {
428        Self {
429            predicate: Some(predicate),
430            projection: Some(projection),
431        }
432    }
433}
434
435impl ScalarValue {
436    fn to_lance(&self) -> String {
437        match self {
438            Self::String(value) => quoted_string(value),
439            Self::Int32(value) => value.to_string(),
440            Self::Raw(value) => value.clone(),
441        }
442    }
443}
444/// Lance cache caps in bytes. `None` lets the substrate pick the backend-aware
445/// default (local FS gets a tighter cap; object stores stay near Lance's
446/// defaults). Wired through `Store::open_with_options` from `[runtime]`.
447#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
448pub struct RuntimeCaps {
449    pub index_cache_bytes: Option<usize>,
450    pub metadata_cache_bytes: Option<usize>,
451}
452
453impl RuntimeCaps {
454    pub fn from_config(config: &crate::config::RuntimeConfig) -> Self {
455        Self {
456            index_cache_bytes: config.index_cache_bytes,
457            metadata_cache_bytes: config.metadata_cache_bytes,
458        }
459    }
460}
461
462/// Local-FS default: tight enough that a long-lived `pond mcp` lands well
463/// under the 500 MiB target without measurable latency cost vs Lance's 6 GiB
464/// default (see `benches/serve_mem_bench.rs --cap-sweep`).
465const LOCAL_INDEX_CACHE_BYTES: usize = 256 * 1024 * 1024;
466const LOCAL_METADATA_CACHE_BYTES: usize = 128 * 1024 * 1024;
467/// Object-store defaults: latency to refill is per-page, so keep more in cache.
468const REMOTE_INDEX_CACHE_BYTES: usize = 2 * 1024 * 1024 * 1024;
469const REMOTE_METADATA_CACHE_BYTES: usize = 512 * 1024 * 1024;
470
471fn resolve_cache_caps(location: &Url, caps: RuntimeCaps) -> (usize, usize) {
472    let (index_default, metadata_default) = if config::is_local(location) {
473        (LOCAL_INDEX_CACHE_BYTES, LOCAL_METADATA_CACHE_BYTES)
474    } else {
475        (REMOTE_INDEX_CACHE_BYTES, REMOTE_METADATA_CACHE_BYTES)
476    };
477    (
478        caps.index_cache_bytes.unwrap_or(index_default),
479        caps.metadata_cache_bytes.unwrap_or(metadata_default),
480    )
481}
482
483pub struct Handle {
484    datasets: DatasetSet,
485    retry: RetryPolicy,
486    /// One `lance::Session` shared across all three datasets. Carries the
487    /// metadata + index caches and the `ObjectStoreRegistry` (which holds
488    /// the underlying object_store / S3 client). Sharing the session means
489    /// one cache pool covers all three tables and one S3 client serves all
490    /// three datasets - load-bearing on object-store backends where a
491    /// per-dataset client would mean 3x the connection pools and 3x the
492    /// credential refreshes (lance/src/dataset/builder.rs:509-517).
493    #[allow(dead_code)]
494    session: Arc<Session>,
495    /// The `lance-namespace` catalog seam. v1 uses the Directory impl;
496    /// future hosted pond swaps to "rest" without touching read/write paths
497    /// (spec.md#lance-chokepoints-catalog).
498    nm: Arc<dyn LanceNamespace>,
499    /// Namespace identifier this handle binds to. v1 is always `root()`; the
500    /// typed seam matches `resolve_namespace`'s return so multi-namespace
501    /// routing can land without churning call sites (spec.md#wire-namespace-resolution).
502    nm_ident: NamespaceIdent,
503    /// Object-store options threaded through every `DatasetBuilder` and
504    /// `Dataset::write` call so refresh / index-creation paths inherit the
505    /// same credentials and region as the initial open. Empty on local-FS
506    /// installs.
507    storage_options: HashMap<String, String>,
508    /// Data-dir URL the handle was opened against. `pond status` reads this
509    /// to display where the bytes live and to decide whether to walk a local
510    /// directory or issue a remote `LIST` for sizing.
511    location: Url,
512    /// Cached `parts.lance` open metadata, used the first time a caller asks
513    /// for parts. Holds the namespace probe shape so the lazy open re-uses the
514    /// same `lance-chokepoints-catalog` path as the eager opens for sessions/messages.
515    parts_refresh_after: Duration,
516}
517
518impl std::fmt::Debug for Handle {
519    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
520        formatter
521            .debug_struct("Handle")
522            .field("datasets", &self.datasets)
523            .field("retry", &self.retry)
524            .field("nm_ident", &self.nm_ident)
525            .field("storage_options", &self.storage_options)
526            .field("location", &self.location)
527            .finish()
528    }
529}
530
/// The three Lance tables pond manages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Table {
    Sessions,
    Messages,
    Parts,
}

impl Table {
    /// Table name: `"sessions"`, `"messages"`, or `"parts"`.
    pub fn as_str(self) -> &'static str {
        match self {
            Table::Sessions => "sessions",
            Table::Messages => "messages",
            Table::Parts => "parts",
        }
    }

    /// Crate-internal alias of [`Table::as_str`], used for retry labels and
    /// perf logging.
    fn label(self) -> &'static str {
        self.as_str()
    }
}
550#[derive(Debug)]
551struct DatasetSet {
552    sessions: Mutex<CachedDataset>,
553    messages: Mutex<CachedDataset>,
554    /// `parts.lance` opens lazily on the first read or write that needs it:
555    /// any `pond_get` (every mode reads parts to build summaries), grouped
556    /// search hydrating user-hit summaries, or ingest with Part events. A
557    /// process that does none of those skips the file, saving its metadata
558    /// pages and file handle at cold-open. The OnceCell makes init
559    /// single-flight; the inner `Mutex<CachedDataset>` then behaves identically
560    /// to the other two.
561    parts: OnceCell<Mutex<CachedDataset>>,
562}
563#[derive(Debug)]
564struct CachedDataset {
565    dataset: Dataset,
566    last_refresh: Instant,
567    refresh_after: Duration,
568}
569impl CachedDataset {
570    async fn latest(&mut self) -> Result<Dataset> {
571        if self.last_refresh.elapsed() >= self.refresh_after {
572            self.dataset.checkout_latest().await?;
573            self.last_refresh = Instant::now();
574        }
575        Ok(self.dataset.clone())
576    }
577    fn replace(&mut self, dataset: Dataset) {
578        self.dataset = dataset;
579        self.last_refresh = Instant::now();
580    }
581}
582impl Handle {
583    /// Open without storage options or explicit cache caps. Backend-aware
584    /// defaults from `[runtime]` apply.
585    pub async fn open(location: &Url) -> Result<Self> {
586        Self::open_with_options(location, HashMap::new(), RuntimeCaps::default()).await
587    }
588
589    /// Open with object-store options handed through to Lance verbatim, plus
590    /// the resolved `[runtime]` cache caps. Object-store keys are the
591    /// `object_store` crate's standard config names; pond does not parse them.
592    /// Opening datasets never performs index work; index lifecycle lives under
593    /// `Handle::optimize_table`. `parts.lance` opens lazily on first use.
594    pub async fn open_with_options(
595        location: &Url,
596        mut storage_options: HashMap<String, String>,
597        caps: RuntimeCaps,
598    ) -> Result<Self> {
599        if let Some(path) = config::local_path(location) {
600            tokio::fs::create_dir_all(&path)
601                .await
602                .with_context(|| format!("failed to create data dir {}", path.display()))?;
603        } else {
604            apply_remote_storage_defaults(&mut storage_options);
605        }
606        // One Session shared across all three datasets so metadata/index
607        // caches and the object_store registry (and thus any S3 client) are
608        // pooled rather than duplicated three times. Caps are sized by the
609        // `[runtime]` block; explicit values from `caps` win, otherwise the
610        // local/remote backend default kicks in.
611        let (index_cache_bytes, metadata_cache_bytes) = resolve_cache_caps(location, caps);
612        let session = Arc::new(Session::new(
613            index_cache_bytes,
614            metadata_cache_bytes,
615            Arc::new(ObjectStoreRegistry::default()),
616        ));
617        // Build the lance-namespace catalog seam once (spec.md#lance-chokepoints-catalog).
618        // The `root` property is whatever URL the Directory impl understands;
619        // `uri_to_url` (lance-io/object_store.rs) accepts both bare paths and
620        // URLs, so passing the scheme-qualified URL for local FS works the
621        // same as the bare-path form. Trailing slash stripped for clean logs.
622        let root = location.as_str().trim_end_matches('/').to_string();
623        let mut connect = ConnectBuilder::new("dir")
624            .property("root", root)
625            .session(session.clone());
626        // Object-store credentials/region/endpoint flow into the namespace
627        // via the `storage.<key>` property convention (lance-namespace-impls
628        // dir.rs from_properties: lines 423-436).
629        for (key, value) in &storage_options {
630            connect = connect.property(format!("storage.{key}"), value.clone());
631        }
632        let nm: Arc<dyn LanceNamespace> = connect
633            .connect()
634            .await
635            .context("failed to connect lance Directory namespace")?;
636        let nm_ident = NamespaceIdent::root();
637        // spec.md#lance-handle-freshness: refresh window is scheme-keyed. Local-FS
638        // manifest reads are microsecond-cheap, so `0` (always-refresh) is
639        // essentially free and removes the stale-read window entirely. Object
640        // stores have real per-call cost; `5s` caps manifest fetch overhead at
641        // acceptable lag for human-driven queries.
642        let refresh_after = if config::is_local(location) {
643            Duration::ZERO
644        } else {
645            Duration::from_secs(5)
646        };
647        let handle = Self {
648            datasets: DatasetSet {
649                sessions: Mutex::new(CachedDataset {
650                    dataset: open_or_create_via_ns(
651                        &nm,
652                        &nm_ident,
653                        sessions::SESSIONS,
654                        sessions::session_schema(),
655                        &session,
656                        &storage_options,
657                    )
658                    .await?,
659                    last_refresh: Instant::now(),
660                    refresh_after,
661                }),
662                messages: Mutex::new(CachedDataset {
663                    dataset: open_or_create_via_ns(
664                        &nm,
665                        &nm_ident,
666                        sessions::MESSAGES,
667                        sessions::message_schema(),
668                        &session,
669                        &storage_options,
670                    )
671                    .await?,
672                    last_refresh: Instant::now(),
673                    refresh_after,
674                }),
675                parts: OnceCell::new(),
676            },
677            retry: RetryPolicy::default(),
678            session,
679            nm,
680            nm_ident,
681            storage_options,
682            location: location.clone(),
683            parts_refresh_after: refresh_after,
684        };
685        Ok(handle)
686    }
687
688    pub fn location(&self) -> &Url {
689        &self.location
690    }
691
692    /// Read-only view of the `storage_options` the handle was opened with.
693    /// `pond status` needs them to instantiate a raw `object_store` client
694    /// that can `LIST` the remote bucket for sizing.
695    pub fn storage_options(&self) -> &HashMap<String, String> {
696        &self.storage_options
697    }
698
699    pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
700        Ok((
701            self.count_rows(Table::Sessions).await?,
702            self.count_rows(Table::Messages).await?,
703            self.count_rows(Table::Parts).await?,
704        ))
705    }
706
707    /// Insert-only merge: append new rows, never overwrite a matched PK.
708    /// Returns rows inserted. The fold lives separately under
709    /// `Handle::optimize_table` (spec.md#lance-index-maintenance).
710    pub(crate) async fn merge_insert(
711        &self,
712        table: Table,
713        batch: RecordBatch,
714        row_count: usize,
715    ) -> Result<u64> {
716        self.merge(
717            table,
718            batch,
719            row_count,
720            "merge_insert",
721            WhenMatched::DoNothing,
722            WhenNotMatched::InsertAll,
723        )
724        .await
725    }
726
727    /// Update-only merge: `WhenMatched::UpdateAll` on matched PKs; unmatched
728    /// rows dropped. The fold lives separately under `Handle::optimize_table`.
729    pub(crate) async fn merge_update(
730        &self,
731        table: Table,
732        batch: RecordBatch,
733        row_count: usize,
734    ) -> Result<u64> {
735        self.merge(
736            table,
737            batch,
738            row_count,
739            "merge_update",
740            WhenMatched::UpdateAll,
741            WhenNotMatched::DoNothing,
742        )
743        .await
744    }
745
746    /// Shared merge path for [`Self::merge_insert`] and [`Self::merge_update`].
747    /// Returns the number of rows affected (inserted or updated, whichever the
748    /// behaviors produce).
749    async fn merge(
750        &self,
751        table: Table,
752        batch: RecordBatch,
753        row_count: usize,
754        op: &'static str,
755        when_matched: WhenMatched,
756        when_not_matched: WhenNotMatched,
757    ) -> Result<u64> {
758        if row_count == 0 {
759            return Ok(0);
760        }
761        let started = Instant::now();
762        let result = self
763            .retry_lance(table.label(), || async {
764                let mut cached = self.cached(table).await?.lock().await;
765                let existing = cached.latest().await?;
766                let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
767                let mut builder = MergeInsertBuilder::try_new(Arc::new(existing), Vec::new())?;
768                builder.when_matched(when_matched.clone());
769                builder.when_not_matched(when_not_matched.clone());
770                // pond presents each PK at most once per batch; FirstSeen keeps
771                // the first occurrence rather than failing (Lance's default).
772                builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
773                // Cleanup is operator-driven via `pond index optimize`; the
774                // per-commit auto hook would add a LIST per write on remote
775                // backends without changing the steady-state retention.
776                builder.skip_auto_cleanup(true);
777                let (dataset, stats) = builder
778                    .try_build()?
779                    .execute_reader(Box::new(reader))
780                    .await?;
781                cached.replace(dataset.as_ref().clone());
782                Ok((
783                    stats.num_inserted_rows + stats.num_updated_rows,
784                    stats.num_skipped_duplicates,
785                ))
786            })
787            .await;
788        let skipped = result.as_ref().map(|(_, s)| *s).unwrap_or(0);
789        tracing::info!(
790            target: "pond::perf",
791            op,
792            table = %table.label(),
793            rows = row_count,
794            elapsed_ms = started.elapsed().as_millis() as u64,
795            skipped,
796            "merge",
797        );
798        result.map(|(affected, _)| affected)
799    }
800
801    /// Run the table-local maintenance cycle for the supplied index intents.
802    /// BTree is rebuilt from scratch to dodge Lance v7.0.0-beta.16's flat
803    /// BTree combine bug; Bitmap, FTS, and IVF_PQ fold via append.
804    ///
805    /// spec.md#substrate 3.7 (`lance-index-maintenance`): indices and compaction
806    /// commit independently and use independent retry budgets, so a hot writer
807    /// that starves compaction (Rewrite) does not abort the index build
808    /// (Update) the operator actually asked for.
809    pub async fn optimize_table(
810        &self,
811        table: Table,
812        intents: &[IndexIntent],
813        progress: Option<&OptimizeProgressFn>,
814        cleanup: crate::sessions::CleanupConfig,
815    ) -> TableOptimizeOutcome {
816        let compaction = self
817            .run_optimize_compact_phase(table, progress, cleanup)
818            .await;
819        let indices = self
820            .run_optimize_indices_phase(table, intents, progress)
821            .await;
822        TableOptimizeOutcome {
823            table,
824            indices,
825            compaction,
826        }
827    }
828
829    /// Run only the indices phase for one table. Used by `pond embed`'s tail
830    /// to fold newly written vectors into the indices without paying the
831    /// compaction retry budget while embed itself may still be writing.
832    pub async fn optimize_table_indices_only(
833        &self,
834        table: Table,
835        intents: &[IndexIntent],
836        progress: Option<&OptimizeProgressFn>,
837    ) -> PhaseOutcome {
838        self.run_optimize_indices_phase(table, intents, progress)
839            .await
840    }
841
842    async fn run_optimize_indices_phase(
843        &self,
844        table: Table,
845        intents: &[IndexIntent],
846        progress: Option<&OptimizeProgressFn>,
847    ) -> PhaseOutcome {
848        if intents.is_empty() {
849            return PhaseOutcome::Noop;
850        }
851        let result = self
852            .retry_lance(table.label(), || async {
853                let mut guard = self.cached(table).await?.lock().await;
854                let mut dataset = guard.latest().await?;
855                let did_work =
856                    optimize_table_indices(&mut dataset, intents, table, progress).await?;
857                guard.replace(dataset);
858                Ok::<_, anyhow::Error>(did_work)
859            })
860            .await;
861        match result {
862            Ok(true) => PhaseOutcome::Ok,
863            Ok(false) => PhaseOutcome::Noop,
864            Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
865            Err(error) => PhaseOutcome::Failed(error),
866        }
867    }
868
869    async fn run_optimize_compact_phase(
870        &self,
871        table: Table,
872        progress: Option<&OptimizeProgressFn>,
873        cleanup: crate::sessions::CleanupConfig,
874    ) -> PhaseOutcome {
875        let result = self
876            .retry_lance(table.label(), || async {
877                let mut guard = self.cached(table).await?.lock().await;
878                let mut dataset = guard.latest().await?;
879                optimize_table_compact(&mut dataset, table, progress, cleanup).await?;
880                guard.replace(dataset);
881                Ok::<_, anyhow::Error>(())
882            })
883            .await;
884        match result {
885            Ok(()) => PhaseOutcome::Ok,
886            Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
887            Err(error) => PhaseOutcome::Failed(error),
888        }
889    }
890
891    pub async fn rebuild_index(&self, table: Table, intent: &IndexIntent) -> Result<()> {
892        self.retry_lance(table.label(), || async {
893            let mut guard = self.cached(table).await?.lock().await;
894            let mut dataset = guard.latest().await?;
895            rebuild_index(&mut dataset, intent).await?;
896            guard.replace(dataset);
897            Ok(())
898        })
899        .await
900    }
901
902    pub async fn index_status(
903        &self,
904        table: Table,
905        intents: &[IndexIntent],
906    ) -> Result<Vec<IndexStatus>> {
907        let dataset = self.dataset(table).await?;
908        index_status(table, &dataset, intents).await
909    }
910
911    pub(crate) async fn dataset(&self, table: Table) -> Result<Dataset> {
912        let mut cached = self.cached(table).await?.lock().await;
913        cached.latest().await
914    }
915    /// Build a prefiltered `Scanner` for `table`. Composable read entry
916    /// point for callers that need to layer extra builder calls
917    /// (`full_text_search`, `nearest`) on top of pond's predicate seam.
918    /// Routine scans should prefer `Handle::scan`.
919    pub(crate) async fn scanner(
920        &self,
921        table: Table,
922        predicate: Option<&Predicate>,
923    ) -> Result<lance::dataset::scanner::Scanner> {
924        let dataset = self.dataset(table).await?;
925        scanner_with_prefilter(&dataset, predicate)
926    }
927    /// Single read entry point: prefilter via `predicate`, optionally
928    /// project, return the prepared `Scanner` (spec.md#lance-chokepoints-read).
929    pub async fn scan(
930        &self,
931        table: Table,
932        opts: ScanOpts<'_>,
933    ) -> Result<lance::dataset::scanner::Scanner> {
934        let mut scanner = self.scanner(table, opts.predicate).await?;
935        if let Some(projection) = opts.projection {
936            scanner.project(projection)?;
937        }
938        Ok(scanner)
939    }
940    pub(crate) async fn scan_batch(
941        &self,
942        table: Table,
943        predicate: Option<&Predicate>,
944        projection: &[&str],
945    ) -> Result<RecordBatch> {
946        let opts = ScanOpts {
947            predicate,
948            projection: (!projection.is_empty()).then_some(projection),
949        };
950        self.scan(table, opts)
951            .await?
952            .try_into_batch()
953            .await
954            .context("scan failed")
955    }
956    pub async fn count_rows(&self, table: Table) -> Result<usize> {
957        self.dataset(table)
958            .await?
959            .count_rows(None)
960            .await
961            .map_err(Into::into)
962    }
963    /// Names of every index on `messages` - the vector-index tests read this.
964    #[cfg(test)]
965    pub(crate) async fn messages_index_names(&self) -> Result<Vec<String>> {
966        let dataset = self.dataset(Table::Messages).await?;
967        let indices = dataset.load_indices().await?;
968        Ok(indices.iter().map(|index| index.name.clone()).collect())
969    }
970
971    /// Count rows in `table` not yet covered by `index_name`. Manifest-only;
972    /// a missing index reports the whole table. Powers `pond index status`.
973    pub(crate) async fn unindexed_row_count(
974        &self,
975        table: Table,
976        index_name: &str,
977    ) -> Result<usize> {
978        let dataset = self.dataset(table).await?;
979        let fragments = dataset
980            .unindexed_fragments(index_name)
981            .await
982            .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
983        Ok(fragments
984            .iter()
985            .map(|fragment| fragment.num_rows().unwrap_or(0))
986            .sum())
987    }
988
989    /// Drop the named index. Used by the `pond embed --force` model-swap path
990    /// to retire an IVF_PQ whose centroids belong to the old distance
991    /// space, before the next write re-bootstraps it over the new model's
992    /// vectors. Errors when the index does not exist; callers may swallow
993    /// that.
994    pub(crate) async fn drop_index(&self, table: Table, name: &str) -> Result<()> {
995        let mut guard = self.cached(table).await?.lock().await;
996        let mut dataset = guard.latest().await?;
997        dataset
998            .drop_index(name)
999            .await
1000            .with_context(|| format!("drop_index({name}) failed for {}", table.label()))?;
1001        guard.replace(dataset);
1002        Ok(())
1003    }
1004
1005    /// Resolve each table's stored location through the namespace catalog
1006    /// (spec.md#lance-chokepoints-catalog) - no hardcoded `.lance` suffix.
1007    async fn table_location(&self, table_name: &str) -> Result<String> {
1008        let request = DescribeTableRequest {
1009            id: Some(self.nm_ident.as_table_id(table_name)),
1010            ..Default::default()
1011        };
1012        let response = self
1013            .nm
1014            .describe_table(request)
1015            .await
1016            .with_context(|| format!("failed to describe table {table_name}"))?;
1017        response
1018            .location
1019            .with_context(|| format!("namespace returned no location for table {table_name}"))
1020    }
1021
1022    /// On-disk byte totals for the three datasets plus the data-dir remainder.
1023    /// Every byte is sized by listing through Lance's object store
1024    /// (spec.md#lance-chokepoints-storage), identical for `file://` and `s3://`.
1025    pub async fn table_sizes(&self) -> Result<TableSizes> {
1026        let registry = Arc::new(ObjectStoreRegistry::default());
1027        let params = ObjectStoreParams {
1028            storage_options_accessor: (!self.storage_options.is_empty()).then(|| {
1029                Arc::new(StorageOptionsAccessor::with_static_options(
1030                    self.storage_options.clone(),
1031                ))
1032            }),
1033            ..Default::default()
1034        };
1035
1036        let sessions = self
1037            .listed_size(
1038                &registry,
1039                &params,
1040                &self.table_location(sessions::SESSIONS).await?,
1041            )
1042            .await?;
1043        let messages = self
1044            .listed_size(
1045                &registry,
1046                &params,
1047                &self.table_location(sessions::MESSAGES).await?,
1048            )
1049            .await?;
1050        let parts = self
1051            .listed_size(
1052                &registry,
1053                &params,
1054                &self.table_location(sessions::PARTS).await?,
1055            )
1056            .await?;
1057        // `other` is whatever sits under the data-dir root but not in the three
1058        // tables (config.toml, stray index temp files): root total minus them.
1059        let root_total = self
1060            .listed_size(&registry, &params, self.location.as_str())
1061            .await?;
1062        let other = root_total.saturating_sub(sessions + messages + parts);
1063        Ok(TableSizes {
1064            sessions,
1065            messages,
1066            parts,
1067            other,
1068        })
1069    }
1070
1071    /// Sum `ObjectMeta.size` for every object recursively under `uri`.
1072    async fn listed_size(
1073        &self,
1074        registry: &Arc<ObjectStoreRegistry>,
1075        params: &ObjectStoreParams,
1076        uri: &str,
1077    ) -> Result<u64> {
1078        let (store, base) = ObjectStore::from_uri_and_params(registry.clone(), uri, params)
1079            .await
1080            .with_context(|| format!("failed to open object store for {uri}"))?;
1081        let mut listing = store.list(Some(base));
1082        let mut total = 0u64;
1083        while let Some(meta) = listing.next().await {
1084            let meta = meta.with_context(|| format!("listing {uri} failed"))?;
1085            total += meta.size;
1086        }
1087        Ok(total)
1088    }
1089    async fn cached(&self, table: Table) -> Result<&Mutex<CachedDataset>> {
1090        match table {
1091            Table::Sessions => Ok(&self.datasets.sessions),
1092            Table::Messages => Ok(&self.datasets.messages),
1093            Table::Parts => self.parts_cached().await,
1094        }
1095    }
1096
1097    /// Open `parts.lance` on first use (spec.md#datasets). Single-flight via
1098    /// `OnceCell`; once initialized, behaves identically to the other two.
1099    async fn parts_cached(&self) -> Result<&Mutex<CachedDataset>> {
1100        self.datasets
1101            .parts
1102            .get_or_try_init(|| async {
1103                let dataset = open_or_create_via_ns(
1104                    &self.nm,
1105                    &self.nm_ident,
1106                    sessions::PARTS,
1107                    sessions::part_schema(),
1108                    &self.session,
1109                    &self.storage_options,
1110                )
1111                .await?;
1112                Ok::<_, anyhow::Error>(Mutex::new(CachedDataset {
1113                    dataset,
1114                    last_refresh: Instant::now(),
1115                    refresh_after: self.parts_refresh_after,
1116                }))
1117            })
1118            .await
1119    }
1120    async fn retry_lance<T, Fut, Op>(&self, label: &str, mut operation: Op) -> Result<T>
1121    where
1122        Fut: std::future::Future<Output = Result<T>>,
1123        Op: FnMut() -> Fut,
1124    {
1125        let mut attempt = 0u8;
1126        loop {
1127            attempt = attempt.saturating_add(1);
1128            match operation().await {
1129                Ok(value) => return Ok(value),
1130                Err(error) if attempt < self.retry.attempts => {
1131                    let backoff = self.backoff(attempt);
1132                    // `{:#}` walks anyhow's cause chain inline; `%error` (Display)
1133                    // drops everything below the top-level message.
1134                    let error_chain = format!("{error:#}");
1135                    tracing::warn!(
1136                        label,
1137                        attempt,
1138                        ?backoff,
1139                        error = %error_chain,
1140                        "retrying Lance operation"
1141                    );
1142                    tokio::time::sleep(backoff).await;
1143                }
1144                Err(error) => {
1145                    let error_chain = format!("{error:#}");
1146                    tracing::warn!(
1147                        label,
1148                        attempt,
1149                        error = %error_chain,
1150                        "Lance operation exhausted retries"
1151                    );
1152                    // spec.md#protocol: surface OCC failures as a typed `conflict`
1153                    // rather than the generic `storage_unavailable` bucket. The
1154                    // chain root is a `lance::Error` (commit-conflict family) when
1155                    // pond's retry layer exhausted because the manifest could not
1156                    // be advanced; everything else (timeouts, IAM, disk) stays
1157                    // `storage_unavailable`.
1158                    if is_commit_conflict(&error) {
1159                        return Err(error.context(ConflictExhausted { attempts: attempt }));
1160                    }
1161                    return Err(error);
1162                }
1163            }
1164        }
1165    }
1166    fn backoff(&self, attempt: u8) -> Duration {
1167        let shift = u32::from(attempt.saturating_sub(1));
1168        let multiplier = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
1169        let base = self.retry.initial_backoff.saturating_mul(multiplier);
1170        // Symmetric +/- `jitter` factor de-correlates concurrent retriers on
1171        // a contended manifest (spec.md#lance-retry-jitter); clamped to `max_backoff`.
1172        let factor = (1.0 + self.retry.jitter * (fastrand::f64() * 2.0 - 1.0)).max(0.0);
1173        base.mul_f64(factor).min(self.retry.max_backoff)
1174    }
1175}
1176/// Compaction phase: `compact_files` + `cleanup_old_versions`, both inside one
1177/// retry block. Distinct from the indices phase so a hot writer that loses the
1178/// Rewrite race here does not abort index work the operator actually asked for.
1179///
1180/// spec.md#lance-index-maintenance mandates FRI on by default, but at
1181/// v7.0.0-beta.16 `defer_index_remap=true` together with `stable-row-ids`
1182/// panics in `optimize.rs::commit_compaction` with "defer_index_remap
1183/// requires row_addrs but none were provided": `rewrite_files` skips
1184/// row_addrs when stable row ids are on, then the FRI builder demands
1185/// them. With stable_row_ids the remap step is already a no-op
1186/// (`optimize.rs:1490`: `needs_remapping = !uses_stable_row_ids() &&
1187/// !defer_index_remap`), so running without FRI is correct - we only
1188/// lose the documented concurrency-with-index-build benefit. Flip to
1189/// `true` once upstream fixes the conflict.
1190async fn optimize_table_compact(
1191    dataset: &mut Dataset,
1192    table: Table,
1193    progress: Option<&OptimizeProgressFn>,
1194    cleanup: crate::sessions::CleanupConfig,
1195) -> Result<()> {
1196    let compaction = CompactionOptions {
1197        defer_index_remap: false,
1198        ..CompactionOptions::default()
1199    };
1200
1201    emit(
1202        progress,
1203        OptimizeEvent::PhaseStart {
1204            table,
1205            phase: OptimizePhase::Compact,
1206            detail: None,
1207        },
1208    );
1209    let started = Instant::now();
1210    compact_files(dataset, compaction, None).await?;
1211    emit(
1212        progress,
1213        OptimizeEvent::PhaseDone {
1214            table,
1215            phase: OptimizePhase::Compact,
1216            elapsed_ms: started.elapsed().as_millis() as u64,
1217        },
1218    );
1219
1220    emit(
1221        progress,
1222        OptimizeEvent::PhaseStart {
1223            table,
1224            phase: OptimizePhase::Cleanup,
1225            detail: None,
1226        },
1227    );
1228    let started = Instant::now();
1229    // delete_unverified=true is the operator opt-in via `--cleanup-older-than`
1230    // / `--vacuum` that bypasses Lance's 7-day in-progress safety guard
1231    // (UNVERIFIED_THRESHOLD_DAYS in lance/dataset/cleanup.rs). Required to
1232    // reclaim files younger than 7 days; unsafe under concurrent writers.
1233    dataset
1234        .cleanup_old_versions(
1235            cleanup.older_than,
1236            Some(cleanup.delete_unverified),
1237            Some(false),
1238        )
1239        .await
1240        .context("cleanup_old_versions failed during index optimize")?;
1241    emit(
1242        progress,
1243        OptimizeEvent::PhaseDone {
1244            table,
1245            phase: OptimizePhase::Cleanup,
1246            elapsed_ms: started.elapsed().as_millis() as u64,
1247        },
1248    );
1249
1250    Ok(())
1251}
1252
1253/// Indices phase: per-intent create/rebuild + batched `optimize_indices(append)`
1254/// for incremental families. Returns `true` if anything committed.
1255async fn optimize_table_indices(
1256    dataset: &mut Dataset,
1257    intents: &[IndexIntent],
1258    table: Table,
1259    progress: Option<&OptimizeProgressFn>,
1260) -> Result<bool> {
1261    let existing = dataset.load_indices().await?;
1262    let existing_names: std::collections::HashSet<String> =
1263        existing.iter().map(|index| index.name.clone()).collect();
1264
1265    let mut append_indices: Vec<String> = Vec::new();
1266    let mut did_work = false;
1267
1268    for intent in intents {
1269        let exists = existing_names.contains(intent.name);
1270
1271        if !exists {
1272            if !intent.trigger.should_create(dataset).await? {
1273                continue;
1274            }
1275            let params = intent.params.build(dataset).await?;
1276            let index_type = intent.params.index_type();
1277            tracing::info!(
1278                index = intent.name,
1279                column = intent.column,
1280                "creating Lance index (trigger fired)",
1281            );
1282            emit(
1283                progress,
1284                OptimizeEvent::PhaseStart {
1285                    table,
1286                    phase: OptimizePhase::IndexCreate,
1287                    detail: Some(intent.name.to_owned()),
1288                },
1289            );
1290            let started = Instant::now();
1291            dataset
1292                .create_index(
1293                    &[intent.column],
1294                    index_type,
1295                    Some(intent.name.to_owned()),
1296                    params.as_ref(),
1297                    false,
1298                )
1299                .await
1300                .with_context(|| format!("failed to create index {}", intent.name))?;
1301            emit(
1302                progress,
1303                OptimizeEvent::PhaseDone {
1304                    table,
1305                    phase: OptimizePhase::IndexCreate,
1306                    elapsed_ms: started.elapsed().as_millis() as u64,
1307                },
1308            );
1309            did_work = true;
1310            continue;
1311        }
1312
1313        let unindexed = dataset.unindexed_fragments(intent.name).await?;
1314        if unindexed.is_empty() {
1315            continue;
1316        }
1317        // Lag guard: let fragments accumulate behind the brute-force fallback
1318        // rather than firing a commit per tiny append. Threshold is operator-
1319        // tunable via `[search].index_lag_threshold`.
1320        if unindexed.len() < index_lag_threshold() {
1321            continue;
1322        }
1323        match intent.params {
1324            IndexParamsKind::Scalar(BuiltinIndexType::BTree) => {
1325                let params = intent.params.build(dataset).await?;
1326                let index_type = intent.params.index_type();
1327                tracing::debug!(
1328                    target: "pond::perf",
1329                    index = intent.name,
1330                    column = intent.column,
1331                    "rebuilding Lance BTree index",
1332                );
1333                emit(
1334                    progress,
1335                    OptimizeEvent::PhaseStart {
1336                        table,
1337                        phase: OptimizePhase::IndexRebuild,
1338                        detail: Some(intent.name.to_owned()),
1339                    },
1340                );
1341                let started = Instant::now();
1342                dataset
1343                    .create_index(
1344                        &[intent.column],
1345                        index_type,
1346                        Some(intent.name.to_owned()),
1347                        params.as_ref(),
1348                        true,
1349                    )
1350                    .await
1351                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1352                emit(
1353                    progress,
1354                    OptimizeEvent::PhaseDone {
1355                        table,
1356                        phase: OptimizePhase::IndexRebuild,
1357                        elapsed_ms: started.elapsed().as_millis() as u64,
1358                    },
1359                );
1360                did_work = true;
1361            }
1362            IndexParamsKind::Scalar(BuiltinIndexType::Bitmap)
1363            | IndexParamsKind::InvertedFtsNgram { .. }
1364            | IndexParamsKind::IvfPqCosine { .. } => {
1365                append_indices.push(intent.name.to_owned());
1366            }
1367            IndexParamsKind::Scalar(_) => {
1368                let params = intent.params.build(dataset).await?;
1369                emit(
1370                    progress,
1371                    OptimizeEvent::PhaseStart {
1372                        table,
1373                        phase: OptimizePhase::IndexRebuild,
1374                        detail: Some(intent.name.to_owned()),
1375                    },
1376                );
1377                let started = Instant::now();
1378                dataset
1379                    .create_index(
1380                        &[intent.column],
1381                        intent.params.index_type(),
1382                        Some(intent.name.to_owned()),
1383                        params.as_ref(),
1384                        true,
1385                    )
1386                    .await
1387                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1388                emit(
1389                    progress,
1390                    OptimizeEvent::PhaseDone {
1391                        table,
1392                        phase: OptimizePhase::IndexRebuild,
1393                        elapsed_ms: started.elapsed().as_millis() as u64,
1394                    },
1395                );
1396                did_work = true;
1397            }
1398        }
1399    }
1400
1401    if !append_indices.is_empty() {
1402        let to_append = append_indices.clone();
1403        emit(
1404            progress,
1405            OptimizeEvent::PhaseStart {
1406                table,
1407                phase: OptimizePhase::IndexAppend,
1408                detail: Some(append_indices.join(", ")),
1409            },
1410        );
1411        let started = Instant::now();
1412        dataset
1413            .optimize_indices(&OptimizeOptions::append().index_names(to_append))
1414            .await
1415            .context("optimize_indices(append) failed during index optimize")?;
1416        emit(
1417            progress,
1418            OptimizeEvent::PhaseDone {
1419                table,
1420                phase: OptimizePhase::IndexAppend,
1421                elapsed_ms: started.elapsed().as_millis() as u64,
1422            },
1423        );
1424        tracing::debug!(
1425            target: "pond::perf",
1426            indices = ?append_indices,
1427            "appended trailing fragments into indices",
1428        );
1429        did_work = true;
1430    }
1431
1432    Ok(did_work)
1433}
1434
1435async fn rebuild_index(dataset: &mut Dataset, intent: &IndexIntent) -> Result<()> {
1436    if !intent.trigger.should_create(dataset).await? {
1437        return Ok(());
1438    }
1439    let params = intent.params.build(dataset).await?;
1440    dataset
1441        .create_index(
1442            &[intent.column],
1443            intent.params.index_type(),
1444            Some(intent.name.to_owned()),
1445            params.as_ref(),
1446            true,
1447        )
1448        .await
1449        .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1450    Ok(())
1451}
1452
1453async fn index_status(
1454    table: Table,
1455    dataset: &Dataset,
1456    intents: &[IndexIntent],
1457) -> Result<Vec<IndexStatus>> {
1458    let existing = dataset.load_indices().await?;
1459    let existing_names: std::collections::HashSet<String> =
1460        existing.iter().map(|index| index.name.clone()).collect();
1461    let total_fragments = dataset.get_fragments().len();
1462    let total_rows = dataset.count_rows(None).await?;
1463    let mut statuses = Vec::with_capacity(intents.len());
1464    for intent in intents {
1465        let exists = existing_names.contains(intent.name);
1466        if !exists {
1467            statuses.push(IndexStatus {
1468                table,
1469                intent_name: intent.name.to_owned(),
1470                fragments_covered: 0,
1471                unindexed_fragments: total_fragments,
1472                unindexed_rows: total_rows,
1473                exists,
1474            });
1475            continue;
1476        }
1477        let unindexed = dataset
1478            .unindexed_fragments(intent.name)
1479            .await
1480            .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1481        let unindexed_fragments = unindexed.len();
1482        let unindexed_rows = unindexed
1483            .iter()
1484            .map(|fragment| fragment.num_rows().unwrap_or(0))
1485            .sum();
1486        statuses.push(IndexStatus {
1487            table,
1488            intent_name: intent.name.to_owned(),
1489            fragments_covered: total_fragments.saturating_sub(unindexed_fragments),
1490            unindexed_fragments,
1491            unindexed_rows,
1492            exists,
1493        });
1494    }
1495    Ok(statuses)
1496}
1497
1498/// Open the table at `table_name` via the namespace; create + initialize on
1499/// `TableNotFound`. Schema-checks the on-disk dataset against pond's
1500/// expectation so a stale data dir surfaces early.
1501///
1502/// Probes via `nm.describe_table` directly rather than `DatasetBuilder::from_namespace`:
1503/// the builder re-wraps an already-`Namespace`-wrapped error
1504/// (lance/src/dataset/builder.rs:142), so going through it would force a
1505/// chain-walk to classify `TableNotFound`. The direct probe stays at one
1506/// wrap level and downcasts cleanly. Managed-versioning hookup (REST
1507/// namespace external-manifest commits) is not wired here; v1 ships
1508/// Directory v2 only.
1509async fn open_or_create_via_ns(
1510    nm: &Arc<dyn LanceNamespace>,
1511    nm_ident: &NamespaceIdent,
1512    table_name: &str,
1513    schema: lance::deps::arrow_schema::SchemaRef,
1514    session: &Arc<Session>,
1515    storage_options: &HashMap<String, String>,
1516) -> Result<Dataset> {
1517    let table_id = nm_ident.as_table_id(table_name);
1518
1519    let request = DescribeTableRequest {
1520        id: Some(table_id.clone()),
1521        ..Default::default()
1522    };
1523    match nm.describe_table(request).await {
1524        Ok(response) => {
1525            let location = response.location.with_context(|| {
1526                format!("namespace returned no location for table {table_name}")
1527            })?;
1528            let mut builder = DatasetBuilder::from_uri(&location).with_session(session.clone());
1529            if !storage_options.is_empty() {
1530                builder = builder.with_storage_options(storage_options.clone());
1531            }
1532            let dataset = builder
1533                .load()
1534                .await
1535                .with_context(|| format!("failed to open table {table_name}"))?;
1536            ensure_schema_matches(&dataset, schema.as_ref(), table_name)?;
1537            return Ok(dataset);
1538        }
1539        Err(error) => match &error {
1540            error if is_namespace_error_code(error, ErrorCode::TableNotFound) => {
1541                // fall through to create
1542            }
1543            _ => {
1544                return Err(anyhow::Error::from(error))
1545                    .with_context(|| format!("failed to describe table {table_name}"));
1546            }
1547        },
1548    }
1549
1550    // Create path: pond seeds an empty dataset with the canonical schema so
1551    // every subsequent open lands on a real Lance dataset, not a phantom.
1552    let mut write_params = sessions::write_params_for_create();
1553    write_params.session = Some(session.clone());
1554    write_params.mode = WriteMode::Create;
1555    if !storage_options.is_empty() {
1556        write_params.store_params = Some(ObjectStoreParams {
1557            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
1558                storage_options.clone(),
1559            ))),
1560            ..Default::default()
1561        });
1562    }
1563    let reader = sessions::empty_reader(schema)?;
1564    Dataset::write_into_namespace(reader, nm.clone(), table_id, Some(write_params))
1565        .await
1566        .with_context(|| format!("failed to create table {table_name}"))
1567}
1568
1569// lance-namespace sometimes nests one `lance::Error::Namespace` inside another
1570// before the underlying `NamespaceError`; walk the whole `.source()` chain
1571// rather than only matching the outer variant.
1572fn is_namespace_error_code(error: &lance::Error, code: ErrorCode) -> bool {
1573    if !matches!(error, lance::Error::Namespace { .. }) {
1574        return false;
1575    }
1576    std::iter::successors(Some(error as &(dyn std::error::Error + 'static)), |link| {
1577        link.source()
1578    })
1579    .filter_map(|link| link.downcast_ref::<NamespaceError>())
1580    .any(|inner| inner.code() == code)
1581}
1582
1583fn scanner_with_prefilter(
1584    dataset: &Dataset,
1585    predicate: Option<&Predicate>,
1586) -> Result<lance::dataset::scanner::Scanner> {
1587    let mut scanner = dataset.scan();
1588    scanner.prefilter(true);
1589    if let Some(predicate) = predicate {
1590        let filter = predicate.to_lance();
1591        if !filter.is_empty() {
1592            scanner.filter(&filter)?;
1593        }
1594    }
1595    Ok(scanner)
1596}
1597fn ensure_schema_matches(
1598    dataset: &Dataset,
1599    expected: &lance::deps::arrow_schema::Schema,
1600    table_name: &str,
1601) -> Result<()> {
1602    use lance::deps::arrow_schema::DataType;
1603    use std::collections::BTreeSet;
1604    let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
1605    let actual_names: BTreeSet<&str> = actual.fields().iter().map(|f| f.name().as_str()).collect();
1606    let expected_names: BTreeSet<&str> = expected
1607        .fields()
1608        .iter()
1609        .map(|f| f.name().as_str())
1610        .collect();
1611    if actual_names != expected_names {
1612        anyhow::bail!(
1613            "table {table_name} has columns {actual_names:?} but this pond build expects \
1614             {expected_names:?} - the on-disk store predates a schema change; delete the \
1615             data directory and re-run `pond ingest`",
1616        );
1617    }
1618    // Catch a vector-dim change (configured `[embeddings].dim` differs from
1619    // the on-disk vector column width) early with a friendly message. Lance
1620    // would otherwise reject the next write with an opaque schema-mismatch
1621    // error inside the `merge_update` path.
1622    for actual_field in actual.fields() {
1623        let Some(expected_field) = expected.field_with_name(actual_field.name()).ok() else {
1624            continue;
1625        };
1626        if let (DataType::FixedSizeList(_, actual_dim), DataType::FixedSizeList(_, expected_dim)) =
1627            (actual_field.data_type(), expected_field.data_type())
1628            && actual_dim != expected_dim
1629        {
1630            tracing::warn!(
1631                table = table_name,
1632                column = actual_field.name(),
1633                actual_dim,
1634                expected_dim,
1635                "embedding dimension differs from config; open proceeds because model swaps are operator-driven",
1636            );
1637        }
1638    }
1639    Ok(())
1640}
/// Object-store defaults injected for any non-local pond location.
///
/// Each default is only set when none of its accepted spellings is already a
/// key in `options`. Keys are compared ASCII-case-insensitively, so the
/// env-var form (e.g. `AWS_ENDPOINT`) also counts. This means explicit
/// overrides in `[storage]` always win.
///
/// `aws_unsigned_payload` is gated on a custom endpoint (the marker for
/// S3-compatible stores like Hetzner, MinIO, R2), where the SHA256 payload
/// signature is wasted work the server does not validate. The endpoint is
/// detected under every spelling object_store's S3 config parser accepts:
/// `aws_endpoint`, `aws_endpoint_url`, `endpoint`, `endpoint_url`.
fn apply_remote_storage_defaults(options: &mut HashMap<String, String>) {
    /// Spellings that configure the S3 endpoint.
    const ENDPOINT_ALIASES: &[&str] = &["aws_endpoint", "aws_endpoint_url", "endpoint", "endpoint_url"];

    /// True when any key in `options` equals one of `aliases`, ignoring ASCII case.
    fn has_any_key(options: &HashMap<String, String>, aliases: &[&str]) -> bool {
        options
            .keys()
            .any(|key| aliases.iter().any(|alias| key.eq_ignore_ascii_case(alias)))
    }

    /// Inserts `value` under `aliases[0]` unless some alias is already set.
    fn set_default(options: &mut HashMap<String, String>, aliases: &[&str], value: &str) {
        if !has_any_key(options, aliases) {
            options.insert(aliases[0].to_owned(), value.to_owned());
        }
    }

    set_default(options, &["pool_idle_timeout"], "300 seconds");
    set_default(options, &["connect_timeout"], "10 seconds");
    if has_any_key(options, ENDPOINT_ALIASES) {
        set_default(
            options,
            &["aws_unsigned_payload", "unsigned_payload"],
            "true",
        );
    }
}
1670
/// Renders `value` as a single-quoted SQL string literal, doubling every
/// embedded `'` so it cannot terminate the literal early.
fn quoted_string(value: &str) -> String {
    let mut literal = String::with_capacity(value.len() + 2);
    literal.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            literal.push_str("''");
        } else {
            literal.push(ch);
        }
    }
    literal.push('\'');
    literal
}
/// Builds a single-quoted `LIKE` pattern of the form `'%…%'` around `value`.
///
/// `\`, `%` and `_` are each prefixed with a backslash, intended so they match
/// literally rather than act as wildcards (this assumes `\` is the LIKE escape
/// character of the query engine). `'` is doubled for the SQL string literal.
fn like_contains(value: &str) -> String {
    let mut pattern = String::with_capacity(value.len() + 4);
    pattern.push_str("'%");
    for ch in value.chars() {
        match ch {
            '\\' | '%' | '_' => {
                pattern.push('\\');
                pattern.push(ch);
            }
            '\'' => pattern.push_str("''"),
            other => pattern.push(other),
        }
    }
    pattern.push_str("%'");
    pattern
}
1682
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn namespace_error_code_walks_wrapped_chain() {
        let is_table_not_found =
            |error: &lance::Error| is_namespace_error_code(error, ErrorCode::TableNotFound);

        // A namespace error carrying the code matches directly...
        let direct = lance::Error::namespace_source(Box::new(NamespaceError::TableNotFound {
            message: "missing".into(),
        }));
        assert!(is_table_not_found(&direct));

        // ...and still matches when nested inside another namespace error.
        let wrapped = lance::Error::namespace_source(Box::new(direct));
        assert!(is_table_not_found(&wrapped));

        // A namespace error with a different code must not match.
        let other_code =
            lance::Error::namespace_source(Box::new(NamespaceError::NamespaceNotFound {
                message: "nope".into(),
            }));
        assert!(!is_table_not_found(&other_code));

        // Neither may an error that is not a namespace error at all.
        let unrelated = lance::Error::internal("unrelated");
        assert!(!is_table_not_found(&unrelated));
    }

    /// Opening a fresh data directory through `lance-namespace` creates all
    /// three tables, and `Handle::scan` yields an empty batch for each one
    /// (no spurious schema mismatch, no namespace error).
    #[tokio::test]
    async fn store_opens_via_namespace_and_scan_works() -> Result<()> {
        let dir = TempDir::new()?;
        let root = Url::from_directory_path(dir.path())
            .map_err(|()| anyhow::anyhow!("temp path is not absolute"))?;
        let handle = Handle::open(&root).await?;

        // Every table's primary key column is `id`; projecting it drives the
        // full path (catalog -> dataset -> scanner -> batch) for each table.
        const PRIMARY_KEY: &[&str] = &["id"];
        for table in [Table::Sessions, Table::Messages, Table::Parts] {
            let scanner = handle
                .scan(table, ScanOpts::project_only(PRIMARY_KEY))
                .await?;
            let rows = scanner.try_into_batch().await?.num_rows();
            assert_eq!(rows, 0, "fresh table should be empty");
        }
        Ok(())
    }
}