Skip to main content

exoware_sql/
lib.rs

1pub mod prune;
2
3mod aggregate;
4mod builder;
5mod codec;
6mod diagnostics;
7mod filter;
8mod predicate;
9mod scan;
10mod schema;
11mod types;
12mod writer;
13
14pub use schema::KvSchema;
15pub use types::default_orders_index_specs;
16pub use types::{
17    CellValue, IndexBackfillEvent, IndexBackfillOptions, IndexBackfillReport, IndexLayout,
18    IndexSpec, TableColumnConfig,
19};
20pub use writer::{BatchWriter, TableWriter};
21
22#[cfg(test)]
23mod tests {
24    use super::aggregate::*;
25    use super::builder::*;
26    use super::codec::*;
27    use super::diagnostics::*;
28    use super::filter::*;
29    use super::predicate::*;
30    use super::scan::*;
31    use super::types::*;
32    use super::writer::*;
33    use super::*;
34    use commonware_codec::Encode;
35    use datafusion::arrow::array::{Float64Array, Int64Array, LargeStringArray, StringViewArray};
36    use datafusion::arrow::datatypes::{i256, DataType, TimeUnit};
37    use datafusion::arrow::record_batch::RecordBatch;
38    use datafusion::common::ScalarValue;
39    use datafusion::logical_expr::{Expr, Operator};
40    use datafusion::physical_plan::ExecutionPlan;
41    use datafusion::prelude::SessionContext;
42    use exoware_sdk_rs::keys::{Key, KeyCodec};
43    use exoware_sdk_rs::kv_codec::{
44        canonicalize_reduced_group_values, decode_stored_row, encode_reduced_group_key,
45        eval_predicate, KvReducedValue, StoredRow,
46    };
47    use exoware_sdk_rs::{RangeReduceOp, RangeReduceRequest, StoreClient};
48    use std::collections::{BTreeMap, HashSet};
49    use std::ops::Bound::{Included, Unbounded};
50    use std::pin::Pin;
51    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
52    use std::sync::{Arc, Mutex};
53    use std::time::Duration;
54
55    use axum::Router;
56    use bytes::Bytes;
57    use connectrpc::{Chain, ConnectError, ConnectRpcService, Context};
58    use exoware_sdk_rs::connect_compression_registry;
59    use exoware_sdk_rs::kv_codec::{eval_expr, expr_needs_value};
60    use exoware_sdk_rs::store::ingest::v1::{
61        PutResponse as ProtoPutResponse, Service as IngestService,
62        ServiceServer as IngestServiceServer,
63    };
64    use exoware_sdk_rs::store::query::v1::RangeEntry as ProtoRangeEntry;
65    use exoware_sdk_rs::store::query::v1::{
66        GetManyEntry as ProtoGetManyEntry, GetManyFrame as ProtoGetManyFrame,
67        GetResponse as ProtoGetResponse, RangeFrame as ProtoRangeFrame,
68        ReduceResponse as ProtoReduceResponse, Service as QueryService,
69        ServiceServer as QueryServiceServer,
70    };
71    use exoware_sdk_rs::RangeMode;
72    use exoware_sdk_rs::{
73        parse_range_traversal_direction, to_domain_reduce_request, to_proto_optional_reduced_value,
74        to_proto_reduced_value, RangeTraversalDirection, RangeTraversalModeError,
75    };
76    use exoware_sdk_rs::{RangeReduceGroup, RangeReduceResponse, RangeReduceResult};
77    use futures::{stream, Stream, TryStreamExt};
78    use tokio::sync::{mpsc, oneshot, Notify};
79
80    /// Assert EXPLAIN text includes the same `query_stats=...` suffix as [`format_query_stats_explain`].
81    fn assert_explain_includes_query_stats_surface(
82        explain: &str,
83        surface: QueryStatsExplainSurface,
84    ) {
85        let expected = format!("query_stats={}", format_query_stats_explain(surface));
86        assert!(
87            explain.contains(&expected),
88            "expected EXPLAIN output to include `{expected}`\n{explain}"
89        );
90    }
91
92    fn simple_int64_model(prefix: u8) -> TableModel {
93        let config = KvTableConfig::new(
94            prefix,
95            vec![TableColumnConfig::new("id", DataType::Int64, false)],
96            vec!["id".to_string()],
97            vec![],
98        )
99        .unwrap();
100        TableModel::from_config(&config).unwrap()
101    }
102
103    fn codec_payload(codec: KeyCodec, key: &Key, offset: usize, len: usize) -> Vec<u8> {
104        codec.read_payload(key, offset, len).expect("codec payload")
105    }
106
107    fn primary_payload(model: &TableModel, key: &Key, offset: usize, len: usize) -> Vec<u8> {
108        codec_payload(model.primary_key_codec, key, offset, len)
109    }
110
111    fn index_payload(spec: &ResolvedIndexSpec, key: &Key, offset: usize, len: usize) -> Vec<u8> {
112        codec_payload(spec.codec, key, offset, len)
113    }
114
115    fn matches_primary_key(table_prefix: u8, key: &Key) -> bool {
116        primary_key_codec(table_prefix)
117            .expect("primary codec")
118            .matches(key)
119    }
120
121    fn matches_secondary_index_key(table_prefix: u8, index_id: u8, key: &Key) -> bool {
122        secondary_index_codec(table_prefix, index_id)
123            .expect("secondary codec")
124            .matches(key)
125    }
126
127    fn test_model() -> (TableModel, Vec<ResolvedIndexSpec>) {
128        let config = KvTableConfig::new(
129            0,
130            vec![
131                TableColumnConfig::new("region", DataType::Utf8, false),
132                TableColumnConfig::new("customer_id", DataType::Int64, false),
133                TableColumnConfig::new("order_id", DataType::Int64, false),
134                TableColumnConfig::new("amount_cents", DataType::Int64, false),
135                TableColumnConfig::new("status", DataType::Utf8, false),
136            ],
137            vec!["order_id".to_string()],
138            vec![
139                IndexSpec::new(
140                    "region_customer",
141                    vec!["region".to_string(), "customer_id".to_string()],
142                )
143                .expect("valid"),
144                IndexSpec::new(
145                    "status_customer",
146                    vec!["status".to_string(), "customer_id".to_string()],
147                )
148                .expect("valid"),
149            ],
150        )
151        .expect("valid config");
152        let model = TableModel::from_config(&config).expect("model");
153        let specs = model
154            .resolve_index_specs(&config.index_specs)
155            .expect("specs");
156        (model, specs)
157    }
158
159    fn zorder_test_model() -> (TableModel, Vec<ResolvedIndexSpec>) {
160        let config = KvTableConfig::new(
161            0,
162            vec![
163                TableColumnConfig::new("x", DataType::Int64, false),
164                TableColumnConfig::new("y", DataType::Int64, false),
165                TableColumnConfig::new("id", DataType::Int64, false),
166                TableColumnConfig::new("value", DataType::Int64, false),
167            ],
168            vec!["id".to_string()],
169            vec![
170                IndexSpec::new("xy_lex", vec!["x".to_string(), "y".to_string()])
171                    .expect("valid")
172                    .with_cover_columns(vec!["value".to_string()]),
173                IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
174                    .expect("valid")
175                    .with_cover_columns(vec!["value".to_string()]),
176            ],
177        )
178        .expect("valid config");
179        let model = TableModel::from_config(&config).expect("model");
180        let specs = model
181            .resolve_index_specs(&config.index_specs)
182            .expect("specs");
183        (model, specs)
184    }
185
186    #[derive(Clone)]
187    struct MockState {
188        kv: Arc<Mutex<BTreeMap<Key, Bytes>>>,
189        range_calls: Arc<AtomicUsize>,
190        range_reduce_calls: Arc<AtomicUsize>,
191        sequence_number: Arc<AtomicU64>,
192    }
193
194    #[derive(Debug)]
195    struct MockGroupedReduceState {
196        group_values: Vec<Option<KvReducedValue>>,
197        states: Vec<PartialAggregateState>,
198    }
199
200    type MockReduceRow = (Vec<Option<KvReducedValue>>, Vec<Option<KvReducedValue>>);
201
202    fn extract_mock_reduce_row(
203        key: &Key,
204        value: &Bytes,
205        request: &RangeReduceRequest,
206    ) -> Option<MockReduceRow> {
207        let needs_value = request
208            .group_by
209            .iter()
210            .chain(
211                request
212                    .reducers
213                    .iter()
214                    .filter_map(|reducer| reducer.expr.as_ref()),
215            )
216            .any(expr_needs_value)
217            || request
218                .filter
219                .as_ref()
220                .is_some_and(exoware_sdk_rs::kv_codec::predicate_needs_value);
221        let archived = if needs_value {
222            decode_stored_row(value.as_ref()).ok()
223        } else {
224            None
225        };
226
227        if let Some(filter) = &request.filter {
228            if !eval_predicate(key, archived.as_ref(), filter).ok()? {
229                return None;
230            }
231        }
232
233        let mut group_values = Vec::with_capacity(request.group_by.len());
234        for expr in &request.group_by {
235            let extracted_value = eval_expr(key, archived.as_ref(), expr).ok()?;
236            group_values.push(extracted_value);
237        }
238        canonicalize_reduced_group_values(&mut group_values);
239
240        let mut reducer_values = Vec::with_capacity(request.reducers.len());
241        for reducer in &request.reducers {
242            let extracted_value = match (&reducer.expr, archived.as_ref()) {
243                (None, _) => None,
244                (Some(expr), _) => eval_expr(key, archived.as_ref(), expr).ok()?,
245            };
246            reducer_values.push(extracted_value);
247        }
248
249        Some((group_values, reducer_values))
250    }
251
252    #[allow(clippy::result_large_err)]
253    fn ensure_min_sequence_number(
254        token: &Arc<AtomicU64>,
255        required: Option<u64>,
256    ) -> Result<(), ConnectError> {
257        let current = token.load(AtomicOrdering::Relaxed);
258        if let Some(required) = required {
259            if current < required {
260                return Err(ConnectError::aborted(format!(
261                    "consistency_not_ready: required={required}, current={current}"
262                )));
263            }
264        }
265        Ok(())
266    }
267
268    fn proto_range_entries_frame(results: Vec<(Key, Vec<u8>)>) -> ProtoRangeFrame {
269        ProtoRangeFrame {
270            results: results
271                .into_iter()
272                .map(|(key, value)| ProtoRangeEntry {
273                    key: key.to_vec(),
274                    value,
275                    ..Default::default()
276                })
277                .collect(),
278            ..Default::default()
279        }
280    }
281
282    fn query_detail_trailer_ctx(sequence_number: u64) -> Context {
283        let detail = exoware_sdk_rs::store::query::v1::Detail {
284            sequence_number,
285            read_stats: Default::default(),
286            ..Default::default()
287        };
288        exoware_sdk_rs::with_query_detail_trailer(Context::default(), &detail)
289    }
290
291    #[derive(Clone)]
292    struct MockIngestConnect {
293        state: MockState,
294    }
295
296    impl IngestService for MockIngestConnect {
297        async fn put(
298            &self,
299            ctx: Context,
300            request: buffa::view::OwnedView<
301                exoware_sdk_rs::store::ingest::v1::PutRequestView<'static>,
302            >,
303        ) -> Result<(ProtoPutResponse, Context), ConnectError> {
304            let mut parsed = Vec::<(Key, Bytes)>::new();
305            for kv in request.kvs.iter() {
306                parsed.push((kv.key.to_vec().into(), Bytes::copy_from_slice(kv.value)));
307            }
308            let mut guard = self.state.kv.lock().expect("kv mutex poisoned");
309            for (key, value) in parsed.iter() {
310                guard.insert(key.clone(), value.clone());
311            }
312            let seq = self
313                .state
314                .sequence_number
315                .fetch_add(1, AtomicOrdering::SeqCst)
316                + 1;
317            Ok((
318                ProtoPutResponse {
319                    sequence_number: seq,
320                    ..Default::default()
321                },
322                ctx,
323            ))
324        }
325    }
326
327    #[derive(Clone)]
328    struct MockQueryConnect {
329        state: MockState,
330    }
331
332    impl QueryService for MockQueryConnect {
333        async fn get(
334            &self,
335            _ctx: Context,
336            request: buffa::view::OwnedView<
337                exoware_sdk_rs::store::query::v1::GetRequestView<'static>,
338            >,
339        ) -> Result<(ProtoGetResponse, Context), ConnectError> {
340            ensure_min_sequence_number(&self.state.sequence_number, request.min_sequence_number)?;
341            let key: Key = request.key.to_vec().into();
342            let guard = self.state.kv.lock().expect("kv mutex poisoned");
343            let value = guard.get(&key).cloned();
344            let token = self.state.sequence_number.load(AtomicOrdering::Relaxed);
345            let detail = exoware_sdk_rs::store::query::v1::Detail {
346                sequence_number: token,
347                read_stats: Default::default(),
348                ..Default::default()
349            };
350            Ok((
351                ProtoGetResponse {
352                    value: value.map(|v| v.to_vec()),
353                    ..Default::default()
354                },
355                exoware_sdk_rs::with_query_detail_response_header(Context::default(), &detail),
356            ))
357        }
358
359        async fn range(
360            &self,
361            _ctx: Context,
362            request: buffa::view::OwnedView<
363                exoware_sdk_rs::store::query::v1::RangeRequestView<'static>,
364            >,
365        ) -> Result<
366            (
367                Pin<Box<dyn Stream<Item = Result<ProtoRangeFrame, ConnectError>> + Send>>,
368                Context,
369            ),
370            ConnectError,
371        > {
372            ensure_min_sequence_number(&self.state.sequence_number, request.min_sequence_number)?;
373            self.state.range_calls.fetch_add(1, AtomicOrdering::SeqCst);
374
375            let start_key: Key = request.start.to_vec().into();
376            let end_key: Key = request.end.to_vec().into();
377            let limit = request.limit.map(|v| v as usize).unwrap_or(usize::MAX);
378            let batch_size = usize::try_from(request.batch_size).unwrap_or(usize::MAX);
379            if batch_size == 0 {
380                return Err(ConnectError::invalid_argument(
381                    "invalid batch_size: expected positive integer",
382                ));
383            }
384
385            let mode = match parse_range_traversal_direction(request.mode) {
386                Ok(RangeTraversalDirection::Forward) => RangeMode::Forward,
387                Ok(RangeTraversalDirection::Reverse) => RangeMode::Reverse,
388                Err(RangeTraversalModeError::UnknownWireValue(v)) => {
389                    return Err(ConnectError::invalid_argument(format!(
390                        "unknown TraversalMode enum value {v}"
391                    )));
392                }
393            };
394
395            let state = self.state.clone();
396            let guard = state.kv.lock().expect("kv mutex poisoned");
397            // Match `StoreEngine::range_scan`: inclusive [start, end]; empty end = unbounded.
398            let range: (std::ops::Bound<&Key>, std::ops::Bound<&Key>) = (
399                Included(&start_key),
400                if end_key.is_empty() {
401                    Unbounded
402                } else {
403                    Included(&end_key)
404                },
405            );
406            let range_iter = guard.range::<Key, _>(range);
407            let iter: Box<dyn Iterator<Item = (&Key, &Bytes)> + Send> = match mode {
408                RangeMode::Forward => Box::new(range_iter),
409                RangeMode::Reverse => Box::new(range_iter.rev()),
410            };
411            let mut results: Vec<ProtoRangeEntry> = Vec::new();
412            for (key, value) in iter.take(limit) {
413                results.push(ProtoRangeEntry {
414                    key: key.to_vec(),
415                    value: value.to_vec(),
416                    ..Default::default()
417                });
418            }
419            drop(guard);
420            let token = state.sequence_number.load(AtomicOrdering::Relaxed);
421            let batch = batch_size.max(1);
422            let mut frames: Vec<Result<ProtoRangeFrame, ConnectError>> = Vec::new();
423            for chunk in results.chunks(batch) {
424                frames.push(Ok(ProtoRangeFrame {
425                    results: chunk.to_vec(),
426                    ..Default::default()
427                }));
428            }
429            let detail = exoware_sdk_rs::store::query::v1::Detail {
430                sequence_number: token,
431                read_stats: Default::default(),
432                ..Default::default()
433            };
434            Ok((
435                Box::pin(stream::iter(frames)),
436                exoware_sdk_rs::with_query_detail_trailer(Context::default(), &detail),
437            ))
438        }
439
440        async fn get_many(
441            &self,
442            _ctx: Context,
443            request: buffa::view::OwnedView<
444                exoware_sdk_rs::store::query::v1::GetManyRequestView<'static>,
445            >,
446        ) -> Result<
447            (
448                Pin<Box<dyn Stream<Item = Result<ProtoGetManyFrame, ConnectError>> + Send>>,
449                Context,
450            ),
451            ConnectError,
452        > {
453            ensure_min_sequence_number(&self.state.sequence_number, request.min_sequence_number)?;
454            let batch_size = usize::try_from(request.batch_size)
455                .unwrap_or(usize::MAX)
456                .max(1);
457            let guard = self.state.kv.lock().expect("kv mutex poisoned");
458            let mut entries: Vec<ProtoGetManyEntry> = Vec::new();
459            for key_bytes in request.keys.iter() {
460                let key: Key = key_bytes.to_vec().into();
461                let value = guard.get(&key).cloned();
462                entries.push(ProtoGetManyEntry {
463                    key: key.to_vec(),
464                    value: value.map(|v| v.to_vec()),
465                    ..Default::default()
466                });
467            }
468            drop(guard);
469            let token = self.state.sequence_number.load(AtomicOrdering::Relaxed);
470            let mut frames: Vec<Result<ProtoGetManyFrame, ConnectError>> = Vec::new();
471            for chunk in entries.chunks(batch_size) {
472                frames.push(Ok(ProtoGetManyFrame {
473                    results: chunk.to_vec(),
474                    ..Default::default()
475                }));
476            }
477            let detail = exoware_sdk_rs::store::query::v1::Detail {
478                sequence_number: token,
479                read_stats: Default::default(),
480                ..Default::default()
481            };
482            Ok((
483                Box::pin(stream::iter(frames)),
484                exoware_sdk_rs::with_query_detail_trailer(Context::default(), &detail),
485            ))
486        }
487
488        async fn reduce(
489            &self,
490            _ctx: Context,
491            request: buffa::view::OwnedView<
492                exoware_sdk_rs::store::query::v1::ReduceRequestView<'static>,
493            >,
494        ) -> Result<(ProtoReduceResponse, Context), ConnectError> {
495            ensure_min_sequence_number(&self.state.sequence_number, request.min_sequence_number)?;
496            self.state
497                .range_reduce_calls
498                .fetch_add(1, AtomicOrdering::SeqCst);
499            let owned = request.to_owned_message();
500            let start_key: Key = owned.start.clone().into();
501            let end_key: Key = owned.end.clone().into();
502            let reduce_req = owned
503                .params
504                .as_option()
505                .ok_or_else(|| ConnectError::invalid_argument("missing range reduce params"))?;
506            let domain_request =
507                to_domain_reduce_request(reduce_req).map_err(ConnectError::invalid_argument)?;
508
509            let state = self.state.clone();
510            let guard = state.kv.lock().expect("kv mutex poisoned");
511            let mut states = domain_request.group_by.is_empty().then(|| {
512                domain_request
513                    .reducers
514                    .iter()
515                    .map(|reducer| PartialAggregateState::from_op(reducer.op))
516                    .collect::<Vec<_>>()
517            });
518            let mut grouped = BTreeMap::<Vec<u8>, MockGroupedReduceState>::new();
519
520            let range: (std::ops::Bound<&Key>, std::ops::Bound<&Key>) = (
521                Included(&start_key),
522                if end_key.is_empty() {
523                    Unbounded
524                } else {
525                    Included(&end_key)
526                },
527            );
528            for (key, value) in guard.range::<Key, _>(range) {
529                let Some((group_values, reducer_values)) =
530                    extract_mock_reduce_row(key, value, &domain_request)
531                else {
532                    continue;
533                };
534                if domain_request.group_by.is_empty() {
535                    let states = states.as_mut().expect("scalar states");
536                    for ((state, reducer), value) in states
537                        .iter_mut()
538                        .zip(domain_request.reducers.iter())
539                        .zip(reducer_values.into_iter())
540                    {
541                        match reducer.op {
542                            RangeReduceOp::CountAll => state
543                                .merge_partial(reducer.op, Some(&KvReducedValue::UInt64(1)))
544                                .map_err(|e| ConnectError::internal(e.to_string()))?,
545                            RangeReduceOp::CountField => {
546                                let partial =
547                                    KvReducedValue::UInt64(if value.is_some() { 1 } else { 0 });
548                                state
549                                    .merge_partial(reducer.op, Some(&partial))
550                                    .map_err(|e| ConnectError::internal(e.to_string()))?
551                            }
552                            _ => state
553                                .merge_partial(reducer.op, value.as_ref())
554                                .map_err(|e| ConnectError::internal(e.to_string()))?,
555                        }
556                    }
557                } else {
558                    let group_key = encode_reduced_group_key(&group_values);
559                    let group =
560                        grouped
561                            .entry(group_key)
562                            .or_insert_with(|| MockGroupedReduceState {
563                                group_values: group_values.clone(),
564                                states: domain_request
565                                    .reducers
566                                    .iter()
567                                    .map(|reducer| PartialAggregateState::from_op(reducer.op))
568                                    .collect(),
569                            });
570                    for ((state, reducer), value) in group
571                        .states
572                        .iter_mut()
573                        .zip(domain_request.reducers.iter())
574                        .zip(reducer_values.into_iter())
575                    {
576                        match reducer.op {
577                            RangeReduceOp::CountAll => state
578                                .merge_partial(reducer.op, Some(&KvReducedValue::UInt64(1)))
579                                .map_err(|e| ConnectError::internal(e.to_string()))?,
580                            RangeReduceOp::CountField => {
581                                let partial =
582                                    KvReducedValue::UInt64(if value.is_some() { 1 } else { 0 });
583                                state
584                                    .merge_partial(reducer.op, Some(&partial))
585                                    .map_err(|e| ConnectError::internal(e.to_string()))?
586                            }
587                            _ => state
588                                .merge_partial(reducer.op, value.as_ref())
589                                .map_err(|e| ConnectError::internal(e.to_string()))?,
590                        }
591                    }
592                }
593            }
594
595            let response = if let Some(states) = states {
596                RangeReduceResponse {
597                    results: states
598                        .iter()
599                        .map(|state| RangeReduceResult {
600                            value: match state {
601                                PartialAggregateState::Count(count) => {
602                                    Some(KvReducedValue::UInt64(*count))
603                                }
604                                PartialAggregateState::Sum(value)
605                                | PartialAggregateState::Min(value)
606                                | PartialAggregateState::Max(value) => value.clone(),
607                            },
608                        })
609                        .collect(),
610                    groups: Vec::new(),
611                }
612            } else {
613                RangeReduceResponse {
614                    results: Vec::new(),
615                    groups: grouped
616                        .into_values()
617                        .map(|group| RangeReduceGroup {
618                            group_values: group.group_values,
619                            results: group
620                                .states
621                                .into_iter()
622                                .map(|state| RangeReduceResult {
623                                    value: match state {
624                                        PartialAggregateState::Count(count) => {
625                                            Some(KvReducedValue::UInt64(count))
626                                        }
627                                        PartialAggregateState::Sum(value)
628                                        | PartialAggregateState::Min(value)
629                                        | PartialAggregateState::Max(value) => value,
630                                    },
631                                })
632                                .collect(),
633                        })
634                        .collect(),
635                }
636            };
637            drop(guard);
638            let token = state.sequence_number.load(AtomicOrdering::Relaxed);
639            let detail = exoware_sdk_rs::store::query::v1::Detail {
640                sequence_number: token,
641                read_stats: Default::default(),
642                ..Default::default()
643            };
644            Ok((
645                ProtoReduceResponse {
646                    results: response
647                        .results
648                        .into_iter()
649                        .map(
650                            |result| exoware_sdk_rs::store::query::v1::RangeReduceResult {
651                                value: result.value.map(to_proto_reduced_value).into(),
652                                ..Default::default()
653                            },
654                        )
655                        .collect(),
656                    groups: response
657                        .groups
658                        .into_iter()
659                        .map(|group| {
660                            let group_values_present: Vec<bool> =
661                                group.group_values.iter().map(|v| v.is_some()).collect();
662                            exoware_sdk_rs::store::query::v1::RangeReduceGroup {
663                                group_values: group
664                                    .group_values
665                                    .into_iter()
666                                    .map(to_proto_optional_reduced_value)
667                                    .collect(),
668                                group_values_present,
669                                results: group
670                                    .results
671                                    .into_iter()
672                                    .map(|result| {
673                                        exoware_sdk_rs::store::query::v1::RangeReduceResult {
674                                            value: result.value.map(to_proto_reduced_value).into(),
675                                            ..Default::default()
676                                        }
677                                    })
678                                    .collect(),
679                                ..Default::default()
680                            }
681                        })
682                        .collect(),
683                    ..Default::default()
684                },
685                exoware_sdk_rs::with_query_detail_response_header(Context::default(), &detail),
686            ))
687        }
688    }
689
690    async fn spawn_mock_server(state: MockState) -> (String, oneshot::Sender<()>) {
691        let connect = ConnectRpcService::new(Chain(
692            IngestServiceServer::new(MockIngestConnect {
693                state: state.clone(),
694            }),
695            QueryServiceServer::new(MockQueryConnect { state }),
696        ))
697        .with_compression(connect_compression_registry());
698        let app = Router::new().fallback_service(connect);
699
700        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
701            .await
702            .expect("bind mock server");
703        let addr = listener.local_addr().expect("local addr");
704        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
705        tokio::spawn(async move {
706            axum::serve(listener, app)
707                .with_graceful_shutdown(async move {
708                    let _ = shutdown_rx.await;
709                })
710                .await
711                .expect("mock server should run");
712        });
713        (format!("http://{addr}"), shutdown_tx)
714    }
715
716    fn assert_count_scalar(batch: &RecordBatch, col_idx: usize, row_idx: usize, expected: u64) {
717        let scalar = ScalarValue::try_from_array(batch.column(col_idx), row_idx)
718            .expect("count scalar should decode");
719        match scalar {
720            ScalarValue::UInt64(Some(value)) => assert_eq!(value, expected),
721            ScalarValue::Int64(Some(value)) => assert_eq!(value, expected as i64),
722            other => panic!("unexpected count scalar: {other:?}"),
723        }
724    }
725
726    async fn explain_plan_rows(ctx: &SessionContext, sql: &str) -> Vec<(String, String)> {
727        let batches = ctx
728            .sql(&format!("EXPLAIN {sql}"))
729            .await
730            .expect("explain query")
731            .collect()
732            .await
733            .expect("explain collect");
734        let mut rows = Vec::new();
735        for batch in batches {
736            for row_idx in 0..batch.num_rows() {
737                let plan_type = scalar_to_string(
738                    &ScalarValue::try_from_array(batch.column(0), row_idx).expect("plan type"),
739                )
740                .expect("plan type string");
741                let plan = scalar_to_string(
742                    &ScalarValue::try_from_array(batch.column(1), row_idx).expect("plan"),
743                )
744                .expect("plan string");
745                rows.push((plan_type, plan));
746            }
747        }
748        rows
749    }
750
751    fn physical_plan_text(rows: &[(String, String)]) -> String {
752        rows.iter()
753            .filter(|(plan_type, _)| plan_type.contains("physical_plan"))
754            .map(|(_, plan)| plan.as_str())
755            .collect::<Vec<_>>()
756            .join("\n")
757    }
758
759    #[tokio::test]
760    async fn explain_reports_full_scan_like_primary_key_scan() {
761        let state = MockState {
762            kv: Arc::new(Mutex::new(BTreeMap::new())),
763            range_calls: Arc::new(AtomicUsize::new(0)),
764            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
765            sequence_number: Arc::new(AtomicU64::new(0)),
766        };
767        let (base_url, shutdown_tx) = spawn_mock_server(state).await;
768        let client = StoreClient::new(&base_url);
769
770        let schema = KvSchema::new(client)
771            .table(
772                "orders",
773                vec![
774                    TableColumnConfig::new("id", DataType::Int64, false),
775                    TableColumnConfig::new("status", DataType::Utf8, false),
776                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
777                ],
778                vec!["id".to_string()],
779                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
780                    .expect("valid")
781                    .with_cover_columns(vec!["amount_cents".to_string()])],
782            )
783            .expect("schema");
784        let ctx = SessionContext::new();
785        schema.register_all(&ctx).expect("register");
786
787        let explain =
788            physical_plan_text(&explain_plan_rows(&ctx, "SELECT id, status FROM orders").await);
789        assert!(explain.contains("KvScanExec:"));
790        assert!(explain.contains("mode=primary_key"));
791        assert!(explain.contains("predicate=<none>"));
792        assert!(explain.contains("row_recheck=false"));
793        assert!(explain.contains("full_scan_like=true"));
794        assert_explain_includes_query_stats_surface(
795            &explain,
796            QueryStatsExplainSurface::StreamedRangeTrailer,
797        );
798
799        let _ = shutdown_tx.send(());
800    }
801
802    #[tokio::test]
803    async fn explain_reports_secondary_index_scan_and_row_recheck() {
804        let state = MockState {
805            kv: Arc::new(Mutex::new(BTreeMap::new())),
806            range_calls: Arc::new(AtomicUsize::new(0)),
807            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
808            sequence_number: Arc::new(AtomicU64::new(0)),
809        };
810        let (base_url, shutdown_tx) = spawn_mock_server(state).await;
811        let client = StoreClient::new(&base_url);
812
813        let schema = KvSchema::new(client)
814            .table(
815                "orders",
816                vec![
817                    TableColumnConfig::new("id", DataType::Int64, false),
818                    TableColumnConfig::new("status", DataType::Utf8, false),
819                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
820                ],
821                vec!["id".to_string()],
822                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
823                    .expect("valid")
824                    .with_cover_columns(vec!["amount_cents".to_string()])],
825            )
826            .expect("schema");
827        let ctx = SessionContext::new();
828        schema.register_all(&ctx).expect("register");
829
830        let explain = physical_plan_text(
831            &explain_plan_rows(
832                &ctx,
833                "SELECT id, status, amount_cents FROM orders \
834                 WHERE status = 'open' AND amount_cents >= 5",
835            )
836            .await,
837        );
838        assert!(explain.contains("KvScanExec:"));
839        assert!(explain.contains("mode=secondary_index(status_idx, lexicographic)"));
840        assert!(explain.contains("predicate=status = 'open' AND amount_cents >= 5"));
841        assert!(explain.contains("exact=false"));
842        assert!(explain.contains("row_recheck=true"));
843        assert!(explain.contains("full_scan_like=false"));
844
845        let _ = shutdown_tx.send(());
846    }
847
848    #[tokio::test]
849    async fn explain_reports_zorder_secondary_index_scan() {
850        let state = MockState {
851            kv: Arc::new(Mutex::new(BTreeMap::new())),
852            range_calls: Arc::new(AtomicUsize::new(0)),
853            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
854            sequence_number: Arc::new(AtomicU64::new(0)),
855        };
856        let (base_url, shutdown_tx) = spawn_mock_server(state).await;
857        let client = StoreClient::new(&base_url);
858
859        let schema = KvSchema::new(client)
860            .table(
861                "points",
862                vec![
863                    TableColumnConfig::new("x", DataType::Int64, false),
864                    TableColumnConfig::new("y", DataType::Int64, false),
865                    TableColumnConfig::new("id", DataType::Int64, false),
866                    TableColumnConfig::new("value", DataType::Int64, false),
867                ],
868                vec!["id".to_string()],
869                vec![
870                    IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
871                        .expect("valid")
872                        .with_cover_columns(vec!["value".to_string()]),
873                ],
874            )
875            .expect("schema");
876        let ctx = SessionContext::new();
877        schema.register_all(&ctx).expect("register");
878
879        let explain = physical_plan_text(
880            &explain_plan_rows(
881                &ctx,
882                "SELECT id, value FROM points \
883                 WHERE x >= 1 AND x <= 2 AND y >= 1 AND y <= 2",
884            )
885            .await,
886        );
887        assert!(explain.contains("KvScanExec:"));
888        assert!(explain.contains("mode=secondary_index(xy_z, z_order)"));
889        assert!(explain.contains("exact=false"));
890        assert!(explain.contains("row_recheck=true"));
891
892        let _ = shutdown_tx.send(());
893    }
894
895    #[tokio::test]
896    async fn explain_reports_aggregate_pushdown_access_path_details() {
897        let state = MockState {
898            kv: Arc::new(Mutex::new(BTreeMap::new())),
899            range_calls: Arc::new(AtomicUsize::new(0)),
900            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
901            sequence_number: Arc::new(AtomicU64::new(0)),
902        };
903        let (base_url, shutdown_tx) = spawn_mock_server(state).await;
904        let client = StoreClient::new(&base_url);
905
906        let schema = KvSchema::new(client)
907            .table(
908                "orders",
909                vec![
910                    TableColumnConfig::new("id", DataType::Int64, false),
911                    TableColumnConfig::new("status", DataType::Utf8, false),
912                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
913                ],
914                vec!["id".to_string()],
915                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
916                    .expect("valid")
917                    .with_cover_columns(vec!["amount_cents".to_string()])],
918            )
919            .expect("schema");
920        let ctx = SessionContext::new();
921        schema.register_all(&ctx).expect("register");
922
923        let explain = physical_plan_text(
924            &explain_plan_rows(
925                &ctx,
926                "SELECT status, SUM(amount_cents) AS total_cents \
927                 FROM orders WHERE status = 'open' GROUP BY status",
928            )
929            .await,
930        );
931        assert!(explain.contains("KvAggregateExec:"));
932        assert!(explain.contains("grouped=true"));
933        assert!(explain.contains("job0{mode=secondary_index(status_idx, lexicographic)"));
934        assert!(explain.contains("predicate=status = 'open'"));
935        assert!(explain.contains("exact=true"));
936        assert!(explain.contains("row_recheck=false"));
937        assert_explain_includes_query_stats_surface(
938            &explain,
939            QueryStatsExplainSurface::RangeReduceHeader,
940        );
941
942        let _ = shutdown_tx.send(());
943    }
944
945    #[test]
946    fn index_spec_constructor_sets_name_and_keys() {
947        let spec = IndexSpec::new(
948            "status_customer",
949            vec!["status".to_string(), "customer_id".to_string()],
950        )
951        .expect("valid index spec");
952        assert_eq!(spec.name(), "status_customer");
953        assert_eq!(spec.key_columns(), &["status", "customer_id"]);
954        assert!(spec.cover_columns().is_empty());
955    }
956
957    #[test]
958    fn index_spec_cover_columns_are_configurable_in_code() {
959        let spec = IndexSpec::new("status_customer", vec!["status".to_string()])
960            .expect("valid")
961            .with_cover_columns(vec!["amount_cents".to_string()]);
962        assert_eq!(spec.key_columns(), &["status"]);
963        assert_eq!(spec.cover_columns(), &["amount_cents"]);
964    }
965
966    #[test]
967    fn describe_in_list_places_truncation_ellipsis_inside_parentheses() {
968        let rendered = describe_in_list((1..=6).map(|v| v.to_string()));
969        assert_eq!(rendered, "IN (1, 2, 3, 4, 5, ...)");
970    }
971
972    #[test]
973    fn normalize_sum_case_then_one_uses_countall_optimization() {
974        let (model, _) = test_model();
975        let argument = normalize_case_then_expr(
976            AggregatePushdownFunction::Sum,
977            &Expr::Literal(ScalarValue::Int64(Some(1)), None),
978            &model,
979        )
980        .expect("normalize");
981        assert_eq!(argument, AggregatePushdownArgument::CountAll);
982    }
983
984    #[test]
985    fn normalize_count_case_then_literal_uses_countall_optimization() {
986        use datafusion::logical_expr::col;
987
988        let (model, _) = test_model();
989        let case_expr = Expr::Case(datafusion::logical_expr::expr::Case {
990            expr: None,
991            when_then_expr: vec![(
992                Box::new(col("status").eq(Expr::Literal(
993                    ScalarValue::Utf8(Some("open".to_string())),
994                    None,
995                ))),
996                Box::new(Expr::Literal(
997                    ScalarValue::Utf8(Some("yes".to_string())),
998                    None,
999                )),
1000            )],
1001            else_expr: Some(Box::new(Expr::Literal(ScalarValue::Utf8(None), None))),
1002        });
1003
1004        let (func, argument, filter) =
1005            normalize_count_aggregate_argument(&case_expr, &model).expect("normalize");
1006        assert_eq!(func, AggregatePushdownFunction::Count);
1007        assert_eq!(argument, AggregatePushdownArgument::CountAll);
1008        assert!(filter.is_some());
1009    }
1010
1011    #[test]
1012    fn reduced_value_to_scalar_preserves_timestamp_timezone_label() {
1013        let tz: Arc<str> = Arc::from("America/New_York");
1014        let scalar = reduced_value_to_scalar(
1015            Some(KvReducedValue::Timestamp(1_700_000_000_000_000)),
1016            &DataType::Timestamp(TimeUnit::Microsecond, Some(tz.clone())),
1017        )
1018        .expect("timestamp scalar");
1019        assert_eq!(
1020            scalar,
1021            ScalarValue::TimestampMicrosecond(Some(1_700_000_000_000_000), Some(tz))
1022        );
1023    }
1024
1025    #[test]
1026    fn index_spec_cover_pk_column_is_rejected() {
1027        let config = KvTableConfig::new(
1028            0,
1029            vec![
1030                TableColumnConfig::new("id", DataType::Int64, false),
1031                TableColumnConfig::new("status", DataType::Utf8, false),
1032            ],
1033            vec!["id".to_string()],
1034            vec![IndexSpec::new("status_idx", vec!["status".to_string()])
1035                .expect("valid")
1036                .with_cover_columns(vec!["id".to_string()])],
1037        )
1038        .expect("valid config");
1039        let model = TableModel::from_config(&config).expect("model");
1040        let err = model
1041            .resolve_index_specs(&config.index_specs)
1042            .expect_err("covering a PK column must be rejected");
1043        assert!(err.contains("primary key column"));
1044    }
1045
1046    #[test]
1047    fn access_plan_requires_cover_columns_for_index_scan() {
1048        let (model, _) = test_model();
1049        let predicate = QueryPredicate::default();
1050        let projection = Some(vec![
1051            *model.columns_by_name.get("order_id").unwrap(),
1052            *model.columns_by_name.get("amount_cents").unwrap(),
1053        ]);
1054        let plan = ScanAccessPlan::new(&model, &projection, &predicate);
1055
1056        let no_cover = IndexSpec::new("status_idx", vec!["status".to_string()]).unwrap();
1057        let with_cover = IndexSpec::new("status_idx", vec!["status".to_string()])
1058            .unwrap()
1059            .with_cover_columns(vec!["amount_cents".to_string()]);
1060        let no_cover_resolved = model.resolve_index_specs(&[no_cover]).unwrap();
1061        let with_cover_resolved = model.resolve_index_specs(&[with_cover]).unwrap();
1062
1063        assert!(!plan.index_covers_required_non_pk(&no_cover_resolved[0]));
1064        assert!(plan.index_covers_required_non_pk(&with_cover_resolved[0]));
1065    }
1066
1067    #[test]
1068    fn choose_index_plan_prefers_longer_prefix() {
1069        let (model, specs) = test_model();
1070        let region_idx = *model.columns_by_name.get("region").unwrap();
1071        let customer_idx = *model.columns_by_name.get("customer_id").unwrap();
1072        let mut predicate = QueryPredicate::default();
1073        predicate.constraints.insert(
1074            region_idx,
1075            PredicateConstraint::StringEq("us-east".to_string()),
1076        );
1077        predicate.constraints.insert(
1078            customer_idx,
1079            PredicateConstraint::IntRange {
1080                min: Some(10),
1081                max: Some(20),
1082            },
1083        );
1084        let plan = predicate
1085            .choose_index_plan(&model, &specs)
1086            .expect("plan")
1087            .expect("exists");
1088        assert_eq!(plan.spec_idx, 0);
1089        assert_eq!(plan.constrained_prefix_len, 2);
1090    }
1091
1092    #[test]
1093    fn choose_index_plan_prefers_covering_index_when_prefix_strength_ties() {
1094        let config = KvTableConfig::new(
1095            0,
1096            vec![
1097                TableColumnConfig::new("id", DataType::Int64, false),
1098                TableColumnConfig::new("status", DataType::Utf8, false),
1099                TableColumnConfig::new("amount_cents", DataType::Int64, false),
1100            ],
1101            vec!["id".to_string()],
1102            vec![
1103                IndexSpec::new("status_plain", vec!["status".to_string()]).expect("valid"),
1104                IndexSpec::new("status_covering", vec!["status".to_string()])
1105                    .expect("valid")
1106                    .with_cover_columns(vec!["amount_cents".to_string()]),
1107            ],
1108        )
1109        .expect("config");
1110        let model = TableModel::from_config(&config).expect("model");
1111        let specs = model
1112            .resolve_index_specs(&config.index_specs)
1113            .expect("specs");
1114        let status_idx = *model.columns_by_name.get("status").unwrap();
1115        let amount_idx = *model.columns_by_name.get("amount_cents").unwrap();
1116        let mut predicate = QueryPredicate::default();
1117        predicate.constraints.insert(
1118            status_idx,
1119            PredicateConstraint::StringEq("open".to_string()),
1120        );
1121        predicate.constraints.insert(
1122            amount_idx,
1123            PredicateConstraint::IntRange {
1124                min: Some(10),
1125                max: None,
1126            },
1127        );
1128
1129        let plan = predicate
1130            .choose_index_plan(&model, &specs)
1131            .expect("plan")
1132            .expect("exists");
1133        assert_eq!(specs[plan.spec_idx].name, "status_covering");
1134    }
1135
1136    #[test]
1137    fn choose_index_plan_prefers_zorder_for_multi_column_box_constraints() {
1138        let (model, specs) = zorder_test_model();
1139        let x_idx = *model.columns_by_name.get("x").unwrap();
1140        let y_idx = *model.columns_by_name.get("y").unwrap();
1141        let mut predicate = QueryPredicate::default();
1142        predicate.constraints.insert(
1143            x_idx,
1144            PredicateConstraint::IntRange {
1145                min: Some(1),
1146                max: Some(2),
1147            },
1148        );
1149        predicate.constraints.insert(
1150            y_idx,
1151            PredicateConstraint::IntRange {
1152                min: Some(1),
1153                max: Some(2),
1154            },
1155        );
1156
1157        let plan = predicate
1158            .choose_index_plan(&model, &specs)
1159            .expect("plan")
1160            .expect("exists");
1161        assert_eq!(specs[plan.spec_idx].name, "xy_z");
1162        assert_eq!(specs[plan.spec_idx].layout, IndexLayout::ZOrder);
1163        assert_eq!(plan.constrained_column_count, 2);
1164    }
1165
1166    #[test]
1167    fn secondary_index_key_round_trip() {
1168        let (model, specs) = test_model();
1169        let row = KvRow {
1170            values: vec![
1171                CellValue::Utf8("us-east".to_string()),
1172                CellValue::Int64(42),
1173                CellValue::Int64(9001),
1174                CellValue::Int64(1500),
1175                CellValue::Utf8("open".to_string()),
1176            ],
1177        };
1178        let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row)
1179            .expect("encode");
1180        let decoded = decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key)
1181            .expect("decode");
1182        let region_idx = *model.columns_by_name.get("region").unwrap();
1183        let customer_idx = *model.columns_by_name.get("customer_id").unwrap();
1184        assert!(matches!(
1185            decoded.values.get(&region_idx),
1186            Some(CellValue::Utf8(v)) if v == "us-east"
1187        ));
1188        assert!(matches!(
1189            decoded.values.get(&customer_idx),
1190            Some(CellValue::Int64(v)) if *v == 42
1191        ));
1192        assert!(matches!(
1193            &decoded.primary_key_values[0],
1194            CellValue::Int64(9001)
1195        ));
1196        let expected_pk = encode_primary_key_from_row(model.table_prefix, &row, &model)
1197            .expect("primary key should encode");
1198        assert_eq!(decoded.primary_key, expected_pk);
1199    }
1200
1201    #[test]
1202    fn zorder_secondary_index_key_round_trip() {
1203        let (model, specs) = zorder_test_model();
1204        let row = KvRow {
1205            values: vec![
1206                CellValue::Int64(2),
1207                CellValue::Int64(1),
1208                CellValue::Int64(42),
1209                CellValue::Int64(900),
1210            ],
1211        };
1212        let key = encode_secondary_index_key(model.table_prefix, &specs[1], &model, &row)
1213            .expect("encode");
1214        let decoded = decode_secondary_index_key(model.table_prefix, &specs[1], &model, &key)
1215            .expect("decode");
1216        let x_idx = *model.columns_by_name.get("x").unwrap();
1217        let y_idx = *model.columns_by_name.get("y").unwrap();
1218        assert!(matches!(
1219            decoded.values.get(&x_idx),
1220            Some(CellValue::Int64(v)) if *v == 2
1221        ));
1222        assert!(matches!(
1223            decoded.values.get(&y_idx),
1224            Some(CellValue::Int64(v)) if *v == 1
1225        ));
1226        assert!(matches!(
1227            &decoded.primary_key_values[0],
1228            CellValue::Int64(42)
1229        ));
1230    }
1231
1232    #[test]
1233    fn table_config_supports_non_orders_schema() {
1234        let config = KvTableConfig::new(
1235            0,
1236            vec![
1237                TableColumnConfig::new("tenant", DataType::Utf8, false),
1238                TableColumnConfig::new("id", DataType::Int64, false),
1239                TableColumnConfig::new("score", DataType::Int64, false),
1240            ],
1241            vec!["id".to_string()],
1242            vec![IndexSpec::new(
1243                "tenant_score",
1244                vec!["tenant".to_string(), "score".to_string()],
1245            )
1246            .expect("valid")],
1247        )
1248        .expect("schema agnostic config should be valid");
1249        assert_eq!(config.primary_key_columns, vec!["id".to_string()]);
1250        assert_eq!(config.columns.len(), 3);
1251    }
1252
1253    #[test]
1254    fn table_config_accepts_float64_column() {
1255        let config = KvTableConfig::new(
1256            0,
1257            vec![
1258                TableColumnConfig::new("id", DataType::Int64, false),
1259                TableColumnConfig::new("price", DataType::Float64, false),
1260            ],
1261            vec!["id".to_string()],
1262            vec![],
1263        )
1264        .expect("Float64 column should be accepted");
1265        assert_eq!(config.columns.len(), 2);
1266    }
1267
1268    #[test]
1269    fn table_config_accepts_boolean_column() {
1270        let config = KvTableConfig::new(
1271            0,
1272            vec![
1273                TableColumnConfig::new("id", DataType::Int64, false),
1274                TableColumnConfig::new("active", DataType::Boolean, false),
1275            ],
1276            vec!["id".to_string()],
1277            vec![],
1278        )
1279        .expect("Boolean column should be accepted");
1280        assert_eq!(config.columns.len(), 2);
1281    }
1282
1283    #[test]
1284    fn build_projected_batch_uses_large_utf8_type() {
1285        let config = KvTableConfig::new(
1286            0,
1287            vec![
1288                TableColumnConfig::new("id", DataType::Int64, false),
1289                TableColumnConfig::new("name", DataType::LargeUtf8, false),
1290            ],
1291            vec!["id".to_string()],
1292            vec![],
1293        )
1294        .unwrap();
1295        let model = TableModel::from_config(&config).unwrap();
1296        let rows = vec![KvRow {
1297            values: vec![CellValue::Int64(1), CellValue::Utf8("hello".to_string())],
1298        }];
1299        let batch = build_projected_batch(&rows, &model, &model.schema, &None).unwrap();
1300        assert_eq!(batch.column(1).data_type(), &DataType::LargeUtf8);
1301        let values = batch
1302            .column(1)
1303            .as_any()
1304            .downcast_ref::<LargeStringArray>()
1305            .expect("must build LargeStringArray");
1306        assert_eq!(values.value(0), "hello");
1307    }
1308
1309    #[test]
1310    fn build_projected_batch_uses_utf8_view_type() {
1311        let config = KvTableConfig::new(
1312            0,
1313            vec![
1314                TableColumnConfig::new("id", DataType::Int64, false),
1315                TableColumnConfig::new("name", DataType::Utf8View, false),
1316            ],
1317            vec!["id".to_string()],
1318            vec![],
1319        )
1320        .unwrap();
1321        let model = TableModel::from_config(&config).unwrap();
1322        let rows = vec![KvRow {
1323            values: vec![CellValue::Int64(1), CellValue::Utf8("hello".to_string())],
1324        }];
1325        let batch = build_projected_batch(&rows, &model, &model.schema, &None).unwrap();
1326        assert_eq!(batch.column(1).data_type(), &DataType::Utf8View);
1327        let values = batch
1328            .column(1)
1329            .as_any()
1330            .downcast_ref::<StringViewArray>()
1331            .expect("must build StringViewArray");
1332        assert_eq!(values.value(0), "hello");
1333    }
1334
1335    #[test]
1336    fn f64_ordered_encoding_preserves_order() {
1337        let values = [
1338            f64::NEG_INFINITY,
1339            f64::MIN,
1340            -1000.0,
1341            -1.0,
1342            -0.001,
1343            0.0,
1344            0.001,
1345            1.0,
1346            1000.0,
1347            f64::MAX,
1348            f64::INFINITY,
1349        ];
1350        let encoded: Vec<[u8; 8]> = values.iter().map(|v| encode_f64_ordered(*v)).collect();
1351        for i in 0..encoded.len() - 1 {
1352            assert!(
1353                encoded[i] < encoded[i + 1],
1354                "encode_f64_ordered({}) >= encode_f64_ordered({})",
1355                values[i],
1356                values[i + 1]
1357            );
1358        }
1359    }
1360
1361    #[test]
1362    fn f64_ordered_encoding_round_trip() {
1363        let values = [
1364            f64::MIN,
1365            -42.5,
1366            -0.0,
1367            0.0,
1368            3.125,
1369            f64::MAX,
1370            f64::INFINITY,
1371            f64::NEG_INFINITY,
1372        ];
1373        for v in values {
1374            let encoded = encode_f64_ordered(v);
1375            let decoded = decode_f64_ordered(encoded);
1376            assert!(
1377                v.to_bits() == decoded.to_bits(),
1378                "round-trip failed for {v}: got {decoded}"
1379            );
1380        }
1381    }
1382
1383    fn mixed_model() -> (TableModel, Vec<ResolvedIndexSpec>) {
1384        let config = KvTableConfig::new(
1385            0,
1386            vec![
1387                TableColumnConfig::new("id", DataType::Int64, false),
1388                TableColumnConfig::new("label", DataType::Utf8, false),
1389                TableColumnConfig::new("score", DataType::Float64, false),
1390                TableColumnConfig::new("active", DataType::Boolean, false),
1391            ],
1392            vec!["id".to_string()],
1393            vec![
1394                IndexSpec::new(
1395                    "active_score",
1396                    vec!["active".to_string(), "score".to_string()],
1397                )
1398                .expect("valid"),
1399                IndexSpec::new("label_idx", vec!["label".to_string()]).expect("valid"),
1400            ],
1401        )
1402        .expect("valid config");
1403        let model = TableModel::from_config(&config).expect("model");
1404        let specs = model
1405            .resolve_index_specs(&config.index_specs)
1406            .expect("specs");
1407        (model, specs)
1408    }
1409
1410    #[test]
1411    fn secondary_index_key_round_trip_with_float64_and_boolean() {
1412        let (model, specs) = mixed_model();
1413        let row = KvRow {
1414            values: vec![
1415                CellValue::Int64(100),
1416                CellValue::Utf8("hello".to_string()),
1417                CellValue::Float64(3.125),
1418                CellValue::Boolean(true),
1419            ],
1420        };
1421        let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row)
1422            .expect("encode");
1423        let decoded = decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key)
1424            .expect("decode");
1425        let active_idx = *model.columns_by_name.get("active").unwrap();
1426        let score_idx = *model.columns_by_name.get("score").unwrap();
1427        assert!(matches!(
1428            decoded.values.get(&active_idx),
1429            Some(CellValue::Boolean(true))
1430        ));
1431        assert!(
1432            matches!(decoded.values.get(&score_idx), Some(CellValue::Float64(v)) if (*v - 3.125).abs() < f64::EPSILON)
1433        );
1434        assert!(matches!(
1435            &decoded.primary_key_values[0],
1436            CellValue::Int64(100)
1437        ));
1438    }
1439
1440    #[test]
1441    fn base_row_round_trip_with_float64_and_boolean() {
1442        let (model, _specs) = mixed_model();
1443        let row = KvRow {
1444            values: vec![
1445                CellValue::Int64(42),
1446                CellValue::Utf8("world".to_string()),
1447                CellValue::Float64(-99.5),
1448                CellValue::Boolean(false),
1449            ],
1450        };
1451        let encoded = encode_base_row_value(&row, &model).expect("encode");
1452        let decoded =
1453            decode_base_row(vec![CellValue::Int64(42)], &encoded, &model).expect("decode");
1454        assert!(matches!(&decoded.values[0], CellValue::Int64(42)));
1455        assert!(matches!(&decoded.values[1], CellValue::Utf8(v) if v == "world"));
1456        assert!(
1457            matches!(&decoded.values[2], CellValue::Float64(v) if (*v - (-99.5)).abs() < f64::EPSILON)
1458        );
1459        assert!(matches!(&decoded.values[3], CellValue::Boolean(false)));
1460    }
1461
1462    #[test]
1463    fn predicate_bool_eq_matches() {
1464        let (model, _specs) = mixed_model();
1465        let active_idx = *model.columns_by_name.get("active").unwrap();
1466        let mut pred = QueryPredicate::default();
1467        pred.constraints
1468            .insert(active_idx, PredicateConstraint::BoolEq(true));
1469        let row_true = KvRow {
1470            values: vec![
1471                CellValue::Int64(1),
1472                CellValue::Utf8("a".to_string()),
1473                CellValue::Float64(1.0),
1474                CellValue::Boolean(true),
1475            ],
1476        };
1477        let row_false = KvRow {
1478            values: vec![
1479                CellValue::Int64(2),
1480                CellValue::Utf8("b".to_string()),
1481                CellValue::Float64(2.0),
1482                CellValue::Boolean(false),
1483            ],
1484        };
1485        assert!(pred.matches_row(&row_true));
1486        assert!(!pred.matches_row(&row_false));
1487    }
1488
1489    #[test]
1490    fn predicate_float_range_matches() {
1491        let (model, _specs) = mixed_model();
1492        let score_idx = *model.columns_by_name.get("score").unwrap();
1493        let mut pred = QueryPredicate::default();
1494        pred.constraints.insert(
1495            score_idx,
1496            PredicateConstraint::FloatRange {
1497                min: Some((2.0, true)),
1498                max: Some((5.0, false)),
1499            },
1500        );
1501        let make_row = |score: f64| KvRow {
1502            values: vec![
1503                CellValue::Int64(1),
1504                CellValue::Utf8("a".to_string()),
1505                CellValue::Float64(score),
1506                CellValue::Boolean(true),
1507            ],
1508        };
1509        assert!(!pred.matches_row(&make_row(1.99)));
1510        assert!(pred.matches_row(&make_row(2.0)));
1511        assert!(pred.matches_row(&make_row(3.5)));
1512        assert!(pred.matches_row(&make_row(4.99)));
1513        assert!(!pred.matches_row(&make_row(5.0)));
1514        assert!(!pred.matches_row(&make_row(5.01)));
1515    }
1516
1517    #[test]
1518    fn float_range_rejects_nan_row_value() {
1519        let constraint = PredicateConstraint::FloatRange {
1520            min: Some((0.0, true)),
1521            max: Some((10.0, true)),
1522        };
1523        assert!(!matches_constraint(
1524            &CellValue::Float64(f64::NAN),
1525            &constraint
1526        ));
1527    }
1528
1529    #[test]
1530    fn index_plan_with_boolean_prefix() {
1531        let (model, specs) = mixed_model();
1532        let active_idx = *model.columns_by_name.get("active").unwrap();
1533        let mut pred = QueryPredicate::default();
1534        pred.constraints
1535            .insert(active_idx, PredicateConstraint::BoolEq(true));
1536        let plan = pred
1537            .choose_index_plan(&model, &specs)
1538            .expect("plan")
1539            .expect("should find index");
1540        assert_eq!(plan.spec_idx, 0);
1541        assert_eq!(plan.constrained_prefix_len, 1);
1542    }
1543
1544    #[test]
1545    fn float_constraint_contradiction() {
1546        let mut lo: Option<(f64, bool)> = None;
1547        let mut hi: Option<(f64, bool)> = None;
1548        let mut contradiction = false;
1549        apply_float_constraint(&mut lo, &mut hi, Operator::Gt, 10.0, &mut contradiction);
1550        assert!(!contradiction);
1551        apply_float_constraint(&mut lo, &mut hi, Operator::Lt, 5.0, &mut contradiction);
1552        assert!(contradiction);
1553    }
1554
1555    #[test]
1556    fn float_constraint_eq_then_range_contradicts() {
1557        let mut lo: Option<(f64, bool)> = None;
1558        let mut hi: Option<(f64, bool)> = None;
1559        let mut contradiction = false;
1560        apply_float_constraint(&mut lo, &mut hi, Operator::Eq, 5.0, &mut contradiction);
1561        assert!(!contradiction);
1562        apply_float_constraint(&mut lo, &mut hi, Operator::Gt, 5.0, &mut contradiction);
1563        assert!(contradiction);
1564    }
1565
1566    #[test]
1567    fn float_nan_literal_comparison_marks_contradiction() {
1568        let config = KvTableConfig::new(
1569            0,
1570            vec![
1571                TableColumnConfig::new("id", DataType::Int64, false),
1572                TableColumnConfig::new("score", DataType::Float64, false),
1573            ],
1574            vec!["id".to_string()],
1575            vec![],
1576        )
1577        .unwrap();
1578        let model = TableModel::from_config(&config).unwrap();
1579
1580        use datafusion::logical_expr::col;
1581        let filter = col("score").gt(Expr::Literal(ScalarValue::Float64(Some(f64::NAN)), None));
1582        assert!(QueryPredicate::supports_filter(&filter, &model));
1583
1584        let pred = QueryPredicate::from_filters(&[filter], &model);
1585        assert!(
1586            pred.contradiction,
1587            "comparison with NaN literal must produce contradiction"
1588        );
1589    }
1590
1591    #[test]
1592    fn table_config_accepts_date32_column() {
1593        let config = KvTableConfig::new(
1594            0,
1595            vec![
1596                TableColumnConfig::new("id", DataType::Int64, false),
1597                TableColumnConfig::new("created", DataType::Date32, false),
1598            ],
1599            vec!["id".to_string()],
1600            vec![],
1601        )
1602        .expect("Date32 column should be accepted");
1603        assert_eq!(config.columns.len(), 2);
1604    }
1605
1606    #[test]
1607    fn table_config_accepts_timestamp_column() {
1608        let config = KvTableConfig::new(
1609            0,
1610            vec![
1611                TableColumnConfig::new("id", DataType::Int64, false),
1612                TableColumnConfig::new(
1613                    "ts",
1614                    DataType::Timestamp(TimeUnit::Microsecond, None),
1615                    false,
1616                ),
1617            ],
1618            vec!["id".to_string()],
1619            vec![],
1620        )
1621        .expect("Timestamp column should be accepted");
1622        let schema = config.to_schema();
1623        assert!(matches!(
1624            schema.field(1).data_type(),
1625            DataType::Timestamp(TimeUnit::Microsecond, _)
1626        ));
1627    }
1628
1629    #[test]
1630    fn table_config_normalizes_timestamp_to_microsecond() {
1631        let config = KvTableConfig::new(
1632            0,
1633            vec![
1634                TableColumnConfig::new("id", DataType::Int64, false),
1635                TableColumnConfig::new(
1636                    "ts",
1637                    DataType::Timestamp(TimeUnit::Nanosecond, None),
1638                    false,
1639                ),
1640            ],
1641            vec!["id".to_string()],
1642            vec![],
1643        )
1644        .expect("Nanosecond timestamp should be accepted");
1645        let schema = config.to_schema();
1646        assert!(matches!(
1647            schema.field(1).data_type(),
1648            DataType::Timestamp(TimeUnit::Microsecond, _)
1649        ));
1650    }
1651
1652    #[test]
1653    fn table_config_accepts_decimal128_column() {
1654        let config = KvTableConfig::new(
1655            0,
1656            vec![
1657                TableColumnConfig::new("id", DataType::Int64, false),
1658                TableColumnConfig::new("price", DataType::Decimal128(10, 2), false),
1659            ],
1660            vec!["id".to_string()],
1661            vec![],
1662        )
1663        .expect("Decimal128 column should be accepted");
1664        assert_eq!(config.columns.len(), 2);
1665    }
1666
1667    #[test]
1668    fn table_config_accepts_list_column() {
1669        use datafusion::arrow::datatypes::Field;
1670
1671        let config = KvTableConfig::new(
1672            0,
1673            vec![
1674                TableColumnConfig::new("id", DataType::Int64, false),
1675                TableColumnConfig::new(
1676                    "tags",
1677                    DataType::List(Arc::new(Field::new("item", DataType::Utf8, false))),
1678                    false,
1679                ),
1680            ],
1681            vec!["id".to_string()],
1682            vec![],
1683        )
1684        .expect("List<Utf8> column should be accepted");
1685        assert_eq!(config.columns.len(), 2);
1686    }
1687
1688    #[test]
1689    fn list_column_rejected_in_index() {
1690        use datafusion::arrow::datatypes::Field;
1691
1692        let result = KvTableConfig::new(
1693            0,
1694            vec![
1695                TableColumnConfig::new("id", DataType::Int64, false),
1696                TableColumnConfig::new(
1697                    "tags",
1698                    DataType::List(Arc::new(Field::new("item", DataType::Utf8, false))),
1699                    false,
1700                ),
1701            ],
1702            vec!["id".to_string()],
1703            vec![IndexSpec::new("tags_idx", vec!["tags".to_string()]).unwrap()],
1704        );
1705        assert!(
1706            result.is_err() || {
1707                let config = result.unwrap();
1708                let model = TableModel::from_config(&config).unwrap();
1709                model.resolve_index_specs(&config.index_specs).is_err()
1710            }
1711        );
1712    }
1713
1714    #[test]
1715    fn i32_ordered_encoding_round_trip() {
1716        let values = [i32::MIN, -1000, -1, 0, 1, 1000, i32::MAX];
1717        for v in values {
1718            assert_eq!(decode_i32_ordered(encode_i32_ordered(v)), v);
1719        }
1720        let encoded: Vec<[u8; 4]> = values.iter().map(|v| encode_i32_ordered(*v)).collect();
1721        for i in 0..encoded.len() - 1 {
1722            assert!(encoded[i] < encoded[i + 1]);
1723        }
1724    }
1725
1726    #[test]
1727    fn i128_ordered_encoding_round_trip() {
1728        let values = [i128::MIN, -1, 0, 1, 1234567890123456789, i128::MAX];
1729        for v in values {
1730            assert_eq!(decode_i128_ordered(encode_i128_ordered(v)), v);
1731        }
1732        let encoded: Vec<[u8; 16]> = values.iter().map(|v| encode_i128_ordered(*v)).collect();
1733        for i in 0..encoded.len() - 1 {
1734            assert!(encoded[i] < encoded[i + 1]);
1735        }
1736    }
1737
1738    fn extended_model() -> (TableModel, Vec<ResolvedIndexSpec>) {
1739        let config = KvTableConfig::new(
1740            0,
1741            vec![
1742                TableColumnConfig::new("id", DataType::Int64, false),
1743                TableColumnConfig::new("created", DataType::Date32, false),
1744                TableColumnConfig::new(
1745                    "ts",
1746                    DataType::Timestamp(TimeUnit::Microsecond, None),
1747                    false,
1748                ),
1749                TableColumnConfig::new("price", DataType::Decimal128(10, 2), false),
1750                TableColumnConfig::new("label", DataType::Utf8, false),
1751            ],
1752            vec!["id".to_string()],
1753            vec![
1754                IndexSpec::new(
1755                    "date_label",
1756                    vec!["created".to_string(), "label".to_string()],
1757                )
1758                .expect("valid"),
1759                IndexSpec::new("price_idx", vec!["price".to_string()]).expect("valid"),
1760            ],
1761        )
1762        .expect("valid config");
1763        let model = TableModel::from_config(&config).expect("model");
1764        let specs = model
1765            .resolve_index_specs(&config.index_specs)
1766            .expect("specs");
1767        (model, specs)
1768    }
1769
1770    #[test]
1771    fn secondary_index_key_round_trip_date32_and_decimal128() {
1772        let (model, specs) = extended_model();
1773        let row = KvRow {
1774            values: vec![
1775                CellValue::Int64(42),
1776                CellValue::Date32(19000),
1777                CellValue::Timestamp(1_700_000_000_000_000),
1778                CellValue::Decimal128(123456),
1779                CellValue::Utf8("hello".to_string()),
1780            ],
1781        };
1782        let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row)
1783            .expect("encode");
1784        let decoded = decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key)
1785            .expect("decode");
1786        let created_idx = *model.columns_by_name.get("created").unwrap();
1787        let label_idx = *model.columns_by_name.get("label").unwrap();
1788        assert!(matches!(
1789            decoded.values.get(&created_idx),
1790            Some(CellValue::Date32(19000))
1791        ));
1792        assert!(matches!(
1793            decoded.values.get(&label_idx),
1794            Some(CellValue::Utf8(v)) if v == "hello"
1795        ));
1796        assert!(matches!(
1797            &decoded.primary_key_values[0],
1798            CellValue::Int64(42)
1799        ));
1800
1801        let key2 = encode_secondary_index_key(model.table_prefix, &specs[1], &model, &row)
1802            .expect("encode");
1803        let decoded2 = decode_secondary_index_key(model.table_prefix, &specs[1], &model, &key2)
1804            .expect("decode");
1805        let price_idx = *model.columns_by_name.get("price").unwrap();
1806        assert!(matches!(
1807            decoded2.values.get(&price_idx),
1808            Some(CellValue::Decimal128(123456))
1809        ));
1810        assert!(matches!(
1811            &decoded2.primary_key_values[0],
1812            CellValue::Int64(42)
1813        ));
1814    }
1815
1816    #[test]
1817    fn base_row_round_trip_with_date32_timestamp_decimal128() {
1818        let (model, _specs) = extended_model();
1819        let row = KvRow {
1820            values: vec![
1821                CellValue::Int64(7),
1822                CellValue::Date32(19500),
1823                CellValue::Timestamp(1_700_000_000_000_000),
1824                CellValue::Decimal128(-9876543),
1825                CellValue::Utf8("world".to_string()),
1826            ],
1827        };
1828        let encoded = encode_base_row_value(&row, &model).expect("encode");
1829        let decoded = decode_base_row(vec![CellValue::Int64(7)], &encoded, &model).expect("decode");
1830        assert!(matches!(&decoded.values[0], CellValue::Int64(7)));
1831        assert!(matches!(&decoded.values[1], CellValue::Date32(19500)));
1832        assert!(matches!(
1833            &decoded.values[2],
1834            CellValue::Timestamp(1_700_000_000_000_000)
1835        ));
1836        assert!(matches!(
1837            &decoded.values[3],
1838            CellValue::Decimal128(-9876543)
1839        ));
1840        assert!(matches!(&decoded.values[4], CellValue::Utf8(v) if v == "world"));
1841    }
1842
1843    #[test]
1844    fn base_row_round_trip_with_list() {
1845        use datafusion::arrow::datatypes::Field;
1846
1847        let config = KvTableConfig::new(
1848            0,
1849            vec![
1850                TableColumnConfig::new("id", DataType::Int64, false),
1851                TableColumnConfig::new(
1852                    "tags",
1853                    DataType::List(Arc::new(Field::new("item", DataType::Utf8, false))),
1854                    false,
1855                ),
1856                TableColumnConfig::new(
1857                    "scores",
1858                    DataType::List(Arc::new(Field::new("item", DataType::Int64, false))),
1859                    false,
1860                ),
1861            ],
1862            vec!["id".to_string()],
1863            vec![],
1864        )
1865        .expect("valid");
1866        let model = TableModel::from_config(&config).expect("model");
1867        let row = KvRow {
1868            values: vec![
1869                CellValue::Int64(1),
1870                CellValue::List(vec![
1871                    CellValue::Utf8("a".to_string()),
1872                    CellValue::Utf8("b".to_string()),
1873                ]),
1874                CellValue::List(vec![CellValue::Int64(10), CellValue::Int64(20)]),
1875            ],
1876        };
1877        let encoded = encode_base_row_value(&row, &model).expect("encode");
1878        let decoded = decode_base_row(vec![CellValue::Int64(1)], &encoded, &model).expect("decode");
1879        assert!(matches!(&decoded.values[0], CellValue::Int64(1)));
1880        match &decoded.values[1] {
1881            CellValue::List(items) => {
1882                assert_eq!(items.len(), 2);
1883                assert!(matches!(&items[0], CellValue::Utf8(v) if v == "a"));
1884                assert!(matches!(&items[1], CellValue::Utf8(v) if v == "b"));
1885            }
1886            _ => panic!("expected List"),
1887        }
1888        match &decoded.values[2] {
1889            CellValue::List(items) => {
1890                assert_eq!(items.len(), 2);
1891                assert!(matches!(&items[0], CellValue::Int64(10)));
1892                assert!(matches!(&items[1], CellValue::Int64(20)));
1893            }
1894            _ => panic!("expected List"),
1895        }
1896    }
1897
1898    #[test]
1899    fn decimal128_constraint_range() {
1900        let mut min: Option<i128> = None;
1901        let mut max: Option<i128> = None;
1902        let mut contradiction = false;
1903        apply_decimal128_constraint(&mut min, &mut max, Operator::GtEq, 100, &mut contradiction);
1904        assert!(!contradiction);
1905        apply_decimal128_constraint(&mut min, &mut max, Operator::LtEq, 200, &mut contradiction);
1906        assert!(!contradiction);
1907        assert_eq!(min, Some(100));
1908        assert_eq!(max, Some(200));
1909        assert!(in_i128_bounds(150, min, max));
1910        assert!(!in_i128_bounds(99, min, max));
1911        assert!(!in_i128_bounds(201, min, max));
1912    }
1913
1914    #[test]
1915    fn decimal256_gt_max_is_contradiction() {
1916        let mut min: Option<i256> = None;
1917        let mut max: Option<i256> = None;
1918        let mut contradiction = false;
1919        apply_i256_constraint(
1920            &mut min,
1921            &mut max,
1922            Operator::Gt,
1923            i256::MAX,
1924            &mut contradiction,
1925        );
1926        assert!(contradiction);
1927        assert_eq!(min, None);
1928        assert_eq!(max, None);
1929    }
1930
1931    #[test]
1932    fn decimal256_lt_min_is_contradiction() {
1933        let mut min: Option<i256> = None;
1934        let mut max: Option<i256> = None;
1935        let mut contradiction = false;
1936        apply_i256_constraint(
1937            &mut min,
1938            &mut max,
1939            Operator::Lt,
1940            i256::MIN,
1941            &mut contradiction,
1942        );
1943        assert!(contradiction);
1944        assert_eq!(min, None);
1945        assert_eq!(max, None);
1946    }
1947
1948    #[test]
1949    fn date32_index_bound_clamps_on_i64_overflow() {
1950        let config = KvTableConfig::new(
1951            0,
1952            vec![
1953                TableColumnConfig::new("id", DataType::Int64, false),
1954                TableColumnConfig::new("created", DataType::Date32, false),
1955            ],
1956            vec!["id".to_string()],
1957            vec![IndexSpec::new("created_idx", vec!["created".to_string()]).unwrap()],
1958        )
1959        .unwrap();
1960        let model = TableModel::from_config(&config).unwrap();
1961        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
1962
1963        let created_idx = *model.columns_by_name.get("created").unwrap();
1964        let mut pred = QueryPredicate::default();
1965        pred.constraints.insert(
1966            created_idx,
1967            PredicateConstraint::IntRange {
1968                min: Some(i32::MAX as i64 + 1),
1969                max: None,
1970            },
1971        );
1972
1973        let start = pred
1974            .encode_index_bound_key(model.table_prefix, &model, &specs[0], 1, false)
1975            .unwrap();
1976        let end = pred
1977            .encode_index_bound_key(model.table_prefix, &model, &specs[0], 1, true)
1978            .unwrap();
1979
1980        assert!(
1981            start <= end,
1982            "lower bound must not exceed upper bound (was wrapping via as i32)"
1983        );
1984
1985        let encoded_lower = specs[0].codec.read_payload_exact::<4>(&start, 0).unwrap();
1986        let decoded_lower = decode_i32_ordered(encoded_lower);
1987        assert_eq!(
1988            decoded_lower,
1989            i32::MAX,
1990            "out-of-range i64 must clamp to i32::MAX, not wrap"
1991        );
1992    }
1993
1994    #[test]
1995    fn timestamp_nanos_gt_uses_floor_division() {
1996        let micros = timestamp_scalar_to_micros_for_op(
1997            &ScalarValue::TimestampNanosecond(Some(-1500), None),
1998            Operator::Gt,
1999        )
2000        .unwrap();
2001        assert_eq!(micros, -2, "Gt on -1500ns should floor to -2us");
2002
2003        let mut min: Option<i64> = None;
2004        let mut max: Option<i64> = None;
2005        let mut contradiction = false;
2006        apply_int_constraint(&mut min, &mut max, Operator::Gt, micros, &mut contradiction);
2007        assert_eq!(min, Some(-1), "Gt(-2us) + 1 = min -1us");
2008
2009        let row_at_minus_1 = CellValue::Timestamp(-1);
2010        assert!(matches_constraint(
2011            &row_at_minus_1,
2012            &PredicateConstraint::IntRange { min, max }
2013        ));
2014    }
2015
2016    #[test]
2017    fn timestamp_nanos_lteq_uses_floor_division() {
2018        let micros = timestamp_scalar_to_micros_for_op(
2019            &ScalarValue::TimestampNanosecond(Some(-1500), None),
2020            Operator::LtEq,
2021        )
2022        .unwrap();
2023        assert_eq!(micros, -2, "LtEq on -1500ns should floor to -2us");
2024
2025        let row_at_minus_1 = CellValue::Timestamp(-1);
2026        assert!(
2027            !matches_constraint(
2028                &row_at_minus_1,
2029                &PredicateConstraint::IntRange {
2030                    min: None,
2031                    max: Some(micros)
2032                }
2033            ),
2034            "-1us (-1000ns) > -1500ns, must not satisfy <= -1500ns"
2035        );
2036    }
2037
2038    #[test]
2039    fn timestamp_nanos_gteq_uses_ceil_division() {
2040        let micros = timestamp_scalar_to_micros_for_op(
2041            &ScalarValue::TimestampNanosecond(Some(-1500), None),
2042            Operator::GtEq,
2043        )
2044        .unwrap();
2045        assert_eq!(micros, -1, "GtEq on -1500ns should ceil to -1us");
2046    }
2047
2048    #[test]
2049    fn timestamp_nanos_lt_uses_ceil_division() {
2050        let micros = timestamp_scalar_to_micros_for_op(
2051            &ScalarValue::TimestampNanosecond(Some(-1500), None),
2052            Operator::Lt,
2053        )
2054        .unwrap();
2055        assert_eq!(micros, -1, "Lt on -1500ns should ceil to -1us");
2056
2057        let mut min: Option<i64> = None;
2058        let mut max: Option<i64> = None;
2059        let mut contradiction = false;
2060        apply_int_constraint(&mut min, &mut max, Operator::Lt, micros, &mut contradiction);
2061        assert_eq!(max, Some(-2), "Lt(-1us) - 1 = max -2us");
2062    }
2063
2064    #[test]
2065    fn timestamp_nanos_eq_non_aligned_is_contradiction() {
2066        let result = timestamp_scalar_to_micros_for_op(
2067            &ScalarValue::TimestampNanosecond(Some(-1500), None),
2068            Operator::Eq,
2069        );
2070        assert!(
2071            result.is_none(),
2072            "non-aligned ns Eq must produce contradiction"
2073        );
2074    }
2075
2076    #[test]
2077    fn timestamp_nanos_exact_multiple_is_unchanged() {
2078        for op in [
2079            Operator::Eq,
2080            Operator::Gt,
2081            Operator::GtEq,
2082            Operator::Lt,
2083            Operator::LtEq,
2084        ] {
2085            let micros = timestamp_scalar_to_micros_for_op(
2086                &ScalarValue::TimestampNanosecond(Some(-2000), None),
2087                op,
2088            )
2089            .unwrap();
2090            assert_eq!(micros, -2, "exact multiple -2000ns = -2us for {op:?}");
2091        }
2092    }
2093
2094    #[test]
2095    fn float64_index_bounds_include_infinity() {
2096        let config = KvTableConfig::new(
2097            0,
2098            vec![
2099                TableColumnConfig::new("id", DataType::Int64, false),
2100                TableColumnConfig::new("val", DataType::Float64, false),
2101            ],
2102            vec!["id".to_string()],
2103            vec![IndexSpec::new("val_idx", vec!["val".to_string()]).unwrap()],
2104        )
2105        .unwrap();
2106        let model = TableModel::from_config(&config).unwrap();
2107        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
2108
2109        let pred = QueryPredicate::default();
2110        let start = pred
2111            .encode_index_bound_key(model.table_prefix, &model, &specs[0], 0, false)
2112            .unwrap();
2113        let end = pred
2114            .encode_index_bound_key(model.table_prefix, &model, &specs[0], 0, true)
2115            .unwrap();
2116
2117        let neg_inf_row = KvRow {
2118            values: vec![CellValue::Int64(1), CellValue::Float64(f64::NEG_INFINITY)],
2119        };
2120        let pos_inf_row = KvRow {
2121            values: vec![CellValue::Int64(2), CellValue::Float64(f64::INFINITY)],
2122        };
2123
2124        let neg_inf_key =
2125            encode_secondary_index_key(model.table_prefix, &specs[0], &model, &neg_inf_row)
2126                .unwrap();
2127        let pos_inf_key =
2128            encode_secondary_index_key(model.table_prefix, &specs[0], &model, &pos_inf_row)
2129                .unwrap();
2130
2131        assert!(
2132            neg_inf_key >= start,
2133            "NEG_INFINITY row key must be within scan start bound"
2134        );
2135        assert!(
2136            pos_inf_key <= end,
2137            "INFINITY row key must be within scan end bound"
2138        );
2139    }
2140
2141    #[test]
2142    fn distinct_table_prefixes_produce_non_overlapping_pk_ranges() {
2143        let range_a = primary_key_prefix_range(1);
2144        let range_b = primary_key_prefix_range(2);
2145        assert!(
2146            range_a.end < range_b.start,
2147            "table prefix 1 pk range must be entirely below table prefix 2"
2148        );
2149    }
2150
2151    #[test]
2152    fn distinct_table_prefixes_isolate_primary_keys() {
2153        let model_1 = simple_int64_model(1);
2154        let model_2 = simple_int64_model(2);
2155        let pk = CellValue::Int64(42);
2156        let key_a = encode_primary_key(1, &[&pk], &model_1).expect("pk key encodes");
2157        let key_b = encode_primary_key(2, &[&pk], &model_2).expect("pk key encodes");
2158        assert_ne!(key_a, key_b, "same PK under different prefixes must differ");
2159        assert!(
2160            decode_primary_key(1, &key_a, &model_1).is_some(),
2161            "key_a must decode under prefix 1"
2162        );
2163        assert!(
2164            decode_primary_key(2, &key_a, &model_2).is_none(),
2165            "key_a must NOT decode under prefix 2"
2166        );
2167        assert!(
2168            decode_primary_key(2, &key_b, &model_2).is_some(),
2169            "key_b must decode under prefix 2"
2170        );
2171    }
2172
2173    #[test]
2174    fn distinct_table_prefixes_isolate_secondary_keys() {
2175        let config_a = KvTableConfig::new(
2176            10,
2177            vec![
2178                TableColumnConfig::new("id", DataType::Int64, false),
2179                TableColumnConfig::new("name", DataType::Utf8, false),
2180            ],
2181            vec!["id".to_string()],
2182            vec![IndexSpec::new("name_idx", vec!["name".to_string()]).unwrap()],
2183        )
2184        .unwrap();
2185        let config_b = KvTableConfig::new(
2186            11,
2187            vec![
2188                TableColumnConfig::new("id", DataType::Int64, false),
2189                TableColumnConfig::new("name", DataType::Utf8, false),
2190            ],
2191            vec!["id".to_string()],
2192            vec![IndexSpec::new("name_idx", vec!["name".to_string()]).unwrap()],
2193        )
2194        .unwrap();
2195
2196        let model_a = TableModel::from_config(&config_a).unwrap();
2197        let specs_a = model_a.resolve_index_specs(&config_a.index_specs).unwrap();
2198        let model_b = TableModel::from_config(&config_b).unwrap();
2199        let specs_b = model_b.resolve_index_specs(&config_b.index_specs).unwrap();
2200
2201        let row = KvRow {
2202            values: vec![CellValue::Int64(1), CellValue::Utf8("alice".to_string())],
2203        };
2204        let key_a =
2205            encode_secondary_index_key(model_a.table_prefix, &specs_a[0], &model_a, &row).unwrap();
2206        let key_b =
2207            encode_secondary_index_key(model_b.table_prefix, &specs_b[0], &model_b, &row).unwrap();
2208
2209        assert_ne!(
2210            key_a, key_b,
2211            "same row under different prefixes must differ"
2212        );
2213        assert!(
2214            decode_secondary_index_key(model_a.table_prefix, &specs_a[0], &model_a, &key_a)
2215                .is_some()
2216        );
2217        assert!(
2218            decode_secondary_index_key(model_a.table_prefix, &specs_a[0], &model_a, &key_b)
2219                .is_none(),
2220            "key from table B must not decode under table A's prefix"
2221        );
2222    }
2223
2224    #[test]
2225    fn table_prefix_stored_in_model() {
2226        let config = KvTableConfig::new(
2227            12,
2228            vec![TableColumnConfig::new("id", DataType::Int64, false)],
2229            vec!["id".to_string()],
2230            vec![],
2231        )
2232        .unwrap();
2233        let model = TableModel::from_config(&config).unwrap();
2234        assert_eq!(model.table_prefix, 12);
2235    }
2236
2237    #[test]
2238    fn codec_layout_exposes_payload_bits_under_reserved_family_bits() {
2239        let config = KvTableConfig::new(
2240            0,
2241            vec![
2242                TableColumnConfig::new("id", DataType::FixedSizeBinary(16), false),
2243                TableColumnConfig::new("bucket", DataType::FixedSizeBinary(16), false),
2244            ],
2245            vec!["id".to_string()],
2246            vec![IndexSpec::new("bucket_idx", vec!["bucket".to_string()]).unwrap()],
2247        )
2248        .unwrap();
2249        let model = TableModel::from_config(&config).unwrap();
2250        let spec = model
2251            .resolve_index_specs(&config.index_specs)
2252            .unwrap()
2253            .remove(0);
2254
2255        let mut current_primary = HashSet::new();
2256        let mut current_secondary = HashSet::new();
2257
2258        fn first_twelve_bits_of_key(key: &[u8]) -> u16 {
2259            let first = u16::from(*key.first().unwrap_or(&0));
2260            let second = u16::from(*key.get(1).unwrap_or(&0));
2261            (first << 4) | (second >> 4)
2262        }
2263
2264        for first_byte in 0u8..=255 {
2265            let mut id = vec![0u8; 16];
2266            id[0] = first_byte;
2267            let mut bucket = vec![0u8; 16];
2268            bucket[0] = first_byte;
2269
2270            let pk = CellValue::FixedBinary(id.clone());
2271            let current_pk = encode_primary_key(model.table_prefix, &[&pk], &model).unwrap();
2272            current_primary.insert(first_twelve_bits_of_key(&current_pk));
2273
2274            let row = KvRow {
2275                values: vec![
2276                    CellValue::FixedBinary(id),
2277                    CellValue::FixedBinary(bucket.clone()),
2278                ],
2279            };
2280            let current_index =
2281                encode_secondary_index_key(model.table_prefix, &spec, &model, &row).unwrap();
2282            current_secondary.insert(first_twelve_bits_of_key(&current_index));
2283        }
2284
2285        // Primary keys reserve 5 high bits for family, leaving 7 payload bits in the first
2286        // 12 bits of the physical key. Varying one payload byte therefore spans 2^7 values.
2287        assert_eq!(current_primary.len(), 128);
2288
2289        // Secondary index keys reserve 9 high bits for family, leaving 3 payload bits in the
2290        // first 12 bits. Varying one payload byte therefore spans 2^3 values.
2291        assert_eq!(current_secondary.len(), 8);
2292    }
2293
2294    #[test]
2295    fn kv_schema_auto_assigns_sequential_prefixes() {
2296        let client = StoreClient::new("http://localhost:10000");
2297        let schema = KvSchema::new(client)
2298            .table(
2299                "alpha",
2300                vec![TableColumnConfig::new("id", DataType::Int64, false)],
2301                vec!["id".to_string()],
2302                vec![],
2303            )
2304            .unwrap()
2305            .table(
2306                "beta",
2307                vec![
2308                    TableColumnConfig::new("id", DataType::Int64, false),
2309                    TableColumnConfig::new("name", DataType::Utf8, false),
2310                ],
2311                vec!["id".to_string()],
2312                vec![],
2313            )
2314            .unwrap()
2315            .table(
2316                "gamma",
2317                vec![TableColumnConfig::new("id", DataType::Int64, false)],
2318                vec!["id".to_string()],
2319                vec![],
2320            )
2321            .unwrap();
2322
2323        assert_eq!(schema.table_count(), 3);
2324    }
2325
2326    #[test]
2327    fn kv_schema_allows_max_codec_table_count_and_rejects_overflow() {
2328        let client = StoreClient::new("http://localhost:10000");
2329        let mut schema = KvSchema::new(client);
2330        for idx in 0..MAX_TABLES {
2331            schema = schema
2332                .table(
2333                    format!("t{idx}"),
2334                    vec![TableColumnConfig::new("id", DataType::Int64, false)],
2335                    vec!["id".to_string()],
2336                    vec![],
2337                )
2338                .expect("tables up to codec capacity should be accepted");
2339        }
2340        assert_eq!(schema.table_count(), MAX_TABLES);
2341
2342        let overflow = schema.table(
2343            "overflow",
2344            vec![TableColumnConfig::new("id", DataType::Int64, false)],
2345            vec!["id".to_string()],
2346            vec![],
2347        );
2348        match overflow {
2349            Ok(_) => panic!("overflow table should be rejected"),
2350            Err(err) => assert!(
2351                err.contains(&format!(
2352                    "too many tables for codec layout (max {MAX_TABLES})"
2353                )),
2354                "overflow table should be rejected with codec-capacity error"
2355            ),
2356        }
2357    }
2358
2359    #[test]
2360    fn sequential_prefixes_produce_non_overlapping_pk_ranges() {
2361        let range_a = primary_key_prefix_range(0);
2362        let range_b = primary_key_prefix_range(1);
2363        let range_c = primary_key_prefix_range(2);
2364        assert!(range_a.end < range_b.start);
2365        assert!(range_b.end < range_c.start);
2366    }
2367
2368    #[test]
2369    fn sequential_prefixes_isolate_primary_keys() {
2370        let model_0 = simple_int64_model(0);
2371        let model_1 = simple_int64_model(1);
2372        let pk = CellValue::Int64(42);
2373        let key_a = encode_primary_key(0, &[&pk], &model_0).expect("pk key encodes");
2374        let key_b = encode_primary_key(1, &[&pk], &model_1).expect("pk key encodes");
2375        assert_ne!(key_a, key_b);
2376        assert!(decode_primary_key(0, &key_a, &model_0).is_some());
2377        assert!(decode_primary_key(1, &key_a, &model_1).is_none());
2378        assert!(decode_primary_key(0, &key_b, &model_0).is_none());
2379        assert!(decode_primary_key(1, &key_b, &model_1).is_some());
2380    }
2381
2382    #[test]
2383    fn sequential_prefixes_isolate_secondary_keys() {
2384        let config_a = KvTableConfig::new(
2385            0,
2386            vec![
2387                TableColumnConfig::new("id", DataType::Int64, false),
2388                TableColumnConfig::new("name", DataType::Utf8, false),
2389            ],
2390            vec!["id".to_string()],
2391            vec![IndexSpec::new("name_idx", vec!["name".to_string()]).unwrap()],
2392        )
2393        .unwrap();
2394        let config_b = KvTableConfig::new(
2395            1,
2396            vec![
2397                TableColumnConfig::new("id", DataType::Int64, false),
2398                TableColumnConfig::new("name", DataType::Utf8, false),
2399            ],
2400            vec!["id".to_string()],
2401            vec![IndexSpec::new("name_idx", vec!["name".to_string()]).unwrap()],
2402        )
2403        .unwrap();
2404
2405        let model_a = TableModel::from_config(&config_a).unwrap();
2406        let specs_a = model_a.resolve_index_specs(&config_a.index_specs).unwrap();
2407        let model_b = TableModel::from_config(&config_b).unwrap();
2408        let specs_b = model_b.resolve_index_specs(&config_b.index_specs).unwrap();
2409
2410        let row = KvRow {
2411            values: vec![CellValue::Int64(1), CellValue::Utf8("alice".to_string())],
2412        };
2413        let key_a =
2414            encode_secondary_index_key(model_a.table_prefix, &specs_a[0], &model_a, &row).unwrap();
2415        let key_b =
2416            encode_secondary_index_key(model_b.table_prefix, &specs_b[0], &model_b, &row).unwrap();
2417        assert_ne!(key_a, key_b);
2418        assert!(
2419            decode_secondary_index_key(model_a.table_prefix, &specs_a[0], &model_a, &key_b)
2420                .is_none(),
2421            "key from prefix 1 must not decode under prefix 0"
2422        );
2423    }
2424
2425    #[tokio::test]
2426    async fn kv_schema_register_all_enables_join() {
2427        let ctx = SessionContext::new();
2428        let client = StoreClient::new("http://localhost:10000");
2429
2430        let result = KvSchema::new(client)
2431            .table(
2432                "customers",
2433                vec![
2434                    TableColumnConfig::new("customer_id", DataType::Int64, false),
2435                    TableColumnConfig::new("name", DataType::Utf8, false),
2436                ],
2437                vec!["customer_id".to_string()],
2438                vec![],
2439            )
2440            .unwrap()
2441            .table(
2442                "orders",
2443                vec![
2444                    TableColumnConfig::new("order_id", DataType::Int64, false),
2445                    TableColumnConfig::new("customer_id", DataType::Int64, false),
2446                    TableColumnConfig::new("amount", DataType::Int64, false),
2447                ],
2448                vec!["order_id".to_string()],
2449                vec![IndexSpec::new("cust_idx", vec!["customer_id".to_string()]).unwrap()],
2450            )
2451            .unwrap()
2452            .register_all(&ctx);
2453
2454        assert!(
2455            result.is_ok(),
2456            "register_all must succeed: {:?}",
2457            result.err()
2458        );
2459
2460        let plan = ctx
2461            .sql(
2462                "SELECT c.name, o.order_id, o.amount \
2463                 FROM orders o \
2464                 JOIN customers c ON o.customer_id = c.customer_id",
2465            )
2466            .await;
2467        assert!(
2468            plan.is_ok(),
2469            "JOIN query must plan successfully: {:?}",
2470            plan.err()
2471        );
2472    }
2473
2474    #[tokio::test]
2475    async fn kv_schema_three_way_join() {
2476        let ctx = SessionContext::new();
2477        let client = StoreClient::new("http://localhost:10000");
2478
2479        KvSchema::new(client)
2480            .table(
2481                "products",
2482                vec![
2483                    TableColumnConfig::new("product_id", DataType::Int64, false),
2484                    TableColumnConfig::new("name", DataType::Utf8, false),
2485                    TableColumnConfig::new("price", DataType::Int64, false),
2486                ],
2487                vec!["product_id".to_string()],
2488                vec![],
2489            )
2490            .unwrap()
2491            .table(
2492                "line_items",
2493                vec![
2494                    TableColumnConfig::new("item_id", DataType::Int64, false),
2495                    TableColumnConfig::new("order_id", DataType::Int64, false),
2496                    TableColumnConfig::new("product_id", DataType::Int64, false),
2497                    TableColumnConfig::new("qty", DataType::Int64, false),
2498                ],
2499                vec!["item_id".to_string()],
2500                vec![
2501                    IndexSpec::new("prod_idx", vec!["product_id".to_string()]).unwrap(),
2502                    IndexSpec::new("order_idx", vec!["order_id".to_string()]).unwrap(),
2503                ],
2504            )
2505            .unwrap()
2506            .table(
2507                "orders",
2508                vec![
2509                    TableColumnConfig::new("order_id", DataType::Int64, false),
2510                    TableColumnConfig::new("customer", DataType::Utf8, false),
2511                ],
2512                vec!["order_id".to_string()],
2513                vec![],
2514            )
2515            .unwrap()
2516            .register_all(&ctx)
2517            .unwrap();
2518
2519        let plan = ctx
2520            .sql(
2521                "SELECT o.customer, p.name, li.qty \
2522                 FROM line_items li \
2523                 JOIN products p ON li.product_id = p.product_id \
2524                 JOIN orders o ON li.order_id = o.order_id",
2525            )
2526            .await;
2527        assert!(plan.is_ok(), "three-way JOIN must plan: {:?}", plan.err());
2528    }
2529
2530    #[test]
2531    fn kv_schema_orders_table_convenience() {
2532        let client = StoreClient::new("http://localhost:10000");
2533        let schema = KvSchema::new(client)
2534            .orders_table(
2535                "my_orders",
2536                vec![IndexSpec::new(
2537                    "region_customer",
2538                    vec!["region".to_string(), "customer_id".to_string()],
2539                )
2540                .unwrap()],
2541            )
2542            .unwrap();
2543        assert_eq!(schema.table_count(), 1);
2544    }
2545
2546    #[test]
2547    fn nullable_column_accepted_in_config() {
2548        let config = KvTableConfig::new(
2549            0,
2550            vec![
2551                TableColumnConfig::new("id", DataType::Int64, false),
2552                TableColumnConfig::new("name", DataType::Utf8, true),
2553            ],
2554            vec!["id".to_string()],
2555            vec![],
2556        );
2557        assert!(config.is_ok());
2558    }
2559
2560    #[test]
2561    fn nullable_column_rejected_in_index() {
2562        let config = KvTableConfig::new(
2563            0,
2564            vec![
2565                TableColumnConfig::new("id", DataType::Int64, false),
2566                TableColumnConfig::new("name", DataType::Utf8, true),
2567            ],
2568            vec!["id".to_string()],
2569            vec![IndexSpec::new("name_idx", vec!["name".to_string()]).unwrap()],
2570        )
2571        .unwrap();
2572        let model = TableModel::from_config(&config).unwrap();
2573        let result = model.resolve_index_specs(&config.index_specs);
2574        assert!(result.is_err());
2575        assert!(result.unwrap_err().contains("nullable"));
2576    }
2577
2578    #[test]
2579    fn base_row_round_trip_with_null() {
2580        let config = KvTableConfig::new(
2581            0,
2582            vec![
2583                TableColumnConfig::new("id", DataType::Int64, false),
2584                TableColumnConfig::new("label", DataType::Utf8, true),
2585                TableColumnConfig::new("score", DataType::Int64, true),
2586            ],
2587            vec!["id".to_string()],
2588            vec![],
2589        )
2590        .unwrap();
2591        let model = TableModel::from_config(&config).unwrap();
2592        let row = KvRow {
2593            values: vec![CellValue::Int64(1), CellValue::Null, CellValue::Int64(42)],
2594        };
2595        let encoded = encode_base_row_value(&row, &model).unwrap();
2596        let decoded = decode_base_row(vec![CellValue::Int64(1)], &encoded, &model).unwrap();
2597        assert!(matches!(&decoded.values[0], CellValue::Int64(1)));
2598        assert!(matches!(&decoded.values[1], CellValue::Null));
2599        assert!(matches!(&decoded.values[2], CellValue::Int64(42)));
2600    }
2601
2602    #[test]
2603    fn null_does_not_match_equality_constraint() {
2604        assert!(!matches_constraint(
2605            &CellValue::Null,
2606            &PredicateConstraint::StringEq("x".to_string())
2607        ));
2608        assert!(!matches_constraint(
2609            &CellValue::Null,
2610            &PredicateConstraint::IntRange {
2611                min: Some(0),
2612                max: Some(10)
2613            }
2614        ));
2615    }
2616
2617    #[test]
2618    fn is_null_constraint_matches() {
2619        assert!(matches_constraint(
2620            &CellValue::Null,
2621            &PredicateConstraint::IsNull
2622        ));
2623        assert!(!matches_constraint(
2624            &CellValue::Utf8("x".to_string()),
2625            &PredicateConstraint::IsNull
2626        ));
2627        assert!(!matches_constraint(
2628            &CellValue::Null,
2629            &PredicateConstraint::IsNotNull
2630        ));
2631        assert!(matches_constraint(
2632            &CellValue::Int64(5),
2633            &PredicateConstraint::IsNotNull
2634        ));
2635    }
2636
2637    #[test]
2638    fn string_in_constraint_matches() {
2639        let constraint =
2640            PredicateConstraint::StringIn(vec!["us-east".to_string(), "us-west".to_string()]);
2641        assert!(matches_constraint(
2642            &CellValue::Utf8("us-east".to_string()),
2643            &constraint,
2644        ));
2645        assert!(matches_constraint(
2646            &CellValue::Utf8("us-west".to_string()),
2647            &constraint,
2648        ));
2649        assert!(!matches_constraint(
2650            &CellValue::Utf8("eu-central".to_string()),
2651            &constraint,
2652        ));
2653    }
2654
2655    #[test]
2656    fn int_in_constraint_matches() {
2657        let constraint = PredicateConstraint::IntIn(vec![1, 2, 3]);
2658        assert!(matches_constraint(&CellValue::Int64(1), &constraint));
2659        assert!(matches_constraint(&CellValue::Int64(3), &constraint));
2660        assert!(!matches_constraint(&CellValue::Int64(4), &constraint));
2661    }
2662
2663    #[test]
2664    fn in_predicate_generates_multiple_index_ranges() {
2665        let (model, specs) = test_model();
2666        let region_idx = *model.columns_by_name.get("region").unwrap();
2667        let mut pred = QueryPredicate::default();
2668        pred.constraints.insert(
2669            region_idx,
2670            PredicateConstraint::StringIn(vec!["us-east".to_string(), "us-west".to_string()]),
2671        );
2672        let plan = pred
2673            .choose_index_plan(&model, &specs)
2674            .expect("plan")
2675            .expect("should find index");
2676        assert_eq!(plan.ranges.len(), 2);
2677    }
2678
2679    #[test]
2680    fn int_in_generates_multiple_pk_ranges() {
2681        let (model, _specs) = test_model();
2682        let mut pred = QueryPredicate::default();
2683        pred.constraints.insert(
2684            model.primary_key_indices[0],
2685            PredicateConstraint::IntIn(vec![100, 200, 300]),
2686        );
2687        let ranges = pred.primary_key_ranges(&model).unwrap();
2688        assert_eq!(ranges.len(), 3);
2689    }
2690
2691    #[test]
2692    fn duplicate_int_in_values_deduplicated() {
2693        let (model, _specs) = test_model();
2694        // Use the PK column "order_id" for the IN list
2695        let filter = Expr::InList(datafusion::logical_expr::expr::InList {
2696            expr: Box::new(Expr::Column(datafusion::common::Column::new_unqualified(
2697                "order_id",
2698            ))),
2699            list: vec![
2700                Expr::Literal(ScalarValue::Int64(Some(5)), None),
2701                Expr::Literal(ScalarValue::Int64(Some(5)), None),
2702                Expr::Literal(ScalarValue::Int64(Some(10)), None),
2703            ],
2704            negated: false,
2705        });
2706        let pred = QueryPredicate::from_filters(&[filter], &model);
2707        let ranges = pred.primary_key_ranges(&model).unwrap();
2708        assert_eq!(
2709            ranges.len(),
2710            2,
2711            "duplicate IN values must be deduped, producing 2 ranges not 3"
2712        );
2713    }
2714
2715    #[test]
2716    fn duplicate_uint64_in_values_deduplicated() {
2717        let config = KvTableConfig::new(
2718            0,
2719            vec![
2720                TableColumnConfig::new("id", DataType::UInt64, false),
2721                TableColumnConfig::new("name", DataType::Utf8, false),
2722            ],
2723            vec!["id".to_string()],
2724            vec![],
2725        )
2726        .unwrap();
2727        let model = TableModel::from_config(&config).unwrap();
2728        let filter = Expr::InList(datafusion::logical_expr::expr::InList {
2729            expr: Box::new(Expr::Column(datafusion::common::Column::new_unqualified(
2730                "id",
2731            ))),
2732            list: vec![
2733                Expr::Literal(ScalarValue::UInt64(Some(100)), None),
2734                Expr::Literal(ScalarValue::UInt64(Some(100)), None),
2735                Expr::Literal(ScalarValue::UInt64(Some(200)), None),
2736            ],
2737            negated: false,
2738        });
2739        let pred = QueryPredicate::from_filters(&[filter], &model);
2740        let ranges = pred.primary_key_ranges(&model).unwrap();
2741        assert_eq!(
2742            ranges.len(),
2743            2,
2744            "duplicate UInt64 IN values must be deduped"
2745        );
2746    }
2747
2748    #[test]
2749    fn duplicate_fixed_binary_in_values_deduplicated() {
2750        let config = KvTableConfig::new(
2751            0,
2752            vec![
2753                TableColumnConfig::new("hash", DataType::FixedSizeBinary(16), false),
2754                TableColumnConfig::new("val", DataType::Int64, false),
2755            ],
2756            vec!["hash".to_string()],
2757            vec![],
2758        )
2759        .unwrap();
2760        let model = TableModel::from_config(&config).unwrap();
2761        let dup_val = vec![0xAA; 16];
2762        let other_val = vec![0xBB; 16];
2763        let filter = Expr::InList(datafusion::logical_expr::expr::InList {
2764            expr: Box::new(Expr::Column(datafusion::common::Column::new_unqualified(
2765                "hash",
2766            ))),
2767            list: vec![
2768                Expr::Literal(
2769                    ScalarValue::FixedSizeBinary(16, Some(dup_val.clone())),
2770                    None,
2771                ),
2772                Expr::Literal(ScalarValue::FixedSizeBinary(16, Some(dup_val)), None),
2773                Expr::Literal(ScalarValue::FixedSizeBinary(16, Some(other_val)), None),
2774            ],
2775            negated: false,
2776        });
2777        let pred = QueryPredicate::from_filters(&[filter], &model);
2778        let ranges = pred.primary_key_ranges(&model).unwrap();
2779        assert_eq!(
2780            ranges.len(),
2781            2,
2782            "duplicate FixedBinary IN values must be deduped"
2783        );
2784    }
2785
2786    #[test]
2787    fn or_equalities_extracted_as_in_list() {
2788        let (model, _) = test_model();
2789        let expr = Expr::BinaryExpr(datafusion::logical_expr::BinaryExpr {
2790            left: Box::new(Expr::BinaryExpr(datafusion::logical_expr::BinaryExpr {
2791                left: Box::new(Expr::Column(datafusion::common::Column::new_unqualified(
2792                    "region",
2793                ))),
2794                op: Operator::Eq,
2795                right: Box::new(Expr::Literal(
2796                    ScalarValue::Utf8(Some("us-east".to_string())),
2797                    None,
2798                )),
2799            })),
2800            op: Operator::Or,
2801            right: Box::new(Expr::BinaryExpr(datafusion::logical_expr::BinaryExpr {
2802                left: Box::new(Expr::Column(datafusion::common::Column::new_unqualified(
2803                    "region",
2804                ))),
2805                op: Operator::Eq,
2806                right: Box::new(Expr::Literal(
2807                    ScalarValue::Utf8(Some("us-west".to_string())),
2808                    None,
2809                )),
2810            })),
2811        });
2812        let result = extract_or_in_column(&expr, &model);
2813        assert!(result.is_some());
2814        let (col, vals) = result.unwrap();
2815        assert_eq!(col, "region");
2816        assert_eq!(vals.len(), 2);
2817    }
2818
2819    #[test]
2820    fn or_equalities_on_float64_are_not_pushdown_supported() {
2821        let config = KvTableConfig::new(
2822            0,
2823            vec![
2824                TableColumnConfig::new("id", DataType::Int64, false),
2825                TableColumnConfig::new("score", DataType::Float64, false),
2826            ],
2827            vec!["id".to_string()],
2828            vec![],
2829        )
2830        .unwrap();
2831        let model = TableModel::from_config(&config).unwrap();
2832
2833        use datafusion::logical_expr::col;
2834        let filter = col("score")
2835            .eq(Expr::Literal(ScalarValue::Float64(Some(1.0)), None))
2836            .or(col("score").eq(Expr::Literal(ScalarValue::Float64(Some(2.0)), None)));
2837
2838        assert!(
2839            !QueryPredicate::supports_filter(&filter, &model),
2840            "OR-equality pushdown should be disabled for Float64 because apply_in_list cannot enforce it"
2841        );
2842
2843        let pred = QueryPredicate::from_filters(&[filter], &model);
2844        assert!(!pred.contradiction);
2845        assert!(
2846            pred.constraints.is_empty(),
2847            "unsupported OR predicate must not contribute pushdown constraints"
2848        );
2849    }
2850
2851    #[test]
2852    fn batch_writer_encodes_rows_across_tables() {
2853        let client = StoreClient::new("http://localhost:10000");
2854        let schema = KvSchema::new(client)
2855            .table(
2856                "customers",
2857                vec![
2858                    TableColumnConfig::new("customer_id", DataType::Int64, false),
2859                    TableColumnConfig::new("name", DataType::Utf8, false),
2860                ],
2861                vec!["customer_id".to_string()],
2862                vec![],
2863            )
2864            .unwrap()
2865            .table(
2866                "orders",
2867                vec![
2868                    TableColumnConfig::new("order_id", DataType::Int64, false),
2869                    TableColumnConfig::new("customer_id", DataType::Int64, false),
2870                    TableColumnConfig::new("amount", DataType::Int64, false),
2871                ],
2872                vec!["order_id".to_string()],
2873                vec![IndexSpec::new("cust_idx", vec!["customer_id".to_string()]).unwrap()],
2874            )
2875            .unwrap();
2876
2877        let mut batch = schema.batch_writer();
2878        batch
2879            .insert(
2880                "customers",
2881                vec![CellValue::Int64(1), CellValue::Utf8("Alice".to_string())],
2882            )
2883            .unwrap();
2884        batch
2885            .insert(
2886                "orders",
2887                vec![
2888                    CellValue::Int64(100),
2889                    CellValue::Int64(1),
2890                    CellValue::Int64(4999),
2891                ],
2892            )
2893            .unwrap();
2894        batch
2895            .insert(
2896                "orders",
2897                vec![
2898                    CellValue::Int64(101),
2899                    CellValue::Int64(1),
2900                    CellValue::Int64(2999),
2901                ],
2902            )
2903            .unwrap();
2904
2905        // 1 customer base row + 2 order base rows + 2 order index rows = 5
2906        assert_eq!(batch.pending_count(), 5);
2907    }
2908
2909    #[test]
2910    fn batch_writer_rejects_unknown_table() {
2911        let client = StoreClient::new("http://localhost:10000");
2912        let schema = KvSchema::new(client)
2913            .table(
2914                "t1",
2915                vec![TableColumnConfig::new("id", DataType::Int64, false)],
2916                vec!["id".to_string()],
2917                vec![],
2918            )
2919            .unwrap();
2920
2921        let mut batch = schema.batch_writer();
2922        let result = batch.insert("nonexistent", vec![CellValue::Int64(1)]);
2923        assert!(result.is_err());
2924        assert!(result.unwrap_err().contains("unknown table"));
2925    }
2926
2927    #[test]
2928    fn batch_writer_rejects_wrong_column_count() {
2929        let client = StoreClient::new("http://localhost:10000");
2930        let schema = KvSchema::new(client)
2931            .table(
2932                "t1",
2933                vec![
2934                    TableColumnConfig::new("id", DataType::Int64, false),
2935                    TableColumnConfig::new("name", DataType::Utf8, false),
2936                ],
2937                vec!["id".to_string()],
2938                vec![],
2939            )
2940            .unwrap();
2941
2942        let mut batch = schema.batch_writer();
2943        let result = batch.insert("t1", vec![CellValue::Int64(1)]);
2944        assert!(result.is_err());
2945        assert!(result.unwrap_err().contains("expected 2"));
2946    }
2947
2948    #[test]
2949    fn batch_writer_rejects_non_pk_type_mismatch() {
2950        let client = StoreClient::new("http://localhost:10000");
2951        let schema = KvSchema::new(client)
2952            .table(
2953                "t1",
2954                vec![
2955                    TableColumnConfig::new("id", DataType::Int64, false),
2956                    TableColumnConfig::new("amount", DataType::Int64, false),
2957                ],
2958                vec!["id".to_string()],
2959                vec![],
2960            )
2961            .unwrap();
2962
2963        let mut batch = schema.batch_writer();
2964        let result = batch.insert(
2965            "t1",
2966            vec![CellValue::Int64(1), CellValue::Utf8("bad".to_string())],
2967        );
2968        assert!(result.is_err());
2969        assert!(
2970            result.unwrap_err().contains("type mismatch"),
2971            "non-PK schema-invalid values must be rejected at insert-time"
2972        );
2973    }
2974
2975    #[test]
2976    fn batch_writer_entries_use_distinct_table_prefixes() {
2977        let client = StoreClient::new("http://localhost:10000");
2978        let schema = KvSchema::new(client)
2979            .table(
2980                "a",
2981                vec![TableColumnConfig::new("id", DataType::Int64, false)],
2982                vec!["id".to_string()],
2983                vec![],
2984            )
2985            .unwrap()
2986            .table(
2987                "b",
2988                vec![TableColumnConfig::new("id", DataType::Int64, false)],
2989                vec!["id".to_string()],
2990                vec![],
2991            )
2992            .unwrap();
2993
2994        let mut batch = schema.batch_writer();
2995        batch.insert("a", vec![CellValue::Int64(42)]).unwrap();
2996        batch.insert("b", vec![CellValue::Int64(42)]).unwrap();
2997
2998        assert_eq!(batch.pending_count(), 2);
2999        assert_ne!(
3000            batch.pending_keys[0], batch.pending_keys[1],
3001            "same PK in different tables must produce different keys"
3002        );
3003        assert_ne!(
3004            batch.pending_keys[0][0], batch.pending_keys[1][0],
3005            "table prefix byte must differ"
3006        );
3007    }
3008
3009    #[test]
3010    fn batch_writer_supports_nullable_columns() {
3011        let client = StoreClient::new("http://localhost:10000");
3012        let schema = KvSchema::new(client)
3013            .table(
3014                "t",
3015                vec![
3016                    TableColumnConfig::new("id", DataType::Int64, false),
3017                    TableColumnConfig::new("note", DataType::Utf8, true),
3018                ],
3019                vec!["id".to_string()],
3020                vec![],
3021            )
3022            .unwrap();
3023
3024        let mut batch = schema.batch_writer();
3025        batch
3026            .insert("t", vec![CellValue::Int64(1), CellValue::Null])
3027            .unwrap();
3028        assert_eq!(batch.pending_count(), 1);
3029    }
3030
3031    #[test]
3032    fn non_nullable_column_rejects_null_in_batch_writer() {
3033        let client = StoreClient::new("http://localhost:10000");
3034        let schema = KvSchema::new(client)
3035            .table(
3036                "t",
3037                vec![
3038                    TableColumnConfig::new("id", DataType::Int64, false),
3039                    TableColumnConfig::new("name", DataType::Utf8, false),
3040                    TableColumnConfig::new("note", DataType::Utf8, true),
3041                ],
3042                vec!["id".to_string()],
3043                vec![],
3044            )
3045            .unwrap();
3046
3047        // NULL in non-nullable column "name" must fail
3048        let mut batch = schema.batch_writer();
3049        let result = batch.insert(
3050            "t",
3051            vec![
3052                CellValue::Int64(1),
3053                CellValue::Null,
3054                CellValue::Utf8("ok".to_string()),
3055            ],
3056        );
3057        assert!(result.is_err());
3058        assert!(
3059            result.unwrap_err().contains("not nullable"),
3060            "error should mention non-nullable constraint"
3061        );
3062
3063        // NULL in nullable column "note" must succeed
3064        let mut batch = schema.batch_writer();
3065        batch
3066            .insert(
3067                "t",
3068                vec![
3069                    CellValue::Int64(1),
3070                    CellValue::Utf8("Alice".to_string()),
3071                    CellValue::Null,
3072                ],
3073            )
3074            .unwrap();
3075        assert_eq!(batch.pending_count(), 1);
3076
3077        // All non-null values must succeed
3078        let mut batch = schema.batch_writer();
3079        batch
3080            .insert(
3081                "t",
3082                vec![
3083                    CellValue::Int64(1),
3084                    CellValue::Utf8("Alice".to_string()),
3085                    CellValue::Utf8("hello".to_string()),
3086                ],
3087            )
3088            .unwrap();
3089        assert_eq!(batch.pending_count(), 1);
3090    }
3091
3092    #[test]
3093    fn uint64_column_accepted() {
3094        let config = KvTableConfig::new(
3095            0,
3096            vec![
3097                TableColumnConfig::new("id", DataType::UInt64, false),
3098                TableColumnConfig::new("name", DataType::Utf8, false),
3099            ],
3100            vec!["id".to_string()],
3101            vec![],
3102        );
3103        assert!(config.is_ok());
3104    }
3105
3106    #[test]
3107    fn uint64_primary_key_round_trip() {
3108        let config = KvTableConfig::new(
3109            0,
3110            vec![
3111                TableColumnConfig::new("id", DataType::UInt64, false),
3112                TableColumnConfig::new("label", DataType::Utf8, false),
3113            ],
3114            vec!["id".to_string()],
3115            vec![],
3116        )
3117        .unwrap();
3118        let model = TableModel::from_config(&config).unwrap();
3119        let row = KvRow {
3120            values: vec![
3121                CellValue::UInt64(u64::MAX),
3122                CellValue::Utf8("max".to_string()),
3123            ],
3124        };
3125        let encoded = encode_base_row_value(&row, &model).unwrap();
3126        let pk = row
3127            .primary_key_values(&model)
3128            .into_iter()
3129            .cloned()
3130            .collect::<Vec<_>>();
3131        let decoded = decode_base_row(pk, &encoded, &model).unwrap();
3132        assert!(matches!(&decoded.values[0], CellValue::UInt64(v) if *v == u64::MAX));
3133        assert!(matches!(&decoded.values[1], CellValue::Utf8(v) if v == "max"));
3134    }
3135
3136    #[test]
3137    fn string_primary_key_accepted() {
3138        let config = KvTableConfig::new(
3139            0,
3140            vec![
3141                TableColumnConfig::new("code", DataType::Utf8, false),
3142                TableColumnConfig::new("value", DataType::Int64, false),
3143            ],
3144            vec!["code".to_string()],
3145            vec![],
3146        );
3147        assert!(config.is_ok());
3148    }
3149
3150    #[test]
3151    fn fixed_binary_primary_key_round_trip() {
3152        let config = KvTableConfig::new(
3153            0,
3154            vec![
3155                TableColumnConfig::new("hash", DataType::FixedSizeBinary(32), false),
3156                TableColumnConfig::new("amount", DataType::Int64, false),
3157            ],
3158            vec!["hash".to_string()],
3159            vec![],
3160        )
3161        .unwrap();
3162        let model = TableModel::from_config(&config).unwrap();
3163        let hash_val = vec![0xABu8; 32];
3164        let row = KvRow {
3165            values: vec![
3166                CellValue::FixedBinary(hash_val.clone()),
3167                CellValue::Int64(100),
3168            ],
3169        };
3170        let encoded = encode_base_row_value(&row, &model).unwrap();
3171        let pk = row
3172            .primary_key_values(&model)
3173            .into_iter()
3174            .cloned()
3175            .collect::<Vec<_>>();
3176        let decoded = decode_base_row(pk, &encoded, &model).unwrap();
3177        assert!(matches!(&decoded.values[0], CellValue::FixedBinary(v) if *v == hash_val));
3178    }
3179
3180    #[test]
3181    fn fixed_binary_key_rejects_wrong_length() {
3182        let config = KvTableConfig::new(
3183            0,
3184            vec![
3185                TableColumnConfig::new("hash", DataType::FixedSizeBinary(16), false),
3186                TableColumnConfig::new("amount", DataType::Int64, false),
3187            ],
3188            vec!["hash".to_string()],
3189            vec![],
3190        )
3191        .unwrap();
3192        let model = TableModel::from_config(&config).unwrap();
3193
3194        // Too short (10 bytes for a 16-byte column)
3195        let short_row = KvRow {
3196            values: vec![CellValue::FixedBinary(vec![0xAB; 10]), CellValue::Int64(1)],
3197        };
3198        let result = encode_primary_key_from_row(model.table_prefix, &short_row, &model);
3199        assert!(result.is_err());
3200        assert!(
3201            result.unwrap_err().contains("requires exactly 16 bytes"),
3202            "should mention exact width requirement"
3203        );
3204
3205        // Too long (20 bytes for a 16-byte column)
3206        let long_row = KvRow {
3207            values: vec![CellValue::FixedBinary(vec![0xCD; 20]), CellValue::Int64(2)],
3208        };
3209        let result = encode_primary_key_from_row(model.table_prefix, &long_row, &model);
3210        assert!(result.is_err());
3211        assert!(result.unwrap_err().contains("requires exactly 16 bytes"));
3212
3213        // Exact length (16 bytes) — must succeed
3214        let ok_row = KvRow {
3215            values: vec![CellValue::FixedBinary(vec![0xEF; 16]), CellValue::Int64(3)],
3216        };
3217        assert!(encode_primary_key_from_row(model.table_prefix, &ok_row, &model).is_ok());
3218    }
3219
3220    #[test]
3221    fn fixed_binary_index_key_rejects_wrong_length() {
3222        let config = KvTableConfig::new(
3223            0,
3224            vec![
3225                TableColumnConfig::new("id", DataType::Int64, false),
3226                TableColumnConfig::new("tag", DataType::FixedSizeBinary(8), false),
3227            ],
3228            vec!["id".to_string()],
3229            vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
3230        )
3231        .unwrap();
3232        let model = TableModel::from_config(&config).unwrap();
3233        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
3234
3235        // Wrong length (4 bytes for an 8-byte column)
3236        let bad_row = KvRow {
3237            values: vec![CellValue::Int64(1), CellValue::FixedBinary(vec![0x01; 4])],
3238        };
3239        let result = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &bad_row);
3240        assert!(result.is_err());
3241        assert!(result.unwrap_err().contains("requires exactly 8 bytes"));
3242
3243        // Correct length (8 bytes)
3244        let ok_row = KvRow {
3245            values: vec![CellValue::Int64(1), CellValue::FixedBinary(vec![0x02; 8])],
3246        };
3247        assert!(encode_secondary_index_key(model.table_prefix, &specs[0], &model, &ok_row).is_ok());
3248    }
3249
3250    #[test]
3251    fn decimal256_column_round_trip() {
3252        let config = KvTableConfig::new(
3253            0,
3254            vec![
3255                TableColumnConfig::new("id", DataType::Int64, false),
3256                TableColumnConfig::new("balance", DataType::Decimal256(76, 0), false),
3257            ],
3258            vec!["id".to_string()],
3259            vec![],
3260        )
3261        .unwrap();
3262        let model = TableModel::from_config(&config).unwrap();
3263        let big_val = i256::from(123456789012345i64);
3264        let row = KvRow {
3265            values: vec![CellValue::Int64(1), CellValue::Decimal256(big_val)],
3266        };
3267        let encoded = encode_base_row_value(&row, &model).unwrap();
3268        let pk = row
3269            .primary_key_values(&model)
3270            .into_iter()
3271            .cloned()
3272            .collect::<Vec<_>>();
3273        let decoded = decode_base_row(pk, &encoded, &model).unwrap();
3274        assert!(matches!(&decoded.values[1], CellValue::Decimal256(v) if *v == big_val));
3275    }
3276
3277    #[test]
3278    fn float64_primary_key_rejected() {
3279        let config = KvTableConfig::new(
3280            0,
3281            vec![TableColumnConfig::new("id", DataType::Float64, false)],
3282            vec!["id".to_string()],
3283            vec![],
3284        );
3285        assert!(config.is_err());
3286    }
3287
3288    #[test]
3289    fn i256_ordered_encoding_round_trip() {
3290        let values = [
3291            i256::from_i128(i128::MIN),
3292            i256::from(-1i64),
3293            i256::from(0i64),
3294            i256::from(1i64),
3295            i256::from_i128(i128::MAX),
3296        ];
3297        for v in values {
3298            assert_eq!(decode_i256_ordered(encode_i256_ordered(v)), v);
3299        }
3300        let encoded: Vec<[u8; 32]> = values.iter().map(|v| encode_i256_ordered(*v)).collect();
3301        for i in 0..encoded.len() - 1 {
3302            assert!(encoded[i] < encoded[i + 1]);
3303        }
3304    }
3305
3306    #[test]
3307    fn uint64_primary_key_encode_decode() {
3308        let config = KvTableConfig::new(
3309            5,
3310            vec![
3311                TableColumnConfig::new("id", DataType::UInt64, false),
3312                TableColumnConfig::new("name", DataType::Utf8, false),
3313            ],
3314            vec!["id".to_string()],
3315            vec![],
3316        )
3317        .unwrap();
3318        let model = TableModel::from_config(&config).unwrap();
3319        let pk = CellValue::UInt64(12345);
3320        let key = encode_primary_key(5, &[&pk], &model).expect("pk key encodes");
3321        let decoded = decode_primary_key(5, &key, &model).unwrap();
3322        assert!(matches!(&decoded[0], CellValue::UInt64(12345)));
3323    }
3324
3325    #[test]
3326    fn utf8_primary_key_encode_decode() {
3327        let config = KvTableConfig::new(
3328            3,
3329            vec![
3330                TableColumnConfig::new("code", DataType::Utf8, false),
3331                TableColumnConfig::new("val", DataType::Int64, false),
3332            ],
3333            vec!["code".to_string()],
3334            vec![],
3335        )
3336        .unwrap();
3337        let model = TableModel::from_config(&config).unwrap();
3338        let pk = CellValue::Utf8("HELLO".to_string());
3339        let key = encode_primary_key(3, &[&pk], &model).expect("pk key encodes");
3340        let decoded = decode_primary_key(3, &key, &model).unwrap();
3341        assert!(matches!(&decoded[0], CellValue::Utf8(v) if v == "HELLO"));
3342    }
3343
3344    #[test]
3345    fn fixed_binary_primary_key_encode_decode() {
3346        let config = KvTableConfig::new(
3347            7,
3348            vec![
3349                TableColumnConfig::new("hash", DataType::FixedSizeBinary(16), false),
3350                TableColumnConfig::new("val", DataType::Int64, false),
3351            ],
3352            vec!["hash".to_string()],
3353            vec![],
3354        )
3355        .unwrap();
3356        let model = TableModel::from_config(&config).unwrap();
3357        let data = vec![0xDE, 0xAD, 0xBE, 0xEF, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
3358        let pk = CellValue::FixedBinary(data.clone());
3359        let key = encode_primary_key(7, &[&pk], &model).expect("pk key encodes");
3360        let decoded = decode_primary_key(7, &key, &model).unwrap();
3361        assert!(matches!(&decoded[0], CellValue::FixedBinary(v) if *v == data));
3362    }
3363
3364    #[test]
3365    fn secondary_index_with_uint64_column() {
3366        let config = KvTableConfig::new(
3367            0,
3368            vec![
3369                TableColumnConfig::new("id", DataType::Int64, false),
3370                TableColumnConfig::new("counter", DataType::UInt64, false),
3371            ],
3372            vec!["id".to_string()],
3373            vec![IndexSpec::new("counter_idx", vec!["counter".to_string()]).unwrap()],
3374        )
3375        .unwrap();
3376        let model = TableModel::from_config(&config).unwrap();
3377        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
3378        let row = KvRow {
3379            values: vec![CellValue::Int64(1), CellValue::UInt64(999)],
3380        };
3381        let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row).unwrap();
3382        let decoded =
3383            decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key).unwrap();
3384        let counter_idx = *model.columns_by_name.get("counter").unwrap();
3385        assert!(matches!(
3386            decoded.values.get(&counter_idx),
3387            Some(CellValue::UInt64(999))
3388        ));
3389        assert!(matches!(
3390            &decoded.primary_key_values[0],
3391            CellValue::Int64(1)
3392        ));
3393    }
3394
3395    #[test]
3396    fn secondary_index_with_decimal256_column() {
3397        let config = KvTableConfig::new(
3398            0,
3399            vec![
3400                TableColumnConfig::new("id", DataType::Int64, false),
3401                TableColumnConfig::new("big_val", DataType::Decimal256(76, 0), false),
3402            ],
3403            vec!["id".to_string()],
3404            vec![IndexSpec::new("big_idx", vec!["big_val".to_string()]).unwrap()],
3405        )
3406        .unwrap();
3407        let model = TableModel::from_config(&config).unwrap();
3408        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
3409        let val = i256::from(42i64);
3410        let row = KvRow {
3411            values: vec![CellValue::Int64(1), CellValue::Decimal256(val)],
3412        };
3413        let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row).unwrap();
3414        let decoded =
3415            decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key).unwrap();
3416        let big_idx = *model.columns_by_name.get("big_val").unwrap();
3417        assert!(matches!(
3418            decoded.values.get(&big_idx),
3419            Some(CellValue::Decimal256(v)) if *v == val
3420        ));
3421    }
3422
3423    // -----------------------------------------------------------------------
3424    // Composite primary key tests
3425    // -----------------------------------------------------------------------
3426
3427    #[test]
3428    fn composite_pk_config_accepted() {
3429        let config = KvTableConfig::new(
3430            0,
3431            vec![
3432                TableColumnConfig::new("entity", DataType::FixedSizeBinary(32), false),
3433                TableColumnConfig::new("version", DataType::UInt64, false),
3434                TableColumnConfig::new("data", DataType::Utf8, true),
3435            ],
3436            vec!["entity".to_string(), "version".to_string()],
3437            vec![],
3438        );
3439        assert!(config.is_ok());
3440        let c = config.unwrap();
3441        assert_eq!(c.primary_key_columns, vec!["entity", "version"]);
3442    }
3443
3444    #[test]
3445    fn composite_pk_rejects_unsupported_type() {
3446        let result = KvTableConfig::new(
3447            0,
3448            vec![
3449                TableColumnConfig::new("entity", DataType::FixedSizeBinary(32), false),
3450                TableColumnConfig::new("score", DataType::Float64, false),
3451            ],
3452            vec!["entity".to_string(), "score".to_string()],
3453            vec![],
3454        );
3455        assert!(result.is_err());
3456        let err = result.unwrap_err();
3457        assert!(
3458            err.contains("must be Int64") || err.contains("must be"),
3459            "expected PK type error, got: {err}"
3460        );
3461    }
3462
3463    #[test]
3464    fn composite_pk_rejects_too_wide() {
3465        let config = KvTableConfig::new(
3466            0,
3467            vec![
3468                TableColumnConfig::new("big", DataType::FixedSizeBinary(60), false),
3469                TableColumnConfig::new("ver", DataType::UInt64, false),
3470            ],
3471            vec!["big".to_string(), "ver".to_string()],
3472            vec![],
3473        )
3474        .expect("variable-length keys should allow wider composite PKs");
3475        let model = TableModel::from_config(&config).expect("model");
3476        assert_eq!(model.primary_key_width, 68);
3477    }
3478
3479    #[test]
3480    fn composite_pk_encode_decode_round_trip() {
3481        let config = KvTableConfig::new(
3482            1,
3483            vec![
3484                TableColumnConfig::new("entity", DataType::FixedSizeBinary(32), false),
3485                TableColumnConfig::new("version", DataType::UInt64, false),
3486                TableColumnConfig::new("title", DataType::Utf8, true),
3487            ],
3488            vec!["entity".to_string(), "version".to_string()],
3489            vec![],
3490        )
3491        .unwrap();
3492        let model = TableModel::from_config(&config).unwrap();
3493
3494        let entity = vec![0xAA; 32];
3495        let pk_entity = CellValue::FixedBinary(entity.clone());
3496        let pk_version = CellValue::UInt64(42);
3497        let key =
3498            encode_primary_key(1, &[&pk_entity, &pk_version], &model).expect("pk key encodes");
3499
3500        let decoded = decode_primary_key(1, &key, &model).unwrap();
3501        assert_eq!(decoded.len(), 2);
3502        assert!(matches!(&decoded[0], CellValue::FixedBinary(v) if *v == entity));
3503        assert!(matches!(&decoded[1], CellValue::UInt64(42)));
3504    }
3505
3506    #[test]
3507    fn composite_pk_version_sort_order() {
3508        let config = KvTableConfig::new(
3509            0,
3510            vec![
3511                TableColumnConfig::new("entity", DataType::FixedSizeBinary(32), false),
3512                TableColumnConfig::new("version", DataType::UInt64, false),
3513            ],
3514            vec!["entity".to_string(), "version".to_string()],
3515            vec![],
3516        )
3517        .unwrap();
3518        let model = TableModel::from_config(&config).unwrap();
3519
3520        let entity = vec![0xBB; 32];
3521        let pk_entity = CellValue::FixedBinary(entity.clone());
3522
3523        let key_v1 = encode_primary_key(0, &[&pk_entity, &CellValue::UInt64(1)], &model)
3524            .expect("pk key encodes");
3525        let key_v10 = encode_primary_key(0, &[&pk_entity, &CellValue::UInt64(10)], &model)
3526            .expect("pk key encodes");
3527        let key_v100 = encode_primary_key(0, &[&pk_entity, &CellValue::UInt64(100)], &model)
3528            .expect("pk key encodes");
3529
3530        // Versions must sort numerically (big-endian U64)
3531        assert!(key_v1 < key_v10);
3532        assert!(key_v10 < key_v100);
3533    }
3534
3535    #[test]
3536    fn composite_pk_value_excludes_all_pk_columns() {
3537        let config = KvTableConfig::new(
3538            0,
3539            vec![
3540                TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
3541                TableColumnConfig::new("version", DataType::UInt64, false),
3542                TableColumnConfig::new("data", DataType::Utf8, true),
3543            ],
3544            vec!["entity".to_string(), "version".to_string()],
3545            vec![],
3546        )
3547        .unwrap();
3548        let model = TableModel::from_config(&config).unwrap();
3549
3550        let row = KvRow {
3551            values: vec![
3552                CellValue::FixedBinary(vec![0xCC; 16]),
3553                CellValue::UInt64(7),
3554                CellValue::Utf8("hello".to_string()),
3555            ],
3556        };
3557        let encoded = encode_base_row_value(&row, &model).unwrap();
3558        // Both PK columns should be None in stored value
3559        let decoded = decode_base_row(
3560            vec![CellValue::FixedBinary(vec![0xCC; 16]), CellValue::UInt64(7)],
3561            &encoded,
3562            &model,
3563        )
3564        .unwrap();
3565        assert!(matches!(&decoded.values[0], CellValue::FixedBinary(v) if v.len() == 16));
3566        assert!(matches!(&decoded.values[1], CellValue::UInt64(7)));
3567        assert!(matches!(&decoded.values[2], CellValue::Utf8(v) if v == "hello"));
3568    }
3569
3570    #[test]
3571    fn composite_pk_secondary_index_appends_all_pk_columns() {
3572        let config = KvTableConfig::new(
3573            0,
3574            vec![
3575                TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
3576                TableColumnConfig::new("version", DataType::UInt64, false),
3577                TableColumnConfig::new("tag", DataType::Int64, false),
3578            ],
3579            vec!["entity".to_string(), "version".to_string()],
3580            vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
3581        )
3582        .unwrap();
3583        let model = TableModel::from_config(&config).unwrap();
3584        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
3585
3586        let entity_data = vec![0xDD; 16];
3587        let row = KvRow {
3588            values: vec![
3589                CellValue::FixedBinary(entity_data.clone()),
3590                CellValue::UInt64(99),
3591                CellValue::Int64(42),
3592            ],
3593        };
3594        let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row).unwrap();
3595        let decoded =
3596            decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key).unwrap();
3597
3598        assert_eq!(decoded.primary_key_values.len(), 2);
3599        assert!(matches!(
3600            &decoded.primary_key_values[0],
3601            CellValue::FixedBinary(v) if *v == entity_data
3602        ));
3603        assert!(matches!(
3604            &decoded.primary_key_values[1],
3605            CellValue::UInt64(99)
3606        ));
3607        let tag_idx = *model.columns_by_name.get("tag").unwrap();
3608        assert!(matches!(
3609            decoded.values.get(&tag_idx),
3610            Some(CellValue::Int64(42))
3611        ));
3612    }
3613
3614    #[test]
3615    fn table_versioned_convenience() {
3616        let client = StoreClient::new("http://localhost:10000");
3617        let schema = KvSchema::new(client)
3618            .table_versioned(
3619                "documents",
3620                vec![
3621                    TableColumnConfig::new("doc_id", DataType::FixedSizeBinary(32), false),
3622                    TableColumnConfig::new("version", DataType::UInt64, false),
3623                    TableColumnConfig::new("title", DataType::Utf8, false),
3624                ],
3625                "doc_id",
3626                "version",
3627                vec![],
3628            )
3629            .unwrap();
3630        assert_eq!(schema.table_count(), 1);
3631    }
3632
3633    #[test]
3634    fn single_column_pk_backward_compat() {
3635        // Ensure single-column PK still works identically
3636        let config = KvTableConfig::new(
3637            0,
3638            vec![
3639                TableColumnConfig::new("id", DataType::Int64, false),
3640                TableColumnConfig::new("name", DataType::Utf8, true),
3641            ],
3642            vec!["id".to_string()],
3643            vec![],
3644        )
3645        .unwrap();
3646        let model = TableModel::from_config(&config).unwrap();
3647        assert_eq!(model.primary_key_indices.len(), 1);
3648        assert_eq!(model.primary_key_indices[0], 0);
3649        assert_eq!(model.primary_key_width, 8);
3650
3651        let pk = CellValue::Int64(42);
3652        let key = encode_primary_key(0, &[&pk], &model).expect("pk key encodes");
3653        let decoded = decode_primary_key(0, &key, &model).unwrap();
3654        assert_eq!(decoded.len(), 1);
3655        assert!(matches!(&decoded[0], CellValue::Int64(42)));
3656    }
3657
3658    #[test]
3659    fn partial_prefix_upper_bound_fills_trailing_pk_bytes() {
3660        // Regression: encode_primary_key_bound with partial prefix must
3661        // fill 0xFF from the end of the encoded prefix, not from the
3662        // end of the full PK width. Otherwise trailing PK column bytes
3663        // stay 0x00, producing an end key that's too low.
3664        let config = KvTableConfig::new(
3665            0,
3666            vec![
3667                TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
3668                TableColumnConfig::new("version", DataType::UInt64, false),
3669            ],
3670            vec!["entity".to_string(), "version".to_string()],
3671            vec![],
3672        )
3673        .unwrap();
3674        let model = TableModel::from_config(&config).unwrap();
3675        assert_eq!(model.primary_key_width, 24); // 16 + 8
3676
3677        let entity = CellValue::FixedBinary(vec![0xAA; 16]);
3678        // Partial prefix: only entity, no version
3679        let upper =
3680            encode_primary_key_bound(0, &[&entity], &model, true).expect("pk bound encodes");
3681
3682        // Entity bytes must be encoded
3683        assert_eq!(primary_payload(&model, &upper, 0, 16), vec![0xAA; 16]);
3684        // Version bytes (8 bytes after entity) MUST be 0xFF, not 0x00
3685        assert_eq!(
3686            primary_payload(&model, &upper, 16, 8),
3687            vec![0xFF; 8],
3688            "trailing PK column (version) must be 0xFF for upper bound"
3689        );
3690        // Everything after PK region also 0xFF
3691        assert!(primary_payload(
3692            &model,
3693            &upper,
3694            24,
3695            model.primary_key_codec.payload_capacity_bytes() - 24
3696        )
3697        .iter()
3698        .all(|&b| b == 0xFF));
3699
3700        // Lower bound: trailing bytes should be 0x00
3701        let lower =
3702            encode_primary_key_bound(0, &[&entity], &model, false).expect("pk bound encodes");
3703        assert_eq!(primary_payload(&model, &lower, 0, 16), vec![0xAA; 16]);
3704        assert_eq!(
3705            primary_payload(&model, &lower, 16, 8),
3706            vec![0x00; 8],
3707            "trailing PK column (version) must be 0x00 for lower bound"
3708        );
3709    }
3710
3711    // -----------------------------------------------------------------------
3712    // Composite PK filter pushdown tests
3713    // -----------------------------------------------------------------------
3714
3715    #[test]
3716    fn composite_pk_range_pushdown_entity_eq_version_lte() {
3717        // PK = (entity: FixedSizeBinary(16), version: UInt64)
3718        // Query: entity = X'CC..CC' AND version <= 42
3719        // Should produce a TIGHT range, not a full table scan.
3720        let config = KvTableConfig::new(
3721            0,
3722            vec![
3723                TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
3724                TableColumnConfig::new("version", DataType::UInt64, false),
3725                TableColumnConfig::new("data", DataType::Utf8, true),
3726            ],
3727            vec!["entity".to_string(), "version".to_string()],
3728            vec![],
3729        )
3730        .unwrap();
3731        let model = TableModel::from_config(&config).unwrap();
3732
3733        // Simulate predicate: entity = X'CC..CC' AND version <= 42
3734        let mut pred = QueryPredicate::default();
3735        pred.constraints
3736            .insert(0, PredicateConstraint::FixedBinaryEq(vec![0xCC; 16]));
3737        pred.constraints.insert(
3738            1,
3739            PredicateConstraint::UInt64Range {
3740                min: None,
3741                max: Some(42),
3742            },
3743        );
3744
3745        let ranges = pred.primary_key_ranges(&model).unwrap();
3746        assert_eq!(ranges.len(), 1, "should produce exactly one range");
3747
3748        let range = &ranges[0];
3749
3750        // The start key should encode entity=CC..CC, version=0
3751        let expected_start = encode_primary_key(
3752            0,
3753            &[
3754                &CellValue::FixedBinary(vec![0xCC; 16]),
3755                &CellValue::UInt64(0),
3756            ],
3757            &model,
3758        )
3759        .expect("pk key encodes");
3760        assert_eq!(
3761            range.start, expected_start,
3762            "start should be entity=CC..CC, version=0"
3763        );
3764
3765        // The end key should encode entity=CC..CC, version=42, then 0xFF tail
3766        let expected_end_prefix = encode_primary_key(
3767            0,
3768            &[
3769                &CellValue::FixedBinary(vec![0xCC; 16]),
3770                &CellValue::UInt64(42),
3771            ],
3772            &model,
3773        )
3774        .expect("pk key encodes");
3775        // The end key has 0xFF-filled tail after the PK portion
3776        assert_eq!(
3777            primary_payload(&model, &range.end, 0, model.primary_key_width),
3778            primary_payload(&model, &expected_end_prefix, 0, model.primary_key_width),
3779            "end prefix should be entity=CC..CC, version=42"
3780        );
3781        // Trailing bytes after PK should be 0xFF
3782        assert!(
3783            primary_payload(
3784                &model,
3785                &range.end,
3786                model.primary_key_width,
3787                model.primary_key_codec.payload_capacity_bytes() - model.primary_key_width
3788            )
3789            .iter()
3790            .all(|&b| b == 0xFF),
3791            "end trailing bytes should be 0xFF"
3792        );
3793
3794        // Crucially, the range must NOT be a full table scan
3795        let full_range = primary_key_prefix_range(0);
3796        assert_ne!(
3797            range.start, full_range.start,
3798            "range must not be a full table scan"
3799        );
3800    }
3801
3802    #[test]
3803    fn composite_pk_range_pushdown_entity_eq_only() {
3804        // PK = (entity: FixedSizeBinary(16), version: UInt64)
3805        // Query: entity = X'DD..DD' (no version constraint)
3806        // Should still produce a tight entity-prefix range.
3807        let config = KvTableConfig::new(
3808            0,
3809            vec![
3810                TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
3811                TableColumnConfig::new("version", DataType::UInt64, false),
3812            ],
3813            vec!["entity".to_string(), "version".to_string()],
3814            vec![],
3815        )
3816        .unwrap();
3817        let model = TableModel::from_config(&config).unwrap();
3818
3819        let mut pred = QueryPredicate::default();
3820        pred.constraints
3821            .insert(0, PredicateConstraint::FixedBinaryEq(vec![0xDD; 16]));
3822
3823        let ranges = pred.primary_key_ranges(&model).unwrap();
3824        assert_eq!(ranges.len(), 1);
3825
3826        let range = &ranges[0];
3827        // Start should have entity=DD..DD, version=0x00..00
3828        assert_eq!(primary_payload(&model, &range.start, 0, 16), vec![0xDD; 16]);
3829        // End should have entity=DD..DD, then 0xFF for version + tail
3830        assert_eq!(primary_payload(&model, &range.end, 0, 16), vec![0xDD; 16]);
3831        assert!(
3832            primary_payload(
3833                &model,
3834                &range.end,
3835                16,
3836                model.primary_key_codec.payload_capacity_bytes() - 16
3837            )
3838            .iter()
3839            .all(|&b| b == 0xFF),
3840            "after entity bytes, everything should be 0xFF"
3841        );
3842    }
3843
3844    #[test]
3845    fn fixed_binary_eq_constraint_extracted() {
3846        let config = KvTableConfig::new(
3847            0,
3848            vec![
3849                TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
3850                TableColumnConfig::new("version", DataType::UInt64, false),
3851            ],
3852            vec!["entity".to_string(), "version".to_string()],
3853            vec![],
3854        )
3855        .unwrap();
3856        let model = TableModel::from_config(&config).unwrap();
3857
3858        // Build an equality expression: entity = X'AA..AA'
3859        use datafusion::logical_expr::col;
3860        let entity_literal =
3861            Expr::Literal(ScalarValue::FixedSizeBinary(16, Some(vec![0xAA; 16])), None);
3862        let filter = col("entity").eq(entity_literal);
3863
3864        assert!(
3865            QueryPredicate::supports_filter(&filter, &model),
3866            "FixedSizeBinary equality should be supported"
3867        );
3868
3869        let pred = QueryPredicate::from_filters(&[filter], &model);
3870        assert!(
3871            matches!(
3872                pred.constraints.get(&0),
3873                Some(PredicateConstraint::FixedBinaryEq(v)) if *v == vec![0xAA; 16]
3874            ),
3875            "should extract FixedBinaryEq constraint"
3876        );
3877    }
3878
3879    #[test]
3880    fn uint64_range_constraint_extracted() {
3881        let config = KvTableConfig::new(
3882            0,
3883            vec![
3884                TableColumnConfig::new("version", DataType::UInt64, false),
3885                TableColumnConfig::new("data", DataType::Utf8, true),
3886            ],
3887            vec!["version".to_string()],
3888            vec![],
3889        )
3890        .unwrap();
3891        let model = TableModel::from_config(&config).unwrap();
3892
3893        use datafusion::logical_expr::col;
3894        let filter = col("version").lt_eq(Expr::Literal(ScalarValue::UInt64(Some(42)), None));
3895
3896        assert!(
3897            QueryPredicate::supports_filter(&filter, &model),
3898            "UInt64 range should be supported"
3899        );
3900
3901        let pred = QueryPredicate::from_filters(&[filter], &model);
3902        assert!(
3903            matches!(
3904                pred.constraints.get(&0),
3905                Some(PredicateConstraint::UInt64Range {
3906                    min: None,
3907                    max: Some(42)
3908                })
3909            ),
3910            "should extract UInt64Range with max=42"
3911        );
3912    }
3913
3914    #[test]
3915    fn uint64_range_constraint_supports_values_above_i64_max() {
3916        let config = KvTableConfig::new(
3917            0,
3918            vec![
3919                TableColumnConfig::new("version", DataType::UInt64, false),
3920                TableColumnConfig::new("data", DataType::Utf8, true),
3921            ],
3922            vec!["version".to_string()],
3923            vec![],
3924        )
3925        .unwrap();
3926        let model = TableModel::from_config(&config).unwrap();
3927
3928        let threshold = (1u64 << 63) + 5;
3929        use datafusion::logical_expr::col;
3930        let filter =
3931            col("version").gt_eq(Expr::Literal(ScalarValue::UInt64(Some(threshold)), None));
3932
3933        assert!(QueryPredicate::supports_filter(&filter, &model));
3934
3935        let pred = QueryPredicate::from_filters(&[filter], &model);
3936        assert!(matches!(
3937            pred.constraints.get(&0),
3938            Some(PredicateConstraint::UInt64Range {
3939                min: Some(v),
3940                max: None
3941            }) if *v == threshold
3942        ));
3943    }
3944
3945    #[test]
3946    fn unsupported_uint64_comparison_does_not_force_contradiction() {
3947        let config = KvTableConfig::new(
3948            0,
3949            vec![
3950                TableColumnConfig::new("version", DataType::UInt64, false),
3951                TableColumnConfig::new("data", DataType::Utf8, true),
3952            ],
3953            vec!["version".to_string()],
3954            vec![],
3955        )
3956        .unwrap();
3957        let model = TableModel::from_config(&config).unwrap();
3958
3959        use datafusion::logical_expr::col;
3960        let unsupported = col("version").gt(Expr::Literal(ScalarValue::Int64(Some(-1)), None));
3961
3962        assert!(
3963            !QueryPredicate::supports_filter(&unsupported, &model),
3964            "negative Int64 literal on UInt64 column should not be pushdown-supported"
3965        );
3966
3967        let pred = QueryPredicate::from_filters(&[unsupported], &model);
3968        assert!(
3969            !pred.contradiction,
3970            "unsupported filter must not collapse scan to empty result"
3971        );
3972        assert!(
3973            pred.constraints.is_empty(),
3974            "unsupported filter must not contribute pushed constraints"
3975        );
3976    }
3977
3978    #[test]
3979    fn unsupported_uint64_comparison_in_and_keeps_supported_sibling() {
3980        let config = KvTableConfig::new(
3981            0,
3982            vec![
3983                TableColumnConfig::new("version", DataType::UInt64, false),
3984                TableColumnConfig::new("data", DataType::Utf8, true),
3985            ],
3986            vec!["version".to_string()],
3987            vec![],
3988        )
3989        .unwrap();
3990        let model = TableModel::from_config(&config).unwrap();
3991
3992        use datafusion::logical_expr::col;
3993        let supported = col("version").gt_eq(Expr::Literal(ScalarValue::UInt64(Some(10)), None));
3994        let unsupported = col("version").gt(Expr::Literal(ScalarValue::Int64(Some(-1)), None));
3995        let filter = supported.and(unsupported);
3996
3997        assert!(
3998            !QueryPredicate::supports_filter(&filter, &model),
3999            "mixed AND should not be marked fully pushdown-supported"
4000        );
4001
4002        let pred = QueryPredicate::from_filters(&[filter], &model);
4003        assert!(!pred.contradiction);
4004        assert!(matches!(
4005            pred.constraints.get(&0),
4006            Some(PredicateConstraint::UInt64Range {
4007                min: Some(10),
4008                max: None
4009            })
4010        ));
4011    }
4012
4013    #[test]
4014    fn uint64_in_list_pushdown() {
4015        let config = KvTableConfig::new(
4016            0,
4017            vec![
4018                TableColumnConfig::new("version", DataType::UInt64, false),
4019                TableColumnConfig::new("data", DataType::Utf8, true),
4020            ],
4021            vec!["version".to_string()],
4022            vec![],
4023        )
4024        .unwrap();
4025        let model = TableModel::from_config(&config).unwrap();
4026
4027        use datafusion::logical_expr::{col, in_list};
4028        let filter = in_list(
4029            col("version"),
4030            vec![
4031                Expr::Literal(ScalarValue::UInt64(Some(1)), None),
4032                Expr::Literal(ScalarValue::UInt64(Some(5)), None),
4033                Expr::Literal(ScalarValue::UInt64(Some(10)), None),
4034            ],
4035            false,
4036        );
4037
4038        assert!(QueryPredicate::supports_filter(&filter, &model));
4039        let pred = QueryPredicate::from_filters(&[filter], &model);
4040        assert!(
4041            matches!(pred.constraints.get(&0), Some(PredicateConstraint::UInt64In(v)) if v.len() == 3),
4042            "should extract UInt64In with 3 values"
4043        );
4044    }
4045
4046    #[test]
4047    fn uint64_in_list_pushdown_supports_values_above_i64_max() {
4048        let config = KvTableConfig::new(
4049            0,
4050            vec![
4051                TableColumnConfig::new("version", DataType::UInt64, false),
4052                TableColumnConfig::new("data", DataType::Utf8, true),
4053            ],
4054            vec!["version".to_string()],
4055            vec![],
4056        )
4057        .unwrap();
4058        let model = TableModel::from_config(&config).unwrap();
4059
4060        let huge = 1u64 << 63;
4061        use datafusion::logical_expr::{col, in_list};
4062        let filter = in_list(
4063            col("version"),
4064            vec![
4065                Expr::Literal(ScalarValue::UInt64(Some(1)), None),
4066                Expr::Literal(ScalarValue::UInt64(Some(huge)), None),
4067            ],
4068            false,
4069        );
4070
4071        assert!(QueryPredicate::supports_filter(&filter, &model));
4072        let pred = QueryPredicate::from_filters(&[filter], &model);
4073        assert!(matches!(
4074            pred.constraints.get(&0),
4075            Some(PredicateConstraint::UInt64In(v)) if v.contains(&huge) && v.len() == 2
4076        ));
4077    }
4078
4079    #[test]
4080    fn fixed_binary_in_list_pushdown() {
4081        let config = KvTableConfig::new(
4082            0,
4083            vec![
4084                TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
4085                TableColumnConfig::new("data", DataType::Utf8, true),
4086            ],
4087            vec!["entity".to_string()],
4088            vec![],
4089        )
4090        .unwrap();
4091        let model = TableModel::from_config(&config).unwrap();
4092
4093        use datafusion::logical_expr::{col, in_list};
4094        let filter = in_list(
4095            col("entity"),
4096            vec![
4097                Expr::Literal(ScalarValue::FixedSizeBinary(16, Some(vec![0xAA; 16])), None),
4098                Expr::Literal(ScalarValue::FixedSizeBinary(16, Some(vec![0xBB; 16])), None),
4099            ],
4100            false,
4101        );
4102
4103        assert!(QueryPredicate::supports_filter(&filter, &model));
4104        let pred = QueryPredicate::from_filters(&[filter], &model);
4105        assert!(
4106            matches!(
4107                pred.constraints.get(&0),
4108                Some(PredicateConstraint::FixedBinaryIn(v)) if v.len() == 2
4109            ),
4110            "should extract FixedBinaryIn with 2 values"
4111        );
4112
4113        // Verify range generation produces 2 ranges (one per entity)
4114        let ranges = pred.primary_key_ranges(&model).unwrap();
4115        assert_eq!(ranges.len(), 2, "should produce one range per entity");
4116    }
4117
4118    #[test]
4119    fn decimal256_range_pushdown() {
4120        let config = KvTableConfig::new(
4121            0,
4122            vec![
4123                TableColumnConfig::new("id", DataType::Int64, false),
4124                TableColumnConfig::new("big_val", DataType::Decimal256(76, 0), false),
4125            ],
4126            vec!["id".to_string()],
4127            vec![IndexSpec::new("big_idx", vec!["big_val".to_string()]).unwrap()],
4128        )
4129        .unwrap();
4130        let model = TableModel::from_config(&config).unwrap();
4131
4132        use datafusion::logical_expr::col;
4133        let filter = col("big_val").gt_eq(Expr::Literal(
4134            ScalarValue::Decimal256(Some(i256::from(100i64)), 76, 0),
4135            None,
4136        ));
4137
4138        assert!(
4139            QueryPredicate::supports_filter(&filter, &model),
4140            "Decimal256 range should be supported"
4141        );
4142
4143        let pred = QueryPredicate::from_filters(&[filter], &model);
4144        let big_idx = *model.columns_by_name.get("big_val").unwrap();
4145        assert!(
4146            matches!(
4147                pred.constraints.get(&big_idx),
4148                Some(PredicateConstraint::Decimal256Range {
4149                    min: Some(_),
4150                    max: None
4151                })
4152            ),
4153            "should extract Decimal256Range with min=100, no max"
4154        );
4155
4156        // Verify constraint matching
4157        let val_in = CellValue::Decimal256(i256::from(200i64));
4158        let val_out = CellValue::Decimal256(i256::from(50i64));
4159        let constraint = pred.constraints.get(&big_idx).unwrap();
4160        assert!(matches_constraint(&val_in, constraint));
4161        assert!(!matches_constraint(&val_out, constraint));
4162    }
4163
4164    #[test]
4165    fn uint64_constraint_matching_does_not_wrap_large_values() {
4166        let gt_zero = PredicateConstraint::UInt64Range {
4167            min: Some(1),
4168            max: None,
4169        };
4170        assert!(matches_constraint(&CellValue::UInt64(1u64 << 63), &gt_zero));
4171        assert!(!matches_constraint(&CellValue::UInt64(0), &gt_zero));
4172
4173        let in_list = PredicateConstraint::UInt64In(vec![1, 2, 3]);
4174        assert!(matches_constraint(&CellValue::UInt64(2), &in_list));
4175        assert!(!matches_constraint(
4176            &CellValue::UInt64(1u64 << 63),
4177            &in_list
4178        ));
4179    }
4180
4181    #[test]
4182    fn uint64_empty_range_produces_no_pk_ranges() {
4183        let config = KvTableConfig::new(
4184            0,
4185            vec![TableColumnConfig::new("version", DataType::UInt64, false)],
4186            vec!["version".to_string()],
4187            vec![],
4188        )
4189        .unwrap();
4190        let model = TableModel::from_config(&config).unwrap();
4191        let mut pred = QueryPredicate::default();
4192        pred.constraints.insert(
4193            0,
4194            PredicateConstraint::UInt64Range {
4195                min: Some(10),
4196                max: Some(9),
4197            },
4198        );
4199
4200        let ranges = pred.primary_key_ranges(&model).unwrap();
4201        assert!(ranges.is_empty());
4202    }
4203
4204    #[test]
4205    fn utf8_primary_key_encoding_supports_unicode_and_long_values() {
4206        let config = KvTableConfig::new(
4207            0,
4208            vec![TableColumnConfig::new("id", DataType::Utf8, false)],
4209            vec!["id".to_string()],
4210            vec![],
4211        )
4212        .unwrap();
4213        let model = TableModel::from_config(&config).unwrap();
4214
4215        let row_non_ascii = KvRow {
4216            values: vec![CellValue::Utf8("naive-cafe-e9".replace("e9", "\u{00E9}"))],
4217        };
4218        let key_non_ascii = encode_primary_key_from_row(model.table_prefix, &row_non_ascii, &model)
4219            .expect("non-ascii PK should encode");
4220        let decoded_non_ascii = decode_primary_key(model.table_prefix, &key_non_ascii, &model)
4221            .expect("non-ascii PK should decode");
4222        assert!(matches!(
4223            decoded_non_ascii.as_slice(),
4224            [CellValue::Utf8(value)] if value == "naive-cafe-\u{00E9}"
4225        ));
4226
4227        let row_too_long = KvRow {
4228            values: vec![CellValue::Utf8("abcdefghijklmnopq".to_string())],
4229        };
4230        let key_too_long = encode_primary_key_from_row(model.table_prefix, &row_too_long, &model)
4231            .expect("long UTF-8 PK should encode");
4232        let decoded_too_long = decode_primary_key(model.table_prefix, &key_too_long, &model)
4233            .expect("long UTF-8 PK should decode");
4234        assert!(matches!(
4235            decoded_too_long.as_slice(),
4236            [CellValue::Utf8(value)] if value == "abcdefghijklmnopq"
4237        ));
4238    }
4239
4240    #[test]
4241    fn utf8_primary_key_encodes_at_max_codec_payload_and_rejects_overflow() {
4242        let config = KvTableConfig::new(
4243            0,
4244            vec![TableColumnConfig::new("id", DataType::Utf8, false)],
4245            vec!["id".to_string()],
4246            vec![],
4247        )
4248        .unwrap();
4249        let model = TableModel::from_config(&config).unwrap();
4250        let max_payload = model.primary_key_codec.payload_capacity_bytes();
4251        let max_value = "a".repeat(max_payload - 1);
4252        let overflow_value = "a".repeat(max_payload);
4253
4254        let key = encode_primary_key_from_row(
4255            model.table_prefix,
4256            &KvRow {
4257                values: vec![CellValue::Utf8(max_value.clone())],
4258            },
4259            &model,
4260        )
4261        .expect("max-length UTF-8 PK should encode");
4262        assert_eq!(key.len(), exoware_sdk_rs::keys::MAX_KEY_LEN);
4263        let decoded = decode_primary_key(model.table_prefix, &key, &model)
4264            .expect("max-length PK should decode");
4265        assert!(matches!(
4266            decoded.as_slice(),
4267            [CellValue::Utf8(value)] if value == &max_value
4268        ));
4269
4270        let err = encode_primary_key_from_row(
4271            model.table_prefix,
4272            &KvRow {
4273                values: vec![CellValue::Utf8(overflow_value)],
4274            },
4275            &model,
4276        )
4277        .expect_err("UTF-8 PK exceeding codec payload should be rejected");
4278        assert!(err.contains("primary key payload exceeds codec payload capacity 253 bytes"));
4279    }
4280
4281    #[test]
4282    fn utf8_primary_key_round_trips_embedded_nul() {
4283        let config = KvTableConfig::new(
4284            0,
4285            vec![TableColumnConfig::new("id", DataType::Utf8, false)],
4286            vec!["id".to_string()],
4287            vec![],
4288        )
4289        .unwrap();
4290        let model = TableModel::from_config(&config).unwrap();
4291        let row = KvRow {
4292            values: vec![CellValue::Utf8("AB\0CD".to_string())],
4293        };
4294
4295        let key = encode_primary_key_from_row(model.table_prefix, &row, &model)
4296            .expect("embedded NUL in key text must encode");
4297        let decoded =
4298            decode_primary_key(model.table_prefix, &key, &model).expect("embedded NUL must decode");
4299        assert!(matches!(
4300            decoded.as_slice(),
4301            [CellValue::Utf8(value)] if value == "AB\0CD"
4302        ));
4303    }
4304
4305    #[test]
4306    fn utf8_index_key_round_trips_embedded_nul() {
4307        let config = KvTableConfig::new(
4308            0,
4309            vec![
4310                TableColumnConfig::new("id", DataType::Int64, false),
4311                TableColumnConfig::new("tag", DataType::Utf8, false),
4312            ],
4313            vec!["id".to_string()],
4314            vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
4315        )
4316        .unwrap();
4317        let model = TableModel::from_config(&config).unwrap();
4318        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4319        let row = KvRow {
4320            values: vec![CellValue::Int64(1), CellValue::Utf8("AB\0CD".to_string())],
4321        };
4322
4323        let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row)
4324            .expect("embedded NUL in index key text must encode");
4325        let decoded = decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key)
4326            .expect("embedded NUL index key must decode");
4327        assert!(matches!(
4328            decoded.values.get(&1),
4329            Some(CellValue::Utf8(value)) if value == "AB\0CD"
4330        ));
4331    }
4332
4333    #[test]
4334    fn secondary_index_with_long_utf8_primary_key_encodes_at_max_payload_and_rejects_overflow() {
4335        let config = KvTableConfig::new(
4336            0,
4337            vec![
4338                TableColumnConfig::new("id", DataType::Utf8, false),
4339                TableColumnConfig::new("tag", DataType::Utf8, false),
4340            ],
4341            vec!["id".to_string()],
4342            vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
4343        )
4344        .unwrap();
4345        let model = TableModel::from_config(&config).unwrap();
4346        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4347        let spec = &specs[0];
4348        let max_payload = spec.codec.payload_capacity_bytes();
4349        let max_tag = "t".to_string();
4350        let max_id = "i".repeat(max_payload - encode_string_variable(&max_tag).unwrap().len() - 1);
4351        let overflow_id = format!("{max_id}x");
4352
4353        let key = encode_secondary_index_key(
4354            model.table_prefix,
4355            spec,
4356            &model,
4357            &KvRow {
4358                values: vec![
4359                    CellValue::Utf8(max_id.clone()),
4360                    CellValue::Utf8(max_tag.clone()),
4361                ],
4362            },
4363        )
4364        .expect("secondary key at max payload should encode");
4365        assert_eq!(key.len(), exoware_sdk_rs::keys::MAX_KEY_LEN);
4366        let decoded =
4367            decode_secondary_index_key(model.table_prefix, spec, &model, &key).expect("decode");
4368        assert!(matches!(
4369            decoded.values.get(&1),
4370            Some(CellValue::Utf8(value)) if value == &max_tag
4371        ));
4372        assert!(matches!(
4373            decoded.primary_key_values.as_slice(),
4374            [CellValue::Utf8(value)] if value == &max_id
4375        ));
4376
4377        let err = encode_secondary_index_key(
4378            model.table_prefix,
4379            spec,
4380            &model,
4381            &KvRow {
4382                values: vec![CellValue::Utf8(overflow_id), CellValue::Utf8(max_tag)],
4383            },
4384        )
4385        .expect_err("secondary key exceeding max payload should be rejected");
4386        assert!(err.contains("index 'tag_idx' payload exceeds codec payload capacity 252 bytes"));
4387    }
4388
4389    #[test]
4390    fn secondary_index_from_parts_with_long_utf8_primary_key_rejects_overflow() {
4391        let config = KvTableConfig::new(
4392            0,
4393            vec![
4394                TableColumnConfig::new("id", DataType::Utf8, false),
4395                TableColumnConfig::new("tag", DataType::Utf8, false),
4396            ],
4397            vec!["id".to_string()],
4398            vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
4399        )
4400        .unwrap();
4401        let model = TableModel::from_config(&config).unwrap();
4402        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4403        let spec = &specs[0];
4404        let max_payload = spec.codec.payload_capacity_bytes();
4405        let max_tag = "t".to_string();
4406        let max_id = "i".repeat(max_payload - encode_string_variable(&max_tag).unwrap().len() - 1);
4407        let overflow_id = format!("{max_id}x");
4408        let max_row = KvRow {
4409            values: vec![
4410                CellValue::Utf8(max_id.clone()),
4411                CellValue::Utf8(max_tag.clone()),
4412            ],
4413        };
4414        let encoded_row = encode_base_row_value(&max_row, &model).expect("encode row");
4415        let archived = decode_stored_row(&encoded_row).expect("archive row");
4416
4417        let key = encode_secondary_index_key_from_parts(
4418            model.table_prefix,
4419            spec,
4420            &model,
4421            &[CellValue::Utf8(max_id.clone())],
4422            &archived,
4423        )
4424        .expect("backfill path should encode max payload");
4425        assert_eq!(key.len(), exoware_sdk_rs::keys::MAX_KEY_LEN);
4426
4427        let err = encode_secondary_index_key_from_parts(
4428            model.table_prefix,
4429            spec,
4430            &model,
4431            &[CellValue::Utf8(overflow_id)],
4432            &archived,
4433        )
4434        .expect_err("backfill path overflow should be rejected");
4435        assert!(err
4436            .to_string()
4437            .contains("index 'tag_idx' payload exceeds codec payload capacity 252 bytes"));
4438    }
4439
4440    #[test]
4441    fn primary_key_type_mismatch_returns_error_instead_of_panicking() {
4442        let config = KvTableConfig::new(
4443            0,
4444            vec![TableColumnConfig::new("id", DataType::UInt64, false)],
4445            vec!["id".to_string()],
4446            vec![],
4447        )
4448        .unwrap();
4449        let model = TableModel::from_config(&config).unwrap();
4450        let row = KvRow {
4451            values: vec![CellValue::Int64(7)],
4452        };
4453
4454        let err = encode_primary_key_from_row(model.table_prefix, &row, &model)
4455            .expect_err("mismatched PK type should return an error");
4456        assert!(err.contains("type mismatch while encoding key value"));
4457    }
4458
4459    #[test]
4460    fn choose_index_plan_uses_fixed_binary_leading_constraint() {
4461        let config = KvTableConfig::new(
4462            0,
4463            vec![
4464                TableColumnConfig::new("id", DataType::Int64, false),
4465                TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
4466            ],
4467            vec!["id".to_string()],
4468            vec![IndexSpec::new("entity_idx", vec!["entity".to_string()]).unwrap()],
4469        )
4470        .unwrap();
4471        let model = TableModel::from_config(&config).unwrap();
4472        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4473
4474        use datafusion::logical_expr::col;
4475        let filter = col("entity").eq(Expr::Literal(
4476            ScalarValue::FixedSizeBinary(16, Some(vec![0xAB; 16])),
4477            None,
4478        ));
4479        let pred = QueryPredicate::from_filters(&[filter], &model);
4480        let plan = pred
4481            .choose_index_plan(&model, &specs)
4482            .unwrap()
4483            .expect("fixed-binary equality should choose an index");
4484
4485        assert_eq!(plan.constrained_prefix_len, 1);
4486        assert_eq!(plan.ranges.len(), 1);
4487        let range = &plan.ranges[0];
4488        assert_eq!(
4489            index_payload(&specs[0], &range.start, 0, 16),
4490            vec![0xAB; 16]
4491        );
4492        assert_eq!(index_payload(&specs[0], &range.end, 0, 16), vec![0xAB; 16]);
4493    }
4494
4495    #[test]
4496    fn choose_index_plan_uses_decimal256_leading_constraint() {
4497        let config = KvTableConfig::new(
4498            0,
4499            vec![
4500                TableColumnConfig::new("id", DataType::Int64, false),
4501                TableColumnConfig::new("big_val", DataType::Decimal256(76, 0), false),
4502            ],
4503            vec!["id".to_string()],
4504            vec![IndexSpec::new("big_idx", vec!["big_val".to_string()]).unwrap()],
4505        )
4506        .unwrap();
4507        let model = TableModel::from_config(&config).unwrap();
4508        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4509
4510        use datafusion::logical_expr::col;
4511        let filter = col("big_val").gt_eq(Expr::Literal(
4512            ScalarValue::Decimal256(Some(i256::from(100i64)), 76, 0),
4513            None,
4514        ));
4515        let pred = QueryPredicate::from_filters(&[filter], &model);
4516        let plan = pred
4517            .choose_index_plan(&model, &specs)
4518            .unwrap()
4519            .expect("decimal256 range should choose an index");
4520
4521        assert_eq!(plan.constrained_prefix_len, 1);
4522        assert_eq!(plan.ranges.len(), 1);
4523        let range = &plan.ranges[0];
4524        assert_eq!(
4525            index_payload(&specs[0], &range.start, 0, 32),
4526            encode_i256_ordered(i256::from(100i64)).to_vec()
4527        );
4528    }
4529
4530    #[tokio::test]
4531    async fn backfill_added_indexes_writes_entries_for_existing_rows() {
4532        let state = MockState {
4533            kv: Arc::new(Mutex::new(BTreeMap::new())),
4534            range_calls: Arc::new(AtomicUsize::new(0)),
4535            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
4536            sequence_number: Arc::new(AtomicU64::new(0)),
4537        };
4538        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
4539        let client = StoreClient::new(&base_url);
4540
4541        let seed_schema = KvSchema::new(client.clone())
4542            .table(
4543                "orders",
4544                vec![
4545                    TableColumnConfig::new("id", DataType::Int64, false),
4546                    TableColumnConfig::new("status", DataType::Utf8, false),
4547                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
4548                ],
4549                vec!["id".to_string()],
4550                vec![],
4551            )
4552            .expect("seed schema");
4553        let mut writer = seed_schema.batch_writer();
4554        for i in 0..6i64 {
4555            writer
4556                .insert(
4557                    "orders",
4558                    vec![
4559                        CellValue::Int64(i),
4560                        CellValue::Utf8(if i % 2 == 0 { "open" } else { "closed" }.to_string()),
4561                        CellValue::Int64(i * 10),
4562                    ],
4563                )
4564                .expect("seed row");
4565        }
4566        writer.flush().await.expect("seed flush");
4567
4568        {
4569            let guard = state.kv.lock().expect("kv mutex poisoned");
4570            let base_rows = guard
4571                .keys()
4572                .filter(|key| matches_primary_key(0, key))
4573                .count();
4574            let index_rows = guard
4575                .keys()
4576                .filter(|key| matches_secondary_index_key(0, 1, key))
4577                .count();
4578            assert_eq!(base_rows, 6);
4579            assert_eq!(index_rows, 0);
4580        }
4581
4582        let backfill_schema = KvSchema::new(client.clone())
4583            .table(
4584                "orders",
4585                vec![
4586                    TableColumnConfig::new("id", DataType::Int64, false),
4587                    TableColumnConfig::new("status", DataType::Utf8, false),
4588                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
4589                ],
4590                vec!["id".to_string()],
4591                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
4592                    .expect("valid index")
4593                    .with_cover_columns(vec!["amount_cents".to_string()])],
4594            )
4595            .expect("backfill schema");
4596        let report = backfill_schema
4597            .backfill_added_indexes_with_options(
4598                "orders",
4599                &[],
4600                IndexBackfillOptions {
4601                    row_batch_size: 2,
4602                    start_from_primary_key: None,
4603                },
4604            )
4605            .await
4606            .expect("backfill should succeed");
4607        assert_eq!(report.scanned_rows, 6);
4608        assert_eq!(report.indexes_backfilled, 1);
4609        assert_eq!(report.index_entries_written, 6);
4610
4611        {
4612            let guard = state.kv.lock().expect("kv mutex poisoned");
4613            let index_rows = guard
4614                .keys()
4615                .filter(|key| matches_secondary_index_key(0, 1, key))
4616                .count();
4617            assert_eq!(index_rows, 6);
4618            let (_, sample_value) = guard
4619                .iter()
4620                .find(|(key, _)| matches_secondary_index_key(0, 1, key))
4621                .expect("backfill should create index entry");
4622            let archived = decode_stored_row(sample_value.as_ref())
4623                .expect("covering value must be valid codec");
4624            assert_eq!(archived.values.len(), 3);
4625        }
4626
4627        let _ = shutdown_tx.send(());
4628    }
4629
4630    #[tokio::test]
4631    async fn backfill_added_indexes_writes_zorder_entries_for_existing_rows() {
4632        let state = MockState {
4633            kv: Arc::new(Mutex::new(BTreeMap::new())),
4634            range_calls: Arc::new(AtomicUsize::new(0)),
4635            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
4636            sequence_number: Arc::new(AtomicU64::new(0)),
4637        };
4638        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
4639        let client = StoreClient::new(&base_url);
4640
4641        let seed_schema = KvSchema::new(client.clone())
4642            .table(
4643                "points",
4644                vec![
4645                    TableColumnConfig::new("x", DataType::Int64, false),
4646                    TableColumnConfig::new("y", DataType::Int64, false),
4647                    TableColumnConfig::new("id", DataType::Int64, false),
4648                    TableColumnConfig::new("value", DataType::Int64, false),
4649                ],
4650                vec!["id".to_string()],
4651                vec![],
4652            )
4653            .expect("seed schema");
4654        let mut writer = seed_schema.batch_writer();
4655        for (x, y, id, value) in [(1, 1, 11, 110), (1, 2, 12, 120), (2, 1, 21, 210)] {
4656            writer
4657                .insert(
4658                    "points",
4659                    vec![
4660                        CellValue::Int64(x),
4661                        CellValue::Int64(y),
4662                        CellValue::Int64(id),
4663                        CellValue::Int64(value),
4664                    ],
4665                )
4666                .expect("seed row");
4667        }
4668        writer.flush().await.expect("seed flush");
4669
4670        let backfill_schema = KvSchema::new(client.clone())
4671            .table(
4672                "points",
4673                vec![
4674                    TableColumnConfig::new("x", DataType::Int64, false),
4675                    TableColumnConfig::new("y", DataType::Int64, false),
4676                    TableColumnConfig::new("id", DataType::Int64, false),
4677                    TableColumnConfig::new("value", DataType::Int64, false),
4678                ],
4679                vec!["id".to_string()],
4680                vec![
4681                    IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
4682                        .expect("valid index")
4683                        .with_cover_columns(vec!["value".to_string()]),
4684                ],
4685            )
4686            .expect("backfill schema");
4687        let report = backfill_schema
4688            .backfill_added_indexes_with_options(
4689                "points",
4690                &[],
4691                IndexBackfillOptions {
4692                    row_batch_size: 2,
4693                    start_from_primary_key: None,
4694                },
4695            )
4696            .await
4697            .expect("backfill should succeed");
4698        assert_eq!(report.scanned_rows, 3);
4699        assert_eq!(report.index_entries_written, 3);
4700
4701        let guard = state.kv.lock().expect("kv mutex poisoned");
4702        let index_entry = guard
4703            .keys()
4704            .find(|key| matches_secondary_index_key(0, 1, key))
4705            .cloned()
4706            .expect("z-order backfill should create index entry");
4707        let config = KvTableConfig::new(
4708            0,
4709            vec![
4710                TableColumnConfig::new("x", DataType::Int64, false),
4711                TableColumnConfig::new("y", DataType::Int64, false),
4712                TableColumnConfig::new("id", DataType::Int64, false),
4713                TableColumnConfig::new("value", DataType::Int64, false),
4714            ],
4715            vec!["id".to_string()],
4716            vec![
4717                IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()]).expect("valid"),
4718            ],
4719        )
4720        .expect("config");
4721        let model = TableModel::from_config(&config).expect("model");
4722        let spec = model
4723            .resolve_index_specs(&config.index_specs)
4724            .expect("specs")
4725            .remove(0);
4726        let decoded = decode_secondary_index_key(model.table_prefix, &spec, &model, &index_entry)
4727            .expect("decode z-order key");
4728        let x_idx = *model.columns_by_name.get("x").unwrap();
4729        let y_idx = *model.columns_by_name.get("y").unwrap();
4730        assert!(matches!(
4731            decoded.values.get(&x_idx),
4732            Some(CellValue::Int64(_))
4733        ));
4734        assert!(matches!(
4735            decoded.values.get(&y_idx),
4736            Some(CellValue::Int64(_))
4737        ));
4738
4739        let _ = shutdown_tx.send(());
4740    }
4741
4742    #[tokio::test]
4743    async fn backfill_added_indexes_requires_append_only_index_evolution() {
4744        let client = StoreClient::new("http://127.0.0.1:1");
4745        let schema = KvSchema::new(client)
4746            .table(
4747                "orders",
4748                vec![
4749                    TableColumnConfig::new("id", DataType::Int64, false),
4750                    TableColumnConfig::new("status", DataType::Utf8, false),
4751                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
4752                ],
4753                vec!["id".to_string()],
4754                vec![
4755                    IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid"),
4756                    IndexSpec::new("amount_idx", vec!["amount_cents".to_string()]).expect("valid"),
4757                ],
4758            )
4759            .expect("schema");
4760
4761        let previous_specs =
4762            vec![IndexSpec::new("amount_idx", vec!["amount_cents".to_string()]).expect("valid")];
4763        let err = schema
4764            .backfill_added_indexes("orders", &previous_specs)
4765            .await
4766            .expect_err("non-append-only evolution should be rejected");
4767        assert!(err
4768            .to_string()
4769            .contains("index evolution must be append-only"));
4770    }
4771
4772    #[tokio::test]
4773    async fn backfill_added_indexes_is_noop_when_no_new_indexes() {
4774        let client = StoreClient::new("http://127.0.0.1:1");
4775        let existing = IndexSpec::new("status_idx", vec!["status".to_string()])
4776            .expect("valid")
4777            .with_cover_columns(vec!["amount_cents".to_string()]);
4778        let schema = KvSchema::new(client)
4779            .table(
4780                "orders",
4781                vec![
4782                    TableColumnConfig::new("id", DataType::Int64, false),
4783                    TableColumnConfig::new("status", DataType::Utf8, false),
4784                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
4785                ],
4786                vec!["id".to_string()],
4787                vec![existing.clone()],
4788            )
4789            .expect("schema");
4790
4791        let report = schema
4792            .backfill_added_indexes("orders", &[existing])
4793            .await
4794            .expect("no-op backfill should succeed");
4795        assert_eq!(report, IndexBackfillReport::default());
4796    }
4797
4798    #[tokio::test]
4799    async fn backfill_added_indexes_rejects_zero_row_batch_size() {
4800        let client = StoreClient::new("http://127.0.0.1:1");
4801        let schema = KvSchema::new(client)
4802            .table(
4803                "orders",
4804                vec![
4805                    TableColumnConfig::new("id", DataType::Int64, false),
4806                    TableColumnConfig::new("status", DataType::Utf8, false),
4807                ],
4808                vec!["id".to_string()],
4809                vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
4810            )
4811            .expect("schema");
4812        let err = schema
4813            .backfill_added_indexes_with_options(
4814                "orders",
4815                &[],
4816                IndexBackfillOptions {
4817                    row_batch_size: 0,
4818                    start_from_primary_key: None,
4819                },
4820            )
4821            .await
4822            .expect_err("row_batch_size=0 should fail");
4823        assert!(err.to_string().contains("row_batch_size must be > 0"));
4824    }
4825
4826    #[tokio::test]
4827    async fn backfill_added_indexes_emits_progress_events() {
4828        let state = MockState {
4829            kv: Arc::new(Mutex::new(BTreeMap::new())),
4830            range_calls: Arc::new(AtomicUsize::new(0)),
4831            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
4832            sequence_number: Arc::new(AtomicU64::new(0)),
4833        };
4834        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
4835        let client = StoreClient::new(&base_url);
4836
4837        let seed_schema = KvSchema::new(client.clone())
4838            .table(
4839                "orders",
4840                vec![
4841                    TableColumnConfig::new("id", DataType::Int64, false),
4842                    TableColumnConfig::new("status", DataType::Utf8, false),
4843                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
4844                ],
4845                vec!["id".to_string()],
4846                vec![],
4847            )
4848            .expect("seed schema");
4849        let mut writer = seed_schema.batch_writer();
4850        for i in 0..5i64 {
4851            writer
4852                .insert(
4853                    "orders",
4854                    vec![
4855                        CellValue::Int64(i),
4856                        CellValue::Utf8("open".to_string()),
4857                        CellValue::Int64(i * 10),
4858                    ],
4859                )
4860                .expect("seed row");
4861        }
4862        writer.flush().await.expect("seed flush");
4863
4864        let backfill_schema = KvSchema::new(client.clone())
4865            .table(
4866                "orders",
4867                vec![
4868                    TableColumnConfig::new("id", DataType::Int64, false),
4869                    TableColumnConfig::new("status", DataType::Utf8, false),
4870                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
4871                ],
4872                vec!["id".to_string()],
4873                vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
4874            )
4875            .expect("backfill schema");
4876
4877        let (progress_tx, mut progress_rx) = mpsc::unbounded_channel();
4878        let report = backfill_schema
4879            .backfill_added_indexes_with_options_and_progress(
4880                "orders",
4881                &[],
4882                IndexBackfillOptions {
4883                    row_batch_size: 2,
4884                    start_from_primary_key: None,
4885                },
4886                Some(&progress_tx),
4887            )
4888            .await
4889            .expect("backfill should succeed");
4890        drop(progress_tx);
4891
4892        let mut saw_started = false;
4893        let mut saw_completed = false;
4894        let mut progress_events = 0usize;
4895        while let Some(event) = progress_rx.recv().await {
4896            match event {
4897                IndexBackfillEvent::Started {
4898                    table_name,
4899                    indexes_backfilled,
4900                    row_batch_size,
4901                    ..
4902                } => {
4903                    saw_started = true;
4904                    assert_eq!(table_name, "orders");
4905                    assert_eq!(indexes_backfilled, 1);
4906                    assert_eq!(row_batch_size, 2);
4907                }
4908                IndexBackfillEvent::Progress {
4909                    scanned_rows,
4910                    index_entries_written,
4911                    ..
4912                } => {
4913                    progress_events += 1;
4914                    assert!(scanned_rows >= 1);
4915                    assert_eq!(scanned_rows, index_entries_written);
4916                }
4917                IndexBackfillEvent::Completed {
4918                    report: completed_report,
4919                } => {
4920                    saw_completed = true;
4921                    assert_eq!(completed_report, report);
4922                }
4923            }
4924        }
4925        assert!(saw_started);
4926        assert!(saw_completed);
4927        assert!(progress_events >= 1);
4928
4929        let _ = shutdown_tx.send(());
4930    }
4931
4932    #[tokio::test]
4933    async fn backfill_added_indexes_can_resume_from_primary_key() {
4934        let state = MockState {
4935            kv: Arc::new(Mutex::new(BTreeMap::new())),
4936            range_calls: Arc::new(AtomicUsize::new(0)),
4937            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
4938            sequence_number: Arc::new(AtomicU64::new(0)),
4939        };
4940        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
4941        let client = StoreClient::new(&base_url);
4942
4943        let seed_schema = KvSchema::new(client.clone())
4944            .table(
4945                "orders",
4946                vec![
4947                    TableColumnConfig::new("id", DataType::Int64, false),
4948                    TableColumnConfig::new("status", DataType::Utf8, false),
4949                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
4950                ],
4951                vec!["id".to_string()],
4952                vec![],
4953            )
4954            .expect("seed schema");
4955        let mut writer = seed_schema.batch_writer();
4956        for i in 0..6i64 {
4957            writer
4958                .insert(
4959                    "orders",
4960                    vec![
4961                        CellValue::Int64(i),
4962                        CellValue::Utf8("open".to_string()),
4963                        CellValue::Int64(i * 10),
4964                    ],
4965                )
4966                .expect("seed row");
4967        }
4968        writer.flush().await.expect("seed flush");
4969
4970        let backfill_schema = KvSchema::new(client.clone())
4971            .table(
4972                "orders",
4973                vec![
4974                    TableColumnConfig::new("id", DataType::Int64, false),
4975                    TableColumnConfig::new("status", DataType::Utf8, false),
4976                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
4977                ],
4978                vec!["id".to_string()],
4979                vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
4980            )
4981            .expect("backfill schema");
4982
4983        let config = KvTableConfig::new(
4984            0,
4985            vec![
4986                TableColumnConfig::new("id", DataType::Int64, false),
4987                TableColumnConfig::new("status", DataType::Utf8, false),
4988                TableColumnConfig::new("amount_cents", DataType::Int64, false),
4989            ],
4990            vec!["id".to_string()],
4991            vec![],
4992        )
4993        .expect("valid config");
4994        let model = TableModel::from_config(&config).expect("model");
4995        let resume_value = CellValue::Int64(3);
4996        let resume_key =
4997            encode_primary_key(model.table_prefix, &[&resume_value], &model).expect("resume key");
4998
4999        let report = backfill_schema
5000            .backfill_added_indexes_with_options(
5001                "orders",
5002                &[],
5003                IndexBackfillOptions {
5004                    row_batch_size: 2,
5005                    start_from_primary_key: Some(resume_key.clone()),
5006                },
5007            )
5008            .await
5009            .expect("resume backfill should succeed");
5010        assert_eq!(report.scanned_rows, 3);
5011        assert_eq!(report.index_entries_written, 3);
5012
5013        {
5014            let guard = state.kv.lock().expect("kv mutex poisoned");
5015            let index_rows = guard
5016                .keys()
5017                .filter(|key| matches_secondary_index_key(0, 1, key))
5018                .count();
5019            assert_eq!(index_rows, 3);
5020        }
5021
5022        let resume_payload = model
5023            .primary_key_codec
5024            .read_payload(&resume_key, 0, model.primary_key_width)
5025            .expect("resume payload");
5026        let wrong_prefix = secondary_index_codec(model.table_prefix, 1)
5027            .expect("secondary codec")
5028            .encode(&resume_payload)
5029            .expect("wrong prefix key");
5030        let err = backfill_schema
5031            .backfill_added_indexes_with_options(
5032                "orders",
5033                &[],
5034                IndexBackfillOptions {
5035                    row_batch_size: 2,
5036                    start_from_primary_key: Some(wrong_prefix),
5037                },
5038            )
5039            .await
5040            .expect_err("wrong key prefix must be rejected");
5041        assert!(err.to_string().contains("primary-key prefix"));
5042
5043        let _ = shutdown_tx.send(());
5044    }
5045
5046    #[tokio::test]
5047    async fn covering_index_scan_fails_closed_when_covering_payload_missing() {
5048        let state = MockState {
5049            kv: Arc::new(Mutex::new(BTreeMap::new())),
5050            range_calls: Arc::new(AtomicUsize::new(0)),
5051            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
5052            sequence_number: Arc::new(AtomicU64::new(0)),
5053        };
5054        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
5055        let client = StoreClient::new(&base_url);
5056
5057        let schema = KvSchema::new(client.clone())
5058            .table(
5059                "orders",
5060                vec![
5061                    TableColumnConfig::new("id", DataType::Int64, false),
5062                    TableColumnConfig::new("status", DataType::Utf8, false),
5063                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
5064                ],
5065                vec!["id".to_string()],
5066                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
5067                    .expect("valid")
5068                    .with_cover_columns(vec!["amount_cents".to_string()])],
5069            )
5070            .expect("schema");
5071        let mut writer = schema.batch_writer();
5072        for id in 0..4i64 {
5073            writer
5074                .insert(
5075                    "orders",
5076                    vec![
5077                        CellValue::Int64(id),
5078                        CellValue::Utf8("open".to_string()),
5079                        CellValue::Int64(id * 10),
5080                    ],
5081                )
5082                .expect("row");
5083        }
5084        writer.flush().await.expect("flush");
5085
5086        {
5087            let mut guard = state.kv.lock().expect("kv mutex poisoned");
5088            let key = guard
5089                .keys()
5090                .find(|key| matches_secondary_index_key(0, 1, key))
5091                .expect("index row should exist")
5092                .clone();
5093            guard.insert(key, Bytes::new());
5094        }
5095
5096        let ctx = SessionContext::new();
5097        schema.register_all(&ctx).expect("register");
5098        let df = ctx
5099            .sql("SELECT amount_cents FROM orders WHERE status = 'open'")
5100            .await
5101            .expect("query should plan");
5102        let err = df
5103            .collect()
5104            .await
5105            .expect_err("missing covering payload must fail closed");
5106        assert!(err
5107            .to_string()
5108            .contains("secondary index entry missing covering payload"));
5109
5110        let _ = shutdown_tx.send(());
5111    }
5112
5113    #[tokio::test]
5114    async fn covering_index_scan_fails_closed_when_covering_payload_is_corrupt() {
5115        let state = MockState {
5116            kv: Arc::new(Mutex::new(BTreeMap::new())),
5117            range_calls: Arc::new(AtomicUsize::new(0)),
5118            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
5119            sequence_number: Arc::new(AtomicU64::new(0)),
5120        };
5121        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
5122        let client = StoreClient::new(&base_url);
5123
5124        let schema = KvSchema::new(client.clone())
5125            .table(
5126                "orders",
5127                vec![
5128                    TableColumnConfig::new("id", DataType::Int64, false),
5129                    TableColumnConfig::new("status", DataType::Utf8, false),
5130                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
5131                ],
5132                vec!["id".to_string()],
5133                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
5134                    .expect("valid")
5135                    .with_cover_columns(vec!["amount_cents".to_string()])],
5136            )
5137            .expect("schema");
5138        let mut writer = schema.batch_writer();
5139        for id in 0..4i64 {
5140            writer
5141                .insert(
5142                    "orders",
5143                    vec![
5144                        CellValue::Int64(id),
5145                        CellValue::Utf8("open".to_string()),
5146                        CellValue::Int64(id * 10),
5147                    ],
5148                )
5149                .expect("row");
5150        }
5151        writer.flush().await.expect("flush");
5152
5153        {
5154            let mut guard = state.kv.lock().expect("kv mutex poisoned");
5155            let key = guard
5156                .keys()
5157                .find(|key| matches_secondary_index_key(0, 1, key))
5158                .expect("index row should exist")
5159                .clone();
5160            guard.insert(key, Bytes::from_static(b"not-codec"));
5161        }
5162
5163        let ctx = SessionContext::new();
5164        schema.register_all(&ctx).expect("register");
5165        let df = ctx
5166            .sql("SELECT amount_cents FROM orders WHERE status = 'open'")
5167            .await
5168            .expect("query should plan");
5169        let err = df
5170            .collect()
5171            .await
5172            .expect_err("corrupt covering payload must fail closed");
5173        assert!(err.to_string().contains("invalid covering index payload"));
5174
5175        let _ = shutdown_tx.send(());
5176    }
5177
5178    #[tokio::test]
5179    async fn non_covering_index_uses_point_lookup_instead_of_full_scan() {
5180        let state = MockState {
5181            kv: Arc::new(Mutex::new(BTreeMap::new())),
5182            range_calls: Arc::new(AtomicUsize::new(0)),
5183            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
5184            sequence_number: Arc::new(AtomicU64::new(0)),
5185        };
5186        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
5187        let client = StoreClient::new(&base_url);
5188
5189        let schema = KvSchema::new(client.clone())
5190            .table(
5191                "orders",
5192                vec![
5193                    TableColumnConfig::new("id", DataType::Int64, false),
5194                    TableColumnConfig::new("status", DataType::Utf8, false),
5195                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
5196                    TableColumnConfig::new("notes", DataType::Utf8, true),
5197                ],
5198                vec!["id".to_string()],
5199                vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
5200            )
5201            .expect("schema");
5202        let mut writer = schema.batch_writer();
5203        writer
5204            .insert(
5205                "orders",
5206                vec![
5207                    CellValue::Int64(1),
5208                    CellValue::Utf8("open".to_string()),
5209                    CellValue::Int64(100),
5210                    CellValue::Utf8("first".to_string()),
5211                ],
5212            )
5213            .expect("row");
5214        writer
5215            .insert(
5216                "orders",
5217                vec![
5218                    CellValue::Int64(2),
5219                    CellValue::Utf8("closed".to_string()),
5220                    CellValue::Int64(200),
5221                    CellValue::Utf8("second".to_string()),
5222                ],
5223            )
5224            .expect("row");
5225        writer
5226            .insert(
5227                "orders",
5228                vec![
5229                    CellValue::Int64(3),
5230                    CellValue::Utf8("open".to_string()),
5231                    CellValue::Int64(300),
5232                    CellValue::Utf8("third".to_string()),
5233                ],
5234            )
5235            .expect("row");
5236        writer.flush().await.expect("flush");
5237
5238        let ctx = SessionContext::new();
5239        schema.register_all(&ctx).expect("register");
5240
5241        let df = ctx
5242            .sql("SELECT id, notes FROM orders WHERE status = 'open' ORDER BY id")
5243            .await
5244            .expect("plan");
5245        let batches = df.collect().await.expect("non-covering index lookup");
5246        let ids: Vec<i64> = batches
5247            .iter()
5248            .flat_map(|b| {
5249                b.column(0)
5250                    .as_any()
5251                    .downcast_ref::<datafusion::arrow::array::Int64Array>()
5252                    .unwrap()
5253                    .iter()
5254                    .map(|v| v.unwrap())
5255            })
5256            .collect();
5257        let notes: Vec<String> = batches
5258            .iter()
5259            .flat_map(|b| {
5260                b.column(1)
5261                    .as_any()
5262                    .downcast_ref::<datafusion::arrow::array::StringArray>()
5263                    .unwrap()
5264                    .iter()
5265                    .map(|v| v.unwrap().to_string())
5266            })
5267            .collect();
5268        assert_eq!(ids, vec![1, 3]);
5269        assert_eq!(notes, vec!["first", "third"]);
5270
5271        let _ = shutdown_tx.send(());
5272    }
5273
5274    #[tokio::test]
5275    async fn backfill_resume_cursor_can_continue_without_skips_or_duplicates() {
5276        let state = MockState {
5277            kv: Arc::new(Mutex::new(BTreeMap::new())),
5278            range_calls: Arc::new(AtomicUsize::new(0)),
5279            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
5280            sequence_number: Arc::new(AtomicU64::new(0)),
5281        };
5282        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
5283        let client = StoreClient::new(&base_url);
5284
5285        let seed_schema = KvSchema::new(client.clone())
5286            .table(
5287                "orders",
5288                vec![
5289                    TableColumnConfig::new("id", DataType::Int64, false),
5290                    TableColumnConfig::new("status", DataType::Utf8, false),
5291                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
5292                ],
5293                vec!["id".to_string()],
5294                vec![],
5295            )
5296            .expect("seed schema");
5297        let mut writer = seed_schema.batch_writer();
5298        for i in 0..8i64 {
5299            writer
5300                .insert(
5301                    "orders",
5302                    vec![
5303                        CellValue::Int64(i),
5304                        CellValue::Utf8("open".to_string()),
5305                        CellValue::Int64(i * 10),
5306                    ],
5307                )
5308                .expect("seed row");
5309        }
5310        writer.flush().await.expect("seed flush");
5311
5312        let backfill_schema = KvSchema::new(client.clone())
5313            .table(
5314                "orders",
5315                vec![
5316                    TableColumnConfig::new("id", DataType::Int64, false),
5317                    TableColumnConfig::new("status", DataType::Utf8, false),
5318                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
5319                ],
5320                vec!["id".to_string()],
5321                vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
5322            )
5323            .expect("backfill schema");
5324
5325        let task_schema = KvSchema::new(client.clone())
5326            .table(
5327                "orders",
5328                vec![
5329                    TableColumnConfig::new("id", DataType::Int64, false),
5330                    TableColumnConfig::new("status", DataType::Utf8, false),
5331                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
5332                ],
5333                vec!["id".to_string()],
5334                vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
5335            )
5336            .expect("task schema");
5337        let (progress_tx, mut progress_rx) = mpsc::unbounded_channel();
5338        let handle = tokio::spawn(async move {
5339            task_schema
5340                .backfill_added_indexes_with_options_and_progress(
5341                    "orders",
5342                    &[],
5343                    IndexBackfillOptions {
5344                        row_batch_size: 2,
5345                        start_from_primary_key: None,
5346                    },
5347                    Some(&progress_tx),
5348                )
5349                .await
5350        });
5351
5352        let mut resume_cursor = None;
5353        while let Some(event) = progress_rx.recv().await {
5354            if let IndexBackfillEvent::Progress { next_cursor, .. } = event {
5355                resume_cursor = next_cursor;
5356                break;
5357            }
5358        }
5359        handle.abort();
5360        let resume_cursor =
5361            resume_cursor.expect("first progress event should provide resume cursor");
5362
5363        let report = backfill_schema
5364            .backfill_added_indexes_with_options(
5365                "orders",
5366                &[],
5367                IndexBackfillOptions {
5368                    row_batch_size: 2,
5369                    start_from_primary_key: Some(resume_cursor),
5370                },
5371            )
5372            .await
5373            .expect("resume backfill should succeed");
5374        assert_eq!(report.scanned_rows, 6);
5375
5376        let guard = state.kv.lock().expect("kv mutex poisoned");
5377        let base_rows = guard
5378            .keys()
5379            .filter(|key| matches_primary_key(0, key))
5380            .count();
5381        let index_rows = guard
5382            .keys()
5383            .filter(|key| matches_secondary_index_key(0, 1, key))
5384            .count();
5385        assert_eq!(base_rows, 8);
5386        assert_eq!(
5387            index_rows, 8,
5388            "resume should backfill each row exactly once"
5389        );
5390
5391        let _ = shutdown_tx.send(());
5392    }
5393
5394    #[tokio::test]
5395    async fn concurrent_writes_during_backfill_preserve_index_correctness() {
5396        let state = MockState {
5397            kv: Arc::new(Mutex::new(BTreeMap::new())),
5398            range_calls: Arc::new(AtomicUsize::new(0)),
5399            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
5400            sequence_number: Arc::new(AtomicU64::new(0)),
5401        };
5402        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
5403        let client = StoreClient::new(&base_url);
5404
5405        let seed_schema = KvSchema::new(client.clone())
5406            .table(
5407                "orders",
5408                vec![
5409                    TableColumnConfig::new("id", DataType::Int64, false),
5410                    TableColumnConfig::new("status", DataType::Utf8, false),
5411                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
5412                ],
5413                vec!["id".to_string()],
5414                vec![],
5415            )
5416            .expect("seed schema");
5417        let mut seed_writer = seed_schema.batch_writer();
5418        for i in 0..40i64 {
5419            seed_writer
5420                .insert(
5421                    "orders",
5422                    vec![
5423                        CellValue::Int64(i),
5424                        CellValue::Utf8(if i % 2 == 0 { "open" } else { "closed" }.to_string()),
5425                        CellValue::Int64(i * 10),
5426                    ],
5427                )
5428                .expect("seed row");
5429        }
5430        seed_writer.flush().await.expect("seed flush");
5431
5432        let backfill_schema = KvSchema::new(client.clone())
5433            .table(
5434                "orders",
5435                vec![
5436                    TableColumnConfig::new("id", DataType::Int64, false),
5437                    TableColumnConfig::new("status", DataType::Utf8, false),
5438                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
5439                ],
5440                vec!["id".to_string()],
5441                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
5442                    .expect("valid")
5443                    .with_cover_columns(vec!["amount_cents".to_string()])],
5444            )
5445            .expect("backfill schema");
5446
5447        let task_schema = KvSchema::new(client.clone())
5448            .table(
5449                "orders",
5450                vec![
5451                    TableColumnConfig::new("id", DataType::Int64, false),
5452                    TableColumnConfig::new("status", DataType::Utf8, false),
5453                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
5454                ],
5455                vec!["id".to_string()],
5456                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
5457                    .expect("valid")
5458                    .with_cover_columns(vec!["amount_cents".to_string()])],
5459            )
5460            .expect("task schema");
5461        let (progress_tx, mut progress_rx) = mpsc::unbounded_channel();
5462        let handle = tokio::spawn(async move {
5463            task_schema
5464                .backfill_added_indexes_with_options_and_progress(
5465                    "orders",
5466                    &[],
5467                    IndexBackfillOptions {
5468                        row_batch_size: 5,
5469                        start_from_primary_key: None,
5470                    },
5471                    Some(&progress_tx),
5472                )
5473                .await
5474        });
5475
5476        while let Some(event) = progress_rx.recv().await {
5477            if matches!(event, IndexBackfillEvent::Progress { .. }) {
5478                break;
5479            }
5480        }
5481
5482        let mut concurrent_writer = backfill_schema.batch_writer();
5483        for id in [100i64, 101i64] {
5484            concurrent_writer
5485                .insert(
5486                    "orders",
5487                    vec![
5488                        CellValue::Int64(id),
5489                        CellValue::Utf8("open".to_string()),
5490                        CellValue::Int64(id * 10),
5491                    ],
5492                )
5493                .expect("concurrent row");
5494        }
5495        concurrent_writer.flush().await.expect("concurrent flush");
5496
5497        let report = handle
5498            .await
5499            .expect("backfill task join")
5500            .expect("backfill result");
5501        assert!(
5502            report.scanned_rows >= 40,
5503            "backfill should at least scan the original historical rows"
5504        );
5505
5506        let guard = state.kv.lock().expect("kv mutex poisoned");
5507        let base_rows = guard
5508            .keys()
5509            .filter(|key| matches_primary_key(0, key))
5510            .count();
5511        let index_rows = guard
5512            .keys()
5513            .filter(|key| matches_secondary_index_key(0, 1, key))
5514            .count();
5515        assert_eq!(base_rows, 42);
5516        assert_eq!(
5517            index_rows, 42,
5518            "historical backfill plus concurrent indexed writes should leave one index row per base row"
5519        );
5520
5521        let _ = shutdown_tx.send(());
5522    }
5523
    /// Test `QueryService` whose `range` stream yields two preset frames and holds the
    /// second one back until the test explicitly releases it.
    #[derive(Clone)]
    struct DeferredChunkRangeHarness {
        /// Notified by `range` immediately before the first frame is yielded.
        first_chunk_sent: Arc<Notify>,
        /// Awaited by `range` before the second frame is yielded.
        release_second_chunk: Arc<Notify>,
        /// Frame yielded first by `range`.
        first_frame: ProtoRangeFrame,
        /// Frame yielded by `range` once `release_second_chunk` has been notified.
        second_frame: ProtoRangeFrame,
    }
