1use std::sync::Arc;
2
3use indexmap::IndexMap;
4use object_store::{ObjectStore, ObjectStoreExt, path::Path};
5use serde::{Deserialize, Serialize};
6use tracing::warn;
7
8use crate::{
9 Error, Result,
10 config::{Codec, META_VARIANTS, MetaFormat},
11 schema::{ArraySchema, Attr},
12};
13
14#[derive(Debug, Default, Clone, Serialize, Deserialize)]
19pub struct DatasetMeta {
20 #[serde(default)]
22 pub arrays: IndexMap<String, ArraySchema>,
23 #[serde(default)]
25 pub attributes: IndexMap<String, Attr>,
26}
27
28#[derive(Debug, Default, Clone, Serialize, Deserialize)]
29pub(crate) struct StoreMeta {
30 pub version: u32,
31 #[serde(default)]
35 pub codec: Codec,
36 pub datasets: IndexMap<String, DatasetMeta>,
37}
38
39pub(crate) async fn load_meta(
57 store: &Arc<dyn ObjectStore>,
58) -> Result<(StoreMeta, MetaFormat, Codec)> {
59 let listing = store
60 .list_with_delimiter(None)
61 .await
62 .map_err(Error::ObjectStore)?;
63
64 let present: std::collections::HashSet<&str> = listing
66 .objects
67 .iter()
68 .filter_map(|o| o.location.filename())
69 .collect();
70
71 let matches: Vec<(MetaFormat, Codec)> = META_VARIANTS
72 .iter()
73 .copied()
74 .filter(|&(fmt, comp)| present.contains(fmt.filename(comp)))
75 .collect();
76
77 let (format, compression) = match matches.as_slice() {
78 [] => return Ok((StoreMeta::default(), MetaFormat::Json, Codec::Uncompressed)),
79 [single] => *single,
80 many => {
81 let names: Vec<&str> = many
82 .iter()
83 .map(|&(f, c)| f.filename(c))
84 .collect();
85 let chosen = many[0];
86 warn!(
87 "multiple metadata files present ({names:?}); loading {} by priority order",
88 chosen.0.filename(chosen.1)
89 );
90 chosen
91 }
92 };
93
94 let bytes = store
95 .get(&Path::from(format.filename(compression)))
96 .await
97 .map_err(Error::ObjectStore)?
98 .bytes()
99 .await
100 .map_err(Error::ObjectStore)?;
101 let raw = decompress(&bytes, compression)?;
102 let meta = decode(&raw, format)?;
103 Ok((meta, format, compression))
104}
105
106fn decode(bytes: &[u8], format: MetaFormat) -> Result<StoreMeta> {
107 match format {
108 MetaFormat::Json => Ok(serde_json::from_slice(bytes)?),
109 MetaFormat::MsgPack => Ok(rmp_serde::from_slice(bytes)?),
110 }
111}
112
113fn encode(meta: &StoreMeta, format: MetaFormat) -> Result<Vec<u8>> {
114 match format {
115 MetaFormat::Json => Ok(serde_json::to_vec_pretty(meta)?),
116 MetaFormat::MsgPack => Ok(rmp_serde::to_vec_named(meta)?),
117 }
118}
119
120fn compress(bytes: Vec<u8>, codec: Codec) -> Result<Vec<u8>> {
121 match codec {
122 Codec::Uncompressed => Ok(bytes),
123 Codec::Zstd => Ok(zstd::stream::encode_all(bytes.as_slice(), 0)?),
126 Codec::Lz4 => Ok(lz4_flex::compress_prepend_size(&bytes)),
129 }
130}
131
132fn decompress(bytes: &[u8], codec: Codec) -> Result<Vec<u8>> {
133 match codec {
134 Codec::Uncompressed => Ok(bytes.to_vec()),
135 Codec::Zstd => Ok(zstd::stream::decode_all(bytes)?),
136 Codec::Lz4 => Ok(lz4_flex::decompress_size_prepended(bytes)?),
137 }
138}
139
140pub(crate) async fn save_meta(
141 store: &Arc<dyn ObjectStore>,
142 meta: &StoreMeta,
143 format: MetaFormat,
144 compression: Codec,
145) -> Result<()> {
146 let encoded = encode(meta, format)?;
147 let bytes = compress(encoded, compression)?;
148 store
149 .put(&Path::from(format.filename(compression)), bytes.into())
150 .await
151 .map_err(Error::ObjectStore)?;
152 Ok(())
153}
154
#[cfg(test)]
mod tests {
    // `Codec`, `ArraySchema`, `Attr`, `IndexMap`, ... all arrive through the
    // parent module's imports, so no per-test `use` lines are needed.
    use super::*;
    use array_format::DType;
    use object_store::memory::InMemory;

