Skip to main content

pond/
substrate.rs

1//! The storage substrate (spec.md#substrate): pond's one seam to Lance,
2//! generic over consumers.
3
4use crate::{
5    RetryPolicy,
6    config::{self, CredsSet},
7    handlers::NamespaceIdent,
8    sessions::{self},
9};
10use anyhow::{Context, Result, anyhow, bail};
11use lance::Dataset;
12use lance::dataset::builder::DatasetBuilder;
13use lance::dataset::index::DatasetIndexRemapperOptions;
14use lance::dataset::optimize::{CompactionOptions, commit_compaction, plan_compaction};
15pub use lance::dataset::write::merge_insert::MergeStats;
16use lance::dataset::write::merge_insert::SourceDedupeBehavior;
17use lance::dataset::{InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode};
18pub use lance::dataset::{WriteParams, WriteStats};
19use lance::deps::arrow_array::{Array, RecordBatch, RecordBatchIterator, StringArray};
20use lance::deps::datafusion::physical_plan::SendableRecordBatchStream;
21use lance::index::DatasetIndexExt;
22use lance::index::DatasetIndexInternalExt;
23use lance::index::vector::VectorIndexParams;
24use lance::session::Session;
25use lance_index::IndexType;
26use lance_index::optimize::OptimizeOptions;
27use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
28use lance_index::vector::ivf::IvfBuildParams;
29use lance_index::vector::sq::builder::SQBuildParams;
30use lance_io::object_store::{
31    ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor, uri_to_url,
32};
33use lance_linalg::distance::MetricType;
34use lance_namespace::LanceNamespace;
35use lance_namespace::error::{ErrorCode, NamespaceError};
36use lance_namespace::models::DescribeTableRequest;
37use lance_namespace_impls::ConnectBuilder;
38use std::{
39    collections::{BTreeMap, HashMap},
40    sync::Arc,
41    time::{Duration, Instant},
42};
43use tokio::sync::{Mutex, OnceCell};
44use tokio_stream::StreamExt;
45use url::Url;
/// Vector-index activation threshold (spec.md#search): once `messages.vector`
/// holds this many embedded rows, pond builds the IVF_SQ index over it.
/// Below the threshold vector search stays a brute-force flat scan, which is
/// exact and fast at small and medium scale - and IVF_SQ could not train
/// well on fewer vectors anyway.
pub const VECTOR_INDEX_ACTIVATION_ROWS: usize = 100_000;
51
/// How many same-name index segments an incremental fold tolerates before it
/// consolidates. Every `optimize_indices(append)` adds one more segment
/// (lance `num_indices_to_merge=0`), and each vector/FTS query reads the
/// probed partition or token postings from *all* of them - so per-query
/// object-store round-trips grow with the delta count. On reaching this many
/// segments pond folds with `merge` instead, collapsing them back into one.
pub const DELTA_MERGE_THRESHOLD: usize = 4;
59
60// ---------------------------------------------------------------------------
61// Storage addresses (spec.md#storage-url-grammar)
62// ---------------------------------------------------------------------------
63
64/// A parsed pond storage address. The fat-URL grammar
65/// (`s3+https://host/bucket/prefix`) folds the endpoint into the address so
66/// it can never desync from the bucket (the litestream out-of-band-endpoint
67/// failure class); parsing splits it back into the URL Lance opens plus the
68/// `object_store` options the endpoint implies.
69#[derive(Debug, Clone, PartialEq)]
70pub struct StorageUrl {
71    /// The address as written, canonicalized (scheme/host lowercased by
72    /// `url`, default port stripped, recognized query params removed). Scope
73    /// matching (spec.md#creds-scope-match) and display use this form.
74    canonical: Url,
75    /// The URL handed to Lance.
76    lance: Url,
77    /// Options implied by the scheme - lowest precedence in assembly.
78    scheme_options: Vec<(&'static str, String)>,
79    /// Recognized `?key=value` params - highest precedence.
80    query_options: Vec<(&'static str, String)>,
81    /// `?creds=<name>`: explicit set binding, beats scope matching.
82    creds_pointer: Option<String>,
83    /// Endpoint pieces for the `s3+` schemes. The final endpoint URL depends
84    /// on the resolved `virtual_hosted_style_request` value (object_store
85    /// wants the bucket inside the endpoint host under virtual-hosted
86    /// addressing), so it is assembled at resolve time, not parse time.
87    endpoint: Option<S3Endpoint>,
88}
89
/// The endpoint half of an `s3+https://` / `s3+http://` address. It is kept
/// in pieces because the final endpoint URL can only be assembled once the
/// addressing style (virtual-hosted vs path) has been resolved.
#[derive(Debug, Clone, PartialEq)]
struct S3Endpoint {
    /// `"https"` or `"http"`, taken from the `s3+` scheme.
    scheme: &'static str,
    /// host[:port], with the scheme's default port already dropped.
    authority: String,
    /// The first path segment of the address.
    bucket: String,
}
97
/// The `?key=value` params pond understands; they are stripped before the
/// URL ever reaches Lance. Any other param is rejected outright, because a
/// typo must never slip through to the object store as part of the path.
const RECOGNIZED_QUERY_PARAMS: [&str; 3] = [
    "creds",
    "region",
    "virtual_hosted_style_request",
];
102
103impl StorageUrl {
104    /// Parse a storage address (spec.md#storage-url-grammar): bare/`~` paths,
105    /// `file://`, `s3://`, `s3+https://` / `s3+http://`, `gs://`, `az://`,
106    /// and the test-only `memory://` / `shared-memory://`.
107    pub fn parse(input: &str) -> Result<Self> {
108        let trimmed = input.trim();
109        if trimmed.is_empty() {
110            bail!("storage path is empty");
111        }
112        // Bare paths, `~/...`, and `file://` go through Lance's own
113        // `uri_to_url` so pond accepts exactly what Lance accepts.
114        if !trimmed.contains("://") || trimmed.starts_with("file://") {
115            let url =
116                uri_to_url(trimmed).with_context(|| format!("invalid storage path {trimmed:?}"))?;
117            // Bare paths percent-encode `?` (a legal filename character), so
118            // only an explicit `file://...?x=y` parses a query here. No local
119            // scheme takes one; reject like the remote schemes do instead of
120            // silently carrying it into the path Lance opens.
121            if url.query().is_some() {
122                bail!("storage URL {trimmed:?} carries query params; local URLs take none");
123            }
124            return Ok(Self::plain(url));
125        }
126        let url =
127            Url::parse(trimmed).with_context(|| format!("invalid storage URL {trimmed:?}"))?;
128        // RFC 3986 deprecates userinfo; argv/history/ps/logs leak it. Never.
129        if !url.username().is_empty() || url.password().is_some() {
130            bail!(
131                "storage URL {trimmed:?} embeds credentials; put them in [creds.*] (or POND_CREDS_*) instead"
132            );
133        }
134        match url.scheme() {
135            "memory" | "shared-memory" => {
136                if url.query().is_some() {
137                    bail!(
138                        "storage URL {trimmed:?} carries query params; {}:// URLs take none",
139                        url.scheme(),
140                    );
141                }
142                Ok(Self::plain(url))
143            }
144            "s3" | "gs" => {
145                let (canonical, query_options, creds_pointer) = strip_query(url)?;
146                let mut lance = canonical.clone();
147                lance.set_query(None);
148                Ok(Self {
149                    canonical,
150                    lance,
151                    scheme_options: Vec::new(),
152                    query_options,
153                    creds_pointer,
154                    endpoint: None,
155                })
156            }
157            "s3+https" | "s3+http" => {
158                let (mut canonical, query_options, creds_pointer) = strip_query(url)?;
159                let tls = canonical.scheme() == "s3+https";
160                // `url` treats non-special schemes' default ports as
161                // explicit; strip them so scope matching can't split on
162                // `:443` vs nothing.
163                if canonical.port() == Some(if tls { 443 } else { 80 }) {
164                    let _ = canonical.set_port(None);
165                }
166                let host = canonical
167                    .host_str()
168                    .ok_or_else(|| anyhow!("storage URL {trimmed:?} has no endpoint host"))?;
169                let endpoint_authority = match canonical.port() {
170                    Some(port) => format!("{host}:{port}"),
171                    None => host.to_owned(),
172                };
173                let mut segments = canonical.path().trim_start_matches('/').splitn(2, '/');
174                let bucket = segments.next().unwrap_or_default().to_owned();
175                let prefix = segments.next().unwrap_or_default().to_owned();
176                if bucket.is_empty() {
177                    bail!(
178                        "storage URL {trimmed:?} is missing the bucket: the form is {}://host/bucket/prefix",
179                        canonical.scheme(),
180                    );
181                }
182                let lance = Url::parse(&format!("s3://{bucket}/{prefix}")).with_context(|| {
183                    format!("storage URL {trimmed:?}: bucket/prefix do not form a valid s3:// URL")
184                })?;
185                let scheme = if tls { "https" } else { "http" };
186                // Virtual-hosted is the Hetzner / R2 / B2 default, but an IP
187                // host can't carry a bucket subdomain (`bucket.127.0.0.1`
188                // does not resolve), so MinIO-style IP endpoints flip to
189                // path-style. Override either way via the creds-set field or
190                // `?virtual_hosted_style_request=`. Note: `url` keeps IPv4
191                // hosts as `Host::Domain` on non-special schemes, hence the
192                // explicit IpAddr parse; IPv6 brackets still need the Host
193                // match.
194                let virtual_hosted = host.parse::<std::net::IpAddr>().is_err()
195                    && !matches!(canonical.host(), Some(url::Host::Ipv6(_)));
196                let scheme_options = vec![
197                    ("allow_http", (!tls).to_string()),
198                    ("virtual_hosted_style_request", virtual_hosted.to_string()),
199                    // S3-compatible stores ignore the SigV4 region, so a
200                    // deterministic default (the DuckDB / litestream
201                    // convention) beats Lance's env-chain fallback, where a
202                    // stray AWS_REGION changes behavior. Real AWS (`s3://`,
203                    // no endpoint) auto-resolves the bucket region inside
204                    // Lance instead. Override: creds-set field or ?region=.
205                    ("region", "us-east-1".to_owned()),
206                ];
207                Ok(Self {
208                    canonical,
209                    lance,
210                    scheme_options,
211                    query_options,
212                    creds_pointer,
213                    endpoint: Some(S3Endpoint {
214                        scheme,
215                        authority: endpoint_authority,
216                        bucket,
217                    }),
218                })
219            }
220            "az" => {
221                let (canonical, query_options, creds_pointer) = strip_query(url)?;
222                let account = canonical
223                    .host_str()
224                    .ok_or_else(|| anyhow!("storage URL {trimmed:?} has no account: the form is az://account/container/prefix"))?
225                    .to_owned();
226                let mut segments = canonical.path().trim_start_matches('/').splitn(2, '/');
227                let container = segments.next().unwrap_or_default();
228                if container.is_empty() {
229                    bail!(
230                        "storage URL {trimmed:?} is missing the container: the form is az://account/container/prefix"
231                    );
232                }
233                let prefix = segments.next().unwrap_or_default();
234                let lance = Url::parse(&format!("az://{container}/{prefix}"))
235                    .with_context(|| format!("storage URL {trimmed:?}: container/prefix do not form a valid az:// URL"))?;
236                Ok(Self {
237                    canonical,
238                    lance,
239                    scheme_options: vec![("account_name", account)],
240                    query_options,
241                    creds_pointer,
242                    endpoint: None,
243                })
244            }
245            other => bail!(
246                "storage URL scheme {other:?} not recognized; use a local path, s3://, s3+https://, s3+http://, gs://, or az://"
247            ),
248        }
249    }
250
251    /// A scheme with no creds machinery: canonical == lance, no options.
252    fn plain(url: Url) -> Self {
253        Self {
254            canonical: url.clone(),
255            lance: url,
256            scheme_options: Vec::new(),
257            query_options: Vec::new(),
258            creds_pointer: None,
259            endpoint: None,
260        }
261    }
262
263    /// The URL Lance opens (endpoint folded into options, not the URL).
264    pub fn lance_url(&self) -> &Url {
265        &self.lance
266    }
267
268    /// The canonical as-written address - what scope matching compares
269    /// against and what display surfaces show (it carries the endpoint).
270    pub fn canonical(&self) -> &Url {
271        &self.canonical
272    }
273
274    pub fn is_local(&self) -> bool {
275        config::is_local(&self.canonical)
276    }
277
278    /// Render for human output: local URLs as plain paths, remote verbatim.
279    pub fn display(&self) -> String {
280        config::display(&self.canonical)
281    }
282
283    /// Whether this scheme authenticates at all. `file`, `memory`, and
284    /// `shared-memory` take no credentials; resolution skips them entirely.
285    fn takes_credentials(&self) -> bool {
286        !matches!(
287            self.canonical.scheme(),
288            "file" | "file+uring" | "memory" | "shared-memory"
289        )
290    }
291
292    /// Resolve this address against the configured creds sets
293    /// (spec.md#creds-scope-match): `?creds=` pointer > longest scoped
294    /// prefix match > the scope-less catch-all > none (object_store's
295    /// ambient SDK chain). Option assembly, later wins: scheme-derived ->
296    /// matched set (non-secret fields + `extra`, then materialized secrets)
297    /// -> URL query params.
298    pub fn resolve(&self, creds: &BTreeMap<String, CredsSet>) -> Result<ResolvedStorage> {
299        if !self.takes_credentials() {
300            return Ok(ResolvedStorage {
301                storage: self.clone(),
302                options: HashMap::new(),
303                binding: CredsBinding::NotApplicable,
304            });
305        }
306        let matched: Option<(&String, &CredsSet, BindVia)> = match &self.creds_pointer {
307            Some(name) => {
308                let set = creds.get(name).ok_or_else(|| {
309                    anyhow!(
310                        "URL names ?creds={name} but no [creds.{name}] set is configured; define it or drop the pointer"
311                    )
312                })?;
313                Some((name, set, BindVia::Pointer))
314            }
315            None => {
316                let mut best: Option<(&String, &CredsSet, String)> = None;
317                for (name, set) in creds {
318                    let Some(scope) = &set.scope else { continue };
319                    let scope_url = parse_scope(scope).with_context(|| {
320                        format!("[creds.{name}] scope {scope:?} is not a valid URL prefix")
321                    })?;
322                    if scope_matches(&scope_url, &self.canonical)
323                        && best
324                            .as_ref()
325                            .is_none_or(|(_, _, len)| scope_url.as_str().len() > len.len())
326                    {
327                        best = Some((name, set, scope_url.as_str().to_owned()));
328                    }
329                }
330                match best {
331                    Some((name, set, _)) => Some((name, set, BindVia::Scope)),
332                    None => creds
333                        .iter()
334                        .find(|(_, set)| set.scope.is_none())
335                        .map(|(name, set)| (name, set, BindVia::CatchAll)),
336                }
337            }
338        };
339        let mut options: HashMap<String, String> = self
340            .scheme_options
341            .iter()
342            .map(|(key, value)| ((*key).to_owned(), value.clone()))
343            .collect();
344        let binding = match matched {
345            None => CredsBinding::Ambient,
346            Some((name, set, via)) => {
347                if let Some(region) = &set.region {
348                    options.insert("region".to_owned(), region.clone());
349                }
350                if let Some(virtual_hosted) = set.virtual_hosted_style_request {
351                    options.insert(
352                        "virtual_hosted_style_request".to_owned(),
353                        virtual_hosted.to_string(),
354                    );
355                }
356                for (key, value) in &set.extra {
357                    options.insert(key.clone(), value.clone());
358                }
359                if let Some(value) = materialize_secret(
360                    name,
361                    "access_key_id",
362                    set.access_key_id.as_deref(),
363                    set.access_key_id_file.as_deref(),
364                    None,
365                )? {
366                    options.insert("access_key_id".to_owned(), value);
367                }
368                if let Some(value) = materialize_secret(
369                    name,
370                    "secret_access_key",
371                    set.secret_access_key.as_deref(),
372                    set.secret_access_key_file.as_deref(),
373                    set.secret_access_key_command.as_deref(),
374                )? {
375                    options.insert("secret_access_key".to_owned(), value);
376                }
377                CredsBinding::Set {
378                    name: name.clone(),
379                    via,
380                }
381            }
382        };
383        for (key, value) in &self.query_options {
384            options.insert((*key).to_owned(), value.clone());
385        }
386        // The endpoint is assembled last: under virtual-hosted addressing
387        // object_store expects the bucket inside the endpoint host, so the
388        // URL depends on the final virtual_hosted_style_request value. An
389        // explicit endpoint in `extra` wins (the escape hatch).
390        if let Some(endpoint) = &self.endpoint
391            && !options.keys().any(|key| {
392                key.eq_ignore_ascii_case("endpoint") || key.eq_ignore_ascii_case("aws_endpoint")
393            })
394        {
395            let virtual_hosted = options
396                .get("virtual_hosted_style_request")
397                .is_some_and(|value| value == "true");
398            let url = if virtual_hosted {
399                format!(
400                    "{}://{}.{}",
401                    endpoint.scheme, endpoint.bucket, endpoint.authority
402                )
403            } else {
404                format!("{}://{}", endpoint.scheme, endpoint.authority)
405            };
406            options.insert("endpoint".to_owned(), url);
407        }
408        Ok(ResolvedStorage {
409            storage: self.clone(),
410            options,
411            binding,
412        })
413    }
414}
415
416/// (canonical URL, recognized query options, `?creds=` pointer).
417type StrippedQuery = (Url, Vec<(&'static str, String)>, Option<String>);
418
419/// Pull recognized query params off the URL; reject unrecognized ones.
420fn strip_query(url: Url) -> Result<StrippedQuery> {
421    let mut query_options = Vec::new();
422    let mut creds_pointer = None;
423    for (key, value) in url.query_pairs() {
424        match RECOGNIZED_QUERY_PARAMS
425            .iter()
426            .find(|known| **known == key.as_ref())
427        {
428            Some(&"creds") => creds_pointer = Some(value.into_owned()),
429            Some(known) => query_options.push((*known, value.into_owned())),
430            None => bail!(
431                "storage URL query param {key:?} not recognized (known: {})",
432                RECOGNIZED_QUERY_PARAMS.join(", "),
433            ),
434        }
435    }
436    let mut canonical = url;
437    canonical.set_query(None);
438    Ok((canonical, query_options, creds_pointer))
439}
440
441/// Parse a `[creds.*] scope` URL prefix into the same canonical form
442/// `StorageUrl::parse` produces, so comparison is exact.
443pub(crate) fn parse_scope(scope: &str) -> Result<Url> {
444    let mut url = Url::parse(scope.trim())?;
445    if !url.username().is_empty() || url.password().is_some() {
446        bail!("scope embeds credentials");
447    }
448    if url.query().is_some() {
449        bail!("scope carries query params; scopes are plain URL prefixes");
450    }
451    match (url.scheme(), url.port()) {
452        ("s3+https", Some(443)) | ("s3+http", Some(80)) => {
453            let _ = url.set_port(None);
454        }
455        _ => {}
456    }
457    Ok(url)
458}
459
460/// spec.md#creds-scope-match: scheme, host, and port equal; path matches at
461/// `/` segment boundaries only (`.../pond` does not match `.../pond-2`). No
462/// cross-scheme normalization: a `s3+https://host/bucket/` scope does not
463/// match a `s3://bucket/` URL.
464fn scope_matches(scope: &Url, address: &Url) -> bool {
465    if scope.scheme() != address.scheme()
466        || scope.host_str() != address.host_str()
467        || scope.port() != address.port()
468    {
469        return false;
470    }
471    let scope_path = scope.path().trim_end_matches('/');
472    let address_path = address.path().trim_end_matches('/');
473    address_path == scope_path
474        || address_path
475            .strip_prefix(scope_path)
476            .is_some_and(|rest| rest.starts_with('/'))
477}
478
/// The route by which a creds set was bound to a URL. Binding lines surface
/// it so that a wrong match shows up before any auth error does.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BindVia {
    /// Named explicitly by the URL's `?creds=<name>` pointer.
    Pointer,
    /// Won the longest-prefix `scope` match.
    Scope,
    /// Fell through to the set that declares no `scope` (the catch-all).
    CatchAll,
}
490
491#[derive(Debug, Clone, PartialEq)]
492pub enum CredsBinding {
493    /// A `[creds.<name>]` set bound to this URL.
494    Set { name: String, via: BindVia },
495    /// No set matched; object_store's ambient SDK chain applies (AWS_* env,
496    /// shared credentials file, IMDS/container metadata). A documented
497    /// invariant, not an accident - instance profiles and OIDC work with
498    /// zero pond config.
499    Ambient,
500    /// Local / in-memory scheme; credentials don't apply.
501    NotApplicable,
502}
503
504impl CredsBinding {
505    /// One-line human rendering for binding lines and `pond config show`.
506    pub fn describe(&self) -> String {
507        match self {
508            Self::Set { name, via } => {
509                let via = match via {
510                    BindVia::Pointer => "?creds",
511                    BindVia::Scope => "scope match",
512                    BindVia::CatchAll => "catch-all",
513                };
514                format!("creds {name} ({via})")
515            }
516            Self::Ambient => "ambient chain".to_owned(),
517            Self::NotApplicable => "local (no credentials)".to_owned(),
518        }
519    }
520}
521
522/// A storage address with its options assembled and secrets materialized -
523/// everything `Store::open_with_options` needs, plus the binding for
524/// display.
525#[derive(Debug, Clone)]
526pub struct ResolvedStorage {
527    storage: StorageUrl,
528    pub options: HashMap<String, String>,
529    pub binding: CredsBinding,
530}
531
532impl ResolvedStorage {
533    pub fn lance_url(&self) -> &Url {
534        self.storage.lance_url()
535    }
536
537    pub fn display(&self) -> String {
538        self.storage.display()
539    }
540}
541
542/// Names of defined creds sets that bound to none of this invocation's URLs
543/// (spec.md#creds-scope-match: misbinding must never be silent). Empty when
544/// the invocation touched no credential-taking URL - a local-only command
545/// must not nag about sets kept for remote work.
546pub fn unmatched_creds_sets<'c>(
547    resolved: &[&ResolvedStorage],
548    creds: &'c BTreeMap<String, CredsSet>,
549) -> Vec<&'c str> {
550    if resolved
551        .iter()
552        .all(|entry| matches!(entry.binding, CredsBinding::NotApplicable))
553    {
554        return Vec::new();
555    }
556    creds
557        .keys()
558        .filter(|name| {
559            !resolved.iter().any(|entry| {
560                matches!(&entry.binding, CredsBinding::Set { name: bound, .. } if bound == *name)
561            })
562        })
563        .map(String::as_str)
564        .collect()
565}
566
567/// Materialize one logical secret from its inline / `_file` / `_command`
568/// variant (validation guarantees at most one is set).
569fn materialize_secret(
570    set: &str,
571    field: &str,
572    inline: Option<&str>,
573    file: Option<&std::path::Path>,
574    command: Option<&str>,
575) -> Result<Option<String>> {
576    if let Some(value) = inline {
577        return Ok(Some(value.to_owned()));
578    }
579    if let Some(path) = file {
580        let text = std::fs::read_to_string(path).with_context(|| {
581            format!(
582                "[creds.{set}] {field}_file: failed to read {}",
583                path.display()
584            )
585        })?;
586        return Ok(Some(strip_one_newline(text)));
587    }
588    if let Some(command) = command {
589        return Ok(Some(run_secret_command(set, field, command)?));
590    }
591    Ok(None)
592}
593
594/// Run a `*_command` secret source. Output is cached per command text per
595/// process, so N URLs resolving through one set cost one subprocess.
596fn run_secret_command(set: &str, field: &str, command: &str) -> Result<String> {
597    static CACHE: std::sync::OnceLock<std::sync::Mutex<HashMap<String, String>>> =
598        std::sync::OnceLock::new();
599    let cache = CACHE.get_or_init(Default::default);
600    if let Some(hit) = cache
601        .lock()
602        .unwrap_or_else(std::sync::PoisonError::into_inner)
603        .get(command)
604    {
605        return Ok(hit.clone());
606    }
607    let output = std::process::Command::new("sh")
608        .arg("-c")
609        .arg(command)
610        .output()
611        .with_context(|| format!("[creds.{set}] {field}_command failed to spawn: {command}"))?;
612    if !output.status.success() {
613        bail!(
614            "[creds.{set}] {field}_command exited {}: {command}\n{}",
615            output.status,
616            String::from_utf8_lossy(&output.stderr).trim_end(),
617        );
618    }
619    let value = strip_one_newline(
620        String::from_utf8(output.stdout)
621            .with_context(|| format!("[creds.{set}] {field}_command output is not UTF-8"))?,
622    );
623    cache
624        .lock()
625        .unwrap_or_else(std::sync::PoisonError::into_inner)
626        .insert(command.to_owned(), value.clone());
627    Ok(value)
628}
629
/// Remove a single trailing line ending - `\n`, or `\r\n` - such as the one
/// `echo` / `op read` append. Anything before it, including further
/// newlines, belongs to the secret and is kept.
fn strip_one_newline(mut text: String) -> String {
    let kept_len = text
        .strip_suffix("\r\n")
        .or_else(|| text.strip_suffix('\n'))
        .map(str::len);
    if let Some(len) = kept_len {
        text.truncate(len);
    }
    text
}
641
642/// `pond storage check` failure classes, each with its own exit code at the
643/// CLI so cron and CI can branch on them. Display carries only the
644/// fix-naming lead; the underlying error is exposed separately through
645/// [`CheckFailure::concise_cause`] so surfaces stay one readable line
646/// instead of trailing the upstream chain (Lance flattens its inner errors
647/// into each level's Display, so the raw chain prints the same failure
648/// several times over).
649#[derive(Debug, thiserror::Error)]
650pub enum CheckFailure {
651    #[error(
652        "authentication failed and no creds set matched this URL; add one with `pond creds add` (or set POND_CREDS_*), or provide ambient AWS_* credentials"
653    )]
654    NoCreds { source: anyhow::Error },
655    #[error("authentication failed using creds set {set:?}; check its keys and scope")]
656    Auth { set: String, source: anyhow::Error },
657    #[error(
658        "backend does not enforce conditional writes (If-None-Match); concurrent pond writers would corrupt each other - {detail}"
659    )]
660    OccUnsupported { detail: String },
661    #[error("storage probe failed")]
662    Io { source: anyhow::Error },
663}
664
665impl CheckFailure {
666    /// The root cause, condensed to one operator-readable line: the deepest
667    /// error in the chain with upstream noise stripped - Lance's bug-report
668    /// boilerplate, internal `<WORKSPACE>` source locations, and the repeated
669    /// wrapper text that follows them. `None` for `OccUnsupported`, whose
670    /// `detail` is already curated into its Display.
671    pub fn concise_cause(&self) -> Option<String> {
672        let source = match self {
673            Self::NoCreds { source } | Self::Auth { source, .. } | Self::Io { source } => source,
674            Self::OccUnsupported { .. } => return None,
675        };
676        Some(condense_error_chain(source))
677    }
678}
679
680/// One-line root cause for a probe error. Takes the deepest chain entry
681/// (each outer Lance/object_store layer re-prints its inner error, so the
682/// deepest is the least redundant), cuts at the first internal source
683/// location (everything after it is upstream re-printing), strips Lance's
684/// bug-report boilerplate, and middle-truncates - the tail is kept because
685/// wrapped transport errors put the root (DNS, connect) at the end.
686fn condense_error_chain(error: &anyhow::Error) -> String {
687    let mut text = error
688        .chain()
689        .last()
690        .map(ToString::to_string)
691        .unwrap_or_else(|| format!("{error:#}"));
692    if let Some(pos) = text.find(", <WORKSPACE>") {
693        text.truncate(pos);
694    }
695    text = text.replace(
696        "Encountered internal error. Please file a bug report at https://github.com/lance-format/lance/issues. ",
697        "",
698    );
699    let line = text.split_whitespace().collect::<Vec<_>>().join(" ");
700    const HEAD: usize = 120;
701    const TAIL: usize = 120;
702    let chars: Vec<char> = line.chars().collect();
703    if chars.len() > HEAD + TAIL + 5 {
704        let head: String = chars[..HEAD].iter().collect();
705        let tail: String = chars[chars.len() - TAIL..].iter().collect();
706        format!("{head} ... {tail}")
707    } else {
708        line
709    }
710}
711
712/// Probe a resolved storage destination end-to-end (spec.md#substrate): a
713/// conditional `PutMode::Create` pair proving the `If-None-Match` -> 412 OCC
714/// primitive Lance's commit handler relies on, then read-back and delete of
715/// the synthetic key.
716pub async fn storage_check(resolved: &ResolvedStorage) -> std::result::Result<(), CheckFailure> {
717    use object_store::{Error as OsError, ObjectStoreExt, PutMode, PutOptions, PutPayload};
718
719    let classify =
720        |error: OsError, step: &str| classify_check_error(error, &resolved.binding, step);
721
722    let probe_uri = format!(
723        "{}/_config-check/{}",
724        resolved.lance_url().as_str().trim_end_matches('/'),
725        uuid::Uuid::now_v7(),
726    );
727    let params = ObjectStoreParams {
728        storage_options_accessor: (!resolved.options.is_empty()).then(|| {
729            Arc::new(StorageOptionsAccessor::with_static_options(
730                resolved.options.clone(),
731            ))
732        }),
733        ..Default::default()
734    };
735    let registry = Arc::new(ObjectStoreRegistry::default());
736    let (store, path) = ObjectStore::from_uri_and_params(registry, &probe_uri, &params)
737        .await
738        .map_err(|error| CheckFailure::Io {
739            source: anyhow!(error).context(format!("failed to open object store for {probe_uri}")),
740        })?;
741
742    let body: &[u8] = b"pond storage check";
743    let create = PutOptions::from(PutMode::Create);
744    store
745        .inner
746        .put_opts(&path, PutPayload::from_static(body), create.clone())
747        .await
748        .map_err(|error| classify(error, "initial conditional put"))?;
749    // The probe key exists from here on: run the remaining steps, then
750    // best-effort delete it whatever they returned - a failed probe must
751    // not leave litter behind.
752    let outcome = async {
753        // The second create MUST lose: this is the `If-None-Match: *` -> 412
754        // primitive multi-writer OCC stands on. A backend that lets it
755        // through (or rejects the header) silently overwrites concurrent
756        // commits.
757        match store
758            .inner
759            .put_opts(&path, PutPayload::from_static(body), create)
760            .await
761        {
762            Err(OsError::AlreadyExists { .. }) => {}
763            Ok(_) => {
764                return Err(CheckFailure::OccUnsupported {
765                    detail: "a second create over an existing key succeeded".to_owned(),
766                });
767            }
768            Err(OsError::NotImplemented { .. }) => {
769                return Err(CheckFailure::OccUnsupported {
770                    detail: "the backend rejects conditional puts as unimplemented".to_owned(),
771                });
772            }
773            Err(error) => return Err(classify(error, "conditional-put probe")),
774        }
775        let read_back = store
776            .inner
777            .get(&path)
778            .await
779            .map_err(|error| classify(error, "read-back"))?
780            .bytes()
781            .await
782            .map_err(|error| classify(error, "read-back body"))?;
783        if read_back.as_ref() != body {
784            return Err(CheckFailure::Io {
785                source: anyhow!("read-back returned different bytes than written"),
786            });
787        }
788        Ok(())
789    }
790    .await;
791    let cleanup = store.inner.delete(&path).await;
792    outcome?;
793    cleanup.map_err(|error| classify(error, "cleanup delete"))?;
794    Ok(())
795}
796
797/// Map an `object_store` error onto the check's failure classes: an auth
798/// error is attributed to the bound creds set when one matched, and to the
799/// (empty) ambient chain when none did; everything else is I/O.
800fn classify_check_error(
801    error: object_store::Error,
802    binding: &CredsBinding,
803    step: &str,
804) -> CheckFailure {
805    use object_store::Error as OsError;
806    // Lance erases a missing-credentials failure into a `Generic` error - the
807    // typed `Unauthenticated` never surfaces for an empty provider chain - so
808    // also match the AWS SDK's rendered `CredentialsNotLoaded` signal. Both
809    // are auth-class: attributed to the bound set, else the empty ambient chain.
810    let auth_class = matches!(
811        error,
812        OsError::Unauthenticated { .. } | OsError::PermissionDenied { .. }
813    ) || {
814        let rendered = error.to_string();
815        rendered.contains("CredentialsNotLoaded")
816            || rendered.contains("no providers in chain provided credentials")
817    };
818    match (auth_class, binding) {
819        (true, CredsBinding::Set { name, .. }) => CheckFailure::Auth {
820            set: name.clone(),
821            source: anyhow!(error).context(step.to_owned()),
822        },
823        (true, _) => CheckFailure::NoCreds {
824            source: anyhow!(error).context(step.to_owned()),
825        },
826        (false, _) => CheckFailure::Io {
827            source: anyhow!(error).context(step.to_owned()),
828        },
829    }
830}
831
/// Per-task fragment-count backstop: a compaction task spanning at least this
/// many fragments always runs. This bounds manifest growth even when the
/// amplification veto would skip the task. Used as the policy cap, `0`
/// disables the veto entirely (tests rely on this).
pub const DEFAULT_COMPACTION_FRAGMENT_CAP: usize = 64;

