1use crate::{
5 RetryPolicy,
6 config::{self, CredsSet},
7 handlers::NamespaceIdent,
8 sessions::{self},
9};
10use anyhow::{Context, Result, anyhow, bail};
11use lance::Dataset;
12use lance::dataset::builder::DatasetBuilder;
13use lance::dataset::index::DatasetIndexRemapperOptions;
14use lance::dataset::optimize::{CompactionOptions, commit_compaction, plan_compaction};
15pub use lance::dataset::write::merge_insert::MergeStats;
16use lance::dataset::write::merge_insert::SourceDedupeBehavior;
17use lance::dataset::{InsertBuilder, MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode};
18pub use lance::dataset::{WriteParams, WriteStats};
19use lance::deps::arrow_array::{Array, RecordBatch, RecordBatchIterator, StringArray};
20use lance::deps::datafusion::physical_plan::SendableRecordBatchStream;
21use lance::index::DatasetIndexExt;
22use lance::index::DatasetIndexInternalExt;
23use lance::index::vector::VectorIndexParams;
24use lance::session::Session;
25use lance_index::IndexType;
26use lance_index::optimize::OptimizeOptions;
27use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
28use lance_index::vector::ivf::IvfBuildParams;
29use lance_index::vector::sq::builder::SQBuildParams;
30use lance_io::object_store::{
31 ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor, uri_to_url,
32};
33use lance_linalg::distance::MetricType;
34use lance_namespace::LanceNamespace;
35use lance_namespace::error::{ErrorCode, NamespaceError};
36use lance_namespace::models::DescribeTableRequest;
37use lance_namespace_impls::ConnectBuilder;
38use std::{
39 collections::{BTreeMap, HashMap},
40 sync::Arc,
41 time::{Duration, Instant},
42};
43use tokio::sync::{Mutex, OnceCell};
44use tokio_stream::StreamExt;
45use url::Url;
/// Row-count threshold related to vector indexing.
///
/// NOTE(review): not referenced in the visible part of this file; presumably the number of
/// rows at which a vector index starts being built — confirm at the use site.
pub const VECTOR_INDEX_ACTIVATION_ROWS: usize = 100_000;

/// Threshold related to delta merging.
///
/// NOTE(review): not referenced in the visible part of this file; presumably the number of
/// index deltas tolerated before they are merged — confirm at the use site.
pub const DELTA_MERGE_THRESHOLD: usize = 4;
59
/// A parsed storage location, split into the pieces needed to open it later.
///
/// Built by `StorageUrl::parse`; credentials are attached afterwards by
/// `StorageUrl::resolve`.
#[derive(Debug, Clone, PartialEq)]
pub struct StorageUrl {
    /// The URL as written by the user with its query string removed (and, for
    /// `s3+https`/`s3+http`, with an explicit default port removed). Used for scope
    /// matching, display and the local/remote decision.
    canonical: Url,
    /// The URL exposed through `lance_url()`. Equal to `canonical` for local, memory,
    /// `s3` and `gs` URLs; rewritten to `s3://bucket/prefix` for `s3+http(s)` and to
    /// `az://container/prefix` for `az`.
    lance: Url,
    /// Storage options implied by the scheme itself (e.g. `allow_http`, `account_name`).
    scheme_options: Vec<(&'static str, String)>,
    /// Recognized query params other than `creds`, in the order they appeared.
    query_options: Vec<(&'static str, String)>,
    /// Value of the `?creds=` query param, naming a creds set explicitly.
    creds_pointer: Option<String>,
    /// Custom S3 endpoint details; only set for `s3+https`/`s3+http` URLs.
    endpoint: Option<S3Endpoint>,
}
89
/// Endpoint pieces of an `s3+https://` / `s3+http://` URL, kept separately so the
/// final `endpoint` option can be assembled once the addressing style is known.
#[derive(Debug, Clone, PartialEq)]
struct S3Endpoint {
    /// `"https"` or `"http"`, derived from the URL scheme.
    scheme: &'static str,
    /// `host` or `host:port` of the endpoint.
    authority: String,
    /// First path segment of the URL; prefixed to the authority for virtual-hosted style.
    bucket: String,
}
97
/// Query params accepted on remote storage URLs. `creds` is a pointer to a creds set;
/// the others are forwarded as storage options. Anything else is rejected by `strip_query`.
const RECOGNIZED_QUERY_PARAMS: [&str; 3] = ["creds", "region", "virtual_hosted_style_request"];
102
impl StorageUrl {
    /// Parses a storage path or URL into its canonical form plus backend options.
    ///
    /// Accepted inputs, as handled below:
    /// - a plain path or a `file://` URL (no query params allowed),
    /// - `memory://` / `shared-memory://` (no query params allowed),
    /// - `s3://` / `gs://`,
    /// - `s3+https://host/bucket/prefix` / `s3+http://host/bucket/prefix` for custom
    ///   S3 endpoints,
    /// - `az://account/container/prefix`.
    ///
    /// # Errors
    /// Fails on empty input, an unparsable URL, userinfo credentials embedded in the URL,
    /// an unrecognized scheme or query param, or a missing host/bucket/account/container.
    pub fn parse(input: &str) -> Result<Self> {
        let trimmed = input.trim();
        if trimmed.is_empty() {
            bail!("storage path is empty");
        }
        // Anything without a scheme separator, and explicit file:// URLs, is treated as local.
        if !trimmed.contains("://") || trimmed.starts_with("file://") {
            let url =
                uri_to_url(trimmed).with_context(|| format!("invalid storage path {trimmed:?}"))?;
            if url.query().is_some() {
                bail!("storage URL {trimmed:?} carries query params; local URLs take none");
            }
            return Ok(Self::plain(url));
        }
        let url =
            Url::parse(trimmed).with_context(|| format!("invalid storage URL {trimmed:?}"))?;
        // Secrets never travel in the URL itself.
        if !url.username().is_empty() || url.password().is_some() {
            bail!(
                "storage URL {trimmed:?} embeds credentials; put them in [creds.*] (or POND_CREDS_*) instead"
            );
        }
        match url.scheme() {
            "memory" | "shared-memory" => {
                if url.query().is_some() {
                    bail!(
                        "storage URL {trimmed:?} carries query params; {}:// URLs take none",
                        url.scheme(),
                    );
                }
                Ok(Self::plain(url))
            }
            "s3" | "gs" => {
                let (canonical, query_options, creds_pointer) = strip_query(url)?;
                let mut lance = canonical.clone();
                // `strip_query` already cleared the query, so this leaves `lance` unchanged.
                lance.set_query(None);
                Ok(Self {
                    canonical,
                    lance,
                    scheme_options: Vec::new(),
                    query_options,
                    creds_pointer,
                    endpoint: None,
                })
            }
            "s3+https" | "s3+http" => {
                let (mut canonical, query_options, creds_pointer) = strip_query(url)?;
                let tls = canonical.scheme() == "s3+https";
                // Drop an explicit default port; `parse_scope` normalizes scopes the same way.
                if canonical.port() == Some(if tls { 443 } else { 80 }) {
                    let _ = canonical.set_port(None);
                }
                let host = canonical
                    .host_str()
                    .ok_or_else(|| anyhow!("storage URL {trimmed:?} has no endpoint host"))?;
                let endpoint_authority = match canonical.port() {
                    Some(port) => format!("{host}:{port}"),
                    None => host.to_owned(),
                };
                // Path layout is /bucket/prefix: the first segment is the bucket, the rest
                // (possibly empty) is the key prefix.
                let mut segments = canonical.path().trim_start_matches('/').splitn(2, '/');
                let bucket = segments.next().unwrap_or_default().to_owned();
                let prefix = segments.next().unwrap_or_default().to_owned();
                if bucket.is_empty() {
                    bail!(
                        "storage URL {trimmed:?} is missing the bucket: the form is {}://host/bucket/prefix",
                        canonical.scheme(),
                    );
                }
                let lance = Url::parse(&format!("s3://{bucket}/{prefix}")).with_context(|| {
                    format!("storage URL {trimmed:?}: bucket/prefix do not form a valid s3:// URL")
                })?;
                let scheme = if tls { "https" } else { "http" };
                // Virtual-hosted addressing is the default unless the host is an IP literal
                // (IPv4/IPv6 text, or a bracketed IPv6 host that `host_str` renders with
                // brackets and therefore fails the `IpAddr` parse).
                let virtual_hosted = host.parse::<std::net::IpAddr>().is_err()
                    && !matches!(canonical.host(), Some(url::Host::Ipv6(_)));
                // Defaults only: `resolve` lets a creds set and then query params override
                // `region` and `virtual_hosted_style_request`.
                let scheme_options = vec![
                    ("allow_http", (!tls).to_string()),
                    ("virtual_hosted_style_request", virtual_hosted.to_string()),
                    ("region", "us-east-1".to_owned()),
                ];
                Ok(Self {
                    canonical,
                    lance,
                    scheme_options,
                    query_options,
                    creds_pointer,
                    endpoint: Some(S3Endpoint {
                        scheme,
                        authority: endpoint_authority,
                        bucket,
                    }),
                })
            }
            "az" => {
                let (canonical, query_options, creds_pointer) = strip_query(url)?;
                // The URL host is the storage account; it becomes the `account_name` option.
                let account = canonical
                    .host_str()
                    .ok_or_else(|| anyhow!("storage URL {trimmed:?} has no account: the form is az://account/container/prefix"))?
                    .to_owned();
                let mut segments = canonical.path().trim_start_matches('/').splitn(2, '/');
                let container = segments.next().unwrap_or_default();
                if container.is_empty() {
                    bail!(
                        "storage URL {trimmed:?} is missing the container: the form is az://account/container/prefix"
                    );
                }
                let prefix = segments.next().unwrap_or_default();
                // The storage-facing URL puts the container in host position.
                let lance = Url::parse(&format!("az://{container}/{prefix}"))
                    .with_context(|| format!("storage URL {trimmed:?}: container/prefix do not form a valid az:// URL"))?;
                Ok(Self {
                    canonical,
                    lance,
                    scheme_options: vec![("account_name", account)],
                    query_options,
                    creds_pointer,
                    endpoint: None,
                })
            }
            other => bail!(
                "storage URL scheme {other:?} not recognized; use a local path, s3://, s3+https://, s3+http://, gs://, or az://"
            ),
        }
    }

    /// Builds a `StorageUrl` with no options, pointer or endpoint, where the canonical and
    /// storage-facing URLs are the same (local and in-memory locations).
    fn plain(url: Url) -> Self {
        Self {
            canonical: url.clone(),
            lance: url,
            scheme_options: Vec::new(),
            query_options: Vec::new(),
            creds_pointer: None,
            endpoint: None,
        }
    }

    /// The storage-facing URL (see the `lance` field).
    pub fn lance_url(&self) -> &Url {
        &self.lance
    }

    /// The canonical URL: the user's URL without query params.
    pub fn canonical(&self) -> &Url {
        &self.canonical
    }

    /// Whether the canonical URL is local, as decided by `config::is_local`.
    pub fn is_local(&self) -> bool {
        config::is_local(&self.canonical)
    }

    /// Human-readable rendering of the canonical URL, produced by `config::display`.
    pub fn display(&self) -> String {
        config::display(&self.canonical)
    }

    /// True for every scheme except the local and in-memory ones listed below.
    fn takes_credentials(&self) -> bool {
        !matches!(
            self.canonical.scheme(),
            "file" | "file+uring" | "memory" | "shared-memory"
        )
    }

    /// Binds this URL to a creds set (if any) and assembles the final storage options.
    ///
    /// Set selection:
    /// 1. an explicit `?creds=name` pointer (error if the set does not exist),
    /// 2. otherwise the set whose `scope` matches this URL with the longest scope string,
    /// 3. otherwise the first set (in map order) that has no scope,
    /// 4. otherwise no set: the binding is `CredsBinding::Ambient`.
    ///
    /// Option precedence, lowest to highest: scheme options, the set's `region` /
    /// `virtual_hosted_style_request`, the set's `extra` entries, materialized secrets,
    /// then URL query options. An `endpoint` is derived last and only when no
    /// `endpoint`/`aws_endpoint` key (case-insensitive) is already present.
    ///
    /// # Errors
    /// Fails when the pointer names an unknown set, when any configured scope is not a
    /// valid URL prefix, or when a secret cannot be materialized.
    pub fn resolve(&self, creds: &BTreeMap<String, CredsSet>) -> Result<ResolvedStorage> {
        if !self.takes_credentials() {
            return Ok(ResolvedStorage {
                storage: self.clone(),
                options: HashMap::new(),
                binding: CredsBinding::NotApplicable,
            });
        }
        let matched: Option<(&String, &CredsSet, BindVia)> = match &self.creds_pointer {
            Some(name) => {
                let set = creds.get(name).ok_or_else(|| {
                    anyhow!(
                        "URL names ?creds={name} but no [creds.{name}] set is configured; define it or drop the pointer"
                    )
                })?;
                Some((name, set, BindVia::Pointer))
            }
            None => {
                // Third tuple element is the winning scope's string form; only its length
                // is compared, so the longest matching scope wins.
                let mut best: Option<(&String, &CredsSet, String)> = None;
                for (name, set) in creds {
                    let Some(scope) = &set.scope else { continue };
                    // Every scoped set is validated, even ones that cannot match this URL.
                    let scope_url = parse_scope(scope).with_context(|| {
                        format!("[creds.{name}] scope {scope:?} is not a valid URL prefix")
                    })?;
                    if scope_matches(&scope_url, &self.canonical)
                        && best
                            .as_ref()
                            .is_none_or(|(_, _, len)| scope_url.as_str().len() > len.len())
                    {
                        best = Some((name, set, scope_url.as_str().to_owned()));
                    }
                }
                match best {
                    Some((name, set, _)) => Some((name, set, BindVia::Scope)),
                    // No scope matched: fall back to the first unscoped set, if any.
                    None => creds
                        .iter()
                        .find(|(_, set)| set.scope.is_none())
                        .map(|(name, set)| (name, set, BindVia::CatchAll)),
                }
            }
        };
        // Start from the scheme defaults; later inserts overwrite earlier ones.
        let mut options: HashMap<String, String> = self
            .scheme_options
            .iter()
            .map(|(key, value)| ((*key).to_owned(), value.clone()))
            .collect();
        let binding = match matched {
            None => CredsBinding::Ambient,
            Some((name, set, via)) => {
                if let Some(region) = &set.region {
                    options.insert("region".to_owned(), region.clone());
                }
                if let Some(virtual_hosted) = set.virtual_hosted_style_request {
                    options.insert(
                        "virtual_hosted_style_request".to_owned(),
                        virtual_hosted.to_string(),
                    );
                }
                for (key, value) in &set.extra {
                    options.insert(key.clone(), value.clone());
                }
                // The access key id has no command source; the secret key does.
                if let Some(value) = materialize_secret(
                    name,
                    "access_key_id",
                    set.access_key_id.as_deref(),
                    set.access_key_id_file.as_deref(),
                    None,
                )? {
                    options.insert("access_key_id".to_owned(), value);
                }
                if let Some(value) = materialize_secret(
                    name,
                    "secret_access_key",
                    set.secret_access_key.as_deref(),
                    set.secret_access_key_file.as_deref(),
                    set.secret_access_key_command.as_deref(),
                )? {
                    options.insert("secret_access_key".to_owned(), value);
                }
                CredsBinding::Set {
                    name: name.clone(),
                    via,
                }
            }
        };
        // Query params on the URL have the final say over set-provided values.
        for (key, value) in &self.query_options {
            options.insert((*key).to_owned(), value.clone());
        }
        // Derive the endpoint only after all overrides, because the addressing style
        // decides whether the bucket is part of the endpoint host.
        if let Some(endpoint) = &self.endpoint
            && !options.keys().any(|key| {
                key.eq_ignore_ascii_case("endpoint") || key.eq_ignore_ascii_case("aws_endpoint")
            })
        {
            let virtual_hosted = options
                .get("virtual_hosted_style_request")
                .is_some_and(|value| value == "true");
            let url = if virtual_hosted {
                format!(
                    "{}://{}.{}",
                    endpoint.scheme, endpoint.bucket, endpoint.authority
                )
            } else {
                format!("{}://{}", endpoint.scheme, endpoint.authority)
            };
            options.insert("endpoint".to_owned(), url);
        }
        Ok(ResolvedStorage {
            storage: self.clone(),
            options,
            binding,
        })
    }
}
415
/// Result of `strip_query`: the URL without its query, the recognized non-`creds`
/// params in order of appearance, and the `creds` pointer if one was given.
type StrippedQuery = (Url, Vec<(&'static str, String)>, Option<String>);
418
419fn strip_query(url: Url) -> Result<StrippedQuery> {
421 let mut query_options = Vec::new();
422 let mut creds_pointer = None;
423 for (key, value) in url.query_pairs() {
424 match RECOGNIZED_QUERY_PARAMS
425 .iter()
426 .find(|known| **known == key.as_ref())
427 {
428 Some(&"creds") => creds_pointer = Some(value.into_owned()),
429 Some(known) => query_options.push((*known, value.into_owned())),
430 None => bail!(
431 "storage URL query param {key:?} not recognized (known: {})",
432 RECOGNIZED_QUERY_PARAMS.join(", "),
433 ),
434 }
435 }
436 let mut canonical = url;
437 canonical.set_query(None);
438 Ok((canonical, query_options, creds_pointer))
439}
440
441pub(crate) fn parse_scope(scope: &str) -> Result<Url> {
444 let mut url = Url::parse(scope.trim())?;
445 if !url.username().is_empty() || url.password().is_some() {
446 bail!("scope embeds credentials");
447 }
448 if url.query().is_some() {
449 bail!("scope carries query params; scopes are plain URL prefixes");
450 }
451 match (url.scheme(), url.port()) {
452 ("s3+https", Some(443)) | ("s3+http", Some(80)) => {
453 let _ = url.set_port(None);
454 }
455 _ => {}
456 }
457 Ok(url)
458}
459
460fn scope_matches(scope: &Url, address: &Url) -> bool {
465 if scope.scheme() != address.scheme()
466 || scope.host_str() != address.host_str()
467 || scope.port() != address.port()
468 {
469 return false;
470 }
471 let scope_path = scope.path().trim_end_matches('/');
472 let address_path = address.path().trim_end_matches('/');
473 address_path == scope_path
474 || address_path
475 .strip_prefix(scope_path)
476 .is_some_and(|rest| rest.starts_with('/'))
477}
478
/// How a creds set came to be bound to a storage URL.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BindVia {
    /// The URL named the set with `?creds=`.
    Pointer,
    /// The set's `scope` matched the URL.
    Scope,
    /// The set has no scope and nothing more specific matched.
    CatchAll,
}

/// Outcome of credential resolution for one storage URL.
#[derive(Debug, Clone, PartialEq)]
pub enum CredsBinding {
    /// A configured creds set was selected.
    Set { name: String, via: BindVia },
    /// No set matched; no set-provided options were applied.
    Ambient,
    /// The URL's scheme takes no credentials (local or in-memory).
    NotApplicable,
}

impl CredsBinding {
    /// Short human-readable description of the binding.
    pub fn describe(&self) -> String {
        match self {
            Self::NotApplicable => String::from("local (no credentials)"),
            Self::Ambient => String::from("ambient chain"),
            Self::Set { name, via } => {
                let via = match *via {
                    BindVia::CatchAll => "catch-all",
                    BindVia::Scope => "scope match",
                    BindVia::Pointer => "?creds",
                };
                format!("creds {name} ({via})")
            }
        }
    }
}
521
/// A `StorageUrl` together with the storage options and creds binding produced by
/// `StorageUrl::resolve`.
#[derive(Debug, Clone)]
pub struct ResolvedStorage {
    /// The URL this resolution was computed for.
    storage: StorageUrl,
    /// Final storage options (scheme defaults, creds-set values, query overrides, endpoint).
    pub options: HashMap<String, String>,
    /// Which creds set, if any, was bound and how.
    pub binding: CredsBinding,
}

impl ResolvedStorage {
    /// The storage-facing URL of the underlying `StorageUrl`.
    pub fn lance_url(&self) -> &Url {
        self.storage.lance_url()
    }

    /// Human-readable rendering of the underlying `StorageUrl`.
    pub fn display(&self) -> String {
        self.storage.display()
    }
}
541
542pub fn unmatched_creds_sets<'c>(
547 resolved: &[&ResolvedStorage],
548 creds: &'c BTreeMap<String, CredsSet>,
549) -> Vec<&'c str> {
550 if resolved
551 .iter()
552 .all(|entry| matches!(entry.binding, CredsBinding::NotApplicable))
553 {
554 return Vec::new();
555 }
556 creds
557 .keys()
558 .filter(|name| {
559 !resolved.iter().any(|entry| {
560 matches!(&entry.binding, CredsBinding::Set { name: bound, .. } if bound == *name)
561 })
562 })
563 .map(String::as_str)
564 .collect()
565}
566
567fn materialize_secret(
570 set: &str,
571 field: &str,
572 inline: Option<&str>,
573 file: Option<&std::path::Path>,
574 command: Option<&str>,
575) -> Result<Option<String>> {
576 if let Some(value) = inline {
577 return Ok(Some(value.to_owned()));
578 }
579 if let Some(path) = file {
580 let text = std::fs::read_to_string(path).with_context(|| {
581 format!(
582 "[creds.{set}] {field}_file: failed to read {}",
583 path.display()
584 )
585 })?;
586 return Ok(Some(strip_one_newline(text)));
587 }
588 if let Some(command) = command {
589 return Ok(Some(run_secret_command(set, field, command)?));
590 }
591 Ok(None)
592}
593
594fn run_secret_command(set: &str, field: &str, command: &str) -> Result<String> {
597 static CACHE: std::sync::OnceLock<std::sync::Mutex<HashMap<String, String>>> =
598 std::sync::OnceLock::new();
599 let cache = CACHE.get_or_init(Default::default);
600 if let Some(hit) = cache
601 .lock()
602 .unwrap_or_else(std::sync::PoisonError::into_inner)
603 .get(command)
604 {
605 return Ok(hit.clone());
606 }
607 let output = std::process::Command::new("sh")
608 .arg("-c")
609 .arg(command)
610 .output()
611 .with_context(|| format!("[creds.{set}] {field}_command failed to spawn: {command}"))?;
612 if !output.status.success() {
613 bail!(
614 "[creds.{set}] {field}_command exited {}: {command}\n{}",
615 output.status,
616 String::from_utf8_lossy(&output.stderr).trim_end(),
617 );
618 }
619 let value = strip_one_newline(
620 String::from_utf8(output.stdout)
621 .with_context(|| format!("[creds.{set}] {field}_command output is not UTF-8"))?,
622 );
623 cache
624 .lock()
625 .unwrap_or_else(std::sync::PoisonError::into_inner)
626 .insert(command.to_owned(), value.clone());
627 Ok(value)
628}
629
/// Removes exactly one trailing line ending (`\n` or `\r\n`) from `text`, if present.
///
/// A lone trailing `\r` is kept, and only the final line ending is removed.
fn strip_one_newline(mut text: String) -> String {
    let keep = match text.strip_suffix('\n') {
        Some(rest) => rest.strip_suffix('\r').unwrap_or(rest).len(),
        None => text.len(),
    };
    text.truncate(keep);
    text
}
641
/// Ways `storage_check` can fail, split by what the user should do about it.
#[derive(Debug, thiserror::Error)]
pub enum CheckFailure {
    /// An auth-class error occurred while no creds set was bound to the URL.
    #[error(
        "authentication failed and no creds set matched this URL; add one with `pond creds add` (or set POND_CREDS_*), or provide ambient AWS_* credentials"
    )]
    NoCreds { source: anyhow::Error },
    /// An auth-class error occurred while creds set `set` was bound.
    #[error("authentication failed using creds set {set:?}; check its keys and scope")]
    Auth { set: String, source: anyhow::Error },
    /// The backend did not reject a create-only put over an existing key.
    #[error(
        "backend does not enforce conditional writes (If-None-Match); concurrent pond writers would corrupt each other - {detail}"
    )]
    OccUnsupported { detail: String },
    /// Any other failure while probing.
    #[error("storage probe failed")]
    Io { source: anyhow::Error },
}
664
665impl CheckFailure {
666 pub fn concise_cause(&self) -> Option<String> {
672 let source = match self {
673 Self::NoCreds { source } | Self::Auth { source, .. } | Self::Io { source } => source,
674 Self::OccUnsupported { .. } => return None,
675 };
676 Some(condense_error_chain(source))
677 }
678}
679
680fn condense_error_chain(error: &anyhow::Error) -> String {
687 let mut text = error
688 .chain()
689 .last()
690 .map(ToString::to_string)
691 .unwrap_or_else(|| format!("{error:#}"));
692 if let Some(pos) = text.find(", <WORKSPACE>") {
693 text.truncate(pos);
694 }
695 text = text.replace(
696 "Encountered internal error. Please file a bug report at https://github.com/lance-format/lance/issues. ",
697 "",
698 );
699 let line = text.split_whitespace().collect::<Vec<_>>().join(" ");
700 const HEAD: usize = 120;
701 const TAIL: usize = 120;
702 let chars: Vec<char> = line.chars().collect();
703 if chars.len() > HEAD + TAIL + 5 {
704 let head: String = chars[..HEAD].iter().collect();
705 let tail: String = chars[chars.len() - TAIL..].iter().collect();
706 format!("{head} ... {tail}")
707 } else {
708 line
709 }
710}
711
712pub async fn storage_check(resolved: &ResolvedStorage) -> std::result::Result<(), CheckFailure> {
717 use object_store::{Error as OsError, ObjectStoreExt, PutMode, PutOptions, PutPayload};
718
719 let classify =
720 |error: OsError, step: &str| classify_check_error(error, &resolved.binding, step);
721
722 let probe_uri = format!(
723 "{}/_config-check/{}",
724 resolved.lance_url().as_str().trim_end_matches('/'),
725 uuid::Uuid::now_v7(),
726 );
727 let params = ObjectStoreParams {
728 storage_options_accessor: (!resolved.options.is_empty()).then(|| {
729 Arc::new(StorageOptionsAccessor::with_static_options(
730 resolved.options.clone(),
731 ))
732 }),
733 ..Default::default()
734 };
735 let registry = Arc::new(ObjectStoreRegistry::default());
736 let (store, path) = ObjectStore::from_uri_and_params(registry, &probe_uri, ¶ms)
737 .await
738 .map_err(|error| CheckFailure::Io {
739 source: anyhow!(error).context(format!("failed to open object store for {probe_uri}")),
740 })?;
741
742 let body: &[u8] = b"pond storage check";
743 let create = PutOptions::from(PutMode::Create);
744 store
745 .inner
746 .put_opts(&path, PutPayload::from_static(body), create.clone())
747 .await
748 .map_err(|error| classify(error, "initial conditional put"))?;
749 let outcome = async {
753 match store
758 .inner
759 .put_opts(&path, PutPayload::from_static(body), create)
760 .await
761 {
762 Err(OsError::AlreadyExists { .. }) => {}
763 Ok(_) => {
764 return Err(CheckFailure::OccUnsupported {
765 detail: "a second create over an existing key succeeded".to_owned(),
766 });
767 }
768 Err(OsError::NotImplemented { .. }) => {
769 return Err(CheckFailure::OccUnsupported {
770 detail: "the backend rejects conditional puts as unimplemented".to_owned(),
771 });
772 }
773 Err(error) => return Err(classify(error, "conditional-put probe")),
774 }
775 let read_back = store
776 .inner
777 .get(&path)
778 .await
779 .map_err(|error| classify(error, "read-back"))?
780 .bytes()
781 .await
782 .map_err(|error| classify(error, "read-back body"))?;
783 if read_back.as_ref() != body {
784 return Err(CheckFailure::Io {
785 source: anyhow!("read-back returned different bytes than written"),
786 });
787 }
788 Ok(())
789 }
790 .await;
791 let cleanup = store.inner.delete(&path).await;
792 outcome?;
793 cleanup.map_err(|error| classify(error, "cleanup delete"))?;
794 Ok(())
795}
796
797fn classify_check_error(
801 error: object_store::Error,
802 binding: &CredsBinding,
803 step: &str,
804) -> CheckFailure {
805 use object_store::Error as OsError;
806 let auth_class = matches!(
811 error,
812 OsError::Unauthenticated { .. } | OsError::PermissionDenied { .. }
813 ) || {
814 let rendered = error.to_string();
815 rendered.contains("CredentialsNotLoaded")
816 || rendered.contains("no providers in chain provided credentials")
817 };
818 match (auth_class, binding) {
819 (true, CredsBinding::Set { name, .. }) => CheckFailure::Auth {
820 set: name.clone(),
821 source: anyhow!(error).context(step.to_owned()),
822 },
823 (true, _) => CheckFailure::NoCreds {
824 source: anyhow!(error).context(step.to_owned()),
825 },
826 (false, _) => CheckFailure::Io {
827 source: anyhow!(error).context(step.to_owned()),
828 },
829 }
830}
831
/// Default fragment-count cap for compaction.
///
/// NOTE(review): not referenced in the visible part of this file; presumably the default
/// for `MaintenancePolicy::compaction_fragment_cap` — confirm at the use site.
pub const DEFAULT_COMPACTION_FRAGMENT_CAP: usize = 64;