    /// Returns a fresh, empty in-memory object store.
    fn make_store() -> Arc<dyn ObjectStore> {
        Arc::new(InMemory::new())
    }

    /// Builds a version-1 [`StoreMeta`] with one dataset (`ds1`) that holds a
    /// single 2-D `temp` array and two attributes.
    fn sample_meta() -> StoreMeta {
        let temp = ArraySchema {
            dtype: DType::Float32,
            shape: vec![4, 8],
            chunk_shape: vec![2, 4],
            dimension_names: vec!["lat".into(), "lon".into()],
            codec: Codec::default(),
        };
        let ds1 = DatasetMeta {
            arrays: IndexMap::from([("temp".into(), temp)]),
            attributes: IndexMap::from([
                ("month".into(), Attr::Int64(6)),
                ("active".into(), Attr::Bool(true)),
            ]),
        };

        let mut meta = StoreMeta {
            version: 1,
            ..Default::default()
        };
        meta.datasets.insert("ds1".into(), ds1);
        meta
    }

    /// An empty store yields default metadata reported as JSON / uncompressed.
    #[tokio::test]
    async fn load_meta_missing_returns_default_json_uncompressed() {
        let store = make_store();
        let (meta, format, compression) = load_meta(&store).await.unwrap();
        assert_eq!(meta.version, 0);
        assert!(meta.datasets.is_empty());
        assert_eq!(format, MetaFormat::Json);
        assert_eq!(compression, Codec::Uncompressed);
    }

    /// Every `(format, codec)` pair in `META_VARIANTS` survives a save/load
    /// cycle and is detected correctly on load.
    #[tokio::test]
    async fn save_and_load_roundtrip_all_variants() {
        for &(format, compression) in &META_VARIANTS {
            let store = make_store();
            let meta = sample_meta();
            save_meta(&store, &meta, format, compression).await.unwrap();

            let (loaded, detected_fmt, detected_comp) = load_meta(&store).await.unwrap();
            assert_eq!(
                detected_fmt, format,
                "format mismatch for {format:?}/{compression:?}"
            );
            assert_eq!(
                detected_comp, compression,
                "compression mismatch for {format:?}/{compression:?}"
            );
            assert_eq!(loaded.version, 1);
            let dm = &loaded.datasets["ds1"];
            assert_eq!(dm.arrays["temp"].dtype, DType::Float32);
            assert_eq!(dm.arrays["temp"].shape, vec![4, 8]);
            assert!(matches!(dm.attributes["month"], Attr::Int64(6)));
        }
    }

    /// MessagePack encoding of the sample document is smaller than pretty JSON.
    ///
    /// Purely synchronous, so it runs as a plain `#[test]` without a runtime.
    #[test]
    fn msgpack_is_smaller_than_json() {
        let meta = sample_meta();
        let json = encode(&meta, MetaFormat::Json).unwrap();
        let mp = encode(&meta, MetaFormat::MsgPack).unwrap();
        assert!(
            mp.len() < json.len(),
            "msgpack ({}) should be smaller than JSON ({})",
            mp.len(),
            json.len()
        );
    }

    /// Both zstd and LZ4 shrink a large, repetitive metadata document in
    /// either serialization format.
    ///
    /// Purely synchronous, so it runs as a plain `#[test]` without a runtime.
    #[test]
    fn compression_shrinks_encoded_bytes() {
        let mut meta = StoreMeta {
            version: 1,
            ..Default::default()
        };
        // 30 datasets x 5 identical schemas gives the codecs plenty of
        // redundancy to exploit.
        for i in 0..30 {
            let mut dm = DatasetMeta::default();
            for j in 0..5 {
                dm.arrays.insert(
                    format!("arr_{j}"),
                    ArraySchema {
                        dtype: DType::Float32,
                        shape: vec![100, 200, 300],
                        chunk_shape: vec![10, 20, 30],
                        dimension_names: vec!["a".into(), "b".into(), "c".into()],
                        codec: Codec::default(),
                    },
                );
            }
            meta.datasets.insert(format!("dataset_{i}"), dm);
        }

        for format in [MetaFormat::Json, MetaFormat::MsgPack] {
            let raw = encode(&meta, format).unwrap();
            let zstd = compress(raw.clone(), Codec::Zstd).unwrap();
            let lz4 = compress(raw.clone(), Codec::Lz4).unwrap();
            assert!(
                zstd.len() < raw.len(),
                "{format:?}: zstd ({}) should be smaller than raw ({})",
                zstd.len(),
                raw.len()
            );
            assert!(
                lz4.len() < raw.len(),
                "{format:?}: lz4 ({}) should be smaller than raw ({})",
                lz4.len(),
                raw.len()
            );
        }
    }