/// Fragments are sized by bytes (256 MiB), not Lance's 1M-row default.
/// With kilobyte-average rows, a row target tolerates multi-GiB fragments
/// that compaction re-rewrites wholesale to absorb tiny appends (~190 GiB/day
/// of churn).
pub const TARGET_FRAGMENT_BYTES: u64 = 256 << 20;

/// Floor for the byte-derived rows-per-fragment target.
const MIN_TARGET_ROWS_PER_FRAGMENT: u64 = 50_000;
/// Ceiling for the byte-derived target = Lance's own default (1 Mi rows).
const MAX_TARGET_ROWS_PER_FRAGMENT: u64 = 1 << 20;

/// Keep a task only when the merged-in remainder is >= largest/this:
/// size-tiered amortization, O(log n) lifetime rewrites per row.
pub const COMPACTION_ABSORB_FACTOR: u64 = 4;
849
850/// Default manifest-retention window for the safe cleanup pass. Matches
851/// LanceDB's recommended OSS-operator practice (lancedb docs: performance.mdx,
852/// tables/update.mdx). With `delete_unverified=false`, Lance's 7-day
853/// in-progress guard still protects unverified files regardless of this value
854/// (`UNVERIFIED_THRESHOLD_DAYS` in lance/dataset/cleanup.rs).
855pub fn default_cleanup_older_than() -> chrono::Duration {
856    // Toward Lance's 1 h floor: fewer retained manifest versions = cheaper
857    // remote open (spec.md#search). The append fast-path already curbs the
858    // version churn that earlier forced a wider window.
859    chrono::Duration::hours(1)
860}
861
862/// Resolved per-call inputs to the storage-maintenance pass. Built from
863/// `[maintenance]` (and any per-invocation CLI override) at the entry point;
864/// threaded down to `optimize_table_compact` so the substrate never re-reads
865/// `Config` itself.
866#[derive(Debug, Clone, Copy)]
867pub struct MaintenancePolicy {
868    /// See [`DEFAULT_COMPACTION_FRAGMENT_CAP`]; `0` disables the veto.
869    pub compaction_fragment_cap: usize,
870    /// Manifest-retention window handed to `cleanup_old_versions`.
871    pub cleanup_older_than: chrono::Duration,
872}
873
874impl MaintenancePolicy {
875    /// Veto off: run every task Lance plans (the optimize tests assume this).
876    pub fn always_compact() -> Self {
877        Self {
878            compaction_fragment_cap: 0,
879            cleanup_older_than: default_cleanup_older_than(),
880        }
881    }
882}
883
/// Per-fragment figures the compaction veto (`keep_task`) and the
/// rows-per-fragment derivation (`derived_target_rows`) work from; built by
/// `fragment_stat`.
struct FragmentStat {
    /// Summed data-file bytes; `None` when the manifest lacks any file's size.
    bytes: Option<u64>,
    /// Physical rows; `0` when the manifest does not record them.
    rows: u64,
    /// Rows marked deleted; `0` without a deletion file or its row count.
    deleted_rows: u64,
}
890
891/// Data-file bytes of one fragment; `None` (poisoning) when any size is
892/// missing from the manifest.
893fn fragment_bytes(fragment: &lance::table::format::Fragment) -> Option<u64> {
894    fragment.files.iter().try_fold(0u64, |total, file| {
895        Some(total + file.file_size_bytes.get()?.get())
896    })
897}
898
899fn fragment_stat(fragment: &lance::table::format::Fragment) -> FragmentStat {
900    FragmentStat {
901        bytes: fragment_bytes(fragment),
902        rows: fragment.physical_rows.unwrap_or(0) as u64,
903        deleted_rows: fragment
904            .deletion_file
905            .as_ref()
906            .and_then(|deletions| deletions.num_deleted_rows)
907            .unwrap_or(0) as u64,
908    }
909}
910
911/// Rows per [`TARGET_FRAGMENT_BYTES`] at the table's average row size.
912fn derived_target_rows(stats: &[FragmentStat]) -> usize {
913    let (mut bytes, mut rows) = (0u64, 0u64);
914    for stat in stats {
915        if let Some(fragment_bytes) = stat.bytes
916            && stat.rows > 0
917        {
918            bytes += fragment_bytes;
919            rows += stat.rows;
920        }
921    }
922    if bytes == 0 || rows == 0 {
923        return MAX_TARGET_ROWS_PER_FRAGMENT as usize;
924    }
925    let avg_row_bytes = (bytes / rows).max(1);
926    (TARGET_FRAGMENT_BYTES / avg_row_bytes)
927        .clamp(MIN_TARGET_ROWS_PER_FRAGMENT, MAX_TARGET_ROWS_PER_FRAGMENT) as usize
928}
929
930/// Amplification veto: skip tasks that mostly rewrite one big fragment to
931/// absorb fresh appends. Deletion-materialization tasks always pass (vetoing
932/// them would leave tombstones unreclaimed forever); compared in bytes when
933/// every file size is known, rows otherwise.
934fn keep_task(stats: &[FragmentStat], cap: usize, deletion_threshold: f32) -> bool {
935    if stats.iter().any(|stat| {
936        stat.rows > 0 && (stat.deleted_rows as f32 / stat.rows as f32) > deletion_threshold
937    }) {
938        return true;
939    }
940    if stats.len() >= cap {
941        return true;
942    }
943    let weights: Vec<u64> = if stats.iter().all(|stat| stat.bytes.is_some()) {
944        stats.iter().filter_map(|stat| stat.bytes).collect()
945    } else {
946        stats.iter().map(|stat| stat.rows).collect()
947    };
948    let total: u64 = weights.iter().sum();
949    let largest = weights.iter().copied().max().unwrap_or(0);
950    (total - largest) * COMPACTION_ABSORB_FACTOR >= largest
951}
952
/// Declarative description of one index pond keeps on a table. Created when
/// its trigger fires; folded forward by `pond optimize`.
///
/// NOTE: `trigger` and `params` do not read `column`.
/// `IndexTrigger::OnNonNullCount` carries its own column, and
/// `IndexParamsKind::IvfSqCosine` sizes partitions from the hard-coded
/// `vector` column.
#[derive(Debug, Clone)]
pub struct IndexIntent {
    /// Stable on-disk name. Must match across runs so existence checks
    /// resolve.
    pub name: &'static str,
    /// Column the index covers.
    pub column: &'static str,
    /// Condition evaluated against the live dataset before each cycle.
    pub trigger: IndexTrigger,
    /// How the params are built at create time. Some intents have static
    /// params (FTS, scalars); IVF_SQ needs the row count to size partitions.
    pub params: IndexParamsKind,
}
968
/// When an [`IndexIntent`] should exist on disk. Evaluated by
/// `IndexTrigger::should_create` with a `count_rows` against the dataset.
#[derive(Debug, Clone)]
pub enum IndexTrigger {
    /// Build whenever the table has any rows. Used for FTS and scalar
    /// indices: there is no training cost worth delaying.
    OnAnyRows,
    /// Build when `count(<column> IS NOT NULL) >= threshold`. Used for the
    /// IVF_SQ vector index, which trains poorly on too few vectors.
    OnNonNullCount {
        /// Column whose non-null rows are counted; interpolated unquoted
        /// into the `IS NOT NULL` filter.
        column: &'static str,
        /// Inclusive minimum non-null row count.
        threshold: usize,
    },
}
982
/// The lance-native shape of an [`IndexIntent`]'s params, dispatched to the
/// right `IndexParams` at create time.
#[derive(Debug, Clone)]
pub enum IndexParamsKind {
    /// Built via `ScalarIndexParams::for_builtin`. Reported index type:
    /// `BuiltinIndexType::Bitmap` -> [`IndexType::Bitmap`],
    /// `BuiltinIndexType::ZoneMap` -> [`IndexType::ZoneMap`], and every
    /// other builtin kind -> [`IndexType::BTree`].
    Scalar(BuiltinIndexType),
    /// `InvertedIndexParams` with the word-level `simple` tokenizer plus
    /// English stemming, stop-words off (spec.md#search-language-neutral-index).
    /// Word retrieval beats character ngram ~2x on the real corpus at ~4x less
    /// index weight; substring/symbol lookup stays on the SQL `LIKE` /
    /// `contains_tokens` path, not here.
    InvertedFtsWord,
    /// `VectorIndexParams::with_ivf_sq_params` with cosine metric (e5 vectors
    /// are L2-normalized). 8-bit scalar quantization stores per-dimension codes
    /// in the index itself, so kNN computes distances from the prewarmed
    /// partition with no refine pass - PQ+refine instead re-reads ~k*factor
    /// exact vectors from the data files as scattered per-row GETs, the
    /// dominant per-query S3 request storm on a throttling remote store
    /// (spec.md#search). `max_iters` caps kmeans; partitions follow LanceDB's
    /// documented `num_rows // 4096` guidance, floored at one.
    ///
    /// `num_rows` is the non-null count of the hard-coded `vector` column,
    /// not the intent's `column`. `num_bits` goes to `SQBuildParams::num_bits`
    /// and `max_iters` to `IvfBuildParams::max_iters`.
    IvfSqCosine { num_bits: u16, max_iters: usize },
}
1006
1007impl IndexTrigger {
1008    async fn should_create(&self, dataset: &Dataset) -> Result<bool> {
1009        match self {
1010            Self::OnAnyRows => Ok(dataset.count_rows(None).await? > 0),
1011            Self::OnNonNullCount { column, threshold } => {
1012                let count = dataset
1013                    .count_rows(Some(format!("{column} IS NOT NULL")))
1014                    .await?;
1015                Ok(count >= *threshold)
1016            }
1017        }
1018    }
1019}
1020
1021impl IndexParamsKind {
1022    fn index_type(&self) -> IndexType {
1023        match self {
1024            Self::Scalar(BuiltinIndexType::Bitmap) => IndexType::Bitmap,
1025            Self::Scalar(BuiltinIndexType::ZoneMap) => IndexType::ZoneMap,
1026            Self::Scalar(_) => IndexType::BTree,
1027            Self::InvertedFtsWord => IndexType::Inverted,
1028            Self::IvfSqCosine { .. } => IndexType::Vector,
1029        }
1030    }
1031
1032    async fn build(&self, dataset: &Dataset) -> Result<Box<dyn lance::index::IndexParams>> {
1033        match self {
1034            Self::Scalar(kind) => Ok(Box::new(ScalarIndexParams::for_builtin(kind.clone()))),
1035            Self::InvertedFtsWord => Ok(Box::new(
1036                InvertedIndexParams::default()
1037                    .base_tokenizer("simple".to_owned())
1038                    .stem(true)
1039                    .remove_stop_words(false),
1040            )),
1041            Self::IvfSqCosine {
1042                num_bits,
1043                max_iters,
1044            } => {
1045                let count = dataset
1046                    .count_rows(Some("vector IS NOT NULL".to_owned()))
1047                    .await?;
1048                let partitions = count.checked_div(4096).unwrap_or(0).max(1);
1049                let mut ivf = IvfBuildParams::new(partitions);
1050                ivf.max_iters = *max_iters;
1051                let sq = SQBuildParams {
1052                    num_bits: *num_bits,
1053                    ..Default::default()
1054                };
1055                Ok(Box::new(VectorIndexParams::with_ivf_sq_params(
1056                    MetricType::Cosine,
1057                    ivf,
1058                    sq,
1059                )))
1060            }
1061        }
1062    }
1063}
1064
/// Status of one index intent on one table.
///
/// NOTE(review): the field meanings below are inferred from their names;
/// confirm against the code that fills this struct in.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexStatus {
    /// Table this status describes.
    pub table: Table,
    /// Presumably the matching [`IndexIntent::name`].
    pub intent_name: String,
    /// Presumably the number of fragments the index already covers.
    pub fragments_covered: usize,
    /// Presumably the number of fragments not yet folded into the index.
    pub unindexed_fragments: usize,
    /// Presumably the row count of those unindexed fragments.
    pub unindexed_rows: usize,
    /// Presumably whether the index exists on disk at all.
    pub exists: bool,
}
1074
/// Anyhow-chain sentinel pond attaches when `retry_lance` exhausts attempts
/// against an OCC commit-conflict failure (spec.md#protocol). The wire layer
/// downcasts to this type to classify the outcome as `conflict` rather than
/// the generic `storage_unavailable`.
#[derive(Debug, Clone, Copy)]
pub struct ConflictExhausted {
    /// How many attempts were made before giving up.
    pub attempts: u8,
}

