ecmwf_opendata/
client.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fs::OpenOptions;
3use std::io::{Read, Write};
4
5use chrono::{DateTime, Datelike, Duration, TimeZone, Timelike, Utc};
6use reqwest::blocking::Client as HttpClient;
7use reqwest::header::{HeaderMap, HeaderValue, RANGE, USER_AGENT};
8
9use crate::date::{canonical_time_to_hour, expand_date_value, expand_time_value, full_datetime_from_date_time};
10use crate::error::{Error, Result as EResult};
11use crate::request::{expand_numeric_syntax, Request, RequestValue};
12use crate::sources::{is_http_url, source_to_base_url};
13use crate::url_builder::{format_url, patch_stream, user_to_url_value, HOURLY_PATTERN, MONTHLY_PATTERN};
14
/// Request keywords whose values appear in data-file URLs.
///
/// `Client::get_urls` maps these with `user_to_url_value` and builds one URL per
/// combination of their values.
const URL_COMPONENTS: [&str; 8] = [
    "date",
    "time",
    "model",
    "resol",
    "stream",
    "type",
    "step",
    "fcmonth",
];

/// Request keywords used to select individual fields from a file's `.index`.
///
/// The array order is also the keyword order used when matching index entries
/// (and, with `preserve_request_order`, when sorting them).
const INDEX_COMPONENTS: [&str; 6] = [
    "param",
    "type",
    "step",
    "fcmonth",
    "number",
    "levelist",
];
20
/// Configuration for a [`Client`].
#[derive(Clone, Debug)]
pub struct ClientOptions {
    /// Data source: an `http(s)` base URL used verbatim, or a source name resolved
    /// with `source_to_base_url` (default `"ecmwf"`).
    pub source: String,
    /// Model used when a request has no `model` keyword (default `"ifs"`).
    pub model: String,
    /// Resolution used when a request has no `resol` keyword (default `"0p25"`).
    pub resol: String,
    /// Append `/experimental` to the resolution component of every URL.
    pub beta: bool,
    /// Order downloaded fields by the order of the requested values instead of by
    /// their offset in the data file.
    pub preserve_request_order: bool,
    /// Forwarded to `patch_stream` when building URLs (default `true`).
    pub infer_stream_keyword: bool,
    /// Verify TLS certificates; `false` accepts invalid certificates (default `true`).
    pub verify_tls: bool,
    /// Whether to fetch and append an Azure SAS token; `None` enables it only when
    /// `source` is `"azure"`.
    pub use_sas_token: Option<bool>,
    /// Name of a built-in SAS token endpoint (default `"ecmwf"`).
    pub sas_known_key: String,
    /// Custom SAS token endpoint URL.
    pub sas_custom_url: Option<String>,
}

