use std::{
collections::HashMap, num::NonZeroU16, ops::Bound, path::PathBuf, sync::OnceLock,
};
use itertools::Either;
use regex::bytes::Regex;
use serde::{Deserialize, Serialize};
use crate::{
format::Path,
storage::{self, RetriesSettings},
virtual_chunks::VirtualChunkContainer,
};
pub use crate::storage::s3_config::{
S3Credentials, S3CredentialsFetcher, S3Options, S3StaticCredentials,
};
#[cfg(feature = "object-store-azure")]
pub use crate::storage::{
AzureCredentials, AzureCredentialsFetcher, AzureRefreshableCredential,
AzureStaticCredentials,
};
#[cfg(feature = "object-store-gcs")]
pub use crate::storage::{
GcsBearerCredential, GcsCredentials, GcsCredentialsFetcher, GcsStaticCredentials,
};
#[cfg(feature = "object-store-gcs")]
pub use icechunk_arrow_object_store::object_store::gcp::GcpCredential;
/// Selects the object-store backend (and its backend-specific options)
/// used by a repository or virtual chunk container.
///
/// Serialized with snake_case variant names; several backends are only
/// compiled in behind their cargo feature flags.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ObjectStoreConfig {
/// Ephemeral in-memory store.
InMemory,
/// Local filesystem store rooted at the given path.
#[cfg(feature = "object-store-fs")]
LocalFileSystem(PathBuf),
/// HTTP(S) store with free-form key/value options.
#[cfg(feature = "object-store-http")]
Http(HashMap<String, String>),
/// S3-compatible endpoint (e.g. MinIO) configured via S3 options.
S3Compatible(S3Options),
/// AWS S3.
S3(S3Options),
/// Google Cloud Storage with free-form key/value options.
#[cfg(feature = "object-store-gcs")]
Gcs(HashMap<String, String>),
/// Azure Blob Storage with free-form key/value options.
#[cfg(feature = "object-store-azure")]
Azure(HashMap<String, String>),
/// Tigris (S3-compatible) store configured via S3 options.
Tigris(S3Options),
}
/// Compression algorithms available for repository data.
///
/// Currently only Zstandard is supported, and it is the default.
#[derive(Debug, PartialEq, Eq, Default, Serialize, Deserialize, Clone, Copy)]
#[serde(rename_all = "snake_case")]
pub enum CompressionAlgorithm {
/// Zstandard compression.
#[default]
Zstd,
}
/// Compression settings for repository files.
///
/// Fields are optional so "unset" is distinguishable from an explicit
/// choice; the accessor methods supply the effective defaults.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Copy, Default)]
pub struct CompressionConfig {
/// Algorithm to use; `None` means the default algorithm (Zstd).
#[serde(default)]
pub algorithm: Option<CompressionAlgorithm>,
/// Compression level; `None` means the default level (3, per `level()`).
#[serde(default)]
pub level: Option<u8>,
}
impl CompressionConfig {
pub fn algorithm(&self) -> CompressionAlgorithm {
self.algorithm.unwrap_or_default()
}
pub fn level(&self) -> u8 {
self.level.unwrap_or(3)
}
pub fn merge(&self, other: Self) -> Self {
Self {
algorithm: other.algorithm.or(self.algorithm),
level: other.level.or(self.level),
}
}
}
/// Sizes for the in-memory caches; `None` fields fall back to the
/// accessor defaults.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Copy, Default)]
pub struct CachingConfig {
/// Snapshot-node cache entries (accessor default: 500_000).
#[serde(default)]
pub num_snapshot_nodes: Option<u64>,
/// Chunk-ref cache entries (accessor default: 15_000_000).
#[serde(default)]
pub num_chunk_refs: Option<u64>,
/// Transaction-change cache entries (accessor default: 0).
#[serde(default)]
pub num_transaction_changes: Option<u64>,
/// Attribute cache size in bytes (accessor default: 0).
#[serde(default)]
pub num_bytes_attributes: Option<u64>,
/// Chunk cache size in bytes (accessor default: 0).
#[serde(default)]
pub num_bytes_chunks: Option<u64>,
}
impl CachingConfig {
    /// Snapshot-node cache entries; defaults to 500_000 when unset.
    pub fn num_snapshot_nodes(&self) -> u64 {
        match self.num_snapshot_nodes {
            Some(n) => n,
            None => 500_000,
        }
    }

    /// Chunk-ref cache entries; defaults to 15_000_000 when unset.
    pub fn num_chunk_refs(&self) -> u64 {
        match self.num_chunk_refs {
            Some(n) => n,
            None => 15_000_000,
        }
    }

    /// Transaction-change cache entries; defaults to 0 when unset.
    pub fn num_transaction_changes(&self) -> u64 {
        match self.num_transaction_changes {
            Some(n) => n,
            None => 0,
        }
    }

    /// Attribute cache bytes; defaults to 0 when unset.
    pub fn num_bytes_attributes(&self) -> u64 {
        match self.num_bytes_attributes {
            Some(n) => n,
            None => 0,
        }
    }

