1use crate::batch::v1::RecordHeader;
31use crate::error::ReductError;
32#[cfg(feature = "io")]
33use crate::io::RecordMeta;
34use crate::unprocessable_entity;
35use crate::Labels;
36use http::{HeaderMap, HeaderName, HeaderValue};
37use std::collections::{HashMap, HashSet};
38use std::str::FromStr;
39
/// Prefix shared by every batch header name built or parsed in this module.
pub const HEADER_PREFIX: &str = "x-reduct-";
/// Prefix of the per-record error headers built by `make_error_batched_header`.
pub const ERROR_HEADER_PREFIX: &str = "x-reduct-error-";
/// Header carrying the comma-separated, percent-encoded list of entry names.
pub const ENTRIES_HEADER: &str = "x-reduct-entries";
/// Header carrying the base timestamp that per-record time deltas are added to.
pub const START_TS_HEADER: &str = "x-reduct-start-ts";
/// Header carrying the comma-separated, percent-encoded list of label names
/// that numeric label keys in record headers index into.
pub const LABELS_HEADER: &str = "x-reduct-labels";

/// Name of the query-id header.
// NOTE(review): not referenced in the visible part of this file — confirm usage at call sites.
pub const QUERY_ID_HEADER: &str = "x-reduct-query-id";
47
/// A fully resolved record header together with the entry it belongs to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EntryRecordHeader {
    /// Decoded entry name looked up through the entry index of the header name.
    pub entry: String,
    /// Start timestamp plus the per-record time delta.
    pub timestamp: u64,
    /// Content length, content type and labels after applying inherited defaults.
    pub header: RecordHeader,
}
55
/// Label changes requested for a single record of an entry.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EntryLabelUpdateHeader {
    /// Decoded entry name looked up through the entry index of the header name.
    pub entry: String,
    /// Start timestamp plus the per-record time delta.
    pub timestamp: u64,
    /// Labels to set (label name -> new value).
    pub update: Labels,
    /// Names of labels to remove.
    pub remove: HashSet<String>,
}
64
/// Returns `true` when `byte` is an HTTP token character (`tchar`):
/// an ASCII letter, a digit, or one of ``!#$%&'*+-.^_`|~``.
fn is_tchar(byte: u8) -> bool {
    byte.is_ascii_alphanumeric()
        || matches!(
            byte,
            b'!' | b'#'
                | b'$'
                | b'%'
                | b'&'
                | b'\''
                | b'*'
                | b'+'
                | b'-'
                | b'.'
                | b'^'
                | b'_'
                | b'`'
                | b'|'
                | b'~'
        )
}

