1use crate::{
5 RetryPolicy,
6 config::{self, CredsSet},
7 handlers::NamespaceIdent,
8 sessions::{self},
9};
10use anyhow::{Context, Result, anyhow, bail};
11use lance::Dataset;
12use lance::dataset::builder::DatasetBuilder;
13use lance::dataset::index::DatasetIndexRemapperOptions;
14use lance::dataset::optimize::{CompactionOptions, commit_compaction, plan_compaction};
15use lance::dataset::write::merge_insert::SourceDedupeBehavior;
16use lance::dataset::{MergeInsertBuilder, WhenMatched, WhenNotMatched, WriteMode};
17use lance::deps::arrow_array::{RecordBatch, RecordBatchIterator};
18use lance::index::DatasetIndexExt;
19use lance::index::DatasetIndexInternalExt;
20use lance::index::vector::VectorIndexParams;
21use lance::session::Session;
22use lance_index::IndexType;
23use lance_index::optimize::OptimizeOptions;
24use lance_index::scalar::{BuiltinIndexType, InvertedIndexParams, ScalarIndexParams};
25use lance_io::object_store::{
26 ObjectStore, ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor, uri_to_url,
27};
28use lance_linalg::distance::MetricType;
29use lance_namespace::LanceNamespace;
30use lance_namespace::error::{ErrorCode, NamespaceError};
31use lance_namespace::models::DescribeTableRequest;
32use lance_namespace_impls::ConnectBuilder;
33use std::{
34 collections::{BTreeMap, HashMap},
35 sync::Arc,
36 time::{Duration, Instant},
37};
38use tokio::sync::{Mutex, OnceCell};
39use tokio_stream::StreamExt;
40use url::Url;
/// Row-count constant for vector index activation.
///
/// NOTE(review): presumably the number of rows required before a vector
/// index is built; the use site is outside this view - confirm.
pub const VECTOR_INDEX_ACTIVATION_ROWS: usize = 100_000;

/// Value [`index_lag_threshold`] reports when no runtime value was installed.
pub const DEFAULT_INDEX_LAG_THRESHOLD: usize = 4;

