Skip to main content

pond/
substrate.rs

1//! The storage substrate (spec.md#substrate): pond's one seam to Lance,
2//! generic over consumers.
3
4use crate::{
5    RetryPolicy,
6    config::{self, CredsSet},
7    handlers::NamespaceIdent,
8    sessions::{self},
9};
10use anyhow::{Context, Result, anyhow, bail};
11use lance::Dataset;
12use lance::dataset::builder::DatasetBuilder;
13use lance::dataset::index::DatasetIndexRemapperOptions;
14use lance::dataset::optimize::{CompactionOptions, commit_compaction, plan_compaction};
15use lance::dataset::write::merge_insert::SourceDedupeBehavior;
16use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode};
17use lance::deps::arrow_array::{RecordBatch, RecordBatchIterator};
18use lance::index::DatasetIndexExt;
19use lance::index::DatasetIndexInternalExt;
20use lance::index::vector::VectorIndexParams;
21use lance::session::Session;
22use lance_index::IndexType;
23use lance_index::optimize::OptimizeOptions;
24use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
25use lance_io::object_store::{
26    ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor, uri_to_url,
27};
28use lance_linalg::distance::MetricType;
29use lance_namespace::LanceNamespace;
30use lance_namespace::error::{ErrorCode, NamespaceError};
31use lance_namespace::models::DescribeTableRequest;
32use lance_namespace_impls::ConnectBuilder;
33use std::{
34    collections::{BTreeMap, HashMap},
35    sync::Arc,
36    time::{Duration, Instant},
37};
38use tokio::sync::{Mutex, OnceCell};
39use tokio_stream::StreamExt;
40use url::Url;
/// Embedded-row count at which pond builds the IVF_PQ vector index on
/// `messages.vector` (spec.md#search). Below it, vector search runs a
/// brute-force flat scan - exact and fast at small and medium scale, and
/// IVF_PQ cannot train well on fewer vectors anyway.
pub const VECTOR_INDEX_ACTIVATION_ROWS: usize = 100_000;

/// Default minimum unindexed-fragment count required before a per-intent
/// append/rebuild step is admitted into `optimize_table_indices`. Lower
/// values make each commit smaller and more frequent (bad on remote
/// stores); higher values let fragments accumulate behind the brute-force
/// fallback. 4 is the floor of the documented 4-8 band.
pub const DEFAULT_INDEX_LAG_THRESHOLD: usize = 4;

/// Process-wide override for the index-lag threshold; unset until
/// [`init_index_lag_threshold`] runs.
static INDEX_LAG_THRESHOLD_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();

/// Seed the process-wide index-lag threshold from `[maintenance].index_lag_threshold`.
///
/// Only the first call has any effect; later calls are silently ignored
/// (mirrors `embed::init_model_id` / `sessions::init_embedding_dim`).
pub fn init_index_lag_threshold(value: usize) {
    // `set` rejects a second value, which is exactly first-call-wins; the
    // rejected value is deliberately discarded.
    let _ = INDEX_LAG_THRESHOLD_RUNTIME.set(value);
}