impl std::fmt::Display for ConflictExhausted {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let attempts = self.attempts;
        write!(formatter, "commit conflict exhausted after {} attempt(s)", attempts)
    }
}

/// Leaf error: the default `source()` (none) applies.
impl std::error::Error for ConflictExhausted {}
1095
1096/// Per-phase result for one table's pass through `Handle::optimize_table`.
1097/// spec.md#substrate 3.7 (`lance-index-maintenance`): the indices phase and the
1098/// compaction phase get independent retry budgets and independent commits,
1099/// so a hot writer that starves the Rewrite cannot abort the index Update.
1100#[derive(Debug)]
1101pub enum PhaseOutcome {
1102    /// Phase attempted and committed work.
1103    Ok,
1104    /// Phase attempted; no work was needed.
1105    Noop,
1106    /// Phase attempted; OCC retry budget exhausted on conflict (the operator
1107    /// can rerun later once the hot writer quiesces).
1108    SkippedConflict,
1109    /// Phase failed with a non-conflict error.
1110    Failed(anyhow::Error),
1111    /// Phase not requested by the caller (e.g. compaction skipped under
1112    /// `Store::build_indices_only`).
1113    NotAttempted,
1114}
1115
1116impl PhaseOutcome {
1117    pub fn is_failed(&self) -> bool {
1118        matches!(self, Self::Failed(_))
1119    }
1120}
1121
/// What `Handle::optimize_table` did for one table: one [`PhaseOutcome`] per
/// independently committed phase.
#[derive(Debug)]
pub struct TableOptimizeOutcome {
    /// Table this outcome describes.
    pub table: Table,
    /// Result of the index-maintenance phase.
    pub indices: PhaseOutcome,
    /// Result of the compaction phase.
    pub compaction: PhaseOutcome,
}
1129
/// Boundary event during one `Handle::optimize_table` pass. The CLI binds a
/// progress callback to render a live spinner; library callers pass `None`.
#[derive(Debug, Clone)]
pub enum OptimizeEvent {
    /// `phase` is starting on `table`.
    PhaseStart {
        table: Table,
        phase: OptimizePhase,
        /// Optional free-form context for the phase (presumably for display;
        /// confirm against the emitters).
        detail: Option<String>,
    },
    /// `phase` has finished on `table`.
    PhaseDone {
        table: Table,
        phase: OptimizePhase,
        /// How long the phase took, in milliseconds (presumably wall-clock;
        /// confirm against the emitters).
        elapsed_ms: u64,
    },
    /// Intra-index liveness, forwarded from Lance's `IndexBuildProgress`
    /// callbacks (FTS tokenize/copy, IVF train/shuffle/merge, BTree/Bitmap
    /// build stages). Fires many times per index between `PhaseStart` /
    /// `PhaseDone`; the spinner just overwrites its message each tick.
    IndexStage {
        table: Table,
        /// Index label handed to `lance_progress`.
        index: String,
        /// Lance's stage name, forwarded verbatim.
        stage: String,
        /// Work done so far: `0` at stage start, `total` (or `0` when
        /// unknown) at stage completion.
        completed: u64,
        /// Stage size, when Lance reports one at stage start.
        total: Option<u64>,
        /// Unit of `completed` / `total`, as reported at stage start.
        unit: String,
    },
}
1157
/// The optimize phase an `OptimizeEvent` refers to.
#[derive(Debug, Clone, Copy)]
pub enum OptimizePhase {
    Compact,
    Cleanup,
    IndexCreate,
    IndexRebuild,
    IndexAppend,
}

impl OptimizePhase {
    /// Stable kebab-case name for this phase.
    pub fn label(self) -> &'static str {
        use OptimizePhase as Phase;
        match self {
            Phase::Compact => "compact",
            Phase::Cleanup => "cleanup",
            Phase::IndexCreate => "index-create",
            Phase::IndexRebuild => "index-rebuild",
            Phase::IndexAppend => "index-append",
        }
    }
}
1178
1179/// `Arc` rather than `Box` so the same callback can be cloned into the
1180/// `PondIndexProgress` Arc that Lance's `IndexBuildProgress` builder demands -
1181/// otherwise intra-index stage events have no path back to the CLI spinner.
1182pub type OptimizeProgressFn = Arc<dyn Fn(OptimizeEvent) + Send + Sync>;
1183
1184fn emit(progress: Option<&OptimizeProgressFn>, event: OptimizeEvent) {
1185    if let Some(callback) = progress {
1186        callback(event);
1187    }
1188}
1189
1190/// Bridges Lance's `IndexBuildProgress` async callbacks (`stage_start`,
1191/// `stage_progress`, `stage_complete`) into pond's `OptimizeEvent::IndexStage`
1192/// stream so the CLI spinner can show "fts tokenize_docs 1.4M / 2M rows"
1193/// instead of going dark for 10-20 minutes during a single `create_index` or
1194/// `optimize_indices` call. Remembers the active stage's `total` / `unit` so
1195/// `stage_progress` (which only carries `completed`) can render a full
1196/// fraction. Emissions are throttled to one every 100ms; FTS's per-batch
1197/// `stage_progress` calls would otherwise contend the spinner mutex.
1198struct PondIndexProgress {
1199    callback: OptimizeProgressFn,
1200    table: Table,
1201    index: String,
1202    state: std::sync::Mutex<PondIndexStageState>,
1203}
1204
1205// `IndexBuildProgress` requires `Debug`; the `callback` field is
1206// `Arc<dyn Fn...>` which has no `Debug` impl, so derive doesn't apply.
1207impl std::fmt::Debug for PondIndexProgress {
1208    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1209        f.debug_struct("PondIndexProgress")
1210            .field("table", &self.table)
1211            .field("index", &self.index)
1212            .finish_non_exhaustive()
1213    }
1214}
1215
1216#[derive(Debug, Default)]
1217struct PondIndexStageState {
1218    total: Option<u64>,
1219    unit: String,
1220    last_emit: Option<Instant>,
1221}
1222
1223impl PondIndexProgress {
1224    fn new(callback: OptimizeProgressFn, table: Table, index: String) -> Arc<Self> {
1225        Arc::new(Self {
1226            callback,
1227            table,
1228            index,
1229            state: std::sync::Mutex::new(PondIndexStageState::default()),
1230        })
1231    }
1232}
1233
1234#[async_trait::async_trait]
1235impl lance_index::progress::IndexBuildProgress for PondIndexProgress {
1236    async fn stage_start(&self, stage: &str, total: Option<u64>, unit: &str) -> lance::Result<()> {
1237        if let Ok(mut state) = self.state.lock() {
1238            state.total = total;
1239            state.unit = unit.to_owned();
1240            state.last_emit = Some(Instant::now());
1241        }
1242        (self.callback)(OptimizeEvent::IndexStage {
1243            table: self.table,
1244            index: self.index.clone(),
1245            stage: stage.to_owned(),
1246            completed: 0,
1247            total,
1248            unit: unit.to_owned(),
1249        });
1250        Ok(())
1251    }
1252
1253    async fn stage_progress(&self, stage: &str, completed: u64) -> lance::Result<()> {
1254        let (total, unit) = {
1255            let Ok(mut state) = self.state.lock() else {
1256                return Ok(());
1257            };
1258            let now = Instant::now();
1259            if let Some(prev) = state.last_emit
1260                && now.duration_since(prev) < Duration::from_millis(100)
1261            {
1262                return Ok(());
1263            }
1264            state.last_emit = Some(now);
1265            (state.total, state.unit.clone())
1266        };
1267        (self.callback)(OptimizeEvent::IndexStage {
1268            table: self.table,
1269            index: self.index.clone(),
1270            stage: stage.to_owned(),
1271            completed,
1272            total,
1273            unit,
1274        });
1275        Ok(())
1276    }
1277
1278    async fn stage_complete(&self, stage: &str) -> lance::Result<()> {
1279        let (total, unit) = {
1280            let Ok(state) = self.state.lock() else {
1281                return Ok(());
1282            };
1283            (state.total, state.unit.clone())
1284        };
1285        (self.callback)(OptimizeEvent::IndexStage {
1286            table: self.table,
1287            index: self.index.clone(),
1288            stage: stage.to_owned(),
1289            completed: total.unwrap_or(0),
1290            total,
1291            unit,
1292        });
1293        Ok(())
1294    }
1295}
1296
1297fn lance_progress(
1298    progress: Option<&OptimizeProgressFn>,
1299    table: Table,
1300    index: &str,
1301) -> Arc<dyn lance_index::progress::IndexBuildProgress> {
1302    match progress {
1303        Some(callback) => PondIndexProgress::new(callback.clone(), table, index.to_owned()),
1304        None => Arc::new(lance_index::progress::NoopIndexBuildProgress),
1305    }
1306}
1307
1308/// True when the chain root is one of Lance's commit-conflict variants
1309/// (`CommitConflict`, `RetryableCommitConflict`, `TooMuchWriteContention`).
1310/// Everything else (timeouts, IAM denials, disk errors) is not a conflict.
1311pub fn is_commit_conflict(error: &anyhow::Error) -> bool {
1312    error.downcast_ref::<lance::Error>().is_some_and(|err| {
1313        matches!(
1314            err,
1315            lance::Error::CommitConflict { .. }
1316                | lance::Error::RetryableCommitConflict { .. }
1317                | lance::Error::TooMuchWriteContention { .. }
1318        )
1319    })
1320}
1321
1322/// True when `retry_lance` exhausted retries against an OCC conflict and
1323/// attached `ConflictExhausted` to the chain head.
1324fn is_conflict_exhausted(error: &anyhow::Error) -> bool {
1325    error.chain().any(|cause| cause.is::<ConflictExhausted>())
1326}
1327
/// On-disk byte totals for the three session datasets, plus everything else
/// under the data-dir root. Sized by listing through Lance's object-store
/// layer (spec.md#lance-chokepoints-storage) so `file://` and `s3://` behave alike.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TableSizes {
    /// Total bytes of the sessions dataset.
    pub sessions: u64,
    /// Total bytes of the messages dataset.
    pub messages: u64,
    /// Total bytes of the parts dataset.
    pub parts: u64,
    /// Bytes under the data-dir root outside the three datasets.
    pub other: u64,
    /// `data/` on-disk vs manifest-live bytes for the sessions dataset.
    pub sessions_data: DataLiveness,
    /// `data/` on-disk vs manifest-live bytes for the messages dataset.
    pub messages_data: DataLiveness,
    /// `data/` on-disk vs manifest-live bytes for the parts dataset.
    pub parts_data: DataLiveness,
}
1341
/// `data/` bytes on disk vs bytes the latest manifest references; the gap is
/// superseded versions awaiting the cleanup retention window.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DataLiveness {
    /// Bytes currently present under `data/`.
    pub on_disk: u64,
    /// `None` when the manifest lacks any referenced file's size.
    pub live: Option<u64>,
}

impl DataLiveness {
    /// Bytes on disk the latest manifest no longer references, floored at
    /// zero. `None` when the live size is unknown.
    pub fn dead(&self) -> Option<u64> {
        let live = self.live?;
        Some(self.on_disk.saturating_sub(live))
    }
}
1356
/// A literal operand in a `Predicate`. When rendered, `String` is quoted,
/// `Int32` is written in decimal, and `Raw` is emitted verbatim with no
/// escaping.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalarValue {
    String(String),
    Int32(i32),
    Raw(String),
}

impl From<&str> for ScalarValue {
    fn from(text: &str) -> Self {
        ScalarValue::String(String::from(text))
    }
}

impl From<String> for ScalarValue {
    fn from(text: String) -> Self {
        ScalarValue::String(text)
    }
}