/// Percent-encodes an entry name so it can be placed in a comma-separated
/// header value.
///
/// Token characters are copied verbatim; every other byte is written as
/// `%XX` with upper-case hexadecimal digits. The escape character `%` itself
/// is always encoded as `%25`, although it is a token character: otherwise a
/// name containing a literal `%` would be misread as an escape sequence by
/// `decode_entry_name` and would not round-trip.
pub fn encode_entry_name(entry: &str) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(entry.len());
    for &byte in entry.as_bytes() {
        if is_tchar(byte) && byte != b'%' {
            encoded.push(byte as char);
        } else {
            encoded.push('%');
            encoded.push(HEX_DIGITS[usize::from(byte >> 4)] as char);
            encoded.push(HEX_DIGITS[usize::from(byte & 0x0F)] as char);
        }
    }
    encoded
}
98
99pub fn decode_entry_name(encoded: &str) -> Result<String, ReductError> {
101 let mut decoded = Vec::with_capacity(encoded.len());
102 let bytes = encoded.as_bytes();
103 let mut pos = 0;
104 while pos < bytes.len() {
105 match bytes[pos] {
106 b'%' => {
107 if pos + 2 >= bytes.len() {
108 return Err(unprocessable_entity!(
109 "Invalid entry encoding in header name: '{}'",
110 encoded
111 ));
112 }
113 let high = (bytes[pos + 1] as char).to_digit(16);
114 let low = (bytes[pos + 2] as char).to_digit(16);
115 if high.is_none() || low.is_none() {
116 return Err(unprocessable_entity!(
117 "Invalid entry encoding in header name: '{}'",
118 encoded
119 ));
120 }
121 decoded.push((high.unwrap() * 16 + low.unwrap()) as u8);
122 pos += 3;
123 }
124 other => {
125 decoded.push(other);
126 pos += 1;
127 }
128 }
129 }
130
131 String::from_utf8(decoded)
132 .map_err(|_| unprocessable_entity!("Entry name is not valid UTF-8 in header '{}'", encoded))
133}
134
135pub fn parse_entries_header(entries: &HeaderValue) -> Result<Vec<String>, ReductError> {
137 let entries = entries
138 .to_str()
139 .map_err(|_| unprocessable_entity!("Invalid entries header"))?;
140 if entries.trim().is_empty() {
141 return Err(unprocessable_entity!("x-reduct-entries header is required"));
142 }
143
144 entries
145 .split(',')
146 .map(|entry| {
147 let entry = entry.trim();
148 if entry.is_empty() {
149 return Err(unprocessable_entity!(
150 "x-reduct-entries header must not contain empty entry names"
151 ));
152 }
153
154 decode_entry_name(entry)
155 })
156 .collect()
157}
158
159pub fn decode_label_name(encoded: &str) -> Result<String, ReductError> {
161 decode_entry_name(encoded)
162 .map_err(|_| unprocessable_entity!("Invalid label encoding in header value: '{}'", encoded))
163}
164
/// Percent-encodes a label name; label names share the entry-name encoding.
pub fn encode_label_name(label: &str) -> String {
    encode_entry_name(label)
}
169
170pub fn parse_labels_header(labels: &HeaderValue) -> Result<Vec<String>, ReductError> {
172 let labels = labels
173 .to_str()
174 .map_err(|_| unprocessable_entity!("Invalid labels header"))?;
175 if labels.trim().is_empty() {
176 return Err(unprocessable_entity!("x-reduct-labels header is empty"));
177 }
178
179 labels
180 .split(',')
181 .map(|label| decode_label_name(label.trim()))
182 .collect()
183}
184
185#[derive(Debug, Default, Clone)]
187pub struct LabelIndex {
188 names: Vec<String>,
189 lookup: HashMap<String, usize>,
190}
191
192impl LabelIndex {
193 pub fn ensure(&mut self, name: &str) -> usize {
195 if let Some(idx) = self.lookup.get(name) {
196 return *idx;
197 }
198
199 let idx = self.names.len();
200 self.names.push(name.to_string());
201 self.lookup.insert(name.to_string(), idx);
202 idx
203 }
204
205 pub fn as_header(&self) -> Option<HeaderValue> {
207 if self.names.is_empty() {
208 return None;
209 }
210
211 let encoded = self
212 .names
213 .iter()
214 .map(|name| encode_label_name(name))
215 .collect::<Vec<_>>()
216 .join(",");
217 Some(encoded.parse().unwrap())
218 }
219
220 pub fn names(&self) -> &[String] {
222 &self.names
223 }
224}
225
/// Builds the label part of a record header as a comma-separated list of
/// `<index>=<value>` pairs, ordered by label index.
///
/// With `previous_labels`, only the differences to them are emitted: changed
/// or new labels carry their value, and labels missing from the record are
/// emitted with an empty value. Without `previous_labels`, every label of the
/// record is emitted. Computed labels are always emitted, under the name
/// `@<key>`. Values containing a comma are wrapped in double quotes.
///
/// Every label name that is emitted is registered in `label_index`.
#[cfg(feature = "io")]
pub fn build_label_delta(
    meta: &RecordMeta,
    previous_labels: Option<&Labels>,
    label_index: &mut LabelIndex,
) -> String {
    let quote_if_needed = |value: &str| {
        if value.contains(',') {
            format!("\"{}\"", value)
        } else {
            value.to_string()
        }
    };

    let mut deltas: Vec<(usize, String)> = Vec::new();

    match previous_labels {
        Some(prev) => {
            // Sorted, de-duplicated union of old and new label names, so that
            // new indexes are handed out in a deterministic order.
            let mut names: Vec<String> = prev
                .keys()
                .chain(meta.labels().keys())
                .map(|name| name.to_string())
                .collect();
            names.sort();
            names.dedup();

            for name in names {
                let rendered = match (prev.get(&name), meta.labels().get(&name)) {
                    (Some(old), Some(new)) if old == new => continue,
                    (_, Some(new)) => quote_if_needed(new),
                    (Some(_), None) => String::new(),
                    (None, None) => continue,
                };
                let idx = label_index.ensure(&name);
                deltas.push((idx, rendered));
            }
        }
        None => {
            for (name, value) in meta.labels().iter() {
                let idx = label_index.ensure(name);
                deltas.push((idx, quote_if_needed(value)));
            }
        }
    }

    for (name, value) in meta.computed_labels() {
        let idx = label_index.ensure(&format!("@{}", name));
        deltas.push((idx, quote_if_needed(value)));
    }

    deltas.sort_by_key(|(idx, _)| *idx);

    let rendered: Vec<String> = deltas
        .into_iter()
        .map(|(idx, value)| format!("{}={}", idx, value))
        .collect();
    rendered.join(",")
}
287
/// Builds the header value for one record:
/// `<content_length>[,<content_type>[,<label delta>]]`.
///
/// The content type is left empty when it equals `previous_content_type`.
/// The content-type slot is still written (empty) whenever a label delta
/// follows it. Both optional parts are omitted when there is nothing to send.
#[cfg(feature = "io")]
pub fn make_record_header_value(
    meta: &RecordMeta,
    previous_content_type: Option<&str>,
    previous_labels: Option<&Labels>,
    label_index: &mut LabelIndex,
) -> HeaderValue {
    let content_type = match previous_content_type {
        Some(prev) if prev == meta.content_type() => String::new(),
        _ => meta.content_type().to_string(),
    };
    let labels_delta = build_label_delta(meta, previous_labels, label_index);

    let mut value = meta.content_length().to_string();
    if !content_type.is_empty() || !labels_delta.is_empty() {
        value.push(',');
        value.push_str(&content_type);
    }
    if !labels_delta.is_empty() {
        value.push(',');
        value.push_str(&labels_delta);
    }

    value.parse().unwrap()
}
318
319pub fn make_batched_header_name(entry_index: usize, time_delta: u64) -> HeaderName {
321 HeaderName::from_str(&format!("{}{}-{}", HEADER_PREFIX, entry_index, time_delta))
322 .expect("Entry index and time delta must produce a valid header name")
323}
324
325pub fn parse_batched_header_name(name: &str) -> Result<(usize, u64), ReductError> {
327 if !name.starts_with(HEADER_PREFIX) {
328 return Err(unprocessable_entity!("Invalid batched header '{}'", name));
329 }
330
331 let without_prefix = &name[HEADER_PREFIX.len()..];
332 let (entry_index, delta) = without_prefix
333 .rsplit_once('-')
334 .ok_or(unprocessable_entity!("Invalid batched header '{}'", name))?;
335
336 let entry_index: usize = entry_index.parse().map_err(|_| {
337 unprocessable_entity!("Invalid header '{}': entry index must be a number", name)
338 })?;
339
340 let delta = delta.parse::<u64>().map_err(|_| {
341 unprocessable_entity!(
342 "Invalid header '{}': must be an unix timestamp in microseconds",
343 name
344 )
345 })?;
346
347 Ok((entry_index, delta))
348}
349
350pub fn make_error_batched_header(
351 entry_index: usize,
352 time_delta: u64,
353 err: &ReductError,
354) -> (HeaderName, HeaderValue) {
355 let name = HeaderName::from_str(&format!(
356 "{}{}-{}",
357 ERROR_HEADER_PREFIX, entry_index, time_delta
358 ))
359 .expect("Entry index and time delta must produce a valid header name");
360 let value = HeaderValue::from_str(&format!("{},{}", err.status(), err.message()))
361 .expect("Status code and message must produce a valid header value");
362 (name, value)
363}
364
365#[inline]
366pub fn make_entries_header(entries: &[String]) -> HeaderValue {
367 let encoded = entries
368 .iter()
369 .map(|entry| encode_entry_name(entry))
370 .collect::<Vec<_>>()
371 .join(",");
372 encoded.parse().unwrap()
373}
374
375#[inline]
376pub fn make_start_timestamp_header(start_ts: u64) -> HeaderValue {
377 HeaderValue::from_str(&start_ts.to_string()).unwrap()
378}
379
380fn parse_start_timestamp_internal(headers: &HeaderMap) -> Result<u64, ReductError> {
381 headers
382 .get(START_TS_HEADER)
383 .ok_or(unprocessable_entity!(
384 "x-reduct-start-ts header is required"
385 ))?
386 .to_str()
387 .map_err(|_| unprocessable_entity!("Invalid x-reduct-start-ts header"))?
388 .parse::<u64>()
389 .map_err(|_| unprocessable_entity!("Invalid x-reduct-start-ts header"))
390}
391
/// Reads the start timestamp from the `x-reduct-start-ts` header.
///
/// Public wrapper around `parse_start_timestamp_internal`; fails when the
/// header is missing or is not an unsigned integer.
pub fn parse_start_timestamp(headers: &HeaderMap) -> Result<u64, ReductError> {
    parse_start_timestamp_internal(headers)
}
395
396pub fn parse_entries(headers: &HeaderMap) -> Result<Vec<String>, ReductError> {
397 if let Some(entries) = headers.get(ENTRIES_HEADER) {
398 parse_entries_header(entries)
399 } else {
400 Ok(Vec::new())
401 }
402}
403
404pub fn parse_labels(headers: &HeaderMap) -> Result<Option<Vec<String>>, ReductError> {
405 match headers.get(LABELS_HEADER) {
406 None => Ok(None),
407 Some(labels) => parse_labels_header(labels).map(Some),
408 }
409}
410
411pub fn resolve_label_name<'a>(
412 raw: &'a str,
413 label_names: Option<&Vec<String>>,
414) -> Result<String, ReductError> {
415 if let (Some(label_names), Ok(idx)) = (label_names, raw.parse::<usize>()) {
416 return label_names
417 .get(idx)
418 .cloned()
419 .ok_or_else(|| unprocessable_entity!("Label index '{}' is out of range", raw));
420 }
421
422 if raw.starts_with('@') {
423 return Err(unprocessable_entity!(
424 "Label names must not start with '@': reserved for computed labels",
425 ));
426 }
427
428 Ok(raw.to_string())
429}
430
431fn apply_label_delta(
432 raw_labels: &str,
433 base: &Labels,
434 label_names: Option<&Vec<String>>,
435) -> Result<Labels, ReductError> {
436 let mut labels = base.clone();
437 for (key, value) in parse_label_delta_ops(raw_labels, label_names)? {
438 match value {
439 Some(value) => {
440 labels.insert(key.to_string(), value);
441 }
442 None => {
443 labels.remove(&key);
444 }
445 }
446 }
447
448 Ok(labels)
449}
450
451fn parse_label_delta_ops(
452 raw_labels: &str,
453 label_names: Option<&Vec<String>>,
454) -> Result<Vec<(String, Option<String>)>, ReductError> {
455 let mut ops = Vec::new();
456 let mut rest = raw_labels.trim().to_string();
457
458 if rest.is_empty() {
459 return Ok(ops);
460 }
461
462 loop {
463 let (raw_key, value_part) = rest
464 .split_once('=')
465 .ok_or_else(|| unprocessable_entity!("Invalid batched header"))?;
466 let key = resolve_label_name(raw_key.trim(), label_names)?;
467
468 let (value, next_rest) = if value_part.starts_with('\"') {
469 let value_part = &value_part[1..];
470 let (value, rest) = value_part
471 .split_once('\"')
472 .ok_or_else(|| unprocessable_entity!("Invalid batched header"))?;
473 (
474 value.trim().to_string(),
475 rest.trim_start_matches(',').trim().to_string(),
476 )
477 } else if let Some((value, rest)) = value_part.split_once(',') {
478 (value.trim().to_string(), rest.trim().to_string())
479 } else {
480 (value_part.trim().to_string(), String::new())
481 };
482
483 let value = if value.is_empty() { None } else { Some(value) };
484 ops.push((key, value));
485
486 if next_rest.is_empty() {
487 break;
488 }
489 rest = next_rest;
490 }
491
492 Ok(ops)
493}
494
495pub fn parse_label_delta(
496 raw_labels: &str,
497 label_names: Option<&Vec<String>>,
498) -> Result<(Labels, HashSet<String>), ReductError> {
499 let mut updates = Labels::new();
500 let mut remove = HashSet::new();
501
502 for (key, value) in parse_label_delta_ops(raw_labels, label_names)? {
503 match value {
504 Some(value) => {
505 updates.insert(key, value);
506 }
507 None => {
508 remove.insert(key);
509 }
510 }
511 }
512
513 Ok((updates, remove))
514}
515
516fn parse_record_header_with_defaults(
517 raw: &str,
518 previous: Option<&RecordHeader>,
519 label_names: Option<&Vec<String>>,
520) -> Result<RecordHeader, ReductError> {
521 let (content_length_str, rest_opt) = raw
522 .split_once(',')
523 .map(|(len, rest)| (len.trim(), Some(rest)))
524 .unwrap_or((raw.trim(), None));
525
526 let content_length = content_length_str
527 .parse::<u64>()
528 .map_err(|_| unprocessable_entity!("Invalid batched header"))?;
529
530 if rest_opt.is_none() {
531 let prev = previous.ok_or_else(|| {
532 unprocessable_entity!(
533 "Content-type and labels must be provided for the first record of an entry"
534 )
535 })?;
536 return Ok(RecordHeader {
537 content_length,
538 content_type: prev.content_type.clone(),
539 labels: prev.labels.clone(),
540 });
541 }
542
543 let rest = rest_opt.unwrap();
544 let (content_type_raw, labels_raw) = match rest.split_once(',') {
545 Some((ct, labels)) => (ct, Some(labels)),
546 None => (rest, None),
547 };
548
549 let content_type = if !content_type_raw.trim().is_empty() {
550 content_type_raw.trim().to_string()
551 } else if let Some(prev) = previous {
552 prev.content_type.clone()
553 } else {
554 "application/octet-stream".to_string()
555 };
556
557 let labels = match labels_raw {
558 None => previous
559 .map(|prev| prev.labels.clone())
560 .unwrap_or_else(HashMap::new),
561 Some(raw_labels) => apply_label_delta(
562 raw_labels,
563 previous.map(|prev| &prev.labels).unwrap_or(&HashMap::new()),
564 label_names,
565 )?,
566 };
567
568 Ok(RecordHeader {
569 content_length,
570 content_type,
571 labels,
572 })
573}
574
575pub fn sort_headers_by_entry_and_time(
580 headers: &HeaderMap,
581) -> Result<Vec<(usize, u64, HeaderValue)>, ReductError> {
582 let mut parsed_headers: Vec<(usize, u64, HeaderValue)> = headers
583 .clone()
584 .into_iter()
585 .filter(|(name, _)| name.is_some())
586 .map(|(name, value)| (name.unwrap().to_string(), value))
587 .filter(|(name, _)| name.starts_with(HEADER_PREFIX))
588 .filter(|(name, _)| name.rsplit_once('-').is_some())
589 .filter(|(name, _)| {
590 name.rsplit_once('-')
591 .map(|(_, ts)| ts.chars().all(|ch| ch.is_ascii_digit()))
592 .unwrap_or(false)
593 })
594 .map(|(name, value)| {
595 let (entry_index, time_delta) = parse_batched_header_name(&name)?;
596 Ok((entry_index, time_delta, value))
597 })
598 .collect::<Result<_, ReductError>>()?;
599
600 parsed_headers.sort_by(|(idx_a, delta_a, _), (idx_b, delta_b, _)| {
601 idx_a.cmp(idx_b).then_with(|| delta_a.cmp(delta_b))
602 });
603
604 Ok(parsed_headers)
605}
606
607pub fn parse_batched_headers(headers: &HeaderMap) -> Result<Vec<EntryRecordHeader>, ReductError> {
609 let entries = parse_entries(headers)?;
610 let start_ts = parse_start_timestamp(headers)?;
611 let label_names = parse_labels(headers)?;
612 let mut last_header_per_entry: HashMap<String, RecordHeader> = HashMap::new();
613 let mut result = Vec::new();
614
615 for (entry_index, delta, value) in sort_headers_by_entry_and_time(headers)? {
616 let entry = entries.get(entry_index).ok_or_else(|| {
617 unprocessable_entity!(
618 "Invalid header '{}{}-{}': entry index out of range",
619 HEADER_PREFIX,
620 entry_index,
621 delta
622 )
623 })?;
624
625 let raw_value = value
626 .to_str()
627 .map_err(|_| unprocessable_entity!("Invalid batched header"))?;
628
629 let header = parse_record_header_with_defaults(
630 raw_value,
631 last_header_per_entry.get(entry),
632 label_names.as_ref(),
633 )?;
634 let timestamp = start_ts + delta;
635
636 last_header_per_entry.insert(entry.clone(), header.clone());
637
638 result.push(EntryRecordHeader {
639 entry: entry.clone(),
640 timestamp,
641 header,
642 });
643 }
644
645 Ok(result)
646}
647
648pub fn parse_batched_update_headers(
653 headers: &HeaderMap,
654) -> Result<Vec<EntryLabelUpdateHeader>, ReductError> {
655 let entries = parse_entries(headers)?;
656 let start_ts = parse_start_timestamp(headers)?;
657 let label_names = parse_labels(headers)?;
658 let mut last_header_per_entry: HashMap<String, RecordHeader> = HashMap::new();
659 let mut entry_state: HashMap<String, HashMap<String, Option<String>>> = HashMap::new();
660 let mut result = Vec::new();
661
662 for (entry_index, delta, value) in sort_headers_by_entry_and_time(headers)? {
663 let entry = entries.get(entry_index).ok_or_else(|| {
664 unprocessable_entity!(
665 "Invalid header '{}{}-{}': entry index out of range",
666 HEADER_PREFIX,
667 entry_index,
668 delta
669 )
670 })?;
671
672 let raw_value = value
673 .to_str()
674 .map_err(|_| unprocessable_entity!("Invalid batched header"))?;
675 let parsed_header = parse_record_header_with_defaults(
676 raw_value,
677 last_header_per_entry.get(entry),
678 label_names.as_ref(),
679 )?;
680 last_header_per_entry.insert(entry.clone(), parsed_header);
681
682 if let Some(raw_labels) = raw_value.splitn(3, ',').nth(2) {
683 let (label_updates, label_removals) =
684 parse_label_delta(raw_labels, label_names.as_ref())?;
685 let state = entry_state.entry(entry.clone()).or_default();
686 for (key, value) in label_updates {
687 state.insert(key, Some(value));
688 }
689 for key in label_removals {
690 state.insert(key, None);
691 }
692 }
693
694 let state = entry_state.entry(entry.clone()).or_default();
695 let mut update = Labels::new();
696 let mut remove = HashSet::new();
697 for (key, value) in state {
698 match value {
699 Some(value) => {
700 update.insert(key.clone(), value.clone());
701 }
702 None => {
703 remove.insert(key.clone());
704 }
705 }
706 }
707
708 result.push(EntryLabelUpdateHeader {
709 entry: entry.clone(),
710 timestamp: start_ts + delta,
711 update,
712 remove,
713 });
714 }
715
716 Ok(result)
717}
718
// Unit tests for header encoding, parsing and batch resolution.
#[cfg(test)]
mod tests {
    use super::*;
    use http::HeaderValue;

