1use crate::error::ReductError;
7use crate::{unprocessable_entity, Labels};
8use http::{HeaderMap, HeaderValue};
9
10pub struct RecordHeader {
11 pub content_length: u64,
12 pub content_type: String,
13 pub labels: Labels,
14}
15
16pub fn parse_batched_header(header: &str) -> Result<RecordHeader, ReductError> {
28 let (content_length, rest) = header
29 .split_once(',')
30 .ok_or(unprocessable_entity!("Invalid batched header"))?;
31 let content_length = content_length
32 .trim()
33 .parse::<u64>()
34 .map_err(|_| unprocessable_entity!("Invalid content length"))?;
35
36 let (content_type, rest) = rest
37 .split_once(',')
38 .unwrap_or((rest, "application/octet-stream"));
39
40 let content_type = if content_type.is_empty() {
41 "application/octet-stream".to_string()
42 } else {
43 content_type.trim().to_string()
44 };
45
46 let mut labels = Labels::new();
47 let mut rest = rest.to_string();
48 while let Some(pair) = rest.split_once('=') {
49 let (key, value) = pair;
50 let key = key.trim();
51 if key.starts_with("@") {
52 return Err(unprocessable_entity!(
53 "Label names must not start with '@': reserved for computed labels",
54 ));
55 }
56
57 rest = if value.starts_with('\"') {
58 let value = value[1..].to_string();
59 let (value, rest) = value
60 .split_once('\"')
61 .ok_or(unprocessable_entity!("Invalid batched header"))?;
62 labels.insert(key.trim().to_string(), value.trim().to_string());
63 rest.trim_start_matches(',').trim().to_string()
64 } else if let Some(ret) = value.split_once(',') {
65 let (value, rest) = ret;
66 labels.insert(key.trim().to_string(), value.trim().to_string());
67 rest.trim().to_string()
68 } else {
69 labels.insert(key.to_string(), value.trim().to_string());
70 break;
71 };
72 }
73
74 Ok(RecordHeader {
75 content_length,
76 content_type,
77 labels,
78 })
79}
80
81pub fn sort_headers_by_time(headers: &HeaderMap) -> Result<Vec<(u64, HeaderValue)>, ReductError> {
82 let sorted_headers: Vec<_> = headers
83 .clone()
84 .into_iter()
85 .filter(|(name, _)| name.is_some())
86 .map(|(name, value)| (name.unwrap().to_string(), value))
87 .filter(|(name, _)| name.starts_with("x-reduct-time-"))
88 .map(|(key, value)| (key[14..].parse::<u64>().ok(), (key, value)))
89 .collect();
90
91 for (time, (key, _)) in &sorted_headers {
92 if time.is_none() {
93 return Err(unprocessable_entity!(
94 "Invalid header '{}': must be an unix timestamp in microseconds",
95 key
96 ));
97 }
98 }
99
100 let mut sorted_headers: Vec<(u64, HeaderValue)> = sorted_headers
101 .into_iter()
102 .map(|(time, (_key, value))| (time.unwrap(), value))
103 .collect();
104 sorted_headers.sort_by(|(ts1, _), (ts2, _)| ts1.cmp(ts2));
105 Ok(sorted_headers)
106}
107
/// Unit tests for the batched header parser and the time-header sorting helper.
#[cfg(test)]
mod tests {
    use super::*;
    use rstest::*;

    /// Plain, unquoted labels are parsed and trimmed.
    #[rstest]
    fn test_parse_batched_header_row() {
        let header = "123, text/plain, label1=value1, label2=value2";
        let RecordHeader {
            content_length,
            content_type,
            labels,
        } = parse_batched_header(header).unwrap();
        assert_eq!(content_length, 123);
        assert_eq!(content_type, "text/plain");
        assert_eq!(labels.len(), 2);
        assert_eq!(labels.get("label1"), Some(&"value1".to_string()));
        assert_eq!(labels.get("label2"), Some(&"value2".to_string()));
    }