impl From<i32> for ScalarValue {
    fn from(number: i32) -> Self {
        ScalarValue::Int32(number)
    }
}
/// Typed filter tree rendered to a Lance SQL filter string by
/// `Predicate::to_lance`. Column names are interpolated into the SQL
/// unquoted; values are rendered through `ScalarValue`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Predicate {
    /// `<col> = <value>`.
    Eq(&'static str, ScalarValue),
    /// `<col> <> <value>`.
    Ne(&'static str, ScalarValue),
    /// `<col> IS NULL`.
    IsNull(&'static str),
    /// `<col> IS NOT NULL`.
    IsNotNull(&'static str),
    /// `<col> IN (<v1>, <v2>, ...)`. NOTE(review): an empty list renders as
    /// `<col> IN ()`; confirm Lance's filter parser accepts that, or that
    /// callers never pass one.
    In(&'static str, Vec<ScalarValue>),
    /// `<col> LIKE <pattern> ESCAPE '\'`, with the pattern built from the
    /// needle by `like_contains`.
    LikeContains(&'static str, String),
    /// Regex match. Emitted as `regexp_like(<col>, '<pat>')`. Never pushes
    /// down to BTREE indexes (Lance's scalar-index-expr parser ignores it),
    /// so the filter is a full-scan-with-predicate - acceptable for
    /// human-driven `--project re:...` queries, not for hot paths.
    Regex(&'static str, String),
    /// `<col> >= <value>`.
    Gte(&'static str, ScalarValue),
    /// `<col> <= <value>`.
    Lte(&'static str, ScalarValue),
    /// Children joined with ` AND ` (no surrounding parens). Children that
    /// render empty are dropped; with none left this renders as `""`.
    And(Vec<Predicate>),
    /// Children joined with ` OR ` and wrapped in parens. Children that render
    /// empty are dropped; with none left this renders as `""`.
    Or(Vec<Predicate>),
    /// `NOT (<inner>)`, or `""` when the inner predicate renders empty.
    Not(Box<Predicate>),
}
1397impl Predicate {
1398    pub fn to_lance(&self) -> String {
1399        match self {
1400            Self::Eq(column, value) => format!("{column} = {}", value.to_lance()),
1401            Self::Ne(column, value) => format!("{column} <> {}", value.to_lance()),
1402            Self::IsNull(column) => format!("{column} IS NULL"),
1403            Self::IsNotNull(column) => format!("{column} IS NOT NULL"),
1404            Self::In(column, values) => {
1405                let values = values
1406                    .iter()
1407                    .map(ScalarValue::to_lance)
1408                    .collect::<Vec<_>>()
1409                    .join(", ");
1410                format!("{column} IN ({values})")
1411            }
1412            Self::LikeContains(column, value) => {
1413                format!("{column} LIKE {} ESCAPE '\\'", like_contains(value))
1414            }
1415            Self::Regex(column, pattern) => {
1416                format!("regexp_like({column}, {})", quoted_string(pattern))
1417            }
1418            Self::Gte(column, value) => format!("{column} >= {}", value.to_lance()),
1419            Self::Lte(column, value) => format!("{column} <= {}", value.to_lance()),
1420            Self::And(predicates) => predicates
1421                .iter()
1422                .map(Self::to_lance)
1423                .filter(|predicate| !predicate.is_empty())
1424                .collect::<Vec<_>>()
1425                .join(" AND "),
1426            Self::Or(predicates) => {
1427                // Wrap in parens so the disjunction composes safely as a child
1428                // of an outer `And` (SQL `OR` binds looser than `AND`).
1429                let body = predicates
1430                    .iter()
1431                    .map(Self::to_lance)
1432                    .filter(|predicate| !predicate.is_empty())
1433                    .collect::<Vec<_>>()
1434                    .join(" OR ");
1435                if body.is_empty() {
1436                    String::new()
1437                } else {
1438                    format!("({body})")
1439                }
1440            }
1441            Self::Not(inner) => {
1442                let body = inner.to_lance();
1443                if body.is_empty() {
1444                    String::new()
1445                } else {
1446                    format!("NOT ({body})")
1447                }
1448            }
1449        }
1450    }
1451}
1452/// Read-side options for `Handle::scan`: optional prefilter predicate and
1453/// optional projection. Default = no filter, all columns.
1454#[derive(Default)]
1455pub struct ScanOpts<'a> {
1456    pub predicate: Option<&'a Predicate>,
1457    pub projection: Option<&'a [&'a str]>,
1458}
1459
1460impl<'a> ScanOpts<'a> {
1461    pub fn project_only(projection: &'a [&'a str]) -> Self {
1462        Self {
1463            predicate: None,
1464            projection: Some(projection),
1465        }
1466    }
1467    pub fn with_predicate_and_projection(
1468        predicate: &'a Predicate,
1469        projection: &'a [&'a str],
1470    ) -> Self {
1471        Self {
1472            predicate: Some(predicate),
1473            projection: Some(projection),
1474        }
1475    }
1476}
1477
1478impl ScalarValue {
1479    fn to_lance(&self) -> String {
1480        match self {
1481            Self::String(value) => quoted_string(value),
1482            Self::Int32(value) => value.to_string(),
1483            Self::Raw(value) => value.clone(),
1484        }
1485    }
1486}
1487/// Lance cache caps in bytes. `None` lets the substrate pick the backend-aware
1488/// default (local FS gets a tighter cap; object stores stay near Lance's
1489/// defaults). Wired through `Store::open_with_options` from `[runtime]`.
1490#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
1491pub struct RuntimeCaps {
1492    pub index_cache_bytes: Option<usize>,
1493    pub metadata_cache_bytes: Option<usize>,
1494}
1495
1496impl RuntimeCaps {
1497    pub fn from_config(config: &crate::config::RuntimeConfig) -> Self {
1498        Self {
1499            index_cache_bytes: config.index_cache_bytes,
1500            metadata_cache_bytes: config.metadata_cache_bytes,
1501        }
1502    }
1503}
1504
/// Local-FS default (256 MiB): tight enough that a long-lived `pond mcp` lands
/// well under the 500 MiB target without measurable latency cost vs Lance's
/// 6 GiB default (see `benches/serve_mem_bench.rs --cap-sweep`).
const LOCAL_INDEX_CACHE_BYTES: usize = 256 << 20;
/// Local-FS metadata-cache default (128 MiB).
const LOCAL_METADATA_CACHE_BYTES: usize = 128 << 20;
/// Object-store defaults: latency to refill is per-page, so keep more in cache
/// than local - but bounded above the warm working set, not Lance's 6 GiB.
/// Post word-tokenizer FTS that set is ~450 MB (simple invert + IVF_SQ aux), so
/// 1 GiB holds both indices warm with headroom while capping the RSS ceiling.
const REMOTE_INDEX_CACHE_BYTES: usize = 1 << 30;
/// Object-store metadata-cache default (512 MiB).
const REMOTE_METADATA_CACHE_BYTES: usize = 512 << 20;
1516
1517fn resolve_cache_caps(location: &Url, caps: RuntimeCaps) -> (usize, usize) {
1518    let (index_default, metadata_default) = if config::is_local(location) {
1519        (LOCAL_INDEX_CACHE_BYTES, LOCAL_METADATA_CACHE_BYTES)
1520    } else {
1521        (REMOTE_INDEX_CACHE_BYTES, REMOTE_METADATA_CACHE_BYTES)
1522    };
1523    (
1524        caps.index_cache_bytes.unwrap_or(index_default),
1525        caps.metadata_cache_bytes.unwrap_or(metadata_default),
1526    )
1527}
1528
1529pub struct Handle {
1530    datasets: DatasetSet,
1531    retry: RetryPolicy,
1532    /// One `lance::Session` shared across all three datasets. Carries the
1533    /// metadata + index caches and the `ObjectStoreRegistry` (which holds
1534    /// the underlying object_store / S3 client). Sharing the session means
1535    /// one cache pool covers all three tables and one S3 client serves all
1536    /// three datasets - load-bearing on object-store backends where a
1537    /// per-dataset client would mean 3x the connection pools and 3x the
1538    /// credential refreshes (lance/src/dataset/builder.rs:509-517).
1539    #[allow(dead_code)]
1540    session: Arc<Session>,
1541    /// The `lance-namespace` catalog seam. v1 uses the Directory impl;
1542    /// future hosted pond swaps to "rest" without touching read/write paths
1543    /// (spec.md#lance-chokepoints-catalog).
1544    nm: Arc<dyn LanceNamespace>,
1545    /// Namespace identifier this handle binds to. v1 is always `root()`; the
1546    /// typed seam matches `resolve_namespace`'s return so multi-namespace
1547    /// routing can land without churning call sites (spec.md#wire-namespace-resolution).
1548    nm_ident: NamespaceIdent,
1549    /// Object-store options threaded through every `DatasetBuilder` and
1550    /// `Dataset::write` call so refresh / index-creation paths inherit the
1551    /// same credentials and region as the initial open. Empty on local-FS
1552    /// installs.
1553    storage_options: HashMap<String, String>,
1554    /// Data-dir URL the handle was opened against. `pond status` reads this
1555    /// to display where the bytes live and to decide whether to walk a local
1556    /// directory or issue a remote `LIST` for sizing.
1557    location: Url,
1558    /// Cached `parts.lance` open metadata, used the first time a caller asks
1559    /// for parts. Holds the namespace probe shape so the lazy open re-uses the
1560    /// same `lance-chokepoints-catalog` path as the eager opens for sessions/messages.
1561    parts_refresh_after: Duration,
1562}
1563
1564impl std::fmt::Debug for Handle {
1565    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1566        formatter
1567            .debug_struct("Handle")
1568            .field("datasets", &self.datasets)
1569            .field("retry", &self.retry)
1570            .field("nm_ident", &self.nm_ident)
1571            .field("storage_options", &self.storage_options)
1572            .field("location", &self.location)
1573            .finish()
1574    }
1575}
1576
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Table {
    Sessions,
    Messages,
    Parts,
}
impl Table {
    /// Stable lowercase table name: `"sessions"`, `"messages"`, or `"parts"`.
    pub fn as_str(self) -> &'static str {
        match self {
            Table::Sessions => "sessions",
            Table::Messages => "messages",
            Table::Parts => "parts",
        }
    }

    /// Name used in tracing fields, retry labels, and error contexts.
    /// Identical to [`Table::as_str`].
    fn label(self) -> &'static str {
        self.as_str()
    }
}
/// The per-table cached `Dataset` handles owned by a `Handle`. Each table
/// sits behind its own `tokio::sync::Mutex`, so holding one table's lock does
/// not block access to another table.
#[derive(Debug)]
struct DatasetSet {
    /// Opened (or created) eagerly by `Handle::open_with_options`.
    sessions: Mutex<CachedDataset>,
    /// Opened (or created) eagerly by `Handle::open_with_options`.
    messages: Mutex<CachedDataset>,
    /// `parts.lance` opens lazily on the first read or write that needs it:
    /// any `pond_get` (every mode reads parts to build summaries), grouped
    /// search hydrating user-hit summaries, or ingest with Part events. A
    /// process that does none of those skips the file, saving its metadata
    /// pages and file handle at cold-open. The OnceCell makes init
    /// single-flight; the inner `Mutex<CachedDataset>` then behaves identically
    /// to the other two.
    parts: OnceCell<Mutex<CachedDataset>>,
}
1609#[derive(Debug)]
1610struct CachedDataset {
1611    dataset: Dataset,
1612    last_refresh: Instant,
1613    refresh_after: Duration,
1614}
1615impl CachedDataset {
1616    async fn latest(&mut self) -> Result<Dataset> {
1617        if self.last_refresh.elapsed() >= self.refresh_after {
1618            self.dataset.checkout_latest().await?;
1619            self.last_refresh = Instant::now();
1620        }
1621        Ok(self.dataset.clone())
1622    }
1623    fn replace(&mut self, dataset: Dataset) {
1624        self.dataset = dataset;
1625        self.last_refresh = Instant::now();
1626    }
1627}
1628
/// Outcome of one [`Handle::append_stream`] write. Lance's `execute_stream`
/// returns only the new `Dataset` (no write summary), so these totals are
/// captured from the cumulative `WriteStats` ticks plus pond's own OCC attempt
/// counter.
#[derive(Debug, Clone, Copy, Default)]
pub struct AppendStats {
    /// Rows appended. `append_stream` takes this from the highest progress
    /// tick; `append_batches` counts the input batches directly.
    pub rows: u64,
    /// Bytes written, from the highest `WriteStats` tick observed.
    pub bytes_written: u64,
    /// Data files written, from the highest `WriteStats` tick observed.
    pub files_written: u64,
    /// How many times the write closure ran (first attempt plus OCC retries).
    pub attempts: u32,
}
1640
1641/// Monotonic high-water fold over the cumulative `WriteStats` ticks
1642/// `append_stream` receives. Lance restarts a stream's cumulative counters from
1643/// zero on each OCC retry, so `fetch_max` keeps the fold monotonic - a retry
1644/// contributes nothing until it passes the prior mark, making `AppendStats`
1645/// exact under retries.
1646#[derive(Default)]
1647struct WriteAccum {
1648    rows: std::sync::atomic::AtomicU64,
1649    bytes: std::sync::atomic::AtomicU64,
1650    files: std::sync::atomic::AtomicU64,
1651}
1652
1653impl WriteAccum {
1654    fn observe(&self, stats: &WriteStats) {
1655        use std::sync::atomic::Ordering::Relaxed;
1656        self.rows.fetch_max(stats.rows_written, Relaxed);
1657        self.bytes.fetch_max(stats.bytes_written, Relaxed);
1658        self.files.fetch_max(stats.files_written as u64, Relaxed);
1659    }
1660    fn rows(&self) -> u64 {
1661        self.rows.load(std::sync::atomic::Ordering::Relaxed)
1662    }
1663    fn bytes(&self) -> u64 {
1664        self.bytes.load(std::sync::atomic::Ordering::Relaxed)
1665    }
1666    fn files(&self) -> u64 {
1667        self.files.load(std::sync::atomic::Ordering::Relaxed)
1668    }
1669}
1670
1671/// Append-mode write params. Byte-sized fragments, not Lance's 90 GB default:
1672/// kilobyte rows would otherwise pack multi-GiB fragments that compaction
1673/// rewrites wholesale (see `TARGET_FRAGMENT_BYTES`). Reuses the create params so
1674/// appended fragments match the table's storage version / row-id mode.
1675fn append_write_params() -> WriteParams {
1676    let mut params = sessions::write_params_for_create();
1677    params.mode = WriteMode::Append;
1678    params.max_bytes_per_file = TARGET_FRAGMENT_BYTES as usize;
1679    params
1680}
1681
1682impl Handle {
1683    /// Open without storage options or explicit cache caps. Backend-aware
1684    /// defaults from `[runtime]` apply.
1685    pub async fn open(location: &Url) -> Result<Self> {
1686        Self::open_with_options(location, HashMap::new(), RuntimeCaps::default()).await
1687    }
1688
1689    /// Live size in bytes of the shared Lance session caches (index + metadata).
1690    /// Walks the caches, so it is not cheap - bench/diagnostic use only.
1691    pub fn lance_cache_bytes(&self) -> u64 {
1692        self.session.size_bytes()
1693    }
1694
1695    /// Open with object-store options handed through to Lance verbatim, plus
1696    /// the resolved `[runtime]` cache caps. Object-store keys are the
1697    /// `object_store` crate's standard config names; pond does not parse them.
1698    /// Opening datasets never performs index work; index lifecycle lives under
1699    /// `Handle::optimize_table`. `parts.lance` opens lazily on first use.
1700    pub async fn open_with_options(
1701        location: &Url,
1702        mut storage_options: HashMap<String, String>,
1703        caps: RuntimeCaps,
1704    ) -> Result<Self> {
1705        if let Some(path) = config::local_path(location) {
1706            tokio::fs::create_dir_all(&path).await.with_context(|| {
1707                format!(
1708                    "failed to create data dir {}; fix the storage destination ([storage].path in config) or re-run `pond init`",
1709                    path.display()
1710                )
1711            })?;
1712        } else {
1713            apply_remote_storage_defaults(&mut storage_options);
1714        }
1715        // One Session shared across all three datasets so metadata/index
1716        // caches and the object_store registry (and thus any S3 client) are
1717        // pooled rather than duplicated three times. Caps are sized by the
1718        // `[runtime]` block; explicit values from `caps` win, otherwise the
1719        // local/remote backend default kicks in.
1720        let (index_cache_bytes, metadata_cache_bytes) = resolve_cache_caps(location, caps);
1721        let session = Arc::new(Session::new(
1722            index_cache_bytes,
1723            metadata_cache_bytes,
1724            Arc::new(ObjectStoreRegistry::default()),
1725        ));
1726        // Build the lance-namespace catalog seam once (spec.md#lance-chokepoints-catalog).
1727        // The `root` property is whatever URL the Directory impl understands;
1728        // `uri_to_url` (lance-io/object_store.rs) accepts both bare paths and
1729        // URLs, so passing the scheme-qualified URL for local FS works the
1730        // same as the bare-path form. Trailing slash stripped for clean logs.
1731        let root = location.as_str().trim_end_matches('/').to_string();
1732        let mut connect = ConnectBuilder::new("dir")
1733            .property("root", root)
1734            .session(session.clone());
1735        // Object-store credentials/region/endpoint flow into the namespace
1736        // via the `storage.<key>` property convention (lance-namespace-impls
1737        // dir.rs from_properties: lines 423-436).
1738        for (key, value) in &storage_options {
1739            connect = connect.property(format!("storage.{key}"), value.clone());
1740        }
1741        let nm: Arc<dyn LanceNamespace> = connect
1742            .connect()
1743            .await
1744            .context("failed to connect lance Directory namespace")?;
1745        let nm_ident = NamespaceIdent::root();
1746        // spec.md#lance-handle-freshness: refresh window is scheme-keyed. Local-FS
1747        // manifest reads are microsecond-cheap, so `0` (always-refresh) is
1748        // essentially free and removes the stale-read window entirely. Object
1749        // stores have real per-call cost; `5s` caps manifest fetch overhead at
1750        // acceptable lag for human-driven queries.
1751        let refresh_after = if config::is_local(location) {
1752            Duration::ZERO
1753        } else {
1754            Duration::from_secs(5)
1755        };
1756        let handle = Self {
1757            datasets: DatasetSet {
1758                sessions: Mutex::new(CachedDataset {
1759                    dataset: open_or_create_via_ns(
1760                        &nm,
1761                        &nm_ident,
1762                        sessions::SESSIONS,
1763                        sessions::session_schema(),
1764                        &session,
1765                        &storage_options,
1766                    )
1767                    .await?,
1768                    last_refresh: Instant::now(),
1769                    refresh_after,
1770                }),
1771                messages: Mutex::new(CachedDataset {
1772                    dataset: open_or_create_via_ns(
1773                        &nm,
1774                        &nm_ident,
1775                        sessions::MESSAGES,
1776                        sessions::message_schema(),
1777                        &session,
1778                        &storage_options,
1779                    )
1780                    .await?,
1781                    last_refresh: Instant::now(),
1782                    refresh_after,
1783                }),
1784                parts: OnceCell::new(),
1785            },
1786            retry: RetryPolicy::default(),
1787            session,
1788            nm,
1789            nm_ident,
1790            storage_options,
1791            location: location.clone(),
1792            parts_refresh_after: refresh_after,
1793        };
1794        Ok(handle)
1795    }
1796
    /// Data-dir URL this handle was opened against.
    pub fn location(&self) -> &Url {
        &self.location
    }

    /// Read-only view of the `storage_options` the handle was opened with.
    /// `pond status` needs them to instantiate a raw `object_store` client
    /// that can `LIST` the remote bucket for sizing.
    /// On remote locations this is the caller's map after
    /// `apply_remote_storage_defaults` ran during open. It may hold
    /// credentials, so avoid logging it.
    pub fn storage_options(&self) -> &HashMap<String, String> {
        &self.storage_options
    }
1807
1808    /// Object-store URI for a `pond_sql_query` export artifact:
1809    /// `<location>/exports/<name>`. A sibling of the `*.lance` table dirs;
1810    /// the Directory namespace tracks tables in its `__manifest` table rather
1811    /// than by listing prefixes, so this prefix is never seen as a table
1812    /// (lance-namespace-impls dir/manifest.rs). Never `register_table`'d.
1813    fn export_uri(&self, name: &str) -> String {
1814        format!(
1815            "{}/exports/{name}",
1816            self.location.as_str().trim_end_matches('/')
1817        )
1818    }
1819
1820    /// `ObjectStoreParams` carrying the handle's `storage_options` so raw
1821    /// object-store opens (export I/O, `table_sizes` listing) inherit the same
1822    /// credentials/region as the dataset opens. Empty options -> no accessor.
1823    fn object_store_params(&self) -> ObjectStoreParams {
1824        ObjectStoreParams {
1825            storage_options_accessor: (!self.storage_options.is_empty()).then(|| {
1826                Arc::new(StorageOptionsAccessor::with_static_options(
1827                    self.storage_options.clone(),
1828                ))
1829            }),
1830            ..Default::default()
1831        }
1832    }
1833
1834    /// Write a `pond_sql_query` export artifact, reusing the handle's
1835    /// storage_options so S3 installs inherit the same credentials.
1836    pub(crate) async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
1837        let uri = self.export_uri(name);
1838        let registry = Arc::new(ObjectStoreRegistry::default());
1839        let (store, path) =
1840            ObjectStore::from_uri_and_params(registry, &uri, &self.object_store_params())
1841                .await
1842                .with_context(|| format!("failed to open object store for {uri}"))?;
1843        store
1844            .put(&path, bytes)
1845            .await
1846            .with_context(|| format!("failed to write export {uri}"))?;
1847        Ok(())
1848    }
1849
1850    /// Read a `pond_sql_query` export artifact back (for the
1851    /// `pond-sql-export://` MCP resource).
1852    pub(crate) async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
1853        let uri = self.export_uri(name);
1854        let registry = Arc::new(ObjectStoreRegistry::default());
1855        let (store, path) =
1856            ObjectStore::from_uri_and_params(registry, &uri, &self.object_store_params())
1857                .await
1858                .with_context(|| format!("failed to open object store for {uri}"))?;
1859        let bytes = store
1860            .read_one_all(&path)
1861            .await
1862            .with_context(|| format!("failed to read export {uri}"))?;
1863        Ok(bytes.to_vec())
1864    }
1865
1866    /// Local filesystem path of an export artifact, when the data dir is
1867    /// `file://`. The stdio MCP client shares this filesystem, so it can read
1868    /// the file directly (e.g. duckdb/polars) instead of pulling base64 via
1869    /// `resources/read`. `None` on object-store installs.
1870    pub(crate) fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
1871        if self.location.scheme() != "file" {
1872            return None;
1873        }
1874        let dir = self.location.to_file_path().ok()?;
1875        Some(dir.join("exports").join(name))
1876    }
1877
1878    pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1879        Ok((
1880            self.count_rows(Table::Sessions).await?,
1881            self.count_rows(Table::Messages).await?,
1882            self.count_rows(Table::Parts).await?,
1883        ))
1884    }
1885
1886    /// Insert-only merge: append new rows, never overwrite a matched PK.
1887    /// Returns rows inserted. The fold lives separately under
1888    /// `Handle::optimize_table` (spec.md#lance-index-maintenance).
1889    pub(crate) async fn merge_insert(
1890        &self,
1891        table: Table,
1892        batch: RecordBatch,
1893        row_count: usize,
1894    ) -> Result<u64> {
1895        self.merge_insert_stats(table, batch, row_count)
1896            .await
1897            .map(|stats| stats.num_inserted_rows + stats.num_updated_rows)
1898    }
1899
1900    /// Insert-only merge that surfaces Lance's full `MergeStats`. Callers that
1901    /// need bytes written, file count, or OCC retry count (e.g. `pond copy`'s
1902    /// progress display) use this; the thin wrapper above keeps the
1903    /// affected-rows return for everyone else.
1904    pub(crate) async fn merge_insert_stats(
1905        &self,
1906        table: Table,
1907        batch: RecordBatch,
1908        row_count: usize,
1909    ) -> Result<MergeStats> {
1910        self.merge(
1911            table,
1912            batch,
1913            row_count,
1914            "merge_insert",
1915            WhenMatched::DoNothing,
1916            WhenNotMatched::InsertAll,
1917        )
1918        .await
1919    }
1920
1921    /// Update-only merge: `WhenMatched::UpdateAll` on matched PKs; unmatched
1922    /// rows dropped. The fold lives separately under `Handle::optimize_table`.
1923    pub(crate) async fn merge_update(
1924        &self,
1925        table: Table,
1926        batch: RecordBatch,
1927        row_count: usize,
1928    ) -> Result<u64> {
1929        self.merge(
1930            table,
1931            batch,
1932            row_count,
1933            "merge_update",
1934            WhenMatched::UpdateAll,
1935            WhenNotMatched::DoNothing,
1936        )
1937        .await
1938        .map(|stats| stats.num_inserted_rows + stats.num_updated_rows)
1939    }
1940
    /// The OCC write-commit seam (spec.md#lance-chokepoints-write). Every
    /// write - `merge` and the append paths - runs through here, under retry.
    ///
    /// Each attempt takes the table's cache lock and hands `execute` the
    /// latest dataset. `execute` builds the table-specific builder, runs it
    /// (the builder performs the commit), and returns the new dataset plus
    /// its own stats payload. The seam then adopts that dataset into the
    /// cache. `execute` reruns on every OCC attempt, so it must own what it
    /// needs. Write-type specifics (params, stats, tracing) stay with the
    /// caller.
    ///
    /// The lock is held until `execute` resolves, so writes through this seam
    /// to one table run one at a time. The cache is replaced only when
    /// `execute` succeeds; a failed attempt leaves the previous cached dataset
    /// in place.
    ///
    /// NOTE(review): a retried attempt goes through `CachedDataset::latest`,
    /// which re-checks out the manifest only after `refresh_after` has elapsed
    /// (5s on object stores). A retry after a commit conflict may therefore
    /// start from the same base version. Confirm that `retry_lance`'s backoff
    /// or Lance's internal rebase covers this.
    async fn write_committed<E, Fut, P>(&self, table: Table, execute: E) -> Result<P>
    where
        E: Fn(Arc<Dataset>) -> Fut,
        Fut: std::future::Future<Output = Result<(Dataset, P)>>,
    {
        self.retry_lance(table.label(), || {
            // Re-borrow so each attempt's `async move` captures `&E`, not `E`.
            let execute = &execute;
            async move {
                let mut cached = self.cached(table).await?.lock().await;
                let existing = cached.latest().await?;
                let (dataset, payload) = execute(Arc::new(existing)).await?;
                // Reached only on success: cache the version just committed.
                cached.replace(dataset);
                Ok(payload)
            }
        })
        .await
    }