impl Default for ClientOptions {
    /// The `ecmwf` source with model `ifs` at resolution `0p25`; stream inference
    /// and TLS verification on; beta, request-order preservation and explicit SAS
    /// settings off.
    fn default() -> Self {
        let sas_known_key = String::from("ecmwf");
        let source = sas_known_key.clone();
        ClientOptions {
            sas_custom_url: None,
            use_sas_token: None,
            verify_tls: true,
            infer_stream_keyword: true,
            preserve_request_order: false,
            beta: false,
            resol: String::from("0p25"),
            model: String::from("ifs"),
            source,
            sas_known_key,
        }
    }
}
51
52#[derive(Debug, Clone)]
53pub struct Result {
54    pub urls: Vec<String>,
55    pub target: String,
56    pub datetime: DateTime<Utc>,
57    pub for_urls: BTreeMap<String, Vec<String>>,
58    pub for_index: BTreeMap<String, Vec<String>>,
59    pub size_bytes: u64,
60}
61
62#[derive(Debug, Clone)]
63pub struct Client {
64    opts: ClientOptions,
65    base_url: String,
66    http: HttpClient,
67    sas_token: Option<String>,
68}
69
70impl Client {
71    pub fn new(opts: ClientOptions) -> EResult<Self> {
72        let base_url = if is_http_url(&opts.source) {
73            opts.source.clone()
74        } else {
75            source_to_base_url(&opts.source)
76                .ok_or_else(|| Error::InvalidRequest(format!("unknown source: {}", opts.source)))?
77                .to_string()
78        };
79
80        let mut headers = HeaderMap::new();
81        headers.insert(
82            USER_AGENT,
83            HeaderValue::from_static("ecmwf-opendata-rs/0.1"),
84        );
85
86        let mut builder = HttpClient::builder().default_headers(headers);
87        if !opts.verify_tls {
88            builder = builder.danger_accept_invalid_certs(true);
89        }
90        let http = builder.build()?;
91
92        let use_sas = opts
93            .use_sas_token
94            .unwrap_or_else(|| opts.source == "azure");
95
96        let mut client = Self {
97            base_url,
98            http,
99            opts,
100            sas_token: None,
101        };
102
103        if use_sas {
104            let token = client.get_azure_sas_token()?;
105            client.sas_token = Some(token);
106        }
107
108        Ok(client)
109    }
110
111    pub fn retrieve(&self, request: Request, target: impl Into<String>) -> EResult<Result> {
112        let target = target.into();
113        let res = self.get_urls(Some(&request), true, Some(&target))?;
114        self.download_result(&res, true)
115    }
116
117    /// Python-like convenience: `retrieve(request)` where `target` may be inside the request.
118    /// If no target is provided, defaults to `data.grib2`.
119    pub fn retrieve_request(&self, request: Request) -> EResult<Result> {
120        let res = self.get_urls(Some(&request), true, None)?;
121        self.download_result(&res, true)
122    }
123
124    /// Python-kwargs-like convenience: build a request from pairs and retrieve it.
125    ///
126    /// Example:
127    /// `client.retrieve_pairs([("step", 240), ("type", "fc"), ("param", "msl")])?;`
128    pub fn retrieve_pairs<K>(
129        &self,
130        pairs: impl IntoIterator<Item = (K, RequestValue)>,
131    ) -> EResult<Result>
132    where
133        K: Into<String>,
134    {
135        self.retrieve_request(Request::from_pairs(pairs))
136    }
137
138    pub fn download(&self, request: Request, target: impl Into<String>) -> EResult<Result> {
139        let target = target.into();
140        let res = self.get_urls(Some(&request), false, Some(&target))?;
141        self.download_result(&res, false)
142    }
143
144    /// Python-like convenience: `download(request)` where `target` may be inside the request.
145    /// If no target is provided, defaults to `data.grib2`.
146    pub fn download_request(&self, request: Request) -> EResult<Result> {
147        let res = self.get_urls(Some(&request), false, None)?;
148        self.download_result(&res, false)
149    }
150
151    pub fn latest(&self, request: Request) -> EResult<DateTime<Utc>> {
152        self.latest_inner(&request)
153    }
154
155    /// Convenience constructor similar to Python's `Client()` defaults.
156    pub fn default_client() -> EResult<Self> {
157        Self::new(ClientOptions::default())
158    }
159
160    fn latest_inner(&self, request: &Request) -> EResult<DateTime<Utc>> {
161        let mut params = request.clone().into_inner();
162
163        let now = Utc::now();
164
165        // If time not in request: probe the most recent 6-hour cycle and step back by 6 hours.
166        // If time is in request: keep that hour and step back by 1 day.
167        let has_time = params.contains_key("time");
168        let delta = if has_time { Duration::days(1) } else { Duration::hours(6) };
169
170        let time_hour = if let Some(tv) = params.get("time") {
171            let t = tv.as_strings().get(0).cloned().unwrap_or_else(|| "18".into());
172            canonical_time_to_hour(&t)?
173        } else {
174            18
175        };
176
177        let mut candidate = if has_time {
178            // Start at today with that hour, but never in the future.
179            let start_date = now.date_naive();
180            let mut dt = Utc
181                .with_ymd_and_hms(
182                    start_date.year(),
183                    start_date.month(),
184                    start_date.day(),
185                    time_hour,
186                    0,
187                    0,
188                )
189                .single()
190                .ok_or_else(|| Error::InvalidRequest("invalid start datetime".into()))?;
191            if dt > now {
192                dt = dt - Duration::days(1);
193            }
194            dt
195        } else {
196            // Round down to the nearest 6-hour cycle: 00/06/12/18.
197            let hour = (now.hour() / 6) * 6;
198            Utc.with_ymd_and_hms(now.year(), now.month(), now.day(), hour, 0, 0)
199                .single()
200                .ok_or_else(|| Error::InvalidRequest("invalid start datetime".into()))?
201        };
202
203        // Search back up to ~5 days.
204        let stop = candidate - Duration::days(5);
205
206        loop {
207            if candidate <= stop {
208                break;
209            }
210
211            params.insert(
212                "date".to_string(),
213                RequestValue::Str(candidate.format("%Y%m%d").to_string()),
214            );
215            let probe_hour: u32 = if has_time {
216                time_hour
217            } else {
218                candidate.hour()
219            };
220            params.insert("time".to_string(), RequestValue::Int(probe_hour as i64));
221
222            let tmp_req = Request::from_inner(params.clone());
223            let res = self.get_urls(Some(&tmp_req), false, None)?;
224
225            let mut ok = !res.urls.is_empty();
226            for u in &res.urls {
227                let url = self.apply_sas_to_url(u);
228                if !self.probe_exists(&url)? {
229                    ok = false;
230                    break;
231                }
232            }
233            if ok {
234                return Ok(candidate);
235            }
236
237            candidate = candidate - delta;
238        }
239
240        Err(Error::CannotEstablishLatest)
241    }
242
243    /// Probe a URL for existence.
244    ///
245    /// Upstream Python uses HTTP HEAD. Some endpoints may block HEAD or respond
246    /// with non-200 even though GET works; in that case we fall back to a tiny
247    /// ranged GET.
248    fn probe_exists(&self, url: &str) -> EResult<bool> {
249        // Try HEAD first (cheap when supported).
250        match self.http.head(url).send() {
251            Ok(resp) => {
252                if resp.status() == 200 {
253                    return Ok(true);
254                }
255
256                // If HEAD is not usable, fall back to a ranged GET.
257                if matches!(
258                    resp.status().as_u16(),
259                    403 | 404 | 405 | 409 | 429 | 500 | 501 | 502 | 503
260                ) {
261                    // continue to GET probe
262                } else {
263                    return Ok(false);
264                }
265            }
266            Err(_) => {
267                // Fall back to GET probe.
268            }
269        }
270
271        // GET with a single byte range; accept 206 (partial) or 200.
272        let resp = self
273            .http
274            .get(url)
275            .header(RANGE, "bytes=0-0")
276            .send()?;
277
278        Ok(matches!(resp.status().as_u16(), 200 | 206))
279    }
280
281    fn get_urls(
282        &self,
283        request: Option<&Request>,
284        use_index: bool,
285        target: Option<&str>,
286    ) -> EResult<Result> {
287        let mut params = match request {
288            Some(r) => r.clone().into_inner(),
289            None => BTreeMap::new(),
290        };
291
292        // defaults
293        let model = params
294            .get("model")
295            .map(|v| v.as_strings().get(0).cloned().unwrap_or_else(|| self.opts.model.clone()))
296            .unwrap_or_else(|| self.opts.model.clone());
297
298        if model == "aifs-ens" && !params.contains_key("stream") {
299            params.insert("stream".to_string(), RequestValue::Str("enfo".to_string()));
300        }
301
302        params.entry("model".to_string()).or_insert(RequestValue::Str(model.clone()));
303        params
304            .entry("resol".to_string())
305            .or_insert(RequestValue::Str(self.opts.resol.clone()));
306
307        params.entry("type".to_string()).or_insert(RequestValue::Str("fc".to_string()));
308        params
309            .entry("stream".to_string())
310            .or_insert(RequestValue::Str("oper".to_string()));
311
312        // If date missing, resolve latest.
313        if !params.contains_key("date") {
314            let tmp_req = Request::from_inner(params.clone());
315            let latest = self.latest_inner(&tmp_req)?;
316            params.insert(
317                "date".to_string(),
318                RequestValue::Str(latest.format("%Y%m%d").to_string()),
319            );
320            // Keep request's time if present; else use latest hour.
321            if !params.contains_key("time") {
322                params.insert("time".to_string(), RequestValue::Int(latest.hour() as i64));
323            }
324        }
325
326        // Normalize / expand into for_urls and for_index
327        let now = Utc::now();
328
329        let mut for_urls: BTreeMap<String, Vec<String>> = BTreeMap::new();
330        let mut for_index: BTreeMap<String, Vec<String>> = BTreeMap::new();
331
332        // Build for_urls types first to allow step mapping for probabilities.
333        let typ_values_user: Vec<String> = params
334            .get("type")
335            .map(|v| v.as_strings())
336            .unwrap_or_else(|| vec!["fc".to_string()]);
337
338        let mut for_urls_type: Vec<String> = Vec::new();
339        for tv in typ_values_user {
340            for_urls_type.push(user_to_url_value(&model, "type", &tv, &[]));
341        }
342        if for_urls_type.is_empty() {
343            for_urls_type.push("fc".to_string());
344        }
345        for_urls.insert("type".to_string(), unique_preserve(for_urls_type));
346
347        // Process each param
348        for (k, v) in params.iter() {
349            let mut values = v.as_strings();
350
351            // allow slash-separated lists
352            if values.len() == 1 && values[0].contains('/') {
353                values = values[0]
354                    .split('/')
355                    .filter(|t| !t.is_empty())
356                    .map(|t| t.to_string())
357                    .collect();
358            }
359
360            let expanded: Vec<String> = match k.as_str() {
361                "date" => {
362                    let mut out = Vec::new();
363                    for x in values {
364                        out.extend(expand_date_value(&x, now)?);
365                    }
366                    out
367                }
368                "time" => {
369                    let mut out = Vec::new();
370                    for x in values {
371                        out.extend(expand_time_value(&x)?);
372                    }
373                    out
374                }
375                "step" | "fcmonth" | "number" | "levelist" => {
376                    let mut out = Vec::new();
377                    for x in values {
378                        out.extend(expand_numeric_syntax(&x)?);
379                    }
380                    out
381                }
382                _ => values,
383            };
384
385            if URL_COMPONENTS.contains(&k.as_str()) {
386                let mut mapped = Vec::new();
387                for x in &expanded {
388                    let url_t = for_urls.get("type").cloned().unwrap_or_default();
389                    mapped.push(user_to_url_value(&model, k, x, &url_t));
390                }
391                for_urls
392                    .entry(k.clone())
393                    .or_default()
394                    .extend(mapped);
395            }
396
397            if INDEX_COMPONENTS.contains(&k.as_str()) {
398                // user_to_index: type=ef expands to cf/pf for index selection.
399                let mut mapped = Vec::new();
400                if k == "type" {
401                    for x in &expanded {
402                        if x == "ef" {
403                            mapped.push("cf".to_string());
404                            mapped.push("pf".to_string());
405                        } else {
406                            mapped.push(x.clone());
407                        }
408                    }
409                } else {
410                    mapped = expanded.clone();
411                }
412                for_index.entry(k.clone()).or_default().extend(mapped);
413            }
414        }
415
416        // Canonicalize time: store hour string (00/06/12/18)
417        if let Some(times) = for_urls.get_mut("time") {
418            let mut out = Vec::new();
419            for t in times.drain(..) {
420                let hour = canonical_time_to_hour(&t)?;
421                out.push(format!("{hour:02}"));
422            }
423            *times = unique_preserve(out);
424        }
425
426        // Infer/patch stream in URL building; we keep stream values but will patch later per product.
427        for (k, vals) in for_urls.iter_mut() {
428            *vals = unique_preserve(std::mem::take(vals));
429            if k == "stream" || k == "type" {
430                vals.iter_mut().for_each(|s| s.make_ascii_lowercase());
431            }
432        }
433        for (k, vals) in for_index.iter_mut() {
434            *vals = unique_preserve(std::mem::take(vals));
435            if k == "stream" || k == "type" {
436                vals.iter_mut().for_each(|s| s.make_ascii_lowercase());
437            }
438        }
439
440        // If tf (tropical cyclone tracks), do not use index selection.
441        let user_type = params
442            .get("type")
443            .map(|v| v.as_strings().get(0).cloned().unwrap_or_else(|| "fc".into()))
444            .unwrap_or_else(|| "fc".into());
445        if user_type == "tf" {
446            for_index.clear();
447        }
448
449        // If time missing (possible if date contains time), default time based on date.
450        if !for_urls.contains_key("time") {
451            for_urls.insert("time".to_string(), vec!["18".to_string()]);
452        }
453
454        // Now expand into concrete URLs
455        let mut urls = Vec::new();
456        let mut dates = BTreeSet::new();
457
458        let date_vals = for_urls
459            .get("date")
460            .cloned()
461            .ok_or_else(|| Error::InvalidRequest("date missing after normalization".into()))?;
462        let time_vals = for_urls
463            .get("time")
464            .cloned()
465            .ok_or_else(|| Error::InvalidRequest("time missing after normalization".into()))?;
466
467        let model_vals = for_urls.get("model").cloned().unwrap_or_else(|| vec![model.clone()]);
468        let resol_vals = for_urls
469            .get("resol")
470            .cloned()
471            .unwrap_or_else(|| vec![self.opts.resol.clone()]);
472        let stream_vals = for_urls
473            .get("stream")
474            .cloned()
475            .unwrap_or_else(|| vec!["oper".to_string()]);
476        let type_vals = for_urls
477            .get("type")
478            .cloned()
479            .unwrap_or_else(|| vec!["fc".to_string()]);
480        let step_vals_opt = for_urls.get("step").cloned();
481        let fcmonth_vals = for_urls
482            .get("fcmonth")
483            .cloned()
484            .unwrap_or_else(|| vec!["1".to_string()]);
485
486        for d in &date_vals {
487            for t in &time_vals {
488                let dt = full_datetime_from_date_time(d, t.parse::<u32>().map_err(|_| {
489                    Error::InvalidRequest(format!("invalid canonical time hour: {t}"))
490                })?)?;
491                dates.insert(dt);
492
493                for m in &model_vals {
494                    for r in &resol_vals {
495                        for s in &stream_vals {
496                            for ty in &type_vals {
497                                // patch stream based on time and type
498                                let hour_2d = dt.format("%H").to_string();
499                                let patched_stream = patch_stream(
500                                    self.opts.infer_stream_keyword,
501                                    m,
502                                    s,
503                                    &hour_2d,
504                                    ty,
505                                );
506
507                                let is_monthly = s == "mmsa" || s == "mmsf";
508                                let pattern = if is_monthly {
509                                    MONTHLY_PATTERN
510                                } else {
511                                    HOURLY_PATTERN
512                                };
513
514                                // beta tweaks
515                                let mut resol = r.clone();
516                                if self.opts.beta {
517                                    resol = format!("{resol}/experimental");
518                                }
519
520                                if is_monthly {
521                                    for fcmonth in &fcmonth_vals {
522                                        let u = format_url(
523                                            pattern,
524                                            &self.base_url,
525                                            dt,
526                                            m,
527                                            &resol,
528                                            &patched_stream,
529                                            ty,
530                                            None,
531                                            Some(fcmonth),
532                                        );
533                                        urls.push(self.fix_0p4_beta(u));
534                                    }
535                                } else {
536                                    let steps_for_url: Vec<String> = match &step_vals_opt {
537                                        Some(v) => v.clone(),
538                                        None => vec![default_step_for_url(&patched_stream, ty, dt.hour())],
539                                    };
540                                    for step in &steps_for_url {
541                                        let u = format_url(
542                                            pattern,
543                                            &self.base_url,
544                                            dt,
545                                            m,
546                                            &resol,
547                                            &patched_stream,
548                                            ty,
549                                            Some(step),
550                                            None,
551                                        );
552                                        urls.push(self.fix_0p4_beta(u));
553                                    }
554                                }
555                            }
556                        }
557                    }
558                }
559            }
560        }
561
562        urls = unique_preserve(urls);
563
564        let dt = *dates
565            .iter()
566            .next()
567            .ok_or_else(|| Error::InvalidRequest("no datetime".into()))?;
568
569        let target_path = target
570            .map(|s| s.to_string())
571            .or_else(|| params.get("target").map(|v| v.as_strings().get(0).cloned()).flatten())
572            .unwrap_or_else(|| "data.grib2".to_string());
573
574        let mut res = Result {
575            urls,
576            target: target_path,
577            datetime: dt,
578            for_urls,
579            for_index,
580            size_bytes: 0,
581        };
582
583        if use_index && !res.for_index.is_empty() {
584            res.urls = self.expand_urls_to_ranges(&res.urls, &res.for_index)?;
585        }
586
587        Ok(res)
588    }
589
590    fn fix_0p4_beta(&self, url: String) -> String {
591        if self.opts.resol == "0p4-beta" {
592            url.replace("/ifs/", "/")
593        } else {
594            url
595        }
596    }
597
598    fn get_azure_sas_token(&self) -> EResult<String> {
599        let known = match self.opts.sas_known_key.as_str() {
600            "ecmwf" => Some("https://planetarycomputer.microsoft.com/api/sas/v1/token/ai4edataeuwest/ecmwf"),
601            _ => None,
602        };
603
604        let url = if let Some(u) = known {
605            u.to_string()
606        } else if let Some(custom) = &self.opts.sas_custom_url {
607            custom.clone()
608        } else {
609            return Err(Error::InvalidRequest(
610                "no known sas token url and no custom provided".into(),
611            ));
612        };
613
614        let v: serde_json::Value = self.http.get(url).send()?.error_for_status()?.json()?;
615        let token = v
616            .get("token")
617            .and_then(|x| x.as_str())
618            .ok_or_else(|| Error::InvalidRequest("invalid sas token response".into()))?;
619        Ok(token.to_string())
620    }
621
622    fn apply_sas_to_url(&self, url: &str) -> String {
623        let Some(token) = &self.sas_token else {
624            return url.to_string();
625        };
626        if url.contains("sig=") {
627            return url.to_string();
628        }
629        if url.contains('?') {
630            format!("{url}&{token}")
631        } else {
632            format!("{url}?{token}")
633        }
634    }
635
636    /// Expand each data URL to (url, ranges) by reading its `.index`.
637    ///
638    /// This returns a list of synthetic URLs with embedded range data encoded as
639    /// `url|start-end;start-end;...`.
640    /// The actual download uses these to issue HTTP Range requests.
641    fn expand_urls_to_ranges(
642        &self,
643        urls: &[String],
644        for_index: &BTreeMap<String, Vec<String>>,
645    ) -> EResult<Vec<String>> {
646        // Keep index keyword order consistent with upstream.
647        let ordered_keys: Vec<&str> = INDEX_COMPONENTS
648            .iter()
649            .copied()
650            .filter(|k| for_index.contains_key(*k))
651            .collect();
652
653        let mut out = Vec::new();
654        for url in urls {
655            let base = url.rsplit_once('.').map(|(b, _)| b).unwrap_or(url);
656            let index_url = format!("{base}.index");
657            let index_url = self.apply_sas_to_url(&index_url);
658
659            let resp = self.http.get(index_url).send()?.error_for_status()?;
660            let mut body = String::new();
661            let mut reader = resp;
662            reader.read_to_string(&mut body)?;
663
664            if ordered_keys.is_empty() {
665                // No index keywords, nothing to do.
666                out.push(url.clone());
667                continue;
668            }
669
670            if self.opts.preserve_request_order {
671                // (sort_key, (offset,length)) where sort_key is a lexicographic tuple
672                // capturing requested keyword/value order.
673                let mut parts: Vec<(Vec<(usize, usize)>, (u64, u64))> = Vec::new();
674
675                for line in body.lines() {
676                    if line.trim().is_empty() {
677                        continue;
678                    }
679                    let v: serde_json::Value = serde_json::from_str(line)?;
680                    let offset = v
681                        .get("_offset")
682                        .and_then(|x| x.as_u64())
683                        .ok_or_else(|| Error::InvalidRequest("index missing _offset".into()))?;
684                    let length = v
685                        .get("_length")
686                        .and_then(|x| x.as_u64())
687                        .ok_or_else(|| Error::InvalidRequest("index missing _length".into()))?;
688
689                    let mut key: Vec<(usize, usize)> = Vec::with_capacity(ordered_keys.len());
690
691                    let mut ok = true;
692                    for (i, k) in ordered_keys.iter().enumerate() {
693                        let Some(val) = v.get(*k).and_then(|x| x.as_str()) else {
694                            ok = false;
695                            break;
696                        };
697                        let allowed = for_index
698                            .get(*k)
699                            .ok_or_else(|| Error::InvalidRequest("internal for_index missing key".into()))?;
700                        let Some(j) = allowed.iter().position(|a| a == val) else {
701                            ok = false;
702                            break;
703                        };
704                        key.push((i, j));
705                    }
706
707                    if ok {
708                        parts.push((key, (offset, length)));
709                    }
710                }
711
712                if parts.is_empty() {
713                    continue;
714                }
715
716                parts.sort_by(|a, b| a.0.cmp(&b.0));
717
718                let ranges: Vec<(u64, u64)> = parts.into_iter().map(|(_, r)| r).collect();
719                let merged = merge_ranges(ranges);
720
721                let mut enc = String::new();
722                for (i, (start, end)) in merged.iter().enumerate() {
723                    if i > 0 {
724                        enc.push(';');
725                    }
726                    enc.push_str(&format!("{start}-{end}"));
727                }
728
729                out.push(format!("{url}|{enc}"));
730            } else {
731                // Fast path: sort by file offset (minimize HTTP requests).
732                let mut matches: Vec<(u64, u64)> = Vec::new();
733
734                for line in body.lines() {
735                    if line.trim().is_empty() {
736                        continue;
737                    }
738                    let v: serde_json::Value = serde_json::from_str(line)?;
739                    let offset = v
740                        .get("_offset")
741                        .and_then(|x| x.as_u64())
742                        .ok_or_else(|| Error::InvalidRequest("index missing _offset".into()))?;
743                    let length = v
744                        .get("_length")
745                        .and_then(|x| x.as_u64())
746                        .ok_or_else(|| Error::InvalidRequest("index missing _length".into()))?;
747
748                    let mut ok = true;
749                    for k in &ordered_keys {
750                        let Some(val) = v.get(*k).and_then(|x| x.as_str()) else {
751                            ok = false;
752                            break;
753                        };
754                        let allowed = for_index
755                            .get(*k)
756                            .ok_or_else(|| Error::InvalidRequest("internal for_index missing key".into()))?;
757                        if !allowed.iter().any(|a| a == val) {
758                            ok = false;
759                            break;
760                        }
761                    }
762
763                    if ok {
764                        matches.push((offset, length));
765                    }
766                }
767
768                if matches.is_empty() {
769                    continue;
770                }
771
772                matches.sort_by_key(|(o, _)| *o);
773                let merged = merge_ranges(matches);
774
775                let mut enc = String::new();
776                for (i, (start, end)) in merged.iter().enumerate() {
777                    if i > 0 {
778                        enc.push(';');
779                    }
780                    enc.push_str(&format!("{start}-{end}"));
781                }
782
783                out.push(format!("{url}|{enc}"));
784            }
785        }
786
787        if out.is_empty() {
788            return Err(Error::NoMatchingIndex);
789        }
790
791        Ok(out)
792    }
793
794    fn download_result(&self, res: &Result, is_partial: bool) -> EResult<Result> {
795        let mut total: u64 = 0;
796        let mut file = OpenOptions::new()
797            .create(true)
798            .write(true)
799            .truncate(true)
800            .open(&res.target)?;
801
802        for u in &res.urls {
803            if is_partial {
804                let (url, ranges) = split_url_ranges(u)?;
805                for (start, end) in ranges {
806                    let url = self.apply_sas_to_url(url);
807                    let range_header = format!("bytes={start}-{end}");
808                    let mut resp = self
809                        .http
810                        .get(url)
811                        .header(RANGE, range_header)
812                        .send()?
813                        .error_for_status()?;
814                    let mut buf = Vec::new();
815                    resp.copy_to(&mut buf)?;
816                    file.write_all(&buf)?;
817                    total += buf.len() as u64;
818                }
819            } else {
820                let url = self.apply_sas_to_url(u);
821                let mut resp = self.http.get(url).send()?.error_for_status()?;
822                let mut buf = Vec::new();
823                resp.copy_to(&mut buf)?;
824                file.write_all(&buf)?;
825                total += buf.len() as u64;
826            }
827        }
828
829        let mut out = res.clone();
830        out.size_bytes = total;
831        Ok(out)
832    }
833}
834
/// Chooses the `step` used in a data URL when the request did not specify one.
///
/// The result depends on the product `typ`, on the stream family of the already
/// patched stream, and on the run hour:
/// - ensemble streams are `enfo` / `waef`;
/// - deterministic streams are `oper` / `wave` / `scda` / `scwv`;
/// - 00/12 runs get the longer horizon.
fn default_step_for_url(patched_stream: &str, typ: &str, hour: u32) -> String {
    let long_run = matches!(hour, 0 | 12);
    let ensemble = matches!(patched_stream, "enfo" | "waef");
    let deterministic = matches!(patched_stream, "oper" | "wave" | "scda" | "scwv");

    let step = match typ {
        // Probabilities: always the maximum horizon, whatever the stream or cycle.
        "ep" => "360",
        // Tropical cyclone tracks: 240 on 00/12; otherwise 144 (ensemble) or 90.
        "tf" if long_run => "240",
        "tf" if ensemble => "144",
        "tf" => "90",
        // Every other type (fc, cf/pf/em/es, ...) follows the stream family.
        _ if ensemble => {
            if long_run {
                "360"
            } else {
                "144"
            }
        }
        _ if deterministic => {
            if long_run {
                "240"
            } else {
                "90"
            }
        }
        _ => "240",
    };
    step.to_owned()
}
867
#[cfg(test)]
mod client_tests {
    use super::{default_step_for_url, merge_ranges, split_url_ranges};

