Skip to main content

exoware_sql/
lib.rs

1pub mod prune;
2pub mod server;
3
4mod aggregate;
5mod builder;
6mod codec;
7mod diagnostics;
8mod filter;
9mod predicate;
10mod scan;
11mod schema;
12mod types;
13mod writer;
14
15pub use schema::KvSchema;
16pub use server::{sql_connect_stack, SqlConnect, SqlServer};
17pub use types::default_orders_index_specs;
18pub use types::{
19    CellValue, IndexBackfillEvent, IndexBackfillOptions, IndexBackfillReport, IndexLayout,
20    IndexSpec, TableColumnConfig,
21};
22pub use writer::{BatchReceipt, BatchWriter, PreparedBatch, TableWriter};
23
24#[cfg(test)]
25mod tests {
26    use super::aggregate::*;
27    use super::builder::*;
28    use super::codec::*;
29    use super::diagnostics::*;
30    use super::filter::*;
31    use super::predicate::*;
32    use super::scan::*;
33    use super::types::*;
34    use super::writer::*;
35    use super::*;
36    use commonware_codec::Encode;
37    use datafusion::arrow::array::{Float64Array, Int64Array, LargeStringArray, StringViewArray};
38    use datafusion::arrow::datatypes::{i256, DataType, TimeUnit};
39    use datafusion::arrow::record_batch::RecordBatch;
40    use datafusion::common::ScalarValue;
41    use datafusion::logical_expr::{Expr, Operator};
42    use datafusion::physical_plan::ExecutionPlan;
43    use datafusion::prelude::SessionContext;
44    use exoware_sdk::keys::{Key, KeyCodec};
45    use exoware_sdk::kv_codec::{
46        canonicalize_reduced_group_values, decode_stored_row, encode_reduced_group_key,
47        eval_predicate, KvReducedValue, StoredRow,
48    };
49    use exoware_sdk::{RangeReduceOp, RangeReduceRequest, StoreBatchUpload, StoreClient};
50    use std::collections::{BTreeMap, HashSet};
51    use std::ops::Bound::{Included, Unbounded};
52    use std::pin::Pin;
53    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
54    use std::sync::{Arc, Mutex};
55    use std::time::Duration;
56
57    use axum::Router;
58    use bytes::Bytes;
59    use connectrpc::{Chain, ConnectError, ConnectRpcService, Context};
60    use exoware_sdk::connect_compression_registry;
61    use exoware_sdk::kv_codec::{eval_expr, expr_needs_value};
62    use exoware_sdk::store::common::v1::KvEntry as ProtoKvEntry;
63    use exoware_sdk::store::ingest::v1::{
64        PutResponse as ProtoPutResponse, Service as IngestService,
65        ServiceServer as IngestServiceServer,
66    };
67    use exoware_sdk::store::query::v1::{
68        GetManyEntry as ProtoGetManyEntry, GetManyFrame as ProtoGetManyFrame,
69        GetResponse as ProtoGetResponse, RangeFrame as ProtoRangeFrame,
70        ReduceResponse as ProtoReduceResponse, Service as QueryService,
71        ServiceServer as QueryServiceServer,
72    };
73    use exoware_sdk::RangeMode;
74    use exoware_sdk::{
75        parse_range_traversal_direction, to_domain_reduce_request, to_proto_optional_reduced_value,
76        to_proto_reduced_value, RangeTraversalDirection, RangeTraversalModeError,
77    };
78    use exoware_sdk::{RangeReduceGroup, RangeReduceResponse, RangeReduceResult};
79    use futures::{stream, Stream, TryStreamExt};
80    use tokio::sync::{mpsc, oneshot, Notify};
81
82    /// Assert EXPLAIN text includes the same `query_stats=...` suffix as [`format_query_stats_explain`].
83    fn assert_explain_includes_query_stats_surface(
84        explain: &str,
85        surface: QueryStatsExplainSurface,
86    ) {
87        let expected = format!("query_stats={}", format_query_stats_explain(surface));
88        assert!(
89            explain.contains(&expected),
90            "expected EXPLAIN output to include `{expected}`\n{explain}"
91        );
92    }
93
94    fn simple_int64_model(prefix: u8) -> TableModel {
95        let config = KvTableConfig::new(
96            prefix,
97            vec![TableColumnConfig::new("id", DataType::Int64, false)],
98            vec!["id".to_string()],
99            vec![],
100        )
101        .unwrap();
102        TableModel::from_config(&config).unwrap()
103    }
104
105    fn codec_payload(codec: KeyCodec, key: &Key, offset: usize, len: usize) -> Vec<u8> {
106        codec.read_payload(key, offset, len).expect("codec payload")
107    }
108
109    fn primary_payload(model: &TableModel, key: &Key, offset: usize, len: usize) -> Vec<u8> {
110        codec_payload(model.primary_key_codec, key, offset, len)
111    }
112
113    fn index_payload(spec: &ResolvedIndexSpec, key: &Key, offset: usize, len: usize) -> Vec<u8> {
114        codec_payload(spec.codec, key, offset, len)
115    }
116
117    fn matches_primary_key(table_prefix: u8, key: &Key) -> bool {
118        primary_key_codec(table_prefix)
119            .expect("primary codec")
120            .matches(key)
121    }
122
123    fn matches_secondary_index_key(table_prefix: u8, index_id: u8, key: &Key) -> bool {
124        secondary_index_codec(table_prefix, index_id)
125            .expect("secondary codec")
126            .matches(key)
127    }
128
129    fn test_model() -> (TableModel, Vec<ResolvedIndexSpec>) {
130        let config = KvTableConfig::new(
131            0,
132            vec![
133                TableColumnConfig::new("region", DataType::Utf8, false),
134                TableColumnConfig::new("customer_id", DataType::Int64, false),
135                TableColumnConfig::new("order_id", DataType::Int64, false),
136                TableColumnConfig::new("amount_cents", DataType::Int64, false),
137                TableColumnConfig::new("status", DataType::Utf8, false),
138            ],
139            vec!["order_id".to_string()],
140            vec![
141                IndexSpec::new(
142                    "region_customer",
143                    vec!["region".to_string(), "customer_id".to_string()],
144                )
145                .expect("valid"),
146                IndexSpec::new(
147                    "status_customer",
148                    vec!["status".to_string(), "customer_id".to_string()],
149                )
150                .expect("valid"),
151            ],
152        )
153        .expect("valid config");
154        let model = TableModel::from_config(&config).expect("model");
155        let specs = model
156            .resolve_index_specs(&config.index_specs)
157            .expect("specs");
158        (model, specs)
159    }
160
161    fn zorder_test_model() -> (TableModel, Vec<ResolvedIndexSpec>) {
162        let config = KvTableConfig::new(
163            0,
164            vec![
165                TableColumnConfig::new("x", DataType::Int64, false),
166                TableColumnConfig::new("y", DataType::Int64, false),
167                TableColumnConfig::new("id", DataType::Int64, false),
168                TableColumnConfig::new("value", DataType::Int64, false),
169            ],
170            vec!["id".to_string()],
171            vec![
172                IndexSpec::new("xy_lex", vec!["x".to_string(), "y".to_string()])
173                    .expect("valid")
174                    .with_cover_columns(vec!["value".to_string()]),
175                IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
176                    .expect("valid")
177                    .with_cover_columns(vec!["value".to_string()]),
178            ],
179        )
180        .expect("valid config");
181        let model = TableModel::from_config(&config).expect("model");
182        let specs = model
183            .resolve_index_specs(&config.index_specs)
184            .expect("specs");
185        (model, specs)
186    }
187
    /// Shared state behind the in-process mock ingest and query services.
    ///
    /// Every field is reference-counted, so cloning the struct yields another
    /// handle onto the same data and counters.
    #[derive(Clone)]
    struct MockState {
        /// Ordered key/value store; written by the mock `put` handler and read by
        /// the mock `get`, `range`, `get_many` and `reduce` handlers.
        kv: Arc<Mutex<BTreeMap<Key, Bytes>>>,
        /// Incremented once for each `range` request the mock query service handles.
        range_calls: Arc<AtomicUsize>,
        /// Incremented once for each `reduce` request the mock query service handles.
        range_reduce_calls: Arc<AtomicUsize>,
        /// Incremented by one on every `put`; query handlers compare it against the
        /// request's `min_sequence_number` and report it in the response detail.
        sequence_number: Arc<AtomicU64>,
    }
195
    /// Accumulator for one group while the mock `reduce` handler scans a range.
    #[derive(Debug)]
    struct MockGroupedReduceState {
        /// Group-by values of the row that first created this group.
        group_values: Vec<Option<KvReducedValue>>,
        /// One partial aggregate per reducer, in the request's reducer order.
        states: Vec<PartialAggregateState>,
    }
201
    /// Per-row output of [`extract_mock_reduce_row`]: `(group-by values, reducer input values)`.
    type MockReduceRow = (Vec<Option<KvReducedValue>>, Vec<Option<KvReducedValue>>);
203
204    fn extract_mock_reduce_row(
205        key: &Key,
206        value: &Bytes,
207        request: &RangeReduceRequest,
208    ) -> Option<MockReduceRow> {
209        let needs_value = request
210            .group_by
211            .iter()
212            .chain(
213                request
214                    .reducers
215                    .iter()
216                    .filter_map(|reducer| reducer.expr.as_ref()),
217            )
218            .any(expr_needs_value)
219            || request
220                .filter
221                .as_ref()
222                .is_some_and(exoware_sdk::kv_codec::predicate_needs_value);
223        let archived = if needs_value {
224            decode_stored_row(value.as_ref()).ok()
225        } else {
226            None
227        };
228
229        if let Some(filter) = &request.filter {
230            if !eval_predicate(key, archived.as_ref(), filter).ok()? {
231                return None;
232            }
233        }
234
235        let mut group_values = Vec::with_capacity(request.group_by.len());
236        for expr in &request.group_by {
237            let extracted_value = eval_expr(key, archived.as_ref(), expr).ok()?;
238            group_values.push(extracted_value);
239        }
240        canonicalize_reduced_group_values(&mut group_values);
241
242        let mut reducer_values = Vec::with_capacity(request.reducers.len());
243        for reducer in &request.reducers {
244            let extracted_value = match (&reducer.expr, archived.as_ref()) {
245                (None, _) => None,
246                (Some(expr), _) => eval_expr(key, archived.as_ref(), expr).ok()?,
247            };
248            reducer_values.push(extracted_value);
249        }
250
251        Some((group_values, reducer_values))
252    }
253
254    #[allow(clippy::result_large_err)]
255    fn ensure_min_sequence_number(
256        token: &Arc<AtomicU64>,
257        required: Option<u64>,
258    ) -> Result<(), ConnectError> {
259        let current = token.load(AtomicOrdering::Relaxed);
260        if let Some(required) = required {
261            if current < required {
262                return Err(ConnectError::aborted(format!(
263                    "consistency_not_ready: required={required}, current={current}"
264                )));
265            }
266        }
267        Ok(())
268    }
269
270    fn proto_range_entries_frame(results: Vec<(Key, Vec<u8>)>) -> ProtoRangeFrame {
271        ProtoRangeFrame {
272            results: results
273                .into_iter()
274                .map(|(key, value)| ProtoKvEntry {
275                    key: key.to_vec(),
276                    value,
277                    ..Default::default()
278                })
279                .collect(),
280            ..Default::default()
281        }
282    }
283
284    fn query_detail_trailer_ctx(sequence_number: u64) -> Context {
285        let detail = exoware_sdk::store::query::v1::Detail {
286            sequence_number,
287            read_stats: Default::default(),
288            ..Default::default()
289        };
290        exoware_sdk::with_query_detail_trailer(Context::default(), &detail)
291    }
292
    /// Mock implementation of the ingest service that writes into a shared [`MockState`].
    #[derive(Clone)]
    struct MockIngestConnect {
        /// Shared store and counters that `put` mutates.
        state: MockState,
    }
297
298    impl IngestService for MockIngestConnect {
299        async fn put(
300            &self,
301            ctx: Context,
302            request: buffa::view::OwnedView<
303                exoware_sdk::store::ingest::v1::PutRequestView<'static>,
304            >,
305        ) -> Result<(ProtoPutResponse, Context), ConnectError> {
306            let mut parsed = Vec::<(Key, Bytes)>::new();
307            for kv in request.kvs.iter() {
308                parsed.push((kv.key.to_vec().into(), Bytes::copy_from_slice(kv.value)));
309            }
310            let mut guard = self.state.kv.lock().expect("kv mutex poisoned");
311            for (key, value) in parsed.iter() {
312                guard.insert(key.clone(), value.clone());
313            }
314            let seq = self
315                .state
316                .sequence_number
317                .fetch_add(1, AtomicOrdering::SeqCst)
318                + 1;
319            Ok((
320                ProtoPutResponse {
321                    sequence_number: seq,
322                    ..Default::default()
323                },
324                ctx,
325            ))
326        }
327    }
328
    /// Mock implementation of the query service that reads from a shared [`MockState`].
    #[derive(Clone)]
    struct MockQueryConnect {
        /// Shared store and counters that the query handlers read and update.
        state: MockState,
    }
333
334    impl QueryService for MockQueryConnect {
335        async fn get(
336            &self,
337            _ctx: Context,
338            request: buffa::view::OwnedView<exoware_sdk::store::query::v1::GetRequestView<'static>>,
339        ) -> Result<(ProtoGetResponse, Context), ConnectError> {
340            ensure_min_sequence_number(&self.state.sequence_number, request.min_sequence_number)?;
341            let key: Key = request.key.to_vec().into();
342            let guard = self.state.kv.lock().expect("kv mutex poisoned");
343            let value = guard.get(&key).cloned();
344            let token = self.state.sequence_number.load(AtomicOrdering::Relaxed);
345            let detail = exoware_sdk::store::query::v1::Detail {
346                sequence_number: token,
347                read_stats: Default::default(),
348                ..Default::default()
349            };
350            Ok((
351                ProtoGetResponse {
352                    value: value.map(|v| v.to_vec()),
353                    ..Default::default()
354                },
355                exoware_sdk::with_query_detail_response_header(Context::default(), &detail),
356            ))
357        }
358
359        async fn range(
360            &self,
361            _ctx: Context,
362            request: buffa::view::OwnedView<
363                exoware_sdk::store::query::v1::RangeRequestView<'static>,
364            >,
365        ) -> Result<
366            (
367                Pin<Box<dyn Stream<Item = Result<ProtoRangeFrame, ConnectError>> + Send>>,
368                Context,
369            ),
370            ConnectError,
371        > {
372            ensure_min_sequence_number(&self.state.sequence_number, request.min_sequence_number)?;
373            self.state.range_calls.fetch_add(1, AtomicOrdering::SeqCst);
374
375            let start_key: Key = request.start.to_vec().into();
376            let end_key: Key = request.end.to_vec().into();
377            let limit = request.limit.map(|v| v as usize).unwrap_or(usize::MAX);
378            let batch_size = usize::try_from(request.batch_size).unwrap_or(usize::MAX);
379            if batch_size == 0 {
380                return Err(ConnectError::invalid_argument(
381                    "invalid batch_size: expected positive integer",
382                ));
383            }
384
385            let mode = match parse_range_traversal_direction(request.mode) {
386                Ok(RangeTraversalDirection::Forward) => RangeMode::Forward,
387                Ok(RangeTraversalDirection::Reverse) => RangeMode::Reverse,
388                Err(RangeTraversalModeError::UnknownWireValue(v)) => {
389                    return Err(ConnectError::invalid_argument(format!(
390                        "unknown TraversalMode enum value {v}"
391                    )));
392                }
393            };
394
395            let state = self.state.clone();
396            let guard = state.kv.lock().expect("kv mutex poisoned");
397            // Match `StoreEngine::range_scan`: inclusive [start, end]; empty end = unbounded.
398            let range: (std::ops::Bound<&Key>, std::ops::Bound<&Key>) = (
399                Included(&start_key),
400                if end_key.is_empty() {
401                    Unbounded
402                } else {
403                    Included(&end_key)
404                },
405            );
406            let range_iter = guard.range::<Key, _>(range);
407            let iter: Box<dyn Iterator<Item = (&Key, &Bytes)> + Send> = match mode {
408                RangeMode::Forward => Box::new(range_iter),
409                RangeMode::Reverse => Box::new(range_iter.rev()),
410            };
411            let mut results: Vec<ProtoKvEntry> = Vec::new();
412            for (key, value) in iter.take(limit) {
413                results.push(ProtoKvEntry {
414                    key: key.to_vec(),
415                    value: value.to_vec(),
416                    ..Default::default()
417                });
418            }
419            drop(guard);
420            let token = state.sequence_number.load(AtomicOrdering::Relaxed);
421            let batch = batch_size.max(1);
422            let mut frames: Vec<Result<ProtoRangeFrame, ConnectError>> = Vec::new();
423            for chunk in results.chunks(batch) {
424                frames.push(Ok(ProtoRangeFrame {
425                    results: chunk.to_vec(),
426                    ..Default::default()
427                }));
428            }
429            let detail = exoware_sdk::store::query::v1::Detail {
430                sequence_number: token,
431                read_stats: Default::default(),
432                ..Default::default()
433            };
434            Ok((
435                Box::pin(stream::iter(frames)),
436                exoware_sdk::with_query_detail_trailer(Context::default(), &detail),
437            ))
438        }
439
440        async fn get_many(
441            &self,
442            _ctx: Context,
443            request: buffa::view::OwnedView<
444                exoware_sdk::store::query::v1::GetManyRequestView<'static>,
445            >,
446        ) -> Result<
447            (
448                Pin<Box<dyn Stream<Item = Result<ProtoGetManyFrame, ConnectError>> + Send>>,
449                Context,
450            ),
451            ConnectError,
452        > {
453            ensure_min_sequence_number(&self.state.sequence_number, request.min_sequence_number)?;
454            let batch_size = usize::try_from(request.batch_size)
455                .unwrap_or(usize::MAX)
456                .max(1);
457            let guard = self.state.kv.lock().expect("kv mutex poisoned");
458            let mut entries: Vec<ProtoGetManyEntry> = Vec::new();
459            for key_bytes in request.keys.iter() {
460                let key: Key = key_bytes.to_vec().into();
461                let value = guard.get(&key).cloned();
462                entries.push(ProtoGetManyEntry {
463                    key: key.to_vec(),
464                    value: value.map(|v| v.to_vec()),
465                    ..Default::default()
466                });
467            }
468            drop(guard);
469            let token = self.state.sequence_number.load(AtomicOrdering::Relaxed);
470            let mut frames: Vec<Result<ProtoGetManyFrame, ConnectError>> = Vec::new();
471            for chunk in entries.chunks(batch_size) {
472                frames.push(Ok(ProtoGetManyFrame {
473                    results: chunk.to_vec(),
474                    ..Default::default()
475                }));
476            }
477            let detail = exoware_sdk::store::query::v1::Detail {
478                sequence_number: token,
479                read_stats: Default::default(),
480                ..Default::default()
481            };
482            Ok((
483                Box::pin(stream::iter(frames)),
484                exoware_sdk::with_query_detail_trailer(Context::default(), &detail),
485            ))
486        }
487
488        async fn reduce(
489            &self,
490            _ctx: Context,
491            request: buffa::view::OwnedView<
492                exoware_sdk::store::query::v1::ReduceRequestView<'static>,
493            >,
494        ) -> Result<(ProtoReduceResponse, Context), ConnectError> {
495            ensure_min_sequence_number(&self.state.sequence_number, request.min_sequence_number)?;
496            self.state
497                .range_reduce_calls
498                .fetch_add(1, AtomicOrdering::SeqCst);
499            let owned = request.to_owned_message();
500            let start_key: Key = owned.start.clone().into();
501            let end_key: Key = owned.end.clone().into();
502            let reduce_req = owned
503                .params
504                .as_option()
505                .ok_or_else(|| ConnectError::invalid_argument("missing range reduce params"))?;
506            let domain_request =
507                to_domain_reduce_request(reduce_req).map_err(ConnectError::invalid_argument)?;
508
509            let state = self.state.clone();
510            let guard = state.kv.lock().expect("kv mutex poisoned");
511            let mut states = domain_request.group_by.is_empty().then(|| {
512                domain_request
513                    .reducers
514                    .iter()
515                    .map(|reducer| PartialAggregateState::from_op(reducer.op))
516                    .collect::<Vec<_>>()
517            });
518            let mut grouped = BTreeMap::<Vec<u8>, MockGroupedReduceState>::new();
519
520            let range: (std::ops::Bound<&Key>, std::ops::Bound<&Key>) = (
521                Included(&start_key),
522                if end_key.is_empty() {
523                    Unbounded
524                } else {
525                    Included(&end_key)
526                },
527            );
528            for (key, value) in guard.range::<Key, _>(range) {
529                let Some((group_values, reducer_values)) =
530                    extract_mock_reduce_row(key, value, &domain_request)
531                else {
532                    continue;
533                };
534                if domain_request.group_by.is_empty() {
535                    let states = states.as_mut().expect("scalar states");
536                    for ((state, reducer), value) in states
537                        .iter_mut()
538                        .zip(domain_request.reducers.iter())
539                        .zip(reducer_values)
540                    {
541                        match reducer.op {
542                            RangeReduceOp::CountAll => state
543                                .merge_partial(reducer.op, Some(&KvReducedValue::UInt64(1)))
544                                .map_err(|e| ConnectError::internal(e.to_string()))?,
545                            RangeReduceOp::CountField => {
546                                let partial =
547                                    KvReducedValue::UInt64(if value.is_some() { 1 } else { 0 });
548                                state
549                                    .merge_partial(reducer.op, Some(&partial))
550                                    .map_err(|e| ConnectError::internal(e.to_string()))?
551                            }
552                            _ => state
553                                .merge_partial(reducer.op, value.as_ref())
554                                .map_err(|e| ConnectError::internal(e.to_string()))?,
555                        }
556                    }
557                } else {
558                    let group_key = encode_reduced_group_key(&group_values);
559                    let group =
560                        grouped
561                            .entry(group_key)
562                            .or_insert_with(|| MockGroupedReduceState {
563                                group_values: group_values.clone(),
564                                states: domain_request
565                                    .reducers
566                                    .iter()
567                                    .map(|reducer| PartialAggregateState::from_op(reducer.op))
568                                    .collect(),
569                            });
570                    for ((state, reducer), value) in group
571                        .states
572                        .iter_mut()
573                        .zip(domain_request.reducers.iter())
574                        .zip(reducer_values)
575                    {
576                        match reducer.op {
577                            RangeReduceOp::CountAll => state
578                                .merge_partial(reducer.op, Some(&KvReducedValue::UInt64(1)))
579                                .map_err(|e| ConnectError::internal(e.to_string()))?,
580                            RangeReduceOp::CountField => {
581                                let partial =
582                                    KvReducedValue::UInt64(if value.is_some() { 1 } else { 0 });
583                                state
584                                    .merge_partial(reducer.op, Some(&partial))
585                                    .map_err(|e| ConnectError::internal(e.to_string()))?
586                            }
587                            _ => state
588                                .merge_partial(reducer.op, value.as_ref())
589                                .map_err(|e| ConnectError::internal(e.to_string()))?,
590                        }
591                    }
592                }
593            }
594
595            let response = if let Some(states) = states {
596                RangeReduceResponse {
597                    results: states
598                        .iter()
599                        .map(|state| RangeReduceResult {
600                            value: match state {
601                                PartialAggregateState::Count(count) => {
602                                    Some(KvReducedValue::UInt64(*count))
603                                }
604                                PartialAggregateState::Sum(value)
605                                | PartialAggregateState::Min(value)
606                                | PartialAggregateState::Max(value) => value.clone(),
607                            },
608                        })
609                        .collect(),
610                    groups: Vec::new(),
611                }
612            } else {
613                RangeReduceResponse {
614                    results: Vec::new(),
615                    groups: grouped
616                        .into_values()
617                        .map(|group| RangeReduceGroup {
618                            group_values: group.group_values,
619                            results: group
620                                .states
621                                .into_iter()
622                                .map(|state| RangeReduceResult {
623                                    value: match state {
624                                        PartialAggregateState::Count(count) => {
625                                            Some(KvReducedValue::UInt64(count))
626                                        }
627                                        PartialAggregateState::Sum(value)
628                                        | PartialAggregateState::Min(value)
629                                        | PartialAggregateState::Max(value) => value,
630                                    },
631                                })
632                                .collect(),
633                        })
634                        .collect(),
635                }
636            };
637            drop(guard);
638            let token = state.sequence_number.load(AtomicOrdering::Relaxed);
639            let detail = exoware_sdk::store::query::v1::Detail {
640                sequence_number: token,
641                read_stats: Default::default(),
642                ..Default::default()
643            };
644            Ok((
645                ProtoReduceResponse {
646                    results: response
647                        .results
648                        .into_iter()
649                        .map(|result| exoware_sdk::store::query::v1::RangeReduceResult {
650                            value: result.value.map(to_proto_reduced_value).into(),
651                            ..Default::default()
652                        })
653                        .collect(),
654                    groups: response
655                        .groups
656                        .into_iter()
657                        .map(|group| {
658                            let group_values_present: Vec<bool> =
659                                group.group_values.iter().map(|v| v.is_some()).collect();
660                            exoware_sdk::store::query::v1::RangeReduceGroup {
661                                group_values: group
662                                    .group_values
663                                    .into_iter()
664                                    .map(to_proto_optional_reduced_value)
665                                    .collect(),
666                                group_values_present,
667                                results: group
668                                    .results
669                                    .into_iter()
670                                    .map(|result| {
671                                        exoware_sdk::store::query::v1::RangeReduceResult {
672                                            value: result.value.map(to_proto_reduced_value).into(),
673                                            ..Default::default()
674                                        }
675                                    })
676                                    .collect(),
677                                ..Default::default()
678                            }
679                        })
680                        .collect(),
681                    ..Default::default()
682                },
683                exoware_sdk::with_query_detail_response_header(Context::default(), &detail),
684            ))
685        }
686    }
687
688    async fn spawn_mock_server(state: MockState) -> (String, oneshot::Sender<()>) {
689        let connect = ConnectRpcService::new(Chain(
690            IngestServiceServer::new(MockIngestConnect {
691                state: state.clone(),
692            }),
693            QueryServiceServer::new(MockQueryConnect { state }),
694        ))
695        .with_compression(connect_compression_registry());
696        let app = Router::new().fallback_service(connect);
697
698        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
699            .await
700            .expect("bind mock server");
701        let addr = listener.local_addr().expect("local addr");
702        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
703        tokio::spawn(async move {
704            axum::serve(listener, app)
705                .with_graceful_shutdown(async move {
706                    let _ = shutdown_rx.await;
707                })
708                .await
709                .expect("mock server should run");
710        });
711        (format!("http://{addr}"), shutdown_tx)
712    }
713
714    fn assert_count_scalar(batch: &RecordBatch, col_idx: usize, row_idx: usize, expected: u64) {
715        let scalar = ScalarValue::try_from_array(batch.column(col_idx), row_idx)
716            .expect("count scalar should decode");
717        match scalar {
718            ScalarValue::UInt64(Some(value)) => assert_eq!(value, expected),
719            ScalarValue::Int64(Some(value)) => assert_eq!(value, expected as i64),
720            other => panic!("unexpected count scalar: {other:?}"),
721        }
722    }
723
724    async fn explain_plan_rows(ctx: &SessionContext, sql: &str) -> Vec<(String, String)> {
725        let batches = ctx
726            .sql(&format!("EXPLAIN {sql}"))
727            .await
728            .expect("explain query")
729            .collect()
730            .await
731            .expect("explain collect");
732        let mut rows = Vec::new();
733        for batch in batches {
734            for row_idx in 0..batch.num_rows() {
735                let plan_type = scalar_to_string(
736                    &ScalarValue::try_from_array(batch.column(0), row_idx).expect("plan type"),
737                )
738                .expect("plan type string");
739                let plan = scalar_to_string(
740                    &ScalarValue::try_from_array(batch.column(1), row_idx).expect("plan"),
741                )
742                .expect("plan string");
743                rows.push((plan_type, plan));
744            }
745        }
746        rows
747    }
748
749    fn physical_plan_text(rows: &[(String, String)]) -> String {
750        rows.iter()
751            .filter(|(plan_type, _)| plan_type.contains("physical_plan"))
752            .map(|(_, plan)| plan.as_str())
753            .collect::<Vec<_>>()
754            .join("\n")
755    }
756
757    #[tokio::test]
758    async fn explain_reports_full_scan_like_primary_key_scan() {
759        let state = MockState {
760            kv: Arc::new(Mutex::new(BTreeMap::new())),
761            range_calls: Arc::new(AtomicUsize::new(0)),
762            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
763            sequence_number: Arc::new(AtomicU64::new(0)),
764        };
765        let (base_url, shutdown_tx) = spawn_mock_server(state).await;
766        let client = StoreClient::new(&base_url);
767
768        let schema = KvSchema::new(client)
769            .table(
770                "orders",
771                vec![
772                    TableColumnConfig::new("id", DataType::Int64, false),
773                    TableColumnConfig::new("status", DataType::Utf8, false),
774                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
775                ],
776                vec!["id".to_string()],
777                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
778                    .expect("valid")
779                    .with_cover_columns(vec!["amount_cents".to_string()])],
780            )
781            .expect("schema");
782        let ctx = SessionContext::new();
783        schema.register_all(&ctx).expect("register");
784
785        let explain =
786            physical_plan_text(&explain_plan_rows(&ctx, "SELECT id, status FROM orders").await);
787        assert!(explain.contains("KvScanExec:"));
788        assert!(explain.contains("mode=primary_key"));
789        assert!(explain.contains("predicate=<none>"));
790        assert!(explain.contains("row_recheck=false"));
791        assert!(explain.contains("full_scan_like=true"));
792        assert_explain_includes_query_stats_surface(
793            &explain,
794            QueryStatsExplainSurface::StreamedRangeTrailer,
795        );
796
797        let _ = shutdown_tx.send(());
798    }
799
800    #[tokio::test]
801    async fn explain_reports_secondary_index_scan_and_row_recheck() {
802        let state = MockState {
803            kv: Arc::new(Mutex::new(BTreeMap::new())),
804            range_calls: Arc::new(AtomicUsize::new(0)),
805            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
806            sequence_number: Arc::new(AtomicU64::new(0)),
807        };
808        let (base_url, shutdown_tx) = spawn_mock_server(state).await;
809        let client = StoreClient::new(&base_url);
810
811        let schema = KvSchema::new(client)
812            .table(
813                "orders",
814                vec![
815                    TableColumnConfig::new("id", DataType::Int64, false),
816                    TableColumnConfig::new("status", DataType::Utf8, false),
817                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
818                ],
819                vec!["id".to_string()],
820                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
821                    .expect("valid")
822                    .with_cover_columns(vec!["amount_cents".to_string()])],
823            )
824            .expect("schema");
825        let ctx = SessionContext::new();
826        schema.register_all(&ctx).expect("register");
827
828        let explain = physical_plan_text(
829            &explain_plan_rows(
830                &ctx,
831                "SELECT id, status, amount_cents FROM orders \
832                 WHERE status = 'open' AND amount_cents >= 5",
833            )
834            .await,
835        );
836        assert!(explain.contains("KvScanExec:"));
837        assert!(explain.contains("mode=secondary_index(status_idx, lexicographic)"));
838        assert!(explain.contains("predicate=status = 'open' AND amount_cents >= 5"));
839        assert!(explain.contains("exact=false"));
840        assert!(explain.contains("row_recheck=true"));
841        assert!(explain.contains("full_scan_like=false"));
842
843        let _ = shutdown_tx.send(());
844    }
845
846    #[tokio::test]
847    async fn explain_reports_zorder_secondary_index_scan() {
848        let state = MockState {
849            kv: Arc::new(Mutex::new(BTreeMap::new())),
850            range_calls: Arc::new(AtomicUsize::new(0)),
851            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
852            sequence_number: Arc::new(AtomicU64::new(0)),
853        };
854        let (base_url, shutdown_tx) = spawn_mock_server(state).await;
855        let client = StoreClient::new(&base_url);
856
857        let schema = KvSchema::new(client)
858            .table(
859                "points",
860                vec![
861                    TableColumnConfig::new("x", DataType::Int64, false),
862                    TableColumnConfig::new("y", DataType::Int64, false),
863                    TableColumnConfig::new("id", DataType::Int64, false),
864                    TableColumnConfig::new("value", DataType::Int64, false),
865                ],
866                vec!["id".to_string()],
867                vec![
868                    IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
869                        .expect("valid")
870                        .with_cover_columns(vec!["value".to_string()]),
871                ],
872            )
873            .expect("schema");
874        let ctx = SessionContext::new();
875        schema.register_all(&ctx).expect("register");
876
877        let explain = physical_plan_text(
878            &explain_plan_rows(
879                &ctx,
880                "SELECT id, value FROM points \
881                 WHERE x >= 1 AND x <= 2 AND y >= 1 AND y <= 2",
882            )
883            .await,
884        );
885        assert!(explain.contains("KvScanExec:"));
886        assert!(explain.contains("mode=secondary_index(xy_z, z_order)"));
887        assert!(explain.contains("exact=false"));
888        assert!(explain.contains("row_recheck=true"));
889
890        let _ = shutdown_tx.send(());
891    }
892
893    #[tokio::test]
894    async fn explain_reports_aggregate_pushdown_access_path_details() {
895        let state = MockState {
896            kv: Arc::new(Mutex::new(BTreeMap::new())),
897            range_calls: Arc::new(AtomicUsize::new(0)),
898            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
899            sequence_number: Arc::new(AtomicU64::new(0)),
900        };
901        let (base_url, shutdown_tx) = spawn_mock_server(state).await;
902        let client = StoreClient::new(&base_url);
903
904        let schema = KvSchema::new(client)
905            .table(
906                "orders",
907                vec![
908                    TableColumnConfig::new("id", DataType::Int64, false),
909                    TableColumnConfig::new("status", DataType::Utf8, false),
910                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
911                ],
912                vec!["id".to_string()],
913                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
914                    .expect("valid")
915                    .with_cover_columns(vec!["amount_cents".to_string()])],
916            )
917            .expect("schema");
918        let ctx = SessionContext::new();
919        schema.register_all(&ctx).expect("register");
920
921        let explain = physical_plan_text(
922            &explain_plan_rows(
923                &ctx,
924                "SELECT status, SUM(amount_cents) AS total_cents \
925                 FROM orders WHERE status = 'open' GROUP BY status",
926            )
927            .await,
928        );
929        assert!(explain.contains("KvAggregateExec:"));
930        assert!(explain.contains("grouped=true"));
931        assert!(explain.contains("job0{mode=secondary_index(status_idx, lexicographic)"));
932        assert!(explain.contains("predicate=status = 'open'"));
933        assert!(explain.contains("exact=true"));
934        assert!(explain.contains("row_recheck=false"));
935        assert_explain_includes_query_stats_surface(
936            &explain,
937            QueryStatsExplainSurface::RangeReduceHeader,
938        );
939
940        let _ = shutdown_tx.send(());
941    }
942
943    #[test]
944    fn index_spec_constructor_sets_name_and_keys() {
945        let spec = IndexSpec::new(
946            "status_customer",
947            vec!["status".to_string(), "customer_id".to_string()],
948        )
949        .expect("valid index spec");
950        assert_eq!(spec.name(), "status_customer");
951        assert_eq!(spec.key_columns(), &["status", "customer_id"]);
952        assert!(spec.cover_columns().is_empty());
953    }
954
955    #[test]
956    fn index_spec_cover_columns_are_configurable_in_code() {
957        let spec = IndexSpec::new("status_customer", vec!["status".to_string()])
958            .expect("valid")
959            .with_cover_columns(vec!["amount_cents".to_string()]);
960        assert_eq!(spec.key_columns(), &["status"]);
961        assert_eq!(spec.cover_columns(), &["amount_cents"]);
962    }
963
964    #[test]
965    fn describe_in_list_places_truncation_ellipsis_inside_parentheses() {
966        let rendered = describe_in_list((1..=6).map(|v| v.to_string()));
967        assert_eq!(rendered, "IN (1, 2, 3, 4, 5, ...)");
968    }
969
970    #[test]
971    fn normalize_sum_case_then_one_uses_countall_optimization() {
972        let (model, _) = test_model();
973        let argument = normalize_case_then_expr(
974            AggregatePushdownFunction::Sum,
975            &Expr::Literal(ScalarValue::Int64(Some(1)), None),
976            &model,
977        )
978        .expect("normalize");
979        assert_eq!(argument, AggregatePushdownArgument::CountAll);
980    }
981
982    #[test]
983    fn normalize_count_case_then_literal_uses_countall_optimization() {
984        use datafusion::logical_expr::col;
985
986        let (model, _) = test_model();
987        let case_expr = Expr::Case(datafusion::logical_expr::expr::Case {
988            expr: None,
989            when_then_expr: vec![(
990                Box::new(col("status").eq(Expr::Literal(
991                    ScalarValue::Utf8(Some("open".to_string())),
992                    None,
993                ))),
994                Box::new(Expr::Literal(
995                    ScalarValue::Utf8(Some("yes".to_string())),
996                    None,
997                )),
998            )],
999            else_expr: Some(Box::new(Expr::Literal(ScalarValue::Utf8(None), None))),
1000        });
1001
1002        let (func, argument, filter) =
1003            normalize_count_aggregate_argument(&case_expr, &model).expect("normalize");
1004        assert_eq!(func, AggregatePushdownFunction::Count);
1005        assert_eq!(argument, AggregatePushdownArgument::CountAll);
1006        assert!(filter.is_some());
1007    }
1008
1009    #[test]
1010    fn reduced_value_to_scalar_preserves_timestamp_timezone_label() {
1011        let tz: Arc<str> = Arc::from("America/New_York");
1012        let scalar = reduced_value_to_scalar(
1013            Some(KvReducedValue::Timestamp(1_700_000_000_000_000)),
1014            &DataType::Timestamp(TimeUnit::Microsecond, Some(tz.clone())),
1015        )
1016        .expect("timestamp scalar");
1017        assert_eq!(
1018            scalar,
1019            ScalarValue::TimestampMicrosecond(Some(1_700_000_000_000_000), Some(tz))
1020        );
1021    }
1022
1023    #[test]
1024    fn index_spec_cover_pk_column_is_rejected() {
1025        let config = KvTableConfig::new(
1026            0,
1027            vec![
1028                TableColumnConfig::new("id", DataType::Int64, false),
1029                TableColumnConfig::new("status", DataType::Utf8, false),
1030            ],
1031            vec!["id".to_string()],
1032            vec![IndexSpec::new("status_idx", vec!["status".to_string()])
1033                .expect("valid")
1034                .with_cover_columns(vec!["id".to_string()])],
1035        )
1036        .expect("valid config");
1037        let model = TableModel::from_config(&config).expect("model");
1038        let err = model
1039            .resolve_index_specs(&config.index_specs)
1040            .expect_err("covering a PK column must be rejected");
1041        assert!(err.contains("primary key column"));
1042    }
1043
1044    #[test]
1045    fn access_plan_requires_cover_columns_for_index_scan() {
1046        let (model, _) = test_model();
1047        let predicate = QueryPredicate::default();
1048        let projection = Some(vec![
1049            *model.columns_by_name.get("order_id").unwrap(),
1050            *model.columns_by_name.get("amount_cents").unwrap(),
1051        ]);
1052        let plan = ScanAccessPlan::new(&model, &projection, &predicate);
1053
1054        let no_cover = IndexSpec::new("status_idx", vec!["status".to_string()]).unwrap();
1055        let with_cover = IndexSpec::new("status_idx", vec!["status".to_string()])
1056            .unwrap()
1057            .with_cover_columns(vec!["amount_cents".to_string()]);
1058        let no_cover_resolved = model.resolve_index_specs(&[no_cover]).unwrap();
1059        let with_cover_resolved = model.resolve_index_specs(&[with_cover]).unwrap();
1060
1061        assert!(!plan.index_covers_required_non_pk(&no_cover_resolved[0]));
1062        assert!(plan.index_covers_required_non_pk(&with_cover_resolved[0]));
1063    }
1064
1065    #[test]
1066    fn choose_index_plan_prefers_longer_prefix() {
1067        let (model, specs) = test_model();
1068        let region_idx = *model.columns_by_name.get("region").unwrap();
1069        let customer_idx = *model.columns_by_name.get("customer_id").unwrap();
1070        let mut predicate = QueryPredicate::default();
1071        predicate.constraints.insert(
1072            region_idx,
1073            PredicateConstraint::StringEq("us-east".to_string()),
1074        );
1075        predicate.constraints.insert(
1076            customer_idx,
1077            PredicateConstraint::IntRange {
1078                min: Some(10),
1079                max: Some(20),
1080            },
1081        );
1082        let plan = predicate
1083            .choose_index_plan(&model, &specs)
1084            .expect("plan")
1085            .expect("exists");
1086        assert_eq!(plan.spec_idx, 0);
1087        assert_eq!(plan.constrained_prefix_len, 2);
1088    }
1089
1090    #[test]
1091    fn choose_index_plan_prefers_covering_index_when_prefix_strength_ties() {
1092        let config = KvTableConfig::new(
1093            0,
1094            vec![
1095                TableColumnConfig::new("id", DataType::Int64, false),
1096                TableColumnConfig::new("status", DataType::Utf8, false),
1097                TableColumnConfig::new("amount_cents", DataType::Int64, false),
1098            ],
1099            vec!["id".to_string()],
1100            vec![
1101                IndexSpec::new("status_plain", vec!["status".to_string()]).expect("valid"),
1102                IndexSpec::new("status_covering", vec!["status".to_string()])
1103                    .expect("valid")
1104                    .with_cover_columns(vec!["amount_cents".to_string()]),
1105            ],
1106        )
1107        .expect("config");
1108        let model = TableModel::from_config(&config).expect("model");
1109        let specs = model
1110            .resolve_index_specs(&config.index_specs)
1111            .expect("specs");
1112        let status_idx = *model.columns_by_name.get("status").unwrap();
1113        let amount_idx = *model.columns_by_name.get("amount_cents").unwrap();
1114        let mut predicate = QueryPredicate::default();
1115        predicate.constraints.insert(
1116            status_idx,
1117            PredicateConstraint::StringEq("open".to_string()),
1118        );
1119        predicate.constraints.insert(
1120            amount_idx,
1121            PredicateConstraint::IntRange {
1122                min: Some(10),
1123                max: None,
1124            },
1125        );
1126
1127        let plan = predicate
1128            .choose_index_plan(&model, &specs)
1129            .expect("plan")
1130            .expect("exists");
1131        assert_eq!(specs[plan.spec_idx].name, "status_covering");
1132    }
1133
1134    #[test]
1135    fn choose_index_plan_prefers_zorder_for_multi_column_box_constraints() {
1136        let (model, specs) = zorder_test_model();
1137        let x_idx = *model.columns_by_name.get("x").unwrap();
1138        let y_idx = *model.columns_by_name.get("y").unwrap();
1139        let mut predicate = QueryPredicate::default();
1140        predicate.constraints.insert(
1141            x_idx,
1142            PredicateConstraint::IntRange {
1143                min: Some(1),
1144                max: Some(2),
1145            },
1146        );
1147        predicate.constraints.insert(
1148            y_idx,
1149            PredicateConstraint::IntRange {
1150                min: Some(1),
1151                max: Some(2),
1152            },
1153        );
1154
1155        let plan = predicate
1156            .choose_index_plan(&model, &specs)
1157            .expect("plan")
1158            .expect("exists");
1159        assert_eq!(specs[plan.spec_idx].name, "xy_z");
1160        assert_eq!(specs[plan.spec_idx].layout, IndexLayout::ZOrder);
1161        assert_eq!(plan.constrained_column_count, 2);
1162    }
1163
1164    #[test]
1165    fn secondary_index_key_round_trip() {
1166        let (model, specs) = test_model();
1167        let row = KvRow {
1168            values: vec![
1169                CellValue::Utf8("us-east".to_string()),
1170                CellValue::Int64(42),
1171                CellValue::Int64(9001),
1172                CellValue::Int64(1500),
1173                CellValue::Utf8("open".to_string()),
1174            ],
1175        };
1176        let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row)
1177            .expect("encode");
1178        let decoded = decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key)
1179            .expect("decode");
1180        let region_idx = *model.columns_by_name.get("region").unwrap();
1181        let customer_idx = *model.columns_by_name.get("customer_id").unwrap();
1182        assert!(matches!(
1183            decoded.values.get(&region_idx),
1184            Some(CellValue::Utf8(v)) if v == "us-east"
1185        ));
1186        assert!(matches!(
1187            decoded.values.get(&customer_idx),
1188            Some(CellValue::Int64(v)) if *v == 42
1189        ));
1190        assert!(matches!(
1191            &decoded.primary_key_values[0],
1192            CellValue::Int64(9001)
1193        ));
1194        let expected_pk = encode_primary_key_from_row(model.table_prefix, &row, &model)
1195            .expect("primary key should encode");
1196        assert_eq!(decoded.primary_key, expected_pk);
1197    }
1198
1199    #[test]
1200    fn zorder_secondary_index_key_round_trip() {
1201        let (model, specs) = zorder_test_model();
1202        let row = KvRow {
1203            values: vec![
1204                CellValue::Int64(2),
1205                CellValue::Int64(1),
1206                CellValue::Int64(42),
1207                CellValue::Int64(900),
1208            ],
1209        };
1210        let key = encode_secondary_index_key(model.table_prefix, &specs[1], &model, &row)
1211            .expect("encode");
1212        let decoded = decode_secondary_index_key(model.table_prefix, &specs[1], &model, &key)
1213            .expect("decode");
1214        let x_idx = *model.columns_by_name.get("x").unwrap();
1215        let y_idx = *model.columns_by_name.get("y").unwrap();
1216        assert!(matches!(
1217            decoded.values.get(&x_idx),
1218            Some(CellValue::Int64(v)) if *v == 2
1219        ));
1220        assert!(matches!(
1221            decoded.values.get(&y_idx),
1222            Some(CellValue::Int64(v)) if *v == 1
1223        ));
1224        assert!(matches!(
1225            &decoded.primary_key_values[0],
1226            CellValue::Int64(42)
1227        ));
1228    }
1229
1230    #[test]
1231    fn table_config_supports_non_orders_schema() {
1232        let config = KvTableConfig::new(
1233            0,
1234            vec![
1235                TableColumnConfig::new("tenant", DataType::Utf8, false),
1236                TableColumnConfig::new("id", DataType::Int64, false),
1237                TableColumnConfig::new("score", DataType::Int64, false),
1238            ],
1239            vec!["id".to_string()],
1240            vec![IndexSpec::new(
1241                "tenant_score",
1242                vec!["tenant".to_string(), "score".to_string()],
1243            )
1244            .expect("valid")],
1245        )
1246        .expect("schema agnostic config should be valid");
1247        assert_eq!(config.primary_key_columns, vec!["id".to_string()]);
1248        assert_eq!(config.columns.len(), 3);
1249    }
1250
1251    #[test]
1252    fn table_config_accepts_float64_column() {
1253        let config = KvTableConfig::new(
1254            0,
1255            vec![
1256                TableColumnConfig::new("id", DataType::Int64, false),
1257                TableColumnConfig::new("price", DataType::Float64, false),
1258            ],
1259            vec!["id".to_string()],
1260            vec![],
1261        )
1262        .expect("Float64 column should be accepted");
1263        assert_eq!(config.columns.len(), 2);
1264    }
1265
1266    #[test]
1267    fn table_config_accepts_boolean_column() {
1268        let config = KvTableConfig::new(
1269            0,
1270            vec![
1271                TableColumnConfig::new("id", DataType::Int64, false),
1272                TableColumnConfig::new("active", DataType::Boolean, false),
1273            ],
1274            vec!["id".to_string()],
1275            vec![],
1276        )
1277        .expect("Boolean column should be accepted");
1278        assert_eq!(config.columns.len(), 2);
1279    }
1280
1281    #[test]
1282    fn build_projected_batch_uses_large_utf8_type() {
1283        let config = KvTableConfig::new(
1284            0,
1285            vec![
1286                TableColumnConfig::new("id", DataType::Int64, false),
1287                TableColumnConfig::new("name", DataType::LargeUtf8, false),
1288            ],
1289            vec!["id".to_string()],
1290            vec![],
1291        )
1292        .unwrap();
1293        let model = TableModel::from_config(&config).unwrap();
1294        let rows = vec![KvRow {
1295            values: vec![CellValue::Int64(1), CellValue::Utf8("hello".to_string())],
1296        }];
1297        let batch = build_projected_batch(&rows, &model, &model.schema, &None).unwrap();
1298        assert_eq!(batch.column(1).data_type(), &DataType::LargeUtf8);
1299        let values = batch
1300            .column(1)
1301            .as_any()
1302            .downcast_ref::<LargeStringArray>()
1303            .expect("must build LargeStringArray");
1304        assert_eq!(values.value(0), "hello");
1305    }
1306
1307    #[test]
1308    fn build_projected_batch_uses_utf8_view_type() {
1309        let config = KvTableConfig::new(
1310            0,
1311            vec![
1312                TableColumnConfig::new("id", DataType::Int64, false),
1313                TableColumnConfig::new("name", DataType::Utf8View, false),
1314            ],
1315            vec!["id".to_string()],
1316            vec![],
1317        )
1318        .unwrap();
1319        let model = TableModel::from_config(&config).unwrap();
1320        let rows = vec![KvRow {
1321            values: vec![CellValue::Int64(1), CellValue::Utf8("hello".to_string())],
1322        }];
1323        let batch = build_projected_batch(&rows, &model, &model.schema, &None).unwrap();
1324        assert_eq!(batch.column(1).data_type(), &DataType::Utf8View);
1325        let values = batch
1326            .column(1)
1327            .as_any()
1328            .downcast_ref::<StringViewArray>()
1329            .expect("must build StringViewArray");
1330        assert_eq!(values.value(0), "hello");
1331    }
1332
1333    #[test]
1334    fn f64_ordered_encoding_preserves_order() {
1335        let values = [
1336            f64::NEG_INFINITY,
1337            f64::MIN,
1338            -1000.0,
1339            -1.0,
1340            -0.001,
1341            0.0,
1342            0.001,
1343            1.0,
1344            1000.0,
1345            f64::MAX,
1346            f64::INFINITY,
1347        ];
1348        let encoded: Vec<[u8; 8]> = values.iter().map(|v| encode_f64_ordered(*v)).collect();
1349        for i in 0..encoded.len() - 1 {
1350            assert!(
1351                encoded[i] < encoded[i + 1],
1352                "encode_f64_ordered({}) >= encode_f64_ordered({})",
1353                values[i],
1354                values[i + 1]
1355            );
1356        }
1357    }
1358
1359    #[test]
1360    fn f64_ordered_encoding_round_trip() {
1361        let values = [
1362            f64::MIN,
1363            -42.5,
1364            -0.0,
1365            0.0,
1366            3.125,
1367            f64::MAX,
1368            f64::INFINITY,
1369            f64::NEG_INFINITY,
1370        ];
1371        for v in values {
1372            let encoded = encode_f64_ordered(v);
1373            let decoded = decode_f64_ordered(encoded);
1374            assert!(
1375                v.to_bits() == decoded.to_bits(),
1376                "round-trip failed for {v}: got {decoded}"
1377            );
1378        }
1379    }
1380
1381    fn mixed_model() -> (TableModel, Vec<ResolvedIndexSpec>) {
1382        let config = KvTableConfig::new(
1383            0,
1384            vec![
1385                TableColumnConfig::new("id", DataType::Int64, false),
1386                TableColumnConfig::new("label", DataType::Utf8, false),
1387                TableColumnConfig::new("score", DataType::Float64, false),
1388                TableColumnConfig::new("active", DataType::Boolean, false),
1389            ],
1390            vec!["id".to_string()],
1391            vec![
1392                IndexSpec::new(
1393                    "active_score",
1394                    vec!["active".to_string(), "score".to_string()],
1395                )
1396                .expect("valid"),
1397                IndexSpec::new("label_idx", vec!["label".to_string()]).expect("valid"),
1398            ],
1399        )
1400        .expect("valid config");
1401        let model = TableModel::from_config(&config).expect("model");
1402        let specs = model
1403            .resolve_index_specs(&config.index_specs)
1404            .expect("specs");
1405        (model, specs)
1406    }
1407
1408    #[test]
1409    fn secondary_index_key_round_trip_with_float64_and_boolean() {
1410        let (model, specs) = mixed_model();
1411        let row = KvRow {
1412            values: vec![
1413                CellValue::Int64(100),
1414                CellValue::Utf8("hello".to_string()),
1415                CellValue::Float64(3.125),
1416                CellValue::Boolean(true),
1417            ],
1418        };
1419        let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row)
1420            .expect("encode");
1421        let decoded = decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key)
1422            .expect("decode");
1423        let active_idx = *model.columns_by_name.get("active").unwrap();
1424        let score_idx = *model.columns_by_name.get("score").unwrap();
1425        assert!(matches!(
1426            decoded.values.get(&active_idx),
1427            Some(CellValue::Boolean(true))
1428        ));
1429        assert!(
1430            matches!(decoded.values.get(&score_idx), Some(CellValue::Float64(v)) if (*v - 3.125).abs() < f64::EPSILON)
1431        );
1432        assert!(matches!(
1433            &decoded.primary_key_values[0],
1434            CellValue::Int64(100)
1435        ));
1436    }
1437
1438    #[test]
1439    fn base_row_round_trip_with_float64_and_boolean() {
1440        let (model, _specs) = mixed_model();
1441        let row = KvRow {
1442            values: vec![
1443                CellValue::Int64(42),
1444                CellValue::Utf8("world".to_string()),
1445                CellValue::Float64(-99.5),
1446                CellValue::Boolean(false),
1447            ],
1448        };
1449        let encoded = encode_base_row_value(&row, &model).expect("encode");
1450        let decoded =
1451            decode_base_row(vec![CellValue::Int64(42)], &encoded, &model).expect("decode");
1452        assert!(matches!(&decoded.values[0], CellValue::Int64(42)));
1453        assert!(matches!(&decoded.values[1], CellValue::Utf8(v) if v == "world"));
1454        assert!(
1455            matches!(&decoded.values[2], CellValue::Float64(v) if (*v - (-99.5)).abs() < f64::EPSILON)
1456        );
1457        assert!(matches!(&decoded.values[3], CellValue::Boolean(false)));
1458    }
1459
1460    #[test]
1461    fn predicate_bool_eq_matches() {
1462        let (model, _specs) = mixed_model();
1463        let active_idx = *model.columns_by_name.get("active").unwrap();
1464        let mut pred = QueryPredicate::default();
1465        pred.constraints
1466            .insert(active_idx, PredicateConstraint::BoolEq(true));
1467        let row_true = KvRow {
1468            values: vec![
1469                CellValue::Int64(1),
1470                CellValue::Utf8("a".to_string()),
1471                CellValue::Float64(1.0),
1472                CellValue::Boolean(true),
1473            ],
1474        };
1475        let row_false = KvRow {
1476            values: vec![
1477                CellValue::Int64(2),
1478                CellValue::Utf8("b".to_string()),
1479                CellValue::Float64(2.0),
1480                CellValue::Boolean(false),
1481            ],
1482        };
1483        assert!(pred.matches_row(&row_true));
1484        assert!(!pred.matches_row(&row_false));
1485    }
1486
1487    #[test]
1488    fn predicate_float_range_matches() {
1489        let (model, _specs) = mixed_model();
1490        let score_idx = *model.columns_by_name.get("score").unwrap();
1491        let mut pred = QueryPredicate::default();
1492        pred.constraints.insert(
1493            score_idx,
1494            PredicateConstraint::FloatRange {
1495                min: Some((2.0, true)),
1496                max: Some((5.0, false)),
1497            },
1498        );
1499        let make_row = |score: f64| KvRow {
1500            values: vec![
1501                CellValue::Int64(1),
1502                CellValue::Utf8("a".to_string()),
1503                CellValue::Float64(score),
1504                CellValue::Boolean(true),
1505            ],
1506        };
1507        assert!(!pred.matches_row(&make_row(1.99)));
1508        assert!(pred.matches_row(&make_row(2.0)));
1509        assert!(pred.matches_row(&make_row(3.5)));
1510        assert!(pred.matches_row(&make_row(4.99)));
1511        assert!(!pred.matches_row(&make_row(5.0)));
1512        assert!(!pred.matches_row(&make_row(5.01)));
1513    }
1514
1515    #[test]
1516    fn float_range_rejects_nan_row_value() {
1517        let constraint = PredicateConstraint::FloatRange {
1518            min: Some((0.0, true)),
1519            max: Some((10.0, true)),
1520        };
1521        assert!(!matches_constraint(
1522            &CellValue::Float64(f64::NAN),
1523            &constraint
1524        ));
1525    }
1526
1527    #[test]
1528    fn index_plan_with_boolean_prefix() {
1529        let (model, specs) = mixed_model();
1530        let active_idx = *model.columns_by_name.get("active").unwrap();
1531        let mut pred = QueryPredicate::default();
1532        pred.constraints
1533            .insert(active_idx, PredicateConstraint::BoolEq(true));
1534        let plan = pred
1535            .choose_index_plan(&model, &specs)
1536            .expect("plan")
1537            .expect("should find index");
1538        assert_eq!(plan.spec_idx, 0);
1539        assert_eq!(plan.constrained_prefix_len, 1);
1540    }
1541
1542    #[test]
1543    fn float_constraint_contradiction() {
1544        let mut lo: Option<(f64, bool)> = None;
1545        let mut hi: Option<(f64, bool)> = None;
1546        let mut contradiction = false;
1547        apply_float_constraint(&mut lo, &mut hi, Operator::Gt, 10.0, &mut contradiction);
1548        assert!(!contradiction);
1549        apply_float_constraint(&mut lo, &mut hi, Operator::Lt, 5.0, &mut contradiction);
1550        assert!(contradiction);
1551    }
1552
1553    #[test]
1554    fn float_constraint_eq_then_range_contradicts() {
1555        let mut lo: Option<(f64, bool)> = None;
1556        let mut hi: Option<(f64, bool)> = None;
1557        let mut contradiction = false;
1558        apply_float_constraint(&mut lo, &mut hi, Operator::Eq, 5.0, &mut contradiction);
1559        assert!(!contradiction);
1560        apply_float_constraint(&mut lo, &mut hi, Operator::Gt, 5.0, &mut contradiction);
1561        assert!(contradiction);
1562    }
1563
1564    #[test]
1565    fn float_nan_literal_comparison_marks_contradiction() {
1566        let config = KvTableConfig::new(
1567            0,
1568            vec![
1569                TableColumnConfig::new("id", DataType::Int64, false),
1570                TableColumnConfig::new("score", DataType::Float64, false),
1571            ],
1572            vec!["id".to_string()],
1573            vec![],
1574        )
1575        .unwrap();
1576        let model = TableModel::from_config(&config).unwrap();
1577
1578        use datafusion::logical_expr::col;
1579        let filter = col("score").gt(Expr::Literal(ScalarValue::Float64(Some(f64::NAN)), None));
1580        assert!(QueryPredicate::supports_filter(&filter, &model));
1581
1582        let pred = QueryPredicate::from_filters(&[filter], &model);
1583        assert!(
1584            pred.contradiction,
1585            "comparison with NaN literal must produce contradiction"
1586        );
1587    }
1588
1589    #[test]
1590    fn table_config_accepts_date32_column() {
1591        let config = KvTableConfig::new(
1592            0,
1593            vec![
1594                TableColumnConfig::new("id", DataType::Int64, false),
1595                TableColumnConfig::new("created", DataType::Date32, false),
1596            ],
1597            vec!["id".to_string()],
1598            vec![],
1599        )
1600        .expect("Date32 column should be accepted");
1601        assert_eq!(config.columns.len(), 2);
1602    }
1603
1604    #[test]
1605    fn table_config_accepts_timestamp_column() {
1606        let config = KvTableConfig::new(
1607            0,
1608            vec![
1609                TableColumnConfig::new("id", DataType::Int64, false),
1610                TableColumnConfig::new(
1611                    "ts",
1612                    DataType::Timestamp(TimeUnit::Microsecond, None),
1613                    false,
1614                ),
1615            ],
1616            vec!["id".to_string()],
1617            vec![],
1618        )
1619        .expect("Timestamp column should be accepted");
1620        let schema = config.to_schema();
1621        assert!(matches!(
1622            schema.field(1).data_type(),
1623            DataType::Timestamp(TimeUnit::Microsecond, _)
1624        ));
1625    }
1626
1627    #[test]
1628    fn table_config_normalizes_timestamp_to_microsecond() {
1629        let config = KvTableConfig::new(
1630            0,
1631            vec![
1632                TableColumnConfig::new("id", DataType::Int64, false),
1633                TableColumnConfig::new(
1634                    "ts",
1635                    DataType::Timestamp(TimeUnit::Nanosecond, None),
1636                    false,
1637                ),
1638            ],
1639            vec!["id".to_string()],
1640            vec![],
1641        )
1642        .expect("Nanosecond timestamp should be accepted");
1643        let schema = config.to_schema();
1644        assert!(matches!(
1645            schema.field(1).data_type(),
1646            DataType::Timestamp(TimeUnit::Microsecond, _)
1647        ));
1648    }
1649
1650    #[test]
1651    fn table_config_accepts_decimal128_column() {
1652        let config = KvTableConfig::new(
1653            0,
1654            vec![
1655                TableColumnConfig::new("id", DataType::Int64, false),
1656                TableColumnConfig::new("price", DataType::Decimal128(10, 2), false),
1657            ],
1658            vec!["id".to_string()],
1659            vec![],
1660        )
1661        .expect("Decimal128 column should be accepted");
1662        assert_eq!(config.columns.len(), 2);
1663    }
1664
1665    #[test]
1666    fn table_config_accepts_list_column() {
1667        use datafusion::arrow::datatypes::Field;
1668
1669        let config = KvTableConfig::new(
1670            0,
1671            vec![
1672                TableColumnConfig::new("id", DataType::Int64, false),
1673                TableColumnConfig::new(
1674                    "tags",
1675                    DataType::List(Arc::new(Field::new("item", DataType::Utf8, false))),
1676                    false,
1677                ),
1678            ],
1679            vec!["id".to_string()],
1680            vec![],
1681        )
1682        .expect("List<Utf8> column should be accepted");
1683        assert_eq!(config.columns.len(), 2);
1684    }
1685
1686    #[test]
1687    fn list_column_rejected_in_index() {
1688        use datafusion::arrow::datatypes::Field;
1689
1690        let result = KvTableConfig::new(
1691            0,
1692            vec![
1693                TableColumnConfig::new("id", DataType::Int64, false),
1694                TableColumnConfig::new(
1695                    "tags",
1696                    DataType::List(Arc::new(Field::new("item", DataType::Utf8, false))),
1697                    false,
1698                ),
1699            ],
1700            vec!["id".to_string()],
1701            vec![IndexSpec::new("tags_idx", vec!["tags".to_string()]).unwrap()],
1702        );
1703        assert!(
1704            result.is_err() || {
1705                let config = result.unwrap();
1706                let model = TableModel::from_config(&config).unwrap();
1707                model.resolve_index_specs(&config.index_specs).is_err()
1708            }
1709        );
1710    }
1711
1712    #[test]
1713    fn i32_ordered_encoding_round_trip() {
1714        let values = [i32::MIN, -1000, -1, 0, 1, 1000, i32::MAX];
1715        for v in values {
1716            assert_eq!(decode_i32_ordered(encode_i32_ordered(v)), v);
1717        }
1718        let encoded: Vec<[u8; 4]> = values.iter().map(|v| encode_i32_ordered(*v)).collect();
1719        for i in 0..encoded.len() - 1 {
1720            assert!(encoded[i] < encoded[i + 1]);
1721        }
1722    }
1723
1724    #[test]
1725    fn i128_ordered_encoding_round_trip() {
1726        let values = [i128::MIN, -1, 0, 1, 1234567890123456789, i128::MAX];
1727        for v in values {
1728            assert_eq!(decode_i128_ordered(encode_i128_ordered(v)), v);
1729        }
1730        let encoded: Vec<[u8; 16]> = values.iter().map(|v| encode_i128_ordered(*v)).collect();
1731        for i in 0..encoded.len() - 1 {
1732            assert!(encoded[i] < encoded[i + 1]);
1733        }
1734    }
1735
1736    fn extended_model() -> (TableModel, Vec<ResolvedIndexSpec>) {
1737        let config = KvTableConfig::new(
1738            0,
1739            vec![
1740                TableColumnConfig::new("id", DataType::Int64, false),
1741                TableColumnConfig::new("created", DataType::Date32, false),
1742                TableColumnConfig::new(
1743                    "ts",
1744                    DataType::Timestamp(TimeUnit::Microsecond, None),
1745                    false,
1746                ),
1747                TableColumnConfig::new("price", DataType::Decimal128(10, 2), false),
1748                TableColumnConfig::new("label", DataType::Utf8, false),
1749            ],
1750            vec!["id".to_string()],
1751            vec![
1752                IndexSpec::new(
1753                    "date_label",
1754                    vec!["created".to_string(), "label".to_string()],
1755                )
1756                .expect("valid"),
1757                IndexSpec::new("price_idx", vec!["price".to_string()]).expect("valid"),
1758            ],
1759        )
1760        .expect("valid config");
1761        let model = TableModel::from_config(&config).expect("model");
1762        let specs = model
1763            .resolve_index_specs(&config.index_specs)
1764            .expect("specs");
1765        (model, specs)
1766    }
1767
1768    #[test]
1769    fn secondary_index_key_round_trip_date32_and_decimal128() {
1770        let (model, specs) = extended_model();
1771        let row = KvRow {
1772            values: vec![
1773                CellValue::Int64(42),
1774                CellValue::Date32(19000),
1775                CellValue::Timestamp(1_700_000_000_000_000),
1776                CellValue::Decimal128(123456),
1777                CellValue::Utf8("hello".to_string()),
1778            ],
1779        };
1780        let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row)
1781            .expect("encode");
1782        let decoded = decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key)
1783            .expect("decode");
1784        let created_idx = *model.columns_by_name.get("created").unwrap();
1785        let label_idx = *model.columns_by_name.get("label").unwrap();
1786        assert!(matches!(
1787            decoded.values.get(&created_idx),
1788            Some(CellValue::Date32(19000))
1789        ));
1790        assert!(matches!(
1791            decoded.values.get(&label_idx),
1792            Some(CellValue::Utf8(v)) if v == "hello"
1793        ));
1794        assert!(matches!(
1795            &decoded.primary_key_values[0],
1796            CellValue::Int64(42)
1797        ));
1798
1799        let key2 = encode_secondary_index_key(model.table_prefix, &specs[1], &model, &row)
1800            .expect("encode");
1801        let decoded2 = decode_secondary_index_key(model.table_prefix, &specs[1], &model, &key2)
1802            .expect("decode");
1803        let price_idx = *model.columns_by_name.get("price").unwrap();
1804        assert!(matches!(
1805            decoded2.values.get(&price_idx),
1806            Some(CellValue::Decimal128(123456))
1807        ));
1808        assert!(matches!(
1809            &decoded2.primary_key_values[0],
1810            CellValue::Int64(42)
1811        ));
1812    }
1813
1814    #[test]
1815    fn base_row_round_trip_with_date32_timestamp_decimal128() {
1816        let (model, _specs) = extended_model();
1817        let row = KvRow {
1818            values: vec![
1819                CellValue::Int64(7),
1820                CellValue::Date32(19500),
1821                CellValue::Timestamp(1_700_000_000_000_000),
1822                CellValue::Decimal128(-9876543),
1823                CellValue::Utf8("world".to_string()),
1824            ],
1825        };
1826        let encoded = encode_base_row_value(&row, &model).expect("encode");
1827        let decoded = decode_base_row(vec![CellValue::Int64(7)], &encoded, &model).expect("decode");
1828        assert!(matches!(&decoded.values[0], CellValue::Int64(7)));
1829        assert!(matches!(&decoded.values[1], CellValue::Date32(19500)));
1830        assert!(matches!(
1831            &decoded.values[2],
1832            CellValue::Timestamp(1_700_000_000_000_000)
1833        ));
1834        assert!(matches!(
1835            &decoded.values[3],
1836            CellValue::Decimal128(-9876543)
1837        ));
1838        assert!(matches!(&decoded.values[4], CellValue::Utf8(v) if v == "world"));
1839    }
1840
1841    #[test]
1842    fn base_row_round_trip_with_list() {
1843        use datafusion::arrow::datatypes::Field;
1844
1845        let config = KvTableConfig::new(
1846            0,
1847            vec![
1848                TableColumnConfig::new("id", DataType::Int64, false),
1849                TableColumnConfig::new(
1850                    "tags",
1851                    DataType::List(Arc::new(Field::new("item", DataType::Utf8, false))),
1852                    false,
1853                ),
1854                TableColumnConfig::new(
1855                    "scores",
1856                    DataType::List(Arc::new(Field::new("item", DataType::Int64, false))),
1857                    false,
1858                ),
1859            ],
1860            vec!["id".to_string()],
1861            vec![],
1862        )
1863        .expect("valid");
1864        let model = TableModel::from_config(&config).expect("model");
1865        let row = KvRow {
1866            values: vec![
1867                CellValue::Int64(1),
1868                CellValue::List(vec![
1869                    CellValue::Utf8("a".to_string()),
1870                    CellValue::Utf8("b".to_string()),
1871                ]),
1872                CellValue::List(vec![CellValue::Int64(10), CellValue::Int64(20)]),
1873            ],
1874        };
1875        let encoded = encode_base_row_value(&row, &model).expect("encode");
1876        let decoded = decode_base_row(vec![CellValue::Int64(1)], &encoded, &model).expect("decode");
1877        assert!(matches!(&decoded.values[0], CellValue::Int64(1)));
1878        match &decoded.values[1] {
1879            CellValue::List(items) => {
1880                assert_eq!(items.len(), 2);
1881                assert!(matches!(&items[0], CellValue::Utf8(v) if v == "a"));
1882                assert!(matches!(&items[1], CellValue::Utf8(v) if v == "b"));
1883            }
1884            _ => panic!("expected List"),
1885        }
1886        match &decoded.values[2] {
1887            CellValue::List(items) => {
1888                assert_eq!(items.len(), 2);
1889                assert!(matches!(&items[0], CellValue::Int64(10)));
1890                assert!(matches!(&items[1], CellValue::Int64(20)));
1891            }
1892            _ => panic!("expected List"),
1893        }
1894    }
1895
1896    #[test]
1897    fn decimal128_constraint_range() {
1898        let mut min: Option<i128> = None;
1899        let mut max: Option<i128> = None;
1900        let mut contradiction = false;
1901        apply_decimal128_constraint(&mut min, &mut max, Operator::GtEq, 100, &mut contradiction);
1902        assert!(!contradiction);
1903        apply_decimal128_constraint(&mut min, &mut max, Operator::LtEq, 200, &mut contradiction);
1904        assert!(!contradiction);
1905        assert_eq!(min, Some(100));
1906        assert_eq!(max, Some(200));
1907        assert!(in_i128_bounds(150, min, max));
1908        assert!(!in_i128_bounds(99, min, max));
1909        assert!(!in_i128_bounds(201, min, max));
1910    }
1911
1912    #[test]
1913    fn decimal256_gt_max_is_contradiction() {
1914        let mut min: Option<i256> = None;
1915        let mut max: Option<i256> = None;
1916        let mut contradiction = false;
1917        apply_i256_constraint(
1918            &mut min,
1919            &mut max,
1920            Operator::Gt,
1921            i256::MAX,
1922            &mut contradiction,
1923        );
1924        assert!(contradiction);
1925        assert_eq!(min, None);
1926        assert_eq!(max, None);
1927    }
1928
1929    #[test]
1930    fn decimal256_lt_min_is_contradiction() {
1931        let mut min: Option<i256> = None;
1932        let mut max: Option<i256> = None;
1933        let mut contradiction = false;
1934        apply_i256_constraint(
1935            &mut min,
1936            &mut max,
1937            Operator::Lt,
1938            i256::MIN,
1939            &mut contradiction,
1940        );
1941        assert!(contradiction);
1942        assert_eq!(min, None);
1943        assert_eq!(max, None);
1944    }
1945
1946    #[test]
1947    fn date32_index_bound_clamps_on_i64_overflow() {
1948        let config = KvTableConfig::new(
1949            0,
1950            vec![
1951                TableColumnConfig::new("id", DataType::Int64, false),
1952                TableColumnConfig::new("created", DataType::Date32, false),
1953            ],
1954            vec!["id".to_string()],
1955            vec![IndexSpec::new("created_idx", vec!["created".to_string()]).unwrap()],
1956        )
1957        .unwrap();
1958        let model = TableModel::from_config(&config).unwrap();
1959        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
1960
1961        let created_idx = *model.columns_by_name.get("created").unwrap();
1962        let mut pred = QueryPredicate::default();
1963        pred.constraints.insert(
1964            created_idx,
1965            PredicateConstraint::IntRange {
1966                min: Some(i32::MAX as i64 + 1),
1967                max: None,
1968            },
1969        );
1970
1971        let start = pred
1972            .encode_index_bound_key(model.table_prefix, &model, &specs[0], 1, false)
1973            .unwrap();
1974        let end = pred
1975            .encode_index_bound_key(model.table_prefix, &model, &specs[0], 1, true)
1976            .unwrap();
1977
1978        assert!(
1979            start <= end,
1980            "lower bound must not exceed upper bound (was wrapping via as i32)"
1981        );
1982
1983        let encoded_lower = specs[0].codec.read_payload_exact::<4>(&start, 0).unwrap();
1984        let decoded_lower = decode_i32_ordered(encoded_lower);
1985        assert_eq!(
1986            decoded_lower,
1987            i32::MAX,
1988            "out-of-range i64 must clamp to i32::MAX, not wrap"
1989        );
1990    }
1991
1992    #[test]
1993    fn timestamp_nanos_gt_uses_floor_division() {
1994        let micros = timestamp_scalar_to_micros_for_op(
1995            &ScalarValue::TimestampNanosecond(Some(-1500), None),
1996            Operator::Gt,
1997        )
1998        .unwrap();
1999        assert_eq!(micros, -2, "Gt on -1500ns should floor to -2us");
2000
2001        let mut min: Option<i64> = None;
2002        let mut max: Option<i64> = None;
2003        let mut contradiction = false;
2004        apply_int_constraint(&mut min, &mut max, Operator::Gt, micros, &mut contradiction);
2005        assert_eq!(min, Some(-1), "Gt(-2us) + 1 = min -1us");
2006
2007        let row_at_minus_1 = CellValue::Timestamp(-1);
2008        assert!(matches_constraint(
2009            &row_at_minus_1,
2010            &PredicateConstraint::IntRange { min, max }
2011        ));
2012    }
2013
2014    #[test]
2015    fn timestamp_nanos_lteq_uses_floor_division() {
2016        let micros = timestamp_scalar_to_micros_for_op(
2017            &ScalarValue::TimestampNanosecond(Some(-1500), None),
2018            Operator::LtEq,
2019        )
2020        .unwrap();
2021        assert_eq!(micros, -2, "LtEq on -1500ns should floor to -2us");
2022
2023        let row_at_minus_1 = CellValue::Timestamp(-1);
2024        assert!(
2025            !matches_constraint(
2026                &row_at_minus_1,
2027                &PredicateConstraint::IntRange {
2028                    min: None,
2029                    max: Some(micros)
2030                }
2031            ),
2032            "-1us (-1000ns) > -1500ns, must not satisfy <= -1500ns"
2033        );
2034    }
2035
2036    #[test]
2037    fn timestamp_nanos_gteq_uses_ceil_division() {
2038        let micros = timestamp_scalar_to_micros_for_op(
2039            &ScalarValue::TimestampNanosecond(Some(-1500), None),
2040            Operator::GtEq,
2041        )
2042        .unwrap();
2043        assert_eq!(micros, -1, "GtEq on -1500ns should ceil to -1us");
2044    }
2045
2046    #[test]
2047    fn timestamp_nanos_lt_uses_ceil_division() {
2048        let micros = timestamp_scalar_to_micros_for_op(
2049            &ScalarValue::TimestampNanosecond(Some(-1500), None),
2050            Operator::Lt,
2051        )
2052        .unwrap();
2053        assert_eq!(micros, -1, "Lt on -1500ns should ceil to -1us");
2054
2055        let mut min: Option<i64> = None;
2056        let mut max: Option<i64> = None;
2057        let mut contradiction = false;
2058        apply_int_constraint(&mut min, &mut max, Operator::Lt, micros, &mut contradiction);
2059        assert_eq!(max, Some(-2), "Lt(-1us) - 1 = max -2us");
2060    }
2061
2062    #[test]
2063    fn timestamp_nanos_eq_non_aligned_is_contradiction() {
2064        let result = timestamp_scalar_to_micros_for_op(
2065            &ScalarValue::TimestampNanosecond(Some(-1500), None),
2066            Operator::Eq,
2067        );
2068        assert!(
2069            result.is_none(),
2070            "non-aligned ns Eq must produce contradiction"
2071        );
2072    }
2073
2074    #[test]
2075    fn timestamp_nanos_exact_multiple_is_unchanged() {
2076        for op in [
2077            Operator::Eq,
2078            Operator::Gt,
2079            Operator::GtEq,
2080            Operator::Lt,
2081            Operator::LtEq,
2082        ] {
2083            let micros = timestamp_scalar_to_micros_for_op(
2084                &ScalarValue::TimestampNanosecond(Some(-2000), None),
2085                op,
2086            )
2087            .unwrap();
2088            assert_eq!(micros, -2, "exact multiple -2000ns = -2us for {op:?}");
2089        }
2090    }
2091
2092    #[test]
2093    fn float64_index_bounds_include_infinity() {
2094        let config = KvTableConfig::new(
2095            0,
2096            vec![
2097                TableColumnConfig::new("id", DataType::Int64, false),
2098                TableColumnConfig::new("val", DataType::Float64, false),
2099            ],
2100            vec!["id".to_string()],
2101            vec![IndexSpec::new("val_idx", vec!["val".to_string()]).unwrap()],
2102        )
2103        .unwrap();
2104        let model = TableModel::from_config(&config).unwrap();
2105        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
2106
2107        let pred = QueryPredicate::default();
2108        let start = pred
2109            .encode_index_bound_key(model.table_prefix, &model, &specs[0], 0, false)
2110            .unwrap();
2111        let end = pred
2112            .encode_index_bound_key(model.table_prefix, &model, &specs[0], 0, true)
2113            .unwrap();
2114
2115        let neg_inf_row = KvRow {
2116            values: vec![CellValue::Int64(1), CellValue::Float64(f64::NEG_INFINITY)],
2117        };
2118        let pos_inf_row = KvRow {
2119            values: vec![CellValue::Int64(2), CellValue::Float64(f64::INFINITY)],
2120        };
2121
2122        let neg_inf_key =
2123            encode_secondary_index_key(model.table_prefix, &specs[0], &model, &neg_inf_row)
2124                .unwrap();
2125        let pos_inf_key =
2126            encode_secondary_index_key(model.table_prefix, &specs[0], &model, &pos_inf_row)
2127                .unwrap();
2128
2129        assert!(
2130            neg_inf_key >= start,
2131            "NEG_INFINITY row key must be within scan start bound"
2132        );
2133        assert!(
2134            pos_inf_key <= end,
2135            "INFINITY row key must be within scan end bound"
2136        );
2137    }
2138
2139    #[test]
2140    fn distinct_table_prefixes_produce_non_overlapping_pk_ranges() {
2141        let range_a = primary_key_prefix_range(1);
2142        let range_b = primary_key_prefix_range(2);
2143        assert!(
2144            range_a.end < range_b.start,
2145            "table prefix 1 pk range must be entirely below table prefix 2"
2146        );
2147    }
2148
2149    #[test]
2150    fn distinct_table_prefixes_isolate_primary_keys() {
2151        let model_1 = simple_int64_model(1);
2152        let model_2 = simple_int64_model(2);
2153        let pk = CellValue::Int64(42);
2154        let key_a = encode_primary_key(1, &[&pk], &model_1).expect("pk key encodes");
2155        let key_b = encode_primary_key(2, &[&pk], &model_2).expect("pk key encodes");
2156        assert_ne!(key_a, key_b, "same PK under different prefixes must differ");
2157        assert!(
2158            decode_primary_key(1, &key_a, &model_1).is_some(),
2159            "key_a must decode under prefix 1"
2160        );
2161        assert!(
2162            decode_primary_key(2, &key_a, &model_2).is_none(),
2163            "key_a must NOT decode under prefix 2"
2164        );
2165        assert!(
2166            decode_primary_key(2, &key_b, &model_2).is_some(),
2167            "key_b must decode under prefix 2"
2168        );
2169    }
2170
2171    #[test]
2172    fn distinct_table_prefixes_isolate_secondary_keys() {
2173        let config_a = KvTableConfig::new(
2174            10,
2175            vec![
2176                TableColumnConfig::new("id", DataType::Int64, false),
2177                TableColumnConfig::new("name", DataType::Utf8, false),
2178            ],
2179            vec!["id".to_string()],
2180            vec![IndexSpec::new("name_idx", vec!["name".to_string()]).unwrap()],
2181        )
2182        .unwrap();
2183        let config_b = KvTableConfig::new(
2184            11,
2185            vec![
2186                TableColumnConfig::new("id", DataType::Int64, false),
2187                TableColumnConfig::new("name", DataType::Utf8, false),
2188            ],
2189            vec!["id".to_string()],
2190            vec![IndexSpec::new("name_idx", vec!["name".to_string()]).unwrap()],
2191        )
2192        .unwrap();
2193
2194        let model_a = TableModel::from_config(&config_a).unwrap();
2195        let specs_a = model_a.resolve_index_specs(&config_a.index_specs).unwrap();
2196        let model_b = TableModel::from_config(&config_b).unwrap();
2197        let specs_b = model_b.resolve_index_specs(&config_b.index_specs).unwrap();
2198
2199        let row = KvRow {
2200            values: vec![CellValue::Int64(1), CellValue::Utf8("alice".to_string())],
2201        };
2202        let key_a =
2203            encode_secondary_index_key(model_a.table_prefix, &specs_a[0], &model_a, &row).unwrap();
2204        let key_b =
2205            encode_secondary_index_key(model_b.table_prefix, &specs_b[0], &model_b, &row).unwrap();
2206
2207        assert_ne!(
2208            key_a, key_b,
2209            "same row under different prefixes must differ"
2210        );
2211        assert!(
2212            decode_secondary_index_key(model_a.table_prefix, &specs_a[0], &model_a, &key_a)
2213                .is_some()
2214        );
2215        assert!(
2216            decode_secondary_index_key(model_a.table_prefix, &specs_a[0], &model_a, &key_b)
2217                .is_none(),
2218            "key from table B must not decode under table A's prefix"
2219        );
2220    }
2221
2222    #[test]
2223    fn table_prefix_stored_in_model() {
2224        let config = KvTableConfig::new(
2225            12,
2226            vec![TableColumnConfig::new("id", DataType::Int64, false)],
2227            vec!["id".to_string()],
2228            vec![],
2229        )
2230        .unwrap();
2231        let model = TableModel::from_config(&config).unwrap();
2232        assert_eq!(model.table_prefix, 12);
2233    }
2234
2235    #[test]
2236    fn codec_layout_exposes_payload_bits_under_reserved_family_bits() {
2237        let config = KvTableConfig::new(
2238            0,
2239            vec![
2240                TableColumnConfig::new("id", DataType::FixedSizeBinary(16), false),
2241                TableColumnConfig::new("bucket", DataType::FixedSizeBinary(16), false),
2242            ],
2243            vec!["id".to_string()],
2244            vec![IndexSpec::new("bucket_idx", vec!["bucket".to_string()]).unwrap()],
2245        )
2246        .unwrap();
2247        let model = TableModel::from_config(&config).unwrap();
2248        let spec = model
2249            .resolve_index_specs(&config.index_specs)
2250            .unwrap()
2251            .remove(0);
2252
2253        let mut current_primary = HashSet::new();
2254        let mut current_secondary = HashSet::new();
2255
2256        fn first_twelve_bits_of_key(key: &[u8]) -> u16 {
2257            let first = u16::from(*key.first().unwrap_or(&0));
2258            let second = u16::from(*key.get(1).unwrap_or(&0));
2259            (first << 4) | (second >> 4)
2260        }
2261
2262        for first_byte in 0u8..=255 {
2263            let mut id = vec![0u8; 16];
2264            id[0] = first_byte;
2265            let mut bucket = vec![0u8; 16];
2266            bucket[0] = first_byte;
2267
2268            let pk = CellValue::FixedBinary(id.clone());
2269            let current_pk = encode_primary_key(model.table_prefix, &[&pk], &model).unwrap();
2270            current_primary.insert(first_twelve_bits_of_key(&current_pk));
2271
2272            let row = KvRow {
2273                values: vec![
2274                    CellValue::FixedBinary(id),
2275                    CellValue::FixedBinary(bucket.clone()),
2276                ],
2277            };
2278            let current_index =
2279                encode_secondary_index_key(model.table_prefix, &spec, &model, &row).unwrap();
2280            current_secondary.insert(first_twelve_bits_of_key(&current_index));
2281        }
2282
2283        // Primary keys reserve 5 high bits for family, leaving 7 payload bits in the first
2284        // 12 bits of the physical key. Varying one payload byte therefore spans 2^7 values.
2285        assert_eq!(current_primary.len(), 128);
2286
2287        // Secondary index keys reserve 9 high bits for family, leaving 3 payload bits in the
2288        // first 12 bits. Varying one payload byte therefore spans 2^3 values.
2289        assert_eq!(current_secondary.len(), 8);
2290    }
2291
2292    #[test]
2293    fn kv_schema_auto_assigns_sequential_prefixes() {
2294        let client = StoreClient::new("http://localhost:10000");
2295        let schema = KvSchema::new(client)
2296            .table(
2297                "alpha",
2298                vec![TableColumnConfig::new("id", DataType::Int64, false)],
2299                vec!["id".to_string()],
2300                vec![],
2301            )
2302            .unwrap()
2303            .table(
2304                "beta",
2305                vec![
2306                    TableColumnConfig::new("id", DataType::Int64, false),
2307                    TableColumnConfig::new("name", DataType::Utf8, false),
2308                ],
2309                vec!["id".to_string()],
2310                vec![],
2311            )
2312            .unwrap()
2313            .table(
2314                "gamma",
2315                vec![TableColumnConfig::new("id", DataType::Int64, false)],
2316                vec!["id".to_string()],
2317                vec![],
2318            )
2319            .unwrap();
2320
2321        assert_eq!(schema.table_count(), 3);
2322    }
2323
/// Fills a schema with exactly `MAX_TABLES` tables, then verifies that registering one more
/// table is rejected with the codec-capacity error.
///
/// On a mismatch the failure message includes the error that was actually returned, so a
/// changed or unrelated error can be diagnosed straight from the test output.
#[test]
fn kv_schema_allows_max_codec_table_count_and_rejects_overflow() {
    let client = StoreClient::new("http://localhost:10000");
    let mut schema = KvSchema::new(client);
    for idx in 0..MAX_TABLES {
        schema = schema
            .table(
                format!("t{idx}"),
                vec![TableColumnConfig::new("id", DataType::Int64, false)],
                vec!["id".to_string()],
                vec![],
            )
            .expect("tables up to codec capacity should be accepted");
    }
    assert_eq!(schema.table_count(), MAX_TABLES);

    // One table past the capacity limit must fail.
    let overflow = schema.table(
        "overflow",
        vec![TableColumnConfig::new("id", DataType::Int64, false)],
        vec!["id".to_string()],
        vec![],
    );
    let expected = format!("too many tables for codec layout (max {MAX_TABLES})");
    match overflow {
        Ok(_) => panic!("overflow table should be rejected"),
        Err(err) => assert!(
            err.contains(&expected),
            "overflow table should be rejected with codec-capacity error, got: {err}"
        ),
    }
}
2356
/// Checks that the primary-key ranges for prefixes 0, 1 and 2 are strictly ordered: each
/// range ends before the next one starts.
#[test]
fn sequential_prefixes_produce_non_overlapping_pk_ranges() {
    let ranges = [
        primary_key_prefix_range(0),
        primary_key_prefix_range(1),
        primary_key_prefix_range(2),
    ];
    // Compare every adjacent pair of ranges.
    for pair in ranges.windows(2) {
        assert!(pair[0].end < pair[1].start);
    }
}
2365
/// Encodes the same primary-key value under prefixes 0 and 1 and checks that the keys differ
/// and that each key only decodes under the prefix it was encoded with.
#[test]
fn sequential_prefixes_isolate_primary_keys() {
    let shared_pk = CellValue::Int64(42);
    let first_model = simple_int64_model(0);
    let second_model = simple_int64_model(1);

    let first_key =
        encode_primary_key(0, &[&shared_pk], &first_model).expect("pk key encodes");
    let second_key =
        encode_primary_key(1, &[&shared_pk], &second_model).expect("pk key encodes");
    assert_ne!(first_key, second_key);

    // Matching prefix/key combinations decode.
    assert!(decode_primary_key(0, &first_key, &first_model).is_some());
    assert!(decode_primary_key(1, &second_key, &second_model).is_some());
    // Crossed prefix/key combinations do not.
    assert!(decode_primary_key(1, &first_key, &second_model).is_none());
    assert!(decode_primary_key(0, &second_key, &first_model).is_none());
}
2379
/// Builds two otherwise identical table configs with prefixes 0 and 1, encodes the same row's
/// secondary index key under each, and checks that the keys differ and that the prefix-1 key
/// does not decode under prefix 0.
#[test]
fn sequential_prefixes_isolate_secondary_keys() {
    // Both tables share the same columns, primary key and index definition.
    let columns = || {
        vec![
            TableColumnConfig::new("id", DataType::Int64, false),
            TableColumnConfig::new("name", DataType::Utf8, false),
        ]
    };
    let name_index = || vec![IndexSpec::new("name_idx", vec!["name".to_string()]).unwrap()];

    let config_a =
        KvTableConfig::new(0, columns(), vec!["id".to_string()], name_index()).unwrap();
    let config_b =
        KvTableConfig::new(1, columns(), vec!["id".to_string()], name_index()).unwrap();

    let model_a = TableModel::from_config(&config_a).unwrap();
    let specs_a = model_a.resolve_index_specs(&config_a.index_specs).unwrap();
    let model_b = TableModel::from_config(&config_b).unwrap();
    let specs_b = model_b.resolve_index_specs(&config_b.index_specs).unwrap();
    let spec_a = &specs_a[0];
    let spec_b = &specs_b[0];

    let row = KvRow {
        values: vec![CellValue::Int64(1), CellValue::Utf8("alice".to_string())],
    };
    let key_a = encode_secondary_index_key(model_a.table_prefix, spec_a, &model_a, &row).unwrap();
    let key_b = encode_secondary_index_key(model_b.table_prefix, spec_b, &model_b, &row).unwrap();
    assert_ne!(key_a, key_b);

    let cross_decoded = decode_secondary_index_key(model_a.table_prefix, spec_a, &model_a, &key_b);
    assert!(
        cross_decoded.is_none(),
        "key from prefix 1 must not decode under prefix 0"
    );
}
2422
2423    #[tokio::test]
2424    async fn kv_schema_register_all_enables_join() {
2425        let ctx = SessionContext::new();
2426        let client = StoreClient::new("http://localhost:10000");
2427
2428        let result = KvSchema::new(client)
2429            .table(
2430                "customers",
2431                vec![
2432                    TableColumnConfig::new("customer_id", DataType::Int64, false),
2433                    TableColumnConfig::new("name", DataType::Utf8, false),
2434                ],
2435                vec!["customer_id".to_string()],
2436                vec![],
2437            )
2438            .unwrap()
2439            .table(
2440                "orders",
2441                vec![
2442                    TableColumnConfig::new("order_id", DataType::Int64, false),
2443                    TableColumnConfig::new("customer_id", DataType::Int64, false),
2444                    TableColumnConfig::new("amount", DataType::Int64, false),
2445                ],
2446                vec!["order_id".to_string()],
2447                vec![IndexSpec::new("cust_idx", vec!["customer_id".to_string()]).unwrap()],
2448            )
2449            .unwrap()
2450            .register_all(&ctx);
2451
2452        assert!(
2453            result.is_ok(),
2454            "register_all must succeed: {:?}",
2455            result.err()
2456        );
2457
2458        let plan = ctx
2459            .sql(
2460                "SELECT c.name, o.order_id, o.amount \
2461                 FROM orders o \
2462                 JOIN customers c ON o.customer_id = c.customer_id",
2463            )
2464            .await;
2465        assert!(
2466            plan.is_ok(),
2467            "JOIN query must plan successfully: {:?}",
2468            plan.err()
2469        );
2470    }
2471
2472    #[tokio::test]
2473    async fn kv_schema_three_way_join() {
2474        let ctx = SessionContext::new();
2475        let client = StoreClient::new("http://localhost:10000");
2476
2477        KvSchema::new(client)
2478            .table(
2479                "products",
2480                vec![
2481                    TableColumnConfig::new("product_id", DataType::Int64, false),
2482                    TableColumnConfig::new("name", DataType::Utf8, false),
2483                    TableColumnConfig::new("price", DataType::Int64, false),
2484                ],
2485                vec!["product_id".to_string()],
2486                vec![],
2487            )
2488            .unwrap()
2489            .table(
2490                "line_items",
2491                vec![
2492                    TableColumnConfig::new("item_id", DataType::Int64, false),
2493                    TableColumnConfig::new("order_id", DataType::Int64, false),
2494                    TableColumnConfig::new("product_id", DataType::Int64, false),
2495                    TableColumnConfig::new("qty", DataType::Int64, false),
2496                ],
2497                vec!["item_id".to_string()],
2498                vec![
2499                    IndexSpec::new("prod_idx", vec!["product_id".to_string()]).unwrap(),
2500                    IndexSpec::new("order_idx", vec!["order_id".to_string()]).unwrap(),
2501                ],
2502            )
2503            .unwrap()
2504            .table(
2505                "orders",
2506                vec![
2507                    TableColumnConfig::new("order_id", DataType::Int64, false),
2508                    TableColumnConfig::new("customer", DataType::Utf8, false),
2509                ],
2510                vec!["order_id".to_string()],
2511                vec![],
2512            )
2513            .unwrap()
2514            .register_all(&ctx)
2515            .unwrap();
2516
2517        let plan = ctx
2518            .sql(
2519                "SELECT o.customer, p.name, li.qty \
2520                 FROM line_items li \
2521                 JOIN products p ON li.product_id = p.product_id \
2522                 JOIN orders o ON li.order_id = o.order_id",
2523            )
2524            .await;
2525        assert!(plan.is_ok(), "three-way JOIN must plan: {:?}", plan.err());
2526    }
2527
/// Adds a single table through the `orders_table` convenience method with one two-column
/// index and checks that the schema then holds exactly one table.
#[test]
fn kv_schema_orders_table_convenience() {
    let region_customer = IndexSpec::new(
        "region_customer",
        vec!["region".to_string(), "customer_id".to_string()],
    )
    .unwrap();

    let store = StoreClient::new("http://localhost:10000");
    let schema = KvSchema::new(store)
        .orders_table("my_orders", vec![region_customer])
        .unwrap();

    assert_eq!(schema.table_count(), 1);
}
2543
/// A table config containing a nullable non-key column must be accepted.
#[test]
fn nullable_column_accepted_in_config() {
    let columns = vec![
        TableColumnConfig::new("id", DataType::Int64, false),
        // `name` is the nullable column under test.
        TableColumnConfig::new("name", DataType::Utf8, true),
    ];
    let outcome = KvTableConfig::new(0, columns, vec![String::from("id")], vec![]);
    assert!(outcome.is_ok());
}
2557
/// An index over a nullable column must fail index-spec resolution with an error that
/// mentions "nullable".
#[test]
fn nullable_column_rejected_in_index() {
    let columns = vec![
        TableColumnConfig::new("id", DataType::Int64, false),
        TableColumnConfig::new("name", DataType::Utf8, true),
    ];
    let name_index = IndexSpec::new("name_idx", vec!["name".to_string()]).unwrap();
    let config =
        KvTableConfig::new(0, columns, vec!["id".to_string()], vec![name_index]).unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let resolved = model.resolve_index_specs(&config.index_specs);
    assert!(resolved.is_err());
    let message = resolved.unwrap_err();
    assert!(message.contains("nullable"));
}
2575
/// Encodes a base row whose nullable `label` column is NULL, decodes it again and checks that
/// the first three decoded cells are `Int64(1)`, `Null` and `Int64(42)`.
#[test]
fn base_row_round_trip_with_null() {
    let columns = vec![
        TableColumnConfig::new("id", DataType::Int64, false),
        TableColumnConfig::new("label", DataType::Utf8, true),
        TableColumnConfig::new("score", DataType::Int64, true),
    ];
    let config = KvTableConfig::new(0, columns, vec!["id".to_string()], vec![]).unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let original = KvRow {
        values: vec![CellValue::Int64(1), CellValue::Null, CellValue::Int64(42)],
    };
    let payload = encode_base_row_value(&original, &model).unwrap();
    let restored = decode_base_row(vec![CellValue::Int64(1)], &payload, &model).unwrap();

    // Check the leading three cells in one pattern.
    assert!(matches!(
        &restored.values[..],
        [CellValue::Int64(1), CellValue::Null, CellValue::Int64(42), ..]
    ));
}
2599
/// A NULL cell must not satisfy a string-equality constraint or an integer-range constraint.
#[test]
fn null_does_not_match_equality_constraint() {
    let null_cell = CellValue::Null;
    let constraints = [
        PredicateConstraint::StringEq("x".to_string()),
        PredicateConstraint::IntRange {
            min: Some(0),
            max: Some(10),
        },
    ];
    for constraint in &constraints {
        assert!(!matches_constraint(&null_cell, constraint));
    }
}
2614
/// Checks `IsNull` / `IsNotNull` against NULL and non-NULL cells.
#[test]
fn is_null_constraint_matches() {
    // (cell, constraint, expected match result)
    let cases = [
        (CellValue::Null, PredicateConstraint::IsNull, true),
        (
            CellValue::Utf8("x".to_string()),
            PredicateConstraint::IsNull,
            false,
        ),
        (CellValue::Null, PredicateConstraint::IsNotNull, false),
        (CellValue::Int64(5), PredicateConstraint::IsNotNull, true),
    ];
    for (cell, constraint, expected) in &cases {
        assert_eq!(matches_constraint(cell, constraint), *expected);
    }
}
2634
/// A `StringIn` constraint matches exactly the strings in its list.
#[test]
fn string_in_constraint_matches() {
    let allowed = vec!["us-east".to_string(), "us-west".to_string()];
    let constraint = PredicateConstraint::StringIn(allowed);

    // (candidate region, expected match result)
    let cases = [("us-east", true), ("us-west", true), ("eu-central", false)];
    for (region, expected) in cases {
        let cell = CellValue::Utf8(region.to_string());
        assert_eq!(matches_constraint(&cell, &constraint), expected);
    }
}
2652
/// An `IntIn` constraint matches exactly the integers in its list.
#[test]
fn int_in_constraint_matches() {
    let constraint = PredicateConstraint::IntIn(vec![1, 2, 3]);

    // (candidate value, expected match result)
    for (value, expected) in [(1, true), (3, true), (4, false)] {
        let cell = CellValue::Int64(value);
        assert_eq!(matches_constraint(&cell, &constraint), expected);
    }
}
2660
/// Constrains the `region` column to two string values and expects the chosen index plan to
/// contain two ranges.
#[test]
fn in_predicate_generates_multiple_index_ranges() {
    let (model, specs) = test_model();
    let region_column = model.columns_by_name.get("region").copied().unwrap();

    let regions = vec!["us-east".to_string(), "us-west".to_string()];
    let mut predicate = QueryPredicate::default();
    predicate
        .constraints
        .insert(region_column, PredicateConstraint::StringIn(regions));

    let chosen = predicate.choose_index_plan(&model, &specs).expect("plan");
    let plan = chosen.expect("should find index");
    assert_eq!(plan.ranges.len(), 2);
}
2676
/// Constrains the first primary-key column to three integers and expects three primary-key
/// ranges.
#[test]
fn int_in_generates_multiple_pk_ranges() {
    let (model, _specs) = test_model();
    let pk_column = model.primary_key_indices[0];

    let mut predicate = QueryPredicate::default();
    predicate
        .constraints
        .insert(pk_column, PredicateConstraint::IntIn(vec![100, 200, 300]));

    let pk_ranges = predicate.primary_key_ranges(&model).unwrap();
    assert_eq!(pk_ranges.len(), 3);
}
2688
/// Builds an `IN (5, 5, 10)` filter on `order_id` and checks that the repeated literal yields
/// only two primary-key ranges.
#[test]
fn duplicate_int_in_values_deduplicated() {
    use datafusion::common::Column;
    use datafusion::logical_expr::expr::InList;

    let (model, _specs) = test_model();

    // The IN list targets the PK column "order_id".
    let literals: Vec<Expr> = vec![5_i64, 5, 10]
        .into_iter()
        .map(|value| Expr::Literal(ScalarValue::Int64(Some(value)), None))
        .collect();
    let in_list = Expr::InList(InList {
        expr: Box::new(Expr::Column(Column::new_unqualified("order_id"))),
        list: literals,
        negated: false,
    });

    let predicate = QueryPredicate::from_filters(&[in_list], &model);
    let pk_ranges = predicate.primary_key_ranges(&model).unwrap();
    assert_eq!(
        pk_ranges.len(),
        2,
        "duplicate IN values must be deduped, producing 2 ranges not 3"
    );
}
2712
/// Builds an `IN (100, 100, 200)` filter on a `UInt64` primary key and checks that the
/// repeated literal yields only two primary-key ranges.
#[test]
fn duplicate_uint64_in_values_deduplicated() {
    use datafusion::common::Column;
    use datafusion::logical_expr::expr::InList;

    let columns = vec![
        TableColumnConfig::new("id", DataType::UInt64, false),
        TableColumnConfig::new("name", DataType::Utf8, false),
    ];
    let config = KvTableConfig::new(0, columns, vec!["id".to_string()], vec![]).unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let literals: Vec<Expr> = vec![100_u64, 100, 200]
        .into_iter()
        .map(|value| Expr::Literal(ScalarValue::UInt64(Some(value)), None))
        .collect();
    let in_list = Expr::InList(InList {
        expr: Box::new(Expr::Column(Column::new_unqualified("id"))),
        list: literals,
        negated: false,
    });

    let predicate = QueryPredicate::from_filters(&[in_list], &model);
    let pk_ranges = predicate.primary_key_ranges(&model).unwrap();
    assert_eq!(
        pk_ranges.len(),
        2,
        "duplicate UInt64 IN values must be deduped"
    );
}
2745
/// Builds an IN filter on a `FixedSizeBinary(16)` primary key whose list repeats one value,
/// and checks that only two primary-key ranges are produced.
#[test]
fn duplicate_fixed_binary_in_values_deduplicated() {
    use datafusion::common::Column;
    use datafusion::logical_expr::expr::InList;

    let columns = vec![
        TableColumnConfig::new("hash", DataType::FixedSizeBinary(16), false),
        TableColumnConfig::new("val", DataType::Int64, false),
    ];
    let config = KvTableConfig::new(0, columns, vec!["hash".to_string()], vec![]).unwrap();
    let model = TableModel::from_config(&config).unwrap();

    // Each literal is 16 copies of one fill byte; 0xAA appears twice on purpose.
    let literals: Vec<Expr> = vec![0xAA_u8, 0xAA, 0xBB]
        .into_iter()
        .map(|fill| {
            Expr::Literal(
                ScalarValue::FixedSizeBinary(16, Some(vec![fill; 16])),
                None,
            )
        })
        .collect();
    let in_list = Expr::InList(InList {
        expr: Box::new(Expr::Column(Column::new_unqualified("hash"))),
        list: literals,
        negated: false,
    });

    let predicate = QueryPredicate::from_filters(&[in_list], &model);
    let pk_ranges = predicate.primary_key_ranges(&model).unwrap();
    assert_eq!(
        pk_ranges.len(),
        2,
        "duplicate FixedBinary IN values must be deduped"
    );
}
2783
/// Builds `region = 'us-east' OR region = 'us-west'` and checks that `extract_or_in_column`
/// reports the `region` column with two values.
#[test]
fn or_equalities_extracted_as_in_list() {
    use datafusion::common::Column;
    use datafusion::logical_expr::BinaryExpr;

    let (model, _) = test_model();

    // `region = <value>` as an explicit binary expression.
    let region_equals = |value: &str| {
        Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::Column(Column::new_unqualified("region"))),
            op: Operator::Eq,
            right: Box::new(Expr::Literal(
                ScalarValue::Utf8(Some(value.to_string())),
                None,
            )),
        })
    };
    let disjunction = Expr::BinaryExpr(BinaryExpr {
        left: Box::new(region_equals("us-east")),
        op: Operator::Or,
        right: Box::new(region_equals("us-west")),
    });

    let extracted = extract_or_in_column(&disjunction, &model);
    assert!(extracted.is_some());
    let (column, values) = extracted.unwrap();
    assert_eq!(column, "region");
    assert_eq!(values.len(), 2);
}
2816
/// An OR of two equalities on a `Float64` column must be reported as unsupported and must not
/// add any pushdown constraints or mark the predicate as a contradiction.
#[test]
fn or_equalities_on_float64_are_not_pushdown_supported() {
    use datafusion::logical_expr::col;

    let columns = vec![
        TableColumnConfig::new("id", DataType::Int64, false),
        TableColumnConfig::new("score", DataType::Float64, false),
    ];
    let config = KvTableConfig::new(0, columns, vec!["id".to_string()], vec![]).unwrap();
    let model = TableModel::from_config(&config).unwrap();

    // `score = <value>` with an explicit Float64 literal.
    let score_equals =
        |value: f64| col("score").eq(Expr::Literal(ScalarValue::Float64(Some(value)), None));
    let filter = score_equals(1.0).or(score_equals(2.0));

    let supported = QueryPredicate::supports_filter(&filter, &model);
    assert!(
        !supported,
        "OR-equality pushdown should be disabled for Float64 because apply_in_list cannot enforce it"
    );

    let predicate = QueryPredicate::from_filters(&[filter], &model);
    assert!(!predicate.contradiction);
    assert!(
        predicate.constraints.is_empty(),
        "unsupported OR predicate must not contribute pushdown constraints"
    );
}
2848
/// Inserts one customer and two orders through a single batch writer and checks the number
/// of pending entries.
#[test]
fn batch_writer_encodes_rows_across_tables() {
    let customer_columns = vec![
        TableColumnConfig::new("customer_id", DataType::Int64, false),
        TableColumnConfig::new("name", DataType::Utf8, false),
    ];
    let order_columns = vec![
        TableColumnConfig::new("order_id", DataType::Int64, false),
        TableColumnConfig::new("customer_id", DataType::Int64, false),
        TableColumnConfig::new("amount", DataType::Int64, false),
    ];
    let customer_index = IndexSpec::new("cust_idx", vec!["customer_id".to_string()]).unwrap();

    let store = StoreClient::new("http://localhost:10000");
    let schema = KvSchema::new(store)
        .table(
            "customers",
            customer_columns,
            vec!["customer_id".to_string()],
            vec![],
        )
        .unwrap()
        .table(
            "orders",
            order_columns,
            vec!["order_id".to_string()],
            vec![customer_index],
        )
        .unwrap();

    let mut writer = schema.batch_writer();
    writer
        .insert(
            "customers",
            vec![CellValue::Int64(1), CellValue::Utf8("Alice".to_string())],
        )
        .unwrap();
    // Two orders, both for customer 1.
    for (order_id, amount) in [(100, 4999), (101, 2999)] {
        let order_row = vec![
            CellValue::Int64(order_id),
            CellValue::Int64(1),
            CellValue::Int64(amount),
        ];
        writer.insert("orders", order_row).unwrap();
    }

    // Expected entries: 1 customer base row + 2 order base rows + 2 order index rows = 5.
    assert_eq!(writer.pending_count(), 5);
}
2906
/// Inserting into a table name that was never registered must fail with an "unknown table"
/// error.
#[test]
fn batch_writer_rejects_unknown_table() {
    let store = StoreClient::new("http://localhost:10000");
    let columns = vec![TableColumnConfig::new("id", DataType::Int64, false)];
    let schema = KvSchema::new(store)
        .table("t1", columns, vec![String::from("id")], vec![])
        .unwrap();

    let mut writer = schema.batch_writer();
    let outcome = writer.insert("nonexistent", vec![CellValue::Int64(1)]);
    assert!(outcome.is_err());
    let message = outcome.unwrap_err();
    assert!(message.contains("unknown table"));
}
2924
/// Inserting one value into a two-column table must fail with an error containing
/// "expected 2".
#[test]
fn batch_writer_rejects_wrong_column_count() {
    let store = StoreClient::new("http://localhost:10000");
    let columns = vec![
        TableColumnConfig::new("id", DataType::Int64, false),
        TableColumnConfig::new("name", DataType::Utf8, false),
    ];
    let schema = KvSchema::new(store)
        .table("t1", columns, vec![String::from("id")], vec![])
        .unwrap();

    let mut writer = schema.batch_writer();
    // Only one of the two declared columns is supplied.
    let outcome = writer.insert("t1", vec![CellValue::Int64(1)]);
    assert!(outcome.is_err());
    let message = outcome.unwrap_err();
    assert!(message.contains("expected 2"));
}
2945
/// Supplying a string for the non-key `Int64` column `amount` must fail with a
/// "type mismatch" error at insert time.
#[test]
fn batch_writer_rejects_non_pk_type_mismatch() {
    let store = StoreClient::new("http://localhost:10000");
    let columns = vec![
        TableColumnConfig::new("id", DataType::Int64, false),
        TableColumnConfig::new("amount", DataType::Int64, false),
    ];
    let schema = KvSchema::new(store)
        .table("t1", columns, vec![String::from("id")], vec![])
        .unwrap();

    let mut writer = schema.batch_writer();
    let bad_row = vec![CellValue::Int64(1), CellValue::Utf8("bad".to_string())];
    let outcome = writer.insert("t1", bad_row);
    assert!(outcome.is_err());
    let message = outcome.unwrap_err();
    assert!(
        message.contains("type mismatch"),
        "non-PK schema-invalid values must be rejected at insert-time"
    );
}
2972
/// Inserts the same primary key into two tables and checks that the two pending keys differ,
/// including in their first byte.
#[test]
fn batch_writer_entries_use_distinct_table_prefixes() {
    let store = StoreClient::new("http://localhost:10000");

    // Tables "a" and "b" have identical definitions.
    let mut schema = KvSchema::new(store);
    for table_name in ["a", "b"] {
        schema = schema
            .table(
                table_name,
                vec![TableColumnConfig::new("id", DataType::Int64, false)],
                vec!["id".to_string()],
                vec![],
            )
            .unwrap();
    }

    let mut writer = schema.batch_writer();
    for table_name in ["a", "b"] {
        writer.insert(table_name, vec![CellValue::Int64(42)]).unwrap();
    }

    assert_eq!(writer.pending_count(), 2);
    let key_in_a = &writer.pending_keys[0];
    let key_in_b = &writer.pending_keys[1];
    assert_ne!(
        key_in_a, key_in_b,
        "same PK in different tables must produce different keys"
    );
    assert_ne!(
        key_in_a[0], key_in_b[0],
        "table prefix byte must differ"
    );
}
3006
3007    #[tokio::test]
3008    async fn batch_writer_trait_failure_requeues_prepared_before_new_pending() {
3009        let client = StoreClient::new("http://localhost:10000");
3010        let schema = KvSchema::new(client)
3011            .table(
3012                "t",
3013                vec![TableColumnConfig::new("id", DataType::Int64, false)],
3014                vec!["id".to_string()],
3015                vec![],
3016            )
3017            .unwrap();
3018
3019        let mut batch = schema.batch_writer();
3020        batch.insert("t", vec![CellValue::Int64(1)]).unwrap();
3021        let prepared = batch.prepare_flush().unwrap().expect("prepared row");
3022        assert_eq!(prepared.request_id(), 0);
3023        assert_eq!(prepared.entry_count(), 1);
3024
3025        batch.insert("t", vec![CellValue::Int64(2)]).unwrap();
3026        StoreBatchUpload::mark_upload_failed(&batch, prepared, "commit failed".to_string()).await;
3027        assert_eq!(batch.pending_count(), 2);
3028
3029        let retry = batch.prepare_flush().unwrap().expect("retry row");
3030        assert_eq!(retry.request_id(), 0);
3031        assert_eq!(retry.entry_count(), 1);
3032        let next = batch.prepare_flush().unwrap().expect("new pending row");
3033        assert_eq!(next.request_id(), 1);
3034        assert_eq!(next.entry_count(), 1);
3035    }
3036
/// Inserting NULL into a nullable column succeeds and leaves one pending entry.
#[test]
fn batch_writer_supports_nullable_columns() {
    let store = StoreClient::new("http://localhost:10000");
    let columns = vec![
        TableColumnConfig::new("id", DataType::Int64, false),
        TableColumnConfig::new("note", DataType::Utf8, true),
    ];
    let schema = KvSchema::new(store)
        .table("t", columns, vec![String::from("id")], vec![])
        .unwrap();

    let mut writer = schema.batch_writer();
    let row = vec![CellValue::Int64(1), CellValue::Null];
    writer.insert("t", row).unwrap();
    assert_eq!(writer.pending_count(), 1);
}
3058
/// NULL is rejected for the non-nullable `name` column, while rows whose nullable `note`
/// column is NULL or a string are accepted.
#[test]
fn non_nullable_column_rejects_null_in_batch_writer() {
    let store = StoreClient::new("http://localhost:10000");
    let columns = vec![
        TableColumnConfig::new("id", DataType::Int64, false),
        TableColumnConfig::new("name", DataType::Utf8, false),
        TableColumnConfig::new("note", DataType::Utf8, true),
    ];
    let schema = KvSchema::new(store)
        .table("t", columns, vec![String::from("id")], vec![])
        .unwrap();

    // NULL in non-nullable column "name" must fail.
    let mut rejecting = schema.batch_writer();
    let null_name_row = vec![
        CellValue::Int64(1),
        CellValue::Null,
        CellValue::Utf8("ok".to_string()),
    ];
    let outcome = rejecting.insert("t", null_name_row);
    assert!(outcome.is_err());
    let message = outcome.unwrap_err();
    assert!(
        message.contains("not nullable"),
        "error should mention non-nullable constraint"
    );

    // Both a NULL and a non-NULL value in nullable column "note" must succeed, each on a
    // fresh writer.
    for note in [CellValue::Null, CellValue::Utf8("hello".to_string())] {
        let mut accepting = schema.batch_writer();
        let row = vec![
            CellValue::Int64(1),
            CellValue::Utf8("Alice".to_string()),
            note,
        ];
        accepting.insert("t", row).unwrap();
        assert_eq!(accepting.pending_count(), 1);
    }
}
3119
/// A table config whose primary key is a `UInt64` column must be accepted.
#[test]
fn uint64_column_accepted() {
    let columns = vec![
        TableColumnConfig::new("id", DataType::UInt64, false),
        TableColumnConfig::new("name", DataType::Utf8, false),
    ];
    let outcome = KvTableConfig::new(0, columns, vec![String::from("id")], vec![]);
    assert!(outcome.is_ok());
}
3133
/// Encodes a row keyed by `u64::MAX`, decodes it with its own primary-key values, and checks
/// that the first two decoded cells are the original key and label.
#[test]
fn uint64_primary_key_round_trip() {
    let columns = vec![
        TableColumnConfig::new("id", DataType::UInt64, false),
        TableColumnConfig::new("label", DataType::Utf8, false),
    ];
    let config = KvTableConfig::new(0, columns, vec!["id".to_string()], vec![]).unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let original = KvRow {
        values: vec![
            CellValue::UInt64(u64::MAX),
            CellValue::Utf8("max".to_string()),
        ],
    };
    let payload = encode_base_row_value(&original, &model).unwrap();
    let key_cells: Vec<_> = original
        .primary_key_values(&model)
        .into_iter()
        .cloned()
        .collect();
    let restored = decode_base_row(key_cells, &payload, &model).unwrap();

    // Check the leading two cells in one pattern.
    assert!(matches!(
        &restored.values[..],
        [CellValue::UInt64(id), CellValue::Utf8(label), ..]
            if *id == u64::MAX && label == "max"
    ));
}
3163
3164    #[test]
3165    fn string_primary_key_accepted() {
3166        let config = KvTableConfig::new(
3167            0,
3168            vec![
3169                TableColumnConfig::new("code", DataType::Utf8, false),
3170                TableColumnConfig::new("value", DataType::Int64, false),
3171            ],
3172            vec!["code".to_string()],
3173            vec![],
3174        );
3175        assert!(config.is_ok());
3176    }
3177
3178    #[test]
3179    fn fixed_binary_primary_key_round_trip() {
3180        let config = KvTableConfig::new(
3181            0,
3182            vec![
3183                TableColumnConfig::new("hash", DataType::FixedSizeBinary(32), false),
3184                TableColumnConfig::new("amount", DataType::Int64, false),
3185            ],
3186            vec!["hash".to_string()],
3187            vec![],
3188        )
3189        .unwrap();
3190        let model = TableModel::from_config(&config).unwrap();
3191        let hash_val = vec![0xABu8; 32];
3192        let row = KvRow {
3193            values: vec![
3194                CellValue::FixedBinary(hash_val.clone()),
3195                CellValue::Int64(100),
3196            ],
3197        };
3198        let encoded = encode_base_row_value(&row, &model).unwrap();
3199        let pk = row
3200            .primary_key_values(&model)
3201            .into_iter()
3202            .cloned()
3203            .collect::<Vec<_>>();
3204        let decoded = decode_base_row(pk, &encoded, &model).unwrap();
3205        assert!(matches!(&decoded.values[0], CellValue::FixedBinary(v) if *v == hash_val));
3206    }
3207
3208    #[test]
3209    fn fixed_binary_key_rejects_wrong_length() {
3210        let config = KvTableConfig::new(
3211            0,
3212            vec![
3213                TableColumnConfig::new("hash", DataType::FixedSizeBinary(16), false),
3214                TableColumnConfig::new("amount", DataType::Int64, false),
3215            ],
3216            vec!["hash".to_string()],
3217            vec![],
3218        )
3219        .unwrap();
3220        let model = TableModel::from_config(&config).unwrap();
3221
3222        // Too short (10 bytes for a 16-byte column)
3223        let short_row = KvRow {
3224            values: vec![CellValue::FixedBinary(vec![0xAB; 10]), CellValue::Int64(1)],
3225        };
3226        let result = encode_primary_key_from_row(model.table_prefix, &short_row, &model);
3227        assert!(result.is_err());
3228        assert!(
3229            result.unwrap_err().contains("requires exactly 16 bytes"),
3230            "should mention exact width requirement"
3231        );
3232
3233        // Too long (20 bytes for a 16-byte column)
3234        let long_row = KvRow {
3235            values: vec![CellValue::FixedBinary(vec![0xCD; 20]), CellValue::Int64(2)],
3236        };
3237        let result = encode_primary_key_from_row(model.table_prefix, &long_row, &model);
3238        assert!(result.is_err());
3239        assert!(result.unwrap_err().contains("requires exactly 16 bytes"));
3240
3241        // Exact length (16 bytes) — must succeed
3242        let ok_row = KvRow {
3243            values: vec![CellValue::FixedBinary(vec![0xEF; 16]), CellValue::Int64(3)],
3244        };
3245        assert!(encode_primary_key_from_row(model.table_prefix, &ok_row, &model).is_ok());
3246    }
3247
3248    #[test]
3249    fn fixed_binary_index_key_rejects_wrong_length() {
3250        let config = KvTableConfig::new(
3251            0,
3252            vec![
3253                TableColumnConfig::new("id", DataType::Int64, false),
3254                TableColumnConfig::new("tag", DataType::FixedSizeBinary(8), false),
3255            ],
3256            vec!["id".to_string()],
3257            vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
3258        )
3259        .unwrap();
3260        let model = TableModel::from_config(&config).unwrap();
3261        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
3262
3263        // Wrong length (4 bytes for an 8-byte column)
3264        let bad_row = KvRow {
3265            values: vec![CellValue::Int64(1), CellValue::FixedBinary(vec![0x01; 4])],
3266        };
3267        let result = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &bad_row);
3268        assert!(result.is_err());
3269        assert!(result.unwrap_err().contains("requires exactly 8 bytes"));
3270
3271        // Correct length (8 bytes)
3272        let ok_row = KvRow {
3273            values: vec![CellValue::Int64(1), CellValue::FixedBinary(vec![0x02; 8])],
3274        };
3275        assert!(encode_secondary_index_key(model.table_prefix, &specs[0], &model, &ok_row).is_ok());
3276    }
3277
3278    #[test]
3279    fn decimal256_column_round_trip() {
3280        let config = KvTableConfig::new(
3281            0,
3282            vec![
3283                TableColumnConfig::new("id", DataType::Int64, false),
3284                TableColumnConfig::new("balance", DataType::Decimal256(76, 0), false),
3285            ],
3286            vec!["id".to_string()],
3287            vec![],
3288        )
3289        .unwrap();
3290        let model = TableModel::from_config(&config).unwrap();
3291        let big_val = i256::from(123456789012345i64);
3292        let row = KvRow {
3293            values: vec![CellValue::Int64(1), CellValue::Decimal256(big_val)],
3294        };
3295        let encoded = encode_base_row_value(&row, &model).unwrap();
3296        let pk = row
3297            .primary_key_values(&model)
3298            .into_iter()
3299            .cloned()
3300            .collect::<Vec<_>>();
3301        let decoded = decode_base_row(pk, &encoded, &model).unwrap();
3302        assert!(matches!(&decoded.values[1], CellValue::Decimal256(v) if *v == big_val));
3303    }
3304
3305    #[test]
3306    fn float64_primary_key_rejected() {
3307        let config = KvTableConfig::new(
3308            0,
3309            vec![TableColumnConfig::new("id", DataType::Float64, false)],
3310            vec!["id".to_string()],
3311            vec![],
3312        );
3313        assert!(config.is_err());
3314    }
3315
3316    #[test]
3317    fn i256_ordered_encoding_round_trip() {
3318        let values = [
3319            i256::from_i128(i128::MIN),
3320            i256::from(-1i64),
3321            i256::from(0i64),
3322            i256::from(1i64),
3323            i256::from_i128(i128::MAX),
3324        ];
3325        for v in values {
3326            assert_eq!(decode_i256_ordered(encode_i256_ordered(v)), v);
3327        }
3328        let encoded: Vec<[u8; 32]> = values.iter().map(|v| encode_i256_ordered(*v)).collect();
3329        for i in 0..encoded.len() - 1 {
3330            assert!(encoded[i] < encoded[i + 1]);
3331        }
3332    }
3333
3334    #[test]
3335    fn uint64_primary_key_encode_decode() {
3336        let config = KvTableConfig::new(
3337            5,
3338            vec![
3339                TableColumnConfig::new("id", DataType::UInt64, false),
3340                TableColumnConfig::new("name", DataType::Utf8, false),
3341            ],
3342            vec!["id".to_string()],
3343            vec![],
3344        )
3345        .unwrap();
3346        let model = TableModel::from_config(&config).unwrap();
3347        let pk = CellValue::UInt64(12345);
3348        let key = encode_primary_key(5, &[&pk], &model).expect("pk key encodes");
3349        let decoded = decode_primary_key(5, &key, &model).unwrap();
3350        assert!(matches!(&decoded[0], CellValue::UInt64(12345)));
3351    }
3352
3353    #[test]
3354    fn utf8_primary_key_encode_decode() {
3355        let config = KvTableConfig::new(
3356            3,
3357            vec![
3358                TableColumnConfig::new("code", DataType::Utf8, false),
3359                TableColumnConfig::new("val", DataType::Int64, false),
3360            ],
3361            vec!["code".to_string()],
3362            vec![],
3363        )
3364        .unwrap();
3365        let model = TableModel::from_config(&config).unwrap();
3366        let pk = CellValue::Utf8("HELLO".to_string());
3367        let key = encode_primary_key(3, &[&pk], &model).expect("pk key encodes");
3368        let decoded = decode_primary_key(3, &key, &model).unwrap();
3369        assert!(matches!(&decoded[0], CellValue::Utf8(v) if v == "HELLO"));
3370    }
3371
3372    #[test]
3373    fn fixed_binary_primary_key_encode_decode() {
3374        let config = KvTableConfig::new(
3375            7,
3376            vec![
3377                TableColumnConfig::new("hash", DataType::FixedSizeBinary(16), false),
3378                TableColumnConfig::new("val", DataType::Int64, false),
3379            ],
3380            vec!["hash".to_string()],
3381            vec![],
3382        )
3383        .unwrap();
3384        let model = TableModel::from_config(&config).unwrap();
3385        let data = vec![0xDE, 0xAD, 0xBE, 0xEF, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
3386        let pk = CellValue::FixedBinary(data.clone());
3387        let key = encode_primary_key(7, &[&pk], &model).expect("pk key encodes");
3388        let decoded = decode_primary_key(7, &key, &model).unwrap();
3389        assert!(matches!(&decoded[0], CellValue::FixedBinary(v) if *v == data));
3390    }
3391
3392    #[test]
3393    fn secondary_index_with_uint64_column() {
3394        let config = KvTableConfig::new(
3395            0,
3396            vec![
3397                TableColumnConfig::new("id", DataType::Int64, false),
3398                TableColumnConfig::new("counter", DataType::UInt64, false),
3399            ],
3400            vec!["id".to_string()],
3401            vec![IndexSpec::new("counter_idx", vec!["counter".to_string()]).unwrap()],
3402        )
3403        .unwrap();
3404        let model = TableModel::from_config(&config).unwrap();
3405        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
3406        let row = KvRow {
3407            values: vec![CellValue::Int64(1), CellValue::UInt64(999)],
3408        };
3409        let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row).unwrap();
3410        let decoded =
3411            decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key).unwrap();
3412        let counter_idx = *model.columns_by_name.get("counter").unwrap();
3413        assert!(matches!(
3414            decoded.values.get(&counter_idx),
3415            Some(CellValue::UInt64(999))
3416        ));
3417        assert!(matches!(
3418            &decoded.primary_key_values[0],
3419            CellValue::Int64(1)
3420        ));
3421    }
3422
3423    #[test]
3424    fn secondary_index_with_decimal256_column() {
3425        let config = KvTableConfig::new(
3426            0,
3427            vec![
3428                TableColumnConfig::new("id", DataType::Int64, false),
3429                TableColumnConfig::new("big_val", DataType::Decimal256(76, 0), false),
3430            ],
3431            vec!["id".to_string()],
3432            vec![IndexSpec::new("big_idx", vec!["big_val".to_string()]).unwrap()],
3433        )
3434        .unwrap();
3435        let model = TableModel::from_config(&config).unwrap();
3436        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
3437        let val = i256::from(42i64);
3438        let row = KvRow {
3439            values: vec![CellValue::Int64(1), CellValue::Decimal256(val)],
3440        };
3441        let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row).unwrap();
3442        let decoded =
3443            decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key).unwrap();
3444        let big_idx = *model.columns_by_name.get("big_val").unwrap();
3445        assert!(matches!(
3446            decoded.values.get(&big_idx),
3447            Some(CellValue::Decimal256(v)) if *v == val
3448        ));
3449    }
3450
3451    // -----------------------------------------------------------------------
3452    // Composite primary key tests
3453    // -----------------------------------------------------------------------
3454
3455    #[test]
3456    fn composite_pk_config_accepted() {
3457        let config = KvTableConfig::new(
3458            0,
3459            vec![
3460                TableColumnConfig::new("entity", DataType::FixedSizeBinary(32), false),
3461                TableColumnConfig::new("version", DataType::UInt64, false),
3462                TableColumnConfig::new("data", DataType::Utf8, true),
3463            ],
3464            vec!["entity".to_string(), "version".to_string()],
3465            vec![],
3466        );
3467        assert!(config.is_ok());
3468        let c = config.unwrap();
3469        assert_eq!(c.primary_key_columns, vec!["entity", "version"]);
3470    }
3471
3472    #[test]
3473    fn composite_pk_rejects_unsupported_type() {
3474        let result = KvTableConfig::new(
3475            0,
3476            vec![
3477                TableColumnConfig::new("entity", DataType::FixedSizeBinary(32), false),
3478                TableColumnConfig::new("score", DataType::Float64, false),
3479            ],
3480            vec!["entity".to_string(), "score".to_string()],
3481            vec![],
3482        );
3483        assert!(result.is_err());
3484        let err = result.unwrap_err();
3485        assert!(
3486            err.contains("must be Int64") || err.contains("must be"),
3487            "expected PK type error, got: {err}"
3488        );
3489    }
3490
3491    #[test]
3492    fn composite_pk_rejects_too_wide() {
3493        let config = KvTableConfig::new(
3494            0,
3495            vec![
3496                TableColumnConfig::new("big", DataType::FixedSizeBinary(60), false),
3497                TableColumnConfig::new("ver", DataType::UInt64, false),
3498            ],
3499            vec!["big".to_string(), "ver".to_string()],
3500            vec![],
3501        )
3502        .expect("variable-length keys should allow wider composite PKs");
3503        let model = TableModel::from_config(&config).expect("model");
3504        assert_eq!(model.primary_key_width, 68);
3505    }
3506
3507    #[test]
3508    fn composite_pk_encode_decode_round_trip() {
3509        let config = KvTableConfig::new(
3510            1,
3511            vec![
3512                TableColumnConfig::new("entity", DataType::FixedSizeBinary(32), false),
3513                TableColumnConfig::new("version", DataType::UInt64, false),
3514                TableColumnConfig::new("title", DataType::Utf8, true),
3515            ],
3516            vec!["entity".to_string(), "version".to_string()],
3517            vec![],
3518        )
3519        .unwrap();
3520        let model = TableModel::from_config(&config).unwrap();
3521
3522        let entity = vec![0xAA; 32];
3523        let pk_entity = CellValue::FixedBinary(entity.clone());
3524        let pk_version = CellValue::UInt64(42);
3525        let key =
3526            encode_primary_key(1, &[&pk_entity, &pk_version], &model).expect("pk key encodes");
3527
3528        let decoded = decode_primary_key(1, &key, &model).unwrap();
3529        assert_eq!(decoded.len(), 2);
3530        assert!(matches!(&decoded[0], CellValue::FixedBinary(v) if *v == entity));
3531        assert!(matches!(&decoded[1], CellValue::UInt64(42)));
3532    }
3533
3534    #[test]
3535    fn composite_pk_version_sort_order() {
3536        let config = KvTableConfig::new(
3537            0,
3538            vec![
3539                TableColumnConfig::new("entity", DataType::FixedSizeBinary(32), false),
3540                TableColumnConfig::new("version", DataType::UInt64, false),
3541            ],
3542            vec!["entity".to_string(), "version".to_string()],
3543            vec![],
3544        )
3545        .unwrap();
3546        let model = TableModel::from_config(&config).unwrap();
3547
3548        let entity = vec![0xBB; 32];
3549        let pk_entity = CellValue::FixedBinary(entity.clone());
3550
3551        let key_v1 = encode_primary_key(0, &[&pk_entity, &CellValue::UInt64(1)], &model)
3552            .expect("pk key encodes");
3553        let key_v10 = encode_primary_key(0, &[&pk_entity, &CellValue::UInt64(10)], &model)
3554            .expect("pk key encodes");
3555        let key_v100 = encode_primary_key(0, &[&pk_entity, &CellValue::UInt64(100)], &model)
3556            .expect("pk key encodes");
3557
3558        // Versions must sort numerically (big-endian U64)
3559        assert!(key_v1 < key_v10);
3560        assert!(key_v10 < key_v100);
3561    }
3562
3563    #[test]
3564    fn composite_pk_value_excludes_all_pk_columns() {
3565        let config = KvTableConfig::new(
3566            0,
3567            vec![
3568                TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
3569                TableColumnConfig::new("version", DataType::UInt64, false),
3570                TableColumnConfig::new("data", DataType::Utf8, true),
3571            ],
3572            vec!["entity".to_string(), "version".to_string()],
3573            vec![],
3574        )
3575        .unwrap();
3576        let model = TableModel::from_config(&config).unwrap();
3577
3578        let row = KvRow {
3579            values: vec![
3580                CellValue::FixedBinary(vec![0xCC; 16]),
3581                CellValue::UInt64(7),
3582                CellValue::Utf8("hello".to_string()),
3583            ],
3584        };
3585        let encoded = encode_base_row_value(&row, &model).unwrap();
3586        // Both PK columns should be None in stored value
3587        let decoded = decode_base_row(
3588            vec![CellValue::FixedBinary(vec![0xCC; 16]), CellValue::UInt64(7)],
3589            &encoded,
3590            &model,
3591        )
3592        .unwrap();
3593        assert!(matches!(&decoded.values[0], CellValue::FixedBinary(v) if v.len() == 16));
3594        assert!(matches!(&decoded.values[1], CellValue::UInt64(7)));
3595        assert!(matches!(&decoded.values[2], CellValue::Utf8(v) if v == "hello"));
3596    }
3597
3598    #[test]
3599    fn composite_pk_secondary_index_appends_all_pk_columns() {
3600        let config = KvTableConfig::new(
3601            0,
3602            vec![
3603                TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
3604                TableColumnConfig::new("version", DataType::UInt64, false),
3605                TableColumnConfig::new("tag", DataType::Int64, false),
3606            ],
3607            vec!["entity".to_string(), "version".to_string()],
3608            vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
3609        )
3610        .unwrap();
3611        let model = TableModel::from_config(&config).unwrap();
3612        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
3613
3614        let entity_data = vec![0xDD; 16];
3615        let row = KvRow {
3616            values: vec![
3617                CellValue::FixedBinary(entity_data.clone()),
3618                CellValue::UInt64(99),
3619                CellValue::Int64(42),
3620            ],
3621        };
3622        let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row).unwrap();
3623        let decoded =
3624            decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key).unwrap();
3625
3626        assert_eq!(decoded.primary_key_values.len(), 2);
3627        assert!(matches!(
3628            &decoded.primary_key_values[0],
3629            CellValue::FixedBinary(v) if *v == entity_data
3630        ));
3631        assert!(matches!(
3632            &decoded.primary_key_values[1],
3633            CellValue::UInt64(99)
3634        ));
3635        let tag_idx = *model.columns_by_name.get("tag").unwrap();
3636        assert!(matches!(
3637            decoded.values.get(&tag_idx),
3638            Some(CellValue::Int64(42))
3639        ));
3640    }
3641
3642    #[test]
3643    fn table_versioned_convenience() {
3644        let client = StoreClient::new("http://localhost:10000");
3645        let schema = KvSchema::new(client)
3646            .table_versioned(
3647                "documents",
3648                vec![
3649                    TableColumnConfig::new("doc_id", DataType::FixedSizeBinary(32), false),
3650                    TableColumnConfig::new("version", DataType::UInt64, false),
3651                    TableColumnConfig::new("title", DataType::Utf8, false),
3652                ],
3653                "doc_id",
3654                "version",
3655                vec![],
3656            )
3657            .unwrap();
3658        assert_eq!(schema.table_count(), 1);
3659    }
3660
3661    #[test]
3662    fn single_column_pk_backward_compat() {
3663        // Ensure single-column PK still works identically
3664        let config = KvTableConfig::new(
3665            0,
3666            vec![
3667                TableColumnConfig::new("id", DataType::Int64, false),
3668                TableColumnConfig::new("name", DataType::Utf8, true),
3669            ],
3670            vec!["id".to_string()],
3671            vec![],
3672        )
3673        .unwrap();
3674        let model = TableModel::from_config(&config).unwrap();
3675        assert_eq!(model.primary_key_indices.len(), 1);
3676        assert_eq!(model.primary_key_indices[0], 0);
3677        assert_eq!(model.primary_key_width, 8);
3678
3679        let pk = CellValue::Int64(42);
3680        let key = encode_primary_key(0, &[&pk], &model).expect("pk key encodes");
3681        let decoded = decode_primary_key(0, &key, &model).unwrap();
3682        assert_eq!(decoded.len(), 1);
3683        assert!(matches!(&decoded[0], CellValue::Int64(42)));
3684    }
3685
3686    #[test]
3687    fn partial_prefix_upper_bound_fills_trailing_pk_bytes() {
3688        // Regression: encode_primary_key_bound with partial prefix must
3689        // fill 0xFF from the end of the encoded prefix, not from the
3690        // end of the full PK width. Otherwise trailing PK column bytes
3691        // stay 0x00, producing an end key that's too low.
3692        let config = KvTableConfig::new(
3693            0,
3694            vec![
3695                TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
3696                TableColumnConfig::new("version", DataType::UInt64, false),
3697            ],
3698            vec!["entity".to_string(), "version".to_string()],
3699            vec![],
3700        )
3701        .unwrap();
3702        let model = TableModel::from_config(&config).unwrap();
3703        assert_eq!(model.primary_key_width, 24); // 16 + 8
3704
3705        let entity = CellValue::FixedBinary(vec![0xAA; 16]);
3706        // Partial prefix: only entity, no version
3707        let upper =
3708            encode_primary_key_bound(0, &[&entity], &model, true).expect("pk bound encodes");
3709
3710        // Entity bytes must be encoded
3711        assert_eq!(primary_payload(&model, &upper, 0, 16), vec![0xAA; 16]);
3712        // Version bytes (8 bytes after entity) MUST be 0xFF, not 0x00
3713        assert_eq!(
3714            primary_payload(&model, &upper, 16, 8),
3715            vec![0xFF; 8],
3716            "trailing PK column (version) must be 0xFF for upper bound"
3717        );
3718        // Everything after PK region also 0xFF
3719        assert!(primary_payload(
3720            &model,
3721            &upper,
3722            24,
3723            model.primary_key_codec.payload_capacity_bytes() - 24
3724        )
3725        .iter()
3726        .all(|&b| b == 0xFF));
3727
3728        // Lower bound: trailing bytes should be 0x00
3729        let lower =
3730            encode_primary_key_bound(0, &[&entity], &model, false).expect("pk bound encodes");
3731        assert_eq!(primary_payload(&model, &lower, 0, 16), vec![0xAA; 16]);
3732        assert_eq!(
3733            primary_payload(&model, &lower, 16, 8),
3734            vec![0x00; 8],
3735            "trailing PK column (version) must be 0x00 for lower bound"
3736        );
3737    }
3738
3739    // -----------------------------------------------------------------------
3740    // Composite PK filter pushdown tests
3741    // -----------------------------------------------------------------------
3742
3743    #[test]
3744    fn composite_pk_range_pushdown_entity_eq_version_lte() {
3745        // PK = (entity: FixedSizeBinary(16), version: UInt64)
3746        // Query: entity = X'CC..CC' AND version <= 42
3747        // Should produce a TIGHT range, not a full table scan.
3748        let config = KvTableConfig::new(
3749            0,
3750            vec![
3751                TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
3752                TableColumnConfig::new("version", DataType::UInt64, false),
3753                TableColumnConfig::new("data", DataType::Utf8, true),
3754            ],
3755            vec!["entity".to_string(), "version".to_string()],
3756            vec![],
3757        )
3758        .unwrap();
3759        let model = TableModel::from_config(&config).unwrap();
3760
3761        // Simulate predicate: entity = X'CC..CC' AND version <= 42
3762        let mut pred = QueryPredicate::default();
3763        pred.constraints
3764            .insert(0, PredicateConstraint::FixedBinaryEq(vec![0xCC; 16]));
3765        pred.constraints.insert(
3766            1,
3767            PredicateConstraint::UInt64Range {
3768                min: None,
3769                max: Some(42),
3770            },
3771        );
3772
3773        let ranges = pred.primary_key_ranges(&model).unwrap();
3774        assert_eq!(ranges.len(), 1, "should produce exactly one range");
3775
3776        let range = &ranges[0];
3777
3778        // The start key should encode entity=CC..CC, version=0
3779        let expected_start = encode_primary_key(
3780            0,
3781            &[
3782                &CellValue::FixedBinary(vec![0xCC; 16]),
3783                &CellValue::UInt64(0),
3784            ],
3785            &model,
3786        )
3787        .expect("pk key encodes");
3788        assert_eq!(
3789            range.start, expected_start,
3790            "start should be entity=CC..CC, version=0"
3791        );
3792
3793        // The end key should encode entity=CC..CC, version=42, then 0xFF tail
3794        let expected_end_prefix = encode_primary_key(
3795            0,
3796            &[
3797                &CellValue::FixedBinary(vec![0xCC; 16]),
3798                &CellValue::UInt64(42),
3799            ],
3800            &model,
3801        )
3802        .expect("pk key encodes");
3803        // The end key has 0xFF-filled tail after the PK portion
3804        assert_eq!(
3805            primary_payload(&model, &range.end, 0, model.primary_key_width),
3806            primary_payload(&model, &expected_end_prefix, 0, model.primary_key_width),
3807            "end prefix should be entity=CC..CC, version=42"
3808        );
3809        // Trailing bytes after PK should be 0xFF
3810        assert!(
3811            primary_payload(
3812                &model,
3813                &range.end,
3814                model.primary_key_width,
3815                model.primary_key_codec.payload_capacity_bytes() - model.primary_key_width
3816            )
3817            .iter()
3818            .all(|&b| b == 0xFF),
3819            "end trailing bytes should be 0xFF"
3820        );
3821
3822        // Crucially, the range must NOT be a full table scan
3823        let full_range = primary_key_prefix_range(0);
3824        assert_ne!(
3825            range.start, full_range.start,
3826            "range must not be a full table scan"
3827        );
3828    }
3829
3830    #[test]
3831    fn composite_pk_range_pushdown_entity_eq_only() {
3832        // PK = (entity: FixedSizeBinary(16), version: UInt64)
3833        // Query: entity = X'DD..DD' (no version constraint)
3834        // Should still produce a tight entity-prefix range.
3835        let config = KvTableConfig::new(
3836            0,
3837            vec![
3838                TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
3839                TableColumnConfig::new("version", DataType::UInt64, false),
3840            ],
3841            vec!["entity".to_string(), "version".to_string()],
3842            vec![],
3843        )
3844        .unwrap();
3845        let model = TableModel::from_config(&config).unwrap();
3846
3847        let mut pred = QueryPredicate::default();
3848        pred.constraints
3849            .insert(0, PredicateConstraint::FixedBinaryEq(vec![0xDD; 16]));
3850
3851        let ranges = pred.primary_key_ranges(&model).unwrap();
3852        assert_eq!(ranges.len(), 1);
3853
3854        let range = &ranges[0];
3855        // Start should have entity=DD..DD, version=0x00..00
3856        assert_eq!(primary_payload(&model, &range.start, 0, 16), vec![0xDD; 16]);
3857        // End should have entity=DD..DD, then 0xFF for version + tail
3858        assert_eq!(primary_payload(&model, &range.end, 0, 16), vec![0xDD; 16]);
3859        assert!(
3860            primary_payload(
3861                &model,
3862                &range.end,
3863                16,
3864                model.primary_key_codec.payload_capacity_bytes() - 16
3865            )
3866            .iter()
3867            .all(|&b| b == 0xFF),
3868            "after entity bytes, everything should be 0xFF"
3869        );
3870    }
3871
3872    #[test]
3873    fn fixed_binary_eq_constraint_extracted() {
3874        let config = KvTableConfig::new(
3875            0,
3876            vec![
3877                TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
3878                TableColumnConfig::new("version", DataType::UInt64, false),
3879            ],
3880            vec!["entity".to_string(), "version".to_string()],
3881            vec![],
3882        )
3883        .unwrap();
3884        let model = TableModel::from_config(&config).unwrap();
3885
3886        // Build an equality expression: entity = X'AA..AA'
3887        use datafusion::logical_expr::col;
3888        let entity_literal =
3889            Expr::Literal(ScalarValue::FixedSizeBinary(16, Some(vec![0xAA; 16])), None);
3890        let filter = col("entity").eq(entity_literal);
3891
3892        assert!(
3893            QueryPredicate::supports_filter(&filter, &model),
3894            "FixedSizeBinary equality should be supported"
3895        );
3896
3897        let pred = QueryPredicate::from_filters(&[filter], &model);
3898        assert!(
3899            matches!(
3900                pred.constraints.get(&0),
3901                Some(PredicateConstraint::FixedBinaryEq(v)) if *v == vec![0xAA; 16]
3902            ),
3903            "should extract FixedBinaryEq constraint"
3904        );
3905    }
3906
3907    #[test]
3908    fn uint64_range_constraint_extracted() {
3909        let config = KvTableConfig::new(
3910            0,
3911            vec![
3912                TableColumnConfig::new("version", DataType::UInt64, false),
3913                TableColumnConfig::new("data", DataType::Utf8, true),
3914            ],
3915            vec!["version".to_string()],
3916            vec![],
3917        )
3918        .unwrap();
3919        let model = TableModel::from_config(&config).unwrap();
3920
3921        use datafusion::logical_expr::col;
3922        let filter = col("version").lt_eq(Expr::Literal(ScalarValue::UInt64(Some(42)), None));
3923
3924        assert!(
3925            QueryPredicate::supports_filter(&filter, &model),
3926            "UInt64 range should be supported"
3927        );
3928
3929        let pred = QueryPredicate::from_filters(&[filter], &model);
3930        assert!(
3931            matches!(
3932                pred.constraints.get(&0),
3933                Some(PredicateConstraint::UInt64Range {
3934                    min: None,
3935                    max: Some(42)
3936                })
3937            ),
3938            "should extract UInt64Range with max=42"
3939        );
3940    }
3941
3942    #[test]
3943    fn uint64_range_constraint_supports_values_above_i64_max() {
3944        let config = KvTableConfig::new(
3945            0,
3946            vec![
3947                TableColumnConfig::new("version", DataType::UInt64, false),
3948                TableColumnConfig::new("data", DataType::Utf8, true),
3949            ],
3950            vec!["version".to_string()],
3951            vec![],
3952        )
3953        .unwrap();
3954        let model = TableModel::from_config(&config).unwrap();
3955
3956        let threshold = (1u64 << 63) + 5;
3957        use datafusion::logical_expr::col;
3958        let filter =
3959            col("version").gt_eq(Expr::Literal(ScalarValue::UInt64(Some(threshold)), None));
3960
3961        assert!(QueryPredicate::supports_filter(&filter, &model));
3962
3963        let pred = QueryPredicate::from_filters(&[filter], &model);
3964        assert!(matches!(
3965            pred.constraints.get(&0),
3966            Some(PredicateConstraint::UInt64Range {
3967                min: Some(v),
3968                max: None
3969            }) if *v == threshold
3970        ));
3971    }
3972
3973    #[test]
3974    fn unsupported_uint64_comparison_does_not_force_contradiction() {
3975        let config = KvTableConfig::new(
3976            0,
3977            vec![
3978                TableColumnConfig::new("version", DataType::UInt64, false),
3979                TableColumnConfig::new("data", DataType::Utf8, true),
3980            ],
3981            vec!["version".to_string()],
3982            vec![],
3983        )
3984        .unwrap();
3985        let model = TableModel::from_config(&config).unwrap();
3986
3987        use datafusion::logical_expr::col;
3988        let unsupported = col("version").gt(Expr::Literal(ScalarValue::Int64(Some(-1)), None));
3989
3990        assert!(
3991            !QueryPredicate::supports_filter(&unsupported, &model),
3992            "negative Int64 literal on UInt64 column should not be pushdown-supported"
3993        );
3994
3995        let pred = QueryPredicate::from_filters(&[unsupported], &model);
3996        assert!(
3997            !pred.contradiction,
3998            "unsupported filter must not collapse scan to empty result"
3999        );
4000        assert!(
4001            pred.constraints.is_empty(),
4002            "unsupported filter must not contribute pushed constraints"
4003        );
4004    }
4005
4006    #[test]
4007    fn unsupported_uint64_comparison_in_and_keeps_supported_sibling() {
4008        let config = KvTableConfig::new(
4009            0,
4010            vec![
4011                TableColumnConfig::new("version", DataType::UInt64, false),
4012                TableColumnConfig::new("data", DataType::Utf8, true),
4013            ],
4014            vec!["version".to_string()],
4015            vec![],
4016        )
4017        .unwrap();
4018        let model = TableModel::from_config(&config).unwrap();
4019
4020        use datafusion::logical_expr::col;
4021        let supported = col("version").gt_eq(Expr::Literal(ScalarValue::UInt64(Some(10)), None));
4022        let unsupported = col("version").gt(Expr::Literal(ScalarValue::Int64(Some(-1)), None));
4023        let filter = supported.and(unsupported);
4024
4025        assert!(
4026            !QueryPredicate::supports_filter(&filter, &model),
4027            "mixed AND should not be marked fully pushdown-supported"
4028        );
4029
4030        let pred = QueryPredicate::from_filters(&[filter], &model);
4031        assert!(!pred.contradiction);
4032        assert!(matches!(
4033            pred.constraints.get(&0),
4034            Some(PredicateConstraint::UInt64Range {
4035                min: Some(10),
4036                max: None
4037            })
4038        ));
4039    }
4040
4041    #[test]
4042    fn uint64_in_list_pushdown() {
4043        let config = KvTableConfig::new(
4044            0,
4045            vec![
4046                TableColumnConfig::new("version", DataType::UInt64, false),
4047                TableColumnConfig::new("data", DataType::Utf8, true),
4048            ],
4049            vec!["version".to_string()],
4050            vec![],
4051        )
4052        .unwrap();
4053        let model = TableModel::from_config(&config).unwrap();
4054
4055        use datafusion::logical_expr::{col, in_list};
4056        let filter = in_list(
4057            col("version"),
4058            vec![
4059                Expr::Literal(ScalarValue::UInt64(Some(1)), None),
4060                Expr::Literal(ScalarValue::UInt64(Some(5)), None),
4061                Expr::Literal(ScalarValue::UInt64(Some(10)), None),
4062            ],
4063            false,
4064        );
4065
4066        assert!(QueryPredicate::supports_filter(&filter, &model));
4067        let pred = QueryPredicate::from_filters(&[filter], &model);
4068        assert!(
4069            matches!(pred.constraints.get(&0), Some(PredicateConstraint::UInt64In(v)) if v.len() == 3),
4070            "should extract UInt64In with 3 values"
4071        );
4072    }
4073
4074    #[test]
4075    fn uint64_in_list_pushdown_supports_values_above_i64_max() {
4076        let config = KvTableConfig::new(
4077            0,
4078            vec![
4079                TableColumnConfig::new("version", DataType::UInt64, false),
4080                TableColumnConfig::new("data", DataType::Utf8, true),
4081            ],
4082            vec!["version".to_string()],
4083            vec![],
4084        )
4085        .unwrap();
4086        let model = TableModel::from_config(&config).unwrap();
4087
4088        let huge = 1u64 << 63;
4089        use datafusion::logical_expr::{col, in_list};
4090        let filter = in_list(
4091            col("version"),
4092            vec![
4093                Expr::Literal(ScalarValue::UInt64(Some(1)), None),
4094                Expr::Literal(ScalarValue::UInt64(Some(huge)), None),
4095            ],
4096            false,
4097        );
4098
4099        assert!(QueryPredicate::supports_filter(&filter, &model));
4100        let pred = QueryPredicate::from_filters(&[filter], &model);
4101        assert!(matches!(
4102            pred.constraints.get(&0),
4103            Some(PredicateConstraint::UInt64In(v)) if v.contains(&huge) && v.len() == 2
4104        ));
4105    }
4106
4107    #[test]
4108    fn fixed_binary_in_list_pushdown() {
4109        let config = KvTableConfig::new(
4110            0,
4111            vec![
4112                TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
4113                TableColumnConfig::new("data", DataType::Utf8, true),
4114            ],
4115            vec!["entity".to_string()],
4116            vec![],
4117        )
4118        .unwrap();
4119        let model = TableModel::from_config(&config).unwrap();
4120
4121        use datafusion::logical_expr::{col, in_list};
4122        let filter = in_list(
4123            col("entity"),
4124            vec![
4125                Expr::Literal(ScalarValue::FixedSizeBinary(16, Some(vec![0xAA; 16])), None),
4126                Expr::Literal(ScalarValue::FixedSizeBinary(16, Some(vec![0xBB; 16])), None),
4127            ],
4128            false,
4129        );
4130
4131        assert!(QueryPredicate::supports_filter(&filter, &model));
4132        let pred = QueryPredicate::from_filters(&[filter], &model);
4133        assert!(
4134            matches!(
4135                pred.constraints.get(&0),
4136                Some(PredicateConstraint::FixedBinaryIn(v)) if v.len() == 2
4137            ),
4138            "should extract FixedBinaryIn with 2 values"
4139        );
4140
4141        // Verify range generation produces 2 ranges (one per entity)
4142        let ranges = pred.primary_key_ranges(&model).unwrap();
4143        assert_eq!(ranges.len(), 2, "should produce one range per entity");
4144    }
4145
4146    #[test]
4147    fn decimal256_range_pushdown() {
4148        let config = KvTableConfig::new(
4149            0,
4150            vec![
4151                TableColumnConfig::new("id", DataType::Int64, false),
4152                TableColumnConfig::new("big_val", DataType::Decimal256(76, 0), false),
4153            ],
4154            vec!["id".to_string()],
4155            vec![IndexSpec::new("big_idx", vec!["big_val".to_string()]).unwrap()],
4156        )
4157        .unwrap();
4158        let model = TableModel::from_config(&config).unwrap();
4159
4160        use datafusion::logical_expr::col;
4161        let filter = col("big_val").gt_eq(Expr::Literal(
4162            ScalarValue::Decimal256(Some(i256::from(100i64)), 76, 0),
4163            None,
4164        ));
4165
4166        assert!(
4167            QueryPredicate::supports_filter(&filter, &model),
4168            "Decimal256 range should be supported"
4169        );
4170
4171        let pred = QueryPredicate::from_filters(&[filter], &model);
4172        let big_idx = *model.columns_by_name.get("big_val").unwrap();
4173        assert!(
4174            matches!(
4175                pred.constraints.get(&big_idx),
4176                Some(PredicateConstraint::Decimal256Range {
4177                    min: Some(_),
4178                    max: None
4179                })
4180            ),
4181            "should extract Decimal256Range with min=100, no max"
4182        );
4183
4184        // Verify constraint matching
4185        let val_in = CellValue::Decimal256(i256::from(200i64));
4186        let val_out = CellValue::Decimal256(i256::from(50i64));
4187        let constraint = pred.constraints.get(&big_idx).unwrap();
4188        assert!(matches_constraint(&val_in, constraint));
4189        assert!(!matches_constraint(&val_out, constraint));
4190    }
4191
4192    #[test]
4193    fn uint64_constraint_matching_does_not_wrap_large_values() {
4194        let gt_zero = PredicateConstraint::UInt64Range {
4195            min: Some(1),
4196            max: None,
4197        };
4198        assert!(matches_constraint(&CellValue::UInt64(1u64 << 63), &gt_zero));
4199        assert!(!matches_constraint(&CellValue::UInt64(0), &gt_zero));
4200
4201        let in_list = PredicateConstraint::UInt64In(vec![1, 2, 3]);
4202        assert!(matches_constraint(&CellValue::UInt64(2), &in_list));
4203        assert!(!matches_constraint(
4204            &CellValue::UInt64(1u64 << 63),
4205            &in_list
4206        ));
4207    }
4208
4209    #[test]
4210    fn uint64_empty_range_produces_no_pk_ranges() {
4211        let config = KvTableConfig::new(
4212            0,
4213            vec![TableColumnConfig::new("version", DataType::UInt64, false)],
4214            vec!["version".to_string()],
4215            vec![],
4216        )
4217        .unwrap();
4218        let model = TableModel::from_config(&config).unwrap();
4219        let mut pred = QueryPredicate::default();
4220        pred.constraints.insert(
4221            0,
4222            PredicateConstraint::UInt64Range {
4223                min: Some(10),
4224                max: Some(9),
4225            },
4226        );
4227
4228        let ranges = pred.primary_key_ranges(&model).unwrap();
4229        assert!(ranges.is_empty());
4230    }
4231
4232    #[test]
4233    fn utf8_primary_key_encoding_supports_unicode_and_long_values() {
4234        let config = KvTableConfig::new(
4235            0,
4236            vec![TableColumnConfig::new("id", DataType::Utf8, false)],
4237            vec!["id".to_string()],
4238            vec![],
4239        )
4240        .unwrap();
4241        let model = TableModel::from_config(&config).unwrap();
4242
4243        let row_non_ascii = KvRow {
4244            values: vec![CellValue::Utf8("naive-cafe-e9".replace("e9", "\u{00E9}"))],
4245        };
4246        let key_non_ascii = encode_primary_key_from_row(model.table_prefix, &row_non_ascii, &model)
4247            .expect("non-ascii PK should encode");
4248        let decoded_non_ascii = decode_primary_key(model.table_prefix, &key_non_ascii, &model)
4249            .expect("non-ascii PK should decode");
4250        assert!(matches!(
4251            decoded_non_ascii.as_slice(),
4252            [CellValue::Utf8(value)] if value == "naive-cafe-\u{00E9}"
4253        ));
4254
4255        let row_too_long = KvRow {
4256            values: vec![CellValue::Utf8("abcdefghijklmnopq".to_string())],
4257        };
4258        let key_too_long = encode_primary_key_from_row(model.table_prefix, &row_too_long, &model)
4259            .expect("long UTF-8 PK should encode");
4260        let decoded_too_long = decode_primary_key(model.table_prefix, &key_too_long, &model)
4261            .expect("long UTF-8 PK should decode");
4262        assert!(matches!(
4263            decoded_too_long.as_slice(),
4264            [CellValue::Utf8(value)] if value == "abcdefghijklmnopq"
4265        ));
4266    }
4267
4268    #[test]
4269    fn utf8_primary_key_encodes_at_max_codec_payload_and_rejects_overflow() {
4270        let config = KvTableConfig::new(
4271            0,
4272            vec![TableColumnConfig::new("id", DataType::Utf8, false)],
4273            vec!["id".to_string()],
4274            vec![],
4275        )
4276        .unwrap();
4277        let model = TableModel::from_config(&config).unwrap();
4278        let max_payload = model.primary_key_codec.payload_capacity_bytes();
4279        let max_value = "a".repeat(max_payload - 1);
4280        let overflow_value = "a".repeat(max_payload);
4281
4282        let key = encode_primary_key_from_row(
4283            model.table_prefix,
4284            &KvRow {
4285                values: vec![CellValue::Utf8(max_value.clone())],
4286            },
4287            &model,
4288        )
4289        .expect("max-length UTF-8 PK should encode");
4290        assert_eq!(key.len(), exoware_sdk::keys::MAX_KEY_LEN);
4291        let decoded = decode_primary_key(model.table_prefix, &key, &model)
4292            .expect("max-length PK should decode");
4293        assert!(matches!(
4294            decoded.as_slice(),
4295            [CellValue::Utf8(value)] if value == &max_value
4296        ));
4297
4298        let err = encode_primary_key_from_row(
4299            model.table_prefix,
4300            &KvRow {
4301                values: vec![CellValue::Utf8(overflow_value)],
4302            },
4303            &model,
4304        )
4305        .expect_err("UTF-8 PK exceeding codec payload should be rejected");
4306        assert!(err.contains("primary key payload exceeds codec payload capacity 253 bytes"));
4307    }
4308
4309    #[test]
4310    fn utf8_primary_key_round_trips_embedded_nul() {
4311        let config = KvTableConfig::new(
4312            0,
4313            vec![TableColumnConfig::new("id", DataType::Utf8, false)],
4314            vec!["id".to_string()],
4315            vec![],
4316        )
4317        .unwrap();
4318        let model = TableModel::from_config(&config).unwrap();
4319        let row = KvRow {
4320            values: vec![CellValue::Utf8("AB\0CD".to_string())],
4321        };
4322
4323        let key = encode_primary_key_from_row(model.table_prefix, &row, &model)
4324            .expect("embedded NUL in key text must encode");
4325        let decoded =
4326            decode_primary_key(model.table_prefix, &key, &model).expect("embedded NUL must decode");
4327        assert!(matches!(
4328            decoded.as_slice(),
4329            [CellValue::Utf8(value)] if value == "AB\0CD"
4330        ));
4331    }
4332
4333    #[test]
4334    fn utf8_index_key_round_trips_embedded_nul() {
4335        let config = KvTableConfig::new(
4336            0,
4337            vec![
4338                TableColumnConfig::new("id", DataType::Int64, false),
4339                TableColumnConfig::new("tag", DataType::Utf8, false),
4340            ],
4341            vec!["id".to_string()],
4342            vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
4343        )
4344        .unwrap();
4345        let model = TableModel::from_config(&config).unwrap();
4346        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4347        let row = KvRow {
4348            values: vec![CellValue::Int64(1), CellValue::Utf8("AB\0CD".to_string())],
4349        };
4350
4351        let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row)
4352            .expect("embedded NUL in index key text must encode");
4353        let decoded = decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key)
4354            .expect("embedded NUL index key must decode");
4355        assert!(matches!(
4356            decoded.values.get(&1),
4357            Some(CellValue::Utf8(value)) if value == "AB\0CD"
4358        ));
4359    }
4360
4361    #[test]
4362    fn secondary_index_with_long_utf8_primary_key_encodes_at_max_payload_and_rejects_overflow() {
4363        let config = KvTableConfig::new(
4364            0,
4365            vec![
4366                TableColumnConfig::new("id", DataType::Utf8, false),
4367                TableColumnConfig::new("tag", DataType::Utf8, false),
4368            ],
4369            vec!["id".to_string()],
4370            vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
4371        )
4372        .unwrap();
4373        let model = TableModel::from_config(&config).unwrap();
4374        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4375        let spec = &specs[0];
4376        let max_payload = spec.codec.payload_capacity_bytes();
4377        let max_tag = "t".to_string();
4378        let max_id = "i".repeat(max_payload - encode_string_variable(&max_tag).unwrap().len() - 1);
4379        let overflow_id = format!("{max_id}x");
4380
4381        let key = encode_secondary_index_key(
4382            model.table_prefix,
4383            spec,
4384            &model,
4385            &KvRow {
4386                values: vec![
4387                    CellValue::Utf8(max_id.clone()),
4388                    CellValue::Utf8(max_tag.clone()),
4389                ],
4390            },
4391        )
4392        .expect("secondary key at max payload should encode");
4393        assert_eq!(key.len(), exoware_sdk::keys::MAX_KEY_LEN);
4394        let decoded =
4395            decode_secondary_index_key(model.table_prefix, spec, &model, &key).expect("decode");
4396        assert!(matches!(
4397            decoded.values.get(&1),
4398            Some(CellValue::Utf8(value)) if value == &max_tag
4399        ));
4400        assert!(matches!(
4401            decoded.primary_key_values.as_slice(),
4402            [CellValue::Utf8(value)] if value == &max_id
4403        ));
4404
4405        let err = encode_secondary_index_key(
4406            model.table_prefix,
4407            spec,
4408            &model,
4409            &KvRow {
4410                values: vec![CellValue::Utf8(overflow_id), CellValue::Utf8(max_tag)],
4411            },
4412        )
4413        .expect_err("secondary key exceeding max payload should be rejected");
4414        assert!(err.contains("index 'tag_idx' payload exceeds codec payload capacity 252 bytes"));
4415    }
4416
4417    #[test]
4418    fn secondary_index_from_parts_with_long_utf8_primary_key_rejects_overflow() {
4419        let config = KvTableConfig::new(
4420            0,
4421            vec![
4422                TableColumnConfig::new("id", DataType::Utf8, false),
4423                TableColumnConfig::new("tag", DataType::Utf8, false),
4424            ],
4425            vec!["id".to_string()],
4426            vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
4427        )
4428        .unwrap();
4429        let model = TableModel::from_config(&config).unwrap();
4430        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4431        let spec = &specs[0];
4432        let max_payload = spec.codec.payload_capacity_bytes();
4433        let max_tag = "t".to_string();
4434        let max_id = "i".repeat(max_payload - encode_string_variable(&max_tag).unwrap().len() - 1);
4435        let overflow_id = format!("{max_id}x");
4436        let max_row = KvRow {
4437            values: vec![
4438                CellValue::Utf8(max_id.clone()),
4439                CellValue::Utf8(max_tag.clone()),
4440            ],
4441        };
4442        let encoded_row = encode_base_row_value(&max_row, &model).expect("encode row");
4443        let archived = decode_stored_row(&encoded_row).expect("archive row");
4444
4445        let key = encode_secondary_index_key_from_parts(
4446            model.table_prefix,
4447            spec,
4448            &model,
4449            &[CellValue::Utf8(max_id.clone())],
4450            &archived,
4451        )
4452        .expect("backfill path should encode max payload");
4453        assert_eq!(key.len(), exoware_sdk::keys::MAX_KEY_LEN);
4454
4455        let err = encode_secondary_index_key_from_parts(
4456            model.table_prefix,
4457            spec,
4458            &model,
4459            &[CellValue::Utf8(overflow_id)],
4460            &archived,
4461        )
4462        .expect_err("backfill path overflow should be rejected");
4463        assert!(err
4464            .to_string()
4465            .contains("index 'tag_idx' payload exceeds codec payload capacity 252 bytes"));
4466    }
4467
4468    #[test]
4469    fn primary_key_type_mismatch_returns_error_instead_of_panicking() {
4470        let config = KvTableConfig::new(
4471            0,
4472            vec![TableColumnConfig::new("id", DataType::UInt64, false)],
4473            vec!["id".to_string()],
4474            vec![],
4475        )
4476        .unwrap();
4477        let model = TableModel::from_config(&config).unwrap();
4478        let row = KvRow {
4479            values: vec![CellValue::Int64(7)],
4480        };
4481
4482        let err = encode_primary_key_from_row(model.table_prefix, &row, &model)
4483            .expect_err("mismatched PK type should return an error");
4484        assert!(err.contains("type mismatch while encoding key value"));
4485    }
4486
4487    #[test]
4488    fn choose_index_plan_uses_fixed_binary_leading_constraint() {
4489        let config = KvTableConfig::new(
4490            0,
4491            vec![
4492                TableColumnConfig::new("id", DataType::Int64, false),
4493                TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
4494            ],
4495            vec!["id".to_string()],
4496            vec![IndexSpec::new("entity_idx", vec!["entity".to_string()]).unwrap()],
4497        )
4498        .unwrap();
4499        let model = TableModel::from_config(&config).unwrap();
4500        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4501
4502        use datafusion::logical_expr::col;
4503        let filter = col("entity").eq(Expr::Literal(
4504            ScalarValue::FixedSizeBinary(16, Some(vec![0xAB; 16])),
4505            None,
4506        ));
4507        let pred = QueryPredicate::from_filters(&[filter], &model);
4508        let plan = pred
4509            .choose_index_plan(&model, &specs)
4510            .unwrap()
4511            .expect("fixed-binary equality should choose an index");
4512
4513        assert_eq!(plan.constrained_prefix_len, 1);
4514        assert_eq!(plan.ranges.len(), 1);
4515        let range = &plan.ranges[0];
4516        assert_eq!(
4517            index_payload(&specs[0], &range.start, 0, 16),
4518            vec![0xAB; 16]
4519        );
4520        assert_eq!(index_payload(&specs[0], &range.end, 0, 16), vec![0xAB; 16]);
4521    }
4522
4523    #[test]
4524    fn choose_index_plan_uses_decimal256_leading_constraint() {
4525        let config = KvTableConfig::new(
4526            0,
4527            vec![
4528                TableColumnConfig::new("id", DataType::Int64, false),
4529                TableColumnConfig::new("big_val", DataType::Decimal256(76, 0), false),
4530            ],
4531            vec!["id".to_string()],
4532            vec![IndexSpec::new("big_idx", vec!["big_val".to_string()]).unwrap()],
4533        )
4534        .unwrap();
4535        let model = TableModel::from_config(&config).unwrap();
4536        let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4537
4538        use datafusion::logical_expr::col;
4539        let filter = col("big_val").gt_eq(Expr::Literal(
4540            ScalarValue::Decimal256(Some(i256::from(100i64)), 76, 0),
4541            None,
4542        ));
4543        let pred = QueryPredicate::from_filters(&[filter], &model);
4544        let plan = pred
4545            .choose_index_plan(&model, &specs)
4546            .unwrap()
4547            .expect("decimal256 range should choose an index");
4548
4549        assert_eq!(plan.constrained_prefix_len, 1);
4550        assert_eq!(plan.ranges.len(), 1);
4551        let range = &plan.ranges[0];
4552        assert_eq!(
4553            index_payload(&specs[0], &range.start, 0, 32),
4554            encode_i256_ordered(i256::from(100i64)).to_vec()
4555        );
4556    }
4557
4558    #[tokio::test]
4559    async fn backfill_added_indexes_writes_entries_for_existing_rows() {
4560        let state = MockState {
4561            kv: Arc::new(Mutex::new(BTreeMap::new())),
4562            range_calls: Arc::new(AtomicUsize::new(0)),
4563            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
4564            sequence_number: Arc::new(AtomicU64::new(0)),
4565        };
4566        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
4567        let client = StoreClient::new(&base_url);
4568
4569        let seed_schema = KvSchema::new(client.clone())
4570            .table(
4571                "orders",
4572                vec![
4573                    TableColumnConfig::new("id", DataType::Int64, false),
4574                    TableColumnConfig::new("status", DataType::Utf8, false),
4575                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
4576                ],
4577                vec!["id".to_string()],
4578                vec![],
4579            )
4580            .expect("seed schema");
4581        let mut writer = seed_schema.batch_writer();
4582        for i in 0..6i64 {
4583            writer
4584                .insert(
4585                    "orders",
4586                    vec![
4587                        CellValue::Int64(i),
4588                        CellValue::Utf8(if i % 2 == 0 { "open" } else { "closed" }.to_string()),
4589                        CellValue::Int64(i * 10),
4590                    ],
4591                )
4592                .expect("seed row");
4593        }
4594        writer.flush().await.expect("seed flush");
4595
4596        {
4597            let guard = state.kv.lock().expect("kv mutex poisoned");
4598            let base_rows = guard
4599                .keys()
4600                .filter(|key| matches_primary_key(0, key))
4601                .count();
4602            let index_rows = guard
4603                .keys()
4604                .filter(|key| matches_secondary_index_key(0, 1, key))
4605                .count();
4606            assert_eq!(base_rows, 6);
4607            assert_eq!(index_rows, 0);
4608        }
4609
4610        let backfill_schema = KvSchema::new(client.clone())
4611            .table(
4612                "orders",
4613                vec![
4614                    TableColumnConfig::new("id", DataType::Int64, false),
4615                    TableColumnConfig::new("status", DataType::Utf8, false),
4616                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
4617                ],
4618                vec!["id".to_string()],
4619                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
4620                    .expect("valid index")
4621                    .with_cover_columns(vec!["amount_cents".to_string()])],
4622            )
4623            .expect("backfill schema");
4624        let report = backfill_schema
4625            .backfill_added_indexes_with_options(
4626                "orders",
4627                &[],
4628                IndexBackfillOptions {
4629                    row_batch_size: 2,
4630                    start_from_primary_key: None,
4631                },
4632            )
4633            .await
4634            .expect("backfill should succeed");
4635        assert_eq!(report.scanned_rows, 6);
4636        assert_eq!(report.indexes_backfilled, 1);
4637        assert_eq!(report.index_entries_written, 6);
4638
4639        {
4640            let guard = state.kv.lock().expect("kv mutex poisoned");
4641            let index_rows = guard
4642                .keys()
4643                .filter(|key| matches_secondary_index_key(0, 1, key))
4644                .count();
4645            assert_eq!(index_rows, 6);
4646            let (_, sample_value) = guard
4647                .iter()
4648                .find(|(key, _)| matches_secondary_index_key(0, 1, key))
4649                .expect("backfill should create index entry");
4650            let archived = decode_stored_row(sample_value.as_ref())
4651                .expect("covering value must be valid codec");
4652            assert_eq!(archived.values.len(), 3);
4653        }
4654
4655        let _ = shutdown_tx.send(());
4656    }
4657
4658    #[tokio::test]
4659    async fn backfill_added_indexes_writes_zorder_entries_for_existing_rows() {
4660        let state = MockState {
4661            kv: Arc::new(Mutex::new(BTreeMap::new())),
4662            range_calls: Arc::new(AtomicUsize::new(0)),
4663            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
4664            sequence_number: Arc::new(AtomicU64::new(0)),
4665        };
4666        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
4667        let client = StoreClient::new(&base_url);
4668
4669        let seed_schema = KvSchema::new(client.clone())
4670            .table(
4671                "points",
4672                vec![
4673                    TableColumnConfig::new("x", DataType::Int64, false),
4674                    TableColumnConfig::new("y", DataType::Int64, false),
4675                    TableColumnConfig::new("id", DataType::Int64, false),
4676                    TableColumnConfig::new("value", DataType::Int64, false),
4677                ],
4678                vec!["id".to_string()],
4679                vec![],
4680            )
4681            .expect("seed schema");
4682        let mut writer = seed_schema.batch_writer();
4683        for (x, y, id, value) in [(1, 1, 11, 110), (1, 2, 12, 120), (2, 1, 21, 210)] {
4684            writer
4685                .insert(
4686                    "points",
4687                    vec![
4688                        CellValue::Int64(x),
4689                        CellValue::Int64(y),
4690                        CellValue::Int64(id),
4691                        CellValue::Int64(value),
4692                    ],
4693                )
4694                .expect("seed row");
4695        }
4696        writer.flush().await.expect("seed flush");
4697
4698        let backfill_schema = KvSchema::new(client.clone())
4699            .table(
4700                "points",
4701                vec![
4702                    TableColumnConfig::new("x", DataType::Int64, false),
4703                    TableColumnConfig::new("y", DataType::Int64, false),
4704                    TableColumnConfig::new("id", DataType::Int64, false),
4705                    TableColumnConfig::new("value", DataType::Int64, false),
4706                ],
4707                vec!["id".to_string()],
4708                vec![
4709                    IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
4710                        .expect("valid index")
4711                        .with_cover_columns(vec!["value".to_string()]),
4712                ],
4713            )
4714            .expect("backfill schema");
4715        let report = backfill_schema
4716            .backfill_added_indexes_with_options(
4717                "points",
4718                &[],
4719                IndexBackfillOptions {
4720                    row_batch_size: 2,
4721                    start_from_primary_key: None,
4722                },
4723            )
4724            .await
4725            .expect("backfill should succeed");
4726        assert_eq!(report.scanned_rows, 3);
4727        assert_eq!(report.index_entries_written, 3);
4728
4729        let guard = state.kv.lock().expect("kv mutex poisoned");
4730        let index_entry = guard
4731            .keys()
4732            .find(|key| matches_secondary_index_key(0, 1, key))
4733            .cloned()
4734            .expect("z-order backfill should create index entry");
4735        let config = KvTableConfig::new(
4736            0,
4737            vec![
4738                TableColumnConfig::new("x", DataType::Int64, false),
4739                TableColumnConfig::new("y", DataType::Int64, false),
4740                TableColumnConfig::new("id", DataType::Int64, false),
4741                TableColumnConfig::new("value", DataType::Int64, false),
4742            ],
4743            vec!["id".to_string()],
4744            vec![
4745                IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()]).expect("valid"),
4746            ],
4747        )
4748        .expect("config");
4749        let model = TableModel::from_config(&config).expect("model");
4750        let spec = model
4751            .resolve_index_specs(&config.index_specs)
4752            .expect("specs")
4753            .remove(0);
4754        let decoded = decode_secondary_index_key(model.table_prefix, &spec, &model, &index_entry)
4755            .expect("decode z-order key");
4756        let x_idx = *model.columns_by_name.get("x").unwrap();
4757        let y_idx = *model.columns_by_name.get("y").unwrap();
4758        assert!(matches!(
4759            decoded.values.get(&x_idx),
4760            Some(CellValue::Int64(_))
4761        ));
4762        assert!(matches!(
4763            decoded.values.get(&y_idx),
4764            Some(CellValue::Int64(_))
4765        ));
4766
4767        let _ = shutdown_tx.send(());
4768    }
4769
4770    #[tokio::test]
4771    async fn backfill_added_indexes_requires_append_only_index_evolution() {
4772        let client = StoreClient::new("http://127.0.0.1:1");
4773        let schema = KvSchema::new(client)
4774            .table(
4775                "orders",
4776                vec![
4777                    TableColumnConfig::new("id", DataType::Int64, false),
4778                    TableColumnConfig::new("status", DataType::Utf8, false),
4779                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
4780                ],
4781                vec!["id".to_string()],
4782                vec![
4783                    IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid"),
4784                    IndexSpec::new("amount_idx", vec!["amount_cents".to_string()]).expect("valid"),
4785                ],
4786            )
4787            .expect("schema");
4788
4789        let previous_specs =
4790            vec![IndexSpec::new("amount_idx", vec!["amount_cents".to_string()]).expect("valid")];
4791        let err = schema
4792            .backfill_added_indexes("orders", &previous_specs)
4793            .await
4794            .expect_err("non-append-only evolution should be rejected");
4795        assert!(err
4796            .to_string()
4797            .contains("index evolution must be append-only"));
4798    }
4799
4800    #[tokio::test]
4801    async fn backfill_added_indexes_is_noop_when_no_new_indexes() {
4802        let client = StoreClient::new("http://127.0.0.1:1");
4803        let existing = IndexSpec::new("status_idx", vec!["status".to_string()])
4804            .expect("valid")
4805            .with_cover_columns(vec!["amount_cents".to_string()]);
4806        let schema = KvSchema::new(client)
4807            .table(
4808                "orders",
4809                vec![
4810                    TableColumnConfig::new("id", DataType::Int64, false),
4811                    TableColumnConfig::new("status", DataType::Utf8, false),
4812                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
4813                ],
4814                vec!["id".to_string()],
4815                vec![existing.clone()],
4816            )
4817            .expect("schema");
4818
4819        let report = schema
4820            .backfill_added_indexes("orders", &[existing])
4821            .await
4822            .expect("no-op backfill should succeed");
4823        assert_eq!(report, IndexBackfillReport::default());
4824    }
4825
4826    #[tokio::test]
4827    async fn backfill_added_indexes_rejects_zero_row_batch_size() {
4828        let client = StoreClient::new("http://127.0.0.1:1");
4829        let schema = KvSchema::new(client)
4830            .table(
4831                "orders",
4832                vec![
4833                    TableColumnConfig::new("id", DataType::Int64, false),
4834                    TableColumnConfig::new("status", DataType::Utf8, false),
4835                ],
4836                vec!["id".to_string()],
4837                vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
4838            )
4839            .expect("schema");
4840        let err = schema
4841            .backfill_added_indexes_with_options(
4842                "orders",
4843                &[],
4844                IndexBackfillOptions {
4845                    row_batch_size: 0,
4846                    start_from_primary_key: None,
4847                },
4848            )
4849            .await
4850            .expect_err("row_batch_size=0 should fail");
4851        assert!(err.to_string().contains("row_batch_size must be > 0"));
4852    }
4853
4854    #[tokio::test]
4855    async fn backfill_added_indexes_emits_progress_events() {
4856        let state = MockState {
4857            kv: Arc::new(Mutex::new(BTreeMap::new())),
4858            range_calls: Arc::new(AtomicUsize::new(0)),
4859            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
4860            sequence_number: Arc::new(AtomicU64::new(0)),
4861        };
4862        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
4863        let client = StoreClient::new(&base_url);
4864
4865        let seed_schema = KvSchema::new(client.clone())
4866            .table(
4867                "orders",
4868                vec![
4869                    TableColumnConfig::new("id", DataType::Int64, false),
4870                    TableColumnConfig::new("status", DataType::Utf8, false),
4871                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
4872                ],
4873                vec!["id".to_string()],
4874                vec![],
4875            )
4876            .expect("seed schema");
4877        let mut writer = seed_schema.batch_writer();
4878        for i in 0..5i64 {
4879            writer
4880                .insert(
4881                    "orders",
4882                    vec![
4883                        CellValue::Int64(i),
4884                        CellValue::Utf8("open".to_string()),
4885                        CellValue::Int64(i * 10),
4886                    ],
4887                )
4888                .expect("seed row");
4889        }
4890        writer.flush().await.expect("seed flush");
4891
4892        let backfill_schema = KvSchema::new(client.clone())
4893            .table(
4894                "orders",
4895                vec![
4896                    TableColumnConfig::new("id", DataType::Int64, false),
4897                    TableColumnConfig::new("status", DataType::Utf8, false),
4898                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
4899                ],
4900                vec!["id".to_string()],
4901                vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
4902            )
4903            .expect("backfill schema");
4904
4905        let (progress_tx, mut progress_rx) = mpsc::unbounded_channel();
4906        let report = backfill_schema
4907            .backfill_added_indexes_with_options_and_progress(
4908                "orders",
4909                &[],
4910                IndexBackfillOptions {
4911                    row_batch_size: 2,
4912                    start_from_primary_key: None,
4913                },
4914                Some(&progress_tx),
4915            )
4916            .await
4917            .expect("backfill should succeed");
4918        drop(progress_tx);
4919
4920        let mut saw_started = false;
4921        let mut saw_completed = false;
4922        let mut progress_events = 0usize;
4923        while let Some(event) = progress_rx.recv().await {
4924            match event {
4925                IndexBackfillEvent::Started {
4926                    table_name,
4927                    indexes_backfilled,
4928                    row_batch_size,
4929                    ..
4930                } => {
4931                    saw_started = true;
4932                    assert_eq!(table_name, "orders");
4933                    assert_eq!(indexes_backfilled, 1);
4934                    assert_eq!(row_batch_size, 2);
4935                }
4936                IndexBackfillEvent::Progress {
4937                    scanned_rows,
4938                    index_entries_written,
4939                    ..
4940                } => {
4941                    progress_events += 1;
4942                    assert!(scanned_rows >= 1);
4943                    assert_eq!(scanned_rows, index_entries_written);
4944                }
4945                IndexBackfillEvent::Completed {
4946                    report: completed_report,
4947                } => {
4948                    saw_completed = true;
4949                    assert_eq!(completed_report, report);
4950                }
4951            }
4952        }
4953        assert!(saw_started);
4954        assert!(saw_completed);
4955        assert!(progress_events >= 1);
4956
4957        let _ = shutdown_tx.send(());
4958    }
4959
4960    #[tokio::test]
4961    async fn backfill_added_indexes_can_resume_from_primary_key() {
4962        let state = MockState {
4963            kv: Arc::new(Mutex::new(BTreeMap::new())),
4964            range_calls: Arc::new(AtomicUsize::new(0)),
4965            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
4966            sequence_number: Arc::new(AtomicU64::new(0)),
4967        };
4968        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
4969        let client = StoreClient::new(&base_url);
4970
4971        let seed_schema = KvSchema::new(client.clone())
4972            .table(
4973                "orders",
4974                vec![
4975                    TableColumnConfig::new("id", DataType::Int64, false),
4976                    TableColumnConfig::new("status", DataType::Utf8, false),
4977                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
4978                ],
4979                vec!["id".to_string()],
4980                vec![],
4981            )
4982            .expect("seed schema");
4983        let mut writer = seed_schema.batch_writer();
4984        for i in 0..6i64 {
4985            writer
4986                .insert(
4987                    "orders",
4988                    vec![
4989                        CellValue::Int64(i),
4990                        CellValue::Utf8("open".to_string()),
4991                        CellValue::Int64(i * 10),
4992                    ],
4993                )
4994                .expect("seed row");
4995        }
4996        writer.flush().await.expect("seed flush");
4997
4998        let backfill_schema = KvSchema::new(client.clone())
4999            .table(
5000                "orders",
5001                vec![
5002                    TableColumnConfig::new("id", DataType::Int64, false),
5003                    TableColumnConfig::new("status", DataType::Utf8, false),
5004                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
5005                ],
5006                vec!["id".to_string()],
5007                vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
5008            )
5009            .expect("backfill schema");
5010
5011        let config = KvTableConfig::new(
5012            0,
5013            vec![
5014                TableColumnConfig::new("id", DataType::Int64, false),
5015                TableColumnConfig::new("status", DataType::Utf8, false),
5016                TableColumnConfig::new("amount_cents", DataType::Int64, false),
5017            ],
5018            vec!["id".to_string()],
5019            vec![],
5020        )
5021        .expect("valid config");
5022        let model = TableModel::from_config(&config).expect("model");
5023        let resume_value = CellValue::Int64(3);
5024        let resume_key =
5025            encode_primary_key(model.table_prefix, &[&resume_value], &model).expect("resume key");
5026
5027        let report = backfill_schema
5028            .backfill_added_indexes_with_options(
5029                "orders",
5030                &[],
5031                IndexBackfillOptions {
5032                    row_batch_size: 2,
5033                    start_from_primary_key: Some(resume_key.clone()),
5034                },
5035            )
5036            .await
5037            .expect("resume backfill should succeed");
5038        assert_eq!(report.scanned_rows, 3);
5039        assert_eq!(report.index_entries_written, 3);
5040
5041        {
5042            let guard = state.kv.lock().expect("kv mutex poisoned");
5043            let index_rows = guard
5044                .keys()
5045                .filter(|key| matches_secondary_index_key(0, 1, key))
5046                .count();
5047            assert_eq!(index_rows, 3);
5048        }
5049
5050        let resume_payload = model
5051            .primary_key_codec
5052            .read_payload(&resume_key, 0, model.primary_key_width)
5053            .expect("resume payload");
5054        let wrong_prefix = secondary_index_codec(model.table_prefix, 1)
5055            .expect("secondary codec")
5056            .encode(&resume_payload)
5057            .expect("wrong prefix key");
5058        let err = backfill_schema
5059            .backfill_added_indexes_with_options(
5060                "orders",
5061                &[],
5062                IndexBackfillOptions {
5063                    row_batch_size: 2,
5064                    start_from_primary_key: Some(wrong_prefix),
5065                },
5066            )
5067            .await
5068            .expect_err("wrong key prefix must be rejected");
5069        assert!(err.to_string().contains("primary-key prefix"));
5070
5071        let _ = shutdown_tx.send(());
5072    }
5073
5074    #[tokio::test]
5075    async fn covering_index_scan_fails_closed_when_covering_payload_missing() {
5076        let state = MockState {
5077            kv: Arc::new(Mutex::new(BTreeMap::new())),
5078            range_calls: Arc::new(AtomicUsize::new(0)),
5079            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
5080            sequence_number: Arc::new(AtomicU64::new(0)),
5081        };
5082        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
5083        let client = StoreClient::new(&base_url);
5084
5085        let schema = KvSchema::new(client.clone())
5086            .table(
5087                "orders",
5088                vec![
5089                    TableColumnConfig::new("id", DataType::Int64, false),
5090                    TableColumnConfig::new("status", DataType::Utf8, false),
5091                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
5092                ],
5093                vec!["id".to_string()],
5094                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
5095                    .expect("valid")
5096                    .with_cover_columns(vec!["amount_cents".to_string()])],
5097            )
5098            .expect("schema");
5099        let mut writer = schema.batch_writer();
5100        for id in 0..4i64 {
5101            writer
5102                .insert(
5103                    "orders",
5104                    vec![
5105                        CellValue::Int64(id),
5106                        CellValue::Utf8("open".to_string()),
5107                        CellValue::Int64(id * 10),
5108                    ],
5109                )
5110                .expect("row");
5111        }
5112        writer.flush().await.expect("flush");
5113
5114        {
5115            let mut guard = state.kv.lock().expect("kv mutex poisoned");
5116            let key = guard
5117                .keys()
5118                .find(|key| matches_secondary_index_key(0, 1, key))
5119                .expect("index row should exist")
5120                .clone();
5121            guard.insert(key, Bytes::new());
5122        }
5123
5124        let ctx = SessionContext::new();
5125        schema.register_all(&ctx).expect("register");
5126        let df = ctx
5127            .sql("SELECT amount_cents FROM orders WHERE status = 'open'")
5128            .await
5129            .expect("query should plan");
5130        let err = df
5131            .collect()
5132            .await
5133            .expect_err("missing covering payload must fail closed");
5134        assert!(err
5135            .to_string()
5136            .contains("secondary index entry missing covering payload"));
5137
5138        let _ = shutdown_tx.send(());
5139    }
5140
5141    #[tokio::test]
5142    async fn covering_index_scan_fails_closed_when_covering_payload_is_corrupt() {
5143        let state = MockState {
5144            kv: Arc::new(Mutex::new(BTreeMap::new())),
5145            range_calls: Arc::new(AtomicUsize::new(0)),
5146            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
5147            sequence_number: Arc::new(AtomicU64::new(0)),
5148        };
5149        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
5150        let client = StoreClient::new(&base_url);
5151
5152        let schema = KvSchema::new(client.clone())
5153            .table(
5154                "orders",
5155                vec![
5156                    TableColumnConfig::new("id", DataType::Int64, false),
5157                    TableColumnConfig::new("status", DataType::Utf8, false),
5158                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
5159                ],
5160                vec!["id".to_string()],
5161                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
5162                    .expect("valid")
5163                    .with_cover_columns(vec!["amount_cents".to_string()])],
5164            )
5165            .expect("schema");
5166        let mut writer = schema.batch_writer();
5167        for id in 0..4i64 {
5168            writer
5169                .insert(
5170                    "orders",
5171                    vec![
5172                        CellValue::Int64(id),
5173                        CellValue::Utf8("open".to_string()),
5174                        CellValue::Int64(id * 10),
5175                    ],
5176                )
5177                .expect("row");
5178        }
5179        writer.flush().await.expect("flush");
5180
5181        {
5182            let mut guard = state.kv.lock().expect("kv mutex poisoned");
5183            let key = guard
5184                .keys()
5185                .find(|key| matches_secondary_index_key(0, 1, key))
5186                .expect("index row should exist")
5187                .clone();
5188            guard.insert(key, Bytes::from_static(b"not-codec"));
5189        }
5190
5191        let ctx = SessionContext::new();
5192        schema.register_all(&ctx).expect("register");
5193        let df = ctx
5194            .sql("SELECT amount_cents FROM orders WHERE status = 'open'")
5195            .await
5196            .expect("query should plan");
5197        let err = df
5198            .collect()
5199            .await
5200            .expect_err("corrupt covering payload must fail closed");
5201        assert!(err.to_string().contains("invalid covering index payload"));
5202
5203        let _ = shutdown_tx.send(());
5204    }
5205
5206    #[tokio::test]
5207    async fn non_covering_index_uses_point_lookup_instead_of_full_scan() {
5208        let state = MockState {
5209            kv: Arc::new(Mutex::new(BTreeMap::new())),
5210            range_calls: Arc::new(AtomicUsize::new(0)),
5211            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
5212            sequence_number: Arc::new(AtomicU64::new(0)),
5213        };
5214        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
5215        let client = StoreClient::new(&base_url);
5216
5217        let schema = KvSchema::new(client.clone())
5218            .table(
5219                "orders",
5220                vec![
5221                    TableColumnConfig::new("id", DataType::Int64, false),
5222                    TableColumnConfig::new("status", DataType::Utf8, false),
5223                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
5224                    TableColumnConfig::new("notes", DataType::Utf8, true),
5225                ],
5226                vec!["id".to_string()],
5227                vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
5228            )
5229            .expect("schema");
5230        let mut writer = schema.batch_writer();
5231        writer
5232            .insert(
5233                "orders",
5234                vec![
5235                    CellValue::Int64(1),
5236                    CellValue::Utf8("open".to_string()),
5237                    CellValue::Int64(100),
5238                    CellValue::Utf8("first".to_string()),
5239                ],
5240            )
5241            .expect("row");
5242        writer
5243            .insert(
5244                "orders",
5245                vec![
5246                    CellValue::Int64(2),
5247                    CellValue::Utf8("closed".to_string()),
5248                    CellValue::Int64(200),
5249                    CellValue::Utf8("second".to_string()),
5250                ],
5251            )
5252            .expect("row");
5253        writer
5254            .insert(
5255                "orders",
5256                vec![
5257                    CellValue::Int64(3),
5258                    CellValue::Utf8("open".to_string()),
5259                    CellValue::Int64(300),
5260                    CellValue::Utf8("third".to_string()),
5261                ],
5262            )
5263            .expect("row");
5264        writer.flush().await.expect("flush");
5265
5266        let ctx = SessionContext::new();
5267        schema.register_all(&ctx).expect("register");
5268
5269        let df = ctx
5270            .sql("SELECT id, notes FROM orders WHERE status = 'open' ORDER BY id")
5271            .await
5272            .expect("plan");
5273        let batches = df.collect().await.expect("non-covering index lookup");
5274        let ids: Vec<i64> = batches
5275            .iter()
5276            .flat_map(|b| {
5277                b.column(0)
5278                    .as_any()
5279                    .downcast_ref::<datafusion::arrow::array::Int64Array>()
5280                    .unwrap()
5281                    .iter()
5282                    .map(|v| v.unwrap())
5283            })
5284            .collect();
5285        let notes: Vec<String> = batches
5286            .iter()
5287            .flat_map(|b| {
5288                b.column(1)
5289                    .as_any()
5290                    .downcast_ref::<datafusion::arrow::array::StringArray>()
5291                    .unwrap()
5292                    .iter()
5293                    .map(|v| v.unwrap().to_string())
5294            })
5295            .collect();
5296        assert_eq!(ids, vec![1, 3]);
5297        assert_eq!(notes, vec!["first", "third"]);
5298
5299        let _ = shutdown_tx.send(());
5300    }
5301
5302    #[tokio::test]
5303    async fn backfill_resume_cursor_can_continue_without_skips_or_duplicates() {
5304        let state = MockState {
5305            kv: Arc::new(Mutex::new(BTreeMap::new())),
5306            range_calls: Arc::new(AtomicUsize::new(0)),
5307            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
5308            sequence_number: Arc::new(AtomicU64::new(0)),
5309        };
5310        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
5311        let client = StoreClient::new(&base_url);
5312
5313        let seed_schema = KvSchema::new(client.clone())
5314            .table(
5315                "orders",
5316                vec![
5317                    TableColumnConfig::new("id", DataType::Int64, false),
5318                    TableColumnConfig::new("status", DataType::Utf8, false),
5319                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
5320                ],
5321                vec!["id".to_string()],
5322                vec![],
5323            )
5324            .expect("seed schema");
5325        let mut writer = seed_schema.batch_writer();
5326        for i in 0..8i64 {
5327            writer
5328                .insert(
5329                    "orders",
5330                    vec![
5331                        CellValue::Int64(i),
5332                        CellValue::Utf8("open".to_string()),
5333                        CellValue::Int64(i * 10),
5334                    ],
5335                )
5336                .expect("seed row");
5337        }
5338        writer.flush().await.expect("seed flush");
5339
5340        let backfill_schema = KvSchema::new(client.clone())
5341            .table(
5342                "orders",
5343                vec![
5344                    TableColumnConfig::new("id", DataType::Int64, false),
5345                    TableColumnConfig::new("status", DataType::Utf8, false),
5346                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
5347                ],
5348                vec!["id".to_string()],
5349                vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
5350            )
5351            .expect("backfill schema");
5352
5353        let task_schema = KvSchema::new(client.clone())
5354            .table(
5355                "orders",
5356                vec![
5357                    TableColumnConfig::new("id", DataType::Int64, false),
5358                    TableColumnConfig::new("status", DataType::Utf8, false),
5359                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
5360                ],
5361                vec!["id".to_string()],
5362                vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
5363            )
5364            .expect("task schema");
5365        let (progress_tx, mut progress_rx) = mpsc::unbounded_channel();
5366        let handle = tokio::spawn(async move {
5367            task_schema
5368                .backfill_added_indexes_with_options_and_progress(
5369                    "orders",
5370                    &[],
5371                    IndexBackfillOptions {
5372                        row_batch_size: 2,
5373                        start_from_primary_key: None,
5374                    },
5375                    Some(&progress_tx),
5376                )
5377                .await
5378        });
5379
5380        let mut resume_cursor = None;
5381        while let Some(event) = progress_rx.recv().await {
5382            if let IndexBackfillEvent::Progress { next_cursor, .. } = event {
5383                resume_cursor = next_cursor;
5384                break;
5385            }
5386        }
5387        handle.abort();
5388        let resume_cursor =
5389            resume_cursor.expect("first progress event should provide resume cursor");
5390
5391        let report = backfill_schema
5392            .backfill_added_indexes_with_options(
5393                "orders",
5394                &[],
5395                IndexBackfillOptions {
5396                    row_batch_size: 2,
5397                    start_from_primary_key: Some(resume_cursor),
5398                },
5399            )
5400            .await
5401            .expect("resume backfill should succeed");
5402        assert_eq!(report.scanned_rows, 6);
5403
5404        let guard = state.kv.lock().expect("kv mutex poisoned");
5405        let base_rows = guard
5406            .keys()
5407            .filter(|key| matches_primary_key(0, key))
5408            .count();
5409        let index_rows = guard
5410            .keys()
5411            .filter(|key| matches_secondary_index_key(0, 1, key))
5412            .count();
5413        assert_eq!(base_rows, 8);
5414        assert_eq!(
5415            index_rows, 8,
5416            "resume should backfill each row exactly once"
5417        );
5418
5419        let _ = shutdown_tx.send(());
5420    }
5421
5422    #[tokio::test]
5423    async fn concurrent_writes_during_backfill_preserve_index_correctness() {
5424        let state = MockState {
5425            kv: Arc::new(Mutex::new(BTreeMap::new())),
5426            range_calls: Arc::new(AtomicUsize::new(0)),
5427            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
5428            sequence_number: Arc::new(AtomicU64::new(0)),
5429        };
5430        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
5431        let client = StoreClient::new(&base_url);
5432
5433        let seed_schema = KvSchema::new(client.clone())
5434            .table(
5435                "orders",
5436                vec![
5437                    TableColumnConfig::new("id", DataType::Int64, false),
5438                    TableColumnConfig::new("status", DataType::Utf8, false),
5439                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
5440                ],
5441                vec!["id".to_string()],
5442                vec![],
5443            )
5444            .expect("seed schema");
5445        let mut seed_writer = seed_schema.batch_writer();
5446        for i in 0..40i64 {
5447            seed_writer
5448                .insert(
5449                    "orders",
5450                    vec![
5451                        CellValue::Int64(i),
5452                        CellValue::Utf8(if i % 2 == 0 { "open" } else { "closed" }.to_string()),
5453                        CellValue::Int64(i * 10),
5454                    ],
5455                )
5456                .expect("seed row");
5457        }
5458        seed_writer.flush().await.expect("seed flush");
5459
5460        let backfill_schema = KvSchema::new(client.clone())
5461            .table(
5462                "orders",
5463                vec![
5464                    TableColumnConfig::new("id", DataType::Int64, false),
5465                    TableColumnConfig::new("status", DataType::Utf8, false),
5466                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
5467                ],
5468                vec!["id".to_string()],
5469                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
5470                    .expect("valid")
5471                    .with_cover_columns(vec!["amount_cents".to_string()])],
5472            )
5473            .expect("backfill schema");
5474
5475        let task_schema = KvSchema::new(client.clone())
5476            .table(
5477                "orders",
5478                vec![
5479                    TableColumnConfig::new("id", DataType::Int64, false),
5480                    TableColumnConfig::new("status", DataType::Utf8, false),
5481                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
5482                ],
5483                vec!["id".to_string()],
5484                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
5485                    .expect("valid")
5486                    .with_cover_columns(vec!["amount_cents".to_string()])],
5487            )
5488            .expect("task schema");
5489        let (progress_tx, mut progress_rx) = mpsc::unbounded_channel();
5490        let handle = tokio::spawn(async move {
5491            task_schema
5492                .backfill_added_indexes_with_options_and_progress(
5493                    "orders",
5494                    &[],
5495                    IndexBackfillOptions {
5496                        row_batch_size: 5,
5497                        start_from_primary_key: None,
5498                    },
5499                    Some(&progress_tx),
5500                )
5501                .await
5502        });
5503
5504        while let Some(event) = progress_rx.recv().await {
5505            if matches!(event, IndexBackfillEvent::Progress { .. }) {
5506                break;
5507            }
5508        }
5509
5510        let mut concurrent_writer = backfill_schema.batch_writer();
5511        for id in [100i64, 101i64] {
5512            concurrent_writer
5513                .insert(
5514                    "orders",
5515                    vec![
5516                        CellValue::Int64(id),
5517                        CellValue::Utf8("open".to_string()),
5518                        CellValue::Int64(id * 10),
5519                    ],
5520                )
5521                .expect("concurrent row");
5522        }
5523        concurrent_writer.flush().await.expect("concurrent flush");
5524
5525        let report = handle
5526            .await
5527            .expect("backfill task join")
5528            .expect("backfill result");
5529        assert!(
5530            report.scanned_rows >= 40,
5531            "backfill should at least scan the original historical rows"
5532        );
5533
5534        let guard = state.kv.lock().expect("kv mutex poisoned");
5535        let base_rows = guard
5536            .keys()
5537            .filter(|key| matches_primary_key(0, key))
5538            .count();
5539        let index_rows = guard
5540            .keys()
5541            .filter(|key| matches_secondary_index_key(0, 1, key))
5542            .count();
5543        assert_eq!(base_rows, 42);
5544        assert_eq!(
5545            index_rows, 42,
5546            "historical backfill plus concurrent indexed writes should leave one index row per base row"
5547        );
5548
5549        let _ = shutdown_tx.send(());
5550    }
5551
5552    #[derive(Clone)]
5553    struct DeferredChunkRangeHarness {
5554        first_chunk_sent: Arc<Notify>,
5555        release_second_chunk: Arc<Notify>,
5556        first_frame: ProtoRangeFrame,
5557        second_frame: ProtoRangeFrame,
5558    }
5559
5560    impl QueryService for DeferredChunkRangeHarness {
5561        async fn get(
5562            &self,
5563            _ctx: Context,
5564            _request: buffa::view::OwnedView<
5565                exoware_sdk::store::query::v1::GetRequestView<'static>,
5566            >,
5567        ) -> Result<(ProtoGetResponse, Context), ConnectError> {
5568            Err(ConnectError::unimplemented("test harness"))
5569        }
5570
5571        async fn get_many(
5572            &self,
5573            _ctx: Context,
5574            _request: buffa::view::OwnedView<
5575                exoware_sdk::store::query::v1::GetManyRequestView<'static>,
5576            >,
5577        ) -> Result<
5578            (
5579                Pin<Box<dyn Stream<Item = Result<ProtoGetManyFrame, ConnectError>> + Send>>,
5580                Context,
5581            ),
5582            ConnectError,
5583        > {
5584            Err(ConnectError::unimplemented("test harness"))
5585        }
5586
5587        async fn range(
5588            &self,
5589            _ctx: Context,
5590            _request: buffa::view::OwnedView<
5591                exoware_sdk::store::query::v1::RangeRequestView<'static>,
5592            >,
5593        ) -> Result<
5594            (
5595                Pin<Box<dyn Stream<Item = Result<ProtoRangeFrame, ConnectError>> + Send>>,
5596                Context,
5597            ),
5598            ConnectError,
5599        > {
5600            let first_chunk_sent = self.first_chunk_sent.clone();
5601            let release_second_chunk = self.release_second_chunk.clone();
5602            let first_frame = self.first_frame.clone();
5603            let second_frame = self.second_frame.clone();
5604            let stream = stream::try_unfold(0u8, move |state| {
5605                let first_chunk_sent = first_chunk_sent.clone();
5606                let release_second_chunk = release_second_chunk.clone();
5607                let first_frame = first_frame.clone();
5608                let second_frame = second_frame.clone();
5609                async move {
5610                    match state {
5611                        0 => {
5612                            first_chunk_sent.notify_one();
5613                            Ok(Some((first_frame, 1)))
5614                        }
5615                        1 => {
5616                            release_second_chunk.notified().await;
5617                            Ok(Some((second_frame, 2)))
5618                        }
5619                        _ => Ok(None),
5620                    }
5621                }
5622            });
5623            Ok((Box::pin(stream), query_detail_trailer_ctx(7)))
5624        }
5625
5626        async fn reduce(
5627            &self,
5628            _ctx: Context,
5629            _request: buffa::view::OwnedView<
5630                exoware_sdk::store::query::v1::ReduceRequestView<'static>,
5631            >,
5632        ) -> Result<(ProtoReduceResponse, Context), ConnectError> {
5633            Err(ConnectError::unimplemented("test harness"))
5634        }
5635    }
5636
5637    #[derive(Clone)]
5638    struct ObservedLimitRangeHarness {
5639        release_second_chunk: Arc<Notify>,
5640        observed_limit: Arc<AtomicUsize>,
5641        first_frame: ProtoRangeFrame,
5642        second_frame: ProtoRangeFrame,
5643    }
5644
5645    impl QueryService for ObservedLimitRangeHarness {
5646        async fn get(
5647            &self,
5648            _ctx: Context,
5649            _request: buffa::view::OwnedView<
5650                exoware_sdk::store::query::v1::GetRequestView<'static>,
5651            >,
5652        ) -> Result<(ProtoGetResponse, Context), ConnectError> {
5653            Err(ConnectError::unimplemented("test harness"))
5654        }
5655
5656        async fn get_many(
5657            &self,
5658            _ctx: Context,
5659            _request: buffa::view::OwnedView<
5660                exoware_sdk::store::query::v1::GetManyRequestView<'static>,
5661            >,
5662        ) -> Result<
5663            (
5664                Pin<Box<dyn Stream<Item = Result<ProtoGetManyFrame, ConnectError>> + Send>>,
5665                Context,
5666            ),
5667            ConnectError,
5668        > {
5669            Err(ConnectError::unimplemented("test harness"))
5670        }
5671
5672        async fn range(
5673            &self,
5674            _ctx: Context,
5675            request: buffa::view::OwnedView<
5676                exoware_sdk::store::query::v1::RangeRequestView<'static>,
5677            >,
5678        ) -> Result<
5679            (
5680                Pin<Box<dyn Stream<Item = Result<ProtoRangeFrame, ConnectError>> + Send>>,
5681                Context,
5682            ),
5683            ConnectError,
5684        > {
5685            let limit = request.limit.map(|v| v as usize).unwrap_or(usize::MAX);
5686            self.observed_limit.store(limit, AtomicOrdering::SeqCst);
5687            let release_second_chunk = self.release_second_chunk.clone();
5688            let first_frame = self.first_frame.clone();
5689            let second_frame = self.second_frame.clone();
5690            let stream = stream::try_unfold(0u8, move |state| {
5691                let release_second_chunk = release_second_chunk.clone();
5692                let first_frame = first_frame.clone();
5693                let second_frame = second_frame.clone();
5694                async move {
5695                    match state {
5696                        0 => Ok(Some((first_frame, 1))),
5697                        1 => {
5698                            if limit > 1 {
5699                                release_second_chunk.notified().await;
5700                                Ok(Some((second_frame, 2)))
5701                            } else {
5702                                Ok(None)
5703                            }
5704                        }
5705                        2 => Ok(None),
5706                        _ => Ok(None),
5707                    }
5708                }
5709            });
5710            Ok((Box::pin(stream), query_detail_trailer_ctx(7)))
5711        }
5712
5713        async fn reduce(
5714            &self,
5715            _ctx: Context,
5716            _request: buffa::view::OwnedView<
5717                exoware_sdk::store::query::v1::ReduceRequestView<'static>,
5718            >,
5719        ) -> Result<(ProtoReduceResponse, Context), ConnectError> {
5720            Err(ConnectError::unimplemented("test harness"))
5721        }
5722    }
5723
5724    #[derive(Clone)]
5725    struct ObservedLimitIndexRangeHarness {
5726        observed_limit: Arc<AtomicUsize>,
5727        entries_frame: ProtoRangeFrame,
5728    }
5729
5730    impl QueryService for ObservedLimitIndexRangeHarness {
5731        async fn get(
5732            &self,
5733            _ctx: Context,
5734            _request: buffa::view::OwnedView<
5735                exoware_sdk::store::query::v1::GetRequestView<'static>,
5736            >,
5737        ) -> Result<(ProtoGetResponse, Context), ConnectError> {
5738            Err(ConnectError::unimplemented("test harness"))
5739        }
5740
5741        async fn get_many(
5742            &self,
5743            _ctx: Context,
5744            _request: buffa::view::OwnedView<
5745                exoware_sdk::store::query::v1::GetManyRequestView<'static>,
5746            >,
5747        ) -> Result<
5748            (
5749                Pin<Box<dyn Stream<Item = Result<ProtoGetManyFrame, ConnectError>> + Send>>,
5750                Context,
5751            ),
5752            ConnectError,
5753        > {
5754            Err(ConnectError::unimplemented("test harness"))
5755        }
5756
5757        async fn range(
5758            &self,
5759            _ctx: Context,
5760            request: buffa::view::OwnedView<
5761                exoware_sdk::store::query::v1::RangeRequestView<'static>,
5762            >,
5763        ) -> Result<
5764            (
5765                Pin<Box<dyn Stream<Item = Result<ProtoRangeFrame, ConnectError>> + Send>>,
5766                Context,
5767            ),
5768            ConnectError,
5769        > {
5770            let limit = request
5771                .limit
5772                .map(|v| {
5773                    if v == u32::MAX {
5774                        usize::MAX
5775                    } else {
5776                        v as usize
5777                    }
5778                })
5779                .unwrap_or(usize::MAX);
5780            self.observed_limit.store(limit, AtomicOrdering::SeqCst);
5781            let entries_frame = self.entries_frame.clone();
5782            Ok((
5783                Box::pin(stream::iter(vec![Ok(entries_frame)])),
5784                query_detail_trailer_ctx(7),
5785            ))
5786        }
5787
5788        async fn reduce(
5789            &self,
5790            _ctx: Context,
5791            _request: buffa::view::OwnedView<
5792                exoware_sdk::store::query::v1::ReduceRequestView<'static>,
5793            >,
5794        ) -> Result<(ProtoReduceResponse, Context), ConnectError> {
5795            Err(ConnectError::unimplemented("test harness"))
5796        }
5797    }
5798
5799    #[tokio::test]
5800    async fn kv_scan_streaming_range_reads_emit_first_batch_before_full_range_completes() {
5801        let model = Arc::new(simple_int64_model(0));
5802        let first_chunk_sent = Arc::new(Notify::new());
5803        let release_second_chunk = Arc::new(Notify::new());
5804
5805        let encoded_row = (StoredRow { values: vec![None] }).encode().to_vec();
5806
5807        let first_results = {
5808            let mut results = Vec::with_capacity(BATCH_FLUSH_ROWS);
5809            for id in 0..BATCH_FLUSH_ROWS {
5810                let key =
5811                    encode_primary_key(model.table_prefix, &[&CellValue::Int64(id as i64)], &model)
5812                        .expect("primary key");
5813                results.push((key, encoded_row.clone()));
5814            }
5815            results
5816        };
5817        let first_frame = proto_range_entries_frame(first_results);
5818
5819        let second_results = {
5820            let key = encode_primary_key(
5821                model.table_prefix,
5822                &[&CellValue::Int64(BATCH_FLUSH_ROWS as i64)],
5823                &model,
5824            )
5825            .expect("primary key");
5826            vec![(key, encoded_row)]
5827        };
5828        let second_frame = proto_range_entries_frame(second_results);
5829
5830        let harness = DeferredChunkRangeHarness {
5831            first_chunk_sent: first_chunk_sent.clone(),
5832            release_second_chunk: release_second_chunk.clone(),
5833            first_frame,
5834            second_frame,
5835        };
5836        let connect = ConnectRpcService::new(QueryServiceServer::new(harness))
5837            .with_compression(connect_compression_registry());
5838        let app = Router::new().fallback_service(connect);
5839
5840        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
5841            .await
5842            .expect("bind test listener");
5843        let url = format!("http://{}", listener.local_addr().expect("listener addr"));
5844        tokio::spawn(async move {
5845            axum::serve(listener, app).await.expect("serve test app");
5846        });
5847
5848        let client = StoreClient::new(&url);
5849        let scan = KvScanExec::new(
5850            client,
5851            model.clone(),
5852            Arc::new(Vec::new()),
5853            QueryPredicate::default(),
5854            None,
5855            model.schema.clone(),
5856            None,
5857        );
5858
5859        let session_ctx = SessionContext::new();
5860        let mut stream = scan
5861            .execute(0, session_ctx.task_ctx())
5862            .expect("scan execute should start");
5863
5864        tokio::time::timeout(Duration::from_secs(1), first_chunk_sent.notified())
5865            .await
5866            .expect("server should send first range frame");
5867        let first_batch = tokio::time::timeout(Duration::from_millis(200), stream.try_next())
5868            .await
5869            .expect("first record batch should arrive before the second stream chunk is released")
5870            .expect("stream poll should succeed")
5871            .expect("expected first record batch");
5872        assert_eq!(first_batch.num_rows(), BATCH_FLUSH_ROWS);
5873
5874        release_second_chunk.notify_one();
5875
5876        let second_batch = stream
5877            .try_next()
5878            .await
5879            .expect("second poll should succeed")
5880            .expect("expected second record batch");
5881        assert_eq!(second_batch.num_rows(), 1);
5882        assert!(
5883            stream
5884                .try_next()
5885                .await
5886                .expect("stream completion poll")
5887                .is_none(),
5888            "stream should finish after the second batch"
5889        );
5890    }
5891
5892    #[tokio::test]
5893    async fn kv_scan_sql_limit_is_pushed_upstream_on_exact_streaming_scan() {
5894        let release_second_chunk = Arc::new(Notify::new());
5895        let observed_limit = Arc::new(AtomicUsize::new(0));
5896        let model = simple_int64_model(0);
5897
5898        let encoded_row = (StoredRow { values: vec![None] }).encode().to_vec();
5899
5900        let first_key = encode_primary_key(model.table_prefix, &[&CellValue::Int64(1)], &model)
5901            .expect("first primary key");
5902        let second_key = encode_primary_key(model.table_prefix, &[&CellValue::Int64(2)], &model)
5903            .expect("second primary key");
5904
5905        let first_frame = proto_range_entries_frame(vec![(first_key, encoded_row.clone())]);
5906        let second_frame = proto_range_entries_frame(vec![(second_key, encoded_row)]);
5907
5908        let harness = ObservedLimitRangeHarness {
5909            release_second_chunk: release_second_chunk.clone(),
5910            observed_limit: observed_limit.clone(),
5911            first_frame,
5912            second_frame,
5913        };
5914        let connect = ConnectRpcService::new(QueryServiceServer::new(harness))
5915            .with_compression(connect_compression_registry());
5916        let app = Router::new().fallback_service(connect);
5917
5918        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
5919            .await
5920            .expect("bind test listener");
5921        let url = format!("http://{}", listener.local_addr().expect("listener addr"));
5922        tokio::spawn(async move {
5923            axum::serve(listener, app).await.expect("serve test app");
5924        });
5925
5926        let client = StoreClient::new(&url);
5927        let schema = KvSchema::new(client)
5928            .table(
5929                "items",
5930                vec![TableColumnConfig::new("id", DataType::Int64, false)],
5931                vec!["id".to_string()],
5932                vec![],
5933            )
5934            .expect("schema");
5935        let ctx = SessionContext::new();
5936        schema.register_all(&ctx).expect("register");
5937
5938        let batches = tokio::time::timeout(Duration::from_millis(200), async {
5939            ctx.sql("SELECT id FROM items LIMIT 1")
5940                .await
5941                .expect("query")
5942                .collect()
5943                .await
5944                .expect("collect")
5945        })
5946        .await
5947        .expect("query with LIMIT 1 should finish without waiting for a delayed second chunk");
5948
5949        assert_eq!(
5950            batches.iter().map(|batch| batch.num_rows()).sum::<usize>(),
5951            1
5952        );
5953        assert_eq!(
5954            observed_limit.load(AtomicOrdering::SeqCst),
5955            1,
5956            "exact streaming scan should push SQL LIMIT upstream"
5957        );
5958        release_second_chunk.notify_one();
5959    }
5960
5961    #[tokio::test]
5962    async fn kv_scan_index_limit_does_not_push_upstream_when_seen_dedup_can_drop_entries() {
5963        let observed_limit = Arc::new(AtomicUsize::new(0));
5964        let config = KvTableConfig::new(
5965            0,
5966            vec![
5967                TableColumnConfig::new("id", DataType::Int64, false),
5968                TableColumnConfig::new("status", DataType::Utf8, false),
5969                TableColumnConfig::new("amount_cents", DataType::Int64, false),
5970            ],
5971            vec!["id".to_string()],
5972            vec![IndexSpec::new("status_idx", vec!["status".to_string()])
5973                .expect("valid")
5974                .with_cover_columns(vec!["status".to_string(), "amount_cents".to_string()])],
5975        )
5976        .expect("config");
5977        let model = TableModel::from_config(&config).expect("model");
5978        let spec = model
5979            .resolve_index_specs(&config.index_specs)
5980            .expect("specs")
5981            .into_iter()
5982            .next()
5983            .expect("status index spec");
5984        let stale_row = KvRow {
5985            values: vec![
5986                CellValue::Int64(7),
5987                CellValue::Utf8("closed".to_string()),
5988                CellValue::Int64(10),
5989            ],
5990        };
5991        let current_row = KvRow {
5992            values: vec![
5993                CellValue::Int64(7),
5994                CellValue::Utf8("open".to_string()),
5995                CellValue::Int64(10),
5996            ],
5997        };
5998        let unique_row = KvRow {
5999            values: vec![
6000                CellValue::Int64(8),
6001                CellValue::Utf8("open".to_string()),
6002                CellValue::Int64(20),
6003            ],
6004        };
6005        let stale_key = encode_secondary_index_key(model.table_prefix, &spec, &model, &stale_row)
6006            .expect("stale index key");
6007        let current_key =
6008            encode_secondary_index_key(model.table_prefix, &spec, &model, &current_row)
6009                .expect("current index key");
6010        let unique_key = encode_secondary_index_key(model.table_prefix, &spec, &model, &unique_row)
6011            .expect("unique index key");
6012        let stale_payload =
6013            encode_secondary_index_value(&stale_row, &model, &spec).expect("stale payload");
6014        let current_payload =
6015            encode_secondary_index_value(&current_row, &model, &spec).expect("current payload");
6016        let unique_payload =
6017            encode_secondary_index_value(&unique_row, &model, &spec).expect("unique payload");
6018
6019        let entries_frame = proto_range_entries_frame(vec![
6020            (stale_key, stale_payload),
6021            (current_key, current_payload),
6022            (unique_key, unique_payload),
6023        ]);
6024        let harness = ObservedLimitIndexRangeHarness {
6025            observed_limit: observed_limit.clone(),
6026            entries_frame,
6027        };
6028        let connect = ConnectRpcService::new(QueryServiceServer::new(harness))
6029            .with_compression(connect_compression_registry());
6030        let app = Router::new().fallback_service(connect);
6031
6032        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
6033            .await
6034            .expect("bind test listener");
6035        let url = format!("http://{}", listener.local_addr().expect("listener addr"));
6036        tokio::spawn(async move {
6037            axum::serve(listener, app).await.expect("serve test app");
6038        });
6039
6040        let client = StoreClient::new(&url);
6041        let schema = KvSchema::new(client)
6042            .table(
6043                "orders",
6044                vec![
6045                    TableColumnConfig::new("id", DataType::Int64, false),
6046                    TableColumnConfig::new("status", DataType::Utf8, false),
6047                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
6048                ],
6049                vec!["id".to_string()],
6050                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6051                    .expect("valid")
6052                    .with_cover_columns(vec!["status".to_string(), "amount_cents".to_string()])],
6053            )
6054            .expect("schema");
6055        let ctx = SessionContext::new();
6056        schema.register_all(&ctx).expect("register");
6057
6058        let batches = ctx
6059            .sql(
6060                "SELECT id, amount_cents \
6061                 FROM orders \
6062                 WHERE status IN ('open', 'closed') \
6063                 LIMIT 2",
6064            )
6065            .await
6066            .expect("query")
6067            .collect()
6068            .await
6069            .expect("collect");
6070
6071        assert_eq!(
6072            batches.iter().map(|batch| batch.num_rows()).sum::<usize>(),
6073            2
6074        );
6075        assert_eq!(
6076            observed_limit.load(AtomicOrdering::SeqCst),
6077            usize::MAX,
6078            "index streaming scans should not push SQL LIMIT upstream while seen-dedup can drop duplicate primary keys"
6079        );
6080    }
6081
6082    #[tokio::test]
6083    async fn zorder_covering_index_scan_filters_false_positive_morton_span_rows() {
6084        let state = MockState {
6085            kv: Arc::new(Mutex::new(BTreeMap::new())),
6086            range_calls: Arc::new(AtomicUsize::new(0)),
6087            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6088            sequence_number: Arc::new(AtomicU64::new(0)),
6089        };
6090        let (base_url, shutdown_tx) = spawn_mock_server(state).await;
6091        let client = StoreClient::new(&base_url);
6092
6093        let schema = KvSchema::new(client)
6094            .table(
6095                "points",
6096                vec![
6097                    TableColumnConfig::new("x", DataType::Int64, false),
6098                    TableColumnConfig::new("y", DataType::Int64, false),
6099                    TableColumnConfig::new("id", DataType::Int64, false),
6100                    TableColumnConfig::new("value", DataType::Int64, false),
6101                ],
6102                vec!["id".to_string()],
6103                vec![
6104                    IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
6105                        .expect("valid")
6106                        .with_cover_columns(vec!["value".to_string()]),
6107                ],
6108            )
6109            .expect("schema");
6110
6111        let mut writer = schema.batch_writer();
6112        for (x, y, id, value) in [
6113            (0, 2, 2, 20),
6114            (1, 1, 11, 110),
6115            (1, 2, 12, 120),
6116            (2, 1, 21, 210),
6117            (2, 2, 22, 220),
6118            (3, 0, 30, 300),
6119        ] {
6120            writer
6121                .insert(
6122                    "points",
6123                    vec![
6124                        CellValue::Int64(x),
6125                        CellValue::Int64(y),
6126                        CellValue::Int64(id),
6127                        CellValue::Int64(value),
6128                    ],
6129                )
6130                .expect("row");
6131        }
6132        writer.flush().await.expect("flush");
6133
6134        let ctx = SessionContext::new();
6135        schema.register_all(&ctx).expect("register");
6136
6137        let batches = ctx
6138            .sql(
6139                "SELECT id, value FROM points \
6140                 WHERE x >= 1 AND x <= 2 AND y >= 1 AND y <= 2 \
6141                 ORDER BY id",
6142            )
6143            .await
6144            .expect("query")
6145            .collect()
6146            .await
6147            .expect("collect");
6148
6149        let mut rows = Vec::new();
6150        for batch in &batches {
6151            let ids = batch
6152                .column(0)
6153                .as_any()
6154                .downcast_ref::<Int64Array>()
6155                .expect("id int64");
6156            let values = batch
6157                .column(1)
6158                .as_any()
6159                .downcast_ref::<Int64Array>()
6160                .expect("value int64");
6161            for row_idx in 0..batch.num_rows() {
6162                rows.push((ids.value(row_idx), values.value(row_idx)));
6163            }
6164        }
6165        assert_eq!(rows, vec![(11, 110), (12, 120), (21, 210), (22, 220)]);
6166
6167        let _ = shutdown_tx.send(());
6168    }
6169
6170    #[tokio::test]
6171    async fn aggregate_pushdown_uses_range_reduce_for_supported_global_aggregates() {
6172        let state = MockState {
6173            kv: Arc::new(Mutex::new(BTreeMap::new())),
6174            range_calls: Arc::new(AtomicUsize::new(0)),
6175            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6176            sequence_number: Arc::new(AtomicU64::new(0)),
6177        };
6178        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6179        let client = StoreClient::new(&base_url);
6180
6181        let schema = KvSchema::new(client)
6182            .table(
6183                "orders",
6184                vec![
6185                    TableColumnConfig::new("id", DataType::Int64, false),
6186                    TableColumnConfig::new("status", DataType::Utf8, false),
6187                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
6188                ],
6189                vec!["id".to_string()],
6190                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6191                    .expect("valid")
6192                    .with_cover_columns(vec!["amount_cents".to_string()])],
6193            )
6194            .expect("schema");
6195
6196        let mut writer = schema.batch_writer();
6197        for (id, status, amount) in [
6198            (1, "open", 10),
6199            (2, "closed", 15),
6200            (3, "open", 30),
6201            (4, "closed", 40),
6202        ] {
6203            writer
6204                .insert(
6205                    "orders",
6206                    vec![
6207                        CellValue::Int64(id),
6208                        CellValue::Utf8(status.to_string()),
6209                        CellValue::Int64(amount),
6210                    ],
6211                )
6212                .expect("row");
6213        }
6214        writer.flush().await.expect("flush");
6215
6216        let ctx = SessionContext::new();
6217        schema.register_all(&ctx).expect("register");
6218
6219        state.range_calls.store(0, AtomicOrdering::SeqCst);
6220        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6221
6222        let df = ctx
6223            .sql(
6224                "SELECT COUNT(*) AS row_count, SUM(amount_cents) AS total_cents, \
6225                 AVG(amount_cents) AS avg_cents \
6226                 FROM orders WHERE status = 'open'",
6227            )
6228            .await
6229            .expect("query");
6230        let batches = df.collect().await.expect("collect");
6231
6232        assert_eq!(batches.len(), 1);
6233        let batch = &batches[0];
6234        let row_count = ScalarValue::try_from_array(batch.column(0), 0).expect("row_count scalar");
6235        let total = batch
6236            .column(1)
6237            .as_any()
6238            .downcast_ref::<Int64Array>()
6239            .expect("sum int64")
6240            .value(0);
6241        let avg = batch
6242            .column(2)
6243            .as_any()
6244            .downcast_ref::<Float64Array>()
6245            .expect("avg float64")
6246            .value(0);
6247        assert!(matches!(
6248            row_count,
6249            ScalarValue::UInt64(Some(2)) | ScalarValue::Int64(Some(2))
6250        ));
6251        assert_eq!(total, 40);
6252        assert_eq!(avg, 20.0);
6253        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6254        assert!(
6255            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6256            "supported aggregate should use range reduction path"
6257        );
6258
6259        let _ = shutdown_tx.send(());
6260    }
6261
6262    /// Store `/v1/range` is inclusive on both ends; `id <= N` and `BETWEEN` must include the end key.
6263    #[tokio::test]
6264    async fn primary_key_inclusive_upper_bound_streaming_scan_uses_range() {
6265        let state = MockState {
6266            kv: Arc::new(Mutex::new(BTreeMap::new())),
6267            range_calls: Arc::new(AtomicUsize::new(0)),
6268            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6269            sequence_number: Arc::new(AtomicU64::new(0)),
6270        };
6271        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6272        let client = StoreClient::new(&base_url);
6273
6274        let schema = KvSchema::new(client)
6275            .table(
6276                "inc_pk",
6277                vec![
6278                    TableColumnConfig::new("id", DataType::Int64, false),
6279                    TableColumnConfig::new("amount", DataType::Int64, false),
6280                ],
6281                vec!["id".to_string()],
6282                vec![],
6283            )
6284            .expect("schema");
6285
6286        let mut writer = schema.batch_writer();
6287        for id in 1i64..=5i64 {
6288            writer
6289                .insert(
6290                    "inc_pk",
6291                    vec![CellValue::Int64(id), CellValue::Int64(id * 100)],
6292                )
6293                .expect("row");
6294        }
6295        writer.flush().await.expect("flush");
6296
6297        let ctx = SessionContext::new();
6298        schema.register_all(&ctx).expect("register");
6299
6300        state.range_calls.store(0, AtomicOrdering::SeqCst);
6301        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6302
6303        let batches = ctx
6304            .sql("SELECT id FROM inc_pk WHERE id <= 3 ORDER BY id")
6305            .await
6306            .expect("lte query")
6307            .collect()
6308            .await
6309            .expect("collect");
6310        let mut ids = Vec::new();
6311        for batch in &batches {
6312            let col = batch
6313                .column(0)
6314                .as_any()
6315                .downcast_ref::<Int64Array>()
6316                .expect("id");
6317            for i in 0..batch.num_rows() {
6318                ids.push(col.value(i));
6319            }
6320        }
6321        assert_eq!(ids, vec![1, 2, 3], "id <= 3 must include id 3");
6322        assert!(
6323            state.range_calls.load(AtomicOrdering::SeqCst) >= 1,
6324            "PK bounded scan should call range"
6325        );
6326        assert_eq!(
6327            state.range_reduce_calls.load(AtomicOrdering::SeqCst),
6328            0,
6329            "streaming scan must not use range_reduce"
6330        );
6331
6332        state.range_calls.store(0, AtomicOrdering::SeqCst);
6333        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6334
6335        let batches = ctx
6336            .sql("SELECT id FROM inc_pk WHERE id BETWEEN 2 AND 4 ORDER BY id")
6337            .await
6338            .expect("between query")
6339            .collect()
6340            .await
6341            .expect("collect");
6342        ids.clear();
6343        for batch in &batches {
6344            let col = batch
6345                .column(0)
6346                .as_any()
6347                .downcast_ref::<Int64Array>()
6348                .expect("id");
6349            for i in 0..batch.num_rows() {
6350                ids.push(col.value(i));
6351            }
6352        }
6353        assert_eq!(ids, vec![2, 3, 4], "BETWEEN must include both endpoints");
6354        assert!(state.range_calls.load(AtomicOrdering::SeqCst) >= 1);
6355        assert_eq!(state.range_reduce_calls.load(AtomicOrdering::SeqCst), 0);
6356
6357        let _ = shutdown_tx.send(());
6358    }
6359
6360    #[tokio::test]
6361    async fn primary_key_inclusive_upper_bound_scalar_aggregates_use_range_reduce() {
6362        let state = MockState {
6363            kv: Arc::new(Mutex::new(BTreeMap::new())),
6364            range_calls: Arc::new(AtomicUsize::new(0)),
6365            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6366            sequence_number: Arc::new(AtomicU64::new(0)),
6367        };
6368        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6369        let client = StoreClient::new(&base_url);
6370
6371        let schema = KvSchema::new(client)
6372            .table(
6373                "inc_pk",
6374                vec![
6375                    TableColumnConfig::new("id", DataType::Int64, false),
6376                    TableColumnConfig::new("amount", DataType::Int64, false),
6377                ],
6378                vec!["id".to_string()],
6379                vec![],
6380            )
6381            .expect("schema");
6382
6383        let mut writer = schema.batch_writer();
6384        for id in 1i64..=5i64 {
6385            writer
6386                .insert(
6387                    "inc_pk",
6388                    vec![CellValue::Int64(id), CellValue::Int64(id * 100)],
6389                )
6390                .expect("row");
6391        }
6392        writer.flush().await.expect("flush");
6393
6394        let ctx = SessionContext::new();
6395        schema.register_all(&ctx).expect("register");
6396
6397        state.range_calls.store(0, AtomicOrdering::SeqCst);
6398        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6399
6400        let batches = ctx
6401            .sql("SELECT COUNT(*) AS c, SUM(amount) AS s FROM inc_pk WHERE id <= 3")
6402            .await
6403            .expect("lte agg")
6404            .collect()
6405            .await
6406            .expect("collect");
6407        assert_eq!(batches.len(), 1);
6408        let batch = &batches[0];
6409        let c = ScalarValue::try_from_array(batch.column(0), 0).expect("count");
6410        assert!(
6411            matches!(
6412                c,
6413                ScalarValue::UInt64(Some(3)) | ScalarValue::Int64(Some(3))
6414            ),
6415            "count should include id=3"
6416        );
6417        assert_eq!(
6418            batch
6419                .column(1)
6420                .as_any()
6421                .downcast_ref::<Int64Array>()
6422                .expect("sum")
6423                .value(0),
6424            100 + 200 + 300
6425        );
6426        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6427        assert!(
6428            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6429            "scalar aggregate on PK range should use range_reduce"
6430        );
6431
6432        state.range_calls.store(0, AtomicOrdering::SeqCst);
6433        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6434
6435        let batches = ctx
6436            .sql("SELECT SUM(amount) AS s FROM inc_pk WHERE id BETWEEN 2 AND 4")
6437            .await
6438            .expect("between agg")
6439            .collect()
6440            .await
6441            .expect("collect");
6442        assert_eq!(batches.len(), 1);
6443        let batch = &batches[0];
6444        assert_eq!(
6445            batch
6446                .column(0)
6447                .as_any()
6448                .downcast_ref::<Int64Array>()
6449                .expect("sum")
6450                .value(0),
6451            200 + 300 + 400
6452        );
6453        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6454        assert!(
6455            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6456            "BETWEEN aggregate should use range_reduce"
6457        );
6458
6459        let _ = shutdown_tx.send(());
6460    }
6461
6462    #[tokio::test]
6463    async fn aggregate_pushdown_uses_zorder_index_with_worker_filter() {
6464        let state = MockState {
6465            kv: Arc::new(Mutex::new(BTreeMap::new())),
6466            range_calls: Arc::new(AtomicUsize::new(0)),
6467            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6468            sequence_number: Arc::new(AtomicU64::new(0)),
6469        };
6470        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6471        let client = StoreClient::new(&base_url);
6472
6473        let schema = KvSchema::new(client)
6474            .table(
6475                "points",
6476                vec![
6477                    TableColumnConfig::new("x", DataType::Int64, false),
6478                    TableColumnConfig::new("y", DataType::Int64, false),
6479                    TableColumnConfig::new("id", DataType::Int64, false),
6480                    TableColumnConfig::new("value", DataType::Int64, false),
6481                ],
6482                vec!["id".to_string()],
6483                vec![
6484                    IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
6485                        .expect("valid")
6486                        .with_cover_columns(vec!["value".to_string()]),
6487                ],
6488            )
6489            .expect("schema");
6490
6491        let mut writer = schema.batch_writer();
6492        for (x, y, id, value) in [
6493            (0, 2, 2, 20),
6494            (1, 1, 11, 110),
6495            (1, 2, 12, 120),
6496            (2, 1, 21, 210),
6497            (2, 2, 22, 220),
6498            (3, 0, 30, 300),
6499        ] {
6500            writer
6501                .insert(
6502                    "points",
6503                    vec![
6504                        CellValue::Int64(x),
6505                        CellValue::Int64(y),
6506                        CellValue::Int64(id),
6507                        CellValue::Int64(value),
6508                    ],
6509                )
6510                .expect("row");
6511        }
6512        writer.flush().await.expect("flush");
6513
6514        let ctx = SessionContext::new();
6515        schema.register_all(&ctx).expect("register");
6516
6517        state.range_calls.store(0, AtomicOrdering::SeqCst);
6518        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6519
6520        let batches = ctx
6521            .sql(
6522                "SELECT COUNT(*) AS row_count, SUM(value) AS total_value \
6523                 FROM points \
6524                 WHERE x >= 1 AND x <= 2 AND y >= 1 AND y <= 2",
6525            )
6526            .await
6527            .expect("query")
6528            .collect()
6529            .await
6530            .expect("collect");
6531
6532        assert_eq!(batches.len(), 1);
6533        let batch = &batches[0];
6534        let row_count = ScalarValue::try_from_array(batch.column(0), 0).expect("row_count scalar");
6535        let total = batch
6536            .column(1)
6537            .as_any()
6538            .downcast_ref::<Int64Array>()
6539            .expect("sum int64")
6540            .value(0);
6541        assert!(matches!(
6542            row_count,
6543            ScalarValue::UInt64(Some(4)) | ScalarValue::Int64(Some(4))
6544        ));
6545        assert_eq!(total, 660);
6546        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6547        assert!(
6548            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6549            "z-order aggregate should use range reduction path"
6550        );
6551
6552        let _ = shutdown_tx.send(());
6553    }
6554
6555    #[tokio::test]
6556    async fn aggregate_pushdown_avg_merges_sum_and_count_across_multiple_ranges() {
6557        let state = MockState {
6558            kv: Arc::new(Mutex::new(BTreeMap::new())),
6559            range_calls: Arc::new(AtomicUsize::new(0)),
6560            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6561            sequence_number: Arc::new(AtomicU64::new(0)),
6562        };
6563        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6564        let client = StoreClient::new(&base_url);
6565
6566        let schema = KvSchema::new(client)
6567            .table(
6568                "orders",
6569                vec![
6570                    TableColumnConfig::new("id", DataType::Int64, false),
6571                    TableColumnConfig::new("status", DataType::Utf8, false),
6572                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
6573                ],
6574                vec!["id".to_string()],
6575                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6576                    .expect("valid")
6577                    .with_cover_columns(vec!["amount_cents".to_string()])],
6578            )
6579            .expect("schema");
6580
6581        let mut writer = schema.batch_writer();
6582        for (id, status, amount) in [
6583            (1, "open", 10),
6584            (2, "open", 20),
6585            (3, "closed", 100),
6586            (4, "pending", 1_000),
6587        ] {
6588            writer
6589                .insert(
6590                    "orders",
6591                    vec![
6592                        CellValue::Int64(id),
6593                        CellValue::Utf8(status.to_string()),
6594                        CellValue::Int64(amount),
6595                    ],
6596                )
6597                .expect("row");
6598        }
6599        writer.flush().await.expect("flush");
6600
6601        let ctx = SessionContext::new();
6602        schema.register_all(&ctx).expect("register");
6603
6604        state.range_calls.store(0, AtomicOrdering::SeqCst);
6605        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6606
6607        let batches = ctx
6608            .sql(
6609                "SELECT AVG(amount_cents) AS avg_cents \
6610                 FROM orders \
6611                 WHERE status IN ('open', 'closed')",
6612            )
6613            .await
6614            .expect("query")
6615            .collect()
6616            .await
6617            .expect("collect");
6618
6619        assert_eq!(batches.len(), 1);
6620        let batch = &batches[0];
6621        let avg = batch
6622            .column(0)
6623            .as_any()
6624            .downcast_ref::<Float64Array>()
6625            .expect("avg float64")
6626            .value(0);
6627        let expected = 130.0 / 3.0;
6628        assert!(
6629            (avg - expected).abs() < 1e-12,
6630            "AVG should merge SUM+COUNT across unequal-count ranges: got {avg}, expected {expected}"
6631        );
6632        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6633        assert_eq!(
6634            state.range_reduce_calls.load(AtomicOrdering::SeqCst),
6635            2,
6636            "status IN (...) should expand to two pushed reduction ranges"
6637        );
6638
6639        let _ = shutdown_tx.send(());
6640    }
6641
6642    #[tokio::test]
6643    async fn aggregate_pushdown_supports_filtered_global_aggregates() {
6644        let state = MockState {
6645            kv: Arc::new(Mutex::new(BTreeMap::new())),
6646            range_calls: Arc::new(AtomicUsize::new(0)),
6647            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6648            sequence_number: Arc::new(AtomicU64::new(0)),
6649        };
6650        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6651        let client = StoreClient::new(&base_url);
6652
6653        let schema = KvSchema::new(client)
6654            .table(
6655                "orders",
6656                vec![
6657                    TableColumnConfig::new("id", DataType::Int64, false),
6658                    TableColumnConfig::new("status", DataType::Utf8, false),
6659                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
6660                ],
6661                vec!["id".to_string()],
6662                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6663                    .expect("valid")
6664                    .with_cover_columns(vec!["amount_cents".to_string()])],
6665            )
6666            .expect("schema");
6667
6668        let mut writer = schema.batch_writer();
6669        for (id, status, amount) in [
6670            (1, "open", 10),
6671            (2, "closed", 15),
6672            (3, "open", 30),
6673            (4, "closed", 40),
6674        ] {
6675            writer
6676                .insert(
6677                    "orders",
6678                    vec![
6679                        CellValue::Int64(id),
6680                        CellValue::Utf8(status.to_string()),
6681                        CellValue::Int64(amount),
6682                    ],
6683                )
6684                .expect("row");
6685        }
6686        writer.flush().await.expect("flush");
6687
6688        let ctx = SessionContext::new();
6689        schema.register_all(&ctx).expect("register");
6690
6691        state.range_calls.store(0, AtomicOrdering::SeqCst);
6692        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6693
6694        let query = "SELECT COUNT(*) FILTER (WHERE status = 'open') AS open_count, \
6695                            COUNT(*) FILTER (WHERE status = 'closed') AS closed_count, \
6696                            AVG(amount_cents) FILTER (WHERE status = 'closed') AS closed_avg \
6697                     FROM orders";
6698        let batches = ctx
6699            .sql(query)
6700            .await
6701            .expect("query")
6702            .collect()
6703            .await
6704            .expect("collect");
6705
6706        assert_eq!(batches.len(), 1);
6707        let batch = &batches[0];
6708        assert_count_scalar(batch, 0, 0, 2);
6709        assert_count_scalar(batch, 1, 0, 2);
6710        let closed_avg = batch
6711            .column(2)
6712            .as_any()
6713            .downcast_ref::<Float64Array>()
6714            .expect("avg float64")
6715            .value(0);
6716        assert_eq!(closed_avg, 27.5);
6717        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6718        assert!(
6719            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 3,
6720            "filtered aggregate pushdown should use dedicated reduction jobs"
6721        );
6722
6723        let _ = shutdown_tx.send(());
6724    }
6725
6726    #[tokio::test]
6727    async fn aggregate_pushdown_supports_case_filtered_global_aggregates() {
6728        let state = MockState {
6729            kv: Arc::new(Mutex::new(BTreeMap::new())),
6730            range_calls: Arc::new(AtomicUsize::new(0)),
6731            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6732            sequence_number: Arc::new(AtomicU64::new(0)),
6733        };
6734        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6735        let client = StoreClient::new(&base_url);
6736
6737        let schema = KvSchema::new(client)
6738            .table(
6739                "orders",
6740                vec![
6741                    TableColumnConfig::new("id", DataType::Int64, false),
6742                    TableColumnConfig::new("status", DataType::Utf8, false),
6743                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
6744                ],
6745                vec!["id".to_string()],
6746                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6747                    .expect("valid")
6748                    .with_cover_columns(vec!["amount_cents".to_string()])],
6749            )
6750            .expect("schema");
6751
6752        let mut writer = schema.batch_writer();
6753        for (id, status, amount) in [
6754            (1, "open", 10),
6755            (2, "closed", 15),
6756            (3, "open", 30),
6757            (4, "closed", 40),
6758        ] {
6759            writer
6760                .insert(
6761                    "orders",
6762                    vec![
6763                        CellValue::Int64(id),
6764                        CellValue::Utf8(status.to_string()),
6765                        CellValue::Int64(amount),
6766                    ],
6767                )
6768                .expect("row");
6769        }
6770        writer.flush().await.expect("flush");
6771
6772        let ctx = SessionContext::new();
6773        schema.register_all(&ctx).expect("register");
6774
6775        state.range_calls.store(0, AtomicOrdering::SeqCst);
6776        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6777
6778        let query = "SELECT SUM(CASE status WHEN 'open' THEN amount_cents END) AS open_total, \
6779                            COUNT(CASE status WHEN 'closed' THEN 1 END) AS closed_count, \
6780                            AVG(CASE WHEN status = 'closed' THEN amount_cents END) AS closed_avg \
6781                     FROM orders";
6782        let batches = ctx
6783            .sql(query)
6784            .await
6785            .expect("query")
6786            .collect()
6787            .await
6788            .expect("collect");
6789
6790        assert_eq!(batches.len(), 1);
6791        let batch = &batches[0];
6792        assert_eq!(
6793            ScalarValue::try_from_array(batch.column(0), 0).expect("sum scalar"),
6794            ScalarValue::Int64(Some(40))
6795        );
6796        assert_count_scalar(batch, 1, 0, 2);
6797        let closed_avg = batch
6798            .column(2)
6799            .as_any()
6800            .downcast_ref::<Float64Array>()
6801            .expect("avg float64")
6802            .value(0);
6803        assert_eq!(closed_avg, 27.5);
6804        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6805        assert!(
6806            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 3,
6807            "case-based conditional aggregates should use reduction jobs"
6808        );
6809
6810        let _ = shutdown_tx.send(());
6811    }
6812
6813    #[tokio::test]
6814    async fn aggregate_pushdown_supports_casted_group_and_aggregate_expressions() {
6815        let state = MockState {
6816            kv: Arc::new(Mutex::new(BTreeMap::new())),
6817            range_calls: Arc::new(AtomicUsize::new(0)),
6818            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6819            sequence_number: Arc::new(AtomicU64::new(0)),
6820        };
6821        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6822        let client = StoreClient::new(&base_url);
6823
6824        let schema = KvSchema::new(client)
6825            .table(
6826                "orders",
6827                vec![
6828                    TableColumnConfig::new("id", DataType::Int64, false),
6829                    TableColumnConfig::new("status", DataType::Utf8, false),
6830                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
6831                ],
6832                vec!["id".to_string()],
6833                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6834                    .expect("valid")
6835                    .with_cover_columns(vec!["amount_cents".to_string()])],
6836            )
6837            .expect("schema");
6838
6839        let mut writer = schema.batch_writer();
6840        for (id, status, amount) in [
6841            (1, "open", 10),
6842            (2, "open", 30),
6843            (3, "closed", 15),
6844            (4, "closed", 40),
6845        ] {
6846            writer
6847                .insert(
6848                    "orders",
6849                    vec![
6850                        CellValue::Int64(id),
6851                        CellValue::Utf8(status.to_string()),
6852                        CellValue::Int64(amount),
6853                    ],
6854                )
6855                .expect("row");
6856        }
6857        writer.flush().await.expect("flush");
6858
6859        let ctx = SessionContext::new();
6860        schema.register_all(&ctx).expect("register");
6861
6862        state.range_calls.store(0, AtomicOrdering::SeqCst);
6863        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6864
6865        let batches = ctx
6866            .sql(
6867                "SELECT CAST(status AS VARCHAR) AS status_text, \
6868                        SUM(CAST(amount_cents AS DOUBLE)) AS total_cents \
6869                 FROM orders \
6870                 GROUP BY CAST(status AS VARCHAR) \
6871                 ORDER BY status_text",
6872            )
6873            .await
6874            .expect("query")
6875            .collect()
6876            .await
6877            .expect("collect");
6878
6879        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
6880        let batch = &batches[0];
6881        let status = ScalarValue::try_from_array(batch.column(0), 0).expect("status scalar");
6882        assert_eq!(scalar_to_string(&status).as_deref(), Some("closed"));
6883        let closed_total = batch
6884            .column(1)
6885            .as_any()
6886            .downcast_ref::<Float64Array>()
6887            .expect("sum float64")
6888            .value(0);
6889        let open_total = batch
6890            .column(1)
6891            .as_any()
6892            .downcast_ref::<Float64Array>()
6893            .expect("sum float64")
6894            .value(1);
6895        assert_eq!(closed_total, 55.0);
6896        assert_eq!(open_total, 40.0);
6897        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6898        assert!(
6899            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6900            "casted grouped aggregates should stay on the reduction path"
6901        );
6902
6903        let _ = shutdown_tx.send(());
6904    }
6905
6906    #[tokio::test]
6907    async fn aggregate_pushdown_supports_computed_aggregate_inputs() {
6908        let state = MockState {
6909            kv: Arc::new(Mutex::new(BTreeMap::new())),
6910            range_calls: Arc::new(AtomicUsize::new(0)),
6911            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6912            sequence_number: Arc::new(AtomicU64::new(0)),
6913        };
6914        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6915        let client = StoreClient::new(&base_url);
6916
6917        let schema = KvSchema::new(client)
6918            .table(
6919                "orders",
6920                vec![
6921                    TableColumnConfig::new("id", DataType::Int64, false),
6922                    TableColumnConfig::new("price_cents", DataType::Int64, false),
6923                    TableColumnConfig::new("qty", DataType::Int64, false),
6924                    TableColumnConfig::new("duration_ms", DataType::Int64, false),
6925                ],
6926                vec!["id".to_string()],
6927                vec![],
6928            )
6929            .expect("schema");
6930
6931        let mut writer = schema.batch_writer();
6932        for (id, price, qty, duration_ms) in [(1, 10, 2, 500), (2, 15, 3, 2500), (3, 7, 4, 1000)] {
6933            writer
6934                .insert(
6935                    "orders",
6936                    vec![
6937                        CellValue::Int64(id),
6938                        CellValue::Int64(price),
6939                        CellValue::Int64(qty),
6940                        CellValue::Int64(duration_ms),
6941                    ],
6942                )
6943                .expect("row");
6944        }
6945        writer.flush().await.expect("flush");
6946
6947        let ctx = SessionContext::new();
6948        schema.register_all(&ctx).expect("register");
6949
6950        state.range_calls.store(0, AtomicOrdering::SeqCst);
6951        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6952
6953        let batches = ctx
6954            .sql(
6955                "SELECT SUM(price_cents * qty) AS total_revenue, \
6956                        AVG(duration_ms / 1e3) AS avg_seconds \
6957                 FROM orders",
6958            )
6959            .await
6960            .expect("query")
6961            .collect()
6962            .await
6963            .expect("collect");
6964
6965        assert_eq!(batches.len(), 1);
6966        let batch = &batches[0];
6967        assert_eq!(
6968            ScalarValue::try_from_array(batch.column(0), 0).expect("sum scalar"),
6969            ScalarValue::Int64(Some(93))
6970        );
6971        let avg_seconds = batch
6972            .column(1)
6973            .as_any()
6974            .downcast_ref::<Float64Array>()
6975            .expect("avg float64")
6976            .value(0);
6977        assert!((avg_seconds - (4.0 / 3.0)).abs() < 1e-12);
6978        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6979        assert!(
6980            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 2,
6981            "computed aggregate inputs should use reduction jobs"
6982        );
6983
6984        let _ = shutdown_tx.send(());
6985    }
6986
6987    #[tokio::test]
6988    async fn aggregate_pushdown_supports_add_and_subtract_inputs() {
6989        let state = MockState {
6990            kv: Arc::new(Mutex::new(BTreeMap::new())),
6991            range_calls: Arc::new(AtomicUsize::new(0)),
6992            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6993            sequence_number: Arc::new(AtomicU64::new(0)),
6994        };
6995        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6996        let client = StoreClient::new(&base_url);
6997
6998        let schema = KvSchema::new(client)
6999            .table(
7000                "orders",
7001                vec![
7002                    TableColumnConfig::new("id", DataType::Int64, false),
7003                    TableColumnConfig::new("price_cents", DataType::Int64, false),
7004                    TableColumnConfig::new("fee_cents", DataType::Int64, false),
7005                    TableColumnConfig::new("discount_cents", DataType::Int64, false),
7006                ],
7007                vec!["id".to_string()],
7008                vec![],
7009            )
7010            .expect("schema");
7011
7012        let mut writer = schema.batch_writer();
7013        for (id, price, fee, discount) in [(1, 10, 2, 1), (2, 15, 3, 4), (3, 7, 1, 2)] {
7014            writer
7015                .insert(
7016                    "orders",
7017                    vec![
7018                        CellValue::Int64(id),
7019                        CellValue::Int64(price),
7020                        CellValue::Int64(fee),
7021                        CellValue::Int64(discount),
7022                    ],
7023                )
7024                .expect("row");
7025        }
7026        writer.flush().await.expect("flush");
7027
7028        let ctx = SessionContext::new();
7029        schema.register_all(&ctx).expect("register");
7030
7031        state.range_calls.store(0, AtomicOrdering::SeqCst);
7032        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7033
7034        let batches = ctx
7035            .sql(
7036                "SELECT SUM(price_cents + fee_cents) AS gross_plus_fee, \
7037                        SUM(price_cents - discount_cents) AS net_total \
7038                 FROM orders",
7039            )
7040            .await
7041            .expect("query")
7042            .collect()
7043            .await
7044            .expect("collect");
7045
7046        assert_eq!(batches.len(), 1);
7047        let batch = &batches[0];
7048        assert_eq!(
7049            ScalarValue::try_from_array(batch.column(0), 0).expect("sum scalar"),
7050            ScalarValue::Int64(Some(38))
7051        );
7052        assert_eq!(
7053            ScalarValue::try_from_array(batch.column(1), 0).expect("sum scalar"),
7054            ScalarValue::Int64(Some(25))
7055        );
7056        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7057        assert!(
7058            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 2,
7059            "add/sub aggregate inputs should use reduction jobs"
7060        );
7061
7062        let _ = shutdown_tx.send(());
7063    }
7064
7065    #[tokio::test]
7066    async fn aggregate_pushdown_supports_case_filtered_computed_aggregates() {
7067        let state = MockState {
7068            kv: Arc::new(Mutex::new(BTreeMap::new())),
7069            range_calls: Arc::new(AtomicUsize::new(0)),
7070            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7071            sequence_number: Arc::new(AtomicU64::new(0)),
7072        };
7073        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7074        let client = StoreClient::new(&base_url);
7075
7076        let schema = KvSchema::new(client)
7077            .table(
7078                "orders",
7079                vec![
7080                    TableColumnConfig::new("id", DataType::Int64, false),
7081                    TableColumnConfig::new("status", DataType::Utf8, false),
7082                    TableColumnConfig::new("price_cents", DataType::Int64, false),
7083                    TableColumnConfig::new("qty", DataType::Int64, false),
7084                ],
7085                vec!["id".to_string()],
7086                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7087                    .expect("valid")
7088                    .with_cover_columns(vec!["price_cents".to_string(), "qty".to_string()])],
7089            )
7090            .expect("schema");
7091
7092        let mut writer = schema.batch_writer();
7093        for (id, status, price, qty) in [
7094            (1, "open", 10, 2),
7095            (2, "closed", 99, 1),
7096            (3, "open", 15, 3),
7097            (4, "closed", 7, 4),
7098        ] {
7099            writer
7100                .insert(
7101                    "orders",
7102                    vec![
7103                        CellValue::Int64(id),
7104                        CellValue::Utf8(status.to_string()),
7105                        CellValue::Int64(price),
7106                        CellValue::Int64(qty),
7107                    ],
7108                )
7109                .expect("row");
7110        }
7111        writer.flush().await.expect("flush");
7112
7113        let ctx = SessionContext::new();
7114        schema.register_all(&ctx).expect("register");
7115
7116        state.range_calls.store(0, AtomicOrdering::SeqCst);
7117        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7118
7119        let batches = ctx
7120            .sql(
7121                "SELECT SUM(CASE WHEN status = 'open' THEN price_cents * qty END) \
7122                 AS open_revenue \
7123                 FROM orders",
7124            )
7125            .await
7126            .expect("query")
7127            .collect()
7128            .await
7129            .expect("collect");
7130
7131        assert_eq!(batches.len(), 1);
7132        let batch = &batches[0];
7133        assert_eq!(
7134            ScalarValue::try_from_array(batch.column(0), 0).expect("sum scalar"),
7135            ScalarValue::Int64(Some(65))
7136        );
7137        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7138        assert!(
7139            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
7140            "case-filtered computed aggregate should use reduction jobs"
7141        );
7142
7143        let _ = shutdown_tx.send(());
7144    }
7145
7146    #[tokio::test]
7147    async fn aggregate_pushdown_does_not_rewrite_sum_case_else_zero_semantics() {
7148        let state = MockState {
7149            kv: Arc::new(Mutex::new(BTreeMap::new())),
7150            range_calls: Arc::new(AtomicUsize::new(0)),
7151            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7152            sequence_number: Arc::new(AtomicU64::new(0)),
7153        };
7154        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7155        let client = StoreClient::new(&base_url);
7156
7157        let schema = KvSchema::new(client)
7158            .table(
7159                "orders",
7160                vec![
7161                    TableColumnConfig::new("id", DataType::Int64, false),
7162                    TableColumnConfig::new("region", DataType::Utf8, false),
7163                    TableColumnConfig::new("status", DataType::Utf8, false),
7164                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
7165                ],
7166                vec!["id".to_string()],
7167                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7168                    .expect("valid")
7169                    .with_cover_columns(vec!["region".to_string(), "amount_cents".to_string()])],
7170            )
7171            .expect("schema");
7172
7173        let mut writer = schema.batch_writer();
7174        for (id, region, status, amount) in [
7175            (1, "east", "open", 10),
7176            (2, "east", "closed", 20),
7177            (3, "west", "closed", 30),
7178        ] {
7179            writer
7180                .insert(
7181                    "orders",
7182                    vec![
7183                        CellValue::Int64(id),
7184                        CellValue::Utf8(region.to_string()),
7185                        CellValue::Utf8(status.to_string()),
7186                        CellValue::Int64(amount),
7187                    ],
7188                )
7189                .expect("row");
7190        }
7191        writer.flush().await.expect("flush");
7192
7193        let ctx = SessionContext::new();
7194        schema.register_all(&ctx).expect("register");
7195
7196        state.range_calls.store(0, AtomicOrdering::SeqCst);
7197        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7198
7199        let batches = ctx
7200            .sql(
7201                "SELECT region, \
7202                        SUM(CASE WHEN status = 'open' THEN amount_cents ELSE 0 END) AS open_total \
7203                 FROM orders \
7204                 GROUP BY region \
7205                 ORDER BY region",
7206            )
7207            .await
7208            .expect("query")
7209            .collect()
7210            .await
7211            .expect("collect");
7212
7213        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
7214        let batch = &batches[0];
7215        assert_eq!(
7216            ScalarValue::try_from_array(batch.column(0), 0).expect("region scalar"),
7217            ScalarValue::Utf8(Some("east".to_string()))
7218        );
7219        assert_eq!(
7220            ScalarValue::try_from_array(batch.column(1), 0).expect("sum scalar"),
7221            ScalarValue::Int64(Some(10))
7222        );
7223        assert_eq!(
7224            ScalarValue::try_from_array(batch.column(0), 1).expect("region scalar"),
7225            ScalarValue::Utf8(Some("west".to_string()))
7226        );
7227        assert_eq!(
7228            ScalarValue::try_from_array(batch.column(1), 1).expect("sum scalar"),
7229            ScalarValue::Int64(Some(0))
7230        );
7231        assert_eq!(
7232            state.range_reduce_calls.load(AtomicOrdering::SeqCst),
7233            0,
7234            "SUM(CASE ... ELSE 0 END) must not push down because FILTER rewrite changes semantics"
7235        );
7236
7237        let _ = shutdown_tx.send(());
7238    }
7239
7240    #[tokio::test]
7241    async fn aggregate_pushdown_supports_computed_group_keys() {
7242        let state = MockState {
7243            kv: Arc::new(Mutex::new(BTreeMap::new())),
7244            range_calls: Arc::new(AtomicUsize::new(0)),
7245            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7246            sequence_number: Arc::new(AtomicU64::new(0)),
7247        };
7248        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7249        let client = StoreClient::new(&base_url);
7250
7251        let schema = KvSchema::new(client)
7252            .table(
7253                "events",
7254                vec![
7255                    TableColumnConfig::new("id", DataType::Int64, false),
7256                    TableColumnConfig::new("country", DataType::Utf8, false),
7257                    TableColumnConfig::new(
7258                        "occurred_at",
7259                        DataType::Timestamp(TimeUnit::Microsecond, None),
7260                        false,
7261                    ),
7262                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
7263                ],
7264                vec!["id".to_string()],
7265                vec![],
7266            )
7267            .expect("schema");
7268
7269        let day_micros = 86_400_000_000i64;
7270        let day0 = 1_700_000_000_000_000i64;
7271        let day1 = day0 + day_micros;
7272        let day0_bucket = day0.div_euclid(day_micros) * day_micros;
7273        let day1_bucket = day1.div_euclid(day_micros) * day_micros;
7274        let mut writer = schema.batch_writer();
7275        for (id, country, occurred_at, amount) in [
7276            (1, "East", day0 + 111, 10),
7277            (2, "east", day0 + 222, 30),
7278            (3, "West", day1 + 333, 7),
7279        ] {
7280            writer
7281                .insert(
7282                    "events",
7283                    vec![
7284                        CellValue::Int64(id),
7285                        CellValue::Utf8(country.to_string()),
7286                        CellValue::Timestamp(occurred_at),
7287                        CellValue::Int64(amount),
7288                    ],
7289                )
7290                .expect("row");
7291        }
7292        writer.flush().await.expect("flush");
7293
7294        let ctx = SessionContext::new();
7295        schema.register_all(&ctx).expect("register");
7296
7297        state.range_calls.store(0, AtomicOrdering::SeqCst);
7298        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7299
7300        let batches = ctx
7301            .sql(
7302                "SELECT lower(country) AS country_norm, \
7303                        date_trunc('day', occurred_at) AS day_bucket, \
7304                        SUM(amount_cents) AS total_cents \
7305                 FROM events \
7306                 GROUP BY lower(country), date_trunc('day', occurred_at) \
7307                 ORDER BY country_norm, day_bucket",
7308            )
7309            .await
7310            .expect("query")
7311            .collect()
7312            .await
7313            .expect("collect");
7314
7315        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
7316        let batch = &batches[0];
7317        assert_eq!(
7318            scalar_to_string(
7319                &ScalarValue::try_from_array(batch.column(0), 0).expect("country scalar")
7320            )
7321            .as_deref(),
7322            Some("east")
7323        );
7324        assert_eq!(
7325            ScalarValue::try_from_array(batch.column(1), 0).expect("day scalar"),
7326            ScalarValue::TimestampMicrosecond(Some(day0_bucket), None)
7327        );
7328        assert_eq!(
7329            ScalarValue::try_from_array(batch.column(2), 0).expect("sum scalar"),
7330            ScalarValue::Int64(Some(40))
7331        );
7332        assert_eq!(
7333            scalar_to_string(
7334                &ScalarValue::try_from_array(batch.column(0), 1).expect("country scalar")
7335            )
7336            .as_deref(),
7337            Some("west")
7338        );
7339        assert_eq!(
7340            ScalarValue::try_from_array(batch.column(1), 1).expect("day scalar"),
7341            ScalarValue::TimestampMicrosecond(Some(day1_bucket), None)
7342        );
7343        assert_eq!(
7344            ScalarValue::try_from_array(batch.column(2), 1).expect("sum scalar"),
7345            ScalarValue::Int64(Some(7))
7346        );
7347        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7348        assert!(
7349            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
7350            "computed group keys should use grouped reduction path"
7351        );
7352
7353        let _ = shutdown_tx.send(());
7354    }
7355
7356    #[tokio::test]
7357    async fn aggregate_pushdown_supports_group_by_queries() {
7358        let state = MockState {
7359            kv: Arc::new(Mutex::new(BTreeMap::new())),
7360            range_calls: Arc::new(AtomicUsize::new(0)),
7361            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7362            sequence_number: Arc::new(AtomicU64::new(0)),
7363        };
7364        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7365        let client = StoreClient::new(&base_url);
7366
7367        let schema = KvSchema::new(client)
7368            .table(
7369                "orders",
7370                vec![
7371                    TableColumnConfig::new("id", DataType::Int64, false),
7372                    TableColumnConfig::new("status", DataType::Utf8, false),
7373                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
7374                ],
7375                vec!["id".to_string()],
7376                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7377                    .expect("valid")
7378                    .with_cover_columns(vec!["amount_cents".to_string()])],
7379            )
7380            .expect("schema");
7381
7382        let mut writer = schema.batch_writer();
7383        for (id, status, amount) in [
7384            (1, "open", 10),
7385            (2, "open", 30),
7386            (3, "closed", 15),
7387            (4, "closed", 40),
7388        ] {
7389            writer
7390                .insert(
7391                    "orders",
7392                    vec![
7393                        CellValue::Int64(id),
7394                        CellValue::Utf8(status.to_string()),
7395                        CellValue::Int64(amount),
7396                    ],
7397                )
7398                .expect("row");
7399        }
7400        writer.flush().await.expect("flush");
7401
7402        let ctx = SessionContext::new();
7403        schema.register_all(&ctx).expect("register");
7404
7405        state.range_calls.store(0, AtomicOrdering::SeqCst);
7406        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7407
7408        let batches = ctx
7409            .sql(
7410                "SELECT status, COUNT(*) AS row_count, SUM(amount_cents) AS total_cents \
7411                 FROM orders GROUP BY status ORDER BY status",
7412            )
7413            .await
7414            .expect("query")
7415            .collect()
7416            .await
7417            .expect("collect");
7418
7419        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
7420        let batch = &batches[0];
7421        assert_eq!(
7422            ScalarValue::try_from_array(batch.column(0), 0).expect("status scalar"),
7423            ScalarValue::Utf8(Some("closed".to_string()))
7424        );
7425        assert_count_scalar(batch, 1, 0, 2);
7426        assert_eq!(
7427            ScalarValue::try_from_array(batch.column(2), 0).expect("sum scalar"),
7428            ScalarValue::Int64(Some(55))
7429        );
7430        assert_eq!(
7431            ScalarValue::try_from_array(batch.column(0), 1).expect("status scalar"),
7432            ScalarValue::Utf8(Some("open".to_string()))
7433        );
7434        assert_count_scalar(batch, 1, 1, 2);
7435        assert_eq!(
7436            ScalarValue::try_from_array(batch.column(2), 1).expect("sum scalar"),
7437            ScalarValue::Int64(Some(40))
7438        );
7439        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7440        assert!(
7441            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 2,
7442            "group-by aggregate should use grouped range reduction path"
7443        );
7444
7445        let _ = shutdown_tx.send(());
7446    }
7447
7448    #[tokio::test]
7449    async fn aggregate_pushdown_group_by_float_canonicalizes_signed_zero() {
7450        let state = MockState {
7451            kv: Arc::new(Mutex::new(BTreeMap::new())),
7452            range_calls: Arc::new(AtomicUsize::new(0)),
7453            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7454            sequence_number: Arc::new(AtomicU64::new(0)),
7455        };
7456        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7457        let client = StoreClient::new(&base_url);
7458
7459        let schema = KvSchema::new(client)
7460            .table(
7461                "metrics",
7462                vec![
7463                    TableColumnConfig::new("id", DataType::Int64, false),
7464                    TableColumnConfig::new("score", DataType::Float64, false),
7465                ],
7466                vec!["id".to_string()],
7467                vec![],
7468            )
7469            .expect("schema");
7470
7471        let mut writer = schema.batch_writer();
7472        for (id, score) in [(1, -0.0), (2, 0.0), (3, 1.5)] {
7473            writer
7474                .insert(
7475                    "metrics",
7476                    vec![CellValue::Int64(id), CellValue::Float64(score)],
7477                )
7478                .expect("row");
7479        }
7480        writer.flush().await.expect("flush");
7481
7482        let ctx = SessionContext::new();
7483        schema.register_all(&ctx).expect("register");
7484
7485        state.range_calls.store(0, AtomicOrdering::SeqCst);
7486        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7487
7488        let batches = ctx
7489            .sql(
7490                "SELECT score, COUNT(*) AS row_count \
7491                 FROM metrics GROUP BY score ORDER BY row_count DESC, score",
7492            )
7493            .await
7494            .expect("query")
7495            .collect()
7496            .await
7497            .expect("collect");
7498
7499        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
7500        let batch = &batches[0];
7501        let top_score = batch
7502            .column(0)
7503            .as_any()
7504            .downcast_ref::<Float64Array>()
7505            .expect("score float64")
7506            .value(0);
7507        assert_eq!(top_score.to_bits(), 0.0f64.to_bits());
7508        assert_count_scalar(batch, 1, 0, 2);
7509        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7510        assert!(
7511            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
7512            "float group-by aggregate should stay on grouped reduction path"
7513        );
7514
7515        let _ = shutdown_tx.send(());
7516    }
7517
7518    #[tokio::test]
7519    async fn aggregate_pushdown_supports_filtered_group_by_queries() {
7520        let state = MockState {
7521            kv: Arc::new(Mutex::new(BTreeMap::new())),
7522            range_calls: Arc::new(AtomicUsize::new(0)),
7523            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7524            sequence_number: Arc::new(AtomicU64::new(0)),
7525        };
7526        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7527        let client = StoreClient::new(&base_url);
7528
7529        let schema = KvSchema::new(client)
7530            .table(
7531                "orders",
7532                vec![
7533                    TableColumnConfig::new("id", DataType::Int64, false),
7534                    TableColumnConfig::new("region", DataType::Utf8, false),
7535                    TableColumnConfig::new("status", DataType::Utf8, false),
7536                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
7537                ],
7538                vec!["id".to_string()],
7539                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7540                    .expect("valid")
7541                    .with_cover_columns(vec!["region".to_string(), "amount_cents".to_string()])],
7542            )
7543            .expect("schema");
7544
7545        let mut writer = schema.batch_writer();
7546        for (id, region, status, amount) in [
7547            (1, "east", "open", 10),
7548            (2, "east", "closed", 20),
7549            (3, "west", "open", 30),
7550            (4, "north", "closed", 40),
7551        ] {
7552            writer
7553                .insert(
7554                    "orders",
7555                    vec![
7556                        CellValue::Int64(id),
7557                        CellValue::Utf8(region.to_string()),
7558                        CellValue::Utf8(status.to_string()),
7559                        CellValue::Int64(amount),
7560                    ],
7561                )
7562                .expect("row");
7563        }
7564        writer.flush().await.expect("flush");
7565
7566        let ctx = SessionContext::new();
7567        schema.register_all(&ctx).expect("register");
7568
7569        state.range_calls.store(0, AtomicOrdering::SeqCst);
7570        state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7571
7572        let batches = ctx
7573            .sql(
7574                "SELECT region, \
7575                        COUNT(*) FILTER (WHERE status = 'open') AS open_count, \
7576                        SUM(amount_cents) FILTER (WHERE status = 'closed') AS closed_total \
7577                 FROM orders \
7578                 GROUP BY region \
7579                 ORDER BY region",
7580            )
7581            .await
7582            .expect("query")
7583            .collect()
7584            .await
7585            .expect("collect");
7586
7587        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
7588        let batch = &batches[0];
7589
7590        assert_eq!(
7591            ScalarValue::try_from_array(batch.column(0), 0).expect("region scalar"),
7592            ScalarValue::Utf8(Some("east".to_string()))
7593        );
7594        assert_count_scalar(batch, 1, 0, 1);
7595        assert_eq!(
7596            ScalarValue::try_from_array(batch.column(2), 0).expect("sum scalar"),
7597            ScalarValue::Int64(Some(20))
7598        );
7599
7600        assert_eq!(
7601            ScalarValue::try_from_array(batch.column(0), 1).expect("region scalar"),
7602            ScalarValue::Utf8(Some("north".to_string()))
7603        );
7604        assert_count_scalar(batch, 1, 1, 0);
7605        assert_eq!(
7606            ScalarValue::try_from_array(batch.column(2), 1).expect("sum scalar"),
7607            ScalarValue::Int64(Some(40))
7608        );
7609
7610        assert_eq!(
7611            ScalarValue::try_from_array(batch.column(0), 2).expect("region scalar"),
7612            ScalarValue::Utf8(Some("west".to_string()))
7613        );
7614        assert_count_scalar(batch, 1, 2, 1);
7615        assert_eq!(
7616            ScalarValue::try_from_array(batch.column(2), 2).expect("sum scalar"),
7617            ScalarValue::Int64(None)
7618        );
7619
7620        assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7621        assert!(
7622            state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 3,
7623            "filtered group-by aggregate should use grouped reduction plus seed job"
7624        );
7625
7626        let _ = shutdown_tx.send(());
7627    }
7628
7629    mod e2e {
7630        use super::*;
7631        use axum::{routing::get, Router};
7632        use datafusion::prelude::SessionContext;
7633        use exoware_sdk::StoreClient;
7634        use exoware_server::{connect_stack, AppState};
7635        use exoware_simulator::RocksStore;
7636        use tempfile::tempdir;
7637
7638        struct TestServers {
7639            ingest_url: String,
7640            query_url: String,
7641        }
7642
7643        impl TestServers {
7644            fn client(&self) -> StoreClient {
7645                StoreClient::with_split_urls(
7646                    &self.query_url,
7647                    &self.ingest_url,
7648                    &self.query_url,
7649                    &self.ingest_url,
7650                )
7651            }
7652        }
7653
7654        async fn spawn_e2e_servers() -> TestServers {
7655            let dir = tempdir().expect("tempdir");
7656            let db = RocksStore::open(dir.path()).expect("db");
7657            let state = AppState::new(std::sync::Arc::new(db));
7658            let connect = connect_stack(state);
7659            let app = Router::new()
7660                .route("/health", get(|| async { "ok" }))
7661                .fallback_service(connect);
7662            let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
7663                .await
7664                .expect("bind");
7665            let url = format!("http://{}", listener.local_addr().unwrap());
7666            tokio::spawn(async move {
7667                axum::serve(listener, app).await.expect("serve");
7668            });
7669            for _ in 0..200 {
7670                if reqwest::get(format!("{url}/health"))
7671                    .await
7672                    .ok()
7673                    .is_some_and(|r| r.status().is_success())
7674                {
7675                    return TestServers {
7676                        ingest_url: url.clone(),
7677                        query_url: url,
7678                    };
7679                }
7680                tokio::time::sleep(std::time::Duration::from_millis(25)).await;
7681            }
7682            panic!("e2e simulator did not become ready");
7683        }
7684
7685        #[tokio::test]
7686        async fn sql_insert_and_select_through_real_ingest_query_workers() {
7687            let servers = spawn_e2e_servers().await;
7688            let client = servers.client();
7689
7690            let schema = KvSchema::new(client)
7691                .table(
7692                    "orders",
7693                    vec![
7694                        TableColumnConfig::new("id", DataType::Int64, false),
7695                        TableColumnConfig::new("status", DataType::Utf8, false),
7696                        TableColumnConfig::new("amount_cents", DataType::Int64, false),
7697                    ],
7698                    vec!["id".to_string()],
7699                    vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7700                        .expect("valid")
7701                        .with_cover_columns(vec!["amount_cents".to_string()])],
7702                )
7703                .expect("schema");
7704
7705            let mut writer = schema.batch_writer();
7706            for (id, status, amount) in [
7707                (1i64, "open", 100i64),
7708                (2, "closed", 200),
7709                (3, "open", 300),
7710                (4, "closed", 400),
7711                (5, "open", 500),
7712            ] {
7713                writer
7714                    .insert(
7715                        "orders",
7716                        vec![
7717                            CellValue::Int64(id),
7718                            CellValue::Utf8(status.to_string()),
7719                            CellValue::Int64(amount),
7720                        ],
7721                    )
7722                    .expect("insert row");
7723            }
7724            writer.flush().await.expect("flush batch");
7725
7726            let ctx = SessionContext::new();
7727            schema.register_all(&ctx).expect("register tables");
7728
7729            let batches = ctx
7730                .sql("SELECT id, amount_cents FROM orders ORDER BY id")
7731                .await
7732                .expect("full scan query")
7733                .collect()
7734                .await
7735                .expect("collect full scan");
7736            let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
7737            assert_eq!(total_rows, 5, "all 5 rows returned from full scan");
7738
7739            let mut ids = Vec::new();
7740            let mut amounts = Vec::new();
7741            for batch in &batches {
7742                let id_col = batch
7743                    .column(0)
7744                    .as_any()
7745                    .downcast_ref::<Int64Array>()
7746                    .expect("id column");
7747                let amt_col = batch
7748                    .column(1)
7749                    .as_any()
7750                    .downcast_ref::<Int64Array>()
7751                    .expect("amount column");
7752                for i in 0..batch.num_rows() {
7753                    ids.push(id_col.value(i));
7754                    amounts.push(amt_col.value(i));
7755                }
7756            }
7757            assert_eq!(ids, vec![1, 2, 3, 4, 5]);
7758            assert_eq!(amounts, vec![100, 200, 300, 400, 500]);
7759
7760            let filtered = ctx
7761                .sql(
7762                    "SELECT id, amount_cents FROM orders \
7763                     WHERE status = 'open' ORDER BY id",
7764                )
7765                .await
7766                .expect("filtered query")
7767                .collect()
7768                .await
7769                .expect("collect filtered");
7770            let mut filtered_ids = Vec::new();
7771            let mut filtered_amounts = Vec::new();
7772            for batch in &filtered {
7773                let id_col = batch
7774                    .column(0)
7775                    .as_any()
7776                    .downcast_ref::<Int64Array>()
7777                    .expect("id column");
7778                let amt_col = batch
7779                    .column(1)
7780                    .as_any()
7781                    .downcast_ref::<Int64Array>()
7782                    .expect("amount column");
7783                for i in 0..batch.num_rows() {
7784                    filtered_ids.push(id_col.value(i));
7785                    filtered_amounts.push(amt_col.value(i));
7786                }
7787            }
7788            assert_eq!(filtered_ids, vec![1, 3, 5]);
7789            assert_eq!(filtered_amounts, vec![100, 300, 500]);
7790
7791            let agg = ctx
7792                .sql(
7793                    "SELECT COUNT(*) AS cnt, SUM(amount_cents) AS total \
7794                     FROM orders WHERE status = 'open'",
7795                )
7796                .await
7797                .expect("aggregate query")
7798                .collect()
7799                .await
7800                .expect("collect aggregate");
7801            assert_eq!(agg.len(), 1);
7802            let batch = &agg[0];
7803            assert_eq!(batch.num_rows(), 1);
7804            assert_count_scalar(batch, 0, 0, 3);
7805            let total = ScalarValue::try_from_array(batch.column(1), 0).expect("sum scalar");
7806            match total {
7807                ScalarValue::Int64(Some(v)) => assert_eq!(v, 900),
7808                other => panic!("unexpected sum type: {other:?}"),
7809            }
7810        }
7811    }
7812}