zarr_datafusion/reader/
schema_inference.rs

1//! Schema inference for Zarr v2 and v3 stores
2//!
3//! # Assumptions
4//!
5//! This module assumes a specific Zarr store structure:
6//!
7//! 1. **Coordinates are 1D arrays**: Any array with `shape.len() == 1` is treated as a coordinate.
8//!    Examples: `time(7)`, `lat(10)`, `lon(10)`
9//!
10//! 2. **Data variables are nD arrays**: Arrays with `shape.len() > 1` are treated as data variables.
11//!    Their dimensionality must equal the number of coordinate arrays.
12//!
13//! 3. **Cartesian product structure**: Data variables are assumed to be the Cartesian product
14//!    of all coordinates. For coordinates `[time(7), lat(10), lon(10)]`, data variables must
15//!    have shape `[7, 10, 10]` (i.e., `time × lat × lon`).
16//!
17//! 4. **Dimension ordering**: Coordinates are inferred to match the Zarr arrays' native
18//!    dimension ordering when possible (by matching data variable shapes to coordinate sizes).
19//!    If the ordering cannot be inferred unambiguously, we fall back to alphabetical ordering.
20//!
21//! # Example
22//!
23//! ```text
24//! weather.zarr/
25//! ├── time/       shape: [7]           → coordinate
26//! ├── lat/        shape: [10]          → coordinate
27//! ├── lon/        shape: [10]          → coordinate
28//! ├── temperature/ shape: [7, 10, 10]  → data variable (time × lat × lon)
29//! └── humidity/    shape: [7, 10, 10]  → data variable (time × lat × lon)
30//! ```
31
32use arrow::datatypes::{Field, Schema};
33use std::fs;
34use std::path::Path;
35use tracing::{debug, info, instrument};
36
37use super::dtype::{parse_v2_dtype, zarr_dtype_to_arrow, zarr_dtype_to_arrow_dictionary};
38
/// Zarr storage format version, as identified from a store's metadata files.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ZarrVersion {
    /// Zarr v2: metadata is stored in `.zgroup` / `.zarray` files.
    V2,
    /// Zarr v3: metadata is stored in `zarr.json` files.
    V3,
}
45
46/// Detect Zarr version by checking metadata files
47pub fn detect_zarr_version(
48    store_path: &str,
49) -> Result<ZarrVersion, Box<dyn std::error::Error + Send + Sync>> {
50    let root = Path::new(store_path);
51
52    // Check for zarr.json (V3)
53    if root.join("zarr.json").exists() {
54        return Ok(ZarrVersion::V3);
55    }
56
57    // Check for .zgroup or .zarray (V2)
58    if root.join(".zgroup").exists() || root.join(".zarray").exists() {
59        return Ok(ZarrVersion::V2);
60    }
61
62    // Try to detect by looking at subdirectories
63    for entry in fs::read_dir(root)? {
64        let entry = entry?;
65        let path = entry.path();
66        if path.is_dir() {
67            if path.join("zarr.json").exists() {
68                return Ok(ZarrVersion::V3);
69            }
70            if path.join(".zarray").exists() {
71                return Ok(ZarrVersion::V2);
72            }
73        }
74    }
75
76    Err("Could not detect Zarr version: no metadata files found".into())
77}
78
/// Metadata describing a single array discovered in a Zarr store.
#[derive(Debug, Clone, PartialEq)]
pub struct ZarrArrayMeta {
    /// Array name (the last path component of the array's directory).
    pub name: String,
    /// Data type name, e.g. `"float64"` or `"int32"`.
    pub data_type: String,
    /// Array shape; one entry per dimension.
    pub shape: Vec<u64>,
    /// Min/max bounds for coordinate arrays (`None` for data variables, or
    /// when the bounds could not be computed).
    /// Stored as `(min, max)` in `f64` for simplicity.
    pub coord_min_max: Option<(f64, f64)>,
}

