1use crate::batch::v1::RecordHeader;
33use crate::error::ReductError;
34#[cfg(feature = "io")]
35use crate::io::RecordMeta;
36use crate::unprocessable_entity;
37use crate::Labels;
38use http::{HeaderMap, HeaderName, HeaderValue};
39use std::collections::{HashMap, HashSet};
40use std::str::FromStr;
41
/// Prefix shared by the batch protocol header names defined in this module.
pub const HEADER_PREFIX: &str = "x-reduct-";
/// Prefix of the per-record error header names built by `make_error_batched_header`.
pub const ERROR_HEADER_PREFIX: &str = "x-reduct-error-";
/// Header holding the comma-separated, percent-encoded entry names of a batch.
pub const ENTRIES_HEADER: &str = "x-reduct-entries";
/// Header holding the start timestamp that per-record time deltas are added to.
pub const START_TS_HEADER: &str = "x-reduct-start-ts";
/// Header holding the comma-separated, percent-encoded label names addressable by index.
pub const LABELS_HEADER: &str = "x-reduct-labels";

/// Name of the query-id header (not referenced in the visible part of this file).
pub const QUERY_ID_HEADER: &str = "x-reduct-query-id";
49
/// A record header parsed from a batched request, together with the entry it belongs to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EntryRecordHeader {
    /// Decoded name of the entry the record belongs to.
    pub entry: String,
    /// Record timestamp: the batch start timestamp plus the delta taken from the header name.
    pub timestamp: u64,
    /// Content length, content type and labels resolved for the record.
    pub header: RecordHeader,
}
57
/// Reports whether `byte` is an HTTP token character (`tchar`): an ASCII letter or digit,
/// or one of ``! # $ % & ' * + - . ^ _ ` | ~``.
fn is_tchar(byte: u8) -> bool {
    // Punctuation accepted in addition to ASCII alphanumerics.
    const TOKEN_PUNCTUATION: &[u8] = b"!#$%&'*+-.^_`|~";

    byte.is_ascii_alphanumeric() || TOKEN_PUNCTUATION.contains(&byte)
}
78
79pub fn encode_entry_name(entry: &str) -> String {
81 let mut encoded = String::with_capacity(entry.len());
82 for byte in entry.as_bytes() {
83 if is_tchar(*byte) {
84 encoded.push(*byte as char);
85 } else {
86 encoded.push_str(&format!("%{:02X}", byte));
87 }
88 }
89 encoded
90}
91
92pub fn decode_entry_name(encoded: &str) -> Result<String, ReductError> {
94 let mut decoded = Vec::with_capacity(encoded.len());
95 let bytes = encoded.as_bytes();
96 let mut pos = 0;
97 while pos < bytes.len() {
98 match bytes[pos] {
99 b'%' => {
100 if pos + 2 >= bytes.len() {
101 return Err(unprocessable_entity!(
102 "Invalid entry encoding in header name: '{}'",
103 encoded
104 ));
105 }
106 let high = (bytes[pos + 1] as char).to_digit(16);
107 let low = (bytes[pos + 2] as char).to_digit(16);
108 if high.is_none() || low.is_none() {
109 return Err(unprocessable_entity!(
110 "Invalid entry encoding in header name: '{}'",
111 encoded
112 ));
113 }
114 decoded.push((high.unwrap() * 16 + low.unwrap()) as u8);
115 pos += 3;
116 }
117 other => {
118 decoded.push(other);
119 pos += 1;
120 }
121 }
122 }
123
124 String::from_utf8(decoded)
125 .map_err(|_| unprocessable_entity!("Entry name is not valid UTF-8 in header '{}'", encoded))
126}
127
128pub fn parse_entries_header(entries: &HeaderValue) -> Result<Vec<String>, ReductError> {
130 let entries = entries
131 .to_str()
132 .map_err(|_| unprocessable_entity!("Invalid entries header"))?;
133 if entries.trim().is_empty() {
134 return Err(unprocessable_entity!("x-reduct-entries header is required"));
135 }
136
137 entries
138 .split(',')
139 .map(|entry| decode_entry_name(entry.trim()))
140 .collect()
141}
142
143pub fn decode_label_name(encoded: &str) -> Result<String, ReductError> {
145 decode_entry_name(encoded)
146 .map_err(|_| unprocessable_entity!("Invalid label encoding in header value: '{}'", encoded))
147}
148
/// Percent-encodes a label name.
///
/// Label names use exactly the same encoding as entry names, so this delegates to
/// `encode_entry_name`.
pub fn encode_label_name(label: &str) -> String {
    encode_entry_name(label)
}
153
154pub fn parse_labels_header(labels: &HeaderValue) -> Result<Vec<String>, ReductError> {
156 let labels = labels
157 .to_str()
158 .map_err(|_| unprocessable_entity!("Invalid labels header"))?;
159 if labels.trim().is_empty() {
160 return Err(unprocessable_entity!("x-reduct-labels header is empty"));
161 }
162
163 labels
164 .split(',')
165 .map(|label| decode_label_name(label.trim()))
166 .collect()
167}
168
169#[derive(Debug, Default, Clone)]
171pub struct LabelIndex {
172 names: Vec<String>,
173 lookup: HashMap<String, usize>,
174}
175
176impl LabelIndex {
177 pub fn ensure(&mut self, name: &str) -> usize {
179 if let Some(idx) = self.lookup.get(name) {
180 return *idx;
181 }
182
183 let idx = self.names.len();
184 self.names.push(name.to_string());
185 self.lookup.insert(name.to_string(), idx);
186 idx
187 }
188
189 pub fn as_header(&self) -> Option<HeaderValue> {
191 if self.names.is_empty() {
192 return None;
193 }
194
195 let encoded = self
196 .names
197 .iter()
198 .map(|name| encode_label_name(name))
199 .collect::<Vec<_>>()
200 .join(",");
201 Some(encoded.parse().unwrap())
202 }
203
204 pub fn names(&self) -> &[String] {
206 &self.names
207 }
208}
209
/// Builds the label part of a batched record header as a delta against the previous record.
///
/// Each item has the form `<index>=<value>`, where `<index>` comes from `label_index`
/// (names are registered on demand). With `previous_labels`, only labels whose value
/// changed are emitted, and a label that disappeared is emitted with an empty value.
/// Without `previous_labels`, every label of `meta` is emitted. Computed labels are always
/// emitted, registered under their name prefixed with `@`. Values containing a comma are
/// wrapped in double quotes. Items are ordered by index and joined with commas.
#[cfg(feature = "io")]
pub fn build_label_delta(
    meta: &RecordMeta,
    previous_labels: Option<&Labels>,
    label_index: &mut LabelIndex,
) -> String {
    // Commas separate items, so a value containing one has to be quoted.
    fn quote_if_needed(value: &str) -> String {
        if value.contains(',') {
            format!("\"{}\"", value)
        } else {
            value.to_string()
        }
    }

    let mut deltas: Vec<(usize, String)> = Vec::new();

    match previous_labels {
        Some(prev) => {
            // Union of old and new label names, visited in sorted order.
            let mut keys: Vec<String> = prev
                .keys()
                .chain(meta.labels().keys())
                .map(|k| k.to_string())
                .collect();
            keys.sort();
            keys.dedup();

            for key in keys {
                let encoded = match (prev.get(&key), meta.labels().get(&key)) {
                    (Some(before), Some(now)) if before == now => continue,
                    // An empty value marks the label as removed.
                    (Some(_), None) => String::new(),
                    (_, Some(now)) => quote_if_needed(now),
                    (None, None) => continue,
                };
                deltas.push((label_index.ensure(&key), encoded));
            }
        }
        None => {
            for (name, value) in meta.labels().iter() {
                deltas.push((label_index.ensure(name), quote_if_needed(value)));
            }
        }
    }

    for (name, value) in meta.computed_labels() {
        let prefixed = format!("@{}", name);
        deltas.push((label_index.ensure(&prefixed), quote_if_needed(value)));
    }

    deltas.sort_by_key(|(idx, _)| *idx);
    let rendered: Vec<String> = deltas
        .iter()
        .map(|(idx, value)| format!("{}={}", idx, value))
        .collect();
    rendered.join(",")
}
271
/// Builds the value of a batched record header: `<length>[,<content-type>[,<label-delta>]]`.
///
/// The content type is written only when it differs from `previous_content_type` (or when
/// there is no previous one); otherwise the field is left empty. The content-type field is
/// omitted entirely when it is empty and there is no label delta. The label delta comes
/// from [`build_label_delta`].
///
/// # Panics
///
/// Panics if the assembled string is not a valid header value.
#[cfg(feature = "io")]
pub fn make_record_header_value(
    meta: &RecordMeta,
    previous_content_type: Option<&str>,
    previous_labels: Option<&Labels>,
    label_index: &mut LabelIndex,
) -> HeaderValue {
    let mut value = meta.content_length().to_string();

    // An empty content type means "same as the previous record".
    let content_type = match previous_content_type {
        Some(prev) if prev != meta.content_type() => meta.content_type().to_string(),
        Some(_) => String::new(),
        None => meta.content_type().to_string(),
    };

    let labels_delta = build_label_delta(meta, previous_labels, label_index);

    // The content-type slot must be present (possibly empty) whenever labels follow it.
    if !content_type.is_empty() || !labels_delta.is_empty() {
        value.push(',');
        value.push_str(&content_type);
    }

    if !labels_delta.is_empty() {
        value.push(',');
        value.push_str(&labels_delta);
    }

    value.parse().unwrap()
}
302
303pub fn make_batched_header_name(entry_index: usize, time_delta: u64) -> HeaderName {
305 HeaderName::from_str(&format!("{}{}-{}", HEADER_PREFIX, entry_index, time_delta))
306 .expect("Entry index and time delta must produce a valid header name")
307}
308
309pub fn parse_batched_header_name(name: &str) -> Result<(usize, u64), ReductError> {
311 if !name.starts_with(HEADER_PREFIX) {
312 return Err(unprocessable_entity!("Invalid batched header '{}'", name));
313 }
314
315 let without_prefix = &name[HEADER_PREFIX.len()..];
316 let (entry_index, delta) = without_prefix
317 .rsplit_once('-')
318 .ok_or(unprocessable_entity!("Invalid batched header '{}'", name))?;
319
320 let entry_index: usize = entry_index.parse().map_err(|_| {
321 unprocessable_entity!("Invalid header '{}': entry index must be a number", name)
322 })?;
323
324 let delta = delta.parse::<u64>().map_err(|_| {
325 unprocessable_entity!(
326 "Invalid header '{}': must be an unix timestamp in microseconds",
327 name
328 )
329 })?;
330
331 Ok((entry_index, delta))
332}
333
334pub fn make_error_batched_header(
335 entry_index: usize,
336 time_delta: u64,
337 err: &ReductError,
338) -> (HeaderName, HeaderValue) {
339 let name = HeaderName::from_str(&format!(
340 "{}{}-{}",
341 ERROR_HEADER_PREFIX, entry_index, time_delta
342 ))
343 .expect("Entry index and time delta must produce a valid header name");
344 let value = HeaderValue::from_str(&format!("{},{}", err.status(), err.message()))
345 .expect("Status code and message must produce a valid header value");
346 (name, value)
347}
348
349#[inline]
350pub fn make_entries_header(entries: &[String]) -> HeaderValue {
351 let encoded = entries
352 .iter()
353 .map(|entry| encode_entry_name(entry))
354 .collect::<Vec<_>>()
355 .join(",");
356 encoded.parse().unwrap()
357}
358
359#[inline]
360pub fn make_start_timestamp_header(start_ts: u64) -> HeaderValue {
361 HeaderValue::from_str(&start_ts.to_string()).unwrap()
362}
363
364fn parse_start_timestamp_internal(headers: &HeaderMap) -> Result<u64, ReductError> {
365 headers
366 .get(START_TS_HEADER)
367 .ok_or(unprocessable_entity!(
368 "x-reduct-start-ts header is required"
369 ))?
370 .to_str()
371 .map_err(|_| unprocessable_entity!("Invalid x-reduct-start-ts header"))?
372 .parse::<u64>()
373 .map_err(|_| unprocessable_entity!("Invalid x-reduct-start-ts header"))
374}
375
/// Reads the batch start timestamp from `headers`.
///
/// Public entry point that delegates to `parse_start_timestamp_internal`; errors are
/// passed through unchanged.
pub fn parse_start_timestamp(headers: &HeaderMap) -> Result<u64, ReductError> {
    parse_start_timestamp_internal(headers)
}
379
380pub fn parse_entries(headers: &HeaderMap) -> Result<Vec<String>, ReductError> {
381 if let Some(entries) = headers.get(ENTRIES_HEADER) {
382 parse_entries_header(entries)
383 } else {
384 Ok(Vec::new())
385 }
386}
387
388pub fn parse_labels(headers: &HeaderMap) -> Result<Option<Vec<String>>, ReductError> {
389 match headers.get(LABELS_HEADER) {
390 None => Ok(None),
391 Some(labels) => parse_labels_header(labels).map(Some),
392 }
393}
394
395pub fn resolve_label_name<'a>(
396 raw: &'a str,
397 label_names: Option<&Vec<String>>,
398) -> Result<String, ReductError> {
399 if let (Some(label_names), Ok(idx)) = (label_names, raw.parse::<usize>()) {
400 return label_names
401 .get(idx)
402 .cloned()
403 .ok_or_else(|| unprocessable_entity!("Label index '{}' is out of range", raw));
404 }
405
406 if raw.starts_with('@') {
407 return Err(unprocessable_entity!(
408 "Label names must not start with '@': reserved for computed labels",
409 ));
410 }
411
412 Ok(raw.to_string())
413}
414
415fn apply_label_delta(
416 raw_labels: &str,
417 base: &Labels,
418 label_names: Option<&Vec<String>>,
419) -> Result<Labels, ReductError> {
420 let mut labels = base.clone();
421 for (key, value) in parse_label_delta_ops(raw_labels, label_names)? {
422 match value {
423 Some(value) => {
424 labels.insert(key.to_string(), value);
425 }
426 None => {
427 labels.remove(&key);
428 }
429 }
430 }
431
432 Ok(labels)
433}
434
435fn parse_label_delta_ops(
436 raw_labels: &str,
437 label_names: Option<&Vec<String>>,
438) -> Result<Vec<(String, Option<String>)>, ReductError> {
439 let mut ops = Vec::new();
440 let mut rest = raw_labels.trim().to_string();
441
442 if rest.is_empty() {
443 return Ok(ops);
444 }
445
446 loop {
447 let (raw_key, value_part) = rest
448 .split_once('=')
449 .ok_or_else(|| unprocessable_entity!("Invalid batched header"))?;
450 let key = resolve_label_name(raw_key.trim(), label_names)?;
451
452 let (value, next_rest) = if value_part.starts_with('\"') {
453 let value_part = &value_part[1..];
454 let (value, rest) = value_part
455 .split_once('\"')
456 .ok_or_else(|| unprocessable_entity!("Invalid batched header"))?;
457 (
458 value.trim().to_string(),
459 rest.trim_start_matches(',').trim().to_string(),
460 )
461 } else if let Some((value, rest)) = value_part.split_once(',') {
462 (value.trim().to_string(), rest.trim().to_string())
463 } else {
464 (value_part.trim().to_string(), String::new())
465 };
466
467 let value = if value.is_empty() { None } else { Some(value) };
468 ops.push((key, value));
469
470 if next_rest.is_empty() {
471 break;
472 }
473 rest = next_rest;
474 }
475
476 Ok(ops)
477}
478
479pub fn parse_label_delta(
480 raw_labels: &str,
481 label_names: Option<&Vec<String>>,
482) -> Result<(Labels, HashSet<String>), ReductError> {
483 let mut updates = Labels::new();
484 let mut remove = HashSet::new();
485
486 for (key, value) in parse_label_delta_ops(raw_labels, label_names)? {
487 match value {
488 Some(value) => {
489 updates.insert(key, value);
490 }
491 None => {
492 remove.insert(key);
493 }
494 }
495 }
496
497 Ok((updates, remove))
498}
499
500fn parse_record_header_with_defaults(
501 raw: &str,
502 previous: Option<&RecordHeader>,
503 label_names: Option<&Vec<String>>,
504) -> Result<RecordHeader, ReductError> {
505 let (content_length_str, rest_opt) = raw
506 .split_once(',')
507 .map(|(len, rest)| (len.trim(), Some(rest)))
508 .unwrap_or((raw.trim(), None));
509
510 let content_length = content_length_str
511 .parse::<u64>()
512 .map_err(|_| unprocessable_entity!("Invalid batched header"))?;
513
514 if rest_opt.is_none() {
515 let prev = previous.ok_or_else(|| {
516 unprocessable_entity!(
517 "Content-type and labels must be provided for the first record of an entry"
518 )
519 })?;
520 return Ok(RecordHeader {
521 content_length,
522 content_type: prev.content_type.clone(),
523 labels: prev.labels.clone(),
524 });
525 }
526
527 let rest = rest_opt.unwrap();
528 let (content_type_raw, labels_raw) = match rest.split_once(',') {
529 Some((ct, labels)) => (ct, Some(labels)),
530 None => (rest, None),
531 };
532
533 let content_type = if !content_type_raw.trim().is_empty() {
534 content_type_raw.trim().to_string()
535 } else if let Some(prev) = previous {
536 prev.content_type.clone()
537 } else {
538 "application/octet-stream".to_string()
539 };
540
541 let labels = match labels_raw {
542 None => previous
543 .map(|prev| prev.labels.clone())
544 .unwrap_or_else(HashMap::new),
545 Some(raw_labels) => apply_label_delta(
546 raw_labels,
547 previous.map(|prev| &prev.labels).unwrap_or(&HashMap::new()),
548 label_names,
549 )?,
550 };
551
552 Ok(RecordHeader {
553 content_length,
554 content_type,
555 labels,
556 })
557}
558
559pub fn sort_headers_by_entry_and_time(
564 headers: &HeaderMap,
565) -> Result<Vec<(usize, u64, HeaderValue)>, ReductError> {
566 let mut parsed_headers: Vec<(usize, u64, HeaderValue)> = headers
567 .clone()
568 .into_iter()
569 .filter(|(name, _)| name.is_some())
570 .map(|(name, value)| (name.unwrap().to_string(), value))
571 .filter(|(name, _)| name.starts_with(HEADER_PREFIX))
572 .filter(|(name, _)| name.rsplit_once('-').is_some())
573 .filter(|(name, _)| {
574 name.rsplit_once('-')
575 .map(|(_, ts)| ts.chars().all(|ch| ch.is_ascii_digit()))
576 .unwrap_or(false)
577 })
578 .map(|(name, value)| {
579 let (entry_index, time_delta) = parse_batched_header_name(&name)?;
580 Ok((entry_index, time_delta, value))
581 })
582 .collect::<Result<_, ReductError>>()?;
583
584 parsed_headers.sort_by(|(idx_a, delta_a, _), (idx_b, delta_b, _)| {
585 idx_a.cmp(idx_b).then_with(|| delta_a.cmp(delta_b))
586 });
587
588 Ok(parsed_headers)
589}
590
591pub fn parse_batched_headers(headers: &HeaderMap) -> Result<Vec<EntryRecordHeader>, ReductError> {
593 let entries = parse_entries(headers)?;
594 let start_ts = parse_start_timestamp(headers)?;
595 let label_names = parse_labels(headers)?;
596 let mut last_header_per_entry: HashMap<String, RecordHeader> = HashMap::new();
597 let mut result = Vec::new();
598
599 for (entry_index, delta, value) in sort_headers_by_entry_and_time(headers)? {
600 let entry = entries.get(entry_index).ok_or_else(|| {
601 unprocessable_entity!(
602 "Invalid header '{}{}-{}': entry index out of range",
603 HEADER_PREFIX,
604 entry_index,
605 delta
606 )
607 })?;
608
609 let raw_value = value
610 .to_str()
611 .map_err(|_| unprocessable_entity!("Invalid batched header"))?;
612
613 let header = parse_record_header_with_defaults(
614 raw_value,
615 last_header_per_entry.get(entry),
616 label_names.as_ref(),
617 )?;
618 let timestamp = start_ts + delta;
619
620 last_header_per_entry.insert(entry.clone(), header.clone());
621
622 result.push(EntryRecordHeader {
623 entry: entry.clone(),
624 timestamp,
625 header,
626 });
627 }
628
629 Ok(result)
630}
631
#[cfg(test)]
mod tests {
    use super::*;
    use http::HeaderValue;

