Skip to main content

pond/
substrate.rs

1use crate::{
2    RetryPolicy,
3    config::{self},
4    handlers::NamespaceIdent,
5    sessions::{self},
6};
7use anyhow::{Context, Result};
8use lance::Dataset;
9use lance::dataset::builder::DatasetBuilder;
10use lance::dataset::optimize::{CompactionOptions, compact_files};
11use lance::dataset::write::merge_insert::SourceDedupeBehavior;
12use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode};
13use lance::deps::arrow_array::{RecordBatch, RecordBatchIterator};
14use lance::index::DatasetIndexExt;
15use lance::index::DatasetIndexInternalExt;
16use lance::index::vector::VectorIndexParams;
17use lance::session::Session;
18use lance_index::IndexType;
19use lance_index::optimize::OptimizeOptions;
20use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
21use lance_io::object_store::{
22    ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor,
23};
24use lance_linalg::distance::MetricType;
25use lance_namespace::LanceNamespace;
26use lance_namespace::error::{ErrorCode, NamespaceError};
27use lance_namespace::models::DescribeTableRequest;
28use lance_namespace_impls::ConnectBuilder;
29use std::{
30    collections::HashMap,
31    sync::Arc,
32    time::{Duration, Instant},
33};
34use tokio::sync::{Mutex, OnceCell};
35use tokio_stream::StreamExt;
36use url::Url;
/// Embedded-row count at which pond builds the IVF_PQ vector index on
/// `messages.vector` (spec.md#search). Below it, vector search runs a
/// brute-force flat scan - exact and fast at small and medium scale, and
/// IVF_PQ cannot train well on fewer vectors anyway.
pub const VECTOR_INDEX_ACTIVATION_ROWS: usize = 100_000;

/// Default minimum unindexed-fragment count required before a per-intent
/// append/rebuild step is admitted into `optimize_table_indices`. Smaller
/// values mean smaller, more frequent commits (costly on remote stores);
/// larger values let more fragments pile up behind the brute-force
/// fallback. 4 is the bottom of the documented 4-8 band.
pub const DEFAULT_INDEX_LAG_THRESHOLD: usize = 4;

/// Process-wide threshold seeded from config. It stays empty until
/// `init_index_lag_threshold` runs.
static INDEX_LAG_THRESHOLD_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Seed the process-wide index-lag threshold from `[search].index_lag_threshold`.
/// Only the first call takes effect; later calls are ignored (mirrors
/// `embed::init_model_id` / `sessions::init_embedding_dim`).
pub fn init_index_lag_threshold(value: usize) {
    // `set` reports an error when the cell is already filled. Discarding
    // that error is exactly what gives first-call-wins semantics.
    let _ = INDEX_LAG_THRESHOLD_RUNTIME.set(value);
}

/// Current index-lag threshold: the seeded value if there is one, otherwise
/// `DEFAULT_INDEX_LAG_THRESHOLD`.
pub fn index_lag_threshold() -> usize {
    match INDEX_LAG_THRESHOLD_RUNTIME.get() {
        Some(&seeded) => seeded,
        None => DEFAULT_INDEX_LAG_THRESHOLD,
    }
}
64
/// Declarative description of one index pond keeps on a table. Created when
/// its trigger fires; folded forward by `pond index optimize`.
///
/// All string fields are `&'static str`, so cloning an intent is cheap.
#[derive(Debug, Clone)]
pub struct IndexIntent {
    /// Stable on-disk name. Must match across runs so existence checks
    /// resolve.
    pub name: &'static str,
    /// Column the index covers.
    pub column: &'static str,
    /// Condition evaluated against the live dataset before each cycle
    /// (see `IndexTrigger::should_create`).
    pub trigger: IndexTrigger,
    /// How the params are built at create time. Some intents have static
    /// params (FTS, scalars); IVF_PQ needs the row count to size partitions.
    pub params: IndexParamsKind,
}
80
/// When an [`IndexIntent`] should exist on disk.
#[derive(Debug, Clone)]
pub enum IndexTrigger {
    /// Build whenever the table has any rows. Used for FTS and scalar
    /// indices: there is no training cost worth delaying.
    OnAnyRows,
    /// Build when `count(<column> IS NOT NULL) >= threshold`. Used for the
    /// IVF_PQ vector index, which trains poorly on too few vectors.
    OnNonNullCount {
        /// Column name. It is interpolated unquoted into the
        /// `<column> IS NOT NULL` count filter.
        column: &'static str,
        /// Minimum number of non-null rows (inclusive) before the index is built.
        threshold: usize,
    },
}
94
/// The lance-native shape of an [`IndexIntent`]'s params, dispatched to the
/// right `IndexParams` at create time.
#[derive(Debug, Clone)]
pub enum IndexParamsKind {
    /// `BuiltinIndexType::BTree` -> [`IndexType::BTree`];
    /// `BuiltinIndexType::Bitmap` -> [`IndexType::Bitmap`]; etc.
    /// Built with `ScalarIndexParams::for_builtin`.
    Scalar(BuiltinIndexType),
    /// `InvertedIndexParams` with a character `ngram` tokenizer in the
    /// `[min, max]` range and stemming / stop-words off
    /// (spec.md#search-language-neutral-index).
    InvertedFtsNgram { min: u32, max: u32 },
    /// `VectorIndexParams::ivf_pq` with cosine metric (e5 vectors are
    /// L2-normalized). `sub_vectors = embedding_dim / 8` and `num_bits = 8`
    /// are pond's conventions; `max_iters` caps kmeans. Partitions follow
    /// LanceDB's documented `num_rows // 4096` guidance, floored at one.
    IvfPqCosine {
        /// Number of PQ sub-vectors.
        sub_vectors: usize,
        /// Bits per PQ code.
        num_bits: u8,
        /// Upper bound on kmeans iterations during training.
        max_iters: usize,
    },
}
116
117impl IndexTrigger {
118    async fn should_create(&self, dataset: &Dataset) -> Result<bool> {
119        match self {
120            Self::OnAnyRows => Ok(dataset.count_rows(None).await? > 0),
121            Self::OnNonNullCount { column, threshold } => {
122                let count = dataset
123                    .count_rows(Some(format!("{column} IS NOT NULL")))
124                    .await?;
125                Ok(count >= *threshold)
126            }
127        }
128    }
129}
130
131impl IndexParamsKind {
132    fn index_type(&self) -> IndexType {
133        match self {
134            Self::Scalar(BuiltinIndexType::Bitmap) => IndexType::Bitmap,
135            Self::Scalar(_) => IndexType::BTree,
136            Self::InvertedFtsNgram { .. } => IndexType::Inverted,
137            Self::IvfPqCosine { .. } => IndexType::Vector,
138        }
139    }
140
141    async fn build(&self, dataset: &Dataset) -> Result<Box<dyn lance::index::IndexParams>> {
142        match self {
143            Self::Scalar(kind) => Ok(Box::new(ScalarIndexParams::for_builtin(kind.clone()))),
144            Self::InvertedFtsNgram { min, max } => Ok(Box::new(
145                InvertedIndexParams::default()
146                    .base_tokenizer("ngram".to_owned())
147                    .ngram_min_length(*min)
148                    .ngram_max_length(*max)
149                    .stem(false)
150                    .remove_stop_words(false),
151            )),
152            Self::IvfPqCosine {
153                sub_vectors,
154                num_bits,
155                max_iters,
156            } => {
157                let count = dataset
158                    .count_rows(Some("vector IS NOT NULL".to_owned()))
159                    .await?;
160                let partitions = count.checked_div(4096).unwrap_or(0).max(1);
161                Ok(Box::new(VectorIndexParams::ivf_pq(
162                    partitions,
163                    *num_bits,
164                    *sub_vectors,
165                    MetricType::Cosine,
166                    *max_iters,
167                )))
168            }
169        }
170    }
171}
172
/// Index state for one `IndexIntent` on one table.
/// NOTE(review): this struct is populated outside this view; the field
/// meanings below are inferred from their names and should be confirmed
/// against the producer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexStatus {
    /// Table the intent applies to.
    pub table: Table,
    /// Name of the intent (presumably `IndexIntent::name`).
    pub intent_name: String,
    /// Presumably the number of fragments the existing index covers.
    pub fragments_covered: usize,
    /// Presumably the number of fragments not yet folded into the index.
    pub unindexed_fragments: usize,
    /// Presumably the row count across those unindexed fragments.
    pub unindexed_rows: usize,
    /// Whether the index currently exists on disk.
    pub exists: bool,
}
182
/// Marker error that `retry_lance` attaches to the anyhow chain when it gives
/// up on an OCC commit conflict (spec.md#protocol). The wire layer downcasts
/// to it so the outcome is classified as `conflict` rather than the generic
/// `storage_unavailable`.
#[derive(Debug, Clone, Copy)]
pub struct ConflictExhausted {
    /// How many attempts were made before giving up.
    pub attempts: u8,
}