/// Byte size (256 MiB) that `derived_target_rows` divides by the average row size to get
/// a per-fragment row target.
pub const TARGET_FRAGMENT_BYTES: u64 = 256 * 1024 * 1024;

/// Lower clamp for the derived per-fragment row target.
const MIN_TARGET_ROWS_PER_FRAGMENT: u64 = 50_000;
/// Upper clamp for the derived per-fragment row target; also the fallback when no
/// fragment reports both a size and a row count.
const MAX_TARGET_ROWS_PER_FRAGMENT: u64 = 1024 * 1024;

/// In `keep_task`, a task is kept when the combined weight of all fragments except the
/// largest, multiplied by this factor, is at least the largest fragment's weight.
pub const COMPACTION_ABSORB_FACTOR: u64 = 4;
849
/// Default value for `MaintenancePolicy::cleanup_older_than`: one hour.
pub fn default_cleanup_older_than() -> chrono::Duration {
    chrono::Duration::hours(1)
}
861
/// Tunables for table maintenance.
#[derive(Debug, Clone, Copy)]
pub struct MaintenancePolicy {
    /// Fragment-count cap for compaction.
    /// NOTE(review): presumably passed to `keep_task` as `cap` — confirm at the caller.
    pub compaction_fragment_cap: usize,
    /// Age threshold for cleanup.
    /// NOTE(review): the cleanup code is outside this view; presumably versions older
    /// than this are removed — confirm.
    pub cleanup_older_than: chrono::Duration,
}

