Skip to main content

reduct_base/batch/
v2.rs

1// Copyright 2021-2026 ReductSoftware UG
2// Licensed under the Apache License, Version 2.0
3//
4// Batched protocol v2
5// -------------------
6// Metadata:
7//   x-reduct-entries: comma-separated, percent-encoded entry names (index == position)
8//   x-reduct-labels (optional): comma-separated, percent-encoded label names (index == position)
9//   x-reduct-start-ts: first timestamp in the batch
10//   Content is sorted by entry index in ascending order, then by timestamp (start + delta).
11//
12// Per record:
13//   x-reduct-<ENTRY-INDEX>-<TIME-DELTA-uS>: batched header value
14//   (error responses use the same suffix: x-reduct-error-<ENTRY-INDEX>-<TIME-DELTA-uS>)
15//
16// Header value rules (optimised to avoid repetition):
17//   - Always includes content-length.
18//   - Metadata can be omitted or sent as deltas per entry:
19//       * "123" -> reuse previous content-type and labels for the entry.
20//       * "123,<ct>" -> reuse previous labels; set content-type to <ct>.
21//       * "123,,<label-delta>" -> reuse previous content-type; apply label delta.
22//       * "123,<ct>,<label-delta>" -> explicit content-type and label delta.
23//   - Label delta sends only changed labels; unset a label with `k=`. Unmentioned labels
24//     keep their previous value for the entry. When `x-reduct-labels` is provided, the
25//     label delta may use indexes instead of names (`<IDX>=<VALUE>`), where the index
26//     refers to the position in `x-reduct-labels`.
//   - The bare "123" form cannot be used for the first record of an entry (there is nothing to
//     re-use yet). In the other forms, an omitted content-type or label delta on the first
//     record falls back to defaults (content-type: application/octet-stream, labels: empty).
29
30use crate::batch::v1::RecordHeader;
31use crate::error::ReductError;
32#[cfg(feature = "io")]
33use crate::io::RecordMeta;
34use crate::unprocessable_entity;
35use crate::Labels;
36use http::{HeaderMap, HeaderName, HeaderValue};
37use std::collections::{HashMap, HashSet};
38use std::str::FromStr;
39
/// Prefix shared by the batched-protocol headers, including per-record `x-reduct-<IDX>-<DELTA>`.
pub const HEADER_PREFIX: &str = "x-reduct-";
/// Prefix of per-record error headers: `x-reduct-error-<IDX>-<DELTA>`.
pub const ERROR_HEADER_PREFIX: &str = "x-reduct-error-";
/// Header listing the batch's entry names (comma-separated, percent-encoded; index == position).
pub const ENTRIES_HEADER: &str = "x-reduct-entries";
/// Header carrying the batch start timestamp; record time deltas are added to it.
pub const START_TS_HEADER: &str = "x-reduct-start-ts";
/// Optional header listing label names that label deltas may reference by index.
pub const LABELS_HEADER: &str = "x-reduct-labels";

/// Name of the query-id header (not used within this module).
pub const QUERY_ID_HEADER: &str = "x-reduct-query-id";
47
/// Represents a parsed batched header that includes the entry name and timestamp.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EntryRecordHeader {
    /// Entry name, looked up in `x-reduct-entries` by the record's entry index.
    pub entry: String,
    /// Absolute timestamp: `x-reduct-start-ts` plus the record's time delta.
    pub timestamp: u64,
    /// Content length, content type and labels after applying per-entry re-use and deltas.
    pub header: RecordHeader,
}
55
/// Represents parsed label update operations for a single record in batch update requests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EntryLabelUpdateHeader {
    /// Entry name the record belongs to.
    pub entry: String,
    /// Absolute timestamp: `x-reduct-start-ts` plus the record's time delta.
    pub timestamp: u64,
    /// Labels to set, accumulated over this entry's records seen so far in the batch.
    pub update: Labels,
    /// Labels to remove (sent as `name=`), accumulated the same way as `update`.
    pub remove: HashSet<String>,
}
64
/// Returns `true` if `byte` is an HTTP token character (`tchar`, RFC 9110 §5.6.2).
fn is_tchar(byte: u8) -> bool {
    byte.is_ascii_alphanumeric() || b"!#$%&'*+-.^_`|~".contains(&byte)
}

