Skip to main content

reduct_base/batch/
v2.rs

1// Copyright 2025 ReductSoftware UG
2// This Source Code Form is subject to the terms of the Mozilla Public
3//    License, v. 2.0. If a copy of the MPL was not distributed with this
4//    file, You can obtain one at https://mozilla.org/MPL/2.0/.
5//
6// Batched protocol v2
7// -------------------
8// Metadata:
9//   x-reduct-entries: comma-separated, percent-encoded entry names (index == position)
10//   x-reduct-labels (optional): comma-separated, percent-encoded label names (index == position)
11//   x-reduct-start-ts: first timestamp in the batch
12//   Content is sorted by entry index in ascending order, then by timestamp (start + delta).
13//
14// Per record:
15//   x-reduct-<ENTRY-INDEX>-<TIME-DELTA-uS>: batched header value
16//   (error responses use the same suffix: x-reduct-error-<ENTRY-INDEX>-<TIME-DELTA-uS>)
17//
18// Header value rules (optimised to avoid repetition):
19//   - Always includes content-length.
20//   - Metadata can be omitted or sent as deltas per entry:
21//       * "123" -> reuse previous content-type and labels for the entry.
22//       * "123,<ct>" -> reuse previous labels; set content-type to <ct>.
23//       * "123,,<label-delta>" -> reuse previous content-type; apply label delta.
24//       * "123,<ct>,<label-delta>" -> explicit content-type and label delta.
25//   - Label delta sends only changed labels; unset a label with `k=`. Unmentioned labels
26//     keep their previous value for the entry. When `x-reduct-labels` is provided, the
27//     label delta may use indexes instead of names (`<IDX>=<VALUE>`), where the index
28//     refers to the position in `x-reduct-labels`.
29//   - The first record for an entry must provide content-type and labels when re-use is
30//     requested; otherwise defaults apply (content-type defaults to octet-stream, labels to empty).
31
32use crate::batch::v1::RecordHeader;
33use crate::error::ReductError;
34#[cfg(feature = "io")]
35use crate::io::RecordMeta;
36use crate::unprocessable_entity;
37use crate::Labels;
38use http::{HeaderMap, HeaderName, HeaderValue};
39use std::collections::{HashMap, HashSet};
40use std::str::FromStr;
41
/// Common prefix of all batched protocol v2 headers.
pub const HEADER_PREFIX: &str = "x-reduct-";
/// Prefix of per-record error headers (`x-reduct-error-<ENTRY-INDEX>-<TIME-DELTA>`).
pub const ERROR_HEADER_PREFIX: &str = "x-reduct-error-";
/// Header carrying the comma-separated, percent-encoded entry names.
pub const ENTRIES_HEADER: &str = "x-reduct-entries";
/// Header carrying the first timestamp of the batch.
pub const START_TS_HEADER: &str = "x-reduct-start-ts";
/// Optional header carrying the comma-separated, percent-encoded label names.
pub const LABELS_HEADER: &str = "x-reduct-labels";

/// Name of the query id header.
// NOTE(review): not referenced in the visible part of this file — presumably used by callers.
pub const QUERY_ID_HEADER: &str = "x-reduct-query-id";
49
/// Represents a parsed batched header that includes the entry name and timestamp.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EntryRecordHeader {
    /// Decoded name of the entry the record belongs to.
    pub entry: String,
    /// Record timestamp, computed as the batch start timestamp plus the per-record delta.
    pub timestamp: u64,
    /// Content length, content type and labels resolved for the record.
    pub header: RecordHeader,
}
57
/// Returns `true` when `byte` is an ASCII letter, an ASCII digit, or one of the
/// punctuation bytes listed below (the HTTP token character set).
fn is_tchar(byte: u8) -> bool {
    byte.is_ascii_alphanumeric()
        || matches!(
            byte,
            b'!' | b'#'
                | b'$'
                | b'%'
                | b'&'
                | b'\''
                | b'*'
                | b'+'
                | b'-'
                | b'.'
                | b'^'
                | b'_'
                | b'`'
                | b'|'
                | b'~'
        )
}