    // '/' is not a token character and must be percent-encoded.
    #[test]
    fn test_encode_entry_name_slash() {
        assert_eq!(encode_entry_name("ro/topic/1"), "ro%2Ftopic%2F1");
    }

    // Token characters pass through unchanged.
    #[test]
    fn test_encode_entry_name_safe_chars() {
        assert_eq!(encode_entry_name("entry-1_foo~bar"), "entry-1_foo~bar");
    }

    // Decoding an encoded name yields the original name.
    #[test]
    fn test_decode_entry_name_roundtrip() {
        let entry = "mqtt/topic/1";
        let encoded = encode_entry_name(entry);
        assert_eq!(decode_entry_name(&encoded).unwrap(), entry);
    }

    // '%' followed by non-hex characters is rejected.
    #[test]
    fn test_decode_entry_name_invalid_percent() {
        let err = decode_entry_name("foo%ZZ").err().unwrap();
        assert_eq!(
            err,
            unprocessable_entity!("Invalid entry encoding in header name: 'foo%ZZ'")
        );
    }

    // The entries header is split on commas and each item is decoded.
    #[test]
    fn test_parse_entries_header_roundtrip() {
        let value = HeaderValue::from_str("sensor,ro%2Ftopic").unwrap();
        let entries = parse_entries_header(&value).unwrap();
        assert_eq!(entries, vec!["sensor".to_string(), "ro/topic".to_string()]);
    }

