1use std::cmp::Ordering;
4use std::collections::BTreeMap;
5
6use bytes::Bytes;
7use exoware_proto::{
8 RangeReduceGroup, RangeReduceOp, RangeReduceRequest, RangeReduceResponse, RangeReduceResult,
9};
10use exoware_sdk as exoware_proto;
11use exoware_sdk::keys::Key;
12use exoware_sdk::kv_codec::{
13 canonicalize_reduced_group_values, decode_stored_row, encode_reduced_group_key, eval_expr,
14 eval_predicate, expr_needs_value, predicate_needs_value, KvReducedValue,
15};
16
/// Error returned by range-reduction validation and accumulation.
#[derive(Debug)]
pub enum RangeError {
    /// A reduction failed; the payload is the human-readable message.
    Reduce(String),
}

impl std::fmt::Display for RangeError {
    /// Writes the wrapped message verbatim, with no prefix and no padding.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Single-variant enum: this pattern is irrefutable, and adding a variant
        // later turns this line into a compile error rather than a silent gap.
        let Self::Reduce(message) = self;
        f.write_str(message)
    }
}

impl std::error::Error for RangeError {}
31
/// Running accumulator for a single reducer of a range-reduce request.
#[derive(Debug)]
enum ReductionState {
    /// Number of rows (`CountAll`) or non-null values (`CountField`) seen so far.
    Count(u64),
    /// Running sum; stays `None` until the first non-null value arrives.
    Sum(Option<KvReducedValue>),
    /// Smallest value seen so far; `None` until the first non-null value arrives.
    Min(Option<KvReducedValue>),
    /// Largest value seen so far; `None` until the first non-null value arrives.
    Max(Option<KvReducedValue>),
}
39
/// Accumulators for one group of a grouped reduction.
#[derive(Debug)]
struct GroupedReductionState {
    /// The group-by values identifying this group, echoed back in the response.
    group_values: Vec<Option<KvReducedValue>>,
    /// One accumulator per reducer in the request, in request order.
    states: Vec<ReductionState>,
}
45
/// Values pulled out of a single row that passed the request filter.
#[derive(Debug)]
struct ExtractedReductionRow {
    /// One evaluated value per `group_by` expression, in request order.
    group_values: Vec<Option<KvReducedValue>>,
    /// One evaluated value per reducer, in request order (`None` when the
    /// reducer has no expression or the expression evaluated to null).
    reducer_values: Vec<Option<KvReducedValue>>,
}
51
52impl ReductionState {
53 fn from_op(op: RangeReduceOp) -> Self {
54 match op {
55 RangeReduceOp::CountAll | RangeReduceOp::CountField => Self::Count(0),
56 RangeReduceOp::SumField => Self::Sum(None),
57 RangeReduceOp::MinField => Self::Min(None),
58 RangeReduceOp::MaxField => Self::Max(None),
59 }
60 }
61
62 fn update(
63 &mut self,
64 op: RangeReduceOp,
65 value: Option<KvReducedValue>,
66 ) -> Result<(), RangeError> {
67 match (self, op) {
68 (Self::Count(count), RangeReduceOp::CountAll) => {
69 *count = count.saturating_add(1);
70 Ok(())
71 }
72 (Self::Count(count), RangeReduceOp::CountField) => {
73 if value.is_some() {
74 *count = count.saturating_add(1);
75 }
76 Ok(())
77 }
78 (Self::Sum(sum), RangeReduceOp::SumField) => {
79 let Some(value) = value else {
80 return Ok(());
81 };
82 match sum {
83 Some(existing) => existing
84 .checked_add_assign(&value)
85 .map_err(RangeError::Reduce),
86 None => {
87 *sum = Some(value);
88 Ok(())
89 }
90 }
91 }
92 (Self::Min(current), RangeReduceOp::MinField) => {
93 update_extreme(current, value, Ordering::Less)
94 }
95 (Self::Max(current), RangeReduceOp::MaxField) => {
96 update_extreme(current, value, Ordering::Greater)
97 }
98 _ => Err(RangeError::Reduce(
99 "reduction state/op mismatch".to_string(),
100 )),
101 }
102 }
103
104 fn finish(self) -> Option<KvReducedValue> {
105 match self {
106 Self::Count(count) => Some(KvReducedValue::UInt64(count)),
107 Self::Sum(value) | Self::Min(value) | Self::Max(value) => value,
108 }
109 }
110}
111
112impl GroupedReductionState {
113 fn new(group_values: Vec<Option<KvReducedValue>>, request: &RangeReduceRequest) -> Self {
114 Self {
115 group_values,
116 states: request
117 .reducers
118 .iter()
119 .map(|reducer| ReductionState::from_op(reducer.op))
120 .collect(),
121 }
122 }
123
124 fn update(
125 &mut self,
126 request: &RangeReduceRequest,
127 reducer_values: Vec<Option<KvReducedValue>>,
128 ) -> Result<(), RangeError> {
129 for ((state, reducer), value) in self
130 .states
131 .iter_mut()
132 .zip(request.reducers.iter())
133 .zip(reducer_values)
134 {
135 state.update(reducer.op, value)?;
136 }
137 Ok(())
138 }
139
140 fn finish(self) -> RangeReduceGroup {
141 RangeReduceGroup {
142 group_values: self.group_values,
143 results: self
144 .states
145 .into_iter()
146 .map(|state| RangeReduceResult {
147 value: state.finish(),
148 })
149 .collect(),
150 }
151 }
152}
153
154fn update_extreme(
155 current: &mut Option<KvReducedValue>,
156 candidate: Option<KvReducedValue>,
157 replace_when: Ordering,
158) -> Result<(), RangeError> {
159 let Some(candidate) = candidate else {
160 return Ok(());
161 };
162 match current {
163 Some(existing) => {
164 let ordering = candidate
165 .partial_cmp_same_kind(existing)
166 .ok_or_else(|| RangeError::Reduce("min/max type mismatch".to_string()))?;
167 if ordering == replace_when {
168 *current = Some(candidate);
169 }
170 }
171 None => {
172 *current = Some(candidate);
173 }
174 }
175 Ok(())
176}
177
178fn validate_reduce_request(request: &RangeReduceRequest) -> Result<(), RangeError> {
179 if request.reducers.is_empty() && request.group_by.is_empty() {
180 return Err(RangeError::Reduce(
181 "range reduction request requires at least one reducer or group-by field".to_string(),
182 ));
183 }
184 for reducer in &request.reducers {
185 match reducer.op {
186 RangeReduceOp::CountAll => {
187 if reducer.expr.is_some() {
188 return Err(RangeError::Reduce(
189 "count_all reducer must not specify an expression".to_string(),
190 ));
191 }
192 }
193 RangeReduceOp::CountField
194 | RangeReduceOp::SumField
195 | RangeReduceOp::MinField
196 | RangeReduceOp::MaxField => {
197 if reducer.expr.is_none() {
198 return Err(RangeError::Reduce(
199 "expression reducer requires an expression".to_string(),
200 ));
201 }
202 }
203 }
204 }
205 Ok(())
206}
207
208fn reduce_row_into_response(
209 key: &Key,
210 value: &Bytes,
211 request: &RangeReduceRequest,
212 scalar_states: Option<&mut [ReductionState]>,
213 grouped_states: &mut BTreeMap<Vec<u8>, GroupedReductionState>,
214) -> Result<(), RangeError> {
215 let Some(extracted) = extract_reduce_row(key, value, request)? else {
216 return Ok(());
217 };
218
219 if request.group_by.is_empty() {
220 let Some(states) = scalar_states else {
221 return Err(RangeError::Reduce(
222 "missing scalar reduction state for non-grouped request".to_string(),
223 ));
224 };
225 for ((state, reducer), value) in states
226 .iter_mut()
227 .zip(request.reducers.iter())
228 .zip(extracted.reducer_values)
229 {
230 state.update(reducer.op, value)?;
231 }
232 return Ok(());
233 }
234
235 let group_key = encode_reduced_group_key(&extracted.group_values);
236 let group = grouped_states
237 .entry(group_key)
238 .or_insert_with(|| GroupedReductionState::new(extracted.group_values.clone(), request));
239 group.update(request, extracted.reducer_values)?;
240 Ok(())
241}
242
243fn extract_reduce_row(
244 key: &Key,
245 value: &Bytes,
246 request: &RangeReduceRequest,
247) -> Result<Option<ExtractedReductionRow>, RangeError> {
248 let needs_value = request
249 .group_by
250 .iter()
251 .chain(
252 request
253 .reducers
254 .iter()
255 .filter_map(|reducer| reducer.expr.as_ref()),
256 )
257 .any(expr_needs_value)
258 || request.filter.as_ref().is_some_and(predicate_needs_value);
259 let decoded = if needs_value {
260 match decode_stored_row(value.as_ref()) {
261 Ok(row) => Some(row),
262 Err(_) => return Ok(None),
263 }
264 } else {
265 None
266 };
267 let archived = decoded.as_ref();
268
269 if let Some(filter) = &request.filter {
270 match eval_predicate(key, archived, filter) {
271 Ok(true) => {}
272 Ok(false) => return Ok(None),
273 Err(_) => return Ok(None),
274 }
275 }
276
277 let mut group_values = Vec::with_capacity(request.group_by.len());
278 for expr in &request.group_by {
279 let extracted_value = match eval_expr(key, archived, expr) {
280 Ok(value) => value,
281 Err(_) => return Ok(None),
282 };
283 group_values.push(extracted_value);
284 }
285 canonicalize_reduced_group_values(&mut group_values);
286
287 let mut reducer_values = Vec::with_capacity(request.reducers.len());
288 for reducer in &request.reducers {
289 let extracted_value = match (&reducer.expr, archived) {
290 (None, _) => None,
291 (Some(expr), _) => match eval_expr(key, archived, expr) {
292 Ok(value) => value,
293 Err(_) => return Ok(None),
294 },
295 };
296 reducer_values.push(extracted_value);
297 }
298
299 Ok(Some(ExtractedReductionRow {
300 group_values,
301 reducer_values,
302 }))
303}
304
305fn finalize_reduce_response(
306 scalar_states: Option<Vec<ReductionState>>,
307 grouped_states: BTreeMap<Vec<u8>, GroupedReductionState>,
308) -> RangeReduceResponse {
309 match scalar_states {
310 Some(states) => RangeReduceResponse {
311 results: states
312 .into_iter()
313 .map(|state| RangeReduceResult {
314 value: state.finish(),
315 })
316 .collect(),
317 groups: Vec::new(),
318 },
319 None => RangeReduceResponse {
320 results: Vec::new(),
321 groups: grouped_states
322 .into_values()
323 .map(GroupedReductionState::finish)
324 .collect(),
325 },
326 }
327}
328
/// Streaming evaluator for a single range-reduce request: feed it rows with
/// `update`, then call `finish` to obtain the response.
pub(crate) struct RangeReducer<'a> {
    /// The validated request driving the reduction.
    request: &'a RangeReduceRequest,
    /// Accumulators for a non-grouped request; `None` when the request has
    /// `group_by` expressions.
    scalar_states: Option<Vec<ReductionState>>,
    /// Per-group accumulators, keyed by the encoded group values.
    grouped_states: BTreeMap<Vec<u8>, GroupedReductionState>,
}
334
335impl<'a> RangeReducer<'a> {
336 pub(crate) fn new(request: &'a RangeReduceRequest) -> Result<Self, RangeError> {
337 validate_reduce_request(request)?;
338 Ok(Self {
339 request,
340 scalar_states: request.group_by.is_empty().then(|| {
341 request
342 .reducers
343 .iter()
344 .map(|reducer| ReductionState::from_op(reducer.op))
345 .collect::<Vec<_>>()
346 }),
347 grouped_states: BTreeMap::new(),
348 })
349 }
350
351 pub(crate) fn update(&mut self, key: &Key, value: &Bytes) -> Result<(), RangeError> {
352 reduce_row_into_response(
353 key,
354 value,
355 self.request,
356 self.scalar_states.as_deref_mut(),
357 &mut self.grouped_states,
358 )
359 }
360
361 pub(crate) fn finish(self) -> RangeReduceResponse {
362 finalize_reduce_response(self.scalar_states, self.grouped_states)
363 }
364}
365
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use commonware_codec::Encode as _;
    use exoware_sdk::keys::Key;
    use exoware_sdk::kv_codec::{
        KvExpr, KvFieldKind, KvFieldRef, KvPredicate, KvPredicateCheck, KvPredicateConstraint,
        KvReducedValue, StoredRow, StoredValue,
    };
    use exoware_sdk::{RangeReduceOp, RangeReduceRequest, RangeReducerSpec};

    use super::RangeReducer;

    /// A `(group value, first reducer result)` pair taken from a grouped response.
    type GroupPair = (Option<KvReducedValue>, Option<KvReducedValue>);

    /// Encodes `values` as a stored row under `key`.
    fn make_row(key: &[u8], values: Vec<Option<StoredValue>>) -> (Key, Bytes) {
        (Key::from(key.to_vec()), StoredRow { values }.encode())
    }

    /// Builds single-column Int64 rows keyed `a`, `b`, `c`, ... in order.
    fn int64_rows(values: &[Option<i64>]) -> Vec<(Key, Bytes)> {
        values
            .iter()
            .copied()
            .zip(b'a'..)
            .map(|(value, key)| make_row(&[key], vec![value.map(StoredValue::Int64)]))
            .collect()
    }

    /// Builds a single-column Utf8 row.
    fn utf8_row(key: &[u8], text: &'static str) -> (Key, Bytes) {
        make_row(key, vec![Some(StoredValue::Utf8(text.into()))])
    }

    /// Builds a two-column row: a Utf8 label followed by an Int64 amount.
    fn labelled_row(key: &[u8], label: &'static str, amount: i64) -> (Key, Bytes) {
        make_row(
            key,
            vec![
                Some(StoredValue::Utf8(label.into())),
                Some(StoredValue::Int64(amount)),
            ],
        )
    }

    fn reducer(op: RangeReduceOp, expr: Option<KvExpr>) -> RangeReducerSpec {
        RangeReducerSpec { op, expr }
    }

    /// Nullable reference to the value column at `index` with the given kind.
    fn nullable_value_field(index: u16, kind: KvFieldKind) -> KvExpr {
        KvExpr::Field(KvFieldRef::Value {
            index,
            kind,
            nullable: true,
        })
    }

    fn int64_value_field(index: u16) -> KvExpr {
        nullable_value_field(index, KvFieldKind::Int64)
    }

    fn float64_value_field(index: u16) -> KvExpr {
        nullable_value_field(index, KvFieldKind::Float64)
    }

    fn utf8_value_field(index: u16) -> KvExpr {
        nullable_value_field(index, KvFieldKind::Utf8)
    }

    /// Request with no grouping and no filter.
    fn scalar_request(reducers: Vec<RangeReducerSpec>) -> RangeReduceRequest {
        RangeReduceRequest {
            reducers,
            group_by: Vec::new(),
            filter: None,
        }
    }

    /// Request grouped by `group_by`, with no filter.
    fn grouped_request(
        reducers: Vec<RangeReducerSpec>,
        group_by: Vec<KvExpr>,
    ) -> RangeReduceRequest {
        RangeReduceRequest {
            group_by,
            ..scalar_request(reducers)
        }
    }

    /// Predicate applying `constraint` to the non-nullable Int64 column 0.
    fn first_int64_filter(constraint: KvPredicateConstraint) -> KvPredicate {
        KvPredicate {
            checks: vec![KvPredicateCheck {
                field: KvFieldRef::Value {
                    index: 0,
                    kind: KvFieldKind::Int64,
                    nullable: false,
                },
                constraint,
            }],
            contradiction: false,
        }
    }

    fn result_u64(v: u64) -> Option<KvReducedValue> {
        Some(KvReducedValue::UInt64(v))
    }

    fn result_i64(v: i64) -> Option<KvReducedValue> {
        Some(KvReducedValue::Int64(v))
    }

    fn result_f64(v: f64) -> Option<KvReducedValue> {
        Some(KvReducedValue::Float64(v))
    }

    /// Runs `request` over `rows` through a fresh `RangeReducer`.
    fn reduce(
        rows: &[(Key, Bytes)],
        request: &RangeReduceRequest,
    ) -> Result<super::RangeReduceResponse, super::RangeError> {
        let mut reducer = RangeReducer::new(request)?;
        rows.iter()
            .try_for_each(|(key, value)| reducer.update(key, value))?;
        Ok(reducer.finish())
    }

    /// Collects each group's first group value and first result, sorted by the
    /// Utf8 group value (non-Utf8 values sort as the empty string).
    fn sorted_utf8_groups(response: &super::RangeReduceResponse) -> Vec<GroupPair> {
        let mut pairs: Vec<GroupPair> = response
            .groups
            .iter()
            .map(|g| (g.group_values[0].clone(), g.results[0].value.clone()))
            .collect();
        pairs.sort_by_key(|(group, _)| match group {
            Some(KvReducedValue::Utf8(text)) => text.clone(),
            _ => String::new(),
        });
        pairs
    }

    #[test]
    fn count_all_over_empty_rows() {
        let request = scalar_request(vec![reducer(RangeReduceOp::CountAll, None)]);
        let response = reduce(&[], &request).unwrap();
        assert_eq!(response.results.len(), 1);
        assert_eq!(response.results[0].value, result_u64(0));
    }

    #[test]
    fn count_all_over_multiple_rows() {
        let rows: Vec<_> = ["a", "b", "c"]
            .iter()
            .map(|key| make_row(key.as_bytes(), Vec::new()))
            .collect();
        let request = scalar_request(vec![reducer(RangeReduceOp::CountAll, None)]);
        let response = reduce(&rows, &request).unwrap();
        assert_eq!(response.results[0].value, result_u64(3));
    }

    #[test]
    fn count_field_skips_nulls() {
        let rows = int64_rows(&[Some(1), None, Some(3)]);
        let request = scalar_request(vec![reducer(
            RangeReduceOp::CountField,
            Some(int64_value_field(0)),
        )]);
        let response = reduce(&rows, &request).unwrap();
        assert_eq!(response.results[0].value, result_u64(2));
    }

    #[test]
    fn sum_int64_values() {
        let rows = int64_rows(&[Some(10), Some(20), Some(-5)]);
        let request = scalar_request(vec![reducer(
            RangeReduceOp::SumField,
            Some(int64_value_field(0)),
        )]);
        let response = reduce(&rows, &request).unwrap();
        assert_eq!(response.results[0].value, result_i64(25));
    }

    #[test]
    fn sum_float64_values() {
        let rows = vec![
            make_row(b"a", vec![Some(StoredValue::Float64(1.5))]),
            make_row(b"b", vec![Some(StoredValue::Float64(2.5))]),
        ];
        let request = scalar_request(vec![reducer(
            RangeReduceOp::SumField,
            Some(float64_value_field(0)),
        )]);
        let response = reduce(&rows, &request).unwrap();
        assert_eq!(response.results[0].value, result_f64(4.0));
    }

    #[test]
    fn min_selects_smallest() {
        let rows = int64_rows(&[Some(30), Some(10), Some(20)]);
        let request = scalar_request(vec![reducer(
            RangeReduceOp::MinField,
            Some(int64_value_field(0)),
        )]);
        let response = reduce(&rows, &request).unwrap();
        assert_eq!(response.results[0].value, result_i64(10));
    }

    #[test]
    fn max_selects_largest() {
        let rows = int64_rows(&[Some(30), Some(10), Some(50)]);
        let request = scalar_request(vec![reducer(
            RangeReduceOp::MaxField,
            Some(int64_value_field(0)),
        )]);
        let response = reduce(&rows, &request).unwrap();
        assert_eq!(response.results[0].value, result_i64(50));
    }

    #[test]
    fn grouped_count() {
        let rows = vec![
            utf8_row(b"a", "x"),
            utf8_row(b"b", "y"),
            utf8_row(b"c", "x"),
            utf8_row(b"d", "y"),
            utf8_row(b"e", "x"),
        ];
        let request = grouped_request(
            vec![reducer(RangeReduceOp::CountAll, None)],
            vec![utf8_value_field(0)],
        );
        let response = reduce(&rows, &request).unwrap();
        assert!(response.results.is_empty());
        assert_eq!(response.groups.len(), 2);

        let counts = sorted_utf8_groups(&response);
        assert_eq!(
            counts,
            vec![
                (Some(KvReducedValue::Utf8("x".into())), result_u64(3)),
                (Some(KvReducedValue::Utf8("y".into())), result_u64(2)),
            ]
        );
    }

    #[test]
    fn validates_empty_request() {
        let request = scalar_request(Vec::new());
        let err = reduce(&[], &request).unwrap_err();
        assert!(
            err.to_string().contains("at least one reducer"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn count_all_rejects_expression() {
        let request = scalar_request(vec![reducer(
            RangeReduceOp::CountAll,
            Some(int64_value_field(0)),
        )]);
        let err = reduce(&[], &request).unwrap_err();
        assert!(
            err.to_string()
                .contains("count_all reducer must not specify an expression"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn expression_reducer_requires_expression() {
        let expression_ops = [
            RangeReduceOp::SumField,
            RangeReduceOp::MinField,
            RangeReduceOp::MaxField,
            RangeReduceOp::CountField,
        ];
        for op in expression_ops {
            let request = scalar_request(vec![reducer(op, None)]);
            let err = reduce(&[], &request).unwrap_err();
            assert!(
                err.to_string()
                    .contains("expression reducer requires an expression"),
                "op {op:?} should require an expression, got: {err}"
            );
        }
    }

    #[test]
    fn filter_excludes_rows() {
        let rows = int64_rows(&[Some(10), Some(20), Some(30)]);
        let request = RangeReduceRequest {
            filter: Some(first_int64_filter(KvPredicateConstraint::IntRange {
                min: Some(15),
                max: None,
            })),
            ..scalar_request(vec![reducer(
                RangeReduceOp::SumField,
                Some(int64_value_field(0)),
            )])
        };
        let response = reduce(&rows, &request).unwrap();
        assert_eq!(response.results[0].value, result_i64(50));
    }

    #[test]
    fn scalar_reducer_handles_multiple_specs() {
        let rows = int64_rows(&[Some(10), None, Some(30)]);
        let request = scalar_request(vec![
            reducer(RangeReduceOp::CountAll, None),
            reducer(RangeReduceOp::SumField, Some(int64_value_field(0))),
        ]);
        let response = reduce(&rows, &request).unwrap();
        assert_eq!(response.results.len(), 2);
        assert_eq!(response.results[0].value, result_u64(3));
        assert_eq!(response.results[1].value, result_i64(40));
    }

    #[test]
    fn grouped_reducer_sums_per_group() {
        let rows = vec![
            labelled_row(b"a", "west", 10),
            labelled_row(b"b", "east", 20),
            labelled_row(b"c", "west", 30),
        ];
        let request = grouped_request(
            vec![reducer(RangeReduceOp::SumField, Some(int64_value_field(1)))],
            vec![utf8_value_field(0)],
        );
        let response = reduce(&rows, &request).unwrap();
        assert!(response.results.is_empty());
        assert_eq!(response.groups.len(), 2);

        let sums = sorted_utf8_groups(&response);
        assert_eq!(
            sums,
            vec![
                (Some(KvReducedValue::Utf8("east".into())), result_i64(20)),
                (Some(KvReducedValue::Utf8("west".into())), result_i64(40)),
            ]
        );
    }

    #[test]
    fn filtered_reducer_counts_matching_rows() {
        let rows = int64_rows(&[Some(10), Some(20), Some(30)]);
        let request = RangeReduceRequest {
            filter: Some(first_int64_filter(KvPredicateConstraint::IntRange {
                min: Some(20),
                max: None,
            })),
            ..scalar_request(vec![reducer(RangeReduceOp::CountAll, None)])
        };
        let response = reduce(&rows, &request).unwrap();
        assert_eq!(response.results.len(), 1);
        assert_eq!(response.results[0].value, result_u64(2));
    }

    #[test]
    fn mixed_type_min_max_returns_error() {
        use super::ReductionState;

        // Seed the accumulator with an Int64, then offer a Utf8 candidate.
        let mut state = ReductionState::Min(Some(KvReducedValue::Int64(10)));
        let incoming = Some(KvReducedValue::Utf8("hello".into()));
        let result = state.update(RangeReduceOp::MinField, incoming);
        assert!(result.is_err());
        assert!(
            result.unwrap_err().to_string().contains("type mismatch"),
            "expected type mismatch error"
        );
    }
}