impl std::fmt::Display for ConflictExhausted {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let attempts = self.attempts;
        f.write_fmt(format_args!(
            "commit conflict exhausted after {attempts} attempt(s)"
        ))
    }
}

impl std::error::Error for ConflictExhausted {}
203
204/// Per-phase result for one table's pass through `Handle::optimize_table`.
205/// spec.md#substrate 3.7 (`lance-index-maintenance`): the indices phase and the
206/// compaction phase get independent retry budgets and independent commits,
207/// so a hot writer that starves the Rewrite cannot abort the index Update.
208#[derive(Debug)]
209pub enum PhaseOutcome {
210    /// Phase attempted and committed work.
211    Ok,
212    /// Phase attempted; no work was needed.
213    Noop,
214    /// Phase attempted; OCC retry budget exhausted on conflict (the operator
215    /// can rerun later once the hot writer quiesces).
216    SkippedConflict,
217    /// Phase failed with a non-conflict error.
218    Failed(anyhow::Error),
219    /// Phase not requested by the caller (e.g. compaction skipped under
220    /// `Store::build_indices_only`).
221    NotAttempted,
222}
223
224impl PhaseOutcome {
225    pub fn is_failed(&self) -> bool {
226        matches!(self, Self::Failed(_))
227    }
228}
229
/// What `Handle::optimize_table` did for one table.
///
/// The two phases are reported separately because they commit and retry
/// independently, so one can fail while the other succeeds.
#[derive(Debug)]
pub struct TableOptimizeOutcome {
    /// Table this outcome describes.
    pub table: Table,
    /// Result of the index create/rebuild/append phase.
    pub indices: PhaseOutcome,
    /// Result of the compaction phase.
    pub compaction: PhaseOutcome,
}
237
/// Boundary event during one `Handle::optimize_table` pass. The CLI binds a
/// progress callback to render a live spinner; library callers pass `None`.
#[derive(Debug, Clone)]
pub enum OptimizeEvent {
    /// A phase is starting on `table`.
    PhaseStart {
        table: Table,
        phase: OptimizePhase,
        /// Optional free-form context for the phase.
        /// NOTE(review): exact contents are decided by emitters outside this
        /// view; confirm before relying on them.
        detail: Option<String>,
    },
    /// A phase has ended on `table`.
    PhaseDone {
        table: Table,
        phase: OptimizePhase,
        /// Presumably the phase's wall-clock duration in milliseconds.
        elapsed_ms: u64,
    },
}
253
/// Named step within one `Handle::optimize_table` pass, carried by
/// progress events.
#[derive(Debug, Clone, Copy)]
pub enum OptimizePhase {
    Compact,
    Cleanup,
    IndexCreate,
    IndexRebuild,
    IndexAppend,
}