    // An empty item between two commas is an error.
    #[test]
    fn test_parse_entries_header_rejects_empty_entry() {
        let value = HeaderValue::from_str("sensor,,ro%2Ftopic").unwrap();
        let err = parse_entries_header(&value).err().unwrap();
        assert_eq!(
            err,
            unprocessable_entity!("x-reduct-entries header must not contain empty entry names")
        );
    }

    // The labels header is split on commas and each item is decoded.
    #[test]
    fn test_parse_labels_header_roundtrip() {
        let value = HeaderValue::from_str("label-1,foo%2Fbar").unwrap();
        let labels = parse_labels_header(&value).unwrap();
        assert_eq!(labels, vec!["label-1".to_string(), "foo/bar".to_string()]);
    }

    // A well-formed record header name yields entry index and time delta.
    #[test]
    fn test_parse_batched_header_name_basic() {
        let (entry_index, delta) = parse_batched_header_name("x-reduct-1-123").unwrap();
        assert_eq!(entry_index, 1);
        assert_eq!(delta, 123);
    }

    // A non-numeric time delta is rejected.
    #[test]
    fn test_parse_batched_header_name_invalid_time() {
        let err = parse_batched_header_name("x-reduct-1-abc").err().unwrap();
        assert_eq!(
            err,
            unprocessable_entity!(
                "Invalid header '{}': must be an unix timestamp in microseconds",
                "x-reduct-1-abc"
            )
        );
    }

