Skip to main content

atlas/
meta.rs

1use std::sync::Arc;
2
3use indexmap::IndexMap;
4use object_store::{ObjectStore, ObjectStoreExt, path::Path};
5use serde::{Deserialize, Serialize};
6use tracing::warn;
7
8use crate::{
9    Error, Result,
10    config::{Codec, META_VARIANTS, MetaFormat},
11    schema::{ArraySchema, Attr},
12};
13
14/// Metadata for a single dataset: array schemas and per-dataset attributes.
15/// Both maps preserve insertion order (via [`IndexMap`]) so on-disk layouts
16/// and Python-side dict iteration mirror the order arrays/attributes were
17/// added.
18#[derive(Debug, Default, Clone, Serialize, Deserialize)]
19pub struct DatasetMeta {
20    /// Array name → schema. Insertion-ordered.
21    #[serde(default)]
22    pub arrays: IndexMap<String, ArraySchema>,
23    /// Attribute key → typed value. Insertion-ordered.
24    #[serde(default)]
25    pub attributes: IndexMap<String, Attr>,
26}
27
28#[derive(Debug, Default, Clone, Serialize, Deserialize)]
29pub(crate) struct StoreMeta {
30    pub version: u32,
31    /// Codec used when new arrays are defined in this store.
32    /// Written by `create`, read by `open`. Defaults to `Zstd` for stores
33    /// created before this field existed.
34    #[serde(default)]
35    pub codec: Codec,
36    pub datasets: IndexMap<String, DatasetMeta>,
37}
38
39/// Load store metadata, auto-detecting both the encoding format and the
40/// compression from the on-disk filename.
41///
42/// Uses a single [`ObjectStore::list_with_delimiter`] to enumerate the
43/// top-level files and matches them against the six known
44/// `atlas.{json,msgpack}{,.zst,.lz4}` filenames. If more than one matches
45/// (shouldn't happen unless the directory was hand-edited), the warning
46/// names them and the priority order in
47/// [`META_VARIANTS`](crate::config::META_VARIANTS) decides — uncompressed
48/// before compressed within each format, JSON before MsgPack overall.
49///
50/// If no metadata file is found, returns the default (empty) metadata with
51/// `(Json, Uncompressed)` so a freshly-created store gets the legacy
52/// `atlas.json` filename on its first save.
53///
54/// The returned `(MetaFormat, Codec)` is what subsequent saves should use so
55/// the same file is overwritten instead of leaving stale copies behind.
56pub(crate) async fn load_meta(
57    store: &Arc<dyn ObjectStore>,
58) -> Result<(StoreMeta, MetaFormat, Codec)> {
59    let listing = store
60        .list_with_delimiter(None)
61        .await
62        .map_err(Error::ObjectStore)?;
63
64    // Collect filenames present at the root.
65    let present: std::collections::HashSet<&str> = listing
66        .objects
67        .iter()
68        .filter_map(|o| o.location.filename())
69        .collect();
70
71    let matches: Vec<(MetaFormat, Codec)> = META_VARIANTS
72        .iter()
73        .copied()
74        .filter(|&(fmt, comp)| present.contains(fmt.filename(comp)))
75        .collect();
76
77    let (format, compression) = match matches.as_slice() {
78        [] => return Ok((StoreMeta::default(), MetaFormat::Json, Codec::Uncompressed)),
79        [single] => *single,
80        many => {
81            let names: Vec<&str> = many
82                .iter()
83                .map(|&(f, c)| f.filename(c))
84                .collect();
85            let chosen = many[0];
86            warn!(
87                "multiple metadata files present ({names:?}); loading {} by priority order",
88                chosen.0.filename(chosen.1)
89            );
90            chosen
91        }
92    };
93
94    let bytes = store
95        .get(&Path::from(format.filename(compression)))
96        .await
97        .map_err(Error::ObjectStore)?
98        .bytes()
99        .await
100        .map_err(Error::ObjectStore)?;
101    let raw = decompress(&bytes, compression)?;
102    let meta = decode(&raw, format)?;
103    Ok((meta, format, compression))
104}
105
106fn decode(bytes: &[u8], format: MetaFormat) -> Result<StoreMeta> {
107    match format {
108        MetaFormat::Json => Ok(serde_json::from_slice(bytes)?),
109        MetaFormat::MsgPack => Ok(rmp_serde::from_slice(bytes)?),
110    }
111}
112
113fn encode(meta: &StoreMeta, format: MetaFormat) -> Result<Vec<u8>> {
114    match format {
115        MetaFormat::Json => Ok(serde_json::to_vec_pretty(meta)?),
116        MetaFormat::MsgPack => Ok(rmp_serde::to_vec_named(meta)?),
117    }
118}
119
120fn compress(bytes: Vec<u8>, codec: Codec) -> Result<Vec<u8>> {
121    match codec {
122        Codec::Uncompressed => Ok(bytes),
123        // zstd default level (3) — good ratio at low CPU. Metadata is small,
124        // so even level 19 would be sub-millisecond, but the default is fine.
125        Codec::Zstd => Ok(zstd::stream::encode_all(bytes.as_slice(), 0)?),
126        // lz4_flex compression is infallible; size prefix lets decode know the
127        // output length without scanning.
128        Codec::Lz4 => Ok(lz4_flex::compress_prepend_size(&bytes)),
129    }
130}
131
132fn decompress(bytes: &[u8], codec: Codec) -> Result<Vec<u8>> {
133    match codec {
134        Codec::Uncompressed => Ok(bytes.to_vec()),
135        Codec::Zstd => Ok(zstd::stream::decode_all(bytes)?),
136        Codec::Lz4 => Ok(lz4_flex::decompress_size_prepended(bytes)?),
137    }
138}
139
140pub(crate) async fn save_meta(
141    store: &Arc<dyn ObjectStore>,
142    meta: &StoreMeta,
143    format: MetaFormat,
144    compression: Codec,
145) -> Result<()> {
146    let encoded = encode(meta, format)?;
147    let bytes = compress(encoded, compression)?;
148    store
149        .put(&Path::from(format.filename(compression)), bytes.into())
150        .await
151        .map_err(Error::ObjectStore)?;
152    Ok(())
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::Codec;
    use array_format::DType;
    use object_store::memory::InMemory;

    /// Fresh, empty in-memory object store for a single test.
    fn make_store() -> Arc<dyn ObjectStore> {
        Arc::new(InMemory::new())
    }

    /// Version-1 metadata holding one dataset (`ds1`) with a single 2-D
    /// array (`temp`) and two attributes (`month`, `active`).
    fn sample_meta() -> StoreMeta {
        use crate::schema::ArraySchema;
        let mut meta = StoreMeta {
            version: 1,
            ..Default::default()
        };
        meta.datasets.insert(
            "ds1".into(),
            DatasetMeta {
                arrays: IndexMap::from([(
                    "temp".into(),
                    ArraySchema {
                        dtype: DType::Float32,
                        shape: vec![4, 8],
                        chunk_shape: vec![2, 4],
                        dimension_names: vec!["lat".into(), "lon".into()],
                        codec: Codec::default(),
                    },
                )]),
                attributes: IndexMap::from([
                    ("month".into(), Attr::Int64(6)),
                    ("active".into(), Attr::Bool(true)),
                ]),
            },
        );
        meta
    }

    /// An empty store yields default metadata and the legacy
    /// `(Json, Uncompressed)` pair.
    #[tokio::test]
    async fn load_meta_missing_returns_default_json_uncompressed() {
        let store = make_store();
        let (meta, format, compression) = load_meta(&store).await.unwrap();
        assert_eq!(meta.version, 0);
        assert!(meta.datasets.is_empty());
        assert_eq!(format, MetaFormat::Json);
        assert_eq!(compression, Codec::Uncompressed);
    }

    /// Roundtrip every (format, compression) pair through save_meta + load_meta.
    /// Asserts the detected pair matches what was written.
    #[tokio::test]
    async fn save_and_load_roundtrip_all_variants() {
        for &(format, compression) in &META_VARIANTS {
            let store = make_store();
            let meta = sample_meta();
            save_meta(&store, &meta, format, compression).await.unwrap();

            let (loaded, detected_fmt, detected_comp) = load_meta(&store).await.unwrap();
            assert_eq!(detected_fmt, format, "format mismatch for {format:?}/{compression:?}");
            assert_eq!(
                detected_comp, compression,
                "compression mismatch for {format:?}/{compression:?}"
            );
            assert_eq!(loaded.version, 1);
            let dm = &loaded.datasets["ds1"];
            assert_eq!(dm.arrays["temp"].dtype, DType::Float32);
            assert_eq!(dm.arrays["temp"].shape, vec![4, 8]);
            assert!(matches!(dm.attributes["month"], Attr::Int64(6)));
        }
    }

    /// MessagePack encoding of the sample metadata is more compact than the
    /// pretty-printed JSON encoding.
    ///
    /// Purely synchronous, so no async runtime is needed.
    #[test]
    fn msgpack_is_smaller_than_json() {
        let meta = sample_meta();
        let json = encode(&meta, MetaFormat::Json).unwrap();
        let mp = encode(&meta, MetaFormat::MsgPack).unwrap();
        assert!(
            mp.len() < json.len(),
            "msgpack ({}) should be smaller than JSON ({})",
            mp.len(),
            json.len()
        );
    }

    /// Compression should shrink the encoded bytes. Uses a workload large
    /// enough to overcome compression framing overhead.
    ///
    /// Purely synchronous, so no async runtime is needed.
    #[test]
    fn compression_shrinks_encoded_bytes() {
        use crate::schema::ArraySchema;
        let mut meta = StoreMeta {
            version: 1,
            ..Default::default()
        };
        for i in 0..30 {
            let mut dm = DatasetMeta::default();
            for j in 0..5 {
                dm.arrays.insert(
                    format!("arr_{j}"),
                    ArraySchema {
                        dtype: DType::Float32,
                        shape: vec![100, 200, 300],
                        chunk_shape: vec![10, 20, 30],
                        dimension_names: vec!["a".into(), "b".into(), "c".into()],
                        codec: Codec::default(),
                    },
                );
            }
            meta.datasets.insert(format!("dataset_{i}"), dm);
        }

        for format in [MetaFormat::Json, MetaFormat::MsgPack] {
            let raw = encode(&meta, format).unwrap();
            let zstd = compress(raw.clone(), Codec::Zstd).unwrap();
            let lz4 = compress(raw.clone(), Codec::Lz4).unwrap();
            assert!(
                zstd.len() < raw.len(),
                "{format:?}: zstd ({}) should be smaller than raw ({})",
                zstd.len(),
                raw.len()
            );
            assert!(
                lz4.len() < raw.len(),
                "{format:?}: lz4 ({}) should be smaller than raw ({})",
                lz4.len(),
                raw.len()
            );
        }
    }

    /// With only the zstd-compressed MsgPack file present, detection reports
    /// exactly that pair.
    #[tokio::test]
    async fn load_detects_msgpack_zstd_when_only_that_present() {
        let store = make_store();
        save_meta(&store, &sample_meta(), MetaFormat::MsgPack, Codec::Zstd)
            .await
            .unwrap();
        let (_, format, compression) = load_meta(&store).await.unwrap();
        assert_eq!(format, MetaFormat::MsgPack);
        assert_eq!(compression, Codec::Zstd);
    }

    /// When more than one metadata file is present, priority order picks
    /// uncompressed JSON over everything else.
    #[tokio::test]
    async fn load_priority_order_when_many_present() {
        let store = make_store();
        let mut a = sample_meta();
        a.version = 1;
        let mut b = sample_meta();
        b.version = 2;
        let mut c = sample_meta();
        c.version = 3;
        // Write three different files; uncompressed-JSON should win.
        save_meta(&store, &c, MetaFormat::MsgPack, Codec::Zstd).await.unwrap();
        save_meta(&store, &b, MetaFormat::Json, Codec::Zstd).await.unwrap();
        save_meta(&store, &a, MetaFormat::Json, Codec::Uncompressed).await.unwrap();

        let (loaded, format, compression) = load_meta(&store).await.unwrap();
        assert_eq!(format, MetaFormat::Json);
        assert_eq!(compression, Codec::Uncompressed);
        assert_eq!(loaded.version, 1);
    }

    /// Saving twice with the same (format, compression) pair replaces the
    /// earlier file's contents.
    #[tokio::test]
    async fn save_overwrites_previous_meta() {
        let store = make_store();
        let meta1 = StoreMeta {
            version: 1,
            ..Default::default()
        };
        save_meta(&store, &meta1, MetaFormat::Json, Codec::Uncompressed)
            .await
            .unwrap();

        let mut meta2 = StoreMeta {
            version: 2,
            ..Default::default()
        };
        meta2
            .datasets
            .insert("new_ds".into(), DatasetMeta::default());
        save_meta(&store, &meta2, MetaFormat::Json, Codec::Uncompressed)
            .await
            .unwrap();

        let (loaded, _, _) = load_meta(&store).await.unwrap();
        assert_eq!(loaded.version, 2);
        assert!(loaded.datasets.contains_key("new_ds"));
    }

    /// Every `Attr` variant survives a JSON serialize/deserialize cycle.
    #[test]
    fn attr_roundtrip_via_serde() {
        let cases = vec![
            Attr::Bool(true),
            Attr::Int64(-1_000_000),
            Attr::Float64(2.5),
            Attr::String("hello".into()),
            Attr::TimestampNanoseconds(1_700_000_000_000_000_000),
        ];
        for v in cases {
            let json = serde_json::to_string(&v).unwrap();
            let back: Attr = serde_json::from_str(&json).unwrap();
            assert_eq!(v, back);
        }
    }

    /// Pins the exact JSON text produced for each `Attr` variant and how
    /// plain vs. RFC 3339 strings are read back.
    #[test]
    fn attr_json_shapes() {
        assert_eq!(serde_json::to_string(&Attr::Bool(true)).unwrap(), "true");
        assert_eq!(serde_json::to_string(&Attr::Int64(42)).unwrap(), "42");
        assert_eq!(serde_json::to_string(&Attr::Float64(1.5)).unwrap(), "1.5");
        assert_eq!(
            serde_json::to_string(&Attr::String("x".into())).unwrap(),
            "\"x\""
        );
        assert_eq!(
            serde_json::to_string(&Attr::TimestampNanoseconds(1_700_000_000_000_000_000)).unwrap(),
            "\"2023-11-14T22:13:20Z\"",
        );

        // Round-tripped non-RFC-3339 string stays as String, not TimestampNanoseconds.
        let back: Attr = serde_json::from_str("\"not-a-date\"").unwrap();
        assert_eq!(back, Attr::String("not-a-date".into()));

        // RFC 3339 string deserializes as TimestampNanoseconds (won the order race).
        let back: Attr = serde_json::from_str("\"2023-11-14T22:13:20Z\"").unwrap();
        assert_eq!(back, Attr::TimestampNanoseconds(1_700_000_000_000_000_000));
    }

    /// An `ArraySchema` survives a JSON serialize/deserialize cycle unchanged.
    #[test]
    fn array_schema_roundtrip_via_serde() {
        use crate::schema::ArraySchema;
        let schema = ArraySchema {
            dtype: DType::Float64,
            shape: vec![10, 20],
            chunk_shape: vec![5, 5],
            dimension_names: vec!["lat".into(), "lon".into()],
            codec: Codec::default(),
        };
        let json = serde_json::to_string(&schema).unwrap();
        let back: ArraySchema = serde_json::from_str(&json).unwrap();
        assert_eq!(schema, back);
    }
}