5531
5532    impl QueryService for DeferredChunkRangeHarness {
5533        async fn get(
5534            &self,
5535            _ctx: Context,
5536            _request: buffa::view::OwnedView<
5537                exoware_sdk_rs::store::query::v1::GetRequestView<'static>,
5538            >,
5539        ) -> Result<(ProtoGetResponse, Context), ConnectError> {
5540            Err(ConnectError::unimplemented("test harness"))
5541        }
5542
5543        async fn get_many(
5544            &self,
5545            _ctx: Context,
5546            _request: buffa::view::OwnedView<
5547                exoware_sdk_rs::store::query::v1::GetManyRequestView<'static>,
5548            >,
5549        ) -> Result<
5550            (
5551                Pin<Box<dyn Stream<Item = Result<ProtoGetManyFrame, ConnectError>> + Send>>,
5552                Context,
5553            ),
5554            ConnectError,
5555        > {
5556            Err(ConnectError::unimplemented("test harness"))
5557        }
5558
5559        async fn range(
5560            &self,
5561            _ctx: Context,
5562            _request: buffa::view::OwnedView<
5563                exoware_sdk_rs::store::query::v1::RangeRequestView<'static>,
5564            >,
5565        ) -> Result<
5566            (
5567                Pin<Box<dyn Stream<Item = Result<ProtoRangeFrame, ConnectError>> + Send>>,
5568                Context,
5569            ),
5570            ConnectError,
5571        > {
5572            let first_chunk_sent = self.first_chunk_sent.clone();
5573            let release_second_chunk = self.release_second_chunk.clone();
5574            let first_frame = self.first_frame.clone();
5575            let second_frame = self.second_frame.clone();
5576            let stream = stream::try_unfold(0u8, move |state| {
5577                let first_chunk_sent = first_chunk_sent.clone();
5578                let release_second_chunk = release_second_chunk.clone();
5579                let first_frame = first_frame.clone();
5580                let second_frame = second_frame.clone();
5581                async move {
5582                    match state {
5583                        0 => {
5584                            first_chunk_sent.notify_one();
5585                            Ok(Some((first_frame, 1)))
5586                        }
5587                        1 => {
5588                            release_second_chunk.notified().await;
5589                            Ok(Some((second_frame, 2)))
5590                        }
5591                        _ => Ok(None),
5592                    }
5593                }
5594            });
5595            Ok((Box::pin(stream), query_detail_trailer_ctx(7)))
5596        }
5597
5598        async fn reduce(
5599            &self,
5600            _ctx: Context,
5601            _request: buffa::view::OwnedView<
5602                exoware_sdk_rs::store::query::v1::ReduceRequestView<'static>,
5603            >,
5604        ) -> Result<(ProtoReduceResponse, Context), ConnectError> {
5605            Err(ConnectError::unimplemented("test harness"))
5606        }
5607    }
5608
    /// Test `QueryService` that records the limit carried by each `range` request and only
    /// streams a (deferred) second frame when that limit is greater than one.
    #[derive(Clone)]
    struct ObservedLimitRangeHarness {
        /// Awaited by `range` before the second frame is yielded.
        release_second_chunk: Arc<Notify>,
        /// Set by `range` to the request's limit, or `usize::MAX` when the request has none.
        observed_limit: Arc<AtomicUsize>,
        /// Frame yielded first by `range`.
        first_frame: ProtoRangeFrame,
        /// Frame yielded second, and only when the observed limit is greater than one.
        second_frame: ProtoRangeFrame,
    }
