Skip to main content

oxigdal_stac/
collection_aggregation.rs

1//! Collection-level aggregation statistics for STAC collections.
2//!
3//! This module provides a streaming aggregation builder that processes STAC
4//! items one by one and accumulates statistics without buffering all items in
5//! memory.  The final [`CollectionStats`] object is produced by calling
6//! [`CollectionAggregator::build`].
7
8use std::collections::HashMap;
9
10use serde::{Deserialize, Serialize};
11
12// ── Descriptive statistics ─────────────────────────────────────────────────
13
14/// Descriptive statistics for a numeric property across a set of items.
15#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
16pub struct NumericStats {
17    /// Minimum observed value.
18    pub min: f64,
19    /// Maximum observed value.
20    pub max: f64,
21    /// Arithmetic mean.
22    pub mean: f64,
23    /// Population standard deviation.
24    pub std_dev: f64,
25    /// Number of observations.
26    pub count: u64,
27}
28
29impl NumericStats {
30    /// Computes statistics from a slice of values.
31    ///
32    /// Returns `None` when the slice is empty.
33    pub fn from_values(values: &[f64]) -> Option<Self> {
34        if values.is_empty() {
35            return None;
36        }
37        let n = values.len() as f64;
38        let min = values.iter().cloned().fold(f64::INFINITY, f64::min);
39        let max = values.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
40        let mean = values.iter().sum::<f64>() / n;
41        let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / n;
42        Some(Self {
43            min,
44            max,
45            mean,
46            std_dev: variance.sqrt(),
47            count: values.len() as u64,
48        })
49    }
50}
51
52// ── Collection stats ───────────────────────────────────────────────────────
53
54/// Aggregated statistics for a STAC collection.
55#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
56pub struct CollectionStats {
57    /// Collection identifier.
58    pub collection_id: String,
59    /// Total number of items ingested.
60    pub item_count: u64,
61    /// Temporal extent `[earliest_datetime, latest_datetime]` in RFC 3339.
62    pub temporal_extent: Option<[String; 2]>,
63    /// Spatial extent `[west, south, east, north]` in WGS 84.
64    pub spatial_extent: Option<[f64; 4]>,
65    /// Cloud cover statistics (populated only when EO items are ingested).
66    pub cloud_cover: Option<NumericStats>,
67    /// Value frequencies for categorical string properties.
68    pub property_frequencies: HashMap<String, HashMap<String, u64>>,
69    /// Count of items per platform identifier.
70    pub platforms: HashMap<String, u64>,
71}
72
73// ── Aggregation builder ─────────────────────────────────────────────────────
74
75/// Streaming aggregation builder for a STAC collection.
76///
77/// Create one, call [`ingest`] for each item, then call [`build`] to obtain
78/// the final [`CollectionStats`].
79///
80/// [`ingest`]: CollectionAggregator::ingest
81/// [`build`]: CollectionAggregator::build
82pub struct CollectionAggregator {
83    collection_id: String,
84    item_count: u64,
85    cloud_covers: Vec<f64>,
86    platforms: HashMap<String, u64>,
87    /// Running counts for categorical string properties.
88    property_counts: HashMap<String, HashMap<String, u64>>,
89    // Bounding-box accumulators.
90    min_lon: f64,
91    max_lon: f64,
92    min_lat: f64,
93    max_lat: f64,
94    has_bbox: bool,
95}
96
97impl CollectionAggregator {
98    /// Creates a new aggregator for the named collection.
99    pub fn new(collection_id: impl Into<String>) -> Self {
100        Self {
101            collection_id: collection_id.into(),
102            item_count: 0,
103            cloud_covers: Vec::new(),
104            platforms: HashMap::new(),
105            property_counts: HashMap::new(),
106            min_lon: f64::INFINITY,
107            max_lon: f64::NEG_INFINITY,
108            min_lat: f64::INFINITY,
109            max_lat: f64::NEG_INFINITY,
110            has_bbox: false,
111        }
112    }
113
114    /// Ingests a single STAC item (as a JSON value) into the aggregator.
115    ///
116    /// Only fields that are present and parseable contribute to the statistics;
117    /// missing or invalid fields are silently ignored.
118    pub fn ingest(&mut self, item: &serde_json::Value) {
119        self.item_count += 1;
120
121        // ── Spatial extent from `bbox` ─────────────────────────────────────
122        if let Some(bbox_arr) = item.get("bbox").and_then(|b| b.as_array()) {
123            if bbox_arr.len() >= 4 {
124                if let (Some(w), Some(s), Some(e), Some(n)) = (
125                    bbox_arr[0].as_f64(),
126                    bbox_arr[1].as_f64(),
127                    bbox_arr[2].as_f64(),
128                    bbox_arr[3].as_f64(),
129                ) {
130                    self.min_lon = self.min_lon.min(w);
131                    self.max_lon = self.max_lon.max(e);
132                    self.min_lat = self.min_lat.min(s);
133                    self.max_lat = self.max_lat.max(n);
134                    self.has_bbox = true;
135                }
136            }
137        }
138
139        let props = item.get("properties").and_then(|p| p.as_object());
140
141        // ── EO cloud cover ─────────────────────────────────────────────────
142        if let Some(cc) = props
143            .and_then(|p| p.get("eo:cloud_cover"))
144            .and_then(|v| v.as_f64())
145        {
146            self.cloud_covers.push(cc);
147        }
148
149        // ── Platform ───────────────────────────────────────────────────────
150        if let Some(platform) = props
151            .and_then(|p| p.get("platform"))
152            .and_then(|v| v.as_str())
153        {
154            *self.platforms.entry(platform.to_string()).or_insert(0) += 1;
155        }
156
157        // ── Generic categorical string properties ──────────────────────────
158        if let Some(props_map) = props {
159            for (key, val) in props_map {
160                if let Some(s) = val.as_str() {
161                    *self
162                        .property_counts
163                        .entry(key.clone())
164                        .or_default()
165                        .entry(s.to_string())
166                        .or_insert(0) += 1;
167                }
168            }
169        }
170    }
171
172    /// Finalises the aggregation and returns the [`CollectionStats`].
173    pub fn build(self) -> CollectionStats {
174        let spatial_extent = if self.has_bbox {
175            Some([self.min_lon, self.min_lat, self.max_lon, self.max_lat])
176        } else {
177            None
178        };
179
180        CollectionStats {
181            collection_id: self.collection_id,
182            item_count: self.item_count,
183            temporal_extent: None, // Populated externally from item datetimes if required
184            spatial_extent,
185            cloud_cover: NumericStats::from_values(&self.cloud_covers),
186            property_frequencies: self.property_counts,
187            platforms: self.platforms,
188        }
189    }
190}
191
192// ── Tests ──────────────────────────────────────────────────────────────────
193
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Absolute tolerance used for every floating-point comparison below.
    const EPS: f64 = 1e-9;

    /// A 1°×1° box at the origin, for tests that don't care about geometry.
    const UNIT_BOX: [f64; 4] = [0.0, 0.0, 1.0, 1.0];

    /// True when `actual` is within [`EPS`] of `expected`.
    fn close(actual: f64, expected: f64) -> bool {
        (actual - expected).abs() < EPS
    }

    /// Builds a minimal EO STAC item with the given cloud cover, platform and
    /// 2D `[west, south, east, north]` bounding box.
    fn eo_item(id: &str, cloud: f64, platform: &str, bbox: [f64; 4]) -> serde_json::Value {
        json!({
            "id": id,
            "type": "Feature",
            "bbox": bbox,
            "properties": {
                "eo:cloud_cover": cloud,
                "platform": platform,
                "constellation": "sentinel"
            }
        })
    }

    #[test]
    fn test_empty_aggregator() {
        let stats = CollectionAggregator::new("empty-col").build();
        assert_eq!(stats.collection_id, "empty-col");
        assert_eq!(stats.item_count, 0);
        assert!(stats.spatial_extent.is_none());
        assert!(stats.cloud_cover.is_none());
    }

    #[test]
    fn test_ingest_bbox_spatial_extent() {
        let mut agg = CollectionAggregator::new("col-a");
        agg.ingest(&eo_item("i1", 0.0, "s2a", [-10.0, -5.0, 10.0, 5.0]));
        agg.ingest(&eo_item("i2", 0.0, "s2a", [-20.0, -15.0, 5.0, 15.0]));

        let extent = agg.build().spatial_extent.expect("spatial_extent");
        // Union of both boxes, as [west, south, east, north].
        let expected = [-20.0, -15.0, 10.0, 15.0];
        for (axis, (&got, &want)) in extent.iter().zip(expected.iter()).enumerate() {
            assert!(close(got, want), "extent[{}] = {}, expected {}", axis, got, want);
        }
    }

    #[test]
    fn test_cloud_cover_stats() {
        let mut agg = CollectionAggregator::new("col-b");
        [0.0_f64, 25.0, 50.0, 75.0, 100.0]
            .iter()
            .for_each(|&cc| agg.ingest(&eo_item("x", cc, "plat", UNIT_BOX)));

        let cc = agg.build().cloud_cover.expect("cloud_cover");
        assert!(close(cc.min, 0.0));
        assert!(close(cc.max, 100.0));
        assert!(close(cc.mean, 50.0));
        assert_eq!(cc.count, 5);
    }

    #[test]
    fn test_platform_counts() {
        let mut agg = CollectionAggregator::new("col-c");
        for &(id, cloud, platform) in &[("a", 10.0, "s2a"), ("b", 20.0, "s2a"), ("c", 30.0, "s2b")] {
            agg.ingest(&eo_item(id, cloud, platform, UNIT_BOX));
        }
        let platforms = agg.build().platforms;
        assert_eq!(platforms["s2a"], 2);
        assert_eq!(platforms["s2b"], 1);
    }

    #[test]
    fn test_numeric_stats_from_values() {
        let ns = NumericStats::from_values(&[2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0])
            .expect("stats");
        assert!(close(ns.mean, 5.0));
        assert!(close(ns.min, 2.0));
        assert!(close(ns.max, 9.0));
        // The population standard deviation of this textbook sample is exactly 2.
        assert!(close(ns.std_dev, 2.0));
    }

    #[test]
    fn test_numeric_stats_empty() {
        let nothing: &[f64] = &[];
        assert!(NumericStats::from_values(nothing).is_none());
    }

    #[test]
    fn test_multiple_items_aggregated() {
        let mut agg = CollectionAggregator::new("multi");
        for i in 0..10_u32 {
            let id = format!("item-{}", i);
            agg.ingest(&eo_item(&id, f64::from(i * 10), "plat-x", UNIT_BOX));
        }

        let stats = agg.build();
        assert_eq!(stats.item_count, 10);
        let cc = stats.cloud_cover.expect("cloud");
        assert!(close(cc.mean, 45.0));
        assert_eq!(stats.platforms["plat-x"], 10);
    }

    #[test]
    fn test_property_frequencies() {
        let mut agg = CollectionAggregator::new("freq-col");
        for &(id, cloud, platform) in &[("i1", 10.0, "s2a"), ("i2", 20.0, "s2a"), ("i3", 30.0, "s2b")] {
            agg.ingest(&eo_item(id, cloud, platform, UNIT_BOX));
        }

        let stats = agg.build();
        // `platform` is a string property, so it is also tallied generically.
        let freqs = &stats.property_frequencies["platform"];
        assert_eq!(freqs["s2a"], 2);
        assert_eq!(freqs["s2b"], 1);
    }

    #[test]
    fn test_serialization_roundtrip() {
        let mut agg = CollectionAggregator::new("rt-col");
        agg.ingest(&eo_item("rt-1", 15.0, "s2a", [-5.0, -5.0, 5.0, 5.0]));

        let original = agg.build();
        let encoded = serde_json::to_string(&original).expect("serialize");
        let decoded: CollectionStats = serde_json::from_str(&encoded).expect("deserialize");
        assert_eq!(original, decoded);
    }
}