reduct_base/
batch.rs

1// Copyright 2023-2025 ReductSoftware UG
2// This Source Code Form is subject to the terms of the Mozilla Public
3//    License, v. 2.0. If a copy of the MPL was not distributed with this
4//    file, You can obtain one at https://mozilla.org/MPL/2.0/.
5
6use crate::error::ReductError;
7use crate::{unprocessable_entity, Labels};
8use http::{HeaderMap, HeaderValue};
9
/// Metadata of a single record, as produced by [`parse_batched_header`].
pub struct RecordHeader {
    /// Content length of the record (the first field of a batched header).
    pub content_length: u64,
    /// Content type of the record (the second field of a batched header);
    /// `parse_batched_header` substitutes `application/octet-stream` when
    /// the field is empty or absent.
    pub content_type: String,
    /// Labels built from the `key=value` pairs that follow the content type.
    pub labels: Labels,
}
15
16/// Parse a batched header into a content length, content type, and labels.
17///
18/// # Arguments
19///
20/// * `header` - The batched header to parse.
21///
22/// # Returns
23///
24/// * `content_length` - The content length of the batched header.
25/// * `content_type` - The content type of the batched header.
26/// * `labels` - The labels of the batched header.
27pub fn parse_batched_header(header: &str) -> Result<RecordHeader, ReductError> {
28    let (content_length, rest) = header
29        .split_once(',')
30        .ok_or(unprocessable_entity!("Invalid batched header"))?;
31    let content_length = content_length
32        .trim()
33        .parse::<u64>()
34        .map_err(|_| unprocessable_entity!("Invalid content length"))?;
35
36    let (content_type, rest) = rest
37        .split_once(',')
38        .unwrap_or((rest, "application/octet-stream"));
39
40    let content_type = if content_type.is_empty() {
41        "application/octet-stream".to_string()
42    } else {
43        content_type.trim().to_string()
44    };
45
46    let mut labels = Labels::new();
47    let mut rest = rest.to_string();
48    while let Some(pair) = rest.split_once('=') {
49        let (key, value) = pair;
50        let key = key.trim();
51        if key.starts_with("@") {
52            return Err(unprocessable_entity!(
53                "Label names must not start with '@': reserved for computed labels",
54            ));
55        }
56
57        rest = if value.starts_with('\"') {
58            let value = value[1..].to_string();
59            let (value, rest) = value
60                .split_once('\"')
61                .ok_or(unprocessable_entity!("Invalid batched header"))?;
62            labels.insert(key.trim().to_string(), value.trim().to_string());
63            rest.trim_start_matches(',').trim().to_string()
64        } else if let Some(ret) = value.split_once(',') {
65            let (value, rest) = ret;
66            labels.insert(key.trim().to_string(), value.trim().to_string());
67            rest.trim().to_string()
68        } else {
69            labels.insert(key.to_string(), value.trim().to_string());
70            break;
71        };
72    }
73
74    Ok(RecordHeader {
75        content_length,
76        content_type,
77        labels,
78    })
79}
80
81pub fn sort_headers_by_time(headers: &HeaderMap) -> Result<Vec<(u64, HeaderValue)>, ReductError> {
82    let sorted_headers: Vec<_> = headers
83        .clone()
84        .into_iter()
85        .filter(|(name, _)| name.is_some())
86        .map(|(name, value)| (name.unwrap().to_string(), value))
87        .filter(|(name, _)| name.starts_with("x-reduct-time-"))
88        .map(|(key, value)| (key[14..].parse::<u64>().ok(), (key, value)))
89        .collect();
90
91    for (time, (key, _)) in &sorted_headers {
92        if time.is_none() {
93            return Err(unprocessable_entity!(
94                "Invalid header '{}': must be an unix timestamp in microseconds",
95                key
96            ));
97        }
98    }
99
100    let mut sorted_headers: Vec<(u64, HeaderValue)> = sorted_headers
101        .into_iter()
102        .map(|(time, (_key, value))| (time.unwrap(), value))
103        .collect();
104    sorted_headers.sort_by(|(ts1, _), (ts2, _)| ts1.cmp(ts2));
105    Ok(sorted_headers)
106}
107
#[cfg(test)]
mod tests {
    use super::*;
    use rstest::*;

    /// Asserts that `labels` maps `key` to exactly `expected`.
    fn assert_label(labels: &Labels, key: &str, expected: &str) {
        assert_eq!(labels.get(key), Some(&expected.to_string()));
    }

    /// Plain `key=value` labels are parsed alongside length and type.
    #[rstest]
    fn test_parse_batched_header_row() {
        let parsed =
            parse_batched_header("123, text/plain, label1=value1, label2=value2").unwrap();

        assert_eq!(parsed.content_length, 123);
        assert_eq!(parsed.content_type, "text/plain");
        assert_eq!(parsed.labels.len(), 2);
        assert_label(&parsed.labels, "label1", "value1");
        assert_label(&parsed.labels, "label2", "value2");
    }

    /// Quoted values may contain commas and lose their quotes.
    #[rstest]
    fn test_parse_batched_header_row_quotes() {
        let parsed =
            parse_batched_header("123, text/plain, label1=\"[1, 2, 3]\", label2=\"value2\"")
                .unwrap();

        assert_eq!(parsed.content_length, 123);
        assert_eq!(parsed.content_type, "text/plain");
        assert_eq!(parsed.labels.len(), 2);
        assert_label(&parsed.labels, "label1", "[1, 2, 3]");
        assert_label(&parsed.labels, "label2", "value2");
    }

    /// A header without labels yields an empty label set.
    #[rstest]
    fn test_parse_header_no_labels() {
        let parsed = parse_batched_header("123, text/plain").unwrap();

        assert_eq!(parsed.content_length, 123);
        assert_eq!(parsed.content_type, "text/plain");
        assert_eq!(parsed.labels.len(), 0);
    }

    /// Headers without any comma are rejected.
    #[rstest]
    #[case("")]
    #[case("xxx")]
    fn test_parse_header_bad_header(#[case] header: &str) {
        let expected = unprocessable_entity!("Invalid batched header");
        let actual = parse_batched_header(header).err().unwrap();
        assert_eq!(actual, expected);
    }

    /// Label names starting with '@' are reserved and rejected.
    #[rstest]
    fn test_parse_header_bad_label() {
        let expected = unprocessable_entity!(
            "Label names must not start with '@': reserved for computed labels"
        );
        let actual = parse_batched_header("123, text/plain, @label1=value1, label2=value2")
            .err()
            .unwrap();
        assert_eq!(actual, expected);
    }
}