    // Record headers come back ordered by entry index, then by time delta;
    // the entries and start-ts headers are not part of the result.
    #[test]
    fn test_sort_headers_by_entry_and_time() {
        let mut headers = HeaderMap::new();
        headers.insert(
            ENTRIES_HEADER,
            HeaderValue::from_static("sensor,ro%2Ftopic"),
        );
        headers.insert(START_TS_HEADER, HeaderValue::from_static("10"));
        headers.insert(
            make_batched_header_name(1, 5),
            HeaderValue::from_static("1,text/plain"),
        );
        headers.insert(
            make_batched_header_name(0, 2),
            HeaderValue::from_static("1,text/plain"),
        );
        headers.insert(
            make_batched_header_name(0, 3),
            HeaderValue::from_static("1,text/plain"),
        );
        headers.insert(
            make_batched_header_name(1, 3),
            HeaderValue::from_static("1,text/plain"),
        );

        let parsed = sort_headers_by_entry_and_time(&headers).unwrap();
        assert_eq!(parsed.len(), 4);
        assert_eq!(parsed[0].0, 0);
        assert_eq!(parsed[0].1, 2);
        assert_eq!(parsed[1].0, 0);
        assert_eq!(parsed[1].1, 3);
        assert_eq!(parsed[2].0, 1);
        assert_eq!(parsed[2].1, 3);
        assert_eq!(parsed[3].0, 1);
        assert_eq!(parsed[3].1, 5);
    }