    /// A lone MessagePack+zstd file is detected as exactly that variant.
    #[tokio::test]
    async fn load_detects_msgpack_zstd_when_only_that_present() {
        let store = make_store();
        save_meta(&store, &sample_meta(), MetaFormat::MsgPack, Codec::Zstd)
            .await
            .unwrap();
        let (_, format, compression) = load_meta(&store).await.unwrap();
        assert_eq!(format, MetaFormat::MsgPack);
        assert_eq!(compression, Codec::Zstd);
    }

    /// With several variant files present, the JSON / uncompressed one wins
    /// regardless of the order in which the files were written.
    #[tokio::test]
    async fn load_priority_order_when_many_present() {
        let store = make_store();
        // Distinct version numbers identify which file was actually loaded.
        let a = StoreMeta {
            version: 1,
            ..sample_meta()
        };
        let b = StoreMeta {
            version: 2,
            ..sample_meta()
        };
        let c = StoreMeta {
            version: 3,
            ..sample_meta()
        };
        save_meta(&store, &c, MetaFormat::MsgPack, Codec::Zstd)
            .await
            .unwrap();
        save_meta(&store, &b, MetaFormat::Json, Codec::Zstd)
            .await
            .unwrap();
        save_meta(&store, &a, MetaFormat::Json, Codec::Uncompressed)
            .await
            .unwrap();

        let (loaded, format, compression) = load_meta(&store).await.unwrap();
        assert_eq!(format, MetaFormat::Json);
        assert_eq!(compression, Codec::Uncompressed);
        assert_eq!(loaded.version, 1);
    }

    /// Saving twice with the same variant replaces the earlier document.
    #[tokio::test]
    async fn save_overwrites_previous_meta() {
        let store = make_store();
        let meta1 = StoreMeta {
            version: 1,
            ..Default::default()
        };
        save_meta(&store, &meta1, MetaFormat::Json, Codec::Uncompressed)
            .await
            .unwrap();

        let mut meta2 = StoreMeta {
            version: 2,
            ..Default::default()
        };
        meta2
            .datasets
            .insert("new_ds".into(), DatasetMeta::default());
        save_meta(&store, &meta2, MetaFormat::Json, Codec::Uncompressed)
            .await
            .unwrap();

        let (loaded, _, _) = load_meta(&store).await.unwrap();
        assert_eq!(loaded.version, 2);
        assert!(loaded.datasets.contains_key("new_ds"));
    }

    /// Each `Attr` variant survives a JSON round trip unchanged.
    #[test]
    fn attr_roundtrip_via_serde() {
        let cases = [
            Attr::Bool(true),
            Attr::Int64(-1_000_000),
            Attr::Float64(2.5),
            Attr::String("hello".into()),
            Attr::TimestampNanoseconds(1_700_000_000_000_000_000),
        ];
        for v in cases {
            let json = serde_json::to_string(&v).unwrap();
            let back: Attr = serde_json::from_str(&json).unwrap();
            assert_eq!(v, back);
        }
    }

    /// Pins the JSON representation of each `Attr` variant, including how
    /// strings are told apart from timestamps on the way back in.
    #[test]
    fn attr_json_shapes() {
        assert_eq!(serde_json::to_string(&Attr::Bool(true)).unwrap(), "true");
        assert_eq!(serde_json::to_string(&Attr::Int64(42)).unwrap(), "42");
        assert_eq!(serde_json::to_string(&Attr::Float64(1.5)).unwrap(), "1.5");
        assert_eq!(
            serde_json::to_string(&Attr::String("x".into())).unwrap(),
            "\"x\""
        );
        assert_eq!(
            serde_json::to_string(&Attr::TimestampNanoseconds(1_700_000_000_000_000_000)).unwrap(),
            "\"2023-11-14T22:13:20Z\"",
        );

        // A string that is not a timestamp stays a plain string.
        let back: Attr = serde_json::from_str("\"not-a-date\"").unwrap();
        assert_eq!(back, Attr::String("not-a-date".into()));

        // A timestamp-shaped string is read back as a timestamp.
        let back: Attr = serde_json::from_str("\"2023-11-14T22:13:20Z\"").unwrap();
        assert_eq!(back, Attr::TimestampNanoseconds(1_700_000_000_000_000_000));
    }

    /// An `ArraySchema` survives a JSON round trip unchanged.
    #[test]
    fn array_schema_roundtrip_via_serde() {
        let schema = ArraySchema {
            dtype: DType::Float64,
            shape: vec![10, 20],
            chunk_shape: vec![5, 5],
            dimension_names: vec!["lat".into(), "lon".into()],
            codec: Codec::default(),
        };
        let json = serde_json::to_string(&schema).unwrap();
        let back: ArraySchema = serde_json::from_str(&json).unwrap();
        assert_eq!(schema, back);
    }
}