impl MaintenancePolicy {
    /// Policy with a fragment cap of 0 and the default cleanup age.
    pub fn always_compact() -> Self {
        Self {
            compaction_fragment_cap: 0,
            cleanup_older_than: default_cleanup_older_than(),
        }
    }
}
883
/// Size figures for one fragment, as extracted by `fragment_stat`.
struct FragmentStat {
    /// Sum of the fragment's file sizes; `None` when any file has no recorded size.
    bytes: Option<u64>,
    /// Physical row count (0 when the fragment does not record one).
    rows: u64,
    /// Rows marked deleted by the fragment's deletion file (0 when absent or unrecorded).
    deleted_rows: u64,
}
890
891fn fragment_bytes(fragment: &lance::table::format::Fragment) -> Option<u64> {
894 fragment.files.iter().try_fold(0u64, |total, file| {
895 Some(total + file.file_size_bytes.get()?.get())
896 })
897}
898
899fn fragment_stat(fragment: &lance::table::format::Fragment) -> FragmentStat {
900 FragmentStat {
901 bytes: fragment_bytes(fragment),
902 rows: fragment.physical_rows.unwrap_or(0) as u64,
903 deleted_rows: fragment
904 .deletion_file
905 .as_ref()
906 .and_then(|deletions| deletions.num_deleted_rows)
907 .unwrap_or(0) as u64,
908 }
909}
910
911fn derived_target_rows(stats: &[FragmentStat]) -> usize {
913 let (mut bytes, mut rows) = (0u64, 0u64);
914 for stat in stats {
915 if let Some(fragment_bytes) = stat.bytes
916 && stat.rows > 0
917 {
918 bytes += fragment_bytes;
919 rows += stat.rows;
920 }
921 }
922 if bytes == 0 || rows == 0 {
923 return MAX_TARGET_ROWS_PER_FRAGMENT as usize;
924 }
925 let avg_row_bytes = (bytes / rows).max(1);
926 (TARGET_FRAGMENT_BYTES / avg_row_bytes)
927 .clamp(MIN_TARGET_ROWS_PER_FRAGMENT, MAX_TARGET_ROWS_PER_FRAGMENT) as usize
928}
929
930fn keep_task(stats: &[FragmentStat], cap: usize, deletion_threshold: f32) -> bool {
935 if stats.iter().any(|stat| {
936 stat.rows > 0 && (stat.deleted_rows as f32 / stat.rows as f32) > deletion_threshold
937 }) {
938 return true;
939 }
940 if stats.len() >= cap {
941 return true;
942 }
943 let weights: Vec<u64> = if stats.iter().all(|stat| stat.bytes.is_some()) {
944 stats.iter().filter_map(|stat| stat.bytes).collect()
945 } else {
946 stats.iter().map(|stat| stat.rows).collect()
947 };
948 let total: u64 = weights.iter().sum();
949 let largest = weights.iter().copied().max().unwrap_or(0);
950 (total - largest) * COMPACTION_ABSORB_FACTOR >= largest
951}
952
/// Declarative description of one index that should exist on a table.
#[derive(Debug, Clone)]
pub struct IndexIntent {
    /// Name of the index.
    pub name: &'static str,
    /// Column the index is built on.
    pub column: &'static str,
    /// Condition that decides when the index is created (see `IndexTrigger`).
    pub trigger: IndexTrigger,
    /// Kind and parameters of the index (see `IndexParamsKind`).
    pub params: IndexParamsKind,
}
968
/// Condition under which an index should be created.
#[derive(Debug, Clone)]
pub enum IndexTrigger {
    /// Create as soon as the dataset has at least one row.
    OnAnyRows,
    /// Create once at least `threshold` rows have a non-null value in `column`.
    OnNonNullCount {
        column: &'static str,
        threshold: usize,
    },
}
982
/// The kinds of index this module knows how to build.
#[derive(Debug, Clone)]
pub enum IndexParamsKind {
    /// A built-in scalar index of the given type.
    Scalar(BuiltinIndexType),
    /// Inverted full-text index using the `simple` base tokenizer with stemming on and
    /// stop-word removal off (see `IndexParamsKind::build`).
    InvertedFtsWord,
    /// IVF vector index with scalar quantization and the cosine metric.
    IvfSqCosine { num_bits: u16, max_iters: usize },
}
1006
1007impl IndexTrigger {
1008 async fn should_create(&self, dataset: &Dataset) -> Result<bool> {
1009 match self {
1010 Self::OnAnyRows => Ok(dataset.count_rows(None).await? > 0),
1011 Self::OnNonNullCount { column, threshold } => {
1012 let count = dataset
1013 .count_rows(Some(format!("{column} IS NOT NULL")))
1014 .await?;
1015 Ok(count >= *threshold)
1016 }
1017 }
1018 }
1019}
1020
impl IndexParamsKind {
    /// The lance `IndexType` corresponding to this kind.
    fn index_type(&self) -> IndexType {
        match self {
            Self::Scalar(BuiltinIndexType::Bitmap) => IndexType::Bitmap,
            Self::Scalar(BuiltinIndexType::ZoneMap) => IndexType::ZoneMap,
            // NOTE(review): every other builtin scalar type is reported as BTree;
            // confirm no other `BuiltinIndexType` is ever used in an intent.
            Self::Scalar(_) => IndexType::BTree,
            Self::InvertedFtsWord => IndexType::Inverted,
            Self::IvfSqCosine { .. } => IndexType::Vector,
        }
    }

    /// Builds the concrete index parameters for this kind.
    ///
    /// Only the vector variant reads `dataset`: it sizes the IVF partition count from the
    /// number of rows with a non-null `vector` column.
    ///
    /// # Errors
    /// Propagates a failure of that row count.
    async fn build(&self, dataset: &Dataset) -> Result<Box<dyn lance::index::IndexParams>> {
        match self {
            Self::Scalar(kind) => Ok(Box::new(ScalarIndexParams::for_builtin(kind.clone()))),
            Self::InvertedFtsWord => Ok(Box::new(
                InvertedIndexParams::default()
                    .base_tokenizer("simple".to_owned())
                    .stem(true)
                    .remove_stop_words(false),
            )),
            Self::IvfSqCosine {
                num_bits,
                max_iters,
            } => {
                // NOTE(review): the column name `vector` is hard-coded here rather than
                // taken from the intent; confirm all vector intents index that column.
                let count = dataset
                    .count_rows(Some("vector IS NOT NULL".to_owned()))
                    .await?;
                // One partition per 4096 non-null rows, never fewer than one.
                let partitions = count.checked_div(4096).unwrap_or(0).max(1);
                let mut ivf = IvfBuildParams::new(partitions);
                ivf.max_iters = *max_iters;
                let sq = SQBuildParams {
                    num_bits: *num_bits,
                    ..Default::default()
                };
                Ok(Box::new(VectorIndexParams::with_ivf_sq_params(
                    MetricType::Cosine,
                    ivf,
                    sq,
                )))
            }
        }
    }
}
1064
/// Coverage report for one index intent on one table.
///
/// NOTE(review): the code that fills this in is outside the visible part of the file;
/// field meanings below are read from the names — confirm against the producer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexStatus {
    pub table: Table,
    /// Name of the `IndexIntent` this status describes.
    pub intent_name: String,
    pub fragments_covered: usize,
    pub unindexed_fragments: usize,
    pub unindexed_rows: usize,
    /// Whether the index exists at all.
    pub exists: bool,
}
1074
/// Error marker for a commit that kept conflicting until its retries ran out.
///
/// `is_conflict_exhausted` looks for this type anywhere in an error chain.
#[derive(Debug, Clone, Copy)]
pub struct ConflictExhausted {
    /// Number of attempts made before giving up.
    pub attempts: u8,
}

impl std::fmt::Display for ConflictExhausted {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { attempts } = *self;
        formatter.write_fmt(format_args!(
            "commit conflict exhausted after {} attempt(s)",
            attempts
        ))
    }
}

impl std::error::Error for ConflictExhausted {}
1095
/// Result of one maintenance phase on one table.
///
/// NOTE(review): the code that produces these values is outside the visible part of the
/// file; the variant notes below are read from the names — confirm against the producer.
#[derive(Debug)]
pub enum PhaseOutcome {
    /// The phase ran and succeeded.
    Ok,
    /// The phase had nothing to do.
    Noop,
    /// The phase was skipped because of a commit conflict.
    SkippedConflict,
    /// The phase failed with the carried error.
    Failed(anyhow::Error),
    /// The phase was never started.
    NotAttempted,
}

impl PhaseOutcome {
    /// True only for the `Failed` variant.
    pub fn is_failed(&self) -> bool {
        matches!(self, Self::Failed(_))
    }
}
1121
/// Per-table result of an optimize run: one `PhaseOutcome` for the index phase and one
/// for the compaction phase.
#[derive(Debug)]
pub struct TableOptimizeOutcome {
    pub table: Table,
    pub indices: PhaseOutcome,
    pub compaction: PhaseOutcome,
}
1129
/// Progress events delivered to an `OptimizeProgressFn` callback.
#[derive(Debug, Clone)]
pub enum OptimizeEvent {
    /// A phase is starting on `table`, with optional free-form detail.
    PhaseStart {
        table: Table,
        phase: OptimizePhase,
        detail: Option<String>,
    },
    /// A phase finished on `table` after `elapsed_ms` milliseconds.
    PhaseDone {
        table: Table,
        phase: OptimizePhase,
        elapsed_ms: u64,
    },
    /// Progress within one stage of an index build, as forwarded by `PondIndexProgress`:
    /// `completed` out of an optional `total`, counted in `unit`.
    IndexStage {
        table: Table,
        index: String,
        stage: String,
        completed: u64,
        total: Option<u64>,
        unit: String,
    },
}
1157
/// The maintenance phases that progress events can refer to.
#[derive(Debug, Clone, Copy)]
pub enum OptimizePhase {
    Compact,
    Cleanup,
    IndexCreate,
    IndexRebuild,
    IndexAppend,
}

impl OptimizePhase {
    /// Stable lowercase, hyphenated label for the phase.
    pub fn label(self) -> &'static str {
        match self {
            Self::Compact => "compact",
            Self::Cleanup => "cleanup",
            Self::IndexCreate => "index-create",
            Self::IndexRebuild => "index-rebuild",
            Self::IndexAppend => "index-append",
        }
    }
}
1178
/// Shared, thread-safe callback that receives `OptimizeEvent`s.
pub type OptimizeProgressFn = Arc<dyn Fn(OptimizeEvent) + Send + Sync>;
1183
1184fn emit(progress: Option<&OptimizeProgressFn>, event: OptimizeEvent) {
1185 if let Some(callback) = progress {
1186 callback(event);
1187 }
1188}
1189
/// Adapter that turns lance index-build progress callbacks into
/// `OptimizeEvent::IndexStage` events for one table/index pair.
struct PondIndexProgress {
    /// Receiver of the translated events.
    callback: OptimizeProgressFn,
    table: Table,
    index: String,
    /// Per-stage bookkeeping shared between the progress callbacks.
    state: std::sync::Mutex<PondIndexStageState>,
}

// Manual impl that prints only `table` and `index`; the callback and the state are
// left out (`finish_non_exhaustive`).
impl std::fmt::Debug for PondIndexProgress {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PondIndexProgress")
            .field("table", &self.table)
            .field("index", &self.index)
            .finish_non_exhaustive()
    }
}

/// What is remembered about the stage currently in progress.
#[derive(Debug, Default)]
struct PondIndexStageState {
    /// Total announced at stage start, if any.
    total: Option<u64>,
    /// Unit announced at stage start.
    unit: String,
    /// When the last event was emitted; used to throttle progress updates.
    last_emit: Option<Instant>,
}

impl PondIndexProgress {
    /// Creates the adapter with empty stage state.
    fn new(callback: OptimizeProgressFn, table: Table, index: String) -> Arc<Self> {
        Arc::new(Self {
            callback,
            table,
            index,
            state: std::sync::Mutex::new(PondIndexStageState::default()),
        })
    }
}
1233
1234#[async_trait::async_trait]
1235impl lance_index::progress::IndexBuildProgress for PondIndexProgress {
1236 async fn stage_start(&self, stage: &str, total: Option<u64>, unit: &str) -> lance::Result<()> {
1237 if let Ok(mut state) = self.state.lock() {
1238 state.total = total;
1239 state.unit = unit.to_owned();
1240 state.last_emit = Some(Instant::now());
1241 }
1242 (self.callback)(OptimizeEvent::IndexStage {
1243 table: self.table,
1244 index: self.index.clone(),
1245 stage: stage.to_owned(),
1246 completed: 0,
1247 total,
1248 unit: unit.to_owned(),
1249 });
1250 Ok(())
1251 }
1252
1253 async fn stage_progress(&self, stage: &str, completed: u64) -> lance::Result<()> {
1254 let (total, unit) = {
1255 let Ok(mut state) = self.state.lock() else {
1256 return Ok(());
1257 };
1258 let now = Instant::now();
1259 if let Some(prev) = state.last_emit
1260 && now.duration_since(prev) < Duration::from_millis(100)
1261 {
1262 return Ok(());
1263 }
1264 state.last_emit = Some(now);
1265 (state.total, state.unit.clone())
1266 };
1267 (self.callback)(OptimizeEvent::IndexStage {
1268 table: self.table,
1269 index: self.index.clone(),
1270 stage: stage.to_owned(),
1271 completed,
1272 total,
1273 unit,
1274 });
1275 Ok(())
1276 }
1277
1278 async fn stage_complete(&self, stage: &str) -> lance::Result<()> {
1279 let (total, unit) = {
1280 let Ok(state) = self.state.lock() else {
1281 return Ok(());
1282 };
1283 (state.total, state.unit.clone())
1284 };
1285 (self.callback)(OptimizeEvent::IndexStage {
1286 table: self.table,
1287 index: self.index.clone(),
1288 stage: stage.to_owned(),
1289 completed: total.unwrap_or(0),
1290 total,
1291 unit,
1292 });
1293 Ok(())
1294 }
1295}
1296
1297fn lance_progress(
1298 progress: Option<&OptimizeProgressFn>,
1299 table: Table,
1300 index: &str,
1301) -> Arc<dyn lance_index::progress::IndexBuildProgress> {
1302 match progress {
1303 Some(callback) => PondIndexProgress::new(callback.clone(), table, index.to_owned()),
1304 None => Arc::new(lance_index::progress::NoopIndexBuildProgress),
1305 }
1306}
1307
1308pub fn is_commit_conflict(error: &anyhow::Error) -> bool {
1312 error.downcast_ref::<lance::Error>().is_some_and(|err| {
1313 matches!(
1314 err,
1315 lance::Error::CommitConflict { .. }
1316 | lance::Error::RetryableCommitConflict { .. }
1317 | lance::Error::TooMuchWriteContention { .. }
1318 )
1319 })
1320}
1321
1322fn is_conflict_exhausted(error: &anyhow::Error) -> bool {
1325 error.chain().any(|cause| cause.is::<ConflictExhausted>())
1326}
1327
/// Size figures for the sessions, messages and parts tables, plus everything else.
///
/// NOTE(review): the code that fills this in is outside the visible part of the file;
/// the unit is presumably bytes — confirm against the producer.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TableSizes {
    pub sessions: u64,
    pub messages: u64,
    pub parts: u64,
    /// Size not attributed to any of the three tables.
    pub other: u64,
    /// On-disk versus live breakdown for each table's data (see `DataLiveness`).
    pub sessions_data: DataLiveness,
    pub messages_data: DataLiveness,
    pub parts_data: DataLiveness,
}
1341
/// How much of a table's stored data is still live.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DataLiveness {
    /// Total stored amount.
    pub on_disk: u64,
    /// Live portion, when known.
    pub live: Option<u64>,
}

impl DataLiveness {
    /// Amount that is stored but no longer live: `on_disk - live`, floored at zero.
    ///
    /// Returns `None` when the live amount is unknown.
    pub fn dead(&self) -> Option<u64> {
        let live = self.live?;
        Some(self.on_disk.saturating_sub(live))
    }
}
1356
/// A literal value that can appear on the right-hand side of a `Predicate`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalarValue {
    /// A string value.
    String(String),
    /// A 32-bit integer value.
    Int32(i32),
    /// Text carried separately from `String`.
    /// NOTE(review): rendering happens outside this view; presumably emitted verbatim,
    /// without quoting — confirm.
    Raw(String),
}

/// Borrowed text becomes an owned `ScalarValue::String`.
impl From<&str> for ScalarValue {
    fn from(value: &str) -> Self {
        ScalarValue::String(String::from(value))
    }
}

/// Owned text becomes `ScalarValue::String` without copying.
impl From<String> for ScalarValue {
    fn from(value: String) -> Self {
        ScalarValue::String(value)
    }
}