    /// The rows of the README default-step table.
    #[test]
    fn default_step_matches_readme_table() {
        // HRES 00/12
        assert_eq!(default_step_for_url("oper", "fc", 0), "240");
        // HRES 06/18
        assert_eq!(default_step_for_url("scda", "fc", 6), "90");
        // ENS 00/12
        assert_eq!(default_step_for_url("enfo", "pf", 0), "360");
        // ENS 06/18
        assert_eq!(default_step_for_url("enfo", "pf", 18), "144");
        // Probabilities
        assert_eq!(default_step_for_url("enfo", "ep", 0), "360");
    }

    /// Branches the README table does not exercise: tropical cyclone tracks,
    /// probabilities on a non-ensemble stream, and the stream-family fallbacks.
    #[test]
    fn default_step_covers_tf_and_fallbacks() {
        assert_eq!(default_step_for_url("enfo", "tf", 0), "240");
        assert_eq!(default_step_for_url("enfo", "tf", 6), "144");
        assert_eq!(default_step_for_url("oper", "tf", 18), "90");
        assert_eq!(default_step_for_url("oper", "ep", 6), "360");
        assert_eq!(default_step_for_url("wave", "cf", 12), "240");
        assert_eq!(default_step_for_url("waef", "fc", 6), "144");
        // Stream outside both families falls through to the final default.
        assert_eq!(default_step_for_url("mmsf", "fc", 6), "240");
    }