    // Label keys may be given as an index ("0") or as a literal name ("label").
    #[test]
    fn test_parse_batched_headers_with_values() {
        let mut headers = HeaderMap::new();
        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry,ro%2Ftopic"));
        headers.insert(START_TS_HEADER, HeaderValue::from_static("1000"));
        headers.insert(LABELS_HEADER, HeaderValue::from_static("label"));
        headers.insert(
            make_batched_header_name(1, 15),
            HeaderValue::from_static("3,text/plain,0=z"),
        );
        headers.insert(
            make_batched_header_name(0, 10),
            HeaderValue::from_static("5,text/csv,label=value"),
        );

        let parsed = parse_batched_headers(&headers).unwrap();
        assert_eq!(parsed.len(), 2);

        assert_eq!(parsed[0].entry, "entry");
        assert_eq!(parsed[0].timestamp, 1010);
        assert_eq!(parsed[0].header.content_length, 5);
        assert_eq!(parsed[0].header.content_type, "text/csv");
        assert_eq!(parsed[0].header.labels.get("label").unwrap(), "value");

        assert_eq!(parsed[1].entry, "ro/topic");
        assert_eq!(parsed[1].timestamp, 1015);
        assert_eq!(parsed[1].header.content_length, 3);
        assert_eq!(parsed[1].header.content_type, "text/plain");
        assert_eq!(parsed[1].header.labels.get("label").unwrap(), "z");
    }

    // Later records of an entry inherit whatever they omit.
    #[test]
    fn test_parse_batched_headers_reuse_metadata() {
        let mut headers = HeaderMap::new();
        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry"));
        headers.insert(START_TS_HEADER, HeaderValue::from_static("0"));
        headers.insert(LABELS_HEADER, HeaderValue::from_static("x"));
        headers.insert(
            make_batched_header_name(0, 0),
            HeaderValue::from_static("10,text/plain,0=y"),
        );
        headers.insert(
            make_batched_header_name(0, 5),
            HeaderValue::from_static("2"),
        );
        // Only a content length above: content type and labels are inherited.
        headers.insert(
            make_batched_header_name(0, 10),
            HeaderValue::from_static("3,,0=z"),
        );
        // Empty content type above: inherited, while label "x" is overwritten.
        let parsed = parse_batched_headers(&headers).unwrap();
        assert_eq!(parsed.len(), 3);
        assert_eq!(parsed[0].header.content_type, "text/plain");
        assert_eq!(parsed[0].header.labels.get("x").unwrap(), "y");

        assert_eq!(parsed[1].header.content_type, "text/plain");
        assert_eq!(parsed[1].header.labels.get("x").unwrap(), "y");

        assert_eq!(parsed[2].header.content_type, "text/plain");
        assert_eq!(parsed[2].header.labels.get("x").unwrap(), "z");
    }