/// Integers become `ScalarValue::Int32`.
impl From<i32> for ScalarValue {
    fn from(value: i32) -> Self {
        ScalarValue::Int32(value)
    }
}
/// A filter expression tree that `Predicate::to_lance` renders into filter
/// text. Column names are interpolated as-is; values are rendered through
/// `ScalarValue::to_lance`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Predicate {
    /// `column = value`.
    Eq(&'static str, ScalarValue),
    /// `column <> value`.
    Ne(&'static str, ScalarValue),
    /// `column IS NULL`.
    IsNull(&'static str),
    /// `column IS NOT NULL`.
    IsNotNull(&'static str),
    /// `column IN (v1, v2, ...)`.
    In(&'static str, Vec<ScalarValue>),
    /// `column LIKE <pattern> ESCAPE '\'`, where the pattern is produced by
    /// `like_contains` from the given text.
    LikeContains(&'static str, String),
    /// `regexp_like(column, <pattern>)` with the pattern passed as a quoted
    /// string literal.
    Regex(&'static str, String),
    /// `column >= value`.
    Gte(&'static str, ScalarValue),
    /// `column <= value`.
    Lte(&'static str, ScalarValue),
    /// Conjunction of the children; children that render empty are dropped.
    And(Vec<Predicate>),
    /// Parenthesised disjunction of the children; children that render empty
    /// are dropped, and an all-empty disjunction renders empty.
    Or(Vec<Predicate>),
    /// `NOT (<inner>)`; renders empty when the inner predicate renders empty.
    Not(Box<Predicate>),
}
1397impl Predicate {
1398 pub fn to_lance(&self) -> String {
1399 match self {
1400 Self::Eq(column, value) => format!("{column} = {}", value.to_lance()),
1401 Self::Ne(column, value) => format!("{column} <> {}", value.to_lance()),
1402 Self::IsNull(column) => format!("{column} IS NULL"),
1403 Self::IsNotNull(column) => format!("{column} IS NOT NULL"),
1404 Self::In(column, values) => {
1405 let values = values
1406 .iter()
1407 .map(ScalarValue::to_lance)
1408 .collect::<Vec<_>>()
1409 .join(", ");
1410 format!("{column} IN ({values})")
1411 }
1412 Self::LikeContains(column, value) => {
1413 format!("{column} LIKE {} ESCAPE '\\'", like_contains(value))
1414 }
1415 Self::Regex(column, pattern) => {
1416 format!("regexp_like({column}, {})", quoted_string(pattern))
1417 }
1418 Self::Gte(column, value) => format!("{column} >= {}", value.to_lance()),
1419 Self::Lte(column, value) => format!("{column} <= {}", value.to_lance()),
1420 Self::And(predicates) => predicates
1421 .iter()
1422 .map(Self::to_lance)
1423 .filter(|predicate| !predicate.is_empty())
1424 .collect::<Vec<_>>()
1425 .join(" AND "),
1426 Self::Or(predicates) => {
1427 let body = predicates
1430 .iter()
1431 .map(Self::to_lance)
1432 .filter(|predicate| !predicate.is_empty())
1433 .collect::<Vec<_>>()
1434 .join(" OR ");
1435 if body.is_empty() {
1436 String::new()
1437 } else {
1438 format!("({body})")
1439 }
1440 }
1441 Self::Not(inner) => {
1442 let body = inner.to_lance();
1443 if body.is_empty() {
1444 String::new()
1445 } else {
1446 format!("NOT ({body})")
1447 }
1448 }
1449 }
1450 }
1451}
1452#[derive(Default)]
1455pub struct ScanOpts<'a> {
1456 pub predicate: Option<&'a Predicate>,
1457 pub projection: Option<&'a [&'a str]>,
1458}
1459
1460impl<'a> ScanOpts<'a> {
1461 pub fn project_only(projection: &'a [&'a str]) -> Self {
1462 Self {
1463 predicate: None,
1464 projection: Some(projection),
1465 }
1466 }
1467 pub fn with_predicate_and_projection(
1468 predicate: &'a Predicate,
1469 projection: &'a [&'a str],
1470 ) -> Self {
1471 Self {
1472 predicate: Some(predicate),
1473 projection: Some(projection),
1474 }
1475 }
1476}
1477
1478impl ScalarValue {
1479 fn to_lance(&self) -> String {
1480 match self {
1481 Self::String(value) => quoted_string(value),
1482 Self::Int32(value) => value.to_string(),
1483 Self::Raw(value) => value.clone(),
1484 }
1485 }
1486}
1487#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
1491pub struct RuntimeCaps {
1492 pub index_cache_bytes: Option<usize>,
1493 pub metadata_cache_bytes: Option<usize>,
1494}
1495
1496impl RuntimeCaps {
1497 pub fn from_config(config: &crate::config::RuntimeConfig) -> Self {
1498 Self {
1499 index_cache_bytes: config.index_cache_bytes,
1500 metadata_cache_bytes: config.metadata_cache_bytes,
1501 }
1502 }
1503}
1504
/// Default index cache size for local stores: 256 MiB.
const LOCAL_INDEX_CACHE_BYTES: usize = 256 * 1024 * 1024;
/// Default metadata cache size for local stores: 128 MiB.
const LOCAL_METADATA_CACHE_BYTES: usize = 128 * 1024 * 1024;
/// Default index cache size for non-local stores: 1 GiB.
const REMOTE_INDEX_CACHE_BYTES: usize = 1024 * 1024 * 1024;
/// Default metadata cache size for non-local stores: 512 MiB.
const REMOTE_METADATA_CACHE_BYTES: usize = 512 * 1024 * 1024;
1516
1517fn resolve_cache_caps(location: &Url, caps: RuntimeCaps) -> (usize, usize) {
1518 let (index_default, metadata_default) = if config::is_local(location) {
1519 (LOCAL_INDEX_CACHE_BYTES, LOCAL_METADATA_CACHE_BYTES)
1520 } else {
1521 (REMOTE_INDEX_CACHE_BYTES, REMOTE_METADATA_CACHE_BYTES)
1522 };
1523 (
1524 caps.index_cache_bytes.unwrap_or(index_default),
1525 caps.metadata_cache_bytes.unwrap_or(metadata_default),
1526 )
1527}
1528
/// An open store: the cached table datasets plus everything needed to reach
/// the underlying storage again (namespace, session, options, root URL).
pub struct Handle {
    /// Cached datasets for the sessions, messages and (lazily opened) parts tables.
    datasets: DatasetSet,
    /// Retry policy applied by `retry_lance`.
    retry: RetryPolicy,
    // NOTE(review): the field is read by `lance_cache_bytes` and `parts_cached`
    // in this file, so this allow may be stale — confirm before removing.
    #[allow(dead_code)]
    /// Lance session shared by the namespace connection and every dataset open.
    session: Arc<Session>,
    /// Namespace client used to describe and open tables.
    nm: Arc<dyn LanceNamespace>,
    /// Namespace identifier the table ids are derived from.
    nm_ident: NamespaceIdent,
    /// Storage options as used for the connection (after remote defaults were
    /// applied for non-local stores).
    storage_options: HashMap<String, String>,
    /// Root URL the handle was opened with.
    location: Url,
    /// Refresh interval given to the parts dataset when it is first opened.
    parts_refresh_after: Duration,
}
1563
1564impl std::fmt::Debug for Handle {
1565 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1566 formatter
1567 .debug_struct("Handle")
1568 .field("datasets", &self.datasets)
1569 .field("retry", &self.retry)
1570 .field("nm_ident", &self.nm_ident)
1571 .field("storage_options", &self.storage_options)
1572 .field("location", &self.location)
1573 .finish()
1574 }
1575}
1576
/// The three tables managed by a `Handle`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Table {
    Sessions,
    Messages,
    Parts,
}

impl Table {
    /// Lower-case table name; public alias of the internal label.
    pub fn as_str(self) -> &'static str {
        Table::label(self)
    }

    /// Lower-case table name used in logs and error messages.
    fn label(self) -> &'static str {
        // Kept as an exhaustive match so a new variant forces a decision here.
        let name: &'static str = match self {
            Table::Sessions => "sessions",
            Table::Messages => "messages",
            Table::Parts => "parts",
        };
        name
    }
}
/// The per-table dataset caches owned by a `Handle`, each behind its own
/// async mutex.
#[derive(Debug)]
struct DatasetSet {
    /// Sessions table; opened when the handle is created.
    sessions: Mutex<CachedDataset>,
    /// Messages table; opened when the handle is created.
    messages: Mutex<CachedDataset>,
    /// Parts table; opened on first use by `Handle::parts_cached`.
    parts: OnceCell<Mutex<CachedDataset>>,
}
1609#[derive(Debug)]
1610struct CachedDataset {
1611 dataset: Dataset,
1612 last_refresh: Instant,
1613 refresh_after: Duration,
1614}
1615impl CachedDataset {
1616 async fn latest(&mut self) -> Result<Dataset> {
1617 if self.last_refresh.elapsed() >= self.refresh_after {
1618 self.dataset.checkout_latest().await?;
1619 self.last_refresh = Instant::now();
1620 }
1621 Ok(self.dataset.clone())
1622 }
1623 fn replace(&mut self, dataset: Dataset) {
1624 self.dataset = dataset;
1625 self.last_refresh = Instant::now();
1626 }
1627}
1628
/// Summary of one append operation, as returned by `append_stream` and
/// `append_batches`.
#[derive(Debug, Clone, Copy, Default)]
pub struct AppendStats {
    /// Rows appended.
    pub rows: u64,
    /// Bytes written, as reported through the write progress callback.
    pub bytes_written: u64,
    /// Files written, as reported through the write progress callback.
    pub files_written: u64,
    /// Number of times the write was attempted (1 when no retry was needed).
    pub attempts: u32,
}
1640
1641#[derive(Default)]
1647struct WriteAccum {
1648 rows: std::sync::atomic::AtomicU64,
1649 bytes: std::sync::atomic::AtomicU64,
1650 files: std::sync::atomic::AtomicU64,
1651}
1652
1653impl WriteAccum {
1654 fn observe(&self, stats: &WriteStats) {
1655 use std::sync::atomic::Ordering::Relaxed;
1656 self.rows.fetch_max(stats.rows_written, Relaxed);
1657 self.bytes.fetch_max(stats.bytes_written, Relaxed);
1658 self.files.fetch_max(stats.files_written as u64, Relaxed);
1659 }
1660 fn rows(&self) -> u64 {
1661 self.rows.load(std::sync::atomic::Ordering::Relaxed)
1662 }
1663 fn bytes(&self) -> u64 {
1664 self.bytes.load(std::sync::atomic::Ordering::Relaxed)
1665 }
1666 fn files(&self) -> u64 {
1667 self.files.load(std::sync::atomic::Ordering::Relaxed)
1668 }
1669}
1670
1671fn append_write_params() -> WriteParams {
1676 let mut params = sessions::write_params_for_create();
1677 params.mode = WriteMode::Append;
1678 params.max_bytes_per_file = TARGET_FRAGMENT_BYTES as usize;
1679 params
1680}
1681
1682impl Handle {
1683 pub async fn open(location: &Url) -> Result<Self> {
1686 Self::open_with_options(location, HashMap::new(), RuntimeCaps::default()).await
1687 }
1688
1689 pub fn lance_cache_bytes(&self) -> u64 {
1692 self.session.size_bytes()
1693 }
1694
1695 pub async fn open_with_options(
1701 location: &Url,
1702 mut storage_options: HashMap<String, String>,
1703 caps: RuntimeCaps,
1704 ) -> Result<Self> {
1705 if let Some(path) = config::local_path(location) {
1706 tokio::fs::create_dir_all(&path).await.with_context(|| {
1707 format!(
1708 "failed to create data dir {}; fix the storage destination ([storage].path in config) or re-run `pond init`",
1709 path.display()
1710 )
1711 })?;
1712 } else {
1713 apply_remote_storage_defaults(&mut storage_options);
1714 }
1715 let (index_cache_bytes, metadata_cache_bytes) = resolve_cache_caps(location, caps);
1721 let session = Arc::new(Session::new(
1722 index_cache_bytes,
1723 metadata_cache_bytes,
1724 Arc::new(ObjectStoreRegistry::default()),
1725 ));
1726 let root = location.as_str().trim_end_matches('/').to_string();
1732 let mut connect = ConnectBuilder::new("dir")
1733 .property("root", root)
1734 .session(session.clone());
1735 for (key, value) in &storage_options {
1739 connect = connect.property(format!("storage.{key}"), value.clone());
1740 }
1741 let nm: Arc<dyn LanceNamespace> = connect
1742 .connect()
1743 .await
1744 .context("failed to connect lance Directory namespace")?;
1745 let nm_ident = NamespaceIdent::root();
1746 let refresh_after = if config::is_local(location) {
1752 Duration::ZERO
1753 } else {
1754 Duration::from_secs(5)
1755 };
1756 let handle = Self {
1757 datasets: DatasetSet {
1758 sessions: Mutex::new(CachedDataset {
1759 dataset: open_or_create_via_ns(
1760 &nm,
1761 &nm_ident,
1762 sessions::SESSIONS,
1763 sessions::session_schema(),
1764 &session,
1765 &storage_options,
1766 )
1767 .await?,
1768 last_refresh: Instant::now(),
1769 refresh_after,
1770 }),
1771 messages: Mutex::new(CachedDataset {
1772 dataset: open_or_create_via_ns(
1773 &nm,
1774 &nm_ident,
1775 sessions::MESSAGES,
1776 sessions::message_schema(),
1777 &session,
1778 &storage_options,
1779 )
1780 .await?,
1781 last_refresh: Instant::now(),
1782 refresh_after,
1783 }),
1784 parts: OnceCell::new(),
1785 },
1786 retry: RetryPolicy::default(),
1787 session,
1788 nm,
1789 nm_ident,
1790 storage_options,
1791 location: location.clone(),
1792 parts_refresh_after: refresh_after,
1793 };
1794 Ok(handle)
1795 }
1796
1797 pub fn location(&self) -> &Url {
1798 &self.location
1799 }
1800
1801 pub fn storage_options(&self) -> &HashMap<String, String> {
1805 &self.storage_options
1806 }
1807
1808 fn export_uri(&self, name: &str) -> String {
1814 format!(
1815 "{}/exports/{name}",
1816 self.location.as_str().trim_end_matches('/')
1817 )
1818 }
1819
1820 fn object_store_params(&self) -> ObjectStoreParams {
1824 ObjectStoreParams {
1825 storage_options_accessor: (!self.storage_options.is_empty()).then(|| {
1826 Arc::new(StorageOptionsAccessor::with_static_options(
1827 self.storage_options.clone(),
1828 ))
1829 }),
1830 ..Default::default()
1831 }
1832 }
1833
1834 pub(crate) async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
1837 let uri = self.export_uri(name);
1838 let registry = Arc::new(ObjectStoreRegistry::default());
1839 let (store, path) =
1840 ObjectStore::from_uri_and_params(registry, &uri, &self.object_store_params())
1841 .await
1842 .with_context(|| format!("failed to open object store for {uri}"))?;
1843 store
1844 .put(&path, bytes)
1845 .await
1846 .with_context(|| format!("failed to write export {uri}"))?;
1847 Ok(())
1848 }
1849
1850 pub(crate) async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
1853 let uri = self.export_uri(name);
1854 let registry = Arc::new(ObjectStoreRegistry::default());
1855 let (store, path) =
1856 ObjectStore::from_uri_and_params(registry, &uri, &self.object_store_params())
1857 .await
1858 .with_context(|| format!("failed to open object store for {uri}"))?;
1859 let bytes = store
1860 .read_one_all(&path)
1861 .await
1862 .with_context(|| format!("failed to read export {uri}"))?;
1863 Ok(bytes.to_vec())
1864 }
1865
1866 pub(crate) fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
1871 if self.location.scheme() != "file" {
1872 return None;
1873 }
1874 let dir = self.location.to_file_path().ok()?;
1875 Some(dir.join("exports").join(name))
1876 }
1877
1878 pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1879 Ok((
1880 self.count_rows(Table::Sessions).await?,
1881 self.count_rows(Table::Messages).await?,
1882 self.count_rows(Table::Parts).await?,
1883 ))
1884 }
1885
1886 pub(crate) async fn merge_insert(
1890 &self,
1891 table: Table,
1892 batch: RecordBatch,
1893 row_count: usize,
1894 ) -> Result<u64> {
1895 self.merge_insert_stats(table, batch, row_count)
1896 .await
1897 .map(|stats| stats.num_inserted_rows + stats.num_updated_rows)
1898 }
1899
1900 pub(crate) async fn merge_insert_stats(
1905 &self,
1906 table: Table,
1907 batch: RecordBatch,
1908 row_count: usize,
1909 ) -> Result<MergeStats> {
1910 self.merge(
1911 table,
1912 batch,
1913 row_count,
1914 "merge_insert",
1915 WhenMatched::DoNothing,
1916 WhenNotMatched::InsertAll,
1917 )
1918 .await
1919 }
1920
1921 pub(crate) async fn merge_update(
1924 &self,
1925 table: Table,
1926 batch: RecordBatch,
1927 row_count: usize,
1928 ) -> Result<u64> {
1929 self.merge(
1930 table,
1931 batch,
1932 row_count,
1933 "merge_update",
1934 WhenMatched::UpdateAll,
1935 WhenNotMatched::DoNothing,
1936 )
1937 .await
1938 .map(|stats| stats.num_inserted_rows + stats.num_updated_rows)
1939 }
1940
    /// Runs a write against the latest snapshot of `table` and installs the
    /// dataset it returns as the new cached snapshot.
    ///
    /// `execute` receives the current snapshot and must return the resulting
    /// dataset together with a payload, which is handed back to the caller.
    /// The attempt is driven through `retry_lance`, so `execute` may be called
    /// more than once; each call sees a freshly obtained snapshot.
    async fn write_committed<E, Fut, P>(&self, table: Table, execute: E) -> Result<P>
    where
        E: Fn(Arc<Dataset>) -> Fut,
        Fut: std::future::Future<Output = Result<(Dataset, P)>>,
    {
        self.retry_lance(table.label(), || {
            // Borrow `execute` so the retry closure can be invoked repeatedly.
            let execute = &execute;
            async move {
                // The table's mutex guard lives until the end of this block,
                // i.e. across the whole write and the cache replacement.
                let mut cached = self.cached(table).await?.lock().await;
                let existing = cached.latest().await?;
                let (dataset, payload) = execute(Arc::new(existing)).await?;
                cached.replace(dataset);
                Ok(payload)
            }
        })
        .await
    }