/// Percent-encode an entry name so it can be used in header values.
///
/// Token characters are copied verbatim; every other byte of the UTF-8 representation is
/// written as `%XX` with uppercase hex digits.
///
/// `%` is always escaped (as `%25`) even though it is a token character: it is the escape
/// marker of the encoding, so emitting it verbatim would make names such as `100%` or
/// `a%2Fb` undecodable or decode to a different name.
pub fn encode_entry_name(entry: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(entry.len());
    for &byte in entry.as_bytes() {
        if byte != b'%' && is_tchar(byte) {
            encoded.push(byte as char);
        } else {
            // Write the escape by hand to avoid allocating a temporary string per byte.
            encoded.push('%');
            encoded.push(HEX[usize::from(byte >> 4)] as char);
            encoded.push(HEX[usize::from(byte & 0x0F)] as char);
        }
    }
    encoded
}
91
92/// Decode an entry name that was percent-encoded for use in headers.
93pub fn decode_entry_name(encoded: &str) -> Result<String, ReductError> {
94    let mut decoded = Vec::with_capacity(encoded.len());
95    let bytes = encoded.as_bytes();
96    let mut pos = 0;
97    while pos < bytes.len() {
98        match bytes[pos] {
99            b'%' => {
100                if pos + 2 >= bytes.len() {
101                    return Err(unprocessable_entity!(
102                        "Invalid entry encoding in header name: '{}'",
103                        encoded
104                    ));
105                }
106                let high = (bytes[pos + 1] as char).to_digit(16);
107                let low = (bytes[pos + 2] as char).to_digit(16);
108                if high.is_none() || low.is_none() {
109                    return Err(unprocessable_entity!(
110                        "Invalid entry encoding in header name: '{}'",
111                        encoded
112                    ));
113                }
114                decoded.push((high.unwrap() * 16 + low.unwrap()) as u8);
115                pos += 3;
116            }
117            other => {
118                decoded.push(other);
119                pos += 1;
120            }
121        }
122    }
123
124    String::from_utf8(decoded)
125        .map_err(|_| unprocessable_entity!("Entry name is not valid UTF-8 in header '{}'", encoded))
126}
127
128/// Parse the `x-reduct-entries` header containing a comma separated list of percent-encoded entries.
129pub fn parse_entries_header(entries: &HeaderValue) -> Result<Vec<String>, ReductError> {
130    let entries = entries
131        .to_str()
132        .map_err(|_| unprocessable_entity!("Invalid entries header"))?;
133    if entries.trim().is_empty() {
134        return Err(unprocessable_entity!("x-reduct-entries header is required"));
135    }
136
137    entries
138        .split(',')
139        .map(|entry| decode_entry_name(entry.trim()))
140        .collect()
141}
142
143/// Decode a percent-encoded label name.
144pub fn decode_label_name(encoded: &str) -> Result<String, ReductError> {
145    decode_entry_name(encoded)
146        .map_err(|_| unprocessable_entity!("Invalid label encoding in header value: '{}'", encoded))
147}
148
149/// Percent-encode a label name.
150pub fn encode_label_name(label: &str) -> String {
151    encode_entry_name(label)
152}
153
154/// Parse the `x-reduct-labels` header containing a comma separated list of percent-encoded labels.
155pub fn parse_labels_header(labels: &HeaderValue) -> Result<Vec<String>, ReductError> {
156    let labels = labels
157        .to_str()
158        .map_err(|_| unprocessable_entity!("Invalid labels header"))?;
159    if labels.trim().is_empty() {
160        return Err(unprocessable_entity!("x-reduct-labels header is empty"));
161    }
162
163    labels
164        .split(',')
165        .map(|label| decode_label_name(label.trim()))
166        .collect()
167}
168
169/// Keeps label names unique and provides the value for the `x-reduct-labels` header.
170#[derive(Debug, Default, Clone)]
171pub struct LabelIndex {
172    names: Vec<String>,
173    lookup: HashMap<String, usize>,
174}
175
176impl LabelIndex {
177    /// Returns the index of the label name, inserting it if missing.
178    pub fn ensure(&mut self, name: &str) -> usize {
179        if let Some(idx) = self.lookup.get(name) {
180            return *idx;
181        }
182
183        let idx = self.names.len();
184        self.names.push(name.to_string());
185        self.lookup.insert(name.to_string(), idx);
186        idx
187    }
188
189    /// Returns the header value for `x-reduct-labels` if at least one label was registered.
190    pub fn as_header(&self) -> Option<HeaderValue> {
191        if self.names.is_empty() {
192            return None;
193        }
194
195        let encoded = self
196            .names
197            .iter()
198            .map(|name| encode_label_name(name))
199            .collect::<Vec<_>>()
200            .join(",");
201        Some(encoded.parse().unwrap())
202    }
203
204    /// Returns the collected label names.
205    pub fn names(&self) -> &[String] {
206        &self.names
207    }
208}
209
/// Build a label delta string for batched protocol v2.
///
/// With `previous_labels`, only labels whose value changed are emitted, and labels that
/// disappeared are emitted with an empty value. Without it, every label of `meta` is emitted.
/// Computed labels are always emitted under the name `@<name>`. Each label is referenced by
/// its index in `label_index` (registered on demand) and the result is the comma-joined list
/// of `<index>=<value>` pairs ordered by index.
#[cfg(feature = "io")]
pub fn build_label_delta(
    meta: &RecordMeta,
    previous_labels: Option<&Labels>,
    label_index: &mut LabelIndex,
) -> String {
    // Values containing a comma are wrapped in double quotes so the comma is not read as a
    // pair separator.
    fn quote_if_needed(value: &str) -> String {
        if value.contains(',') {
            format!("\"{}\"", value)
        } else {
            value.to_string()
        }
    }

    let mut deltas: Vec<(usize, String)> = Vec::new();

    match previous_labels {
        Some(prev) => {
            // Walk the union of old and new label names in sorted order.
            let mut keys: Vec<String> = prev
                .keys()
                .chain(meta.labels().keys())
                .cloned()
                .collect();
            keys.sort();
            keys.dedup();

            for key in &keys {
                match (prev.get(key), meta.labels().get(key)) {
                    // Unchanged value: nothing to send.
                    (Some(old), Some(new)) if old == new => {}
                    // New or changed value.
                    (_, Some(new)) => {
                        let idx = label_index.ensure(key);
                        deltas.push((idx, quote_if_needed(new)));
                    }
                    // Label disappeared: an empty value unsets it.
                    (Some(_), None) => {
                        let idx = label_index.ensure(key);
                        deltas.push((idx, String::new()));
                    }
                    (None, None) => {}
                }
            }
        }
        None => {
            for (name, value) in meta.labels().iter() {
                let idx = label_index.ensure(name);
                deltas.push((idx, quote_if_needed(value)));
            }
        }
    }

    for (name, value) in meta.computed_labels() {
        let idx = label_index.ensure(&format!("@{}", name));
        deltas.push((idx, quote_if_needed(value)));
    }

    // Stable sort so pairs sharing an index keep their insertion order.
    deltas.sort_by(|left, right| left.0.cmp(&right.0));

    let mut rendered = String::new();
    for (position, (idx, value)) in deltas.into_iter().enumerate() {
        if position > 0 {
            rendered.push(',');
        }
        rendered.push_str(&format!("{}={}", idx, value));
    }
    rendered
}
271
/// Construct a record header value for batched protocol v2.
///
/// The value always starts with the content length. The content type follows only when it
/// differs from `previous_content_type` (or there is no previous one); the label delta
/// follows only when it is non-empty. When a label delta is present but the content type is
/// reused, the content-type field is left empty (`<len>,,<delta>`).
///
/// # Panics
///
/// Panics if the assembled string is not a valid header value.
#[cfg(feature = "io")]
pub fn make_record_header_value(
    meta: &RecordMeta,
    previous_content_type: Option<&str>,
    previous_labels: Option<&Labels>,
    label_index: &mut LabelIndex,
) -> HeaderValue {
    // An empty string means "reuse the previous content type".
    let content_type = match previous_content_type {
        Some(prev) if prev == meta.content_type() => String::new(),
        _ => meta.content_type().to_string(),
    };

    let labels_delta = build_label_delta(meta, previous_labels, label_index);

    let mut value = meta.content_length().to_string();
    if !content_type.is_empty() || !labels_delta.is_empty() {
        value.push(',');
        value.push_str(&content_type);
    }
    if !labels_delta.is_empty() {
        value.push(',');
        value.push_str(&labels_delta);
    }

    value.parse().unwrap()
}
302
303/// Create a header name for batched protocol v2: `x-reduct-<ENTRY-INDEX>-<TIME-DELTA>`.
304pub fn make_batched_header_name(entry_index: usize, time_delta: u64) -> HeaderName {
305    HeaderName::from_str(&format!("{}{}-{}", HEADER_PREFIX, entry_index, time_delta))
306        .expect("Entry index and time delta must produce a valid header name")
307}
308
309/// Parse a v2 batched header name (`x-reduct-<ENTRY-INDEX>-<TIME-DELTA>`) into index and delta.
310pub fn parse_batched_header_name(name: &str) -> Result<(usize, u64), ReductError> {
311    if !name.starts_with(HEADER_PREFIX) {
312        return Err(unprocessable_entity!("Invalid batched header '{}'", name));
313    }
314
315    let without_prefix = &name[HEADER_PREFIX.len()..];
316    let (entry_index, delta) = without_prefix
317        .rsplit_once('-')
318        .ok_or(unprocessable_entity!("Invalid batched header '{}'", name))?;
319
320    let entry_index: usize = entry_index.parse().map_err(|_| {
321        unprocessable_entity!("Invalid header '{}': entry index must be a number", name)
322    })?;
323
324    let delta = delta.parse::<u64>().map_err(|_| {
325        unprocessable_entity!(
326            "Invalid header '{}': must be an unix timestamp in microseconds",
327            name
328        )
329    })?;
330
331    Ok((entry_index, delta))
332}
333
334pub fn make_error_batched_header(
335    entry_index: usize,
336    time_delta: u64,
337    err: &ReductError,
338) -> (HeaderName, HeaderValue) {
339    let name = HeaderName::from_str(&format!(
340        "{}{}-{}",
341        ERROR_HEADER_PREFIX, entry_index, time_delta
342    ))
343    .expect("Entry index and time delta must produce a valid header name");
344    let value = HeaderValue::from_str(&format!("{},{}", err.status(), err.message()))
345        .expect("Status code and message must produce a valid header value");
346    (name, value)
347}
348
349#[inline]
350pub fn make_entries_header(entries: &[String]) -> HeaderValue {
351    let encoded = entries
352        .iter()
353        .map(|entry| encode_entry_name(entry))
354        .collect::<Vec<_>>()
355        .join(",");
356    encoded.parse().unwrap()
357}
358
359#[inline]
360pub fn make_start_timestamp_header(start_ts: u64) -> HeaderValue {
361    HeaderValue::from_str(&start_ts.to_string()).unwrap()
362}
363
364fn parse_start_timestamp_internal(headers: &HeaderMap) -> Result<u64, ReductError> {
365    headers
366        .get(START_TS_HEADER)
367        .ok_or(unprocessable_entity!(
368            "x-reduct-start-ts header is required"
369        ))?
370        .to_str()
371        .map_err(|_| unprocessable_entity!("Invalid x-reduct-start-ts header"))?
372        .parse::<u64>()
373        .map_err(|_| unprocessable_entity!("Invalid x-reduct-start-ts header"))
374}
375
376pub fn parse_start_timestamp(headers: &HeaderMap) -> Result<u64, ReductError> {
377    parse_start_timestamp_internal(headers)
378}
379
380pub fn parse_entries(headers: &HeaderMap) -> Result<Vec<String>, ReductError> {
381    if let Some(entries) = headers.get(ENTRIES_HEADER) {
382        parse_entries_header(entries)
383    } else {
384        Ok(Vec::new())
385    }
386}
387
388pub fn parse_labels(headers: &HeaderMap) -> Result<Option<Vec<String>>, ReductError> {
389    match headers.get(LABELS_HEADER) {
390        None => Ok(None),
391        Some(labels) => parse_labels_header(labels).map(Some),
392    }
393}
394
395pub fn resolve_label_name<'a>(
396    raw: &'a str,
397    label_names: Option<&Vec<String>>,
398) -> Result<String, ReductError> {
399    if let (Some(label_names), Ok(idx)) = (label_names, raw.parse::<usize>()) {
400        return label_names
401            .get(idx)
402            .cloned()
403            .ok_or_else(|| unprocessable_entity!("Label index '{}' is out of range", raw));
404    }
405
406    if raw.starts_with('@') {
407        return Err(unprocessable_entity!(
408            "Label names must not start with '@': reserved for computed labels",
409        ));
410    }
411
412    Ok(raw.to_string())
413}
414
415fn apply_label_delta(
416    raw_labels: &str,
417    base: &Labels,
418    label_names: Option<&Vec<String>>,
419) -> Result<Labels, ReductError> {
420    let mut labels = base.clone();
421    for (key, value) in parse_label_delta_ops(raw_labels, label_names)? {
422        match value {
423            Some(value) => {
424                labels.insert(key.to_string(), value);
425            }
426            None => {
427                labels.remove(&key);
428            }
429        }
430    }
431
432    Ok(labels)
433}
434
435fn parse_label_delta_ops(
436    raw_labels: &str,
437    label_names: Option<&Vec<String>>,
438) -> Result<Vec<(String, Option<String>)>, ReductError> {
439    let mut ops = Vec::new();
440    let mut rest = raw_labels.trim().to_string();
441
442    if rest.is_empty() {
443        return Ok(ops);
444    }
445
446    loop {
447        let (raw_key, value_part) = rest
448            .split_once('=')
449            .ok_or_else(|| unprocessable_entity!("Invalid batched header"))?;
450        let key = resolve_label_name(raw_key.trim(), label_names)?;
451
452        let (value, next_rest) = if value_part.starts_with('\"') {
453            let value_part = &value_part[1..];
454            let (value, rest) = value_part
455                .split_once('\"')
456                .ok_or_else(|| unprocessable_entity!("Invalid batched header"))?;
457            (
458                value.trim().to_string(),
459                rest.trim_start_matches(',').trim().to_string(),
460            )
461        } else if let Some((value, rest)) = value_part.split_once(',') {
462            (value.trim().to_string(), rest.trim().to_string())
463        } else {
464            (value_part.trim().to_string(), String::new())
465        };
466
467        let value = if value.is_empty() { None } else { Some(value) };
468        ops.push((key, value));
469
470        if next_rest.is_empty() {
471            break;
472        }
473        rest = next_rest;
474    }
475
476    Ok(ops)
477}
478
479pub fn parse_label_delta(
480    raw_labels: &str,
481    label_names: Option<&Vec<String>>,
482) -> Result<(Labels, HashSet<String>), ReductError> {
483    let mut updates = Labels::new();
484    let mut remove = HashSet::new();
485
486    for (key, value) in parse_label_delta_ops(raw_labels, label_names)? {
487        match value {
488            Some(value) => {
489                updates.insert(key, value);
490            }
491            None => {
492                remove.insert(key);
493            }
494        }
495    }
496
497    Ok((updates, remove))
498}
499
500fn parse_record_header_with_defaults(
501    raw: &str,
502    previous: Option<&RecordHeader>,
503    label_names: Option<&Vec<String>>,
504) -> Result<RecordHeader, ReductError> {
505    let (content_length_str, rest_opt) = raw
506        .split_once(',')
507        .map(|(len, rest)| (len.trim(), Some(rest)))
508        .unwrap_or((raw.trim(), None));
509
510    let content_length = content_length_str
511        .parse::<u64>()
512        .map_err(|_| unprocessable_entity!("Invalid batched header"))?;
513
514    if rest_opt.is_none() {
515        let prev = previous.ok_or_else(|| {
516            unprocessable_entity!(
517                "Content-type and labels must be provided for the first record of an entry"
518            )
519        })?;
520        return Ok(RecordHeader {
521            content_length,
522            content_type: prev.content_type.clone(),
523            labels: prev.labels.clone(),
524        });
525    }
526
527    let rest = rest_opt.unwrap();
528    let (content_type_raw, labels_raw) = match rest.split_once(',') {
529        Some((ct, labels)) => (ct, Some(labels)),
530        None => (rest, None),
531    };
532
533    let content_type = if !content_type_raw.trim().is_empty() {
534        content_type_raw.trim().to_string()
535    } else if let Some(prev) = previous {
536        prev.content_type.clone()
537    } else {
538        "application/octet-stream".to_string()
539    };
540
541    let labels = match labels_raw {
542        None => previous
543            .map(|prev| prev.labels.clone())
544            .unwrap_or_else(HashMap::new),
545        Some(raw_labels) => apply_label_delta(
546            raw_labels,
547            previous.map(|prev| &prev.labels).unwrap_or(&HashMap::new()),
548            label_names,
549        )?,
550    };
551
552    Ok(RecordHeader {
553        content_length,
554        content_type,
555        labels,
556    })
557}
558
559/// Sort and parse v2 batched headers in a header map.
560///
561/// Only headers following the `x-reduct-<ENTRY-INDEX>-<TIME-DELTA>` pattern are considered.
562/// Headers are sorted by entry index, then timestamp (start + delta).
563pub fn sort_headers_by_entry_and_time(
564    headers: &HeaderMap,
565) -> Result<Vec<(usize, u64, HeaderValue)>, ReductError> {
566    let mut parsed_headers: Vec<(usize, u64, HeaderValue)> = headers
567        .clone()
568        .into_iter()
569        .filter(|(name, _)| name.is_some())
570        .map(|(name, value)| (name.unwrap().to_string(), value))
571        .filter(|(name, _)| name.starts_with(HEADER_PREFIX))
572        .filter(|(name, _)| name.rsplit_once('-').is_some())
573        .filter(|(name, _)| {
574            name.rsplit_once('-')
575                .map(|(_, ts)| ts.chars().all(|ch| ch.is_ascii_digit()))
576                .unwrap_or(false)
577        })
578        .map(|(name, value)| {
579            let (entry_index, time_delta) = parse_batched_header_name(&name)?;
580            Ok((entry_index, time_delta, value))
581        })
582        .collect::<Result<_, ReductError>>()?;
583
584    parsed_headers.sort_by(|(idx_a, delta_a, _), (idx_b, delta_b, _)| {
585        idx_a.cmp(idx_b).then_with(|| delta_a.cmp(delta_b))
586    });
587
588    Ok(parsed_headers)
589}
590
591/// Parse and sort v2 batched headers including record metadata.
592pub fn parse_batched_headers(headers: &HeaderMap) -> Result<Vec<EntryRecordHeader>, ReductError> {
593    let entries = parse_entries(headers)?;
594    let start_ts = parse_start_timestamp(headers)?;
595    let label_names = parse_labels(headers)?;
596    let mut last_header_per_entry: HashMap<String, RecordHeader> = HashMap::new();
597    let mut result = Vec::new();
598
599    for (entry_index, delta, value) in sort_headers_by_entry_and_time(headers)? {
600        let entry = entries.get(entry_index).ok_or_else(|| {
601            unprocessable_entity!(
602                "Invalid header '{}{}-{}': entry index out of range",
603                HEADER_PREFIX,
604                entry_index,
605                delta
606            )
607        })?;
608
609        let raw_value = value
610            .to_str()
611            .map_err(|_| unprocessable_entity!("Invalid batched header"))?;
612
613        let header = parse_record_header_with_defaults(
614            raw_value,
615            last_header_per_entry.get(entry),
616            label_names.as_ref(),
617        )?;
618        let timestamp = start_ts + delta;
619
620        last_header_per_entry.insert(entry.clone(), header.clone());
621
622        result.push(EntryRecordHeader {
623            entry: entry.clone(),
624            timestamp,
625            header,
626        });
627    }
628
629    Ok(result)
630}
631
632#[cfg(test)]
633mod tests {
634    use super::*;
635    use http::HeaderValue;
636
637    #[test]
638    fn test_encode_entry_name_slash() {
639        assert_eq!(encode_entry_name("ro/topic/1"), "ro%2Ftopic%2F1");
640    }
641
642    #[test]
643    fn test_encode_entry_name_safe_chars() {
644        assert_eq!(encode_entry_name("entry-1_foo~bar"), "entry-1_foo~bar");
645    }
646
647    #[test]
648    fn test_decode_entry_name_roundtrip() {
649        let entry = "mqtt/topic/1";
650        let encoded = encode_entry_name(entry);
651        assert_eq!(decode_entry_name(&encoded).unwrap(), entry);
652    }
653
654    #[test]
655    fn test_decode_entry_name_invalid_percent() {
656        let err = decode_entry_name("foo%ZZ").err().unwrap();
657        assert_eq!(
658            err,
659            unprocessable_entity!("Invalid entry encoding in header name: 'foo%ZZ'")
660        );
661    }
662
663    #[test]
664    fn test_parse_entries_header_roundtrip() {
665        let value = HeaderValue::from_str("sensor,ro%2Ftopic").unwrap();
666        let entries = parse_entries_header(&value).unwrap();
667        assert_eq!(entries, vec!["sensor".to_string(), "ro/topic".to_string()]);
668    }
669
670    #[test]
671    fn test_parse_labels_header_roundtrip() {
672        let value = HeaderValue::from_str("label-1,foo%2Fbar").unwrap();
673        let labels = parse_labels_header(&value).unwrap();
674        assert_eq!(labels, vec!["label-1".to_string(), "foo/bar".to_string()]);
675    }
676
677    #[test]
678    fn test_parse_batched_header_name_basic() {
679        let (entry_index, delta) = parse_batched_header_name("x-reduct-1-123").unwrap();
680        assert_eq!(entry_index, 1);
681        assert_eq!(delta, 123);
682    }
683
684    #[test]
685    fn test_parse_batched_header_name_invalid_time() {
686        let err = parse_batched_header_name("x-reduct-1-abc").err().unwrap();
687        assert_eq!(
688            err,
689            unprocessable_entity!(
690                "Invalid header '{}': must be an unix timestamp in microseconds",
691                "x-reduct-1-abc"
692            )
693        );
694    }
695
696    #[test]
697    fn test_sort_headers_by_entry_and_time() {
698        let mut headers = HeaderMap::new();
699        headers.insert(
700            ENTRIES_HEADER,
701            HeaderValue::from_static("sensor,ro%2Ftopic"),
702        );
703        headers.insert(START_TS_HEADER, HeaderValue::from_static("10"));
704        headers.insert(
705            make_batched_header_name(1, 5),
706            HeaderValue::from_static("1,text/plain"),
707        );
708        headers.insert(
709            make_batched_header_name(0, 2),
710            HeaderValue::from_static("1,text/plain"),
711        );
712        headers.insert(
713            make_batched_header_name(0, 3),
714            HeaderValue::from_static("1,text/plain"),
715        );
716        headers.insert(
717            make_batched_header_name(1, 3),
718            HeaderValue::from_static("1,text/plain"),
719        );
720
721        let parsed = sort_headers_by_entry_and_time(&headers).unwrap();
722        assert_eq!(parsed.len(), 4);
723        assert_eq!(parsed[0].0, 0);
724        assert_eq!(parsed[0].1, 2);
725        assert_eq!(parsed[1].0, 0);
726        assert_eq!(parsed[1].1, 3);
727        assert_eq!(parsed[2].0, 1);
728        assert_eq!(parsed[2].1, 3);
729        assert_eq!(parsed[3].0, 1);
730        assert_eq!(parsed[3].1, 5);
731    }
732
733    #[test]
734    fn test_parse_batched_headers_with_values() {
735        let mut headers = HeaderMap::new();
736        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry,ro%2Ftopic"));
737        headers.insert(START_TS_HEADER, HeaderValue::from_static("1000"));
738        headers.insert(LABELS_HEADER, HeaderValue::from_static("label"));
739        headers.insert(
740            make_batched_header_name(1, 15),
741            HeaderValue::from_static("3,text/plain,0=z"),
742        );
743        headers.insert(
744            make_batched_header_name(0, 10),
745            HeaderValue::from_static("5,text/csv,label=value"),
746        );
747
748        let parsed = parse_batched_headers(&headers).unwrap();
749        assert_eq!(parsed.len(), 2);
750
751        assert_eq!(parsed[0].entry, "entry");
752        assert_eq!(parsed[0].timestamp, 1010);
753        assert_eq!(parsed[0].header.content_length, 5);
754        assert_eq!(parsed[0].header.content_type, "text/csv");
755        assert_eq!(parsed[0].header.labels.get("label").unwrap(), "value");
756
757        assert_eq!(parsed[1].entry, "ro/topic");
758        assert_eq!(parsed[1].timestamp, 1015);
759        assert_eq!(parsed[1].header.content_length, 3);
760        assert_eq!(parsed[1].header.content_type, "text/plain");
761        assert_eq!(parsed[1].header.labels.get("label").unwrap(), "z");
762    }
763
764    #[test]
765    fn test_parse_batched_headers_reuse_metadata() {
766        let mut headers = HeaderMap::new();
767        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry"));
768        headers.insert(START_TS_HEADER, HeaderValue::from_static("0"));
769        headers.insert(LABELS_HEADER, HeaderValue::from_static("x"));
770        headers.insert(
771            make_batched_header_name(0, 0),
772            HeaderValue::from_static("10,text/plain,0=y"),
773        );
774        headers.insert(
775            make_batched_header_name(0, 5),
776            HeaderValue::from_static("2"),
777        ); // reuse type + labels
778        headers.insert(
779            make_batched_header_name(0, 10),
780            HeaderValue::from_static("3,,0=z"),
781        ); // reuse type, override labels
782
783        let parsed = parse_batched_headers(&headers).unwrap();
784        assert_eq!(parsed.len(), 3);
785        assert_eq!(parsed[0].header.content_type, "text/plain");
786        assert_eq!(parsed[0].header.labels.get("x").unwrap(), "y");
787
788        assert_eq!(parsed[1].header.content_type, "text/plain");
789        assert_eq!(parsed[1].header.labels.get("x").unwrap(), "y");
790
791        assert_eq!(parsed[2].header.content_type, "text/plain");
792        assert_eq!(parsed[2].header.labels.get("x").unwrap(), "z");
793    }
794
795    #[test]
796    fn test_parse_batched_headers_with_label_indexes() {
797        let mut headers = HeaderMap::new();
798        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry"));
799        headers.insert(START_TS_HEADER, HeaderValue::from_static("0"));
800        headers.insert(LABELS_HEADER, HeaderValue::from_static("a,b"));
801
802        headers.insert(
803            make_batched_header_name(0, 0),
804            HeaderValue::from_static("10,text/plain,0=1,1=2"),
805        );
806        headers.insert(
807            make_batched_header_name(0, 5),
808            HeaderValue::from_static("2,,0="), // remove label a
809        );
810
811        let parsed = parse_batched_headers(&headers).unwrap();
812        assert_eq!(parsed[0].header.labels.get("a").unwrap(), "1");
813        assert_eq!(parsed[0].header.labels.get("b").unwrap(), "2");
814        assert!(!parsed[1].header.labels.contains_key("a"));
815        assert_eq!(parsed[1].header.labels.get("b").unwrap(), "2");
816    }
817
818    #[test]
819    fn test_label_delta_removal() {
820        let mut headers = HeaderMap::new();
821        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry"));
822        headers.insert(START_TS_HEADER, HeaderValue::from_static("0"));
823        headers.insert(
824            make_batched_header_name(0, 0),
825            HeaderValue::from_static("10,text/plain,a=1,b=2"),
826        );
827        headers.insert(
828            make_batched_header_name(0, 5),
829            HeaderValue::from_static("5,text/plain,b="),
830        );
831
832        let parsed = parse_batched_headers(&headers).unwrap();
833        assert_eq!(parsed.len(), 2);
834        assert_eq!(parsed[0].header.labels.get("a").unwrap(), "1");
835        assert_eq!(parsed[0].header.labels.get("b").unwrap(), "2");
836        assert_eq!(parsed[1].header.labels.get("a").unwrap(), "1");
837        assert!(!parsed[1].header.labels.contains_key("b"));
838    }
839
840    #[test]
841    fn test_parse_batched_headers_reuse_without_prev_error() {
842        let mut headers = HeaderMap::new();
843        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry"));
844        headers.insert(START_TS_HEADER, HeaderValue::from_static("0"));
845        headers.insert(
846            make_batched_header_name(0, 0),
847            HeaderValue::from_static("10"),
848        ); // no previous metadata
849
850        let err = parse_batched_headers(&headers).err().unwrap();
851        assert_eq!(
852            err,
853            unprocessable_entity!(
854                "Content-type and labels must be provided for the first record of an entry"
855            )
856        );
857    }
858
859    #[test]
860    fn test_parse_batched_headers_invalid_index() {
861        let mut headers = HeaderMap::new();
862        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry"));
863        headers.insert(START_TS_HEADER, HeaderValue::from_static("0"));
864        headers.insert(
865            make_batched_header_name(1, 0),
866            HeaderValue::from_static("1,text/plain"),
867        );
868
869        let err = parse_batched_headers(&headers).err().unwrap();
870        assert_eq!(
871            err,
872            unprocessable_entity!("Invalid header 'x-reduct-1-0': entry index out of range")
873        );
874    }
875
876    #[test]
877    fn test_parse_batched_headers_missing_meta() {
878        let mut headers = HeaderMap::new();
879        headers.insert(
880            make_batched_header_name(0, 0),
881            HeaderValue::from_static("1,text/plain"),
882        );
883
884        let err = parse_batched_headers(&headers).err().unwrap();
885        assert_eq!(
886            err,
887            unprocessable_entity!("x-reduct-start-ts header is required")
888        );
889    }
890
891    #[test]
892    fn test_parse_label_delta_updates_and_removals() {
893        let label_names = vec!["a".to_string(), "b".to_string()];
894        let (updates, remove) =
895            parse_label_delta("0=one,1=,c=\"3,4\"", Some(&label_names)).unwrap();
896
897        assert_eq!(updates.get("a").unwrap(), "one");
898        assert_eq!(updates.get("c").unwrap(), "3,4");
899        assert!(remove.contains("b"));
900        assert_eq!(remove.len(), 1);
901    }
902
903    #[test]
904    fn test_resolve_label_name_reserved_prefix() {
905        let err = resolve_label_name("@cpu", None).err().unwrap();
906        assert_eq!(
907            err,
908            unprocessable_entity!(
909                "Label names must not start with '@': reserved for computed labels",
910            )
911        );
912    }
913
914    #[test]
915    fn test_resolve_label_name_index_out_of_range() {
916        let label_names = vec!["a".to_string()];
917        let err = resolve_label_name("2", Some(&label_names)).err().unwrap();
918        assert_eq!(
919            err,
920            unprocessable_entity!("Label index '2' is out of range")
921        );
922    }
923
924    #[test]
925    fn test_make_error_batched_header() {
926        let err = unprocessable_entity!("broken");
927        let (name, value) = make_error_batched_header(2, 10, &err);
928
929        assert_eq!(name.as_str(), "x-reduct-error-2-10");
930        assert_eq!(
931            value.to_str().unwrap(),
932            format!("{},{}", err.status(), err.message())
933        );
934    }
935
936    #[cfg(feature = "io")]
937    mod io_tests {
938        use super::*;
939        use std::collections::HashMap;
940
941        fn build_meta(
942            labels: &[(&str, &str)],
943            computed: &[(&str, &str)],
944            content_type: &str,
945            content_length: u64,
946        ) -> RecordMeta {
947            let mut label_map = HashMap::new();
948            for (key, value) in labels {
949                label_map.insert((*key).to_string(), (*value).to_string());
950            }
951
952            let mut computed_map = HashMap::new();
953            for (key, value) in computed {
954                computed_map.insert((*key).to_string(), (*value).to_string());
955            }
956
957            RecordMeta::builder()
958                .labels(label_map)
959                .computed_labels(computed_map)
960                .content_type(content_type.to_string())
961                .content_length(content_length)
962                .build()
963        }
964
965        #[test]
966        fn test_build_label_delta_with_previous_and_computed() {
967            let mut label_index = LabelIndex::default();
968            label_index.ensure("a");
969            label_index.ensure("b");
970            label_index.ensure("c");
971            label_index.ensure("@cpu");
972
973            let mut previous = Labels::new();
974            previous.insert("a".to_string(), "1".to_string());
975            previous.insert("b".to_string(), "2".to_string());
976
977            let meta = build_meta(&[("a", "1"), ("c", "3,4")], &[("cpu", "10")], "text", 1);
978
979            let delta = build_label_delta(&meta, Some(&previous), &mut label_index);
980            assert_eq!(delta, "1=,2=\"3,4\",3=10");
981        }
982
983        #[test]
984        fn test_make_record_header_value_reuse_metadata() {
985            let mut label_index = LabelIndex::default();
986            label_index.ensure("a");
987
988            let mut previous = Labels::new();
989            previous.insert("a".to_string(), "1".to_string());
990
991            let meta = build_meta(&[("a", "1")], &[], "text/plain", 8);
992            let value = make_record_header_value(
993                &meta,
994                Some("text/plain"),
995                Some(&previous),
996                &mut label_index,
997            );
998
999            assert_eq!(value.to_str().unwrap(), "8");
1000        }
1001
1002        #[test]
1003        fn test_make_record_header_value_with_label_delta() {
1004            let mut label_index = LabelIndex::default();
1005            label_index.ensure("a");
1006
1007            let previous = Labels::new();
1008            let meta = build_meta(&[("a", "1")], &[], "text/plain", 10);
1009            let value = make_record_header_value(
1010                &meta,
1011                Some("text/plain"),
1012                Some(&previous),
1013                &mut label_index,
1014            );
1015
1016            assert_eq!(value.to_str().unwrap(), "10,,0=1");
1017        }
1018    }
1019}