impl ZarrArrayMeta {
    /// Returns `true` when the array is one-dimensional, which this module
    /// treats as the definition of a coordinate array.
    pub fn is_coordinate(&self) -> bool {
        self.shape.len() == 1
    }
}
94
95/// Discovered Zarr store structure
96#[derive(Debug, Clone)]
97pub struct ZarrStoreMeta {
98    pub coords: Vec<ZarrArrayMeta>,    // 1D arrays (sorted by name)
99    pub data_vars: Vec<ZarrArrayMeta>, // nD arrays
100    pub total_rows: usize,             // Product of all coordinate sizes
101}
102
103/// Discover all arrays in a Zarr store (v2 or v3)
104pub fn discover_arrays(
105    store_path: &str,
106) -> Result<ZarrStoreMeta, Box<dyn std::error::Error + Send + Sync>> {
107    let version = detect_zarr_version(store_path)?;
108
109    match version {
110        ZarrVersion::V2 => discover_arrays_v2(store_path),
111        ZarrVersion::V3 => discover_arrays_v3(store_path),
112    }
113}
114
115/// Discover arrays in a Zarr v2 store
116fn discover_arrays_v2(
117    store_path: &str,
118) -> Result<ZarrStoreMeta, Box<dyn std::error::Error + Send + Sync>> {
119    let root = Path::new(store_path);
120    let mut arrays: Vec<ZarrArrayMeta> = Vec::new();
121
122    for entry in fs::read_dir(root)? {
123        let entry = entry?;
124        let path = entry.path();
125
126        if path.is_dir() {
127            let zarray = path.join(".zarray");
128            if zarray.exists() {
129                let content = fs::read_to_string(&zarray)?;
130                let meta: serde_json::Value = serde_json::from_str(&content)?;
131
132                let name = path
133                    .file_name()
134                    .and_then(|n| n.to_str())
135                    .unwrap_or("unknown")
136                    .to_string();
137
138                let shape: Vec<u64> = meta
139                    .get("shape")
140                    .and_then(|v| v.as_array())
141                    .map(|arr| arr.iter().filter_map(|v| v.as_u64()).collect())
142                    .unwrap_or_default();
143
144                // V2 uses numpy dtype format like "<i8", "<f4"
145                let dtype_raw = meta.get("dtype").and_then(|v| v.as_str()).unwrap_or("<f8");
146
147                let data_type = parse_v2_dtype(dtype_raw);
148
149                arrays.push(ZarrArrayMeta {
150                    name,
151                    data_type,
152                    shape,
153                    coord_min_max: None, // Will be computed in separate_and_sort_arrays
154                });
155            }
156        }
157    }
158
159    separate_and_sort_arrays(arrays, store_path)
160}
161
162/// Discover arrays in a Zarr v3 store
163fn discover_arrays_v3(
164    store_path: &str,
165) -> Result<ZarrStoreMeta, Box<dyn std::error::Error + Send + Sync>> {
166    let root = Path::new(store_path);
167    let mut arrays: Vec<ZarrArrayMeta> = Vec::new();
168
169    for entry in fs::read_dir(root)? {
170        let entry = entry?;
171        let path = entry.path();
172
173        if path.is_dir() {
174            let zarr_json = path.join("zarr.json");
175            if zarr_json.exists() {
176                let content = fs::read_to_string(&zarr_json)?;
177                let meta: serde_json::Value = serde_json::from_str(&content)?;
178
179                if meta.get("node_type").and_then(|v| v.as_str()) == Some("array") {
180                    let name = path
181                        .file_name()
182                        .and_then(|n| n.to_str())
183                        .unwrap_or("unknown")
184                        .to_string();
185
186                    let shape: Vec<u64> = meta
187                        .get("shape")
188                        .and_then(|v| v.as_array())
189                        .map(|arr| arr.iter().filter_map(|v| v.as_u64()).collect())
190                        .unwrap_or_default();
191
192                    let data_type = meta
193                        .get("data_type")
194                        .and_then(|v| v.as_str())
195                        .unwrap_or("float64")
196                        .to_string();
197
198                    arrays.push(ZarrArrayMeta {
199                        name,
200                        data_type,
201                        shape,
202                        coord_min_max: None, // Will be computed in separate_and_sort_arrays
203                    });
204                }
205            }
206        }
207    }
208
209    separate_and_sort_arrays(arrays, store_path)
210}
211
212/// Compute min/max for a coordinate array by reading its data
213/// Returns None if the computation fails (e.g., unsupported dtype, read error)
214fn compute_coord_min_max(
215    store_path: &str,
216    coord_name: &str,
217    data_type: &str,
218) -> Option<(f64, f64)> {
219    use zarrs::array::Array;
220    use zarrs::array_subset::ArraySubset;
221    use zarrs::filesystem::FilesystemStore;
222
223    // Open store and array
224    let store = FilesystemStore::new(store_path).ok()?;
225    let array_path = format!("/{}", coord_name);
226    let array = Array::open(store.into(), &array_path).ok()?;
227
228    // Get the full subset (entire 1D array)
229    let shape = array.shape();
230    let subset = ArraySubset::new_with_start_shape(vec![0], shape.to_vec()).ok()?;
231
232    // Read based on data type and compute min/max
233    match data_type {
234        "float64" => {
235            let data: Vec<f64> = array.retrieve_array_subset_elements(&subset).ok()?;
236            if data.is_empty() {
237                return None;
238            }
239            let min = data.iter().cloned().fold(f64::INFINITY, f64::min);
240            let max = data.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
241            Some((min, max))
242        }
243        "float32" => {
244            let data: Vec<f32> = array.retrieve_array_subset_elements(&subset).ok()?;
245            if data.is_empty() {
246                return None;
247            }
248            let min = data.iter().cloned().fold(f32::INFINITY, f32::min) as f64;
249            let max = data.iter().cloned().fold(f32::NEG_INFINITY, f32::max) as f64;
250            Some((min, max))
251        }
252        "int64" => {
253            let data: Vec<i64> = array.retrieve_array_subset_elements(&subset).ok()?;
254            if data.is_empty() {
255                return None;
256            }
257            let min = *data.iter().min()? as f64;
258            let max = *data.iter().max()? as f64;
259            Some((min, max))
260        }
261        "int32" => {
262            let data: Vec<i32> = array.retrieve_array_subset_elements(&subset).ok()?;
263            if data.is_empty() {
264                return None;
265            }
266            let min = *data.iter().min()? as f64;
267            let max = *data.iter().max()? as f64;
268            Some((min, max))
269        }
270        "int16" => {
271            let data: Vec<i16> = array.retrieve_array_subset_elements(&subset).ok()?;
272            if data.is_empty() {
273                return None;
274            }
275            let min = *data.iter().min()? as f64;
276            let max = *data.iter().max()? as f64;
277            Some((min, max))
278        }
279        "uint64" => {
280            let data: Vec<u64> = array.retrieve_array_subset_elements(&subset).ok()?;
281            if data.is_empty() {
282                return None;
283            }
284            let min = *data.iter().min()? as f64;
285            let max = *data.iter().max()? as f64;
286            Some((min, max))
287        }
288        "uint32" => {
289            let data: Vec<u32> = array.retrieve_array_subset_elements(&subset).ok()?;
290            if data.is_empty() {
291                return None;
292            }
293            let min = *data.iter().min()? as f64;
294            let max = *data.iter().max()? as f64;
295            Some((min, max))
296        }
297        _ => {
298            debug!(data_type = %data_type, "Unsupported data type for min/max computation");
299            None
300        }
301    }
302}
303
304/// Attempt to infer coordinate ordering from data variable shapes.
305///
306/// We prefer to preserve the native dimension order of Zarr arrays by matching
307/// each data variable's shape to the sizes of discovered coordinates. If a
308/// matching data variable cannot be found or the mapping is ambiguous (e.g.,
309/// multiple coordinates share the same size), we fall back to alphabetical
310/// ordering for stability.
311fn infer_coord_order_from_data_vars(
312    mut coords: Vec<ZarrArrayMeta>,
313    data_vars: &[ZarrArrayMeta],
314) -> Vec<ZarrArrayMeta> {
315    // If we have nothing to infer from, keep alphabetical ordering
316    if coords.is_empty() || data_vars.is_empty() {
317        coords.sort_by(|a, b| a.name.cmp(&b.name));
318        return coords;
319    }
320
321    // Find the first data variable whose dimensionality equals the number
322    // of coordinates and whose shape can be matched to coordinate sizes.
323    for var in data_vars {
324        if var.shape.len() != coords.len() {
325            continue;
326        }
327
328        let mut ordered: Vec<ZarrArrayMeta> = Vec::with_capacity(coords.len());
329        let mut used = vec![false; coords.len()];
330        let mut success = true;
331
332        for &dim_size in &var.shape {
333            let mut found: Option<usize> = None;
334            for (j, c) in coords.iter().enumerate() {
335                if !used[j] && c.shape.first() == Some(&dim_size) {
336                    found = Some(j);
337                    break;
338                }
339            }
340
341            if let Some(j) = found {
342                ordered.push(coords[j].clone());
343                used[j] = true;
344            } else {
345                success = false;
346                break;
347            }
348        }
349
350        if success && ordered.len() == coords.len() {
351            return ordered;
352        }
353    }
354
355    // Fallback to alphabetical ordering if we couldn't infer a mapping
356    coords.sort_by(|a, b| a.name.cmp(&b.name));
357    coords
358}
359
360/// Separate arrays into coordinates and data variables, then sort
361/// Also computes min/max for coordinate arrays by reading their data
362fn separate_and_sort_arrays(
363    arrays: Vec<ZarrArrayMeta>,
364    store_path: &str,
365) -> Result<ZarrStoreMeta, Box<dyn std::error::Error + Send + Sync>> {
366    // Use into_iter + partition for single-pass, zero-clone separation
367    let (mut coords, mut data_vars): (Vec<_>, Vec<_>) =
368        arrays.into_iter().partition(|a| a.is_coordinate());
369
370    // Keep data variables in stable alphabetical order for determinism
371    data_vars.sort_by(|a, b| a.name.cmp(&b.name));
372
373    // Try to reorder coordinates to match Zarr arrays' native dimension order
374    // by examining a representative data variable's shape. Fall back to
375    // alphabetical ordering when the mapping is ambiguous.
376    coords = infer_coord_order_from_data_vars(coords, &data_vars);
377
378    // Compute min/max for each coordinate by reading the data
379    for coord in &mut coords {
380        if let Some(min_max) = compute_coord_min_max(store_path, &coord.name, &coord.data_type) {
381            debug!(
382                coord = %coord.name,
383                min = min_max.0,
384                max = min_max.1,
385                "Computed coordinate min/max"
386            );
387            coord.coord_min_max = Some(min_max);
388        }
389    }
390
391    // Compute total_rows = product of all coordinate sizes
392    let total_rows: usize = coords.iter().map(|c| c.shape[0] as usize).product();
393
394    Ok(ZarrStoreMeta {
395        coords,
396        data_vars,
397        total_rows,
398    })
399}
400
401/// Infer Arrow schema from Zarr store metadata (v2 or v3)
402/// Coordinates use DictionaryArray for memory efficiency (stores unique values once)
403pub fn infer_schema(store_path: &str) -> Result<Schema, Box<dyn std::error::Error + Send + Sync>> {
404    let (schema, _meta) = infer_schema_with_meta(store_path)?;
405    Ok(schema)
406}
407
408/// Infer Arrow schema and return the store metadata for statistics
409/// This allows caching the metadata for later use during query execution
410pub fn infer_schema_with_meta(
411    store_path: &str,
412) -> Result<(Schema, ZarrStoreMeta), Box<dyn std::error::Error + Send + Sync>> {
413    let meta = discover_arrays(store_path)?;
414
415    let mut fields: Vec<Field> = Vec::new();
416
417    // Coordinates use Dictionary encoding for memory efficiency
418    // Instead of [0,0,0,1,1,1,2,2,2] we store {values: [0,1,2], indices: [0,0,0,1,1,1,2,2,2]}
419    for coord in &meta.coords {
420        fields.push(Field::new(
421            &coord.name,
422            zarr_dtype_to_arrow_dictionary(&coord.data_type),
423            false,
424        ));
425    }
426
427    // Data variables use regular arrays
428    for var in &meta.data_vars {
429        fields.push(Field::new(
430            &var.name,
431            zarr_dtype_to_arrow(&var.data_type),
432            true,
433        ));
434    }
435
436    Ok((Schema::new(fields), meta))
437}
438
439// =============================================================================
440// Async versions for remote object stores
441// =============================================================================
442
443use zarrs::storage::AsyncReadableListableStorage;
444use zarrs_object_store::object_store::path::Path as ObjectPath;
445
446/// Async version of discover_arrays for remote object stores
447#[instrument(level = "debug", skip_all)]
448pub async fn discover_arrays_async(
449    store: &AsyncReadableListableStorage,
450    prefix: &ObjectPath,
451) -> Result<ZarrStoreMeta, Box<dyn std::error::Error + Send + Sync>> {
452    debug!("Detecting Zarr version");
453    let version = detect_zarr_version_async(store, prefix).await?;
454    info!(?version, "Zarr version detected");
455
456    let result = match version {
457        ZarrVersion::V2 => discover_arrays_v2_async(store, prefix).await,
458        ZarrVersion::V3 => discover_arrays_v3_async(store, prefix).await,
459    };
460
461    if let Ok(ref meta) = result {
462        info!(
463            coords = meta.coords.len(),
464            data_vars = meta.data_vars.len(),
465            "Arrays discovered"
466        );
467        for coord in &meta.coords {
468            debug!(name = %coord.name, shape = ?coord.shape, dtype = %coord.data_type, "Coordinate");
469        }
470        for var in &meta.data_vars {
471            debug!(name = %var.name, shape = ?var.shape, dtype = %var.data_type, "Data variable");
472        }
473    }
474
475    result
476}
477
478/// Async version of detect_zarr_version for remote object stores
479pub async fn detect_zarr_version_async(
480    store: &AsyncReadableListableStorage,
481    prefix: &ObjectPath,
482) -> Result<ZarrVersion, Box<dyn std::error::Error + Send + Sync>> {
483    use zarrs::storage::AsyncListableStorageTraits;
484    use zarrs::storage::StorePrefix;
485
486    // Check for root zarr.json (V3)
487    let zarr_json_path = format!("{}/zarr.json", prefix);
488    if store_key_exists(store, &zarr_json_path).await {
489        return Ok(ZarrVersion::V3);
490    }
491
492    // Check for root .zgroup (V2)
493    let zgroup_path = format!("{}/.zgroup", prefix);
494    if store_key_exists(store, &zgroup_path).await {
495        return Ok(ZarrVersion::V2);
496    }
497
498    // List directories and check first one for version detection
499    // StorePrefix requires trailing slash
500    let prefix_str = if prefix.as_ref().is_empty() {
501        "/".to_string()
502    } else {
503        format!("{}/", prefix.as_ref().trim_end_matches('/'))
504    };
505    let store_prefix = StorePrefix::new(&prefix_str)
506        .map_err(|e| format!("Invalid prefix '{}': {}", prefix_str, e))?;
507    let entries = store
508        .list_dir(&store_prefix)
509        .await
510        .map_err(|e| format!("Failed to list directory: {}", e))?;
511
512    for subdir in entries.prefixes() {
513        let subdir_str = subdir.as_str().trim_end_matches('/');
514        // Check for zarr.json in subdirectory (V3)
515        let v3_path = format!("{}/zarr.json", subdir_str);
516        if store_key_exists(store, &v3_path).await {
517            return Ok(ZarrVersion::V3);
518        }
519
520        // Check for .zarray in subdirectory (V2)
521        let v2_path = format!("{}/.zarray", subdir_str);
522        if store_key_exists(store, &v2_path).await {
523            return Ok(ZarrVersion::V2);
524        }
525    }
526
527    Err("Could not detect Zarr version: no metadata files found".into())
528}
529
530/// Check if a key exists in the async store
531async fn store_key_exists(store: &AsyncReadableListableStorage, key: &str) -> bool {
532    use zarrs::storage::{AsyncReadableStorageTraits, StoreKey};
533
534    let store_key = match StoreKey::new(key) {
535        Ok(k) => k,
536        Err(_) => return false,
537    };
538
539    matches!(store.get(&store_key).await, Ok(Some(_)))
540}
541
542/// Read a key from the async store as string
543async fn store_get_string(
544    store: &AsyncReadableListableStorage,
545    key: &str,
546) -> Result<String, Box<dyn std::error::Error + Send + Sync>> {
547    use zarrs::storage::{AsyncReadableStorageTraits, StoreKey};
548
549    let store_key = StoreKey::new(key).map_err(|e| format!("Invalid key '{}': {}", key, e))?;
550
551    let bytes = store
552        .get(&store_key)
553        .await
554        .map_err(|e| format!("Failed to read '{}': {}", key, e))?
555        .ok_or_else(|| format!("Key not found: {}", key))?;
556
557    String::from_utf8(bytes.to_vec())
558        .map_err(|e| format!("Invalid UTF-8 in '{}': {}", key, e).into())
559}
560
561/// Async version of discover_arrays_v2 for remote stores
562async fn discover_arrays_v2_async(
563    store: &AsyncReadableListableStorage,
564    prefix: &ObjectPath,
565) -> Result<ZarrStoreMeta, Box<dyn std::error::Error + Send + Sync>> {
566    use zarrs::storage::{AsyncListableStorageTraits, StorePrefix};
567
568    let mut arrays: Vec<ZarrArrayMeta> = Vec::new();
569
570    // StorePrefix requires trailing slash
571    let prefix_str = if prefix.as_ref().is_empty() {
572        "/".to_string()
573    } else {
574        format!("{}/", prefix.as_ref().trim_end_matches('/'))
575    };
576    let store_prefix = StorePrefix::new(&prefix_str)
577        .map_err(|e| format!("Invalid prefix '{}': {}", prefix_str, e))?;
578    let entries = store
579        .list_dir(&store_prefix)
580        .await
581        .map_err(|e| format!("Failed to list directory: {}", e))?;
582
583    for subdir in entries.prefixes() {
584        let subdir_str = subdir.as_str().trim_end_matches('/');
585        let zarray_path = format!("{}/.zarray", subdir_str);
586
587        // Try to read .zarray metadata
588        if let Ok(content) = store_get_string(store, &zarray_path).await {
589            let meta: serde_json::Value = serde_json::from_str(&content)?;
590
591            // Extract array name from path (last component)
592            let name = subdir_str
593                .trim_end_matches('/')
594                .rsplit('/')
595                .next()
596                .unwrap_or("unknown")
597                .to_string();
598
599            let shape: Vec<u64> = meta
600                .get("shape")
601                .and_then(|v| v.as_array())
602                .map(|arr| arr.iter().filter_map(|v| v.as_u64()).collect())
603                .unwrap_or_default();
604
605            let dtype_raw = meta.get("dtype").and_then(|v| v.as_str()).unwrap_or("<f8");
606            let data_type = parse_v2_dtype(dtype_raw);
607
608            arrays.push(ZarrArrayMeta {
609                name,
610                data_type,
611                shape,
612                coord_min_max: None, // Not computed for async/remote stores yet
613            });
614        }
615    }
616
617    separate_and_sort_arrays_async(store, prefix, arrays).await
618}
619
620/// Async version of discover_arrays_v3 for remote stores
621async fn discover_arrays_v3_async(
622    store: &AsyncReadableListableStorage,
623    prefix: &ObjectPath,
624) -> Result<ZarrStoreMeta, Box<dyn std::error::Error + Send + Sync>> {
625    use zarrs::storage::{AsyncListableStorageTraits, StorePrefix};
626
627    let mut arrays: Vec<ZarrArrayMeta> = Vec::new();
628
629    // StorePrefix requires trailing slash
630    let prefix_str = if prefix.as_ref().is_empty() {
631        "/".to_string()
632    } else {
633        format!("{}/", prefix.as_ref().trim_end_matches('/'))
634    };
635    let store_prefix = StorePrefix::new(&prefix_str)
636        .map_err(|e| format!("Invalid prefix '{}': {}", prefix_str, e))?;
637    let entries = store
638        .list_dir(&store_prefix)
639        .await
640        .map_err(|e| format!("Failed to list directory: {}", e))?;
641
642    for subdir in entries.prefixes() {
643        let subdir_str = subdir.as_str().trim_end_matches('/');
644        let zarr_json_path = format!("{}/zarr.json", subdir_str);
645
646        // Try to read zarr.json metadata
647        if let Ok(content) = store_get_string(store, &zarr_json_path).await {
648            let meta: serde_json::Value = serde_json::from_str(&content)?;
649
650            // Only process arrays (not groups)
651            if meta.get("node_type").and_then(|v| v.as_str()) == Some("array") {
652                let name = subdir_str
653                    .trim_end_matches('/')
654                    .rsplit('/')
655                    .next()
656                    .unwrap_or("unknown")
657                    .to_string();
658
659                let shape: Vec<u64> = meta
660                    .get("shape")
661                    .and_then(|v| v.as_array())
662                    .map(|arr| arr.iter().filter_map(|v| v.as_u64()).collect())
663                    .unwrap_or_default();
664
665                let data_type = meta
666                    .get("data_type")
667                    .and_then(|v| v.as_str())
668                    .unwrap_or("float64")
669                    .to_string();
670
671                arrays.push(ZarrArrayMeta {
672                    name,
673                    data_type,
674                    shape,
675                    coord_min_max: None, // Not computed for async/remote stores yet
676                });
677            }
678        }
679    }
680
681    separate_and_sort_arrays_async(store, prefix, arrays).await
682}
683
684/// Separate arrays into coordinates and data variables (async version with min/max computation)
685async fn separate_and_sort_arrays_async(
686    store: &AsyncReadableListableStorage,
687    prefix: &ObjectPath,
688    arrays: Vec<ZarrArrayMeta>,
689) -> Result<ZarrStoreMeta, Box<dyn std::error::Error + Send + Sync>> {
690    use zarrs::array::Array;
691    use zarrs::array_subset::ArraySubset;
692
693    // Use into_iter + partition for single-pass, zero-clone separation
694    let (mut coords, mut data_vars): (Vec<_>, Vec<_>) =
695        arrays.into_iter().partition(|a| a.is_coordinate());
696
697    // Keep data variables in stable alphabetical order for determinism
698    data_vars.sort_by(|a, b| a.name.cmp(&b.name));
699
700    // Try to reorder coordinates to match Zarr arrays' native dimension order
701    // by examining a representative data variable's shape. Fall back to
702    // alphabetical ordering when the mapping is ambiguous.
703    coords = infer_coord_order_from_data_vars(coords, &data_vars);
704
705    // Compute min/max for each coordinate by reading the data (async)
706    for coord in &mut coords {
707        // Path format: "/{prefix}/{coord_name}" - zarrs expects absolute paths
708        let coord_path = format!("/{}/{}", prefix.as_ref(), coord.name);
709        debug!(coord = %coord.name, path = %coord_path, "Attempting to compute min/max");
710        match Array::async_open(store.clone(), &coord_path).await {
711            Err(e) => {
712                debug!(coord = %coord.name, error = %e, "Failed to open coordinate array");
713                continue;
714            }
715            Ok(arr) => {
716                let shape = arr.shape();
717                if let Ok(subset) = ArraySubset::new_with_start_shape(vec![0], shape.to_vec()) {
718                    let min_max: Option<(f64, f64)> = match coord.data_type.as_str() {
719                        "float64" => arr
720                            .async_retrieve_array_subset_elements::<f64>(&subset)
721                            .await
722                            .ok()
723                            .filter(|data| !data.is_empty())
724                            .map(|data| {
725                                let min = data.iter().cloned().fold(f64::INFINITY, f64::min);
726                                let max = data.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
727                                (min, max)
728                            }),
729                        "float32" => arr
730                            .async_retrieve_array_subset_elements::<f32>(&subset)
731                            .await
732                            .ok()
733                            .filter(|data| !data.is_empty())
734                            .map(|data| {
735                                let min = data.iter().cloned().fold(f32::INFINITY, f32::min) as f64;
736                                let max =
737                                    data.iter().cloned().fold(f32::NEG_INFINITY, f32::max) as f64;
738                                (min, max)
739                            }),
740                        "int64" => arr
741                            .async_retrieve_array_subset_elements::<i64>(&subset)
742                            .await
743                            .ok()
744                            .filter(|data| !data.is_empty())
745                            .and_then(|data| {
746                                let min = *data.iter().min()? as f64;
747                                let max = *data.iter().max()? as f64;
748                                Some((min, max))
749                            }),
750                        "int32" => arr
751                            .async_retrieve_array_subset_elements::<i32>(&subset)
752                            .await
753                            .ok()
754                            .filter(|data| !data.is_empty())
755                            .and_then(|data| {
756                                let min = *data.iter().min()? as f64;
757                                let max = *data.iter().max()? as f64;
758                                Some((min, max))
759                            }),
760                        "int16" => arr
761                            .async_retrieve_array_subset_elements::<i16>(&subset)
762                            .await
763                            .ok()
764                            .filter(|data| !data.is_empty())
765                            .and_then(|data| {
766                                let min = *data.iter().min()? as f64;
767                                let max = *data.iter().max()? as f64;
768                                Some((min, max))
769                            }),
770                        _ => None,
771                    };
772
773                    if let Some((min, max)) = min_max {
774                        debug!(
775                            coord = %coord.name,
776                            min = min,
777                            max = max,
778                            "Computed coordinate min/max (async)"
779                        );
780                        coord.coord_min_max = Some((min, max));
781                    }
782                }
783            } // end Ok(arr) =>
784        } // end match
785    }
786
787    // Compute total_rows = product of all coordinate sizes
788    let total_rows: usize = coords.iter().map(|c| c.shape[0] as usize).product();
789
790    Ok(ZarrStoreMeta {
791        coords,
792        data_vars,
793        total_rows,
794    })
795}
796
797/// Async version of infer_schema for remote object stores
798#[instrument(level = "debug", skip_all)]
799pub async fn infer_schema_async(
800    store: &AsyncReadableListableStorage,
801    prefix: &ObjectPath,
802) -> Result<Schema, Box<dyn std::error::Error + Send + Sync>> {
803    let (schema, _meta) = infer_schema_with_meta_async(store, prefix).await?;
804    Ok(schema)
805}
806
807/// Async version of infer_schema that also returns the store metadata
808/// This allows caching the metadata for later use during query execution
809#[instrument(level = "debug", skip_all)]
810pub async fn infer_schema_with_meta_async(
811    store: &AsyncReadableListableStorage,
812    prefix: &ObjectPath,
813) -> Result<(Schema, ZarrStoreMeta), Box<dyn std::error::Error + Send + Sync>> {
814    debug!("Starting async schema inference");
815    let meta = discover_arrays_async(store, prefix).await?;
816
817    let mut fields: Vec<Field> = Vec::new();
818
819    for coord in &meta.coords {
820        fields.push(Field::new(
821            &coord.name,
822            zarr_dtype_to_arrow_dictionary(&coord.data_type),
823            false,
824        ));
825    }
826
827    for var in &meta.data_vars {
828        fields.push(Field::new(
829            &var.name,
830            zarr_dtype_to_arrow(&var.data_type),
831            true,
832        ));
833    }
834
835    info!(num_fields = fields.len(), "Schema inferred");
836    Ok((Schema::new(fields), meta))
837}
838
/// Unit tests for version detection, array discovery, and schema inference.
///
/// The tests read the synthetic Zarr stores under `data/`, addressed by
/// relative path, so they must be run from a directory where those paths
/// resolve.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::DataType;

    // ==================== detect_zarr_version tests ====================

    /// Both the plain and the blosc v2 fixtures are detected as `ZarrVersion::V2`.
    #[test]
    fn test_detect_zarr_version_v2() {
        assert_eq!(
            detect_zarr_version("data/synthetic_v2.zarr").unwrap(),
            ZarrVersion::V2
        );
        assert_eq!(
            detect_zarr_version("data/synthetic_v2_blosc.zarr").unwrap(),
            ZarrVersion::V2
        );
    }

    /// Both the plain and the blosc v3 fixtures are detected as `ZarrVersion::V3`.
    #[test]
    fn test_detect_zarr_version_v3() {
        assert_eq!(
            detect_zarr_version("data/synthetic_v3.zarr").unwrap(),
            ZarrVersion::V3
        );
        assert_eq!(
            detect_zarr_version("data/synthetic_v3_blosc.zarr").unwrap(),
            ZarrVersion::V3
        );
    }

    /// A path that does not exist yields an error rather than a version.
    #[test]
    fn test_detect_zarr_version_error() {
        assert!(detect_zarr_version("data/nonexistent.zarr").is_err());
    }

    // ==================== ZarrArrayMeta tests ====================

    /// Only 1D arrays are classified as coordinates.
    #[test]
    fn test_array_meta_is_coordinate() {
        // 1D arrays are coordinates
        let coord = ZarrArrayMeta {
            name: "lat".to_string(),
            data_type: "float64".to_string(),
            shape: vec![10],
            coord_min_max: Some((0.0, 90.0)),
        };
        assert!(coord.is_coordinate());

        // 2D and 3D arrays are NOT coordinates
        let data_2d = ZarrArrayMeta {
            name: "temp".to_string(),
            data_type: "float64".to_string(),
            shape: vec![10, 10],
            coord_min_max: None,
        };
        assert!(!data_2d.is_coordinate());

        let data_3d = ZarrArrayMeta {
            name: "temp".to_string(),
            data_type: "float64".to_string(),
            shape: vec![7, 10, 10],
            coord_min_max: None,
        };
        assert!(!data_3d.is_coordinate());
    }

    // ==================== discover_arrays tests ====================

    /// Checks names, ordering, shapes, and dtypes discovered in the v2 fixture.
    #[test]
    fn test_discover_arrays_v2() {
        let meta = discover_arrays("data/synthetic_v2.zarr").unwrap();

        // 3 coordinates, expected in this order: time, lon, lat
        assert_eq!(meta.coords.len(), 3);
        let coord_names: Vec<_> = meta.coords.iter().map(|c| c.name.as_str()).collect();
        assert_eq!(coord_names, vec!["time", "lon", "lat"]);

        // 2 data variables (sorted): humidity, temperature
        assert_eq!(meta.data_vars.len(), 2);
        let var_names: Vec<_> = meta.data_vars.iter().map(|v| v.name.as_str()).collect();
        assert_eq!(var_names, vec!["humidity", "temperature"]);

        // Shapes (labels follow the coordinate order asserted above)
        assert_eq!(meta.coords[0].shape, vec![7]); // time
        assert_eq!(meta.coords[1].shape, vec![10]); // lon
        assert_eq!(meta.coords[2].shape, vec![10]); // lat
        assert_eq!(meta.data_vars[0].shape, vec![7, 10, 10]); // humidity
        assert_eq!(meta.data_vars[1].shape, vec![7, 10, 10]); // temperature

        // All dtypes should be int64 (from <i8)
        for arr in meta.coords.iter().chain(meta.data_vars.iter()) {
            assert_eq!(arr.data_type, "int64");
        }
    }

    /// The v3 fixture exposes the same array names and ordering as the v2 one.
    #[test]
    fn test_discover_arrays_v3() {
        let meta = discover_arrays("data/synthetic_v3.zarr").unwrap();

        // Same structure as v2: 3 coordinates and 2 data variables
        assert_eq!(meta.coords.len(), 3);
        assert_eq!(meta.data_vars.len(), 2);

        let coord_names: Vec<_> = meta.coords.iter().map(|c| c.name.as_str()).collect();
        assert_eq!(coord_names, vec!["time", "lon", "lat"]);

        let var_names: Vec<_> = meta.data_vars.iter().map(|v| v.name.as_str()).collect();
        assert_eq!(var_names, vec!["humidity", "temperature"]);
    }

    // ==================== infer_schema tests ====================

    /// The schema lists coordinates first, then data variables.
    #[test]
    fn test_infer_schema_structure() {
        let schema = infer_schema("data/synthetic_v2.zarr").unwrap();

        // 5 fields: 3 coords + 2 data vars
        assert_eq!(schema.fields().len(), 5);

        let names: Vec<_> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(names, vec!["time", "lon", "lat", "humidity", "temperature"]);
    }

    /// Coordinate fields are dictionary-typed and non-nullable.
    #[test]
    fn test_infer_schema_coord_types() {
        let schema = infer_schema("data/synthetic_v2.zarr").unwrap();

        // First 3 fields (coordinates) should be Dictionary type, non-nullable
        for i in 0..3 {
            let field = schema.field(i);
            assert!(
                matches!(field.data_type(), DataType::Dictionary(_, _)),
                "Coordinate {} should be Dictionary type",
                field.name()
            );
            assert!(
                !field.is_nullable(),
                "Coordinate {} should not be nullable",
                field.name()
            );
        }
    }

    /// Data-variable fields are plain `Int64` and nullable.
    #[test]
    fn test_infer_schema_data_var_types() {
        let schema = infer_schema("data/synthetic_v2.zarr").unwrap();

        // Last 2 fields (data vars) should be regular Int64, nullable
        for i in 3..5 {
            let field = schema.field(i);
            assert_eq!(
                field.data_type(),
                &DataType::Int64,
                "Data var {} should be Int64",
                field.name()
            );
            assert!(
                field.is_nullable(),
                "Data var {} should be nullable",
                field.name()
            );
        }
    }

    /// The v2 and v3 fixtures must infer field-for-field identical schemas.
    #[test]
    fn test_infer_schema_v2_v3_parity() {
        let schema_v2 = infer_schema("data/synthetic_v2.zarr").unwrap();
        let schema_v3 = infer_schema("data/synthetic_v3.zarr").unwrap();

        // Both should produce identical schemas
        assert_eq!(schema_v2.fields().len(), schema_v3.fields().len());

        for (f2, f3) in schema_v2.fields().iter().zip(schema_v3.fields().iter()) {
            assert_eq!(f2.name(), f3.name(), "Field names should match");
            assert_eq!(
                f2.data_type(),
                f3.data_type(),
                "Data types should match for {}",
                f2.name()
            );
            assert_eq!(
                f2.is_nullable(),
                f3.is_nullable(),
                "Nullability should match for {}",
                f2.name()
            );
        }
    }
}