1966
1967 async fn merge(
1973 &self,
1974 table: Table,
1975 batch: RecordBatch,
1976 row_count: usize,
1977 op: &'static str,
1978 when_matched: WhenMatched,
1979 when_not_matched: WhenNotMatched,
1980 ) -> Result<MergeStats> {
1981 if row_count == 0 {
1982 return Ok(MergeStats::default());
1983 }
1984 let started = Instant::now();
1985 let result = self
1986 .write_committed(table, |existing| {
1987 let batch = batch.clone();
1988 let when_matched = when_matched.clone();
1989 let when_not_matched = when_not_matched.clone();
1990 async move {
1991 let schema = batch.schema();
1992 let reader = RecordBatchIterator::new([Ok(batch)], schema);
1993 let mut builder = MergeInsertBuilder::try_new(existing, Vec::new())?;
1994 builder.when_matched(when_matched);
1995 builder.when_not_matched(when_not_matched);
1996 builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
1999 builder.skip_auto_cleanup(true);
2003 let (dataset, stats) = builder
2004 .try_build()?
2005 .execute_reader(Box::new(reader))
2006 .await?;
2007 Ok((dataset.as_ref().clone(), stats))
2008 }
2009 })
2010 .await;
2011 let skipped = result
2012 .as_ref()
2013 .map(|s| s.num_skipped_duplicates)
2014 .unwrap_or(0);
2015 tracing::info!(
2016 target: "pond::perf",
2017 op,
2018 table = %table.label(),
2019 rows = row_count,
2020 elapsed_ms = started.elapsed().as_millis() as u64,
2021 skipped,
2022 "merge",
2023 );
2024 result
2025 }
2026
2027 pub(crate) async fn append_stream<F, Fut>(
2040 &self,
2041 table: Table,
2042 make_source: F,
2043 ) -> Result<AppendStats>
2044 where
2045 F: Fn() -> Fut,
2046 Fut: std::future::Future<Output = Result<SendableRecordBatchStream>>,
2047 {
2048 let cum = Arc::new(WriteAccum::default());
2049 let attempts = Arc::new(std::sync::atomic::AtomicU32::new(0));
2050 let started = Instant::now();
2051 self.write_committed(table, |existing| {
2052 let make_source = &make_source;
2053 let cum = cum.clone();
2054 let attempts = attempts.clone();
2055 async move {
2056 attempts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2057 let stream = make_source().await?;
2058 let dataset = InsertBuilder::new(existing)
2059 .with_params(&append_write_params())
2060 .progress(move |stats| cum.observe(&stats))
2061 .execute_stream(stream)
2062 .await?;
2063 Ok((dataset, ()))
2064 }
2065 })
2066 .await?;
2067
2068 let attempts = attempts.load(std::sync::atomic::Ordering::Relaxed);
2069 let stats = AppendStats {
2070 rows: cum.rows(),
2071 bytes_written: cum.bytes(),
2072 files_written: cum.files(),
2073 attempts,
2074 };
2075 tracing::info!(
2076 target: "pond::perf",
2077 op = "append",
2078 table = %table.label(),
2079 rows = stats.rows,
2080 files = stats.files_written,
2081 attempts,
2082 elapsed_ms = started.elapsed().as_millis() as u64,
2083 "append",
2084 );
2085 Ok(stats)
2086 }
2087
2088 pub(crate) async fn append_batches(
2092 &self,
2093 table: Table,
2094 batches: Vec<RecordBatch>,
2095 ) -> Result<AppendStats> {
2096 let total_rows: u64 = batches.iter().map(|batch| batch.num_rows() as u64).sum();
2097 if total_rows == 0 {
2098 return Ok(AppendStats::default());
2099 }
2100 let cum = Arc::new(WriteAccum::default());
2101 let attempts = Arc::new(std::sync::atomic::AtomicU32::new(0));
2102 let started = Instant::now();
2103 self.write_committed(table, |existing| {
2104 let cum = cum.clone();
2105 let attempts = attempts.clone();
2106 let batches = batches.clone();
2107 async move {
2108 attempts.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2109 let dataset = InsertBuilder::new(existing)
2110 .with_params(&append_write_params())
2111 .progress(move |stats| cum.observe(&stats))
2112 .execute(batches)
2113 .await?;
2114 Ok((dataset, ()))
2115 }
2116 })
2117 .await?;
2118
2119 let attempts = attempts.load(std::sync::atomic::Ordering::Relaxed);
2120 let stats = AppendStats {
2121 rows: total_rows,
2122 bytes_written: cum.bytes(),
2123 files_written: cum.files(),
2124 attempts,
2125 };
2126 tracing::info!(
2127 target: "pond::perf",
2128 op = "append_batches",
2129 table = %table.label(),
2130 rows = stats.rows,
2131 files = stats.files_written,
2132 attempts,
2133 elapsed_ms = started.elapsed().as_millis() as u64,
2134 "append",
2135 );
2136 Ok(stats)
2137 }
2138
2139 pub async fn optimize_table(
2148 &self,
2149 table: Table,
2150 intents: &[IndexIntent],
2151 progress: Option<&OptimizeProgressFn>,
2152 policy: &MaintenancePolicy,
2153 ) -> TableOptimizeOutcome {
2154 let compaction = self
2155 .run_optimize_compact_phase(table, progress, policy)
2156 .await;
2157 let indices = self
2158 .run_optimize_indices_phase(table, intents, progress)
2159 .await;
2160 TableOptimizeOutcome {
2161 table,
2162 indices,
2163 compaction,
2164 }
2165 }
2166
2167 pub async fn optimize_table_indices_only(
2172 &self,
2173 table: Table,
2174 intents: &[IndexIntent],
2175 progress: Option<&OptimizeProgressFn>,
2176 ) -> PhaseOutcome {
2177 self.run_optimize_indices_phase(table, intents, progress)
2178 .await
2179 }
2180
2181 async fn run_optimize_indices_phase(
2182 &self,
2183 table: Table,
2184 intents: &[IndexIntent],
2185 progress: Option<&OptimizeProgressFn>,
2186 ) -> PhaseOutcome {
2187 if intents.is_empty() {
2188 return PhaseOutcome::Noop;
2189 }
2190 let result = self
2191 .retry_lance(table.label(), || async {
2192 let mut guard = self.cached(table).await?.lock().await;
2193 let mut dataset = guard.latest().await?;
2194 let did_work =
2195 optimize_table_indices(&mut dataset, intents, table, progress).await?;
2196 guard.replace(dataset);
2197 Ok::<_, anyhow::Error>(did_work)
2198 })
2199 .await;
2200 match result {
2201 Ok(true) => PhaseOutcome::Ok,
2202 Ok(false) => PhaseOutcome::Noop,
2203 Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
2204 Err(error) => PhaseOutcome::Failed(error),
2205 }
2206 }
2207
2208 async fn run_optimize_compact_phase(
2209 &self,
2210 table: Table,
2211 progress: Option<&OptimizeProgressFn>,
2212 policy: &MaintenancePolicy,
2213 ) -> PhaseOutcome {
2214 let result = self
2215 .retry_lance(table.label(), || async {
2216 let mut guard = self.cached(table).await?.lock().await;
2217 let mut dataset = guard.latest().await?;
2218 optimize_table_compact(&mut dataset, table, progress, policy).await?;
2219 guard.replace(dataset);
2220 Ok::<_, anyhow::Error>(())
2221 })
2222 .await;
2223 match result {
2224 Ok(()) => PhaseOutcome::Ok,
2225 Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
2226 Err(error) => PhaseOutcome::Failed(error),
2227 }
2228 }
2229
2230 pub async fn rebuild_index(
2231 &self,
2232 table: Table,
2233 intent: &IndexIntent,
2234 progress: Option<&OptimizeProgressFn>,
2235 ) -> Result<()> {
2236 emit(
2237 progress,
2238 OptimizeEvent::PhaseStart {
2239 table,
2240 phase: OptimizePhase::IndexRebuild,
2241 detail: Some(intent.name.to_owned()),
2242 },
2243 );
2244 let started = Instant::now();
2245 let result = self
2246 .retry_lance(table.label(), || async {
2247 let mut guard = self.cached(table).await?.lock().await;
2248 let mut dataset = guard.latest().await?;
2249 rebuild_index(&mut dataset, intent, progress, table).await?;
2250 guard.replace(dataset);
2251 Ok(())
2252 })
2253 .await;
2254 emit(
2255 progress,
2256 OptimizeEvent::PhaseDone {
2257 table,
2258 phase: OptimizePhase::IndexRebuild,
2259 elapsed_ms: started.elapsed().as_millis() as u64,
2260 },
2261 );
2262 result
2263 }
2264
2265 pub async fn cleanup_table_versions(
2269 &self,
2270 table: Table,
2271 older_than: chrono::Duration,
2272 ) -> Result<()> {
2273 let mut guard = self.cached(table).await?.lock().await;
2274 let dataset = guard.latest().await?;
2275 dataset
2276 .cleanup_old_versions(older_than, Some(false), Some(false))
2277 .await
2278 .with_context(|| format!("cleanup_old_versions failed for {}", table.label()))?;
2279 Ok(())
2280 }
2281
2282 pub async fn index_status(
2283 &self,
2284 table: Table,
2285 intents: &[IndexIntent],
2286 ) -> Result<Vec<IndexStatus>> {
2287 let dataset = self.dataset(table).await?;
2288 index_status(table, &dataset, intents).await
2289 }
2290
2291 pub(crate) async fn dataset(&self, table: Table) -> Result<Dataset> {
2292 let mut cached = self.cached(table).await?.lock().await;
2293 cached.latest().await
2294 }
2295 pub(crate) async fn scanner(
2300 &self,
2301 table: Table,
2302 predicate: Option<&Predicate>,
2303 ) -> Result<lance::dataset::scanner::Scanner> {
2304 let dataset = self.dataset(table).await?;
2305 scanner_with_prefilter(&dataset, predicate)
2306 }
2307 pub async fn scan(
2310 &self,
2311 table: Table,
2312 opts: ScanOpts<'_>,
2313 ) -> Result<lance::dataset::scanner::Scanner> {
2314 let mut scanner = self.scanner(table, opts.predicate).await?;
2315 if let Some(projection) = opts.projection {
2316 scanner.project(projection)?;
2317 }
2318 Ok(scanner)
2319 }
2320 pub(crate) async fn scan_batch(
2321 &self,
2322 table: Table,
2323 predicate: Option<&Predicate>,
2324 projection: &[&str],
2325 ) -> Result<RecordBatch> {
2326 let opts = ScanOpts {
2327 predicate,
2328 projection: (!projection.is_empty()).then_some(projection),
2329 };
2330 self.scan(table, opts)
2331 .await?
2332 .try_into_batch()
2333 .await
2334 .context("scan failed")
2335 }
2336 pub async fn count_rows(&self, table: Table) -> Result<usize> {
2337 self.dataset(table)
2338 .await?
2339 .count_rows(None)
2340 .await
2341 .map_err(Into::into)
2342 }
2343 pub async fn collect_ids(&self, table: Table) -> Result<std::collections::HashSet<String>> {
2349 let batch = self.scan_batch(table, None, &["id"]).await?;
2350 let ids = batch
2351 .column_by_name("id")
2352 .context("scan projection dropped the id column")?
2353 .as_any()
2354 .downcast_ref::<StringArray>()
2355 .context("id column is not Utf8")?;
2356 Ok(ids.iter().flatten().map(str::to_owned).collect())
2357 }
2358 #[cfg(test)]
2360 pub(crate) async fn messages_index_names(&self) -> Result<Vec<String>> {
2361 let dataset = self.dataset(Table::Messages).await?;
2362 let indices = dataset.load_indices().await?;
2363 Ok(indices.iter().map(|index| index.name.clone()).collect())
2364 }
2365
2366 pub(crate) async fn messages_has_index(&self, name: &str) -> Result<bool> {
2372 let dataset = self.dataset(Table::Messages).await?;
2373 let indices = dataset.load_indices().await?;
2374 Ok(indices.iter().any(|index| index.name == name))
2375 }
2376
2377 pub(crate) async fn unindexed_row_count(
2380 &self,
2381 table: Table,
2382 index_name: &str,
2383 ) -> Result<usize> {
2384 let dataset = self.dataset(table).await?;
2385 let fragments = dataset
2386 .unindexed_fragments(index_name)
2387 .await
2388 .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
2389 Ok(fragments
2390 .iter()
2391 .map(|fragment| fragment.num_rows().unwrap_or(0))
2392 .sum())
2393 }
2394
2395 pub(crate) async fn find_index_owner(&self, name: &str) -> Result<Option<Table>> {
2402 let list = |table: Table| async move {
2403 let dataset = self.dataset(table).await?;
2404 let names: Vec<String> = dataset
2405 .load_indices()
2406 .await
2407 .with_context(|| format!("load_indices failed for {}", table.label()))?
2408 .iter()
2409 .map(|index| index.name.clone())
2410 .collect();
2411 Ok::<_, anyhow::Error>(names)
2412 };
2413 let (sessions, messages, parts) = tokio::try_join!(
2414 list(Table::Sessions),
2415 list(Table::Messages),
2416 list(Table::Parts),
2417 )?;
2418 for (table, names) in [
2419 (Table::Sessions, sessions),
2420 (Table::Messages, messages),
2421 (Table::Parts, parts),
2422 ] {
2423 if names.iter().any(|n| n == name) {
2424 return Ok(Some(table));
2425 }
2426 }
2427 Ok(None)
2428 }
2429
2430 pub(crate) async fn drop_index(&self, table: Table, name: &str) -> Result<()> {
2436 let mut guard = self.cached(table).await?.lock().await;
2437 let mut dataset = guard.latest().await?;
2438 dataset
2439 .drop_index(name)
2440 .await
2441 .with_context(|| format!("drop_index({name}) failed for {}", table.label()))?;
2442 guard.replace(dataset);
2443 Ok(())
2444 }
2445
2446 async fn table_location(&self, table_name: &str) -> Result<String> {
2449 let request = DescribeTableRequest {
2450 id: Some(self.nm_ident.as_table_id(table_name)),
2451 ..Default::default()
2452 };
2453 let response = self
2454 .nm
2455 .describe_table(request)
2456 .await
2457 .with_context(|| format!("failed to describe table {table_name}"))?;
2458 response
2459 .location
2460 .with_context(|| format!("namespace returned no location for table {table_name}"))
2461 }
2462
2463 pub async fn initialized(&self) -> Result<bool> {
2469 let request = DescribeTableRequest {
2470 id: Some(self.nm_ident.as_table_id(sessions::PARTS)),
2471 ..Default::default()
2472 };
2473 match self.nm.describe_table(request).await {
2474 Ok(_) => Ok(true),
2475 Err(error) if is_namespace_error_code(&error, ErrorCode::TableNotFound) => Ok(false),
2476 Err(error) => {
2477 Err(anyhow::Error::from(error)).context("failed to probe table existence")
2478 }
2479 }
2480 }
2481
2482 pub async fn table_sizes(&self) -> Result<TableSizes> {
2486 let registry = Arc::new(ObjectStoreRegistry::default());
2487 let params = self.object_store_params();
2488
2489 let sessions = self
2490 .listed_size(
2491 ®istry,
2492 ¶ms,
2493 &self.table_location(sessions::SESSIONS).await?,
2494 )
2495 .await?;
2496 let messages = self
2497 .listed_size(
2498 ®istry,
2499 ¶ms,
2500 &self.table_location(sessions::MESSAGES).await?,
2501 )
2502 .await?;
2503 let parts = self
2504 .listed_size(
2505 ®istry,
2506 ¶ms,
2507 &self.table_location(sessions::PARTS).await?,
2508 )
2509 .await?;
2510 let root_total = self
2513 .listed_size(®istry, ¶ms, self.location.as_str())
2514 .await?;
2515 let other = root_total.saturating_sub(sessions + messages + parts);
2516 let sessions_data = self
2517 .data_liveness(®istry, ¶ms, Table::Sessions, sessions::SESSIONS)
2518 .await?;
2519 let messages_data = self
2520 .data_liveness(®istry, ¶ms, Table::Messages, sessions::MESSAGES)
2521 .await?;
2522 let parts_data = self
2523 .data_liveness(®istry, ¶ms, Table::Parts, sessions::PARTS)
2524 .await?;
2525 Ok(TableSizes {
2526 sessions,
2527 messages,
2528 parts,
2529 other,
2530 sessions_data,
2531 messages_data,
2532 parts_data,
2533 })
2534 }
2535
2536 async fn data_liveness(
2537 &self,
2538 registry: &Arc<ObjectStoreRegistry>,
2539 params: &ObjectStoreParams,
2540 table: Table,
2541 table_name: &str,
2542 ) -> Result<DataLiveness> {
2543 let location = self.table_location(table_name).await?;
2544 let data_dir = format!("{}/data", location.trim_end_matches('/'));
2545 let on_disk = self.listed_size(registry, params, &data_dir).await?;
2546 let dataset = self.dataset(table).await?;
2547 let live = dataset
2548 .get_fragments()
2549 .iter()
2550 .try_fold(0u64, |total, fragment| {
2551 Some(total + fragment_bytes(fragment.metadata())?)
2552 });
2553 Ok(DataLiveness { on_disk, live })
2554 }
2555
2556 async fn listed_size(
2558 &self,
2559 registry: &Arc<ObjectStoreRegistry>,
2560 params: &ObjectStoreParams,
2561 uri: &str,
2562 ) -> Result<u64> {
2563 let (store, base) = ObjectStore::from_uri_and_params(registry.clone(), uri, params)
2564 .await
2565 .with_context(|| format!("failed to open object store for {uri}"))?;
2566 let mut listing = store.list(Some(base));
2567 let mut total = 0u64;
2568 while let Some(meta) = listing.next().await {
2569 let meta = meta.with_context(|| format!("listing {uri} failed"))?;
2570 total += meta.size;
2571 }
2572 Ok(total)
2573 }
2574 async fn cached(&self, table: Table) -> Result<&Mutex<CachedDataset>> {
2575 match table {
2576 Table::Sessions => Ok(&self.datasets.sessions),
2577 Table::Messages => Ok(&self.datasets.messages),
2578 Table::Parts => self.parts_cached().await,
2579 }
2580 }
2581
2582 async fn parts_cached(&self) -> Result<&Mutex<CachedDataset>> {
2585 self.datasets
2586 .parts
2587 .get_or_try_init(|| async {
2588 let dataset = open_or_create_via_ns(
2589 &self.nm,
2590 &self.nm_ident,
2591 sessions::PARTS,
2592 sessions::part_schema(),
2593 &self.session,
2594 &self.storage_options,
2595 )
2596 .await?;
2597 Ok::<_, anyhow::Error>(Mutex::new(CachedDataset {
2598 dataset,
2599 last_refresh: Instant::now(),
2600 refresh_after: self.parts_refresh_after,
2601 }))
2602 })
2603 .await
2604 }
2605 async fn retry_lance<T, Fut, Op>(&self, label: &str, mut operation: Op) -> Result<T>
2606 where
2607 Fut: std::future::Future<Output = Result<T>>,
2608 Op: FnMut() -> Fut,
2609 {
2610 let mut attempt = 0u8;
2611 loop {
2612 attempt = attempt.saturating_add(1);
2613 match operation().await {
2614 Ok(value) => return Ok(value),
2615 Err(error) if attempt < self.retry.attempts => {
2616 let backoff = self.backoff(attempt);
2617 let error_chain = format!("{error:#}");
2620 tracing::warn!(
2621 label,
2622 attempt,
2623 ?backoff,
2624 error = %error_chain,
2625 "retrying Lance operation"
2626 );
2627 tokio::time::sleep(backoff).await;
2628 }
2629 Err(error) => {
2630 let error_chain = format!("{error:#}");
2631 tracing::warn!(
2632 label,
2633 attempt,
2634 error = %error_chain,
2635 "Lance operation exhausted retries"
2636 );
2637 if is_commit_conflict(&error) {
2644 return Err(error.context(ConflictExhausted { attempts: attempt }));
2645 }
2646 return Err(error);
2647 }
2648 }
2649 }
2650 }
2651 fn backoff(&self, attempt: u8) -> Duration {
2652 let shift = u32::from(attempt.saturating_sub(1));
2653 let multiplier = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
2654 let base = self.retry.initial_backoff.saturating_mul(multiplier);
2655 let factor = (1.0 + self.retry.jitter * (fastrand::f64() * 2.0 - 1.0)).max(0.0);
2658 base.mul_f64(factor).min(self.retry.max_backoff)
2659 }
2660}
2661async fn optimize_table_compact(
2682 dataset: &mut Dataset,
2683 table: Table,
2684 progress: Option<&OptimizeProgressFn>,
2685 policy: &MaintenancePolicy,
2686) -> Result<()> {
2687 let stats: Vec<FragmentStat> = dataset
2688 .get_fragments()
2689 .iter()
2690 .map(|fragment| fragment_stat(fragment.metadata()))
2691 .collect();
2692 let compaction = CompactionOptions {
2693 target_rows_per_fragment: derived_target_rows(&stats),
2694 max_bytes_per_file: Some(TARGET_FRAGMENT_BYTES as usize),
2695 defer_index_remap: false,
2696 ..CompactionOptions::default()
2697 };
2698
2699 let mut plan = plan_compaction(dataset, &compaction).await?;
2700 if policy.compaction_fragment_cap > 0 {
2701 plan.tasks.retain(|task| {
2702 let task_stats: Vec<FragmentStat> = task.fragments.iter().map(fragment_stat).collect();
2703 let keep = keep_task(
2704 &task_stats,
2705 policy.compaction_fragment_cap,
2706 compaction.materialize_deletions_threshold,
2707 );
2708 if !keep {
2709 tracing::debug!(
2710 target: "pond::perf",
2711 table = table.as_str(),
2712 fragments = task_stats.len(),
2713 "compaction task vetoed: merge dominated by one large fragment",
2714 );
2715 }
2716 keep
2717 });
2718 }
2719 if plan.tasks.is_empty() {
2720 tracing::debug!(
2721 target: "pond::perf",
2722 table = table.as_str(),
2723 "compaction skipped: no task to run",
2724 );
2725 } else {
2726 emit(
2727 progress,
2728 OptimizeEvent::PhaseStart {
2729 table,
2730 phase: OptimizePhase::Compact,
2731 detail: None,
2732 },
2733 );
2734 let started = Instant::now();
2735 let mut completed = Vec::with_capacity(plan.tasks.len());
2736 for task in plan.compaction_tasks() {
2737 completed.push(task.execute(dataset).await?);
2738 }
2739 commit_compaction(
2740 dataset,
2741 completed,
2742 Arc::new(DatasetIndexRemapperOptions::default()),
2743 &compaction,
2744 )
2745 .await?;
2746 emit(
2747 progress,
2748 OptimizeEvent::PhaseDone {
2749 table,
2750 phase: OptimizePhase::Compact,
2751 elapsed_ms: started.elapsed().as_millis() as u64,
2752 },
2753 );
2754 }
2755
2756 emit(
2760 progress,
2761 OptimizeEvent::PhaseStart {
2762 table,
2763 phase: OptimizePhase::Cleanup,
2764 detail: None,
2765 },
2766 );
2767 let started = Instant::now();
2768 dataset
2774 .cleanup_old_versions(policy.cleanup_older_than, Some(false), Some(false))
2775 .await
2776 .context("cleanup_old_versions failed during index optimize")?;
2777 emit(
2778 progress,
2779 OptimizeEvent::PhaseDone {
2780 table,
2781 phase: OptimizePhase::Cleanup,
2782 elapsed_ms: started.elapsed().as_millis() as u64,
2783 },
2784 );
2785
2786 Ok(())
2787}
2788
2789async fn optimize_table_indices(
2792 dataset: &mut Dataset,
2793 intents: &[IndexIntent],
2794 table: Table,
2795 progress: Option<&OptimizeProgressFn>,
2796) -> Result<bool> {
2797 let existing = dataset.load_indices().await?;
2798 let existing_names: std::collections::HashSet<String> =
2799 existing.iter().map(|index| index.name.clone()).collect();
2800
2801 let mut append_indices: Vec<String> = Vec::new();
2802 let mut did_work = false;
2803
2804 for intent in intents {
2805 let exists = existing_names.contains(intent.name);
2806
2807 if !exists {
2808 if !intent.trigger.should_create(dataset).await? {
2809 continue;
2810 }
2811 let params = intent.params.build(dataset).await?;
2812 let index_type = intent.params.index_type();
2813 tracing::info!(
2814 index = intent.name,
2815 column = intent.column,
2816 "creating Lance index (trigger fired)",
2817 );
2818 emit(
2819 progress,
2820 OptimizeEvent::PhaseStart {
2821 table,
2822 phase: OptimizePhase::IndexCreate,
2823 detail: Some(intent.name.to_owned()),
2824 },
2825 );
2826 let started = Instant::now();
2827 dataset
2828 .create_index_builder(&[intent.column], index_type, params.as_ref())
2829 .name(intent.name.to_owned())
2830 .replace(false)
2831 .progress(lance_progress(progress, table, intent.name))
2832 .await
2833 .with_context(|| format!("failed to create index {}", intent.name))?;
2834 emit(
2835 progress,
2836 OptimizeEvent::PhaseDone {
2837 table,
2838 phase: OptimizePhase::IndexCreate,
2839 elapsed_ms: started.elapsed().as_millis() as u64,
2840 },
2841 );
2842 did_work = true;
2843 continue;
2844 }
2845
2846 let unindexed = dataset.unindexed_fragments(intent.name).await?;
2852 if unindexed.is_empty() {
2853 continue;
2854 }
2855 match intent.params {
2856 IndexParamsKind::Scalar(BuiltinIndexType::BTree) => {
2857 let params = intent.params.build(dataset).await?;
2858 let index_type = intent.params.index_type();
2859 tracing::debug!(
2860 target: "pond::perf",
2861 index = intent.name,
2862 column = intent.column,
2863 "rebuilding Lance BTree index",
2864 );
2865 emit(
2866 progress,
2867 OptimizeEvent::PhaseStart {
2868 table,
2869 phase: OptimizePhase::IndexRebuild,
2870 detail: Some(intent.name.to_owned()),
2871 },
2872 );
2873 let started = Instant::now();
2874 dataset
2875 .create_index_builder(&[intent.column], index_type, params.as_ref())
2876 .name(intent.name.to_owned())
2877 .replace(true)
2878 .progress(lance_progress(progress, table, intent.name))
2879 .await
2880 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
2881 emit(
2882 progress,
2883 OptimizeEvent::PhaseDone {
2884 table,
2885 phase: OptimizePhase::IndexRebuild,
2886 elapsed_ms: started.elapsed().as_millis() as u64,
2887 },
2888 );
2889 did_work = true;
2890 }
2891 IndexParamsKind::Scalar(BuiltinIndexType::Bitmap)
2892 | IndexParamsKind::InvertedFtsWord
2893 | IndexParamsKind::IvfSqCosine { .. } => {
2894 append_indices.push(intent.name.to_owned());
2895 }
2896 IndexParamsKind::Scalar(_) => {
2897 let params = intent.params.build(dataset).await?;
2898 emit(
2899 progress,
2900 OptimizeEvent::PhaseStart {
2901 table,
2902 phase: OptimizePhase::IndexRebuild,
2903 detail: Some(intent.name.to_owned()),
2904 },
2905 );
2906 let started = Instant::now();
2907 dataset
2908 .create_index_builder(
2909 &[intent.column],
2910 intent.params.index_type(),
2911 params.as_ref(),
2912 )
2913 .name(intent.name.to_owned())
2914 .replace(true)
2915 .progress(lance_progress(progress, table, intent.name))
2916 .await
2917 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
2918 emit(
2919 progress,
2920 OptimizeEvent::PhaseDone {
2921 table,
2922 phase: OptimizePhase::IndexRebuild,
2923 elapsed_ms: started.elapsed().as_millis() as u64,
2924 },
2925 );
2926 did_work = true;
2927 }
2928 }
2929 }
2930
2931 if !append_indices.is_empty() {
2932 let segment_count = |name: &str| {
2938 existing
2939 .iter()
2940 .filter(|index| index.name.as_str() == name)
2941 .count()
2942 };
2943 let (to_merge, to_append): (Vec<String>, Vec<String>) = append_indices
2944 .iter()
2945 .cloned()
2946 .partition(|name| segment_count(name) >= DELTA_MERGE_THRESHOLD);
2947
2948 emit(
2949 progress,
2950 OptimizeEvent::PhaseStart {
2951 table,
2952 phase: OptimizePhase::IndexAppend,
2953 detail: Some(append_indices.join(", ")),
2954 },
2955 );
2956 let started = Instant::now();
2957 if !to_append.is_empty() {
2958 dataset
2959 .optimize_indices(&OptimizeOptions::append().index_names(to_append))
2960 .await
2961 .context("optimize_indices(append) failed during index optimize")?;
2962 }
2963 if !to_merge.is_empty() {
2964 dataset
2965 .optimize_indices(
2966 &OptimizeOptions::merge(DELTA_MERGE_THRESHOLD).index_names(to_merge),
2967 )
2968 .await
2969 .context("optimize_indices(merge) failed during index optimize")?;
2970 }
2971 emit(
2972 progress,
2973 OptimizeEvent::PhaseDone {
2974 table,
2975 phase: OptimizePhase::IndexAppend,
2976 elapsed_ms: started.elapsed().as_millis() as u64,
2977 },
2978 );
2979 tracing::debug!(
2980 target: "pond::perf",
2981 indices = ?append_indices,
2982 "folded trailing fragments into indices",
2983 );
2984 did_work = true;
2985 }
2986
2987 Ok(did_work)
2988}
2989
2990async fn rebuild_index(
2991 dataset: &mut Dataset,
2992 intent: &IndexIntent,
2993 progress: Option<&OptimizeProgressFn>,
2994 table: Table,
2995) -> Result<()> {
2996 if !intent.trigger.should_create(dataset).await? {
2997 return Ok(());
2998 }
2999 let params = intent.params.build(dataset).await?;
3000 dataset
3001 .create_index_builder(
3002 &[intent.column],
3003 intent.params.index_type(),
3004 params.as_ref(),
3005 )
3006 .name(intent.name.to_owned())
3007 .replace(true)
3008 .progress(lance_progress(progress, table, intent.name))
3009 .await
3010 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
3011 Ok(())
3012}
3013
3014async fn index_status(
3015 table: Table,
3016 dataset: &Dataset,
3017 intents: &[IndexIntent],
3018) -> Result<Vec<IndexStatus>> {
3019 let existing = dataset.load_indices().await?;
3020 let existing_names: std::collections::HashSet<String> =
3021 existing.iter().map(|index| index.name.clone()).collect();
3022 let total_fragments = dataset.get_fragments().len();
3023 let total_rows = dataset.count_rows(None).await?;
3024 let mut statuses = Vec::with_capacity(intents.len());
3025 for intent in intents {
3026 let exists = existing_names.contains(intent.name);
3027 if !exists {
3028 statuses.push(IndexStatus {
3029 table,
3030 intent_name: intent.name.to_owned(),
3031 fragments_covered: 0,
3032 unindexed_fragments: total_fragments,
3033 unindexed_rows: total_rows,
3034 exists,
3035 });
3036 continue;
3037 }
3038 let unindexed = dataset
3039 .unindexed_fragments(intent.name)
3040 .await
3041 .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
3042 let unindexed_fragments = unindexed.len();
3043 let unindexed_rows = unindexed
3044 .iter()
3045 .map(|fragment| fragment.num_rows().unwrap_or(0))
3046 .sum();
3047 statuses.push(IndexStatus {
3048 table,
3049 intent_name: intent.name.to_owned(),
3050 fragments_covered: total_fragments.saturating_sub(unindexed_fragments),
3051 unindexed_fragments,
3052 unindexed_rows,
3053 exists,
3054 });
3055 }
3056 Ok(statuses)
3057}
3058
3059pub mod io_trace {
3077 use lance_io::utils::tracking_store::{IOTracker, IoStats};
3078 use std::sync::{Arc, OnceLock};
3079
3080 static TRACKER: OnceLock<IOTracker> = OnceLock::new();
3081
3082 pub fn enable() {
3085 let _ = TRACKER.set(IOTracker::default());
3086 }
3087
3088 pub(super) fn wrapper() -> Option<Arc<IOTracker>> {
3090 TRACKER.get().map(|tracker| Arc::new(tracker.clone()))
3091 }
3092
3093 pub fn take() -> Option<IoStats> {
3095 TRACKER.get().map(IOTracker::incremental_stats)
3096 }
3097}
3098
3099async fn open_or_create_via_ns(
3100 nm: &Arc<dyn LanceNamespace>,
3101 nm_ident: &NamespaceIdent,
3102 table_name: &str,
3103 schema: lance::deps::arrow_schema::SchemaRef,
3104 session: &Arc<Session>,
3105 storage_options: &HashMap<String, String>,
3106) -> Result<Dataset> {
3107 let table_id = nm_ident.as_table_id(table_name);
3108
3109 let request = DescribeTableRequest {
3110 id: Some(table_id.clone()),
3111 ..Default::default()
3112 };
3113 match nm.describe_table(request).await {
3114 Ok(response) => {
3115 let location = response.location.with_context(|| {
3116 format!("namespace returned no location for table {table_name}")
3117 })?;
3118 let mut builder = DatasetBuilder::from_uri(&location).with_session(session.clone());
3119 match io_trace::wrapper() {
3120 Some(wrapper) => {
3121 builder = builder.with_store_params(ObjectStoreParams {
3122 object_store_wrapper: Some(wrapper),
3123 storage_options_accessor: (!storage_options.is_empty()).then(|| {
3124 Arc::new(StorageOptionsAccessor::with_static_options(
3125 storage_options.clone(),
3126 ))
3127 }),
3128 ..Default::default()
3129 });
3130 }
3131 None => {
3132 if !storage_options.is_empty() {
3133 builder = builder.with_storage_options(storage_options.clone());
3134 }
3135 }
3136 }
3137 let dataset = builder
3138 .load()
3139 .await
3140 .with_context(|| format!("failed to open table {table_name}"))?;
3141 ensure_schema_matches(&dataset, schema.as_ref(), table_name)?;
3142 return Ok(dataset);
3143 }
3144 Err(error) => match &error {
3145 error if is_namespace_error_code(error, ErrorCode::TableNotFound) => {
3146 }
3148 _ => {
3149 return Err(anyhow::Error::from(error))
3150 .with_context(|| format!("failed to describe table {table_name}"));
3151 }
3152 },
3153 }
3154
3155 let mut write_params = sessions::write_params_for_create();
3158 write_params.session = Some(session.clone());
3159 write_params.mode = WriteMode::Create;
3160 if !storage_options.is_empty() {
3161 write_params.store_params = Some(ObjectStoreParams {
3162 storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
3163 storage_options.clone(),
3164 ))),
3165 ..Default::default()
3166 });
3167 }
3168 let reader = sessions::empty_reader(schema)?;
3169 Dataset::write_into_namespace(reader, nm.clone(), table_id, Some(write_params))
3170 .await
3171 .with_context(|| format!("failed to create table {table_name}"))
3172}
3173
3174fn is_namespace_error_code(error: &lance::Error, code: ErrorCode) -> bool {
3178 if !matches!(error, lance::Error::Namespace { .. }) {
3179 return false;
3180 }
3181 std::iter::successors(Some(error as &(dyn std::error::Error + 'static)), |link| {
3182 link.source()
3183 })
3184 .filter_map(|link| link.downcast_ref::<NamespaceError>())
3185 .any(|inner| inner.code() == code)
3186}
3187
3188fn scanner_with_prefilter(
3189 dataset: &Dataset,
3190 predicate: Option<&Predicate>,
3191) -> Result<lance::dataset::scanner::Scanner> {
3192 let mut scanner = dataset.scan();
3193 scanner.prefilter(true);
3194 if let Some(predicate) = predicate {
3195 let filter = predicate.to_lance();
3196 if !filter.is_empty() {
3197 scanner.filter(&filter)?;
3198 }
3199 }
3200 Ok(scanner)
3201}
3202fn ensure_schema_matches(
3203 dataset: &Dataset,
3204 expected: &lance::deps::arrow_schema::Schema,
3205 table_name: &str,
3206) -> Result<()> {
3207 use lance::deps::arrow_schema::DataType;
3208 use std::collections::BTreeSet;
3209 let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
3210 let actual_names: BTreeSet<&str> = actual.fields().iter().map(|f| f.name().as_str()).collect();
3211 let expected_names: BTreeSet<&str> = expected
3212 .fields()
3213 .iter()
3214 .map(|f| f.name().as_str())
3215 .collect();
3216 if actual_names != expected_names {
3217 anyhow::bail!(
3218 "table {table_name} has columns {actual_names:?} but this pond build expects \
3219 {expected_names:?} - the on-disk store predates a schema change; delete the \
3220 data directory and re-run `pond ingest`",
3221 );
3222 }
3223 for actual_field in actual.fields() {
3228 let Some(expected_field) = expected.field_with_name(actual_field.name()).ok() else {
3229 continue;
3230 };
3231 if let (DataType::FixedSizeList(_, actual_dim), DataType::FixedSizeList(_, expected_dim)) =
3232 (actual_field.data_type(), expected_field.data_type())
3233 && actual_dim != expected_dim
3234 {
3235 tracing::warn!(
3236 table = table_name,
3237 column = actual_field.name(),
3238 actual_dim,
3239 expected_dim,
3240 "embedding dimension differs from config; open proceeds because model swaps are operator-driven",
3241 );
3242 }
3243 }
3244 Ok(())
3245}
/// Fills in default client settings for remote object stores without
/// overriding anything the caller already set.
///
/// Keys are compared ASCII-case-insensitively and any alias of a setting
/// counts as "already set". The defaults are:
///
/// * `pool_idle_timeout` = `300 seconds`
/// * `connect_timeout` = `10 seconds`
/// * `request_timeout` = `60 seconds`
/// * `aws_unsigned_payload` = `true`, only when a custom endpoint is
///   configured under any of the endpoint aliases (`aws_endpoint`,
///   `aws_endpoint_url`, `endpoint`, `endpoint_url`).
fn apply_remote_storage_defaults(options: &mut HashMap<String, String>) {
    /// Returns `true` when `options` already holds any of `aliases`
    /// (compared ASCII-case-insensitively).
    fn has_any(options: &HashMap<String, String>, aliases: &[&str]) -> bool {
        options
            .keys()
            .any(|key| aliases.iter().any(|alias| key.eq_ignore_ascii_case(alias)))
    }

    /// Inserts `value` under the first alias unless one of the aliases is
    /// already present. An empty alias list is a no-op rather than a panic.
    fn set_default(options: &mut HashMap<String, String>, aliases: &[&str], value: &str) {
        let Some(canonical) = aliases.first() else {
            return;
        };
        if !has_any(options, aliases) {
            options.insert((*canonical).to_owned(), value.to_owned());
        }
    }

    set_default(options, &["pool_idle_timeout"], "300 seconds");
    set_default(options, &["connect_timeout"], "10 seconds");
    set_default(options, &["request_timeout"], "60 seconds");

    // The `*_url` spellings are accepted as endpoint keys too, so they must
    // count as a custom endpoint here as well.
    let has_custom_endpoint = has_any(
        options,
        &["aws_endpoint", "aws_endpoint_url", "endpoint", "endpoint_url"],
    );
    if has_custom_endpoint {
        set_default(
            options,
            &["aws_unsigned_payload", "unsigned_payload"],
            "true",
        );
    }
}
3282
/// Wraps `value` in single quotes, doubling every single quote it contains.
fn quoted_string(value: &str) -> String {
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            // An embedded quote is written twice.
            quoted.push('\'');
        }
        quoted.push(ch);
    }
    quoted.push('\'');
    quoted
}
/// Renders `value` as a quoted `'%…%'` pattern.
///
/// Backslash, `%` and `_` in `value` are prefixed with a backslash, and single
/// quotes are doubled.
fn like_contains(value: &str) -> String {
    let mut pattern = String::with_capacity(value.len() + 4);
    pattern.push_str("'%");
    for ch in value.chars() {
        match ch {
            '\\' => pattern.push_str("\\\\"),
            '%' => pattern.push_str("\\%"),
            '_' => pattern.push_str("\\_"),
            '\'' => pattern.push_str("''"),
            other => pattern.push(other),
        }
    }
    pattern.push_str("%'");
    pattern
}
3294
#[cfg(test)]
mod tests {
    // Tests may unwrap/expect freely: a failure there is a test failure.
    #![allow(clippy::expect_used, clippy::unwrap_used)]