5616
5617    impl QueryService for ObservedLimitRangeHarness {
5618        async fn get(
5619            &self,
5620            _ctx: Context,
5621            _request: buffa::view::OwnedView<
5622                exoware_sdk_rs::store::query::v1::GetRequestView<'static>,
5623            >,
5624        ) -> Result<(ProtoGetResponse, Context), ConnectError> {
5625            Err(ConnectError::unimplemented("test harness"))
5626        }
5627
5628        async fn get_many(
5629            &self,
5630            _ctx: Context,
5631            _request: buffa::view::OwnedView<
5632                exoware_sdk_rs::store::query::v1::GetManyRequestView<'static>,
5633            >,
5634        ) -> Result<
5635            (
5636                Pin<Box<dyn Stream<Item = Result<ProtoGetManyFrame, ConnectError>> + Send>>,
5637                Context,
5638            ),
5639            ConnectError,
5640        > {
5641            Err(ConnectError::unimplemented("test harness"))
5642        }
5643
5644        async fn range(
5645            &self,
5646            _ctx: Context,
5647            request: buffa::view::OwnedView<
5648                exoware_sdk_rs::store::query::v1::RangeRequestView<'static>,
5649            >,
5650        ) -> Result<
5651            (
5652                Pin<Box<dyn Stream<Item = Result<ProtoRangeFrame, ConnectError>> + Send>>,
5653                Context,
5654            ),
5655            ConnectError,
5656        > {
5657            let limit = request.limit.map(|v| v as usize).unwrap_or(usize::MAX);
5658            self.observed_limit.store(limit, AtomicOrdering::SeqCst);
5659            let release_second_chunk = self.release_second_chunk.clone();
5660            let first_frame = self.first_frame.clone();
5661            let second_frame = self.second_frame.clone();
5662            let stream = stream::try_unfold(0u8, move |state| {
5663                let release_second_chunk = release_second_chunk.clone();
5664                let first_frame = first_frame.clone();
5665                let second_frame = second_frame.clone();
5666                async move {
5667                    match state {
5668                        0 => Ok(Some((first_frame, 1))),
5669                        1 => {
5670                            if limit > 1 {
5671                                release_second_chunk.notified().await;
5672                                Ok(Some((second_frame, 2)))
5673                            } else {
5674                                Ok(None)
5675                            }
5676                        }
5677                        2 => Ok(None),
5678                        _ => Ok(None),
5679                    }
5680                }
5681            });
5682            Ok((Box::pin(stream), query_detail_trailer_ctx(7)))
5683        }
5684
5685        async fn reduce(
5686            &self,
5687            _ctx: Context,
5688            _request: buffa::view::OwnedView<
5689                exoware_sdk_rs::store::query::v1::ReduceRequestView<'static>,
5690            >,
5691        ) -> Result<(ProtoReduceResponse, Context), ConnectError> {
5692            Err(ConnectError::unimplemented("test harness"))
5693        }
5694    }
5695
    /// Test `QueryService` that records the limit carried by each `range` request and always
    /// answers with a single preset frame.
    #[derive(Clone)]
    struct ObservedLimitIndexRangeHarness {
        /// Set by `range` to the request's limit; an absent limit or `u32::MAX` is recorded
        /// as `usize::MAX`.
        observed_limit: Arc<AtomicUsize>,
        /// The only frame `range` streams back.
        entries_frame: ProtoRangeFrame,
    }