/// Percent-encode an entry name so it can be used in header values.
///
/// Every byte that is not an HTTP token character is written as `%XX` (upper-case hex).
/// `%` itself is always escaped even though it is a token character: otherwise a literal `%`
/// in the name would be misread by [`decode_entry_name`] (e.g. `"a%2Fb"` would decode to
/// `"a/b"`, and `"100%"` would not decode at all). Commas are not token characters, so the
/// result is safe inside the comma-separated `x-reduct-entries` / `x-reduct-labels` lists.
pub fn encode_entry_name(entry: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";

    let mut encoded = String::with_capacity(entry.len());
    for &byte in entry.as_bytes() {
        if byte != b'%' && is_tchar(byte) {
            encoded.push(char::from(byte));
        } else {
            // Push the escape directly instead of allocating a `format!` string per byte.
            encoded.push('%');
            encoded.push(char::from(HEX[usize::from(byte >> 4)]));
            encoded.push(char::from(HEX[usize::from(byte & 0x0F)]));
        }
    }
    encoded
}
98
99/// Decode an entry name that was percent-encoded for use in headers.
100pub fn decode_entry_name(encoded: &str) -> Result<String, ReductError> {
101    let mut decoded = Vec::with_capacity(encoded.len());
102    let bytes = encoded.as_bytes();
103    let mut pos = 0;
104    while pos < bytes.len() {
105        match bytes[pos] {
106            b'%' => {
107                if pos + 2 >= bytes.len() {
108                    return Err(unprocessable_entity!(
109                        "Invalid entry encoding in header name: '{}'",
110                        encoded
111                    ));
112                }
113                let high = (bytes[pos + 1] as char).to_digit(16);
114                let low = (bytes[pos + 2] as char).to_digit(16);
115                if high.is_none() || low.is_none() {
116                    return Err(unprocessable_entity!(
117                        "Invalid entry encoding in header name: '{}'",
118                        encoded
119                    ));
120                }
121                decoded.push((high.unwrap() * 16 + low.unwrap()) as u8);
122                pos += 3;
123            }
124            other => {
125                decoded.push(other);
126                pos += 1;
127            }
128        }
129    }
130
131    String::from_utf8(decoded)
132        .map_err(|_| unprocessable_entity!("Entry name is not valid UTF-8 in header '{}'", encoded))
133}
134
135/// Parse the `x-reduct-entries` header containing a comma separated list of percent-encoded entries.
136pub fn parse_entries_header(entries: &HeaderValue) -> Result<Vec<String>, ReductError> {
137    let entries = entries
138        .to_str()
139        .map_err(|_| unprocessable_entity!("Invalid entries header"))?;
140    if entries.trim().is_empty() {
141        return Err(unprocessable_entity!("x-reduct-entries header is required"));
142    }
143
144    entries
145        .split(',')
146        .map(|entry| {
147            let entry = entry.trim();
148            if entry.is_empty() {
149                return Err(unprocessable_entity!(
150                    "x-reduct-entries header must not contain empty entry names"
151                ));
152            }
153
154            decode_entry_name(entry)
155        })
156        .collect()
157}
158
159/// Decode a percent-encoded label name.
160pub fn decode_label_name(encoded: &str) -> Result<String, ReductError> {
161    decode_entry_name(encoded)
162        .map_err(|_| unprocessable_entity!("Invalid label encoding in header value: '{}'", encoded))
163}
164
/// Percent-encode a label name for the `x-reduct-labels` header.
///
/// Label names use the same encoding as entry names, so this delegates to [`encode_entry_name`].
pub fn encode_label_name(label: &str) -> String {
    encode_entry_name(label)
}
169
170/// Parse the `x-reduct-labels` header containing a comma separated list of percent-encoded labels.
171pub fn parse_labels_header(labels: &HeaderValue) -> Result<Vec<String>, ReductError> {
172    let labels = labels
173        .to_str()
174        .map_err(|_| unprocessable_entity!("Invalid labels header"))?;
175    if labels.trim().is_empty() {
176        return Err(unprocessable_entity!("x-reduct-labels header is empty"));
177    }
178
179    labels
180        .split(',')
181        .map(|label| decode_label_name(label.trim()))
182        .collect()
183}
184
185/// Keeps label names unique and provides the value for the `x-reduct-labels` header.
186#[derive(Debug, Default, Clone)]
187pub struct LabelIndex {
188    names: Vec<String>,
189    lookup: HashMap<String, usize>,
190}
191
192impl LabelIndex {
193    /// Returns the index of the label name, inserting it if missing.
194    pub fn ensure(&mut self, name: &str) -> usize {
195        if let Some(idx) = self.lookup.get(name) {
196            return *idx;
197        }
198
199        let idx = self.names.len();
200        self.names.push(name.to_string());
201        self.lookup.insert(name.to_string(), idx);
202        idx
203    }
204
205    /// Returns the header value for `x-reduct-labels` if at least one label was registered.
206    pub fn as_header(&self) -> Option<HeaderValue> {
207        if self.names.is_empty() {
208            return None;
209        }
210
211        let encoded = self
212            .names
213            .iter()
214            .map(|name| encode_label_name(name))
215            .collect::<Vec<_>>()
216            .join(",");
217        Some(encoded.parse().unwrap())
218    }
219
220    /// Returns the collected label names.
221    pub fn names(&self) -> &[String] {
222        &self.names
223    }
224}
225
/// Build the label-delta field of a batched protocol v2 record header.
///
/// Labels are referenced by their index in `label_index` (new names are registered on the fly)
/// and rendered as `<IDX>=<VALUE>` pairs, joined with `,` and ordered by index.
///
/// * With `previous_labels`, only added or changed labels are emitted, and labels that were
///   present before but are missing now are emitted as `<IDX>=` (removal).
/// * Without `previous_labels`, every label of `meta` is emitted.
/// * Computed labels are always emitted, under their name prefixed with `@`.
///
/// Values containing `,` are wrapped in double quotes. Returns an empty string when there is
/// nothing to send.
#[cfg(feature = "io")]
pub fn build_label_delta(
    meta: &RecordMeta,
    previous_labels: Option<&Labels>,
    label_index: &mut LabelIndex,
) -> String {
    fn quote_if_needed(value: &str) -> String {
        if value.contains(',') {
            format!("\"{}\"", value)
        } else {
            value.to_string()
        }
    }

    let current = meta.labels();
    let mut pairs: Vec<(usize, String)> = Vec::new();

    match previous_labels {
        None => {
            for (name, value) in current.iter() {
                pairs.push((label_index.ensure(name), quote_if_needed(value)));
            }
        }
        Some(prev) => {
            // Sorted, de-duplicated union of old and new names: index registration order is
            // deterministic for this branch.
            let mut names: Vec<String> = prev
                .keys()
                .chain(current.keys())
                .map(|k| k.to_string())
                .collect();
            names.sort();
            names.dedup();

            for name in names {
                match (prev.get(&name), current.get(&name)) {
                    (Some(old), Some(new)) if old == new => {}
                    (Some(_), None) => pairs.push((label_index.ensure(&name), String::new())),
                    (_, Some(new)) => pairs.push((label_index.ensure(&name), quote_if_needed(new))),
                    (None, None) => {}
                }
            }
        }
    }

    for (name, value) in meta.computed_labels() {
        pairs.push((label_index.ensure(&format!("@{}", name)), quote_if_needed(value)));
    }

    pairs.sort_by_key(|&(idx, _)| idx);
    pairs
        .iter()
        .map(|(idx, value)| format!("{}={}", idx, value))
        .collect::<Vec<_>>()
        .join(",")
}
287
/// Construct a record header value for batched protocol v2.
///
/// The value is `len`, `len,ct` or `len,ct,delta`:
/// * `ct` is sent only when there is no previous content type or it changed (otherwise empty);
/// * `delta` comes from [`build_label_delta`] and is sent only when non-empty;
/// * the `ct` slot is kept (possibly empty) whenever a delta follows it.
///
/// # Panics
/// Panics if the assembled string is rejected by `HeaderValue`'s parser, e.g. when the content
/// type or a label value contains control characters.
#[cfg(feature = "io")]
pub fn make_record_header_value(
    meta: &RecordMeta,
    previous_content_type: Option<&str>,
    previous_labels: Option<&Labels>,
    label_index: &mut LabelIndex,
) -> HeaderValue {
    let content_type = match previous_content_type {
        Some(prev) if prev == meta.content_type() => String::new(),
        _ => meta.content_type().to_string(),
    };

    let labels_delta = build_label_delta(meta, previous_labels, label_index);

    let mut value = meta.content_length().to_string();
    if !labels_delta.is_empty() {
        value.push(',');
        value.push_str(&content_type);
        value.push(',');
        value.push_str(&labels_delta);
    } else if !content_type.is_empty() {
        value.push(',');
        value.push_str(&content_type);
    }

    value.parse().unwrap()
}
318
319/// Create a header name for batched protocol v2: `x-reduct-<ENTRY-INDEX>-<TIME-DELTA>`.
320pub fn make_batched_header_name(entry_index: usize, time_delta: u64) -> HeaderName {
321    HeaderName::from_str(&format!("{}{}-{}", HEADER_PREFIX, entry_index, time_delta))
322        .expect("Entry index and time delta must produce a valid header name")
323}
324
325/// Parse a v2 batched header name (`x-reduct-<ENTRY-INDEX>-<TIME-DELTA>`) into index and delta.
326pub fn parse_batched_header_name(name: &str) -> Result<(usize, u64), ReductError> {
327    if !name.starts_with(HEADER_PREFIX) {
328        return Err(unprocessable_entity!("Invalid batched header '{}'", name));
329    }
330
331    let without_prefix = &name[HEADER_PREFIX.len()..];
332    let (entry_index, delta) = without_prefix
333        .rsplit_once('-')
334        .ok_or(unprocessable_entity!("Invalid batched header '{}'", name))?;
335
336    let entry_index: usize = entry_index.parse().map_err(|_| {
337        unprocessable_entity!("Invalid header '{}': entry index must be a number", name)
338    })?;
339
340    let delta = delta.parse::<u64>().map_err(|_| {
341        unprocessable_entity!(
342            "Invalid header '{}': must be an unix timestamp in microseconds",
343            name
344        )
345    })?;
346
347    Ok((entry_index, delta))
348}
349
350pub fn make_error_batched_header(
351    entry_index: usize,
352    time_delta: u64,
353    err: &ReductError,
354) -> (HeaderName, HeaderValue) {
355    let name = HeaderName::from_str(&format!(
356        "{}{}-{}",
357        ERROR_HEADER_PREFIX, entry_index, time_delta
358    ))
359    .expect("Entry index and time delta must produce a valid header name");
360    let value = HeaderValue::from_str(&format!("{},{}", err.status(), err.message()))
361        .expect("Status code and message must produce a valid header value");
362    (name, value)
363}
364
365#[inline]
366pub fn make_entries_header(entries: &[String]) -> HeaderValue {
367    let encoded = entries
368        .iter()
369        .map(|entry| encode_entry_name(entry))
370        .collect::<Vec<_>>()
371        .join(",");
372    encoded.parse().unwrap()
373}
374
375#[inline]
376pub fn make_start_timestamp_header(start_ts: u64) -> HeaderValue {
377    HeaderValue::from_str(&start_ts.to_string()).unwrap()
378}
379
380fn parse_start_timestamp_internal(headers: &HeaderMap) -> Result<u64, ReductError> {
381    headers
382        .get(START_TS_HEADER)
383        .ok_or(unprocessable_entity!(
384            "x-reduct-start-ts header is required"
385        ))?
386        .to_str()
387        .map_err(|_| unprocessable_entity!("Invalid x-reduct-start-ts header"))?
388        .parse::<u64>()
389        .map_err(|_| unprocessable_entity!("Invalid x-reduct-start-ts header"))
390}
391
/// Parse the mandatory `x-reduct-start-ts` header (the base timestamp of the batch).
///
/// # Errors
/// Fails if the header is missing or is not a valid unsigned integer.
pub fn parse_start_timestamp(headers: &HeaderMap) -> Result<u64, ReductError> {
    parse_start_timestamp_internal(headers)
}
395
396pub fn parse_entries(headers: &HeaderMap) -> Result<Vec<String>, ReductError> {
397    if let Some(entries) = headers.get(ENTRIES_HEADER) {
398        parse_entries_header(entries)
399    } else {
400        Ok(Vec::new())
401    }
402}
403
404pub fn parse_labels(headers: &HeaderMap) -> Result<Option<Vec<String>>, ReductError> {
405    match headers.get(LABELS_HEADER) {
406        None => Ok(None),
407        Some(labels) => parse_labels_header(labels).map(Some),
408    }
409}
410
411pub fn resolve_label_name<'a>(
412    raw: &'a str,
413    label_names: Option<&Vec<String>>,
414) -> Result<String, ReductError> {
415    if let (Some(label_names), Ok(idx)) = (label_names, raw.parse::<usize>()) {
416        return label_names
417            .get(idx)
418            .cloned()
419            .ok_or_else(|| unprocessable_entity!("Label index '{}' is out of range", raw));
420    }
421
422    if raw.starts_with('@') {
423        return Err(unprocessable_entity!(
424            "Label names must not start with '@': reserved for computed labels",
425        ));
426    }
427
428    Ok(raw.to_string())
429}
430
431fn apply_label_delta(
432    raw_labels: &str,
433    base: &Labels,
434    label_names: Option<&Vec<String>>,
435) -> Result<Labels, ReductError> {
436    let mut labels = base.clone();
437    for (key, value) in parse_label_delta_ops(raw_labels, label_names)? {
438        match value {
439            Some(value) => {
440                labels.insert(key.to_string(), value);
441            }
442            None => {
443                labels.remove(&key);
444            }
445        }
446    }
447
448    Ok(labels)
449}
450
451fn parse_label_delta_ops(
452    raw_labels: &str,
453    label_names: Option<&Vec<String>>,
454) -> Result<Vec<(String, Option<String>)>, ReductError> {
455    let mut ops = Vec::new();
456    let mut rest = raw_labels.trim().to_string();
457
458    if rest.is_empty() {
459        return Ok(ops);
460    }
461
462    loop {
463        let (raw_key, value_part) = rest
464            .split_once('=')
465            .ok_or_else(|| unprocessable_entity!("Invalid batched header"))?;
466        let key = resolve_label_name(raw_key.trim(), label_names)?;
467
468        let (value, next_rest) = if value_part.starts_with('\"') {
469            let value_part = &value_part[1..];
470            let (value, rest) = value_part
471                .split_once('\"')
472                .ok_or_else(|| unprocessable_entity!("Invalid batched header"))?;
473            (
474                value.trim().to_string(),
475                rest.trim_start_matches(',').trim().to_string(),
476            )
477        } else if let Some((value, rest)) = value_part.split_once(',') {
478            (value.trim().to_string(), rest.trim().to_string())
479        } else {
480            (value_part.trim().to_string(), String::new())
481        };
482
483        let value = if value.is_empty() { None } else { Some(value) };
484        ops.push((key, value));
485
486        if next_rest.is_empty() {
487            break;
488        }
489        rest = next_rest;
490    }
491
492    Ok(ops)
493}
494
495pub fn parse_label_delta(
496    raw_labels: &str,
497    label_names: Option<&Vec<String>>,
498) -> Result<(Labels, HashSet<String>), ReductError> {
499    let mut updates = Labels::new();
500    let mut remove = HashSet::new();
501
502    for (key, value) in parse_label_delta_ops(raw_labels, label_names)? {
503        match value {
504            Some(value) => {
505                updates.insert(key, value);
506            }
507            None => {
508                remove.insert(key);
509            }
510        }
511    }
512
513    Ok((updates, remove))
514}
515
516fn parse_record_header_with_defaults(
517    raw: &str,
518    previous: Option<&RecordHeader>,
519    label_names: Option<&Vec<String>>,
520) -> Result<RecordHeader, ReductError> {
521    let (content_length_str, rest_opt) = raw
522        .split_once(',')
523        .map(|(len, rest)| (len.trim(), Some(rest)))
524        .unwrap_or((raw.trim(), None));
525
526    let content_length = content_length_str
527        .parse::<u64>()
528        .map_err(|_| unprocessable_entity!("Invalid batched header"))?;
529
530    if rest_opt.is_none() {
531        let prev = previous.ok_or_else(|| {
532            unprocessable_entity!(
533                "Content-type and labels must be provided for the first record of an entry"
534            )
535        })?;
536        return Ok(RecordHeader {
537            content_length,
538            content_type: prev.content_type.clone(),
539            labels: prev.labels.clone(),
540        });
541    }
542
543    let rest = rest_opt.unwrap();
544    let (content_type_raw, labels_raw) = match rest.split_once(',') {
545        Some((ct, labels)) => (ct, Some(labels)),
546        None => (rest, None),
547    };
548
549    let content_type = if !content_type_raw.trim().is_empty() {
550        content_type_raw.trim().to_string()
551    } else if let Some(prev) = previous {
552        prev.content_type.clone()
553    } else {
554        "application/octet-stream".to_string()
555    };
556
557    let labels = match labels_raw {
558        None => previous
559            .map(|prev| prev.labels.clone())
560            .unwrap_or_else(HashMap::new),
561        Some(raw_labels) => apply_label_delta(
562            raw_labels,
563            previous.map(|prev| &prev.labels).unwrap_or(&HashMap::new()),
564            label_names,
565        )?,
566    };
567
568    Ok(RecordHeader {
569        content_length,
570        content_type,
571        labels,
572    })
573}
574
575/// Sort and parse v2 batched headers in a header map.
576///
577/// Only headers following the `x-reduct-<ENTRY-INDEX>-<TIME-DELTA>` pattern are considered.
578/// Headers are sorted by entry index, then timestamp (start + delta).
579pub fn sort_headers_by_entry_and_time(
580    headers: &HeaderMap,
581) -> Result<Vec<(usize, u64, HeaderValue)>, ReductError> {
582    let mut parsed_headers: Vec<(usize, u64, HeaderValue)> = headers
583        .clone()
584        .into_iter()
585        .filter(|(name, _)| name.is_some())
586        .map(|(name, value)| (name.unwrap().to_string(), value))
587        .filter(|(name, _)| name.starts_with(HEADER_PREFIX))
588        .filter(|(name, _)| name.rsplit_once('-').is_some())
589        .filter(|(name, _)| {
590            name.rsplit_once('-')
591                .map(|(_, ts)| ts.chars().all(|ch| ch.is_ascii_digit()))
592                .unwrap_or(false)
593        })
594        .map(|(name, value)| {
595            let (entry_index, time_delta) = parse_batched_header_name(&name)?;
596            Ok((entry_index, time_delta, value))
597        })
598        .collect::<Result<_, ReductError>>()?;
599
600    parsed_headers.sort_by(|(idx_a, delta_a, _), (idx_b, delta_b, _)| {
601        idx_a.cmp(idx_b).then_with(|| delta_a.cmp(delta_b))
602    });
603
604    Ok(parsed_headers)
605}
606
607/// Parse and sort v2 batched headers including record metadata.
608pub fn parse_batched_headers(headers: &HeaderMap) -> Result<Vec<EntryRecordHeader>, ReductError> {
609    let entries = parse_entries(headers)?;
610    let start_ts = parse_start_timestamp(headers)?;
611    let label_names = parse_labels(headers)?;
612    let mut last_header_per_entry: HashMap<String, RecordHeader> = HashMap::new();
613    let mut result = Vec::new();
614
615    for (entry_index, delta, value) in sort_headers_by_entry_and_time(headers)? {
616        let entry = entries.get(entry_index).ok_or_else(|| {
617            unprocessable_entity!(
618                "Invalid header '{}{}-{}': entry index out of range",
619                HEADER_PREFIX,
620                entry_index,
621                delta
622            )
623        })?;
624
625        let raw_value = value
626            .to_str()
627            .map_err(|_| unprocessable_entity!("Invalid batched header"))?;
628
629        let header = parse_record_header_with_defaults(
630            raw_value,
631            last_header_per_entry.get(entry),
632            label_names.as_ref(),
633        )?;
634        let timestamp = start_ts + delta;
635
636        last_header_per_entry.insert(entry.clone(), header.clone());
637
638        result.push(EntryRecordHeader {
639            entry: entry.clone(),
640            timestamp,
641            header,
642        });
643    }
644
645    Ok(result)
646}
647
648/// Parse and sort v2 batched headers into label update operations.
649///
650/// The parser preserves explicit removals from deltas (for example `k=`), while still applying
651/// per-entry state re-use between subsequent records in the same batch.
652pub fn parse_batched_update_headers(
653    headers: &HeaderMap,
654) -> Result<Vec<EntryLabelUpdateHeader>, ReductError> {
655    let entries = parse_entries(headers)?;
656    let start_ts = parse_start_timestamp(headers)?;
657    let label_names = parse_labels(headers)?;
658    let mut last_header_per_entry: HashMap<String, RecordHeader> = HashMap::new();
659    let mut entry_state: HashMap<String, HashMap<String, Option<String>>> = HashMap::new();
660    let mut result = Vec::new();
661
662    for (entry_index, delta, value) in sort_headers_by_entry_and_time(headers)? {
663        let entry = entries.get(entry_index).ok_or_else(|| {
664            unprocessable_entity!(
665                "Invalid header '{}{}-{}': entry index out of range",
666                HEADER_PREFIX,
667                entry_index,
668                delta
669            )
670        })?;
671
672        let raw_value = value
673            .to_str()
674            .map_err(|_| unprocessable_entity!("Invalid batched header"))?;
675        let parsed_header = parse_record_header_with_defaults(
676            raw_value,
677            last_header_per_entry.get(entry),
678            label_names.as_ref(),
679        )?;
680        last_header_per_entry.insert(entry.clone(), parsed_header);
681
682        if let Some(raw_labels) = raw_value.splitn(3, ',').nth(2) {
683            let (label_updates, label_removals) =
684                parse_label_delta(raw_labels, label_names.as_ref())?;
685            let state = entry_state.entry(entry.clone()).or_default();
686            for (key, value) in label_updates {
687                state.insert(key, Some(value));
688            }
689            for key in label_removals {
690                state.insert(key, None);
691            }
692        }
693
694        let state = entry_state.entry(entry.clone()).or_default();
695        let mut update = Labels::new();
696        let mut remove = HashSet::new();
697        for (key, value) in state {
698            match value {
699                Some(value) => {
700                    update.insert(key.clone(), value.clone());
701                }
702                None => {
703                    remove.insert(key.clone());
704                }
705            }
706        }
707
708        result.push(EntryLabelUpdateHeader {
709            entry: entry.clone(),
710            timestamp: start_ts + delta,
711            update,
712            remove,
713        });
714    }
715
716    Ok(result)
717}
718
719#[cfg(test)]
720mod tests {
721    use super::*;
722    use http::HeaderValue;
723
724    #[test]
725    fn test_encode_entry_name_slash() {
726        assert_eq!(encode_entry_name("ro/topic/1"), "ro%2Ftopic%2F1");
727    }
728
729    #[test]
730    fn test_encode_entry_name_safe_chars() {
731        assert_eq!(encode_entry_name("entry-1_foo~bar"), "entry-1_foo~bar");
732    }
733
734    #[test]
735    fn test_decode_entry_name_roundtrip() {
736        let entry = "mqtt/topic/1";
737        let encoded = encode_entry_name(entry);
738        assert_eq!(decode_entry_name(&encoded).unwrap(), entry);
739    }
740
741    #[test]
742    fn test_decode_entry_name_invalid_percent() {
743        let err = decode_entry_name("foo%ZZ").err().unwrap();
744        assert_eq!(
745            err,
746            unprocessable_entity!("Invalid entry encoding in header name: 'foo%ZZ'")
747        );
748    }
749
750    #[test]
751    fn test_parse_entries_header_roundtrip() {
752        let value = HeaderValue::from_str("sensor,ro%2Ftopic").unwrap();
753        let entries = parse_entries_header(&value).unwrap();
754        assert_eq!(entries, vec!["sensor".to_string(), "ro/topic".to_string()]);
755    }
756
757    #[test]
758    fn test_parse_entries_header_rejects_empty_entry() {
759        let value = HeaderValue::from_str("sensor,,ro%2Ftopic").unwrap();
760        let err = parse_entries_header(&value).err().unwrap();
761        assert_eq!(
762            err,
763            unprocessable_entity!("x-reduct-entries header must not contain empty entry names")
764        );
765    }
766
767    #[test]
768    fn test_parse_labels_header_roundtrip() {
769        let value = HeaderValue::from_str("label-1,foo%2Fbar").unwrap();
770        let labels = parse_labels_header(&value).unwrap();
771        assert_eq!(labels, vec!["label-1".to_string(), "foo/bar".to_string()]);
772    }
773
774    #[test]
775    fn test_parse_batched_header_name_basic() {
776        let (entry_index, delta) = parse_batched_header_name("x-reduct-1-123").unwrap();
777        assert_eq!(entry_index, 1);
778        assert_eq!(delta, 123);
779    }
780
781    #[test]
782    fn test_parse_batched_header_name_invalid_time() {
783        let err = parse_batched_header_name("x-reduct-1-abc").err().unwrap();
784        assert_eq!(
785            err,
786            unprocessable_entity!(
787                "Invalid header '{}': must be an unix timestamp in microseconds",
788                "x-reduct-1-abc"
789            )
790        );
791    }
792
793    #[test]
794    fn test_sort_headers_by_entry_and_time() {
795        let mut headers = HeaderMap::new();
796        headers.insert(
797            ENTRIES_HEADER,
798            HeaderValue::from_static("sensor,ro%2Ftopic"),
799        );
800        headers.insert(START_TS_HEADER, HeaderValue::from_static("10"));
801        headers.insert(
802            make_batched_header_name(1, 5),
803            HeaderValue::from_static("1,text/plain"),
804        );
805        headers.insert(
806            make_batched_header_name(0, 2),
807            HeaderValue::from_static("1,text/plain"),
808        );
809        headers.insert(
810            make_batched_header_name(0, 3),
811            HeaderValue::from_static("1,text/plain"),
812        );
813        headers.insert(
814            make_batched_header_name(1, 3),
815            HeaderValue::from_static("1,text/plain"),
816        );
817
818        let parsed = sort_headers_by_entry_and_time(&headers).unwrap();
819        assert_eq!(parsed.len(), 4);
820        assert_eq!(parsed[0].0, 0);
821        assert_eq!(parsed[0].1, 2);
822        assert_eq!(parsed[1].0, 0);
823        assert_eq!(parsed[1].1, 3);
824        assert_eq!(parsed[2].0, 1);
825        assert_eq!(parsed[2].1, 3);
826        assert_eq!(parsed[3].0, 1);
827        assert_eq!(parsed[3].1, 5);
828    }
829
830    #[test]
831    fn test_parse_batched_headers_with_values() {
832        let mut headers = HeaderMap::new();
833        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry,ro%2Ftopic"));
834        headers.insert(START_TS_HEADER, HeaderValue::from_static("1000"));
835        headers.insert(LABELS_HEADER, HeaderValue::from_static("label"));
836        headers.insert(
837            make_batched_header_name(1, 15),
838            HeaderValue::from_static("3,text/plain,0=z"),
839        );
840        headers.insert(
841            make_batched_header_name(0, 10),
842            HeaderValue::from_static("5,text/csv,label=value"),
843        );
844
845        let parsed = parse_batched_headers(&headers).unwrap();
846        assert_eq!(parsed.len(), 2);
847
848        assert_eq!(parsed[0].entry, "entry");
849        assert_eq!(parsed[0].timestamp, 1010);
850        assert_eq!(parsed[0].header.content_length, 5);
851        assert_eq!(parsed[0].header.content_type, "text/csv");
852        assert_eq!(parsed[0].header.labels.get("label").unwrap(), "value");
853
854        assert_eq!(parsed[1].entry, "ro/topic");
855        assert_eq!(parsed[1].timestamp, 1015);
856        assert_eq!(parsed[1].header.content_length, 3);
857        assert_eq!(parsed[1].header.content_type, "text/plain");
858        assert_eq!(parsed[1].header.labels.get("label").unwrap(), "z");
859    }
860
861    #[test]
862    fn test_parse_batched_headers_reuse_metadata() {
863        let mut headers = HeaderMap::new();
864        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry"));
865        headers.insert(START_TS_HEADER, HeaderValue::from_static("0"));
866        headers.insert(LABELS_HEADER, HeaderValue::from_static("x"));
867        headers.insert(
868            make_batched_header_name(0, 0),
869            HeaderValue::from_static("10,text/plain,0=y"),
870        );
871        headers.insert(
872            make_batched_header_name(0, 5),
873            HeaderValue::from_static("2"),
874        ); // reuse type + labels
875        headers.insert(
876            make_batched_header_name(0, 10),
877            HeaderValue::from_static("3,,0=z"),
878        ); // reuse type, override labels
879
880        let parsed = parse_batched_headers(&headers).unwrap();
881        assert_eq!(parsed.len(), 3);
882        assert_eq!(parsed[0].header.content_type, "text/plain");
883        assert_eq!(parsed[0].header.labels.get("x").unwrap(), "y");
884
885        assert_eq!(parsed[1].header.content_type, "text/plain");
886        assert_eq!(parsed[1].header.labels.get("x").unwrap(), "y");
887
888        assert_eq!(parsed[2].header.content_type, "text/plain");
889        assert_eq!(parsed[2].header.labels.get("x").unwrap(), "z");
890    }
891
892    #[test]
893    fn test_parse_batched_headers_with_label_indexes() {
894        let mut headers = HeaderMap::new();
895        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry"));
896        headers.insert(START_TS_HEADER, HeaderValue::from_static("0"));
897        headers.insert(LABELS_HEADER, HeaderValue::from_static("a,b"));
898
899        headers.insert(
900            make_batched_header_name(0, 0),
901            HeaderValue::from_static("10,text/plain,0=1,1=2"),
902        );
903        headers.insert(
904            make_batched_header_name(0, 5),
905            HeaderValue::from_static("2,,0="), // remove label a
906        );
907
908        let parsed = parse_batched_headers(&headers).unwrap();
909        assert_eq!(parsed[0].header.labels.get("a").unwrap(), "1");
910        assert_eq!(parsed[0].header.labels.get("b").unwrap(), "2");
911        assert!(!parsed[1].header.labels.contains_key("a"));
912        assert_eq!(parsed[1].header.labels.get("b").unwrap(), "2");
913    }
914
915    #[test]
916    fn test_parse_batched_update_headers_preserves_removal_ops() {
917        let mut headers = HeaderMap::new();
918        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry"));
919        headers.insert(START_TS_HEADER, HeaderValue::from_static("1000"));
920        headers.insert(
921            make_batched_header_name(0, 0),
922            HeaderValue::from_static("0,,a=\"hello,world\",b="),
923        );
924
925        let parsed = parse_batched_update_headers(&headers).unwrap();
926        assert_eq!(parsed.len(), 1);
927        assert_eq!(parsed[0].entry, "entry");
928        assert_eq!(parsed[0].timestamp, 1000);
929        assert_eq!(parsed[0].update.get("a").unwrap(), "hello,world");
930        assert!(parsed[0].remove.contains("b"));
931    }
932
933    #[test]
934    fn test_parse_batched_update_headers_reuses_entry_state() {
935        let mut headers = HeaderMap::new();
936        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry"));
937        headers.insert(START_TS_HEADER, HeaderValue::from_static("1000"));
938        headers.insert(LABELS_HEADER, HeaderValue::from_static("key,remove"));
939        headers.insert(
940            make_batched_header_name(0, 0),
941            HeaderValue::from_static("0,,0=meta-1,1=true"),
942        );
943        headers.insert(
944            make_batched_header_name(0, 1),
945            HeaderValue::from_static("0,,0=meta-2"),
946        );
947
948        let parsed = parse_batched_update_headers(&headers).unwrap();
949        assert_eq!(parsed.len(), 2);
950        assert_eq!(parsed[0].update.get("key").unwrap(), "meta-1");
951        assert_eq!(parsed[0].update.get("remove").unwrap(), "true");
952        assert!(parsed[0].remove.is_empty());
953
954        assert_eq!(parsed[1].update.get("key").unwrap(), "meta-2");
955        assert_eq!(parsed[1].update.get("remove").unwrap(), "true");
956        assert!(parsed[1].remove.is_empty());
957    }
958
959    #[test]
960    fn test_label_delta_removal() {
961        let mut headers = HeaderMap::new();
962        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry"));
963        headers.insert(START_TS_HEADER, HeaderValue::from_static("0"));
964        headers.insert(
965            make_batched_header_name(0, 0),
966            HeaderValue::from_static("10,text/plain,a=1,b=2"),
967        );
968        headers.insert(
969            make_batched_header_name(0, 5),
970            HeaderValue::from_static("5,text/plain,b="),
971        );
972
973        let parsed = parse_batched_headers(&headers).unwrap();
974        assert_eq!(parsed.len(), 2);
975        assert_eq!(parsed[0].header.labels.get("a").unwrap(), "1");
976        assert_eq!(parsed[0].header.labels.get("b").unwrap(), "2");
977        assert_eq!(parsed[1].header.labels.get("a").unwrap(), "1");
978        assert!(!parsed[1].header.labels.contains_key("b"));
979    }
980
981    #[test]
982    fn test_parse_batched_headers_reuse_without_prev_error() {
983        let mut headers = HeaderMap::new();
984        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry"));
985        headers.insert(START_TS_HEADER, HeaderValue::from_static("0"));
986        headers.insert(
987            make_batched_header_name(0, 0),
988            HeaderValue::from_static("10"),
989        ); // no previous metadata
990
991        let err = parse_batched_headers(&headers).err().unwrap();
992        assert_eq!(
993            err,
994            unprocessable_entity!(
995                "Content-type and labels must be provided for the first record of an entry"
996            )
997        );
998    }
999
1000    #[test]
1001    fn test_parse_batched_headers_invalid_index() {
1002        let mut headers = HeaderMap::new();
1003        headers.insert(ENTRIES_HEADER, HeaderValue::from_static("entry"));
1004        headers.insert(START_TS_HEADER, HeaderValue::from_static("0"));
1005        headers.insert(
1006            make_batched_header_name(1, 0),
1007            HeaderValue::from_static("1,text/plain"),
1008        );
1009
1010        let err = parse_batched_headers(&headers).err().unwrap();
1011        assert_eq!(
1012            err,
1013            unprocessable_entity!("Invalid header 'x-reduct-1-0': entry index out of range")
1014        );
1015    }
1016
1017    #[test]
1018    fn test_parse_batched_headers_missing_meta() {
1019        let mut headers = HeaderMap::new();
1020        headers.insert(
1021            make_batched_header_name(0, 0),
1022            HeaderValue::from_static("1,text/plain"),
1023        );
1024
1025        let err = parse_batched_headers(&headers).err().unwrap();
1026        assert_eq!(
1027            err,
1028            unprocessable_entity!("x-reduct-start-ts header is required")
1029        );
1030    }
1031
1032    #[test]
1033    fn test_parse_label_delta_updates_and_removals() {
1034        let label_names = vec!["a".to_string(), "b".to_string()];
1035        let (updates, remove) =
1036            parse_label_delta("0=one,1=,c=\"3,4\"", Some(&label_names)).unwrap();
1037
1038        assert_eq!(updates.get("a").unwrap(), "one");
1039        assert_eq!(updates.get("c").unwrap(), "3,4");
1040        assert!(remove.contains("b"));
1041        assert_eq!(remove.len(), 1);
1042    }
1043
1044    #[test]
1045    fn test_resolve_label_name_reserved_prefix() {
1046        let err = resolve_label_name("@cpu", None).err().unwrap();
1047        assert_eq!(
1048            err,
1049            unprocessable_entity!(
1050                "Label names must not start with '@': reserved for computed labels",
1051            )
1052        );
1053    }
1054
1055    #[test]
1056    fn test_resolve_label_name_index_out_of_range() {
1057        let label_names = vec!["a".to_string()];
1058        let err = resolve_label_name("2", Some(&label_names)).err().unwrap();
1059        assert_eq!(
1060            err,
1061            unprocessable_entity!("Label index '2' is out of range")
1062        );
1063    }
1064
1065    #[test]
1066    fn test_make_error_batched_header() {
1067        let err = unprocessable_entity!("broken");
1068        let (name, value) = make_error_batched_header(2, 10, &err);
1069
1070        assert_eq!(name.as_str(), "x-reduct-error-2-10");
1071        assert_eq!(
1072            value.to_str().unwrap(),
1073            format!("{},{}", err.status(), err.message())
1074        );
1075    }
1076
1077    #[cfg(feature = "io")]
1078    mod io_tests {
1079        use super::*;
1080        use std::collections::HashMap;
1081
1082        fn build_meta(
1083            labels: &[(&str, &str)],
1084            computed: &[(&str, &str)],
1085            content_type: &str,
1086            content_length: u64,
1087        ) -> RecordMeta {
1088            let mut label_map = HashMap::new();
1089            for (key, value) in labels {
1090                label_map.insert((*key).to_string(), (*value).to_string());
1091            }
1092
1093            let mut computed_map = HashMap::new();
1094            for (key, value) in computed {
1095                computed_map.insert((*key).to_string(), (*value).to_string());
1096            }
1097
1098            RecordMeta::builder()
1099                .labels(label_map)
1100                .computed_labels(computed_map)
1101                .content_type(content_type.to_string())
1102                .content_length(content_length)
1103                .build()
1104        }
1105
1106        #[test]
1107        fn test_build_label_delta_with_previous_and_computed() {
1108            let mut label_index = LabelIndex::default();
1109            label_index.ensure("a");
1110            label_index.ensure("b");
1111            label_index.ensure("c");
1112            label_index.ensure("@cpu");
1113
1114            let mut previous = Labels::new();
1115            previous.insert("a".to_string(), "1".to_string());
1116            previous.insert("b".to_string(), "2".to_string());
1117
1118            let meta = build_meta(&[("a", "1"), ("c", "3,4")], &[("cpu", "10")], "text", 1);
1119
1120            let delta = build_label_delta(&meta, Some(&previous), &mut label_index);
1121            assert_eq!(delta, "1=,2=\"3,4\",3=10");
1122        }
1123
1124        #[test]
1125        fn test_make_record_header_value_reuse_metadata() {
1126            let mut label_index = LabelIndex::default();
1127            label_index.ensure("a");
1128
1129            let mut previous = Labels::new();
1130            previous.insert("a".to_string(), "1".to_string());
1131
1132            let meta = build_meta(&[("a", "1")], &[], "text/plain", 8);
1133            let value = make_record_header_value(
1134                &meta,
1135                Some("text/plain"),
1136                Some(&previous),
1137                &mut label_index,
1138            );
1139
1140            assert_eq!(value.to_str().unwrap(), "8");
1141        }
1142
1143        #[test]
1144        fn test_make_record_header_value_with_label_delta() {
1145            let mut label_index = LabelIndex::default();
1146            label_index.ensure("a");
1147
1148            let previous = Labels::new();
1149            let meta = build_meta(&[("a", "1")], &[], "text/plain", 10);
1150            let value = make_record_header_value(
1151                &meta,
1152                Some("text/plain"),
1153                Some(&previous),
1154                &mut label_index,
1155            );
1156
1157            assert_eq!(value.to_str().unwrap(), "10,,0=1");
1158        }
1159    }
1160}