    use super::*;
    use tempfile::TempDir;

    /// Builds a credentials set with fixed static keys and the given scope.
    fn set(scope: Option<&str>) -> CredsSet {
        CredsSet {
            scope: scope.map(str::to_owned),
            access_key_id: Some("AKIA".to_owned()),
            secret_access_key: Some("shh".to_owned()),
            ..CredsSet::default()
        }
    }

    /// Looks up one resolved storage option by key.
    fn opts(resolved: &ResolvedStorage, key: &str) -> Option<String> {
        resolved.options.get(key).cloned()
    }

    #[test]
    fn storage_url_translation_table() {
        // Bare filesystem path.
        let local = StorageUrl::parse("/srv/pond").unwrap();
        assert_eq!(local.lance_url().as_str(), "file:///srv/pond/");
        assert!(local.is_local());
        assert!(local.scheme_options.is_empty());
        // Plain s3:// passes through untouched.
        let aws = StorageUrl::parse("s3://bucket/prefix").unwrap();
        assert_eq!(aws.lance_url().as_str(), "s3://bucket/prefix");
        assert!(aws.scheme_options.is_empty());
        // s3+https:// with a DNS host.
        let fat = StorageUrl::parse("s3+https://nbg1.example.com/my-pond/sub").unwrap();
        assert_eq!(fat.lance_url().as_str(), "s3://my-pond/sub");
        assert_eq!(
            fat.scheme_options,
            vec![
                ("allow_http", "false".to_owned()),
                ("virtual_hosted_style_request", "true".to_owned()),
                ("region", "us-east-1".to_owned()),
            ],
        );
        let resolved = fat.resolve(&BTreeMap::new()).unwrap();
        assert_eq!(
            opts(&resolved, "endpoint").as_deref(),
            Some("https://my-pond.nbg1.example.com"),
        );
        assert_eq!(opts(&resolved, "region").as_deref(), Some("us-east-1"));
        // s3+http:// with an IP host and port.
        let plain = StorageUrl::parse("s3+http://127.0.0.1:9000/pond").unwrap();
        assert_eq!(plain.lance_url().as_str(), "s3://pond/");
        assert_eq!(plain.scheme_options[0], ("allow_http", "true".to_owned()));
        assert_eq!(
            plain.scheme_options[1],
            ("virtual_hosted_style_request", "false".to_owned()),
        );
        let resolved = plain.resolve(&BTreeMap::new()).unwrap();
        assert_eq!(
            opts(&resolved, "endpoint").as_deref(),
            Some("http://127.0.0.1:9000"),
        );
        // An endpoint given in a creds set's extras wins over the derived one.
        let mut pinned = BTreeMap::new();
        pinned.insert(
            "default".to_owned(),
            CredsSet {
                extra: [(
                    "endpoint".to_owned(),
                    "https://pinned.example.com".to_owned(),
                )]
                .into_iter()
                .collect(),
                ..CredsSet::default()
            },
        );
        let resolved = fat.resolve(&pinned).unwrap();
        assert_eq!(
            opts(&resolved, "endpoint").as_deref(),
            Some("https://pinned.example.com"),
        );
        // Other schemes.
        let gcs = StorageUrl::parse("gs://bucket/p").unwrap();
        assert_eq!(gcs.lance_url().as_str(), "gs://bucket/p");
        let azure = StorageUrl::parse("az://acct/container/p").unwrap();
        assert_eq!(azure.lance_url().as_str(), "az://container/p");
        assert_eq!(
            azure.scheme_options,
            vec![("account_name", "acct".to_owned())]
        );
        let shared = StorageUrl::parse("shared-memory://pond-test-x/").unwrap();
        assert_eq!(shared.lance_url().as_str(), "shared-memory://pond-test-x/");
    }