/// The active index-lag threshold: the seeded value if
/// [`init_index_lag_threshold`] ran, else [`DEFAULT_INDEX_LAG_THRESHOLD`].
pub fn index_lag_threshold() -> usize {
    match INDEX_LAG_THRESHOLD_RUNTIME.get() {
        Some(&configured) => configured,
        None => DEFAULT_INDEX_LAG_THRESHOLD,
    }
}
68
69// ---------------------------------------------------------------------------
70// Storage addresses (spec.md#storage-url-grammar)
71// ---------------------------------------------------------------------------
72
/// A parsed pond storage address. The fat-URL grammar
/// (`s3+https://host/bucket/prefix`) folds the endpoint into the address so
/// it can never desync from the bucket (the litestream out-of-band-endpoint
/// failure class); parsing splits it back into the URL Lance opens plus the
/// `object_store` options the endpoint implies.
///
/// Construct with [`StorageUrl::parse`]; turn into open-ready options with
/// [`StorageUrl::resolve`].
#[derive(Debug, Clone, PartialEq)]
pub struct StorageUrl {
    /// The address as written, canonicalized (scheme/host lowercased by
    /// `url`, default port stripped, recognized query params removed). Scope
    /// matching (spec.md#creds-scope-match) and display use this form.
    canonical: Url,
    /// The URL handed to Lance. Identical to `canonical` for local and
    /// in-memory schemes; `s3+http(s)://host/bucket/prefix` becomes
    /// `s3://bucket/prefix` and `az://account/container/prefix` becomes
    /// `az://container/prefix`.
    lance: Url,
    /// Options implied by the scheme - lowest precedence in assembly.
    scheme_options: Vec<(&'static str, String)>,
    /// Recognized `?key=value` params other than `creds` - highest
    /// precedence.
    query_options: Vec<(&'static str, String)>,
    /// `?creds=<name>`: explicit set binding, beats scope matching.
    creds_pointer: Option<String>,
    /// Endpoint pieces for the `s3+` schemes. The final endpoint URL depends
    /// on the resolved `virtual_hosted_style_request` value (object_store
    /// wants the bucket inside the endpoint host under virtual-hosted
    /// addressing), so it is assembled at resolve time, not parse time.
    /// `None` for every other scheme.
    endpoint: Option<S3Endpoint>,
}
98
/// The endpoint half of an `s3+https://` / `s3+http://` address, kept apart
/// from the Lance URL until resolve time knows the final addressing style.
#[derive(Debug, Clone, PartialEq)]
struct S3Endpoint {
    /// `"https"` or `"http"`, taken from the `s3+` scheme suffix.
    scheme: &'static str,
    /// host[:port], with the scheme's default port already stripped.
    authority: String,
    /// First path segment of the address; prepended to the host under
    /// virtual-hosted addressing.
    bucket: String,
}
106
/// Query params pond recognizes (and strips before the URL reaches Lance).
/// Anything else is a hard error - a typoed param must not silently reach
/// the object store as part of the path. `creds` names a creds set; the
/// other entries become object_store options that override every other
/// option source.
const RECOGNIZED_QUERY_PARAMS: [&str; 3] = ["creds", "region", "virtual_hosted_style_request"];
111
112impl StorageUrl {
113    /// Parse a storage address (spec.md#storage-url-grammar): bare/`~` paths,
114    /// `file://`, `s3://`, `s3+https://` / `s3+http://`, `gs://`, `az://`,
115    /// and the test-only `memory://` / `shared-memory://`.
116    pub fn parse(input: &str) -> Result<Self> {
117        let trimmed = input.trim();
118        if trimmed.is_empty() {
119            bail!("storage path is empty");
120        }
121        // Bare paths, `~/...`, and `file://` go through Lance's own
122        // `uri_to_url` so pond accepts exactly what Lance accepts.
123        if !trimmed.contains("://") || trimmed.starts_with("file://") {
124            let url =
125                uri_to_url(trimmed).with_context(|| format!("invalid storage path {trimmed:?}"))?;
126            // Bare paths percent-encode `?` (a legal filename character), so
127            // only an explicit `file://...?x=y` parses a query here. No local
128            // scheme takes one; reject like the remote schemes do instead of
129            // silently carrying it into the path Lance opens.
130            if url.query().is_some() {
131                bail!("storage URL {trimmed:?} carries query params; local URLs take none");
132            }
133            return Ok(Self::plain(url));
134        }
135        let url =
136            Url::parse(trimmed).with_context(|| format!("invalid storage URL {trimmed:?}"))?;
137        // RFC 3986 deprecates userinfo; argv/history/ps/logs leak it. Never.
138        if !url.username().is_empty() || url.password().is_some() {
139            bail!(
140                "storage URL {trimmed:?} embeds credentials; put them in [creds.*] (or POND_CREDS_*) instead"
141            );
142        }
143        match url.scheme() {
144            "memory" | "shared-memory" => {
145                if url.query().is_some() {
146                    bail!(
147                        "storage URL {trimmed:?} carries query params; {}:// URLs take none",
148                        url.scheme(),
149                    );
150                }
151                Ok(Self::plain(url))
152            }
153            "s3" | "gs" => {
154                let (canonical, query_options, creds_pointer) = strip_query(url)?;
155                let mut lance = canonical.clone();
156                lance.set_query(None);
157                Ok(Self {
158                    canonical,
159                    lance,
160                    scheme_options: Vec::new(),
161                    query_options,
162                    creds_pointer,
163                    endpoint: None,
164                })
165            }
166            "s3+https" | "s3+http" => {
167                let (mut canonical, query_options, creds_pointer) = strip_query(url)?;
168                let tls = canonical.scheme() == "s3+https";
169                // `url` treats non-special schemes' default ports as
170                // explicit; strip them so scope matching can't split on
171                // `:443` vs nothing.
172                if canonical.port() == Some(if tls { 443 } else { 80 }) {
173                    let _ = canonical.set_port(None);
174                }
175                let host = canonical
176                    .host_str()
177                    .ok_or_else(|| anyhow!("storage URL {trimmed:?} has no endpoint host"))?;
178                let endpoint_authority = match canonical.port() {
179                    Some(port) => format!("{host}:{port}"),
180                    None => host.to_owned(),
181                };
182                let mut segments = canonical.path().trim_start_matches('/').splitn(2, '/');
183                let bucket = segments.next().unwrap_or_default().to_owned();
184                let prefix = segments.next().unwrap_or_default().to_owned();
185                if bucket.is_empty() {
186                    bail!(
187                        "storage URL {trimmed:?} is missing the bucket: the form is {}://host/bucket/prefix",
188                        canonical.scheme(),
189                    );
190                }
191                let lance = Url::parse(&format!("s3://{bucket}/{prefix}")).with_context(|| {
192                    format!("storage URL {trimmed:?}: bucket/prefix do not form a valid s3:// URL")
193                })?;
194                let scheme = if tls { "https" } else { "http" };
195                // Virtual-hosted is the Hetzner / R2 / B2 default, but an IP
196                // host can't carry a bucket subdomain (`bucket.127.0.0.1`
197                // does not resolve), so MinIO-style IP endpoints flip to
198                // path-style. Override either way via the creds-set field or
199                // `?virtual_hosted_style_request=`. Note: `url` keeps IPv4
200                // hosts as `Host::Domain` on non-special schemes, hence the
201                // explicit IpAddr parse; IPv6 brackets still need the Host
202                // match.
203                let virtual_hosted = host.parse::<std::net::IpAddr>().is_err()
204                    && !matches!(canonical.host(), Some(url::Host::Ipv6(_)));
205                let scheme_options = vec![
206                    ("allow_http", (!tls).to_string()),
207                    ("virtual_hosted_style_request", virtual_hosted.to_string()),
208                    // S3-compatible stores ignore the SigV4 region, so a
209                    // deterministic default (the DuckDB / litestream
210                    // convention) beats Lance's env-chain fallback, where a
211                    // stray AWS_REGION changes behavior. Real AWS (`s3://`,
212                    // no endpoint) auto-resolves the bucket region inside
213                    // Lance instead. Override: creds-set field or ?region=.
214                    ("region", "us-east-1".to_owned()),
215                ];
216                Ok(Self {
217                    canonical,
218                    lance,
219                    scheme_options,
220                    query_options,
221                    creds_pointer,
222                    endpoint: Some(S3Endpoint {
223                        scheme,
224                        authority: endpoint_authority,
225                        bucket,
226                    }),
227                })
228            }
229            "az" => {
230                let (canonical, query_options, creds_pointer) = strip_query(url)?;
231                let account = canonical
232                    .host_str()
233                    .ok_or_else(|| anyhow!("storage URL {trimmed:?} has no account: the form is az://account/container/prefix"))?
234                    .to_owned();
235                let mut segments = canonical.path().trim_start_matches('/').splitn(2, '/');
236                let container = segments.next().unwrap_or_default();
237                if container.is_empty() {
238                    bail!(
239                        "storage URL {trimmed:?} is missing the container: the form is az://account/container/prefix"
240                    );
241                }
242                let prefix = segments.next().unwrap_or_default();
243                let lance = Url::parse(&format!("az://{container}/{prefix}"))
244                    .with_context(|| format!("storage URL {trimmed:?}: container/prefix do not form a valid az:// URL"))?;
245                Ok(Self {
246                    canonical,
247                    lance,
248                    scheme_options: vec![("account_name", account)],
249                    query_options,
250                    creds_pointer,
251                    endpoint: None,
252                })
253            }
254            other => bail!(
255                "storage URL scheme {other:?} not recognized; use a local path, s3://, s3+https://, s3+http://, gs://, or az://"
256            ),
257        }
258    }
259
260    /// A scheme with no creds machinery: canonical == lance, no options.
261    fn plain(url: Url) -> Self {
262        Self {
263            canonical: url.clone(),
264            lance: url,
265            scheme_options: Vec::new(),
266            query_options: Vec::new(),
267            creds_pointer: None,
268            endpoint: None,
269        }
270    }
271
272    /// The URL Lance opens (endpoint folded into options, not the URL).
273    pub fn lance_url(&self) -> &Url {
274        &self.lance
275    }
276
277    /// The canonical as-written address - what scope matching compares
278    /// against and what display surfaces show (it carries the endpoint).
279    pub fn canonical(&self) -> &Url {
280        &self.canonical
281    }
282
283    pub fn is_local(&self) -> bool {
284        config::is_local(&self.canonical)
285    }
286
287    /// Render for human output: local URLs as plain paths, remote verbatim.
288    pub fn display(&self) -> String {
289        config::display(&self.canonical)
290    }
291
292    /// Whether this scheme authenticates at all. `file`, `memory`, and
293    /// `shared-memory` take no credentials; resolution skips them entirely.
294    fn takes_credentials(&self) -> bool {
295        !matches!(
296            self.canonical.scheme(),
297            "file" | "file+uring" | "memory" | "shared-memory"
298        )
299    }
300
301    /// Resolve this address against the configured creds sets
302    /// (spec.md#creds-scope-match): `?creds=` pointer > longest scoped
303    /// prefix match > the scope-less catch-all > none (object_store's
304    /// ambient SDK chain). Option assembly, later wins: scheme-derived ->
305    /// matched set (non-secret fields + `extra`, then materialized secrets)
306    /// -> URL query params.
307    pub fn resolve(&self, creds: &BTreeMap<String, CredsSet>) -> Result<ResolvedStorage> {
308        if !self.takes_credentials() {
309            return Ok(ResolvedStorage {
310                storage: self.clone(),
311                options: HashMap::new(),
312                binding: CredsBinding::NotApplicable,
313            });
314        }
315        let matched: Option<(&String, &CredsSet, BindVia)> = match &self.creds_pointer {
316            Some(name) => {
317                let set = creds.get(name).ok_or_else(|| {
318                    anyhow!(
319                        "URL names ?creds={name} but no [creds.{name}] set is configured; define it or drop the pointer"
320                    )
321                })?;
322                Some((name, set, BindVia::Pointer))
323            }
324            None => {
325                let mut best: Option<(&String, &CredsSet, String)> = None;
326                for (name, set) in creds {
327                    let Some(scope) = &set.scope else { continue };
328                    let scope_url = parse_scope(scope).with_context(|| {
329                        format!("[creds.{name}] scope {scope:?} is not a valid URL prefix")
330                    })?;
331                    if scope_matches(&scope_url, &self.canonical)
332                        && best
333                            .as_ref()
334                            .is_none_or(|(_, _, len)| scope_url.as_str().len() > len.len())
335                    {
336                        best = Some((name, set, scope_url.as_str().to_owned()));
337                    }
338                }
339                match best {
340                    Some((name, set, _)) => Some((name, set, BindVia::Scope)),
341                    None => creds
342                        .iter()
343                        .find(|(_, set)| set.scope.is_none())
344                        .map(|(name, set)| (name, set, BindVia::CatchAll)),
345                }
346            }
347        };
348        let mut options: HashMap<String, String> = self
349            .scheme_options
350            .iter()
351            .map(|(key, value)| ((*key).to_owned(), value.clone()))
352            .collect();
353        let binding = match matched {
354            None => CredsBinding::Ambient,
355            Some((name, set, via)) => {
356                if let Some(region) = &set.region {
357                    options.insert("region".to_owned(), region.clone());
358                }
359                if let Some(virtual_hosted) = set.virtual_hosted_style_request {
360                    options.insert(
361                        "virtual_hosted_style_request".to_owned(),
362                        virtual_hosted.to_string(),
363                    );
364                }
365                for (key, value) in &set.extra {
366                    options.insert(key.clone(), value.clone());
367                }
368                if let Some(value) = materialize_secret(
369                    name,
370                    "access_key_id",
371                    set.access_key_id.as_deref(),
372                    set.access_key_id_file.as_deref(),
373                    None,
374                )? {
375                    options.insert("access_key_id".to_owned(), value);
376                }
377                if let Some(value) = materialize_secret(
378                    name,
379                    "secret_access_key",
380                    set.secret_access_key.as_deref(),
381                    set.secret_access_key_file.as_deref(),
382                    set.secret_access_key_command.as_deref(),
383                )? {
384                    options.insert("secret_access_key".to_owned(), value);
385                }
386                CredsBinding::Set {
387                    name: name.clone(),
388                    via,
389                }
390            }
391        };
392        for (key, value) in &self.query_options {
393            options.insert((*key).to_owned(), value.clone());
394        }
395        // The endpoint is assembled last: under virtual-hosted addressing
396        // object_store expects the bucket inside the endpoint host, so the
397        // URL depends on the final virtual_hosted_style_request value. An
398        // explicit endpoint in `extra` wins (the escape hatch).
399        if let Some(endpoint) = &self.endpoint
400            && !options.keys().any(|key| {
401                key.eq_ignore_ascii_case("endpoint") || key.eq_ignore_ascii_case("aws_endpoint")
402            })
403        {
404            let virtual_hosted = options
405                .get("virtual_hosted_style_request")
406                .is_some_and(|value| value == "true");
407            let url = if virtual_hosted {
408                format!(
409                    "{}://{}.{}",
410                    endpoint.scheme, endpoint.bucket, endpoint.authority
411                )
412            } else {
413                format!("{}://{}", endpoint.scheme, endpoint.authority)
414            };
415            options.insert("endpoint".to_owned(), url);
416        }
417        Ok(ResolvedStorage {
418            storage: self.clone(),
419            options,
420            binding,
421        })
422    }
423}
424
425/// (canonical URL, recognized query options, `?creds=` pointer).
426type StrippedQuery = (Url, Vec<(&'static str, String)>, Option<String>);
427
428/// Pull recognized query params off the URL; reject unrecognized ones.
429fn strip_query(url: Url) -> Result<StrippedQuery> {
430    let mut query_options = Vec::new();
431    let mut creds_pointer = None;
432    for (key, value) in url.query_pairs() {
433        match RECOGNIZED_QUERY_PARAMS
434            .iter()
435            .find(|known| **known == key.as_ref())
436        {
437            Some(&"creds") => creds_pointer = Some(value.into_owned()),
438            Some(known) => query_options.push((*known, value.into_owned())),
439            None => bail!(
440                "storage URL query param {key:?} not recognized (known: {})",
441                RECOGNIZED_QUERY_PARAMS.join(", "),
442            ),
443        }
444    }
445    let mut canonical = url;
446    canonical.set_query(None);
447    Ok((canonical, query_options, creds_pointer))
448}
449
450/// Parse a `[creds.*] scope` URL prefix into the same canonical form
451/// `StorageUrl::parse` produces, so comparison is exact.
452pub(crate) fn parse_scope(scope: &str) -> Result<Url> {
453    let mut url = Url::parse(scope.trim())?;
454    if !url.username().is_empty() || url.password().is_some() {
455        bail!("scope embeds credentials");
456    }
457    if url.query().is_some() {
458        bail!("scope carries query params; scopes are plain URL prefixes");
459    }
460    match (url.scheme(), url.port()) {
461        ("s3+https", Some(443)) | ("s3+http", Some(80)) => {
462            let _ = url.set_port(None);
463        }
464        _ => {}
465    }
466    Ok(url)
467}
468
469/// spec.md#creds-scope-match: scheme, host, and port equal; path matches at
470/// `/` segment boundaries only (`.../pond` does not match `.../pond-2`). No
471/// cross-scheme normalization: a `s3+https://host/bucket/` scope does not
472/// match a `s3://bucket/` URL.
473fn scope_matches(scope: &Url, address: &Url) -> bool {
474    if scope.scheme() != address.scheme()
475        || scope.host_str() != address.host_str()
476        || scope.port() != address.port()
477    {
478        return false;
479    }
480    let scope_path = scope.path().trim_end_matches('/');
481    let address_path = address.path().trim_end_matches('/');
482    address_path == scope_path
483        || address_path
484            .strip_prefix(scope_path)
485            .is_some_and(|rest| rest.starts_with('/'))
486}
487
/// How a creds set got bound to a URL - surfaced in binding lines so a wrong
/// match is visible before any auth error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BindVia {
    /// `?creds=<name>` pointer on the URL.
    Pointer,
    /// Longest-prefix `scope` match.
    Scope,
    /// The scope-less catch-all set.
    CatchAll,
}

/// Which credentials (if any) apply to a resolved URL.
#[derive(Debug, Clone, PartialEq)]
pub enum CredsBinding {
    /// A `[creds.<name>]` set bound to this URL.
    Set { name: String, via: BindVia },
    /// No set matched; object_store's ambient SDK chain applies (AWS_* env,
    /// shared credentials file, IMDS/container metadata). A documented
    /// invariant, not an accident - instance profiles and OIDC work with
    /// zero pond config.
    Ambient,
    /// Local / in-memory scheme; credentials don't apply.
    NotApplicable,
}

impl CredsBinding {
    /// One-line human rendering for binding lines and `pond config show`,
    /// e.g. `creds prod (scope match)` or `ambient chain`.
    pub fn describe(&self) -> String {
        match self {
            Self::Ambient => String::from("ambient chain"),
            Self::NotApplicable => String::from("local (no credentials)"),
            Self::Set { name, via } => {
                let how = match *via {
                    BindVia::CatchAll => "catch-all",
                    BindVia::Scope => "scope match",
                    BindVia::Pointer => "?creds",
                };
                format!("creds {name} ({how})")
            }
        }
    }
}
530
531/// A storage address with its options assembled and secrets materialized -
532/// everything `Store::open_with_options` needs, plus the binding for
533/// display.
534#[derive(Debug, Clone)]
535pub struct ResolvedStorage {
536    storage: StorageUrl,
537    pub options: HashMap<String, String>,
538    pub binding: CredsBinding,
539}
540
541impl ResolvedStorage {
542    pub fn lance_url(&self) -> &Url {
543        self.storage.lance_url()
544    }
545
546    pub fn display(&self) -> String {
547        self.storage.display()
548    }
549}
550
551/// Names of defined creds sets that bound to none of this invocation's URLs
552/// (spec.md#creds-scope-match: misbinding must never be silent). Empty when
553/// the invocation touched no credential-taking URL - a local-only command
554/// must not nag about sets kept for remote work.
555pub fn unmatched_creds_sets<'c>(
556    resolved: &[&ResolvedStorage],
557    creds: &'c BTreeMap<String, CredsSet>,
558) -> Vec<&'c str> {
559    if resolved
560        .iter()
561        .all(|entry| matches!(entry.binding, CredsBinding::NotApplicable))
562    {
563        return Vec::new();
564    }
565    creds
566        .keys()
567        .filter(|name| {
568            !resolved.iter().any(|entry| {
569                matches!(&entry.binding, CredsBinding::Set { name: bound, .. } if bound == *name)
570            })
571        })
572        .map(String::as_str)
573        .collect()
574}
575
576/// Materialize one logical secret from its inline / `_file` / `_command`
577/// variant (validation guarantees at most one is set).
578fn materialize_secret(
579    set: &str,
580    field: &str,
581    inline: Option<&str>,
582    file: Option<&std::path::Path>,
583    command: Option<&str>,
584) -> Result<Option<String>> {
585    if let Some(value) = inline {
586        return Ok(Some(value.to_owned()));
587    }
588    if let Some(path) = file {
589        let text = std::fs::read_to_string(path).with_context(|| {
590            format!(
591                "[creds.{set}] {field}_file: failed to read {}",
592                path.display()
593            )
594        })?;
595        return Ok(Some(strip_one_newline(text)));
596    }
597    if let Some(command) = command {
598        return Ok(Some(run_secret_command(set, field, command)?));
599    }
600    Ok(None)
601}
602
603/// Run a `*_command` secret source. Output is cached per command text per
604/// process, so N URLs resolving through one set cost one subprocess.
605fn run_secret_command(set: &str, field: &str, command: &str) -> Result<String> {
606    static CACHE: std::sync::OnceLock<std::sync::Mutex<HashMap<String, String>>> =
607        std::sync::OnceLock::new();
608    let cache = CACHE.get_or_init(Default::default);
609    if let Some(hit) = cache
610        .lock()
611        .unwrap_or_else(std::sync::PoisonError::into_inner)
612        .get(command)
613    {
614        return Ok(hit.clone());
615    }
616    let output = std::process::Command::new("sh")
617        .arg("-c")
618        .arg(command)
619        .output()
620        .with_context(|| format!("[creds.{set}] {field}_command failed to spawn: {command}"))?;
621    if !output.status.success() {
622        bail!(
623            "[creds.{set}] {field}_command exited {}: {command}\n{}",
624            output.status,
625            String::from_utf8_lossy(&output.stderr).trim_end(),
626        );
627    }
628    let value = strip_one_newline(
629        String::from_utf8(output.stdout)
630            .with_context(|| format!("[creds.{set}] {field}_command output is not UTF-8"))?,
631    );
632    cache
633        .lock()
634        .unwrap_or_else(std::sync::PoisonError::into_inner)
635        .insert(command.to_owned(), value.clone());
636    Ok(value)
637}
638
/// Strip exactly one trailing newline (the one `echo` / `op read` append);
/// anything beyond that is part of the secret. A `\r\n` pair counts as one
/// newline; a lone trailing `\r` is kept.
fn strip_one_newline(mut text: String) -> String {
    if let Some(without_lf) = text.strip_suffix('\n') {
        let keep = without_lf.strip_suffix('\r').unwrap_or(without_lf).len();
        text.truncate(keep);
    }
    text
}
650
/// `pond storage check` failure classes, each with its own exit code at the
/// CLI so cron and CI can branch on them. Display carries only the
/// fix-naming lead; the underlying error is exposed separately through
/// [`CheckFailure::concise_cause`] so surfaces stay one readable line
/// instead of trailing the upstream chain (Lance flattens its inner errors
/// into each level's Display, so the raw chain prints the same failure
/// several times over).
#[derive(Debug, thiserror::Error)]
pub enum CheckFailure {
    /// Authentication was rejected while no `[creds.*]` set was bound to the
    /// URL. NOTE(review): the choice between this and `Auth` is made by the
    /// probe's error classifier, which is not visible here.
    #[error(
        "authentication failed and no creds set matched this URL; define [creds.*] (or POND_CREDS_*), or provide ambient AWS_* credentials"
    )]
    NoCreds { source: anyhow::Error },
    /// Authentication was rejected using the named creds set.
    #[error("authentication failed using creds set {set:?}; check its keys and scope")]
    Auth { set: String, source: anyhow::Error },
    /// The backend let a second `If-None-Match` create through (or rejected
    /// the header), so it cannot provide the conditional-write primitive
    /// multi-writer commits need. `detail` is already curated for display.
    #[error(
        "backend does not enforce conditional writes (If-None-Match); concurrent pond writers would corrupt each other - {detail}"
    )]
    OccUnsupported { detail: String },
    /// Any other probe failure (opening the store, I/O on the probe key).
    #[error("storage probe failed")]
    Io { source: anyhow::Error },
}
673
674impl CheckFailure {
675    /// The root cause, condensed to one operator-readable line: the deepest
676    /// error in the chain with upstream noise stripped - Lance's bug-report
677    /// boilerplate, internal `<WORKSPACE>` source locations, and the repeated
678    /// wrapper text that follows them. `None` for `OccUnsupported`, whose
679    /// `detail` is already curated into its Display.
680    pub fn concise_cause(&self) -> Option<String> {
681        let source = match self {
682            Self::NoCreds { source } | Self::Auth { source, .. } | Self::Io { source } => source,
683            Self::OccUnsupported { .. } => return None,
684        };
685        Some(condense_error_chain(source))
686    }
687}
688
689/// One-line root cause for a probe error. Takes the deepest chain entry
690/// (each outer Lance/object_store layer re-prints its inner error, so the
691/// deepest is the least redundant), cuts at the first internal source
692/// location (everything after it is upstream re-printing), strips Lance's
693/// bug-report boilerplate, and middle-truncates - the tail is kept because
694/// wrapped transport errors put the root (DNS, connect) at the end.
695fn condense_error_chain(error: &anyhow::Error) -> String {
696    let mut text = error
697        .chain()
698        .last()
699        .map(ToString::to_string)
700        .unwrap_or_else(|| format!("{error:#}"));
701    if let Some(pos) = text.find(", <WORKSPACE>") {
702        text.truncate(pos);
703    }
704    text = text.replace(
705        "Encountered internal error. Please file a bug report at https://github.com/lance-format/lance/issues. ",
706        "",
707    );
708    let line = text.split_whitespace().collect::<Vec<_>>().join(" ");
709    const HEAD: usize = 120;
710    const TAIL: usize = 120;
711    let chars: Vec<char> = line.chars().collect();
712    if chars.len() > HEAD + TAIL + 5 {
713        let head: String = chars[..HEAD].iter().collect();
714        let tail: String = chars[chars.len() - TAIL..].iter().collect();
715        format!("{head} ... {tail}")
716    } else {
717        line
718    }
719}
720
721/// Probe a resolved storage destination end-to-end (spec.md#substrate): a
722/// conditional `PutMode::Create` pair proving the `If-None-Match` -> 412 OCC
723/// primitive Lance's commit handler relies on, then read-back and delete of
724/// the synthetic key.
725pub async fn storage_check(resolved: &ResolvedStorage) -> std::result::Result<(), CheckFailure> {
726    use object_store::{Error as OsError, ObjectStoreExt, PutMode, PutOptions, PutPayload};
727
728    let classify =
729        |error: OsError, step: &str| classify_check_error(error, &resolved.binding, step);
730
731    let probe_uri = format!(
732        "{}/_config-check/{}",
733        resolved.lance_url().as_str().trim_end_matches('/'),
734        uuid::Uuid::now_v7(),
735    );
736    let params = ObjectStoreParams {
737        storage_options_accessor: (!resolved.options.is_empty()).then(|| {
738            Arc::new(StorageOptionsAccessor::with_static_options(
739                resolved.options.clone(),
740            ))
741        }),
742        ..Default::default()
743    };
744    let registry = Arc::new(ObjectStoreRegistry::default());
745    let (store, path) = ObjectStore::from_uri_and_params(registry, &probe_uri, &params)
746        .await
747        .map_err(|error| CheckFailure::Io {
748            source: anyhow!(error).context(format!("failed to open object store for {probe_uri}")),
749        })?;
750
751    let body: &[u8] = b"pond storage check";
752    let create = PutOptions::from(PutMode::Create);
753    store
754        .inner
755        .put_opts(&path, PutPayload::from_static(body), create.clone())
756        .await
757        .map_err(|error| classify(error, "initial conditional put"))?;
758    // The probe key exists from here on: run the remaining steps, then
759    // best-effort delete it whatever they returned - a failed probe must
760    // not leave litter behind.
761    let outcome = async {
762        // The second create MUST lose: this is the `If-None-Match: *` -> 412
763        // primitive multi-writer OCC stands on. A backend that lets it
764        // through (or rejects the header) silently overwrites concurrent
765        // commits.
766        match store
767            .inner
768            .put_opts(&path, PutPayload::from_static(body), create)
769            .await
770        {
771            Err(OsError::AlreadyExists { .. }) => {}
772            Ok(_) => {
773                return Err(CheckFailure::OccUnsupported {
774                    detail: "a second create over an existing key succeeded".to_owned(),
775                });
776            }
777            Err(OsError::NotImplemented { .. }) => {
778                return Err(CheckFailure::OccUnsupported {
779                    detail: "the backend rejects conditional puts as unimplemented".to_owned(),
780                });
781            }
782            Err(error) => return Err(classify(error, "conditional-put probe")),
783        }
784        let read_back = store
785            .inner
786            .get(&path)
787            .await
788            .map_err(|error| classify(error, "read-back"))?
789            .bytes()
790            .await
791            .map_err(|error| classify(error, "read-back body"))?;
792        if read_back.as_ref() != body {
793            return Err(CheckFailure::Io {
794                source: anyhow!("read-back returned different bytes than written"),
795            });
796        }
797        Ok(())
798    }
799    .await;
800    let cleanup = store.inner.delete(&path).await;
801    outcome?;
802    cleanup.map_err(|error| classify(error, "cleanup delete"))?;
803    Ok(())
804}
805
806/// Map an `object_store` error onto the check's failure classes: an auth
807/// error is attributed to the bound creds set when one matched, and to the
808/// (empty) ambient chain when none did; everything else is I/O.
809fn classify_check_error(
810    error: object_store::Error,
811    binding: &CredsBinding,
812    step: &str,
813) -> CheckFailure {
814    use object_store::Error as OsError;
815    // Lance erases a missing-credentials failure into a `Generic` error - the
816    // typed `Unauthenticated` never surfaces for an empty provider chain - so
817    // also match the AWS SDK's rendered `CredentialsNotLoaded` signal. Both
818    // are auth-class: attributed to the bound set, else the empty ambient chain.
819    let auth_class = matches!(
820        error,
821        OsError::Unauthenticated { .. } | OsError::PermissionDenied { .. }
822    ) || {
823        let rendered = error.to_string();
824        rendered.contains("CredentialsNotLoaded")
825            || rendered.contains("no providers in chain provided credentials")
826    };
827    match (auth_class, binding) {
828        (true, CredsBinding::Set { name, .. }) => CheckFailure::Auth {
829            set: name.clone(),
830            source: anyhow!(error).context(step.to_owned()),
831        },
832        (true, _) => CheckFailure::NoCreds {
833            source: anyhow!(error).context(step.to_owned()),
834        },
835        (false, _) => CheckFailure::Io {
836            source: anyhow!(error).context(step.to_owned()),
837        },
838    }
839}
840
/// Fragment-count backstop for the compaction amplification veto. A task
/// spanning at least this many fragments always runs, which bounds manifest
/// growth even when the veto would otherwise skip it. When used as the
/// policy cap, `0` disables the veto entirely (the tests rely on this).
pub const DEFAULT_COMPACTION_FRAGMENT_CAP: usize = 64;

