// pond/substrate.rs

use crate::{
    RetryPolicy,
    config::{self},
    handlers::NamespaceIdent,
    sessions::{self},
};
use anyhow::{Context, Result};
use lance::Dataset;
use lance::dataset::builder::DatasetBuilder;
use lance::dataset::index::DatasetIndexRemapperOptions;
use lance::dataset::optimize::{CompactionOptions, commit_compaction, plan_compaction};
use lance::dataset::write::merge_insert::SourceDedupeBehavior;
use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode};
use lance::deps::arrow_array::{RecordBatch, RecordBatchIterator};
use lance::index::DatasetIndexExt;
use lance::index::DatasetIndexInternalExt;
use lance::index::vector::VectorIndexParams;
use lance::session::Session;
use lance_index::IndexType;
use lance_index::optimize::OptimizeOptions;
use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
use lance_io::object_store::{
    ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor,
};
use lance_linalg::distance::MetricType;
use lance_namespace::LanceNamespace;
use lance_namespace::error::{ErrorCode, NamespaceError};
use lance_namespace::models::DescribeTableRequest;
use lance_namespace_impls::ConnectBuilder;
use std::{
    collections::HashMap,
    sync::Arc,
    time::{Duration, Instant},
};
use tokio::sync::{Mutex, OnceCell};
use tokio_stream::StreamExt;
use url::Url;
/// Embedded-row count at which pond builds the IVF_PQ vector index on
/// `messages.vector` (spec.md#search). Below this count, vector search runs a
/// brute-force flat scan, which is exact and fast at small and medium scale;
/// IVF_PQ cannot train well on fewer vectors anyway.
pub const VECTOR_INDEX_ACTIVATION_ROWS: usize = 100_000;

/// Default minimum unindexed-fragment count required before a per-intent
/// append/rebuild step is admitted into `optimize_table_indices`. Lower
/// values make each commit smaller and more frequent (bad on remote
/// stores); higher values let fragments accumulate behind the brute-force
/// fallback. 4 is the floor of the documented 4-8 band.
pub const DEFAULT_INDEX_LAG_THRESHOLD: usize = 4;

/// Process-wide override for the index-lag threshold. Stays unset until
/// [`init_index_lag_threshold`] stores a value.
static INDEX_LAG_THRESHOLD_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Seed the process-wide index-lag threshold from `[maintenance].index_lag_threshold`.
/// First call wins (mirrors `embed::init_model_id` / `sessions::init_embedding_dim`):
/// later calls leave the stored value untouched.
pub fn init_index_lag_threshold(value: usize) {
    INDEX_LAG_THRESHOLD_RUNTIME.get_or_init(|| value);
}

/// Current index-lag threshold: the value seeded by
/// [`init_index_lag_threshold`], or [`DEFAULT_INDEX_LAG_THRESHOLD`] when
/// nothing has been seeded yet.
pub fn index_lag_threshold() -> usize {
    INDEX_LAG_THRESHOLD_RUNTIME
        .get()
        .copied()
        .unwrap_or(DEFAULT_INDEX_LAG_THRESHOLD)
}

/// Per-task fragment-count backstop: tasks this wide always run, bounding
/// manifest growth even when the amplification veto would skip them. When
/// used as the policy's cap, `0` disables the veto entirely (tests rely on
/// this).
pub const DEFAULT_COMPACTION_FRAGMENT_CAP: usize = 64;

/// Fragments are sized by bytes, not Lance's 1M-row default: with
/// kilobyte-average rows, a row-count target tolerates multi-GiB fragments
/// that compaction then rewrites wholesale to absorb tiny appends
/// (~190 GiB/day of churn).
pub const TARGET_FRAGMENT_BYTES: u64 = 256 * 1024 * 1024;

/// Floor for the derived rows-per-fragment target.
const MIN_TARGET_ROWS_PER_FRAGMENT: u64 = 50_000;
/// Ceiling = Lance's own default.
const MAX_TARGET_ROWS_PER_FRAGMENT: u64 = 1024 * 1024;

/// Keep a task only when the merged-in remainder is >= largest/this:
/// size-tiered amortization, O(log n) lifetime rewrites per row.
pub const COMPACTION_ABSORB_FACTOR: u64 = 4;

84/// Default manifest-retention window for the safe cleanup pass. Matches
85/// LanceDB's recommended OSS-operator practice (lancedb docs: performance.mdx,
86/// tables/update.mdx). With `delete_unverified=false`, Lance's 7-day
87/// in-progress guard still protects unverified files regardless of this value
88/// (`UNVERIFIED_THRESHOLD_DAYS` in lance/dataset/cleanup.rs).
89pub fn default_cleanup_older_than() -> chrono::Duration {
90    chrono::Duration::days(1)
91}
92
93/// Resolved per-call inputs to the storage-maintenance pass. Built from
94/// `[maintenance]` (and any per-invocation CLI override) at the entry point;
95/// threaded down to `optimize_table_compact` so the substrate never re-reads
96/// `Config` itself.
97#[derive(Debug, Clone, Copy)]
98pub struct MaintenancePolicy {
99    /// See [`DEFAULT_COMPACTION_FRAGMENT_CAP`]; `0` disables the veto.
100    pub compaction_fragment_cap: usize,
101    /// Manifest-retention window handed to `cleanup_old_versions`.
102    pub cleanup_older_than: chrono::Duration,
103}
104
105impl MaintenancePolicy {
106    /// Veto off: run every task Lance plans (the optimize tests assume this).
107    pub fn always_compact() -> Self {
108        Self {
109            compaction_fragment_cap: 0,
110            cleanup_older_than: default_cleanup_older_than(),
111        }
112    }
113}
114
/// Size summary of one fragment, as read from the manifest.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct FragmentStat {
    /// `None` when the manifest lacks any file's size.
    bytes: Option<u64>,
    /// Physical row count recorded for the fragment (0 when unrecorded).
    rows: u64,
    /// Rows tombstoned by the fragment's deletion file (0 when none/unknown).
    deleted_rows: u64,
}

122/// Data-file bytes of one fragment; `None` (poisoning) when any size is
123/// missing from the manifest.
124fn fragment_bytes(fragment: &lance::table::format::Fragment) -> Option<u64> {
125    fragment.files.iter().try_fold(0u64, |total, file| {
126        Some(total + file.file_size_bytes.get()?.get())
127    })
128}
129
130fn fragment_stat(fragment: &lance::table::format::Fragment) -> FragmentStat {
131    FragmentStat {
132        bytes: fragment_bytes(fragment),
133        rows: fragment.physical_rows.unwrap_or(0) as u64,
134        deleted_rows: fragment
135            .deletion_file
136            .as_ref()
137            .and_then(|deletions| deletions.num_deleted_rows)
138            .unwrap_or(0) as u64,
139    }
140}
141
142/// Rows per [`TARGET_FRAGMENT_BYTES`] at the table's average row size.
143fn derived_target_rows(stats: &[FragmentStat]) -> usize {
144    let (mut bytes, mut rows) = (0u64, 0u64);
145    for stat in stats {
146        if let Some(fragment_bytes) = stat.bytes
147            && stat.rows > 0
148        {
149            bytes += fragment_bytes;
150            rows += stat.rows;
151        }
152    }
153    if bytes == 0 || rows == 0 {
154        return MAX_TARGET_ROWS_PER_FRAGMENT as usize;
155    }
156    let avg_row_bytes = (bytes / rows).max(1);
157    (TARGET_FRAGMENT_BYTES / avg_row_bytes)
158        .clamp(MIN_TARGET_ROWS_PER_FRAGMENT, MAX_TARGET_ROWS_PER_FRAGMENT) as usize
159}
160
161/// Amplification veto: skip tasks that mostly rewrite one big fragment to
162/// absorb fresh appends. Deletion-materialization tasks always pass (vetoing
163/// them would leave tombstones unreclaimed forever); compared in bytes when
164/// every file size is known, rows otherwise.
165fn keep_task(stats: &[FragmentStat], cap: usize, deletion_threshold: f32) -> bool {
166    if stats.iter().any(|stat| {
167        stat.rows > 0 && (stat.deleted_rows as f32 / stat.rows as f32) > deletion_threshold
168    }) {
169        return true;
170    }
171    if stats.len() >= cap {
172        return true;
173    }
174    let weights: Vec<u64> = if stats.iter().all(|stat| stat.bytes.is_some()) {
175        stats.iter().filter_map(|stat| stat.bytes).collect()
176    } else {
177        stats.iter().map(|stat| stat.rows).collect()
178    };
179    let total: u64 = weights.iter().sum();
180    let largest = weights.iter().copied().max().unwrap_or(0);
181    (total - largest) * COMPACTION_ABSORB_FACTOR >= largest
182}
183
184/// Declarative description of one index pond keeps on a table. Created when
185/// its trigger fires; folded forward by `pond index optimize`.
186#[derive(Debug, Clone)]
187pub struct IndexIntent {
188    /// Stable on-disk name. Must match across runs so existence checks
189    /// resolve.
190    pub name: &'static str,
191    /// Column the index covers.
192    pub column: &'static str,
193    /// Condition evaluated against the live dataset before each cycle.
194    pub trigger: IndexTrigger,
195    /// How the params are built at create time. Some intents have static
196    /// params (FTS, scalars); IVF_PQ needs the row count to size partitions.
197    pub params: IndexParamsKind,
198}
199
/// When an [`IndexIntent`] should exist on disk.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IndexTrigger {
    /// Build whenever the table has any rows. Used for FTS and scalar
    /// indices: there is no training cost worth delaying.
    OnAnyRows,
    /// Build when `count(<column> IS NOT NULL) >= threshold`. Used for the
    /// IVF_PQ vector index, which trains poorly on too few vectors.
    OnNonNullCount {
        /// Column whose non-null rows are counted.
        column: &'static str,
        /// Minimum non-null row count at which the index is built.
        threshold: usize,
    },
}