    /// Quoted label values may contain commas.
    #[rstest]
    fn test_parse_batched_header_row_quotes() {
        let header = "123, text/plain, label1=\"[1, 2, 3]\", label2=\"value2\"";
        let RecordHeader {
            content_length,
            content_type,
            labels,
        } = parse_batched_header(header).unwrap();
        assert_eq!(content_length, 123);
        assert_eq!(content_type, "text/plain");
        assert_eq!(labels.len(), 2);
        assert_eq!(labels.get("label1"), Some(&"[1, 2, 3]".to_string()));
        assert_eq!(labels.get("label2"), Some(&"value2".to_string()));
    }

    /// A header without labels yields an empty label map.
    #[rstest]
    fn test_parse_header_no_labels() {
        let header = "123, text/plain";
        let RecordHeader {
            content_length,
            content_type,
            labels,
        } = parse_batched_header(header).unwrap();
        assert_eq!(content_length, 123);
        assert_eq!(content_type, "text/plain");
        assert_eq!(labels.len(), 0);
    }

    /// An empty content type field falls back to the default content type.
    #[rstest]
    #[case("123,")]
    #[case("123,,")]
    fn test_parse_header_default_content_type(#[case] header: &str) {
        let RecordHeader {
            content_length,
            content_type,
            labels,
        } = parse_batched_header(header).unwrap();
        assert_eq!(content_length, 123);
        assert_eq!(content_type, "application/octet-stream");
        assert_eq!(labels.len(), 0);
    }

    /// Headers without any comma are rejected.
    #[rstest]
    #[case("")]
    #[case("xxx")]
    fn test_parse_header_bad_header(#[case] header: &str) {
        let err = parse_batched_header(header).err().unwrap();
        assert_eq!(err, unprocessable_entity!("Invalid batched header"));
    }

    /// A non-numeric content length is rejected.
    #[rstest]
    fn test_parse_header_bad_content_length() {
        let err = parse_batched_header("abc, text/plain").err().unwrap();
        assert_eq!(err, unprocessable_entity!("Invalid content length"));
    }

    /// A quoted label value without a closing quote is rejected.
    #[rstest]
    fn test_parse_header_unterminated_quote() {
        let err = parse_batched_header("123, text/plain, label1=\"value1")
            .err()
            .unwrap();
        assert_eq!(err, unprocessable_entity!("Invalid batched header"));
    }

    /// Label names starting with '@' are reserved and therefore rejected.
    #[rstest]
    fn test_parse_header_bad_label() {
        let err = parse_batched_header("123, text/plain, @label1=value1, label2=value2")
            .err()
            .unwrap();
        assert_eq!(
            err,
            unprocessable_entity!(
                "Label names must not start with '@': reserved for computed labels"
            )
        );
    }

    /// Time headers are returned in ascending timestamp order; other headers are ignored.
    #[rstest]
    fn test_sort_headers_by_time() {
        let mut headers = HeaderMap::new();
        headers.insert("content-type", HeaderValue::from_static("text/plain"));
        headers.insert("x-reduct-time-30", HeaderValue::from_static("c"));
        headers.insert("x-reduct-time-10", HeaderValue::from_static("a"));
        headers.insert("x-reduct-time-20", HeaderValue::from_static("b"));

        let sorted = sort_headers_by_time(&headers).unwrap();
        assert_eq!(
            sorted,
            vec![
                (10, HeaderValue::from_static("a")),
                (20, HeaderValue::from_static("b")),
                (30, HeaderValue::from_static("c")),
            ]
        );
    }

    /// A time header whose suffix is not an unsigned integer is rejected.
    #[rstest]
    fn test_sort_headers_by_time_bad_timestamp() {
        let mut headers = HeaderMap::new();
        headers.insert("x-reduct-time-xxx", HeaderValue::from_static("a"));

        let err = sort_headers_by_time(&headers).err().unwrap();
        assert_eq!(
            err,
            unprocessable_entity!(
                "Invalid header '{}': must be an unix timestamp in microseconds",
                "x-reduct-time-xxx"
            )
        );
    }
}