/// Target on-disk size of a compacted fragment: 256 MiB. Fragments are sized
/// by bytes rather than Lance's 1M-row default. With kilobyte-average rows, a
/// row target tolerates multi-GiB fragments. Compaction then re-rewrites those
/// wholesale just to absorb tiny appends (~190 GiB/day of churn).
pub const TARGET_FRAGMENT_BYTES: u64 = 256 << 20;

/// Floor for the derived rows-per-fragment target.
const MIN_TARGET_ROWS_PER_FRAGMENT: u64 = 50_000;
/// Ceiling for the derived rows-per-fragment target: Lance's own default
/// (2^20 rows).
const MAX_TARGET_ROWS_PER_FRAGMENT: u64 = 1 << 20;

/// Size-tiered absorb ratio. A compaction task is kept only when the data
/// merged into its largest fragment is at least `largest / 4`, which
/// amortizes to O(log n) lifetime rewrites per row.
pub const COMPACTION_ABSORB_FACTOR: u64 = 4;
858
859/// Default manifest-retention window for the safe cleanup pass. Matches
860/// LanceDB's recommended OSS-operator practice (lancedb docs: performance.mdx,
861/// tables/update.mdx). With `delete_unverified=false`, Lance's 7-day
862/// in-progress guard still protects unverified files regardless of this value
863/// (`UNVERIFIED_THRESHOLD_DAYS` in lance/dataset/cleanup.rs).
864pub fn default_cleanup_older_than() -> chrono::Duration {
865    chrono::Duration::days(1)
866}
867
/// Resolved per-call inputs to the storage-maintenance pass. Built from
/// `[maintenance]` (and any per-invocation CLI override) at the entry point;
/// threaded down to `optimize_table_compact` so the substrate never re-reads
/// `Config` itself.
#[derive(Debug, Clone, Copy)]
pub struct MaintenancePolicy {
    /// Fragment-count backstop for the compaction amplification veto (see
    /// [`DEFAULT_COMPACTION_FRAGMENT_CAP`]). A task spanning at least this
    /// many fragments always runs; `0` disables the veto so every planned
    /// task runs.
    pub compaction_fragment_cap: usize,
    /// Manifest-retention window handed to `cleanup_old_versions`.
    pub cleanup_older_than: chrono::Duration,
}
879
880impl MaintenancePolicy {
881    /// Veto off: run every task Lance plans (the optimize tests assume this).
882    pub fn always_compact() -> Self {
883        Self {
884            compaction_fragment_cap: 0,
885            cleanup_older_than: default_cleanup_older_than(),
886        }
887    }
888}
889
/// Per-fragment summary consumed by compaction planning: the amplification
/// veto (`keep_task`) and the rows-per-fragment target
/// (`derived_target_rows`). Built by `fragment_stat`.
struct FragmentStat {
    /// Sum of the fragment's data-file sizes; `None` when the manifest lacks
    /// any file's size.
    bytes: Option<u64>,
    /// The manifest's `physical_rows` for the fragment (`0` when absent).
    rows: u64,
    /// Rows recorded by the fragment's deletion file (`0` when there is no
    /// deletion file or it carries no count).
    deleted_rows: u64,
}
896
897/// Data-file bytes of one fragment; `None` (poisoning) when any size is
898/// missing from the manifest.
899fn fragment_bytes(fragment: &lance::table::format::Fragment) -> Option<u64> {
900    fragment.files.iter().try_fold(0u64, |total, file| {
901        Some(total + file.file_size_bytes.get()?.get())
902    })
903}
904
905fn fragment_stat(fragment: &lance::table::format::Fragment) -> FragmentStat {
906    FragmentStat {
907        bytes: fragment_bytes(fragment),
908        rows: fragment.physical_rows.unwrap_or(0) as u64,
909        deleted_rows: fragment
910            .deletion_file
911            .as_ref()
912            .and_then(|deletions| deletions.num_deleted_rows)
913            .unwrap_or(0) as u64,
914    }
915}
916
917/// Rows per [`TARGET_FRAGMENT_BYTES`] at the table's average row size.
918fn derived_target_rows(stats: &[FragmentStat]) -> usize {
919    let (mut bytes, mut rows) = (0u64, 0u64);
920    for stat in stats {
921        if let Some(fragment_bytes) = stat.bytes
922            && stat.rows > 0
923        {
924            bytes += fragment_bytes;
925            rows += stat.rows;
926        }
927    }
928    if bytes == 0 || rows == 0 {
929        return MAX_TARGET_ROWS_PER_FRAGMENT as usize;
930    }
931    let avg_row_bytes = (bytes / rows).max(1);
932    (TARGET_FRAGMENT_BYTES / avg_row_bytes)
933        .clamp(MIN_TARGET_ROWS_PER_FRAGMENT, MAX_TARGET_ROWS_PER_FRAGMENT) as usize
934}
935
936/// Amplification veto: skip tasks that mostly rewrite one big fragment to
937/// absorb fresh appends. Deletion-materialization tasks always pass (vetoing
938/// them would leave tombstones unreclaimed forever); compared in bytes when
939/// every file size is known, rows otherwise.
940fn keep_task(stats: &[FragmentStat], cap: usize, deletion_threshold: f32) -> bool {
941    if stats.iter().any(|stat| {
942        stat.rows > 0 && (stat.deleted_rows as f32 / stat.rows as f32) > deletion_threshold
943    }) {
944        return true;
945    }
946    if stats.len() >= cap {
947        return true;
948    }
949    let weights: Vec<u64> = if stats.iter().all(|stat| stat.bytes.is_some()) {
950        stats.iter().filter_map(|stat| stat.bytes).collect()
951    } else {
952        stats.iter().map(|stat| stat.rows).collect()
953    };
954    let total: u64 = weights.iter().sum();
955    let largest = weights.iter().copied().max().unwrap_or(0);
956    (total - largest) * COMPACTION_ABSORB_FACTOR >= largest
957}
958
/// Declarative description of one index pond keeps on a table. Created when
/// its trigger fires; folded forward by `pond index optimize`.
#[derive(Debug, Clone)]
pub struct IndexIntent {
    /// Stable on-disk name. Must match across runs so existence checks
    /// resolve.
    pub name: &'static str,
    /// Column the index covers.
    pub column: &'static str,
    /// Condition evaluated against the live dataset before each cycle (see
    /// `IndexTrigger::should_create`); the index is only created once it
    /// holds.
    pub trigger: IndexTrigger,
    /// How the params are built at create time. Some intents have static
    /// params (FTS, scalars); IVF_PQ needs the row count to size partitions.
    pub params: IndexParamsKind,
}
974
/// When an [`IndexIntent`] should exist on disk.
#[derive(Debug, Clone)]
pub enum IndexTrigger {
    /// Build whenever the table has any rows. Used for FTS and scalar
    /// indices: there is no training cost worth delaying.
    OnAnyRows,
    /// Build when `count(<column> IS NOT NULL) >= threshold`. Used for the
    /// IVF_PQ vector index, which trains poorly on too few vectors.
    OnNonNullCount {
        /// Column whose non-null rows are counted. It is interpolated
        /// unquoted into the count filter, so it must be a plain identifier.
        column: &'static str,
        /// Minimum non-null row count (inclusive).
        threshold: usize,
    },
}
988
/// The lance-native shape of an [`IndexIntent`]'s params, dispatched to the
/// right `IndexParams` at create time.
#[derive(Debug, Clone)]
pub enum IndexParamsKind {
    /// A builtin scalar index. `BuiltinIndexType::Bitmap` maps to
    /// [`IndexType::Bitmap`]; every other builtin kind (not only `BTree`)
    /// currently maps to [`IndexType::BTree`].
    Scalar(BuiltinIndexType),
    /// `InvertedIndexParams` with a character `ngram` tokenizer in the
    /// `[min, max]` range and stemming / stop-words off
    /// (spec.md#search-language-neutral-index).
    InvertedFtsNgram { min: u32, max: u32 },
    /// `VectorIndexParams::ivf_pq` with cosine metric (e5 vectors are
    /// L2-normalized). `sub_vectors = embedding_dim / 8` and `num_bits = 8`
    /// are pond's conventions; `max_iters` caps kmeans. Partitions follow
    /// LanceDB's documented `num_rows // 4096` guidance, floored at one,
    /// where `num_rows` counts rows whose `vector` column is non-null.
    IvfPqCosine {
        sub_vectors: usize,
        num_bits: u8,
        max_iters: usize,
    },
}
1010
1011impl IndexTrigger {
1012    async fn should_create(&self, dataset: &Dataset) -> Result<bool> {
1013        match self {
1014            Self::OnAnyRows => Ok(dataset.count_rows(None).await? > 0),
1015            Self::OnNonNullCount { column, threshold } => {
1016                let count = dataset
1017                    .count_rows(Some(format!("{column} IS NOT NULL")))
1018                    .await?;
1019                Ok(count >= *threshold)
1020            }
1021        }
1022    }
1023}
1024
1025impl IndexParamsKind {
1026    fn index_type(&self) -> IndexType {
1027        match self {
1028            Self::Scalar(BuiltinIndexType::Bitmap) => IndexType::Bitmap,
1029            Self::Scalar(_) => IndexType::BTree,
1030            Self::InvertedFtsNgram { .. } => IndexType::Inverted,
1031            Self::IvfPqCosine { .. } => IndexType::Vector,
1032        }
1033    }
1034
1035    async fn build(&self, dataset: &Dataset) -> Result<Box<dyn lance::index::IndexParams>> {
1036        match self {
1037            Self::Scalar(kind) => Ok(Box::new(ScalarIndexParams::for_builtin(kind.clone()))),
1038            Self::InvertedFtsNgram { min, max } => Ok(Box::new(
1039                InvertedIndexParams::default()
1040                    .base_tokenizer("ngram".to_owned())
1041                    .ngram_min_length(*min)
1042                    .ngram_max_length(*max)
1043                    .stem(false)
1044                    .remove_stop_words(false),
1045            )),
1046            Self::IvfPqCosine {
1047                sub_vectors,
1048                num_bits,
1049                max_iters,
1050            } => {
1051                let count = dataset
1052                    .count_rows(Some("vector IS NOT NULL".to_owned()))
1053                    .await?;
1054                let partitions = count.checked_div(4096).unwrap_or(0).max(1);
1055                Ok(Box::new(VectorIndexParams::ivf_pq(
1056                    partitions,
1057                    *num_bits,
1058                    *sub_vectors,
1059                    MetricType::Cosine,
1060                    *max_iters,
1061                )))
1062            }
1063        }
1064    }
1065}
1066
/// Status row for one index intent on one table.
// NOTE(review): the field meanings below are read from their names; the code
// that fills this struct is outside this view — confirm against it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexStatus {
    /// Table the intent applies to.
    pub table: Table,
    /// Name of the intent (presumably `IndexIntent::name`).
    pub intent_name: String,
    /// Fragments the existing index covers.
    pub fragments_covered: usize,
    /// Fragments not yet folded into the index.
    pub unindexed_fragments: usize,
    /// Rows in the not-yet-indexed fragments.
    pub unindexed_rows: usize,
    /// Whether the index currently exists on disk.
    pub exists: bool,
}
1076
/// Marker error that `retry_lance` attaches to the anyhow chain once its
/// retry budget is spent on an OCC commit conflict (spec.md#protocol). The
/// wire layer downcasts to this type so the outcome is reported as
/// `conflict` rather than the generic `storage_unavailable`.
#[derive(Debug, Clone, Copy)]
pub struct ConflictExhausted {
    /// Attempts made before giving up.
    pub attempts: u8,
}

impl std::fmt::Display for ConflictExhausted {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { attempts } = *self;
        write!(formatter, "commit conflict exhausted after {attempts} attempt(s)")
    }
}

/// Leaf error: the marker has no underlying `source`.
impl std::error::Error for ConflictExhausted {}
1097
/// Per-phase result for one table's pass through `Handle::optimize_table`.
/// spec.md#substrate 3.7 (`lance-index-maintenance`): the indices phase and the
/// compaction phase get independent retry budgets and independent commits,
/// so a hot writer that starves the Rewrite cannot abort the index Update.
#[derive(Debug)]
pub enum PhaseOutcome {
    /// Phase attempted and committed work.
    Ok,
    /// Phase attempted; no work was needed.
    Noop,
    /// Phase attempted; OCC retry budget exhausted on conflict (the operator
    /// can rerun later once the hot writer quiesces). Presumably detected via
    /// the `ConflictExhausted` marker — confirm at the producer.
    SkippedConflict,
    /// Phase failed with a non-conflict error; the error chain is carried
    /// for reporting.
    Failed(anyhow::Error),
    /// Phase not requested by the caller (e.g. compaction skipped under
    /// `Store::build_indices_only`).
    NotAttempted,
}
1117
1118impl PhaseOutcome {
1119    pub fn is_failed(&self) -> bool {
1120        matches!(self, Self::Failed(_))
1121    }
1122}
1123
/// What `Handle::optimize_table` did for one table.
#[derive(Debug)]
pub struct TableOptimizeOutcome {
    /// Table the pass ran against.
    pub table: Table,
    /// Result of the indices phase.
    pub indices: PhaseOutcome,
    /// Result of the compaction phase, which commits independently of the
    /// indices phase.
    pub compaction: PhaseOutcome,
}
1131
/// Boundary event during one `Handle::optimize_table` pass. The CLI binds a
/// progress callback to render a live spinner; library callers pass `None`.
#[derive(Debug, Clone)]
pub enum OptimizeEvent {
    /// A phase is about to run on `table`.
    PhaseStart {
        table: Table,
        phase: OptimizePhase,
        /// Optional extra context for the progress display (contents are up
        /// to the emitter).
        detail: Option<String>,
    },
    /// A phase ended on `table`.
    PhaseDone {
        table: Table,
        phase: OptimizePhase,
        /// Duration of the phase in milliseconds (presumably wall-clock —
        /// confirm at the emitter).
        elapsed_ms: u64,
    },
}
1147
/// One step of a `Handle::optimize_table` pass, as reported in progress
/// events.
#[derive(Debug, Clone, Copy)]
pub enum OptimizePhase {
    Compact,
    Cleanup,
    IndexCreate,
    IndexRebuild,
    IndexAppend,
}

impl OptimizePhase {
    /// Stable kebab-case name for progress output.
    pub fn label(self) -> &'static str {
        match self {
            OptimizePhase::IndexCreate => "index-create",
            OptimizePhase::IndexRebuild => "index-rebuild",
            OptimizePhase::IndexAppend => "index-append",
            OptimizePhase::Compact => "compact",
            OptimizePhase::Cleanup => "cleanup",
        }
    }
}
1168
1169pub type OptimizeProgressFn = Box<dyn Fn(OptimizeEvent) + Send + Sync>;
1170
1171fn emit(progress: Option<&OptimizeProgressFn>, event: OptimizeEvent) {
1172    if let Some(callback) = progress {
1173        callback(event);
1174    }
1175}
1176
1177/// True when the chain root is one of Lance's commit-conflict variants
1178/// (`CommitConflict`, `RetryableCommitConflict`, `TooMuchWriteContention`).
1179/// Everything else (timeouts, IAM denials, disk errors) is not a conflict.
1180pub fn is_commit_conflict(error: &anyhow::Error) -> bool {
1181    error.downcast_ref::<lance::Error>().is_some_and(|err| {
1182        matches!(
1183            err,
1184            lance::Error::CommitConflict { .. }
1185                | lance::Error::RetryableCommitConflict { .. }
1186                | lance::Error::TooMuchWriteContention { .. }
1187        )
1188    })
1189}
1190
1191/// True when `retry_lance` exhausted retries against an OCC conflict and
1192/// attached `ConflictExhausted` to the chain head.
1193fn is_conflict_exhausted(error: &anyhow::Error) -> bool {
1194    error.chain().any(|cause| cause.is::<ConflictExhausted>())
1195}
1196
/// On-disk byte totals for the three session datasets, plus everything else
/// under the data-dir root. Sized by listing through Lance's object-store
/// layer (spec.md#lance-chokepoints-storage) so `file://` and `s3://` behave alike.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TableSizes {
    /// Bytes of the `sessions` dataset.
    pub sessions: u64,
    /// Bytes of the `messages` dataset.
    pub messages: u64,
    /// Bytes of the `parts` dataset.
    pub parts: u64,
    /// Bytes under the data-dir root outside the three datasets.
    pub other: u64,
    /// `data/` on-disk vs live breakdown for `sessions`.
    pub sessions_data: DataLiveness,
    /// `data/` on-disk vs live breakdown for `messages`.
    pub messages_data: DataLiveness,
    /// `data/` on-disk vs live breakdown for `parts`.
    pub parts_data: DataLiveness,
}
1210
/// Bytes under a table's `data/` directory versus bytes the latest manifest
/// still references. The difference is superseded versions waiting out the
/// cleanup retention window.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DataLiveness {
    /// Bytes currently on disk under `data/`.
    pub on_disk: u64,
    /// `None` when the manifest lacks any referenced file's size.
    pub live: Option<u64>,
}