214/// The lance-native shape of an [`IndexIntent`]'s params, dispatched to the
215/// right `IndexParams` at create time.
216#[derive(Debug, Clone)]
217pub enum IndexParamsKind {
218    /// `BuiltinIndexType::BTree` -> [`IndexType::BTree`];
219    /// `BuiltinIndexType::Bitmap` -> [`IndexType::Bitmap`]; etc.
220    Scalar(BuiltinIndexType),
221    /// `InvertedIndexParams` with a character `ngram` tokenizer in the
222    /// `[min, max]` range and stemming / stop-words off
223    /// (spec.md#search-language-neutral-index).
224    InvertedFtsNgram { min: u32, max: u32 },
225    /// `VectorIndexParams::ivf_pq` with cosine metric (e5 vectors are
226    /// L2-normalized). `sub_vectors = embedding_dim / 8` and `num_bits = 8`
227    /// are pond's conventions; `max_iters` caps kmeans. Partitions follow
228    /// LanceDB's documented `num_rows // 4096` guidance, floored at one.
229    IvfPqCosine {
230        sub_vectors: usize,
231        num_bits: u8,
232        max_iters: usize,
233    },
234}
235
236impl IndexTrigger {
237    async fn should_create(&self, dataset: &Dataset) -> Result<bool> {
238        match self {
239            Self::OnAnyRows => Ok(dataset.count_rows(None).await? > 0),
240            Self::OnNonNullCount { column, threshold } => {
241                let count = dataset
242                    .count_rows(Some(format!("{column} IS NOT NULL")))
243                    .await?;
244                Ok(count >= *threshold)
245            }
246        }
247    }
248}
249
250impl IndexParamsKind {
251    fn index_type(&self) -> IndexType {
252        match self {
253            Self::Scalar(BuiltinIndexType::Bitmap) => IndexType::Bitmap,
254            Self::Scalar(_) => IndexType::BTree,
255            Self::InvertedFtsNgram { .. } => IndexType::Inverted,
256            Self::IvfPqCosine { .. } => IndexType::Vector,
257        }
258    }
259
260    async fn build(&self, dataset: &Dataset) -> Result<Box<dyn lance::index::IndexParams>> {
261        match self {
262            Self::Scalar(kind) => Ok(Box::new(ScalarIndexParams::for_builtin(kind.clone()))),
263            Self::InvertedFtsNgram { min, max } => Ok(Box::new(
264                InvertedIndexParams::default()
265                    .base_tokenizer("ngram".to_owned())
266                    .ngram_min_length(*min)
267                    .ngram_max_length(*max)
268                    .stem(false)
269                    .remove_stop_words(false),
270            )),
271            Self::IvfPqCosine {
272                sub_vectors,
273                num_bits,
274                max_iters,
275            } => {
276                let count = dataset
277                    .count_rows(Some("vector IS NOT NULL".to_owned()))
278                    .await?;
279                let partitions = count.checked_div(4096).unwrap_or(0).max(1);
280                Ok(Box::new(VectorIndexParams::ivf_pq(
281                    partitions,
282                    *num_bits,
283                    *sub_vectors,
284                    MetricType::Cosine,
285                    *max_iters,
286                )))
287            }
288        }
289    }
290}
291
292#[derive(Debug, Clone, PartialEq, Eq)]
293pub struct IndexStatus {
294    pub table: Table,
295    pub intent_name: String,
296    pub fragments_covered: usize,
297    pub unindexed_fragments: usize,
298    pub unindexed_rows: usize,
299    pub exists: bool,
300}
301
/// Anyhow-chain sentinel pond attaches when `retry_lance` exhausts attempts
/// against an OCC commit-conflict failure (spec.md#protocol). The wire layer
/// downcasts to this type to classify the outcome as `conflict` rather than
/// the generic `storage_unavailable`.
#[derive(Debug, Clone, Copy)]
pub struct ConflictExhausted {
    /// Number of attempts made before giving up.
    pub attempts: u8,
}

impl std::fmt::Display for ConflictExhausted {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            formatter,
            "commit conflict exhausted after {} attempt(s)",
            self.attempts
        )
    }
}

impl std::error::Error for ConflictExhausted {}

323/// Per-phase result for one table's pass through `Handle::optimize_table`.
324/// spec.md#substrate 3.7 (`lance-index-maintenance`): the indices phase and the
325/// compaction phase get independent retry budgets and independent commits,
326/// so a hot writer that starves the Rewrite cannot abort the index Update.
327#[derive(Debug)]
328pub enum PhaseOutcome {
329    /// Phase attempted and committed work.
330    Ok,
331    /// Phase attempted; no work was needed.
332    Noop,
333    /// Phase attempted; OCC retry budget exhausted on conflict (the operator
334    /// can rerun later once the hot writer quiesces).
335    SkippedConflict,
336    /// Phase failed with a non-conflict error.
337    Failed(anyhow::Error),
338    /// Phase not requested by the caller (e.g. compaction skipped under
339    /// `Store::build_indices_only`).
340    NotAttempted,
341}
342
343impl PhaseOutcome {
344    pub fn is_failed(&self) -> bool {
345        matches!(self, Self::Failed(_))
346    }
347}
348
349/// What `Handle::optimize_table` did for one table.
350#[derive(Debug)]
351pub struct TableOptimizeOutcome {
352    pub table: Table,
353    pub indices: PhaseOutcome,
354    pub compaction: PhaseOutcome,
355}
356
357/// Boundary event during one `Handle::optimize_table` pass. The CLI binds a
358/// progress callback to render a live spinner; library callers pass `None`.
359#[derive(Debug, Clone)]
360pub enum OptimizeEvent {
361    PhaseStart {
362        table: Table,
363        phase: OptimizePhase,
364        detail: Option<String>,
365    },
366    PhaseDone {
367        table: Table,
368        phase: OptimizePhase,
369        elapsed_ms: u64,
370    },
371}
372
/// Phase named in an [`OptimizeEvent`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OptimizePhase {
    Compact,
    Cleanup,
    IndexCreate,
    IndexRebuild,
    IndexAppend,
}

impl OptimizePhase {
    /// Stable kebab-case label for the phase.
    pub fn label(self) -> &'static str {
        match self {
            Self::Compact => "compact",
            Self::Cleanup => "cleanup",
            Self::IndexCreate => "index-create",
            Self::IndexRebuild => "index-rebuild",
            Self::IndexAppend => "index-append",
        }
    }
}

394pub type OptimizeProgressFn = Box<dyn Fn(OptimizeEvent) + Send + Sync>;
395
396fn emit(progress: Option<&OptimizeProgressFn>, event: OptimizeEvent) {
397    if let Some(callback) = progress {
398        callback(event);
399    }
400}
401
402/// True when the chain root is one of Lance's commit-conflict variants
403/// (`CommitConflict`, `RetryableCommitConflict`, `TooMuchWriteContention`).
404/// Everything else (timeouts, IAM denials, disk errors) is not a conflict.
405pub fn is_commit_conflict(error: &anyhow::Error) -> bool {
406    error.downcast_ref::<lance::Error>().is_some_and(|err| {
407        matches!(
408            err,
409            lance::Error::CommitConflict { .. }
410                | lance::Error::RetryableCommitConflict { .. }
411                | lance::Error::TooMuchWriteContention { .. }
412        )
413    })
414}
415
416/// True when `retry_lance` exhausted retries against an OCC conflict and
417/// attached `ConflictExhausted` to the chain head.
418fn is_conflict_exhausted(error: &anyhow::Error) -> bool {
419    error.chain().any(|cause| cause.is::<ConflictExhausted>())
420}
421
422/// On-disk byte totals for the three session datasets, plus everything else
423/// under the data-dir root. Sized by listing through Lance's object-store
424/// layer (spec.md#lance-chokepoints-storage) so `file://` and `s3://` behave alike.
425#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
426pub struct TableSizes {
427    pub sessions: u64,
428    pub messages: u64,
429    pub parts: u64,
430    pub other: u64,
431    pub sessions_data: DataLiveness,
432    pub messages_data: DataLiveness,
433    pub parts_data: DataLiveness,
434}
435
/// `data/` bytes on disk vs bytes the latest manifest references; the gap is
/// superseded versions awaiting the cleanup retention window.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DataLiveness {
    /// Bytes found on disk.
    pub on_disk: u64,
    /// `None` when the manifest lacks any referenced file's size.
    pub live: Option<u64>,
}

impl DataLiveness {
    /// Bytes on disk that the latest manifest no longer references. `None`
    /// when `live` is unknown; clamps to zero if `live` exceeds `on_disk`.
    pub fn dead(&self) -> Option<u64> {
        self.live.map(|live| self.on_disk.saturating_sub(live))
    }
}

/// A literal operand inside a [`Predicate`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalarValue {
    /// String literal; rendered quoted.
    String(String),
    /// 32-bit integer literal; rendered as its decimal text.
    Int32(i32),
    /// Pre-rendered expression text; emitted verbatim, with no quoting.
    Raw(String),
}

impl From<&str> for ScalarValue {
    fn from(value: &str) -> Self {
        Self::String(value.to_owned())
    }
}

impl From<String> for ScalarValue {
    fn from(value: String) -> Self {
        Self::String(value)
    }
}