    /// Chunk cache bytes; defaults to 0 when unset.
    pub fn num_bytes_chunks(&self) -> u64 {
        match self.num_bytes_chunks {
            Some(n) => n,
            None => 0,
        }
    }

    /// Field-wise overlay: values set in `other` take precedence over `self`.
    pub fn merge(&self, other: Self) -> Self {
        // All fields share the same "theirs wins when set" rule.
        let overlay = |theirs: Option<u64>, mine: Option<u64>| theirs.or(mine);
        Self {
            num_snapshot_nodes: overlay(
                other.num_snapshot_nodes,
                self.num_snapshot_nodes,
            ),
            num_chunk_refs: overlay(other.num_chunk_refs, self.num_chunk_refs),
            num_transaction_changes: overlay(
                other.num_transaction_changes,
                self.num_transaction_changes,
            ),
            num_bytes_attributes: overlay(
                other.num_bytes_attributes,
                self.num_bytes_attributes,
            ),
            num_bytes_chunks: overlay(other.num_bytes_chunks, self.num_bytes_chunks),
        }
    }
}
/// Predicate over array paths that decides which manifest-split rule applies.
///
/// Conditions compose with `Or`/`And`; leaves match a regex against either
/// the full path or the final name component, or match unconditionally.
#[derive(Debug, PartialEq, Eq, Serialize, Hash, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub enum ManifestSplitCondition {
/// True when any nested condition matches.
Or(Vec<ManifestSplitCondition>),
/// True when all nested conditions match.
And(Vec<ManifestSplitCondition>),
/// True when `regex` matches the array's full path string.
PathMatches { regex: String },
/// True when `regex` matches the array's final name component.
NameMatches { regex: String },
/// Always true.
AnyArray,
}
impl ManifestSplitCondition {
/// Evaluate this condition against an array path.
///
/// NOTE(review): the regex is recompiled on every call; if this sits on a
/// hot path, consider caching compiled regexes. Also note that an invalid
/// regex — or a path without a name component for `NameMatches` — silently
/// evaluates to `false` instead of surfacing an error.
pub fn matches(&self, path: &Path) -> bool {
use ManifestSplitCondition::*;
match self {
AnyArray => true,
// `any`/`all` give Or/And short-circuit semantics.
Or(vec) => vec.iter().any(|c| c.matches(path)),
And(vec) => vec.iter().all(|c| c.matches(path)),
// Match against the rendered full path; a bad regex yields `false`.
PathMatches { regex } => Regex::new(regex)
.map(|regex| regex.is_match(path.to_string().as_bytes()))
.unwrap_or(false),
// Match only the final path component, when one exists.
NameMatches { regex } => Regex::new(regex)
.map(|regex| {
path.name()
.map(|name| regex.is_match(name.as_bytes()))
.unwrap_or(false)
})
.unwrap_or(false),
}
}
}
/// Selects which dimension of an array a split rule applies to.
#[derive(Debug, Hash, PartialEq, Eq, Serialize, Deserialize, Clone)]
pub enum ManifestSplitDimCondition {
/// The dimension at this positional axis index.
Axis(usize),
/// The dimension with this name.
DimensionName(String),
/// Any dimension.
Any,
}
/// One per-dimension split rule: for dimensions matching `condition`,
/// the split width is `num_chunks` chunks.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
pub struct ManifestSplitDim {
/// Which dimension(s) this rule applies to.
pub condition: ManifestSplitDimCondition,
/// Split width in chunks; `u32::MAX` effectively means "never split".
pub num_chunks: u32,
}
/// Configuration for splitting chunk manifests.
///
/// Each entry pairs an array-selection condition with the per-dimension
/// split rules to apply; `None` means no explicit configuration.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
pub struct ManifestSplittingConfig {
/// Ordered (array condition, per-dimension rules) pairs.
pub split_sizes: Option<Vec<(ManifestSplitCondition, Vec<ManifestSplitDim>)>>,
}
impl Default for ManifestSplittingConfig {
fn default() -> Self {
let inner = vec![ManifestSplitDim {
condition: ManifestSplitDimCondition::Any,
num_chunks: u32::MAX,
}];
let new = vec![(
ManifestSplitCondition::PathMatches { regex: r".*".to_string() },
inner,
)];
Self { split_sizes: Some(new) }
}
}
impl ManifestSplittingConfig {
    /// Build a config with a single catch-all rule: every array path
    /// matches, and any dimension is split every `split_size` chunks.
    pub fn with_size(split_size: u32) -> Self {
        let any_dimension = ManifestSplitDim {
            condition: ManifestSplitDimCondition::Any,
            num_chunks: split_size,
        };
        let every_path =
            ManifestSplitCondition::PathMatches { regex: r".*".to_string() };
        ManifestSplittingConfig {
            split_sizes: Some(vec![(every_path, vec![any_dimension])]),
        }
    }
}
/// Predicate deciding whether an array's manifest should be preloaded.
///
/// Evaluation happens elsewhere; the variants mirror
/// `ManifestSplitCondition` plus a chunk-ref count range and constants.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone)]
#[serde(rename_all = "snake_case")]
pub enum ManifestPreloadCondition {
/// Holds when any nested condition holds.
Or(Vec<ManifestPreloadCondition>),
/// Holds when all nested conditions hold.
And(Vec<ManifestPreloadCondition>),
/// Regex match against the array's full path.
PathMatches { regex: String },
/// Regex match against the array's name.
NameMatches { regex: String },
/// Number of chunk refs falls within the given (possibly open) bounds.
NumRefs { from: Bound<u32>, to: Bound<u32> },
/// Always holds.
True,
/// Never holds.
False,
}
/// Limits and predicate controlling manifest preloading.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Default)]
pub struct ManifestPreloadConfig {
/// Cap on total chunk refs preloaded (accessor default: 10_000).
pub max_total_refs: Option<u32>,
/// Condition an array must satisfy to be preloaded; see `preload_if()`.
pub preload_if: Option<ManifestPreloadCondition>,
/// Cap on arrays scanned for preloading (accessor default: 50).
pub max_arrays_to_scan: Option<u32>,
}
impl ManifestPreloadConfig {
/// Overlay `other` on `self`: fields set in `other` win (no deep merge).
pub fn merge(&self, other: Self) -> Self {
Self {
max_total_refs: other.max_total_refs.or(self.max_total_refs),
preload_if: other.preload_if.or(self.preload_if.clone()),
max_arrays_to_scan: other.max_arrays_to_scan.or(self.max_arrays_to_scan),
}
}
/// Maximum total refs to preload; 10_000 when unset.
pub fn max_total_refs(&self) -> u32 {
self.max_total_refs.unwrap_or(10_000)
}
/// Maximum arrays to scan; 50 when unset.
pub fn max_arrays_to_scan(&self) -> u32 {
self.max_arrays_to_scan.unwrap_or(50)
}
/// The configured preload predicate, or a lazily-built process-wide
/// default targeting small coordinate-like arrays: the name must look like
/// a time, vertical, latitude/longitude, or axis-index coordinate, AND the
/// manifest must hold at most 1000 chunk refs.
pub fn preload_if(&self) -> &ManifestPreloadCondition {
self.preload_if.as_ref().unwrap_or_else(|| {
DEFAULT_MANIFEST_PRELOAD_CONDITION.get_or_init(|| {
ManifestPreloadCondition::And(vec![
ManifestPreloadCondition::Or(vec![
// Time-like names: `t`, or time/min/hour/day/week/month/year
// with optional trailing digits.
ManifestPreloadCondition::NameMatches {
regex: r#"^\bt\b$|^(time|min|hour|day|week|month|year)[0-9]*$"#.to_string(), },
// Vertical-coordinate names: depth, level, height, pressure, etc.
ManifestPreloadCondition::NameMatches {
regex: r#"^(z|nav_lev|gdep|lv_|[o]*lev|bottom_top|sigma|h(ei)?ght|altitude|depth|isobaric|pres|isotherm)[a-z_]*[0-9]*$"#.to_string(),
},
// Y-axis index names.
ManifestPreloadCondition::NameMatches {
regex: r#"^(y|j|nlat|rlat|nj)$"#.to_string(), },
// Latitude coordinate names.
ManifestPreloadCondition::NameMatches {
regex: r#"^y?(nav_lat|lat|gphi)[a-z0-9]*$"#.to_string(), },
// Longitude coordinate names.
ManifestPreloadCondition::NameMatches {
regex: r#"^x?(nav_lon|lon|glam)[a-z0-9]*$"#.to_string(), },
// X-axis index names.
ManifestPreloadCondition::NameMatches {
regex: r#"^(x|i|nlon|rlon|ni)$"#.to_string(), },
]),
// ...and only manifests with at most 1000 chunk refs.
ManifestPreloadCondition::NumRefs {
from: Bound::Unbounded,
to: Bound::Included(1000),
},
])
})
})
}
}
// Lazily-initialized storage for the default preload condition above.
static DEFAULT_MANIFEST_PRELOAD_CONDITION: OnceLock<ManifestPreloadCondition> =
OnceLock::new();
/// Settings for compressing virtual chunk location strings in manifests,
/// including dictionary-training knobs; accessors supply defaults.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Copy, Default)]
pub struct ManifestVirtualChunkLocationCompressionConfig {
/// Minimum chunk count threshold (accessor default: 1000).
#[serde(default)]
pub min_num_chunks: Option<u16>,
/// Max samples used to train the dictionary (accessor default: 100).
#[serde(default)]
pub dictionary_max_training_samples: Option<u16>,
/// Max dictionary size in bytes (accessor default: 2 KiB).
#[serde(default)]
pub dictionary_max_size_bytes: Option<u32>,
/// Compression level (accessor default: 3).
#[serde(default)]
pub compression_level: Option<i32>,
}
impl ManifestVirtualChunkLocationCompressionConfig {
    /// Effective `min_num_chunks`; 1000 when unset.
    pub fn min_num_chunks(&self) -> u16 {
        match self.min_num_chunks {
            Some(n) => n,
            None => 1000,
        }
    }

    /// Effective `dictionary_max_training_samples`; 100 when unset.
    pub fn dictionary_max_training_samples(&self) -> u16 {
        match self.dictionary_max_training_samples {
            Some(n) => n,
            None => 100,
        }
    }

    /// Effective `dictionary_max_size_bytes`; 2 KiB when unset.
    pub fn dictionary_max_size_bytes(&self) -> u32 {
        match self.dictionary_max_size_bytes {
            Some(n) => n,
            None => 2 * 1024,
        }
    }

    /// Effective `compression_level`; 3 when unset.
    pub fn compression_level(&self) -> i32 {
        match self.compression_level {
            Some(level) => level,
            None => 3,
        }
    }

    /// Field-wise overlay: values set in `other` win over `self`.
    pub fn merge(&self, other: Self) -> Self {
        Self {
            min_num_chunks: match other.min_num_chunks {
                Some(n) => Some(n),
                None => self.min_num_chunks,
            },
            dictionary_max_training_samples: match other
                .dictionary_max_training_samples
            {
                Some(n) => Some(n),
                None => self.dictionary_max_training_samples,
            },
            dictionary_max_size_bytes: match other.dictionary_max_size_bytes {
                Some(n) => Some(n),
                None => self.dictionary_max_size_bytes,
            },
            compression_level: match other.compression_level {
                Some(level) => Some(level),
                None => self.compression_level,
            },
        }
    }
}
impl From<&ManifestVirtualChunkLocationCompressionConfig>
    for crate::format::manifest::LocationCompressionConfig
{
    /// Resolve every optional field to its effective value via the accessor
    /// methods, producing the concrete manifest-level configuration.
    fn from(config: &ManifestVirtualChunkLocationCompressionConfig) -> Self {
        Self {
            compression_level: config.compression_level(),
            dictionary_max_size_bytes: config.dictionary_max_size_bytes(),
            dictionary_max_training_samples: config.dictionary_max_training_samples(),
            min_num_chunks: config.min_num_chunks(),
        }
    }
}
/// Manifest-related configuration, grouped into optional sections.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, Default)]
pub struct ManifestConfig {
/// Preload settings; the accessor supplies a default when unset.
#[serde(default)]
pub preload: Option<ManifestPreloadConfig>,
/// Splitting settings; the accessor supplies a default when unset.
#[serde(default)]
pub splitting: Option<ManifestSplittingConfig>,
/// Virtual chunk location compression; accessor supplies a default.
#[serde(default)]
pub virtual_chunk_location_compression:
Option<ManifestVirtualChunkLocationCompressionConfig>,
}
// Lazily-initialized process-wide defaults returned by the accessors below
// when the corresponding section is unset.
static DEFAULT_MANIFEST_PRELOAD_CONFIG: OnceLock<ManifestPreloadConfig> = OnceLock::new();
static DEFAULT_MANIFEST_SPLITTING_CONFIG: OnceLock<ManifestSplittingConfig> =
    OnceLock::new();
static DEFAULT_MANIFEST_VIRTUAL_CHUNK_LOCATION_COMPRESSION_CONFIG: OnceLock<
    ManifestVirtualChunkLocationCompressionConfig,
> = OnceLock::new();

impl ManifestConfig {
    /// Combine two configs. Sections set in `other` take precedence; the
    /// virtual-chunk-location-compression section is deep-merged when both
    /// sides set it, while `preload` and `splitting` are replaced wholesale.
    pub fn merge(&self, other: Self) -> Self {
        let compression = match (
            self.virtual_chunk_location_compression,
            other.virtual_chunk_location_compression,
        ) {
            (Some(mine), Some(theirs)) => Some(mine.merge(theirs)),
            (mine, theirs) => theirs.or(mine),
        };
        Self {
            preload: other.preload.or_else(|| self.preload.clone()),
            splitting: other.splitting.or_else(|| self.splitting.clone()),
            virtual_chunk_location_compression: compression,
        }
    }

    /// Preload settings, or the lazily-initialized process-wide default.
    pub fn preload(&self) -> &ManifestPreloadConfig {
        match &self.preload {
            Some(cfg) => cfg,
            None => DEFAULT_MANIFEST_PRELOAD_CONFIG.get_or_init(Default::default),
        }
    }

    /// Splitting settings, or the lazily-initialized process-wide default.
    pub fn splitting(&self) -> &ManifestSplittingConfig {
        match &self.splitting {
            Some(cfg) => cfg,
            None => DEFAULT_MANIFEST_SPLITTING_CONFIG.get_or_init(Default::default),
        }
    }

    /// Location-compression settings, or the process-wide default.
    pub fn virtual_chunk_location_compression(
        &self,
    ) -> &ManifestVirtualChunkLocationCompressionConfig {
        match &self.virtual_chunk_location_compression {
            Some(cfg) => cfg,
            None => DEFAULT_MANIFEST_VIRTUAL_CHUNK_LOCATION_COMPRESSION_CONFIG
                .get_or_init(Default::default),
        }
    }

    /// A config whose `preload` section is present but with every field
    /// unset, and whose other sections are absent.
    pub fn empty() -> Self {
        ManifestConfig {
            // Derived `Default` leaves all three preload fields as `None`.
            preload: Some(ManifestPreloadConfig::default()),
            splitting: None,
            virtual_chunk_location_compression: None,
        }
    }
}
/// Retry policy for repository-update operations.
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct RepoUpdateRetryConfig {
/// Retry settings; `None` falls back to the built-in default (see `retries()`).
#[serde(default)]
pub default: Option<RetriesSettings>,
}
// Lazily-initialized storage for the built-in retry settings.
static DEFAULT_REPO_UPDATE_RETRIES: OnceLock<RetriesSettings> = OnceLock::new();

impl RepoUpdateRetryConfig {
    /// Built-in retry settings, created once on first use: up to 100 tries,
    /// 50 ms initial backoff, 30 s maximum backoff.
    fn default_retries() -> &'static RetriesSettings {
        DEFAULT_REPO_UPDATE_RETRIES.get_or_init(|| RetriesSettings {
            // `100` is a non-zero literal so this cannot fail. The previous
            // `unwrap_or(NonZeroU16::MIN)` fallback was dead code that would
            // have silently turned a future `0` typo into "1 try" instead of
            // failing loudly at first use.
            max_tries: Some(NonZeroU16::new(100).expect("literal is non-zero")),
            initial_backoff_ms: Some(50),
            max_backoff_ms: Some(30_000),
        })
    }

    /// The configured retry settings, or the built-in default when unset.
    pub fn retries(&self) -> &RetriesSettings {
        // Lazy closure avoids touching the OnceLock when `default` is set.
        self.default.as_ref().unwrap_or_else(|| Self::default_retries())
    }

    /// Merge two configs: deep-merge when both sides are set, otherwise
    /// keep whichever side is set (preferring `other`).
    pub fn merge(&self, other: Self) -> Self {
        Self {
            default: match (self.default, other.default) {
                (Some(mine), Some(theirs)) => Some(mine.merge(theirs)),
                (mine, theirs) => theirs.or(mine),
            },
        }
    }
}
/// Top-level repository configuration.
///
/// Every field is optional so configurations can be layered via `merge`;
/// the accessor methods supply the effective defaults.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct RepositoryConfig {
/// Threshold in bytes for inlining chunks (accessor default: 512).
#[serde(default)]
pub inline_chunk_threshold_bytes: Option<u16>,
/// Concurrency for partial-value reads (accessor default: 10).
#[serde(default)]
pub get_partial_values_concurrency: Option<u16>,
/// Compression settings; accessor supplies defaults.
#[serde(default)]
pub compression: Option<CompressionConfig>,
/// Cap on concurrent requests (accessor default: 256).
#[serde(default)]
pub max_concurrent_requests: Option<u16>,
/// Cache sizing; accessor supplies defaults.
#[serde(default)]
pub caching: Option<CachingConfig>,
/// Storage-layer settings; no default is synthesized.
#[serde(default)]
pub storage: Option<storage::Settings>,
/// Virtual chunk containers, keyed by URL prefix.
#[serde(default)]
pub virtual_chunk_containers: Option<HashMap<String, VirtualChunkContainer>>,
/// Manifest preload/splitting/compression settings.
#[serde(default)]
pub manifest: Option<ManifestConfig>,
/// NOTE(review): semantics not visible in this file; `merge` replaces it
/// wholesale rather than combining — confirm intended meaning at use site.
#[serde(default)]
pub previous_file: Option<String>,
/// Retry policy for repository updates; accessor supplies defaults.
#[serde(default)]
pub repo_update_retries: Option<RepoUpdateRetryConfig>,
/// Updates recorded per repo-info file (accessor default: 1_000).
#[serde(default)]
pub num_updates_per_repo_info_file: Option<u16>,
}
// Lazily-initialized process-wide defaults returned by the accessors below.
static DEFAULT_COMPRESSION: OnceLock<CompressionConfig> = OnceLock::new();
static DEFAULT_CACHING: OnceLock<CachingConfig> = OnceLock::new();
static DEFAULT_MANIFEST_CONFIG: OnceLock<ManifestConfig> = OnceLock::new();
static DEFAULT_REPO_UPDATE_RETRY_CONFIG: OnceLock<RepoUpdateRetryConfig> =
    OnceLock::new();

/// Default cap on concurrent object-store requests.
pub const DEFAULT_MAX_CONCURRENT_REQUESTS: u16 = 256;
/// Default number of updates recorded per repo-info file.
pub const DEFAULT_NUM_UPDATES_PER_REPO_INFO_FILE: u16 = 1_000;

impl RepositoryConfig {
    /// Threshold (bytes) for inlining chunks; 512 when unset.
    pub fn inline_chunk_threshold_bytes(&self) -> u16 {
        match self.inline_chunk_threshold_bytes {
            Some(n) => n,
            None => 512,
        }
    }

    /// Concurrency for partial-value reads; 10 when unset.
    pub fn get_partial_values_concurrency(&self) -> u16 {
        match self.get_partial_values_concurrency {
            Some(n) => n,
            None => 10,
        }
    }

    /// Compression settings, or a lazily-initialized default.
    pub fn compression(&self) -> &CompressionConfig {
        match &self.compression {
            Some(c) => c,
            None => DEFAULT_COMPRESSION.get_or_init(Default::default),
        }
    }

    /// Caching settings, or a lazily-initialized default.
    pub fn caching(&self) -> &CachingConfig {
        match &self.caching {
            Some(c) => c,
            None => DEFAULT_CACHING.get_or_init(Default::default),
        }
    }

    /// Storage settings, if any were configured (no default is synthesized).
    pub fn storage(&self) -> Option<&storage::Settings> {
        self.storage.as_ref()
    }

    /// Manifest settings, or a lazily-initialized default.
    pub fn manifest(&self) -> &ManifestConfig {
        match &self.manifest {
            Some(m) => m,
            None => DEFAULT_MANIFEST_CONFIG.get_or_init(Default::default),
        }
    }

    /// Concurrent-request cap; [`DEFAULT_MAX_CONCURRENT_REQUESTS`] when unset.
    pub fn max_concurrent_requests(&self) -> u16 {
        match self.max_concurrent_requests {
            Some(n) => n,
            None => DEFAULT_MAX_CONCURRENT_REQUESTS,
        }
    }

    /// Repo-update retry settings, or a lazily-initialized default.
    pub fn repo_update_retries(&self) -> &RepoUpdateRetryConfig {
        match &self.repo_update_retries {
            Some(r) => r,
            None => DEFAULT_REPO_UPDATE_RETRY_CONFIG.get_or_init(Default::default),
        }
    }

    /// Updates per repo-info file;
    /// [`DEFAULT_NUM_UPDATES_PER_REPO_INFO_FILE`] when unset.
    pub fn num_updates_per_repo_info_file(&self) -> u16 {
        match self.num_updates_per_repo_info_file {
            Some(n) => n,
            None => DEFAULT_NUM_UPDATES_PER_REPO_INFO_FILE,
        }
    }

    /// Overlay `other` on top of `self`.
    ///
    /// Per-field semantics:
    /// - `compression`, `caching`, `storage`, `manifest`,
    ///   `repo_update_retries`: deep-merged when both sides are set;
    /// - every other field (including `virtual_chunk_containers` and
    ///   `previous_file`): replaced wholesale by `other` when set there,
    ///   otherwise kept from `self`.
    pub fn merge(&self, other: Self) -> Self {
        Self {
            inline_chunk_threshold_bytes: other
                .inline_chunk_threshold_bytes
                .or(self.inline_chunk_threshold_bytes),
            get_partial_values_concurrency: other
                .get_partial_values_concurrency
                .or(self.get_partial_values_concurrency),
            compression: match (self.compression, other.compression) {
                (Some(mine), Some(theirs)) => Some(mine.merge(theirs)),
                (mine, theirs) => theirs.or(mine),
            },
            max_concurrent_requests: other
                .max_concurrent_requests
                .or(self.max_concurrent_requests),
            caching: match (self.caching, other.caching) {
                (Some(mine), Some(theirs)) => Some(mine.merge(theirs)),
                (mine, theirs) => theirs.or(mine),
            },
            storage: match (&self.storage, other.storage) {
                (Some(mine), Some(theirs)) => Some(mine.merge(theirs)),
                (mine, theirs) => theirs.or_else(|| mine.clone()),
            },
            virtual_chunk_containers: other
                .virtual_chunk_containers
                .or_else(|| self.virtual_chunk_containers.clone()),
            manifest: match (&self.manifest, other.manifest) {
                (Some(mine), Some(theirs)) => Some(mine.merge(theirs)),
                (mine, theirs) => theirs.or_else(|| mine.clone()),
            },
            previous_file: other
                .previous_file
                .or_else(|| self.previous_file.clone()),
            repo_update_retries: match (
                self.repo_update_retries,
                other.repo_update_retries,
            ) {
                (Some(mine), Some(theirs)) => Some(mine.merge(theirs)),
                (mine, theirs) => theirs.or(mine),
            },
            num_updates_per_repo_info_file: other
                .num_updates_per_repo_info_file
                .or(self.num_updates_per_repo_info_file),
        }
    }
}
impl RepositoryConfig {
pub fn set_virtual_chunk_container(
&mut self,
cont: VirtualChunkContainer,
) -> Result<(), String> {
let containers =
self.virtual_chunk_containers.get_or_insert_with(Default::default);
if let Some(ref new_name) = cont.name {
containers.retain(|_, v| v.name.as_deref() != Some(new_name.as_str()));
}
containers.insert(cont.url_prefix().to_string(), cont);
Ok(())
}
pub fn get_virtual_chunk_container(
&self,
url_prefix: &str,
) -> Option<&VirtualChunkContainer> {
self.virtual_chunk_containers.as_ref().and_then(|h| h.get(url_prefix))
}
pub fn virtual_chunk_containers(
&self,
) -> impl Iterator<Item = &VirtualChunkContainer> {
match self.virtual_chunk_containers.as_ref() {
Some(h) => Either::Left(h.values()),
None => Either::Right(std::iter::empty()),
}
}
pub fn clear_virtual_chunk_containers(&mut self) {
self.virtual_chunk_containers = Some(Default::default());
}
}
/// Object-store credentials, tagged in serialized form by `credential_type`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "credential_type")]
#[serde(rename_all = "snake_case")]
pub enum Credentials {
/// S3 (or S3-compatible) credentials.
S3(S3Credentials),
/// Google Cloud Storage credentials.
#[cfg(feature = "object-store-gcs")]
Gcs(GcsCredentials),
/// Azure Blob Storage credentials.
#[cfg(feature = "object-store-azure")]
Azure(AzureCredentials),
}
#[cfg(test)]
// Unit and property-based tests for config types: serialization round-trips,
// merge semantics for virtual chunk containers, YAML deserialization, and
// container-name uniqueness.
mod tests {
use crate::{
ObjectStoreConfig, RepositoryConfig,
config::S3Options,
strategies::{repository_config, s3_static_credentials},
virtual_chunks::VirtualChunkContainer,
};
use icechunk_format::roundtrip_serialization_tests;
use proptest::prelude::*;
// Serialize -> deserialize round-trips for the core config types.
roundtrip_serialization_tests!(
test_config_roundtrip - repository_config,
test_s3_static_credentials_roundtrip - s3_static_credentials
);
// Feature-gated round-trips for GCS credentials.
#[cfg(feature = "object-store-gcs")]
roundtrip_serialization_tests!(
test_gcs_static_credentials_roundtrip - gcs_static_credentials
);
#[cfg(feature = "object-store-gcs")]
use crate::strategies::gcs_static_credentials;
// Feature-gated round-trips for Azure credentials.
#[cfg(feature = "object-store-azure")]
roundtrip_serialization_tests!(
test_azure_static_credentials_roundtrip - azure_static_credentials
);
#[cfg(feature = "object-store-azure")]
use crate::strategies::azure_static_credentials;
// Merging with a config whose containers were explicitly cleared must
// replace the base config's containers (wholesale), not union with them.
#[icechunk_macros::test]
fn test_merge_replaces_virtual_chunk_containers() {
let mut config1 = RepositoryConfig::default();
config1
.set_virtual_chunk_container(
VirtualChunkContainer::new(
"s3://bucket1/".to_string(),
ObjectStoreConfig::S3(S3Options {
region: Some("us-east-1".to_string()),
endpoint_url: None,
anonymous: false,
allow_http: false,
force_path_style: false,
network_stream_timeout_seconds: None,
requester_pays: false,
}),
)
.unwrap(),
)
.unwrap();
let mut config2 = RepositoryConfig::default();
config2.clear_virtual_chunk_containers();
let merged = config1.merge(config2);
assert_eq!(
merged.virtual_chunk_containers,
Some(std::collections::HashMap::new()),
"Merging with cleared VCCs should result in empty VCCs"
);
}
// When both sides define containers, `other`'s containers replace the
// base config's entirely — no per-key union.
#[icechunk_macros::test]
fn test_merge_replaces_virtual_chunk_containers_with_new_ones() {
let mut config1 = RepositoryConfig::default();
config1
.set_virtual_chunk_container(
VirtualChunkContainer::new(
"s3://bucket1/".to_string(),
ObjectStoreConfig::S3(S3Options {
region: Some("us-east-1".to_string()),
endpoint_url: None,
anonymous: false,
allow_http: false,
force_path_style: false,
network_stream_timeout_seconds: None,
requester_pays: false,
}),
)
.unwrap(),
)
.unwrap();
let mut config2 = RepositoryConfig::default();
config2
.set_virtual_chunk_container(
VirtualChunkContainer::new(
"s3://bucket2/".to_string(),
ObjectStoreConfig::S3(S3Options {
region: Some("us-west-2".to_string()),
endpoint_url: None,
anonymous: false,
allow_http: false,
force_path_style: false,
network_stream_timeout_seconds: None,
requester_pays: false,
}),
)
.unwrap(),
)
.unwrap();
let merged = config1.merge(config2);
let vccs = merged.virtual_chunk_containers.unwrap();
assert_eq!(vccs.len(), 1, "Should have exactly one VCC after merge");
assert!(
vccs.contains_key("s3://bucket2/"),
"Should contain bucket2, not bucket1"
);
assert!(!vccs.contains_key("s3://bucket1/"), "Should not contain bucket1");
}
// YAML deserialization of a full RepositoryConfig, including feature-gated
// container variants; also verifies nulls map to `None` fields.
#[icechunk_macros::test]
fn test_yaml_config_deserialization() {
let mut yaml_parts = vec![
r#"
inline_chunk_threshold_bytes: null
get_partial_values_concurrency: null
compression: null
max_concurrent_requests: null
caching: null
storage: null
virtual_chunk_containers:
s3://my-s3-bucket/:
name: null
url_prefix: s3://my-s3-bucket/
store: !s3
region: us-east-1
endpoint_url: null
anonymous: false
allow_http: false
force_path_style: false
network_stream_timeout_seconds: 60
requester_pays: false"#
.to_string(),
];
#[cfg(feature = "object-store-http")]
yaml_parts.push(
r#"
https://example.com/data/:
name: null
url_prefix: https://example.com/data/
store: !http {}"#
.to_string(),
);
#[cfg(feature = "object-store-gcs")]
yaml_parts.push(
r#"
gcs://my-gcs-bucket/:
name: null
url_prefix: gcs://my-gcs-bucket/
store: !gcs {}"#
.to_string(),
);
yaml_parts.push("\nmanifest: null\n".to_string());
let yaml = yaml_parts.join("");
let config: RepositoryConfig = serde_yaml_ng::from_str(&yaml).unwrap();
let vccs = config.virtual_chunk_containers.as_ref().unwrap();
let mut expected_len = 1;
let s3 = &vccs["s3://my-s3-bucket/"];
assert_eq!(s3.url_prefix(), "s3://my-s3-bucket/");
match &s3.store {
ObjectStoreConfig::S3(opts) => {
assert_eq!(opts.region, Some("us-east-1".to_string()));
assert!(!opts.anonymous);
assert!(!opts.allow_http);
assert!(!opts.force_path_style);
assert_eq!(opts.network_stream_timeout_seconds, Some(60));
assert!(!opts.requester_pays);
}
other => panic!("Expected S3, got {other:?}"),
}
#[cfg(feature = "object-store-http")]
{
expected_len += 1;
let http = &vccs["https://example.com/data/"];
assert_eq!(http.url_prefix(), "https://example.com/data/");
assert!(matches!(http.store, ObjectStoreConfig::Http(_)));
}
#[cfg(feature = "object-store-gcs")]
{
expected_len += 1;
let gcs = &vccs["gcs://my-gcs-bucket/"];
assert_eq!(gcs.url_prefix(), "gcs://my-gcs-bucket/");
assert!(matches!(gcs.store, ObjectStoreConfig::Gcs(_)));
}
assert_eq!(vccs.len(), expected_len);
assert!(config.inline_chunk_threshold_bytes.is_none());
assert!(config.get_partial_values_concurrency.is_none());
assert!(config.compression.is_none());
assert!(config.max_concurrent_requests.is_none());
assert!(config.caching.is_none());
assert!(config.storage.is_none());
assert!(config.manifest.is_none());
}
// Setting a container whose name collides with an existing one must evict
// the old container, even when it lives under a different URL prefix;
// unnamed containers never collide by name.
#[icechunk_macros::test]
fn test_set_vcc_name_uniqueness() {
let mut config = RepositoryConfig::default();
config
.set_virtual_chunk_container(
VirtualChunkContainer::new_named(
"my-data".to_string(),
"s3://bucket1/prefix/".to_string(),
ObjectStoreConfig::S3(S3Options {
region: Some("us-east-1".to_string()),
endpoint_url: None,
anonymous: false,
allow_http: false,
force_path_style: false,
network_stream_timeout_seconds: None,
requester_pays: false,
}),
)
.unwrap(),
)
.unwrap();
config
.set_virtual_chunk_container(
VirtualChunkContainer::new_named(
"my-data".to_string(),
"s3://bucket2/other/".to_string(),
ObjectStoreConfig::S3(S3Options {
region: Some("us-east-1".to_string()),
endpoint_url: None,
anonymous: false,
allow_http: false,
force_path_style: false,
network_stream_timeout_seconds: None,
requester_pays: false,
}),
)
.unwrap(),
)
.unwrap();
// The first container (same name, different prefix) must be evicted.
assert!(config.get_virtual_chunk_container("s3://bucket1/prefix/").is_none());
assert!(config.get_virtual_chunk_container("s3://bucket2/other/").is_some());
config
.set_virtual_chunk_container(
VirtualChunkContainer::new_named(
"my-data".to_string(),
"s3://bucket1/prefix/".to_string(),
ObjectStoreConfig::S3(S3Options {
region: Some("us-west-2".to_string()),
endpoint_url: None,
anonymous: false,
allow_http: false,
force_path_style: false,
network_stream_timeout_seconds: None,
requester_pays: false,
}),
)
.unwrap(),
)
.unwrap();
config
.set_virtual_chunk_container(
VirtualChunkContainer::new(
"s3://bucket3/".to_string(),
ObjectStoreConfig::S3(S3Options {
region: Some("us-east-1".to_string()),
endpoint_url: None,
anonymous: false,
allow_http: false,
force_path_style: false,
network_stream_timeout_seconds: None,
requester_pays: false,
}),
)
.unwrap(),
)
.unwrap();
}
}