impl OptimizePhase {
    /// Short kebab-case name used in progress output.
    pub fn label(self) -> &'static str {
        match self {
            OptimizePhase::IndexCreate => "index-create",
            OptimizePhase::IndexRebuild => "index-rebuild",
            OptimizePhase::IndexAppend => "index-append",
            OptimizePhase::Compact => "compact",
            OptimizePhase::Cleanup => "cleanup",
        }
    }
}
274
275pub type OptimizeProgressFn = Box<dyn Fn(OptimizeEvent) + Send + Sync>;
276
277fn emit(progress: Option<&OptimizeProgressFn>, event: OptimizeEvent) {
278    if let Some(callback) = progress {
279        callback(event);
280    }
281}
282
283/// True when the chain root is one of Lance's commit-conflict variants
284/// (`CommitConflict`, `RetryableCommitConflict`, `TooMuchWriteContention`).
285/// Everything else (timeouts, IAM denials, disk errors) is not a conflict.
286pub fn is_commit_conflict(error: &anyhow::Error) -> bool {
287    error.downcast_ref::<lance::Error>().is_some_and(|err| {
288        matches!(
289            err,
290            lance::Error::CommitConflict { .. }
291                | lance::Error::RetryableCommitConflict { .. }
292                | lance::Error::TooMuchWriteContention { .. }
293        )
294    })
295}
296
297/// True when `retry_lance` exhausted retries against an OCC conflict and
298/// attached `ConflictExhausted` to the chain head.
299fn is_conflict_exhausted(error: &anyhow::Error) -> bool {
300    error.chain().any(|cause| cause.is::<ConflictExhausted>())
301}
302
/// On-disk byte totals for the three session datasets, plus everything else
/// under the data-dir root. Sized by listing through Lance's object-store
/// layer (spec.md#lance-chokepoints-storage) so `file://` and `s3://` behave alike.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TableSizes {
    /// Bytes attributed to the sessions dataset.
    pub sessions: u64,
    /// Bytes attributed to the messages dataset.
    pub messages: u64,
    /// Bytes attributed to the parts dataset.
    pub parts: u64,
    /// Bytes under the data-dir root that belong to none of the three datasets.
    pub other: u64,
}
313
/// Literal operand of a `Predicate` comparison.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalarValue {
    /// Text literal. It is rendered as a quoted string.
    String(String),
    /// 32-bit integer literal. It is rendered in decimal.
    Int32(i32),
    /// Pre-rendered expression text, spliced in without quoting.
    Raw(String),
}
impl From<&str> for ScalarValue {
    fn from(value: &str) -> Self {
        ScalarValue::String(String::from(value))
    }
}
impl From<String> for ScalarValue {
    fn from(value: String) -> Self {
        ScalarValue::String(value)
    }
}
impl From<i32> for ScalarValue {
    fn from(value: i32) -> Self {
        ScalarValue::Int32(value)
    }
}
/// Structured filter expression, rendered to a Lance SQL filter string by
/// `Predicate::to_lance`. Column names are spliced into the filter unquoted;
/// values are rendered through `ScalarValue`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Predicate {
    /// `<col> = <value>`.
    Eq(&'static str, ScalarValue),
    /// `<col> <> <value>`.
    Ne(&'static str, ScalarValue),
    /// `<col> IS NULL`.
    IsNull(&'static str),
    /// `<col> IS NOT NULL`.
    IsNotNull(&'static str),
    /// `<col> IN (<values>)`.
    In(&'static str, Vec<ScalarValue>),
    /// Substring match: `<col> LIKE <pattern> ESCAPE '\'`, with the pattern
    /// built from the raw substring by `like_contains`.
    LikeContains(&'static str, String),
    /// Regex match. Emitted as `regexp_like(<col>, '<pat>')`. Never pushes
    /// down to BTREE indexes (Lance's scalar-index-expr parser ignores it),
    /// so the filter is a full-scan-with-predicate - acceptable for
    /// human-driven `--project re:...` queries, not for hot paths.
    Regex(&'static str, String),
    /// `<col> >= <value>`.
    Gte(&'static str, ScalarValue),
    /// `<col> <= <value>`.
    Lte(&'static str, ScalarValue),
    /// Conjunction. Children that render to `""` are skipped.
    And(Vec<Predicate>),
    /// Parenthesised disjunction. Children that render to `""` are skipped.
    Or(Vec<Predicate>),
}
353impl Predicate {
354    pub fn to_lance(&self) -> String {
355        match self {
356            Self::Eq(column, value) => format!("{column} = {}", value.to_lance()),
357            Self::Ne(column, value) => format!("{column} <> {}", value.to_lance()),
358            Self::IsNull(column) => format!("{column} IS NULL"),
359            Self::IsNotNull(column) => format!("{column} IS NOT NULL"),
360            Self::In(column, values) => {
361                let values = values
362                    .iter()
363                    .map(ScalarValue::to_lance)
364                    .collect::<Vec<_>>()
365                    .join(", ");
366                format!("{column} IN ({values})")
367            }
368            Self::LikeContains(column, value) => {
369                format!("{column} LIKE {} ESCAPE '\\'", like_contains(value))
370            }
371            Self::Regex(column, pattern) => {
372                format!("regexp_like({column}, {})", quoted_string(pattern))
373            }
374            Self::Gte(column, value) => format!("{column} >= {}", value.to_lance()),
375            Self::Lte(column, value) => format!("{column} <= {}", value.to_lance()),
376            Self::And(predicates) => predicates
377                .iter()
378                .map(Self::to_lance)
379                .filter(|predicate| !predicate.is_empty())
380                .collect::<Vec<_>>()
381                .join(" AND "),
382            Self::Or(predicates) => {
383                // Wrap in parens so the disjunction composes safely as a child
384                // of an outer `And` (SQL `OR` binds looser than `AND`).
385                let body = predicates
386                    .iter()
387                    .map(Self::to_lance)
388                    .filter(|predicate| !predicate.is_empty())
389                    .collect::<Vec<_>>()
390                    .join(" OR ");
391                if body.is_empty() {
392                    String::new()
393                } else {
394                    format!("({body})")
395                }
396            }
397        }
398    }
399}
/// Read-side options for `Handle::scan`: optional prefilter predicate and
/// optional projection. Default = no filter, all columns.
#[derive(Default)]
pub struct ScanOpts<'a> {
    /// Row filter. `None` scans every row.
    pub predicate: Option<&'a Predicate>,
    /// Columns to read. `None` reads all columns.
    pub projection: Option<&'a [&'a str]>,
}
407
408impl<'a> ScanOpts<'a> {
409    pub fn project_only(projection: &'a [&'a str]) -> Self {
410        Self {
411            predicate: None,
412            projection: Some(projection),
413        }
414    }
415    pub fn with_predicate_and_projection(
416        predicate: &'a Predicate,
417        projection: &'a [&'a str],
418    ) -> Self {
419        Self {
420            predicate: Some(predicate),
421            projection: Some(projection),
422        }
423    }
424}
425
426impl ScalarValue {
427    fn to_lance(&self) -> String {
428        match self {
429            Self::String(value) => quoted_string(value),
430            Self::Int32(value) => value.to_string(),
431            Self::Raw(value) => value.clone(),
432        }
433    }
434}
/// Lance cache caps in bytes. `None` lets the substrate pick the backend-aware
/// default (local FS gets a tighter cap; object stores stay near Lance's
/// defaults). Wired through `Store::open_with_options` from `[runtime]`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct RuntimeCaps {
    /// Index cache size for the shared Lance `Session`.
    pub index_cache_bytes: Option<usize>,
    /// Metadata cache size for the shared Lance `Session`.
    pub metadata_cache_bytes: Option<usize>,
}
443
444impl RuntimeCaps {
445    pub fn from_config(config: &crate::config::RuntimeConfig) -> Self {
446        Self {
447            index_cache_bytes: config.index_cache_bytes,
448            metadata_cache_bytes: config.metadata_cache_bytes,
449        }
450    }
451}
452
453/// Local-FS default: tight enough that a long-lived `pond mcp` lands well
454/// under the 500 MiB target without measurable latency cost vs Lance's 6 GiB
455/// default (see `benches/serve_mem_bench.rs --cap-sweep`).
456const LOCAL_INDEX_CACHE_BYTES: usize = 256 * 1024 * 1024;
457const LOCAL_METADATA_CACHE_BYTES: usize = 128 * 1024 * 1024;
458/// Object-store defaults: latency to refill is per-page, so keep more in cache.
459const REMOTE_INDEX_CACHE_BYTES: usize = 2 * 1024 * 1024 * 1024;
460const REMOTE_METADATA_CACHE_BYTES: usize = 512 * 1024 * 1024;
461
462fn resolve_cache_caps(location: &Url, caps: RuntimeCaps) -> (usize, usize) {
463    let (index_default, metadata_default) = if config::is_local(location) {
464        (LOCAL_INDEX_CACHE_BYTES, LOCAL_METADATA_CACHE_BYTES)
465    } else {
466        (REMOTE_INDEX_CACHE_BYTES, REMOTE_METADATA_CACHE_BYTES)
467    };
468    (
469        caps.index_cache_bytes.unwrap_or(index_default),
470        caps.metadata_cache_bytes.unwrap_or(metadata_default),
471    )
472}
473
/// Open connection to a pond data dir. It bundles the cached Lance datasets
/// with the shared session, namespace catalog and storage configuration
/// they were opened with.
pub struct Handle {
    /// Per-table dataset caches: `sessions` and `messages` are opened
    /// eagerly, `parts` lazily.
    datasets: DatasetSet,
    /// Retry policy for Lance operations; `RetryPolicy::default()` at open.
    retry: RetryPolicy,
    /// One `lance::Session` shared across all three datasets. Carries the
    /// metadata + index caches and the `ObjectStoreRegistry` (which holds
    /// the underlying object_store / S3 client). Sharing the session means
    /// one cache pool covers all three tables and one S3 client serves all
    /// three datasets - load-bearing on object-store backends where a
    /// per-dataset client would mean 3x the connection pools and 3x the
    /// credential refreshes (lance/src/dataset/builder.rs:509-517).
    // Presumably never read after construction (hence the `dead_code`
    // allow). Holding the `Arc` ties a reference to the shared Session to the
    // handle's lifetime.
    #[allow(dead_code)]
    session: Arc<Session>,
    /// The `lance-namespace` catalog seam. v1 uses the Directory impl;
    /// future hosted pond swaps to "rest" without touching read/write paths
    /// (spec.md#lance-chokepoints-catalog).
    nm: Arc<dyn LanceNamespace>,
    /// Namespace identifier this handle binds to. v1 is always `root()`; the
    /// typed seam matches `resolve_namespace`'s return so multi-namespace
    /// routing can land without churning call sites (spec.md#wire-namespace-resolution).
    nm_ident: NamespaceIdent,
    /// Object-store options threaded through every `DatasetBuilder` and
    /// `Dataset::write` call so refresh / index-creation paths inherit the
    /// same credentials and region as the initial open. Empty on local-FS
    /// installs.
    storage_options: HashMap<String, String>,
    /// Data-dir URL the handle was opened against. `pond status` reads this
    /// to display where the bytes live and to decide whether to walk a local
    /// directory or issue a remote `LIST` for sizing.
    location: Url,
    /// Refresh window for `parts.lance`. It is set at open time to the same
    /// scheme-keyed value (`ZERO` for local FS, 5s for object stores) that
    /// the eagerly opened `sessions`/`messages` caches get. It is kept on the
    /// handle because `parts` opens lazily, after `open_with_options` has
    /// returned.
    parts_refresh_after: Duration,
}
508
509impl std::fmt::Debug for Handle {
510    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
511        formatter
512            .debug_struct("Handle")
513            .field("datasets", &self.datasets)
514            .field("retry", &self.retry)
515            .field("nm_ident", &self.nm_ident)
516            .field("storage_options", &self.storage_options)
517            .field("location", &self.location)
518            .finish()
519    }
520}
521
/// One of the three Lance tables pond manages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Table {
    Sessions,
    Messages,
    Parts,
}
impl Table {
    /// Lowercase table name: `"sessions"`, `"messages"` or `"parts"`.
    pub fn as_str(self) -> &'static str {
        match self {
            Table::Sessions => "sessions",
            Table::Messages => "messages",
            Table::Parts => "parts",
        }
    }

    /// Crate-internal alias for [`Table::as_str`], used for retry labels and
    /// log fields.
    fn label(self) -> &'static str {
        self.as_str()
    }
}
/// Per-table dataset caches owned by a `Handle`. Each table has its own
/// `tokio::sync::Mutex<CachedDataset>`.
#[derive(Debug)]
struct DatasetSet {
    /// `sessions.lance`, opened eagerly in `Handle::open_with_options`.
    sessions: Mutex<CachedDataset>,
    /// `messages.lance`, opened eagerly in `Handle::open_with_options`.
    messages: Mutex<CachedDataset>,
    /// `parts.lance` opens lazily on the first read or write that needs it:
    /// any `pond_get` (every mode reads parts to build summaries), grouped
    /// search hydrating user-hit summaries, or ingest with Part events. A
    /// process that does none of those skips the file, saving its metadata
    /// pages and file handle at cold-open. The OnceCell makes init
    /// single-flight; the inner `Mutex<CachedDataset>` then behaves identically
    /// to the other two.
    parts: OnceCell<Mutex<CachedDataset>>,
}
554#[derive(Debug)]
555struct CachedDataset {
556    dataset: Dataset,
557    last_refresh: Instant,
558    refresh_after: Duration,
559}
560impl CachedDataset {
561    async fn latest(&mut self) -> Result<Dataset> {
562        if self.last_refresh.elapsed() >= self.refresh_after {
563            self.dataset.checkout_latest().await?;
564            self.last_refresh = Instant::now();
565        }
566        Ok(self.dataset.clone())
567    }
568    fn replace(&mut self, dataset: Dataset) {
569        self.dataset = dataset;
570        self.last_refresh = Instant::now();
571    }
572}
573impl Handle {
574    /// Open without storage options or explicit cache caps. Backend-aware
575    /// defaults from `[runtime]` apply.
576    pub async fn open(location: &Url) -> Result<Self> {
577        Self::open_with_options(location, HashMap::new(), RuntimeCaps::default()).await
578    }
579
580    /// Open with object-store options handed through to Lance verbatim, plus
581    /// the resolved `[runtime]` cache caps. Object-store keys are the
582    /// `object_store` crate's standard config names; pond does not parse them.
583    /// Opening datasets never performs index work; index lifecycle lives under
584    /// `Handle::optimize_table`. `parts.lance` opens lazily on first use.
585    pub async fn open_with_options(
586        location: &Url,
587        mut storage_options: HashMap<String, String>,
588        caps: RuntimeCaps,
589    ) -> Result<Self> {
590        if let Some(path) = config::local_path(location) {
591            tokio::fs::create_dir_all(&path)
592                .await
593                .with_context(|| format!("failed to create data dir {}", path.display()))?;
594        } else {
595            apply_remote_storage_defaults(&mut storage_options);
596        }
597        // One Session shared across all three datasets so metadata/index
598        // caches and the object_store registry (and thus any S3 client) are
599        // pooled rather than duplicated three times. Caps are sized by the
600        // `[runtime]` block; explicit values from `caps` win, otherwise the
601        // local/remote backend default kicks in.
602        let (index_cache_bytes, metadata_cache_bytes) = resolve_cache_caps(location, caps);
603        let session = Arc::new(Session::new(
604            index_cache_bytes,
605            metadata_cache_bytes,
606            Arc::new(ObjectStoreRegistry::default()),
607        ));
608        // Build the lance-namespace catalog seam once (spec.md#lance-chokepoints-catalog).
609        // The `root` property is whatever URL the Directory impl understands;
610        // `uri_to_url` (lance-io/object_store.rs) accepts both bare paths and
611        // URLs, so passing the scheme-qualified URL for local FS works the
612        // same as the bare-path form. Trailing slash stripped for clean logs.
613        let root = location.as_str().trim_end_matches('/').to_string();
614        let mut connect = ConnectBuilder::new("dir")
615            .property("root", root)
616            .session(session.clone());
617        // Object-store credentials/region/endpoint flow into the namespace
618        // via the `storage.<key>` property convention (lance-namespace-impls
619        // dir.rs from_properties: lines 423-436).
620        for (key, value) in &storage_options {
621            connect = connect.property(format!("storage.{key}"), value.clone());
622        }
623        let nm: Arc<dyn LanceNamespace> = connect
624            .connect()
625            .await
626            .context("failed to connect lance Directory namespace")?;
627        let nm_ident = NamespaceIdent::root();
628        // spec.md#lance-handle-freshness: refresh window is scheme-keyed. Local-FS
629        // manifest reads are microsecond-cheap, so `0` (always-refresh) is
630        // essentially free and removes the stale-read window entirely. Object
631        // stores have real per-call cost; `5s` caps manifest fetch overhead at
632        // acceptable lag for human-driven queries.
633        let refresh_after = if config::is_local(location) {
634            Duration::ZERO
635        } else {
636            Duration::from_secs(5)
637        };
638        let handle = Self {
639            datasets: DatasetSet {
640                sessions: Mutex::new(CachedDataset {
641                    dataset: open_or_create_via_ns(
642                        &nm,
643                        &nm_ident,
644                        sessions::SESSIONS,
645                        sessions::session_schema(),
646                        &session,
647                        &storage_options,
648                    )
649                    .await?,
650                    last_refresh: Instant::now(),
651                    refresh_after,
652                }),
653                messages: Mutex::new(CachedDataset {
654                    dataset: open_or_create_via_ns(
655                        &nm,
656                        &nm_ident,
657                        sessions::MESSAGES,
658                        sessions::message_schema(),
659                        &session,
660                        &storage_options,
661                    )
662                    .await?,
663                    last_refresh: Instant::now(),
664                    refresh_after,
665                }),
666                parts: OnceCell::new(),
667            },
668            retry: RetryPolicy::default(),
669            session,
670            nm,
671            nm_ident,
672            storage_options,
673            location: location.clone(),
674            parts_refresh_after: refresh_after,
675        };
676        Ok(handle)
677    }
678
    /// Data-dir URL this handle was opened against.
    pub fn location(&self) -> &Url {
        &self.location
    }
682
    /// Read-only view of the `storage_options` the handle was opened with.
    /// `pond status` needs them to instantiate a raw `object_store` client
    /// that can `LIST` the remote bucket for sizing.
    ///
    /// For remote locations this includes any entries that
    /// `apply_remote_storage_defaults` added at open time. The values may
    /// carry object-store credentials, so avoid logging them.
    pub fn storage_options(&self) -> &HashMap<String, String> {
        &self.storage_options
    }
689
690    pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
691        Ok((
692            self.count_rows(Table::Sessions).await?,
693            self.count_rows(Table::Messages).await?,
694            self.count_rows(Table::Parts).await?,
695        ))
696    }
697
    /// Insert-only merge: append new rows, never overwrite a matched PK.
    /// Returns rows inserted. The fold lives separately under
    /// `Handle::optimize_table` (spec.md#lance-index-maintenance).
    ///
    /// `row_count` should be the number of rows in `batch` (presumably
    /// `batch.num_rows()`; verify at call sites). Zero short-circuits to
    /// `Ok(0)` without touching the dataset.
    pub(crate) async fn merge_insert(
        &self,
        table: Table,
        batch: RecordBatch,
        row_count: usize,
    ) -> Result<u64> {
        self.merge(
            table,
            batch,
            row_count,
            "merge_insert",
            WhenMatched::DoNothing,
            WhenNotMatched::InsertAll,
        )
        .await
    }