/// Process-wide, write-once storage for the runtime index lag threshold.
/// Filled by [`init_index_lag_threshold`], read by [`index_lag_threshold`].
static INDEX_LAG_THRESHOLD_RUNTIME: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
55
56pub fn init_index_lag_threshold(value: usize) {
59 INDEX_LAG_THRESHOLD_RUNTIME.get_or_init(|| value);
60}
61
62pub fn index_lag_threshold() -> usize {
63 INDEX_LAG_THRESHOLD_RUNTIME
64 .get()
65 .copied()
66 .unwrap_or(DEFAULT_INDEX_LAG_THRESHOLD)
67}
68
/// A parsed storage location, as produced by [`StorageUrl::parse`].
///
/// Holds the address the user wrote (minus its query string) next to the URL
/// and option pairs derived from it.
#[derive(Debug, Clone, PartialEq)]
pub struct StorageUrl {
    /// The input URL with its query string removed; for `s3+https`/`s3+http`
    /// the scheme's default port (443/80) is dropped too. Used for scope
    /// matching, display and the local/remote decision.
    canonical: Url,
    /// URL returned by [`StorageUrl::lance_url`]. Same as `canonical` except
    /// for `s3+https`/`s3+http` (rewritten to `s3://bucket/prefix`) and `az`
    /// (rewritten to `az://container/prefix`).
    lance: Url,
    /// Options implied by the scheme itself (e.g. `allow_http`, `region`,
    /// `account_name`); the lowest-priority layer in [`StorageUrl::resolve`].
    scheme_options: Vec<(&'static str, String)>,
    /// Recognised query parameters other than `creds`, in URL order; the
    /// highest-priority layer in [`StorageUrl::resolve`].
    query_options: Vec<(&'static str, String)>,
    /// Value of the `?creds=` query parameter, if present.
    creds_pointer: Option<String>,
    /// Endpoint parts for `s3+https`/`s3+http` URLs; `None` for every other
    /// scheme.
    endpoint: Option<S3Endpoint>,
}
98
/// Pieces of an `s3+https://` / `s3+http://` address needed to build the
/// `endpoint` storage option.
#[derive(Debug, Clone, PartialEq)]
struct S3Endpoint {
    /// `"https"` or `"http"`, derived from the `s3+...` scheme.
    scheme: &'static str,
    /// `host` or `host:port` (the port is omitted when it is the default).
    authority: String,
    /// First path segment of the URL; prepended to the authority when the
    /// endpoint is rendered in virtual-hosted style.
    bucket: String,
}
106
/// Query parameters a storage URL may carry. `creds` is treated as a pointer
/// to a creds set; the others are passed on as storage options. Any other
/// key makes `strip_query` fail.
const RECOGNIZED_QUERY_PARAMS: [&str; 3] = ["creds", "region", "virtual_hosted_style_request"];
111
112impl StorageUrl {
113 pub fn parse(input: &str) -> Result<Self> {
117 let trimmed = input.trim();
118 if trimmed.is_empty() {
119 bail!("storage path is empty");
120 }
121 if !trimmed.contains("://") || trimmed.starts_with("file://") {
124 let url =
125 uri_to_url(trimmed).with_context(|| format!("invalid storage path {trimmed:?}"))?;
126 if url.query().is_some() {
131 bail!("storage URL {trimmed:?} carries query params; local URLs take none");
132 }
133 return Ok(Self::plain(url));
134 }
135 let url =
136 Url::parse(trimmed).with_context(|| format!("invalid storage URL {trimmed:?}"))?;
137 if !url.username().is_empty() || url.password().is_some() {
139 bail!(
140 "storage URL {trimmed:?} embeds credentials; put them in [creds.*] (or POND_CREDS_*) instead"
141 );
142 }
143 match url.scheme() {
144 "memory" | "shared-memory" => {
145 if url.query().is_some() {
146 bail!(
147 "storage URL {trimmed:?} carries query params; {}:// URLs take none",
148 url.scheme(),
149 );
150 }
151 Ok(Self::plain(url))
152 }
153 "s3" | "gs" => {
154 let (canonical, query_options, creds_pointer) = strip_query(url)?;
155 let mut lance = canonical.clone();
156 lance.set_query(None);
157 Ok(Self {
158 canonical,
159 lance,
160 scheme_options: Vec::new(),
161 query_options,
162 creds_pointer,
163 endpoint: None,
164 })
165 }
166 "s3+https" | "s3+http" => {
167 let (mut canonical, query_options, creds_pointer) = strip_query(url)?;
168 let tls = canonical.scheme() == "s3+https";
169 if canonical.port() == Some(if tls { 443 } else { 80 }) {
173 let _ = canonical.set_port(None);
174 }
175 let host = canonical
176 .host_str()
177 .ok_or_else(|| anyhow!("storage URL {trimmed:?} has no endpoint host"))?;
178 let endpoint_authority = match canonical.port() {
179 Some(port) => format!("{host}:{port}"),
180 None => host.to_owned(),
181 };
182 let mut segments = canonical.path().trim_start_matches('/').splitn(2, '/');
183 let bucket = segments.next().unwrap_or_default().to_owned();
184 let prefix = segments.next().unwrap_or_default().to_owned();
185 if bucket.is_empty() {
186 bail!(
187 "storage URL {trimmed:?} is missing the bucket: the form is {}://host/bucket/prefix",
188 canonical.scheme(),
189 );
190 }
191 let lance = Url::parse(&format!("s3://{bucket}/{prefix}")).with_context(|| {
192 format!("storage URL {trimmed:?}: bucket/prefix do not form a valid s3:// URL")
193 })?;
194 let scheme = if tls { "https" } else { "http" };
195 let virtual_hosted = host.parse::<std::net::IpAddr>().is_err()
204 && !matches!(canonical.host(), Some(url::Host::Ipv6(_)));
205 let scheme_options = vec![
206 ("allow_http", (!tls).to_string()),
207 ("virtual_hosted_style_request", virtual_hosted.to_string()),
208 ("region", "us-east-1".to_owned()),
215 ];
216 Ok(Self {
217 canonical,
218 lance,
219 scheme_options,
220 query_options,
221 creds_pointer,
222 endpoint: Some(S3Endpoint {
223 scheme,
224 authority: endpoint_authority,
225 bucket,
226 }),
227 })
228 }
229 "az" => {
230 let (canonical, query_options, creds_pointer) = strip_query(url)?;
231 let account = canonical
232 .host_str()
233 .ok_or_else(|| anyhow!("storage URL {trimmed:?} has no account: the form is az://account/container/prefix"))?
234 .to_owned();
235 let mut segments = canonical.path().trim_start_matches('/').splitn(2, '/');
236 let container = segments.next().unwrap_or_default();
237 if container.is_empty() {
238 bail!(
239 "storage URL {trimmed:?} is missing the container: the form is az://account/container/prefix"
240 );
241 }
242 let prefix = segments.next().unwrap_or_default();
243 let lance = Url::parse(&format!("az://{container}/{prefix}"))
244 .with_context(|| format!("storage URL {trimmed:?}: container/prefix do not form a valid az:// URL"))?;
245 Ok(Self {
246 canonical,
247 lance,
248 scheme_options: vec![("account_name", account)],
249 query_options,
250 creds_pointer,
251 endpoint: None,
252 })
253 }
254 other => bail!(
255 "storage URL scheme {other:?} not recognized; use a local path, s3://, s3+https://, s3+http://, gs://, or az://"
256 ),
257 }
258 }
259
260 fn plain(url: Url) -> Self {
262 Self {
263 canonical: url.clone(),
264 lance: url,
265 scheme_options: Vec::new(),
266 query_options: Vec::new(),
267 creds_pointer: None,
268 endpoint: None,
269 }
270 }
271
272 pub fn lance_url(&self) -> &Url {
274 &self.lance
275 }
276
277 pub fn canonical(&self) -> &Url {
280 &self.canonical
281 }
282
283 pub fn is_local(&self) -> bool {
284 config::is_local(&self.canonical)
285 }
286
287 pub fn display(&self) -> String {
289 config::display(&self.canonical)
290 }
291
292 fn takes_credentials(&self) -> bool {
295 !matches!(
296 self.canonical.scheme(),
297 "file" | "file+uring" | "memory" | "shared-memory"
298 )
299 }
300
301 pub fn resolve(&self, creds: &BTreeMap<String, CredsSet>) -> Result<ResolvedStorage> {
308 if !self.takes_credentials() {
309 return Ok(ResolvedStorage {
310 storage: self.clone(),
311 options: HashMap::new(),
312 binding: CredsBinding::NotApplicable,
313 });
314 }
315 let matched: Option<(&String, &CredsSet, BindVia)> = match &self.creds_pointer {
316 Some(name) => {
317 let set = creds.get(name).ok_or_else(|| {
318 anyhow!(
319 "URL names ?creds={name} but no [creds.{name}] set is configured; define it or drop the pointer"
320 )
321 })?;
322 Some((name, set, BindVia::Pointer))
323 }
324 None => {
325 let mut best: Option<(&String, &CredsSet, String)> = None;
326 for (name, set) in creds {
327 let Some(scope) = &set.scope else { continue };
328 let scope_url = parse_scope(scope).with_context(|| {
329 format!("[creds.{name}] scope {scope:?} is not a valid URL prefix")
330 })?;
331 if scope_matches(&scope_url, &self.canonical)
332 && best
333 .as_ref()
334 .is_none_or(|(_, _, len)| scope_url.as_str().len() > len.len())
335 {
336 best = Some((name, set, scope_url.as_str().to_owned()));
337 }
338 }
339 match best {
340 Some((name, set, _)) => Some((name, set, BindVia::Scope)),
341 None => creds
342 .iter()
343 .find(|(_, set)| set.scope.is_none())
344 .map(|(name, set)| (name, set, BindVia::CatchAll)),
345 }
346 }
347 };
348 let mut options: HashMap<String, String> = self
349 .scheme_options
350 .iter()
351 .map(|(key, value)| ((*key).to_owned(), value.clone()))
352 .collect();
353 let binding = match matched {
354 None => CredsBinding::Ambient,
355 Some((name, set, via)) => {
356 if let Some(region) = &set.region {
357 options.insert("region".to_owned(), region.clone());
358 }
359 if let Some(virtual_hosted) = set.virtual_hosted_style_request {
360 options.insert(
361 "virtual_hosted_style_request".to_owned(),
362 virtual_hosted.to_string(),
363 );
364 }
365 for (key, value) in &set.extra {
366 options.insert(key.clone(), value.clone());
367 }
368 if let Some(value) = materialize_secret(
369 name,
370 "access_key_id",
371 set.access_key_id.as_deref(),
372 set.access_key_id_file.as_deref(),
373 None,
374 )? {
375 options.insert("access_key_id".to_owned(), value);
376 }
377 if let Some(value) = materialize_secret(
378 name,
379 "secret_access_key",
380 set.secret_access_key.as_deref(),
381 set.secret_access_key_file.as_deref(),
382 set.secret_access_key_command.as_deref(),
383 )? {
384 options.insert("secret_access_key".to_owned(), value);
385 }
386 CredsBinding::Set {
387 name: name.clone(),
388 via,
389 }
390 }
391 };
392 for (key, value) in &self.query_options {
393 options.insert((*key).to_owned(), value.clone());
394 }
395 if let Some(endpoint) = &self.endpoint
400 && !options.keys().any(|key| {
401 key.eq_ignore_ascii_case("endpoint") || key.eq_ignore_ascii_case("aws_endpoint")
402 })
403 {
404 let virtual_hosted = options
405 .get("virtual_hosted_style_request")
406 .is_some_and(|value| value == "true");
407 let url = if virtual_hosted {
408 format!(
409 "{}://{}.{}",
410 endpoint.scheme, endpoint.bucket, endpoint.authority
411 )
412 } else {
413 format!("{}://{}", endpoint.scheme, endpoint.authority)
414 };
415 options.insert("endpoint".to_owned(), url);
416 }
417 Ok(ResolvedStorage {
418 storage: self.clone(),
419 options,
420 binding,
421 })
422 }
423}
424
425type StrippedQuery = (Url, Vec<(&'static str, String)>, Option<String>);
427
428fn strip_query(url: Url) -> Result<StrippedQuery> {
430 let mut query_options = Vec::new();
431 let mut creds_pointer = None;
432 for (key, value) in url.query_pairs() {
433 match RECOGNIZED_QUERY_PARAMS
434 .iter()
435 .find(|known| **known == key.as_ref())
436 {
437 Some(&"creds") => creds_pointer = Some(value.into_owned()),
438 Some(known) => query_options.push((*known, value.into_owned())),
439 None => bail!(
440 "storage URL query param {key:?} not recognized (known: {})",
441 RECOGNIZED_QUERY_PARAMS.join(", "),
442 ),
443 }
444 }
445 let mut canonical = url;
446 canonical.set_query(None);
447 Ok((canonical, query_options, creds_pointer))
448}
449
450pub(crate) fn parse_scope(scope: &str) -> Result<Url> {
453 let mut url = Url::parse(scope.trim())?;
454 if !url.username().is_empty() || url.password().is_some() {
455 bail!("scope embeds credentials");
456 }
457 if url.query().is_some() {
458 bail!("scope carries query params; scopes are plain URL prefixes");
459 }
460 match (url.scheme(), url.port()) {
461 ("s3+https", Some(443)) | ("s3+http", Some(80)) => {
462 let _ = url.set_port(None);
463 }
464 _ => {}
465 }
466 Ok(url)
467}
468
469fn scope_matches(scope: &Url, address: &Url) -> bool {
474 if scope.scheme() != address.scheme()
475 || scope.host_str() != address.host_str()
476 || scope.port() != address.port()
477 {
478 return false;
479 }
480 let scope_path = scope.path().trim_end_matches('/');
481 let address_path = address.path().trim_end_matches('/');
482 address_path == scope_path
483 || address_path
484 .strip_prefix(scope_path)
485 .is_some_and(|rest| rest.starts_with('/'))
486}
487
/// How a creds set came to be bound to a storage URL.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BindVia {
    /// The URL named the set explicitly through `?creds=`.
    Pointer,
    /// The set's `scope` matched the URL.
    Scope,
    /// The set has no scope and was used as the fallback.
    CatchAll,
}

/// Outcome of credential resolution for one storage URL.
#[derive(Debug, Clone, PartialEq)]
pub enum CredsBinding {
    /// A configured creds set was selected.
    Set { name: String, via: BindVia },
    /// No set matched.
    Ambient,
    /// The URL's scheme takes no credentials.
    NotApplicable,
}

impl CredsBinding {
    /// One-line, human-readable description of the binding.
    pub fn describe(&self) -> String {
        let (name, via) = match self {
            Self::Ambient => return "ambient chain".to_owned(),
            Self::NotApplicable => return "local (no credentials)".to_owned(),
            Self::Set { name, via } => (name, via),
        };
        let how = match via {
            BindVia::Pointer => "?creds",
            BindVia::Scope => "scope match",
            BindVia::CatchAll => "catch-all",
        };
        format!("creds {name} ({how})")
    }
}
530
531#[derive(Debug, Clone)]
535pub struct ResolvedStorage {
536 storage: StorageUrl,
537 pub options: HashMap<String, String>,
538 pub binding: CredsBinding,
539}
540
541impl ResolvedStorage {
542 pub fn lance_url(&self) -> &Url {
543 self.storage.lance_url()
544 }
545
546 pub fn display(&self) -> String {
547 self.storage.display()
548 }
549}
550
551pub fn unmatched_creds_sets<'c>(
556 resolved: &[&ResolvedStorage],
557 creds: &'c BTreeMap<String, CredsSet>,
558) -> Vec<&'c str> {
559 if resolved
560 .iter()
561 .all(|entry| matches!(entry.binding, CredsBinding::NotApplicable))
562 {
563 return Vec::new();
564 }
565 creds
566 .keys()
567 .filter(|name| {
568 !resolved.iter().any(|entry| {
569 matches!(&entry.binding, CredsBinding::Set { name: bound, .. } if bound == *name)
570 })
571 })
572 .map(String::as_str)
573 .collect()
574}
575
576fn materialize_secret(
579 set: &str,
580 field: &str,
581 inline: Option<&str>,
582 file: Option<&std::path::Path>,
583 command: Option<&str>,
584) -> Result<Option<String>> {
585 if let Some(value) = inline {
586 return Ok(Some(value.to_owned()));
587 }
588 if let Some(path) = file {
589 let text = std::fs::read_to_string(path).with_context(|| {
590 format!(
591 "[creds.{set}] {field}_file: failed to read {}",
592 path.display()
593 )
594 })?;
595 return Ok(Some(strip_one_newline(text)));
596 }
597 if let Some(command) = command {
598 return Ok(Some(run_secret_command(set, field, command)?));
599 }
600 Ok(None)
601}
602
603fn run_secret_command(set: &str, field: &str, command: &str) -> Result<String> {
606 static CACHE: std::sync::OnceLock<std::sync::Mutex<HashMap<String, String>>> =
607 std::sync::OnceLock::new();
608 let cache = CACHE.get_or_init(Default::default);
609 if let Some(hit) = cache
610 .lock()
611 .unwrap_or_else(std::sync::PoisonError::into_inner)
612 .get(command)
613 {
614 return Ok(hit.clone());
615 }
616 let output = std::process::Command::new("sh")
617 .arg("-c")
618 .arg(command)
619 .output()
620 .with_context(|| format!("[creds.{set}] {field}_command failed to spawn: {command}"))?;
621 if !output.status.success() {
622 bail!(
623 "[creds.{set}] {field}_command exited {}: {command}\n{}",
624 output.status,
625 String::from_utf8_lossy(&output.stderr).trim_end(),
626 );
627 }
628 let value = strip_one_newline(
629 String::from_utf8(output.stdout)
630 .with_context(|| format!("[creds.{set}] {field}_command output is not UTF-8"))?,
631 );
632 cache
633 .lock()
634 .unwrap_or_else(std::sync::PoisonError::into_inner)
635 .insert(command.to_owned(), value.clone());
636 Ok(value)
637}
638
/// Removes at most one trailing line ending (`\n` or `\r\n`) from `text`.
///
/// A lone trailing `\r` is kept, and only the final line ending is removed.
fn strip_one_newline(mut text: String) -> String {
    let kept = match text.strip_suffix('\n') {
        Some(body) => body.strip_suffix('\r').unwrap_or(body).len(),
        None => text.len(),
    };
    text.truncate(kept);
    text
}
650
/// Ways the storage probe ([`storage_check`]) can fail.
#[derive(Debug, thiserror::Error)]
pub enum CheckFailure {
    /// An authentication-class error occurred and no creds set was bound to
    /// the URL.
    #[error(
        "authentication failed and no creds set matched this URL; define [creds.*] (or POND_CREDS_*), or provide ambient AWS_* credentials"
    )]
    NoCreds { source: anyhow::Error },
    /// An authentication-class error occurred while using the creds set
    /// named `set`.
    #[error("authentication failed using creds set {set:?}; check its keys and scope")]
    Auth { set: String, source: anyhow::Error },
    /// The backend did not reject a second create-only put of the same key,
    /// or reported conditional puts as unimplemented; `detail` says which.
    #[error(
        "backend does not enforce conditional writes (If-None-Match); concurrent pond writers would corrupt each other - {detail}"
    )]
    OccUnsupported { detail: String },
    /// Any other failure while opening or probing the store.
    #[error("storage probe failed")]
    Io { source: anyhow::Error },
}
673
674impl CheckFailure {
675 pub fn concise_cause(&self) -> Option<String> {
681 let source = match self {
682 Self::NoCreds { source } | Self::Auth { source, .. } | Self::Io { source } => source,
683 Self::OccUnsupported { .. } => return None,
684 };
685 Some(condense_error_chain(source))
686 }
687}
688
689fn condense_error_chain(error: &anyhow::Error) -> String {
696 let mut text = error
697 .chain()
698 .last()
699 .map(ToString::to_string)
700 .unwrap_or_else(|| format!("{error:#}"));
701 if let Some(pos) = text.find(", <WORKSPACE>") {
702 text.truncate(pos);
703 }
704 text = text.replace(
705 "Encountered internal error. Please file a bug report at https://github.com/lance-format/lance/issues. ",
706 "",
707 );
708 let line = text.split_whitespace().collect::<Vec<_>>().join(" ");
709 const HEAD: usize = 120;
710 const TAIL: usize = 120;
711 let chars: Vec<char> = line.chars().collect();
712 if chars.len() > HEAD + TAIL + 5 {
713 let head: String = chars[..HEAD].iter().collect();
714 let tail: String = chars[chars.len() - TAIL..].iter().collect();
715 format!("{head} ... {tail}")
716 } else {
717 line
718 }
719}
720
721pub async fn storage_check(resolved: &ResolvedStorage) -> std::result::Result<(), CheckFailure> {
726 use object_store::{Error as OsError, ObjectStoreExt, PutMode, PutOptions, PutPayload};
727
728 let classify =
729 |error: OsError, step: &str| classify_check_error(error, &resolved.binding, step);
730
731 let probe_uri = format!(
732 "{}/_config-check/{}",
733 resolved.lance_url().as_str().trim_end_matches('/'),
734 uuid::Uuid::now_v7(),
735 );
736 let params = ObjectStoreParams {
737 storage_options_accessor: (!resolved.options.is_empty()).then(|| {
738 Arc::new(StorageOptionsAccessor::with_static_options(
739 resolved.options.clone(),
740 ))
741 }),
742 ..Default::default()
743 };
744 let registry = Arc::new(ObjectStoreRegistry::default());
745 let (store, path) = ObjectStore::from_uri_and_params(registry, &probe_uri, ¶ms)
746 .await
747 .map_err(|error| CheckFailure::Io {
748 source: anyhow!(error).context(format!("failed to open object store for {probe_uri}")),
749 })?;
750
751 let body: &[u8] = b"pond storage check";
752 let create = PutOptions::from(PutMode::Create);
753 store
754 .inner
755 .put_opts(&path, PutPayload::from_static(body), create.clone())
756 .await
757 .map_err(|error| classify(error, "initial conditional put"))?;
758 let outcome = async {
762 match store
767 .inner
768 .put_opts(&path, PutPayload::from_static(body), create)
769 .await
770 {
771 Err(OsError::AlreadyExists { .. }) => {}
772 Ok(_) => {
773 return Err(CheckFailure::OccUnsupported {
774 detail: "a second create over an existing key succeeded".to_owned(),
775 });
776 }
777 Err(OsError::NotImplemented { .. }) => {
778 return Err(CheckFailure::OccUnsupported {
779 detail: "the backend rejects conditional puts as unimplemented".to_owned(),
780 });
781 }
782 Err(error) => return Err(classify(error, "conditional-put probe")),
783 }
784 let read_back = store
785 .inner
786 .get(&path)
787 .await
788 .map_err(|error| classify(error, "read-back"))?
789 .bytes()
790 .await
791 .map_err(|error| classify(error, "read-back body"))?;
792 if read_back.as_ref() != body {
793 return Err(CheckFailure::Io {
794 source: anyhow!("read-back returned different bytes than written"),
795 });
796 }
797 Ok(())
798 }
799 .await;
800 let cleanup = store.inner.delete(&path).await;
801 outcome?;
802 cleanup.map_err(|error| classify(error, "cleanup delete"))?;
803 Ok(())
804}
805
806fn classify_check_error(
810 error: object_store::Error,
811 binding: &CredsBinding,
812 step: &str,
813) -> CheckFailure {
814 use object_store::Error as OsError;
815 let auth_class = matches!(
820 error,
821 OsError::Unauthenticated { .. } | OsError::PermissionDenied { .. }
822 ) || {
823 let rendered = error.to_string();
824 rendered.contains("CredentialsNotLoaded")
825 || rendered.contains("no providers in chain provided credentials")
826 };
827 match (auth_class, binding) {
828 (true, CredsBinding::Set { name, .. }) => CheckFailure::Auth {
829 set: name.clone(),
830 source: anyhow!(error).context(step.to_owned()),
831 },
832 (true, _) => CheckFailure::NoCreds {
833 source: anyhow!(error).context(step.to_owned()),
834 },
835 (false, _) => CheckFailure::Io {
836 source: anyhow!(error).context(step.to_owned()),
837 },
838 }
839}
840
/// Default fragment-count cap for compaction.
///
/// NOTE(review): presumably the default for
/// `MaintenancePolicy::compaction_fragment_cap` - confirm at the use site.
pub const DEFAULT_COMPACTION_FRAGMENT_CAP: usize = 64;

/// Target on-disk size of one fragment (256 MiB); `derived_target_rows`
/// divides it by the average row size.
pub const TARGET_FRAGMENT_BYTES: u64 = 256 * 1024 * 1024;

/// Lower and upper clamp for the row target computed by
/// `derived_target_rows`.
const MIN_TARGET_ROWS_PER_FRAGMENT: u64 = 50_000;
const MAX_TARGET_ROWS_PER_FRAGMENT: u64 = 1024 * 1024;