    /// Builds a header map from fixed-name headers followed by batched record headers
    /// `(entry index, time delta, value)`, inserted in the order given.
    fn batch_headers(
        fixed: &[(&'static str, &'static str)],
        records: &[(usize, u64, &'static str)],
    ) -> HeaderMap {
        let mut headers = HeaderMap::new();
        for &(name, value) in fixed {
            headers.insert(name, HeaderValue::from_static(value));
        }
        for &(entry_index, time_delta, value) in records {
            headers.insert(
                make_batched_header_name(entry_index, time_delta),
                HeaderValue::from_static(value),
            );
        }
        headers
    }

    /// Returns the value of label `name` on a parsed record, if present.
    fn label<'a>(record: &'a EntryRecordHeader, name: &str) -> Option<&'a str> {
        record.header.labels.get(name).map(String::as_str)
    }

    #[test]
    fn test_encode_entry_name_slash() {
        let encoded = encode_entry_name("ro/topic/1");
        assert_eq!(encoded, "ro%2Ftopic%2F1");
    }

    #[test]
    fn test_encode_entry_name_safe_chars() {
        let name = "entry-1_foo~bar";
        assert_eq!(encode_entry_name(name), name);
    }

    #[test]
    fn test_decode_entry_name_roundtrip() {
        let original = "mqtt/topic/1";
        let decoded = decode_entry_name(&encode_entry_name(original)).unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn test_decode_entry_name_invalid_percent() {
        let expected = unprocessable_entity!("Invalid entry encoding in header name: 'foo%ZZ'");
        assert_eq!(decode_entry_name("foo%ZZ").unwrap_err(), expected);
    }

    #[test]
    fn test_parse_entries_header_roundtrip() {
        let value = HeaderValue::from_static("sensor,ro%2Ftopic");
        assert_eq!(
            parse_entries_header(&value).unwrap(),
            vec!["sensor", "ro/topic"]
        );
    }

    #[test]
    fn test_parse_labels_header_roundtrip() {
        let value = HeaderValue::from_static("label-1,foo%2Fbar");
        assert_eq!(
            parse_labels_header(&value).unwrap(),
            vec!["label-1", "foo/bar"]
        );
    }

    #[test]
    fn test_parse_batched_header_name_basic() {
        assert_eq!(
            parse_batched_header_name("x-reduct-1-123").unwrap(),
            (1, 123)
        );
    }

    #[test]
    fn test_parse_batched_header_name_invalid_time() {
        let name = "x-reduct-1-abc";
        let expected = unprocessable_entity!(
            "Invalid header '{}': must be an unix timestamp in microseconds",
            name
        );
        assert_eq!(parse_batched_header_name(name).unwrap_err(), expected);
    }

    #[test]
    fn test_sort_headers_by_entry_and_time() {
        let headers = batch_headers(
            &[
                (ENTRIES_HEADER, "sensor,ro%2Ftopic"),
                (START_TS_HEADER, "10"),
            ],
            &[
                (1, 5, "1,text/plain"),
                (0, 2, "1,text/plain"),
                (0, 3, "1,text/plain"),
                (1, 3, "1,text/plain"),
            ],
        );

        let parsed = sort_headers_by_entry_and_time(&headers).unwrap();
        let order: Vec<(usize, u64)> = parsed
            .iter()
            .map(|(entry_index, time_delta, _)| (*entry_index, *time_delta))
            .collect();
        assert_eq!(order, vec![(0, 2), (0, 3), (1, 3), (1, 5)]);
    }

    #[test]
    fn test_parse_batched_headers_with_values() {
        let headers = batch_headers(
            &[
                (ENTRIES_HEADER, "entry,ro%2Ftopic"),
                (START_TS_HEADER, "1000"),
                (LABELS_HEADER, "label"),
            ],
            &[(1, 15, "3,text/plain,0=z"), (0, 10, "5,text/csv,label=value")],
        );

        let parsed = parse_batched_headers(&headers).unwrap();
        assert_eq!(parsed.len(), 2);

        let first = &parsed[0];
        assert_eq!(first.entry, "entry");
        assert_eq!(first.timestamp, 1010);
        assert_eq!(first.header.content_length, 5);
        assert_eq!(first.header.content_type, "text/csv");
        assert_eq!(label(first, "label"), Some("value"));

        let second = &parsed[1];
        assert_eq!(second.entry, "ro/topic");
        assert_eq!(second.timestamp, 1015);
        assert_eq!(second.header.content_length, 3);
        assert_eq!(second.header.content_type, "text/plain");
        assert_eq!(label(second, "label"), Some("z"));
    }

    #[test]
    fn test_parse_batched_headers_reuse_metadata() {
        let headers = batch_headers(
            &[
                (ENTRIES_HEADER, "entry"),
                (START_TS_HEADER, "0"),
                (LABELS_HEADER, "x"),
            ],
            &[(0, 0, "10,text/plain,0=y"), (0, 5, "2"), (0, 10, "3,,0=z")],
        );

        let parsed = parse_batched_headers(&headers).unwrap();
        let observed: Vec<(&str, Option<&str>)> = parsed
            .iter()
            .map(|record| (record.header.content_type.as_str(), label(record, "x")))
            .collect();
        assert_eq!(
            observed,
            vec![
                ("text/plain", Some("y")),
                ("text/plain", Some("y")),
                ("text/plain", Some("z")),
            ]
        );
    }

    #[test]
    fn test_parse_batched_headers_with_label_indexes() {
        let headers = batch_headers(
            &[
                (ENTRIES_HEADER, "entry"),
                (START_TS_HEADER, "0"),
                (LABELS_HEADER, "a,b"),
            ],
            &[(0, 0, "10,text/plain,0=1,1=2"), (0, 5, "2,,0=")],
        );

        let parsed = parse_batched_headers(&headers).unwrap();
        assert_eq!(label(&parsed[0], "a"), Some("1"));
        assert_eq!(label(&parsed[0], "b"), Some("2"));
        assert_eq!(label(&parsed[1], "a"), None);
        assert_eq!(label(&parsed[1], "b"), Some("2"));
    }

    #[test]
    fn test_label_delta_removal() {
        let headers = batch_headers(
            &[(ENTRIES_HEADER, "entry"), (START_TS_HEADER, "0")],
            &[(0, 0, "10,text/plain,a=1,b=2"), (0, 5, "5,text/plain,b=")],
        );

        let parsed = parse_batched_headers(&headers).unwrap();
        assert_eq!(parsed.len(), 2);
        assert_eq!(label(&parsed[0], "a"), Some("1"));
        assert_eq!(label(&parsed[0], "b"), Some("2"));
        assert_eq!(label(&parsed[1], "a"), Some("1"));
        assert_eq!(label(&parsed[1], "b"), None);
    }

    #[test]
    fn test_parse_batched_headers_reuse_without_prev_error() {
        let headers = batch_headers(
            &[(ENTRIES_HEADER, "entry"), (START_TS_HEADER, "0")],
            &[(0, 0, "10")],
        );

        let expected = unprocessable_entity!(
            "Content-type and labels must be provided for the first record of an entry"
        );
        assert_eq!(parse_batched_headers(&headers).unwrap_err(), expected);
    }

    #[test]
    fn test_parse_batched_headers_invalid_index() {
        let headers = batch_headers(
            &[(ENTRIES_HEADER, "entry"), (START_TS_HEADER, "0")],
            &[(1, 0, "1,text/plain")],
        );

        let expected =
            unprocessable_entity!("Invalid header 'x-reduct-1-0': entry index out of range");
        assert_eq!(parse_batched_headers(&headers).unwrap_err(), expected);
    }

    #[test]
    fn test_parse_batched_headers_missing_meta() {
        let headers = batch_headers(&[], &[(0, 0, "1,text/plain")]);

        let expected = unprocessable_entity!("x-reduct-start-ts header is required");
        assert_eq!(parse_batched_headers(&headers).unwrap_err(), expected);
    }

    #[test]
    fn test_parse_label_delta_updates_and_removals() {
        let label_names = vec!["a".to_string(), "b".to_string()];
        let (updates, remove) =
            parse_label_delta("0=one,1=,c=\"3,4\"", Some(&label_names)).unwrap();

        assert_eq!(updates.get("a").map(String::as_str), Some("one"));
        assert_eq!(updates.get("c").map(String::as_str), Some("3,4"));
        assert_eq!(
            remove.into_iter().collect::<Vec<_>>(),
            vec!["b".to_string()]
        );
    }

    #[test]
    fn test_resolve_label_name_reserved_prefix() {
        let expected = unprocessable_entity!(
            "Label names must not start with '@': reserved for computed labels",
        );
        assert_eq!(resolve_label_name("@cpu", None).unwrap_err(), expected);
    }

    #[test]
    fn test_resolve_label_name_index_out_of_range() {
        let label_names = vec!["a".to_string()];
        let expected = unprocessable_entity!("Label index '2' is out of range");
        assert_eq!(
            resolve_label_name("2", Some(&label_names)).unwrap_err(),
            expected
        );
    }

    #[test]
    fn test_make_error_batched_header() {
        let err = unprocessable_entity!("broken");
        let (name, value) = make_error_batched_header(2, 10, &err);

        assert_eq!(name.as_str(), "x-reduct-error-2-10");
        let expected_value = format!("{},{}", err.status(), err.message());
        assert_eq!(value.to_str().unwrap(), expected_value);
    }

    #[cfg(feature = "io")]
    mod io_tests {
        use super::*;
        use std::collections::HashMap;

        /// Builds record metadata from borrowed label pairs.
        fn build_meta(
            labels: &[(&str, &str)],
            computed: &[(&str, &str)],
            content_type: &str,
            content_length: u64,
        ) -> RecordMeta {
            let to_owned_map = |pairs: &[(&str, &str)]| -> HashMap<String, String> {
                pairs
                    .iter()
                    .map(|&(key, value)| (key.to_owned(), value.to_owned()))
                    .collect()
            };

            RecordMeta::builder()
                .labels(to_owned_map(labels))
                .computed_labels(to_owned_map(computed))
                .content_type(content_type.to_string())
                .content_length(content_length)
                .build()
        }

        #[test]
        fn test_build_label_delta_with_previous_and_computed() {
            let mut label_index = LabelIndex::default();
            for name in ["a", "b", "c", "@cpu"].iter() {
                label_index.ensure(name);
            }

            let previous: Labels = [("a", "1"), ("b", "2")]
                .iter()
                .map(|&(key, value)| (key.to_owned(), value.to_owned()))
                .collect();

            let meta = build_meta(&[("a", "1"), ("c", "3,4")], &[("cpu", "10")], "text", 1);

            let delta = build_label_delta(&meta, Some(&previous), &mut label_index);
            assert_eq!(delta, "1=,2=\"3,4\",3=10");
        }

        #[test]
        fn test_make_record_header_value_reuse_metadata() {
            let mut label_index = LabelIndex::default();
            label_index.ensure("a");

            let previous: Labels = [("a", "1")]
                .iter()
                .map(|&(key, value)| (key.to_owned(), value.to_owned()))
                .collect();

            let meta = build_meta(&[("a", "1")], &[], "text/plain", 8);
            let value = make_record_header_value(
                &meta,
                Some("text/plain"),
                Some(&previous),
                &mut label_index,
            );

            assert_eq!(value.to_str().unwrap(), "8");
        }

        #[test]
        fn test_make_record_header_value_with_label_delta() {
            let mut label_index = LabelIndex::default();
            label_index.ensure("a");

            let previous = Labels::new();
            let meta = build_meta(&[("a", "1")], &[], "text/plain", 10);
            let value = make_record_header_value(
                &meta,
                Some("text/plain"),
                Some(&previous),
                &mut label_index,
            );

            assert_eq!(value.to_str().unwrap(), "10,,0=1");
        }
    }
}