Skip to main content

exoware_server/
reduce.rs

1//! Range aggregation over decoded KV rows (same semantics as the public `Reduce` RPC).
2
3use std::cmp::Ordering;
4use std::collections::BTreeMap;
5
6use bytes::Bytes;
7use exoware_proto::{
8    RangeReduceGroup, RangeReduceOp, RangeReduceRequest, RangeReduceResponse, RangeReduceResult,
9};
10use exoware_sdk as exoware_proto;
11use exoware_sdk::keys::Key;
12use exoware_sdk::kv_codec::{
13    canonicalize_reduced_group_values, decode_stored_row, encode_reduced_group_key, eval_expr,
14    eval_predicate, expr_needs_value, predicate_needs_value, KvReducedValue,
15};
16
17#[derive(Debug)]
18pub enum RangeError {
19    Reduce(String),
20}
21
22impl std::fmt::Display for RangeError {
23    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24        match self {
25            RangeError::Reduce(s) => write!(f, "{s}"),
26        }
27    }
28}
29
30impl std::error::Error for RangeError {}
31
32#[derive(Debug)]
33enum ReductionState {
34    Count(u64),
35    Sum(Option<KvReducedValue>),
36    Min(Option<KvReducedValue>),
37    Max(Option<KvReducedValue>),
38}
39
40#[derive(Debug)]
41struct GroupedReductionState {
42    group_values: Vec<Option<KvReducedValue>>,
43    states: Vec<ReductionState>,
44}
45
46#[derive(Debug)]
47struct ExtractedReductionRow {
48    group_values: Vec<Option<KvReducedValue>>,
49    reducer_values: Vec<Option<KvReducedValue>>,
50}
51
52impl ReductionState {
53    fn from_op(op: RangeReduceOp) -> Self {
54        match op {
55            RangeReduceOp::CountAll | RangeReduceOp::CountField => Self::Count(0),
56            RangeReduceOp::SumField => Self::Sum(None),
57            RangeReduceOp::MinField => Self::Min(None),
58            RangeReduceOp::MaxField => Self::Max(None),
59        }
60    }
61
62    fn update(
63        &mut self,
64        op: RangeReduceOp,
65        value: Option<KvReducedValue>,
66    ) -> Result<(), RangeError> {
67        match (self, op) {
68            (Self::Count(count), RangeReduceOp::CountAll) => {
69                *count = count.saturating_add(1);
70                Ok(())
71            }
72            (Self::Count(count), RangeReduceOp::CountField) => {
73                if value.is_some() {
74                    *count = count.saturating_add(1);
75                }
76                Ok(())
77            }
78            (Self::Sum(sum), RangeReduceOp::SumField) => {
79                let Some(value) = value else {
80                    return Ok(());
81                };
82                match sum {
83                    Some(existing) => existing
84                        .checked_add_assign(&value)
85                        .map_err(RangeError::Reduce),
86                    None => {
87                        *sum = Some(value);
88                        Ok(())
89                    }
90                }
91            }
92            (Self::Min(current), RangeReduceOp::MinField) => {
93                update_extreme(current, value, Ordering::Less)
94            }
95            (Self::Max(current), RangeReduceOp::MaxField) => {
96                update_extreme(current, value, Ordering::Greater)
97            }
98            _ => Err(RangeError::Reduce(
99                "reduction state/op mismatch".to_string(),
100            )),
101        }
102    }
103
104    fn finish(self) -> Option<KvReducedValue> {
105        match self {
106            Self::Count(count) => Some(KvReducedValue::UInt64(count)),
107            Self::Sum(value) | Self::Min(value) | Self::Max(value) => value,
108        }
109    }
110}
111
112impl GroupedReductionState {
113    fn new(group_values: Vec<Option<KvReducedValue>>, request: &RangeReduceRequest) -> Self {
114        Self {
115            group_values,
116            states: request
117                .reducers
118                .iter()
119                .map(|reducer| ReductionState::from_op(reducer.op))
120                .collect(),
121        }
122    }
123
124    fn update(
125        &mut self,
126        request: &RangeReduceRequest,
127        reducer_values: Vec<Option<KvReducedValue>>,
128    ) -> Result<(), RangeError> {
129        for ((state, reducer), value) in self
130            .states
131            .iter_mut()
132            .zip(request.reducers.iter())
133            .zip(reducer_values)
134        {
135            state.update(reducer.op, value)?;
136        }
137        Ok(())
138    }
139
140    fn finish(self) -> RangeReduceGroup {
141        RangeReduceGroup {
142            group_values: self.group_values,
143            results: self
144                .states
145                .into_iter()
146                .map(|state| RangeReduceResult {
147                    value: state.finish(),
148                })
149                .collect(),
150        }
151    }
152}
153
154fn update_extreme(
155    current: &mut Option<KvReducedValue>,
156    candidate: Option<KvReducedValue>,
157    replace_when: Ordering,
158) -> Result<(), RangeError> {
159    let Some(candidate) = candidate else {
160        return Ok(());
161    };
162    match current {
163        Some(existing) => {
164            let ordering = candidate
165                .partial_cmp_same_kind(existing)
166                .ok_or_else(|| RangeError::Reduce("min/max type mismatch".to_string()))?;
167            if ordering == replace_when {
168                *current = Some(candidate);
169            }
170        }
171        None => {
172            *current = Some(candidate);
173        }
174    }
175    Ok(())
176}
177
178fn validate_reduce_request(request: &RangeReduceRequest) -> Result<(), RangeError> {
179    if request.reducers.is_empty() && request.group_by.is_empty() {
180        return Err(RangeError::Reduce(
181            "range reduction request requires at least one reducer or group-by field".to_string(),
182        ));
183    }
184    for reducer in &request.reducers {
185        match reducer.op {
186            RangeReduceOp::CountAll => {
187                if reducer.expr.is_some() {
188                    return Err(RangeError::Reduce(
189                        "count_all reducer must not specify an expression".to_string(),
190                    ));
191                }
192            }
193            RangeReduceOp::CountField
194            | RangeReduceOp::SumField
195            | RangeReduceOp::MinField
196            | RangeReduceOp::MaxField => {
197                if reducer.expr.is_none() {
198                    return Err(RangeError::Reduce(
199                        "expression reducer requires an expression".to_string(),
200                    ));
201                }
202            }
203        }
204    }
205    Ok(())
206}
207
208fn reduce_row_into_response(
209    key: &Key,
210    value: &Bytes,
211    request: &RangeReduceRequest,
212    scalar_states: Option<&mut [ReductionState]>,
213    grouped_states: &mut BTreeMap<Vec<u8>, GroupedReductionState>,
214) -> Result<(), RangeError> {
215    let Some(extracted) = extract_reduce_row(key, value, request)? else {
216        return Ok(());
217    };
218
219    if request.group_by.is_empty() {
220        let Some(states) = scalar_states else {
221            return Err(RangeError::Reduce(
222                "missing scalar reduction state for non-grouped request".to_string(),
223            ));
224        };
225        for ((state, reducer), value) in states
226            .iter_mut()
227            .zip(request.reducers.iter())
228            .zip(extracted.reducer_values)
229        {
230            state.update(reducer.op, value)?;
231        }
232        return Ok(());
233    }
234
235    let group_key = encode_reduced_group_key(&extracted.group_values);
236    let group = grouped_states
237        .entry(group_key)
238        .or_insert_with(|| GroupedReductionState::new(extracted.group_values.clone(), request));
239    group.update(request, extracted.reducer_values)?;
240    Ok(())
241}
242
243fn extract_reduce_row(
244    key: &Key,
245    value: &Bytes,
246    request: &RangeReduceRequest,
247) -> Result<Option<ExtractedReductionRow>, RangeError> {
248    let needs_value = request
249        .group_by
250        .iter()
251        .chain(
252            request
253                .reducers
254                .iter()
255                .filter_map(|reducer| reducer.expr.as_ref()),
256        )
257        .any(expr_needs_value)
258        || request.filter.as_ref().is_some_and(predicate_needs_value);
259    let decoded = if needs_value {
260        match decode_stored_row(value.as_ref()) {
261            Ok(row) => Some(row),
262            Err(_) => return Ok(None),
263        }
264    } else {
265        None
266    };
267    let archived = decoded.as_ref();
268
269    if let Some(filter) = &request.filter {
270        match eval_predicate(key, archived, filter) {
271            Ok(true) => {}
272            Ok(false) => return Ok(None),
273            Err(_) => return Ok(None),
274        }
275    }
276
277    let mut group_values = Vec::with_capacity(request.group_by.len());
278    for expr in &request.group_by {
279        let extracted_value = match eval_expr(key, archived, expr) {
280            Ok(value) => value,
281            Err(_) => return Ok(None),
282        };
283        group_values.push(extracted_value);
284    }
285    canonicalize_reduced_group_values(&mut group_values);
286
287    let mut reducer_values = Vec::with_capacity(request.reducers.len());
288    for reducer in &request.reducers {
289        let extracted_value = match (&reducer.expr, archived) {
290            (None, _) => None,
291            (Some(expr), _) => match eval_expr(key, archived, expr) {
292                Ok(value) => value,
293                Err(_) => return Ok(None),
294            },
295        };
296        reducer_values.push(extracted_value);
297    }
298
299    Ok(Some(ExtractedReductionRow {
300        group_values,
301        reducer_values,
302    }))
303}
304
305fn finalize_reduce_response(
306    scalar_states: Option<Vec<ReductionState>>,
307    grouped_states: BTreeMap<Vec<u8>, GroupedReductionState>,
308) -> RangeReduceResponse {
309    match scalar_states {
310        Some(states) => RangeReduceResponse {
311            results: states
312                .into_iter()
313                .map(|state| RangeReduceResult {
314                    value: state.finish(),
315                })
316                .collect(),
317            groups: Vec::new(),
318        },
319        None => RangeReduceResponse {
320            results: Vec::new(),
321            groups: grouped_states
322                .into_values()
323                .map(GroupedReductionState::finish)
324                .collect(),
325        },
326    }
327}
328
329pub(crate) struct RangeReducer<'a> {
330    request: &'a RangeReduceRequest,
331    scalar_states: Option<Vec<ReductionState>>,
332    grouped_states: BTreeMap<Vec<u8>, GroupedReductionState>,
333}
334
335impl<'a> RangeReducer<'a> {
336    pub(crate) fn new(request: &'a RangeReduceRequest) -> Result<Self, RangeError> {
337        validate_reduce_request(request)?;
338        Ok(Self {
339            request,
340            scalar_states: request.group_by.is_empty().then(|| {
341                request
342                    .reducers
343                    .iter()
344                    .map(|reducer| ReductionState::from_op(reducer.op))
345                    .collect::<Vec<_>>()
346            }),
347            grouped_states: BTreeMap::new(),
348        })
349    }
350
351    pub(crate) fn update(&mut self, key: &Key, value: &Bytes) -> Result<(), RangeError> {
352        reduce_row_into_response(
353            key,
354            value,
355            self.request,
356            self.scalar_states.as_deref_mut(),
357            &mut self.grouped_states,
358        )
359    }
360
361    pub(crate) fn finish(self) -> RangeReduceResponse {
362        finalize_reduce_response(self.scalar_states, self.grouped_states)
363    }
364}
365
366#[cfg(test)]
367mod tests {
368    use bytes::Bytes;
369    use commonware_codec::Encode as _;
370    use exoware_sdk::keys::Key;
371    use exoware_sdk::kv_codec::{
372        KvExpr, KvFieldKind, KvFieldRef, KvPredicate, KvPredicateCheck, KvPredicateConstraint,
373        KvReducedValue, StoredRow, StoredValue,
374    };
375    use exoware_sdk::{RangeReduceOp, RangeReduceRequest, RangeReducerSpec};
376
377    use super::RangeReducer;
378
379    fn make_row(key: &[u8], values: Vec<Option<StoredValue>>) -> (Key, Bytes) {
380        let encoded = StoredRow { values }.encode();
381        (Key::from(key.to_vec()), encoded)
382    }
383
384    fn reducer(op: RangeReduceOp, expr: Option<KvExpr>) -> RangeReducerSpec {
385        RangeReducerSpec { op, expr }
386    }
387
388    fn int64_value_field(index: u16) -> KvExpr {
389        KvExpr::Field(KvFieldRef::Value {
390            index,
391            kind: KvFieldKind::Int64,
392            nullable: true,
393        })
394    }
395
396    fn float64_value_field(index: u16) -> KvExpr {
397        KvExpr::Field(KvFieldRef::Value {
398            index,
399            kind: KvFieldKind::Float64,
400            nullable: true,
401        })
402    }
403
404    fn utf8_value_field(index: u16) -> KvExpr {
405        KvExpr::Field(KvFieldRef::Value {
406            index,
407            kind: KvFieldKind::Utf8,
408            nullable: true,
409        })
410    }
411
412    fn scalar_request(reducers: Vec<RangeReducerSpec>) -> RangeReduceRequest {
413        RangeReduceRequest {
414            reducers,
415            group_by: Vec::new(),
416            filter: None,
417        }
418    }
419
420    fn result_u64(v: u64) -> Option<KvReducedValue> {
421        Some(KvReducedValue::UInt64(v))
422    }
423
424    fn result_i64(v: i64) -> Option<KvReducedValue> {
425        Some(KvReducedValue::Int64(v))
426    }
427
428    fn result_f64(v: f64) -> Option<KvReducedValue> {
429        Some(KvReducedValue::Float64(v))
430    }
431
432    fn reduce(
433        rows: &[(Key, Bytes)],
434        request: &RangeReduceRequest,
435    ) -> Result<super::RangeReduceResponse, super::RangeError> {
436        let mut reducer = RangeReducer::new(request)?;
437        for (key, value) in rows {
438            reducer.update(key, value)?;
439        }
440        Ok(reducer.finish())
441    }
442
443    #[test]
444    fn count_all_over_empty_rows() {
445        let request = scalar_request(vec![reducer(RangeReduceOp::CountAll, None)]);
446        let response = reduce(&[], &request).unwrap();
447        assert_eq!(response.results.len(), 1);
448        assert_eq!(response.results[0].value, result_u64(0));
449    }
450
451    #[test]
452    fn count_all_over_multiple_rows() {
453        let rows = vec![
454            make_row(b"a", vec![]),
455            make_row(b"b", vec![]),
456            make_row(b"c", vec![]),
457        ];
458        let request = scalar_request(vec![reducer(RangeReduceOp::CountAll, None)]);
459        let response = reduce(&rows, &request).unwrap();
460        assert_eq!(response.results[0].value, result_u64(3));
461    }
462
463    #[test]
464    fn count_field_skips_nulls() {
465        let rows = vec![
466            make_row(b"a", vec![Some(StoredValue::Int64(1))]),
467            make_row(b"b", vec![None]),
468            make_row(b"c", vec![Some(StoredValue::Int64(3))]),
469        ];
470        let request = scalar_request(vec![reducer(
471            RangeReduceOp::CountField,
472            Some(int64_value_field(0)),
473        )]);
474        let response = reduce(&rows, &request).unwrap();
475        assert_eq!(response.results[0].value, result_u64(2));
476    }
477
478    #[test]
479    fn sum_int64_values() {
480        let rows = vec![
481            make_row(b"a", vec![Some(StoredValue::Int64(10))]),
482            make_row(b"b", vec![Some(StoredValue::Int64(20))]),
483            make_row(b"c", vec![Some(StoredValue::Int64(-5))]),
484        ];
485        let request = scalar_request(vec![reducer(
486            RangeReduceOp::SumField,
487            Some(int64_value_field(0)),
488        )]);
489        let response = reduce(&rows, &request).unwrap();
490        assert_eq!(response.results[0].value, result_i64(25));
491    }
492
493    #[test]
494    fn sum_float64_values() {
495        let rows = vec![
496            make_row(b"a", vec![Some(StoredValue::Float64(1.5))]),
497            make_row(b"b", vec![Some(StoredValue::Float64(2.5))]),
498        ];
499        let request = scalar_request(vec![reducer(
500            RangeReduceOp::SumField,
501            Some(float64_value_field(0)),
502        )]);
503        let response = reduce(&rows, &request).unwrap();
504        assert_eq!(response.results[0].value, result_f64(4.0));
505    }
506
507    #[test]
508    fn min_selects_smallest() {
509        let rows = vec![
510            make_row(b"a", vec![Some(StoredValue::Int64(30))]),
511            make_row(b"b", vec![Some(StoredValue::Int64(10))]),
512            make_row(b"c", vec![Some(StoredValue::Int64(20))]),
513        ];
514        let request = scalar_request(vec![reducer(
515            RangeReduceOp::MinField,
516            Some(int64_value_field(0)),
517        )]);
518        let response = reduce(&rows, &request).unwrap();
519        assert_eq!(response.results[0].value, result_i64(10));
520    }
521
522    #[test]
523    fn max_selects_largest() {
524        let rows = vec![
525            make_row(b"a", vec![Some(StoredValue::Int64(30))]),
526            make_row(b"b", vec![Some(StoredValue::Int64(10))]),
527            make_row(b"c", vec![Some(StoredValue::Int64(50))]),
528        ];
529        let request = scalar_request(vec![reducer(
530            RangeReduceOp::MaxField,
531            Some(int64_value_field(0)),
532        )]);
533        let response = reduce(&rows, &request).unwrap();
534        assert_eq!(response.results[0].value, result_i64(50));
535    }
536
537    #[test]
538    fn grouped_count() {
539        let rows = vec![
540            make_row(b"a", vec![Some(StoredValue::Utf8("x".into()))]),
541            make_row(b"b", vec![Some(StoredValue::Utf8("y".into()))]),
542            make_row(b"c", vec![Some(StoredValue::Utf8("x".into()))]),
543            make_row(b"d", vec![Some(StoredValue::Utf8("y".into()))]),
544            make_row(b"e", vec![Some(StoredValue::Utf8("x".into()))]),
545        ];
546        let request = RangeReduceRequest {
547            reducers: vec![reducer(RangeReduceOp::CountAll, None)],
548            group_by: vec![utf8_value_field(0)],
549            filter: None,
550        };
551        let response = reduce(&rows, &request).unwrap();
552        assert!(response.results.is_empty());
553        assert_eq!(response.groups.len(), 2);
554
555        let mut counts: Vec<(Option<KvReducedValue>, Option<KvReducedValue>)> = response
556            .groups
557            .iter()
558            .map(|g| (g.group_values[0].clone(), g.results[0].value.clone()))
559            .collect();
560        counts.sort_by(|a, b| {
561            let a_str = match &a.0 {
562                Some(KvReducedValue::Utf8(s)) => s.clone(),
563                _ => String::new(),
564            };
565            let b_str = match &b.0 {
566                Some(KvReducedValue::Utf8(s)) => s.clone(),
567                _ => String::new(),
568            };
569            a_str.cmp(&b_str)
570        });
571        assert_eq!(
572            counts,
573            vec![
574                (Some(KvReducedValue::Utf8("x".into())), result_u64(3),),
575                (Some(KvReducedValue::Utf8("y".into())), result_u64(2),),
576            ]
577        );
578    }
579
580    #[test]
581    fn validates_empty_request() {
582        let request = RangeReduceRequest {
583            reducers: Vec::new(),
584            group_by: Vec::new(),
585            filter: None,
586        };
587        let err = reduce(&[], &request).unwrap_err();
588        assert!(
589            err.to_string().contains("at least one reducer"),
590            "unexpected error: {err}"
591        );
592    }
593
594    #[test]
595    fn count_all_rejects_expression() {
596        let request = scalar_request(vec![reducer(
597            RangeReduceOp::CountAll,
598            Some(int64_value_field(0)),
599        )]);
600        let err = reduce(&[], &request).unwrap_err();
601        assert!(
602            err.to_string()
603                .contains("count_all reducer must not specify an expression"),
604            "unexpected error: {err}"
605        );
606    }
607
608    #[test]
609    fn expression_reducer_requires_expression() {
610        for op in [
611            RangeReduceOp::SumField,
612            RangeReduceOp::MinField,
613            RangeReduceOp::MaxField,
614            RangeReduceOp::CountField,
615        ] {
616            let request = scalar_request(vec![reducer(op, None)]);
617            let err = reduce(&[], &request).unwrap_err();
618            assert!(
619                err.to_string()
620                    .contains("expression reducer requires an expression"),
621                "op {op:?} should require an expression, got: {err}"
622            );
623        }
624    }
625
626    #[test]
627    fn filter_excludes_rows() {
628        let rows = vec![
629            make_row(b"a", vec![Some(StoredValue::Int64(10))]),
630            make_row(b"b", vec![Some(StoredValue::Int64(20))]),
631            make_row(b"c", vec![Some(StoredValue::Int64(30))]),
632        ];
633        let request = RangeReduceRequest {
634            reducers: vec![reducer(RangeReduceOp::SumField, Some(int64_value_field(0)))],
635            group_by: Vec::new(),
636            filter: Some(KvPredicate {
637                checks: vec![KvPredicateCheck {
638                    field: KvFieldRef::Value {
639                        index: 0,
640                        kind: KvFieldKind::Int64,
641                        nullable: false,
642                    },
643                    constraint: KvPredicateConstraint::IntRange {
644                        min: Some(15),
645                        max: None,
646                    },
647                }],
648                contradiction: false,
649            }),
650        };
651        let response = reduce(&rows, &request).unwrap();
652        assert_eq!(response.results[0].value, result_i64(50));
653    }
654
655    #[test]
656    fn scalar_reducer_handles_multiple_specs() {
657        let rows = vec![
658            make_row(b"a", vec![Some(StoredValue::Int64(10))]),
659            make_row(b"b", vec![None]),
660            make_row(b"c", vec![Some(StoredValue::Int64(30))]),
661        ];
662        let request = scalar_request(vec![
663            reducer(RangeReduceOp::CountAll, None),
664            reducer(RangeReduceOp::SumField, Some(int64_value_field(0))),
665        ]);
666        let response = reduce(&rows, &request).unwrap();
667        assert_eq!(response.results.len(), 2);
668        assert_eq!(response.results[0].value, result_u64(3));
669        assert_eq!(response.results[1].value, result_i64(40));
670    }
671
672    #[test]
673    fn grouped_reducer_sums_per_group() {
674        let rows = vec![
675            make_row(
676                b"a",
677                vec![
678                    Some(StoredValue::Utf8("west".into())),
679                    Some(StoredValue::Int64(10)),
680                ],
681            ),
682            make_row(
683                b"b",
684                vec![
685                    Some(StoredValue::Utf8("east".into())),
686                    Some(StoredValue::Int64(20)),
687                ],
688            ),
689            make_row(
690                b"c",
691                vec![
692                    Some(StoredValue::Utf8("west".into())),
693                    Some(StoredValue::Int64(30)),
694                ],
695            ),
696        ];
697        let request = RangeReduceRequest {
698            reducers: vec![reducer(RangeReduceOp::SumField, Some(int64_value_field(1)))],
699            group_by: vec![utf8_value_field(0)],
700            filter: None,
701        };
702        let response = reduce(&rows, &request).unwrap();
703        assert!(response.results.is_empty());
704        assert_eq!(response.groups.len(), 2);
705
706        let mut sums: Vec<(Option<KvReducedValue>, Option<KvReducedValue>)> = response
707            .groups
708            .iter()
709            .map(|g| (g.group_values[0].clone(), g.results[0].value.clone()))
710            .collect();
711        sums.sort_by(|a, b| {
712            let a_str = match &a.0 {
713                Some(KvReducedValue::Utf8(s)) => s.clone(),
714                _ => String::new(),
715            };
716            let b_str = match &b.0 {
717                Some(KvReducedValue::Utf8(s)) => s.clone(),
718                _ => String::new(),
719            };
720            a_str.cmp(&b_str)
721        });
722        assert_eq!(
723            sums,
724            vec![
725                (Some(KvReducedValue::Utf8("east".into())), result_i64(20),),
726                (Some(KvReducedValue::Utf8("west".into())), result_i64(40),),
727            ]
728        );
729    }
730
731    #[test]
732    fn filtered_reducer_counts_matching_rows() {
733        let rows = vec![
734            make_row(b"a", vec![Some(StoredValue::Int64(10))]),
735            make_row(b"b", vec![Some(StoredValue::Int64(20))]),
736            make_row(b"c", vec![Some(StoredValue::Int64(30))]),
737        ];
738        let request = RangeReduceRequest {
739            reducers: vec![reducer(RangeReduceOp::CountAll, None)],
740            group_by: Vec::new(),
741            filter: Some(KvPredicate {
742                checks: vec![KvPredicateCheck {
743                    field: KvFieldRef::Value {
744                        index: 0,
745                        kind: KvFieldKind::Int64,
746                        nullable: false,
747                    },
748                    constraint: KvPredicateConstraint::IntRange {
749                        min: Some(20),
750                        max: None,
751                    },
752                }],
753                contradiction: false,
754            }),
755        };
756        let response = reduce(&rows, &request).unwrap();
757        assert_eq!(response.results.len(), 1);
758        assert_eq!(response.results[0].value, result_u64(2));
759    }
760
761    #[test]
762    fn mixed_type_min_max_returns_error() {
763        use super::ReductionState;
764
765        let mut state = ReductionState::Min(Some(KvReducedValue::Int64(10)));
766        let result = state.update(
767            RangeReduceOp::MinField,
768            Some(KvReducedValue::Utf8("hello".into())),
769        );
770        assert!(result.is_err());
771        assert!(
772            result.unwrap_err().to_string().contains("type mismatch"),
773            "expected type mismatch error"
774        );
775    }
776}