1966
1967    /// Shared merge path for [`Self::merge_insert`] and [`Self::merge_update`].
1968    /// Returns Lance's `MergeStats` verbatim so the progress layer can read
1969    /// `bytes_written` / `num_files_written` / `num_attempts` without a second
1970    /// round-trip; the thin wrappers above project to `u64` for callers that
1971    /// only need the affected-rows count.
1972    async fn merge(
1973        &self,
1974        table: Table,
1975        batch: RecordBatch,
1976        row_count: usize,
1977        op: &'static str,
1978        when_matched: WhenMatched,
1979        when_not_matched: WhenNotMatched,
1980    ) -> Result<MergeStats> {
1981        if row_count == 0 {
1982            return Ok(MergeStats::default());
1983        }
1984        let started = Instant::now();
1985        let result = self
1986            .write_committed(table, |existing| {
1987                let batch = batch.clone();
1988                let when_matched = when_matched.clone();
1989                let when_not_matched = when_not_matched.clone();
1990                async move {
1991                    let schema = batch.schema();
1992                    let reader = RecordBatchIterator::new([Ok(batch)], schema);
1993                    let mut builder = MergeInsertBuilder::try_new(existing, Vec::new())?;
1994                    builder.when_matched(when_matched);
1995                    builder.when_not_matched(when_not_matched);
1996                    // pond presents each PK at most once per batch; FirstSeen keeps
1997                    // the first occurrence rather than failing (Lance's default).
1998                    builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
1999                    // Cleanup is operator-driven via `pond optimize`; the per-commit
2000                    // auto hook would add a LIST per write on remote backends without
2001                    // changing the steady-state retention.
2002                    builder.skip_auto_cleanup(true);
2003                    let (dataset, stats) = builder
2004                        .try_build()?
2005                        .execute_reader(Box::new(reader))
2006                        .await?;
2007                    Ok((dataset.as_ref().clone(), stats))
2008                }
2009            })
2010            .await;
2011        let skipped = result
2012            .as_ref()
2013            .map(|s| s.num_skipped_duplicates)
2014            .unwrap_or(0);
2015        tracing::info!(
2016            target: "pond::perf",
2017            op,
2018            table = %table.label(),
2019            rows = row_count,
2020            elapsed_ms = started.elapsed().as_millis() as u64,
2021            skipped,
2022            "merge",
2023        );
2024        result
2025    }
2026
2027    /// Append a streamed source into `table` under a single commit - the
2028    /// bandwidth-bound counterpart to [`Self::merge`]. spec.md#session-durable-copy:
2029    /// rows that cannot collide on the destination (absent sessions) take this
2030    /// path. `Append` never joins or probes the target, so its cost is the
2031    /// bytes written, not the per-batch commit + key-scan that `merge_insert`
2032    /// pays - the fix for store-to-store copy being commit-latency-bound on
2033    /// remote object stores.
2034    ///
2035    /// `make_source` is a *factory*, not a prebuilt stream: a Lance scan stream
2036    /// is one-shot, so an OCC retry rebuilds it. A single per-call `WriteAccum`
2037    /// (shared across attempts, NOT fresh per attempt) makes the row/byte/file
2038    /// fold exact under retries.
2039    pub(crate) async fn append_stream<F, Fut>(
2040        &self,
2041        table: Table,
2042        make_source: F,
2043    ) -> Result<AppendStats>
2044    where
2045        F: Fn() -> Fut,
2046        Fut: std::future::Future<Output = Result<SendableRecordBatchStream>>,
2047    {
2048        let cum = Arc::new(WriteAccum::default());
2049        let attempts = Arc::new(std::sync::atomic::AtomicU32::new(0));
2050        let started = Instant::now();
2051        self.write_committed(table, |existing| {
2052            let make_source = &make_source;
2053            let cum = cum.clone();
2054            let attempts = attempts.clone();
2055            async move {
2056                attempts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2057                let stream = make_source().await?;
2058                let dataset = InsertBuilder::new(existing)
2059                    .with_params(&append_write_params())
2060                    .progress(move |stats| cum.observe(&stats))
2061                    .execute_stream(stream)
2062                    .await?;
2063                Ok((dataset, ()))
2064            }
2065        })
2066        .await?;
2067
2068        let attempts = attempts.load(std::sync::atomic::Ordering::Relaxed);
2069        let stats = AppendStats {
2070            rows: cum.rows(),
2071            bytes_written: cum.bytes(),
2072            files_written: cum.files(),
2073            attempts,
2074        };
2075        tracing::info!(
2076            target: "pond::perf",
2077            op = "append",
2078            table = %table.label(),
2079            rows = stats.rows,
2080            files = stats.files_written,
2081            attempts,
2082            elapsed_ms = started.elapsed().as_millis() as u64,
2083            "append",
2084        );
2085        Ok(stats)
2086    }
2087
2088    /// [`Self::append_stream`] for batches pond already holds in memory (the sync
2089    /// write path) instead of a source-store scan. Row count is taken from the
2090    /// batches - exact under OCC retry without depending on the progress tick.
2091    pub(crate) async fn append_batches(
2092        &self,
2093        table: Table,
2094        batches: Vec<RecordBatch>,
2095    ) -> Result<AppendStats> {
2096        let total_rows: u64 = batches.iter().map(|batch| batch.num_rows() as u64).sum();
2097        if total_rows == 0 {
2098            return Ok(AppendStats::default());
2099        }
2100        let cum = Arc::new(WriteAccum::default());
2101        let attempts = Arc::new(std::sync::atomic::AtomicU32::new(0));
2102        let started = Instant::now();
2103        self.write_committed(table, |existing| {
2104            let cum = cum.clone();
2105            let attempts = attempts.clone();
2106            let batches = batches.clone();
2107            async move {
2108                attempts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2109                let dataset = InsertBuilder::new(existing)
2110                    .with_params(&append_write_params())
2111                    .progress(move |stats| cum.observe(&stats))
2112                    .execute(batches)
2113                    .await?;
2114                Ok((dataset, ()))
2115            }
2116        })
2117        .await?;
2118
2119        let attempts = attempts.load(std::sync::atomic::Ordering::Relaxed);
2120        let stats = AppendStats {
2121            rows: total_rows,
2122            bytes_written: cum.bytes(),
2123            files_written: cum.files(),
2124            attempts,
2125        };
2126        tracing::info!(
2127            target: "pond::perf",
2128            op = "append_batches",
2129            table = %table.label(),
2130            rows = stats.rows,
2131            files = stats.files_written,
2132            attempts,
2133            elapsed_ms = started.elapsed().as_millis() as u64,
2134            "append",
2135        );
2136        Ok(stats)
2137    }
2138
2139    /// Run the table-local maintenance cycle for the supplied index intents.
2140    /// BTree is rebuilt from scratch to dodge Lance v7.0.0-beta.16's flat
2141    /// BTree combine bug; Bitmap, FTS, and IVF_SQ fold via append.
2142    ///
2143    /// spec.md#substrate 3.7 (`lance-index-maintenance`): indices and compaction
2144    /// commit independently and use independent retry budgets, so a hot writer
2145    /// that starves compaction (Rewrite) does not abort the index build
2146    /// (Update) the operator actually asked for.
2147    pub async fn optimize_table(
2148        &self,
2149        table: Table,
2150        intents: &[IndexIntent],
2151        progress: Option<&OptimizeProgressFn>,
2152        policy: &MaintenancePolicy,
2153    ) -> TableOptimizeOutcome {
2154        let compaction = self
2155            .run_optimize_compact_phase(table, progress, policy)
2156            .await;
2157        let indices = self
2158            .run_optimize_indices_phase(table, intents, progress)
2159            .await;
2160        TableOptimizeOutcome {
2161            table,
2162            indices,
2163            compaction,
2164        }
2165    }
2166
2167    /// Run only the indices phase for one table. Used by the optimize embed
2168    /// stage's tail
2169    /// to fold newly written vectors into the indices without paying the
2170    /// compaction retry budget while embed itself may still be writing.
2171    pub async fn optimize_table_indices_only(
2172        &self,
2173        table: Table,
2174        intents: &[IndexIntent],
2175        progress: Option<&OptimizeProgressFn>,
2176    ) -> PhaseOutcome {
2177        self.run_optimize_indices_phase(table, intents, progress)
2178            .await
2179    }
2180
2181    async fn run_optimize_indices_phase(
2182        &self,
2183        table: Table,
2184        intents: &[IndexIntent],
2185        progress: Option<&OptimizeProgressFn>,
2186    ) -> PhaseOutcome {
2187        if intents.is_empty() {
2188            return PhaseOutcome::Noop;
2189        }
2190        let result = self
2191            .retry_lance(table.label(), || async {
2192                let mut guard = self.cached(table).await?.lock().await;
2193                let mut dataset = guard.latest().await?;
2194                let did_work =
2195                    optimize_table_indices(&mut dataset, intents, table, progress).await?;
2196                guard.replace(dataset);
2197                Ok::<_, anyhow::Error>(did_work)
2198            })
2199            .await;
2200        match result {
2201            Ok(true) => PhaseOutcome::Ok,
2202            Ok(false) => PhaseOutcome::Noop,
2203            Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
2204            Err(error) => PhaseOutcome::Failed(error),
2205        }
2206    }
2207
2208    async fn run_optimize_compact_phase(
2209        &self,
2210        table: Table,
2211        progress: Option<&OptimizeProgressFn>,
2212        policy: &MaintenancePolicy,
2213    ) -> PhaseOutcome {
2214        let result = self
2215            .retry_lance(table.label(), || async {
2216                let mut guard = self.cached(table).await?.lock().await;
2217                let mut dataset = guard.latest().await?;
2218                optimize_table_compact(&mut dataset, table, progress, policy).await?;
2219                guard.replace(dataset);
2220                Ok::<_, anyhow::Error>(())
2221            })
2222            .await;
2223        match result {
2224            Ok(()) => PhaseOutcome::Ok,
2225            Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
2226            Err(error) => PhaseOutcome::Failed(error),
2227        }
2228    }
2229
2230    pub async fn rebuild_index(
2231        &self,
2232        table: Table,
2233        intent: &IndexIntent,
2234        progress: Option<&OptimizeProgressFn>,
2235    ) -> Result<()> {
2236        emit(
2237            progress,
2238            OptimizeEvent::PhaseStart {
2239                table,
2240                phase: OptimizePhase::IndexRebuild,
2241                detail: Some(intent.name.to_owned()),
2242            },
2243        );
2244        let started = Instant::now();
2245        let result = self
2246            .retry_lance(table.label(), || async {
2247                let mut guard = self.cached(table).await?.lock().await;
2248                let mut dataset = guard.latest().await?;
2249                rebuild_index(&mut dataset, intent, progress, table).await?;
2250                guard.replace(dataset);
2251                Ok(())
2252            })
2253            .await;
2254        emit(
2255            progress,
2256            OptimizeEvent::PhaseDone {
2257                table,
2258                phase: OptimizePhase::IndexRebuild,
2259                elapsed_ms: started.elapsed().as_millis() as u64,
2260            },
2261        );
2262        result
2263    }
2264
2265    /// Lance `cleanup_old_versions` for one table: reclaim files no manifest
2266    /// within the retention window references. No compaction and no new commit -
2267    /// it only deletes superseded files, so no OCC retry is needed.
2268    pub async fn cleanup_table_versions(
2269        &self,
2270        table: Table,
2271        older_than: chrono::Duration,
2272    ) -> Result<()> {
2273        let mut guard = self.cached(table).await?.lock().await;
2274        let dataset = guard.latest().await?;
2275        dataset
2276            .cleanup_old_versions(older_than, Some(false), Some(false))
2277            .await
2278            .with_context(|| format!("cleanup_old_versions failed for {}", table.label()))?;
2279        Ok(())
2280    }
2281
2282    pub async fn index_status(
2283        &self,
2284        table: Table,
2285        intents: &[IndexIntent],
2286    ) -> Result<Vec<IndexStatus>> {
2287        let dataset = self.dataset(table).await?;
2288        index_status(table, &dataset, intents).await
2289    }
2290
2291    pub(crate) async fn dataset(&self, table: Table) -> Result<Dataset> {
2292        let mut cached = self.cached(table).await?.lock().await;
2293        cached.latest().await
2294    }
2295    /// Build a prefiltered `Scanner` for `table`. Composable read entry
2296    /// point for callers that need to layer extra builder calls
2297    /// (`full_text_search`, `nearest`) on top of pond's predicate seam.
2298    /// Routine scans should prefer `Handle::scan`.
2299    pub(crate) async fn scanner(
2300        &self,
2301        table: Table,
2302        predicate: Option<&Predicate>,
2303    ) -> Result<lance::dataset::scanner::Scanner> {
2304        let dataset = self.dataset(table).await?;
2305        scanner_with_prefilter(&dataset, predicate)
2306    }
2307    /// Single read entry point: prefilter via `predicate`, optionally
2308    /// project, return the prepared `Scanner` (spec.md#lance-chokepoints-read).
2309    pub async fn scan(
2310        &self,
2311        table: Table,
2312        opts: ScanOpts<'_>,
2313    ) -> Result<lance::dataset::scanner::Scanner> {
2314        let mut scanner = self.scanner(table, opts.predicate).await?;
2315        if let Some(projection) = opts.projection {
2316            scanner.project(projection)?;
2317        }
2318        Ok(scanner)
2319    }
2320    pub(crate) async fn scan_batch(
2321        &self,
2322        table: Table,
2323        predicate: Option<&Predicate>,
2324        projection: &[&str],
2325    ) -> Result<RecordBatch> {
2326        let opts = ScanOpts {
2327            predicate,
2328            projection: (!projection.is_empty()).then_some(projection),
2329        };
2330        self.scan(table, opts)
2331            .await?
2332            .try_into_batch()
2333            .await
2334            .context("scan failed")
2335    }
2336    pub async fn count_rows(&self, table: Table) -> Result<usize> {
2337        self.dataset(table)
2338            .await?
2339            .count_rows(None)
2340            .await
2341            .map_err(Into::into)
2342    }
2343    /// Collect the primary-key (`id`) set for `table`. Storage verification
2344    /// compares these sets across two stores: matching row counts can still
2345    /// hide divergent membership, so proving a destination is a complete
2346    /// superset of a source needs the ids, not the cardinalities
2347    /// (spec.md#substrate, `lance-deterministic-pk`).
2348    pub async fn collect_ids(&self, table: Table) -> Result<std::collections::HashSet<String>> {
2349        let batch = self.scan_batch(table, None, &["id"]).await?;
2350        let ids = batch
2351            .column_by_name("id")
2352            .context("scan projection dropped the id column")?
2353            .as_any()
2354            .downcast_ref::<StringArray>()
2355            .context("id column is not Utf8")?;
2356        Ok(ids.iter().flatten().map(str::to_owned).collect())
2357    }
2358    /// Names of every index on `messages` - the vector-index tests read this.
2359    #[cfg(test)]
2360    pub(crate) async fn messages_index_names(&self) -> Result<Vec<String>> {
2361        let dataset = self.dataset(Table::Messages).await?;
2362        let indices = dataset.load_indices().await?;
2363        Ok(indices.iter().map(|index| index.name.clone()).collect())
2364    }
2365
2366    /// Whether `messages` carries an index named `name`. Manifest-only and
2367    /// cache-backed (`load_indices` hits the dataset index cache), so it is
2368    /// cheap enough to gate `Scanner::fast_search` per query: fast-search
2369    /// returns an empty plan when the index is absent, so the retrievers must
2370    /// only opt in once it exists.
2371    pub(crate) async fn messages_has_index(&self, name: &str) -> Result<bool> {
2372        let dataset = self.dataset(Table::Messages).await?;
2373        let indices = dataset.load_indices().await?;
2374        Ok(indices.iter().any(|index| index.name == name))
2375    }
2376
2377    /// Count rows in `table` not yet covered by `index_name`. Manifest-only;
2378    /// a missing index reports the whole table. Powers `pond status`.
2379    pub(crate) async fn unindexed_row_count(
2380        &self,
2381        table: Table,
2382        index_name: &str,
2383    ) -> Result<usize> {
2384        let dataset = self.dataset(table).await?;
2385        let fragments = dataset
2386            .unindexed_fragments(index_name)
2387            .await
2388            .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
2389        Ok(fragments
2390            .iter()
2391            .map(|fragment| fragment.num_rows().unwrap_or(0))
2392            .sum())
2393    }
2394
2395    /// Which table owns the named index, if any. Used by
2396    /// `pond optimize --drop-index <name>` to route the drop to the right
2397    /// dataset without sequentially probing-and-swallowing errors (the prior
2398    /// loop hid permission/network failures behind "no such index"). Runs the
2399    /// three `load_indices` calls in parallel; an error here is a real I/O
2400    /// failure and propagates with context.
2401    pub(crate) async fn find_index_owner(&self, name: &str) -> Result<Option<Table>> {
2402        let list = |table: Table| async move {
2403            let dataset = self.dataset(table).await?;
2404            let names: Vec<String> = dataset
2405                .load_indices()
2406                .await
2407                .with_context(|| format!("load_indices failed for {}", table.label()))?
2408                .iter()
2409                .map(|index| index.name.clone())
2410                .collect();
2411            Ok::<_, anyhow::Error>(names)
2412        };
2413        let (sessions, messages, parts) = tokio::try_join!(
2414            list(Table::Sessions),
2415            list(Table::Messages),
2416            list(Table::Parts),
2417        )?;
2418        for (table, names) in [
2419            (Table::Sessions, sessions),
2420            (Table::Messages, messages),
2421            (Table::Parts, parts),
2422        ] {
2423            if names.iter().any(|n| n == name) {
2424                return Ok(Some(table));
2425            }
2426        }
2427        Ok(None)
2428    }
2429
2430    /// Drop the named index. Used by the `pond optimize --force-embed` model-swap path
2431    /// to retire an IVF_SQ whose centroids belong to the old distance
2432    /// space, before the next write re-bootstraps it over the new model's
2433    /// vectors. Errors when the index does not exist; callers may swallow
2434    /// that.
2435    pub(crate) async fn drop_index(&self, table: Table, name: &str) -> Result<()> {
2436        let mut guard = self.cached(table).await?.lock().await;
2437        let mut dataset = guard.latest().await?;
2438        dataset
2439            .drop_index(name)
2440            .await
2441            .with_context(|| format!("drop_index({name}) failed for {}", table.label()))?;
2442        guard.replace(dataset);
2443        Ok(())
2444    }
2445
2446    /// Resolve each table's stored location through the namespace catalog
2447    /// (spec.md#lance-chokepoints-catalog) - no hardcoded `.lance` suffix.
2448    async fn table_location(&self, table_name: &str) -> Result<String> {
2449        let request = DescribeTableRequest {
2450            id: Some(self.nm_ident.as_table_id(table_name)),
2451            ..Default::default()
2452        };
2453        let response = self
2454            .nm
2455            .describe_table(request)
2456            .await
2457            .with_context(|| format!("failed to describe table {table_name}"))?;
2458        response
2459            .location
2460            .with_context(|| format!("namespace returned no location for table {table_name}"))
2461    }
2462
2463    /// Whether the store holds synced data yet. `open` eagerly creates empty
2464    /// `sessions`/`messages` datasets, but `parts` opens lazily on first write
2465    /// (see `open_with_options`), so its presence is the "has been synced"
2466    /// signal - letting read-only surfaces (`pond status`)
2467    /// render an empty state instead of erroring on the first `parts` describe.
2468    pub async fn initialized(&self) -> Result<bool> {
2469        let request = DescribeTableRequest {
2470            id: Some(self.nm_ident.as_table_id(sessions::PARTS)),
2471            ..Default::default()
2472        };
2473        match self.nm.describe_table(request).await {
2474            Ok(_) => Ok(true),
2475            Err(error) if is_namespace_error_code(&error, ErrorCode::TableNotFound) => Ok(false),
2476            Err(error) => {
2477                Err(anyhow::Error::from(error)).context("failed to probe table existence")
2478            }
2479        }
2480    }
2481
2482    /// On-disk byte totals for the three datasets plus the data-dir remainder.
2483    /// Every byte is sized by listing through Lance's object store
2484    /// (spec.md#lance-chokepoints-storage), identical for `file://` and `s3://`.
2485    pub async fn table_sizes(&self) -> Result<TableSizes> {
2486        let registry = Arc::new(ObjectStoreRegistry::default());
2487        let params = self.object_store_params();
2488
2489        let sessions = self
2490            .listed_size(
2491                &registry,
2492                &params,
2493                &self.table_location(sessions::SESSIONS).await?,
2494            )
2495            .await?;
2496        let messages = self
2497            .listed_size(
2498                &registry,
2499                &params,
2500                &self.table_location(sessions::MESSAGES).await?,
2501            )
2502            .await?;
2503        let parts = self
2504            .listed_size(
2505                &registry,
2506                &params,
2507                &self.table_location(sessions::PARTS).await?,
2508            )
2509            .await?;
2510        // `other` is whatever sits under the data-dir root but not in the three
2511        // tables (config.toml, stray index temp files): root total minus them.
2512        let root_total = self
2513            .listed_size(&registry, &params, self.location.as_str())
2514            .await?;
2515        let other = root_total.saturating_sub(sessions + messages + parts);
2516        let sessions_data = self
2517            .data_liveness(&registry, &params, Table::Sessions, sessions::SESSIONS)
2518            .await?;
2519        let messages_data = self
2520            .data_liveness(&registry, &params, Table::Messages, sessions::MESSAGES)
2521            .await?;
2522        let parts_data = self
2523            .data_liveness(&registry, &params, Table::Parts, sessions::PARTS)
2524            .await?;
2525        Ok(TableSizes {
2526            sessions,
2527            messages,
2528            parts,
2529            other,
2530            sessions_data,
2531            messages_data,
2532            parts_data,
2533        })
2534    }
2535
2536    async fn data_liveness(
2537        &self,
2538        registry: &Arc<ObjectStoreRegistry>,
2539        params: &ObjectStoreParams,
2540        table: Table,
2541        table_name: &str,
2542    ) -> Result<DataLiveness> {
2543        let location = self.table_location(table_name).await?;
2544        let data_dir = format!("{}/data", location.trim_end_matches('/'));
2545        let on_disk = self.listed_size(registry, params, &data_dir).await?;
2546        let dataset = self.dataset(table).await?;
2547        let live = dataset
2548            .get_fragments()
2549            .iter()
2550            .try_fold(0u64, |total, fragment| {
2551                Some(total + fragment_bytes(fragment.metadata())?)
2552            });
2553        Ok(DataLiveness { on_disk, live })
2554    }
2555
2556    /// Sum `ObjectMeta.size` for every object recursively under `uri`.
2557    async fn listed_size(
2558        &self,
2559        registry: &Arc<ObjectStoreRegistry>,
2560        params: &ObjectStoreParams,
2561        uri: &str,
2562    ) -> Result<u64> {
2563        let (store, base) = ObjectStore::from_uri_and_params(registry.clone(), uri, params)
2564            .await
2565            .with_context(|| format!("failed to open object store for {uri}"))?;
2566        let mut listing = store.list(Some(base));
2567        let mut total = 0u64;
2568        while let Some(meta) = listing.next().await {
2569            let meta = meta.with_context(|| format!("listing {uri} failed"))?;
2570            total += meta.size;
2571        }
2572        Ok(total)
2573    }
2574    async fn cached(&self, table: Table) -> Result<&Mutex<CachedDataset>> {
2575        match table {
2576            Table::Sessions => Ok(&self.datasets.sessions),
2577            Table::Messages => Ok(&self.datasets.messages),
2578            Table::Parts => self.parts_cached().await,
2579        }
2580    }
2581
2582    /// Open `parts.lance` on first use (spec.md#datasets). Single-flight via
2583    /// `OnceCell`; once initialized, behaves identically to the other two.
2584    async fn parts_cached(&self) -> Result<&Mutex<CachedDataset>> {
2585        self.datasets
2586            .parts
2587            .get_or_try_init(|| async {
2588                let dataset = open_or_create_via_ns(
2589                    &self.nm,
2590                    &self.nm_ident,
2591                    sessions::PARTS,
2592                    sessions::part_schema(),
2593                    &self.session,
2594                    &self.storage_options,
2595                )
2596                .await?;
2597                Ok::<_, anyhow::Error>(Mutex::new(CachedDataset {
2598                    dataset,
2599                    last_refresh: Instant::now(),
2600                    refresh_after: self.parts_refresh_after,
2601                }))
2602            })
2603            .await
2604    }
2605    async fn retry_lance<T, Fut, Op>(&self, label: &str, mut operation: Op) -> Result<T>
2606    where
2607        Fut: std::future::Future<Output = Result<T>>,
2608        Op: FnMut() -> Fut,
2609    {
2610        let mut attempt = 0u8;
2611        loop {
2612            attempt = attempt.saturating_add(1);
2613            match operation().await {
2614                Ok(value) => return Ok(value),
2615                Err(error) if attempt < self.retry.attempts => {
2616                    let backoff = self.backoff(attempt);
2617                    // `{:#}` walks anyhow's cause chain inline; `%error` (Display)
2618                    // drops everything below the top-level message.
2619                    let error_chain = format!("{error:#}");
2620                    tracing::warn!(
2621                        label,
2622                        attempt,
2623                        ?backoff,
2624                        error = %error_chain,
2625                        "retrying Lance operation"
2626                    );
2627                    tokio::time::sleep(backoff).await;
2628                }
2629                Err(error) => {
2630                    let error_chain = format!("{error:#}");
2631                    tracing::warn!(
2632                        label,
2633                        attempt,
2634                        error = %error_chain,
2635                        "Lance operation exhausted retries"
2636                    );
2637                    // spec.md#protocol: surface OCC failures as a typed `conflict`
2638                    // rather than the generic `storage_unavailable` bucket. The
2639                    // chain root is a `lance::Error` (commit-conflict family) when
2640                    // pond's retry layer exhausted because the manifest could not
2641                    // be advanced; everything else (timeouts, IAM, disk) stays
2642                    // `storage_unavailable`.
2643                    if is_commit_conflict(&error) {
2644                        return Err(error.context(ConflictExhausted { attempts: attempt }));
2645                    }
2646                    return Err(error);
2647                }
2648            }
2649        }
2650    }
2651    fn backoff(&self, attempt: u8) -> Duration {
2652        let shift = u32::from(attempt.saturating_sub(1));
2653        let multiplier = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
2654        let base = self.retry.initial_backoff.saturating_mul(multiplier);
2655        // Symmetric +/- `jitter` factor de-correlates concurrent retriers on
2656        // a contended manifest (spec.md#lance-retry-jitter); clamped to `max_backoff`.
2657        let factor = (1.0 + self.retry.jitter * (fastrand::f64() * 2.0 - 1.0)).max(0.0);
2658        base.mul_f64(factor).min(self.retry.max_backoff)
2659    }
2660}
/// Compaction phase: plan + amplification veto + execute + `cleanup_old_versions`,
/// one retry block, separate from the indices phase so a lost Rewrite race
/// does not abort index work.
///
/// Vetoes Lance-planned tasks instead of pre-gating on pond fragment math:
/// Lance bins split at index-coverage boundaries, so pond predictions diverge
/// from what Lance actually rewrites (the old run-sum gate latched open and
/// rewrote a 665 MiB tail fragment every 5-min sync). Only whole planned
/// tasks are filtered, so OCC and conflict semantics are untouched.
///
/// spec.md#lance-index-maintenance mandates FRI on by default, but at
/// v7.0.0-beta.16 `defer_index_remap=true` together with `stable-row-ids`
/// panics in `optimize.rs::commit_compaction` with "defer_index_remap
/// requires row_addrs but none were provided": `rewrite_files` skips
/// row_addrs when stable row ids are on, then the FRI builder demands
/// them. With stable_row_ids the remap step is already a no-op
/// (`optimize.rs:1490`: `needs_remapping = !uses_stable_row_ids() &&
/// !defer_index_remap`), so running without FRI is correct - we only
/// lose the documented concurrency-with-index-build benefit. Flip to
/// `true` once upstream fixes the conflict.
///
/// Cleanup always runs, even when no compaction task survives the veto.
/// Progress events: `Compact` start/done only when at least one task runs;
/// `Cleanup` start/done on every path that reaches cleanup. Any error returns
/// immediately (no `PhaseDone` for the phase that failed).
///
/// # Errors
/// Propagates failures from `plan_compaction`, task execution,
/// `commit_compaction`, and `cleanup_old_versions`.
async fn optimize_table_compact(
    dataset: &mut Dataset,
    table: Table,
    progress: Option<&OptimizeProgressFn>,
    policy: &MaintenancePolicy,
) -> Result<()> {
    // Per-fragment stats of the current manifest; they size the compaction target.
    let stats: Vec<FragmentStat> = dataset
        .get_fragments()
        .iter()
        .map(|fragment| fragment_stat(fragment.metadata()))
        .collect();
    let compaction = CompactionOptions {
        target_rows_per_fragment: derived_target_rows(&stats),
        max_bytes_per_file: Some(TARGET_FRAGMENT_BYTES as usize),
        // FRI off; see the doc comment on this function for the upstream panic.
        defer_index_remap: false,
        ..CompactionOptions::default()
    };

    let mut plan = plan_compaction(dataset, &compaction).await?;
    // A cap of 0 disables the veto: every Lance-planned task runs.
    if policy.compaction_fragment_cap > 0 {
        plan.tasks.retain(|task| {
            let task_stats: Vec<FragmentStat> = task.fragments.iter().map(fragment_stat).collect();
            // `keep_task` decides whether the planned merge is worth its rewrite
            // cost; vetoed tasks are dropped whole, never trimmed.
            let keep = keep_task(
                &task_stats,
                policy.compaction_fragment_cap,
                compaction.materialize_deletions_threshold,
            );
            if !keep {
                tracing::debug!(
                    target: "pond::perf",
                    table = table.as_str(),
                    fragments = task_stats.len(),
                    "compaction task vetoed: merge dominated by one large fragment",
                );
            }
            keep
        });
    }
    if plan.tasks.is_empty() {
        tracing::debug!(
            target: "pond::perf",
            table = table.as_str(),
            "compaction skipped: no task to run",
        );
    } else {
        emit(
            progress,
            OptimizeEvent::PhaseStart {
                table,
                phase: OptimizePhase::Compact,
                detail: None,
            },
        );
        let started = Instant::now();
        // Tasks execute one at a time; their outputs are committed together by
        // the single `commit_compaction` call below.
        let mut completed = Vec::with_capacity(plan.tasks.len());
        for task in plan.compaction_tasks() {
            completed.push(task.execute(dataset).await?);
        }
        commit_compaction(
            dataset,
            completed,
            Arc::new(DatasetIndexRemapperOptions::default()),
            &compaction,
        )
        .await?;
        emit(
            progress,
            OptimizeEvent::PhaseDone {
                table,
                phase: OptimizePhase::Compact,
                elapsed_ms: started.elapsed().as_millis() as u64,
            },
        );
    }

    // Safe GC only. delete_unverified=false keeps Lance's 7-day in-progress
    // guard, so this never races a concurrent writer (spec.md#concurrency); GC
    // runs outside OCC, so the guard is what makes it safe on any backend.
    emit(
        progress,
        OptimizeEvent::PhaseStart {
            table,
            phase: OptimizePhase::Cleanup,
            detail: None,
        },
    );
    let started = Instant::now();
    // Lance v7 `cleanup_old_versions` removes orphan files inside
    // `_indices/<uuid>/` but does NOT remove the parent dir, so failed/no-op
    // index merges accumulate empty UUID dirs forever (one inode each).
    // Harmless beyond inode pressure; tracked upstream. No pond-side FS sweep
    // here (spec.md#concurrency: Lance-native maintenance only).
    // NOTE(review): the context below says "index optimize" although this is
    // the compaction phase; kept as-is in case callers/tests match on it.
    dataset
        .cleanup_old_versions(policy.cleanup_older_than, Some(false), Some(false))
        .await
        .context("cleanup_old_versions failed during index optimize")?;
    emit(
        progress,
        OptimizeEvent::PhaseDone {
            table,
            phase: OptimizePhase::Cleanup,
            elapsed_ms: started.elapsed().as_millis() as u64,
        },
    );

    Ok(())
}
2788
/// Indices phase: per-intent create/rebuild + batched `optimize_indices(append)`
/// for incremental families. Returns `true` if anything committed.
///
/// Per intent, in `intents` order:
/// - index absent: create it (`replace(false)`) only if
///   `intent.trigger.should_create` fires, otherwise skip;
/// - index present and no fragment left unindexed: skip;
/// - BTree and any other non-Bitmap scalar index: full rebuild (`replace(true)`);
/// - Bitmap, inverted FTS and IVF_SQ: queued, then folded together after the
///   loop with `optimize_indices` - `append` normally, `merge` once the index
///   has accumulated `DELTA_MERGE_THRESHOLD` segments.
///
/// Each create/rebuild/append is applied to `dataset` as it happens, so an
/// error part-way leaves earlier steps applied. NOTE(review): the caller is
/// presumably the `retry_lance`-wrapped indices phase - confirm at the call site.
async fn optimize_table_indices(
    dataset: &mut Dataset,
    intents: &[IndexIntent],
    table: Table,
    progress: Option<&OptimizeProgressFn>,
) -> Result<bool> {
    // Index manifest snapshot taken once, before this function commits anything.
    // Both the existence check and the later segment counts read from it.
    let existing = dataset.load_indices().await?;
    let existing_names: std::collections::HashSet<String> =
        existing.iter().map(|index| index.name.clone()).collect();

    // Incremental-family indices to fold in one batch after the loop.
    let mut append_indices: Vec<String> = Vec::new();
    let mut did_work = false;

    for intent in intents {
        let exists = existing_names.contains(intent.name);

        if !exists {
            // Not built yet: only create once the intent's trigger fires.
            if !intent.trigger.should_create(dataset).await? {
                continue;
            }
            let params = intent.params.build(dataset).await?;
            let index_type = intent.params.index_type();
            tracing::info!(
                index = intent.name,
                column = intent.column,
                "creating Lance index (trigger fired)",
            );
            emit(
                progress,
                OptimizeEvent::PhaseStart {
                    table,
                    phase: OptimizePhase::IndexCreate,
                    detail: Some(intent.name.to_owned()),
                },
            );
            let started = Instant::now();
            dataset
                .create_index_builder(&[intent.column], index_type, params.as_ref())
                .name(intent.name.to_owned())
                .replace(false)
                .progress(lance_progress(progress, table, intent.name))
                .await
                .with_context(|| format!("failed to create index {}", intent.name))?;
            emit(
                progress,
                OptimizeEvent::PhaseDone {
                    table,
                    phase: OptimizePhase::IndexCreate,
                    elapsed_ms: started.elapsed().as_millis() as u64,
                },
            );
            did_work = true;
            // A freshly created index covers the current fragments; nothing to fold.
            continue;
        }

        // Fold every trailing fragment so the index is always current after an
        // optimize - no lag threshold. A tiny tail (rows written between this
        // fold and the next) stays invisible to `fast_search` queries until the
        // next fold, and the `DELTA_MERGE_THRESHOLD` cadence keeps the per-fold
        // segment from accumulating without bound (spec.md#search).
        let unindexed = dataset.unindexed_fragments(intent.name).await?;
        if unindexed.is_empty() {
            continue;
        }
        match intent.params {
            // BTree: rebuilt from scratch rather than appended to.
            IndexParamsKind::Scalar(BuiltinIndexType::BTree) => {
                let params = intent.params.build(dataset).await?;
                let index_type = intent.params.index_type();
                tracing::debug!(
                    target: "pond::perf",
                    index = intent.name,
                    column = intent.column,
                    "rebuilding Lance BTree index",
                );
                emit(
                    progress,
                    OptimizeEvent::PhaseStart {
                        table,
                        phase: OptimizePhase::IndexRebuild,
                        detail: Some(intent.name.to_owned()),
                    },
                );
                let started = Instant::now();
                dataset
                    .create_index_builder(&[intent.column], index_type, params.as_ref())
                    .name(intent.name.to_owned())
                    .replace(true)
                    .progress(lance_progress(progress, table, intent.name))
                    .await
                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
                emit(
                    progress,
                    OptimizeEvent::PhaseDone {
                        table,
                        phase: OptimizePhase::IndexRebuild,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    },
                );
                did_work = true;
            }
            // Incremental families: deferred to the batched `optimize_indices` below.
            IndexParamsKind::Scalar(BuiltinIndexType::Bitmap)
            | IndexParamsKind::InvertedFtsWord
            | IndexParamsKind::IvfSqCosine { .. } => {
                append_indices.push(intent.name.to_owned());
            }
            // Remaining scalar types: full rebuild, like BTree but without the
            // debug trace.
            IndexParamsKind::Scalar(_) => {
                let params = intent.params.build(dataset).await?;
                emit(
                    progress,
                    OptimizeEvent::PhaseStart {
                        table,
                        phase: OptimizePhase::IndexRebuild,
                        detail: Some(intent.name.to_owned()),
                    },
                );
                let started = Instant::now();
                dataset
                    .create_index_builder(
                        &[intent.column],
                        intent.params.index_type(),
                        params.as_ref(),
                    )
                    .name(intent.name.to_owned())
                    .replace(true)
                    .progress(lance_progress(progress, table, intent.name))
                    .await
                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
                emit(
                    progress,
                    OptimizeEvent::PhaseDone {
                        table,
                        phase: OptimizePhase::IndexRebuild,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    },
                );
                did_work = true;
            }
        }
    }

    if !append_indices.is_empty() {
        // Per-index segment count from the manifest loaded above (delta segments
        // share the intent name). Indices that have piled up
        // `DELTA_MERGE_THRESHOLD` segments fold with `merge` (collapse to one);
        // the rest take the cheap append. Splitting keeps each query reading few
        // segments without paying a consolidation on every tiny fold.
        let segment_count = |name: &str| {
            existing
                .iter()
                .filter(|index| index.name.as_str() == name)
                .count()
        };
        let (to_merge, to_append): (Vec<String>, Vec<String>) = append_indices
            .iter()
            .cloned()
            .partition(|name| segment_count(name) >= DELTA_MERGE_THRESHOLD);

        // One `IndexAppend` progress phase covers both the append and merge calls.
        emit(
            progress,
            OptimizeEvent::PhaseStart {
                table,
                phase: OptimizePhase::IndexAppend,
                detail: Some(append_indices.join(", ")),
            },
        );
        let started = Instant::now();
        if !to_append.is_empty() {
            dataset
                .optimize_indices(&OptimizeOptions::append().index_names(to_append))
                .await
                .context("optimize_indices(append) failed during index optimize")?;
        }
        if !to_merge.is_empty() {
            dataset
                .optimize_indices(
                    &OptimizeOptions::merge(DELTA_MERGE_THRESHOLD).index_names(to_merge),
                )
                .await
                .context("optimize_indices(merge) failed during index optimize")?;
        }
        emit(
            progress,
            OptimizeEvent::PhaseDone {
                table,
                phase: OptimizePhase::IndexAppend,
                elapsed_ms: started.elapsed().as_millis() as u64,
            },
        );
        tracing::debug!(
            target: "pond::perf",
            indices = ?append_indices,
            "folded trailing fragments into indices",
        );
        did_work = true;
    }

    Ok(did_work)
}
2989
2990async fn rebuild_index(
2991    dataset: &mut Dataset,
2992    intent: &IndexIntent,
2993    progress: Option<&OptimizeProgressFn>,
2994    table: Table,
2995) -> Result<()> {
2996    if !intent.trigger.should_create(dataset).await? {
2997        return Ok(());
2998    }
2999    let params = intent.params.build(dataset).await?;
3000    dataset
3001        .create_index_builder(
3002            &[intent.column],
3003            intent.params.index_type(),
3004            params.as_ref(),
3005        )
3006        .name(intent.name.to_owned())
3007        .replace(true)
3008        .progress(lance_progress(progress, table, intent.name))
3009        .await
3010        .with_context(|| format!("failed to rebuild index {}", intent.name))?;
3011    Ok(())
3012}
3013
3014async fn index_status(
3015    table: Table,
3016    dataset: &Dataset,
3017    intents: &[IndexIntent],
3018) -> Result<Vec<IndexStatus>> {
3019    let existing = dataset.load_indices().await?;
3020    let existing_names: std::collections::HashSet<String> =
3021        existing.iter().map(|index| index.name.clone()).collect();
3022    let total_fragments = dataset.get_fragments().len();
3023    let total_rows = dataset.count_rows(None).await?;
3024    let mut statuses = Vec::with_capacity(intents.len());
3025    for intent in intents {
3026        let exists = existing_names.contains(intent.name);
3027        if !exists {
3028            statuses.push(IndexStatus {
3029                table,
3030                intent_name: intent.name.to_owned(),
3031                fragments_covered: 0,
3032                unindexed_fragments: total_fragments,
3033                unindexed_rows: total_rows,
3034                exists,
3035            });
3036            continue;
3037        }
3038        let unindexed = dataset
3039            .unindexed_fragments(intent.name)
3040            .await
3041            .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
3042        let unindexed_fragments = unindexed.len();
3043        let unindexed_rows = unindexed
3044            .iter()
3045            .map(|fragment| fragment.num_rows().unwrap_or(0))
3046            .sum();
3047        statuses.push(IndexStatus {
3048            table,
3049            intent_name: intent.name.to_owned(),
3050            fragments_covered: total_fragments.saturating_sub(unindexed_fragments),
3051            unindexed_fragments,
3052            unindexed_rows,
3053            exists,
3054        });
3055    }
3056    Ok(statuses)
3057}
3058
3059/// Open the table at `table_name` via the namespace; create + initialize on
3060/// `TableNotFound`. Schema-checks the on-disk dataset against pond's
3061/// expectation so a stale data dir surfaces early.
3062///
3063/// Probes via `nm.describe_table` directly rather than `DatasetBuilder::from_namespace`:
3064/// the builder re-wraps an already-`Namespace`-wrapped error
3065/// (lance/src/dataset/builder.rs:142), so going through it would force a
3066/// chain-walk to classify `TableNotFound`. The direct probe stays at one
3067/// wrap level and downcasts cleanly. Managed-versioning hookup (REST
3068/// namespace external-manifest commits) is not wired here; v1 ships
3069/// Directory v2 only.
3070/// Diagnostic S3 IO tracing. Inert unless [`io_trace::enable`] is called
3071/// before the store opens; then a shared `IOTracker` is injected as the
3072/// object-store wrapper on every dataset read open, counting exactly how many
3073/// GETs (and bytes, and - under the `io-trace` feature - which paths) each
3074/// query issues against a remote store. Used by `serve_mem_bench --io-trace`
3075/// to attribute the per-query S3 request load. Not a production code path.
3076pub mod io_trace {
3077    use lance_io::utils::tracking_store::{IOTracker, IoStats};
3078    use std::sync::{Arc, OnceLock};
3079
3080    static TRACKER: OnceLock<IOTracker> = OnceLock::new();
3081
3082    /// Arm tracing. Must run before the store opens so the wrapper is applied
3083    /// when the datasets' object store is built.
3084    pub fn enable() {
3085        let _ = TRACKER.set(IOTracker::default());
3086    }
3087
3088    /// The shared tracker as an object-store wrapper, when armed.
3089    pub(super) fn wrapper() -> Option<Arc<IOTracker>> {
3090        TRACKER.get().map(|tracker| Arc::new(tracker.clone()))
3091    }
3092
3093    /// Read and reset the IO accumulated since the last call.
3094    pub fn take() -> Option<IoStats> {
3095        TRACKER.get().map(IOTracker::incremental_stats)
3096    }
3097}
3098
3099async fn open_or_create_via_ns(
3100    nm: &Arc<dyn LanceNamespace>,
3101    nm_ident: &NamespaceIdent,
3102    table_name: &str,
3103    schema: lance::deps::arrow_schema::SchemaRef,
3104    session: &Arc<Session>,
3105    storage_options: &HashMap<String, String>,
3106) -> Result<Dataset> {
3107    let table_id = nm_ident.as_table_id(table_name);
3108
3109    let request = DescribeTableRequest {
3110        id: Some(table_id.clone()),
3111        ..Default::default()
3112    };
3113    match nm.describe_table(request).await {
3114        Ok(response) => {
3115            let location = response.location.with_context(|| {
3116                format!("namespace returned no location for table {table_name}")
3117            })?;
3118            let mut builder = DatasetBuilder::from_uri(&location).with_session(session.clone());
3119            match io_trace::wrapper() {
3120                Some(wrapper) => {
3121                    builder = builder.with_store_params(ObjectStoreParams {
3122                        object_store_wrapper: Some(wrapper),
3123                        storage_options_accessor: (!storage_options.is_empty()).then(|| {
3124                            Arc::new(StorageOptionsAccessor::with_static_options(
3125                                storage_options.clone(),
3126                            ))
3127                        }),
3128                        ..Default::default()
3129                    });
3130                }
3131                None => {
3132                    if !storage_options.is_empty() {
3133                        builder = builder.with_storage_options(storage_options.clone());
3134                    }
3135                }
3136            }
3137            let dataset = builder
3138                .load()
3139                .await
3140                .with_context(|| format!("failed to open table {table_name}"))?;
3141            ensure_schema_matches(&dataset, schema.as_ref(), table_name)?;
3142            return Ok(dataset);
3143        }
3144        Err(error) => match &error {
3145            error if is_namespace_error_code(error, ErrorCode::TableNotFound) => {
3146                // fall through to create
3147            }
3148            _ => {
3149                return Err(anyhow::Error::from(error))
3150                    .with_context(|| format!("failed to describe table {table_name}"));
3151            }
3152        },
3153    }
3154
3155    // Create path: pond seeds an empty dataset with the canonical schema so
3156    // every subsequent open lands on a real Lance dataset, not a phantom.
3157    let mut write_params = sessions::write_params_for_create();
3158    write_params.session = Some(session.clone());
3159    write_params.mode = WriteMode::Create;
3160    if !storage_options.is_empty() {
3161        write_params.store_params = Some(ObjectStoreParams {
3162            storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
3163                storage_options.clone(),
3164            ))),
3165            ..Default::default()
3166        });
3167    }
3168    let reader = sessions::empty_reader(schema)?;
3169    Dataset::write_into_namespace(reader, nm.clone(), table_id, Some(write_params))
3170        .await
3171        .with_context(|| format!("failed to create table {table_name}"))
3172}
3173
3174// lance-namespace sometimes nests one `lance::Error::Namespace` inside another
3175// before the underlying `NamespaceError`; walk the whole `.source()` chain
3176// rather than only matching the outer variant.
3177fn is_namespace_error_code(error: &lance::Error, code: ErrorCode) -> bool {
3178    if !matches!(error, lance::Error::Namespace { .. }) {
3179        return false;
3180    }
3181    std::iter::successors(Some(error as &(dyn std::error::Error + 'static)), |link| {
3182        link.source()
3183    })
3184    .filter_map(|link| link.downcast_ref::<NamespaceError>())
3185    .any(|inner| inner.code() == code)
3186}
3187
3188fn scanner_with_prefilter(
3189    dataset: &Dataset,
3190    predicate: Option<&Predicate>,
3191) -> Result<lance::dataset::scanner::Scanner> {
3192    let mut scanner = dataset.scan();
3193    scanner.prefilter(true);
3194    if let Some(predicate) = predicate {
3195        let filter = predicate.to_lance();
3196        if !filter.is_empty() {
3197            scanner.filter(&filter)?;
3198        }
3199    }
3200    Ok(scanner)
3201}
3202fn ensure_schema_matches(
3203    dataset: &Dataset,
3204    expected: &lance::deps::arrow_schema::Schema,
3205    table_name: &str,
3206) -> Result<()> {
3207    use lance::deps::arrow_schema::DataType;
3208    use std::collections::BTreeSet;
3209    let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
3210    let actual_names: BTreeSet<&str> = actual.fields().iter().map(|f| f.name().as_str()).collect();
3211    let expected_names: BTreeSet<&str> = expected
3212        .fields()
3213        .iter()
3214        .map(|f| f.name().as_str())
3215        .collect();
3216    if actual_names != expected_names {
3217        anyhow::bail!(
3218            "table {table_name} has columns {actual_names:?} but this pond build expects \
3219             {expected_names:?} - the on-disk store predates a schema change; delete the \
3220             data directory and re-run `pond ingest`",
3221        );
3222    }
3223    // Catch a vector-dim change (configured `[embeddings].dim` differs from
3224    // the on-disk vector column width) early with a friendly message. Lance
3225    // would otherwise reject the next write with an opaque schema-mismatch
3226    // error inside the `merge_update` path.
3227    for actual_field in actual.fields() {
3228        let Some(expected_field) = expected.field_with_name(actual_field.name()).ok() else {
3229            continue;
3230        };
3231        if let (DataType::FixedSizeList(_, actual_dim), DataType::FixedSizeList(_, expected_dim)) =
3232            (actual_field.data_type(), expected_field.data_type())
3233            && actual_dim != expected_dim
3234        {
3235            tracing::warn!(
3236                table = table_name,
3237                column = actual_field.name(),
3238                actual_dim,
3239                expected_dim,
3240                "embedding dimension differs from config; open proceeds because model swaps are operator-driven",
3241            );
3242        }
3243    }
3244    Ok(())
3245}
/// Object-store defaults injected for any non-local pond location.
///
/// Each default is written under its first listed alias. It is written only
/// when no alias for that setting is already present. Keys compare ASCII
/// case-insensitively, so an env-var spelling such as `AWS_ENDPOINT` also
/// counts. Explicit overrides in `[storage]` therefore always win.
///
/// `aws_unsigned_payload` is gated on a custom endpoint (the marker for
/// S3-compatible stores like Hetzner, MinIO, R2), where the SHA256 payload
/// signature is wasted work the server does not validate. Every spelling
/// object_store accepts for the S3 endpoint (`aws_endpoint`,
/// `aws_endpoint_url`, `endpoint`, `endpoint_url`) counts as a custom endpoint.
fn apply_remote_storage_defaults(options: &mut HashMap<String, String>) {
    // Whether any key in `options` equals one of `aliases`, ignoring ASCII case.
    fn has_any(options: &HashMap<String, String>, aliases: &[&str]) -> bool {
        options
            .keys()
            .any(|key| aliases.iter().any(|alias| key.eq_ignore_ascii_case(alias)))
    }

    // Insert `value` under `aliases[0]` unless some alias is already set.
    fn set_default(options: &mut HashMap<String, String>, aliases: &[&str], value: &str) {
        if !has_any(options, aliases) {
            options.insert(aliases[0].to_owned(), value.to_owned());
        }
    }

    set_default(options, &["pool_idle_timeout"], "300 seconds");
    set_default(options, &["connect_timeout"], "10 seconds");
    // `request_timeout` bounds a single object-store request (one range GET/PUT),
    // not a whole scan - a streaming read issues many small requests, each well
    // under this. We keep it deliberately tight as a HARD BARRIER: a single
    // request exceeding 60s means a design/infra problem to fix (chunk the read,
    // use change-data-feed, fix the endpoint), never something to paper over with
    // a longer timeout. An explicit `[storage]` override still wins.
    set_default(options, &["request_timeout"], "60 seconds");
    let has_custom_endpoint = has_any(
        options,
        &["aws_endpoint", "aws_endpoint_url", "endpoint", "endpoint_url"],
    );
    if has_custom_endpoint {
        set_default(
            options,
            &["aws_unsigned_payload", "unsigned_payload"],
            "true",
        );
    }
}
3282
/// Render `value` as a single-quoted SQL string literal, doubling every
/// embedded single quote.
fn quoted_string(value: &str) -> String {
    let mut literal = String::with_capacity(value.len() + 2);
    literal.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            literal.push_str("''");
        } else {
            literal.push(ch);
        }
    }
    literal.push('\'');
    literal
}