717
    /// Update-only merge: `WhenMatched::UpdateAll` on matched PKs; unmatched
    /// rows dropped. The fold lives separately under `Handle::optimize_table`.
    ///
    /// Returns the number of rows updated. As with `merge_insert`, a
    /// `row_count` of zero short-circuits to `Ok(0)`.
    pub(crate) async fn merge_update(
        &self,
        table: Table,
        batch: RecordBatch,
        row_count: usize,
    ) -> Result<u64> {
        self.merge(
            table,
            batch,
            row_count,
            "merge_update",
            WhenMatched::UpdateAll,
            WhenNotMatched::DoNothing,
        )
        .await
    }
736
    /// Shared merge path for [`Self::merge_insert`] and [`Self::merge_update`].
    /// Returns the number of rows affected (inserted or updated, whichever the
    /// behaviors produce).
    ///
    /// Each attempt runs inside `retry_lance` and holds the table's cache
    /// lock from the read through the commit. The committed dataset is
    /// therefore written back to the cache before the lock is released.
    /// Every call that gets past the zero-row check emits one `pond::perf`
    /// `merge` event, whether it succeeds or fails. Its `skipped` field is
    /// Lance's count of source duplicates dropped, or 0 when the merge
    /// failed.
    ///
    /// # Errors
    /// Returns the error `retry_lance` yields once it stops retrying.
    async fn merge(
        &self,
        table: Table,
        batch: RecordBatch,
        row_count: usize,
        op: &'static str,
        when_matched: WhenMatched,
        when_not_matched: WhenNotMatched,
    ) -> Result<u64> {
        if row_count == 0 {
            return Ok(0);
        }
        let started = Instant::now();
        let result = self
            .retry_lance(table.label(), || async {
                // Lock held for the whole attempt; released when the async
                // block ends.
                let mut cached = self.cached(table).await?.lock().await;
                let existing = cached.latest().await?;
                // Rebuild the reader per attempt: the closure may be invoked
                // more than once by `retry_lance`.
                let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
                let mut builder = MergeInsertBuilder::try_new(Arc::new(existing), Vec::new())?;
                builder.when_matched(when_matched.clone());
                builder.when_not_matched(when_not_matched.clone());
                // pond presents each PK at most once per batch; FirstSeen keeps
                // the first occurrence rather than failing (Lance's default).
                builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
                // Cleanup is operator-driven via `pond index optimize`; the
                // per-commit auto hook would add a LIST per write on remote
                // backends without changing the steady-state retention.
                builder.skip_auto_cleanup(true);
                let (dataset, stats) = builder
                    .try_build()?
                    .execute_reader(Box::new(reader))
                    .await?;
                // Publish the committed version and restart the refresh clock.
                cached.replace(dataset.as_ref().clone());
                Ok((
                    stats.num_inserted_rows + stats.num_updated_rows,
                    stats.num_skipped_duplicates,
                ))
            })
            .await;
        // Duplicate count for the perf log only; 0 when the merge failed.
        let skipped = result.as_ref().map(|(_, s)| *s).unwrap_or(0);
        tracing::info!(
            target: "pond::perf",
            op,
            table = %table.label(),
            rows = row_count,
            elapsed_ms = started.elapsed().as_millis() as u64,
            skipped,
            "merge",
        );
        result.map(|(affected, _)| affected)
    }