/// Multiplier used by `keep_task`: a task is kept when the combined weight of
/// all fragments but the largest, times this factor, reaches the largest.
pub const COMPACTION_ABSORB_FACTOR: u64 = 4;
858
859pub fn default_cleanup_older_than() -> chrono::Duration {
865 chrono::Duration::days(1)
866}
867
868#[derive(Debug, Clone, Copy)]
873pub struct MaintenancePolicy {
874 pub compaction_fragment_cap: usize,
876 pub cleanup_older_than: chrono::Duration,
878}
879
880impl MaintenancePolicy {
881 pub fn always_compact() -> Self {
883 Self {
884 compaction_fragment_cap: 0,
885 cleanup_older_than: default_cleanup_older_than(),
886 }
887 }
888}
889
/// Size summary of one fragment, as gathered by `fragment_stat`.
struct FragmentStat {
    /// Sum of the fragment's data file sizes; `None` when any file's size is
    /// not recorded.
    bytes: Option<u64>,
    /// Physical row count (0 when not recorded).
    rows: u64,
    /// Rows marked deleted (0 when there is no deletion file or no count).
    deleted_rows: u64,
}
896
897fn fragment_bytes(fragment: &lance::table::format::Fragment) -> Option<u64> {
900 fragment.files.iter().try_fold(0u64, |total, file| {
901 Some(total + file.file_size_bytes.get()?.get())
902 })
903}
904
905fn fragment_stat(fragment: &lance::table::format::Fragment) -> FragmentStat {
906 FragmentStat {
907 bytes: fragment_bytes(fragment),
908 rows: fragment.physical_rows.unwrap_or(0) as u64,
909 deleted_rows: fragment
910 .deletion_file
911 .as_ref()
912 .and_then(|deletions| deletions.num_deleted_rows)
913 .unwrap_or(0) as u64,
914 }
915}
916
917fn derived_target_rows(stats: &[FragmentStat]) -> usize {
919 let (mut bytes, mut rows) = (0u64, 0u64);
920 for stat in stats {
921 if let Some(fragment_bytes) = stat.bytes
922 && stat.rows > 0
923 {
924 bytes += fragment_bytes;
925 rows += stat.rows;
926 }
927 }
928 if bytes == 0 || rows == 0 {
929 return MAX_TARGET_ROWS_PER_FRAGMENT as usize;
930 }
931 let avg_row_bytes = (bytes / rows).max(1);
932 (TARGET_FRAGMENT_BYTES / avg_row_bytes)
933 .clamp(MIN_TARGET_ROWS_PER_FRAGMENT, MAX_TARGET_ROWS_PER_FRAGMENT) as usize
934}
935
936fn keep_task(stats: &[FragmentStat], cap: usize, deletion_threshold: f32) -> bool {
941 if stats.iter().any(|stat| {
942 stat.rows > 0 && (stat.deleted_rows as f32 / stat.rows as f32) > deletion_threshold
943 }) {
944 return true;
945 }
946 if stats.len() >= cap {
947 return true;
948 }
949 let weights: Vec<u64> = if stats.iter().all(|stat| stat.bytes.is_some()) {
950 stats.iter().filter_map(|stat| stat.bytes).collect()
951 } else {
952 stats.iter().map(|stat| stat.rows).collect()
953 };
954 let total: u64 = weights.iter().sum();
955 let largest = weights.iter().copied().max().unwrap_or(0);
956 (total - largest) * COMPACTION_ABSORB_FACTOR >= largest
957}
958
/// Declarative description of one index.
#[derive(Debug, Clone)]
pub struct IndexIntent {
    /// Name of the index.
    pub name: &'static str,
    /// Column the index is built on.
    pub column: &'static str,
    /// Condition under which the index should be created.
    pub trigger: IndexTrigger,
    /// Kind of index and its tuning parameters.
    pub params: IndexParamsKind,
}
974
/// Condition evaluated by `IndexTrigger::should_create`.
#[derive(Debug, Clone)]
pub enum IndexTrigger {
    /// Satisfied once the dataset has at least one row.
    OnAnyRows,
    /// Satisfied once `column` holds at least `threshold` non-null values.
    OnNonNullCount {
        column: &'static str,
        threshold: usize,
    },
}
988
/// Kind of index to build, with its parameters.
#[derive(Debug, Clone)]
pub enum IndexParamsKind {
    /// A built-in scalar index of the given type.
    Scalar(BuiltinIndexType),
    /// Inverted full-text index using the `ngram` tokenizer with n-gram
    /// lengths from `min` to `max`.
    InvertedFtsNgram { min: u32, max: u32 },
    /// IVF-PQ vector index with the cosine metric.
    IvfPqCosine {
        sub_vectors: usize,
        num_bits: u8,
        max_iters: usize,
    },
}
1010
1011impl IndexTrigger {
1012 async fn should_create(&self, dataset: &Dataset) -> Result<bool> {
1013 match self {
1014 Self::OnAnyRows => Ok(dataset.count_rows(None).await? > 0),
1015 Self::OnNonNullCount { column, threshold } => {
1016 let count = dataset
1017 .count_rows(Some(format!("{column} IS NOT NULL")))
1018 .await?;
1019 Ok(count >= *threshold)
1020 }
1021 }
1022 }
1023}
1024
1025impl IndexParamsKind {
1026 fn index_type(&self) -> IndexType {
1027 match self {
1028 Self::Scalar(BuiltinIndexType::Bitmap) => IndexType::Bitmap,
1029 Self::Scalar(_) => IndexType::BTree,
1030 Self::InvertedFtsNgram { .. } => IndexType::Inverted,
1031 Self::IvfPqCosine { .. } => IndexType::Vector,
1032 }
1033 }
1034
1035 async fn build(&self, dataset: &Dataset) -> Result<Box<dyn lance::index::IndexParams>> {
1036 match self {
1037 Self::Scalar(kind) => Ok(Box::new(ScalarIndexParams::for_builtin(kind.clone()))),
1038 Self::InvertedFtsNgram { min, max } => Ok(Box::new(
1039 InvertedIndexParams::default()
1040 .base_tokenizer("ngram".to_owned())
1041 .ngram_min_length(*min)
1042 .ngram_max_length(*max)
1043 .stem(false)
1044 .remove_stop_words(false),
1045 )),
1046 Self::IvfPqCosine {
1047 sub_vectors,
1048 num_bits,
1049 max_iters,
1050 } => {
1051 let count = dataset
1052 .count_rows(Some("vector IS NOT NULL".to_owned()))
1053 .await?;
1054 let partitions = count.checked_div(4096).unwrap_or(0).max(1);
1055 Ok(Box::new(VectorIndexParams::ivf_pq(
1056 partitions,
1057 *num_bits,
1058 *sub_vectors,
1059 MetricType::Cosine,
1060 *max_iters,
1061 )))
1062 }
1063 }
1064 }
1065}
1066
/// Coverage report for one index intent on one table.
///
/// NOTE(review): the code that fills this in is outside this view; the field
/// descriptions below are read off the names - confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexStatus {
    pub table: Table,
    /// Name of the intent this status refers to.
    pub intent_name: String,
    /// Presumably the number of fragments the index covers.
    pub fragments_covered: usize,
    /// Presumably the number of fragments not yet covered.
    pub unindexed_fragments: usize,
    /// Presumably the number of rows in those uncovered fragments.
    pub unindexed_rows: usize,
    /// Presumably whether the index exists at all.
    pub exists: bool,
}
1076
/// Error marker carrying the number of attempts made before a commit
/// conflict was given up on.
#[derive(Debug, Clone, Copy)]
pub struct ConflictExhausted {
    /// Attempt count shown in the error message.
    pub attempts: u8,
}

impl std::fmt::Display for ConflictExhausted {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { attempts } = self;
        write!(
            formatter,
            "commit conflict exhausted after {attempts} attempt(s)"
        )
    }
}

impl std::error::Error for ConflictExhausted {}
1097
1098#[derive(Debug)]
1103pub enum PhaseOutcome {
1104 Ok,
1106 Noop,
1108 SkippedConflict,
1111 Failed(anyhow::Error),
1113 NotAttempted,
1116}
1117
1118impl PhaseOutcome {
1119 pub fn is_failed(&self) -> bool {
1120 matches!(self, Self::Failed(_))
1121 }
1122}
1123
/// Per-table optimize result: one [`PhaseOutcome`] for the index phase and
/// one for the compaction phase.
#[derive(Debug)]
pub struct TableOptimizeOutcome {
    pub table: Table,
    /// Outcome of the index phase.
    pub indices: PhaseOutcome,
    /// Outcome of the compaction phase.
    pub compaction: PhaseOutcome,
}
1131
/// Progress notification delivered to an [`OptimizeProgressFn`].
#[derive(Debug, Clone)]
pub enum OptimizeEvent {
    /// A phase is starting on `table`; `detail` is optional free-form text.
    PhaseStart {
        table: Table,
        phase: OptimizePhase,
        detail: Option<String>,
    },
    /// A phase finished on `table` after `elapsed_ms` milliseconds.
    PhaseDone {
        table: Table,
        phase: OptimizePhase,
        elapsed_ms: u64,
    },
}
1147
/// The phases an optimize run can report progress for.
#[derive(Debug, Clone, Copy)]
pub enum OptimizePhase {
    Compact,
    Cleanup,
    IndexCreate,
    IndexRebuild,
    IndexAppend,
}

impl OptimizePhase {
    /// Stable kebab-case name of the phase.
    pub fn label(self) -> &'static str {
        let name: &'static str = match self {
            OptimizePhase::Compact => "compact",
            OptimizePhase::Cleanup => "cleanup",
            OptimizePhase::IndexCreate => "index-create",
            OptimizePhase::IndexRebuild => "index-rebuild",
            OptimizePhase::IndexAppend => "index-append",
        };
        name
    }
}
1168
1169pub type OptimizeProgressFn = Box<dyn Fn(OptimizeEvent) + Send + Sync>;
1170
1171fn emit(progress: Option<&OptimizeProgressFn>, event: OptimizeEvent) {
1172 if let Some(callback) = progress {
1173 callback(event);
1174 }
1175}
1176
1177pub fn is_commit_conflict(error: &anyhow::Error) -> bool {
1181 error.downcast_ref::<lance::Error>().is_some_and(|err| {
1182 matches!(
1183 err,
1184 lance::Error::CommitConflict { .. }
1185 | lance::Error::RetryableCommitConflict { .. }
1186 | lance::Error::TooMuchWriteContention { .. }
1187 )
1188 })
1189}
1190
1191fn is_conflict_exhausted(error: &anyhow::Error) -> bool {
1194 error.chain().any(|cause| cause.is::<ConflictExhausted>())
1195}
1196
/// Size figures for the three tables plus a remainder bucket.
///
/// NOTE(review): units and how the figures are measured are not visible
/// here; presumably bytes - confirm against the code that fills this in.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TableSizes {
    pub sessions: u64,
    pub messages: u64,
    pub parts: u64,
    /// Presumably everything not attributed to one of the three tables.
    pub other: u64,
    /// On-disk versus live data figures for the sessions table.
    pub sessions_data: DataLiveness,
    /// On-disk versus live data figures for the messages table.
    pub messages_data: DataLiveness,
    /// On-disk versus live data figures for the parts table.
    pub parts_data: DataLiveness,
}
1210
/// On-disk size of a table's data next to the part of it that is still
/// live.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DataLiveness {
    /// Total size found on disk.
    pub on_disk: u64,
    /// Live portion, when known.
    pub live: Option<u64>,
}

impl DataLiveness {
    /// Size that is on disk but not live, floored at zero. `None` when the
    /// live size is unknown.
    pub fn dead(&self) -> Option<u64> {
        let live = self.live?;
        Some(self.on_disk.saturating_sub(live))
    }
}
1225
/// A literal usable on the right-hand side of a [`Predicate`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalarValue {
    /// A string literal; quoted when rendered.
    String(String),
    /// A 32-bit integer literal.
    Int32(i32),
    /// Text inserted into the filter verbatim, without quoting.
    Raw(String),
}

impl From<&str> for ScalarValue {
    fn from(value: &str) -> Self {
        ScalarValue::String(String::from(value))
    }
}

impl From<String> for ScalarValue {
    fn from(value: String) -> Self {
        ScalarValue::String(value)
    }
}