impl From<i32> for ScalarValue {
    fn from(value: i32) -> Self {
        Self::Int32(value)
    }
}
472#[derive(Debug, Clone, PartialEq, Eq)]
473pub enum Predicate {
474    Eq(&'static str, ScalarValue),
475    Ne(&'static str, ScalarValue),
476    IsNull(&'static str),
477    IsNotNull(&'static str),
478    In(&'static str, Vec<ScalarValue>),
479    LikeContains(&'static str, String),
480    /// Regex match. Emitted as `regexp_like(<col>, '<pat>')`. Never pushes
481    /// down to BTREE indexes (Lance's scalar-index-expr parser ignores it),
482    /// so the filter is a full-scan-with-predicate - acceptable for
483    /// human-driven `--project re:...` queries, not for hot paths.
484    Regex(&'static str, String),
485    Gte(&'static str, ScalarValue),
486    Lte(&'static str, ScalarValue),
487    And(Vec<Predicate>),
488    Or(Vec<Predicate>),
489    Not(Box<Predicate>),
490}
491impl Predicate {
492    pub fn to_lance(&self) -> String {
493        match self {
494            Self::Eq(column, value) => format!("{column} = {}", value.to_lance()),
495            Self::Ne(column, value) => format!("{column} <> {}", value.to_lance()),
496            Self::IsNull(column) => format!("{column} IS NULL"),
497            Self::IsNotNull(column) => format!("{column} IS NOT NULL"),
498            Self::In(column, values) => {
499                let values = values
500                    .iter()
501                    .map(ScalarValue::to_lance)
502                    .collect::<Vec<_>>()
503                    .join(", ");
504                format!("{column} IN ({values})")
505            }
506            Self::LikeContains(column, value) => {
507                format!("{column} LIKE {} ESCAPE '\\'", like_contains(value))
508            }
509            Self::Regex(column, pattern) => {
510                format!("regexp_like({column}, {})", quoted_string(pattern))
511            }
512            Self::Gte(column, value) => format!("{column} >= {}", value.to_lance()),
513            Self::Lte(column, value) => format!("{column} <= {}", value.to_lance()),
514            Self::And(predicates) => predicates
515                .iter()
516                .map(Self::to_lance)
517                .filter(|predicate| !predicate.is_empty())
518                .collect::<Vec<_>>()
519                .join(" AND "),
520            Self::Or(predicates) => {
521                // Wrap in parens so the disjunction composes safely as a child
522                // of an outer `And` (SQL `OR` binds looser than `AND`).
523                let body = predicates
524                    .iter()
525                    .map(Self::to_lance)
526                    .filter(|predicate| !predicate.is_empty())
527                    .collect::<Vec<_>>()
528                    .join(" OR ");
529                if body.is_empty() {
530                    String::new()
531                } else {
532                    format!("({body})")
533                }
534            }
535            Self::Not(inner) => {
536                let body = inner.to_lance();
537                if body.is_empty() {
538                    String::new()
539                } else {
540                    format!("NOT ({body})")
541                }
542            }
543        }
544    }
545}
546/// Read-side options for `Handle::scan`: optional prefilter predicate and
547/// optional projection. Default = no filter, all columns.
548#[derive(Default)]
549pub struct ScanOpts<'a> {
550    pub predicate: Option<&'a Predicate>,
551    pub projection: Option<&'a [&'a str]>,
552}
553
554impl<'a> ScanOpts<'a> {
555    pub fn project_only(projection: &'a [&'a str]) -> Self {
556        Self {
557            predicate: None,
558            projection: Some(projection),
559        }
560    }
561    pub fn with_predicate_and_projection(
562        predicate: &'a Predicate,
563        projection: &'a [&'a str],
564    ) -> Self {
565        Self {
566            predicate: Some(predicate),
567            projection: Some(projection),
568        }
569    }
570}
571
572impl ScalarValue {
573    fn to_lance(&self) -> String {
574        match self {
575            Self::String(value) => quoted_string(value),
576            Self::Int32(value) => value.to_string(),
577            Self::Raw(value) => value.clone(),
578        }
579    }
580}
581/// Lance cache caps in bytes. `None` lets the substrate pick the backend-aware
582/// default (local FS gets a tighter cap; object stores stay near Lance's
583/// defaults). Wired through `Store::open_with_options` from `[runtime]`.
584#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
585pub struct RuntimeCaps {
586    pub index_cache_bytes: Option<usize>,
587    pub metadata_cache_bytes: Option<usize>,
588}
589
590impl RuntimeCaps {
591    pub fn from_config(config: &crate::config::RuntimeConfig) -> Self {
592        Self {
593            index_cache_bytes: config.index_cache_bytes,
594            metadata_cache_bytes: config.metadata_cache_bytes,
595        }
596    }
597}
598
/// Local-FS default index cache: tight enough that a long-lived `pond mcp`
/// lands well under the 500 MiB target without measurable latency cost vs
/// Lance's 6 GiB default (see `benches/serve_mem_bench.rs --cap-sweep`).
const LOCAL_INDEX_CACHE_BYTES: usize = 256 * 1024 * 1024;
/// Local-FS default metadata cache.
const LOCAL_METADATA_CACHE_BYTES: usize = 128 * 1024 * 1024;
/// Object-store default index cache: latency to refill is per-page, so keep
/// more in cache.
const REMOTE_INDEX_CACHE_BYTES: usize = 2 * 1024 * 1024 * 1024;
/// Object-store default metadata cache.
const REMOTE_METADATA_CACHE_BYTES: usize = 512 * 1024 * 1024;

608fn resolve_cache_caps(location: &Url, caps: RuntimeCaps) -> (usize, usize) {
609    let (index_default, metadata_default) = if config::is_local(location) {
610        (LOCAL_INDEX_CACHE_BYTES, LOCAL_METADATA_CACHE_BYTES)
611    } else {
612        (REMOTE_INDEX_CACHE_BYTES, REMOTE_METADATA_CACHE_BYTES)
613    };
614    (
615        caps.index_cache_bytes.unwrap_or(index_default),
616        caps.metadata_cache_bytes.unwrap_or(metadata_default),
617    )
618}
619
620pub struct Handle {
621    datasets: DatasetSet,
622    retry: RetryPolicy,
623    /// One `lance::Session` shared across all three datasets. Carries the
624    /// metadata + index caches and the `ObjectStoreRegistry` (which holds
625    /// the underlying object_store / S3 client). Sharing the session means
626    /// one cache pool covers all three tables and one S3 client serves all
627    /// three datasets - load-bearing on object-store backends where a
628    /// per-dataset client would mean 3x the connection pools and 3x the
629    /// credential refreshes (lance/src/dataset/builder.rs:509-517).
630    #[allow(dead_code)]
631    session: Arc<Session>,
632    /// The `lance-namespace` catalog seam. v1 uses the Directory impl;
633    /// future hosted pond swaps to "rest" without touching read/write paths
634    /// (spec.md#lance-chokepoints-catalog).
635    nm: Arc<dyn LanceNamespace>,
636    /// Namespace identifier this handle binds to. v1 is always `root()`; the
637    /// typed seam matches `resolve_namespace`'s return so multi-namespace
638    /// routing can land without churning call sites (spec.md#wire-namespace-resolution).
639    nm_ident: NamespaceIdent,
640    /// Object-store options threaded through every `DatasetBuilder` and
641    /// `Dataset::write` call so refresh / index-creation paths inherit the
642    /// same credentials and region as the initial open. Empty on local-FS
643    /// installs.
644    storage_options: HashMap<String, String>,
645    /// Data-dir URL the handle was opened against. `pond status` reads this
646    /// to display where the bytes live and to decide whether to walk a local
647    /// directory or issue a remote `LIST` for sizing.
648    location: Url,
649    /// Cached `parts.lance` open metadata, used the first time a caller asks
650    /// for parts. Holds the namespace probe shape so the lazy open re-uses the
651    /// same `lance-chokepoints-catalog` path as the eager opens for sessions/messages.
652    parts_refresh_after: Duration,
653}
654
655impl std::fmt::Debug for Handle {
656    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
657        formatter
658            .debug_struct("Handle")
659            .field("datasets", &self.datasets)
660            .field("retry", &self.retry)
661            .field("nm_ident", &self.nm_ident)
662            .field("storage_options", &self.storage_options)
663            .field("location", &self.location)
664            .finish()
665    }
666}
667
/// One of the three datasets a `Handle` manages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Table {
    Sessions,
    Messages,
    Parts,
}

impl Table {
    /// Lower-case table name: `"sessions"`, `"messages"` or `"parts"`.
    pub fn as_str(self) -> &'static str {
        self.label()
    }

    /// Same text as [`Table::as_str`], for in-module use.
    fn label(self) -> &'static str {
        match self {
            Self::Sessions => "sessions",
            Self::Messages => "messages",
            Self::Parts => "parts",
        }
    }
}
687#[derive(Debug)]
688struct DatasetSet {
689    sessions: Mutex<CachedDataset>,
690    messages: Mutex<CachedDataset>,
691    /// `parts.lance` opens lazily on the first read or write that needs it:
692    /// any `pond_get` (every mode reads parts to build summaries), grouped
693    /// search hydrating user-hit summaries, or ingest with Part events. A
694    /// process that does none of those skips the file, saving its metadata
695    /// pages and file handle at cold-open. The OnceCell makes init
696    /// single-flight; the inner `Mutex<CachedDataset>` then behaves identically
697    /// to the other two.
698    parts: OnceCell<Mutex<CachedDataset>>,
699}
700#[derive(Debug)]
701struct CachedDataset {
702    dataset: Dataset,
703    last_refresh: Instant,
704    refresh_after: Duration,
705}
706impl CachedDataset {
707    async fn latest(&mut self) -> Result<Dataset> {
708        if self.last_refresh.elapsed() >= self.refresh_after {
709            self.dataset.checkout_latest().await?;
710            self.last_refresh = Instant::now();
711        }
712        Ok(self.dataset.clone())
713    }
714    fn replace(&mut self, dataset: Dataset) {
715        self.dataset = dataset;
716        self.last_refresh = Instant::now();
717    }
718}
719impl Handle {
720    /// Open without storage options or explicit cache caps. Backend-aware
721    /// defaults from `[runtime]` apply.
722    pub async fn open(location: &Url) -> Result<Self> {
723        Self::open_with_options(location, HashMap::new(), RuntimeCaps::default()).await
724    }
725
726    /// Open with object-store options handed through to Lance verbatim, plus
727    /// the resolved `[runtime]` cache caps. Object-store keys are the
728    /// `object_store` crate's standard config names; pond does not parse them.
729    /// Opening datasets never performs index work; index lifecycle lives under
730    /// `Handle::optimize_table`. `parts.lance` opens lazily on first use.
731    pub async fn open_with_options(
732        location: &Url,
733        mut storage_options: HashMap<String, String>,
734        caps: RuntimeCaps,
735    ) -> Result<Self> {
736        if let Some(path) = config::local_path(location) {
737            tokio::fs::create_dir_all(&path)
738                .await
739                .with_context(|| format!("failed to create data dir {}", path.display()))?;
740        } else {
741            apply_remote_storage_defaults(&mut storage_options);
742        }
743        // One Session shared across all three datasets so metadata/index
744        // caches and the object_store registry (and thus any S3 client) are
745        // pooled rather than duplicated three times. Caps are sized by the
746        // `[runtime]` block; explicit values from `caps` win, otherwise the
747        // local/remote backend default kicks in.
748        let (index_cache_bytes, metadata_cache_bytes) = resolve_cache_caps(location, caps);
749        let session = Arc::new(Session::new(
750            index_cache_bytes,
751            metadata_cache_bytes,
752            Arc::new(ObjectStoreRegistry::default()),
753        ));
754        // Build the lance-namespace catalog seam once (spec.md#lance-chokepoints-catalog).
755        // The `root` property is whatever URL the Directory impl understands;
756        // `uri_to_url` (lance-io/object_store.rs) accepts both bare paths and
757        // URLs, so passing the scheme-qualified URL for local FS works the
758        // same as the bare-path form. Trailing slash stripped for clean logs.
759        let root = location.as_str().trim_end_matches('/').to_string();
760        let mut connect = ConnectBuilder::new("dir")
761            .property("root", root)
762            .session(session.clone());
763        // Object-store credentials/region/endpoint flow into the namespace
764        // via the `storage.<key>` property convention (lance-namespace-impls
765        // dir.rs from_properties: lines 423-436).
766        for (key, value) in &storage_options {
767            connect = connect.property(format!("storage.{key}"), value.clone());
768        }
769        let nm: Arc<dyn LanceNamespace> = connect
770            .connect()
771            .await
772            .context("failed to connect lance Directory namespace")?;
773        let nm_ident = NamespaceIdent::root();
774        // spec.md#lance-handle-freshness: refresh window is scheme-keyed. Local-FS
775        // manifest reads are microsecond-cheap, so `0` (always-refresh) is
776        // essentially free and removes the stale-read window entirely. Object
777        // stores have real per-call cost; `5s` caps manifest fetch overhead at
778        // acceptable lag for human-driven queries.
779        let refresh_after = if config::is_local(location) {
780            Duration::ZERO
781        } else {
782            Duration::from_secs(5)
783        };
784        let handle = Self {
785            datasets: DatasetSet {
786                sessions: Mutex::new(CachedDataset {
787                    dataset: open_or_create_via_ns(
788                        &nm,
789                        &nm_ident,
790                        sessions::SESSIONS,
791                        sessions::session_schema(),
792                        &session,
793                        &storage_options,
794                    )
795                    .await?,
796                    last_refresh: Instant::now(),
797                    refresh_after,
798                }),
799                messages: Mutex::new(CachedDataset {
800                    dataset: open_or_create_via_ns(
801                        &nm,
802                        &nm_ident,
803                        sessions::MESSAGES,
804                        sessions::message_schema(),
805                        &session,
806                        &storage_options,
807                    )
808                    .await?,
809                    last_refresh: Instant::now(),
810                    refresh_after,
811                }),
812                parts: OnceCell::new(),
813            },
814            retry: RetryPolicy::default(),
815            session,
816            nm,
817            nm_ident,
818            storage_options,
819            location: location.clone(),
820            parts_refresh_after: refresh_after,
821        };
822        Ok(handle)
823    }
824
825    pub fn location(&self) -> &Url {
826        &self.location
827    }
828
829    /// Read-only view of the `storage_options` the handle was opened with.
830    /// `pond status` needs them to instantiate a raw `object_store` client
831    /// that can `LIST` the remote bucket for sizing.
832    pub fn storage_options(&self) -> &HashMap<String, String> {
833        &self.storage_options
834    }
835
836    /// Object-store URI for a `pond_sql_query` export artifact:
837    /// `<location>/exports/<name>`. A sibling of the `*.lance` table dirs;
838    /// the Directory namespace tracks tables in its `__manifest` table rather
839    /// than by listing prefixes, so this prefix is never seen as a table
840    /// (lance-namespace-impls dir/manifest.rs). Never `register_table`'d.
841    fn export_uri(&self, name: &str) -> String {
842        format!(
843            "{}/exports/{name}",
844            self.location.as_str().trim_end_matches('/')
845        )
846    }
847
848    /// `ObjectStoreParams` carrying the handle's `storage_options` so raw
849    /// object-store opens (export I/O, `table_sizes` listing) inherit the same
850    /// credentials/region as the dataset opens. Empty options -> no accessor.
851    fn object_store_params(&self) -> ObjectStoreParams {
852        ObjectStoreParams {
853            storage_options_accessor: (!self.storage_options.is_empty()).then(|| {
854                Arc::new(StorageOptionsAccessor::with_static_options(
855                    self.storage_options.clone(),
856                ))
857            }),
858            ..Default::default()
859        }
860    }
861
862    /// Write a `pond_sql_query` export artifact, reusing the handle's
863    /// storage_options so S3 installs inherit the same credentials.
864    pub(crate) async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
865        let uri = self.export_uri(name);
866        let registry = Arc::new(ObjectStoreRegistry::default());
867        let (store, path) =
868            ObjectStore::from_uri_and_params(registry, &uri, &self.object_store_params())
869                .await
870                .with_context(|| format!("failed to open object store for {uri}"))?;
871        store
872            .put(&path, bytes)
873            .await
874            .with_context(|| format!("failed to write export {uri}"))?;
875        Ok(())
876    }
877
878    /// Read a `pond_sql_query` export artifact back (for the
879    /// `pond-sql-export://` MCP resource).
880    pub(crate) async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
881        let uri = self.export_uri(name);
882        let registry = Arc::new(ObjectStoreRegistry::default());
883        let (store, path) =
884            ObjectStore::from_uri_and_params(registry, &uri, &self.object_store_params())
885                .await
886                .with_context(|| format!("failed to open object store for {uri}"))?;
887        let bytes = store
888            .read_one_all(&path)
889            .await
890            .with_context(|| format!("failed to read export {uri}"))?;
891        Ok(bytes.to_vec())
892    }
893
894    /// Local filesystem path of an export artifact, when the data dir is
895    /// `file://`. The stdio MCP client shares this filesystem, so it can read
896    /// the file directly (e.g. duckdb/polars) instead of pulling base64 via
897    /// `resources/read`. `None` on object-store installs.
898    pub(crate) fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
899        if self.location.scheme() != "file" {
900            return None;
901        }
902        let dir = self.location.to_file_path().ok()?;
903        Some(dir.join("exports").join(name))
904    }
905
906    pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
907        Ok((
908            self.count_rows(Table::Sessions).await?,
909            self.count_rows(Table::Messages).await?,
910            self.count_rows(Table::Parts).await?,
911        ))
912    }
913
914    /// Insert-only merge: append new rows, never overwrite a matched PK.
915    /// Returns rows inserted. The fold lives separately under
916    /// `Handle::optimize_table` (spec.md#lance-index-maintenance).
917    pub(crate) async fn merge_insert(
918        &self,
919        table: Table,
920        batch: RecordBatch,
921        row_count: usize,
922    ) -> Result<u64> {
923        self.merge(
924            table,
925            batch,
926            row_count,
927            "merge_insert",
928            WhenMatched::DoNothing,
929            WhenNotMatched::InsertAll,
930        )
931        .await
932    }
933
934    /// Update-only merge: `WhenMatched::UpdateAll` on matched PKs; unmatched
935    /// rows dropped. The fold lives separately under `Handle::optimize_table`.
936    pub(crate) async fn merge_update(
937        &self,
938        table: Table,
939        batch: RecordBatch,
940        row_count: usize,
941    ) -> Result<u64> {
942        self.merge(
943            table,
944            batch,
945            row_count,
946            "merge_update",
947            WhenMatched::UpdateAll,
948            WhenNotMatched::DoNothing,
949        )
950        .await
951    }
952
953    /// Shared merge path for [`Self::merge_insert`] and [`Self::merge_update`].
954    /// Returns the number of rows affected (inserted or updated, whichever the
955    /// behaviors produce).
956    async fn merge(
957        &self,
958        table: Table,
959        batch: RecordBatch,
960        row_count: usize,
961        op: &'static str,
962        when_matched: WhenMatched,
963        when_not_matched: WhenNotMatched,
964    ) -> Result<u64> {
965        if row_count == 0 {
966            return Ok(0);
967        }
968        let started = Instant::now();
969        let result = self
970            .retry_lance(table.label(), || async {
971                let mut cached = self.cached(table).await?.lock().await;
972                let existing = cached.latest().await?;
973                let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
974                let mut builder = MergeInsertBuilder::try_new(Arc::new(existing), Vec::new())?;
975                builder.when_matched(when_matched.clone());
976                builder.when_not_matched(when_not_matched.clone());
977                // pond presents each PK at most once per batch; FirstSeen keeps
978                // the first occurrence rather than failing (Lance's default).
979                builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
980                // Cleanup is operator-driven via `pond index optimize`; the
981                // per-commit auto hook would add a LIST per write on remote
982                // backends without changing the steady-state retention.
983                builder.skip_auto_cleanup(true);
984                let (dataset, stats) = builder
985                    .try_build()?
986                    .execute_reader(Box::new(reader))
987                    .await?;
988                cached.replace(dataset.as_ref().clone());
989                Ok((
990                    stats.num_inserted_rows + stats.num_updated_rows,
991                    stats.num_skipped_duplicates,
992                ))
993            })
994            .await;
995        let skipped = result.as_ref().map(|(_, s)| *s).unwrap_or(0);
996        tracing::info!(
997            target: "pond::perf",
998            op,
999            table = %table.label(),
1000            rows = row_count,
1001            elapsed_ms = started.elapsed().as_millis() as u64,
1002            skipped,
1003            "merge",
1004        );
1005        result.map(|(affected, _)| affected)
1006    }
1007
1008    /// Run the table-local maintenance cycle for the supplied index intents.
1009    /// BTree is rebuilt from scratch to dodge Lance v7.0.0-beta.16's flat
1010    /// BTree combine bug; Bitmap, FTS, and IVF_PQ fold via append.
1011    ///
1012    /// spec.md#substrate 3.7 (`lance-index-maintenance`): indices and compaction
1013    /// commit independently and use independent retry budgets, so a hot writer
1014    /// that starves compaction (Rewrite) does not abort the index build
1015    /// (Update) the operator actually asked for.
1016    pub async fn optimize_table(
1017        &self,
1018        table: Table,
1019        intents: &[IndexIntent],
1020        progress: Option<&OptimizeProgressFn>,
1021        policy: &MaintenancePolicy,
1022    ) -> TableOptimizeOutcome {
1023        let compaction = self
1024            .run_optimize_compact_phase(table, progress, policy)
1025            .await;
1026        let indices = self
1027            .run_optimize_indices_phase(table, intents, progress)
1028            .await;
1029        TableOptimizeOutcome {
1030            table,
1031            indices,
1032            compaction,
1033        }
1034    }
1035
1036    /// Run only the indices phase for one table. Used by `pond embed`'s tail
1037    /// to fold newly written vectors into the indices without paying the
1038    /// compaction retry budget while embed itself may still be writing.
1039    pub async fn optimize_table_indices_only(
1040        &self,
1041        table: Table,
1042        intents: &[IndexIntent],
1043        progress: Option<&OptimizeProgressFn>,
1044    ) -> PhaseOutcome {
1045        self.run_optimize_indices_phase(table, intents, progress)
1046            .await
1047    }
1048
1049    async fn run_optimize_indices_phase(
1050        &self,
1051        table: Table,
1052        intents: &[IndexIntent],
1053        progress: Option<&OptimizeProgressFn>,
1054    ) -> PhaseOutcome {
1055        if intents.is_empty() {
1056            return PhaseOutcome::Noop;
1057        }
1058        let result = self
1059            .retry_lance(table.label(), || async {
1060                let mut guard = self.cached(table).await?.lock().await;
1061                let mut dataset = guard.latest().await?;
1062                let did_work =
1063                    optimize_table_indices(&mut dataset, intents, table, progress).await?;
1064                guard.replace(dataset);
1065                Ok::<_, anyhow::Error>(did_work)
1066            })
1067            .await;
1068        match result {
1069            Ok(true) => PhaseOutcome::Ok,
1070            Ok(false) => PhaseOutcome::Noop,
1071            Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
1072            Err(error) => PhaseOutcome::Failed(error),
1073        }
1074    }
1075
1076    async fn run_optimize_compact_phase(
1077        &self,
1078        table: Table,
1079        progress: Option<&OptimizeProgressFn>,
1080        policy: &MaintenancePolicy,
1081    ) -> PhaseOutcome {
1082        let result = self
1083            .retry_lance(table.label(), || async {
1084                let mut guard = self.cached(table).await?.lock().await;
1085                let mut dataset = guard.latest().await?;
1086                optimize_table_compact(&mut dataset, table, progress, policy).await?;
1087                guard.replace(dataset);
1088                Ok::<_, anyhow::Error>(())
1089            })
1090            .await;
1091        match result {
1092            Ok(()) => PhaseOutcome::Ok,
1093            Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
1094            Err(error) => PhaseOutcome::Failed(error),
1095        }
1096    }
1097
1098    pub async fn rebuild_index(&self, table: Table, intent: &IndexIntent) -> Result<()> {
1099        self.retry_lance(table.label(), || async {
1100            let mut guard = self.cached(table).await?.lock().await;
1101            let mut dataset = guard.latest().await?;
1102            rebuild_index(&mut dataset, intent).await?;
1103            guard.replace(dataset);
1104            Ok(())
1105        })
1106        .await
1107    }
1108
1109    pub async fn index_status(
1110        &self,
1111        table: Table,
1112        intents: &[IndexIntent],
1113    ) -> Result<Vec<IndexStatus>> {
1114        let dataset = self.dataset(table).await?;
1115        index_status(table, &dataset, intents).await
1116    }
1117
1118    pub(crate) async fn dataset(&self, table: Table) -> Result<Dataset> {
1119        let mut cached = self.cached(table).await?.lock().await;
1120        cached.latest().await
1121    }
1122    /// Build a prefiltered `Scanner` for `table`. Composable read entry
1123    /// point for callers that need to layer extra builder calls
1124    /// (`full_text_search`, `nearest`) on top of pond's predicate seam.
1125    /// Routine scans should prefer `Handle::scan`.
1126    pub(crate) async fn scanner(
1127        &self,
1128        table: Table,
1129        predicate: Option<&Predicate>,
1130    ) -> Result<lance::dataset::scanner::Scanner> {
1131        let dataset = self.dataset(table).await?;
1132        scanner_with_prefilter(&dataset, predicate)
1133    }
1134    /// Single read entry point: prefilter via `predicate`, optionally
1135    /// project, return the prepared `Scanner` (spec.md#lance-chokepoints-read).
1136    pub async fn scan(
1137        &self,
1138        table: Table,
1139        opts: ScanOpts<'_>,
1140    ) -> Result<lance::dataset::scanner::Scanner> {
1141        let mut scanner = self.scanner(table, opts.predicate).await?;
1142        if let Some(projection) = opts.projection {
1143            scanner.project(projection)?;
1144        }
1145        Ok(scanner)
1146    }
1147    pub(crate) async fn scan_batch(
1148        &self,
1149        table: Table,
1150        predicate: Option<&Predicate>,
1151        projection: &[&str],
1152    ) -> Result<RecordBatch> {
1153        let opts = ScanOpts {
1154            predicate,
1155            projection: (!projection.is_empty()).then_some(projection),
1156        };
1157        self.scan(table, opts)
1158            .await?
1159            .try_into_batch()
1160            .await
1161            .context("scan failed")
1162    }
1163    pub async fn count_rows(&self, table: Table) -> Result<usize> {
1164        self.dataset(table)
1165            .await?
1166            .count_rows(None)
1167            .await
1168            .map_err(Into::into)
1169    }
1170    /// Names of every index on `messages` - the vector-index tests read this.
1171    #[cfg(test)]
1172    pub(crate) async fn messages_index_names(&self) -> Result<Vec<String>> {
1173        let dataset = self.dataset(Table::Messages).await?;
1174        let indices = dataset.load_indices().await?;
1175        Ok(indices.iter().map(|index| index.name.clone()).collect())
1176    }
1177
1178    /// Count rows in `table` not yet covered by `index_name`. Manifest-only;
1179    /// a missing index reports the whole table. Powers `pond index status`.
1180    pub(crate) async fn unindexed_row_count(
1181        &self,
1182        table: Table,
1183        index_name: &str,
1184    ) -> Result<usize> {
1185        let dataset = self.dataset(table).await?;
1186        let fragments = dataset
1187            .unindexed_fragments(index_name)
1188            .await
1189            .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1190        Ok(fragments
1191            .iter()
1192            .map(|fragment| fragment.num_rows().unwrap_or(0))
1193            .sum())
1194    }
1195
1196    /// Drop the named index. Used by the `pond embed --force` model-swap path
1197    /// to retire an IVF_PQ whose centroids belong to the old distance
1198    /// space, before the next write re-bootstraps it over the new model's
1199    /// vectors. Errors when the index does not exist; callers may swallow
1200    /// that.
1201    pub(crate) async fn drop_index(&self, table: Table, name: &str) -> Result<()> {
1202        let mut guard = self.cached(table).await?.lock().await;
1203        let mut dataset = guard.latest().await?;
1204        dataset
1205            .drop_index(name)
1206            .await
1207            .with_context(|| format!("drop_index({name}) failed for {}", table.label()))?;
1208        guard.replace(dataset);
1209        Ok(())
1210    }
1211
1212    /// Resolve each table's stored location through the namespace catalog
1213    /// (spec.md#lance-chokepoints-catalog) - no hardcoded `.lance` suffix.
1214    async fn table_location(&self, table_name: &str) -> Result<String> {
1215        let request = DescribeTableRequest {
1216            id: Some(self.nm_ident.as_table_id(table_name)),
1217            ..Default::default()
1218        };
1219        let response = self
1220            .nm
1221            .describe_table(request)
1222            .await
1223            .with_context(|| format!("failed to describe table {table_name}"))?;
1224        response
1225            .location
1226            .with_context(|| format!("namespace returned no location for table {table_name}"))
1227    }
1228
1229    /// On-disk byte totals for the three datasets plus the data-dir remainder.
1230    /// Every byte is sized by listing through Lance's object store
1231    /// (spec.md#lance-chokepoints-storage), identical for `file://` and `s3://`.
1232    pub async fn table_sizes(&self) -> Result<TableSizes> {
1233        let registry = Arc::new(ObjectStoreRegistry::default());
1234        let params = self.object_store_params();
1235
1236        let sessions = self
1237            .listed_size(
1238                &registry,
1239                &params,
1240                &self.table_location(sessions::SESSIONS).await?,
1241            )
1242            .await?;
1243        let messages = self
1244            .listed_size(
1245                &registry,
1246                &params,
1247                &self.table_location(sessions::MESSAGES).await?,
1248            )
1249            .await?;
1250        let parts = self
1251            .listed_size(
1252                &registry,
1253                &params,
1254                &self.table_location(sessions::PARTS).await?,
1255            )
1256            .await?;
1257        // `other` is whatever sits under the data-dir root but not in the three
1258        // tables (config.toml, stray index temp files): root total minus them.
1259        let root_total = self
1260            .listed_size(&registry, &params, self.location.as_str())
1261            .await?;
1262        let other = root_total.saturating_sub(sessions + messages + parts);
1263        let sessions_data = self
1264            .data_liveness(&registry, &params, Table::Sessions, sessions::SESSIONS)
1265            .await?;
1266        let messages_data = self
1267            .data_liveness(&registry, &params, Table::Messages, sessions::MESSAGES)
1268            .await?;
1269        let parts_data = self
1270            .data_liveness(&registry, &params, Table::Parts, sessions::PARTS)
1271            .await?;
1272        Ok(TableSizes {
1273            sessions,
1274            messages,
1275            parts,
1276            other,
1277            sessions_data,
1278            messages_data,
1279            parts_data,
1280        })
1281    }
1282
1283    async fn data_liveness(
1284        &self,
1285        registry: &Arc<ObjectStoreRegistry>,
1286        params: &ObjectStoreParams,
1287        table: Table,
1288        table_name: &str,
1289    ) -> Result<DataLiveness> {
1290        let location = self.table_location(table_name).await?;
1291        let data_dir = format!("{}/data", location.trim_end_matches('/'));
1292        let on_disk = self.listed_size(registry, params, &data_dir).await?;
1293        let dataset = self.dataset(table).await?;
1294        let live = dataset
1295            .get_fragments()
1296            .iter()
1297            .try_fold(0u64, |total, fragment| {
1298                Some(total + fragment_bytes(fragment.metadata())?)
1299            });
1300        Ok(DataLiveness { on_disk, live })
1301    }
1302
1303    /// Sum `ObjectMeta.size` for every object recursively under `uri`.
1304    async fn listed_size(
1305        &self,
1306        registry: &Arc<ObjectStoreRegistry>,
1307        params: &ObjectStoreParams,
1308        uri: &str,
1309    ) -> Result<u64> {
1310        let (store, base) = ObjectStore::from_uri_and_params(registry.clone(), uri, params)
1311            .await
1312            .with_context(|| format!("failed to open object store for {uri}"))?;
1313        let mut listing = store.list(Some(base));
1314        let mut total = 0u64;
1315        while let Some(meta) = listing.next().await {
1316            let meta = meta.with_context(|| format!("listing {uri} failed"))?;
1317            total += meta.size;
1318        }
1319        Ok(total)
1320    }
1321    async fn cached(&self, table: Table) -> Result<&Mutex<CachedDataset>> {
1322        match table {
1323            Table::Sessions => Ok(&self.datasets.sessions),
1324            Table::Messages => Ok(&self.datasets.messages),
1325            Table::Parts => self.parts_cached().await,
1326        }
1327    }
1328
1329    /// Open `parts.lance` on first use (spec.md#datasets). Single-flight via
1330    /// `OnceCell`; once initialized, behaves identically to the other two.
1331    async fn parts_cached(&self) -> Result<&Mutex<CachedDataset>> {
1332        self.datasets
1333            .parts
1334            .get_or_try_init(|| async {
1335                let dataset = open_or_create_via_ns(
1336                    &self.nm,
1337                    &self.nm_ident,
1338                    sessions::PARTS,
1339                    sessions::part_schema(),
1340                    &self.session,
1341                    &self.storage_options,
1342                )
1343                .await?;
1344                Ok::<_, anyhow::Error>(Mutex::new(CachedDataset {
1345                    dataset,
1346                    last_refresh: Instant::now(),
1347                    refresh_after: self.parts_refresh_after,
1348                }))
1349            })
1350            .await
1351    }
1352    async fn retry_lance<T, Fut, Op>(&self, label: &str, mut operation: Op) -> Result<T>
1353    where
1354        Fut: std::future::Future<Output = Result<T>>,
1355        Op: FnMut() -> Fut,
1356    {
1357        let mut attempt = 0u8;
1358        loop {
1359            attempt = attempt.saturating_add(1);
1360            match operation().await {
1361                Ok(value) => return Ok(value),
1362                Err(error) if attempt < self.retry.attempts => {
1363                    let backoff = self.backoff(attempt);
1364                    // `{:#}` walks anyhow's cause chain inline; `%error` (Display)
1365                    // drops everything below the top-level message.
1366                    let error_chain = format!("{error:#}");
1367                    tracing::warn!(
1368                        label,
1369                        attempt,
1370                        ?backoff,
1371                        error = %error_chain,
1372                        "retrying Lance operation"
1373                    );
1374                    tokio::time::sleep(backoff).await;
1375                }
1376                Err(error) => {
1377                    let error_chain = format!("{error:#}");
1378                    tracing::warn!(
1379                        label,
1380                        attempt,
1381                        error = %error_chain,
1382                        "Lance operation exhausted retries"
1383                    );
1384                    // spec.md#protocol: surface OCC failures as a typed `conflict`
1385                    // rather than the generic `storage_unavailable` bucket. The
1386                    // chain root is a `lance::Error` (commit-conflict family) when
1387                    // pond's retry layer exhausted because the manifest could not
1388                    // be advanced; everything else (timeouts, IAM, disk) stays
1389                    // `storage_unavailable`.
1390                    if is_commit_conflict(&error) {
1391                        return Err(error.context(ConflictExhausted { attempts: attempt }));
1392                    }
1393                    return Err(error);
1394                }
1395            }
1396        }
1397    }
1398    fn backoff(&self, attempt: u8) -> Duration {
1399        let shift = u32::from(attempt.saturating_sub(1));
1400        let multiplier = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
1401        let base = self.retry.initial_backoff.saturating_mul(multiplier);
1402        // Symmetric +/- `jitter` factor de-correlates concurrent retriers on
1403        // a contended manifest (spec.md#lance-retry-jitter); clamped to `max_backoff`.
1404        let factor = (1.0 + self.retry.jitter * (fastrand::f64() * 2.0 - 1.0)).max(0.0);
1405        base.mul_f64(factor).min(self.retry.max_backoff)
1406    }
1407}
1408/// Compaction phase: plan + amplification veto + execute + `cleanup_old_versions`,
1409/// one retry block, separate from the indices phase so a lost Rewrite race
1410/// does not abort index work.
1411///
1412/// Vetoes Lance-planned tasks instead of pre-gating on pond fragment math:
1413/// Lance bins split at index-coverage boundaries, so pond predictions diverge
1414/// from what Lance actually rewrites (the old run-sum gate latched open and
1415/// rewrote a 665 MiB tail fragment every 5-min sync). Only whole planned
1416/// tasks are filtered, so OCC and conflict semantics are untouched.
1417///
1418/// spec.md#lance-index-maintenance mandates FRI on by default, but at
1419/// v7.0.0-beta.16 `defer_index_remap=true` together with `stable-row-ids`
1420/// panics in `optimize.rs::commit_compaction` with "defer_index_remap
1421/// requires row_addrs but none were provided": `rewrite_files` skips
1422/// row_addrs when stable row ids are on, then the FRI builder demands
1423/// them. With stable_row_ids the remap step is already a no-op
1424/// (`optimize.rs:1490`: `needs_remapping = !uses_stable_row_ids() &&
1425/// !defer_index_remap`), so running without FRI is correct - we only
1426/// lose the documented concurrency-with-index-build benefit. Flip to
1427/// `true` once upstream fixes the conflict.
1428async fn optimize_table_compact(
1429    dataset: &mut Dataset,
1430    table: Table,
1431    progress: Option<&OptimizeProgressFn>,
1432    policy: &MaintenancePolicy,
1433) -> Result<()> {
1434    let stats: Vec<FragmentStat> = dataset
1435        .get_fragments()
1436        .iter()
1437        .map(|fragment| fragment_stat(fragment.metadata()))
1438        .collect();
1439    let compaction = CompactionOptions {
1440        target_rows_per_fragment: derived_target_rows(&stats),
1441        max_bytes_per_file: Some(TARGET_FRAGMENT_BYTES as usize),
1442        defer_index_remap: false,
1443        ..CompactionOptions::default()
1444    };
1445
1446    let mut plan = plan_compaction(dataset, &compaction).await?;
1447    if policy.compaction_fragment_cap > 0 {
1448        plan.tasks.retain(|task| {
1449            let task_stats: Vec<FragmentStat> = task.fragments.iter().map(fragment_stat).collect();
1450            let keep = keep_task(
1451                &task_stats,
1452                policy.compaction_fragment_cap,
1453                compaction.materialize_deletions_threshold,
1454            );
1455            if !keep {
1456                tracing::debug!(
1457                    target: "pond::perf",
1458                    table = table.as_str(),
1459                    fragments = task_stats.len(),
1460                    "compaction task vetoed: merge dominated by one large fragment",
1461                );
1462            }
1463            keep
1464        });
1465    }
1466    if plan.tasks.is_empty() {
1467        tracing::debug!(
1468            target: "pond::perf",
1469            table = table.as_str(),
1470            "compaction skipped: no task to run",
1471        );
1472    } else {
1473        emit(
1474            progress,
1475            OptimizeEvent::PhaseStart {
1476                table,
1477                phase: OptimizePhase::Compact,
1478                detail: None,
1479            },
1480        );
1481        let started = Instant::now();
1482        let mut completed = Vec::with_capacity(plan.tasks.len());
1483        for task in plan.compaction_tasks() {
1484            completed.push(task.execute(dataset).await?);
1485        }
1486        commit_compaction(
1487            dataset,
1488            completed,
1489            Arc::new(DatasetIndexRemapperOptions::default()),
1490            &compaction,
1491        )
1492        .await?;
1493        emit(
1494            progress,
1495            OptimizeEvent::PhaseDone {
1496                table,
1497                phase: OptimizePhase::Compact,
1498                elapsed_ms: started.elapsed().as_millis() as u64,
1499            },
1500        );
1501    }
1502
1503    // Safe GC only. delete_unverified=false keeps Lance's 7-day in-progress
1504    // guard, so this never races a concurrent writer (spec.md#concurrency); GC
1505    // runs outside OCC, so the guard is what makes it safe on any backend.
1506    emit(
1507        progress,
1508        OptimizeEvent::PhaseStart {
1509            table,
1510            phase: OptimizePhase::Cleanup,
1511            detail: None,
1512        },
1513    );
1514    let started = Instant::now();
1515    dataset
1516        .cleanup_old_versions(policy.cleanup_older_than, Some(false), Some(false))
1517        .await
1518        .context("cleanup_old_versions failed during index optimize")?;
1519    emit(
1520        progress,
1521        OptimizeEvent::PhaseDone {
1522            table,
1523            phase: OptimizePhase::Cleanup,
1524            elapsed_ms: started.elapsed().as_millis() as u64,
1525        },
1526    );
1527
1528    Ok(())
1529}
1530
1531/// Indices phase: per-intent create/rebuild + batched `optimize_indices(append)`
1532/// for incremental families. Returns `true` if anything committed.
1533async fn optimize_table_indices(
1534    dataset: &mut Dataset,
1535    intents: &[IndexIntent],
1536    table: Table,
1537    progress: Option<&OptimizeProgressFn>,
1538) -> Result<bool> {
1539    let existing = dataset.load_indices().await?;
1540    let existing_names: std::collections::HashSet<String> =
1541        existing.iter().map(|index| index.name.clone()).collect();
1542
1543    let mut append_indices: Vec<String> = Vec::new();
1544    let mut did_work = false;
1545
1546    for intent in intents {
1547        let exists = existing_names.contains(intent.name);
1548
1549        if !exists {
1550            if !intent.trigger.should_create(dataset).await? {
1551                continue;
1552            }
1553            let params = intent.params.build(dataset).await?;
1554            let index_type = intent.params.index_type();
1555            tracing::info!(
1556                index = intent.name,
1557                column = intent.column,
1558                "creating Lance index (trigger fired)",
1559            );
1560            emit(
1561                progress,
1562                OptimizeEvent::PhaseStart {
1563                    table,
1564                    phase: OptimizePhase::IndexCreate,
1565                    detail: Some(intent.name.to_owned()),
1566                },
1567            );
1568            let started = Instant::now();
1569            dataset
1570                .create_index(
1571                    &[intent.column],
1572                    index_type,
1573                    Some(intent.name.to_owned()),
1574                    params.as_ref(),
1575                    false,
1576                )
1577                .await
1578                .with_context(|| format!("failed to create index {}", intent.name))?;
1579            emit(
1580                progress,
1581                OptimizeEvent::PhaseDone {
1582                    table,
1583                    phase: OptimizePhase::IndexCreate,
1584                    elapsed_ms: started.elapsed().as_millis() as u64,
1585                },
1586            );
1587            did_work = true;
1588            continue;
1589        }
1590
1591        let unindexed = dataset.unindexed_fragments(intent.name).await?;
1592        if unindexed.is_empty() {
1593            continue;
1594        }
1595        // Lag guard: let fragments accumulate behind the brute-force fallback
1596        // rather than firing a commit per tiny append. Threshold is operator-
1597        // tunable via `[maintenance].index_lag_threshold`.
1598        if unindexed.len() < index_lag_threshold() {
1599            continue;
1600        }
1601        match intent.params {
1602            IndexParamsKind::Scalar(BuiltinIndexType::BTree) => {
1603                let params = intent.params.build(dataset).await?;
1604                let index_type = intent.params.index_type();
1605                tracing::debug!(
1606                    target: "pond::perf",
1607                    index = intent.name,
1608                    column = intent.column,
1609                    "rebuilding Lance BTree index",
1610                );
1611                emit(
1612                    progress,
1613                    OptimizeEvent::PhaseStart {
1614                        table,
1615                        phase: OptimizePhase::IndexRebuild,
1616                        detail: Some(intent.name.to_owned()),
1617                    },
1618                );
1619                let started = Instant::now();
1620                dataset
1621                    .create_index(
1622                        &[intent.column],
1623                        index_type,
1624                        Some(intent.name.to_owned()),
1625                        params.as_ref(),
1626                        true,
1627                    )
1628                    .await
1629                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1630                emit(
1631                    progress,
1632                    OptimizeEvent::PhaseDone {
1633                        table,
1634                        phase: OptimizePhase::IndexRebuild,
1635                        elapsed_ms: started.elapsed().as_millis() as u64,
1636                    },
1637                );
1638                did_work = true;
1639            }
1640            IndexParamsKind::Scalar(BuiltinIndexType::Bitmap)
1641            | IndexParamsKind::InvertedFtsNgram { .. }
1642            | IndexParamsKind::IvfPqCosine { .. } => {
1643                append_indices.push(intent.name.to_owned());
1644            }
1645            IndexParamsKind::Scalar(_) => {
1646                let params = intent.params.build(dataset).await?;
1647                emit(
1648                    progress,
1649                    OptimizeEvent::PhaseStart {
1650                        table,
1651                        phase: OptimizePhase::IndexRebuild,
1652                        detail: Some(intent.name.to_owned()),
1653                    },
1654                );
1655                let started = Instant::now();
1656                dataset
1657                    .create_index(
1658                        &[intent.column],
1659                        intent.params.index_type(),
1660                        Some(intent.name.to_owned()),
1661                        params.as_ref(),
1662                        true,
1663                    )
1664                    .await
1665                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1666                emit(
1667                    progress,
1668                    OptimizeEvent::PhaseDone {
1669                        table,
1670                        phase: OptimizePhase::IndexRebuild,
1671                        elapsed_ms: started.elapsed().as_millis() as u64,
1672                    },
1673                );
1674                did_work = true;
1675            }
1676        }
1677    }
1678
1679    if !append_indices.is_empty() {
1680        let to_append = append_indices.clone();
1681        emit(
1682            progress,
1683            OptimizeEvent::PhaseStart {
1684                table,
1685                phase: OptimizePhase::IndexAppend,
1686                detail: Some(append_indices.join(", ")),
1687            },
1688        );
1689        let started = Instant::now();
1690        dataset
1691            .optimize_indices(&OptimizeOptions::append().index_names(to_append))
1692            .await
1693            .context("optimize_indices(append) failed during index optimize")?;
1694        emit(
1695            progress,
1696            OptimizeEvent::PhaseDone {
1697                table,
1698                phase: OptimizePhase::IndexAppend,
1699                elapsed_ms: started.elapsed().as_millis() as u64,
1700            },
1701        );
1702        tracing::debug!(
1703            target: "pond::perf",
1704            indices = ?append_indices,
1705            "appended trailing fragments into indices",
1706        );
1707        did_work = true;
1708    }
1709
1710    Ok(did_work)
1711}
1712
1713async fn rebuild_index(dataset: &mut Dataset, intent: &IndexIntent) -> Result<()> {
1714    if !intent.trigger.should_create(dataset).await? {
1715        return Ok(());
1716    }
1717    let params = intent.params.build(dataset).await?;
1718    dataset
1719        .create_index(
1720            &[intent.column],
1721            intent.params.index_type(),
1722            Some(intent.name.to_owned()),
1723            params.as_ref(),
1724            true,
1725        )
1726        .await
1727        .with_context(|| format!("failed to rebuild index {}", intent.name))?;
1728    Ok(())
1729}
1730
1731async fn index_status(
1732    table: Table,
1733    dataset: &Dataset,
1734    intents: &[IndexIntent],
1735) -> Result<Vec<IndexStatus>> {
1736    let existing = dataset.load_indices().await?;
1737    let existing_names: std::collections::HashSet<String> =
1738        existing.iter().map(|index| index.name.clone()).collect();
1739    let total_fragments = dataset.get_fragments().len();
1740    let total_rows = dataset.count_rows(None).await?;
1741    let mut statuses = Vec::with_capacity(intents.len());
1742    for intent in intents {
1743        let exists = existing_names.contains(intent.name);
1744        if !exists {
1745            statuses.push(IndexStatus {
1746                table,
1747                intent_name: intent.name.to_owned(),
1748                fragments_covered: 0,
1749                unindexed_fragments: total_fragments,
1750                unindexed_rows: total_rows,
1751                exists,
1752            });
1753            continue;
1754        }
1755        let unindexed = dataset
1756            .unindexed_fragments(intent.name)
1757            .await
1758            .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1759        let unindexed_fragments = unindexed.len();
1760        let unindexed_rows = unindexed
1761            .iter()
1762            .map(|fragment| fragment.num_rows().unwrap_or(0))
1763            .sum();
1764        statuses.push(IndexStatus {
1765            table,
1766            intent_name: intent.name.to_owned(),
1767            fragments_covered: total_fragments.saturating_sub(unindexed_fragments),
1768            unindexed_fragments,
1769            unindexed_rows,
1770            exists,
1771        });
1772    }
1773    Ok(statuses)
1774}
1775
1776/// Open the table at `table_name` via the namespace; create + initialize on
1777/// `TableNotFound`. Schema-checks the on-disk dataset against pond's
1778/// expectation so a stale data dir surfaces early.
1779///
1780/// Probes via `nm.describe_table` directly rather than `DatasetBuilder::from_namespace`:
1781/// the builder re-wraps an already-`Namespace`-wrapped error
1782/// (lance/src/dataset/builder.rs:142), so going through it would force a
1783/// chain-walk to classify `TableNotFound`. The direct probe stays at one
1784/// wrap level and downcasts cleanly. Managed-versioning hookup (REST
1785/// namespace external-manifest commits) is not wired here; v1 ships
1786/// Directory v2 only.
1787async fn open_or_create_via_ns(
1788    nm: &Arc<dyn LanceNamespace>,
1789    nm_ident: &NamespaceIdent,
1790    table_name: &str,
1791    schema: lance::deps::arrow_schema::SchemaRef,
1792    session: &Arc<Session>,
1793    storage_options: &HashMap<String, String>,
1794) -> Result<Dataset> {
1795    let table_id = nm_ident.as_table_id(table_name);
1796
1797    let request = DescribeTableRequest {
1798        id: Some(table_id.clone()),
1799        ..Default::default()
1800    };
1801    match nm.describe_table(request).await {
1802        Ok(response) => {
1803            let location = response.location.with_context(|| {
1804                format!("namespace returned no location for table {table_name}")
1805            })?;
1806            let mut builder = DatasetBuilder::from_uri(&location).with_session(session.clone());
1807            if !storage_options.is_empty() {
1808                builder = builder.with_storage_options(storage_options.clone());
1809            }
1810            let dataset = builder
1811                .load()
1812                .await
1813                .with_context(|| format!("failed to open table {table_name}"))?;
1814            ensure_schema_matches(&dataset, schema.as_ref(), table_name)?;
1815            return Ok(dataset);
1816        }
1817        Err(error) => match &error {
1818            error if is_namespace_error_code(error, ErrorCode::TableNotFound) => {
1819                // fall through to create
1820            }
1821            _ => {
1822                return Err(anyhow::Error::from(error))
1823                    .with_context(|| format!("failed to describe table {table_name}"));
1824            }
1825        },
1826    }
1827
1828    // Create path: pond seeds an empty dataset with the canonical schema so
1829    // every subsequent open lands on a real Lance dataset, not a phantom.
1830    let mut write_params = sessions::write_params_for_create();
1831    write_params.session = Some(session.clone());
1832    write_params.mode = WriteMode::Create;
1833    if !storage_options.is_empty() {
1834        write_params.store_params = Some(ObjectStoreParams {
1835            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
1836                storage_options.clone(),
1837            ))),
1838            ..Default::default()
1839        });
1840    }
1841    let reader = sessions::empty_reader(schema)?;
1842    Dataset::write_into_namespace(reader, nm.clone(), table_id, Some(write_params))
1843        .await
1844        .with_context(|| format!("failed to create table {table_name}"))
1845}
1846
1847// lance-namespace sometimes nests one `lance::Error::Namespace` inside another
1848// before the underlying `NamespaceError`; walk the whole `.source()` chain
1849// rather than only matching the outer variant.
1850fn is_namespace_error_code(error: &lance::Error, code: ErrorCode) -> bool {
1851    if !matches!(error, lance::Error::Namespace { .. }) {
1852        return false;
1853    }
1854    std::iter::successors(Some(error as &(dyn std::error::Error + 'static)), |link| {
1855        link.source()
1856    })
1857    .filter_map(|link| link.downcast_ref::<NamespaceError>())
1858    .any(|inner| inner.code() == code)
1859}
1860
1861fn scanner_with_prefilter(
1862    dataset: &Dataset,
1863    predicate: Option<&Predicate>,
1864) -> Result<lance::dataset::scanner::Scanner> {
1865    let mut scanner = dataset.scan();
1866    scanner.prefilter(true);
1867    if let Some(predicate) = predicate {
1868        let filter = predicate.to_lance();
1869        if !filter.is_empty() {
1870            scanner.filter(&filter)?;
1871        }
1872    }
1873    Ok(scanner)
1874}
1875fn ensure_schema_matches(
1876    dataset: &Dataset,
1877    expected: &lance::deps::arrow_schema::Schema,
1878    table_name: &str,
1879) -> Result<()> {
1880    use lance::deps::arrow_schema::DataType;
1881    use std::collections::BTreeSet;
1882    let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
1883    let actual_names: BTreeSet<&str> = actual.fields().iter().map(|f| f.name().as_str()).collect();
1884    let expected_names: BTreeSet<&str> = expected
1885        .fields()
1886        .iter()
1887        .map(|f| f.name().as_str())
1888        .collect();
1889    if actual_names != expected_names {
1890        anyhow::bail!(
1891            "table {table_name} has columns {actual_names:?} but this pond build expects \
1892             {expected_names:?} - the on-disk store predates a schema change; delete the \
1893             data directory and re-run `pond ingest`",
1894        );
1895    }
1896    // Catch a vector-dim change (configured `[embeddings].dim` differs from
1897    // the on-disk vector column width) early with a friendly message. Lance
1898    // would otherwise reject the next write with an opaque schema-mismatch
1899    // error inside the `merge_update` path.
1900    for actual_field in actual.fields() {
1901        let Some(expected_field) = expected.field_with_name(actual_field.name()).ok() else {
1902            continue;
1903        };
1904        if let (DataType::FixedSizeList(_, actual_dim), DataType::FixedSizeList(_, expected_dim)) =
1905            (actual_field.data_type(), expected_field.data_type())
1906            && actual_dim != expected_dim
1907        {
1908            tracing::warn!(
1909                table = table_name,
1910                column = actual_field.name(),
1911                actual_dim,
1912                expected_dim,
1913                "embedding dimension differs from config; open proceeds because model swaps are operator-driven",
1914            );
1915        }
1916    }
1917    Ok(())
1918}
/// Object-store defaults injected for any non-local pond location. Each key
/// is only set when neither the user-provided key nor any of its aliases is
/// already present (compared ASCII-case-insensitively, so env-var-style
/// upper-case spellings count), which means explicit overrides in `[storage]`
/// always win.
///
/// `aws_unsigned_payload` is gated on a custom endpoint (the marker for
/// S3-compatible stores like Hetzner, MinIO, R2), where the SHA256 payload
/// signature is wasted work the server does not validate. A custom endpoint
/// is recognised under every spelling listed in `ENDPOINT_KEYS`, including
/// the `*_url` forms such as `AWS_ENDPOINT_URL`.
fn apply_remote_storage_defaults(options: &mut HashMap<String, String>) {
    /// Key spellings that mark a custom S3 endpoint.
    const ENDPOINT_KEYS: [&str; 4] = [
        "aws_endpoint",
        "aws_endpoint_url",
        "endpoint",
        "endpoint_url",
    ];

    /// True when any key of `options` equals one of `aliases`, ignoring ASCII
    /// case.
    fn has_any_key(options: &HashMap<String, String>, aliases: &[&str]) -> bool {
        options
            .keys()
            .any(|key| aliases.iter().any(|alias| key.eq_ignore_ascii_case(alias)))
    }

    /// Insert `value` under the first alias unless some alias is already set.
    /// An empty alias list is a no-op rather than a panic.
    fn set_default(options: &mut HashMap<String, String>, aliases: &[&str], value: &str) {
        if has_any_key(options, aliases) {
            return;
        }
        if let Some(canonical) = aliases.first() {
            options.insert((*canonical).to_owned(), value.to_owned());
        }
    }

    set_default(options, &["pool_idle_timeout"], "300 seconds");
    set_default(options, &["connect_timeout"], "10 seconds");
    if has_any_key(options, &ENDPOINT_KEYS) {
        set_default(
            options,
            &["aws_unsigned_payload", "unsigned_payload"],
            "true",
        );
    }
}
1948
/// Render `value` as a single-quoted SQL string literal, doubling every
/// embedded single quote.
fn quoted_string(value: &str) -> String {
    let mut literal = String::with_capacity(value.len() + 2);
    literal.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            // A quote inside the literal is written as two quotes.
            literal.push('\'');
        }
        literal.push(ch);
    }
    literal.push('\'');
    literal
}
/// Render `value` as a quoted `'%...%'` pattern literal that matches it as a
/// substring.
///
/// Backslash, `%` and `_` are each prefixed with a backslash so they match
/// literally, and single quotes are doubled for the surrounding string
/// literal.
fn like_contains(value: &str) -> String {
    let mut pattern = String::with_capacity(value.len() + 4);
    pattern.push_str("'%");
    for ch in value.chars() {
        match ch {
            '\\' | '%' | '_' => {
                pattern.push('\\');
                pattern.push(ch);
            }
            '\'' => pattern.push_str("''"),
            plain => pattern.push(plain),
        }
    }
    pattern.push_str("%'");
    pattern
}
1960
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Fragment fixture: `bytes` known, one row per 1000 bytes, no deletions.
    fn stat(bytes: u64) -> FragmentStat {
        FragmentStat {
            bytes: Some(bytes),
            rows: bytes / 1_000,
            deleted_rows: 0,
        }
    }

    /// Single-fragment slice fixture for the `derived_target_rows` cases.
    fn lone_fragment(bytes: Option<u64>, rows: u64) -> [FragmentStat; 1] {
        [FragmentStat {
            bytes,
            rows,
            deleted_rows: 0,
        }]
    }

    #[test]
    fn compaction_veto_blocks_absorb_keeps_peers() {
        // One 665 MiB tail fragment + tiny appends -> vetoed.
        let absorbing = [stat(665_000_000), stat(1_000_000), stat(2_000_000)];
        assert!(!keep_task(&absorbing, 64, 0.1));

        // Peer-sized merge halves fragment count -> kept.
        let equal_peers = [stat(300_000_000), stat(300_000_000)];
        assert!(keep_task(&equal_peers, 64, 0.1));

        // Remainder reaches largest / COMPACTION_ABSORB_FACTOR -> kept.
        let tiered = [stat(400_000), stat(60_000), stat(40_000)];
        assert!(keep_task(&tiered, 64, 0.1));
    }

    #[test]
    fn compaction_veto_passes_deletions_and_cap() {
        // A big fragment with a fifth of its rows deleted, next to a tiny one.
        let big = stat(665_000_000);
        let with_deletions = FragmentStat {
            deleted_rows: big.rows / 5,
            ..big
        };
        assert!(keep_task(&[with_deletions, stat(1_000)], 64, 0.1));

        // One big fragment followed by 63 tiny ones: 64 fragments in total.
        let mut wide: Vec<FragmentStat> = vec![stat(665_000_000)];
        wide.extend((0..63).map(|_| stat(1_000)));
        assert!(keep_task(&wide, 64, 0.1));
    }

    #[test]
    fn compaction_veto_falls_back_to_rows_on_unknown_sizes() {
        let size_unknown = FragmentStat {
            bytes: None,
            ..stat(665_000_000)
        };
        // Rows comparison: 665k vs 3k -> still vetoed.
        let task = [size_unknown, stat(1_000_000), stat(2_000_000)];
        assert!(!keep_task(&task, 64, 0.1));
    }

    #[test]
    fn derived_target_rows_tracks_row_size_and_clamps() {
        // ~1.3 KiB rows -> ~200k-row target.
        let target = derived_target_rows(&lone_fragment(Some(665_000_000), 511_000));
        assert!((150_000..300_000).contains(&target), "{target}");

        // No usable sizes -> Lance default.
        assert_eq!(
            derived_target_rows(&lone_fragment(None, 511_000)),
            MAX_TARGET_ROWS_PER_FRAGMENT as usize
        );

        // Tiny rows clamp at the ceiling, huge rows at the floor.
        assert_eq!(
            derived_target_rows(&lone_fragment(Some(1_000_000), 100_000)),
            MAX_TARGET_ROWS_PER_FRAGMENT as usize
        );
        assert_eq!(
            derived_target_rows(&lone_fragment(Some(1_000_000_000), 100)),
            MIN_TARGET_ROWS_PER_FRAGMENT as usize
        );
    }

    #[test]
    fn namespace_error_code_walks_wrapped_chain() {
        // NamespaceError directly under one Namespace wrapper.
        let single_wrap = lance::Error::namespace_source(Box::new(NamespaceError::TableNotFound {
            message: "missing".into(),
        }));
        assert!(is_namespace_error_code(
            &single_wrap,
            ErrorCode::TableNotFound
        ));

        // The same error wrapped a second time must still be classified.
        let double_wrap = lance::Error::namespace_source(Box::new(single_wrap));
        assert!(is_namespace_error_code(
            &double_wrap,
            ErrorCode::TableNotFound
        ));

        // A namespace error with a different code does not match.
        let wrong_code =
            lance::Error::namespace_source(Box::new(NamespaceError::NamespaceNotFound {
                message: "nope".into(),
            }));
        assert!(!is_namespace_error_code(
            &wrong_code,
            ErrorCode::TableNotFound
        ));

        // A non-namespace error never matches.
        let unrelated = lance::Error::internal("unrelated");
        assert!(!is_namespace_error_code(
            &unrelated,
            ErrorCode::TableNotFound
        ));
    }

    /// Round-trip: opening a fresh data dir through `lance-namespace`
    /// produces all three tables, and `Handle::scan` returns an empty batch
    /// for each (no spurious schema mismatch, no namespace error).
    #[tokio::test]
    async fn store_opens_via_namespace_and_scan_works() -> Result<()> {
        let data_dir = TempDir::new()?;
        let url = Url::from_directory_path(data_dir.path())
            .map_err(|()| anyhow::anyhow!("temp path is not absolute"))?;
        let handle = Handle::open(&url).await?;

        // Project the `id` column of each table so the scan is exercised
        // end-to-end (catalog -> dataset -> scanner -> batch).
        let projection: &[&str] = &["id"];
        for table in [Table::Sessions, Table::Messages, Table::Parts] {
            let scanner = handle
                .scan(table, ScanOpts::project_only(projection))
                .await?;
            let batch = scanner.try_into_batch().await?;
            assert_eq!(batch.num_rows(), 0, "fresh table should be empty");
        }
        Ok(())
    }
}