    // Numeric label keys are resolved through the labels header.
    #[test]
    fn test_parse_batched_headers_with_label_indexes() {
        let mut headers = HeaderMap::new();
        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry"));
        headers.insert(START_TS_HEADER, HeaderValue::from_static("0"));
        headers.insert(LABELS_HEADER, HeaderValue::from_static("a,b"));

        headers.insert(
            make_batched_header_name(0, 0),
            HeaderValue::from_static("10,text/plain,0=1,1=2"),
        );
        headers.insert(
            make_batched_header_name(0, 5),
            // An empty value removes label "a".
            HeaderValue::from_static("2,,0="),
        );

        let parsed = parse_batched_headers(&headers).unwrap();
        assert_eq!(parsed[0].header.labels.get("a").unwrap(), "1");
        assert_eq!(parsed[0].header.labels.get("b").unwrap(), "2");
        assert!(!parsed[1].header.labels.contains_key("a"));
        assert_eq!(parsed[1].header.labels.get("b").unwrap(), "2");
    }

    // Quoted values may contain commas; empty values become removals.
    #[test]
    fn test_parse_batched_update_headers_preserves_removal_ops() {
        let mut headers = HeaderMap::new();
        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry"));
        headers.insert(START_TS_HEADER, HeaderValue::from_static("1000"));
        headers.insert(
            make_batched_header_name(0, 0),
            HeaderValue::from_static("0,,a=\"hello,world\",b="),
        );

        let parsed = parse_batched_update_headers(&headers).unwrap();
        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0].entry, "entry");
        assert_eq!(parsed[0].timestamp, 1000);
        assert_eq!(parsed[0].update.get("a").unwrap(), "hello,world");
        assert!(parsed[0].remove.contains("b"));
    }

    // Label operations accumulate across the records of an entry.
    #[test]
    fn test_parse_batched_update_headers_reuses_entry_state() {
        let mut headers = HeaderMap::new();
        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry"));
        headers.insert(START_TS_HEADER, HeaderValue::from_static("1000"));
        headers.insert(LABELS_HEADER, HeaderValue::from_static("key,remove"));
        headers.insert(
            make_batched_header_name(0, 0),
            HeaderValue::from_static("0,,0=meta-1,1=true"),
        );
        headers.insert(
            make_batched_header_name(0, 1),
            HeaderValue::from_static("0,,0=meta-2"),
        );

        let parsed = parse_batched_update_headers(&headers).unwrap();
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0].update.get("key").unwrap(), "meta-1");
        assert_eq!(parsed[0].update.get("remove").unwrap(), "true");
        assert!(parsed[0].remove.is_empty());

        assert_eq!(parsed[1].update.get("key").unwrap(), "meta-2");
        assert_eq!(parsed[1].update.get("remove").unwrap(), "true");
        assert!(parsed[1].remove.is_empty());
    }

    // An empty value removes the label from the inherited label set.
    #[test]
    fn test_label_delta_removal() {
        let mut headers = HeaderMap::new();
        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry"));
        headers.insert(START_TS_HEADER, HeaderValue::from_static("0"));
        headers.insert(
            make_batched_header_name(0, 0),
            HeaderValue::from_static("10,text/plain,a=1,b=2"),
        );
        headers.insert(
            make_batched_header_name(0, 5),
            HeaderValue::from_static("5,text/plain,b="),
        );

        let parsed = parse_batched_headers(&headers).unwrap();
        assert_eq!(parsed.len(), 2);
        assert_eq!(parsed[0].header.labels.get("a").unwrap(), "1");
        assert_eq!(parsed[0].header.labels.get("b").unwrap(), "2");
        assert_eq!(parsed[1].header.labels.get("a").unwrap(), "1");
        assert!(!parsed[1].header.labels.contains_key("b"));
    }

    // The first record of an entry cannot consist of a content length only.
    #[test]
    fn test_parse_batched_headers_reuse_without_prev_error() {
        let mut headers = HeaderMap::new();
        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry"));
        headers.insert(START_TS_HEADER, HeaderValue::from_static("0"));
        headers.insert(
            make_batched_header_name(0, 0),
            HeaderValue::from_static("10"),
        );
        // There is no previous record to inherit from.
        let err = parse_batched_headers(&headers).err().unwrap();
        assert_eq!(
            err,
            unprocessable_entity!(
                "Content-type and labels must be provided for the first record of an entry"
            )
        );
    }

    // An entry index beyond the entries list is rejected.
    #[test]
    fn test_parse_batched_headers_invalid_index() {
        let mut headers = HeaderMap::new();
        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry"));
        headers.insert(START_TS_HEADER, HeaderValue::from_static("0"));
        headers.insert(
            make_batched_header_name(1, 0),
            HeaderValue::from_static("1,text/plain"),
        );

        let err = parse_batched_headers(&headers).err().unwrap();
        assert_eq!(
            err,
            unprocessable_entity!("Invalid header 'x-reduct-1-0': entry index out of range")
        );
    }

    // Without the start-ts header parsing fails.
    #[test]
    fn test_parse_batched_headers_missing_meta() {
        let mut headers = HeaderMap::new();
        headers.insert(
            make_batched_header_name(0, 0),
            HeaderValue::from_static("1,text/plain"),
        );

        let err = parse_batched_headers(&headers).err().unwrap();
        assert_eq!(
            err,
            unprocessable_entity!("x-reduct-start-ts header is required")
        );
    }

    // Indexed keys, literal keys, removals and quoted values in one delta.
    #[test]
    fn test_parse_label_delta_updates_and_removals() {
        let label_names = vec!["a".to_string(), "b".to_string()];
        let (updates, remove) =
            parse_label_delta("0=one,1=,c=\"3,4\"", Some(&label_names)).unwrap();

        assert_eq!(updates.get("a").unwrap(), "one");
        assert_eq!(updates.get("c").unwrap(), "3,4");
        assert!(remove.contains("b"));
        assert_eq!(remove.len(), 1);
    }

    // Literal names starting with '@' are reserved for computed labels.
    #[test]
    fn test_resolve_label_name_reserved_prefix() {
        let err = resolve_label_name("@cpu", None).err().unwrap();
        assert_eq!(
            err,
            unprocessable_entity!(
                "Label names must not start with '@': reserved for computed labels",
            )
        );
    }

    // A numeric key beyond the label list is rejected.
    #[test]
    fn test_resolve_label_name_index_out_of_range() {
        let label_names = vec!["a".to_string()];
        let err = resolve_label_name("2", Some(&label_names)).err().unwrap();
        assert_eq!(
            err,
            unprocessable_entity!("Label index '2' is out of range")
        );
    }

    // The error header is named after entry index and delta and carries
    // "<status>,<message>".
    #[test]
    fn test_make_error_batched_header() {
        let err = unprocessable_entity!("broken");
        let (name, value) = make_error_batched_header(2, 10, &err);

        assert_eq!(name.as_str(), "x-reduct-error-2-10");
        assert_eq!(
            value.to_str().unwrap(),
            format!("{},{}", err.status(), err.message())
        );
    }

    // Tests for the encoder side, which needs `RecordMeta` from the "io" feature.
    #[cfg(feature = "io")]
    mod io_tests {
        use super::*;
        use std::collections::HashMap;

        // Builds a `RecordMeta` from plain label pairs, computed-label pairs,
        // a content type and a content length.
        fn build_meta(
            labels: &[(&str, &str)],
            computed: &[(&str, &str)],
            content_type: &str,
            content_length: u64,
        ) -> RecordMeta {
            let mut label_map = HashMap::new();
            for (key, value) in labels {
                label_map.insert((*key).to_string(), (*value).to_string());
            }

            let mut computed_map = HashMap::new();
            for (key, value) in computed {
                computed_map.insert((*key).to_string(), (*value).to_string());
            }

            RecordMeta::builder()
                .labels(label_map)
                .computed_labels(computed_map)
                .content_type(content_type.to_string())
                .content_length(content_length)
                .build()
        }

        // Unchanged "a" is skipped, removed "b" is emitted empty, new "c" is
        // quoted because of its comma, and computed "@cpu" is always emitted.
        #[test]
        fn test_build_label_delta_with_previous_and_computed() {
            let mut label_index = LabelIndex::default();
            label_index.ensure("a");
            label_index.ensure("b");
            label_index.ensure("c");
            label_index.ensure("@cpu");

            let mut previous = Labels::new();
            previous.insert("a".to_string(), "1".to_string());
            previous.insert("b".to_string(), "2".to_string());

            let meta = build_meta(&[("a", "1"), ("c", "3,4")], &[("cpu", "10")], "text", 1);

            let delta = build_label_delta(&meta, Some(&previous), &mut label_index);
            assert_eq!(delta, "1=,2=\"3,4\",3=10");
        }

        // Same content type and same labels: only the content length is sent.
        #[test]
        fn test_make_record_header_value_reuse_metadata() {
            let mut label_index = LabelIndex::default();
            label_index.ensure("a");

            let mut previous = Labels::new();
            previous.insert("a".to_string(), "1".to_string());

            let meta = build_meta(&[("a", "1")], &[], "text/plain", 8);
            let value = make_record_header_value(
                &meta,
                Some("text/plain"),
                Some(&previous),
                &mut label_index,
            );

            assert_eq!(value.to_str().unwrap(), "8");
        }

        // Same content type but a new label: the content-type slot stays
        // empty and the label delta follows it.
        #[test]
        fn test_make_record_header_value_with_label_delta() {
            let mut label_index = LabelIndex::default();
            label_index.ensure("a");

            let previous = Labels::new();
            let meta = build_meta(&[("a", "1")], &[], "text/plain", 10);
            let value = make_record_header_value(
                &meta,
                Some("text/plain"),
                Some(&previous),
                &mut label_index,
            );

            assert_eq!(value.to_str().unwrap(), "10,,0=1");
        }
    }
}