    /// (offset, length) pairs become inclusive ranges; overlapping/adjacent ones merge.
    #[test]
    fn merge_ranges_merges_adjacent_and_overlapping() {
        assert!(merge_ranges(Vec::new()).is_empty());
        assert_eq!(merge_ranges(vec![(100, 1)]), vec![(100, 100)]);
        assert_eq!(merge_ranges(vec![(0, 10), (5, 3)]), vec![(0, 9)]);
        assert_eq!(
            merge_ranges(vec![(20, 5), (0, 10), (10, 5)]),
            vec![(0, 14), (20, 24)]
        );
    }

    /// The `"{url}|{start}-{end};..."` encoding round-trips through the parser.
    #[test]
    fn split_url_ranges_parses_encoding() {
        let (url, ranges) = split_url_ranges("https://host/file.grib2|0-9;20-29")
            .ok()
            .expect("well-formed encoding must parse");
        assert_eq!(url, "https://host/file.grib2");
        assert_eq!(ranges, vec![(0, 9), (20, 29)]);

        assert!(split_url_ranges("https://host/file.grib2").is_err());
        assert!(split_url_ranges("https://host/file.grib2|9-3").is_err());
        assert!(split_url_ranges("https://host/file.grib2|abc").is_err());
    }
}
886
/// Drops repeated strings, keeping only the first occurrence of each and
/// preserving the original order of the survivors.
fn unique_preserve(xs: Vec<String>) -> Vec<String> {
    let mut seen: BTreeSet<String> = BTreeSet::new();
    let mut kept = xs;
    // `retain` visits each element exactly once, front to back, so the first copy
    // of a value is the one that gets recorded (and kept); later copies are dropped.
    kept.retain(|item| seen.insert(item.clone()));
    kept
}
897
/// Converts `(offset, length)` pairs into sorted, inclusive `(start, end)` byte ranges,
/// merging ranges that overlap or touch so each contiguous span is fetched once.
///
/// Zero-length entries describe no bytes and are skipped. Previously `o + l - 1`
/// underflowed on them, panicking in debug builds and yielding a bogus end offset in
/// release builds. Arithmetic near `u64::MAX` saturates instead of overflowing.
/// The input need not be sorted.
fn merge_ranges(mut matches: Vec<(u64, u64)>) -> Vec<(u64, u64)> {
    matches.retain(|&(_, len)| len > 0);
    // Order among equal offsets is irrelevant: the merge keeps the larger end.
    matches.sort_unstable_by_key(|&(offset, _)| offset);

    let mut out: Vec<(u64, u64)> = Vec::with_capacity(matches.len());
    for (start, len) in matches {
        // `len > 0` is guaranteed by the `retain` above, so `len - 1` cannot underflow.
        let end = start.saturating_add(len - 1);
        match out.last_mut() {
            // Overlapping or directly adjacent: extend the previous range.
            Some(last) if start <= last.1.saturating_add(1) => last.1 = last.1.max(end),
            _ => out.push((start, end)),
        }
    }
    out
}
925
926fn split_url_ranges(s: &str) -> EResult<(&str, Vec<(u64, u64)>)> {
927    let Some((url, enc)) = s.split_once('|') else {
928        return Err(Error::InvalidRequest("expected ranged url encoding".into()));
929    };
930
931    let mut ranges = Vec::new();
932    for part in enc.split(';').filter(|p| !p.is_empty()) {
933        let Some((a, b)) = part.split_once('-') else {
934            return Err(Error::InvalidRequest(format!("bad range: {part}")));
935        };
936        let start: u64 = a.parse().map_err(|_| Error::InvalidRequest(format!("bad range: {part}")))?;
937        let end: u64 = b.parse().map_err(|_| Error::InvalidRequest(format!("bad range: {part}")))?;
938        if end < start {
939            return Err(Error::InvalidRequest(format!("bad range: {part}")));
940        }
941        ranges.push((start, end));
942    }
943
944    Ok((url, ranges))
945}