5701
5702    impl QueryService for ObservedLimitIndexRangeHarness {
5703        async fn get(
5704            &self,
5705            _ctx: Context,
5706            _request: buffa::view::OwnedView<
5707                exoware_sdk_rs::store::query::v1::GetRequestView<'static>,
5708            >,
5709        ) -> Result<(ProtoGetResponse, Context), ConnectError> {
5710            Err(ConnectError::unimplemented("test harness"))
5711        }
5712
5713        async fn get_many(
5714            &self,
5715            _ctx: Context,
5716            _request: buffa::view::OwnedView<
5717                exoware_sdk_rs::store::query::v1::GetManyRequestView<'static>,
5718            >,
5719        ) -> Result<
5720            (
5721                Pin<Box<dyn Stream<Item = Result<ProtoGetManyFrame, ConnectError>> + Send>>,
5722                Context,
5723            ),
5724            ConnectError,
5725        > {
5726            Err(ConnectError::unimplemented("test harness"))
5727        }
5728
5729        async fn range(
5730            &self,
5731            _ctx: Context,
5732            request: buffa::view::OwnedView<
5733                exoware_sdk_rs::store::query::v1::RangeRequestView<'static>,
5734            >,
5735        ) -> Result<
5736            (
5737                Pin<Box<dyn Stream<Item = Result<ProtoRangeFrame, ConnectError>> + Send>>,
5738                Context,
5739            ),
5740            ConnectError,
5741        > {
5742            let limit = request
5743                .limit
5744                .map(|v| {
5745                    if v == u32::MAX {
5746                        usize::MAX
5747                    } else {
5748                        v as usize
5749                    }
5750                })
5751                .unwrap_or(usize::MAX);
5752            self.observed_limit.store(limit, AtomicOrdering::SeqCst);
5753            let entries_frame = self.entries_frame.clone();
5754            Ok((
5755                Box::pin(stream::iter(vec![Ok(entries_frame)])),
5756                query_detail_trailer_ctx(7),
5757            ))
5758        }
5759
5760        async fn reduce(
5761            &self,
5762            _ctx: Context,
5763            _request: buffa::view::OwnedView<
5764                exoware_sdk_rs::store::query::v1::ReduceRequestView<'static>,
5765            >,
5766        ) -> Result<(ProtoReduceResponse, Context), ConnectError> {
5767            Err(ConnectError::unimplemented("test harness"))
5768        }
5769    }
5770
5771    #[tokio::test]
5772    async fn kv_scan_streaming_range_reads_emit_first_batch_before_full_range_completes() {
5773        let model = Arc::new(simple_int64_model(0));
5774        let first_chunk_sent = Arc::new(Notify::new());
5775        let release_second_chunk = Arc::new(Notify::new());
5776
5777        let encoded_row = (StoredRow { values: vec![None] }).encode().to_vec();
5778
5779        let first_results = {
5780            let mut results = Vec::with_capacity(BATCH_FLUSH_ROWS);
5781            for id in 0..BATCH_FLUSH_ROWS {
5782                let key =
5783                    encode_primary_key(model.table_prefix, &[&CellValue::Int64(id as i64)], &model)
5784                        .expect("primary key");
5785                results.push((key, encoded_row.clone()));
5786            }
5787            results
5788        };
5789        let first_frame = proto_range_entries_frame(first_results);
5790
5791        let second_results = {
5792            let key = encode_primary_key(
5793                model.table_prefix,
5794                &[&CellValue::Int64(BATCH_FLUSH_ROWS as i64)],
5795                &model,
5796            )
5797            .expect("primary key");
5798            vec![(key, encoded_row)]
5799        };
5800        let second_frame = proto_range_entries_frame(second_results);
5801
5802        let harness = DeferredChunkRangeHarness {
5803            first_chunk_sent: first_chunk_sent.clone(),
5804            release_second_chunk: release_second_chunk.clone(),
5805            first_frame,
5806            second_frame,
5807        };
5808        let connect = ConnectRpcService::new(QueryServiceServer::new(harness))
5809            .with_compression(connect_compression_registry());
5810        let app = Router::new().fallback_service(connect);
5811
5812        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
5813            .await
5814            .expect("bind test listener");
5815        let url = format!("http://{}", listener.local_addr().expect("listener addr"));
5816        tokio::spawn(async move {
5817            axum::serve(listener, app).await.expect("serve test app");
5818        });
5819
5820        let client = StoreClient::new(&url);
5821        let scan = KvScanExec::new(
5822            client,
5823            model.clone(),
5824            Arc::new(Vec::new()),
5825            QueryPredicate::default(),
5826            None,
5827            model.schema.clone(),
5828            None,
5829        );
5830
5831        let session_ctx = SessionContext::new();
5832        let mut stream = scan
5833            .execute(0, session_ctx.task_ctx())
5834            .expect("scan execute should start");
5835
5836        tokio::time::timeout(Duration::from_secs(1), first_chunk_sent.notified())
5837            .await
5838            .expect("server should send first range frame");
5839        let first_batch = tokio::time::timeout(Duration::from_millis(200), stream.try_next())
5840            .await
5841            .expect("first record batch should arrive before the second stream chunk is released")
5842            .expect("stream poll should succeed")
5843            .expect("expected first record batch");
5844        assert_eq!(first_batch.num_rows(), BATCH_FLUSH_ROWS);
5845
5846        release_second_chunk.notify_one();
5847
5848        let second_batch = stream
5849            .try_next()
5850            .await
5851            .expect("second poll should succeed")
5852            .expect("expected second record batch");
5853        assert_eq!(second_batch.num_rows(), 1);
5854        assert!(
5855            stream
5856                .try_next()
5857                .await
5858                .expect("stream completion poll")
5859                .is_none(),
5860            "stream should finish after the second batch"
5861        );
5862    }
5863
5864    #[tokio::test]
5865    async fn kv_scan_sql_limit_is_pushed_upstream_on_exact_streaming_scan() {
5866        let release_second_chunk = Arc::new(Notify::new());
5867        let observed_limit = Arc::new(AtomicUsize::new(0));
5868        let model = simple_int64_model(0);
5869
5870        let encoded_row = (StoredRow { values: vec![None] }).encode().to_vec();
5871
5872        let first_key = encode_primary_key(model.table_prefix, &[&CellValue::Int64(1)], &model)
5873            .expect("first primary key");
5874        let second_key = encode_primary_key(model.table_prefix, &[&CellValue::Int64(2)], &model)
5875            .expect("second primary key");
5876
5877        let first_frame = proto_range_entries_frame(vec![(first_key, encoded_row.clone())]);
5878        let second_frame = proto_range_entries_frame(vec![(second_key, encoded_row)]);
5879
5880        let harness = ObservedLimitRangeHarness {
5881            release_second_chunk: release_second_chunk.clone(),
5882            observed_limit: observed_limit.clone(),
5883            first_frame,
5884            second_frame,
5885        };
5886        let connect = ConnectRpcService::new(QueryServiceServer::new(harness))
5887            .with_compression(connect_compression_registry());
5888        let app = Router::new().fallback_service(connect);
5889
5890        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
5891            .await
5892            .expect("bind test listener");
5893        let url = format!("http://{}", listener.local_addr().expect("listener addr"));
5894        tokio::spawn(async move {
5895            axum::serve(listener, app).await.expect("serve test app");
5896        });
5897
5898        let client = StoreClient::new(&url);
5899        let schema = KvSchema::new(client)
5900            .table(
5901                "items",
5902                vec![TableColumnConfig::new("id", DataType::Int64, false)],
5903                vec!["id".to_string()],
5904                vec![],
5905            )
5906            .expect("schema");
5907        let ctx = SessionContext::new();
5908        schema.register_all(&ctx).expect("register");
5909
5910        let batches = tokio::time::timeout(Duration::from_millis(200), async {
5911            ctx.sql("SELECT id FROM items LIMIT 1")
5912                .await
5913                .expect("query")
5914                .collect()
5915                .await
5916                .expect("collect")
5917        })
5918        .await
5919        .expect("query with LIMIT 1 should finish without waiting for a delayed second chunk");
5920
5921        assert_eq!(
5922            batches.iter().map(|batch| batch.num_rows()).sum::<usize>(),
5923            1
5924        );
5925        assert_eq!(
5926            observed_limit.load(AtomicOrdering::SeqCst),
5927            1,
5928            "exact streaming scan should push SQL LIMIT upstream"
5929        );
5930        release_second_chunk.notify_one();
5931    }
5932
5933    #[tokio::test]
5934    async fn kv_scan_index_limit_does_not_push_upstream_when_seen_dedup_can_drop_entries() {
5935        let observed_limit = Arc::new(AtomicUsize::new(0));
5936        let config = KvTableConfig::new(
5937            0,
5938            vec![
5939                TableColumnConfig::new("id", DataType::Int64, false),
5940                TableColumnConfig::new("status", DataType::Utf8, false),
5941                TableColumnConfig::new("amount_cents", DataType::Int64, false),
5942            ],
5943            vec!["id".to_string()],
5944            vec![IndexSpec::new("status_idx", vec!["status".to_string()])
5945                .expect("valid")
5946                .with_cover_columns(vec!["status".to_string(), "amount_cents".to_string()])],
5947        )
5948        .expect("config");
5949        let model = TableModel::from_config(&config).expect("model");
5950        let spec = model
5951            .resolve_index_specs(&config.index_specs)
5952            .expect("specs")
5953            .into_iter()
5954            .next()
5955            .expect("status index spec");
5956        let stale_row = KvRow {
5957            values: vec![
5958                CellValue::Int64(7),
5959                CellValue::Utf8("closed".to_string()),
5960                CellValue::Int64(10),
5961            ],
5962        };
5963        let current_row = KvRow {
5964            values: vec![
5965                CellValue::Int64(7),
5966                CellValue::Utf8("open".to_string()),
5967                CellValue::Int64(10),
5968            ],
5969        };
5970        let unique_row = KvRow {
5971            values: vec![
5972                CellValue::Int64(8),
5973                CellValue::Utf8("open".to_string()),
5974                CellValue::Int64(20),
5975            ],
5976        };
5977        let stale_key = encode_secondary_index_key(model.table_prefix, &spec, &model, &stale_row)
5978            .expect("stale index key");
5979        let current_key =
5980            encode_secondary_index_key(model.table_prefix, &spec, &model, &current_row)
5981                .expect("current index key");
5982        let unique_key = encode_secondary_index_key(model.table_prefix, &spec, &model, &unique_row)
5983            .expect("unique index key");
5984        let stale_payload =
5985            encode_secondary_index_value(&stale_row, &model, &spec).expect("stale payload");
5986        let current_payload =
5987            encode_secondary_index_value(&current_row, &model, &spec).expect("current payload");
5988        let unique_payload =
5989            encode_secondary_index_value(&unique_row, &model, &spec).expect("unique payload");
5990
5991        let entries_frame = proto_range_entries_frame(vec![
5992            (stale_key, stale_payload),
5993            (current_key, current_payload),
5994            (unique_key, unique_payload),
5995        ]);
5996        let harness = ObservedLimitIndexRangeHarness {
5997            observed_limit: observed_limit.clone(),
5998            entries_frame,
5999        };
6000        let connect = ConnectRpcService::new(QueryServiceServer::new(harness))
6001            .with_compression(connect_compression_registry());
6002        let app = Router::new().fallback_service(connect);
6003
6004        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
6005            .await
6006            .expect("bind test listener");
6007        let url = format!("http://{}", listener.local_addr().expect("listener addr"));
6008        tokio::spawn(async move {
6009            axum::serve(listener, app).await.expect("serve test app");
6010        });
6011
6012        let client = StoreClient::new(&url);
6013        let schema = KvSchema::new(client)
6014            .table(
6015                "orders",
6016                vec![
6017                    TableColumnConfig::new("id", DataType::Int64, false),
6018                    TableColumnConfig::new("status", DataType::Utf8, false),
6019                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
6020                ],
6021                vec!["id".to_string()],
6022                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6023                    .expect("valid")
6024                    .with_cover_columns(vec!["status".to_string(), "amount_cents".to_string()])],
6025            )
6026            .expect("schema");
6027        let ctx = SessionContext::new();
6028        schema.register_all(&ctx).expect("register");
6029
6030        let batches = ctx
6031            .sql(
6032                "SELECT id, amount_cents \
6033                 FROM orders \
6034                 WHERE status IN ('open', 'closed') \
6035                 LIMIT 2",
6036            )
6037            .await
6038            .expect("query")
6039            .collect()
6040            .await
6041            .expect("collect");
6042
6043        assert_eq!(
6044            batches.iter().map(|batch| batch.num_rows()).sum::<usize>(),
6045            2
6046        );
6047        assert_eq!(
6048            observed_limit.load(AtomicOrdering::SeqCst),
6049            usize::MAX,
6050            "index streaming scans should not push SQL LIMIT upstream while seen-dedup can drop duplicate primary keys"
6051        );
6052    }
6053
6054    #[tokio::test]
6055    async fn zorder_covering_index_scan_filters_false_positive_morton_span_rows() {
6056        let state = MockState {
6057            kv: Arc::new(Mutex::new(BTreeMap::new())),
6058            range_calls: Arc::new(AtomicUsize::new(0)),
6059            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6060            sequence_number: Arc::new(AtomicU64::new(0)),
6061        };
6062        let (base_url, shutdown_tx) = spawn_mock_server(state).await;
6063        let client = StoreClient::new(&base_url);
6064
6065        let schema = KvSchema::new(client)
6066            .table(
6067                "points",
6068                vec![
6069                    TableColumnConfig::new("x", DataType::Int64, false),
6070                    TableColumnConfig::new("y", DataType::Int64, false),
6071                    TableColumnConfig::new("id", DataType::Int64, false),
6072                    TableColumnConfig::new("value", DataType::Int64, false),
6073                ],
6074                vec!["id".to_string()],
6075                vec![
6076                    IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
6077                        .expect("valid")
6078                        .with_cover_columns(vec!["value".to_string()]),
6079                ],
6080            )
6081            .expect("schema");
6082
6083        let mut writer = schema.batch_writer();
6084        for (x, y, id, value) in [
6085            (0, 2, 2, 20),
6086            (1, 1, 11, 110),
6087            (1, 2, 12, 120),
6088            (2, 1, 21, 210),
6089            (2, 2, 22, 220),
6090            (3, 0, 30, 300),
6091        ] {
6092            writer
6093                .insert(
6094                    "points",
6095                    vec![
6096                        CellValue::Int64(x),
6097                        CellValue::Int64(y),
6098                        CellValue::Int64(id),
6099                        CellValue::Int64(value),
6100                    ],
6101                )
6102                .expect("row");
6103        }
6104        writer.flush().await.expect("flush");
6105
6106        let ctx = SessionContext::new();
6107        schema.register_all(&ctx).expect("register");
6108
6109        let batches = ctx
6110            .sql(
6111                "SELECT id, value FROM points \
6112                 WHERE x >= 1 AND x <= 2 AND y >= 1 AND y <= 2 \
6113                 ORDER BY id",
6114            )
6115            .await
6116            .expect("query")
6117            .collect()
6118            .await
6119            .expect("collect");
6120
6121        let mut rows = Vec::new();
6122        for batch in &batches {
6123            let ids = batch
6124                .column(0)
6125                .as_any()
6126                .downcast_ref::<Int64Array>()
6127                .expect("id int64");
6128            let values = batch
6129                .column(1)
6130                .as_any()
6131                .downcast_ref::<Int64Array>()
6132                .expect("value int64");
6133            for row_idx in 0..batch.num_rows() {
6134                rows.push((ids.value(row_idx), values.value(row_idx)));
6135            }
6136        }
6137        assert_eq!(rows, vec![(11, 110), (12, 120), (21, 210), (22, 220)]);
6138
6139        let _ = shutdown_tx.send(());
6140    }
6141
6142    #[tokio::test]
6143    async fn aggregate_pushdown_uses_range_reduce_for_supported_global_aggregates() {
6144        let state = MockState {
6145            kv: Arc::new(Mutex::new(BTreeMap::new())),
6146            range_calls: Arc::new(AtomicUsize::new(0)),
6147            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6148            sequence_number: Arc::new(AtomicU64::new(0)),
6149        };
6150        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6151        let client = StoreClient::new(&base_url);
6152
6153        let schema = KvSchema::new(client)
6154            .table(
6155                "orders",
6156                vec![
6157                    TableColumnConfig::new("id", DataType::Int64, false),
6158                    TableColumnConfig::new("status", DataType::Utf8, false),
6159                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
6160                ],
6161                vec!["id".to_string()],
6162                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6163                    .expect("valid")
6164                    .with_cover_columns(vec!["amount_cents".to_string()])],
6165            )
6166            .expect("schema");
6167
6168        let mut writer = schema.batch_writer();
6169        for (id, status, amount) in [
6170            (1, "open", 10),
6171            (2, "closed", 15),
6172            (3, "open", 30),
6173            (4, "closed", 40),
6174        ] {
6175            writer
6176                .insert(
6177                    "orders",
6178                    vec![
6179                        CellValue::Int64(id),
6180                        CellValue::Utf8(status.to_string()),
6181                        CellValue::Int64(amount),
6182                    ],
6183                )
6184                .expect("row");
6185        }
6186        writer.flush().await.expect("flush");
6187
6188        let ctx = SessionContext::new();
6189        schema.register_all(&ctx).expect("register");
6190
6191        state.range_calls.store(0, AtomicOrdering::SeqCst);
6192        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6193
6194        let df = ctx
6195            .sql(
6196                "SELECT COUNT(*) AS row_count, SUM(amount_cents) AS total_cents, \
6197                 AVG(amount_cents) AS avg_cents \
6198                 FROM orders WHERE status = 'open'",
6199            )
6200            .await
6201            .expect("query");
6202        let batches = df.collect().await.expect("collect");
6203
6204        assert_eq!(batches.len(), 1);
6205        let batch = &batches[0];
6206        let row_count = ScalarValue::try_from_array(batch.column(0), 0).expect("row_count scalar");
6207        let total = batch
6208            .column(1)
6209            .as_any()
6210            .downcast_ref::<Int64Array>()
6211            .expect("sum int64")
6212            .value(0);
6213        let avg = batch
6214            .column(2)
6215            .as_any()
6216            .downcast_ref::<Float64Array>()
6217            .expect("avg float64")
6218            .value(0);
6219        assert!(matches!(
6220            row_count,
6221            ScalarValue::UInt64(Some(2)) | ScalarValue::Int64(Some(2))
6222        ));
6223        assert_eq!(total, 40);
6224        assert_eq!(avg, 20.0);
6225        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6226        assert!(
6227            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6228            "supported aggregate should use range reduction path"
6229        );
6230
6231        let _ = shutdown_tx.send(());
6232    }
6233
6234    /// Store `/v1/range` is inclusive on both ends; `id <= N` and `BETWEEN` must include the end key.
6235    #[tokio::test]
6236    async fn primary_key_inclusive_upper_bound_streaming_scan_uses_range() {
6237        let state = MockState {
6238            kv: Arc::new(Mutex::new(BTreeMap::new())),
6239            range_calls: Arc::new(AtomicUsize::new(0)),
6240            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6241            sequence_number: Arc::new(AtomicU64::new(0)),
6242        };
6243        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6244        let client = StoreClient::new(&base_url);
6245
6246        let schema = KvSchema::new(client)
6247            .table(
6248                "inc_pk",
6249                vec![
6250                    TableColumnConfig::new("id", DataType::Int64, false),
6251                    TableColumnConfig::new("amount", DataType::Int64, false),
6252                ],
6253                vec!["id".to_string()],
6254                vec![],
6255            )
6256            .expect("schema");
6257
6258        let mut writer = schema.batch_writer();
6259        for id in 1i64..=5i64 {
6260            writer
6261                .insert(
6262                    "inc_pk",
6263                    vec![CellValue::Int64(id), CellValue::Int64(id * 100)],
6264                )
6265                .expect("row");
6266        }
6267        writer.flush().await.expect("flush");
6268
6269        let ctx = SessionContext::new();
6270        schema.register_all(&ctx).expect("register");
6271
6272        state.range_calls.store(0, AtomicOrdering::SeqCst);
6273        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6274
6275        let batches = ctx
6276            .sql("SELECT id FROM inc_pk WHERE id <= 3 ORDER BY id")
6277            .await
6278            .expect("lte query")
6279            .collect()
6280            .await
6281            .expect("collect");
6282        let mut ids = Vec::new();
6283        for batch in &batches {
6284            let col = batch
6285                .column(0)
6286                .as_any()
6287                .downcast_ref::<Int64Array>()
6288                .expect("id");
6289            for i in 0..batch.num_rows() {
6290                ids.push(col.value(i));
6291            }
6292        }
6293        assert_eq!(ids, vec![1, 2, 3], "id <= 3 must include id 3");
6294        assert!(
6295            state.range_calls.load(AtomicOrdering::SeqCst) >= 1,
6296            "PK bounded scan should call range"
6297        );
6298        assert_eq!(
6299            state.range_reduce_calls.load(AtomicOrdering::SeqCst),
6300            0,
6301            "streaming scan must not use range_reduce"
6302        );
6303
6304        state.range_calls.store(0, AtomicOrdering::SeqCst);
6305        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6306
6307        let batches = ctx
6308            .sql("SELECT id FROM inc_pk WHERE id BETWEEN 2 AND 4 ORDER BY id")
6309            .await
6310            .expect("between query")
6311            .collect()
6312            .await
6313            .expect("collect");
6314        ids.clear();
6315        for batch in &batches {
6316            let col = batch
6317                .column(0)
6318                .as_any()
6319                .downcast_ref::<Int64Array>()
6320                .expect("id");
6321            for i in 0..batch.num_rows() {
6322                ids.push(col.value(i));
6323            }
6324        }
6325        assert_eq!(ids, vec![2, 3, 4], "BETWEEN must include both endpoints");
6326        assert!(state.range_calls.load(AtomicOrdering::SeqCst) >= 1);
6327        assert_eq!(state.range_reduce_calls.load(AtomicOrdering::SeqCst), 0);
6328
6329        let _ = shutdown_tx.send(());
6330    }
6331
6332    #[tokio::test]
6333    async fn primary_key_inclusive_upper_bound_scalar_aggregates_use_range_reduce() {
6334        let state = MockState {
6335            kv: Arc::new(Mutex::new(BTreeMap::new())),
6336            range_calls: Arc::new(AtomicUsize::new(0)),
6337            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6338            sequence_number: Arc::new(AtomicU64::new(0)),
6339        };
6340        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6341        let client = StoreClient::new(&base_url);
6342
6343        let schema = KvSchema::new(client)
6344            .table(
6345                "inc_pk",
6346                vec![
6347                    TableColumnConfig::new("id", DataType::Int64, false),
6348                    TableColumnConfig::new("amount", DataType::Int64, false),
6349                ],
6350                vec!["id".to_string()],
6351                vec![],
6352            )
6353            .expect("schema");
6354
6355        let mut writer = schema.batch_writer();
6356        for id in 1i64..=5i64 {
6357            writer
6358                .insert(
6359                    "inc_pk",
6360                    vec![CellValue::Int64(id), CellValue::Int64(id * 100)],
6361                )
6362                .expect("row");
6363        }
6364        writer.flush().await.expect("flush");
6365
6366        let ctx = SessionContext::new();
6367        schema.register_all(&ctx).expect("register");
6368
6369        state.range_calls.store(0, AtomicOrdering::SeqCst);
6370        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6371
6372        let batches = ctx
6373            .sql("SELECT COUNT(*) AS c, SUM(amount) AS s FROM inc_pk WHERE id <= 3")
6374            .await
6375            .expect("lte agg")
6376            .collect()
6377            .await
6378            .expect("collect");
6379        assert_eq!(batches.len(), 1);
6380        let batch = &batches[0];
6381        let c = ScalarValue::try_from_array(batch.column(0), 0).expect("count");
6382        assert!(
6383            matches!(
6384                c,
6385                ScalarValue::UInt64(Some(3)) | ScalarValue::Int64(Some(3))
6386            ),
6387            "count should include id=3"
6388        );
6389        assert_eq!(
6390            batch
6391                .column(1)
6392                .as_any()
6393                .downcast_ref::<Int64Array>()
6394                .expect("sum")
6395                .value(0),
6396            100 + 200 + 300
6397        );
6398        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6399        assert!(
6400            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6401            "scalar aggregate on PK range should use range_reduce"
6402        );
6403
6404        state.range_calls.store(0, AtomicOrdering::SeqCst);
6405        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6406
6407        let batches = ctx
6408            .sql("SELECT SUM(amount) AS s FROM inc_pk WHERE id BETWEEN 2 AND 4")
6409            .await
6410            .expect("between agg")
6411            .collect()
6412            .await
6413            .expect("collect");
6414        assert_eq!(batches.len(), 1);
6415        let batch = &batches[0];
6416        assert_eq!(
6417            batch
6418                .column(0)
6419                .as_any()
6420                .downcast_ref::<Int64Array>()
6421                .expect("sum")
6422                .value(0),
6423            200 + 300 + 400
6424        );
6425        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6426        assert!(
6427            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6428            "BETWEEN aggregate should use range_reduce"
6429        );
6430
6431        let _ = shutdown_tx.send(());
6432    }
6433
6434    #[tokio::test]
6435    async fn aggregate_pushdown_uses_zorder_index_with_worker_filter() {
6436        let state = MockState {
6437            kv: Arc::new(Mutex::new(BTreeMap::new())),
6438            range_calls: Arc::new(AtomicUsize::new(0)),
6439            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6440            sequence_number: Arc::new(AtomicU64::new(0)),
6441        };
6442        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6443        let client = StoreClient::new(&base_url);
6444
6445        let schema = KvSchema::new(client)
6446            .table(
6447                "points",
6448                vec![
6449                    TableColumnConfig::new("x", DataType::Int64, false),
6450                    TableColumnConfig::new("y", DataType::Int64, false),
6451                    TableColumnConfig::new("id", DataType::Int64, false),
6452                    TableColumnConfig::new("value", DataType::Int64, false),
6453                ],
6454                vec!["id".to_string()],
6455                vec![
6456                    IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
6457                        .expect("valid")
6458                        .with_cover_columns(vec!["value".to_string()]),
6459                ],
6460            )
6461            .expect("schema");
6462
6463        let mut writer = schema.batch_writer();
6464        for (x, y, id, value) in [
6465            (0, 2, 2, 20),
6466            (1, 1, 11, 110),
6467            (1, 2, 12, 120),
6468            (2, 1, 21, 210),
6469            (2, 2, 22, 220),
6470            (3, 0, 30, 300),
6471        ] {
6472            writer
6473                .insert(
6474                    "points",
6475                    vec![
6476                        CellValue::Int64(x),
6477                        CellValue::Int64(y),
6478                        CellValue::Int64(id),
6479                        CellValue::Int64(value),
6480                    ],
6481                )
6482                .expect("row");
6483        }
6484        writer.flush().await.expect("flush");
6485
6486        let ctx = SessionContext::new();
6487        schema.register_all(&ctx).expect("register");
6488
6489        state.range_calls.store(0, AtomicOrdering::SeqCst);
6490        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6491
6492        let batches = ctx
6493            .sql(
6494                "SELECT COUNT(*) AS row_count, SUM(value) AS total_value \
6495                 FROM points \
6496                 WHERE x >= 1 AND x <= 2 AND y >= 1 AND y <= 2",
6497            )
6498            .await
6499            .expect("query")
6500            .collect()
6501            .await
6502            .expect("collect");
6503
6504        assert_eq!(batches.len(), 1);
6505        let batch = &batches[0];
6506        let row_count = ScalarValue::try_from_array(batch.column(0), 0).expect("row_count scalar");
6507        let total = batch
6508            .column(1)
6509            .as_any()
6510            .downcast_ref::<Int64Array>()
6511            .expect("sum int64")
6512            .value(0);
6513        assert!(matches!(
6514            row_count,
6515            ScalarValue::UInt64(Some(4)) | ScalarValue::Int64(Some(4))
6516        ));
6517        assert_eq!(total, 660);
6518        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6519        assert!(
6520            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6521            "z-order aggregate should use range reduction path"
6522        );
6523
6524        let _ = shutdown_tx.send(());
6525    }
6526
6527    #[tokio::test]
6528    async fn aggregate_pushdown_avg_merges_sum_and_count_across_multiple_ranges() {
6529        let state = MockState {
6530            kv: Arc::new(Mutex::new(BTreeMap::new())),
6531            range_calls: Arc::new(AtomicUsize::new(0)),
6532            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6533            sequence_number: Arc::new(AtomicU64::new(0)),
6534        };
6535        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6536        let client = StoreClient::new(&base_url);
6537
6538        let schema = KvSchema::new(client)
6539            .table(
6540                "orders",
6541                vec![
6542                    TableColumnConfig::new("id", DataType::Int64, false),
6543                    TableColumnConfig::new("status", DataType::Utf8, false),
6544                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
6545                ],
6546                vec!["id".to_string()],
6547                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6548                    .expect("valid")
6549                    .with_cover_columns(vec!["amount_cents".to_string()])],
6550            )
6551            .expect("schema");
6552
6553        let mut writer = schema.batch_writer();
6554        for (id, status, amount) in [
6555            (1, "open", 10),
6556            (2, "open", 20),
6557            (3, "closed", 100),
6558            (4, "pending", 1_000),
6559        ] {
6560            writer
6561                .insert(
6562                    "orders",
6563                    vec![
6564                        CellValue::Int64(id),
6565                        CellValue::Utf8(status.to_string()),
6566                        CellValue::Int64(amount),
6567                    ],
6568                )
6569                .expect("row");
6570        }
6571        writer.flush().await.expect("flush");
6572
6573        let ctx = SessionContext::new();
6574        schema.register_all(&ctx).expect("register");
6575
6576        state.range_calls.store(0, AtomicOrdering::SeqCst);
6577        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6578
6579        let batches = ctx
6580            .sql(
6581                "SELECT AVG(amount_cents) AS avg_cents \
6582                 FROM orders \
6583                 WHERE status IN ('open', 'closed')",
6584            )
6585            .await
6586            .expect("query")
6587            .collect()
6588            .await
6589            .expect("collect");
6590
6591        assert_eq!(batches.len(), 1);
6592        let batch = &batches[0];
6593        let avg = batch
6594            .column(0)
6595            .as_any()
6596            .downcast_ref::<Float64Array>()
6597            .expect("avg float64")
6598            .value(0);
6599        let expected = 130.0 / 3.0;
6600        assert!(
6601            (avg - expected).abs() < 1e-12,
6602            "AVG should merge SUM+COUNT across unequal-count ranges: got {avg}, expected {expected}"
6603        );
6604        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6605        assert_eq!(
6606            state.range_reduce_calls.load(AtomicOrdering::SeqCst),
6607            2,
6608            "status IN (...) should expand to two pushed reduction ranges"
6609        );
6610
6611        let _ = shutdown_tx.send(());
6612    }
6613
6614    #[tokio::test]
6615    async fn aggregate_pushdown_supports_filtered_global_aggregates() {
6616        let state = MockState {
6617            kv: Arc::new(Mutex::new(BTreeMap::new())),
6618            range_calls: Arc::new(AtomicUsize::new(0)),
6619            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6620            sequence_number: Arc::new(AtomicU64::new(0)),
6621        };
6622        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6623        let client = StoreClient::new(&base_url);
6624
6625        let schema = KvSchema::new(client)
6626            .table(
6627                "orders",
6628                vec![
6629                    TableColumnConfig::new("id", DataType::Int64, false),
6630                    TableColumnConfig::new("status", DataType::Utf8, false),
6631                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
6632                ],
6633                vec!["id".to_string()],
6634                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6635                    .expect("valid")
6636                    .with_cover_columns(vec!["amount_cents".to_string()])],
6637            )
6638            .expect("schema");
6639
6640        let mut writer = schema.batch_writer();
6641        for (id, status, amount) in [
6642            (1, "open", 10),
6643            (2, "closed", 15),
6644            (3, "open", 30),
6645            (4, "closed", 40),
6646        ] {
6647            writer
6648                .insert(
6649                    "orders",
6650                    vec![
6651                        CellValue::Int64(id),
6652                        CellValue::Utf8(status.to_string()),
6653                        CellValue::Int64(amount),
6654                    ],
6655                )
6656                .expect("row");
6657        }
6658        writer.flush().await.expect("flush");
6659
6660        let ctx = SessionContext::new();
6661        schema.register_all(&ctx).expect("register");
6662
6663        state.range_calls.store(0, AtomicOrdering::SeqCst);
6664        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6665
6666        let query = "SELECT COUNT(*) FILTER (WHERE status = 'open') AS open_count, \
6667                            COUNT(*) FILTER (WHERE status = 'closed') AS closed_count, \
6668                            AVG(amount_cents) FILTER (WHERE status = 'closed') AS closed_avg \
6669                     FROM orders";
6670        let batches = ctx
6671            .sql(query)
6672            .await
6673            .expect("query")
6674            .collect()
6675            .await
6676            .expect("collect");
6677
6678        assert_eq!(batches.len(), 1);
6679        let batch = &batches[0];
6680        assert_count_scalar(batch, 0, 0, 2);
6681        assert_count_scalar(batch, 1, 0, 2);
6682        let closed_avg = batch
6683            .column(2)
6684            .as_any()
6685            .downcast_ref::<Float64Array>()
6686            .expect("avg float64")
6687            .value(0);
6688        assert_eq!(closed_avg, 27.5);
6689        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6690        assert!(
6691            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 3,
6692            "filtered aggregate pushdown should use dedicated reduction jobs"
6693        );
6694
6695        let _ = shutdown_tx.send(());
6696    }
6697
6698    #[tokio::test]
6699    async fn aggregate_pushdown_supports_case_filtered_global_aggregates() {
6700        let state = MockState {
6701            kv: Arc::new(Mutex::new(BTreeMap::new())),
6702            range_calls: Arc::new(AtomicUsize::new(0)),
6703            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6704            sequence_number: Arc::new(AtomicU64::new(0)),
6705        };
6706        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6707        let client = StoreClient::new(&base_url);
6708
6709        let schema = KvSchema::new(client)
6710            .table(
6711                "orders",
6712                vec![
6713                    TableColumnConfig::new("id", DataType::Int64, false),
6714                    TableColumnConfig::new("status", DataType::Utf8, false),
6715                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
6716                ],
6717                vec!["id".to_string()],
6718                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6719                    .expect("valid")
6720                    .with_cover_columns(vec!["amount_cents".to_string()])],
6721            )
6722            .expect("schema");
6723
6724        let mut writer = schema.batch_writer();
6725        for (id, status, amount) in [
6726            (1, "open", 10),
6727            (2, "closed", 15),
6728            (3, "open", 30),
6729            (4, "closed", 40),
6730        ] {
6731            writer
6732                .insert(
6733                    "orders",
6734                    vec![
6735                        CellValue::Int64(id),
6736                        CellValue::Utf8(status.to_string()),
6737                        CellValue::Int64(amount),
6738                    ],
6739                )
6740                .expect("row");
6741        }
6742        writer.flush().await.expect("flush");
6743
6744        let ctx = SessionContext::new();
6745        schema.register_all(&ctx).expect("register");
6746
6747        state.range_calls.store(0, AtomicOrdering::SeqCst);
6748        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6749
6750        let query = "SELECT SUM(CASE status WHEN 'open' THEN amount_cents END) AS open_total, \
6751                            COUNT(CASE status WHEN 'closed' THEN 1 END) AS closed_count, \
6752                            AVG(CASE WHEN status = 'closed' THEN amount_cents END) AS closed_avg \
6753                     FROM orders";
6754        let batches = ctx
6755            .sql(query)
6756            .await
6757            .expect("query")
6758            .collect()
6759            .await
6760            .expect("collect");
6761
6762        assert_eq!(batches.len(), 1);
6763        let batch = &batches[0];
6764        assert_eq!(
6765            ScalarValue::try_from_array(batch.column(0), 0).expect("sum scalar"),
6766            ScalarValue::Int64(Some(40))
6767        );
6768        assert_count_scalar(batch, 1, 0, 2);
6769        let closed_avg = batch
6770            .column(2)
6771            .as_any()
6772            .downcast_ref::<Float64Array>()
6773            .expect("avg float64")
6774            .value(0);
6775        assert_eq!(closed_avg, 27.5);
6776        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6777        assert!(
6778            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 3,
6779            "case-based conditional aggregates should use reduction jobs"
6780        );
6781
6782        let _ = shutdown_tx.send(());
6783    }
6784
6785    #[tokio::test]
6786    async fn aggregate_pushdown_supports_casted_group_and_aggregate_expressions() {
6787        let state = MockState {
6788            kv: Arc::new(Mutex::new(BTreeMap::new())),
6789            range_calls: Arc::new(AtomicUsize::new(0)),
6790            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6791            sequence_number: Arc::new(AtomicU64::new(0)),
6792        };
6793        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6794        let client = StoreClient::new(&base_url);
6795
6796        let schema = KvSchema::new(client)
6797            .table(
6798                "orders",
6799                vec![
6800                    TableColumnConfig::new("id", DataType::Int64, false),
6801                    TableColumnConfig::new("status", DataType::Utf8, false),
6802                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
6803                ],
6804                vec!["id".to_string()],
6805                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6806                    .expect("valid")
6807                    .with_cover_columns(vec!["amount_cents".to_string()])],
6808            )
6809            .expect("schema");
6810
6811        let mut writer = schema.batch_writer();
6812        for (id, status, amount) in [
6813            (1, "open", 10),
6814            (2, "open", 30),
6815            (3, "closed", 15),
6816            (4, "closed", 40),
6817        ] {
6818            writer
6819                .insert(
6820                    "orders",
6821                    vec![
6822                        CellValue::Int64(id),
6823                        CellValue::Utf8(status.to_string()),
6824                        CellValue::Int64(amount),
6825                    ],
6826                )
6827                .expect("row");
6828        }
6829        writer.flush().await.expect("flush");
6830
6831        let ctx = SessionContext::new();
6832        schema.register_all(&ctx).expect("register");
6833
6834        state.range_calls.store(0, AtomicOrdering::SeqCst);
6835        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6836
6837        let batches = ctx
6838            .sql(
6839                "SELECT CAST(status AS VARCHAR) AS status_text, \
6840                        SUM(CAST(amount_cents AS DOUBLE)) AS total_cents \
6841                 FROM orders \
6842                 GROUP BY CAST(status AS VARCHAR) \
6843                 ORDER BY status_text",
6844            )
6845            .await
6846            .expect("query")
6847            .collect()
6848            .await
6849            .expect("collect");
6850
6851        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
6852        let batch = &batches[0];
6853        let status = ScalarValue::try_from_array(batch.column(0), 0).expect("status scalar");
6854        assert_eq!(scalar_to_string(&status).as_deref(), Some("closed"));
6855        let closed_total = batch
6856            .column(1)
6857            .as_any()
6858            .downcast_ref::<Float64Array>()
6859            .expect("sum float64")
6860            .value(0);
6861        let open_total = batch
6862            .column(1)
6863            .as_any()
6864            .downcast_ref::<Float64Array>()
6865            .expect("sum float64")
6866            .value(1);
6867        assert_eq!(closed_total, 55.0);
6868        assert_eq!(open_total, 40.0);
6869        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6870        assert!(
6871            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6872            "casted grouped aggregates should stay on the reduction path"
6873        );
6874
6875        let _ = shutdown_tx.send(());
6876    }
6877
6878    #[tokio::test]
6879    async fn aggregate_pushdown_supports_computed_aggregate_inputs() {
6880        let state = MockState {
6881            kv: Arc::new(Mutex::new(BTreeMap::new())),
6882            range_calls: Arc::new(AtomicUsize::new(0)),
6883            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6884            sequence_number: Arc::new(AtomicU64::new(0)),
6885        };
6886        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6887        let client = StoreClient::new(&base_url);
6888
6889        let schema = KvSchema::new(client)
6890            .table(
6891                "orders",
6892                vec![
6893                    TableColumnConfig::new("id", DataType::Int64, false),
6894                    TableColumnConfig::new("price_cents", DataType::Int64, false),
6895                    TableColumnConfig::new("qty", DataType::Int64, false),
6896                    TableColumnConfig::new("duration_ms", DataType::Int64, false),
6897                ],
6898                vec!["id".to_string()],
6899                vec![],
6900            )
6901            .expect("schema");
6902
6903        let mut writer = schema.batch_writer();
6904        for (id, price, qty, duration_ms) in [(1, 10, 2, 500), (2, 15, 3, 2500), (3, 7, 4, 1000)] {
6905            writer
6906                .insert(
6907                    "orders",
6908                    vec![
6909                        CellValue::Int64(id),
6910                        CellValue::Int64(price),
6911                        CellValue::Int64(qty),
6912                        CellValue::Int64(duration_ms),
6913                    ],
6914                )
6915                .expect("row");
6916        }
6917        writer.flush().await.expect("flush");
6918
6919        let ctx = SessionContext::new();
6920        schema.register_all(&ctx).expect("register");
6921
6922        state.range_calls.store(0, AtomicOrdering::SeqCst);
6923        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6924
6925        let batches = ctx
6926            .sql(
6927                "SELECT SUM(price_cents * qty) AS total_revenue, \
6928                        AVG(duration_ms / 1e3) AS avg_seconds \
6929                 FROM orders",
6930            )
6931            .await
6932            .expect("query")
6933            .collect()
6934            .await
6935            .expect("collect");
6936
6937        assert_eq!(batches.len(), 1);
6938        let batch = &batches[0];
6939        assert_eq!(
6940            ScalarValue::try_from_array(batch.column(0), 0).expect("sum scalar"),
6941            ScalarValue::Int64(Some(93))
6942        );
6943        let avg_seconds = batch
6944            .column(1)
6945            .as_any()
6946            .downcast_ref::<Float64Array>()
6947            .expect("avg float64")
6948            .value(0);
6949        assert!((avg_seconds - (4.0 / 3.0)).abs() < 1e-12);
6950        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6951        assert!(
6952            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 2,
6953            "computed aggregate inputs should use reduction jobs"
6954        );
6955
6956        let _ = shutdown_tx.send(());
6957    }
6958
6959    #[tokio::test]
6960    async fn aggregate_pushdown_supports_add_and_subtract_inputs() {
6961        let state = MockState {
6962            kv: Arc::new(Mutex::new(BTreeMap::new())),
6963            range_calls: Arc::new(AtomicUsize::new(0)),
6964            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6965            sequence_number: Arc::new(AtomicU64::new(0)),
6966        };
6967        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6968        let client = StoreClient::new(&base_url);
6969
6970        let schema = KvSchema::new(client)
6971            .table(
6972                "orders",
6973                vec![
6974                    TableColumnConfig::new("id", DataType::Int64, false),
6975                    TableColumnConfig::new("price_cents", DataType::Int64, false),
6976                    TableColumnConfig::new("fee_cents", DataType::Int64, false),
6977                    TableColumnConfig::new("discount_cents", DataType::Int64, false),
6978                ],
6979                vec!["id".to_string()],
6980                vec![],
6981            )
6982            .expect("schema");
6983
6984        let mut writer = schema.batch_writer();
6985        for (id, price, fee, discount) in [(1, 10, 2, 1), (2, 15, 3, 4), (3, 7, 1, 2)] {
6986            writer
6987                .insert(
6988                    "orders",
6989                    vec![
6990                        CellValue::Int64(id),
6991                        CellValue::Int64(price),
6992                        CellValue::Int64(fee),
6993                        CellValue::Int64(discount),
6994                    ],
6995                )
6996                .expect("row");
6997        }
6998        writer.flush().await.expect("flush");
6999
7000        let ctx = SessionContext::new();
7001        schema.register_all(&ctx).expect("register");
7002
7003        state.range_calls.store(0, AtomicOrdering::SeqCst);
7004        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7005
7006        let batches = ctx
7007            .sql(
7008                "SELECT SUM(price_cents + fee_cents) AS gross_plus_fee, \
7009                        SUM(price_cents - discount_cents) AS net_total \
7010                 FROM orders",
7011            )
7012            .await
7013            .expect("query")
7014            .collect()
7015            .await
7016            .expect("collect");
7017
7018        assert_eq!(batches.len(), 1);
7019        let batch = &batches[0];
7020        assert_eq!(
7021            ScalarValue::try_from_array(batch.column(0), 0).expect("sum scalar"),
7022            ScalarValue::Int64(Some(38))
7023        );
7024        assert_eq!(
7025            ScalarValue::try_from_array(batch.column(1), 0).expect("sum scalar"),
7026            ScalarValue::Int64(Some(25))
7027        );
7028        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7029        assert!(
7030            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 2,
7031            "add/sub aggregate inputs should use reduction jobs"
7032        );
7033
7034        let _ = shutdown_tx.send(());
7035    }
7036
7037    #[tokio::test]
7038    async fn aggregate_pushdown_supports_case_filtered_computed_aggregates() {
7039        let state = MockState {
7040            kv: Arc::new(Mutex::new(BTreeMap::new())),
7041            range_calls: Arc::new(AtomicUsize::new(0)),
7042            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7043            sequence_number: Arc::new(AtomicU64::new(0)),
7044        };
7045        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7046        let client = StoreClient::new(&base_url);
7047
7048        let schema = KvSchema::new(client)
7049            .table(
7050                "orders",
7051                vec![
7052                    TableColumnConfig::new("id", DataType::Int64, false),
7053                    TableColumnConfig::new("status", DataType::Utf8, false),
7054                    TableColumnConfig::new("price_cents", DataType::Int64, false),
7055                    TableColumnConfig::new("qty", DataType::Int64, false),
7056                ],
7057                vec!["id".to_string()],
7058                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7059                    .expect("valid")
7060                    .with_cover_columns(vec!["price_cents".to_string(), "qty".to_string()])],
7061            )
7062            .expect("schema");
7063
7064        let mut writer = schema.batch_writer();
7065        for (id, status, price, qty) in [
7066            (1, "open", 10, 2),
7067            (2, "closed", 99, 1),
7068            (3, "open", 15, 3),
7069            (4, "closed", 7, 4),
7070        ] {
7071            writer
7072                .insert(
7073                    "orders",
7074                    vec![
7075                        CellValue::Int64(id),
7076                        CellValue::Utf8(status.to_string()),
7077                        CellValue::Int64(price),
7078                        CellValue::Int64(qty),
7079                    ],
7080                )
7081                .expect("row");
7082        }
7083        writer.flush().await.expect("flush");
7084
7085        let ctx = SessionContext::new();
7086        schema.register_all(&ctx).expect("register");
7087
7088        state.range_calls.store(0, AtomicOrdering::SeqCst);
7089        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7090
7091        let batches = ctx
7092            .sql(
7093                "SELECT SUM(CASE WHEN status = 'open' THEN price_cents * qty END) \
7094                 AS open_revenue \
7095                 FROM orders",
7096            )
7097            .await
7098            .expect("query")
7099            .collect()
7100            .await
7101            .expect("collect");
7102
7103        assert_eq!(batches.len(), 1);
7104        let batch = &batches[0];
7105        assert_eq!(
7106            ScalarValue::try_from_array(batch.column(0), 0).expect("sum scalar"),
7107            ScalarValue::Int64(Some(65))
7108        );
7109        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7110        assert!(
7111            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
7112            "case-filtered computed aggregate should use reduction jobs"
7113        );
7114
7115        let _ = shutdown_tx.send(());
7116    }
7117
7118    #[tokio::test]
7119    async fn aggregate_pushdown_does_not_rewrite_sum_case_else_zero_semantics() {
7120        let state = MockState {
7121            kv: Arc::new(Mutex::new(BTreeMap::new())),
7122            range_calls: Arc::new(AtomicUsize::new(0)),
7123            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7124            sequence_number: Arc::new(AtomicU64::new(0)),
7125        };
7126        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7127        let client = StoreClient::new(&base_url);
7128
7129        let schema = KvSchema::new(client)
7130            .table(
7131                "orders",
7132                vec![
7133                    TableColumnConfig::new("id", DataType::Int64, false),
7134                    TableColumnConfig::new("region", DataType::Utf8, false),
7135                    TableColumnConfig::new("status", DataType::Utf8, false),
7136                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
7137                ],
7138                vec!["id".to_string()],
7139                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7140                    .expect("valid")
7141                    .with_cover_columns(vec!["region".to_string(), "amount_cents".to_string()])],
7142            )
7143            .expect("schema");
7144
7145        let mut writer = schema.batch_writer();
7146        for (id, region, status, amount) in [
7147            (1, "east", "open", 10),
7148            (2, "east", "closed", 20),
7149            (3, "west", "closed", 30),
7150        ] {
7151            writer
7152                .insert(
7153                    "orders",
7154                    vec![
7155                        CellValue::Int64(id),
7156                        CellValue::Utf8(region.to_string()),
7157                        CellValue::Utf8(status.to_string()),
7158                        CellValue::Int64(amount),
7159                    ],
7160                )
7161                .expect("row");
7162        }
7163        writer.flush().await.expect("flush");
7164
7165        let ctx = SessionContext::new();
7166        schema.register_all(&ctx).expect("register");
7167
7168        state.range_calls.store(0, AtomicOrdering::SeqCst);
7169        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7170
7171        let batches = ctx
7172            .sql(
7173                "SELECT region, \
7174                        SUM(CASE WHEN status = 'open' THEN amount_cents ELSE 0 END) AS open_total \
7175                 FROM orders \
7176                 GROUP BY region \
7177                 ORDER BY region",
7178            )
7179            .await
7180            .expect("query")
7181            .collect()
7182            .await
7183            .expect("collect");
7184
7185        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
7186        let batch = &batches[0];
7187        assert_eq!(
7188            ScalarValue::try_from_array(batch.column(0), 0).expect("region scalar"),
7189            ScalarValue::Utf8(Some("east".to_string()))
7190        );
7191        assert_eq!(
7192            ScalarValue::try_from_array(batch.column(1), 0).expect("sum scalar"),
7193            ScalarValue::Int64(Some(10))
7194        );
7195        assert_eq!(
7196            ScalarValue::try_from_array(batch.column(0), 1).expect("region scalar"),
7197            ScalarValue::Utf8(Some("west".to_string()))
7198        );
7199        assert_eq!(
7200            ScalarValue::try_from_array(batch.column(1), 1).expect("sum scalar"),
7201            ScalarValue::Int64(Some(0))
7202        );
7203        assert_eq!(
7204            state.range_reduce_calls.load(AtomicOrdering::SeqCst),
7205            0,
7206            "SUM(CASE ... ELSE 0 END) must not push down because FILTER rewrite changes semantics"
7207        );
7208
7209        let _ = shutdown_tx.send(());
7210    }
7211
7212    #[tokio::test]
7213    async fn aggregate_pushdown_supports_computed_group_keys() {
7214        let state = MockState {
7215            kv: Arc::new(Mutex::new(BTreeMap::new())),
7216            range_calls: Arc::new(AtomicUsize::new(0)),
7217            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7218            sequence_number: Arc::new(AtomicU64::new(0)),
7219        };
7220        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7221        let client = StoreClient::new(&base_url);
7222
7223        let schema = KvSchema::new(client)
7224            .table(
7225                "events",
7226                vec![
7227                    TableColumnConfig::new("id", DataType::Int64, false),
7228                    TableColumnConfig::new("country", DataType::Utf8, false),
7229                    TableColumnConfig::new(
7230                        "occurred_at",
7231                        DataType::Timestamp(TimeUnit::Microsecond, None),
7232                        false,
7233                    ),
7234                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
7235                ],
7236                vec!["id".to_string()],
7237                vec![],
7238            )
7239            .expect("schema");
7240
7241        let day_micros = 86_400_000_000i64;
7242        let day0 = 1_700_000_000_000_000i64;
7243        let day1 = day0 + day_micros;
7244        let day0_bucket = day0.div_euclid(day_micros) * day_micros;
7245        let day1_bucket = day1.div_euclid(day_micros) * day_micros;
7246        let mut writer = schema.batch_writer();
7247        for (id, country, occurred_at, amount) in [
7248            (1, "East", day0 + 111, 10),
7249            (2, "east", day0 + 222, 30),
7250            (3, "West", day1 + 333, 7),
7251        ] {
7252            writer
7253                .insert(
7254                    "events",
7255                    vec![
7256                        CellValue::Int64(id),
7257                        CellValue::Utf8(country.to_string()),
7258                        CellValue::Timestamp(occurred_at),
7259                        CellValue::Int64(amount),
7260                    ],
7261                )
7262                .expect("row");
7263        }
7264        writer.flush().await.expect("flush");
7265
7266        let ctx = SessionContext::new();
7267        schema.register_all(&ctx).expect("register");
7268
7269        state.range_calls.store(0, AtomicOrdering::SeqCst);
7270        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7271
7272        let batches = ctx
7273            .sql(
7274                "SELECT lower(country) AS country_norm, \
7275                        date_trunc('day', occurred_at) AS day_bucket, \
7276                        SUM(amount_cents) AS total_cents \
7277                 FROM events \
7278                 GROUP BY lower(country), date_trunc('day', occurred_at) \
7279                 ORDER BY country_norm, day_bucket",
7280            )
7281            .await
7282            .expect("query")
7283            .collect()
7284            .await
7285            .expect("collect");
7286
7287        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
7288        let batch = &batches[0];
7289        assert_eq!(
7290            scalar_to_string(
7291                &ScalarValue::try_from_array(batch.column(0), 0).expect("country scalar")
7292            )
7293            .as_deref(),
7294            Some("east")
7295        );
7296        assert_eq!(
7297            ScalarValue::try_from_array(batch.column(1), 0).expect("day scalar"),
7298            ScalarValue::TimestampMicrosecond(Some(day0_bucket), None)
7299        );
7300        assert_eq!(
7301            ScalarValue::try_from_array(batch.column(2), 0).expect("sum scalar"),
7302            ScalarValue::Int64(Some(40))
7303        );
7304        assert_eq!(
7305            scalar_to_string(
7306                &ScalarValue::try_from_array(batch.column(0), 1).expect("country scalar")
7307            )
7308            .as_deref(),
7309            Some("west")
7310        );
7311        assert_eq!(
7312            ScalarValue::try_from_array(batch.column(1), 1).expect("day scalar"),
7313            ScalarValue::TimestampMicrosecond(Some(day1_bucket), None)
7314        );
7315        assert_eq!(
7316            ScalarValue::try_from_array(batch.column(2), 1).expect("sum scalar"),
7317            ScalarValue::Int64(Some(7))
7318        );
7319        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7320        assert!(
7321            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
7322            "computed group keys should use grouped reduction path"
7323        );
7324
7325        let _ = shutdown_tx.send(());
7326    }
7327
7328    #[tokio::test]
7329    async fn aggregate_pushdown_supports_group_by_queries() {
7330        let state = MockState {
7331            kv: Arc::new(Mutex::new(BTreeMap::new())),
7332            range_calls: Arc::new(AtomicUsize::new(0)),
7333            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7334            sequence_number: Arc::new(AtomicU64::new(0)),
7335        };
7336        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7337        let client = StoreClient::new(&base_url);
7338
7339        let schema = KvSchema::new(client)
7340            .table(
7341                "orders",
7342                vec![
7343                    TableColumnConfig::new("id", DataType::Int64, false),
7344                    TableColumnConfig::new("status", DataType::Utf8, false),
7345                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
7346                ],
7347                vec!["id".to_string()],
7348                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7349                    .expect("valid")
7350                    .with_cover_columns(vec!["amount_cents".to_string()])],
7351            )
7352            .expect("schema");
7353
7354        let mut writer = schema.batch_writer();
7355        for (id, status, amount) in [
7356            (1, "open", 10),
7357            (2, "open", 30),
7358            (3, "closed", 15),
7359            (4, "closed", 40),
7360        ] {
7361            writer
7362                .insert(
7363                    "orders",
7364                    vec![
7365                        CellValue::Int64(id),
7366                        CellValue::Utf8(status.to_string()),
7367                        CellValue::Int64(amount),
7368                    ],
7369                )
7370                .expect("row");
7371        }
7372        writer.flush().await.expect("flush");
7373
7374        let ctx = SessionContext::new();
7375        schema.register_all(&ctx).expect("register");
7376
7377        state.range_calls.store(0, AtomicOrdering::SeqCst);
7378        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7379
7380        let batches = ctx
7381            .sql(
7382                "SELECT status, COUNT(*) AS row_count, SUM(amount_cents) AS total_cents \
7383                 FROM orders GROUP BY status ORDER BY status",
7384            )
7385            .await
7386            .expect("query")
7387            .collect()
7388            .await
7389            .expect("collect");
7390
7391        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
7392        let batch = &batches[0];
7393        assert_eq!(
7394            ScalarValue::try_from_array(batch.column(0), 0).expect("status scalar"),
7395            ScalarValue::Utf8(Some("closed".to_string()))
7396        );
7397        assert_count_scalar(batch, 1, 0, 2);
7398        assert_eq!(
7399            ScalarValue::try_from_array(batch.column(2), 0).expect("sum scalar"),
7400            ScalarValue::Int64(Some(55))
7401        );
7402        assert_eq!(
7403            ScalarValue::try_from_array(batch.column(0), 1).expect("status scalar"),
7404            ScalarValue::Utf8(Some("open".to_string()))
7405        );
7406        assert_count_scalar(batch, 1, 1, 2);
7407        assert_eq!(
7408            ScalarValue::try_from_array(batch.column(2), 1).expect("sum scalar"),
7409            ScalarValue::Int64(Some(40))
7410        );
7411        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7412        assert!(
7413            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 2,
7414            "group-by aggregate should use grouped range reduction path"
7415        );
7416
7417        let _ = shutdown_tx.send(());
7418    }
7419
7420    #[tokio::test]
7421    async fn aggregate_pushdown_group_by_float_canonicalizes_signed_zero() {
7422        let state = MockState {
7423            kv: Arc::new(Mutex::new(BTreeMap::new())),
7424            range_calls: Arc::new(AtomicUsize::new(0)),
7425            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7426            sequence_number: Arc::new(AtomicU64::new(0)),
7427        };
7428        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7429        let client = StoreClient::new(&base_url);
7430
7431        let schema = KvSchema::new(client)
7432            .table(
7433                "metrics",
7434                vec![
7435                    TableColumnConfig::new("id", DataType::Int64, false),
7436                    TableColumnConfig::new("score", DataType::Float64, false),
7437                ],
7438                vec!["id".to_string()],
7439                vec![],
7440            )
7441            .expect("schema");
7442
7443        let mut writer = schema.batch_writer();
7444        for (id, score) in [(1, -0.0), (2, 0.0), (3, 1.5)] {
7445            writer
7446                .insert(
7447                    "metrics",
7448                    vec![CellValue::Int64(id), CellValue::Float64(score)],
7449                )
7450                .expect("row");
7451        }
7452        writer.flush().await.expect("flush");
7453
7454        let ctx = SessionContext::new();
7455        schema.register_all(&ctx).expect("register");
7456
7457        state.range_calls.store(0, AtomicOrdering::SeqCst);
7458        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7459
7460        let batches = ctx
7461            .sql(
7462                "SELECT score, COUNT(*) AS row_count \
7463                 FROM metrics GROUP BY score ORDER BY row_count DESC, score",
7464            )
7465            .await
7466            .expect("query")
7467            .collect()
7468            .await
7469            .expect("collect");
7470
7471        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
7472        let batch = &batches[0];
7473        let top_score = batch
7474            .column(0)
7475            .as_any()
7476            .downcast_ref::<Float64Array>()
7477            .expect("score float64")
7478            .value(0);
7479        assert_eq!(top_score.to_bits(), 0.0f64.to_bits());
7480        assert_count_scalar(batch, 1, 0, 2);
7481        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7482        assert!(
7483            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
7484            "float group-by aggregate should stay on grouped reduction path"
7485        );
7486
7487        let _ = shutdown_tx.send(());
7488    }
7489
7490    #[tokio::test]
7491    async fn aggregate_pushdown_supports_filtered_group_by_queries() {
7492        let state = MockState {
7493            kv: Arc::new(Mutex::new(BTreeMap::new())),
7494            range_calls: Arc::new(AtomicUsize::new(0)),
7495            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7496            sequence_number: Arc::new(AtomicU64::new(0)),
7497        };
7498        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7499        let client = StoreClient::new(&base_url);
7500
7501        let schema = KvSchema::new(client)
7502            .table(
7503                "orders",
7504                vec![
7505                    TableColumnConfig::new("id", DataType::Int64, false),
7506                    TableColumnConfig::new("region", DataType::Utf8, false),
7507                    TableColumnConfig::new("status", DataType::Utf8, false),
7508                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
7509                ],
7510                vec!["id".to_string()],
7511                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7512                    .expect("valid")
7513                    .with_cover_columns(vec!["region".to_string(), "amount_cents".to_string()])],
7514            )
7515            .expect("schema");
7516
7517        let mut writer = schema.batch_writer();
7518        for (id, region, status, amount) in [
7519            (1, "east", "open", 10),
7520            (2, "east", "closed", 20),
7521            (3, "west", "open", 30),
7522            (4, "north", "closed", 40),
7523        ] {
7524            writer
7525                .insert(
7526                    "orders",
7527                    vec![
7528                        CellValue::Int64(id),
7529                        CellValue::Utf8(region.to_string()),
7530                        CellValue::Utf8(status.to_string()),
7531                        CellValue::Int64(amount),
7532                    ],
7533                )
7534                .expect("row");
7535        }
7536        writer.flush().await.expect("flush");
7537
7538        let ctx = SessionContext::new();
7539        schema.register_all(&ctx).expect("register");
7540
7541        state.range_calls.store(0, AtomicOrdering::SeqCst);
7542        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7543
7544        let batches = ctx
7545            .sql(
7546                "SELECT region, \
7547                        COUNT(*) FILTER (WHERE status = 'open') AS open_count, \
7548                        SUM(amount_cents) FILTER (WHERE status = 'closed') AS closed_total \
7549                 FROM orders \
7550                 GROUP BY region \
7551                 ORDER BY region",
7552            )
7553            .await
7554            .expect("query")
7555            .collect()
7556            .await
7557            .expect("collect");
7558
7559        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
7560        let batch = &batches[0];
7561
7562        assert_eq!(
7563            ScalarValue::try_from_array(batch.column(0), 0).expect("region scalar"),
7564            ScalarValue::Utf8(Some("east".to_string()))
7565        );
7566        assert_count_scalar(batch, 1, 0, 1);
7567        assert_eq!(
7568            ScalarValue::try_from_array(batch.column(2), 0).expect("sum scalar"),
7569            ScalarValue::Int64(Some(20))
7570        );
7571
7572        assert_eq!(
7573            ScalarValue::try_from_array(batch.column(0), 1).expect("region scalar"),
7574            ScalarValue::Utf8(Some("north".to_string()))
7575        );
7576        assert_count_scalar(batch, 1, 1, 0);
7577        assert_eq!(
7578            ScalarValue::try_from_array(batch.column(2), 1).expect("sum scalar"),
7579            ScalarValue::Int64(Some(40))
7580        );
7581
7582        assert_eq!(
7583            ScalarValue::try_from_array(batch.column(0), 2).expect("region scalar"),
7584            ScalarValue::Utf8(Some("west".to_string()))
7585        );
7586        assert_count_scalar(batch, 1, 2, 1);
7587        assert_eq!(
7588            ScalarValue::try_from_array(batch.column(2), 2).expect("sum scalar"),
7589            ScalarValue::Int64(None)
7590        );
7591
7592        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7593        assert!(
7594            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 3,
7595            "filtered group-by aggregate should use grouped reduction plus seed job"
7596        );
7597
7598        let _ = shutdown_tx.send(());
7599    }
7600
7601    mod e2e {
7602        use super::*;
7603        use axum::{routing::get, Router};
7604        use datafusion::prelude::SessionContext;
7605        use exoware_sdk_rs::StoreClient;
7606        use exoware_server::{connect_stack, AppState};
7607        use exoware_simulator::RocksStore;
7608        use tempfile::tempdir;
7609
7610        struct TestServers {
7611            ingest_url: String,
7612            query_url: String,
7613        }
7614
7615        impl TestServers {
7616            fn client(&self) -> StoreClient {
7617                StoreClient::with_split_urls(
7618                    &self.query_url,
7619                    &self.ingest_url,
7620                    &self.query_url,
7621                    &self.ingest_url,
7622                )
7623            }
7624        }
7625
7626        async fn spawn_e2e_servers() -> TestServers {
7627            let dir = tempdir().expect("tempdir");
7628            let db = RocksStore::open(dir.path()).expect("db");
7629            let state = AppState::new(std::sync::Arc::new(db));
7630            let connect = connect_stack(state);
7631            let app = Router::new()
7632                .route("/health", get(|| async { "ok" }))
7633                .fallback_service(connect);
7634            let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
7635                .await
7636                .expect("bind");
7637            let url = format!("http://{}", listener.local_addr().unwrap());
7638            tokio::spawn(async move {
7639                axum::serve(listener, app).await.expect("serve");
7640            });
7641            for _ in 0..200 {
7642                if reqwest::get(format!("{url}/health"))
7643                    .await
7644                    .ok()
7645                    .is_some_and(|r| r.status().is_success())
7646                {
7647                    return TestServers {
7648                        ingest_url: url.clone(),
7649                        query_url: url,
7650                    };
7651                }
7652                tokio::time::sleep(std::time::Duration::from_millis(25)).await;
7653            }
7654            panic!("e2e simulator did not become ready");
7655        }
7656
7657        #[tokio::test]
7658        async fn sql_insert_and_select_through_real_ingest_query_workers() {
7659            let servers = spawn_e2e_servers().await;
7660            let client = servers.client();
7661
7662            let schema = KvSchema::new(client)
7663                .table(
7664                    "orders",
7665                    vec![
7666                        TableColumnConfig::new("id", DataType::Int64, false),
7667                        TableColumnConfig::new("status", DataType::Utf8, false),
7668                        TableColumnConfig::new("amount_cents", DataType::Int64, false),
7669                    ],
7670                    vec!["id".to_string()],
7671                    vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7672                        .expect("valid")
7673                        .with_cover_columns(vec!["amount_cents".to_string()])],
7674                )
7675                .expect("schema");
7676
7677            let mut writer = schema.batch_writer();
7678            for (id, status, amount) in [
7679                (1i64, "open", 100i64),
7680                (2, "closed", 200),
7681                (3, "open", 300),
7682                (4, "closed", 400),
7683                (5, "open", 500),
7684            ] {
7685                writer
7686                    .insert(
7687                        "orders",
7688                        vec![
7689                            CellValue::Int64(id),
7690                            CellValue::Utf8(status.to_string()),
7691                            CellValue::Int64(amount),
7692                        ],
7693                    )
7694                    .expect("insert row");
7695            }
7696            writer.flush().await.expect("flush batch");
7697
7698            let ctx = SessionContext::new();
7699            schema.register_all(&ctx).expect("register tables");
7700
7701            let batches = ctx
7702                .sql("SELECT id, amount_cents FROM orders ORDER BY id")
7703                .await
7704                .expect("full scan query")
7705                .collect()
7706                .await
7707                .expect("collect full scan");
7708            let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
7709            assert_eq!(total_rows, 5, "all 5 rows returned from full scan");
7710
7711            let mut ids = Vec::new();
7712            let mut amounts = Vec::new();
7713            for batch in &batches {
7714                let id_col = batch
7715                    .column(0)
7716                    .as_any()
7717                    .downcast_ref::<Int64Array>()
7718                    .expect("id column");
7719                let amt_col = batch
7720                    .column(1)
7721                    .as_any()
7722                    .downcast_ref::<Int64Array>()
7723                    .expect("amount column");
7724                for i in 0..batch.num_rows() {
7725                    ids.push(id_col.value(i));
7726                    amounts.push(amt_col.value(i));
7727                }
7728            }
7729            assert_eq!(ids, vec![1, 2, 3, 4, 5]);
7730            assert_eq!(amounts, vec![100, 200, 300, 400, 500]);
7731
7732            let filtered = ctx
7733                .sql(
7734                    "SELECT id, amount_cents FROM orders \
7735                     WHERE status = 'open' ORDER BY id",
7736                )
7737                .await
7738                .expect("filtered query")
7739                .collect()
7740                .await
7741                .expect("collect filtered");
7742            let mut filtered_ids = Vec::new();
7743            let mut filtered_amounts = Vec::new();
7744            for batch in &filtered {
7745                let id_col = batch
7746                    .column(0)
7747                    .as_any()
7748                    .downcast_ref::<Int64Array>()
7749                    .expect("id column");
7750                let amt_col = batch
7751                    .column(1)
7752                    .as_any()
7753                    .downcast_ref::<Int64Array>()
7754                    .expect("amount column");
7755                for i in 0..batch.num_rows() {
7756                    filtered_ids.push(id_col.value(i));
7757                    filtered_amounts.push(amt_col.value(i));
7758                }
7759            }
7760            assert_eq!(filtered_ids, vec![1, 3, 5]);
7761            assert_eq!(filtered_amounts, vec![100, 300, 500]);
7762
7763            let agg = ctx
7764                .sql(
7765                    "SELECT COUNT(*) AS cnt, SUM(amount_cents) AS total \
7766                     FROM orders WHERE status = 'open'",
7767                )
7768                .await
7769                .expect("aggregate query")
7770                .collect()
7771                .await
7772                .expect("collect aggregate");
7773            assert_eq!(agg.len(), 1);
7774            let batch = &agg[0];
7775            assert_eq!(batch.num_rows(), 1);
7776            assert_count_scalar(batch, 0, 0, 3);
7777            let total = ScalarValue::try_from_array(batch.column(1), 0).expect("sum scalar");
7778            match total {
7779                ScalarValue::Int64(Some(v)) => assert_eq!(v, 900),
7780                other => panic!("unexpected sum type: {other:?}"),
7781            }
7782        }
7783    }
7784}