    #[test]
    fn storage_url_rejects_bad_shapes() {
        // Userinfo in the URL is rejected and the error mentions creds.
        let err = StorageUrl::parse("s3+https://user:pass@host/bucket")
            .expect_err("userinfo must be rejected")
            .to_string();
        assert!(
            err.contains("creds"),
            "error must name the alternative: {err}"
        );
        // Missing bucket / container.
        assert!(StorageUrl::parse("s3+https://host").is_err());
        assert!(StorageUrl::parse("az://acct").is_err());
        // Unknown scheme.
        let err = StorageUrl::parse("ftp://host/x")
            .expect_err("ftp")
            .to_string();
        assert!(err.contains("s3+https"), "got: {err}");
        // Unknown query parameter is named in the error.
        let err = StorageUrl::parse("s3://b/p?regoin=x")
            .expect_err("typo")
            .to_string();
        assert!(err.contains("regoin"), "got: {err}");
        // Query parameters are refused on memory:// and file:// URLs.
        let err = StorageUrl::parse("memory://x?creds=y")
            .expect_err("memory query")
            .to_string();
        assert!(err.contains("query params"), "got: {err}");
        let err = StorageUrl::parse("file:///x?creds=y")
            .expect_err("file query")
            .to_string();
        assert!(err.contains("query params"), "got: {err}");
        // A bare path containing `?` still parses.
        assert!(StorageUrl::parse("/tmp/a?b").is_ok());
    }