/// Render a quoted LIKE pattern that matches any string containing `value`.
///
/// Backslash, `%`, and `_` are each prefixed with a backslash. This assumes
/// the LIKE evaluator treats backslash as its escape character. Single quotes
/// are doubled for the SQL literal, and the result is wrapped as `'%...%'`.
fn like_contains(value: &str) -> String {
    let mut pattern = String::with_capacity(value.len() + 4);
    pattern.push_str("'%");
    for ch in value.chars() {
        match ch {
            '\\' | '%' | '_' => {
                pattern.push('\\');
                pattern.push(ch);
            }
            '\'' => pattern.push_str("''"),
            other => pattern.push(other),
        }
    }
    pattern.push_str("%'");
    pattern
}
3294
3295#[cfg(test)]
3296mod tests {
3297    #![allow(clippy::expect_used, clippy::unwrap_used)]
3298
3299    use super::*;
3300    use tempfile::TempDir;
3301
3302    fn set(scope: Option<&str>) -> CredsSet {
3303        CredsSet {
3304            scope: scope.map(str::to_owned),
3305            access_key_id: Some("AKIA".to_owned()),
3306            secret_access_key: Some("shh".to_owned()),
3307            ..CredsSet::default()
3308        }
3309    }
3310
3311    fn opts(resolved: &ResolvedStorage, key: &str) -> Option<String> {
3312        resolved.options.get(key).cloned()
3313    }
3314
3315    #[test]
3316    fn storage_url_translation_table() {
3317        // file (Lance's `uri_to_url` appends the trailing slash; `child_uri`
3318        // trims it downstream)
3319        let local = StorageUrl::parse("/srv/pond").unwrap();
3320        assert_eq!(local.lance_url().as_str(), "file:///srv/pond/");
3321        assert!(local.is_local());
3322        assert!(local.scheme_options.is_empty());
3323        // s3 passthrough
3324        let aws = StorageUrl::parse("s3://bucket/prefix").unwrap();
3325        assert_eq!(aws.lance_url().as_str(), "s3://bucket/prefix");
3326        assert!(aws.scheme_options.is_empty());
3327        // s3+https: TLS stays on, virtual-hosted defaults on for domain
3328        // hosts, region defaults deterministically. The endpoint is
3329        // assembled at resolve time with the bucket folded into the host
3330        // (object_store's virtual-hosted convention).
3331        let fat = StorageUrl::parse("s3+https://nbg1.example.com/my-pond/sub").unwrap();
3332        assert_eq!(fat.lance_url().as_str(), "s3://my-pond/sub");
3333        assert_eq!(
3334            fat.scheme_options,
3335            vec![
3336                ("allow_http", "false".to_owned()),
3337                ("virtual_hosted_style_request", "true".to_owned()),
3338                ("region", "us-east-1".to_owned()),
3339            ],
3340        );
3341        let resolved = fat.resolve(&BTreeMap::new()).unwrap();
3342        assert_eq!(
3343            opts(&resolved, "endpoint").as_deref(),
3344            Some("https://my-pond.nbg1.example.com"),
3345        );
3346        assert_eq!(opts(&resolved, "region").as_deref(), Some("us-east-1"));
3347        // s3+http on an IP host: allow_http flips, path-style auto-selected
3348        // (a bucket subdomain on an IP can't resolve), port survives.
3349        let plain = StorageUrl::parse("s3+http://127.0.0.1:9000/pond").unwrap();
3350        assert_eq!(plain.lance_url().as_str(), "s3://pond/");
3351        assert_eq!(plain.scheme_options[0], ("allow_http", "true".to_owned()));
3352        assert_eq!(
3353            plain.scheme_options[1],
3354            ("virtual_hosted_style_request", "false".to_owned()),
3355        );
3356        let resolved = plain.resolve(&BTreeMap::new()).unwrap();
3357        assert_eq!(
3358            opts(&resolved, "endpoint").as_deref(),
3359            Some("http://127.0.0.1:9000"),
3360        );
3361        // An explicit endpoint in `extra` is the escape hatch and wins.
3362        let mut pinned = BTreeMap::new();
3363        pinned.insert(
3364            "default".to_owned(),
3365            CredsSet {
3366                extra: [(
3367                    "endpoint".to_owned(),
3368                    "https://pinned.example.com".to_owned(),
3369                )]
3370                .into_iter()
3371                .collect(),
3372                ..CredsSet::default()
3373            },
3374        );
3375        let resolved = fat.resolve(&pinned).unwrap();
3376        assert_eq!(
3377            opts(&resolved, "endpoint").as_deref(),
3378            Some("https://pinned.example.com"),
3379        );
3380        // gs passthrough
3381        let gcs = StorageUrl::parse("gs://bucket/p").unwrap();
3382        assert_eq!(gcs.lance_url().as_str(), "gs://bucket/p");
3383        // az: account folds into options
3384        let azure = StorageUrl::parse("az://acct/container/p").unwrap();
3385        assert_eq!(azure.lance_url().as_str(), "az://container/p");
3386        assert_eq!(
3387            azure.scheme_options,
3388            vec![("account_name", "acct".to_owned())]
3389        );
3390        // tests-only schemes pass through untouched
3391        let shared = StorageUrl::parse("shared-memory://pond-test-x/").unwrap();
3392        assert_eq!(shared.lance_url().as_str(), "shared-memory://pond-test-x/");
3393    }
3394
3395    #[test]
3396    fn storage_url_rejects_bad_shapes() {
3397        // RFC 3986 userinfo is a leak class, never accepted.
3398        let err = StorageUrl::parse("s3+https://user:pass@host/bucket")
3399            .expect_err("userinfo must be rejected")
3400            .to_string();
3401        assert!(
3402            err.contains("creds"),
3403            "error must name the alternative: {err}"
3404        );
3405        // Missing bucket.
3406        assert!(StorageUrl::parse("s3+https://host").is_err());
3407        assert!(StorageUrl::parse("az://acct").is_err());
3408        // Unknown scheme names the grammar.
3409        let err = StorageUrl::parse("ftp://host/x")
3410            .expect_err("ftp")
3411            .to_string();
3412        assert!(err.contains("s3+https"), "got: {err}");
3413        // Unrecognized query params die loudly.
3414        let err = StorageUrl::parse("s3://b/p?regoin=x")
3415            .expect_err("typo")
3416            .to_string();
3417        assert!(err.contains("regoin"), "got: {err}");
3418        // Query params on local / in-memory schemes die just as loudly -
3419        // no silent carry into the URL Lance opens.
3420        let err = StorageUrl::parse("memory://x?creds=y")
3421            .expect_err("memory query")
3422            .to_string();
3423        assert!(err.contains("query params"), "got: {err}");
3424        let err = StorageUrl::parse("file:///x?creds=y")
3425            .expect_err("file query")
3426            .to_string();
3427        assert!(err.contains("query params"), "got: {err}");
3428        // `?` in a bare path is a filename character, not a query.
3429        assert!(StorageUrl::parse("/tmp/a?b").is_ok());
3430    }
3431
3432    #[test]
3433    fn storage_url_canonicalizes_ports_and_keeps_percent_encoding() {
3434        // Default port strips so scope matching can't split on `:443`.
3435        let with_port = StorageUrl::parse("s3+https://host:443/bucket/p").unwrap();
3436        let without = StorageUrl::parse("s3+https://host/bucket/p").unwrap();
3437        assert_eq!(with_port.canonical(), without.canonical());
3438        // Non-default port survives into the assembled endpoint.
3439        let odd = StorageUrl::parse("s3+https://host:8443/bucket").unwrap();
3440        let resolved = odd.resolve(&BTreeMap::new()).unwrap();
3441        assert_eq!(
3442            resolved.options.get("endpoint").map(String::as_str),
3443            Some("https://bucket.host:8443"),
3444        );
3445        // Percent-encoded prefix passes through to the Lance URL verbatim.
3446        let encoded = StorageUrl::parse("s3+https://host/bucket/pre%20fix").unwrap();
3447        assert_eq!(encoded.lance_url().as_str(), "s3://bucket/pre%20fix");
3448    }
3449
3450    #[test]
3451    fn query_params_strip_and_apply_over_set_fields() {
3452        let mut creds = BTreeMap::new();
3453        creds.insert(
3454            "default".to_owned(),
3455            CredsSet {
3456                region: Some("from-set".to_owned()),
3457                virtual_hosted_style_request: Some(false),
3458                ..set(None)
3459            },
3460        );
3461        let url = StorageUrl::parse(
3462            "s3+https://host/bucket/p?region=from-query&virtual_hosted_style_request=true",
3463        )
3464        .unwrap();
3465        // Stripped before Lance sees the URL.
3466        assert_eq!(url.lance_url().as_str(), "s3://bucket/p");
3467        assert!(url.canonical().query().is_none());
3468        let resolved = url.resolve(&creds).unwrap();
3469        // Assembly precedence: scheme < set < query.
3470        assert_eq!(opts(&resolved, "region").as_deref(), Some("from-query"));
3471        assert_eq!(
3472            opts(&resolved, "virtual_hosted_style_request").as_deref(),
3473            Some("true"),
3474        );
3475        // virtual_hosted=true (query) -> the bucket rides in the endpoint host.
3476        assert_eq!(
3477            opts(&resolved, "endpoint").as_deref(),
3478            Some("https://bucket.host"),
3479        );
3480    }
3481
3482    #[test]
3483    fn scope_matching_binds_by_longest_prefix_at_segment_boundaries() {
3484        let mut creds = BTreeMap::new();
3485        creds.insert("all".to_owned(), set(None));
3486        creds.insert("bucket".to_owned(), set(Some("s3+https://host/pond/")));
3487        creds.insert("deep".to_owned(), set(Some("s3+https://host/pond/sub")));
3488
3489        let bind = |input: &str| {
3490            StorageUrl::parse(input)
3491                .unwrap()
3492                .resolve(&creds)
3493                .unwrap()
3494                .binding
3495        };
3496        // Longest match wins.
3497        assert_eq!(
3498            bind("s3+https://host/pond/sub/x"),
3499            CredsBinding::Set {
3500                name: "deep".to_owned(),
3501                via: BindVia::Scope
3502            },
3503        );
3504        assert_eq!(
3505            bind("s3+https://host/pond/other"),
3506            CredsBinding::Set {
3507                name: "bucket".to_owned(),
3508                via: BindVia::Scope
3509            },
3510        );
3511        // Segment boundary: `/pond` does not match `/pond-2`.
3512        assert_eq!(
3513            bind("s3+https://host/pond-2"),
3514            CredsBinding::Set {
3515                name: "all".to_owned(),
3516                via: BindVia::CatchAll
3517            },
3518        );
3519        // No cross-scheme normalization: the scoped sets don't match s3://.
3520        assert_eq!(
3521            bind("s3://pond/sub"),
3522            CredsBinding::Set {
3523                name: "all".to_owned(),
3524                via: BindVia::CatchAll
3525            },
3526        );
3527        // Default-port spelling matches the portless scope.
3528        assert_eq!(
3529            bind("s3+https://host:443/pond/x"),
3530            CredsBinding::Set {
3531                name: "bucket".to_owned(),
3532                via: BindVia::Scope
3533            },
3534        );
3535        // `?creds=` pointer beats every scope...
3536        assert_eq!(
3537            bind("s3+https://host/pond/sub/x?creds=all"),
3538            CredsBinding::Set {
3539                name: "all".to_owned(),
3540                via: BindVia::Pointer
3541            },
3542        );
3543        // ...and a pointer to a missing set is an error, not a fallback.
3544        let err = StorageUrl::parse("s3://b/p?creds=nope")
3545            .unwrap()
3546            .resolve(&creds)
3547            .expect_err("missing set")
3548            .to_string();
3549        assert!(err.contains("creds=nope"), "got: {err}");
3550
3551        // No sets at all -> ambient chain; local URLs skip resolution.
3552        let empty = BTreeMap::new();
3553        assert_eq!(
3554            StorageUrl::parse("s3://b/p")
3555                .unwrap()
3556                .resolve(&empty)
3557                .unwrap()
3558                .binding,
3559            CredsBinding::Ambient,
3560        );
3561        assert_eq!(
3562            StorageUrl::parse("/srv/pond")
3563                .unwrap()
3564                .resolve(&creds)
3565                .unwrap()
3566                .binding,
3567            CredsBinding::NotApplicable,
3568        );
3569    }
3570
3571    #[test]
3572    fn unmatched_sets_are_reported_only_on_remote_invocations() {
3573        let mut creds = BTreeMap::new();
3574        creds.insert("used".to_owned(), set(Some("s3://bucket/")));
3575        creds.insert("idle".to_owned(), set(Some("s3://other/")));
3576
3577        let remote = StorageUrl::parse("s3://bucket/p")
3578            .unwrap()
3579            .resolve(&creds)
3580            .unwrap();
3581        assert_eq!(unmatched_creds_sets(&[&remote], &creds), vec!["idle"]);
3582
3583        // A purely local invocation must not nag about remote-only sets.
3584        let local = StorageUrl::parse("/srv/pond")
3585            .unwrap()
3586            .resolve(&creds)
3587            .unwrap();
3588        assert!(unmatched_creds_sets(&[&local], &creds).is_empty());
3589    }
3590
3591    #[test]
3592    fn secrets_materialize_from_file_and_command() {
3593        let dir = TempDir::new().unwrap();
3594        let key_path = dir.path().join("key");
3595        std::fs::write(&key_path, "from-file\n").unwrap();
3596        let mut creds = BTreeMap::new();
3597        creds.insert(
3598            "default".to_owned(),
3599            CredsSet {
3600                access_key_id_file: Some(key_path),
3601                // Two trailing newlines: exactly one is stripped.
3602                secret_access_key_command: Some("printf 'from-command\\n\\n'".to_owned()),
3603                ..CredsSet::default()
3604            },
3605        );
3606        let url = StorageUrl::parse("s3://bucket/p").unwrap();
3607        let resolved = url.resolve(&creds).unwrap();
3608        assert_eq!(
3609            opts(&resolved, "access_key_id").as_deref(),
3610            Some("from-file")
3611        );
3612        assert_eq!(
3613            opts(&resolved, "secret_access_key").as_deref(),
3614            Some("from-command\n"),
3615        );
3616
3617        // A failing command surfaces its text and exit status.
3618        let mut failing = BTreeMap::new();
3619        failing.insert(
3620            "default".to_owned(),
3621            CredsSet {
3622                secret_access_key_command: Some("exit 3".to_owned()),
3623                ..CredsSet::default()
3624            },
3625        );
3626        let err = url
3627            .resolve(&failing)
3628            .expect_err("command must fail")
3629            .to_string();
3630        assert!(err.contains("exit 3"), "got: {err}");
3631
3632        // The command cache: one subprocess per command text per process.
3633        let marker = dir.path().join("runs");
3634        let command = format!("echo run >> {} && echo secret", marker.display());
3635        let mut counted = BTreeMap::new();
3636        counted.insert(
3637            "default".to_owned(),
3638            CredsSet {
3639                secret_access_key_command: Some(command),
3640                ..CredsSet::default()
3641            },
3642        );
3643        url.resolve(&counted).unwrap();
3644        url.resolve(&counted).unwrap();
3645        let runs = std::fs::read_to_string(&marker).unwrap();
3646        assert_eq!(runs.lines().count(), 1, "command must run exactly once");
3647    }
3648
3649    #[test]
3650    fn check_errors_classify_by_kind_and_binding() {
3651        let auth_error = || object_store::Error::Unauthenticated {
3652            path: "k".to_owned(),
3653            source: "denied".into(),
3654        };
3655        let bound = CredsBinding::Set {
3656            name: "work".to_owned(),
3657            via: BindVia::Scope,
3658        };
3659        // Auth-class error with a bound set names the set...
3660        match classify_check_error(auth_error(), &bound, "put") {
3661            CheckFailure::Auth { set, .. } => assert_eq!(set, "work"),
3662            other => panic!("want Auth, got {other:?}"),
3663        }
3664        // ...and without one, points at the (empty) ambient chain.
3665        assert!(matches!(
3666            classify_check_error(auth_error(), &CredsBinding::Ambient, "put"),
3667            CheckFailure::NoCreds { .. },
3668        ));
3669        let denied = object_store::Error::PermissionDenied {
3670            path: "k".to_owned(),
3671            source: "403".into(),
3672        };
3673        assert!(matches!(
3674            classify_check_error(denied, &bound, "put"),
3675            CheckFailure::Auth { .. },
3676        ));
3677        // Anything else is I/O, set or no set.
3678        let missing = object_store::Error::NotFound {
3679            path: "k".to_owned(),
3680            source: "404".into(),
3681        };
3682        assert!(matches!(
3683            classify_check_error(missing, &bound, "get"),
3684            CheckFailure::Io { .. },
3685        ));
3686        // Lance wraps an empty-creds chain as a `Generic` error, never the
3687        // typed `Unauthenticated`; the rendered `CredentialsNotLoaded` is the
3688        // signal. Bound -> Auth (the set is wrong), unbound -> NoCreds.
3689        let no_creds = || object_store::Error::Generic {
3690            store: "S3",
3691            source: "Failed to get AWS credentials: CredentialsNotLoaded".into(),
3692        };
3693        assert!(matches!(
3694            classify_check_error(no_creds(), &bound, "put"),
3695            CheckFailure::Auth { .. },
3696        ));
3697        assert!(matches!(
3698            classify_check_error(no_creds(), &CredsBinding::Ambient, "put"),
3699            CheckFailure::NoCreds { .. },
3700        ));
3701    }
3702
3703    #[test]
3704    fn concise_cause_strips_upstream_noise_to_one_line() {
3705        // The shape Lance actually produces: bug-report boilerplate, the real
3706        // cause, an internal source location, then the same text re-printed.
3707        let inner = "Encountered internal error. Please file a bug report at \
3708                     https://github.com/lance-format/lance/issues. Failed to get AWS \
3709                     credentials: CredentialsNotLoaded, <WORKSPACE>/src/object_store/providers/aws.rs:401:21: \
3710                     Encountered internal error. Please file a bug report at \
3711                     https://github.com/lance-format/lance/issues. Failed to get AWS \
3712                     credentials: CredentialsNotLoaded";
3713        let failure = CheckFailure::NoCreds {
3714            source: anyhow!(inner.to_owned()).context("initial conditional put"),
3715        };
3716        let cause = failure.concise_cause().expect("auth-class carries a cause");
3717        assert_eq!(cause, "Failed to get AWS credentials: CredentialsNotLoaded");
3718        // Display carries only the fix-naming lead, no chain.
3719        assert!(
3720            !failure.to_string().contains("file a bug report"),
3721            "lead must not trail the chain: {failure}"
3722        );
3723        // OccUnsupported's detail is already curated into Display.
3724        let occ = CheckFailure::OccUnsupported {
3725            detail: "put-if-none-match ignored".to_owned(),
3726        };
3727        assert!(occ.concise_cause().is_none());
3728        // Oversized single-line causes middle-truncate, keeping the tail
3729        // (wrapped transport errors put the root cause at the end).
3730        let long = CheckFailure::Io {
3731            source: anyhow!(format!("{} dns error: lookup failed", "x".repeat(500))),
3732        };
3733        let cause = long.concise_cause().expect("io carries a cause");
3734        assert!(cause.contains(" ... "), "long causes truncate: {cause}");
3735        assert!(
3736            cause.ends_with("dns error: lookup failed"),
3737            "the tail survives: {cause}"
3738        );
3739    }
3740
3741    #[tokio::test]
3742    async fn storage_check_passes_on_memory_backend() {
3743        let resolved = StorageUrl::parse("memory://check/probe")
3744            .unwrap()
3745            .resolve(&BTreeMap::new())
3746            .unwrap();
3747        storage_check(&resolved).await.expect("memory probe passes");
3748    }
3749
3750    fn stat(bytes: u64) -> FragmentStat {
3751        FragmentStat {
3752            bytes: Some(bytes),
3753            rows: bytes / 1_000,
3754            deleted_rows: 0,
3755        }
3756    }
3757
3758    #[test]
3759    fn compaction_veto_blocks_absorb_keeps_peers() {
3760        // One 665 MiB tail fragment + tiny appends -> vetoed.
3761        let absorb = [stat(665_000_000), stat(1_000_000), stat(2_000_000)];
3762        assert!(!keep_task(&absorb, 64, 0.1));
3763        // Peer-sized merge halves fragment count -> kept.
3764        let peers = [stat(300_000_000), stat(300_000_000)];
3765        assert!(keep_task(&peers, 64, 0.1));
3766        // Remainder reaches largest / COMPACTION_ABSORB_FACTOR -> kept.
3767        let tiered = [stat(400_000), stat(60_000), stat(40_000)];
3768        assert!(keep_task(&tiered, 64, 0.1));
3769    }
3770
3771    #[test]
3772    fn compaction_veto_passes_deletions_and_cap() {
3773        let mut deleting = stat(665_000_000);
3774        deleting.deleted_rows = deleting.rows / 5;
3775        assert!(keep_task(&[deleting, stat(1_000)], 64, 0.1));
3776
3777        let wide: Vec<FragmentStat> = std::iter::once(stat(665_000_000))
3778            .chain(std::iter::repeat_with(|| stat(1_000)).take(63))
3779            .collect();
3780        assert!(keep_task(&wide, 64, 0.1));
3781    }
3782
3783    #[test]
3784    fn compaction_veto_falls_back_to_rows_on_unknown_sizes() {
3785        let mut unknown = stat(665_000_000);
3786        unknown.bytes = None;
3787        // Rows comparison: 665k vs 3k -> still vetoed.
3788        assert!(!keep_task(
3789            &[unknown, stat(1_000_000), stat(2_000_000)],
3790            64,
3791            0.1
3792        ));
3793    }
3794
3795    #[test]
3796    fn derived_target_rows_tracks_row_size_and_clamps() {
3797        // ~1.3 KiB rows -> ~200k-row target.
3798        let parts_like = [FragmentStat {
3799            bytes: Some(665_000_000),
3800            rows: 511_000,
3801            deleted_rows: 0,
3802        }];
3803        let target = derived_target_rows(&parts_like);
3804        assert!((150_000..300_000).contains(&target), "{target}");
3805        // No usable sizes -> Lance default.
3806        let unknown = [FragmentStat {
3807            bytes: None,
3808            rows: 511_000,
3809            deleted_rows: 0,
3810        }];
3811        assert_eq!(
3812            derived_target_rows(&unknown),
3813            MAX_TARGET_ROWS_PER_FRAGMENT as usize
3814        );
3815        // Tiny rows clamp at the ceiling, huge rows at the floor.
3816        let tiny = [FragmentStat {
3817            bytes: Some(1_000_000),
3818            rows: 100_000,
3819            deleted_rows: 0,
3820        }];
3821        assert_eq!(
3822            derived_target_rows(&tiny),
3823            MAX_TARGET_ROWS_PER_FRAGMENT as usize
3824        );
3825        let huge = [FragmentStat {
3826            bytes: Some(1_000_000_000),
3827            rows: 100,
3828            deleted_rows: 0,
3829        }];
3830        assert_eq!(
3831            derived_target_rows(&huge),
3832            MIN_TARGET_ROWS_PER_FRAGMENT as usize
3833        );
3834    }
3835
3836    #[test]
3837    fn namespace_error_code_walks_wrapped_chain() {
3838        let direct = lance::Error::namespace_source(Box::new(NamespaceError::TableNotFound {
3839            message: "missing".into(),
3840        }));
3841        assert!(is_namespace_error_code(&direct, ErrorCode::TableNotFound));
3842
3843        let wrapped = lance::Error::namespace_source(Box::new(direct));
3844        assert!(is_namespace_error_code(&wrapped, ErrorCode::TableNotFound));
3845
3846        let other_code =
3847            lance::Error::namespace_source(Box::new(NamespaceError::NamespaceNotFound {
3848                message: "nope".into(),
3849            }));
3850        assert!(!is_namespace_error_code(
3851            &other_code,
3852            ErrorCode::TableNotFound
3853        ));
3854
3855        let not_namespace = lance::Error::internal("unrelated");
3856        assert!(!is_namespace_error_code(
3857            &not_namespace,
3858            ErrorCode::TableNotFound
3859        ));
3860    }
3861
3862    /// Round-trip: opening a fresh data dir through `lance-namespace`
3863    /// produces all three tables, and `Handle::scan` returns an empty batch
3864    /// for each (no spurious schema mismatch, no namespace error).
3865    #[tokio::test]
3866    async fn store_opens_via_namespace_and_scan_works() -> Result<()> {
3867        let temp = TempDir::new()?;
3868        let url = Url::from_directory_path(temp.path())
3869            .map_err(|()| anyhow::anyhow!("temp path is not absolute"))?;
3870        let handle = Handle::open(&url).await?;
3871        // Each table has its own PK column; project the canonical one so the
3872        // scan is exercised end-to-end (catalog -> dataset -> scanner -> batch).
3873        let cases: [(Table, &[&str]); 3] = [
3874            (Table::Sessions, &["id"]),
3875            (Table::Messages, &["id"]),
3876            (Table::Parts, &["id"]),
3877        ];
3878        for (table, projection) in cases {
3879            let scanner = handle
3880                .scan(table, ScanOpts::project_only(projection))
3881                .await?;
3882            let batch = scanner.try_into_batch().await?;
3883            assert_eq!(batch.num_rows(), 0, "fresh table should be empty");
3884        }
3885        Ok(())
3886    }
3887}