impl From<i32> for ScalarValue {
    fn from(value: i32) -> Self {
        ScalarValue::Int32(value)
    }
}
/// A filter expression tree, rendered to a lance filter string by
/// `Predicate::to_lance`. Column names are `'static` strings and are
/// inserted without quoting.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Predicate {
    /// `column = value`
    Eq(&'static str, ScalarValue),
    /// `column <> value`
    Ne(&'static str, ScalarValue),
    /// `column IS NULL`
    IsNull(&'static str),
    /// `column IS NOT NULL`
    IsNotNull(&'static str),
    /// `column IN (v1, v2, ...)`
    In(&'static str, Vec<ScalarValue>),
    /// `column LIKE <pattern> ESCAPE '\'`, with the pattern built by
    /// `like_contains` from the given text.
    LikeContains(&'static str, String),
    /// `regexp_like(column, '<pattern>')`
    Regex(&'static str, String),
    /// `column >= value`
    Gte(&'static str, ScalarValue),
    /// `column <= value`
    Lte(&'static str, ScalarValue),
    /// Conjunction; children that render empty are skipped.
    And(Vec<Predicate>),
    /// Disjunction, parenthesised; children that render empty are skipped.
    Or(Vec<Predicate>),
    /// Negation; renders empty when the inner predicate does.
    Not(Box<Predicate>),
}
1266impl Predicate {
1267 pub fn to_lance(&self) -> String {
1268 match self {
1269 Self::Eq(column, value) => format!("{column} = {}", value.to_lance()),
1270 Self::Ne(column, value) => format!("{column} <> {}", value.to_lance()),
1271 Self::IsNull(column) => format!("{column} IS NULL"),
1272 Self::IsNotNull(column) => format!("{column} IS NOT NULL"),
1273 Self::In(column, values) => {
1274 let values = values
1275 .iter()
1276 .map(ScalarValue::to_lance)
1277 .collect::<Vec<_>>()
1278 .join(", ");
1279 format!("{column} IN ({values})")
1280 }
1281 Self::LikeContains(column, value) => {
1282 format!("{column} LIKE {} ESCAPE '\\'", like_contains(value))
1283 }
1284 Self::Regex(column, pattern) => {
1285 format!("regexp_like({column}, {})", quoted_string(pattern))
1286 }
1287 Self::Gte(column, value) => format!("{column} >= {}", value.to_lance()),
1288 Self::Lte(column, value) => format!("{column} <= {}", value.to_lance()),
1289 Self::And(predicates) => predicates
1290 .iter()
1291 .map(Self::to_lance)
1292 .filter(|predicate| !predicate.is_empty())
1293 .collect::<Vec<_>>()
1294 .join(" AND "),
1295 Self::Or(predicates) => {
1296 let body = predicates
1299 .iter()
1300 .map(Self::to_lance)
1301 .filter(|predicate| !predicate.is_empty())
1302 .collect::<Vec<_>>()
1303 .join(" OR ");
1304 if body.is_empty() {
1305 String::new()
1306 } else {
1307 format!("({body})")
1308 }
1309 }
1310 Self::Not(inner) => {
1311 let body = inner.to_lance();
1312 if body.is_empty() {
1313 String::new()
1314 } else {
1315 format!("NOT ({body})")
1316 }
1317 }
1318 }
1319 }
1320}
1321#[derive(Default)]
1324pub struct ScanOpts<'a> {
1325 pub predicate: Option<&'a Predicate>,
1326 pub projection: Option<&'a [&'a str]>,
1327}
1328
1329impl<'a> ScanOpts<'a> {
1330 pub fn project_only(projection: &'a [&'a str]) -> Self {
1331 Self {
1332 predicate: None,
1333 projection: Some(projection),
1334 }
1335 }
1336 pub fn with_predicate_and_projection(
1337 predicate: &'a Predicate,
1338 projection: &'a [&'a str],
1339 ) -> Self {
1340 Self {
1341 predicate: Some(predicate),
1342 projection: Some(projection),
1343 }
1344 }
1345}
1346
1347impl ScalarValue {
1348 fn to_lance(&self) -> String {
1349 match self {
1350 Self::String(value) => quoted_string(value),
1351 Self::Int32(value) => value.to_string(),
1352 Self::Raw(value) => value.clone(),
1353 }
1354 }
1355}
1356#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
1360pub struct RuntimeCaps {
1361 pub index_cache_bytes: Option<usize>,
1362 pub metadata_cache_bytes: Option<usize>,
1363}
1364
1365impl RuntimeCaps {
1366 pub fn from_config(config: &crate::config::RuntimeConfig) -> Self {
1367 Self {
1368 index_cache_bytes: config.index_cache_bytes,
1369 metadata_cache_bytes: config.metadata_cache_bytes,
1370 }
1371 }
1372}
1373
1374const LOCAL_INDEX_CACHE_BYTES: usize = 256 * 1024 * 1024;
1378const LOCAL_METADATA_CACHE_BYTES: usize = 128 * 1024 * 1024;
1379const REMOTE_INDEX_CACHE_BYTES: usize = 2 * 1024 * 1024 * 1024;
1381const REMOTE_METADATA_CACHE_BYTES: usize = 512 * 1024 * 1024;
1382
1383fn resolve_cache_caps(location: &Url, caps: RuntimeCaps) -> (usize, usize) {
1384 let (index_default, metadata_default) = if config::is_local(location) {
1385 (LOCAL_INDEX_CACHE_BYTES, LOCAL_METADATA_CACHE_BYTES)
1386 } else {
1387 (REMOTE_INDEX_CACHE_BYTES, REMOTE_METADATA_CACHE_BYTES)
1388 };
1389 (
1390 caps.index_cache_bytes.unwrap_or(index_default),
1391 caps.metadata_cache_bytes.unwrap_or(metadata_default),
1392 )
1393}
1394
/// Open connection to the store: cached table datasets plus everything
/// needed to open further tables and talk to the object store.
pub struct Handle {
    /// Cached dataset handles, one slot per table.
    datasets: DatasetSet,
    /// Retry settings consulted by `retry_lance` and `backoff`.
    retry: RetryPolicy,
    /// Lance session passed to the namespace connection and to every dataset
    /// open performed by this handle.
    // NOTE(review): `parts_cached` reads this field, so the `dead_code`
    // allowance may be stale - confirm before removing it.
    #[allow(dead_code)]
    session: Arc<Session>,
    /// Namespace connection used to describe and create tables.
    nm: Arc<dyn LanceNamespace>,
    /// Namespace identifier from which table ids are derived.
    nm_ident: NamespaceIdent,
    /// Key/value options forwarded to object-store and dataset builders.
    storage_options: HashMap<String, String>,
    /// Root URL the handle was opened against.
    location: Url,
    /// Refresh interval applied to the lazily opened parts dataset.
    parts_refresh_after: Duration,
}
1429
1430impl std::fmt::Debug for Handle {
1431 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1432 formatter
1433 .debug_struct("Handle")
1434 .field("datasets", &self.datasets)
1435 .field("retry", &self.retry)
1436 .field("nm_ident", &self.nm_ident)
1437 .field("storage_options", &self.storage_options)
1438 .field("location", &self.location)
1439 .finish()
1440 }
1441}
1442
/// The three tables managed by a `Handle`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Table {
    Sessions,
    Messages,
    Parts,
}

impl Table {
    /// Lower-case table name, e.g. `"sessions"`.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Parts => "parts",
            Self::Messages => "messages",
            Self::Sessions => "sessions",
        }
    }

    /// Crate-internal alias of [`Table::as_str`].
    fn label(self) -> &'static str {
        self.as_str()
    }
}
/// Per-table dataset caches owned by a `Handle`.
#[derive(Debug)]
struct DatasetSet {
    /// Sessions table; opened when the handle is opened.
    sessions: Mutex<CachedDataset>,
    /// Messages table; opened when the handle is opened.
    messages: Mutex<CachedDataset>,
    /// Parts table; initialised on first use by `Handle::parts_cached`.
    parts: OnceCell<Mutex<CachedDataset>>,
}
1475#[derive(Debug)]
1476struct CachedDataset {
1477 dataset: Dataset,
1478 last_refresh: Instant,
1479 refresh_after: Duration,
1480}
1481impl CachedDataset {
1482 async fn latest(&mut self) -> Result<Dataset> {
1483 if self.last_refresh.elapsed() >= self.refresh_after {
1484 self.dataset.checkout_latest().await?;
1485 self.last_refresh = Instant::now();
1486 }
1487 Ok(self.dataset.clone())
1488 }
1489 fn replace(&mut self, dataset: Dataset) {
1490 self.dataset = dataset;
1491 self.last_refresh = Instant::now();
1492 }
1493}
1494impl Handle {
1495 pub async fn open(location: &Url) -> Result<Self> {
1498 Self::open_with_options(location, HashMap::new(), RuntimeCaps::default()).await
1499 }
1500
1501 pub async fn open_with_options(
1507 location: &Url,
1508 mut storage_options: HashMap<String, String>,
1509 caps: RuntimeCaps,
1510 ) -> Result<Self> {
1511 if let Some(path) = config::local_path(location) {
1512 tokio::fs::create_dir_all(&path).await.with_context(|| {
1513 format!(
1514 "failed to create data dir {}; fix the storage destination ([storage].path in config) or re-run `pond init`",
1515 path.display()
1516 )
1517 })?;
1518 } else {
1519 apply_remote_storage_defaults(&mut storage_options);
1520 }
1521 let (index_cache_bytes, metadata_cache_bytes) = resolve_cache_caps(location, caps);
1527 let session = Arc::new(Session::new(
1528 index_cache_bytes,
1529 metadata_cache_bytes,
1530 Arc::new(ObjectStoreRegistry::default()),
1531 ));
1532 let root = location.as_str().trim_end_matches('/').to_string();
1538 let mut connect = ConnectBuilder::new("dir")
1539 .property("root", root)
1540 .session(session.clone());
1541 for (key, value) in &storage_options {
1545 connect = connect.property(format!("storage.{key}"), value.clone());
1546 }
1547 let nm: Arc<dyn LanceNamespace> = connect
1548 .connect()
1549 .await
1550 .context("failed to connect lance Directory namespace")?;
1551 let nm_ident = NamespaceIdent::root();
1552 let refresh_after = if config::is_local(location) {
1558 Duration::ZERO
1559 } else {
1560 Duration::from_secs(5)
1561 };
1562 let handle = Self {
1563 datasets: DatasetSet {
1564 sessions: Mutex::new(CachedDataset {
1565 dataset: open_or_create_via_ns(
1566 &nm,
1567 &nm_ident,
1568 sessions::SESSIONS,
1569 sessions::session_schema(),
1570 &session,
1571 &storage_options,
1572 )
1573 .await?,
1574 last_refresh: Instant::now(),
1575 refresh_after,
1576 }),
1577 messages: Mutex::new(CachedDataset {
1578 dataset: open_or_create_via_ns(
1579 &nm,
1580 &nm_ident,
1581 sessions::MESSAGES,
1582 sessions::message_schema(),
1583 &session,
1584 &storage_options,
1585 )
1586 .await?,
1587 last_refresh: Instant::now(),
1588 refresh_after,
1589 }),
1590 parts: OnceCell::new(),
1591 },
1592 retry: RetryPolicy::default(),
1593 session,
1594 nm,
1595 nm_ident,
1596 storage_options,
1597 location: location.clone(),
1598 parts_refresh_after: refresh_after,
1599 };
1600 Ok(handle)
1601 }
1602
    /// Root URL this handle was opened against.
    pub fn location(&self) -> &Url {
        &self.location
    }
1606
    /// Storage options held by this handle, as stored at open time.
    pub fn storage_options(&self) -> &HashMap<String, String> {
        &self.storage_options
    }
1613
1614 fn export_uri(&self, name: &str) -> String {
1620 format!(
1621 "{}/exports/{name}",
1622 self.location.as_str().trim_end_matches('/')
1623 )
1624 }
1625
1626 fn object_store_params(&self) -> ObjectStoreParams {
1630 ObjectStoreParams {
1631 storage_options_accessor: (!self.storage_options.is_empty()).then(|| {
1632 Arc::new(StorageOptionsAccessor::with_static_options(
1633 self.storage_options.clone(),
1634 ))
1635 }),
1636 ..Default::default()
1637 }
1638 }
1639
1640 pub(crate) async fn export_write(&self, name: &str, bytes: &[u8]) -> Result<()> {
1643 let uri = self.export_uri(name);
1644 let registry = Arc::new(ObjectStoreRegistry::default());
1645 let (store, path) =
1646 ObjectStore::from_uri_and_params(registry, &uri, &self.object_store_params())
1647 .await
1648 .with_context(|| format!("failed to open object store for {uri}"))?;
1649 store
1650 .put(&path, bytes)
1651 .await
1652 .with_context(|| format!("failed to write export {uri}"))?;
1653 Ok(())
1654 }
1655
1656 pub(crate) async fn export_read(&self, name: &str) -> Result<Vec<u8>> {
1659 let uri = self.export_uri(name);
1660 let registry = Arc::new(ObjectStoreRegistry::default());
1661 let (store, path) =
1662 ObjectStore::from_uri_and_params(registry, &uri, &self.object_store_params())
1663 .await
1664 .with_context(|| format!("failed to open object store for {uri}"))?;
1665 let bytes = store
1666 .read_one_all(&path)
1667 .await
1668 .with_context(|| format!("failed to read export {uri}"))?;
1669 Ok(bytes.to_vec())
1670 }
1671
1672 pub(crate) fn export_local_path(&self, name: &str) -> Option<std::path::PathBuf> {
1677 if self.location.scheme() != "file" {
1678 return None;
1679 }
1680 let dir = self.location.to_file_path().ok()?;
1681 Some(dir.join("exports").join(name))
1682 }
1683
1684 pub async fn row_counts(&self) -> Result<(usize, usize, usize)> {
1685 Ok((
1686 self.count_rows(Table::Sessions).await?,
1687 self.count_rows(Table::Messages).await?,
1688 self.count_rows(Table::Parts).await?,
1689 ))
1690 }
1691
1692 pub(crate) async fn merge_insert(
1696 &self,
1697 table: Table,
1698 batch: RecordBatch,
1699 row_count: usize,
1700 ) -> Result<u64> {
1701 self.merge(
1702 table,
1703 batch,
1704 row_count,
1705 "merge_insert",
1706 WhenMatched::DoNothing,
1707 WhenNotMatched::InsertAll,
1708 )
1709 .await
1710 }
1711
1712 pub(crate) async fn merge_update(
1715 &self,
1716 table: Table,
1717 batch: RecordBatch,
1718 row_count: usize,
1719 ) -> Result<u64> {
1720 self.merge(
1721 table,
1722 batch,
1723 row_count,
1724 "merge_update",
1725 WhenMatched::UpdateAll,
1726 WhenNotMatched::DoNothing,
1727 )
1728 .await
1729 }
1730
1731 async fn merge(
1735 &self,
1736 table: Table,
1737 batch: RecordBatch,
1738 row_count: usize,
1739 op: &'static str,
1740 when_matched: WhenMatched,
1741 when_not_matched: WhenNotMatched,
1742 ) -> Result<u64> {
1743 if row_count == 0 {
1744 return Ok(0);
1745 }
1746 let started = Instant::now();
1747 let result = self
1748 .retry_lance(table.label(), || async {
1749 let mut cached = self.cached(table).await?.lock().await;
1750 let existing = cached.latest().await?;
1751 let reader = RecordBatchIterator::new([Ok(batch.clone())], batch.schema());
1752 let mut builder = MergeInsertBuilder::try_new(Arc::new(existing), Vec::new())?;
1753 builder.when_matched(when_matched.clone());
1754 builder.when_not_matched(when_not_matched.clone());
1755 builder.source_dedupe_behavior(SourceDedupeBehavior::FirstSeen);
1758 builder.skip_auto_cleanup(true);
1762 let (dataset, stats) = builder
1763 .try_build()?
1764 .execute_reader(Box::new(reader))
1765 .await?;
1766 cached.replace(dataset.as_ref().clone());
1767 Ok((
1768 stats.num_inserted_rows + stats.num_updated_rows,
1769 stats.num_skipped_duplicates,
1770 ))
1771 })
1772 .await;
1773 let skipped = result.as_ref().map(|(_, s)| *s).unwrap_or(0);
1774 tracing::info!(
1775 target: "pond::perf",
1776 op,
1777 table = %table.label(),
1778 rows = row_count,
1779 elapsed_ms = started.elapsed().as_millis() as u64,
1780 skipped,
1781 "merge",
1782 );
1783 result.map(|(affected, _)| affected)
1784 }
1785
1786 pub async fn optimize_table(
1795 &self,
1796 table: Table,
1797 intents: &[IndexIntent],
1798 progress: Option<&OptimizeProgressFn>,
1799 policy: &MaintenancePolicy,
1800 ) -> TableOptimizeOutcome {
1801 let compaction = self
1802 .run_optimize_compact_phase(table, progress, policy)
1803 .await;
1804 let indices = self
1805 .run_optimize_indices_phase(table, intents, progress)
1806 .await;
1807 TableOptimizeOutcome {
1808 table,
1809 indices,
1810 compaction,
1811 }
1812 }
1813
    /// Runs only the index phase of table optimisation for `table`.
    ///
    /// Delegates to `run_optimize_indices_phase` and returns its outcome
    /// unchanged.
    pub async fn optimize_table_indices_only(
        &self,
        table: Table,
        intents: &[IndexIntent],
        progress: Option<&OptimizeProgressFn>,
    ) -> PhaseOutcome {
        self.run_optimize_indices_phase(table, intents, progress)
            .await
    }
1826
1827 async fn run_optimize_indices_phase(
1828 &self,
1829 table: Table,
1830 intents: &[IndexIntent],
1831 progress: Option<&OptimizeProgressFn>,
1832 ) -> PhaseOutcome {
1833 if intents.is_empty() {
1834 return PhaseOutcome::Noop;
1835 }
1836 let result = self
1837 .retry_lance(table.label(), || async {
1838 let mut guard = self.cached(table).await?.lock().await;
1839 let mut dataset = guard.latest().await?;
1840 let did_work =
1841 optimize_table_indices(&mut dataset, intents, table, progress).await?;
1842 guard.replace(dataset);
1843 Ok::<_, anyhow::Error>(did_work)
1844 })
1845 .await;
1846 match result {
1847 Ok(true) => PhaseOutcome::Ok,
1848 Ok(false) => PhaseOutcome::Noop,
1849 Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
1850 Err(error) => PhaseOutcome::Failed(error),
1851 }
1852 }
1853
1854 async fn run_optimize_compact_phase(
1855 &self,
1856 table: Table,
1857 progress: Option<&OptimizeProgressFn>,
1858 policy: &MaintenancePolicy,
1859 ) -> PhaseOutcome {
1860 let result = self
1861 .retry_lance(table.label(), || async {
1862 let mut guard = self.cached(table).await?.lock().await;
1863 let mut dataset = guard.latest().await?;
1864 optimize_table_compact(&mut dataset, table, progress, policy).await?;
1865 guard.replace(dataset);
1866 Ok::<_, anyhow::Error>(())
1867 })
1868 .await;
1869 match result {
1870 Ok(()) => PhaseOutcome::Ok,
1871 Err(error) if is_conflict_exhausted(&error) => PhaseOutcome::SkippedConflict,
1872 Err(error) => PhaseOutcome::Failed(error),
1873 }
1874 }
1875
1876 pub async fn rebuild_index(&self, table: Table, intent: &IndexIntent) -> Result<()> {
1877 self.retry_lance(table.label(), || async {
1878 let mut guard = self.cached(table).await?.lock().await;
1879 let mut dataset = guard.latest().await?;
1880 rebuild_index(&mut dataset, intent).await?;
1881 guard.replace(dataset);
1882 Ok(())
1883 })
1884 .await
1885 }
1886
1887 pub async fn index_status(
1888 &self,
1889 table: Table,
1890 intents: &[IndexIntent],
1891 ) -> Result<Vec<IndexStatus>> {
1892 let dataset = self.dataset(table).await?;
1893 index_status(table, &dataset, intents).await
1894 }
1895
1896 pub(crate) async fn dataset(&self, table: Table) -> Result<Dataset> {
1897 let mut cached = self.cached(table).await?.lock().await;
1898 cached.latest().await
1899 }
1900 pub(crate) async fn scanner(
1905 &self,
1906 table: Table,
1907 predicate: Option<&Predicate>,
1908 ) -> Result<lance::dataset::scanner::Scanner> {
1909 let dataset = self.dataset(table).await?;
1910 scanner_with_prefilter(&dataset, predicate)
1911 }
1912 pub async fn scan(
1915 &self,
1916 table: Table,
1917 opts: ScanOpts<'_>,
1918 ) -> Result<lance::dataset::scanner::Scanner> {
1919 let mut scanner = self.scanner(table, opts.predicate).await?;
1920 if let Some(projection) = opts.projection {
1921 scanner.project(projection)?;
1922 }
1923 Ok(scanner)
1924 }
1925 pub(crate) async fn scan_batch(
1926 &self,
1927 table: Table,
1928 predicate: Option<&Predicate>,
1929 projection: &[&str],
1930 ) -> Result<RecordBatch> {
1931 let opts = ScanOpts {
1932 predicate,
1933 projection: (!projection.is_empty()).then_some(projection),
1934 };
1935 self.scan(table, opts)
1936 .await?
1937 .try_into_batch()
1938 .await
1939 .context("scan failed")
1940 }
1941 pub async fn count_rows(&self, table: Table) -> Result<usize> {
1942 self.dataset(table)
1943 .await?
1944 .count_rows(None)
1945 .await
1946 .map_err(Into::into)
1947 }
1948 #[cfg(test)]
1950 pub(crate) async fn messages_index_names(&self) -> Result<Vec<String>> {
1951 let dataset = self.dataset(Table::Messages).await?;
1952 let indices = dataset.load_indices().await?;
1953 Ok(indices.iter().map(|index| index.name.clone()).collect())
1954 }
1955
1956 pub(crate) async fn unindexed_row_count(
1959 &self,
1960 table: Table,
1961 index_name: &str,
1962 ) -> Result<usize> {
1963 let dataset = self.dataset(table).await?;
1964 let fragments = dataset
1965 .unindexed_fragments(index_name)
1966 .await
1967 .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
1968 Ok(fragments
1969 .iter()
1970 .map(|fragment| fragment.num_rows().unwrap_or(0))
1971 .sum())
1972 }
1973
1974 pub(crate) async fn drop_index(&self, table: Table, name: &str) -> Result<()> {
1980 let mut guard = self.cached(table).await?.lock().await;
1981 let mut dataset = guard.latest().await?;
1982 dataset
1983 .drop_index(name)
1984 .await
1985 .with_context(|| format!("drop_index({name}) failed for {}", table.label()))?;
1986 guard.replace(dataset);
1987 Ok(())
1988 }
1989
1990 async fn table_location(&self, table_name: &str) -> Result<String> {
1993 let request = DescribeTableRequest {
1994 id: Some(self.nm_ident.as_table_id(table_name)),
1995 ..Default::default()
1996 };
1997 let response = self
1998 .nm
1999 .describe_table(request)
2000 .await
2001 .with_context(|| format!("failed to describe table {table_name}"))?;
2002 response
2003 .location
2004 .with_context(|| format!("namespace returned no location for table {table_name}"))
2005 }
2006
2007 pub async fn initialized(&self) -> Result<bool> {
2013 let request = DescribeTableRequest {
2014 id: Some(self.nm_ident.as_table_id(sessions::PARTS)),
2015 ..Default::default()
2016 };
2017 match self.nm.describe_table(request).await {
2018 Ok(_) => Ok(true),
2019 Err(error) if is_namespace_error_code(&error, ErrorCode::TableNotFound) => Ok(false),
2020 Err(error) => {
2021 Err(anyhow::Error::from(error)).context("failed to probe table existence")
2022 }
2023 }
2024 }
2025
2026 pub async fn table_sizes(&self) -> Result<TableSizes> {
2030 let registry = Arc::new(ObjectStoreRegistry::default());
2031 let params = self.object_store_params();
2032
2033 let sessions = self
2034 .listed_size(
2035 ®istry,
2036 ¶ms,
2037 &self.table_location(sessions::SESSIONS).await?,
2038 )
2039 .await?;
2040 let messages = self
2041 .listed_size(
2042 ®istry,
2043 ¶ms,
2044 &self.table_location(sessions::MESSAGES).await?,
2045 )
2046 .await?;
2047 let parts = self
2048 .listed_size(
2049 ®istry,
2050 ¶ms,
2051 &self.table_location(sessions::PARTS).await?,
2052 )
2053 .await?;
2054 let root_total = self
2057 .listed_size(®istry, ¶ms, self.location.as_str())
2058 .await?;
2059 let other = root_total.saturating_sub(sessions + messages + parts);
2060 let sessions_data = self
2061 .data_liveness(®istry, ¶ms, Table::Sessions, sessions::SESSIONS)
2062 .await?;
2063 let messages_data = self
2064 .data_liveness(®istry, ¶ms, Table::Messages, sessions::MESSAGES)
2065 .await?;
2066 let parts_data = self
2067 .data_liveness(®istry, ¶ms, Table::Parts, sessions::PARTS)
2068 .await?;
2069 Ok(TableSizes {
2070 sessions,
2071 messages,
2072 parts,
2073 other,
2074 sessions_data,
2075 messages_data,
2076 parts_data,
2077 })
2078 }
2079
2080 async fn data_liveness(
2081 &self,
2082 registry: &Arc<ObjectStoreRegistry>,
2083 params: &ObjectStoreParams,
2084 table: Table,
2085 table_name: &str,
2086 ) -> Result<DataLiveness> {
2087 let location = self.table_location(table_name).await?;
2088 let data_dir = format!("{}/data", location.trim_end_matches('/'));
2089 let on_disk = self.listed_size(registry, params, &data_dir).await?;
2090 let dataset = self.dataset(table).await?;
2091 let live = dataset
2092 .get_fragments()
2093 .iter()
2094 .try_fold(0u64, |total, fragment| {
2095 Some(total + fragment_bytes(fragment.metadata())?)
2096 });
2097 Ok(DataLiveness { on_disk, live })
2098 }
2099
2100 async fn listed_size(
2102 &self,
2103 registry: &Arc<ObjectStoreRegistry>,
2104 params: &ObjectStoreParams,
2105 uri: &str,
2106 ) -> Result<u64> {
2107 let (store, base) = ObjectStore::from_uri_and_params(registry.clone(), uri, params)
2108 .await
2109 .with_context(|| format!("failed to open object store for {uri}"))?;
2110 let mut listing = store.list(Some(base));
2111 let mut total = 0u64;
2112 while let Some(meta) = listing.next().await {
2113 let meta = meta.with_context(|| format!("listing {uri} failed"))?;
2114 total += meta.size;
2115 }
2116 Ok(total)
2117 }
2118 async fn cached(&self, table: Table) -> Result<&Mutex<CachedDataset>> {
2119 match table {
2120 Table::Sessions => Ok(&self.datasets.sessions),
2121 Table::Messages => Ok(&self.datasets.messages),
2122 Table::Parts => self.parts_cached().await,
2123 }
2124 }
2125
2126 async fn parts_cached(&self) -> Result<&Mutex<CachedDataset>> {
2129 self.datasets
2130 .parts
2131 .get_or_try_init(|| async {
2132 let dataset = open_or_create_via_ns(
2133 &self.nm,
2134 &self.nm_ident,
2135 sessions::PARTS,
2136 sessions::part_schema(),
2137 &self.session,
2138 &self.storage_options,
2139 )
2140 .await?;
2141 Ok::<_, anyhow::Error>(Mutex::new(CachedDataset {
2142 dataset,
2143 last_refresh: Instant::now(),
2144 refresh_after: self.parts_refresh_after,
2145 }))
2146 })
2147 .await
2148 }
2149 async fn retry_lance<T, Fut, Op>(&self, label: &str, mut operation: Op) -> Result<T>
2150 where
2151 Fut: std::future::Future<Output = Result<T>>,
2152 Op: FnMut() -> Fut,
2153 {
2154 let mut attempt = 0u8;
2155 loop {
2156 attempt = attempt.saturating_add(1);
2157 match operation().await {
2158 Ok(value) => return Ok(value),
2159 Err(error) if attempt < self.retry.attempts => {
2160 let backoff = self.backoff(attempt);
2161 let error_chain = format!("{error:#}");
2164 tracing::warn!(
2165 label,
2166 attempt,
2167 ?backoff,
2168 error = %error_chain,
2169 "retrying Lance operation"
2170 );
2171 tokio::time::sleep(backoff).await;
2172 }
2173 Err(error) => {
2174 let error_chain = format!("{error:#}");
2175 tracing::warn!(
2176 label,
2177 attempt,
2178 error = %error_chain,
2179 "Lance operation exhausted retries"
2180 );
2181 if is_commit_conflict(&error) {
2188 return Err(error.context(ConflictExhausted { attempts: attempt }));
2189 }
2190 return Err(error);
2191 }
2192 }
2193 }
2194 }
2195 fn backoff(&self, attempt: u8) -> Duration {
2196 let shift = u32::from(attempt.saturating_sub(1));
2197 let multiplier = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
2198 let base = self.retry.initial_backoff.saturating_mul(multiplier);
2199 let factor = (1.0 + self.retry.jitter * (fastrand::f64() * 2.0 - 1.0)).max(0.0);
2202 base.mul_f64(factor).min(self.retry.max_backoff)
2203 }
2204}
/// Compacts `dataset` where worthwhile, then cleans up old versions.
///
/// A compaction plan is built from options derived from the current fragment
/// statistics. When `policy.compaction_fragment_cap` is positive, each
/// planned task is passed through `keep_task` and dropped when vetoed. Any
/// remaining tasks are executed one after another and committed together.
/// `cleanup_old_versions` then runs regardless of whether compaction did
/// anything. `PhaseStart` / `PhaseDone` events are emitted to `progress`
/// around each phase that actually runs.
///
/// # Errors
/// Fails when planning, executing or committing the compaction fails, or
/// when the version cleanup fails.
async fn optimize_table_compact(
    dataset: &mut Dataset,
    table: Table,
    progress: Option<&OptimizeProgressFn>,
    policy: &MaintenancePolicy,
) -> Result<()> {
    // Per-fragment statistics drive the target rows per fragment.
    let stats: Vec<FragmentStat> = dataset
        .get_fragments()
        .iter()
        .map(|fragment| fragment_stat(fragment.metadata()))
        .collect();
    let compaction = CompactionOptions {
        target_rows_per_fragment: derived_target_rows(&stats),
        max_bytes_per_file: Some(TARGET_FRAGMENT_BYTES as usize),
        defer_index_remap: false,
        ..CompactionOptions::default()
    };

    let mut plan = plan_compaction(dataset, &compaction).await?;
    // A cap of zero disables the per-task veto entirely.
    if policy.compaction_fragment_cap > 0 {
        plan.tasks.retain(|task| {
            let task_stats: Vec<FragmentStat> = task.fragments.iter().map(fragment_stat).collect();
            let keep = keep_task(
                &task_stats,
                policy.compaction_fragment_cap,
                compaction.materialize_deletions_threshold,
            );
            if !keep {
                tracing::debug!(
                    target: "pond::perf",
                    table = table.as_str(),
                    fragments = task_stats.len(),
                    "compaction task vetoed: merge dominated by one large fragment",
                );
            }
            keep
        });
    }
    if plan.tasks.is_empty() {
        tracing::debug!(
            target: "pond::perf",
            table = table.as_str(),
            "compaction skipped: no task to run",
        );
    } else {
        emit(
            progress,
            OptimizeEvent::PhaseStart {
                table,
                phase: OptimizePhase::Compact,
                detail: None,
            },
        );
        let started = Instant::now();
        // Tasks run sequentially; their results are committed in one step.
        let mut completed = Vec::with_capacity(plan.tasks.len());
        for task in plan.compaction_tasks() {
            completed.push(task.execute(dataset).await?);
        }
        commit_compaction(
            dataset,
            completed,
            Arc::new(DatasetIndexRemapperOptions::default()),
            &compaction,
        )
        .await?;
        emit(
            progress,
            OptimizeEvent::PhaseDone {
                table,
                phase: OptimizePhase::Compact,
                elapsed_ms: started.elapsed().as_millis() as u64,
            },
        );
    }

    // Cleanup runs even when no compaction task was executed.
    emit(
        progress,
        OptimizeEvent::PhaseStart {
            table,
            phase: OptimizePhase::Cleanup,
            detail: None,
        },
    );
    let started = Instant::now();
    dataset
        .cleanup_old_versions(policy.cleanup_older_than, Some(false), Some(false))
        .await
        .context("cleanup_old_versions failed during index optimize")?;
    emit(
        progress,
        OptimizeEvent::PhaseDone {
            table,
            phase: OptimizePhase::Cleanup,
            elapsed_ms: started.elapsed().as_millis() as u64,
        },
    );

    Ok(())
}
2327
/// Brings the indices described by `intents` up to date on `dataset`.
///
/// For each intent:
/// - if no index with that name exists, it is created when the intent's
///   trigger says so;
/// - if it exists and the number of unindexed fragments is non-zero and at
///   least `index_lag_threshold()`, it is either rebuilt (BTree and other
///   scalar kinds not listed below) or queued for an append-style optimise
///   (Bitmap, inverted FTS n-gram, IVF-PQ cosine).
///
/// Queued indices are then optimised together in a single call. `PhaseStart`
/// / `PhaseDone` events are emitted to `progress` around every create,
/// rebuild and append step.
///
/// Returns `true` when at least one index was created, rebuilt or appended.
async fn optimize_table_indices(
    dataset: &mut Dataset,
    intents: &[IndexIntent],
    table: Table,
    progress: Option<&OptimizeProgressFn>,
) -> Result<bool> {
    // Snapshot of index names taken once, before any index is created.
    let existing = dataset.load_indices().await?;
    let existing_names: std::collections::HashSet<String> =
        existing.iter().map(|index| index.name.clone()).collect();

    let mut append_indices: Vec<String> = Vec::new();
    let mut did_work = false;

    for intent in intents {
        let exists = existing_names.contains(intent.name);

        if !exists {
            // Missing index: create it only if the trigger fires.
            if !intent.trigger.should_create(dataset).await? {
                continue;
            }
            let params = intent.params.build(dataset).await?;
            let index_type = intent.params.index_type();
            tracing::info!(
                index = intent.name,
                column = intent.column,
                "creating Lance index (trigger fired)",
            );
            emit(
                progress,
                OptimizeEvent::PhaseStart {
                    table,
                    phase: OptimizePhase::IndexCreate,
                    detail: Some(intent.name.to_owned()),
                },
            );
            let started = Instant::now();
            dataset
                .create_index(
                    &[intent.column],
                    index_type,
                    Some(intent.name.to_owned()),
                    params.as_ref(),
                    false,
                )
                .await
                .with_context(|| format!("failed to create index {}", intent.name))?;
            emit(
                progress,
                OptimizeEvent::PhaseDone {
                    table,
                    phase: OptimizePhase::IndexCreate,
                    elapsed_ms: started.elapsed().as_millis() as u64,
                },
            );
            did_work = true;
            continue;
        }

        // Existing index: skip unless enough fragments are lagging behind.
        let unindexed = dataset.unindexed_fragments(intent.name).await?;
        if unindexed.is_empty() {
            continue;
        }
        if unindexed.len() < index_lag_threshold() {
            continue;
        }
        match intent.params {
            IndexParamsKind::Scalar(BuiltinIndexType::BTree) => {
                // BTree indices are rebuilt in place (replace = true).
                let params = intent.params.build(dataset).await?;
                let index_type = intent.params.index_type();
                tracing::debug!(
                    target: "pond::perf",
                    index = intent.name,
                    column = intent.column,
                    "rebuilding Lance BTree index",
                );
                emit(
                    progress,
                    OptimizeEvent::PhaseStart {
                        table,
                        phase: OptimizePhase::IndexRebuild,
                        detail: Some(intent.name.to_owned()),
                    },
                );
                let started = Instant::now();
                dataset
                    .create_index(
                        &[intent.column],
                        index_type,
                        Some(intent.name.to_owned()),
                        params.as_ref(),
                        true,
                    )
                    .await
                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
                emit(
                    progress,
                    OptimizeEvent::PhaseDone {
                        table,
                        phase: OptimizePhase::IndexRebuild,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    },
                );
                did_work = true;
            }
            IndexParamsKind::Scalar(BuiltinIndexType::Bitmap)
            | IndexParamsKind::InvertedFtsNgram { .. }
            | IndexParamsKind::IvfPqCosine { .. } => {
                // These kinds are batched into one append-style optimise below.
                append_indices.push(intent.name.to_owned());
            }
            IndexParamsKind::Scalar(_) => {
                // Any other scalar kind is rebuilt in place (replace = true).
                let params = intent.params.build(dataset).await?;
                emit(
                    progress,
                    OptimizeEvent::PhaseStart {
                        table,
                        phase: OptimizePhase::IndexRebuild,
                        detail: Some(intent.name.to_owned()),
                    },
                );
                let started = Instant::now();
                dataset
                    .create_index(
                        &[intent.column],
                        intent.params.index_type(),
                        Some(intent.name.to_owned()),
                        params.as_ref(),
                        true,
                    )
                    .await
                    .with_context(|| format!("failed to rebuild index {}", intent.name))?;
                emit(
                    progress,
                    OptimizeEvent::PhaseDone {
                        table,
                        phase: OptimizePhase::IndexRebuild,
                        elapsed_ms: started.elapsed().as_millis() as u64,
                    },
                );
                did_work = true;
            }
        }
    }

    if !append_indices.is_empty() {
        // `index_names` takes ownership, while the list is still needed for
        // the event detail and the log line, hence the clone.
        let to_append = append_indices.clone();
        emit(
            progress,
            OptimizeEvent::PhaseStart {
                table,
                phase: OptimizePhase::IndexAppend,
                detail: Some(append_indices.join(", ")),
            },
        );
        let started = Instant::now();
        dataset
            .optimize_indices(&OptimizeOptions::append().index_names(to_append))
            .await
            .context("optimize_indices(append) failed during index optimize")?;
        emit(
            progress,
            OptimizeEvent::PhaseDone {
                table,
                phase: OptimizePhase::IndexAppend,
                elapsed_ms: started.elapsed().as_millis() as u64,
            },
        );
        tracing::debug!(
            target: "pond::perf",
            indices = ?append_indices,
            "appended trailing fragments into indices",
        );
        did_work = true;
    }

    Ok(did_work)
}
2509
2510async fn rebuild_index(dataset: &mut Dataset, intent: &IndexIntent) -> Result<()> {
2511 if !intent.trigger.should_create(dataset).await? {
2512 return Ok(());
2513 }
2514 let params = intent.params.build(dataset).await?;
2515 dataset
2516 .create_index(
2517 &[intent.column],
2518 intent.params.index_type(),
2519 Some(intent.name.to_owned()),
2520 params.as_ref(),
2521 true,
2522 )
2523 .await
2524 .with_context(|| format!("failed to rebuild index {}", intent.name))?;
2525 Ok(())
2526}
2527
2528async fn index_status(
2529 table: Table,
2530 dataset: &Dataset,
2531 intents: &[IndexIntent],
2532) -> Result<Vec<IndexStatus>> {
2533 let existing = dataset.load_indices().await?;
2534 let existing_names: std::collections::HashSet<String> =
2535 existing.iter().map(|index| index.name.clone()).collect();
2536 let total_fragments = dataset.get_fragments().len();
2537 let total_rows = dataset.count_rows(None).await?;
2538 let mut statuses = Vec::with_capacity(intents.len());
2539 for intent in intents {
2540 let exists = existing_names.contains(intent.name);
2541 if !exists {
2542 statuses.push(IndexStatus {
2543 table,
2544 intent_name: intent.name.to_owned(),
2545 fragments_covered: 0,
2546 unindexed_fragments: total_fragments,
2547 unindexed_rows: total_rows,
2548 exists,
2549 });
2550 continue;
2551 }
2552 let unindexed = dataset
2553 .unindexed_fragments(intent.name)
2554 .await
2555 .with_context(|| format!("unindexed_fragments failed for {}", table.label()))?;
2556 let unindexed_fragments = unindexed.len();
2557 let unindexed_rows = unindexed
2558 .iter()
2559 .map(|fragment| fragment.num_rows().unwrap_or(0))
2560 .sum();
2561 statuses.push(IndexStatus {
2562 table,
2563 intent_name: intent.name.to_owned(),
2564 fragments_covered: total_fragments.saturating_sub(unindexed_fragments),
2565 unindexed_fragments,
2566 unindexed_rows,
2567 exists,
2568 });
2569 }
2570 Ok(statuses)
2571}
2572
2573async fn open_or_create_via_ns(
2585 nm: &Arc<dyn LanceNamespace>,
2586 nm_ident: &NamespaceIdent,
2587 table_name: &str,
2588 schema: lance::deps::arrow_schema::SchemaRef,
2589 session: &Arc<Session>,
2590 storage_options: &HashMap<String, String>,
2591) -> Result<Dataset> {
2592 let table_id = nm_ident.as_table_id(table_name);
2593
2594 let request = DescribeTableRequest {
2595 id: Some(table_id.clone()),
2596 ..Default::default()
2597 };
2598 match nm.describe_table(request).await {
2599 Ok(response) => {
2600 let location = response.location.with_context(|| {
2601 format!("namespace returned no location for table {table_name}")
2602 })?;
2603 let mut builder = DatasetBuilder::from_uri(&location).with_session(session.clone());
2604 if !storage_options.is_empty() {
2605 builder = builder.with_storage_options(storage_options.clone());
2606 }
2607 let dataset = builder
2608 .load()
2609 .await
2610 .with_context(|| format!("failed to open table {table_name}"))?;
2611 ensure_schema_matches(&dataset, schema.as_ref(), table_name)?;
2612 return Ok(dataset);
2613 }
2614 Err(error) => match &error {
2615 error if is_namespace_error_code(error, ErrorCode::TableNotFound) => {
2616 }
2618 _ => {
2619 return Err(anyhow::Error::from(error))
2620 .with_context(|| format!("failed to describe table {table_name}"));
2621 }
2622 },
2623 }
2624
2625 let mut write_params = sessions::write_params_for_create();
2628 write_params.session = Some(session.clone());
2629 write_params.mode = WriteMode::Create;
2630 if !storage_options.is_empty() {
2631 write_params.store_params = Some(ObjectStoreParams {
2632 storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::with_static_options(
2633 storage_options.clone(),
2634 ))),
2635 ..Default::default()
2636 });
2637 }
2638 let reader = sessions::empty_reader(schema)?;
2639 Dataset::write_into_namespace(reader, nm.clone(), table_id, Some(write_params))
2640 .await
2641 .with_context(|| format!("failed to create table {table_name}"))
2642}
2643
2644fn is_namespace_error_code(error: &lance::Error, code: ErrorCode) -> bool {
2648 if !matches!(error, lance::Error::Namespace { .. }) {
2649 return false;
2650 }
2651 std::iter::successors(Some(error as &(dyn std::error::Error + 'static)), |link| {
2652 link.source()
2653 })
2654 .filter_map(|link| link.downcast_ref::<NamespaceError>())
2655 .any(|inner| inner.code() == code)
2656}
2657
2658fn scanner_with_prefilter(
2659 dataset: &Dataset,
2660 predicate: Option<&Predicate>,
2661) -> Result<lance::dataset::scanner::Scanner> {
2662 let mut scanner = dataset.scan();
2663 scanner.prefilter(true);
2664 if let Some(predicate) = predicate {
2665 let filter = predicate.to_lance();
2666 if !filter.is_empty() {
2667 scanner.filter(&filter)?;
2668 }
2669 }
2670 Ok(scanner)
2671}
2672fn ensure_schema_matches(
2673 dataset: &Dataset,
2674 expected: &lance::deps::arrow_schema::Schema,
2675 table_name: &str,
2676) -> Result<()> {
2677 use lance::deps::arrow_schema::DataType;
2678 use std::collections::BTreeSet;
2679 let actual = lance::deps::arrow_schema::Schema::from(dataset.schema());
2680 let actual_names: BTreeSet<&str> = actual.fields().iter().map(|f| f.name().as_str()).collect();
2681 let expected_names: BTreeSet<&str> = expected
2682 .fields()
2683 .iter()
2684 .map(|f| f.name().as_str())
2685 .collect();
2686 if actual_names != expected_names {
2687 anyhow::bail!(
2688 "table {table_name} has columns {actual_names:?} but this pond build expects \
2689 {expected_names:?} - the on-disk store predates a schema change; delete the \
2690 data directory and re-run `pond ingest`",
2691 );
2692 }
2693 for actual_field in actual.fields() {
2698 let Some(expected_field) = expected.field_with_name(actual_field.name()).ok() else {
2699 continue;
2700 };
2701 if let (DataType::FixedSizeList(_, actual_dim), DataType::FixedSizeList(_, expected_dim)) =
2702 (actual_field.data_type(), expected_field.data_type())
2703 && actual_dim != expected_dim
2704 {
2705 tracing::warn!(
2706 table = table_name,
2707 column = actual_field.name(),
2708 actual_dim,
2709 expected_dim,
2710 "embedding dimension differs from config; open proceeds because model swaps are operator-driven",
2711 );
2712 }
2713 }
2714 Ok(())
2715}
/// Fills in client defaults for remote object stores without overriding
/// anything the caller already supplied.
///
/// A key counts as already supplied when any of its aliases is present,
/// compared ASCII case-insensitively. `pool_idle_timeout` and
/// `connect_timeout` are always defaulted; `aws_unsigned_payload` is defaulted
/// to `"true"` only when an endpoint key (`aws_endpoint` or `endpoint`) is
/// present in `options`.
fn apply_remote_storage_defaults(options: &mut HashMap<String, String>) {
    /// True when `options` holds a key equal to one of `aliases`, ignoring ASCII case.
    fn is_configured(options: &HashMap<String, String>, aliases: &[&str]) -> bool {
        options
            .keys()
            .any(|key| aliases.iter().any(|alias| key.eq_ignore_ascii_case(alias)))
    }

    /// Inserts `value` under the first alias unless some alias is already set.
    fn set_default(options: &mut HashMap<String, String>, aliases: &[&str], value: &str) {
        if !is_configured(options, aliases) {
            options.insert(aliases[0].to_owned(), value.to_owned());
        }
    }

    const UNCONDITIONAL: [(&str, &str); 2] = [
        ("pool_idle_timeout", "300 seconds"),
        ("connect_timeout", "10 seconds"),
    ];
    for (key, value) in UNCONDITIONAL {
        set_default(options, &[key], value);
    }

    if is_configured(options, &["aws_endpoint", "endpoint"]) {
        set_default(
            options,
            &["aws_unsigned_payload", "unsigned_payload"],
            "true",
        );
    }
}
2745
/// Wraps `value` in single quotes, doubling every embedded single quote.
fn quoted_string(value: &str) -> String {
    let mut literal = String::with_capacity(value.len() + 2);
    literal.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            literal.push_str("''");
        } else {
            literal.push(ch);
        }
    }
    literal.push('\'');
    literal
}
/// Renders `value` as a single-quoted `'%…%'` pattern literal.
///
/// Backslash, `%` and `_` are each prefixed with a backslash, and single
/// quotes are doubled, before the text is wrapped in `%` wildcards.
fn like_contains(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' | '%' | '_' => {
                escaped.push('\\');
                escaped.push(ch);
            }
            '\'' => escaped.push_str("''"),
            other => escaped.push(other),
        }
    }
    format!("'%{escaped}%'")
}
2757
2758#[cfg(test)]
2759mod tests {
2760 #![allow(clippy::expect_used, clippy::unwrap_used)]
2761
2762 use super::*;
2763 use tempfile::TempDir;
2764
2765 fn set(scope: Option<&str>) -> CredsSet {
2766 CredsSet {
2767 scope: scope.map(str::to_owned),
2768 access_key_id: Some("AKIA".to_owned()),
2769 secret_access_key: Some("shh".to_owned()),
2770 ..CredsSet::default()
2771 }
2772 }
2773
2774 fn opts(resolved: &ResolvedStorage, key: &str) -> Option<String> {
2775 resolved.options.get(key).cloned()
2776 }
2777
2778 #[test]
2779 fn storage_url_translation_table() {
2780 let local = StorageUrl::parse("/srv/pond").unwrap();
2783 assert_eq!(local.lance_url().as_str(), "file:///srv/pond/");
2784 assert!(local.is_local());
2785 assert!(local.scheme_options.is_empty());
2786 let aws = StorageUrl::parse("s3://bucket/prefix").unwrap();
2788 assert_eq!(aws.lance_url().as_str(), "s3://bucket/prefix");
2789 assert!(aws.scheme_options.is_empty());
2790 let fat = StorageUrl::parse("s3+https://nbg1.example.com/my-pond/sub").unwrap();
2795 assert_eq!(fat.lance_url().as_str(), "s3://my-pond/sub");
2796 assert_eq!(
2797 fat.scheme_options,
2798 vec![
2799 ("allow_http", "false".to_owned()),
2800 ("virtual_hosted_style_request", "true".to_owned()),
2801 ("region", "us-east-1".to_owned()),
2802 ],
2803 );
2804 let resolved = fat.resolve(&BTreeMap::new()).unwrap();
2805 assert_eq!(
2806 opts(&resolved, "endpoint").as_deref(),
2807 Some("https://my-pond.nbg1.example.com"),
2808 );
2809 assert_eq!(opts(&resolved, "region").as_deref(), Some("us-east-1"));
2810 let plain = StorageUrl::parse("s3+http://127.0.0.1:9000/pond").unwrap();
2813 assert_eq!(plain.lance_url().as_str(), "s3://pond/");
2814 assert_eq!(plain.scheme_options[0], ("allow_http", "true".to_owned()));
2815 assert_eq!(
2816 plain.scheme_options[1],
2817 ("virtual_hosted_style_request", "false".to_owned()),
2818 );
2819 let resolved = plain.resolve(&BTreeMap::new()).unwrap();
2820 assert_eq!(
2821 opts(&resolved, "endpoint").as_deref(),
2822 Some("http://127.0.0.1:9000"),
2823 );
2824 let mut pinned = BTreeMap::new();
2826 pinned.insert(
2827 "default".to_owned(),
2828 CredsSet {
2829 extra: [(
2830 "endpoint".to_owned(),
2831 "https://pinned.example.com".to_owned(),
2832 )]
2833 .into_iter()
2834 .collect(),
2835 ..CredsSet::default()
2836 },
2837 );
2838 let resolved = fat.resolve(&pinned).unwrap();
2839 assert_eq!(
2840 opts(&resolved, "endpoint").as_deref(),
2841 Some("https://pinned.example.com"),
2842 );
2843 let gcs = StorageUrl::parse("gs://bucket/p").unwrap();
2845 assert_eq!(gcs.lance_url().as_str(), "gs://bucket/p");
2846 let azure = StorageUrl::parse("az://acct/container/p").unwrap();
2848 assert_eq!(azure.lance_url().as_str(), "az://container/p");
2849 assert_eq!(
2850 azure.scheme_options,
2851 vec![("account_name", "acct".to_owned())]
2852 );
2853 let shared = StorageUrl::parse("shared-memory://pond-test-x/").unwrap();
2855 assert_eq!(shared.lance_url().as_str(), "shared-memory://pond-test-x/");
2856 }
2857
2858 #[test]
2859 fn storage_url_rejects_bad_shapes() {
2860 let err = StorageUrl::parse("s3+https://user:pass@host/bucket")
2862 .expect_err("userinfo must be rejected")
2863 .to_string();
2864 assert!(
2865 err.contains("creds"),
2866 "error must name the alternative: {err}"
2867 );
2868 assert!(StorageUrl::parse("s3+https://host").is_err());
2870 assert!(StorageUrl::parse("az://acct").is_err());
2871 let err = StorageUrl::parse("ftp://host/x")
2873 .expect_err("ftp")
2874 .to_string();
2875 assert!(err.contains("s3+https"), "got: {err}");
2876 let err = StorageUrl::parse("s3://b/p?regoin=x")
2878 .expect_err("typo")
2879 .to_string();
2880 assert!(err.contains("regoin"), "got: {err}");
2881 let err = StorageUrl::parse("memory://x?creds=y")
2884 .expect_err("memory query")
2885 .to_string();
2886 assert!(err.contains("query params"), "got: {err}");
2887 let err = StorageUrl::parse("file:///x?creds=y")
2888 .expect_err("file query")
2889 .to_string();
2890 assert!(err.contains("query params"), "got: {err}");
2891 assert!(StorageUrl::parse("/tmp/a?b").is_ok());
2893 }
2894
2895 #[test]
2896 fn storage_url_canonicalizes_ports_and_keeps_percent_encoding() {
2897 let with_port = StorageUrl::parse("s3+https://host:443/bucket/p").unwrap();
2899 let without = StorageUrl::parse("s3+https://host/bucket/p").unwrap();
2900 assert_eq!(with_port.canonical(), without.canonical());
2901 let odd = StorageUrl::parse("s3+https://host:8443/bucket").unwrap();
2903 let resolved = odd.resolve(&BTreeMap::new()).unwrap();
2904 assert_eq!(
2905 resolved.options.get("endpoint").map(String::as_str),
2906 Some("https://bucket.host:8443"),
2907 );
2908 let encoded = StorageUrl::parse("s3+https://host/bucket/pre%20fix").unwrap();
2910 assert_eq!(encoded.lance_url().as_str(), "s3://bucket/pre%20fix");
2911 }
2912
2913 #[test]
2914 fn query_params_strip_and_apply_over_set_fields() {
2915 let mut creds = BTreeMap::new();
2916 creds.insert(
2917 "default".to_owned(),
2918 CredsSet {
2919 region: Some("from-set".to_owned()),
2920 virtual_hosted_style_request: Some(false),
2921 ..set(None)
2922 },
2923 );
2924 let url = StorageUrl::parse(
2925 "s3+https://host/bucket/p?region=from-query&virtual_hosted_style_request=true",
2926 )
2927 .unwrap();
2928 assert_eq!(url.lance_url().as_str(), "s3://bucket/p");
2930 assert!(url.canonical().query().is_none());
2931 let resolved = url.resolve(&creds).unwrap();
2932 assert_eq!(opts(&resolved, "region").as_deref(), Some("from-query"));
2934 assert_eq!(
2935 opts(&resolved, "virtual_hosted_style_request").as_deref(),
2936 Some("true"),
2937 );
2938 assert_eq!(
2940 opts(&resolved, "endpoint").as_deref(),
2941 Some("https://bucket.host"),
2942 );
2943 }
2944
2945 #[test]
2946 fn scope_matching_binds_by_longest_prefix_at_segment_boundaries() {
2947 let mut creds = BTreeMap::new();
2948 creds.insert("all".to_owned(), set(None));
2949 creds.insert("bucket".to_owned(), set(Some("s3+https://host/pond/")));
2950 creds.insert("deep".to_owned(), set(Some("s3+https://host/pond/sub")));
2951
2952 let bind = |input: &str| {
2953 StorageUrl::parse(input)
2954 .unwrap()
2955 .resolve(&creds)
2956 .unwrap()
2957 .binding
2958 };
2959 assert_eq!(
2961 bind("s3+https://host/pond/sub/x"),
2962 CredsBinding::Set {
2963 name: "deep".to_owned(),
2964 via: BindVia::Scope
2965 },
2966 );
2967 assert_eq!(
2968 bind("s3+https://host/pond/other"),
2969 CredsBinding::Set {
2970 name: "bucket".to_owned(),
2971 via: BindVia::Scope
2972 },
2973 );
2974 assert_eq!(
2976 bind("s3+https://host/pond-2"),
2977 CredsBinding::Set {
2978 name: "all".to_owned(),
2979 via: BindVia::CatchAll
2980 },
2981 );
2982 assert_eq!(
2984 bind("s3://pond/sub"),
2985 CredsBinding::Set {
2986 name: "all".to_owned(),
2987 via: BindVia::CatchAll
2988 },
2989 );
2990 assert_eq!(
2992 bind("s3+https://host:443/pond/x"),
2993 CredsBinding::Set {
2994 name: "bucket".to_owned(),
2995 via: BindVia::Scope
2996 },
2997 );
2998 assert_eq!(
3000 bind("s3+https://host/pond/sub/x?creds=all"),
3001 CredsBinding::Set {
3002 name: "all".to_owned(),
3003 via: BindVia::Pointer
3004 },
3005 );
3006 let err = StorageUrl::parse("s3://b/p?creds=nope")
3008 .unwrap()
3009 .resolve(&creds)
3010 .expect_err("missing set")
3011 .to_string();
3012 assert!(err.contains("creds=nope"), "got: {err}");
3013
3014 let empty = BTreeMap::new();
3016 assert_eq!(
3017 StorageUrl::parse("s3://b/p")
3018 .unwrap()
3019 .resolve(&empty)
3020 .unwrap()
3021 .binding,
3022 CredsBinding::Ambient,
3023 );
3024 assert_eq!(
3025 StorageUrl::parse("/srv/pond")
3026 .unwrap()
3027 .resolve(&creds)
3028 .unwrap()
3029 .binding,
3030 CredsBinding::NotApplicable,
3031 );
3032 }
3033
3034 #[test]
3035 fn unmatched_sets_are_reported_only_on_remote_invocations() {
3036 let mut creds = BTreeMap::new();
3037 creds.insert("used".to_owned(), set(Some("s3://bucket/")));
3038 creds.insert("idle".to_owned(), set(Some("s3://other/")));
3039
3040 let remote = StorageUrl::parse("s3://bucket/p")
3041 .unwrap()
3042 .resolve(&creds)
3043 .unwrap();
3044 assert_eq!(unmatched_creds_sets(&[&remote], &creds), vec!["idle"]);
3045
3046 let local = StorageUrl::parse("/srv/pond")
3048 .unwrap()
3049 .resolve(&creds)
3050 .unwrap();
3051 assert!(unmatched_creds_sets(&[&local], &creds).is_empty());
3052 }
3053
3054 #[test]
3055 fn secrets_materialize_from_file_and_command() {
3056 let dir = TempDir::new().unwrap();
3057 let key_path = dir.path().join("key");
3058 std::fs::write(&key_path, "from-file\n").unwrap();
3059 let mut creds = BTreeMap::new();
3060 creds.insert(
3061 "default".to_owned(),
3062 CredsSet {
3063 access_key_id_file: Some(key_path),
3064 secret_access_key_command: Some("printf 'from-command\\n\\n'".to_owned()),
3066 ..CredsSet::default()
3067 },
3068 );
3069 let url = StorageUrl::parse("s3://bucket/p").unwrap();
3070 let resolved = url.resolve(&creds).unwrap();
3071 assert_eq!(
3072 opts(&resolved, "access_key_id").as_deref(),
3073 Some("from-file")
3074 );
3075 assert_eq!(
3076 opts(&resolved, "secret_access_key").as_deref(),
3077 Some("from-command\n"),
3078 );
3079
3080 let mut failing = BTreeMap::new();
3082 failing.insert(
3083 "default".to_owned(),
3084 CredsSet {
3085 secret_access_key_command: Some("exit 3".to_owned()),
3086 ..CredsSet::default()
3087 },
3088 );
3089 let err = url
3090 .resolve(&failing)
3091 .expect_err("command must fail")
3092 .to_string();
3093 assert!(err.contains("exit 3"), "got: {err}");
3094
3095 let marker = dir.path().join("runs");
3097 let command = format!("echo run >> {} && echo secret", marker.display());
3098 let mut counted = BTreeMap::new();
3099 counted.insert(
3100 "default".to_owned(),
3101 CredsSet {
3102 secret_access_key_command: Some(command),
3103 ..CredsSet::default()
3104 },
3105 );
3106 url.resolve(&counted).unwrap();
3107 url.resolve(&counted).unwrap();
3108 let runs = std::fs::read_to_string(&marker).unwrap();
3109 assert_eq!(runs.lines().count(), 1, "command must run exactly once");
3110 }
3111
3112 #[test]
3113 fn check_errors_classify_by_kind_and_binding() {
3114 let auth_error = || object_store::Error::Unauthenticated {
3115 path: "k".to_owned(),
3116 source: "denied".into(),
3117 };
3118 let bound = CredsBinding::Set {
3119 name: "work".to_owned(),
3120 via: BindVia::Scope,
3121 };
3122 match classify_check_error(auth_error(), &bound, "put") {
3124 CheckFailure::Auth { set, .. } => assert_eq!(set, "work"),
3125 other => panic!("want Auth, got {other:?}"),
3126 }
3127 assert!(matches!(
3129 classify_check_error(auth_error(), &CredsBinding::Ambient, "put"),
3130 CheckFailure::NoCreds { .. },
3131 ));
3132 let denied = object_store::Error::PermissionDenied {
3133 path: "k".to_owned(),
3134 source: "403".into(),
3135 };
3136 assert!(matches!(
3137 classify_check_error(denied, &bound, "put"),
3138 CheckFailure::Auth { .. },
3139 ));
3140 let missing = object_store::Error::NotFound {
3142 path: "k".to_owned(),
3143 source: "404".into(),
3144 };
3145 assert!(matches!(
3146 classify_check_error(missing, &bound, "get"),
3147 CheckFailure::Io { .. },
3148 ));
3149 let no_creds = || object_store::Error::Generic {
3153 store: "S3",
3154 source: "Failed to get AWS credentials: CredentialsNotLoaded".into(),
3155 };
3156 assert!(matches!(
3157 classify_check_error(no_creds(), &bound, "put"),
3158 CheckFailure::Auth { .. },
3159 ));
3160 assert!(matches!(
3161 classify_check_error(no_creds(), &CredsBinding::Ambient, "put"),
3162 CheckFailure::NoCreds { .. },
3163 ));
3164 }
3165
3166 #[test]
3167 fn concise_cause_strips_upstream_noise_to_one_line() {
3168 let inner = "Encountered internal error. Please file a bug report at \
3171 https://github.com/lance-format/lance/issues. Failed to get AWS \
3172 credentials: CredentialsNotLoaded, <WORKSPACE>/src/object_store/providers/aws.rs:401:21: \
3173 Encountered internal error. Please file a bug report at \
3174 https://github.com/lance-format/lance/issues. Failed to get AWS \
3175 credentials: CredentialsNotLoaded";
3176 let failure = CheckFailure::NoCreds {
3177 source: anyhow!(inner.to_owned()).context("initial conditional put"),
3178 };
3179 let cause = failure.concise_cause().expect("auth-class carries a cause");
3180 assert_eq!(cause, "Failed to get AWS credentials: CredentialsNotLoaded");
3181 assert!(
3183 !failure.to_string().contains("file a bug report"),
3184 "lead must not trail the chain: {failure}"
3185 );
3186 let occ = CheckFailure::OccUnsupported {
3188 detail: "put-if-none-match ignored".to_owned(),
3189 };
3190 assert!(occ.concise_cause().is_none());
3191 let long = CheckFailure::Io {
3194 source: anyhow!(format!("{} dns error: lookup failed", "x".repeat(500))),
3195 };
3196 let cause = long.concise_cause().expect("io carries a cause");
3197 assert!(cause.contains(" ... "), "long causes truncate: {cause}");
3198 assert!(
3199 cause.ends_with("dns error: lookup failed"),
3200 "the tail survives: {cause}"
3201 );
3202 }
3203
3204 #[tokio::test]
3205 async fn storage_check_passes_on_memory_backend() {
3206 let resolved = StorageUrl::parse("memory://check/probe")
3207 .unwrap()
3208 .resolve(&BTreeMap::new())
3209 .unwrap();
3210 storage_check(&resolved).await.expect("memory probe passes");
3211 }
3212
3213 fn stat(bytes: u64) -> FragmentStat {
3214 FragmentStat {
3215 bytes: Some(bytes),
3216 rows: bytes / 1_000,
3217 deleted_rows: 0,
3218 }
3219 }
3220
3221 #[test]
3222 fn compaction_veto_blocks_absorb_keeps_peers() {
3223 let absorb = [stat(665_000_000), stat(1_000_000), stat(2_000_000)];
3225 assert!(!keep_task(&absorb, 64, 0.1));
3226 let peers = [stat(300_000_000), stat(300_000_000)];
3228 assert!(keep_task(&peers, 64, 0.1));
3229 let tiered = [stat(400_000), stat(60_000), stat(40_000)];
3231 assert!(keep_task(&tiered, 64, 0.1));
3232 }
3233
3234 #[test]
3235 fn compaction_veto_passes_deletions_and_cap() {
3236 let mut deleting = stat(665_000_000);
3237 deleting.deleted_rows = deleting.rows / 5;
3238 assert!(keep_task(&[deleting, stat(1_000)], 64, 0.1));
3239
3240 let wide: Vec<FragmentStat> = std::iter::once(stat(665_000_000))
3241 .chain(std::iter::repeat_with(|| stat(1_000)).take(63))
3242 .collect();
3243 assert!(keep_task(&wide, 64, 0.1));
3244 }
3245
3246 #[test]
3247 fn compaction_veto_falls_back_to_rows_on_unknown_sizes() {
3248 let mut unknown = stat(665_000_000);
3249 unknown.bytes = None;
3250 assert!(!keep_task(
3252 &[unknown, stat(1_000_000), stat(2_000_000)],
3253 64,
3254 0.1
3255 ));
3256 }
3257
3258 #[test]
3259 fn derived_target_rows_tracks_row_size_and_clamps() {
3260 let parts_like = [FragmentStat {
3262 bytes: Some(665_000_000),
3263 rows: 511_000,
3264 deleted_rows: 0,
3265 }];
3266 let target = derived_target_rows(&parts_like);
3267 assert!((150_000..300_000).contains(&target), "{target}");
3268 let unknown = [FragmentStat {
3270 bytes: None,
3271 rows: 511_000,
3272 deleted_rows: 0,
3273 }];
3274 assert_eq!(
3275 derived_target_rows(&unknown),
3276 MAX_TARGET_ROWS_PER_FRAGMENT as usize
3277 );
3278 let tiny = [FragmentStat {
3280 bytes: Some(1_000_000),
3281 rows: 100_000,
3282 deleted_rows: 0,
3283 }];
3284 assert_eq!(
3285 derived_target_rows(&tiny),
3286 MAX_TARGET_ROWS_PER_FRAGMENT as usize
3287 );
3288 let huge = [FragmentStat {
3289 bytes: Some(1_000_000_000),
3290 rows: 100,
3291 deleted_rows: 0,
3292 }];
3293 assert_eq!(
3294 derived_target_rows(&huge),
3295 MIN_TARGET_ROWS_PER_FRAGMENT as usize
3296 );
3297 }
3298
3299 #[test]
3300 fn namespace_error_code_walks_wrapped_chain() {
3301 let direct = lance::Error::namespace_source(Box::new(NamespaceError::TableNotFound {
3302 message: "missing".into(),
3303 }));
3304 assert!(is_namespace_error_code(&direct, ErrorCode::TableNotFound));
3305
3306 let wrapped = lance::Error::namespace_source(Box::new(direct));
3307 assert!(is_namespace_error_code(&wrapped, ErrorCode::TableNotFound));
3308
3309 let other_code =
3310 lance::Error::namespace_source(Box::new(NamespaceError::NamespaceNotFound {
3311 message: "nope".into(),
3312 }));
3313 assert!(!is_namespace_error_code(
3314 &other_code,
3315 ErrorCode::TableNotFound
3316 ));
3317
3318 let not_namespace = lance::Error::internal("unrelated");
3319 assert!(!is_namespace_error_code(
3320 ¬_namespace,
3321 ErrorCode::TableNotFound
3322 ));
3323 }
3324
3325 #[tokio::test]
3329 async fn store_opens_via_namespace_and_scan_works() -> Result<()> {
3330 let temp = TempDir::new()?;
3331 let url = Url::from_directory_path(temp.path())
3332 .map_err(|()| anyhow::anyhow!("temp path is not absolute"))?;
3333 let handle = Handle::open(&url).await?;
3334 let cases: [(Table, &[&str]); 3] = [
3337 (Table::Sessions, &["id"]),
3338 (Table::Messages, &["id"]),
3339 (Table::Parts, &["id"]),
3340 ];
3341 for (table, projection) in cases {
3342 let scanner = handle
3343 .scan(table, ScanOpts::project_only(projection))
3344 .await?;
3345 let batch = scanner.try_into_batch().await?;
3346 assert_eq!(batch.num_rows(), 0, "fresh table should be empty");
3347 }
3348 Ok(())
3349 }
3350}