    #[test]
    fn storage_url_canonicalizes_ports_and_keeps_percent_encoding() {
        // An explicit :443 on https canonicalizes to the same URL as no port.
        let with_port = StorageUrl::parse("s3+https://host:443/bucket/p").unwrap();
        let without = StorageUrl::parse("s3+https://host/bucket/p").unwrap();
        assert_eq!(with_port.canonical(), without.canonical());
        // A non-default port survives into the endpoint.
        let odd = StorageUrl::parse("s3+https://host:8443/bucket").unwrap();
        let resolved = odd.resolve(&BTreeMap::new()).unwrap();
        assert_eq!(
            resolved.options.get("endpoint").map(String::as_str),
            Some("https://bucket.host:8443"),
        );
        // Percent-encoding in the path is preserved.
        let encoded = StorageUrl::parse("s3+https://host/bucket/pre%20fix").unwrap();
        assert_eq!(encoded.lance_url().as_str(), "s3://bucket/pre%20fix");
    }

    #[test]
    fn query_params_strip_and_apply_over_set_fields() {
        let mut creds = BTreeMap::new();
        creds.insert(
            "default".to_owned(),
            CredsSet {
                region: Some("from-set".to_owned()),
                virtual_hosted_style_request: Some(false),
                ..set(None)
            },
        );
        let url = StorageUrl::parse(
            "s3+https://host/bucket/p?region=from-query&virtual_hosted_style_request=true",
        )
        .unwrap();
        // The query is stripped from both the Lance URL and the canonical URL.
        assert_eq!(url.lance_url().as_str(), "s3://bucket/p");
        assert!(url.canonical().query().is_none());
        let resolved = url.resolve(&creds).unwrap();
        // Query values win over the creds set's fields.
        assert_eq!(opts(&resolved, "region").as_deref(), Some("from-query"));
        assert_eq!(
            opts(&resolved, "virtual_hosted_style_request").as_deref(),
            Some("true"),
        );
        assert_eq!(
            opts(&resolved, "endpoint").as_deref(),
            Some("https://bucket.host"),
        );
    }

    #[test]
    fn scope_matching_binds_by_longest_prefix_at_segment_boundaries() {
        let mut creds = BTreeMap::new();
        creds.insert("all".to_owned(), set(None));
        creds.insert("bucket".to_owned(), set(Some("s3+https://host/pond/")));
        creds.insert("deep".to_owned(), set(Some("s3+https://host/pond/sub")));

        let bind = |input: &str| {
            StorageUrl::parse(input)
                .unwrap()
                .resolve(&creds)
                .unwrap()
                .binding
        };
        // The longest matching scope wins.
        assert_eq!(
            bind("s3+https://host/pond/sub/x"),
            CredsBinding::Set {
                name: "deep".to_owned(),
                via: BindVia::Scope
            },
        );
        assert_eq!(
            bind("s3+https://host/pond/other"),
            CredsBinding::Set {
                name: "bucket".to_owned(),
                via: BindVia::Scope
            },
        );
        // `pond-2` shares a string prefix with `pond` but is a different segment.
        assert_eq!(
            bind("s3+https://host/pond-2"),
            CredsBinding::Set {
                name: "all".to_owned(),
                via: BindVia::CatchAll
            },
        );
        // A different scheme does not match the s3+https scopes.
        assert_eq!(
            bind("s3://pond/sub"),
            CredsBinding::Set {
                name: "all".to_owned(),
                via: BindVia::CatchAll
            },
        );
        // An explicit default port still matches the port-less scope.
        assert_eq!(
            bind("s3+https://host:443/pond/x"),
            CredsBinding::Set {
                name: "bucket".to_owned(),
                via: BindVia::Scope
            },
        );
        // An explicit `creds=` pointer overrides scope matching.
        assert_eq!(
            bind("s3+https://host/pond/sub/x?creds=all"),
            CredsBinding::Set {
                name: "all".to_owned(),
                via: BindVia::Pointer
            },
        );
        // A pointer to an unknown set is an error that names the pointer.
        let err = StorageUrl::parse("s3://b/p?creds=nope")
            .unwrap()
            .resolve(&creds)
            .expect_err("missing set")
            .to_string();
        assert!(err.contains("creds=nope"), "got: {err}");

        let empty = BTreeMap::new();
        // No sets configured: remote URLs bind ambient, local paths not at all.
        assert_eq!(
            StorageUrl::parse("s3://b/p")
                .unwrap()
                .resolve(&empty)
                .unwrap()
                .binding,
            CredsBinding::Ambient,
        );
        assert_eq!(
            StorageUrl::parse("/srv/pond")
                .unwrap()
                .resolve(&creds)
                .unwrap()
                .binding,
            CredsBinding::NotApplicable,
        );
    }

    #[test]
    fn unmatched_sets_are_reported_only_on_remote_invocations() {
        let mut creds = BTreeMap::new();
        creds.insert("used".to_owned(), set(Some("s3://bucket/")));
        creds.insert("idle".to_owned(), set(Some("s3://other/")));

        let remote = StorageUrl::parse("s3://bucket/p")
            .unwrap()
            .resolve(&creds)
            .unwrap();
        assert_eq!(unmatched_creds_sets(&[&remote], &creds), vec!["idle"]);

        // A purely local invocation reports nothing.
        let local = StorageUrl::parse("/srv/pond")
            .unwrap()
            .resolve(&creds)
            .unwrap();
        assert!(unmatched_creds_sets(&[&local], &creds).is_empty());
    }

    #[test]
    fn secrets_materialize_from_file_and_command() {
        let dir = TempDir::new().unwrap();
        let key_path = dir.path().join("key");
        std::fs::write(&key_path, "from-file\n").unwrap();
        let mut creds = BTreeMap::new();
        creds.insert(
            "default".to_owned(),
            CredsSet {
                access_key_id_file: Some(key_path),
                secret_access_key_command: Some("printf 'from-command\\n\\n'".to_owned()),
                ..CredsSet::default()
            },
        );
        let url = StorageUrl::parse("s3://bucket/p").unwrap();
        let resolved = url.resolve(&creds).unwrap();
        // The file value loses its trailing newline.
        assert_eq!(
            opts(&resolved, "access_key_id").as_deref(),
            Some("from-file")
        );
        // Of the command's two trailing newlines, exactly one is removed.
        assert_eq!(
            opts(&resolved, "secret_access_key").as_deref(),
            Some("from-command\n"),
        );

        // A failing command surfaces as an error that names the command.
        let mut failing = BTreeMap::new();
        failing.insert(
            "default".to_owned(),
            CredsSet {
                secret_access_key_command: Some("exit 3".to_owned()),
                ..CredsSet::default()
            },
        );
        let err = url
            .resolve(&failing)
            .expect_err("command must fail")
            .to_string();
        assert!(err.contains("exit 3"), "got: {err}");

        // Resolving twice must not run the command twice.
        let marker = dir.path().join("runs");
        let command = format!("echo run >> {} && echo secret", marker.display());
        let mut counted = BTreeMap::new();
        counted.insert(
            "default".to_owned(),
            CredsSet {
                secret_access_key_command: Some(command),
                ..CredsSet::default()
            },
        );
        url.resolve(&counted).unwrap();
        url.resolve(&counted).unwrap();
        let runs = std::fs::read_to_string(&marker).unwrap();
        assert_eq!(runs.lines().count(), 1, "command must run exactly once");
    }

    #[test]
    fn check_errors_classify_by_kind_and_binding() {
        let auth_error = || object_store::Error::Unauthenticated {
            path: "k".to_owned(),
            source: "denied".into(),
        };
        let bound = CredsBinding::Set {
            name: "work".to_owned(),
            via: BindVia::Scope,
        };
        // Unauthenticated with a bound set is an auth failure naming the set.
        match classify_check_error(auth_error(), &bound, "put") {
            CheckFailure::Auth { set, .. } => assert_eq!(set, "work"),
            other => panic!("want Auth, got {other:?}"),
        }
        // Unauthenticated with ambient credentials means no credentials.
        assert!(matches!(
            classify_check_error(auth_error(), &CredsBinding::Ambient, "put"),
            CheckFailure::NoCreds { .. },
        ));
        let denied = object_store::Error::PermissionDenied {
            path: "k".to_owned(),
            source: "403".into(),
        };
        assert!(matches!(
            classify_check_error(denied, &bound, "put"),
            CheckFailure::Auth { .. },
        ));
        // Not-found is plain I/O.
        let missing = object_store::Error::NotFound {
            path: "k".to_owned(),
            source: "404".into(),
        };
        assert!(matches!(
            classify_check_error(missing, &bound, "get"),
            CheckFailure::Io { .. },
        ));
        // A generic error carrying the credentials-not-loaded message is
        // classified like an authentication problem.
        let no_creds = || object_store::Error::Generic {
            store: "S3",
            source: "Failed to get AWS credentials: CredentialsNotLoaded".into(),
        };
        assert!(matches!(
            classify_check_error(no_creds(), &bound, "put"),
            CheckFailure::Auth { .. },
        ));
        assert!(matches!(
            classify_check_error(no_creds(), &CredsBinding::Ambient, "put"),
            CheckFailure::NoCreds { .. },
        ));
    }

    #[test]
    fn concise_cause_strips_upstream_noise_to_one_line() {
        let inner = "Encountered internal error. Please file a bug report at \
                     https://github.com/lance-format/lance/issues. Failed to get AWS \
                     credentials: CredentialsNotLoaded, <WORKSPACE>/src/object_store/providers/aws.rs:401:21: \
                     Encountered internal error. Please file a bug report at \
                     https://github.com/lance-format/lance/issues. Failed to get AWS \
                     credentials: CredentialsNotLoaded";
        let failure = CheckFailure::NoCreds {
            source: anyhow!(inner.to_owned()).context("initial conditional put"),
        };
        let cause = failure.concise_cause().expect("auth-class carries a cause");
        assert_eq!(cause, "Failed to get AWS credentials: CredentialsNotLoaded");
        // The Display form must not include the upstream boilerplate.
        assert!(
            !failure.to_string().contains("file a bug report"),
            "lead must not trail the chain: {failure}"
        );
        // This variant has no cause to report.
        let occ = CheckFailure::OccUnsupported {
            detail: "put-if-none-match ignored".to_owned(),
        };
        assert!(occ.concise_cause().is_none());
        // Overlong causes are truncated in the middle, keeping the tail.
        let long = CheckFailure::Io {
            source: anyhow!(format!("{} dns error: lookup failed", "x".repeat(500))),
        };
        let cause = long.concise_cause().expect("io carries a cause");
        assert!(cause.contains(" ... "), "long causes truncate: {cause}");
        assert!(
            cause.ends_with("dns error: lookup failed"),
            "the tail survives: {cause}"
        );
    }

    #[tokio::test]
    async fn storage_check_passes_on_memory_backend() {
        let resolved = StorageUrl::parse("memory://check/probe")
            .unwrap()
            .resolve(&BTreeMap::new())
            .unwrap();
        storage_check(&resolved).await.expect("memory probe passes");
    }

    /// Builds a fragment stat of `bytes` bytes, one row per 1000 bytes, with
    /// no deleted rows.
    fn stat(bytes: u64) -> FragmentStat {
        FragmentStat {
            bytes: Some(bytes),
            rows: bytes / 1_000,
            deleted_rows: 0,
        }
    }

    #[test]
    fn compaction_veto_blocks_absorb_keeps_peers() {
        // One large fragment absorbing two small ones is vetoed.
        let absorb = [stat(665_000_000), stat(1_000_000), stat(2_000_000)];
        assert!(!keep_task(&absorb, 64, 0.1));
        // Two equally sized fragments are kept.
        let peers = [stat(300_000_000), stat(300_000_000)];
        assert!(keep_task(&peers, 64, 0.1));
        // A merge of small fragments is kept.
        let tiered = [stat(400_000), stat(60_000), stat(40_000)];
        assert!(keep_task(&tiered, 64, 0.1));
    }

    #[test]
    fn compaction_veto_passes_deletions_and_cap() {
        // A large fragment with a fifth of its rows deleted is kept.
        let mut deleting = stat(665_000_000);
        deleting.deleted_rows = deleting.rows / 5;
        assert!(keep_task(&[deleting, stat(1_000)], 64, 0.1));

        // A task with 64 fragments (equal to the cap passed in) is kept.
        let wide: Vec<FragmentStat> = std::iter::once(stat(665_000_000))
            .chain(std::iter::repeat_with(|| stat(1_000)).take(63))
            .collect();
        assert!(keep_task(&wide, 64, 0.1));
    }

    #[test]
    fn compaction_veto_falls_back_to_rows_on_unknown_sizes() {
        let mut unknown = stat(665_000_000);
        unknown.bytes = None;
        // With the byte size unknown the task is still vetoed.
        assert!(!keep_task(
            &[unknown, stat(1_000_000), stat(2_000_000)],
            64,
            0.1
        ));
    }

    #[test]
    fn derived_target_rows_tracks_row_size_and_clamps() {
        // Known bytes and rows: the target lands inside the expected band.
        let parts_like = [FragmentStat {
            bytes: Some(665_000_000),
            rows: 511_000,
            deleted_rows: 0,
        }];
        let target = derived_target_rows(&parts_like);
        assert!((150_000..300_000).contains(&target), "{target}");
        // Unknown byte size yields the maximum.
        let unknown = [FragmentStat {
            bytes: None,
            rows: 511_000,
            deleted_rows: 0,
        }];
        assert_eq!(
            derived_target_rows(&unknown),
            MAX_TARGET_ROWS_PER_FRAGMENT as usize
        );
        // Tiny rows clamp to the maximum.
        let tiny = [FragmentStat {
            bytes: Some(1_000_000),
            rows: 100_000,
            deleted_rows: 0,
        }];
        assert_eq!(
            derived_target_rows(&tiny),
            MAX_TARGET_ROWS_PER_FRAGMENT as usize
        );
        // Huge rows clamp to the minimum.
        let huge = [FragmentStat {
            bytes: Some(1_000_000_000),
            rows: 100,
            deleted_rows: 0,
        }];
        assert_eq!(
            derived_target_rows(&huge),
            MIN_TARGET_ROWS_PER_FRAGMENT as usize
        );
    }

    #[test]
    fn namespace_error_code_walks_wrapped_chain() {
        let direct = lance::Error::namespace_source(Box::new(NamespaceError::TableNotFound {
            message: "missing".into(),
        }));
        assert!(is_namespace_error_code(&direct, ErrorCode::TableNotFound));

        // A namespace error wrapping another namespace error is still found.
        let wrapped = lance::Error::namespace_source(Box::new(direct));
        assert!(is_namespace_error_code(&wrapped, ErrorCode::TableNotFound));

        let other_code =
            lance::Error::namespace_source(Box::new(NamespaceError::NamespaceNotFound {
                message: "nope".into(),
            }));
        assert!(!is_namespace_error_code(
            &other_code,
            ErrorCode::TableNotFound
        ));

        // Errors that are not of the namespace variant never match.
        let not_namespace = lance::Error::internal("unrelated");
        assert!(!is_namespace_error_code(
            &not_namespace,
            ErrorCode::TableNotFound
        ));
    }

    #[tokio::test]
    async fn store_opens_via_namespace_and_scan_works() -> Result<()> {
        let temp = TempDir::new()?;
        let url = Url::from_directory_path(temp.path())
            .map_err(|()| anyhow::anyhow!("temp path is not absolute"))?;
        let handle = Handle::open(&url).await?;
        // Each table of a freshly opened store must scan and be empty.
        let cases: [(Table, &[&str]); 3] = [
            (Table::Sessions, &["id"]),
            (Table::Messages, &["id"]),
            (Table::Parts, &["id"]),
        ];
        for (table, projection) in cases {
            let scanner = handle
                .scan(table, ScanOpts::project_only(projection))
                .await?;
            let batch = scanner.try_into_batch().await?;
            assert_eq!(batch.num_rows(), 0, "fresh table should be empty");
        }
        Ok(())
    }
}