791
792    /// Run the table-local maintenance cycle for the supplied index intents.
793    /// BTree is rebuilt from scratch to dodge Lance v7.0.0-beta.16's flat
794    /// BTree combine bug; Bitmap, FTS, and IVF_PQ fold via append.
795    ///
796    /// spec.md#substrate 3.7 (`lance-index-maintenance`): indices and compaction
797    /// commit independently and use independent retry budgets, so a hot writer
798    /// that starves compaction (Rewrite) does not abort the index build
799    /// (Update) the operator actually asked for.
800    pub async fn optimize_table(
801        &self,
802        table: Table,
803        intents: &[IndexIntent],
804        progress: Option<&OptimizeProgressFn>,
805        cleanup: crate::sessions::CleanupConfig,
806    ) -> TableOptimizeOutcome {
807        let compaction = self
808            .run_optimize_compact_phase(table, progress, cleanup)
809            .await;
810        let indices = self
811            .run_optimize_indices_phase(table, intents, progress)
812            .await;
813        TableOptimizeOutcome {
814            table,
815            indices,
816            compaction,
817        }
818    }
819
820    /// Run only the indices phase for one table. Used by `pond embed`'s tail
821    /// to fold newly written vectors into the indices without paying the
822    /// compaction retry budget while embed itself may still be writing.
823    pub async fn optimize_table_indices_only(
824        &self,
825        table: Table,
826        intents: &[IndexIntent],
827        progress: Option<&OptimizeProgressFn>,
828    ) -> PhaseOutcome {
829        self.run_optimize_indices_phase(table, intents, progress)
830            .await
831    }
832
833    async fn run_optimize_indices_phase(
834        &self,
835        table: Table,
836        intents: &[IndexIntent],
837        progress: Option<&OptimizeProgressFn>,
838    ) -> PhaseOutcome {
839        if intents.is_empty() {
840            return PhaseOutcome::Noop;
841        }
842        let result = self
843            .retry_lance(table.label(), || async {
844                let mut guard = self.cached(table).await?.lock().await;
845                let mut dataset = guard.latest().await?;
846                let did_work =
847                    optimize_table_indices(&mut dataset, intents, table, progress).await?;
848                guard.replace(dataset);
849                Ok::<_, anyhow::Error>(did_work)
850            })
851            .await;
852        match result {
853            Ok(true) => PhaseOutcome::Ok,
854            Ok(false) => PhaseOutcome::Noop,
855            Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
856            Err(error) => PhaseOutcome::Failed(error),
857        }
858    }
859
860    async fn run_optimize_compact_phase(
861        &self,
862        table: Table,
863        progress: Option<&OptimizeProgressFn>,
864        cleanup: crate::sessions::CleanupConfig,
865    ) -> PhaseOutcome {
866        let result = self
867            .retry_lance(table.label(), || async {
868                let mut guard = self.cached(table).await?.lock().await;
869                let mut dataset = guard.latest().await?;
870                optimize_table_compact(&mut dataset, table, progress, cleanup).await?;
871                guard.replace(dataset);
872                Ok::<_, anyhow::Error>(())
873            })
874            .await;
875        match result {
876            Ok(()) => PhaseOutcome::Ok,
877            Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
878            Err(error) => PhaseOutcome::Failed(error),
879        }
880    }
881
882    pub async fn rebuild_index(&self, table: Table, intent: &IndexIntent) -> Result<()> {
883        self.retry_lance(table.label(), || async {
884            let mut guard = self.cached(table).await?.lock().await;
885            let mut dataset = guard.latest().await?;
886            rebuild_index(&mut dataset, intent).await?;
887            guard.replace(dataset);
888            Ok(())
889        })
890        .await
891    }
892
893    pub async fn index_status(
894        &self,
895        table: Table,
896        intents: &[IndexIntent],
897    ) -> Result<Vec<IndexStatus>> {
898        let dataset = self.dataset(table).await?;
899        index_status(table, &dataset, intents).await
900    }
901
902    pub(crate) async fn dataset(&self, table: Table) -> Result<Dataset> {
903        let mut cached = self.cached(table).await?.lock().await;
904        cached.latest().await
905    }
906    /// Build a prefiltered `Scanner` for `table`. Composable read entry
907    /// point for callers that need to layer extra builder calls
908    /// (`full_text_search`, `nearest`) on top of pond's predicate seam.
909    /// Routine scans should prefer `Handle::scan`.
910    pub(crate) async fn scanner(
911        &self,
912        table: Table,
913        predicate: Option<&Predicate>,
914    ) -> Result<lance::dataset::scanner::Scanner> {
915        let dataset = self.dataset(table).await?;
916        scanner_with_prefilter(&dataset, predicate)
917    }
918    /// Single read entry point: prefilter via `predicate`, optionally
919    /// project, return the prepared `Scanner` (spec.md#lance-chokepoints-read).
920    pub async fn scan(
921        &self,
922        table: Table,
923        opts: ScanOpts<'_>,
924    ) -> Result<lance::dataset::scanner::Scanner> {
925        let mut scanner = self.scanner(table, opts.predicate).await?;
926        if let Some(projection) = opts.projection {
927            scanner.project(projection)?;
928        }
929        Ok(scanner)
930    }
931    pub(crate) async fn scan_batch(
932        &self,
933        table: Table,
934        predicate: Option<&Predicate>,
935        projection: &[&str],
936    ) -> Result<RecordBatch> {
937        let opts = ScanOpts {
938            predicate,
939            projection: (!projection.is_empty()).then_some(projection),
940        };
941        self.scan(table, opts)
942            .await?
943            .try_into_batch()
944            .await
945            .context("scan failed")
946    }
947    pub async fn count_rows(&self, table: Table) -> Result<usize> {
948        self.dataset(table)
949            .await?
950            .count_rows(None)
951            .await
952            .map_err(Into::into)
953    }
954    /// Names of every index on `messages` - the vector-index tests read this.
955    #[cfg(test)]
956    pub(crate) async fn messages_index_names(&self) -> Result<Vec<String>> {
957        let dataset = self.dataset(Table::Messages).await?;
958        let indices = dataset.load_indices().await?;
959        Ok(indices.iter().map(|index| index.name.clone()).collect())
960    }
961
962    /// Count rows in `table` not yet covered by `index_name`. Manifest-only;
963    /// a missing index reports the whole table. Powers `pond index status`.
964    pub(crate) async fn unindexed_row_count(
965        &self,
966        table: Table,
967        index_name: &str,
968    ) -> Result<usize> {
969        let dataset = self.dataset(table).await?;
970        let fragments = dataset
971            .unindexed_fragments(index_name)
972            .await
973            .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
974        Ok(fragments
975            .iter()
976            .map(|fragment| fragment.num_rows().unwrap_or(0))
977            .sum())
978    }
979
980    /// Drop the named index. Used by the `pond embed --force` model-swap path
981    /// to retire an IVF_PQ whose centroids belong to the old distance
982    /// space, before the next write re-bootstraps it over the new model's
983    /// vectors. Errors when the index does not exist; callers may swallow
984    /// that.
985    pub(crate) async fn drop_index(&self, table: Table, name: &str) -> Result<()> {
986        let mut guard = self.cached(table).await?.lock().await;
987        let mut dataset = guard.latest().await?;
988        dataset
989            .drop_index(name)
990            .await
991            .with_context(|| format!("drop_index({name}) failed for {}", table.label()))?;
992        guard.replace(dataset);
993        Ok(())
994    }
995
996    /// Resolve each table's stored location through the namespace catalog
997    /// (spec.md#lance-chokepoints-catalog) - no hardcoded `.lance` suffix.
998    async fn table_location(&self, table_name: &str) -> Result<String> {
999        let request = DescribeTableRequest {
1000            id: Some(self.nm_ident.as_table_id(table_name)),
1001            ..Default::default()
1002        };
1003        let response = self
1004            .nm
1005            .describe_table(request)
1006            .await
1007            .with_context(|| format!("failed to describe table {table_name}"))?;
1008        response
1009            .location
1010            .with_context(|| format!("namespace returned no location for table {table_name}"))
1011    }
1012
1013    /// On-disk byte totals for the three datasets plus the data-dir remainder.
1014    /// Every byte is sized by listing through Lance's object store
1015    /// (spec.md#lance-chokepoints-storage), identical for `file://` and `s3://`.
1016    pub async fn table_sizes(&self) -> Result<TableSizes> {
1017        let registry = Arc::new(ObjectStoreRegistry::default());
1018        let params = ObjectStoreParams {
1019            storage_options_accessor: (!self.storage_options.is_empty()).then(|| {
1020                Arc::new(StorageOptionsAccessor::with_static_options(
1021                    self.storage_options.clone(),
1022                ))
1023            }),
1024            ..Default::default()
1025        };
1026
1027        let sessions = self
1028            .listed_size(
1029                &registry,
1030                &params,
1031                &self.table_location(sessions::SESSIONS).await?,
1032            )
1033            .await?;
1034        let messages = self
1035            .listed_size(
1036                &registry,
1037                &params,
1038                &self.table_location(sessions::MESSAGES).await?,
1039            )
1040            .await?;
1041        let parts = self
1042            .listed_size(
1043                &registry,
1044                &params,
1045                &self.table_location(sessions::PARTS).await?,
1046            )
1047            .await?;
1048        // `other` is whatever sits under the data-dir root but not in the three
1049        // tables (config.toml, stray index temp files): root total minus them.
1050        let root_total = self
1051            .listed_size(&registry, &params, self.location.as_str())
1052            .await?;
1053        let other = root_total.saturating_sub(sessions + messages + parts);
1054        Ok(TableSizes {
1055            sessions,
1056            messages,
1057            parts,
1058            other,
1059        })
1060    }
1061
1062    /// Sum `ObjectMeta.size` for every object recursively under `uri`.
1063    async fn listed_size(
1064        &self,
1065        registry: &Arc<ObjectStoreRegistry>,
1066        params: &ObjectStoreParams,
1067        uri: &str,
1068    ) -> Result<u64> {
1069        let (store, base) = ObjectStore::from_uri_and_params(registry.clone(), uri, params)
1070            .await
1071            .with_context(|| format!("failed to open object store for {uri}"))?;
1072        let mut listing = store.list(Some(base));
1073        let mut total = 0u64;
1074        while let Some(meta) = listing.next().await {
1075            let meta = meta.with_context(|| format!("listing {uri} failed"))?;
1076            total += meta.size;
1077        }
1078        Ok(total)
1079    }
1080    async fn cached(&self, table: Table) -> Result<&Mutex<CachedDataset>> {
1081        match table {
1082            Table::Sessions => Ok(&self.datasets.sessions),
1083            Table::Messages => Ok(&self.datasets.messages),
1084            Table::Parts => self.parts_cached().await,
1085        }
1086    }
1087
1088    /// Open `parts.lance` on first use (spec.md#datasets). Single-flight via
1089    /// `OnceCell`; once initialized, behaves identically to the other two.
1090    async fn parts_cached(&self) -> Result<&Mutex<CachedDataset>> {
1091        self.datasets
1092            .parts
1093            .get_or_try_init(|| async {
1094                let dataset = open_or_create_via_ns(
1095                    &self.nm,
1096                    &self.nm_ident,
1097                    sessions::PARTS,
1098                    sessions::part_schema(),
1099                    &self.session,
1100                    &self.storage_options,
1101                )
1102                .await?;
1103                Ok::<_, anyhow::Error>(Mutex::new(CachedDataset {
1104                    dataset,
1105                    last_refresh: Instant::now(),
1106                    refresh_after: self.parts_refresh_after,
1107                }))
1108            })
1109            .await
1110    }
1111    async fn retry_lance<T, Fut, Op>(&self, label: &str, mut operation: Op) -> Result<T>
1112    where
1113        Fut: std::future::Future<Output = Result<T>>,
1114        Op: FnMut() -> Fut,
1115    {
1116        let mut attempt = 0u8;
1117        loop {
1118            attempt = attempt.saturating_add(1);
1119            match operation().await {
1120                Ok(value) => return Ok(value),
1121                Err(error) if attempt < self.retry.attempts => {
1122                    let backoff = self.backoff(attempt);
1123                    // `{:#}` walks anyhow's cause chain inline; `%error` (Display)
1124                    // drops everything below the top-level message.
1125                    let error_chain = format!("{error:#}");
1126                    tracing::warn!(
1127                        label,
1128                        attempt,
1129                        ?backoff,
1130                        error = %error_chain,
1131                        "retrying Lance operation"
1132                    );
1133                    tokio::time::sleep(backoff).await;
1134                }
1135                Err(error) => {
1136                    let error_chain = format!("{error:#}");
1137                    tracing::warn!(
1138                        label,
1139                        attempt,
1140                        error = %error_chain,
1141                        "Lance operation exhausted retries"
1142                    );
1143                    // spec.md#protocol: surface OCC failures as a typed `conflict`
1144                    // rather than the generic `storage_unavailable` bucket. The
1145                    // chain root is a `lance::Error` (commit-conflict family) when
1146                    // pond's retry layer exhausted because the manifest could not
1147                    // be advanced; everything else (timeouts, IAM, disk) stays
1148                    // `storage_unavailable`.
1149                    if is_commit_conflict(&error) {
1150                        return Err(error.context(ConflictExhausted { attempts: attempt }));
1151                    }
1152                    return Err(error);
1153                }
1154            }
1155        }
1156    }
1157    fn backoff(&self, attempt: u8) -> Duration {
1158        let shift = u32::from(attempt.saturating_sub(1));
1159        let multiplier = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
1160        let base = self.retry.initial_backoff.saturating_mul(multiplier);
1161        // Symmetric +/- `jitter` factor de-correlates concurrent retriers on
1162        // a contended manifest (spec.md#lance-retry-jitter); clamped to `max_backoff`.
1163        let factor = (1.0 + self.retry.jitter * (fastrand::f64() * 2.0 - 1.0)).max(0.0);
1164        base.mul_f64(factor).min(self.retry.max_backoff)
1165    }
1166}
1167/// Compaction phase: `compact_files` + `cleanup_old_versions`, both inside one
1168/// retry block. Distinct from the indices phase so a hot writer that loses the
1169/// Rewrite race here does not abort index work the operator actually asked for.
1170///
1171/// spec.md#lance-index-maintenance mandates FRI on by default, but at
1172/// v7.0.0-beta.16 `defer_index_remap=true` together with `stable-row-ids`
1173/// panics in `optimize.rs::commit_compaction` with "defer_index_remap
1174/// requires row_addrs but none were provided": `rewrite_files` skips
1175/// row_addrs when stable row ids are on, then the FRI builder demands
1176/// them. With stable_row_ids the remap step is already a no-op
1177/// (`optimize.rs:1490`: `needs_remapping = !uses_stable_row_ids() &&
1178/// !defer_index_remap`), so running without FRI is correct - we only
1179/// lose the documented concurrency-with-index-build benefit. Flip to
1180/// `true` once upstream fixes the conflict.
1181async fn optimize_table_compact(
1182    dataset: &mut Dataset,
1183    table: Table,
1184    progress: Option<&OptimizeProgressFn>,
1185    cleanup: crate::sessions::CleanupConfig,
1186) -> Result<()> {
1187    let compaction = CompactionOptions {
1188        defer_index_remap: false,
1189        ..CompactionOptions::default()
1190    };
1191
1192    emit(
1193        progress,
1194        OptimizeEvent::PhaseStart {
1195            table,
1196            phase: OptimizePhase::Compact,
1197            detail: None,
1198        },
1199    );
1200    let started = Instant::now();
1201    compact_files(dataset, compaction, None).await?;
1202    emit(
1203        progress,
1204        OptimizeEvent::PhaseDone {
1205            table,
1206            phase: OptimizePhase::Compact,
1207            elapsed_ms: started.elapsed().as_millis() as u64,
1208        },
1209    );
1210
1211    emit(
1212        progress,
1213        OptimizeEvent::PhaseStart {
1214            table,
1215            phase: OptimizePhase::Cleanup,
1216            detail: None,
1217        },
1218    );
1219    let started = Instant::now();
1220    // delete_unverified=true is the operator opt-in via `--cleanup-older-than`
1221    // / `--vacuum` that bypasses Lance's 7-day in-progress safety guard
1222    // (UNVERIFIED_THRESHOLD_DAYS in lance/dataset/cleanup.rs). Required to
1223    // reclaim files younger than 7 days; unsafe under concurrent writers.
1224    dataset
1225        .cleanup_old_versions(
1226            cleanup.older_than,
1227            Some(cleanup.delete_unverified),
1228            Some(false),
1229        )
1230        .await
1231        .context("cleanup_old_versions failed during index optimize")?;
1232    emit(
1233        progress,
1234        OptimizeEvent::PhaseDone {
1235            table,
1236            phase: OptimizePhase::Cleanup,
1237            elapsed_ms: started.elapsed().as_millis() as u64,
1238        },
1239    );
1240
1241    Ok(())
1242}
1243
1244/// Indices phase: per-intent create/rebuild + batched `optimize_indices(append)`
1245/// for incremental families. Returns `true` if anything committed.
1246async fn optimize_table_indices(
1247    dataset: &mut Dataset,
1248    intents: &[IndexIntent],
1249    table: Table,
1250    progress: Option<&OptimizeProgressFn>,
1251) -> Result<bool> {
1252    let existing = dataset.load_indices().await?;
1253    let existing_names: std::collections::HashSet<String> =
1254        existing.iter().map(|index| index.name.clone()).collect();
1255
1256    let mut append_indices: Vec<String> = Vec::new();
1257    let mut did_work = false;
1258
1259    for intent in intents {
1260        let exists = existing_names.contains(intent.name);
1261
1262        if !exists {
1263            if !intent.trigger.should_create(dataset).await? {
1264                continue;
1265            }
1266            let params = intent.params.build(dataset).await?;
1267            let index_type = intent.params.index_type();
1268            tracing::info!(
1269                index = intent.name,
1270                column = intent.column,
1271                "creating Lance index (trigger fired)",
1272            );
1273            emit(
1274                progress,
1275                OptimizeEvent::PhaseStart {
1276                    table,
1277                    phase: OptimizePhase::IndexCreate,
1278                    detail: Some(intent.name.to_owned()),
1279                },
1280            );
1281            let started = Instant::now();
1282            dataset
1283                .create_index(
1284                    &[intent.column],
1285                    index_type,
1286                    Some(intent.name.to_owned()),
1287                    params.as_ref(),
1288                    false,
1289                )
1290                .await
1291                .with_context(|| format!("failed to create index {}", intent.name))?;
1292            emit(
1293                progress,
1294                OptimizeEvent::PhaseDone {
1295                    table,
1296                    phase: OptimizePhase::IndexCreate,
1297                    elapsed_ms: started.elapsed().as_millis() as u64,
1298                },
1299            );
1300            did_work = true;
1301            continue;
1302        }
1303
1304        let unindexed = dataset.unindexed_fragments(intent.name).await?;
1305        if unindexed.is_empty() {
1306            continue;
1307        }
1308        // Lag guard: let fragments accumulate behind the brute-force fallback
1309        // rather than firing a commit per tiny append. Threshold is operator-
1310        // tunable via `[search].index_lag_threshold`.
1311        if unindexed.len() < index_lag_threshold() {
1312            continue;
1313        }
1314        match intent.params {
1315            IndexParamsKind::Scalar(BuiltinIndexType::BTree) => {
1316                let params = intent.params.build(dataset).await?;
1317                let index_type = intent.params.index_type();
1318                tracing::debug!(
1319                    target: "pond::perf",
1320                    index = intent.name,
1321                    column = intent.column,
1322                    "rebuilding Lance BTree index",
1323                );
1324                emit(
1325                    progress,
1326                    OptimizeEvent::PhaseStart {
1327                        table,
1328                        phase: OptimizePhase::IndexRebuild,
1329                        detail: Some(intent.name.to_owned()),
1330                    },
1331                );
1332                let started = Instant::now();
1333                dataset
1334                    .create_index(
1335                        &[intent.column],
1336                        index_type,
1337                        Some(intent.name.to_owned()),
1338                        params.as_ref(),
1339                        true,
1340                    )
1341                    .await
1342                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1343                emit(
1344                    progress,
1345                    OptimizeEvent::PhaseDone {
1346                        table,
1347                        phase: OptimizePhase::IndexRebuild,
1348                        elapsed_ms: started.elapsed().as_millis() as u64,
1349                    },
1350                );
1351                did_work = true;
1352            }
1353            IndexParamsKind::Scalar(BuiltinIndexType::Bitmap)
1354            | IndexParamsKind::InvertedFtsNgram { .. }
1355            | IndexParamsKind::IvfPqCosine { .. } => {
1356                append_indices.push(intent.name.to_owned());
1357            }
1358            IndexParamsKind::Scalar(_) => {
1359                let params = intent.params.build(dataset).await?;
1360                emit(
1361                    progress,
1362                    OptimizeEvent::PhaseStart {
1363                        table,
1364                        phase: OptimizePhase::IndexRebuild,
1365                        detail: Some(intent.name.to_owned()),
1366                    },
1367                );
1368                let started = Instant::now();
1369                dataset
1370                    .create_index(
1371                        &[intent.column],
1372                        intent.params.index_type(),
1373                        Some(intent.name.to_owned()),
1374                        params.as_ref(),
1375                        true,
1376                    )
1377                    .await
1378                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1379                emit(
1380                    progress,
1381                    OptimizeEvent::PhaseDone {
1382                        table,
1383                        phase: OptimizePhase::IndexRebuild,
1384                        elapsed_ms: started.elapsed().as_millis() as u64,
1385                    },
1386                );
1387                did_work = true;
1388            }
1389        }
1390    }
1391
1392    if !append_indices.is_empty() {
1393        let to_append = append_indices.clone();
1394        emit(
1395            progress,
1396            OptimizeEvent::PhaseStart {
1397                table,
1398                phase: OptimizePhase::IndexAppend,
1399                detail: Some(append_indices.join(", ")),
1400            },
1401        );
1402        let started = Instant::now();
1403        dataset
1404            .optimize_indices(&OptimizeOptions::append().index_names(to_append))
1405            .await
1406            .context("optimize_indices(append) failed during index optimize")?;
1407        emit(
1408            progress,
1409            OptimizeEvent::PhaseDone {
1410                table,
1411                phase: OptimizePhase::IndexAppend,
1412                elapsed_ms: started.elapsed().as_millis() as u64,
1413            },
1414        );
1415        tracing::debug!(
1416            target: "pond::perf",
1417            indices = ?append_indices,
1418            "appended trailing fragments into indices",
1419        );
1420        did_work = true;
1421    }
1422
1423    Ok(did_work)
1424}
1425
1426async fn rebuild_index(dataset: &mut Dataset, intent: &IndexIntent) -> Result<()> {
1427    if !intent.trigger.should_create(dataset).await? {
1428        return Ok(());
1429    }
1430    let params = intent.params.build(dataset).await?;
1431    dataset
1432        .create_index(
1433            &[intent.column],
1434            intent.params.index_type(),
1435            Some(intent.name.to_owned()),
1436            params.as_ref(),
1437            true,
1438        )
1439        .await
1440        .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1441    Ok(())
1442}
1443
1444async fn index_status(
1445    table: Table,
1446    dataset: &Dataset,
1447    intents: &[IndexIntent],
1448) -> Result<Vec<IndexStatus>> {
1449    let existing = dataset.load_indices().await?;
1450    let existing_names: std::collections::HashSet<String> =
1451        existing.iter().map(|index| index.name.clone()).collect();
1452    let total_fragments = dataset.get_fragments().len();
1453    let total_rows = dataset.count_rows(None).await?;
1454    let mut statuses = Vec::with_capacity(intents.len());
1455    for intent in intents {
1456        let exists = existing_names.contains(intent.name);
1457        if !exists {
1458            statuses.push(IndexStatus {
1459                table,
1460                intent_name: intent.name.to_owned(),
1461                fragments_covered: 0,
1462                unindexed_fragments: total_fragments,
1463                unindexed_rows: total_rows,
1464                exists,
1465            });
1466            continue;
1467        }
1468        let unindexed = dataset
1469            .unindexed_fragments(intent.name)
1470            .await
1471            .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1472        let unindexed_fragments = unindexed.len();
1473        let unindexed_rows = unindexed
1474            .iter()
1475            .map(|fragment| fragment.num_rows().unwrap_or(0))
1476            .sum();
1477        statuses.push(IndexStatus {
1478            table,
1479            intent_name: intent.name.to_owned(),
1480            fragments_covered: total_fragments.saturating_sub(unindexed_fragments),
1481            unindexed_fragments,
1482            unindexed_rows,
1483            exists,
1484        });
1485    }
1486    Ok(statuses)
1487}
1488
1489/// Open the table at `table_name` via the namespace; create + initialize on
1490/// `TableNotFound`. Schema-checks the on-disk dataset against pond's
1491/// expectation so a stale data dir surfaces early.
1492///
1493/// Probes via `nm.describe_table` directly rather than `DatasetBuilder::from_namespace`:
1494/// the builder re-wraps an already-`Namespace`-wrapped error
1495/// (lance/src/dataset/builder.rs:142), so going through it would force a
1496/// chain-walk to classify `TableNotFound`. The direct probe stays at one
1497/// wrap level and downcasts cleanly. Managed-versioning hookup (REST
1498/// namespace external-manifest commits) is not wired here; v1 ships
1499/// Directory v2 only.
1500async fn open_or_create_via_ns(
1501    nm: &Arc<dyn LanceNamespace>,
1502    nm_ident: &NamespaceIdent,
1503    table_name: &str,
1504    schema: lance::deps::arrow_schema::SchemaRef,
1505    session: &Arc<Session>,
1506    storage_options: &HashMap<String, String>,
1507) -> Result<Dataset> {
1508    let table_id = nm_ident.as_table_id(table_name);
1509
1510    let request = DescribeTableRequest {
1511        id: Some(table_id.clone()),
1512        ..Default::default()
1513    };
1514    match nm.describe_table(request).await {
1515        Ok(response) => {
1516            let location = response.location.with_context(|| {
1517                format!("namespace returned no location for table {table_name}")
1518            })?;
1519            let mut builder = DatasetBuilder::from_uri(&location).with_session(session.clone());
1520            if !storage_options.is_empty() {
1521                builder = builder.with_storage_options(storage_options.clone());
1522            }
1523            let dataset = builder
1524                .load()
1525                .await
1526                .with_context(|| format!("failed to open table {table_name}"))?;
1527            ensure_schema_matches(&dataset, schema.as_ref(), table_name)?;
1528            return Ok(dataset);
1529        }
1530        Err(error) => match &error {
1531            error if is_namespace_error_code(error, ErrorCode::TableNotFound) => {
1532                // fall through to create
1533            }
1534            _ => {
1535                return Err(anyhow::Error::from(error))
1536                    .with_context(|| format!("failed to describe table {table_name}"));
1537            }
1538        },
1539    }
1540
1541    // Create path: pond seeds an empty dataset with the canonical schema so
1542    // every subsequent open lands on a real Lance dataset, not a phantom.
1543    let mut write_params = sessions::write_params_for_create();
1544    write_params.session = Some(session.clone());
1545    write_params.mode = WriteMode::Create;
1546    if !storage_options.is_empty() {
1547        write_params.store_params = Some(ObjectStoreParams {
1548            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
1549                storage_options.clone(),
1550            ))),
1551            ..Default::default()
1552        });
1553    }
1554    let reader = sessions::empty_reader(schema)?;
1555    Dataset::write_into_namespace(reader, nm.clone(), table_id, Some(write_params))
1556        .await
1557        .with_context(|| format!("failed to create table {table_name}"))
1558}
1559
1560// lance-namespace sometimes nests one `lance::Error::Namespace` inside another
1561// before the underlying `NamespaceError`; walk the whole `.source()` chain
1562// rather than only matching the outer variant.
1563fn is_namespace_error_code(error: &lance::Error, code: ErrorCode) -> bool {
1564    if !matches!(error, lance::Error::Namespace { .. }) {
1565        return false;
1566    }
1567    std::iter::successors(Some(error as &(dyn std::error::Error + 'static)), |link| {
1568        link.source()
1569    })
1570    .filter_map(|link| link.downcast_ref::<NamespaceError>())
1571    .any(|inner| inner.code() == code)
1572}
1573
1574fn scanner_with_prefilter(
1575    dataset: &Dataset,
1576    predicate: Option<&Predicate>,
1577) -> Result<lance::dataset::scanner::Scanner> {
1578    let mut scanner = dataset.scan();
1579    scanner.prefilter(true);
1580    if let Some(predicate) = predicate {
1581        let filter = predicate.to_lance();
1582        if !filter.is_empty() {
1583            scanner.filter(&filter)?;
1584        }
1585    }
1586    Ok(scanner)
1587}
1588fn ensure_schema_matches(
1589    dataset: &Dataset,
1590    expected: &lance::deps::arrow_schema::Schema,
1591    table_name: &str,
1592) -> Result<()> {
1593    use lance::deps::arrow_schema::DataType;
1594    use std::collections::BTreeSet;
1595    let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
1596    let actual_names: BTreeSet<&str> = actual.fields().iter().map(|f| f.name().as_str()).collect();
1597    let expected_names: BTreeSet<&str> = expected
1598        .fields()
1599        .iter()
1600        .map(|f| f.name().as_str())
1601        .collect();
1602    if actual_names != expected_names {
1603        anyhow::bail!(
1604            "table {table_name} has columns {actual_names:?} but this pond build expects \
1605             {expected_names:?} - the on-disk store predates a schema change; delete the \
1606             data directory and re-run `pond ingest`",
1607        );
1608    }
1609    // Catch a vector-dim change (configured `[embeddings].dim` differs from
1610    // the on-disk vector column width) early with a friendly message. Lance
1611    // would otherwise reject the next write with an opaque schema-mismatch
1612    // error inside the `merge_update` path.
1613    for actual_field in actual.fields() {
1614        let Some(expected_field) = expected.field_with_name(actual_field.name()).ok() else {
1615            continue;
1616        };
1617        if let (DataType::FixedSizeList(_, actual_dim), DataType::FixedSizeList(_, expected_dim)) =
1618            (actual_field.data_type(), expected_field.data_type())
1619            && actual_dim != expected_dim
1620        {
1621            tracing::warn!(
1622                table = table_name,
1623                column = actual_field.name(),
1624                actual_dim,
1625                expected_dim,
1626                "embedding dimension differs from config; open proceeds because model swaps are operator-driven",
1627            );
1628        }
1629    }
1630    Ok(())
1631}
/// Injects object-store defaults for any non-local pond location.
///
/// Each default is written only when none of its accepted key spellings is
/// already present. Keys are compared ASCII-case-insensitively, so
/// upper-case / env-var-style spellings such as `CONNECT_TIMEOUT` also count
/// as present. Explicit `[storage]` overrides therefore always win.
///
/// `aws_unsigned_payload` is gated on a custom endpoint. An endpoint is the
/// marker for S3-compatible stores such as Hetzner, MinIO or R2, where the
/// SHA256 payload signature is wasted work the server does not validate.
/// Every spelling `object_store` accepts for the S3 endpoint is recognised:
/// `aws_endpoint`, `aws_endpoint_url`, `endpoint` and `endpoint_url`.
fn apply_remote_storage_defaults(options: &mut HashMap<String, String>) {
    /// All key spellings that configure a custom S3 endpoint.
    const ENDPOINT_ALIASES: &[&str] = &["aws_endpoint", "aws_endpoint_url", "endpoint", "endpoint_url"];

    /// True when any of `aliases` is present, ignoring ASCII case.
    fn has_any_key(options: &HashMap<String, String>, aliases: &[&str]) -> bool {
        aliases
            .iter()
            .any(|alias| options.keys().any(|k| k.eq_ignore_ascii_case(alias)))
    }

    /// Inserts `aliases[0] = value` unless some alias is already set.
    fn set_default(options: &mut HashMap<String, String>, aliases: &[&str], value: &str) {
        if !has_any_key(options, aliases) {
            options.insert(aliases[0].to_owned(), value.to_owned());
        }
    }

    set_default(options, &["pool_idle_timeout"], "300 seconds");
    set_default(options, &["connect_timeout"], "10 seconds");
    if has_any_key(options, ENDPOINT_ALIASES) {
        set_default(
            options,
            &["aws_unsigned_payload", "unsigned_payload"],
            "true",
        );
    }
}
1661
/// Renders `value` as a single-quoted SQL string literal, doubling any
/// embedded single quotes.
fn quoted_string(value: &str) -> String {
    let body = value.replace('\'', "''");
    let mut literal = String::with_capacity(body.len() + 2);
    literal.push('\'');
    literal.push_str(&body);
    literal.push('\'');
    literal
}
/// Builds a quoted `LIKE` pattern matching any string that contains `value`.
///
/// `\`, `%` and `_` are escaped with a backslash so they match literally.
/// Single quotes are doubled for the SQL literal. The result is wrapped in
/// `'%…%'`.
fn like_contains(value: &str) -> String {
    let mut pattern = String::with_capacity(value.len() + 4);
    pattern.push_str("'%");
    for ch in value.chars() {
        match ch {
            '\\' | '%' | '_' => {
                pattern.push('\\');
                pattern.push(ch);
            }
            '\'' => pattern.push_str("''"),
            other => pattern.push(other),
        }
    }
    pattern.push_str("%'");
    pattern
}
1673
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn namespace_error_code_walks_wrapped_chain() {
        // One level: the NamespaceError sits directly under the outer variant.
        let single = lance::Error::namespace_source(Box::new(NamespaceError::TableNotFound {
            message: "missing".into(),
        }));
        assert!(is_namespace_error_code(&single, ErrorCode::TableNotFound));

        // Two levels: a Namespace error wrapping another Namespace error.
        let nested = lance::Error::namespace_source(Box::new(single));
        assert!(is_namespace_error_code(&nested, ErrorCode::TableNotFound));

        // A different code, and a non-namespace variant, must both be rejected.
        let mismatched_code =
            lance::Error::namespace_source(Box::new(NamespaceError::NamespaceNotFound {
                message: "nope".into(),
            }));
        let unrelated = lance::Error::internal("unrelated");
        for negative in [&mismatched_code, &unrelated] {
            assert!(!is_namespace_error_code(negative, ErrorCode::TableNotFound));
        }
    }

    /// Round-trip check for a fresh data dir opened through
    /// `lance-namespace`. Every table (sessions, messages, parts) must scan
    /// to an empty batch, with no schema-mismatch or namespace error.
    #[tokio::test]
    async fn store_opens_via_namespace_and_scan_works() -> Result<()> {
        let data_dir = TempDir::new()?;
        let url = Url::from_directory_path(data_dir.path())
            .map_err(|()| anyhow::anyhow!("temp path is not absolute"))?;
        let handle = Handle::open(&url).await?;
        // Every table's canonical PK column is `id`. Projecting it exercises
        // the full path: catalog -> dataset -> scanner -> batch.
        const PK_COLUMNS: &[&str] = &["id"];
        for table in [Table::Sessions, Table::Messages, Table::Parts] {
            let batch = handle
                .scan(table, ScanOpts::project_only(PK_COLUMNS))
                .await?
                .try_into_batch()
                .await?;
            assert_eq!(batch.num_rows(), 0, "fresh table should be empty");
        }
        Ok(())
    }
}