impl DataLiveness {
    /// Dead bytes (`on_disk - live`, floored at zero), or `None` when the
    /// live size is unknown.
    pub fn dead(&self) -> Option<u64> {
        let live = self.live?;
        Some(self.on_disk.saturating_sub(live))
    }
}
1225
/// A literal operand inside a `Predicate`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalarValue {
    /// Text literal, rendered as a quoted SQL string.
    String(String),
    /// 32-bit integer literal.
    Int32(i32),
    /// Pre-rendered SQL fragment, emitted verbatim with no quoting or
    /// escaping.
    Raw(String),
}
impl From<String> for ScalarValue {
    fn from(value: String) -> Self {
        ScalarValue::String(value)
    }
}
impl From<&str> for ScalarValue {
    fn from(value: &str) -> Self {
        Self::from(String::from(value))
    }
}
impl From<i32> for ScalarValue {
    fn from(value: i32) -> Self {
        ScalarValue::Int32(value)
    }
}
1247#[derive(Debug, Clone, PartialEq, Eq)]
1248pub enum Predicate {
1249    Eq(&'static str, ScalarValue),
1250    Ne(&'static str, ScalarValue),
1251    IsNull(&'static str),
1252    IsNotNull(&'static str),
1253    In(&'static str, Vec<ScalarValue>),
1254    LikeContains(&'static str, String),
1255    /// Regex match. Emitted as `regexp_like(<col>, '<pat>')`. Never pushes
1256    /// down to BTREE indexes (Lance's scalar-index-expr parser ignores it),
1257    /// so the filter is a full-scan-with-predicate - acceptable for
1258    /// human-driven `--project re:...` queries, not for hot paths.
1259    Regex(&'static str, String),
1260    Gte(&'static str, ScalarValue),
1261    Lte(&'static str, ScalarValue),
1262    And(Vec<Predicate>),
1263    Or(Vec<Predicate>),
1264    Not(Box<Predicate>),
1265}
1266impl Predicate {
1267    pub fn to_lance(&self) -> String {
1268        match self {
1269            Self::Eq(column, value) => format!("{column} = {}", value.to_lance()),
1270            Self::Ne(column, value) => format!("{column} <> {}", value.to_lance()),
1271            Self::IsNull(column) => format!("{column} IS NULL"),
1272            Self::IsNotNull(column) => format!("{column} IS NOT NULL"),
1273            Self::In(column, values) => {
1274                let values = values
1275                    .iter()
1276                    .map(ScalarValue::to_lance)
1277                    .collect::<Vec<_>>()
1278                    .join(", ");
1279                format!("{column} IN ({values})")
1280            }
1281            Self::LikeContains(column, value) => {
1282                format!("{column} LIKE {} ESCAPE '\\'", like_contains(value))
1283            }
1284            Self::Regex(column, pattern) => {
1285                format!("regexp_like({column}, {})", quoted_string(pattern))
1286            }
1287            Self::Gte(column, value) => format!("{column} >= {}", value.to_lance()),
1288            Self::Lte(column, value) => format!("{column} <= {}", value.to_lance()),
1289            Self::And(predicates) => predicates
1290                .iter()
1291                .map(Self::to_lance)
1292                .filter(|predicate| !predicate.is_empty())
1293                .collect::<Vec<_>>()
1294                .join(" AND "),
1295            Self::Or(predicates) => {
1296                // Wrap in parens so the disjunction composes safely as a child
1297                // of an outer `And` (SQL `OR` binds looser than `AND`).
1298                let body = predicates
1299                    .iter()
1300                    .map(Self::to_lance)
1301                    .filter(|predicate| !predicate.is_empty())
1302                    .collect::<Vec<_>>()
1303                    .join(" OR ");
1304                if body.is_empty() {
1305                    String::new()
1306                } else {
1307                    format!("({body})")
1308                }
1309            }
1310            Self::Not(inner) => {
1311                let body = inner.to_lance();
1312                if body.is_empty() {
1313                    String::new()
1314                } else {
1315                    format!("NOT ({body})")
1316                }
1317            }
1318        }
1319    }
1320}
1321/// Read-side options for `Handle::scan`: optional prefilter predicate and
1322/// optional projection. Default = no filter, all columns.
1323#[derive(Default)]
1324pub struct ScanOpts<'a> {
1325    pub predicate: Option<&'a Predicate>,
1326    pub projection: Option<&'a [&'a str]>,
1327}
1328
1329impl<'a> ScanOpts<'a> {
1330    pub fn project_only(projection: &'a [&'a str]) -> Self {
1331        Self {
1332            predicate: None,
1333            projection: Some(projection),
1334        }
1335    }
1336    pub fn with_predicate_and_projection(
1337        predicate: &'a Predicate,
1338        projection: &'a [&'a str],
1339    ) -> Self {
1340        Self {
1341            predicate: Some(predicate),
1342            projection: Some(projection),
1343        }
1344    }
1345}
1346
1347impl ScalarValue {
1348    fn to_lance(&self) -> String {
1349        match self {
1350            Self::String(value) => quoted_string(value),
1351            Self::Int32(value) => value.to_string(),
1352            Self::Raw(value) => value.clone(),
1353        }
1354    }
1355}
1356/// Lance cache caps in bytes. `None` lets the substrate pick the backend-aware
1357/// default (local FS gets a tighter cap; object stores stay near Lance's
1358/// defaults). Wired through `Store::open_with_options` from `[runtime]`.
1359#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
1360pub struct RuntimeCaps {
1361    pub index_cache_bytes: Option<usize>,
1362    pub metadata_cache_bytes: Option<usize>,
1363}
1364
1365impl RuntimeCaps {
1366    pub fn from_config(config: &crate::config::RuntimeConfig) -> Self {
1367        Self {
1368            index_cache_bytes: config.index_cache_bytes,
1369            metadata_cache_bytes: config.metadata_cache_bytes,
1370        }
1371    }
1372}
1373
/// Local-FS index-cache default (256 MiB). It is tight enough that a
/// long-lived `pond mcp` lands well under the 500 MiB target, with no
/// measurable latency cost vs Lance's 6 GiB default (see
/// `benches/serve_mem_bench.rs --cap-sweep`).
const LOCAL_INDEX_CACHE_BYTES: usize = 256 << 20;
/// Local-FS metadata-cache default (128 MiB).
const LOCAL_METADATA_CACHE_BYTES: usize = 128 << 20;
/// Object-store index-cache default (2 GiB). Refilling costs a round trip per
/// page, so more stays resident.
const REMOTE_INDEX_CACHE_BYTES: usize = 2 << 30;
/// Object-store metadata-cache default (512 MiB).
const REMOTE_METADATA_CACHE_BYTES: usize = 512 << 20;
1382
1383fn resolve_cache_caps(location: &Url, caps: RuntimeCaps) -> (usize, usize) {
1384    let (index_default, metadata_default) = if config::is_local(location) {
1385        (LOCAL_INDEX_CACHE_BYTES, LOCAL_METADATA_CACHE_BYTES)
1386    } else {
1387        (REMOTE_INDEX_CACHE_BYTES, REMOTE_METADATA_CACHE_BYTES)
1388    };
1389    (
1390        caps.index_cache_bytes.unwrap_or(index_default),
1391        caps.metadata_cache_bytes.unwrap_or(metadata_default),
1392    )
1393}
1394
/// Open handle onto pond's three Lance datasets (`sessions`, `messages`,
/// `parts`). It also holds the shared Lance session, the namespace seam, and
/// the storage options the datasets were opened with.
pub struct Handle {
    /// Cached dataset slots; `parts` opens lazily.
    datasets: DatasetSet,
    /// Retry budget for Lance calls (presumably consumed by `retry_lance` —
    /// confirm at the call sites).
    retry: RetryPolicy,
    /// One `lance::Session` shared across all three datasets. Carries the
    /// metadata + index caches and the `ObjectStoreRegistry` (which holds
    /// the underlying object_store / S3 client). Sharing the session means
    /// one cache pool covers all three tables and one S3 client serves all
    /// three datasets - load-bearing on object-store backends where a
    /// per-dataset client would mean 3x the connection pools and 3x the
    /// credential refreshes (lance/src/dataset/builder.rs:509-517).
    // NOTE(review): the field is not read through `self.session` (hence the
    // `dead_code` allow); holding it presumably keeps the shared session
    // alive for handle-level operations — confirm.
    #[allow(dead_code)]
    session: Arc<Session>,
    /// The `lance-namespace` catalog seam. v1 uses the Directory impl;
    /// future hosted pond swaps to "rest" without touching read/write paths
    /// (spec.md#lance-chokepoints-catalog).
    nm: Arc<dyn LanceNamespace>,
    /// Namespace identifier this handle binds to. v1 is always `root()`; the
    /// typed seam matches `resolve_namespace`'s return so multi-namespace
    /// routing can land without churning call sites (spec.md#wire-namespace-resolution).
    nm_ident: NamespaceIdent,
    /// Object-store options threaded through every `DatasetBuilder` and
    /// `Dataset::write` call so refresh / index-creation paths inherit the
    /// same credentials and region as the initial open. Empty on local-FS
    /// installs. Values can be secrets, so they must never be logged.
    storage_options: HashMap<String, String>,
    /// Data-dir URL the handle was opened against. `pond status` reads this
    /// to display where the bytes live and to decide whether to walk a local
    /// directory or issue a remote `LIST` for sizing.
    location: Url,
    /// Refresh window for the lazily opened `parts.lance` dataset. It is
    /// kept here because `parts` is not opened alongside sessions/messages;
    /// presumably it is handed to the `CachedDataset::refresh_after` built on
    /// first use, so that the lazy open keeps the same freshness as the eager
    /// opens — confirm at the lazy-open site.
    parts_refresh_after: Duration,
}
1429
1430impl std::fmt::Debug for Handle {
1431    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1432        formatter
1433            .debug_struct("Handle")
1434            .field("datasets", &self.datasets)
1435            .field("retry", &self.retry)
1436            .field("nm_ident", &self.nm_ident)
1437            .field("storage_options", &self.storage_options)
1438            .field("location", &self.location)
1439            .finish()
1440    }
1441}
1442
/// The three datasets pond stores.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Table {
    Sessions,
    Messages,
    Parts,
}
impl Table {
    /// Lowercase table name: `"sessions"`, `"messages"` or `"parts"`.
    pub fn as_str(self) -> &'static str {
        Table::label(self)
    }

    fn label(self) -> &'static str {
        match self {
            Table::Parts => "parts",
            Table::Messages => "messages",
            Table::Sessions => "sessions",
        }
    }
}
/// The three cached dataset slots behind a `Handle`. Each slot sits behind
/// its own `tokio::sync::Mutex`, so access to one table's cache is serialized
/// without blocking the other tables.
#[derive(Debug)]
struct DatasetSet {
    /// `sessions.lance`, opened eagerly.
    sessions: Mutex<CachedDataset>,
    /// `messages.lance`, opened eagerly.
    messages: Mutex<CachedDataset>,
    /// `parts.lance` opens lazily on the first read or write that needs it:
    /// any `pond_get` (every mode reads parts to build summaries), grouped
    /// search hydrating user-hit summaries, or ingest with Part events. A
    /// process that does none of those skips the file, saving its metadata
    /// pages and file handle at cold-open. The OnceCell makes init
    /// single-flight; the inner `Mutex<CachedDataset>` then behaves identically
    /// to the other two.
    parts: OnceCell<Mutex<CachedDataset>>,
}
1475#[derive(Debug)]
1476struct CachedDataset {
1477    dataset: Dataset,
1478    last_refresh: Instant,
1479    refresh_after: Duration,
1480}
1481impl CachedDataset {
1482    async fn latest(&mut self) -> Result<Dataset> {
1483        if self.last_refresh.elapsed() >= self.refresh_after {
1484            self.dataset.checkout_latest().await?;
1485            self.last_refresh = Instant::now();
1486        }
1487        Ok(self.dataset.clone())
1488    }
1489    fn replace(&mut self, dataset: Dataset) {
1490        self.dataset = dataset;
1491        self.last_refresh = Instant::now();
1492    }
1493}
1494impl Handle {
1495    /// Open without storage options or explicit cache caps. Backend-aware
1496    /// defaults from `[runtime]` apply.
1497    pub async fn open(location: &Url) -> Result<Self> {
1498        Self::open_with_options(location, HashMap::new(), RuntimeCaps::default()).await
1499    }
1500
1501    /// Open with object-store options handed through to Lance verbatim, plus
1502    /// the resolved `[runtime]` cache caps. Object-store keys are the
1503    /// `object_store` crate's standard config names; pond does not parse them.
1504    /// Opening datasets never performs index work; index lifecycle lives under
1505    /// `Handle::optimize_table`. `parts.lance` opens lazily on first use.
1506    pub async fn open_with_options(
1507        location: &Url,
1508        mut storage_options: HashMap<String, String>,
1509        caps: RuntimeCaps,
1510    ) -> Result<Self> {
1511        if let Some(path) = config::local_path(location) {
1512            tokio::fs::create_dir_all(&path).await.with_context(|| {
1513                format!(
1514                    "failed to create data dir {}; fix the storage destination ([storage].path in config) or re-run `pond init`",
1515                    path.display()
1516                )
1517            })?;
1518        } else {
1519            apply_remote_storage_defaults(&mut storage_options);
1520        }
1521        // One Session shared across all three datasets so metadata/index
1522        // caches and the object_store registry (and thus any S3 client) are
1523        // pooled rather than duplicated three times. Caps are sized by the
1524        // `[runtime]` block; explicit values from `caps` win, otherwise the
1525        // local/remote backend default kicks in.
1526        let (index_cache_bytes, metadata_cache_bytes) = resolve_cache_caps(location, caps);
1527        let session = Arc::new(Session::new(
1528            index_cache_bytes,
1529            metadata_cache_bytes,
1530            Arc::new(ObjectStoreRegistry::default()),
1531        ));
1532        // Build the lance-namespace catalog seam once (spec.md#lance-chokepoints-catalog).
1533        // The `root` property is whatever URL the Directory impl understands;
1534        // `uri_to_url` (lance-io/object_store.rs) accepts both bare paths and
1535        // URLs, so passing the scheme-qualified URL for local FS works the
1536        // same as the bare-path form. Trailing slash stripped for clean logs.
1537        let root = location.as_str().trim_end_matches('/').to_string();
1538        let mut connect = ConnectBuilder::new("dir")
1539            .property("root", root)
1540            .session(session.clone());
1541        // Object-store credentials/region/endpoint flow into the namespace
1542        // via the `storage.<key>` property convention (lance-namespace-impls
1543        // dir.rs from_properties: lines 423-436).
1544        for (key, value) in &storage_options {
1545            connect = connect.property(format!("storage.{key}"), value.clone());
1546        }
1547        let nm: Arc<dyn LanceNamespace> = connect
1548            .connect()
1549            .await
1550            .context("failed to connect lance Directory namespace")?;
1551        let nm_ident = NamespaceIdent::root();
1552        // spec.md#lance-handle-freshness: refresh window is scheme-keyed. Local-FS
1553        // manifest reads are microsecond-cheap, so `0` (always-refresh) is
1554        // essentially free and removes the stale-read window entirely. Object
1555        // stores have real per-call cost; `5s` caps manifest fetch overhead at
1556        // acceptable lag for human-driven queries.
1557        let refresh_after = if config::is_local(location) {
1558            Duration::ZERO
1559        } else {
1560            Duration::from_secs(5)
1561        };
1562        let handle = Self {
1563            datasets: DatasetSet {
1564                sessions: Mutex::new(CachedDataset {
1565                    dataset: open_or_create_via_ns(
1566                        &nm,
1567                        &nm_ident,
1568                        sessions::SESSIONS,
1569                        sessions::session_schema(),
1570                        &session,
1571                        &storage_options,
1572                    )
1573                    .await?,
1574                    last_refresh: Instant::now(),
1575                    refresh_after,
1576                }),
1577                messages: Mutex::new(CachedDataset {
1578                    dataset: open_or_create_via_ns(
1579                        &nm,
1580                        &nm_ident,
1581                        sessions::MESSAGES,
1582                        sessions::message_schema(),
1583                        &session,
1584                        &storage_options,
1585                    )
1586                    .await?,
1587                    last_refresh: Instant::now(),
1588                    refresh_after,
1589                }),
1590                parts: OnceCell::new(),
1591            },
1592            retry: RetryPolicy::default(),
1593            session,
1594            nm,
1595            nm_ident,
1596            storage_options,
1597            location: location.clone(),
1598            parts_refresh_after: refresh_after,
1599        };
1600        Ok(handle)
1601    }
1602
1603    pub fn location(&self) -> &Url {
1604        &self.location
1605    }
1606
1607    /// Read-only view of the `storage_options` the handle was opened with.
1608    /// `pond status` needs them to instantiate a raw `object_store` client
1609    /// that can `LIST` the remote bucket for sizing.
1610    pub fn storage_options(&self) -> &HashMap<String, String> {
1611        &self.storage_options
1612    }
1613
1614    /// Object-store URI for a `pond_sql_query` export artifact:
1615    /// `<location>/exports/<name>`. A sibling of the `*.lance` table dirs;
1616    /// the Directory namespace tracks tables in its `__manifest` table rather
1617    /// than by listing prefixes, so this prefix is never seen as a table
1618    /// (lance-namespace-impls dir/manifest.rs). Never `register_table`'d.
1619    fn export_uri(&self, name: &str) -> String {
1620        format!(
1621            "{}/exports/{name}",
1622            self.location.as_str().trim_end_matches('/')
1623        )
1624    }
1625
1626    /// `ObjectStoreParams` carrying the handle's `storage_options` so raw
1627    /// object-store opens (export I/O, `table_sizes` listing) inherit the same
1628    /// credentials/region as the dataset opens. Empty options -> no accessor.
1629    fn object_store_params(&self) -> ObjectStoreParams {
1630        ObjectStoreParams {
1631            storage_options_accessor: (!self.storage_options.is_empty()).then(|| {
1632                Arc::new(StorageOptionsAccessor::with_static_options(
1633                    self.storage_options.clone(),
1634                ))
1635            }),
1636            ..Default::default()
1637        }
1638    }
1639
1640    /// Write a `pond_sql_query` export artifact, reusing the handle's
1641    /// storage_options so S3 installs inherit the same credentials.
1642    pub(crate) async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
1643        let uri = self.export_uri(name);
1644        let registry = Arc::new(ObjectStoreRegistry::default());
1645        let (store, path) =
1646            ObjectStore::from_uri_and_params(registry, &uri, &self.object_store_params())
1647                .await
1648                .with_context(|| format!("failed to open object store for {uri}"))?;
1649        store
1650            .put(&path, bytes)
1651            .await
1652            .with_context(|| format!("failed to write export {uri}"))?;
1653        Ok(())
1654    }
1655
1656    /// Read a `pond_sql_query` export artifact back (for the
1657    /// `pond-sql-export://` MCP resource).
1658    pub(crate) async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
1659        let uri = self.export_uri(name);
1660        let registry = Arc::new(ObjectStoreRegistry::default());
1661        let (store, path) =
1662            ObjectStore::from_uri_and_params(registry, &uri, &self.object_store_params())
1663                .await
1664                .with_context(|| format!("failed to open object store for {uri}"))?;
1665        let bytes = store
1666            .read_one_all(&path)
1667            .await
1668            .with_context(|| format!("failed to read export {uri}"))?;
1669        Ok(bytes.to_vec())
1670    }
1671
1672    /// Local filesystem path of an export artifact, when the data dir is
1673    /// `file://`. The stdio MCP client shares this filesystem, so it can read
1674    /// the file directly (e.g. duckdb/polars) instead of pulling base64 via
1675    /// `resources/read`. `None` on object-store installs.
1676    pub(crate) fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
1677        if self.location.scheme() != "file" {
1678            return None;
1679        }
1680        let dir = self.location.to_file_path().ok()?;
1681        Some(dir.join("exports").join(name))
1682    }
1683
1684    pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1685        Ok((
1686            self.count_rows(Table::Sessions).await?,
1687            self.count_rows(Table::Messages).await?,
1688            self.count_rows(Table::Parts).await?,
1689        ))
1690    }
1691
1692    /// Insert-only merge: append new rows, never overwrite a matched PK.
1693    /// Returns rows inserted. The fold lives separately under
1694    /// `Handle::optimize_table` (spec.md#lance-index-maintenance).
1695    pub(crate) async fn merge_insert(
1696        &self,
1697        table: Table,
1698        batch: RecordBatch,
1699        row_count: usize,
1700    ) -> Result<u64> {
1701        self.merge(
1702            table,
1703            batch,
1704            row_count,
1705            "merge_insert",
1706            WhenMatched::DoNothing,
1707            WhenNotMatched::InsertAll,
1708        )
1709        .await
1710    }
1711
1712    /// Update-only merge: `WhenMatched::UpdateAll` on matched PKs; unmatched
1713    /// rows dropped. The fold lives separately under `Handle::optimize_table`.
1714    pub(crate) async fn merge_update(
1715        &self,
1716        table: Table,
1717        batch: RecordBatch,
1718        row_count: usize,
1719    ) -> Result<u64> {
1720        self.merge(
1721            table,
1722            batch,
1723            row_count,
1724            "merge_update",
1725            WhenMatched::UpdateAll,
1726            WhenNotMatched::DoNothing,
1727        )
1728        .await
1729    }
1730
1731    /// Shared merge path for [`Self::merge_insert`] and [`Self::merge_update`].
1732    /// Returns the number of rows affected (inserted or updated, whichever the
1733    /// behaviors produce).
1734    async fn merge(
1735        &self,
1736        table: Table,
1737        batch: RecordBatch,
1738        row_count: usize,
1739        op: &'static str,
1740        when_matched: WhenMatched,
1741        when_not_matched: WhenNotMatched,
1742    ) -> Result<u64> {
1743        if row_count == 0 {
1744            return Ok(0);
1745        }
1746        let started = Instant::now();
1747        let result = self
1748            .retry_lance(table.label(), || async {
1749                let mut cached = self.cached(table).await?.lock().await;
1750                let existing = cached.latest().await?;
1751                let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
1752                let mut builder = MergeInsertBuilder::try_new(Arc::new(existing), Vec::new())?;
1753                builder.when_matched(when_matched.clone());
1754                builder.when_not_matched(when_not_matched.clone());
1755                // pond presents each PK at most once per batch; FirstSeen keeps
1756                // the first occurrence rather than failing (Lance's default).
1757                builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
1758                // Cleanup is operator-driven via `pond index optimize`; the
1759                // per-commit auto hook would add a LIST per write on remote
1760                // backends without changing the steady-state retention.
1761                builder.skip_auto_cleanup(true);
1762                let (dataset, stats) = builder
1763                    .try_build()?
1764                    .execute_reader(Box::new(reader))
1765                    .await?;
1766                cached.replace(dataset.as_ref().clone());
1767                Ok((
1768                    stats.num_inserted_rows + stats.num_updated_rows,
1769                    stats.num_skipped_duplicates,
1770                ))
1771            })
1772            .await;
1773        let skipped = result.as_ref().map(|(_, s)| *s).unwrap_or(0);
1774        tracing::info!(
1775            target: "pond::perf",
1776            op,
1777            table = %table.label(),
1778            rows = row_count,
1779            elapsed_ms = started.elapsed().as_millis() as u64,
1780            skipped,
1781            "merge",
1782        );
1783        result.map(|(affected, _)| affected)
1784    }
1785
1786    /// Run the table-local maintenance cycle for the supplied index intents.
1787    /// BTree is rebuilt from scratch to dodge Lance v7.0.0-beta.16's flat
1788    /// BTree combine bug; Bitmap, FTS, and IVF_PQ fold via append.
1789    ///
1790    /// spec.md#substrate 3.7 (`lance-index-maintenance`): indices and compaction
1791    /// commit independently and use independent retry budgets, so a hot writer
1792    /// that starves compaction (Rewrite) does not abort the index build
1793    /// (Update) the operator actually asked for.
1794    pub async fn optimize_table(
1795        &self,
1796        table: Table,
1797        intents: &[IndexIntent],
1798        progress: Option<&OptimizeProgressFn>,
1799        policy: &MaintenancePolicy,
1800    ) -> TableOptimizeOutcome {
1801        let compaction = self
1802            .run_optimize_compact_phase(table, progress, policy)
1803            .await;
1804        let indices = self
1805            .run_optimize_indices_phase(table, intents, progress)
1806            .await;
1807        TableOptimizeOutcome {
1808            table,
1809            indices,
1810            compaction,
1811        }
1812    }
1813
1814    /// Run only the indices phase for one table. Used by `pond embed`'s tail
1815    /// to fold newly written vectors into the indices without paying the
1816    /// compaction retry budget while embed itself may still be writing.
1817    pub async fn optimize_table_indices_only(
1818        &self,
1819        table: Table,
1820        intents: &[IndexIntent],
1821        progress: Option<&OptimizeProgressFn>,
1822    ) -> PhaseOutcome {
1823        self.run_optimize_indices_phase(table, intents, progress)
1824            .await
1825    }
1826
1827    async fn run_optimize_indices_phase(
1828        &self,
1829        table: Table,
1830        intents: &[IndexIntent],
1831        progress: Option<&OptimizeProgressFn>,
1832    ) -> PhaseOutcome {
1833        if intents.is_empty() {
1834            return PhaseOutcome::Noop;
1835        }
1836        let result = self
1837            .retry_lance(table.label(), || async {
1838                let mut guard = self.cached(table).await?.lock().await;
1839                let mut dataset = guard.latest().await?;
1840                let did_work =
1841                    optimize_table_indices(&mut dataset, intents, table, progress).await?;
1842                guard.replace(dataset);
1843                Ok::<_, anyhow::Error>(did_work)
1844            })
1845            .await;
1846        match result {
1847            Ok(true) => PhaseOutcome::Ok,
1848            Ok(false) => PhaseOutcome::Noop,
1849            Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
1850            Err(error) => PhaseOutcome::Failed(error),
1851        }
1852    }
1853
1854    async fn run_optimize_compact_phase(
1855        &self,
1856        table: Table,
1857        progress: Option<&OptimizeProgressFn>,
1858        policy: &MaintenancePolicy,
1859    ) -> PhaseOutcome {
1860        let result = self
1861            .retry_lance(table.label(), || async {
1862                let mut guard = self.cached(table).await?.lock().await;
1863                let mut dataset = guard.latest().await?;
1864                optimize_table_compact(&mut dataset, table, progress, policy).await?;
1865                guard.replace(dataset);
1866                Ok::<_, anyhow::Error>(())
1867            })
1868            .await;
1869        match result {
1870            Ok(()) => PhaseOutcome::Ok,
1871            Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
1872            Err(error) => PhaseOutcome::Failed(error),
1873        }
1874    }
1875
1876    pub async fn rebuild_index(&self, table: Table, intent: &IndexIntent) -> Result<()> {
1877        self.retry_lance(table.label(), || async {
1878            let mut guard = self.cached(table).await?.lock().await;
1879            let mut dataset = guard.latest().await?;
1880            rebuild_index(&mut dataset, intent).await?;
1881            guard.replace(dataset);
1882            Ok(())
1883        })
1884        .await
1885    }
1886
1887    pub async fn index_status(
1888        &self,
1889        table: Table,
1890        intents: &[IndexIntent],
1891    ) -> Result<Vec<IndexStatus>> {
1892        let dataset = self.dataset(table).await?;
1893        index_status(table, &dataset, intents).await
1894    }
1895
1896    pub(crate) async fn dataset(&self, table: Table) -> Result<Dataset> {
1897        let mut cached = self.cached(table).await?.lock().await;
1898        cached.latest().await
1899    }
1900    /// Build a prefiltered `Scanner` for `table`. Composable read entry
1901    /// point for callers that need to layer extra builder calls
1902    /// (`full_text_search`, `nearest`) on top of pond's predicate seam.
1903    /// Routine scans should prefer `Handle::scan`.
1904    pub(crate) async fn scanner(
1905        &self,
1906        table: Table,
1907        predicate: Option<&Predicate>,
1908    ) -> Result<lance::dataset::scanner::Scanner> {
1909        let dataset = self.dataset(table).await?;
1910        scanner_with_prefilter(&dataset, predicate)
1911    }
1912    /// Single read entry point: prefilter via `predicate`, optionally
1913    /// project, return the prepared `Scanner` (spec.md#lance-chokepoints-read).
1914    pub async fn scan(
1915        &self,
1916        table: Table,
1917        opts: ScanOpts<'_>,
1918    ) -> Result<lance::dataset::scanner::Scanner> {
1919        let mut scanner = self.scanner(table, opts.predicate).await?;
1920        if let Some(projection) = opts.projection {
1921            scanner.project(projection)?;
1922        }
1923        Ok(scanner)
1924    }
1925    pub(crate) async fn scan_batch(
1926        &self,
1927        table: Table,
1928        predicate: Option<&Predicate>,
1929        projection: &[&str],
1930    ) -> Result<RecordBatch> {
1931        let opts = ScanOpts {
1932            predicate,
1933            projection: (!projection.is_empty()).then_some(projection),
1934        };
1935        self.scan(table, opts)
1936            .await?
1937            .try_into_batch()
1938            .await
1939            .context("scan failed")
1940    }
1941    pub async fn count_rows(&self, table: Table) -> Result<usize> {
1942        self.dataset(table)
1943            .await?
1944            .count_rows(None)
1945            .await
1946            .map_err(Into::into)
1947    }
1948    /// Names of every index on `messages` - the vector-index tests read this.
1949    #[cfg(test)]
1950    pub(crate) async fn messages_index_names(&self) -> Result<Vec<String>> {
1951        let dataset = self.dataset(Table::Messages).await?;
1952        let indices = dataset.load_indices().await?;
1953        Ok(indices.iter().map(|index| index.name.clone()).collect())
1954    }
1955
1956    /// Count rows in `table` not yet covered by `index_name`. Manifest-only;
1957    /// a missing index reports the whole table. Powers `pond index status`.
1958    pub(crate) async fn unindexed_row_count(
1959        &self,
1960        table: Table,
1961        index_name: &str,
1962    ) -> Result<usize> {
1963        let dataset = self.dataset(table).await?;
1964        let fragments = dataset
1965            .unindexed_fragments(index_name)
1966            .await
1967            .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1968        Ok(fragments
1969            .iter()
1970            .map(|fragment| fragment.num_rows().unwrap_or(0))
1971            .sum())
1972    }
1973
1974    /// Drop the named index. Used by the `pond embed --force` model-swap path
1975    /// to retire an IVF_PQ whose centroids belong to the old distance
1976    /// space, before the next write re-bootstraps it over the new model's
1977    /// vectors. Errors when the index does not exist; callers may swallow
1978    /// that.
1979    pub(crate) async fn drop_index(&self, table: Table, name: &str) -> Result<()> {
1980        let mut guard = self.cached(table).await?.lock().await;
1981        let mut dataset = guard.latest().await?;
1982        dataset
1983            .drop_index(name)
1984            .await
1985            .with_context(|| format!("drop_index({name}) failed for {}", table.label()))?;
1986        guard.replace(dataset);
1987        Ok(())
1988    }
1989
1990    /// Resolve each table's stored location through the namespace catalog
1991    /// (spec.md#lance-chokepoints-catalog) - no hardcoded `.lance` suffix.
1992    async fn table_location(&self, table_name: &str) -> Result<String> {
1993        let request = DescribeTableRequest {
1994            id: Some(self.nm_ident.as_table_id(table_name)),
1995            ..Default::default()
1996        };
1997        let response = self
1998            .nm
1999            .describe_table(request)
2000            .await
2001            .with_context(|| format!("failed to describe table {table_name}"))?;
2002        response
2003            .location
2004            .with_context(|| format!("namespace returned no location for table {table_name}"))
2005    }
2006
2007    /// Whether the store holds synced data yet. `open` eagerly creates empty
2008    /// `sessions`/`messages` datasets, but `parts` opens lazily on first write
2009    /// (see `open_with_options`), so its presence is the "has been synced"
2010    /// signal - letting read-only surfaces (`pond status`, `pond storage`)
2011    /// render an empty state instead of erroring on the first `parts` describe.
2012    pub async fn initialized(&self) -> Result<bool> {
2013        let request = DescribeTableRequest {
2014            id: Some(self.nm_ident.as_table_id(sessions::PARTS)),
2015            ..Default::default()
2016        };
2017        match self.nm.describe_table(request).await {
2018            Ok(_) => Ok(true),
2019            Err(error) if is_namespace_error_code(&error, ErrorCode::TableNotFound) => Ok(false),
2020            Err(error) => {
2021                Err(anyhow::Error::from(error)).context("failed to probe table existence")
2022            }
2023        }
2024    }
2025
2026    /// On-disk byte totals for the three datasets plus the data-dir remainder.
2027    /// Every byte is sized by listing through Lance's object store
2028    /// (spec.md#lance-chokepoints-storage), identical for `file://` and `s3://`.
2029    pub async fn table_sizes(&self) -> Result<TableSizes> {
2030        let registry = Arc::new(ObjectStoreRegistry::default());
2031        let params = self.object_store_params();
2032
2033        let sessions = self
2034            .listed_size(
2035                &registry,
2036                &params,
2037                &self.table_location(sessions::SESSIONS).await?,
2038            )
2039            .await?;
2040        let messages = self
2041            .listed_size(
2042                &registry,
2043                &params,
2044                &self.table_location(sessions::MESSAGES).await?,
2045            )
2046            .await?;
2047        let parts = self
2048            .listed_size(
2049                &registry,
2050                &params,
2051                &self.table_location(sessions::PARTS).await?,
2052            )
2053            .await?;
2054        // `other` is whatever sits under the data-dir root but not in the three
2055        // tables (config.toml, stray index temp files): root total minus them.
2056        let root_total = self
2057            .listed_size(&registry, &params, self.location.as_str())
2058            .await?;
2059        let other = root_total.saturating_sub(sessions + messages + parts);
2060        let sessions_data = self
2061            .data_liveness(&registry, &params, Table::Sessions, sessions::SESSIONS)
2062            .await?;
2063        let messages_data = self
2064            .data_liveness(&registry, &params, Table::Messages, sessions::MESSAGES)
2065            .await?;
2066        let parts_data = self
2067            .data_liveness(&registry, &params, Table::Parts, sessions::PARTS)
2068            .await?;
2069        Ok(TableSizes {
2070            sessions,
2071            messages,
2072            parts,
2073            other,
2074            sessions_data,
2075            messages_data,
2076            parts_data,
2077        })
2078    }
2079
2080    async fn data_liveness(
2081        &self,
2082        registry: &Arc<ObjectStoreRegistry>,
2083        params: &ObjectStoreParams,
2084        table: Table,
2085        table_name: &str,
2086    ) -> Result<DataLiveness> {
2087        let location = self.table_location(table_name).await?;
2088        let data_dir = format!("{}/data", location.trim_end_matches('/'));
2089        let on_disk = self.listed_size(registry, params, &data_dir).await?;
2090        let dataset = self.dataset(table).await?;
2091        let live = dataset
2092            .get_fragments()
2093            .iter()
2094            .try_fold(0u64, |total, fragment| {
2095                Some(total + fragment_bytes(fragment.metadata())?)
2096            });
2097        Ok(DataLiveness { on_disk, live })
2098    }
2099
2100    /// Sum `ObjectMeta.size` for every object recursively under `uri`.
2101    async fn listed_size(
2102        &self,
2103        registry: &Arc<ObjectStoreRegistry>,
2104        params: &ObjectStoreParams,
2105        uri: &str,
2106    ) -> Result<u64> {
2107        let (store, base) = ObjectStore::from_uri_and_params(registry.clone(), uri, params)
2108            .await
2109            .with_context(|| format!("failed to open object store for {uri}"))?;
2110        let mut listing = store.list(Some(base));
2111        let mut total = 0u64;
2112        while let Some(meta) = listing.next().await {
2113            let meta = meta.with_context(|| format!("listing {uri} failed"))?;
2114            total += meta.size;
2115        }
2116        Ok(total)
2117    }
2118    async fn cached(&self, table: Table) -> Result<&Mutex<CachedDataset>> {
2119        match table {
2120            Table::Sessions => Ok(&self.datasets.sessions),
2121            Table::Messages => Ok(&self.datasets.messages),
2122            Table::Parts => self.parts_cached().await,
2123        }
2124    }
2125
2126    /// Open `parts.lance` on first use (spec.md#datasets). Single-flight via
2127    /// `OnceCell`; once initialized, behaves identically to the other two.
2128    async fn parts_cached(&self) -> Result<&Mutex<CachedDataset>> {
2129        self.datasets
2130            .parts
2131            .get_or_try_init(|| async {
2132                let dataset = open_or_create_via_ns(
2133                    &self.nm,
2134                    &self.nm_ident,
2135                    sessions::PARTS,
2136                    sessions::part_schema(),
2137                    &self.session,
2138                    &self.storage_options,
2139                )
2140                .await?;
2141                Ok::<_, anyhow::Error>(Mutex::new(CachedDataset {
2142                    dataset,
2143                    last_refresh: Instant::now(),
2144                    refresh_after: self.parts_refresh_after,
2145                }))
2146            })
2147            .await
2148    }
2149    async fn retry_lance<T, Fut, Op>(&self, label: &str, mut operation: Op) -> Result<T>
2150    where
2151        Fut: std::future::Future<Output = Result<T>>,
2152        Op: FnMut() -> Fut,
2153    {
2154        let mut attempt = 0u8;
2155        loop {
2156            attempt = attempt.saturating_add(1);
2157            match operation().await {
2158                Ok(value) => return Ok(value),
2159                Err(error) if attempt < self.retry.attempts => {
2160                    let backoff = self.backoff(attempt);
2161                    // `{:#}` walks anyhow's cause chain inline; `%error` (Display)
2162                    // drops everything below the top-level message.
2163                    let error_chain = format!("{error:#}");
2164                    tracing::warn!(
2165                        label,
2166                        attempt,
2167                        ?backoff,
2168                        error = %error_chain,
2169                        "retrying Lance operation"
2170                    );
2171                    tokio::time::sleep(backoff).await;
2172                }
2173                Err(error) => {
2174                    let error_chain = format!("{error:#}");
2175                    tracing::warn!(
2176                        label,
2177                        attempt,
2178                        error = %error_chain,
2179                        "Lance operation exhausted retries"
2180                    );
2181                    // spec.md#protocol: surface OCC failures as a typed `conflict`
2182                    // rather than the generic `storage_unavailable` bucket. The
2183                    // chain root is a `lance::Error` (commit-conflict family) when
2184                    // pond's retry layer exhausted because the manifest could not
2185                    // be advanced; everything else (timeouts, IAM, disk) stays
2186                    // `storage_unavailable`.
2187                    if is_commit_conflict(&error) {
2188                        return Err(error.context(ConflictExhausted { attempts: attempt }));
2189                    }
2190                    return Err(error);
2191                }
2192            }
2193        }
2194    }
2195    fn backoff(&self, attempt: u8) -> Duration {
2196        let shift = u32::from(attempt.saturating_sub(1));
2197        let multiplier = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
2198        let base = self.retry.initial_backoff.saturating_mul(multiplier);
2199        // Symmetric +/- `jitter` factor de-correlates concurrent retriers on
2200        // a contended manifest (spec.md#lance-retry-jitter); clamped to `max_backoff`.
2201        let factor = (1.0 + self.retry.jitter * (fastrand::f64() * 2.0 - 1.0)).max(0.0);
2202        base.mul_f64(factor).min(self.retry.max_backoff)
2203    }
2204}
2205/// Compaction phase: plan + amplification veto + execute + `cleanup_old_versions`,
2206/// one retry block, separate from the indices phase so a lost Rewrite race
2207/// does not abort index work.
2208///
2209/// Vetoes Lance-planned tasks instead of pre-gating on pond fragment math:
2210/// Lance bins split at index-coverage boundaries, so pond predictions diverge
2211/// from what Lance actually rewrites (the old run-sum gate latched open and
2212/// rewrote a 665 MiB tail fragment every 5-min sync). Only whole planned
2213/// tasks are filtered, so OCC and conflict semantics are untouched.
2214///
2215/// spec.md#lance-index-maintenance mandates FRI on by default, but at
2216/// v7.0.0-beta.16 `defer_index_remap=true` together with `stable-row-ids`
2217/// panics in `optimize.rs::commit_compaction` with "defer_index_remap
2218/// requires row_addrs but none were provided": `rewrite_files` skips
2219/// row_addrs when stable row ids are on, then the FRI builder demands
2220/// them. With stable_row_ids the remap step is already a no-op
2221/// (`optimize.rs:1490`: `needs_remapping = !uses_stable_row_ids() &&
2222/// !defer_index_remap`), so running without FRI is correct - we only
2223/// lose the documented concurrency-with-index-build benefit. Flip to
2224/// `true` once upstream fixes the conflict.
2225async fn optimize_table_compact(
2226    dataset: &mut Dataset,
2227    table: Table,
2228    progress: Option<&OptimizeProgressFn>,
2229    policy: &MaintenancePolicy,
2230) -> Result<()> {
2231    let stats: Vec<FragmentStat> = dataset
2232        .get_fragments()
2233        .iter()
2234        .map(|fragment| fragment_stat(fragment.metadata()))
2235        .collect();
2236    let compaction = CompactionOptions {
2237        target_rows_per_fragment: derived_target_rows(&stats),
2238        max_bytes_per_file: Some(TARGET_FRAGMENT_BYTES as usize),
2239        defer_index_remap: false,
2240        ..CompactionOptions::default()
2241    };
2242
2243    let mut plan = plan_compaction(dataset, &compaction).await?;
2244    if policy.compaction_fragment_cap > 0 {
2245        plan.tasks.retain(|task| {
2246            let task_stats: Vec<FragmentStat> = task.fragments.iter().map(fragment_stat).collect();
2247            let keep = keep_task(
2248                &task_stats,
2249                policy.compaction_fragment_cap,
2250                compaction.materialize_deletions_threshold,
2251            );
2252            if !keep {
2253                tracing::debug!(
2254                    target: "pond::perf",
2255                    table = table.as_str(),
2256                    fragments = task_stats.len(),
2257                    "compaction task vetoed: merge dominated by one large fragment",
2258                );
2259            }
2260            keep
2261        });
2262    }
2263    if plan.tasks.is_empty() {
2264        tracing::debug!(
2265            target: "pond::perf",
2266            table = table.as_str(),
2267            "compaction skipped: no task to run",
2268        );
2269    } else {
2270        emit(
2271            progress,
2272            OptimizeEvent::PhaseStart {
2273                table,
2274                phase: OptimizePhase::Compact,
2275                detail: None,
2276            },
2277        );
2278        let started = Instant::now();
2279        let mut completed = Vec::with_capacity(plan.tasks.len());
2280        for task in plan.compaction_tasks() {
2281            completed.push(task.execute(dataset).await?);
2282        }
2283        commit_compaction(
2284            dataset,
2285            completed,
2286            Arc::new(DatasetIndexRemapperOptions::default()),
2287            &compaction,
2288        )
2289        .await?;
2290        emit(
2291            progress,
2292            OptimizeEvent::PhaseDone {
2293                table,
2294                phase: OptimizePhase::Compact,
2295                elapsed_ms: started.elapsed().as_millis() as u64,
2296            },
2297        );
2298    }
2299
2300    // Safe GC only. delete_unverified=false keeps Lance's 7-day in-progress
2301    // guard, so this never races a concurrent writer (spec.md#concurrency); GC
2302    // runs outside OCC, so the guard is what makes it safe on any backend.
2303    emit(
2304        progress,
2305        OptimizeEvent::PhaseStart {
2306            table,
2307            phase: OptimizePhase::Cleanup,
2308            detail: None,
2309        },
2310    );
2311    let started = Instant::now();
2312    dataset
2313        .cleanup_old_versions(policy.cleanup_older_than, Some(false), Some(false))
2314        .await
2315        .context("cleanup_old_versions failed during index optimize")?;
2316    emit(
2317        progress,
2318        OptimizeEvent::PhaseDone {
2319            table,
2320            phase: OptimizePhase::Cleanup,
2321            elapsed_ms: started.elapsed().as_millis() as u64,
2322        },
2323    );
2324
2325    Ok(())
2326}
2327
2328/// Indices phase: per-intent create/rebuild + batched `optimize_indices(append)`
2329/// for incremental families. Returns `true` if anything committed.
2330async fn optimize_table_indices(
2331    dataset: &mut Dataset,
2332    intents: &[IndexIntent],
2333    table: Table,
2334    progress: Option<&OptimizeProgressFn>,
2335) -> Result<bool> {
2336    let existing = dataset.load_indices().await?;
2337    let existing_names: std::collections::HashSet<String> =
2338        existing.iter().map(|index| index.name.clone()).collect();
2339
2340    let mut append_indices: Vec<String> = Vec::new();
2341    let mut did_work = false;
2342
2343    for intent in intents {
2344        let exists = existing_names.contains(intent.name);
2345
2346        if !exists {
2347            if !intent.trigger.should_create(dataset).await? {
2348                continue;
2349            }
2350            let params = intent.params.build(dataset).await?;
2351            let index_type = intent.params.index_type();
2352            tracing::info!(
2353                index = intent.name,
2354                column = intent.column,
2355                "creating Lance index (trigger fired)",
2356            );
2357            emit(
2358                progress,
2359                OptimizeEvent::PhaseStart {
2360                    table,
2361                    phase: OptimizePhase::IndexCreate,
2362                    detail: Some(intent.name.to_owned()),
2363                },
2364            );
2365            let started = Instant::now();
2366            dataset
2367                .create_index(
2368                    &[intent.column],
2369                    index_type,
2370                    Some(intent.name.to_owned()),
2371                    params.as_ref(),
2372                    false,
2373                )
2374                .await
2375                .with_context(|| format!("failed to create index {}", intent.name))?;
2376            emit(
2377                progress,
2378                OptimizeEvent::PhaseDone {
2379                    table,
2380                    phase: OptimizePhase::IndexCreate,
2381                    elapsed_ms: started.elapsed().as_millis() as u64,
2382                },
2383            );
2384            did_work = true;
2385            continue;
2386        }
2387
2388        let unindexed = dataset.unindexed_fragments(intent.name).await?;
2389        if unindexed.is_empty() {
2390            continue;
2391        }
2392        // Lag guard: let fragments accumulate behind the brute-force fallback
2393        // rather than firing a commit per tiny append. Threshold is operator-
2394        // tunable via `[maintenance].index_lag_threshold`.
2395        if unindexed.len() < index_lag_threshold() {
2396            continue;
2397        }
2398        match intent.params {
2399            IndexParamsKind::Scalar(BuiltinIndexType::BTree) => {
2400                let params = intent.params.build(dataset).await?;
2401                let index_type = intent.params.index_type();
2402                tracing::debug!(
2403                    target: "pond::perf",
2404                    index = intent.name,
2405                    column = intent.column,
2406                    "rebuilding Lance BTree index",
2407                );
2408                emit(
2409                    progress,
2410                    OptimizeEvent::PhaseStart {
2411                        table,
2412                        phase: OptimizePhase::IndexRebuild,
2413                        detail: Some(intent.name.to_owned()),
2414                    },
2415                );
2416                let started = Instant::now();
2417                dataset
2418                    .create_index(
2419                        &[intent.column],
2420                        index_type,
2421                        Some(intent.name.to_owned()),
2422                        params.as_ref(),
2423                        true,
2424                    )
2425                    .await
2426                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
2427                emit(
2428                    progress,
2429                    OptimizeEvent::PhaseDone {
2430                        table,
2431                        phase: OptimizePhase::IndexRebuild,
2432                        elapsed_ms: started.elapsed().as_millis() as u64,
2433                    },
2434                );
2435                did_work = true;
2436            }
2437            IndexParamsKind::Scalar(BuiltinIndexType::Bitmap)
2438            | IndexParamsKind::InvertedFtsNgram { .. }
2439            | IndexParamsKind::IvfPqCosine { .. } => {
2440                append_indices.push(intent.name.to_owned());
2441            }
2442            IndexParamsKind::Scalar(_) => {
2443                let params = intent.params.build(dataset).await?;
2444                emit(
2445                    progress,
2446                    OptimizeEvent::PhaseStart {
2447                        table,
2448                        phase: OptimizePhase::IndexRebuild,
2449                        detail: Some(intent.name.to_owned()),
2450                    },
2451                );
2452                let started = Instant::now();
2453                dataset
2454                    .create_index(
2455                        &[intent.column],
2456                        intent.params.index_type(),
2457                        Some(intent.name.to_owned()),
2458                        params.as_ref(),
2459                        true,
2460                    )
2461                    .await
2462                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
2463                emit(
2464                    progress,
2465                    OptimizeEvent::PhaseDone {
2466                        table,
2467                        phase: OptimizePhase::IndexRebuild,
2468                        elapsed_ms: started.elapsed().as_millis() as u64,
2469                    },
2470                );
2471                did_work = true;
2472            }
2473        }
2474    }
2475
2476    if !append_indices.is_empty() {
2477        let to_append = append_indices.clone();
2478        emit(
2479            progress,
2480            OptimizeEvent::PhaseStart {
2481                table,
2482                phase: OptimizePhase::IndexAppend,
2483                detail: Some(append_indices.join(", ")),
2484            },
2485        );
2486        let started = Instant::now();
2487        dataset
2488            .optimize_indices(&OptimizeOptions::append().index_names(to_append))
2489            .await
2490            .context("optimize_indices(append) failed during index optimize")?;
2491        emit(
2492            progress,
2493            OptimizeEvent::PhaseDone {
2494                table,
2495                phase: OptimizePhase::IndexAppend,
2496                elapsed_ms: started.elapsed().as_millis() as u64,
2497            },
2498        );
2499        tracing::debug!(
2500            target: "pond::perf",
2501            indices = ?append_indices,
2502            "appended trailing fragments into indices",
2503        );
2504        did_work = true;
2505    }
2506
2507    Ok(did_work)
2508}
2509
2510async fn rebuild_index(dataset: &mut Dataset, intent: &IndexIntent) -> Result<()> {
2511    if !intent.trigger.should_create(dataset).await? {
2512        return Ok(());
2513    }
2514    let params = intent.params.build(dataset).await?;
2515    dataset
2516        .create_index(
2517            &[intent.column],
2518            intent.params.index_type(),
2519            Some(intent.name.to_owned()),
2520            params.as_ref(),
2521            true,
2522        )
2523        .await
2524        .with_context(|| format!("failed to rebuild index {}", intent.name))?;
2525    Ok(())
2526}
2527
2528async fn index_status(
2529    table: Table,
2530    dataset: &Dataset,
2531    intents: &[IndexIntent],
2532) -> Result<Vec<IndexStatus>> {
2533    let existing = dataset.load_indices().await?;
2534    let existing_names: std::collections::HashSet<String> =
2535        existing.iter().map(|index| index.name.clone()).collect();
2536    let total_fragments = dataset.get_fragments().len();
2537    let total_rows = dataset.count_rows(None).await?;
2538    let mut statuses = Vec::with_capacity(intents.len());
2539    for intent in intents {
2540        let exists = existing_names.contains(intent.name);
2541        if !exists {
2542            statuses.push(IndexStatus {
2543                table,
2544                intent_name: intent.name.to_owned(),
2545                fragments_covered: 0,
2546                unindexed_fragments: total_fragments,
2547                unindexed_rows: total_rows,
2548                exists,
2549            });
2550            continue;
2551        }
2552        let unindexed = dataset
2553            .unindexed_fragments(intent.name)
2554            .await
2555            .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
2556        let unindexed_fragments = unindexed.len();
2557        let unindexed_rows = unindexed
2558            .iter()
2559            .map(|fragment| fragment.num_rows().unwrap_or(0))
2560            .sum();
2561        statuses.push(IndexStatus {
2562            table,
2563            intent_name: intent.name.to_owned(),
2564            fragments_covered: total_fragments.saturating_sub(unindexed_fragments),
2565            unindexed_fragments,
2566            unindexed_rows,
2567            exists,
2568        });
2569    }
2570    Ok(statuses)
2571}
2572
2573/// Open the table at `table_name` via the namespace; create + initialize on
2574/// `TableNotFound`. Schema-checks the on-disk dataset against pond's
2575/// expectation so a stale data dir surfaces early.
2576///
2577/// Probes via `nm.describe_table` directly rather than `DatasetBuilder::from_namespace`:
2578/// the builder re-wraps an already-`Namespace`-wrapped error
2579/// (lance/src/dataset/builder.rs:142), so going through it would force a
2580/// chain-walk to classify `TableNotFound`. The direct probe stays at one
2581/// wrap level and downcasts cleanly. Managed-versioning hookup (REST
2582/// namespace external-manifest commits) is not wired here; v1 ships
2583/// Directory v2 only.
2584async fn open_or_create_via_ns(
2585    nm: &Arc<dyn LanceNamespace>,
2586    nm_ident: &NamespaceIdent,
2587    table_name: &str,
2588    schema: lance::deps::arrow_schema::SchemaRef,
2589    session: &Arc<Session>,
2590    storage_options: &HashMap<String, String>,
2591) -> Result<Dataset> {
2592    let table_id = nm_ident.as_table_id(table_name);
2593
2594    let request = DescribeTableRequest {
2595        id: Some(table_id.clone()),
2596        ..Default::default()
2597    };
2598    match nm.describe_table(request).await {
2599        Ok(response) => {
2600            let location = response.location.with_context(|| {
2601                format!("namespace returned no location for table {table_name}")
2602            })?;
2603            let mut builder = DatasetBuilder::from_uri(&location).with_session(session.clone());
2604            if !storage_options.is_empty() {
2605                builder = builder.with_storage_options(storage_options.clone());
2606            }
2607            let dataset = builder
2608                .load()
2609                .await
2610                .with_context(|| format!("failed to open table {table_name}"))?;
2611            ensure_schema_matches(&dataset, schema.as_ref(), table_name)?;
2612            return Ok(dataset);
2613        }
2614        Err(error) => match &error {
2615            error if is_namespace_error_code(error, ErrorCode::TableNotFound) => {
2616                // fall through to create
2617            }
2618            _ => {
2619                return Err(anyhow::Error::from(error))
2620                    .with_context(|| format!("failed to describe table {table_name}"));
2621            }
2622        },
2623    }
2624
2625    // Create path: pond seeds an empty dataset with the canonical schema so
2626    // every subsequent open lands on a real Lance dataset, not a phantom.
2627    let mut write_params = sessions::write_params_for_create();
2628    write_params.session = Some(session.clone());
2629    write_params.mode = WriteMode::Create;
2630    if !storage_options.is_empty() {
2631        write_params.store_params = Some(ObjectStoreParams {
2632            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
2633                storage_options.clone(),
2634            ))),
2635            ..Default::default()
2636        });
2637    }
2638    let reader = sessions::empty_reader(schema)?;
2639    Dataset::write_into_namespace(reader, nm.clone(), table_id, Some(write_params))
2640        .await
2641        .with_context(|| format!("failed to create table {table_name}"))
2642}
2643
2644// lance-namespace sometimes nests one `lance::Error::Namespace` inside another
2645// before the underlying `NamespaceError`; walk the whole `.source()` chain
2646// rather than only matching the outer variant.
2647fn is_namespace_error_code(error: &lance::Error, code: ErrorCode) -> bool {
2648    if !matches!(error, lance::Error::Namespace { .. }) {
2649        return false;
2650    }
2651    std::iter::successors(Some(error as &(dyn std::error::Error + 'static)), |link| {
2652        link.source()
2653    })
2654    .filter_map(|link| link.downcast_ref::<NamespaceError>())
2655    .any(|inner| inner.code() == code)
2656}
2657
2658fn scanner_with_prefilter(
2659    dataset: &Dataset,
2660    predicate: Option<&Predicate>,
2661) -> Result<lance::dataset::scanner::Scanner> {
2662    let mut scanner = dataset.scan();
2663    scanner.prefilter(true);
2664    if let Some(predicate) = predicate {
2665        let filter = predicate.to_lance();
2666        if !filter.is_empty() {
2667            scanner.filter(&filter)?;
2668        }
2669    }
2670    Ok(scanner)
2671}
2672fn ensure_schema_matches(
2673    dataset: &Dataset,
2674    expected: &lance::deps::arrow_schema::Schema,
2675    table_name: &str,
2676) -> Result<()> {
2677    use lance::deps::arrow_schema::DataType;
2678    use std::collections::BTreeSet;
2679    let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
2680    let actual_names: BTreeSet<&str> = actual.fields().iter().map(|f| f.name().as_str()).collect();
2681    let expected_names: BTreeSet<&str> = expected
2682        .fields()
2683        .iter()
2684        .map(|f| f.name().as_str())
2685        .collect();
2686    if actual_names != expected_names {
2687        anyhow::bail!(
2688            "table {table_name} has columns {actual_names:?} but this pond build expects \
2689             {expected_names:?} - the on-disk store predates a schema change; delete the \
2690             data directory and re-run `pond ingest`",
2691        );
2692    }
2693    // Catch a vector-dim change (configured `[embeddings].dim` differs from
2694    // the on-disk vector column width) early with a friendly message. Lance
2695    // would otherwise reject the next write with an opaque schema-mismatch
2696    // error inside the `merge_update` path.
2697    for actual_field in actual.fields() {
2698        let Some(expected_field) = expected.field_with_name(actual_field.name()).ok() else {
2699            continue;
2700        };
2701        if let (DataType::FixedSizeList(_, actual_dim), DataType::FixedSizeList(_, expected_dim)) =
2702            (actual_field.data_type(), expected_field.data_type())
2703            && actual_dim != expected_dim
2704        {
2705            tracing::warn!(
2706                table = table_name,
2707                column = actual_field.name(),
2708                actual_dim,
2709                expected_dim,
2710                "embedding dimension differs from config; open proceeds because model swaps are operator-driven",
2711            );
2712        }
2713    }
2714    Ok(())
2715}
/// Object-store defaults injected for any non-local pond location.
///
/// Each default is written under the first alias of its list. It is written
/// only when none of the listed aliases is already present. Keys are compared
/// ASCII case-insensitively, so env-var spellings such as `AWS_ENDPOINT`
/// count, and explicit overrides in `[storage]` therefore always win.
///
/// The alias lists mirror `object_store`'s config-key parsing:
/// - Client options (the timeouts) are also accepted under the `aws_`,
///   `google_` and `azure_` provider prefixes.
/// - The S3 endpoint may be spelled `endpoint`, `endpoint_url`,
///   `aws_endpoint` or `aws_endpoint_url`.
///
/// An alias missing from these lists would let the pond default sit next to
/// the user's value, risking the default shadowing it.
///
/// `aws_unsigned_payload` is only defaulted when a custom S3 endpoint is
/// present (the marker for S3-compatible stores like Hetzner, MinIO, R2),
/// where the SHA256 payload signature is wasted work the server does not
/// validate.
fn apply_remote_storage_defaults(options: &mut HashMap<String, String>) {
    const POOL_IDLE_TIMEOUT: &[&str] = &[
        "pool_idle_timeout",
        "aws_pool_idle_timeout",
        "google_pool_idle_timeout",
        "azure_pool_idle_timeout",
    ];
    const CONNECT_TIMEOUT: &[&str] = &[
        "connect_timeout",
        "aws_connect_timeout",
        "google_connect_timeout",
        "azure_connect_timeout",
    ];
    const S3_ENDPOINT: &[&str] = &[
        "aws_endpoint",
        "aws_endpoint_url",
        "endpoint",
        "endpoint_url",
    ];
    const UNSIGNED_PAYLOAD: &[&str] = &["aws_unsigned_payload", "unsigned_payload"];

    /// True when any key in `options` matches any alias, ignoring ASCII case.
    fn is_set(options: &HashMap<String, String>, aliases: &[&str]) -> bool {
        options
            .keys()
            .any(|key| aliases.iter().any(|alias| key.eq_ignore_ascii_case(alias)))
    }

    /// Insert `value` under `aliases[0]` unless some alias is already set.
    fn set_default(options: &mut HashMap<String, String>, aliases: &[&str], value: &str) {
        if !is_set(options, aliases) {
            options.insert(aliases[0].to_owned(), value.to_owned());
        }
    }

    set_default(options, POOL_IDLE_TIMEOUT, "300 seconds");
    set_default(options, CONNECT_TIMEOUT, "10 seconds");
    if is_set(options, S3_ENDPOINT) {
        set_default(options, UNSIGNED_PAYLOAD, "true");
    }
}
2745
/// Render `value` as a single-quoted SQL string literal, escaping embedded
/// single quotes by doubling them (`it's` -> `'it''s'`).
fn quoted_string(value: &str) -> String {
    let mut literal = String::with_capacity(value.len() + 2);
    literal.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            literal.push_str("''");
        } else {
            literal.push(ch);
        }
    }
    literal.push('\'');
    literal
}
/// Build a quoted `LIKE` pattern, `'%<escaped>%'`, matching any value that
/// contains `value` as a substring.
///
/// In `value`, the LIKE wildcards `%` and `_` and the backslash itself are
/// prefixed with `\` so they match literally. This assumes the filter engine
/// treats `\` as the LIKE escape character. Single quotes are doubled for the
/// SQL string literal.
fn like_contains(value: &str) -> String {
    let mut pattern = String::with_capacity(value.len() + 4);
    pattern.push_str("'%");
    for ch in value.chars() {
        match ch {
            '\\' | '%' | '_' => {
                pattern.push('\\');
                pattern.push(ch);
            }
            '\'' => pattern.push_str("''"),
            other => pattern.push(other),
        }
    }
    pattern.push_str("%'");
    pattern
}
2757
2758#[cfg(test)]
2759mod tests {
2760    #![allow(clippy::expect_used, clippy::unwrap_used)]
2761
2762    use super::*;
2763    use tempfile::TempDir;
2764
2765    fn set(scope: Option<&str>) -> CredsSet {
2766        CredsSet {
2767            scope: scope.map(str::to_owned),
2768            access_key_id: Some("AKIA".to_owned()),
2769            secret_access_key: Some("shh".to_owned()),
2770            ..CredsSet::default()
2771        }
2772    }
2773
2774    fn opts(resolved: &ResolvedStorage, key: &str) -> Option<String> {
2775        resolved.options.get(key).cloned()
2776    }
2777
2778    #[test]
2779    fn storage_url_translation_table() {
2780        // file (Lance's `uri_to_url` appends the trailing slash; `child_uri`
2781        // trims it downstream)
2782        let local = StorageUrl::parse("/srv/pond").unwrap();
2783        assert_eq!(local.lance_url().as_str(), "file:///srv/pond/");
2784        assert!(local.is_local());
2785        assert!(local.scheme_options.is_empty());
2786        // s3 passthrough
2787        let aws = StorageUrl::parse("s3://bucket/prefix").unwrap();
2788        assert_eq!(aws.lance_url().as_str(), "s3://bucket/prefix");
2789        assert!(aws.scheme_options.is_empty());
2790        // s3+https: TLS stays on, virtual-hosted defaults on for domain
2791        // hosts, region defaults deterministically. The endpoint is
2792        // assembled at resolve time with the bucket folded into the host
2793        // (object_store's virtual-hosted convention).
2794        let fat = StorageUrl::parse("s3+https://nbg1.example.com/my-pond/sub").unwrap();
2795        assert_eq!(fat.lance_url().as_str(), "s3://my-pond/sub");
2796        assert_eq!(
2797            fat.scheme_options,
2798            vec![
2799                ("allow_http", "false".to_owned()),
2800                ("virtual_hosted_style_request", "true".to_owned()),
2801                ("region", "us-east-1".to_owned()),
2802            ],
2803        );
2804        let resolved = fat.resolve(&BTreeMap::new()).unwrap();
2805        assert_eq!(
2806            opts(&resolved, "endpoint").as_deref(),
2807            Some("https://my-pond.nbg1.example.com"),
2808        );
2809        assert_eq!(opts(&resolved, "region").as_deref(), Some("us-east-1"));
2810        // s3+http on an IP host: allow_http flips, path-style auto-selected
2811        // (a bucket subdomain on an IP can't resolve), port survives.
2812        let plain = StorageUrl::parse("s3+http://127.0.0.1:9000/pond").unwrap();
2813        assert_eq!(plain.lance_url().as_str(), "s3://pond/");
2814        assert_eq!(plain.scheme_options[0], ("allow_http", "true".to_owned()));
2815        assert_eq!(
2816            plain.scheme_options[1],
2817            ("virtual_hosted_style_request", "false".to_owned()),
2818        );
2819        let resolved = plain.resolve(&BTreeMap::new()).unwrap();
2820        assert_eq!(
2821            opts(&resolved, "endpoint").as_deref(),
2822            Some("http://127.0.0.1:9000"),
2823        );
2824        // An explicit endpoint in `extra` is the escape hatch and wins.
2825        let mut pinned = BTreeMap::new();
2826        pinned.insert(
2827            "default".to_owned(),
2828            CredsSet {
2829                extra: [(
2830                    "endpoint".to_owned(),
2831                    "https://pinned.example.com".to_owned(),
2832                )]
2833                .into_iter()
2834                .collect(),
2835                ..CredsSet::default()
2836            },
2837        );
2838        let resolved = fat.resolve(&pinned).unwrap();
2839        assert_eq!(
2840            opts(&resolved, "endpoint").as_deref(),
2841            Some("https://pinned.example.com"),
2842        );
2843        // gs passthrough
2844        let gcs = StorageUrl::parse("gs://bucket/p").unwrap();
2845        assert_eq!(gcs.lance_url().as_str(), "gs://bucket/p");
2846        // az: account folds into options
2847        let azure = StorageUrl::parse("az://acct/container/p").unwrap();
2848        assert_eq!(azure.lance_url().as_str(), "az://container/p");
2849        assert_eq!(
2850            azure.scheme_options,
2851            vec![("account_name", "acct".to_owned())]
2852        );
2853        // tests-only schemes pass through untouched
2854        let shared = StorageUrl::parse("shared-memory://pond-test-x/").unwrap();
2855        assert_eq!(shared.lance_url().as_str(), "shared-memory://pond-test-x/");
2856    }
2857
2858    #[test]
2859    fn storage_url_rejects_bad_shapes() {
2860        // RFC 3986 userinfo is a leak class, never accepted.
2861        let err = StorageUrl::parse("s3+https://user:pass@host/bucket")
2862            .expect_err("userinfo must be rejected")
2863            .to_string();
2864        assert!(
2865            err.contains("creds"),
2866            "error must name the alternative: {err}"
2867        );
2868        // Missing bucket.
2869        assert!(StorageUrl::parse("s3+https://host").is_err());
2870        assert!(StorageUrl::parse("az://acct").is_err());
2871        // Unknown scheme names the grammar.
2872        let err = StorageUrl::parse("ftp://host/x")
2873            .expect_err("ftp")
2874            .to_string();
2875        assert!(err.contains("s3+https"), "got: {err}");
2876        // Unrecognized query params die loudly.
2877        let err = StorageUrl::parse("s3://b/p?regoin=x")
2878            .expect_err("typo")
2879            .to_string();
2880        assert!(err.contains("regoin"), "got: {err}");
2881        // Query params on local / in-memory schemes die just as loudly -
2882        // no silent carry into the URL Lance opens.
2883        let err = StorageUrl::parse("memory://x?creds=y")
2884            .expect_err("memory query")
2885            .to_string();
2886        assert!(err.contains("query params"), "got: {err}");
2887        let err = StorageUrl::parse("file:///x?creds=y")
2888            .expect_err("file query")
2889            .to_string();
2890        assert!(err.contains("query params"), "got: {err}");
2891        // `?` in a bare path is a filename character, not a query.
2892        assert!(StorageUrl::parse("/tmp/a?b").is_ok());
2893    }
2894
2895    #[test]
2896    fn storage_url_canonicalizes_ports_and_keeps_percent_encoding() {
2897        // Default port strips so scope matching can't split on `:443`.
2898        let with_port = StorageUrl::parse("s3+https://host:443/bucket/p").unwrap();
2899        let without = StorageUrl::parse("s3+https://host/bucket/p").unwrap();
2900        assert_eq!(with_port.canonical(), without.canonical());
2901        // Non-default port survives into the assembled endpoint.
2902        let odd = StorageUrl::parse("s3+https://host:8443/bucket").unwrap();
2903        let resolved = odd.resolve(&BTreeMap::new()).unwrap();
2904        assert_eq!(
2905            resolved.options.get("endpoint").map(String::as_str),
2906            Some("https://bucket.host:8443"),
2907        );
2908        // Percent-encoded prefix passes through to the Lance URL verbatim.
2909        let encoded = StorageUrl::parse("s3+https://host/bucket/pre%20fix").unwrap();
2910        assert_eq!(encoded.lance_url().as_str(), "s3://bucket/pre%20fix");
2911    }
2912
2913    #[test]
2914    fn query_params_strip_and_apply_over_set_fields() {
2915        let mut creds = BTreeMap::new();
2916        creds.insert(
2917            "default".to_owned(),
2918            CredsSet {
2919                region: Some("from-set".to_owned()),
2920                virtual_hosted_style_request: Some(false),
2921                ..set(None)
2922            },
2923        );
2924        let url = StorageUrl::parse(
2925            "s3+https://host/bucket/p?region=from-query&virtual_hosted_style_request=true",
2926        )
2927        .unwrap();
2928        // Stripped before Lance sees the URL.
2929        assert_eq!(url.lance_url().as_str(), "s3://bucket/p");
2930        assert!(url.canonical().query().is_none());
2931        let resolved = url.resolve(&creds).unwrap();
2932        // Assembly precedence: scheme < set < query.
2933        assert_eq!(opts(&resolved, "region").as_deref(), Some("from-query"));
2934        assert_eq!(
2935            opts(&resolved, "virtual_hosted_style_request").as_deref(),
2936            Some("true"),
2937        );
2938        // virtual_hosted=true (query) -> the bucket rides in the endpoint host.
2939        assert_eq!(
2940            opts(&resolved, "endpoint").as_deref(),
2941            Some("https://bucket.host"),
2942        );
2943    }
2944
2945    #[test]
2946    fn scope_matching_binds_by_longest_prefix_at_segment_boundaries() {
2947        let mut creds = BTreeMap::new();
2948        creds.insert("all".to_owned(), set(None));
2949        creds.insert("bucket".to_owned(), set(Some("s3+https://host/pond/")));
2950        creds.insert("deep".to_owned(), set(Some("s3+https://host/pond/sub")));
2951
2952        let bind = |input: &str| {
2953            StorageUrl::parse(input)
2954                .unwrap()
2955                .resolve(&creds)
2956                .unwrap()
2957                .binding
2958        };
2959        // Longest match wins.
2960        assert_eq!(
2961            bind("s3+https://host/pond/sub/x"),
2962            CredsBinding::Set {
2963                name: "deep".to_owned(),
2964                via: BindVia::Scope
2965            },
2966        );
2967        assert_eq!(
2968            bind("s3+https://host/pond/other"),
2969            CredsBinding::Set {
2970                name: "bucket".to_owned(),
2971                via: BindVia::Scope
2972            },
2973        );
2974        // Segment boundary: `/pond` does not match `/pond-2`.
2975        assert_eq!(
2976            bind("s3+https://host/pond-2"),
2977            CredsBinding::Set {
2978                name: "all".to_owned(),
2979                via: BindVia::CatchAll
2980            },
2981        );
2982        // No cross-scheme normalization: the scoped sets don't match s3://.
2983        assert_eq!(
2984            bind("s3://pond/sub"),
2985            CredsBinding::Set {
2986                name: "all".to_owned(),
2987                via: BindVia::CatchAll
2988            },
2989        );
2990        // Default-port spelling matches the portless scope.
2991        assert_eq!(
2992            bind("s3+https://host:443/pond/x"),
2993            CredsBinding::Set {
2994                name: "bucket".to_owned(),
2995                via: BindVia::Scope
2996            },
2997        );
2998        // `?creds=` pointer beats every scope...
2999        assert_eq!(
3000            bind("s3+https://host/pond/sub/x?creds=all"),
3001            CredsBinding::Set {
3002                name: "all".to_owned(),
3003                via: BindVia::Pointer
3004            },
3005        );
3006        // ...and a pointer to a missing set is an error, not a fallback.
3007        let err = StorageUrl::parse("s3://b/p?creds=nope")
3008            .unwrap()
3009            .resolve(&creds)
3010            .expect_err("missing set")
3011            .to_string();
3012        assert!(err.contains("creds=nope"), "got: {err}");
3013
3014        // No sets at all -> ambient chain; local URLs skip resolution.
3015        let empty = BTreeMap::new();
3016        assert_eq!(
3017            StorageUrl::parse("s3://b/p")
3018                .unwrap()
3019                .resolve(&empty)
3020                .unwrap()
3021                .binding,
3022            CredsBinding::Ambient,
3023        );
3024        assert_eq!(
3025            StorageUrl::parse("/srv/pond")
3026                .unwrap()
3027                .resolve(&creds)
3028                .unwrap()
3029                .binding,
3030            CredsBinding::NotApplicable,
3031        );
3032    }
3033
3034    #[test]
3035    fn unmatched_sets_are_reported_only_on_remote_invocations() {
3036        let mut creds = BTreeMap::new();
3037        creds.insert("used".to_owned(), set(Some("s3://bucket/")));
3038        creds.insert("idle".to_owned(), set(Some("s3://other/")));
3039
3040        let remote = StorageUrl::parse("s3://bucket/p")
3041            .unwrap()
3042            .resolve(&creds)
3043            .unwrap();
3044        assert_eq!(unmatched_creds_sets(&[&remote], &creds), vec!["idle"]);
3045
3046        // A purely local invocation must not nag about remote-only sets.
3047        let local = StorageUrl::parse("/srv/pond")
3048            .unwrap()
3049            .resolve(&creds)
3050            .unwrap();
3051        assert!(unmatched_creds_sets(&[&local], &creds).is_empty());
3052    }
3053
3054    #[test]
3055    fn secrets_materialize_from_file_and_command() {
3056        let dir = TempDir::new().unwrap();
3057        let key_path = dir.path().join("key");
3058        std::fs::write(&key_path, "from-file\n").unwrap();
3059        let mut creds = BTreeMap::new();
3060        creds.insert(
3061            "default".to_owned(),
3062            CredsSet {
3063                access_key_id_file: Some(key_path),
3064                // Two trailing newlines: exactly one is stripped.
3065                secret_access_key_command: Some("printf 'from-command\\n\\n'".to_owned()),
3066                ..CredsSet::default()
3067            },
3068        );
3069        let url = StorageUrl::parse("s3://bucket/p").unwrap();
3070        let resolved = url.resolve(&creds).unwrap();
3071        assert_eq!(
3072            opts(&resolved, "access_key_id").as_deref(),
3073            Some("from-file")
3074        );
3075        assert_eq!(
3076            opts(&resolved, "secret_access_key").as_deref(),
3077            Some("from-command\n"),
3078        );
3079
3080        // A failing command surfaces its text and exit status.
3081        let mut failing = BTreeMap::new();
3082        failing.insert(
3083            "default".to_owned(),
3084            CredsSet {
3085                secret_access_key_command: Some("exit 3".to_owned()),
3086                ..CredsSet::default()
3087            },
3088        );
3089        let err = url
3090            .resolve(&failing)
3091            .expect_err("command must fail")
3092            .to_string();
3093        assert!(err.contains("exit 3"), "got: {err}");
3094
3095        // The command cache: one subprocess per command text per process.
3096        let marker = dir.path().join("runs");
3097        let command = format!("echo run >> {} && echo secret", marker.display());
3098        let mut counted = BTreeMap::new();
3099        counted.insert(
3100            "default".to_owned(),
3101            CredsSet {
3102                secret_access_key_command: Some(command),
3103                ..CredsSet::default()
3104            },
3105        );
3106        url.resolve(&counted).unwrap();
3107        url.resolve(&counted).unwrap();
3108        let runs = std::fs::read_to_string(&marker).unwrap();
3109        assert_eq!(runs.lines().count(), 1, "command must run exactly once");
3110    }
3111
3112    #[test]
3113    fn check_errors_classify_by_kind_and_binding() {
3114        let auth_error = || object_store::Error::Unauthenticated {
3115            path: "k".to_owned(),
3116            source: "denied".into(),
3117        };
3118        let bound = CredsBinding::Set {
3119            name: "work".to_owned(),
3120            via: BindVia::Scope,
3121        };
3122        // Auth-class error with a bound set names the set...
3123        match classify_check_error(auth_error(), &bound, "put") {
3124            CheckFailure::Auth { set, .. } => assert_eq!(set, "work"),
3125            other => panic!("want Auth, got {other:?}"),
3126        }
3127        // ...and without one, points at the (empty) ambient chain.
3128        assert!(matches!(
3129            classify_check_error(auth_error(), &CredsBinding::Ambient, "put"),
3130            CheckFailure::NoCreds { .. },
3131        ));
3132        let denied = object_store::Error::PermissionDenied {
3133            path: "k".to_owned(),
3134            source: "403".into(),
3135        };
3136        assert!(matches!(
3137            classify_check_error(denied, &bound, "put"),
3138            CheckFailure::Auth { .. },
3139        ));
3140        // Anything else is I/O, set or no set.
3141        let missing = object_store::Error::NotFound {
3142            path: "k".to_owned(),
3143            source: "404".into(),
3144        };
3145        assert!(matches!(
3146            classify_check_error(missing, &bound, "get"),
3147            CheckFailure::Io { .. },
3148        ));
3149        // Lance wraps an empty-creds chain as a `Generic` error, never the
3150        // typed `Unauthenticated`; the rendered `CredentialsNotLoaded` is the
3151        // signal. Bound -> Auth (the set is wrong), unbound -> NoCreds.
3152        let no_creds = || object_store::Error::Generic {
3153            store: "S3",
3154            source: "Failed to get AWS credentials: CredentialsNotLoaded".into(),
3155        };
3156        assert!(matches!(
3157            classify_check_error(no_creds(), &bound, "put"),
3158            CheckFailure::Auth { .. },
3159        ));
3160        assert!(matches!(
3161            classify_check_error(no_creds(), &CredsBinding::Ambient, "put"),
3162            CheckFailure::NoCreds { .. },
3163        ));
3164    }
3165
3166    #[test]
3167    fn concise_cause_strips_upstream_noise_to_one_line() {
3168        // The shape Lance actually produces: bug-report boilerplate, the real
3169        // cause, an internal source location, then the same text re-printed.
3170        let inner = "Encountered internal error. Please file a bug report at \
3171                     https://github.com/lance-format/lance/issues. Failed to get AWS \
3172                     credentials: CredentialsNotLoaded, <WORKSPACE>/src/object_store/providers/aws.rs:401:21: \
3173                     Encountered internal error. Please file a bug report at \
3174                     https://github.com/lance-format/lance/issues. Failed to get AWS \
3175                     credentials: CredentialsNotLoaded";
3176        let failure = CheckFailure::NoCreds {
3177            source: anyhow!(inner.to_owned()).context("initial conditional put"),
3178        };
3179        let cause = failure.concise_cause().expect("auth-class carries a cause");
3180        assert_eq!(cause, "Failed to get AWS credentials: CredentialsNotLoaded");
3181        // Display carries only the fix-naming lead, no chain.
3182        assert!(
3183            !failure.to_string().contains("file a bug report"),
3184            "lead must not trail the chain: {failure}"
3185        );
3186        // OccUnsupported's detail is already curated into Display.
3187        let occ = CheckFailure::OccUnsupported {
3188            detail: "put-if-none-match ignored".to_owned(),
3189        };
3190        assert!(occ.concise_cause().is_none());
3191        // Oversized single-line causes middle-truncate, keeping the tail
3192        // (wrapped transport errors put the root cause at the end).
3193        let long = CheckFailure::Io {
3194            source: anyhow!(format!("{} dns error: lookup failed", "x".repeat(500))),
3195        };
3196        let cause = long.concise_cause().expect("io carries a cause");
3197        assert!(cause.contains(" ... "), "long causes truncate: {cause}");
3198        assert!(
3199            cause.ends_with("dns error: lookup failed"),
3200            "the tail survives: {cause}"
3201        );
3202    }
3203
3204    #[tokio::test]
3205    async fn storage_check_passes_on_memory_backend() {
3206        let resolved = StorageUrl::parse("memory://check/probe")
3207            .unwrap()
3208            .resolve(&BTreeMap::new())
3209            .unwrap();
3210        storage_check(&resolved).await.expect("memory probe passes");
3211    }
3212
3213    fn stat(bytes: u64) -> FragmentStat {
3214        FragmentStat {
3215            bytes: Some(bytes),
3216            rows: bytes / 1_000,
3217            deleted_rows: 0,
3218        }
3219    }
3220
3221    #[test]
3222    fn compaction_veto_blocks_absorb_keeps_peers() {
3223        // One 665 MiB tail fragment + tiny appends -> vetoed.
3224        let absorb = [stat(665_000_000), stat(1_000_000), stat(2_000_000)];
3225        assert!(!keep_task(&absorb, 64, 0.1));
3226        // Peer-sized merge halves fragment count -> kept.
3227        let peers = [stat(300_000_000), stat(300_000_000)];
3228        assert!(keep_task(&peers, 64, 0.1));
3229        // Remainder reaches largest / COMPACTION_ABSORB_FACTOR -> kept.
3230        let tiered = [stat(400_000), stat(60_000), stat(40_000)];
3231        assert!(keep_task(&tiered, 64, 0.1));
3232    }
3233
3234    #[test]
3235    fn compaction_veto_passes_deletions_and_cap() {
3236        let mut deleting = stat(665_000_000);
3237        deleting.deleted_rows = deleting.rows / 5;
3238        assert!(keep_task(&[deleting, stat(1_000)], 64, 0.1));
3239
3240        let wide: Vec<FragmentStat> = std::iter::once(stat(665_000_000))
3241            .chain(std::iter::repeat_with(|| stat(1_000)).take(63))
3242            .collect();
3243        assert!(keep_task(&wide, 64, 0.1));
3244    }
3245
3246    #[test]
3247    fn compaction_veto_falls_back_to_rows_on_unknown_sizes() {
3248        let mut unknown = stat(665_000_000);
3249        unknown.bytes = None;
3250        // Rows comparison: 665k vs 3k -> still vetoed.
3251        assert!(!keep_task(
3252            &[unknown, stat(1_000_000), stat(2_000_000)],
3253            64,
3254            0.1
3255        ));
3256    }
3257
3258    #[test]
3259    fn derived_target_rows_tracks_row_size_and_clamps() {
3260        // ~1.3 KiB rows -> ~200k-row target.
3261        let parts_like = [FragmentStat {
3262            bytes: Some(665_000_000),
3263            rows: 511_000,
3264            deleted_rows: 0,
3265        }];
3266        let target = derived_target_rows(&parts_like);
3267        assert!((150_000..300_000).contains(&target), "{target}");
3268        // No usable sizes -> Lance default.
3269        let unknown = [FragmentStat {
3270            bytes: None,
3271            rows: 511_000,
3272            deleted_rows: 0,
3273        }];
3274        assert_eq!(
3275            derived_target_rows(&unknown),
3276            MAX_TARGET_ROWS_PER_FRAGMENT as usize
3277        );
3278        // Tiny rows clamp at the ceiling, huge rows at the floor.
3279        let tiny = [FragmentStat {
3280            bytes: Some(1_000_000),
3281            rows: 100_000,
3282            deleted_rows: 0,
3283        }];
3284        assert_eq!(
3285            derived_target_rows(&tiny),
3286            MAX_TARGET_ROWS_PER_FRAGMENT as usize
3287        );
3288        let huge = [FragmentStat {
3289            bytes: Some(1_000_000_000),
3290            rows: 100,
3291            deleted_rows: 0,
3292        }];
3293        assert_eq!(
3294            derived_target_rows(&huge),
3295            MIN_TARGET_ROWS_PER_FRAGMENT as usize
3296        );
3297    }
3298
3299    #[test]
3300    fn namespace_error_code_walks_wrapped_chain() {
3301        let direct = lance::Error::namespace_source(Box::new(NamespaceError::TableNotFound {
3302            message: "missing".into(),
3303        }));
3304        assert!(is_namespace_error_code(&direct, ErrorCode::TableNotFound));
3305
3306        let wrapped = lance::Error::namespace_source(Box::new(direct));
3307        assert!(is_namespace_error_code(&wrapped, ErrorCode::TableNotFound));
3308
3309        let other_code =
3310            lance::Error::namespace_source(Box::new(NamespaceError::NamespaceNotFound {
3311                message: "nope".into(),
3312            }));
3313        assert!(!is_namespace_error_code(
3314            &other_code,
3315            ErrorCode::TableNotFound
3316        ));
3317
3318        let not_namespace = lance::Error::internal("unrelated");
3319        assert!(!is_namespace_error_code(
3320            &not_namespace,
3321            ErrorCode::TableNotFound
3322        ));
3323    }
3324
3325    /// Round-trip: opening a fresh data dir through `lance-namespace`
3326    /// produces all three tables, and `Handle::scan` returns an empty batch
3327    /// for each (no spurious schema mismatch, no namespace error).
3328    #[tokio::test]
3329    async fn store_opens_via_namespace_and_scan_works() -> Result<()> {
3330        let temp = TempDir::new()?;
3331        let url = Url::from_directory_path(temp.path())
3332            .map_err(|()| anyhow::anyhow!("temp path is not absolute"))?;
3333        let handle = Handle::open(&url).await?;
3334        // Each table has its own PK column; project the canonical one so the
3335        // scan is exercised end-to-end (catalog -> dataset -> scanner -> batch).
3336        let cases: [(Table, &[&str]); 3] = [
3337            (Table::Sessions, &["id"]),
3338            (Table::Messages, &["id"]),
3339            (Table::Parts, &["id"]),
3340        ];
3341        for (table, projection) in cases {
3342            let scanner = handle
3343                .scan(table, ScanOpts::project_only(projection))
3344                .await?;
3345            let batch = scanner.try_into_batch().await?;
3346            assert_eq!(batch.num_rows(), 0, "fresh table should be empty");
3347        }
3348        Ok(())
3349    }
3350}