1pub mod prune;
2
3mod aggregate;
4mod builder;
5mod codec;
6mod diagnostics;
7mod filter;
8mod predicate;
9mod scan;
10mod schema;
11mod types;
12mod writer;
13
14pub use schema::KvSchema;
15pub use types::default_orders_index_specs;
16pub use types::{
17 CellValue, IndexBackfillEvent, IndexBackfillOptions, IndexBackfillReport, IndexLayout,
18 IndexSpec, TableColumnConfig,
19};
20pub use writer::{BatchWriter, TableWriter};
21
22#[cfg(test)]
23mod tests {
24 use super::aggregate::*;
25 use super::builder::*;
26 use super::codec::*;
27 use super::diagnostics::*;
28 use super::filter::*;
29 use super::predicate::*;
30 use super::scan::*;
31 use super::types::*;
32 use super::writer::*;
33 use super::*;
34 use commonware_codec::Encode;
35 use datafusion::arrow::array::{Float64Array, Int64Array, LargeStringArray, StringViewArray};
36 use datafusion::arrow::datatypes::{i256, DataType, TimeUnit};
37 use datafusion::arrow::record_batch::RecordBatch;
38 use datafusion::common::ScalarValue;
39 use datafusion::logical_expr::{Expr, Operator};
40 use datafusion::physical_plan::ExecutionPlan;
41 use datafusion::prelude::SessionContext;
42 use exoware_sdk_rs::keys::{Key, KeyCodec};
43 use exoware_sdk_rs::kv_codec::{
44 canonicalize_reduced_group_values, decode_stored_row, encode_reduced_group_key,
45 eval_predicate, KvReducedValue, StoredRow,
46 };
47 use exoware_sdk_rs::{RangeReduceOp, RangeReduceRequest, StoreClient};
48 use std::collections::{BTreeMap, HashSet};
49 use std::ops::Bound::{Included, Unbounded};
50 use std::pin::Pin;
51 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
52 use std::sync::{Arc, Mutex};
53 use std::time::Duration;
54
55 use axum::Router;
56 use bytes::Bytes;
57 use connectrpc::{Chain, ConnectError, ConnectRpcService, Context};
58 use exoware_sdk_rs::connect_compression_registry;
59 use exoware_sdk_rs::kv_codec::{eval_expr, expr_needs_value};
60 use exoware_sdk_rs::store::ingest::v1::{
61 PutResponse as ProtoPutResponse, Service as IngestService,
62 ServiceServer as IngestServiceServer,
63 };
64 use exoware_sdk_rs::store::query::v1::RangeEntry as ProtoRangeEntry;
65 use exoware_sdk_rs::store::query::v1::{
66 GetManyEntry as ProtoGetManyEntry, GetManyFrame as ProtoGetManyFrame,
67 GetResponse as ProtoGetResponse, RangeFrame as ProtoRangeFrame,
68 ReduceResponse as ProtoReduceResponse, Service as QueryService,
69 ServiceServer as QueryServiceServer,
70 };
71 use exoware_sdk_rs::RangeMode;
72 use exoware_sdk_rs::{
73 parse_range_traversal_direction, to_domain_reduce_request, to_proto_optional_reduced_value,
74 to_proto_reduced_value, RangeTraversalDirection, RangeTraversalModeError,
75 };
76 use exoware_sdk_rs::{RangeReduceGroup, RangeReduceResponse, RangeReduceResult};
77 use futures::{stream, Stream, TryStreamExt};
78 use tokio::sync::{mpsc, oneshot, Notify};
79
80 fn assert_explain_includes_query_stats_surface(
82 explain: &str,
83 surface: QueryStatsExplainSurface,
84 ) {
85 let expected = format!("query_stats={}", format_query_stats_explain(surface));
86 assert!(
87 explain.contains(&expected),
88 "expected EXPLAIN output to include `{expected}`\n{explain}"
89 );
90 }
91
92 fn simple_int64_model(prefix: u8) -> TableModel {
93 let config = KvTableConfig::new(
94 prefix,
95 vec![TableColumnConfig::new("id", DataType::Int64, false)],
96 vec!["id".to_string()],
97 vec![],
98 )
99 .unwrap();
100 TableModel::from_config(&config).unwrap()
101 }
102
103 fn codec_payload(codec: KeyCodec, key: &Key, offset: usize, len: usize) -> Vec<u8> {
104 codec.read_payload(key, offset, len).expect("codec payload")
105 }
106
107 fn primary_payload(model: &TableModel, key: &Key, offset: usize, len: usize) -> Vec<u8> {
108 codec_payload(model.primary_key_codec, key, offset, len)
109 }
110
111 fn index_payload(spec: &ResolvedIndexSpec, key: &Key, offset: usize, len: usize) -> Vec<u8> {
112 codec_payload(spec.codec, key, offset, len)
113 }
114
115 fn matches_primary_key(table_prefix: u8, key: &Key) -> bool {
116 primary_key_codec(table_prefix)
117 .expect("primary codec")
118 .matches(key)
119 }
120
121 fn matches_secondary_index_key(table_prefix: u8, index_id: u8, key: &Key) -> bool {
122 secondary_index_codec(table_prefix, index_id)
123 .expect("secondary codec")
124 .matches(key)
125 }
126
127 fn test_model() -> (TableModel, Vec<ResolvedIndexSpec>) {
128 let config = KvTableConfig::new(
129 0,
130 vec![
131 TableColumnConfig::new("region", DataType::Utf8, false),
132 TableColumnConfig::new("customer_id", DataType::Int64, false),
133 TableColumnConfig::new("order_id", DataType::Int64, false),
134 TableColumnConfig::new("amount_cents", DataType::Int64, false),
135 TableColumnConfig::new("status", DataType::Utf8, false),
136 ],
137 vec!["order_id".to_string()],
138 vec![
139 IndexSpec::new(
140 "region_customer",
141 vec!["region".to_string(), "customer_id".to_string()],
142 )
143 .expect("valid"),
144 IndexSpec::new(
145 "status_customer",
146 vec!["status".to_string(), "customer_id".to_string()],
147 )
148 .expect("valid"),
149 ],
150 )
151 .expect("valid config");
152 let model = TableModel::from_config(&config).expect("model");
153 let specs = model
154 .resolve_index_specs(&config.index_specs)
155 .expect("specs");
156 (model, specs)
157 }
158
159 fn zorder_test_model() -> (TableModel, Vec<ResolvedIndexSpec>) {
160 let config = KvTableConfig::new(
161 0,
162 vec![
163 TableColumnConfig::new("x", DataType::Int64, false),
164 TableColumnConfig::new("y", DataType::Int64, false),
165 TableColumnConfig::new("id", DataType::Int64, false),
166 TableColumnConfig::new("value", DataType::Int64, false),
167 ],
168 vec!["id".to_string()],
169 vec![
170 IndexSpec::new("xy_lex", vec!["x".to_string(), "y".to_string()])
171 .expect("valid")
172 .with_cover_columns(vec!["value".to_string()]),
173 IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
174 .expect("valid")
175 .with_cover_columns(vec!["value".to_string()]),
176 ],
177 )
178 .expect("valid config");
179 let model = TableModel::from_config(&config).expect("model");
180 let specs = model
181 .resolve_index_specs(&config.index_specs)
182 .expect("specs");
183 (model, specs)
184 }
185
186 #[derive(Clone)]
187 struct MockState {
188 kv: Arc<Mutex<BTreeMap<Key, Bytes>>>,
189 range_calls: Arc<AtomicUsize>,
190 range_reduce_calls: Arc<AtomicUsize>,
191 sequence_number: Arc<AtomicU64>,
192 }
193
194 #[derive(Debug)]
195 struct MockGroupedReduceState {
196 group_values: Vec<Option<KvReducedValue>>,
197 states: Vec<PartialAggregateState>,
198 }
199
200 type MockReduceRow = (Vec<Option<KvReducedValue>>, Vec<Option<KvReducedValue>>);
201
202 fn extract_mock_reduce_row(
203 key: &Key,
204 value: &Bytes,
205 request: &RangeReduceRequest,
206 ) -> Option<MockReduceRow> {
207 let needs_value = request
208 .group_by
209 .iter()
210 .chain(
211 request
212 .reducers
213 .iter()
214 .filter_map(|reducer| reducer.expr.as_ref()),
215 )
216 .any(expr_needs_value)
217 || request
218 .filter
219 .as_ref()
220 .is_some_and(exoware_sdk_rs::kv_codec::predicate_needs_value);
221 let archived = if needs_value {
222 decode_stored_row(value.as_ref()).ok()
223 } else {
224 None
225 };
226
227 if let Some(filter) = &request.filter {
228 if !eval_predicate(key, archived.as_ref(), filter).ok()? {
229 return None;
230 }
231 }
232
233 let mut group_values = Vec::with_capacity(request.group_by.len());
234 for expr in &request.group_by {
235 let extracted_value = eval_expr(key, archived.as_ref(), expr).ok()?;
236 group_values.push(extracted_value);
237 }
238 canonicalize_reduced_group_values(&mut group_values);
239
240 let mut reducer_values = Vec::with_capacity(request.reducers.len());
241 for reducer in &request.reducers {
242 let extracted_value = match (&reducer.expr, archived.as_ref()) {
243 (None, _) => None,
244 (Some(expr), _) => eval_expr(key, archived.as_ref(), expr).ok()?,
245 };
246 reducer_values.push(extracted_value);
247 }
248
249 Some((group_values, reducer_values))
250 }
251
252 #[allow(clippy::result_large_err)]
253 fn ensure_min_sequence_number(
254 token: &Arc<AtomicU64>,
255 required: Option<u64>,
256 ) -> Result<(), ConnectError> {
257 let current = token.load(AtomicOrdering::Relaxed);
258 if let Some(required) = required {
259 if current < required {
260 return Err(ConnectError::aborted(format!(
261 "consistency_not_ready: required={required}, current={current}"
262 )));
263 }
264 }
265 Ok(())
266 }
267
268 fn proto_range_entries_frame(results: Vec<(Key, Vec<u8>)>) -> ProtoRangeFrame {
269 ProtoRangeFrame {
270 results: results
271 .into_iter()
272 .map(|(key, value)| ProtoRangeEntry {
273 key: key.to_vec(),
274 value,
275 ..Default::default()
276 })
277 .collect(),
278 ..Default::default()
279 }
280 }
281
282 fn query_detail_trailer_ctx(sequence_number: u64) -> Context {
283 let detail = exoware_sdk_rs::store::query::v1::Detail {
284 sequence_number,
285 read_stats: Default::default(),
286 ..Default::default()
287 };
288 exoware_sdk_rs::with_query_detail_trailer(Context::default(), &detail)
289 }
290
291 #[derive(Clone)]
292 struct MockIngestConnect {
293 state: MockState,
294 }
295
296 impl IngestService for MockIngestConnect {
297 async fn put(
298 &self,
299 ctx: Context,
300 request: buffa::view::OwnedView<
301 exoware_sdk_rs::store::ingest::v1::PutRequestView<'static>,
302 >,
303 ) -> Result<(ProtoPutResponse, Context), ConnectError> {
304 let mut parsed = Vec::<(Key, Bytes)>::new();
305 for kv in request.kvs.iter() {
306 parsed.push((kv.key.to_vec().into(), Bytes::copy_from_slice(kv.value)));
307 }
308 let mut guard = self.state.kv.lock().expect("kv mutex poisoned");
309 for (key, value) in parsed.iter() {
310 guard.insert(key.clone(), value.clone());
311 }
312 let seq = self
313 .state
314 .sequence_number
315 .fetch_add(1, AtomicOrdering::SeqCst)
316 + 1;
317 Ok((
318 ProtoPutResponse {
319 sequence_number: seq,
320 ..Default::default()
321 },
322 ctx,
323 ))
324 }
325 }
326
327 #[derive(Clone)]
328 struct MockQueryConnect {
329 state: MockState,
330 }
331
332 impl QueryService for MockQueryConnect {
333 async fn get(
334 &self,
335 _ctx: Context,
336 request: buffa::view::OwnedView<
337 exoware_sdk_rs::store::query::v1::GetRequestView<'static>,
338 >,
339 ) -> Result<(ProtoGetResponse, Context), ConnectError> {
340 ensure_min_sequence_number(&self.state.sequence_number, request.min_sequence_number)?;
341 let key: Key = request.key.to_vec().into();
342 let guard = self.state.kv.lock().expect("kv mutex poisoned");
343 let value = guard.get(&key).cloned();
344 let token = self.state.sequence_number.load(AtomicOrdering::Relaxed);
345 let detail = exoware_sdk_rs::store::query::v1::Detail {
346 sequence_number: token,
347 read_stats: Default::default(),
348 ..Default::default()
349 };
350 Ok((
351 ProtoGetResponse {
352 value: value.map(|v| v.to_vec()),
353 ..Default::default()
354 },
355 exoware_sdk_rs::with_query_detail_response_header(Context::default(), &detail),
356 ))
357 }
358
359 async fn range(
360 &self,
361 _ctx: Context,
362 request: buffa::view::OwnedView<
363 exoware_sdk_rs::store::query::v1::RangeRequestView<'static>,
364 >,
365 ) -> Result<
366 (
367 Pin<Box<dyn Stream<Item = Result<ProtoRangeFrame, ConnectError>> + Send>>,
368 Context,
369 ),
370 ConnectError,
371 > {
372 ensure_min_sequence_number(&self.state.sequence_number, request.min_sequence_number)?;
373 self.state.range_calls.fetch_add(1, AtomicOrdering::SeqCst);
374
375 let start_key: Key = request.start.to_vec().into();
376 let end_key: Key = request.end.to_vec().into();
377 let limit = request.limit.map(|v| v as usize).unwrap_or(usize::MAX);
378 let batch_size = usize::try_from(request.batch_size).unwrap_or(usize::MAX);
379 if batch_size == 0 {
380 return Err(ConnectError::invalid_argument(
381 "invalid batch_size: expected positive integer",
382 ));
383 }
384
385 let mode = match parse_range_traversal_direction(request.mode) {
386 Ok(RangeTraversalDirection::Forward) => RangeMode::Forward,
387 Ok(RangeTraversalDirection::Reverse) => RangeMode::Reverse,
388 Err(RangeTraversalModeError::UnknownWireValue(v)) => {
389 return Err(ConnectError::invalid_argument(format!(
390 "unknown TraversalMode enum value {v}"
391 )));
392 }
393 };
394
395 let state = self.state.clone();
396 let guard = state.kv.lock().expect("kv mutex poisoned");
397 let range: (std::ops::Bound<&Key>, std::ops::Bound<&Key>) = (
399 Included(&start_key),
400 if end_key.is_empty() {
401 Unbounded
402 } else {
403 Included(&end_key)
404 },
405 );
406 let range_iter = guard.range::<Key, _>(range);
407 let iter: Box<dyn Iterator<Item = (&Key, &Bytes)> + Send> = match mode {
408 RangeMode::Forward => Box::new(range_iter),
409 RangeMode::Reverse => Box::new(range_iter.rev()),
410 };
411 let mut results: Vec<ProtoRangeEntry> = Vec::new();
412 for (key, value) in iter.take(limit) {
413 results.push(ProtoRangeEntry {
414 key: key.to_vec(),
415 value: value.to_vec(),
416 ..Default::default()
417 });
418 }
419 drop(guard);
420 let token = state.sequence_number.load(AtomicOrdering::Relaxed);
421 let batch = batch_size.max(1);
422 let mut frames: Vec<Result<ProtoRangeFrame, ConnectError>> = Vec::new();
423 for chunk in results.chunks(batch) {
424 frames.push(Ok(ProtoRangeFrame {
425 results: chunk.to_vec(),
426 ..Default::default()
427 }));
428 }
429 let detail = exoware_sdk_rs::store::query::v1::Detail {
430 sequence_number: token,
431 read_stats: Default::default(),
432 ..Default::default()
433 };
434 Ok((
435 Box::pin(stream::iter(frames)),
436 exoware_sdk_rs::with_query_detail_trailer(Context::default(), &detail),
437 ))
438 }
439
440 async fn get_many(
441 &self,
442 _ctx: Context,
443 request: buffa::view::OwnedView<
444 exoware_sdk_rs::store::query::v1::GetManyRequestView<'static>,
445 >,
446 ) -> Result<
447 (
448 Pin<Box<dyn Stream<Item = Result<ProtoGetManyFrame, ConnectError>> + Send>>,
449 Context,
450 ),
451 ConnectError,
452 > {
453 ensure_min_sequence_number(&self.state.sequence_number, request.min_sequence_number)?;
454 let batch_size = usize::try_from(request.batch_size)
455 .unwrap_or(usize::MAX)
456 .max(1);
457 let guard = self.state.kv.lock().expect("kv mutex poisoned");
458 let mut entries: Vec<ProtoGetManyEntry> = Vec::new();
459 for key_bytes in request.keys.iter() {
460 let key: Key = key_bytes.to_vec().into();
461 let value = guard.get(&key).cloned();
462 entries.push(ProtoGetManyEntry {
463 key: key.to_vec(),
464 value: value.map(|v| v.to_vec()),
465 ..Default::default()
466 });
467 }
468 drop(guard);
469 let token = self.state.sequence_number.load(AtomicOrdering::Relaxed);
470 let mut frames: Vec<Result<ProtoGetManyFrame, ConnectError>> = Vec::new();
471 for chunk in entries.chunks(batch_size) {
472 frames.push(Ok(ProtoGetManyFrame {
473 results: chunk.to_vec(),
474 ..Default::default()
475 }));
476 }
477 let detail = exoware_sdk_rs::store::query::v1::Detail {
478 sequence_number: token,
479 read_stats: Default::default(),
480 ..Default::default()
481 };
482 Ok((
483 Box::pin(stream::iter(frames)),
484 exoware_sdk_rs::with_query_detail_trailer(Context::default(), &detail),
485 ))
486 }
487
488 async fn reduce(
489 &self,
490 _ctx: Context,
491 request: buffa::view::OwnedView<
492 exoware_sdk_rs::store::query::v1::ReduceRequestView<'static>,
493 >,
494 ) -> Result<(ProtoReduceResponse, Context), ConnectError> {
495 ensure_min_sequence_number(&self.state.sequence_number, request.min_sequence_number)?;
496 self.state
497 .range_reduce_calls
498 .fetch_add(1, AtomicOrdering::SeqCst);
499 let owned = request.to_owned_message();
500 let start_key: Key = owned.start.clone().into();
501 let end_key: Key = owned.end.clone().into();
502 let reduce_req = owned
503 .params
504 .as_option()
505 .ok_or_else(|| ConnectError::invalid_argument("missing range reduce params"))?;
506 let domain_request =
507 to_domain_reduce_request(reduce_req).map_err(ConnectError::invalid_argument)?;
508
509 let state = self.state.clone();
510 let guard = state.kv.lock().expect("kv mutex poisoned");
511 let mut states = domain_request.group_by.is_empty().then(|| {
512 domain_request
513 .reducers
514 .iter()
515 .map(|reducer| PartialAggregateState::from_op(reducer.op))
516 .collect::<Vec<_>>()
517 });
518 let mut grouped = BTreeMap::<Vec<u8>, MockGroupedReduceState>::new();
519
520 let range: (std::ops::Bound<&Key>, std::ops::Bound<&Key>) = (
521 Included(&start_key),
522 if end_key.is_empty() {
523 Unbounded
524 } else {
525 Included(&end_key)
526 },
527 );
528 for (key, value) in guard.range::<Key, _>(range) {
529 let Some((group_values, reducer_values)) =
530 extract_mock_reduce_row(key, value, &domain_request)
531 else {
532 continue;
533 };
534 if domain_request.group_by.is_empty() {
535 let states = states.as_mut().expect("scalar states");
536 for ((state, reducer), value) in states
537 .iter_mut()
538 .zip(domain_request.reducers.iter())
539 .zip(reducer_values.into_iter())
540 {
541 match reducer.op {
542 RangeReduceOp::CountAll => state
543 .merge_partial(reducer.op, Some(&KvReducedValue::UInt64(1)))
544 .map_err(|e| ConnectError::internal(e.to_string()))?,
545 RangeReduceOp::CountField => {
546 let partial =
547 KvReducedValue::UInt64(if value.is_some() { 1 } else { 0 });
548 state
549 .merge_partial(reducer.op, Some(&partial))
550 .map_err(|e| ConnectError::internal(e.to_string()))?
551 }
552 _ => state
553 .merge_partial(reducer.op, value.as_ref())
554 .map_err(|e| ConnectError::internal(e.to_string()))?,
555 }
556 }
557 } else {
558 let group_key = encode_reduced_group_key(&group_values);
559 let group =
560 grouped
561 .entry(group_key)
562 .or_insert_with(|| MockGroupedReduceState {
563 group_values: group_values.clone(),
564 states: domain_request
565 .reducers
566 .iter()
567 .map(|reducer| PartialAggregateState::from_op(reducer.op))
568 .collect(),
569 });
570 for ((state, reducer), value) in group
571 .states
572 .iter_mut()
573 .zip(domain_request.reducers.iter())
574 .zip(reducer_values.into_iter())
575 {
576 match reducer.op {
577 RangeReduceOp::CountAll => state
578 .merge_partial(reducer.op, Some(&KvReducedValue::UInt64(1)))
579 .map_err(|e| ConnectError::internal(e.to_string()))?,
580 RangeReduceOp::CountField => {
581 let partial =
582 KvReducedValue::UInt64(if value.is_some() { 1 } else { 0 });
583 state
584 .merge_partial(reducer.op, Some(&partial))
585 .map_err(|e| ConnectError::internal(e.to_string()))?
586 }
587 _ => state
588 .merge_partial(reducer.op, value.as_ref())
589 .map_err(|e| ConnectError::internal(e.to_string()))?,
590 }
591 }
592 }
593 }
594
595 let response = if let Some(states) = states {
596 RangeReduceResponse {
597 results: states
598 .iter()
599 .map(|state| RangeReduceResult {
600 value: match state {
601 PartialAggregateState::Count(count) => {
602 Some(KvReducedValue::UInt64(*count))
603 }
604 PartialAggregateState::Sum(value)
605 | PartialAggregateState::Min(value)
606 | PartialAggregateState::Max(value) => value.clone(),
607 },
608 })
609 .collect(),
610 groups: Vec::new(),
611 }
612 } else {
613 RangeReduceResponse {
614 results: Vec::new(),
615 groups: grouped
616 .into_values()
617 .map(|group| RangeReduceGroup {
618 group_values: group.group_values,
619 results: group
620 .states
621 .into_iter()
622 .map(|state| RangeReduceResult {
623 value: match state {
624 PartialAggregateState::Count(count) => {
625 Some(KvReducedValue::UInt64(count))
626 }
627 PartialAggregateState::Sum(value)
628 | PartialAggregateState::Min(value)
629 | PartialAggregateState::Max(value) => value,
630 },
631 })
632 .collect(),
633 })
634 .collect(),
635 }
636 };
637 drop(guard);
638 let token = state.sequence_number.load(AtomicOrdering::Relaxed);
639 let detail = exoware_sdk_rs::store::query::v1::Detail {
640 sequence_number: token,
641 read_stats: Default::default(),
642 ..Default::default()
643 };
644 Ok((
645 ProtoReduceResponse {
646 results: response
647 .results
648 .into_iter()
649 .map(
650 |result| exoware_sdk_rs::store::query::v1::RangeReduceResult {
651 value: result.value.map(to_proto_reduced_value).into(),
652 ..Default::default()
653 },
654 )
655 .collect(),
656 groups: response
657 .groups
658 .into_iter()
659 .map(|group| {
660 let group_values_present: Vec<bool> =
661 group.group_values.iter().map(|v| v.is_some()).collect();
662 exoware_sdk_rs::store::query::v1::RangeReduceGroup {
663 group_values: group
664 .group_values
665 .into_iter()
666 .map(to_proto_optional_reduced_value)
667 .collect(),
668 group_values_present,
669 results: group
670 .results
671 .into_iter()
672 .map(|result| {
673 exoware_sdk_rs::store::query::v1::RangeReduceResult {
674 value: result.value.map(to_proto_reduced_value).into(),
675 ..Default::default()
676 }
677 })
678 .collect(),
679 ..Default::default()
680 }
681 })
682 .collect(),
683 ..Default::default()
684 },
685 exoware_sdk_rs::with_query_detail_response_header(Context::default(), &detail),
686 ))
687 }
688 }
689
690 async fn spawn_mock_server(state: MockState) -> (String, oneshot::Sender<()>) {
691 let connect = ConnectRpcService::new(Chain(
692 IngestServiceServer::new(MockIngestConnect {
693 state: state.clone(),
694 }),
695 QueryServiceServer::new(MockQueryConnect { state }),
696 ))
697 .with_compression(connect_compression_registry());
698 let app = Router::new().fallback_service(connect);
699
700 let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
701 .await
702 .expect("bind mock server");
703 let addr = listener.local_addr().expect("local addr");
704 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
705 tokio::spawn(async move {
706 axum::serve(listener, app)
707 .with_graceful_shutdown(async move {
708 let _ = shutdown_rx.await;
709 })
710 .await
711 .expect("mock server should run");
712 });
713 (format!("http://{addr}"), shutdown_tx)
714 }
715
716 fn assert_count_scalar(batch: &RecordBatch, col_idx: usize, row_idx: usize, expected: u64) {
717 let scalar = ScalarValue::try_from_array(batch.column(col_idx), row_idx)
718 .expect("count scalar should decode");
719 match scalar {
720 ScalarValue::UInt64(Some(value)) => assert_eq!(value, expected),
721 ScalarValue::Int64(Some(value)) => assert_eq!(value, expected as i64),
722 other => panic!("unexpected count scalar: {other:?}"),
723 }
724 }
725
726 async fn explain_plan_rows(ctx: &SessionContext, sql: &str) -> Vec<(String, String)> {
727 let batches = ctx
728 .sql(&format!("EXPLAIN {sql}"))
729 .await
730 .expect("explain query")
731 .collect()
732 .await
733 .expect("explain collect");
734 let mut rows = Vec::new();
735 for batch in batches {
736 for row_idx in 0..batch.num_rows() {
737 let plan_type = scalar_to_string(
738 &ScalarValue::try_from_array(batch.column(0), row_idx).expect("plan type"),
739 )
740 .expect("plan type string");
741 let plan = scalar_to_string(
742 &ScalarValue::try_from_array(batch.column(1), row_idx).expect("plan"),
743 )
744 .expect("plan string");
745 rows.push((plan_type, plan));
746 }
747 }
748 rows
749 }
750
751 fn physical_plan_text(rows: &[(String, String)]) -> String {
752 rows.iter()
753 .filter(|(plan_type, _)| plan_type.contains("physical_plan"))
754 .map(|(_, plan)| plan.as_str())
755 .collect::<Vec<_>>()
756 .join("\n")
757 }
758
759 #[tokio::test]
760 async fn explain_reports_full_scan_like_primary_key_scan() {
761 let state = MockState {
762 kv: Arc::new(Mutex::new(BTreeMap::new())),
763 range_calls: Arc::new(AtomicUsize::new(0)),
764 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
765 sequence_number: Arc::new(AtomicU64::new(0)),
766 };
767 let (base_url, shutdown_tx) = spawn_mock_server(state).await;
768 let client = StoreClient::new(&base_url);
769
770 let schema = KvSchema::new(client)
771 .table(
772 "orders",
773 vec![
774 TableColumnConfig::new("id", DataType::Int64, false),
775 TableColumnConfig::new("status", DataType::Utf8, false),
776 TableColumnConfig::new("amount_cents", DataType::Int64, false),
777 ],
778 vec!["id".to_string()],
779 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
780 .expect("valid")
781 .with_cover_columns(vec!["amount_cents".to_string()])],
782 )
783 .expect("schema");
784 let ctx = SessionContext::new();
785 schema.register_all(&ctx).expect("register");
786
787 let explain =
788 physical_plan_text(&explain_plan_rows(&ctx, "SELECT id, status FROM orders").await);
789 assert!(explain.contains("KvScanExec:"));
790 assert!(explain.contains("mode=primary_key"));
791 assert!(explain.contains("predicate=<none>"));
792 assert!(explain.contains("row_recheck=false"));
793 assert!(explain.contains("full_scan_like=true"));
794 assert_explain_includes_query_stats_surface(
795 &explain,
796 QueryStatsExplainSurface::StreamedRangeTrailer,
797 );
798
799 let _ = shutdown_tx.send(());
800 }
801
802 #[tokio::test]
803 async fn explain_reports_secondary_index_scan_and_row_recheck() {
804 let state = MockState {
805 kv: Arc::new(Mutex::new(BTreeMap::new())),
806 range_calls: Arc::new(AtomicUsize::new(0)),
807 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
808 sequence_number: Arc::new(AtomicU64::new(0)),
809 };
810 let (base_url, shutdown_tx) = spawn_mock_server(state).await;
811 let client = StoreClient::new(&base_url);
812
813 let schema = KvSchema::new(client)
814 .table(
815 "orders",
816 vec![
817 TableColumnConfig::new("id", DataType::Int64, false),
818 TableColumnConfig::new("status", DataType::Utf8, false),
819 TableColumnConfig::new("amount_cents", DataType::Int64, false),
820 ],
821 vec!["id".to_string()],
822 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
823 .expect("valid")
824 .with_cover_columns(vec!["amount_cents".to_string()])],
825 )
826 .expect("schema");
827 let ctx = SessionContext::new();
828 schema.register_all(&ctx).expect("register");
829
830 let explain = physical_plan_text(
831 &explain_plan_rows(
832 &ctx,
833 "SELECT id, status, amount_cents FROM orders \
834 WHERE status = 'open' AND amount_cents >= 5",
835 )
836 .await,
837 );
838 assert!(explain.contains("KvScanExec:"));
839 assert!(explain.contains("mode=secondary_index(status_idx, lexicographic)"));
840 assert!(explain.contains("predicate=status = 'open' AND amount_cents >= 5"));
841 assert!(explain.contains("exact=false"));
842 assert!(explain.contains("row_recheck=true"));
843 assert!(explain.contains("full_scan_like=false"));
844
845 let _ = shutdown_tx.send(());
846 }
847
848 #[tokio::test]
849 async fn explain_reports_zorder_secondary_index_scan() {
850 let state = MockState {
851 kv: Arc::new(Mutex::new(BTreeMap::new())),
852 range_calls: Arc::new(AtomicUsize::new(0)),
853 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
854 sequence_number: Arc::new(AtomicU64::new(0)),
855 };
856 let (base_url, shutdown_tx) = spawn_mock_server(state).await;
857 let client = StoreClient::new(&base_url);
858
859 let schema = KvSchema::new(client)
860 .table(
861 "points",
862 vec![
863 TableColumnConfig::new("x", DataType::Int64, false),
864 TableColumnConfig::new("y", DataType::Int64, false),
865 TableColumnConfig::new("id", DataType::Int64, false),
866 TableColumnConfig::new("value", DataType::Int64, false),
867 ],
868 vec!["id".to_string()],
869 vec![
870 IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
871 .expect("valid")
872 .with_cover_columns(vec!["value".to_string()]),
873 ],
874 )
875 .expect("schema");
876 let ctx = SessionContext::new();
877 schema.register_all(&ctx).expect("register");
878
879 let explain = physical_plan_text(
880 &explain_plan_rows(
881 &ctx,
882 "SELECT id, value FROM points \
883 WHERE x >= 1 AND x <= 2 AND y >= 1 AND y <= 2",
884 )
885 .await,
886 );
887 assert!(explain.contains("KvScanExec:"));
888 assert!(explain.contains("mode=secondary_index(xy_z, z_order)"));
889 assert!(explain.contains("exact=false"));
890 assert!(explain.contains("row_recheck=true"));
891
892 let _ = shutdown_tx.send(());
893 }
894
895 #[tokio::test]
896 async fn explain_reports_aggregate_pushdown_access_path_details() {
897 let state = MockState {
898 kv: Arc::new(Mutex::new(BTreeMap::new())),
899 range_calls: Arc::new(AtomicUsize::new(0)),
900 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
901 sequence_number: Arc::new(AtomicU64::new(0)),
902 };
903 let (base_url, shutdown_tx) = spawn_mock_server(state).await;
904 let client = StoreClient::new(&base_url);
905
906 let schema = KvSchema::new(client)
907 .table(
908 "orders",
909 vec![
910 TableColumnConfig::new("id", DataType::Int64, false),
911 TableColumnConfig::new("status", DataType::Utf8, false),
912 TableColumnConfig::new("amount_cents", DataType::Int64, false),
913 ],
914 vec!["id".to_string()],
915 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
916 .expect("valid")
917 .with_cover_columns(vec!["amount_cents".to_string()])],
918 )
919 .expect("schema");
920 let ctx = SessionContext::new();
921 schema.register_all(&ctx).expect("register");
922
923 let explain = physical_plan_text(
924 &explain_plan_rows(
925 &ctx,
926 "SELECT status, SUM(amount_cents) AS total_cents \
927 FROM orders WHERE status = 'open' GROUP BY status",
928 )
929 .await,
930 );
931 assert!(explain.contains("KvAggregateExec:"));
932 assert!(explain.contains("grouped=true"));
933 assert!(explain.contains("job0{mode=secondary_index(status_idx, lexicographic)"));
934 assert!(explain.contains("predicate=status = 'open'"));
935 assert!(explain.contains("exact=true"));
936 assert!(explain.contains("row_recheck=false"));
937 assert_explain_includes_query_stats_surface(
938 &explain,
939 QueryStatsExplainSurface::RangeReduceHeader,
940 );
941
942 let _ = shutdown_tx.send(());
943 }
944
945 #[test]
946 fn index_spec_constructor_sets_name_and_keys() {
947 let spec = IndexSpec::new(
948 "status_customer",
949 vec!["status".to_string(), "customer_id".to_string()],
950 )
951 .expect("valid index spec");
952 assert_eq!(spec.name(), "status_customer");
953 assert_eq!(spec.key_columns(), &["status", "customer_id"]);
954 assert!(spec.cover_columns().is_empty());
955 }
956
957 #[test]
958 fn index_spec_cover_columns_are_configurable_in_code() {
959 let spec = IndexSpec::new("status_customer", vec!["status".to_string()])
960 .expect("valid")
961 .with_cover_columns(vec!["amount_cents".to_string()]);
962 assert_eq!(spec.key_columns(), &["status"]);
963 assert_eq!(spec.cover_columns(), &["amount_cents"]);
964 }
965
966 #[test]
967 fn describe_in_list_places_truncation_ellipsis_inside_parentheses() {
968 let rendered = describe_in_list((1..=6).map(|v| v.to_string()));
969 assert_eq!(rendered, "IN (1, 2, 3, 4, 5, ...)");
970 }
971
972 #[test]
973 fn normalize_sum_case_then_one_uses_countall_optimization() {
974 let (model, _) = test_model();
975 let argument = normalize_case_then_expr(
976 AggregatePushdownFunction::Sum,
977 &Expr::Literal(ScalarValue::Int64(Some(1)), None),
978 &model,
979 )
980 .expect("normalize");
981 assert_eq!(argument, AggregatePushdownArgument::CountAll);
982 }
983
984 #[test]
985 fn normalize_count_case_then_literal_uses_countall_optimization() {
986 use datafusion::logical_expr::col;
987
988 let (model, _) = test_model();
989 let case_expr = Expr::Case(datafusion::logical_expr::expr::Case {
990 expr: None,
991 when_then_expr: vec![(
992 Box::new(col("status").eq(Expr::Literal(
993 ScalarValue::Utf8(Some("open".to_string())),
994 None,
995 ))),
996 Box::new(Expr::Literal(
997 ScalarValue::Utf8(Some("yes".to_string())),
998 None,
999 )),
1000 )],
1001 else_expr: Some(Box::new(Expr::Literal(ScalarValue::Utf8(None), None))),
1002 });
1003
1004 let (func, argument, filter) =
1005 normalize_count_aggregate_argument(&case_expr, &model).expect("normalize");
1006 assert_eq!(func, AggregatePushdownFunction::Count);
1007 assert_eq!(argument, AggregatePushdownArgument::CountAll);
1008 assert!(filter.is_some());
1009 }
1010
1011 #[test]
1012 fn reduced_value_to_scalar_preserves_timestamp_timezone_label() {
1013 let tz: Arc<str> = Arc::from("America/New_York");
1014 let scalar = reduced_value_to_scalar(
1015 Some(KvReducedValue::Timestamp(1_700_000_000_000_000)),
1016 &DataType::Timestamp(TimeUnit::Microsecond, Some(tz.clone())),
1017 )
1018 .expect("timestamp scalar");
1019 assert_eq!(
1020 scalar,
1021 ScalarValue::TimestampMicrosecond(Some(1_700_000_000_000_000), Some(tz))
1022 );
1023 }
1024
1025 #[test]
1026 fn index_spec_cover_pk_column_is_rejected() {
1027 let config = KvTableConfig::new(
1028 0,
1029 vec![
1030 TableColumnConfig::new("id", DataType::Int64, false),
1031 TableColumnConfig::new("status", DataType::Utf8, false),
1032 ],
1033 vec!["id".to_string()],
1034 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
1035 .expect("valid")
1036 .with_cover_columns(vec!["id".to_string()])],
1037 )
1038 .expect("valid config");
1039 let model = TableModel::from_config(&config).expect("model");
1040 let err = model
1041 .resolve_index_specs(&config.index_specs)
1042 .expect_err("covering a PK column must be rejected");
1043 assert!(err.contains("primary key column"));
1044 }
1045
1046 #[test]
1047 fn access_plan_requires_cover_columns_for_index_scan() {
1048 let (model, _) = test_model();
1049 let predicate = QueryPredicate::default();
1050 let projection = Some(vec![
1051 *model.columns_by_name.get("order_id").unwrap(),
1052 *model.columns_by_name.get("amount_cents").unwrap(),
1053 ]);
1054 let plan = ScanAccessPlan::new(&model, &projection, &predicate);
1055
1056 let no_cover = IndexSpec::new("status_idx", vec!["status".to_string()]).unwrap();
1057 let with_cover = IndexSpec::new("status_idx", vec!["status".to_string()])
1058 .unwrap()
1059 .with_cover_columns(vec!["amount_cents".to_string()]);
1060 let no_cover_resolved = model.resolve_index_specs(&[no_cover]).unwrap();
1061 let with_cover_resolved = model.resolve_index_specs(&[with_cover]).unwrap();
1062
1063 assert!(!plan.index_covers_required_non_pk(&no_cover_resolved[0]));
1064 assert!(plan.index_covers_required_non_pk(&with_cover_resolved[0]));
1065 }
1066
1067 #[test]
1068 fn choose_index_plan_prefers_longer_prefix() {
1069 let (model, specs) = test_model();
1070 let region_idx = *model.columns_by_name.get("region").unwrap();
1071 let customer_idx = *model.columns_by_name.get("customer_id").unwrap();
1072 let mut predicate = QueryPredicate::default();
1073 predicate.constraints.insert(
1074 region_idx,
1075 PredicateConstraint::StringEq("us-east".to_string()),
1076 );
1077 predicate.constraints.insert(
1078 customer_idx,
1079 PredicateConstraint::IntRange {
1080 min: Some(10),
1081 max: Some(20),
1082 },
1083 );
1084 let plan = predicate
1085 .choose_index_plan(&model, &specs)
1086 .expect("plan")
1087 .expect("exists");
1088 assert_eq!(plan.spec_idx, 0);
1089 assert_eq!(plan.constrained_prefix_len, 2);
1090 }
1091
1092 #[test]
1093 fn choose_index_plan_prefers_covering_index_when_prefix_strength_ties() {
1094 let config = KvTableConfig::new(
1095 0,
1096 vec![
1097 TableColumnConfig::new("id", DataType::Int64, false),
1098 TableColumnConfig::new("status", DataType::Utf8, false),
1099 TableColumnConfig::new("amount_cents", DataType::Int64, false),
1100 ],
1101 vec!["id".to_string()],
1102 vec![
1103 IndexSpec::new("status_plain", vec!["status".to_string()]).expect("valid"),
1104 IndexSpec::new("status_covering", vec!["status".to_string()])
1105 .expect("valid")
1106 .with_cover_columns(vec!["amount_cents".to_string()]),
1107 ],
1108 )
1109 .expect("config");
1110 let model = TableModel::from_config(&config).expect("model");
1111 let specs = model
1112 .resolve_index_specs(&config.index_specs)
1113 .expect("specs");
1114 let status_idx = *model.columns_by_name.get("status").unwrap();
1115 let amount_idx = *model.columns_by_name.get("amount_cents").unwrap();
1116 let mut predicate = QueryPredicate::default();
1117 predicate.constraints.insert(
1118 status_idx,
1119 PredicateConstraint::StringEq("open".to_string()),
1120 );
1121 predicate.constraints.insert(
1122 amount_idx,
1123 PredicateConstraint::IntRange {
1124 min: Some(10),
1125 max: None,
1126 },
1127 );
1128
1129 let plan = predicate
1130 .choose_index_plan(&model, &specs)
1131 .expect("plan")
1132 .expect("exists");
1133 assert_eq!(specs[plan.spec_idx].name, "status_covering");
1134 }
1135
1136 #[test]
1137 fn choose_index_plan_prefers_zorder_for_multi_column_box_constraints() {
1138 let (model, specs) = zorder_test_model();
1139 let x_idx = *model.columns_by_name.get("x").unwrap();
1140 let y_idx = *model.columns_by_name.get("y").unwrap();
1141 let mut predicate = QueryPredicate::default();
1142 predicate.constraints.insert(
1143 x_idx,
1144 PredicateConstraint::IntRange {
1145 min: Some(1),
1146 max: Some(2),
1147 },
1148 );
1149 predicate.constraints.insert(
1150 y_idx,
1151 PredicateConstraint::IntRange {
1152 min: Some(1),
1153 max: Some(2),
1154 },
1155 );
1156
1157 let plan = predicate
1158 .choose_index_plan(&model, &specs)
1159 .expect("plan")
1160 .expect("exists");
1161 assert_eq!(specs[plan.spec_idx].name, "xy_z");
1162 assert_eq!(specs[plan.spec_idx].layout, IndexLayout::ZOrder);
1163 assert_eq!(plan.constrained_column_count, 2);
1164 }
1165
1166 #[test]
1167 fn secondary_index_key_round_trip() {
1168 let (model, specs) = test_model();
1169 let row = KvRow {
1170 values: vec![
1171 CellValue::Utf8("us-east".to_string()),
1172 CellValue::Int64(42),
1173 CellValue::Int64(9001),
1174 CellValue::Int64(1500),
1175 CellValue::Utf8("open".to_string()),
1176 ],
1177 };
1178 let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row)
1179 .expect("encode");
1180 let decoded = decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key)
1181 .expect("decode");
1182 let region_idx = *model.columns_by_name.get("region").unwrap();
1183 let customer_idx = *model.columns_by_name.get("customer_id").unwrap();
1184 assert!(matches!(
1185 decoded.values.get(®ion_idx),
1186 Some(CellValue::Utf8(v)) if v == "us-east"
1187 ));
1188 assert!(matches!(
1189 decoded.values.get(&customer_idx),
1190 Some(CellValue::Int64(v)) if *v == 42
1191 ));
1192 assert!(matches!(
1193 &decoded.primary_key_values[0],
1194 CellValue::Int64(9001)
1195 ));
1196 let expected_pk = encode_primary_key_from_row(model.table_prefix, &row, &model)
1197 .expect("primary key should encode");
1198 assert_eq!(decoded.primary_key, expected_pk);
1199 }
1200
1201 #[test]
1202 fn zorder_secondary_index_key_round_trip() {
1203 let (model, specs) = zorder_test_model();
1204 let row = KvRow {
1205 values: vec![
1206 CellValue::Int64(2),
1207 CellValue::Int64(1),
1208 CellValue::Int64(42),
1209 CellValue::Int64(900),
1210 ],
1211 };
1212 let key = encode_secondary_index_key(model.table_prefix, &specs[1], &model, &row)
1213 .expect("encode");
1214 let decoded = decode_secondary_index_key(model.table_prefix, &specs[1], &model, &key)
1215 .expect("decode");
1216 let x_idx = *model.columns_by_name.get("x").unwrap();
1217 let y_idx = *model.columns_by_name.get("y").unwrap();
1218 assert!(matches!(
1219 decoded.values.get(&x_idx),
1220 Some(CellValue::Int64(v)) if *v == 2
1221 ));
1222 assert!(matches!(
1223 decoded.values.get(&y_idx),
1224 Some(CellValue::Int64(v)) if *v == 1
1225 ));
1226 assert!(matches!(
1227 &decoded.primary_key_values[0],
1228 CellValue::Int64(42)
1229 ));
1230 }
1231
1232 #[test]
1233 fn table_config_supports_non_orders_schema() {
1234 let config = KvTableConfig::new(
1235 0,
1236 vec![
1237 TableColumnConfig::new("tenant", DataType::Utf8, false),
1238 TableColumnConfig::new("id", DataType::Int64, false),
1239 TableColumnConfig::new("score", DataType::Int64, false),
1240 ],
1241 vec!["id".to_string()],
1242 vec![IndexSpec::new(
1243 "tenant_score",
1244 vec!["tenant".to_string(), "score".to_string()],
1245 )
1246 .expect("valid")],
1247 )
1248 .expect("schema agnostic config should be valid");
1249 assert_eq!(config.primary_key_columns, vec!["id".to_string()]);
1250 assert_eq!(config.columns.len(), 3);
1251 }
1252
1253 #[test]
1254 fn table_config_accepts_float64_column() {
1255 let config = KvTableConfig::new(
1256 0,
1257 vec![
1258 TableColumnConfig::new("id", DataType::Int64, false),
1259 TableColumnConfig::new("price", DataType::Float64, false),
1260 ],
1261 vec!["id".to_string()],
1262 vec![],
1263 )
1264 .expect("Float64 column should be accepted");
1265 assert_eq!(config.columns.len(), 2);
1266 }
1267
1268 #[test]
1269 fn table_config_accepts_boolean_column() {
1270 let config = KvTableConfig::new(
1271 0,
1272 vec![
1273 TableColumnConfig::new("id", DataType::Int64, false),
1274 TableColumnConfig::new("active", DataType::Boolean, false),
1275 ],
1276 vec!["id".to_string()],
1277 vec![],
1278 )
1279 .expect("Boolean column should be accepted");
1280 assert_eq!(config.columns.len(), 2);
1281 }
1282
1283 #[test]
1284 fn build_projected_batch_uses_large_utf8_type() {
1285 let config = KvTableConfig::new(
1286 0,
1287 vec![
1288 TableColumnConfig::new("id", DataType::Int64, false),
1289 TableColumnConfig::new("name", DataType::LargeUtf8, false),
1290 ],
1291 vec!["id".to_string()],
1292 vec![],
1293 )
1294 .unwrap();
1295 let model = TableModel::from_config(&config).unwrap();
1296 let rows = vec![KvRow {
1297 values: vec![CellValue::Int64(1), CellValue::Utf8("hello".to_string())],
1298 }];
1299 let batch = build_projected_batch(&rows, &model, &model.schema, &None).unwrap();
1300 assert_eq!(batch.column(1).data_type(), &DataType::LargeUtf8);
1301 let values = batch
1302 .column(1)
1303 .as_any()
1304 .downcast_ref::<LargeStringArray>()
1305 .expect("must build LargeStringArray");
1306 assert_eq!(values.value(0), "hello");
1307 }
1308
1309 #[test]
1310 fn build_projected_batch_uses_utf8_view_type() {
1311 let config = KvTableConfig::new(
1312 0,
1313 vec![
1314 TableColumnConfig::new("id", DataType::Int64, false),
1315 TableColumnConfig::new("name", DataType::Utf8View, false),
1316 ],
1317 vec!["id".to_string()],
1318 vec![],
1319 )
1320 .unwrap();
1321 let model = TableModel::from_config(&config).unwrap();
1322 let rows = vec![KvRow {
1323 values: vec![CellValue::Int64(1), CellValue::Utf8("hello".to_string())],
1324 }];
1325 let batch = build_projected_batch(&rows, &model, &model.schema, &None).unwrap();
1326 assert_eq!(batch.column(1).data_type(), &DataType::Utf8View);
1327 let values = batch
1328 .column(1)
1329 .as_any()
1330 .downcast_ref::<StringViewArray>()
1331 .expect("must build StringViewArray");
1332 assert_eq!(values.value(0), "hello");
1333 }
1334
1335 #[test]
1336 fn f64_ordered_encoding_preserves_order() {
1337 let values = [
1338 f64::NEG_INFINITY,
1339 f64::MIN,
1340 -1000.0,
1341 -1.0,
1342 -0.001,
1343 0.0,
1344 0.001,
1345 1.0,
1346 1000.0,
1347 f64::MAX,
1348 f64::INFINITY,
1349 ];
1350 let encoded: Vec<[u8; 8]> = values.iter().map(|v| encode_f64_ordered(*v)).collect();
1351 for i in 0..encoded.len() - 1 {
1352 assert!(
1353 encoded[i] < encoded[i + 1],
1354 "encode_f64_ordered({}) >= encode_f64_ordered({})",
1355 values[i],
1356 values[i + 1]
1357 );
1358 }
1359 }
1360
1361 #[test]
1362 fn f64_ordered_encoding_round_trip() {
1363 let values = [
1364 f64::MIN,
1365 -42.5,
1366 -0.0,
1367 0.0,
1368 3.125,
1369 f64::MAX,
1370 f64::INFINITY,
1371 f64::NEG_INFINITY,
1372 ];
1373 for v in values {
1374 let encoded = encode_f64_ordered(v);
1375 let decoded = decode_f64_ordered(encoded);
1376 assert!(
1377 v.to_bits() == decoded.to_bits(),
1378 "round-trip failed for {v}: got {decoded}"
1379 );
1380 }
1381 }
1382
1383 fn mixed_model() -> (TableModel, Vec<ResolvedIndexSpec>) {
1384 let config = KvTableConfig::new(
1385 0,
1386 vec![
1387 TableColumnConfig::new("id", DataType::Int64, false),
1388 TableColumnConfig::new("label", DataType::Utf8, false),
1389 TableColumnConfig::new("score", DataType::Float64, false),
1390 TableColumnConfig::new("active", DataType::Boolean, false),
1391 ],
1392 vec!["id".to_string()],
1393 vec![
1394 IndexSpec::new(
1395 "active_score",
1396 vec!["active".to_string(), "score".to_string()],
1397 )
1398 .expect("valid"),
1399 IndexSpec::new("label_idx", vec!["label".to_string()]).expect("valid"),
1400 ],
1401 )
1402 .expect("valid config");
1403 let model = TableModel::from_config(&config).expect("model");
1404 let specs = model
1405 .resolve_index_specs(&config.index_specs)
1406 .expect("specs");
1407 (model, specs)
1408 }
1409
1410 #[test]
1411 fn secondary_index_key_round_trip_with_float64_and_boolean() {
1412 let (model, specs) = mixed_model();
1413 let row = KvRow {
1414 values: vec![
1415 CellValue::Int64(100),
1416 CellValue::Utf8("hello".to_string()),
1417 CellValue::Float64(3.125),
1418 CellValue::Boolean(true),
1419 ],
1420 };
1421 let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row)
1422 .expect("encode");
1423 let decoded = decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key)
1424 .expect("decode");
1425 let active_idx = *model.columns_by_name.get("active").unwrap();
1426 let score_idx = *model.columns_by_name.get("score").unwrap();
1427 assert!(matches!(
1428 decoded.values.get(&active_idx),
1429 Some(CellValue::Boolean(true))
1430 ));
1431 assert!(
1432 matches!(decoded.values.get(&score_idx), Some(CellValue::Float64(v)) if (*v - 3.125).abs() < f64::EPSILON)
1433 );
1434 assert!(matches!(
1435 &decoded.primary_key_values[0],
1436 CellValue::Int64(100)
1437 ));
1438 }
1439
1440 #[test]
1441 fn base_row_round_trip_with_float64_and_boolean() {
1442 let (model, _specs) = mixed_model();
1443 let row = KvRow {
1444 values: vec![
1445 CellValue::Int64(42),
1446 CellValue::Utf8("world".to_string()),
1447 CellValue::Float64(-99.5),
1448 CellValue::Boolean(false),
1449 ],
1450 };
1451 let encoded = encode_base_row_value(&row, &model).expect("encode");
1452 let decoded =
1453 decode_base_row(vec![CellValue::Int64(42)], &encoded, &model).expect("decode");
1454 assert!(matches!(&decoded.values[0], CellValue::Int64(42)));
1455 assert!(matches!(&decoded.values[1], CellValue::Utf8(v) if v == "world"));
1456 assert!(
1457 matches!(&decoded.values[2], CellValue::Float64(v) if (*v - (-99.5)).abs() < f64::EPSILON)
1458 );
1459 assert!(matches!(&decoded.values[3], CellValue::Boolean(false)));
1460 }
1461
1462 #[test]
1463 fn predicate_bool_eq_matches() {
1464 let (model, _specs) = mixed_model();
1465 let active_idx = *model.columns_by_name.get("active").unwrap();
1466 let mut pred = QueryPredicate::default();
1467 pred.constraints
1468 .insert(active_idx, PredicateConstraint::BoolEq(true));
1469 let row_true = KvRow {
1470 values: vec![
1471 CellValue::Int64(1),
1472 CellValue::Utf8("a".to_string()),
1473 CellValue::Float64(1.0),
1474 CellValue::Boolean(true),
1475 ],
1476 };
1477 let row_false = KvRow {
1478 values: vec![
1479 CellValue::Int64(2),
1480 CellValue::Utf8("b".to_string()),
1481 CellValue::Float64(2.0),
1482 CellValue::Boolean(false),
1483 ],
1484 };
1485 assert!(pred.matches_row(&row_true));
1486 assert!(!pred.matches_row(&row_false));
1487 }
1488
1489 #[test]
1490 fn predicate_float_range_matches() {
1491 let (model, _specs) = mixed_model();
1492 let score_idx = *model.columns_by_name.get("score").unwrap();
1493 let mut pred = QueryPredicate::default();
1494 pred.constraints.insert(
1495 score_idx,
1496 PredicateConstraint::FloatRange {
1497 min: Some((2.0, true)),
1498 max: Some((5.0, false)),
1499 },
1500 );
1501 let make_row = |score: f64| KvRow {
1502 values: vec![
1503 CellValue::Int64(1),
1504 CellValue::Utf8("a".to_string()),
1505 CellValue::Float64(score),
1506 CellValue::Boolean(true),
1507 ],
1508 };
1509 assert!(!pred.matches_row(&make_row(1.99)));
1510 assert!(pred.matches_row(&make_row(2.0)));
1511 assert!(pred.matches_row(&make_row(3.5)));
1512 assert!(pred.matches_row(&make_row(4.99)));
1513 assert!(!pred.matches_row(&make_row(5.0)));
1514 assert!(!pred.matches_row(&make_row(5.01)));
1515 }
1516
1517 #[test]
1518 fn float_range_rejects_nan_row_value() {
1519 let constraint = PredicateConstraint::FloatRange {
1520 min: Some((0.0, true)),
1521 max: Some((10.0, true)),
1522 };
1523 assert!(!matches_constraint(
1524 &CellValue::Float64(f64::NAN),
1525 &constraint
1526 ));
1527 }
1528
1529 #[test]
1530 fn index_plan_with_boolean_prefix() {
1531 let (model, specs) = mixed_model();
1532 let active_idx = *model.columns_by_name.get("active").unwrap();
1533 let mut pred = QueryPredicate::default();
1534 pred.constraints
1535 .insert(active_idx, PredicateConstraint::BoolEq(true));
1536 let plan = pred
1537 .choose_index_plan(&model, &specs)
1538 .expect("plan")
1539 .expect("should find index");
1540 assert_eq!(plan.spec_idx, 0);
1541 assert_eq!(plan.constrained_prefix_len, 1);
1542 }
1543
1544 #[test]
1545 fn float_constraint_contradiction() {
1546 let mut lo: Option<(f64, bool)> = None;
1547 let mut hi: Option<(f64, bool)> = None;
1548 let mut contradiction = false;
1549 apply_float_constraint(&mut lo, &mut hi, Operator::Gt, 10.0, &mut contradiction);
1550 assert!(!contradiction);
1551 apply_float_constraint(&mut lo, &mut hi, Operator::Lt, 5.0, &mut contradiction);
1552 assert!(contradiction);
1553 }
1554
1555 #[test]
1556 fn float_constraint_eq_then_range_contradicts() {
1557 let mut lo: Option<(f64, bool)> = None;
1558 let mut hi: Option<(f64, bool)> = None;
1559 let mut contradiction = false;
1560 apply_float_constraint(&mut lo, &mut hi, Operator::Eq, 5.0, &mut contradiction);
1561 assert!(!contradiction);
1562 apply_float_constraint(&mut lo, &mut hi, Operator::Gt, 5.0, &mut contradiction);
1563 assert!(contradiction);
1564 }
1565
1566 #[test]
1567 fn float_nan_literal_comparison_marks_contradiction() {
1568 let config = KvTableConfig::new(
1569 0,
1570 vec![
1571 TableColumnConfig::new("id", DataType::Int64, false),
1572 TableColumnConfig::new("score", DataType::Float64, false),
1573 ],
1574 vec!["id".to_string()],
1575 vec![],
1576 )
1577 .unwrap();
1578 let model = TableModel::from_config(&config).unwrap();
1579
1580 use datafusion::logical_expr::col;
1581 let filter = col("score").gt(Expr::Literal(ScalarValue::Float64(Some(f64::NAN)), None));
1582 assert!(QueryPredicate::supports_filter(&filter, &model));
1583
1584 let pred = QueryPredicate::from_filters(&[filter], &model);
1585 assert!(
1586 pred.contradiction,
1587 "comparison with NaN literal must produce contradiction"
1588 );
1589 }
1590
1591 #[test]
1592 fn table_config_accepts_date32_column() {
1593 let config = KvTableConfig::new(
1594 0,
1595 vec![
1596 TableColumnConfig::new("id", DataType::Int64, false),
1597 TableColumnConfig::new("created", DataType::Date32, false),
1598 ],
1599 vec!["id".to_string()],
1600 vec![],
1601 )
1602 .expect("Date32 column should be accepted");
1603 assert_eq!(config.columns.len(), 2);
1604 }
1605
1606 #[test]
1607 fn table_config_accepts_timestamp_column() {
1608 let config = KvTableConfig::new(
1609 0,
1610 vec![
1611 TableColumnConfig::new("id", DataType::Int64, false),
1612 TableColumnConfig::new(
1613 "ts",
1614 DataType::Timestamp(TimeUnit::Microsecond, None),
1615 false,
1616 ),
1617 ],
1618 vec!["id".to_string()],
1619 vec![],
1620 )
1621 .expect("Timestamp column should be accepted");
1622 let schema = config.to_schema();
1623 assert!(matches!(
1624 schema.field(1).data_type(),
1625 DataType::Timestamp(TimeUnit::Microsecond, _)
1626 ));
1627 }
1628
1629 #[test]
1630 fn table_config_normalizes_timestamp_to_microsecond() {
1631 let config = KvTableConfig::new(
1632 0,
1633 vec![
1634 TableColumnConfig::new("id", DataType::Int64, false),
1635 TableColumnConfig::new(
1636 "ts",
1637 DataType::Timestamp(TimeUnit::Nanosecond, None),
1638 false,
1639 ),
1640 ],
1641 vec!["id".to_string()],
1642 vec![],
1643 )
1644 .expect("Nanosecond timestamp should be accepted");
1645 let schema = config.to_schema();
1646 assert!(matches!(
1647 schema.field(1).data_type(),
1648 DataType::Timestamp(TimeUnit::Microsecond, _)
1649 ));
1650 }
1651
1652 #[test]
1653 fn table_config_accepts_decimal128_column() {
1654 let config = KvTableConfig::new(
1655 0,
1656 vec![
1657 TableColumnConfig::new("id", DataType::Int64, false),
1658 TableColumnConfig::new("price", DataType::Decimal128(10, 2), false),
1659 ],
1660 vec!["id".to_string()],
1661 vec![],
1662 )
1663 .expect("Decimal128 column should be accepted");
1664 assert_eq!(config.columns.len(), 2);
1665 }
1666
1667 #[test]
1668 fn table_config_accepts_list_column() {
1669 use datafusion::arrow::datatypes::Field;
1670
1671 let config = KvTableConfig::new(
1672 0,
1673 vec![
1674 TableColumnConfig::new("id", DataType::Int64, false),
1675 TableColumnConfig::new(
1676 "tags",
1677 DataType::List(Arc::new(Field::new("item", DataType::Utf8, false))),
1678 false,
1679 ),
1680 ],
1681 vec!["id".to_string()],
1682 vec![],
1683 )
1684 .expect("List<Utf8> column should be accepted");
1685 assert_eq!(config.columns.len(), 2);
1686 }
1687
1688 #[test]
1689 fn list_column_rejected_in_index() {
1690 use datafusion::arrow::datatypes::Field;
1691
1692 let result = KvTableConfig::new(
1693 0,
1694 vec![
1695 TableColumnConfig::new("id", DataType::Int64, false),
1696 TableColumnConfig::new(
1697 "tags",
1698 DataType::List(Arc::new(Field::new("item", DataType::Utf8, false))),
1699 false,
1700 ),
1701 ],
1702 vec!["id".to_string()],
1703 vec![IndexSpec::new("tags_idx", vec!["tags".to_string()]).unwrap()],
1704 );
1705 assert!(
1706 result.is_err() || {
1707 let config = result.unwrap();
1708 let model = TableModel::from_config(&config).unwrap();
1709 model.resolve_index_specs(&config.index_specs).is_err()
1710 }
1711 );
1712 }
1713
1714 #[test]
1715 fn i32_ordered_encoding_round_trip() {
1716 let values = [i32::MIN, -1000, -1, 0, 1, 1000, i32::MAX];
1717 for v in values {
1718 assert_eq!(decode_i32_ordered(encode_i32_ordered(v)), v);
1719 }
1720 let encoded: Vec<[u8; 4]> = values.iter().map(|v| encode_i32_ordered(*v)).collect();
1721 for i in 0..encoded.len() - 1 {
1722 assert!(encoded[i] < encoded[i + 1]);
1723 }
1724 }
1725
1726 #[test]
1727 fn i128_ordered_encoding_round_trip() {
1728 let values = [i128::MIN, -1, 0, 1, 1234567890123456789, i128::MAX];
1729 for v in values {
1730 assert_eq!(decode_i128_ordered(encode_i128_ordered(v)), v);
1731 }
1732 let encoded: Vec<[u8; 16]> = values.iter().map(|v| encode_i128_ordered(*v)).collect();
1733 for i in 0..encoded.len() - 1 {
1734 assert!(encoded[i] < encoded[i + 1]);
1735 }
1736 }
1737
1738 fn extended_model() -> (TableModel, Vec<ResolvedIndexSpec>) {
1739 let config = KvTableConfig::new(
1740 0,
1741 vec![
1742 TableColumnConfig::new("id", DataType::Int64, false),
1743 TableColumnConfig::new("created", DataType::Date32, false),
1744 TableColumnConfig::new(
1745 "ts",
1746 DataType::Timestamp(TimeUnit::Microsecond, None),
1747 false,
1748 ),
1749 TableColumnConfig::new("price", DataType::Decimal128(10, 2), false),
1750 TableColumnConfig::new("label", DataType::Utf8, false),
1751 ],
1752 vec!["id".to_string()],
1753 vec![
1754 IndexSpec::new(
1755 "date_label",
1756 vec!["created".to_string(), "label".to_string()],
1757 )
1758 .expect("valid"),
1759 IndexSpec::new("price_idx", vec!["price".to_string()]).expect("valid"),
1760 ],
1761 )
1762 .expect("valid config");
1763 let model = TableModel::from_config(&config).expect("model");
1764 let specs = model
1765 .resolve_index_specs(&config.index_specs)
1766 .expect("specs");
1767 (model, specs)
1768 }
1769
1770 #[test]
1771 fn secondary_index_key_round_trip_date32_and_decimal128() {
1772 let (model, specs) = extended_model();
1773 let row = KvRow {
1774 values: vec![
1775 CellValue::Int64(42),
1776 CellValue::Date32(19000),
1777 CellValue::Timestamp(1_700_000_000_000_000),
1778 CellValue::Decimal128(123456),
1779 CellValue::Utf8("hello".to_string()),
1780 ],
1781 };
1782 let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row)
1783 .expect("encode");
1784 let decoded = decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key)
1785 .expect("decode");
1786 let created_idx = *model.columns_by_name.get("created").unwrap();
1787 let label_idx = *model.columns_by_name.get("label").unwrap();
1788 assert!(matches!(
1789 decoded.values.get(&created_idx),
1790 Some(CellValue::Date32(19000))
1791 ));
1792 assert!(matches!(
1793 decoded.values.get(&label_idx),
1794 Some(CellValue::Utf8(v)) if v == "hello"
1795 ));
1796 assert!(matches!(
1797 &decoded.primary_key_values[0],
1798 CellValue::Int64(42)
1799 ));
1800
1801 let key2 = encode_secondary_index_key(model.table_prefix, &specs[1], &model, &row)
1802 .expect("encode");
1803 let decoded2 = decode_secondary_index_key(model.table_prefix, &specs[1], &model, &key2)
1804 .expect("decode");
1805 let price_idx = *model.columns_by_name.get("price").unwrap();
1806 assert!(matches!(
1807 decoded2.values.get(&price_idx),
1808 Some(CellValue::Decimal128(123456))
1809 ));
1810 assert!(matches!(
1811 &decoded2.primary_key_values[0],
1812 CellValue::Int64(42)
1813 ));
1814 }
1815
1816 #[test]
1817 fn base_row_round_trip_with_date32_timestamp_decimal128() {
1818 let (model, _specs) = extended_model();
1819 let row = KvRow {
1820 values: vec![
1821 CellValue::Int64(7),
1822 CellValue::Date32(19500),
1823 CellValue::Timestamp(1_700_000_000_000_000),
1824 CellValue::Decimal128(-9876543),
1825 CellValue::Utf8("world".to_string()),
1826 ],
1827 };
1828 let encoded = encode_base_row_value(&row, &model).expect("encode");
1829 let decoded = decode_base_row(vec![CellValue::Int64(7)], &encoded, &model).expect("decode");
1830 assert!(matches!(&decoded.values[0], CellValue::Int64(7)));
1831 assert!(matches!(&decoded.values[1], CellValue::Date32(19500)));
1832 assert!(matches!(
1833 &decoded.values[2],
1834 CellValue::Timestamp(1_700_000_000_000_000)
1835 ));
1836 assert!(matches!(
1837 &decoded.values[3],
1838 CellValue::Decimal128(-9876543)
1839 ));
1840 assert!(matches!(&decoded.values[4], CellValue::Utf8(v) if v == "world"));
1841 }
1842
1843 #[test]
1844 fn base_row_round_trip_with_list() {
1845 use datafusion::arrow::datatypes::Field;
1846
1847 let config = KvTableConfig::new(
1848 0,
1849 vec![
1850 TableColumnConfig::new("id", DataType::Int64, false),
1851 TableColumnConfig::new(
1852 "tags",
1853 DataType::List(Arc::new(Field::new("item", DataType::Utf8, false))),
1854 false,
1855 ),
1856 TableColumnConfig::new(
1857 "scores",
1858 DataType::List(Arc::new(Field::new("item", DataType::Int64, false))),
1859 false,
1860 ),
1861 ],
1862 vec!["id".to_string()],
1863 vec![],
1864 )
1865 .expect("valid");
1866 let model = TableModel::from_config(&config).expect("model");
1867 let row = KvRow {
1868 values: vec![
1869 CellValue::Int64(1),
1870 CellValue::List(vec![
1871 CellValue::Utf8("a".to_string()),
1872 CellValue::Utf8("b".to_string()),
1873 ]),
1874 CellValue::List(vec![CellValue::Int64(10), CellValue::Int64(20)]),
1875 ],
1876 };
1877 let encoded = encode_base_row_value(&row, &model).expect("encode");
1878 let decoded = decode_base_row(vec![CellValue::Int64(1)], &encoded, &model).expect("decode");
1879 assert!(matches!(&decoded.values[0], CellValue::Int64(1)));
1880 match &decoded.values[1] {
1881 CellValue::List(items) => {
1882 assert_eq!(items.len(), 2);
1883 assert!(matches!(&items[0], CellValue::Utf8(v) if v == "a"));
1884 assert!(matches!(&items[1], CellValue::Utf8(v) if v == "b"));
1885 }
1886 _ => panic!("expected List"),
1887 }
1888 match &decoded.values[2] {
1889 CellValue::List(items) => {
1890 assert_eq!(items.len(), 2);
1891 assert!(matches!(&items[0], CellValue::Int64(10)));
1892 assert!(matches!(&items[1], CellValue::Int64(20)));
1893 }
1894 _ => panic!("expected List"),
1895 }
1896 }
1897
1898 #[test]
1899 fn decimal128_constraint_range() {
1900 let mut min: Option<i128> = None;
1901 let mut max: Option<i128> = None;
1902 let mut contradiction = false;
1903 apply_decimal128_constraint(&mut min, &mut max, Operator::GtEq, 100, &mut contradiction);
1904 assert!(!contradiction);
1905 apply_decimal128_constraint(&mut min, &mut max, Operator::LtEq, 200, &mut contradiction);
1906 assert!(!contradiction);
1907 assert_eq!(min, Some(100));
1908 assert_eq!(max, Some(200));
1909 assert!(in_i128_bounds(150, min, max));
1910 assert!(!in_i128_bounds(99, min, max));
1911 assert!(!in_i128_bounds(201, min, max));
1912 }
1913
1914 #[test]
1915 fn decimal256_gt_max_is_contradiction() {
1916 let mut min: Option<i256> = None;
1917 let mut max: Option<i256> = None;
1918 let mut contradiction = false;
1919 apply_i256_constraint(
1920 &mut min,
1921 &mut max,
1922 Operator::Gt,
1923 i256::MAX,
1924 &mut contradiction,
1925 );
1926 assert!(contradiction);
1927 assert_eq!(min, None);
1928 assert_eq!(max, None);
1929 }
1930
1931 #[test]
1932 fn decimal256_lt_min_is_contradiction() {
1933 let mut min: Option<i256> = None;
1934 let mut max: Option<i256> = None;
1935 let mut contradiction = false;
1936 apply_i256_constraint(
1937 &mut min,
1938 &mut max,
1939 Operator::Lt,
1940 i256::MIN,
1941 &mut contradiction,
1942 );
1943 assert!(contradiction);
1944 assert_eq!(min, None);
1945 assert_eq!(max, None);
1946 }
1947
1948 #[test]
1949 fn date32_index_bound_clamps_on_i64_overflow() {
1950 let config = KvTableConfig::new(
1951 0,
1952 vec![
1953 TableColumnConfig::new("id", DataType::Int64, false),
1954 TableColumnConfig::new("created", DataType::Date32, false),
1955 ],
1956 vec!["id".to_string()],
1957 vec![IndexSpec::new("created_idx", vec!["created".to_string()]).unwrap()],
1958 )
1959 .unwrap();
1960 let model = TableModel::from_config(&config).unwrap();
1961 let specs = model.resolve_index_specs(&config.index_specs).unwrap();
1962
1963 let created_idx = *model.columns_by_name.get("created").unwrap();
1964 let mut pred = QueryPredicate::default();
1965 pred.constraints.insert(
1966 created_idx,
1967 PredicateConstraint::IntRange {
1968 min: Some(i32::MAX as i64 + 1),
1969 max: None,
1970 },
1971 );
1972
1973 let start = pred
1974 .encode_index_bound_key(model.table_prefix, &model, &specs[0], 1, false)
1975 .unwrap();
1976 let end = pred
1977 .encode_index_bound_key(model.table_prefix, &model, &specs[0], 1, true)
1978 .unwrap();
1979
1980 assert!(
1981 start <= end,
1982 "lower bound must not exceed upper bound (was wrapping via as i32)"
1983 );
1984
1985 let encoded_lower = specs[0].codec.read_payload_exact::<4>(&start, 0).unwrap();
1986 let decoded_lower = decode_i32_ordered(encoded_lower);
1987 assert_eq!(
1988 decoded_lower,
1989 i32::MAX,
1990 "out-of-range i64 must clamp to i32::MAX, not wrap"
1991 );
1992 }
1993
1994 #[test]
1995 fn timestamp_nanos_gt_uses_floor_division() {
1996 let micros = timestamp_scalar_to_micros_for_op(
1997 &ScalarValue::TimestampNanosecond(Some(-1500), None),
1998 Operator::Gt,
1999 )
2000 .unwrap();
2001 assert_eq!(micros, -2, "Gt on -1500ns should floor to -2us");
2002
2003 let mut min: Option<i64> = None;
2004 let mut max: Option<i64> = None;
2005 let mut contradiction = false;
2006 apply_int_constraint(&mut min, &mut max, Operator::Gt, micros, &mut contradiction);
2007 assert_eq!(min, Some(-1), "Gt(-2us) + 1 = min -1us");
2008
2009 let row_at_minus_1 = CellValue::Timestamp(-1);
2010 assert!(matches_constraint(
2011 &row_at_minus_1,
2012 &PredicateConstraint::IntRange { min, max }
2013 ));
2014 }
2015
2016 #[test]
2017 fn timestamp_nanos_lteq_uses_floor_division() {
2018 let micros = timestamp_scalar_to_micros_for_op(
2019 &ScalarValue::TimestampNanosecond(Some(-1500), None),
2020 Operator::LtEq,
2021 )
2022 .unwrap();
2023 assert_eq!(micros, -2, "LtEq on -1500ns should floor to -2us");
2024
2025 let row_at_minus_1 = CellValue::Timestamp(-1);
2026 assert!(
2027 !matches_constraint(
2028 &row_at_minus_1,
2029 &PredicateConstraint::IntRange {
2030 min: None,
2031 max: Some(micros)
2032 }
2033 ),
2034 "-1us (-1000ns) > -1500ns, must not satisfy <= -1500ns"
2035 );
2036 }
2037
2038 #[test]
2039 fn timestamp_nanos_gteq_uses_ceil_division() {
2040 let micros = timestamp_scalar_to_micros_for_op(
2041 &ScalarValue::TimestampNanosecond(Some(-1500), None),
2042 Operator::GtEq,
2043 )
2044 .unwrap();
2045 assert_eq!(micros, -1, "GtEq on -1500ns should ceil to -1us");
2046 }
2047
2048 #[test]
2049 fn timestamp_nanos_lt_uses_ceil_division() {
2050 let micros = timestamp_scalar_to_micros_for_op(
2051 &ScalarValue::TimestampNanosecond(Some(-1500), None),
2052 Operator::Lt,
2053 )
2054 .unwrap();
2055 assert_eq!(micros, -1, "Lt on -1500ns should ceil to -1us");
2056
2057 let mut min: Option<i64> = None;
2058 let mut max: Option<i64> = None;
2059 let mut contradiction = false;
2060 apply_int_constraint(&mut min, &mut max, Operator::Lt, micros, &mut contradiction);
2061 assert_eq!(max, Some(-2), "Lt(-1us) - 1 = max -2us");
2062 }
2063
2064 #[test]
2065 fn timestamp_nanos_eq_non_aligned_is_contradiction() {
2066 let result = timestamp_scalar_to_micros_for_op(
2067 &ScalarValue::TimestampNanosecond(Some(-1500), None),
2068 Operator::Eq,
2069 );
2070 assert!(
2071 result.is_none(),
2072 "non-aligned ns Eq must produce contradiction"
2073 );
2074 }
2075
2076 #[test]
2077 fn timestamp_nanos_exact_multiple_is_unchanged() {
2078 for op in [
2079 Operator::Eq,
2080 Operator::Gt,
2081 Operator::GtEq,
2082 Operator::Lt,
2083 Operator::LtEq,
2084 ] {
2085 let micros = timestamp_scalar_to_micros_for_op(
2086 &ScalarValue::TimestampNanosecond(Some(-2000), None),
2087 op,
2088 )
2089 .unwrap();
2090 assert_eq!(micros, -2, "exact multiple -2000ns = -2us for {op:?}");
2091 }
2092 }
2093
2094 #[test]
2095 fn float64_index_bounds_include_infinity() {
2096 let config = KvTableConfig::new(
2097 0,
2098 vec![
2099 TableColumnConfig::new("id", DataType::Int64, false),
2100 TableColumnConfig::new("val", DataType::Float64, false),
2101 ],
2102 vec!["id".to_string()],
2103 vec![IndexSpec::new("val_idx", vec!["val".to_string()]).unwrap()],
2104 )
2105 .unwrap();
2106 let model = TableModel::from_config(&config).unwrap();
2107 let specs = model.resolve_index_specs(&config.index_specs).unwrap();
2108
2109 let pred = QueryPredicate::default();
2110 let start = pred
2111 .encode_index_bound_key(model.table_prefix, &model, &specs[0], 0, false)
2112 .unwrap();
2113 let end = pred
2114 .encode_index_bound_key(model.table_prefix, &model, &specs[0], 0, true)
2115 .unwrap();
2116
2117 let neg_inf_row = KvRow {
2118 values: vec![CellValue::Int64(1), CellValue::Float64(f64::NEG_INFINITY)],
2119 };
2120 let pos_inf_row = KvRow {
2121 values: vec![CellValue::Int64(2), CellValue::Float64(f64::INFINITY)],
2122 };
2123
2124 let neg_inf_key =
2125 encode_secondary_index_key(model.table_prefix, &specs[0], &model, &neg_inf_row)
2126 .unwrap();
2127 let pos_inf_key =
2128 encode_secondary_index_key(model.table_prefix, &specs[0], &model, &pos_inf_row)
2129 .unwrap();
2130
2131 assert!(
2132 neg_inf_key >= start,
2133 "NEG_INFINITY row key must be within scan start bound"
2134 );
2135 assert!(
2136 pos_inf_key <= end,
2137 "INFINITY row key must be within scan end bound"
2138 );
2139 }
2140
2141 #[test]
2142 fn distinct_table_prefixes_produce_non_overlapping_pk_ranges() {
2143 let range_a = primary_key_prefix_range(1);
2144 let range_b = primary_key_prefix_range(2);
2145 assert!(
2146 range_a.end < range_b.start,
2147 "table prefix 1 pk range must be entirely below table prefix 2"
2148 );
2149 }
2150
2151 #[test]
2152 fn distinct_table_prefixes_isolate_primary_keys() {
2153 let model_1 = simple_int64_model(1);
2154 let model_2 = simple_int64_model(2);
2155 let pk = CellValue::Int64(42);
2156 let key_a = encode_primary_key(1, &[&pk], &model_1).expect("pk key encodes");
2157 let key_b = encode_primary_key(2, &[&pk], &model_2).expect("pk key encodes");
2158 assert_ne!(key_a, key_b, "same PK under different prefixes must differ");
2159 assert!(
2160 decode_primary_key(1, &key_a, &model_1).is_some(),
2161 "key_a must decode under prefix 1"
2162 );
2163 assert!(
2164 decode_primary_key(2, &key_a, &model_2).is_none(),
2165 "key_a must NOT decode under prefix 2"
2166 );
2167 assert!(
2168 decode_primary_key(2, &key_b, &model_2).is_some(),
2169 "key_b must decode under prefix 2"
2170 );
2171 }
2172
2173 #[test]
2174 fn distinct_table_prefixes_isolate_secondary_keys() {
2175 let config_a = KvTableConfig::new(
2176 10,
2177 vec![
2178 TableColumnConfig::new("id", DataType::Int64, false),
2179 TableColumnConfig::new("name", DataType::Utf8, false),
2180 ],
2181 vec!["id".to_string()],
2182 vec![IndexSpec::new("name_idx", vec!["name".to_string()]).unwrap()],
2183 )
2184 .unwrap();
2185 let config_b = KvTableConfig::new(
2186 11,
2187 vec![
2188 TableColumnConfig::new("id", DataType::Int64, false),
2189 TableColumnConfig::new("name", DataType::Utf8, false),
2190 ],
2191 vec!["id".to_string()],
2192 vec![IndexSpec::new("name_idx", vec!["name".to_string()]).unwrap()],
2193 )
2194 .unwrap();
2195
2196 let model_a = TableModel::from_config(&config_a).unwrap();
2197 let specs_a = model_a.resolve_index_specs(&config_a.index_specs).unwrap();
2198 let model_b = TableModel::from_config(&config_b).unwrap();
2199 let specs_b = model_b.resolve_index_specs(&config_b.index_specs).unwrap();
2200
2201 let row = KvRow {
2202 values: vec![CellValue::Int64(1), CellValue::Utf8("alice".to_string())],
2203 };
2204 let key_a =
2205 encode_secondary_index_key(model_a.table_prefix, &specs_a[0], &model_a, &row).unwrap();
2206 let key_b =
2207 encode_secondary_index_key(model_b.table_prefix, &specs_b[0], &model_b, &row).unwrap();
2208
2209 assert_ne!(
2210 key_a, key_b,
2211 "same row under different prefixes must differ"
2212 );
2213 assert!(
2214 decode_secondary_index_key(model_a.table_prefix, &specs_a[0], &model_a, &key_a)
2215 .is_some()
2216 );
2217 assert!(
2218 decode_secondary_index_key(model_a.table_prefix, &specs_a[0], &model_a, &key_b)
2219 .is_none(),
2220 "key from table B must not decode under table A's prefix"
2221 );
2222 }
2223
2224 #[test]
2225 fn table_prefix_stored_in_model() {
2226 let config = KvTableConfig::new(
2227 12,
2228 vec![TableColumnConfig::new("id", DataType::Int64, false)],
2229 vec!["id".to_string()],
2230 vec![],
2231 )
2232 .unwrap();
2233 let model = TableModel::from_config(&config).unwrap();
2234 assert_eq!(model.table_prefix, 12);
2235 }
2236
2237 #[test]
2238 fn codec_layout_exposes_payload_bits_under_reserved_family_bits() {
2239 let config = KvTableConfig::new(
2240 0,
2241 vec![
2242 TableColumnConfig::new("id", DataType::FixedSizeBinary(16), false),
2243 TableColumnConfig::new("bucket", DataType::FixedSizeBinary(16), false),
2244 ],
2245 vec!["id".to_string()],
2246 vec![IndexSpec::new("bucket_idx", vec!["bucket".to_string()]).unwrap()],
2247 )
2248 .unwrap();
2249 let model = TableModel::from_config(&config).unwrap();
2250 let spec = model
2251 .resolve_index_specs(&config.index_specs)
2252 .unwrap()
2253 .remove(0);
2254
2255 let mut current_primary = HashSet::new();
2256 let mut current_secondary = HashSet::new();
2257
2258 fn first_twelve_bits_of_key(key: &[u8]) -> u16 {
2259 let first = u16::from(*key.first().unwrap_or(&0));
2260 let second = u16::from(*key.get(1).unwrap_or(&0));
2261 (first << 4) | (second >> 4)
2262 }
2263
2264 for first_byte in 0u8..=255 {
2265 let mut id = vec![0u8; 16];
2266 id[0] = first_byte;
2267 let mut bucket = vec![0u8; 16];
2268 bucket[0] = first_byte;
2269
2270 let pk = CellValue::FixedBinary(id.clone());
2271 let current_pk = encode_primary_key(model.table_prefix, &[&pk], &model).unwrap();
2272 current_primary.insert(first_twelve_bits_of_key(¤t_pk));
2273
2274 let row = KvRow {
2275 values: vec![
2276 CellValue::FixedBinary(id),
2277 CellValue::FixedBinary(bucket.clone()),
2278 ],
2279 };
2280 let current_index =
2281 encode_secondary_index_key(model.table_prefix, &spec, &model, &row).unwrap();
2282 current_secondary.insert(first_twelve_bits_of_key(¤t_index));
2283 }
2284
2285 assert_eq!(current_primary.len(), 128);
2288
2289 assert_eq!(current_secondary.len(), 8);
2292 }
2293
2294 #[test]
2295 fn kv_schema_auto_assigns_sequential_prefixes() {
2296 let client = StoreClient::new("http://localhost:10000");
2297 let schema = KvSchema::new(client)
2298 .table(
2299 "alpha",
2300 vec![TableColumnConfig::new("id", DataType::Int64, false)],
2301 vec!["id".to_string()],
2302 vec![],
2303 )
2304 .unwrap()
2305 .table(
2306 "beta",
2307 vec![
2308 TableColumnConfig::new("id", DataType::Int64, false),
2309 TableColumnConfig::new("name", DataType::Utf8, false),
2310 ],
2311 vec!["id".to_string()],
2312 vec![],
2313 )
2314 .unwrap()
2315 .table(
2316 "gamma",
2317 vec![TableColumnConfig::new("id", DataType::Int64, false)],
2318 vec!["id".to_string()],
2319 vec![],
2320 )
2321 .unwrap();
2322
2323 assert_eq!(schema.table_count(), 3);
2324 }
2325
2326 #[test]
2327 fn kv_schema_allows_max_codec_table_count_and_rejects_overflow() {
2328 let client = StoreClient::new("http://localhost:10000");
2329 let mut schema = KvSchema::new(client);
2330 for idx in 0..MAX_TABLES {
2331 schema = schema
2332 .table(
2333 format!("t{idx}"),
2334 vec![TableColumnConfig::new("id", DataType::Int64, false)],
2335 vec!["id".to_string()],
2336 vec![],
2337 )
2338 .expect("tables up to codec capacity should be accepted");
2339 }
2340 assert_eq!(schema.table_count(), MAX_TABLES);
2341
2342 let overflow = schema.table(
2343 "overflow",
2344 vec![TableColumnConfig::new("id", DataType::Int64, false)],
2345 vec!["id".to_string()],
2346 vec![],
2347 );
2348 match overflow {
2349 Ok(_) => panic!("overflow table should be rejected"),
2350 Err(err) => assert!(
2351 err.contains(&format!(
2352 "too many tables for codec layout (max {MAX_TABLES})"
2353 )),
2354 "overflow table should be rejected with codec-capacity error"
2355 ),
2356 }
2357 }
2358
2359 #[test]
2360 fn sequential_prefixes_produce_non_overlapping_pk_ranges() {
2361 let range_a = primary_key_prefix_range(0);
2362 let range_b = primary_key_prefix_range(1);
2363 let range_c = primary_key_prefix_range(2);
2364 assert!(range_a.end < range_b.start);
2365 assert!(range_b.end < range_c.start);
2366 }
2367
2368 #[test]
2369 fn sequential_prefixes_isolate_primary_keys() {
2370 let model_0 = simple_int64_model(0);
2371 let model_1 = simple_int64_model(1);
2372 let pk = CellValue::Int64(42);
2373 let key_a = encode_primary_key(0, &[&pk], &model_0).expect("pk key encodes");
2374 let key_b = encode_primary_key(1, &[&pk], &model_1).expect("pk key encodes");
2375 assert_ne!(key_a, key_b);
2376 assert!(decode_primary_key(0, &key_a, &model_0).is_some());
2377 assert!(decode_primary_key(1, &key_a, &model_1).is_none());
2378 assert!(decode_primary_key(0, &key_b, &model_0).is_none());
2379 assert!(decode_primary_key(1, &key_b, &model_1).is_some());
2380 }
2381
2382 #[test]
2383 fn sequential_prefixes_isolate_secondary_keys() {
2384 let config_a = KvTableConfig::new(
2385 0,
2386 vec![
2387 TableColumnConfig::new("id", DataType::Int64, false),
2388 TableColumnConfig::new("name", DataType::Utf8, false),
2389 ],
2390 vec!["id".to_string()],
2391 vec![IndexSpec::new("name_idx", vec!["name".to_string()]).unwrap()],
2392 )
2393 .unwrap();
2394 let config_b = KvTableConfig::new(
2395 1,
2396 vec![
2397 TableColumnConfig::new("id", DataType::Int64, false),
2398 TableColumnConfig::new("name", DataType::Utf8, false),
2399 ],
2400 vec!["id".to_string()],
2401 vec![IndexSpec::new("name_idx", vec!["name".to_string()]).unwrap()],
2402 )
2403 .unwrap();
2404
2405 let model_a = TableModel::from_config(&config_a).unwrap();
2406 let specs_a = model_a.resolve_index_specs(&config_a.index_specs).unwrap();
2407 let model_b = TableModel::from_config(&config_b).unwrap();
2408 let specs_b = model_b.resolve_index_specs(&config_b.index_specs).unwrap();
2409
2410 let row = KvRow {
2411 values: vec![CellValue::Int64(1), CellValue::Utf8("alice".to_string())],
2412 };
2413 let key_a =
2414 encode_secondary_index_key(model_a.table_prefix, &specs_a[0], &model_a, &row).unwrap();
2415 let key_b =
2416 encode_secondary_index_key(model_b.table_prefix, &specs_b[0], &model_b, &row).unwrap();
2417 assert_ne!(key_a, key_b);
2418 assert!(
2419 decode_secondary_index_key(model_a.table_prefix, &specs_a[0], &model_a, &key_b)
2420 .is_none(),
2421 "key from prefix 1 must not decode under prefix 0"
2422 );
2423 }
2424
2425 #[tokio::test]
2426 async fn kv_schema_register_all_enables_join() {
2427 let ctx = SessionContext::new();
2428 let client = StoreClient::new("http://localhost:10000");
2429
2430 let result = KvSchema::new(client)
2431 .table(
2432 "customers",
2433 vec![
2434 TableColumnConfig::new("customer_id", DataType::Int64, false),
2435 TableColumnConfig::new("name", DataType::Utf8, false),
2436 ],
2437 vec!["customer_id".to_string()],
2438 vec![],
2439 )
2440 .unwrap()
2441 .table(
2442 "orders",
2443 vec![
2444 TableColumnConfig::new("order_id", DataType::Int64, false),
2445 TableColumnConfig::new("customer_id", DataType::Int64, false),
2446 TableColumnConfig::new("amount", DataType::Int64, false),
2447 ],
2448 vec!["order_id".to_string()],
2449 vec![IndexSpec::new("cust_idx", vec!["customer_id".to_string()]).unwrap()],
2450 )
2451 .unwrap()
2452 .register_all(&ctx);
2453
2454 assert!(
2455 result.is_ok(),
2456 "register_all must succeed: {:?}",
2457 result.err()
2458 );
2459
2460 let plan = ctx
2461 .sql(
2462 "SELECT c.name, o.order_id, o.amount \
2463 FROM orders o \
2464 JOIN customers c ON o.customer_id = c.customer_id",
2465 )
2466 .await;
2467 assert!(
2468 plan.is_ok(),
2469 "JOIN query must plan successfully: {:?}",
2470 plan.err()
2471 );
2472 }
2473
2474 #[tokio::test]
2475 async fn kv_schema_three_way_join() {
2476 let ctx = SessionContext::new();
2477 let client = StoreClient::new("http://localhost:10000");
2478
2479 KvSchema::new(client)
2480 .table(
2481 "products",
2482 vec![
2483 TableColumnConfig::new("product_id", DataType::Int64, false),
2484 TableColumnConfig::new("name", DataType::Utf8, false),
2485 TableColumnConfig::new("price", DataType::Int64, false),
2486 ],
2487 vec!["product_id".to_string()],
2488 vec![],
2489 )
2490 .unwrap()
2491 .table(
2492 "line_items",
2493 vec![
2494 TableColumnConfig::new("item_id", DataType::Int64, false),
2495 TableColumnConfig::new("order_id", DataType::Int64, false),
2496 TableColumnConfig::new("product_id", DataType::Int64, false),
2497 TableColumnConfig::new("qty", DataType::Int64, false),
2498 ],
2499 vec!["item_id".to_string()],
2500 vec![
2501 IndexSpec::new("prod_idx", vec!["product_id".to_string()]).unwrap(),
2502 IndexSpec::new("order_idx", vec!["order_id".to_string()]).unwrap(),
2503 ],
2504 )
2505 .unwrap()
2506 .table(
2507 "orders",
2508 vec![
2509 TableColumnConfig::new("order_id", DataType::Int64, false),
2510 TableColumnConfig::new("customer", DataType::Utf8, false),
2511 ],
2512 vec!["order_id".to_string()],
2513 vec![],
2514 )
2515 .unwrap()
2516 .register_all(&ctx)
2517 .unwrap();
2518
2519 let plan = ctx
2520 .sql(
2521 "SELECT o.customer, p.name, li.qty \
2522 FROM line_items li \
2523 JOIN products p ON li.product_id = p.product_id \
2524 JOIN orders o ON li.order_id = o.order_id",
2525 )
2526 .await;
2527 assert!(plan.is_ok(), "three-way JOIN must plan: {:?}", plan.err());
2528 }
2529
2530 #[test]
2531 fn kv_schema_orders_table_convenience() {
2532 let client = StoreClient::new("http://localhost:10000");
2533 let schema = KvSchema::new(client)
2534 .orders_table(
2535 "my_orders",
2536 vec![IndexSpec::new(
2537 "region_customer",
2538 vec!["region".to_string(), "customer_id".to_string()],
2539 )
2540 .unwrap()],
2541 )
2542 .unwrap();
2543 assert_eq!(schema.table_count(), 1);
2544 }
2545
2546 #[test]
2547 fn nullable_column_accepted_in_config() {
2548 let config = KvTableConfig::new(
2549 0,
2550 vec![
2551 TableColumnConfig::new("id", DataType::Int64, false),
2552 TableColumnConfig::new("name", DataType::Utf8, true),
2553 ],
2554 vec!["id".to_string()],
2555 vec![],
2556 );
2557 assert!(config.is_ok());
2558 }
2559
2560 #[test]
2561 fn nullable_column_rejected_in_index() {
2562 let config = KvTableConfig::new(
2563 0,
2564 vec![
2565 TableColumnConfig::new("id", DataType::Int64, false),
2566 TableColumnConfig::new("name", DataType::Utf8, true),
2567 ],
2568 vec!["id".to_string()],
2569 vec![IndexSpec::new("name_idx", vec!["name".to_string()]).unwrap()],
2570 )
2571 .unwrap();
2572 let model = TableModel::from_config(&config).unwrap();
2573 let result = model.resolve_index_specs(&config.index_specs);
2574 assert!(result.is_err());
2575 assert!(result.unwrap_err().contains("nullable"));
2576 }
2577
2578 #[test]
2579 fn base_row_round_trip_with_null() {
2580 let config = KvTableConfig::new(
2581 0,
2582 vec![
2583 TableColumnConfig::new("id", DataType::Int64, false),
2584 TableColumnConfig::new("label", DataType::Utf8, true),
2585 TableColumnConfig::new("score", DataType::Int64, true),
2586 ],
2587 vec!["id".to_string()],
2588 vec![],
2589 )
2590 .unwrap();
2591 let model = TableModel::from_config(&config).unwrap();
2592 let row = KvRow {
2593 values: vec![CellValue::Int64(1), CellValue::Null, CellValue::Int64(42)],
2594 };
2595 let encoded = encode_base_row_value(&row, &model).unwrap();
2596 let decoded = decode_base_row(vec![CellValue::Int64(1)], &encoded, &model).unwrap();
2597 assert!(matches!(&decoded.values[0], CellValue::Int64(1)));
2598 assert!(matches!(&decoded.values[1], CellValue::Null));
2599 assert!(matches!(&decoded.values[2], CellValue::Int64(42)));
2600 }
2601
2602 #[test]
2603 fn null_does_not_match_equality_constraint() {
2604 assert!(!matches_constraint(
2605 &CellValue::Null,
2606 &PredicateConstraint::StringEq("x".to_string())
2607 ));
2608 assert!(!matches_constraint(
2609 &CellValue::Null,
2610 &PredicateConstraint::IntRange {
2611 min: Some(0),
2612 max: Some(10)
2613 }
2614 ));
2615 }
2616
2617 #[test]
2618 fn is_null_constraint_matches() {
2619 assert!(matches_constraint(
2620 &CellValue::Null,
2621 &PredicateConstraint::IsNull
2622 ));
2623 assert!(!matches_constraint(
2624 &CellValue::Utf8("x".to_string()),
2625 &PredicateConstraint::IsNull
2626 ));
2627 assert!(!matches_constraint(
2628 &CellValue::Null,
2629 &PredicateConstraint::IsNotNull
2630 ));
2631 assert!(matches_constraint(
2632 &CellValue::Int64(5),
2633 &PredicateConstraint::IsNotNull
2634 ));
2635 }
2636
2637 #[test]
2638 fn string_in_constraint_matches() {
2639 let constraint =
2640 PredicateConstraint::StringIn(vec!["us-east".to_string(), "us-west".to_string()]);
2641 assert!(matches_constraint(
2642 &CellValue::Utf8("us-east".to_string()),
2643 &constraint,
2644 ));
2645 assert!(matches_constraint(
2646 &CellValue::Utf8("us-west".to_string()),
2647 &constraint,
2648 ));
2649 assert!(!matches_constraint(
2650 &CellValue::Utf8("eu-central".to_string()),
2651 &constraint,
2652 ));
2653 }
2654
2655 #[test]
2656 fn int_in_constraint_matches() {
2657 let constraint = PredicateConstraint::IntIn(vec![1, 2, 3]);
2658 assert!(matches_constraint(&CellValue::Int64(1), &constraint));
2659 assert!(matches_constraint(&CellValue::Int64(3), &constraint));
2660 assert!(!matches_constraint(&CellValue::Int64(4), &constraint));
2661 }
2662
2663 #[test]
2664 fn in_predicate_generates_multiple_index_ranges() {
2665 let (model, specs) = test_model();
2666 let region_idx = *model.columns_by_name.get("region").unwrap();
2667 let mut pred = QueryPredicate::default();
2668 pred.constraints.insert(
2669 region_idx,
2670 PredicateConstraint::StringIn(vec!["us-east".to_string(), "us-west".to_string()]),
2671 );
2672 let plan = pred
2673 .choose_index_plan(&model, &specs)
2674 .expect("plan")
2675 .expect("should find index");
2676 assert_eq!(plan.ranges.len(), 2);
2677 }
2678
2679 #[test]
2680 fn int_in_generates_multiple_pk_ranges() {
2681 let (model, _specs) = test_model();
2682 let mut pred = QueryPredicate::default();
2683 pred.constraints.insert(
2684 model.primary_key_indices[0],
2685 PredicateConstraint::IntIn(vec![100, 200, 300]),
2686 );
2687 let ranges = pred.primary_key_ranges(&model).unwrap();
2688 assert_eq!(ranges.len(), 3);
2689 }
2690
2691 #[test]
2692 fn duplicate_int_in_values_deduplicated() {
2693 let (model, _specs) = test_model();
2694 let filter = Expr::InList(datafusion::logical_expr::expr::InList {
2696 expr: Box::new(Expr::Column(datafusion::common::Column::new_unqualified(
2697 "order_id",
2698 ))),
2699 list: vec![
2700 Expr::Literal(ScalarValue::Int64(Some(5)), None),
2701 Expr::Literal(ScalarValue::Int64(Some(5)), None),
2702 Expr::Literal(ScalarValue::Int64(Some(10)), None),
2703 ],
2704 negated: false,
2705 });
2706 let pred = QueryPredicate::from_filters(&[filter], &model);
2707 let ranges = pred.primary_key_ranges(&model).unwrap();
2708 assert_eq!(
2709 ranges.len(),
2710 2,
2711 "duplicate IN values must be deduped, producing 2 ranges not 3"
2712 );
2713 }
2714
2715 #[test]
2716 fn duplicate_uint64_in_values_deduplicated() {
2717 let config = KvTableConfig::new(
2718 0,
2719 vec![
2720 TableColumnConfig::new("id", DataType::UInt64, false),
2721 TableColumnConfig::new("name", DataType::Utf8, false),
2722 ],
2723 vec!["id".to_string()],
2724 vec![],
2725 )
2726 .unwrap();
2727 let model = TableModel::from_config(&config).unwrap();
2728 let filter = Expr::InList(datafusion::logical_expr::expr::InList {
2729 expr: Box::new(Expr::Column(datafusion::common::Column::new_unqualified(
2730 "id",
2731 ))),
2732 list: vec![
2733 Expr::Literal(ScalarValue::UInt64(Some(100)), None),
2734 Expr::Literal(ScalarValue::UInt64(Some(100)), None),
2735 Expr::Literal(ScalarValue::UInt64(Some(200)), None),
2736 ],
2737 negated: false,
2738 });
2739 let pred = QueryPredicate::from_filters(&[filter], &model);
2740 let ranges = pred.primary_key_ranges(&model).unwrap();
2741 assert_eq!(
2742 ranges.len(),
2743 2,
2744 "duplicate UInt64 IN values must be deduped"
2745 );
2746 }
2747
2748 #[test]
2749 fn duplicate_fixed_binary_in_values_deduplicated() {
2750 let config = KvTableConfig::new(
2751 0,
2752 vec![
2753 TableColumnConfig::new("hash", DataType::FixedSizeBinary(16), false),
2754 TableColumnConfig::new("val", DataType::Int64, false),
2755 ],
2756 vec!["hash".to_string()],
2757 vec![],
2758 )
2759 .unwrap();
2760 let model = TableModel::from_config(&config).unwrap();
2761 let dup_val = vec![0xAA; 16];
2762 let other_val = vec![0xBB; 16];
2763 let filter = Expr::InList(datafusion::logical_expr::expr::InList {
2764 expr: Box::new(Expr::Column(datafusion::common::Column::new_unqualified(
2765 "hash",
2766 ))),
2767 list: vec![
2768 Expr::Literal(
2769 ScalarValue::FixedSizeBinary(16, Some(dup_val.clone())),
2770 None,
2771 ),
2772 Expr::Literal(ScalarValue::FixedSizeBinary(16, Some(dup_val)), None),
2773 Expr::Literal(ScalarValue::FixedSizeBinary(16, Some(other_val)), None),
2774 ],
2775 negated: false,
2776 });
2777 let pred = QueryPredicate::from_filters(&[filter], &model);
2778 let ranges = pred.primary_key_ranges(&model).unwrap();
2779 assert_eq!(
2780 ranges.len(),
2781 2,
2782 "duplicate FixedBinary IN values must be deduped"
2783 );
2784 }
2785
2786 #[test]
2787 fn or_equalities_extracted_as_in_list() {
2788 let (model, _) = test_model();
2789 let expr = Expr::BinaryExpr(datafusion::logical_expr::BinaryExpr {
2790 left: Box::new(Expr::BinaryExpr(datafusion::logical_expr::BinaryExpr {
2791 left: Box::new(Expr::Column(datafusion::common::Column::new_unqualified(
2792 "region",
2793 ))),
2794 op: Operator::Eq,
2795 right: Box::new(Expr::Literal(
2796 ScalarValue::Utf8(Some("us-east".to_string())),
2797 None,
2798 )),
2799 })),
2800 op: Operator::Or,
2801 right: Box::new(Expr::BinaryExpr(datafusion::logical_expr::BinaryExpr {
2802 left: Box::new(Expr::Column(datafusion::common::Column::new_unqualified(
2803 "region",
2804 ))),
2805 op: Operator::Eq,
2806 right: Box::new(Expr::Literal(
2807 ScalarValue::Utf8(Some("us-west".to_string())),
2808 None,
2809 )),
2810 })),
2811 });
2812 let result = extract_or_in_column(&expr, &model);
2813 assert!(result.is_some());
2814 let (col, vals) = result.unwrap();
2815 assert_eq!(col, "region");
2816 assert_eq!(vals.len(), 2);
2817 }
2818
2819 #[test]
2820 fn or_equalities_on_float64_are_not_pushdown_supported() {
2821 let config = KvTableConfig::new(
2822 0,
2823 vec![
2824 TableColumnConfig::new("id", DataType::Int64, false),
2825 TableColumnConfig::new("score", DataType::Float64, false),
2826 ],
2827 vec!["id".to_string()],
2828 vec![],
2829 )
2830 .unwrap();
2831 let model = TableModel::from_config(&config).unwrap();
2832
2833 use datafusion::logical_expr::col;
2834 let filter = col("score")
2835 .eq(Expr::Literal(ScalarValue::Float64(Some(1.0)), None))
2836 .or(col("score").eq(Expr::Literal(ScalarValue::Float64(Some(2.0)), None)));
2837
2838 assert!(
2839 !QueryPredicate::supports_filter(&filter, &model),
2840 "OR-equality pushdown should be disabled for Float64 because apply_in_list cannot enforce it"
2841 );
2842
2843 let pred = QueryPredicate::from_filters(&[filter], &model);
2844 assert!(!pred.contradiction);
2845 assert!(
2846 pred.constraints.is_empty(),
2847 "unsupported OR predicate must not contribute pushdown constraints"
2848 );
2849 }
2850
2851 #[test]
2852 fn batch_writer_encodes_rows_across_tables() {
2853 let client = StoreClient::new("http://localhost:10000");
2854 let schema = KvSchema::new(client)
2855 .table(
2856 "customers",
2857 vec![
2858 TableColumnConfig::new("customer_id", DataType::Int64, false),
2859 TableColumnConfig::new("name", DataType::Utf8, false),
2860 ],
2861 vec!["customer_id".to_string()],
2862 vec![],
2863 )
2864 .unwrap()
2865 .table(
2866 "orders",
2867 vec![
2868 TableColumnConfig::new("order_id", DataType::Int64, false),
2869 TableColumnConfig::new("customer_id", DataType::Int64, false),
2870 TableColumnConfig::new("amount", DataType::Int64, false),
2871 ],
2872 vec!["order_id".to_string()],
2873 vec![IndexSpec::new("cust_idx", vec!["customer_id".to_string()]).unwrap()],
2874 )
2875 .unwrap();
2876
2877 let mut batch = schema.batch_writer();
2878 batch
2879 .insert(
2880 "customers",
2881 vec![CellValue::Int64(1), CellValue::Utf8("Alice".to_string())],
2882 )
2883 .unwrap();
2884 batch
2885 .insert(
2886 "orders",
2887 vec![
2888 CellValue::Int64(100),
2889 CellValue::Int64(1),
2890 CellValue::Int64(4999),
2891 ],
2892 )
2893 .unwrap();
2894 batch
2895 .insert(
2896 "orders",
2897 vec![
2898 CellValue::Int64(101),
2899 CellValue::Int64(1),
2900 CellValue::Int64(2999),
2901 ],
2902 )
2903 .unwrap();
2904
2905 assert_eq!(batch.pending_count(), 5);
2907 }
2908
2909 #[test]
2910 fn batch_writer_rejects_unknown_table() {
2911 let client = StoreClient::new("http://localhost:10000");
2912 let schema = KvSchema::new(client)
2913 .table(
2914 "t1",
2915 vec![TableColumnConfig::new("id", DataType::Int64, false)],
2916 vec!["id".to_string()],
2917 vec![],
2918 )
2919 .unwrap();
2920
2921 let mut batch = schema.batch_writer();
2922 let result = batch.insert("nonexistent", vec![CellValue::Int64(1)]);
2923 assert!(result.is_err());
2924 assert!(result.unwrap_err().contains("unknown table"));
2925 }
2926
2927 #[test]
2928 fn batch_writer_rejects_wrong_column_count() {
2929 let client = StoreClient::new("http://localhost:10000");
2930 let schema = KvSchema::new(client)
2931 .table(
2932 "t1",
2933 vec![
2934 TableColumnConfig::new("id", DataType::Int64, false),
2935 TableColumnConfig::new("name", DataType::Utf8, false),
2936 ],
2937 vec!["id".to_string()],
2938 vec![],
2939 )
2940 .unwrap();
2941
2942 let mut batch = schema.batch_writer();
2943 let result = batch.insert("t1", vec![CellValue::Int64(1)]);
2944 assert!(result.is_err());
2945 assert!(result.unwrap_err().contains("expected 2"));
2946 }
2947
2948 #[test]
2949 fn batch_writer_rejects_non_pk_type_mismatch() {
2950 let client = StoreClient::new("http://localhost:10000");
2951 let schema = KvSchema::new(client)
2952 .table(
2953 "t1",
2954 vec![
2955 TableColumnConfig::new("id", DataType::Int64, false),
2956 TableColumnConfig::new("amount", DataType::Int64, false),
2957 ],
2958 vec!["id".to_string()],
2959 vec![],
2960 )
2961 .unwrap();
2962
2963 let mut batch = schema.batch_writer();
2964 let result = batch.insert(
2965 "t1",
2966 vec![CellValue::Int64(1), CellValue::Utf8("bad".to_string())],
2967 );
2968 assert!(result.is_err());
2969 assert!(
2970 result.unwrap_err().contains("type mismatch"),
2971 "non-PK schema-invalid values must be rejected at insert-time"
2972 );
2973 }
2974
2975 #[test]
2976 fn batch_writer_entries_use_distinct_table_prefixes() {
2977 let client = StoreClient::new("http://localhost:10000");
2978 let schema = KvSchema::new(client)
2979 .table(
2980 "a",
2981 vec![TableColumnConfig::new("id", DataType::Int64, false)],
2982 vec!["id".to_string()],
2983 vec![],
2984 )
2985 .unwrap()
2986 .table(
2987 "b",
2988 vec![TableColumnConfig::new("id", DataType::Int64, false)],
2989 vec!["id".to_string()],
2990 vec![],
2991 )
2992 .unwrap();
2993
2994 let mut batch = schema.batch_writer();
2995 batch.insert("a", vec![CellValue::Int64(42)]).unwrap();
2996 batch.insert("b", vec![CellValue::Int64(42)]).unwrap();
2997
2998 assert_eq!(batch.pending_count(), 2);
2999 assert_ne!(
3000 batch.pending_keys[0], batch.pending_keys[1],
3001 "same PK in different tables must produce different keys"
3002 );
3003 assert_ne!(
3004 batch.pending_keys[0][0], batch.pending_keys[1][0],
3005 "table prefix byte must differ"
3006 );
3007 }
3008
3009 #[test]
3010 fn batch_writer_supports_nullable_columns() {
3011 let client = StoreClient::new("http://localhost:10000");
3012 let schema = KvSchema::new(client)
3013 .table(
3014 "t",
3015 vec![
3016 TableColumnConfig::new("id", DataType::Int64, false),
3017 TableColumnConfig::new("note", DataType::Utf8, true),
3018 ],
3019 vec!["id".to_string()],
3020 vec![],
3021 )
3022 .unwrap();
3023
3024 let mut batch = schema.batch_writer();
3025 batch
3026 .insert("t", vec![CellValue::Int64(1), CellValue::Null])
3027 .unwrap();
3028 assert_eq!(batch.pending_count(), 1);
3029 }
3030
3031 #[test]
3032 fn non_nullable_column_rejects_null_in_batch_writer() {
3033 let client = StoreClient::new("http://localhost:10000");
3034 let schema = KvSchema::new(client)
3035 .table(
3036 "t",
3037 vec![
3038 TableColumnConfig::new("id", DataType::Int64, false),
3039 TableColumnConfig::new("name", DataType::Utf8, false),
3040 TableColumnConfig::new("note", DataType::Utf8, true),
3041 ],
3042 vec!["id".to_string()],
3043 vec![],
3044 )
3045 .unwrap();
3046
3047 let mut batch = schema.batch_writer();
3049 let result = batch.insert(
3050 "t",
3051 vec![
3052 CellValue::Int64(1),
3053 CellValue::Null,
3054 CellValue::Utf8("ok".to_string()),
3055 ],
3056 );
3057 assert!(result.is_err());
3058 assert!(
3059 result.unwrap_err().contains("not nullable"),
3060 "error should mention non-nullable constraint"
3061 );
3062
3063 let mut batch = schema.batch_writer();
3065 batch
3066 .insert(
3067 "t",
3068 vec![
3069 CellValue::Int64(1),
3070 CellValue::Utf8("Alice".to_string()),
3071 CellValue::Null,
3072 ],
3073 )
3074 .unwrap();
3075 assert_eq!(batch.pending_count(), 1);
3076
3077 let mut batch = schema.batch_writer();
3079 batch
3080 .insert(
3081 "t",
3082 vec![
3083 CellValue::Int64(1),
3084 CellValue::Utf8("Alice".to_string()),
3085 CellValue::Utf8("hello".to_string()),
3086 ],
3087 )
3088 .unwrap();
3089 assert_eq!(batch.pending_count(), 1);
3090 }
3091
3092 #[test]
3093 fn uint64_column_accepted() {
3094 let config = KvTableConfig::new(
3095 0,
3096 vec![
3097 TableColumnConfig::new("id", DataType::UInt64, false),
3098 TableColumnConfig::new("name", DataType::Utf8, false),
3099 ],
3100 vec!["id".to_string()],
3101 vec![],
3102 );
3103 assert!(config.is_ok());
3104 }
3105
3106 #[test]
3107 fn uint64_primary_key_round_trip() {
3108 let config = KvTableConfig::new(
3109 0,
3110 vec![
3111 TableColumnConfig::new("id", DataType::UInt64, false),
3112 TableColumnConfig::new("label", DataType::Utf8, false),
3113 ],
3114 vec!["id".to_string()],
3115 vec![],
3116 )
3117 .unwrap();
3118 let model = TableModel::from_config(&config).unwrap();
3119 let row = KvRow {
3120 values: vec![
3121 CellValue::UInt64(u64::MAX),
3122 CellValue::Utf8("max".to_string()),
3123 ],
3124 };
3125 let encoded = encode_base_row_value(&row, &model).unwrap();
3126 let pk = row
3127 .primary_key_values(&model)
3128 .into_iter()
3129 .cloned()
3130 .collect::<Vec<_>>();
3131 let decoded = decode_base_row(pk, &encoded, &model).unwrap();
3132 assert!(matches!(&decoded.values[0], CellValue::UInt64(v) if *v == u64::MAX));
3133 assert!(matches!(&decoded.values[1], CellValue::Utf8(v) if v == "max"));
3134 }
3135
/// A table keyed by a `Utf8` column must pass config validation.
#[test]
fn string_primary_key_accepted() {
    // `expect` surfaces the validation error on failure, unlike `assert!(is_ok())`.
    let _config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("code", DataType::Utf8, false),
            TableColumnConfig::new("value", DataType::Int64, false),
        ],
        vec!["code".to_string()],
        vec![],
    )
    .expect("Utf8 primary key column should be accepted");
}
3149
/// A 32-byte `FixedSizeBinary` primary key must come back byte-for-byte after
/// a base-row encode/decode cycle.
#[test]
fn fixed_binary_primary_key_round_trip() {
    let hash_column = TableColumnConfig::new("hash", DataType::FixedSizeBinary(32), false);
    let amount_column = TableColumnConfig::new("amount", DataType::Int64, false);
    let config = KvTableConfig::new(
        0,
        vec![hash_column, amount_column],
        vec!["hash".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let expected_hash = [0xABu8; 32].to_vec();
    let row = KvRow {
        values: vec![
            CellValue::FixedBinary(expected_hash.clone()),
            CellValue::Int64(100),
        ],
    };
    let payload = encode_base_row_value(&row, &model).unwrap();
    let key_values: Vec<_> = row.primary_key_values(&model).into_iter().cloned().collect();
    let restored = decode_base_row(key_values, &payload, &model).unwrap();

    assert!(matches!(
        &restored.values[0],
        CellValue::FixedBinary(bytes) if *bytes == expected_hash
    ));
}
3179
/// Primary-key encoding for a `FixedSizeBinary(16)` column must reject values
/// shorter or longer than 16 bytes and accept a value of exactly 16 bytes.
#[test]
fn fixed_binary_key_rejects_wrong_length() {
    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("hash", DataType::FixedSizeBinary(16), false),
            TableColumnConfig::new("amount", DataType::Int64, false),
        ],
        vec!["hash".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    // Builds a `(hash, amount)` row and attempts to encode its primary key.
    let encode_with = |hash: Vec<u8>, amount: i64| {
        let row = KvRow {
            values: vec![CellValue::FixedBinary(hash), CellValue::Int64(amount)],
        };
        encode_primary_key_from_row(model.table_prefix, &row, &model)
    };

    // Too short: 10 of the required 16 bytes.
    let short_err = encode_with(vec![0xAB; 10], 1).unwrap_err();
    assert!(
        short_err.contains("requires exactly 16 bytes"),
        "should mention exact width requirement"
    );

    // Too long: 20 bytes.
    let long_err = encode_with(vec![0xCD; 20], 2).unwrap_err();
    assert!(long_err.contains("requires exactly 16 bytes"));

    // Exactly the declared width is accepted.
    assert!(encode_with(vec![0xEF; 16], 3).is_ok());
}
3219
/// Secondary-index key encoding must enforce the declared width of an indexed
/// `FixedSizeBinary(8)` column: 4 bytes is rejected and 8 bytes is accepted.
#[test]
fn fixed_binary_index_key_rejects_wrong_length() {
    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("id", DataType::Int64, false),
            TableColumnConfig::new("tag", DataType::FixedSizeBinary(8), false),
        ],
        vec!["id".to_string()],
        vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();
    let specs = model.resolve_index_specs(&config.index_specs).unwrap();
    let tag_spec = &specs[0];

    // Encodes the `tag_idx` key for a row with `id = 1` and the given tag bytes.
    let encode_tag = |tag: Vec<u8>| {
        let row = KvRow {
            values: vec![CellValue::Int64(1), CellValue::FixedBinary(tag)],
        };
        encode_secondary_index_key(model.table_prefix, tag_spec, &model, &row)
    };

    let err = encode_tag(vec![0x01; 4]).unwrap_err();
    assert!(err.contains("requires exactly 8 bytes"));

    assert!(encode_tag(vec![0x02; 8]).is_ok());
}
3249
/// A non-key `Decimal256(76, 0)` column must survive a base-row round trip.
#[test]
fn decimal256_column_round_trip() {
    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("id", DataType::Int64, false),
            TableColumnConfig::new("balance", DataType::Decimal256(76, 0), false),
        ],
        vec!["id".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let balance = i256::from(123456789012345i64);
    let row = KvRow {
        values: vec![CellValue::Int64(1), CellValue::Decimal256(balance)],
    };
    let stored = encode_base_row_value(&row, &model).unwrap();
    let decoded = decode_base_row(
        row.primary_key_values(&model)
            .into_iter()
            .cloned()
            .collect::<Vec<_>>(),
        &stored,
        &model,
    )
    .unwrap();

    assert!(matches!(&decoded.values[1], CellValue::Decimal256(v) if *v == balance));
}
3276
/// `Float64` is not an accepted primary-key column type.
#[test]
fn float64_primary_key_rejected() {
    let float_key = TableColumnConfig::new("id", DataType::Float64, false);
    let outcome = KvTableConfig::new(0, vec![float_key], vec!["id".to_string()], vec![]);
    assert!(matches!(outcome, Err(_)));
}
3287
/// The ordered `i256` encoding must be lossless, and byte-wise comparison of
/// the encodings must match numeric order. The sample spans the full `i256`
/// range, including `i256::MIN`/`i256::MAX`, so that the sign-bit extremes are
/// covered and not just the `i128` sub-range.
#[test]
fn i256_ordered_encoding_round_trip() {
    // Strictly ascending so adjacent encodings can be compared pairwise.
    let values = [
        i256::MIN,
        i256::from_i128(i128::MIN),
        i256::from(-1i64),
        i256::from(0i64),
        i256::from(1i64),
        i256::from_i128(i128::MAX),
        i256::MAX,
    ];
    for v in values {
        assert_eq!(decode_i256_ordered(encode_i256_ordered(v)), v);
    }

    let encoded: Vec<[u8; 32]> = values.iter().map(|v| encode_i256_ordered(*v)).collect();
    for pair in encoded.windows(2) {
        assert!(
            pair[0] < pair[1],
            "ordered encoding must preserve numeric order: {:?} !< {:?}",
            pair[0],
            pair[1]
        );
    }
}
3305
/// A `UInt64` primary key written under table prefix 5 must decode back to the
/// same value.
#[test]
fn uint64_primary_key_encode_decode() {
    let config = KvTableConfig::new(
        5,
        vec![
            TableColumnConfig::new("id", DataType::UInt64, false),
            TableColumnConfig::new("name", DataType::Utf8, false),
        ],
        vec!["id".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let encoded_key =
        encode_primary_key(5, &[&CellValue::UInt64(12345)], &model).expect("pk key encodes");
    let key_columns = decode_primary_key(5, &encoded_key, &model).unwrap();

    assert!(matches!(key_columns.first(), Some(CellValue::UInt64(12345))));
}
3324
/// A `Utf8` primary key written under table prefix 3 must decode back to the
/// same string.
#[test]
fn utf8_primary_key_encode_decode() {
    let config = KvTableConfig::new(
        3,
        vec![
            TableColumnConfig::new("code", DataType::Utf8, false),
            TableColumnConfig::new("val", DataType::Int64, false),
        ],
        vec!["code".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let code = CellValue::Utf8(String::from("HELLO"));
    let encoded_key = encode_primary_key(3, &[&code], &model).expect("pk key encodes");
    let key_columns = decode_primary_key(3, &encoded_key, &model).unwrap();

    assert!(matches!(key_columns.first(), Some(CellValue::Utf8(s)) if s == "HELLO"));
}
3343
/// A 16-byte `FixedSizeBinary` primary key (non-zero prefix, zero tail) must
/// decode back to the same bytes under table prefix 7.
#[test]
fn fixed_binary_primary_key_encode_decode() {
    let config = KvTableConfig::new(
        7,
        vec![
            TableColumnConfig::new("hash", DataType::FixedSizeBinary(16), false),
            TableColumnConfig::new("val", DataType::Int64, false),
        ],
        vec!["hash".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    // DE AD BE EF followed by twelve zero bytes.
    let mut hash_bytes = vec![0u8; 16];
    hash_bytes[..4].copy_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);

    let hash = CellValue::FixedBinary(hash_bytes.clone());
    let encoded_key = encode_primary_key(7, &[&hash], &model).expect("pk key encodes");
    let key_columns = decode_primary_key(7, &encoded_key, &model).unwrap();

    assert!(matches!(&key_columns[0], CellValue::FixedBinary(v) if *v == hash_bytes));
}
3363
/// An index on a `UInt64` column must round-trip both the indexed value and
/// the primary key through its key encoding.
#[test]
fn secondary_index_with_uint64_column() {
    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("id", DataType::Int64, false),
            TableColumnConfig::new("counter", DataType::UInt64, false),
        ],
        vec!["id".to_string()],
        vec![IndexSpec::new("counter_idx", vec!["counter".to_string()]).unwrap()],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();
    let specs = model.resolve_index_specs(&config.index_specs).unwrap();
    let counter_spec = &specs[0];

    let row = KvRow {
        values: vec![CellValue::Int64(1), CellValue::UInt64(999)],
    };
    let index_key =
        encode_secondary_index_key(model.table_prefix, counter_spec, &model, &row).unwrap();
    let entry =
        decode_secondary_index_key(model.table_prefix, counter_spec, &model, &index_key).unwrap();

    let counter_pos = *model.columns_by_name.get("counter").unwrap();
    assert!(matches!(
        entry.values.get(&counter_pos),
        Some(CellValue::UInt64(999))
    ));
    assert!(matches!(
        entry.primary_key_values.first(),
        Some(CellValue::Int64(1))
    ));
}
3394
/// An index on a `Decimal256(76, 0)` column must return the indexed value
/// unchanged when its key is decoded.
#[test]
fn secondary_index_with_decimal256_column() {
    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("id", DataType::Int64, false),
            TableColumnConfig::new("big_val", DataType::Decimal256(76, 0), false),
        ],
        vec!["id".to_string()],
        vec![IndexSpec::new("big_idx", vec!["big_val".to_string()]).unwrap()],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();
    let specs = model.resolve_index_specs(&config.index_specs).unwrap();
    let big_spec = &specs[0];

    let expected = i256::from(42i64);
    let row = KvRow {
        values: vec![CellValue::Int64(1), CellValue::Decimal256(expected)],
    };
    let index_key =
        encode_secondary_index_key(model.table_prefix, big_spec, &model, &row).unwrap();
    let entry =
        decode_secondary_index_key(model.table_prefix, big_spec, &model, &index_key).unwrap();

    let big_pos = *model.columns_by_name.get("big_val").unwrap();
    assert!(matches!(
        entry.values.get(&big_pos),
        Some(CellValue::Decimal256(v)) if *v == expected
    ));
}
3422
/// A two-column `(entity, version)` primary key must be accepted, and the
/// declared key-column order must be kept.
#[test]
fn composite_pk_config_accepted() {
    // A single `expect` replaces `assert!(is_ok())` + `unwrap()` and reports the
    // validation error if construction fails.
    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("entity", DataType::FixedSizeBinary(32), false),
            TableColumnConfig::new("version", DataType::UInt64, false),
            TableColumnConfig::new("data", DataType::Utf8, true),
        ],
        vec!["entity".to_string(), "version".to_string()],
        vec![],
    )
    .expect("composite (FixedSizeBinary, UInt64) primary key should be accepted");
    assert_eq!(config.primary_key_columns, vec!["entity", "version"]);
}
3443
/// A composite key that includes a column of an unsupported type (`Float64`)
/// must be rejected at config time with a type-requirement error.
#[test]
fn composite_pk_rejects_unsupported_type() {
    let result = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("entity", DataType::FixedSizeBinary(32), false),
            TableColumnConfig::new("score", DataType::Float64, false),
        ],
        vec!["entity".to_string(), "score".to_string()],
        vec![],
    );
    // `expect_err` asserts the failure and yields the message in one step.
    let err = result.expect_err("Float64 must not be accepted as a primary key column");
    // Match the generic "must be ..." wording of the type-requirement error
    // rather than any one specific allowed type.
    assert!(err.contains("must be"), "expected PK type error, got: {err}");
}
3462
/// Despite its historical name, this test asserts that a wide composite key
/// (a 60-byte `FixedSizeBinary` plus a `UInt64`) is *accepted*. Per the
/// expectation below, variable-length keys allow wider composite PKs. The
/// model reports a key width of 68, which is 60 + 8 for the two columns.
#[test]
fn composite_pk_rejects_too_wide() {
    let wide_columns = vec![
        TableColumnConfig::new("big", DataType::FixedSizeBinary(60), false),
        TableColumnConfig::new("ver", DataType::UInt64, false),
    ];
    let key_names = ["big", "ver"].map(String::from).to_vec();

    let config = KvTableConfig::new(0, wide_columns, key_names, vec![])
        .expect("variable-length keys should allow wider composite PKs");
    let model = TableModel::from_config(&config).expect("model");

    assert_eq!(model.primary_key_width, 60 + 8);
}
3478
/// An `(entity, version)` primary key must decode back into exactly two
/// columns, in declaration order, with their original values.
#[test]
fn composite_pk_encode_decode_round_trip() {
    let config = KvTableConfig::new(
        1,
        vec![
            TableColumnConfig::new("entity", DataType::FixedSizeBinary(32), false),
            TableColumnConfig::new("version", DataType::UInt64, false),
            TableColumnConfig::new("title", DataType::Utf8, true),
        ],
        vec!["entity".to_string(), "version".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let entity = vec![0xAA; 32];
    let components = [CellValue::FixedBinary(entity.clone()), CellValue::UInt64(42)];
    let key_parts: Vec<&CellValue> = components.iter().collect();
    let key = encode_primary_key(1, key_parts.as_slice(), &model).expect("pk key encodes");

    let decoded = decode_primary_key(1, &key, &model).unwrap();
    // The slice pattern checks the length (2) and both components at once.
    assert!(matches!(
        decoded.as_slice(),
        [CellValue::FixedBinary(v), CellValue::UInt64(42)] if *v == entity
    ));
}
3505
/// Keys of an `(entity, version)` composite primary key must sort first by
/// entity bytes and then numerically by version. This lets byte-order range
/// scans visit one entity's versions in ascending order.
#[test]
fn composite_pk_version_sort_order() {
    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("entity", DataType::FixedSizeBinary(32), false),
            TableColumnConfig::new("version", DataType::UInt64, false),
        ],
        vec!["entity".to_string(), "version".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    // Encodes the full primary key for `(entity, version)` in table prefix 0.
    let key_for = |entity: &CellValue, version: u64| {
        encode_primary_key(0, &[entity, &CellValue::UInt64(version)], &model)
            .expect("pk key encodes")
    };

    // No clone needed: the raw entity bytes are not used after this.
    let pk_entity = CellValue::FixedBinary(vec![0xBB; 32]);
    let key_v1 = key_for(&pk_entity, 1);
    let key_v10 = key_for(&pk_entity, 10);
    let key_v100 = key_for(&pk_entity, 100);
    let key_vmax = key_for(&pk_entity, u64::MAX);

    // Numeric order, not decimal-string order (10 < 100 but "10" vs "9").
    assert!(key_v1 < key_v10);
    assert!(key_v10 < key_v100);
    // Ordering must hold up to the top of the u64 range.
    assert!(key_v100 < key_vmax);

    // Entity is the leading component: any version of a larger entity sorts
    // after every version of a smaller one.
    let next_entity = CellValue::FixedBinary(vec![0xBC; 32]);
    assert!(key_vmax < key_for(&next_entity, 0));
}
3534
/// With an `(entity, version)` key, the stored row value is decoded together
/// with separately supplied key values, and all three columns must come back
/// in declaration order.
/// NOTE(review): this checks only the round trip. It does not inspect the
/// encoded value bytes to prove that the PK columns are absent from them.
#[test]
fn composite_pk_value_excludes_all_pk_columns() {
    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
            TableColumnConfig::new("version", DataType::UInt64, false),
            TableColumnConfig::new("data", DataType::Utf8, true),
        ],
        vec!["entity".to_string(), "version".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let entity_bytes = vec![0xCC; 16];
    let version = 7u64;
    let row = KvRow {
        values: vec![
            CellValue::FixedBinary(entity_bytes.clone()),
            CellValue::UInt64(version),
            CellValue::Utf8("hello".to_string()),
        ],
    };
    let value_bytes = encode_base_row_value(&row, &model).unwrap();

    let key_values = vec![CellValue::FixedBinary(entity_bytes), CellValue::UInt64(version)];
    let restored = decode_base_row(key_values, &value_bytes, &model).unwrap();

    assert!(matches!(&restored.values[0], CellValue::FixedBinary(v) if v.len() == 16));
    assert!(matches!(&restored.values[1], CellValue::UInt64(7)));
    assert!(matches!(&restored.values[2], CellValue::Utf8(v) if v == "hello"));
}
3569
/// For an `(entity, version)` table, decoding a `tag` index entry must yield
/// *both* primary-key columns as well as the indexed `tag` value.
#[test]
fn composite_pk_secondary_index_appends_all_pk_columns() {
    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
            TableColumnConfig::new("version", DataType::UInt64, false),
            TableColumnConfig::new("tag", DataType::Int64, false),
        ],
        vec!["entity".to_string(), "version".to_string()],
        vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();
    let specs = model.resolve_index_specs(&config.index_specs).unwrap();
    let tag_spec = &specs[0];

    let entity_data = vec![0xDD; 16];
    let row = KvRow {
        values: vec![
            CellValue::FixedBinary(entity_data.clone()),
            CellValue::UInt64(99),
            CellValue::Int64(42),
        ],
    };
    let index_key =
        encode_secondary_index_key(model.table_prefix, tag_spec, &model, &row).unwrap();
    let entry =
        decode_secondary_index_key(model.table_prefix, tag_spec, &model, &index_key).unwrap();

    // Exactly two PK components, in declaration order.
    assert!(matches!(
        entry.primary_key_values.as_slice(),
        [CellValue::FixedBinary(v), CellValue::UInt64(99)] if *v == entity_data
    ));

    let tag_pos = *model.columns_by_name.get("tag").unwrap();
    assert!(matches!(
        entry.values.get(&tag_pos),
        Some(CellValue::Int64(42))
    ));
}
3613
/// `KvSchema::table_versioned` adds one table (`documents`), with `doc_id` and
/// `version` passed as its entity and version columns. Those presumably form
/// its composite key, but this test checks only the resulting table count.
#[test]
fn table_versioned_convenience() {
    let columns = vec![
        TableColumnConfig::new("doc_id", DataType::FixedSizeBinary(32), false),
        TableColumnConfig::new("version", DataType::UInt64, false),
        TableColumnConfig::new("title", DataType::Utf8, false),
    ];
    let schema = KvSchema::new(StoreClient::new("http://localhost:10000"))
        .table_versioned("documents", columns, "doc_id", "version", vec![])
        .unwrap();
    assert_eq!(schema.table_count(), 1);
}
3632
/// A plain single-column `Int64` key keeps the original layout: one key
/// column at index 0, an 8-byte key width, and a lossless key round trip.
#[test]
fn single_column_pk_backward_compat() {
    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("id", DataType::Int64, false),
            TableColumnConfig::new("name", DataType::Utf8, true),
        ],
        vec!["id".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    assert_eq!(model.primary_key_width, 8);
    assert_eq!(model.primary_key_indices.len(), 1);
    assert_eq!(model.primary_key_indices[0], 0);

    let id = CellValue::Int64(42);
    let encoded_key = encode_primary_key(0, &[&id], &model).expect("pk key encodes");
    let key_columns = decode_primary_key(0, &encoded_key, &model).unwrap();
    // Exactly one column, holding the original value.
    assert!(matches!(key_columns.as_slice(), [CellValue::Int64(42)]));
}
3657
/// A key bound built from only the leading `entity` column of an
/// `(entity, version)` key must pad the missing `version` bytes. The pad is
/// `0xFF` for the upper bound and `0x00` for the lower bound. The upper bound
/// is also `0xFF`-filled from the end of the PK width to the codec's payload
/// capacity.
#[test]
fn partial_prefix_upper_bound_fills_trailing_pk_bytes() {
    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
            TableColumnConfig::new("version", DataType::UInt64, false),
        ],
        vec!["entity".to_string(), "version".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();
    // 16 entity bytes followed by 8 version bytes.
    assert_eq!(model.primary_key_width, 24);

    let entity = CellValue::FixedBinary(vec![0xAA; 16]);
    // Payload bytes that lie past the end of the primary key.
    let trailing_capacity = model.primary_key_codec.payload_capacity_bytes() - 24;

    let upper =
        encode_primary_key_bound(0, &[&entity], &model, true).expect("pk bound encodes");
    assert_eq!(primary_payload(&model, &upper, 0, 16), vec![0xAA; 16]);
    assert_eq!(
        primary_payload(&model, &upper, 16, 8),
        vec![0xFF; 8],
        "trailing PK column (version) must be 0xFF for upper bound"
    );
    assert!(
        primary_payload(&model, &upper, 24, trailing_capacity)
            .iter()
            .all(|&b| b == 0xFF),
        "upper bound must be 0xFF-filled beyond the primary key width"
    );

    let lower =
        encode_primary_key_bound(0, &[&entity], &model, false).expect("pk bound encodes");
    assert_eq!(primary_payload(&model, &lower, 0, 16), vec![0xAA; 16]);
    assert_eq!(
        primary_payload(&model, &lower, 16, 8),
        vec![0x00; 8],
        "trailing PK column (version) must be 0x00 for lower bound"
    );
}
3710
/// Pushing down `entity = CC..CC AND version <= 42` on an `(entity, version)`
/// key must yield a single tight range. The range starts at `(CC..CC, 0)` and
/// ends at `(CC..CC, 42)` followed by `0xFF` padding, so it is not a full
/// table scan.
#[test]
fn composite_pk_range_pushdown_entity_eq_version_lte() {
    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
            TableColumnConfig::new("version", DataType::UInt64, false),
            TableColumnConfig::new("data", DataType::Utf8, true),
        ],
        vec!["entity".to_string(), "version".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();
    let entity_bytes = vec![0xCC; 16];

    let mut pred = QueryPredicate::default();
    pred.constraints
        .insert(0, PredicateConstraint::FixedBinaryEq(entity_bytes.clone()));
    pred.constraints.insert(
        1,
        PredicateConstraint::UInt64Range {
            min: None,
            max: Some(42),
        },
    );

    let ranges = pred.primary_key_ranges(&model).unwrap();
    assert_eq!(ranges.len(), 1, "should produce exactly one range");
    let range = &ranges[0];

    // Full primary key for `(CC..CC, version)`.
    let key_at = |version: u64| {
        encode_primary_key(
            0,
            &[
                &CellValue::FixedBinary(entity_bytes.clone()),
                &CellValue::UInt64(version),
            ],
            &model,
        )
        .expect("pk key encodes")
    };

    assert_eq!(
        range.start,
        key_at(0),
        "start should be entity=CC..CC, version=0"
    );

    let pk_width = model.primary_key_width;
    let trailing_len = model.primary_key_codec.payload_capacity_bytes() - pk_width;
    assert_eq!(
        primary_payload(&model, &range.end, 0, pk_width),
        primary_payload(&model, &key_at(42), 0, pk_width),
        "end prefix should be entity=CC..CC, version=42"
    );
    assert!(
        primary_payload(&model, &range.end, pk_width, trailing_len)
            .iter()
            .all(|&b| b == 0xFF),
        "end trailing bytes should be 0xFF"
    );

    let full_scan = primary_key_prefix_range(0);
    assert_ne!(
        range.start, full_scan.start,
        "range must not be a full table scan"
    );
}
3801
/// With only `entity = DD..DD` pushed down on an `(entity, version)` key, the
/// planner must emit one range whose endpoints both begin with the entity
/// bytes. The range end is `0xFF` from there to the payload capacity, so
/// every version of that entity is covered.
#[test]
fn composite_pk_range_pushdown_entity_eq_only() {
    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
            TableColumnConfig::new("version", DataType::UInt64, false),
        ],
        vec!["entity".to_string(), "version".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let entity_bytes = vec![0xDD; 16];
    let mut pred = QueryPredicate::default();
    pred.constraints
        .insert(0, PredicateConstraint::FixedBinaryEq(entity_bytes.clone()));

    let ranges = pred.primary_key_ranges(&model).unwrap();
    assert_eq!(ranges.len(), 1);
    let range = &ranges[0];

    for bound in [&range.start, &range.end] {
        assert_eq!(primary_payload(&model, bound, 0, 16), entity_bytes);
    }

    let tail_len = model.primary_key_codec.payload_capacity_bytes() - 16;
    let tail = primary_payload(&model, &range.end, 16, tail_len);
    assert!(
        tail.iter().all(|&b| b == 0xFF),
        "after entity bytes, everything should be 0xFF"
    );
}
3843
/// `entity = <16-byte literal>` on a `FixedSizeBinary(16)` key column is fully
/// pushdown-capable and becomes a `FixedBinaryEq` constraint on column 0.
#[test]
fn fixed_binary_eq_constraint_extracted() {
    use datafusion::logical_expr::col;

    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
            TableColumnConfig::new("version", DataType::UInt64, false),
        ],
        vec!["entity".to_string(), "version".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let needle = vec![0xAA; 16];
    let filter = col("entity").eq(Expr::Literal(
        ScalarValue::FixedSizeBinary(16, Some(needle.clone())),
        None,
    ));

    let pushable = QueryPredicate::supports_filter(&filter, &model);
    assert!(pushable, "FixedSizeBinary equality should be supported");

    let pred = QueryPredicate::from_filters(&[filter], &model);
    assert!(
        matches!(
            pred.constraints.get(&0),
            Some(PredicateConstraint::FixedBinaryEq(v)) if *v == needle
        ),
        "should extract FixedBinaryEq constraint"
    );
}
3878
/// `version <= 42` on a `UInt64` key column is pushdown-capable and becomes an
/// upper-bounded `UInt64Range` on column 0.
#[test]
fn uint64_range_constraint_extracted() {
    use datafusion::logical_expr::col;

    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("version", DataType::UInt64, false),
            TableColumnConfig::new("data", DataType::Utf8, true),
        ],
        vec!["version".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let limit = Expr::Literal(ScalarValue::UInt64(Some(42)), None);
    let filter = col("version").lt_eq(limit);

    let pushable = QueryPredicate::supports_filter(&filter, &model);
    assert!(pushable, "UInt64 range should be supported");

    let pred = QueryPredicate::from_filters(&[filter], &model);
    let extracted = pred.constraints.get(&0);
    assert!(
        matches!(
            extracted,
            Some(PredicateConstraint::UInt64Range {
                min: None,
                max: Some(42)
            })
        ),
        "should extract UInt64Range with max=42"
    );
}
3913
/// A `UInt64` lower bound above `i64::MAX` (2^63 + 5) must be extracted
/// unchanged, with no sign-wrapping through `i64`.
#[test]
fn uint64_range_constraint_supports_values_above_i64_max() {
    use datafusion::logical_expr::col;

    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("version", DataType::UInt64, false),
            TableColumnConfig::new("data", DataType::Utf8, true),
        ],
        vec!["version".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let threshold = (1u64 << 63) + 5;
    let lower_bound = Expr::Literal(ScalarValue::UInt64(Some(threshold)), None);
    let filter = col("version").gt_eq(lower_bound);
    assert!(QueryPredicate::supports_filter(&filter, &model));

    let pred = QueryPredicate::from_filters(&[filter], &model);
    let extracted = pred.constraints.get(&0);
    assert!(matches!(
        extracted,
        Some(PredicateConstraint::UInt64Range {
            min: Some(v),
            max: None
        }) if *v == threshold
    ));
}
3944
/// A comparison the pushdown layer cannot represent (`version > -1` with an
/// `Int64` literal against a `UInt64` column) must be reported as unsupported.
/// It must contribute neither pushed constraints nor a contradiction.
#[test]
fn unsupported_uint64_comparison_does_not_force_contradiction() {
    use datafusion::logical_expr::col;

    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("version", DataType::UInt64, false),
            TableColumnConfig::new("data", DataType::Utf8, true),
        ],
        vec!["version".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let negative_literal = Expr::Literal(ScalarValue::Int64(Some(-1)), None);
    let unsupported = col("version").gt(negative_literal);

    let pushable = QueryPredicate::supports_filter(&unsupported, &model);
    assert!(
        !pushable,
        "negative Int64 literal on UInt64 column should not be pushdown-supported"
    );

    let pred = QueryPredicate::from_filters(&[unsupported], &model);
    assert!(
        !pred.contradiction,
        "unsupported filter must not collapse scan to empty result"
    );
    assert!(
        pred.constraints.is_empty(),
        "unsupported filter must not contribute pushed constraints"
    );
}
3977
/// In `version >= 10 AND version > -1`, the unsupported right-hand side makes
/// the whole filter not fully pushdown-capable. The supported `>= 10` half
/// must still be extracted, and the filter must not produce a contradiction.
#[test]
fn unsupported_uint64_comparison_in_and_keeps_supported_sibling() {
    use datafusion::logical_expr::col;

    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("version", DataType::UInt64, false),
            TableColumnConfig::new("data", DataType::Utf8, true),
        ],
        vec!["version".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let at_least_ten = col("version").gt_eq(Expr::Literal(ScalarValue::UInt64(Some(10)), None));
    let above_negative = col("version").gt(Expr::Literal(ScalarValue::Int64(Some(-1)), None));
    let filter = at_least_ten.and(above_negative);

    let pushable = QueryPredicate::supports_filter(&filter, &model);
    assert!(
        !pushable,
        "mixed AND should not be marked fully pushdown-supported"
    );

    let pred = QueryPredicate::from_filters(&[filter], &model);
    assert!(!pred.contradiction);
    let extracted = pred.constraints.get(&0);
    assert!(matches!(
        extracted,
        Some(PredicateConstraint::UInt64Range {
            min: Some(10),
            max: None
        })
    ));
}
4012
/// `version IN (1, 5, 10)` on a `UInt64` key column is pushdown-capable and
/// becomes a three-element `UInt64In` constraint.
#[test]
fn uint64_in_list_pushdown() {
    use datafusion::logical_expr::{col, in_list};

    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("version", DataType::UInt64, false),
            TableColumnConfig::new("data", DataType::Utf8, true),
        ],
        vec!["version".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let candidates = [1u64, 5, 10]
        .into_iter()
        .map(|v| Expr::Literal(ScalarValue::UInt64(Some(v)), None))
        .collect::<Vec<_>>();
    let filter = in_list(col("version"), candidates, false);
    assert!(QueryPredicate::supports_filter(&filter, &model));

    let pred = QueryPredicate::from_filters(&[filter], &model);
    assert!(
        matches!(pred.constraints.get(&0), Some(PredicateConstraint::UInt64In(v)) if v.len() == 3),
        "should extract UInt64In with 3 values"
    );
}
4045
/// An `IN` list on a `UInt64` column that contains 2^63 (greater than
/// `i64::MAX`) must keep that value intact in the extracted `UInt64In`.
#[test]
fn uint64_in_list_pushdown_supports_values_above_i64_max() {
    use datafusion::logical_expr::{col, in_list};

    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("version", DataType::UInt64, false),
            TableColumnConfig::new("data", DataType::Utf8, true),
        ],
        vec!["version".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let huge = 1u64 << 63;
    let candidates = [1u64, huge]
        .into_iter()
        .map(|v| Expr::Literal(ScalarValue::UInt64(Some(v)), None))
        .collect::<Vec<_>>();
    let filter = in_list(col("version"), candidates, false);
    assert!(QueryPredicate::supports_filter(&filter, &model));

    let pred = QueryPredicate::from_filters(&[filter], &model);
    assert!(matches!(
        pred.constraints.get(&0),
        Some(PredicateConstraint::UInt64In(v)) if v.contains(&huge) && v.len() == 2
    ));
}
4078
/// `entity IN (AA.., BB..)` on a `FixedSizeBinary(16)` key column becomes a
/// two-element `FixedBinaryIn` and expands to one key range per entity.
#[test]
fn fixed_binary_in_list_pushdown() {
    use datafusion::logical_expr::{col, in_list};

    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
            TableColumnConfig::new("data", DataType::Utf8, true),
        ],
        vec!["entity".to_string()],
        vec![],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let entities = [0xAAu8, 0xBB]
        .into_iter()
        .map(|fill| Expr::Literal(ScalarValue::FixedSizeBinary(16, Some(vec![fill; 16])), None))
        .collect::<Vec<_>>();
    let filter = in_list(col("entity"), entities, false);
    assert!(QueryPredicate::supports_filter(&filter, &model));

    let pred = QueryPredicate::from_filters(&[filter], &model);
    let extracted = pred.constraints.get(&0);
    assert!(
        matches!(extracted, Some(PredicateConstraint::FixedBinaryIn(v)) if v.len() == 2),
        "should extract FixedBinaryIn with 2 values"
    );

    let ranges = pred.primary_key_ranges(&model).unwrap();
    assert_eq!(ranges.len(), 2, "should produce one range per entity");
}
4117
/// `big_val >= 100` on an indexed `Decimal256(76, 0)` column is
/// pushdown-capable. It becomes a lower-bounded `Decimal256Range` whose
/// matching is inclusive at the boundary.
#[test]
fn decimal256_range_pushdown() {
    let config = KvTableConfig::new(
        0,
        vec![
            TableColumnConfig::new("id", DataType::Int64, false),
            TableColumnConfig::new("big_val", DataType::Decimal256(76, 0), false),
        ],
        vec!["id".to_string()],
        vec![IndexSpec::new("big_idx", vec!["big_val".to_string()]).unwrap()],
    )
    .unwrap();
    let model = TableModel::from_config(&config).unwrap();

    use datafusion::logical_expr::col;
    let filter = col("big_val").gt_eq(Expr::Literal(
        ScalarValue::Decimal256(Some(i256::from(100i64)), 76, 0),
        None,
    ));

    assert!(
        QueryPredicate::supports_filter(&filter, &model),
        "Decimal256 range should be supported"
    );

    let pred = QueryPredicate::from_filters(&[filter], &model);
    let big_idx = *model.columns_by_name.get("big_val").unwrap();
    assert!(
        matches!(
            pred.constraints.get(&big_idx),
            Some(PredicateConstraint::Decimal256Range {
                min: Some(_),
                max: None
            })
        ),
        "should extract Decimal256Range with min=100, no max"
    );

    let constraint = pred.constraints.get(&big_idx).unwrap();
    let dec = |v: i64| CellValue::Decimal256(i256::from(v));
    assert!(matches_constraint(&dec(200), constraint));
    // `>=` is inclusive: the literal itself must pass and its predecessor must
    // not. This pins the extracted bound to exactly 100.
    assert!(
        matches_constraint(&dec(100), constraint),
        "lower bound of >= must be inclusive"
    );
    assert!(
        !matches_constraint(&dec(99), constraint),
        "value just below the bound must be rejected"
    );
    assert!(!matches_constraint(&dec(50), constraint));
}
4163
/// `UInt64Range` and `UInt64In` matching must compare as unsigned. A value
/// with the top bit set satisfies `>= 1` and does not match an unrelated
/// small `IN` list.
#[test]
fn uint64_constraint_matching_does_not_wrap_large_values() {
    let high_bit = CellValue::UInt64(1u64 << 63);

    let at_least_one = PredicateConstraint::UInt64Range {
        min: Some(1),
        max: None,
    };
    assert!(matches_constraint(&high_bit, &at_least_one));
    assert!(!matches_constraint(&CellValue::UInt64(0), &at_least_one));

    let small_set = PredicateConstraint::UInt64In(vec![1, 2, 3]);
    assert!(matches_constraint(&CellValue::UInt64(2), &small_set));
    assert!(!matches_constraint(&high_bit, &small_set));
}
4180
/// An inverted `UInt64Range` (`min > max`) on the key column admits no values,
/// so no primary-key ranges should be produced.
#[test]
fn uint64_empty_range_produces_no_pk_ranges() {
    let key_column = TableColumnConfig::new("version", DataType::UInt64, false);
    let config =
        KvTableConfig::new(0, vec![key_column], vec!["version".to_string()], vec![]).unwrap();
    let model = TableModel::from_config(&config).unwrap();

    let inverted = PredicateConstraint::UInt64Range {
        min: Some(10),
        max: Some(9),
    };
    let mut pred = QueryPredicate::default();
    pred.constraints.insert(0, inverted);

    let ranges = pred.primary_key_ranges(&model).unwrap();
    assert!(ranges.is_empty());
}
4203
4204 #[test]
4205 fn utf8_primary_key_encoding_supports_unicode_and_long_values() {
4206 let config = KvTableConfig::new(
4207 0,
4208 vec![TableColumnConfig::new("id", DataType::Utf8, false)],
4209 vec!["id".to_string()],
4210 vec![],
4211 )
4212 .unwrap();
4213 let model = TableModel::from_config(&config).unwrap();
4214
4215 let row_non_ascii = KvRow {
4216 values: vec![CellValue::Utf8("naive-cafe-e9".replace("e9", "\u{00E9}"))],
4217 };
4218 let key_non_ascii = encode_primary_key_from_row(model.table_prefix, &row_non_ascii, &model)
4219 .expect("non-ascii PK should encode");
4220 let decoded_non_ascii = decode_primary_key(model.table_prefix, &key_non_ascii, &model)
4221 .expect("non-ascii PK should decode");
4222 assert!(matches!(
4223 decoded_non_ascii.as_slice(),
4224 [CellValue::Utf8(value)] if value == "naive-cafe-\u{00E9}"
4225 ));
4226
4227 let row_too_long = KvRow {
4228 values: vec![CellValue::Utf8("abcdefghijklmnopq".to_string())],
4229 };
4230 let key_too_long = encode_primary_key_from_row(model.table_prefix, &row_too_long, &model)
4231 .expect("long UTF-8 PK should encode");
4232 let decoded_too_long = decode_primary_key(model.table_prefix, &key_too_long, &model)
4233 .expect("long UTF-8 PK should decode");
4234 assert!(matches!(
4235 decoded_too_long.as_slice(),
4236 [CellValue::Utf8(value)] if value == "abcdefghijklmnopq"
4237 ));
4238 }
4239
4240 #[test]
4241 fn utf8_primary_key_encodes_at_max_codec_payload_and_rejects_overflow() {
4242 let config = KvTableConfig::new(
4243 0,
4244 vec![TableColumnConfig::new("id", DataType::Utf8, false)],
4245 vec!["id".to_string()],
4246 vec![],
4247 )
4248 .unwrap();
4249 let model = TableModel::from_config(&config).unwrap();
4250 let max_payload = model.primary_key_codec.payload_capacity_bytes();
4251 let max_value = "a".repeat(max_payload - 1);
4252 let overflow_value = "a".repeat(max_payload);
4253
4254 let key = encode_primary_key_from_row(
4255 model.table_prefix,
4256 &KvRow {
4257 values: vec![CellValue::Utf8(max_value.clone())],
4258 },
4259 &model,
4260 )
4261 .expect("max-length UTF-8 PK should encode");
4262 assert_eq!(key.len(), exoware_sdk_rs::keys::MAX_KEY_LEN);
4263 let decoded = decode_primary_key(model.table_prefix, &key, &model)
4264 .expect("max-length PK should decode");
4265 assert!(matches!(
4266 decoded.as_slice(),
4267 [CellValue::Utf8(value)] if value == &max_value
4268 ));
4269
4270 let err = encode_primary_key_from_row(
4271 model.table_prefix,
4272 &KvRow {
4273 values: vec![CellValue::Utf8(overflow_value)],
4274 },
4275 &model,
4276 )
4277 .expect_err("UTF-8 PK exceeding codec payload should be rejected");
4278 assert!(err.contains("primary key payload exceeds codec payload capacity 253 bytes"));
4279 }
4280
4281 #[test]
4282 fn utf8_primary_key_round_trips_embedded_nul() {
4283 let config = KvTableConfig::new(
4284 0,
4285 vec![TableColumnConfig::new("id", DataType::Utf8, false)],
4286 vec!["id".to_string()],
4287 vec![],
4288 )
4289 .unwrap();
4290 let model = TableModel::from_config(&config).unwrap();
4291 let row = KvRow {
4292 values: vec![CellValue::Utf8("AB\0CD".to_string())],
4293 };
4294
4295 let key = encode_primary_key_from_row(model.table_prefix, &row, &model)
4296 .expect("embedded NUL in key text must encode");
4297 let decoded =
4298 decode_primary_key(model.table_prefix, &key, &model).expect("embedded NUL must decode");
4299 assert!(matches!(
4300 decoded.as_slice(),
4301 [CellValue::Utf8(value)] if value == "AB\0CD"
4302 ));
4303 }
4304
4305 #[test]
4306 fn utf8_index_key_round_trips_embedded_nul() {
4307 let config = KvTableConfig::new(
4308 0,
4309 vec![
4310 TableColumnConfig::new("id", DataType::Int64, false),
4311 TableColumnConfig::new("tag", DataType::Utf8, false),
4312 ],
4313 vec!["id".to_string()],
4314 vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
4315 )
4316 .unwrap();
4317 let model = TableModel::from_config(&config).unwrap();
4318 let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4319 let row = KvRow {
4320 values: vec![CellValue::Int64(1), CellValue::Utf8("AB\0CD".to_string())],
4321 };
4322
4323 let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row)
4324 .expect("embedded NUL in index key text must encode");
4325 let decoded = decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key)
4326 .expect("embedded NUL index key must decode");
4327 assert!(matches!(
4328 decoded.values.get(&1),
4329 Some(CellValue::Utf8(value)) if value == "AB\0CD"
4330 ));
4331 }
4332
4333 #[test]
4334 fn secondary_index_with_long_utf8_primary_key_encodes_at_max_payload_and_rejects_overflow() {
4335 let config = KvTableConfig::new(
4336 0,
4337 vec![
4338 TableColumnConfig::new("id", DataType::Utf8, false),
4339 TableColumnConfig::new("tag", DataType::Utf8, false),
4340 ],
4341 vec!["id".to_string()],
4342 vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
4343 )
4344 .unwrap();
4345 let model = TableModel::from_config(&config).unwrap();
4346 let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4347 let spec = &specs[0];
4348 let max_payload = spec.codec.payload_capacity_bytes();
4349 let max_tag = "t".to_string();
4350 let max_id = "i".repeat(max_payload - encode_string_variable(&max_tag).unwrap().len() - 1);
4351 let overflow_id = format!("{max_id}x");
4352
4353 let key = encode_secondary_index_key(
4354 model.table_prefix,
4355 spec,
4356 &model,
4357 &KvRow {
4358 values: vec![
4359 CellValue::Utf8(max_id.clone()),
4360 CellValue::Utf8(max_tag.clone()),
4361 ],
4362 },
4363 )
4364 .expect("secondary key at max payload should encode");
4365 assert_eq!(key.len(), exoware_sdk_rs::keys::MAX_KEY_LEN);
4366 let decoded =
4367 decode_secondary_index_key(model.table_prefix, spec, &model, &key).expect("decode");
4368 assert!(matches!(
4369 decoded.values.get(&1),
4370 Some(CellValue::Utf8(value)) if value == &max_tag
4371 ));
4372 assert!(matches!(
4373 decoded.primary_key_values.as_slice(),
4374 [CellValue::Utf8(value)] if value == &max_id
4375 ));
4376
4377 let err = encode_secondary_index_key(
4378 model.table_prefix,
4379 spec,
4380 &model,
4381 &KvRow {
4382 values: vec![CellValue::Utf8(overflow_id), CellValue::Utf8(max_tag)],
4383 },
4384 )
4385 .expect_err("secondary key exceeding max payload should be rejected");
4386 assert!(err.contains("index 'tag_idx' payload exceeds codec payload capacity 252 bytes"));
4387 }
4388
4389 #[test]
4390 fn secondary_index_from_parts_with_long_utf8_primary_key_rejects_overflow() {
4391 let config = KvTableConfig::new(
4392 0,
4393 vec![
4394 TableColumnConfig::new("id", DataType::Utf8, false),
4395 TableColumnConfig::new("tag", DataType::Utf8, false),
4396 ],
4397 vec!["id".to_string()],
4398 vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
4399 )
4400 .unwrap();
4401 let model = TableModel::from_config(&config).unwrap();
4402 let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4403 let spec = &specs[0];
4404 let max_payload = spec.codec.payload_capacity_bytes();
4405 let max_tag = "t".to_string();
4406 let max_id = "i".repeat(max_payload - encode_string_variable(&max_tag).unwrap().len() - 1);
4407 let overflow_id = format!("{max_id}x");
4408 let max_row = KvRow {
4409 values: vec![
4410 CellValue::Utf8(max_id.clone()),
4411 CellValue::Utf8(max_tag.clone()),
4412 ],
4413 };
4414 let encoded_row = encode_base_row_value(&max_row, &model).expect("encode row");
4415 let archived = decode_stored_row(&encoded_row).expect("archive row");
4416
4417 let key = encode_secondary_index_key_from_parts(
4418 model.table_prefix,
4419 spec,
4420 &model,
4421 &[CellValue::Utf8(max_id.clone())],
4422 &archived,
4423 )
4424 .expect("backfill path should encode max payload");
4425 assert_eq!(key.len(), exoware_sdk_rs::keys::MAX_KEY_LEN);
4426
4427 let err = encode_secondary_index_key_from_parts(
4428 model.table_prefix,
4429 spec,
4430 &model,
4431 &[CellValue::Utf8(overflow_id)],
4432 &archived,
4433 )
4434 .expect_err("backfill path overflow should be rejected");
4435 assert!(err
4436 .to_string()
4437 .contains("index 'tag_idx' payload exceeds codec payload capacity 252 bytes"));
4438 }
4439
4440 #[test]
4441 fn primary_key_type_mismatch_returns_error_instead_of_panicking() {
4442 let config = KvTableConfig::new(
4443 0,
4444 vec![TableColumnConfig::new("id", DataType::UInt64, false)],
4445 vec!["id".to_string()],
4446 vec![],
4447 )
4448 .unwrap();
4449 let model = TableModel::from_config(&config).unwrap();
4450 let row = KvRow {
4451 values: vec![CellValue::Int64(7)],
4452 };
4453
4454 let err = encode_primary_key_from_row(model.table_prefix, &row, &model)
4455 .expect_err("mismatched PK type should return an error");
4456 assert!(err.contains("type mismatch while encoding key value"));
4457 }
4458
4459 #[test]
4460 fn choose_index_plan_uses_fixed_binary_leading_constraint() {
4461 let config = KvTableConfig::new(
4462 0,
4463 vec![
4464 TableColumnConfig::new("id", DataType::Int64, false),
4465 TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
4466 ],
4467 vec!["id".to_string()],
4468 vec![IndexSpec::new("entity_idx", vec!["entity".to_string()]).unwrap()],
4469 )
4470 .unwrap();
4471 let model = TableModel::from_config(&config).unwrap();
4472 let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4473
4474 use datafusion::logical_expr::col;
4475 let filter = col("entity").eq(Expr::Literal(
4476 ScalarValue::FixedSizeBinary(16, Some(vec![0xAB; 16])),
4477 None,
4478 ));
4479 let pred = QueryPredicate::from_filters(&[filter], &model);
4480 let plan = pred
4481 .choose_index_plan(&model, &specs)
4482 .unwrap()
4483 .expect("fixed-binary equality should choose an index");
4484
4485 assert_eq!(plan.constrained_prefix_len, 1);
4486 assert_eq!(plan.ranges.len(), 1);
4487 let range = &plan.ranges[0];
4488 assert_eq!(
4489 index_payload(&specs[0], &range.start, 0, 16),
4490 vec![0xAB; 16]
4491 );
4492 assert_eq!(index_payload(&specs[0], &range.end, 0, 16), vec![0xAB; 16]);
4493 }
4494
4495 #[test]
4496 fn choose_index_plan_uses_decimal256_leading_constraint() {
4497 let config = KvTableConfig::new(
4498 0,
4499 vec![
4500 TableColumnConfig::new("id", DataType::Int64, false),
4501 TableColumnConfig::new("big_val", DataType::Decimal256(76, 0), false),
4502 ],
4503 vec!["id".to_string()],
4504 vec![IndexSpec::new("big_idx", vec!["big_val".to_string()]).unwrap()],
4505 )
4506 .unwrap();
4507 let model = TableModel::from_config(&config).unwrap();
4508 let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4509
4510 use datafusion::logical_expr::col;
4511 let filter = col("big_val").gt_eq(Expr::Literal(
4512 ScalarValue::Decimal256(Some(i256::from(100i64)), 76, 0),
4513 None,
4514 ));
4515 let pred = QueryPredicate::from_filters(&[filter], &model);
4516 let plan = pred
4517 .choose_index_plan(&model, &specs)
4518 .unwrap()
4519 .expect("decimal256 range should choose an index");
4520
4521 assert_eq!(plan.constrained_prefix_len, 1);
4522 assert_eq!(plan.ranges.len(), 1);
4523 let range = &plan.ranges[0];
4524 assert_eq!(
4525 index_payload(&specs[0], &range.start, 0, 32),
4526 encode_i256_ordered(i256::from(100i64)).to_vec()
4527 );
4528 }
4529
4530 #[tokio::test]
4531 async fn backfill_added_indexes_writes_entries_for_existing_rows() {
4532 let state = MockState {
4533 kv: Arc::new(Mutex::new(BTreeMap::new())),
4534 range_calls: Arc::new(AtomicUsize::new(0)),
4535 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
4536 sequence_number: Arc::new(AtomicU64::new(0)),
4537 };
4538 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
4539 let client = StoreClient::new(&base_url);
4540
4541 let seed_schema = KvSchema::new(client.clone())
4542 .table(
4543 "orders",
4544 vec![
4545 TableColumnConfig::new("id", DataType::Int64, false),
4546 TableColumnConfig::new("status", DataType::Utf8, false),
4547 TableColumnConfig::new("amount_cents", DataType::Int64, false),
4548 ],
4549 vec!["id".to_string()],
4550 vec![],
4551 )
4552 .expect("seed schema");
4553 let mut writer = seed_schema.batch_writer();
4554 for i in 0..6i64 {
4555 writer
4556 .insert(
4557 "orders",
4558 vec![
4559 CellValue::Int64(i),
4560 CellValue::Utf8(if i % 2 == 0 { "open" } else { "closed" }.to_string()),
4561 CellValue::Int64(i * 10),
4562 ],
4563 )
4564 .expect("seed row");
4565 }
4566 writer.flush().await.expect("seed flush");
4567
4568 {
4569 let guard = state.kv.lock().expect("kv mutex poisoned");
4570 let base_rows = guard
4571 .keys()
4572 .filter(|key| matches_primary_key(0, key))
4573 .count();
4574 let index_rows = guard
4575 .keys()
4576 .filter(|key| matches_secondary_index_key(0, 1, key))
4577 .count();
4578 assert_eq!(base_rows, 6);
4579 assert_eq!(index_rows, 0);
4580 }
4581
4582 let backfill_schema = KvSchema::new(client.clone())
4583 .table(
4584 "orders",
4585 vec![
4586 TableColumnConfig::new("id", DataType::Int64, false),
4587 TableColumnConfig::new("status", DataType::Utf8, false),
4588 TableColumnConfig::new("amount_cents", DataType::Int64, false),
4589 ],
4590 vec!["id".to_string()],
4591 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
4592 .expect("valid index")
4593 .with_cover_columns(vec!["amount_cents".to_string()])],
4594 )
4595 .expect("backfill schema");
4596 let report = backfill_schema
4597 .backfill_added_indexes_with_options(
4598 "orders",
4599 &[],
4600 IndexBackfillOptions {
4601 row_batch_size: 2,
4602 start_from_primary_key: None,
4603 },
4604 )
4605 .await
4606 .expect("backfill should succeed");
4607 assert_eq!(report.scanned_rows, 6);
4608 assert_eq!(report.indexes_backfilled, 1);
4609 assert_eq!(report.index_entries_written, 6);
4610
4611 {
4612 let guard = state.kv.lock().expect("kv mutex poisoned");
4613 let index_rows = guard
4614 .keys()
4615 .filter(|key| matches_secondary_index_key(0, 1, key))
4616 .count();
4617 assert_eq!(index_rows, 6);
4618 let (_, sample_value) = guard
4619 .iter()
4620 .find(|(key, _)| matches_secondary_index_key(0, 1, key))
4621 .expect("backfill should create index entry");
4622 let archived = decode_stored_row(sample_value.as_ref())
4623 .expect("covering value must be valid codec");
4624 assert_eq!(archived.values.len(), 3);
4625 }
4626
4627 let _ = shutdown_tx.send(());
4628 }
4629
4630 #[tokio::test]
4631 async fn backfill_added_indexes_writes_zorder_entries_for_existing_rows() {
4632 let state = MockState {
4633 kv: Arc::new(Mutex::new(BTreeMap::new())),
4634 range_calls: Arc::new(AtomicUsize::new(0)),
4635 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
4636 sequence_number: Arc::new(AtomicU64::new(0)),
4637 };
4638 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
4639 let client = StoreClient::new(&base_url);
4640
4641 let seed_schema = KvSchema::new(client.clone())
4642 .table(
4643 "points",
4644 vec![
4645 TableColumnConfig::new("x", DataType::Int64, false),
4646 TableColumnConfig::new("y", DataType::Int64, false),
4647 TableColumnConfig::new("id", DataType::Int64, false),
4648 TableColumnConfig::new("value", DataType::Int64, false),
4649 ],
4650 vec!["id".to_string()],
4651 vec![],
4652 )
4653 .expect("seed schema");
4654 let mut writer = seed_schema.batch_writer();
4655 for (x, y, id, value) in [(1, 1, 11, 110), (1, 2, 12, 120), (2, 1, 21, 210)] {
4656 writer
4657 .insert(
4658 "points",
4659 vec![
4660 CellValue::Int64(x),
4661 CellValue::Int64(y),
4662 CellValue::Int64(id),
4663 CellValue::Int64(value),
4664 ],
4665 )
4666 .expect("seed row");
4667 }
4668 writer.flush().await.expect("seed flush");
4669
4670 let backfill_schema = KvSchema::new(client.clone())
4671 .table(
4672 "points",
4673 vec![
4674 TableColumnConfig::new("x", DataType::Int64, false),
4675 TableColumnConfig::new("y", DataType::Int64, false),
4676 TableColumnConfig::new("id", DataType::Int64, false),
4677 TableColumnConfig::new("value", DataType::Int64, false),
4678 ],
4679 vec!["id".to_string()],
4680 vec![
4681 IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
4682 .expect("valid index")
4683 .with_cover_columns(vec!["value".to_string()]),
4684 ],
4685 )
4686 .expect("backfill schema");
4687 let report = backfill_schema
4688 .backfill_added_indexes_with_options(
4689 "points",
4690 &[],
4691 IndexBackfillOptions {
4692 row_batch_size: 2,
4693 start_from_primary_key: None,
4694 },
4695 )
4696 .await
4697 .expect("backfill should succeed");
4698 assert_eq!(report.scanned_rows, 3);
4699 assert_eq!(report.index_entries_written, 3);
4700
4701 let guard = state.kv.lock().expect("kv mutex poisoned");
4702 let index_entry = guard
4703 .keys()
4704 .find(|key| matches_secondary_index_key(0, 1, key))
4705 .cloned()
4706 .expect("z-order backfill should create index entry");
4707 let config = KvTableConfig::new(
4708 0,
4709 vec![
4710 TableColumnConfig::new("x", DataType::Int64, false),
4711 TableColumnConfig::new("y", DataType::Int64, false),
4712 TableColumnConfig::new("id", DataType::Int64, false),
4713 TableColumnConfig::new("value", DataType::Int64, false),
4714 ],
4715 vec!["id".to_string()],
4716 vec![
4717 IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()]).expect("valid"),
4718 ],
4719 )
4720 .expect("config");
4721 let model = TableModel::from_config(&config).expect("model");
4722 let spec = model
4723 .resolve_index_specs(&config.index_specs)
4724 .expect("specs")
4725 .remove(0);
4726 let decoded = decode_secondary_index_key(model.table_prefix, &spec, &model, &index_entry)
4727 .expect("decode z-order key");
4728 let x_idx = *model.columns_by_name.get("x").unwrap();
4729 let y_idx = *model.columns_by_name.get("y").unwrap();
4730 assert!(matches!(
4731 decoded.values.get(&x_idx),
4732 Some(CellValue::Int64(_))
4733 ));
4734 assert!(matches!(
4735 decoded.values.get(&y_idx),
4736 Some(CellValue::Int64(_))
4737 ));
4738
4739 let _ = shutdown_tx.send(());
4740 }
4741
4742 #[tokio::test]
4743 async fn backfill_added_indexes_requires_append_only_index_evolution() {
4744 let client = StoreClient::new("http://127.0.0.1:1");
4745 let schema = KvSchema::new(client)
4746 .table(
4747 "orders",
4748 vec![
4749 TableColumnConfig::new("id", DataType::Int64, false),
4750 TableColumnConfig::new("status", DataType::Utf8, false),
4751 TableColumnConfig::new("amount_cents", DataType::Int64, false),
4752 ],
4753 vec!["id".to_string()],
4754 vec![
4755 IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid"),
4756 IndexSpec::new("amount_idx", vec!["amount_cents".to_string()]).expect("valid"),
4757 ],
4758 )
4759 .expect("schema");
4760
4761 let previous_specs =
4762 vec![IndexSpec::new("amount_idx", vec!["amount_cents".to_string()]).expect("valid")];
4763 let err = schema
4764 .backfill_added_indexes("orders", &previous_specs)
4765 .await
4766 .expect_err("non-append-only evolution should be rejected");
4767 assert!(err
4768 .to_string()
4769 .contains("index evolution must be append-only"));
4770 }
4771
4772 #[tokio::test]
4773 async fn backfill_added_indexes_is_noop_when_no_new_indexes() {
4774 let client = StoreClient::new("http://127.0.0.1:1");
4775 let existing = IndexSpec::new("status_idx", vec!["status".to_string()])
4776 .expect("valid")
4777 .with_cover_columns(vec!["amount_cents".to_string()]);
4778 let schema = KvSchema::new(client)
4779 .table(
4780 "orders",
4781 vec![
4782 TableColumnConfig::new("id", DataType::Int64, false),
4783 TableColumnConfig::new("status", DataType::Utf8, false),
4784 TableColumnConfig::new("amount_cents", DataType::Int64, false),
4785 ],
4786 vec!["id".to_string()],
4787 vec![existing.clone()],
4788 )
4789 .expect("schema");
4790
4791 let report = schema
4792 .backfill_added_indexes("orders", &[existing])
4793 .await
4794 .expect("no-op backfill should succeed");
4795 assert_eq!(report, IndexBackfillReport::default());
4796 }
4797
4798 #[tokio::test]
4799 async fn backfill_added_indexes_rejects_zero_row_batch_size() {
4800 let client = StoreClient::new("http://127.0.0.1:1");
4801 let schema = KvSchema::new(client)
4802 .table(
4803 "orders",
4804 vec![
4805 TableColumnConfig::new("id", DataType::Int64, false),
4806 TableColumnConfig::new("status", DataType::Utf8, false),
4807 ],
4808 vec!["id".to_string()],
4809 vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
4810 )
4811 .expect("schema");
4812 let err = schema
4813 .backfill_added_indexes_with_options(
4814 "orders",
4815 &[],
4816 IndexBackfillOptions {
4817 row_batch_size: 0,
4818 start_from_primary_key: None,
4819 },
4820 )
4821 .await
4822 .expect_err("row_batch_size=0 should fail");
4823 assert!(err.to_string().contains("row_batch_size must be > 0"));
4824 }
4825
4826 #[tokio::test]
4827 async fn backfill_added_indexes_emits_progress_events() {
4828 let state = MockState {
4829 kv: Arc::new(Mutex::new(BTreeMap::new())),
4830 range_calls: Arc::new(AtomicUsize::new(0)),
4831 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
4832 sequence_number: Arc::new(AtomicU64::new(0)),
4833 };
4834 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
4835 let client = StoreClient::new(&base_url);
4836
4837 let seed_schema = KvSchema::new(client.clone())
4838 .table(
4839 "orders",
4840 vec![
4841 TableColumnConfig::new("id", DataType::Int64, false),
4842 TableColumnConfig::new("status", DataType::Utf8, false),
4843 TableColumnConfig::new("amount_cents", DataType::Int64, false),
4844 ],
4845 vec!["id".to_string()],
4846 vec![],
4847 )
4848 .expect("seed schema");
4849 let mut writer = seed_schema.batch_writer();
4850 for i in 0..5i64 {
4851 writer
4852 .insert(
4853 "orders",
4854 vec![
4855 CellValue::Int64(i),
4856 CellValue::Utf8("open".to_string()),
4857 CellValue::Int64(i * 10),
4858 ],
4859 )
4860 .expect("seed row");
4861 }
4862 writer.flush().await.expect("seed flush");
4863
4864 let backfill_schema = KvSchema::new(client.clone())
4865 .table(
4866 "orders",
4867 vec![
4868 TableColumnConfig::new("id", DataType::Int64, false),
4869 TableColumnConfig::new("status", DataType::Utf8, false),
4870 TableColumnConfig::new("amount_cents", DataType::Int64, false),
4871 ],
4872 vec!["id".to_string()],
4873 vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
4874 )
4875 .expect("backfill schema");
4876
4877 let (progress_tx, mut progress_rx) = mpsc::unbounded_channel();
4878 let report = backfill_schema
4879 .backfill_added_indexes_with_options_and_progress(
4880 "orders",
4881 &[],
4882 IndexBackfillOptions {
4883 row_batch_size: 2,
4884 start_from_primary_key: None,
4885 },
4886 Some(&progress_tx),
4887 )
4888 .await
4889 .expect("backfill should succeed");
4890 drop(progress_tx);
4891
4892 let mut saw_started = false;
4893 let mut saw_completed = false;
4894 let mut progress_events = 0usize;
4895 while let Some(event) = progress_rx.recv().await {
4896 match event {
4897 IndexBackfillEvent::Started {
4898 table_name,
4899 indexes_backfilled,
4900 row_batch_size,
4901 ..
4902 } => {
4903 saw_started = true;
4904 assert_eq!(table_name, "orders");
4905 assert_eq!(indexes_backfilled, 1);
4906 assert_eq!(row_batch_size, 2);
4907 }
4908 IndexBackfillEvent::Progress {
4909 scanned_rows,
4910 index_entries_written,
4911 ..
4912 } => {
4913 progress_events += 1;
4914 assert!(scanned_rows >= 1);
4915 assert_eq!(scanned_rows, index_entries_written);
4916 }
4917 IndexBackfillEvent::Completed {
4918 report: completed_report,
4919 } => {
4920 saw_completed = true;
4921 assert_eq!(completed_report, report);
4922 }
4923 }
4924 }
4925 assert!(saw_started);
4926 assert!(saw_completed);
4927 assert!(progress_events >= 1);
4928
4929 let _ = shutdown_tx.send(());
4930 }
4931
4932 #[tokio::test]
4933 async fn backfill_added_indexes_can_resume_from_primary_key() {
4934 let state = MockState {
4935 kv: Arc::new(Mutex::new(BTreeMap::new())),
4936 range_calls: Arc::new(AtomicUsize::new(0)),
4937 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
4938 sequence_number: Arc::new(AtomicU64::new(0)),
4939 };
4940 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
4941 let client = StoreClient::new(&base_url);
4942
4943 let seed_schema = KvSchema::new(client.clone())
4944 .table(
4945 "orders",
4946 vec![
4947 TableColumnConfig::new("id", DataType::Int64, false),
4948 TableColumnConfig::new("status", DataType::Utf8, false),
4949 TableColumnConfig::new("amount_cents", DataType::Int64, false),
4950 ],
4951 vec!["id".to_string()],
4952 vec![],
4953 )
4954 .expect("seed schema");
4955 let mut writer = seed_schema.batch_writer();
4956 for i in 0..6i64 {
4957 writer
4958 .insert(
4959 "orders",
4960 vec![
4961 CellValue::Int64(i),
4962 CellValue::Utf8("open".to_string()),
4963 CellValue::Int64(i * 10),
4964 ],
4965 )
4966 .expect("seed row");
4967 }
4968 writer.flush().await.expect("seed flush");
4969
4970 let backfill_schema = KvSchema::new(client.clone())
4971 .table(
4972 "orders",
4973 vec![
4974 TableColumnConfig::new("id", DataType::Int64, false),
4975 TableColumnConfig::new("status", DataType::Utf8, false),
4976 TableColumnConfig::new("amount_cents", DataType::Int64, false),
4977 ],
4978 vec!["id".to_string()],
4979 vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
4980 )
4981 .expect("backfill schema");
4982
4983 let config = KvTableConfig::new(
4984 0,
4985 vec![
4986 TableColumnConfig::new("id", DataType::Int64, false),
4987 TableColumnConfig::new("status", DataType::Utf8, false),
4988 TableColumnConfig::new("amount_cents", DataType::Int64, false),
4989 ],
4990 vec!["id".to_string()],
4991 vec![],
4992 )
4993 .expect("valid config");
4994 let model = TableModel::from_config(&config).expect("model");
4995 let resume_value = CellValue::Int64(3);
4996 let resume_key =
4997 encode_primary_key(model.table_prefix, &[&resume_value], &model).expect("resume key");
4998
4999 let report = backfill_schema
5000 .backfill_added_indexes_with_options(
5001 "orders",
5002 &[],
5003 IndexBackfillOptions {
5004 row_batch_size: 2,
5005 start_from_primary_key: Some(resume_key.clone()),
5006 },
5007 )
5008 .await
5009 .expect("resume backfill should succeed");
5010 assert_eq!(report.scanned_rows, 3);
5011 assert_eq!(report.index_entries_written, 3);
5012
5013 {
5014 let guard = state.kv.lock().expect("kv mutex poisoned");
5015 let index_rows = guard
5016 .keys()
5017 .filter(|key| matches_secondary_index_key(0, 1, key))
5018 .count();
5019 assert_eq!(index_rows, 3);
5020 }
5021
5022 let resume_payload = model
5023 .primary_key_codec
5024 .read_payload(&resume_key, 0, model.primary_key_width)
5025 .expect("resume payload");
5026 let wrong_prefix = secondary_index_codec(model.table_prefix, 1)
5027 .expect("secondary codec")
5028 .encode(&resume_payload)
5029 .expect("wrong prefix key");
5030 let err = backfill_schema
5031 .backfill_added_indexes_with_options(
5032 "orders",
5033 &[],
5034 IndexBackfillOptions {
5035 row_batch_size: 2,
5036 start_from_primary_key: Some(wrong_prefix),
5037 },
5038 )
5039 .await
5040 .expect_err("wrong key prefix must be rejected");
5041 assert!(err.to_string().contains("primary-key prefix"));
5042
5043 let _ = shutdown_tx.send(());
5044 }
5045
5046 #[tokio::test]
5047 async fn covering_index_scan_fails_closed_when_covering_payload_missing() {
5048 let state = MockState {
5049 kv: Arc::new(Mutex::new(BTreeMap::new())),
5050 range_calls: Arc::new(AtomicUsize::new(0)),
5051 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
5052 sequence_number: Arc::new(AtomicU64::new(0)),
5053 };
5054 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
5055 let client = StoreClient::new(&base_url);
5056
5057 let schema = KvSchema::new(client.clone())
5058 .table(
5059 "orders",
5060 vec![
5061 TableColumnConfig::new("id", DataType::Int64, false),
5062 TableColumnConfig::new("status", DataType::Utf8, false),
5063 TableColumnConfig::new("amount_cents", DataType::Int64, false),
5064 ],
5065 vec!["id".to_string()],
5066 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
5067 .expect("valid")
5068 .with_cover_columns(vec!["amount_cents".to_string()])],
5069 )
5070 .expect("schema");
5071 let mut writer = schema.batch_writer();
5072 for id in 0..4i64 {
5073 writer
5074 .insert(
5075 "orders",
5076 vec![
5077 CellValue::Int64(id),
5078 CellValue::Utf8("open".to_string()),
5079 CellValue::Int64(id * 10),
5080 ],
5081 )
5082 .expect("row");
5083 }
5084 writer.flush().await.expect("flush");
5085
5086 {
5087 let mut guard = state.kv.lock().expect("kv mutex poisoned");
5088 let key = guard
5089 .keys()
5090 .find(|key| matches_secondary_index_key(0, 1, key))
5091 .expect("index row should exist")
5092 .clone();
5093 guard.insert(key, Bytes::new());
5094 }
5095
5096 let ctx = SessionContext::new();
5097 schema.register_all(&ctx).expect("register");
5098 let df = ctx
5099 .sql("SELECT amount_cents FROM orders WHERE status = 'open'")
5100 .await
5101 .expect("query should plan");
5102 let err = df
5103 .collect()
5104 .await
5105 .expect_err("missing covering payload must fail closed");
5106 assert!(err
5107 .to_string()
5108 .contains("secondary index entry missing covering payload"));
5109
5110 let _ = shutdown_tx.send(());
5111 }
5112
5113 #[tokio::test]
5114 async fn covering_index_scan_fails_closed_when_covering_payload_is_corrupt() {
5115 let state = MockState {
5116 kv: Arc::new(Mutex::new(BTreeMap::new())),
5117 range_calls: Arc::new(AtomicUsize::new(0)),
5118 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
5119 sequence_number: Arc::new(AtomicU64::new(0)),
5120 };
5121 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
5122 let client = StoreClient::new(&base_url);
5123
5124 let schema = KvSchema::new(client.clone())
5125 .table(
5126 "orders",
5127 vec![
5128 TableColumnConfig::new("id", DataType::Int64, false),
5129 TableColumnConfig::new("status", DataType::Utf8, false),
5130 TableColumnConfig::new("amount_cents", DataType::Int64, false),
5131 ],
5132 vec!["id".to_string()],
5133 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
5134 .expect("valid")
5135 .with_cover_columns(vec!["amount_cents".to_string()])],
5136 )
5137 .expect("schema");
5138 let mut writer = schema.batch_writer();
5139 for id in 0..4i64 {
5140 writer
5141 .insert(
5142 "orders",
5143 vec![
5144 CellValue::Int64(id),
5145 CellValue::Utf8("open".to_string()),
5146 CellValue::Int64(id * 10),
5147 ],
5148 )
5149 .expect("row");
5150 }
5151 writer.flush().await.expect("flush");
5152
5153 {
5154 let mut guard = state.kv.lock().expect("kv mutex poisoned");
5155 let key = guard
5156 .keys()
5157 .find(|key| matches_secondary_index_key(0, 1, key))
5158 .expect("index row should exist")
5159 .clone();
5160 guard.insert(key, Bytes::from_static(b"not-codec"));
5161 }
5162
5163 let ctx = SessionContext::new();
5164 schema.register_all(&ctx).expect("register");
5165 let df = ctx
5166 .sql("SELECT amount_cents FROM orders WHERE status = 'open'")
5167 .await
5168 .expect("query should plan");
5169 let err = df
5170 .collect()
5171 .await
5172 .expect_err("corrupt covering payload must fail closed");
5173 assert!(err.to_string().contains("invalid covering index payload"));
5174
5175 let _ = shutdown_tx.send(());
5176 }
5177
5178 #[tokio::test]
5179 async fn non_covering_index_uses_point_lookup_instead_of_full_scan() {
5180 let state = MockState {
5181 kv: Arc::new(Mutex::new(BTreeMap::new())),
5182 range_calls: Arc::new(AtomicUsize::new(0)),
5183 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
5184 sequence_number: Arc::new(AtomicU64::new(0)),
5185 };
5186 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
5187 let client = StoreClient::new(&base_url);
5188
5189 let schema = KvSchema::new(client.clone())
5190 .table(
5191 "orders",
5192 vec![
5193 TableColumnConfig::new("id", DataType::Int64, false),
5194 TableColumnConfig::new("status", DataType::Utf8, false),
5195 TableColumnConfig::new("amount_cents", DataType::Int64, false),
5196 TableColumnConfig::new("notes", DataType::Utf8, true),
5197 ],
5198 vec!["id".to_string()],
5199 vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
5200 )
5201 .expect("schema");
5202 let mut writer = schema.batch_writer();
5203 writer
5204 .insert(
5205 "orders",
5206 vec![
5207 CellValue::Int64(1),
5208 CellValue::Utf8("open".to_string()),
5209 CellValue::Int64(100),
5210 CellValue::Utf8("first".to_string()),
5211 ],
5212 )
5213 .expect("row");
5214 writer
5215 .insert(
5216 "orders",
5217 vec![
5218 CellValue::Int64(2),
5219 CellValue::Utf8("closed".to_string()),
5220 CellValue::Int64(200),
5221 CellValue::Utf8("second".to_string()),
5222 ],
5223 )
5224 .expect("row");
5225 writer
5226 .insert(
5227 "orders",
5228 vec![
5229 CellValue::Int64(3),
5230 CellValue::Utf8("open".to_string()),
5231 CellValue::Int64(300),
5232 CellValue::Utf8("third".to_string()),
5233 ],
5234 )
5235 .expect("row");
5236 writer.flush().await.expect("flush");
5237
5238 let ctx = SessionContext::new();
5239 schema.register_all(&ctx).expect("register");
5240
5241 let df = ctx
5242 .sql("SELECT id, notes FROM orders WHERE status = 'open' ORDER BY id")
5243 .await
5244 .expect("plan");
5245 let batches = df.collect().await.expect("non-covering index lookup");
5246 let ids: Vec<i64> = batches
5247 .iter()
5248 .flat_map(|b| {
5249 b.column(0)
5250 .as_any()
5251 .downcast_ref::<datafusion::arrow::array::Int64Array>()
5252 .unwrap()
5253 .iter()
5254 .map(|v| v.unwrap())
5255 })
5256 .collect();
5257 let notes: Vec<String> = batches
5258 .iter()
5259 .flat_map(|b| {
5260 b.column(1)
5261 .as_any()
5262 .downcast_ref::<datafusion::arrow::array::StringArray>()
5263 .unwrap()
5264 .iter()
5265 .map(|v| v.unwrap().to_string())
5266 })
5267 .collect();
5268 assert_eq!(ids, vec![1, 3]);
5269 assert_eq!(notes, vec!["first", "third"]);
5270
5271 let _ = shutdown_tx.send(());
5272 }
5273
5274 #[tokio::test]
5275 async fn backfill_resume_cursor_can_continue_without_skips_or_duplicates() {
5276 let state = MockState {
5277 kv: Arc::new(Mutex::new(BTreeMap::new())),
5278 range_calls: Arc::new(AtomicUsize::new(0)),
5279 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
5280 sequence_number: Arc::new(AtomicU64::new(0)),
5281 };
5282 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
5283 let client = StoreClient::new(&base_url);
5284
5285 let seed_schema = KvSchema::new(client.clone())
5286 .table(
5287 "orders",
5288 vec![
5289 TableColumnConfig::new("id", DataType::Int64, false),
5290 TableColumnConfig::new("status", DataType::Utf8, false),
5291 TableColumnConfig::new("amount_cents", DataType::Int64, false),
5292 ],
5293 vec!["id".to_string()],
5294 vec![],
5295 )
5296 .expect("seed schema");
5297 let mut writer = seed_schema.batch_writer();
5298 for i in 0..8i64 {
5299 writer
5300 .insert(
5301 "orders",
5302 vec![
5303 CellValue::Int64(i),
5304 CellValue::Utf8("open".to_string()),
5305 CellValue::Int64(i * 10),
5306 ],
5307 )
5308 .expect("seed row");
5309 }
5310 writer.flush().await.expect("seed flush");
5311
5312 let backfill_schema = KvSchema::new(client.clone())
5313 .table(
5314 "orders",
5315 vec![
5316 TableColumnConfig::new("id", DataType::Int64, false),
5317 TableColumnConfig::new("status", DataType::Utf8, false),
5318 TableColumnConfig::new("amount_cents", DataType::Int64, false),
5319 ],
5320 vec!["id".to_string()],
5321 vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
5322 )
5323 .expect("backfill schema");
5324
5325 let task_schema = KvSchema::new(client.clone())
5326 .table(
5327 "orders",
5328 vec![
5329 TableColumnConfig::new("id", DataType::Int64, false),
5330 TableColumnConfig::new("status", DataType::Utf8, false),
5331 TableColumnConfig::new("amount_cents", DataType::Int64, false),
5332 ],
5333 vec!["id".to_string()],
5334 vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
5335 )
5336 .expect("task schema");
5337 let (progress_tx, mut progress_rx) = mpsc::unbounded_channel();
5338 let handle = tokio::spawn(async move {
5339 task_schema
5340 .backfill_added_indexes_with_options_and_progress(
5341 "orders",
5342 &[],
5343 IndexBackfillOptions {
5344 row_batch_size: 2,
5345 start_from_primary_key: None,
5346 },
5347 Some(&progress_tx),
5348 )
5349 .await
5350 });
5351
5352 let mut resume_cursor = None;
5353 while let Some(event) = progress_rx.recv().await {
5354 if let IndexBackfillEvent::Progress { next_cursor, .. } = event {
5355 resume_cursor = next_cursor;
5356 break;
5357 }
5358 }
5359 handle.abort();
5360 let resume_cursor =
5361 resume_cursor.expect("first progress event should provide resume cursor");
5362
5363 let report = backfill_schema
5364 .backfill_added_indexes_with_options(
5365 "orders",
5366 &[],
5367 IndexBackfillOptions {
5368 row_batch_size: 2,
5369 start_from_primary_key: Some(resume_cursor),
5370 },
5371 )
5372 .await
5373 .expect("resume backfill should succeed");
5374 assert_eq!(report.scanned_rows, 6);
5375
5376 let guard = state.kv.lock().expect("kv mutex poisoned");
5377 let base_rows = guard
5378 .keys()
5379 .filter(|key| matches_primary_key(0, key))
5380 .count();
5381 let index_rows = guard
5382 .keys()
5383 .filter(|key| matches_secondary_index_key(0, 1, key))
5384 .count();
5385 assert_eq!(base_rows, 8);
5386 assert_eq!(
5387 index_rows, 8,
5388 "resume should backfill each row exactly once"
5389 );
5390
5391 let _ = shutdown_tx.send(());
5392 }
5393
5394 #[tokio::test]
5395 async fn concurrent_writes_during_backfill_preserve_index_correctness() {
5396 let state = MockState {
5397 kv: Arc::new(Mutex::new(BTreeMap::new())),
5398 range_calls: Arc::new(AtomicUsize::new(0)),
5399 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
5400 sequence_number: Arc::new(AtomicU64::new(0)),
5401 };
5402 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
5403 let client = StoreClient::new(&base_url);
5404
5405 let seed_schema = KvSchema::new(client.clone())
5406 .table(
5407 "orders",
5408 vec![
5409 TableColumnConfig::new("id", DataType::Int64, false),
5410 TableColumnConfig::new("status", DataType::Utf8, false),
5411 TableColumnConfig::new("amount_cents", DataType::Int64, false),
5412 ],
5413 vec!["id".to_string()],
5414 vec![],
5415 )
5416 .expect("seed schema");
5417 let mut seed_writer = seed_schema.batch_writer();
5418 for i in 0..40i64 {
5419 seed_writer
5420 .insert(
5421 "orders",
5422 vec![
5423 CellValue::Int64(i),
5424 CellValue::Utf8(if i % 2 == 0 { "open" } else { "closed" }.to_string()),
5425 CellValue::Int64(i * 10),
5426 ],
5427 )
5428 .expect("seed row");
5429 }
5430 seed_writer.flush().await.expect("seed flush");
5431
5432 let backfill_schema = KvSchema::new(client.clone())
5433 .table(
5434 "orders",
5435 vec![
5436 TableColumnConfig::new("id", DataType::Int64, false),
5437 TableColumnConfig::new("status", DataType::Utf8, false),
5438 TableColumnConfig::new("amount_cents", DataType::Int64, false),
5439 ],
5440 vec!["id".to_string()],
5441 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
5442 .expect("valid")
5443 .with_cover_columns(vec!["amount_cents".to_string()])],
5444 )
5445 .expect("backfill schema");
5446
5447 let task_schema = KvSchema::new(client.clone())
5448 .table(
5449 "orders",
5450 vec![
5451 TableColumnConfig::new("id", DataType::Int64, false),
5452 TableColumnConfig::new("status", DataType::Utf8, false),
5453 TableColumnConfig::new("amount_cents", DataType::Int64, false),
5454 ],
5455 vec!["id".to_string()],
5456 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
5457 .expect("valid")
5458 .with_cover_columns(vec!["amount_cents".to_string()])],
5459 )
5460 .expect("task schema");
5461 let (progress_tx, mut progress_rx) = mpsc::unbounded_channel();
5462 let handle = tokio::spawn(async move {
5463 task_schema
5464 .backfill_added_indexes_with_options_and_progress(
5465 "orders",
5466 &[],
5467 IndexBackfillOptions {
5468 row_batch_size: 5,
5469 start_from_primary_key: None,
5470 },
5471 Some(&progress_tx),
5472 )
5473 .await
5474 });
5475
5476 while let Some(event) = progress_rx.recv().await {
5477 if matches!(event, IndexBackfillEvent::Progress { .. }) {
5478 break;
5479 }
5480 }
5481
5482 let mut concurrent_writer = backfill_schema.batch_writer();
5483 for id in [100i64, 101i64] {
5484 concurrent_writer
5485 .insert(
5486 "orders",
5487 vec![
5488 CellValue::Int64(id),
5489 CellValue::Utf8("open".to_string()),
5490 CellValue::Int64(id * 10),
5491 ],
5492 )
5493 .expect("concurrent row");
5494 }
5495 concurrent_writer.flush().await.expect("concurrent flush");
5496
5497 let report = handle
5498 .await
5499 .expect("backfill task join")
5500 .expect("backfill result");
5501 assert!(
5502 report.scanned_rows >= 40,
5503 "backfill should at least scan the original historical rows"
5504 );
5505
5506 let guard = state.kv.lock().expect("kv mutex poisoned");
5507 let base_rows = guard
5508 .keys()
5509 .filter(|key| matches_primary_key(0, key))
5510 .count();
5511 let index_rows = guard
5512 .keys()
5513 .filter(|key| matches_secondary_index_key(0, 1, key))
5514 .count();
5515 assert_eq!(base_rows, 42);
5516 assert_eq!(
5517 index_rows, 42,
5518 "historical backfill plus concurrent indexed writes should leave one index row per base row"
5519 );
5520
5521 let _ = shutdown_tx.send(());
5522 }
5523
5524 #[derive(Clone)]
5525 struct DeferredChunkRangeHarness {
5526 first_chunk_sent: Arc<Notify>,
5527 release_second_chunk: Arc<Notify>,
5528 first_frame: ProtoRangeFrame,
5529 second_frame: ProtoRangeFrame,
5530 }
5531
5532 impl QueryService for DeferredChunkRangeHarness {
5533 async fn get(
5534 &self,
5535 _ctx: Context,
5536 _request: buffa::view::OwnedView<
5537 exoware_sdk_rs::store::query::v1::GetRequestView<'static>,
5538 >,
5539 ) -> Result<(ProtoGetResponse, Context), ConnectError> {
5540 Err(ConnectError::unimplemented("test harness"))
5541 }
5542
5543 async fn get_many(
5544 &self,
5545 _ctx: Context,
5546 _request: buffa::view::OwnedView<
5547 exoware_sdk_rs::store::query::v1::GetManyRequestView<'static>,
5548 >,
5549 ) -> Result<
5550 (
5551 Pin<Box<dyn Stream<Item = Result<ProtoGetManyFrame, ConnectError>> + Send>>,
5552 Context,
5553 ),
5554 ConnectError,
5555 > {
5556 Err(ConnectError::unimplemented("test harness"))
5557 }
5558
5559 async fn range(
5560 &self,
5561 _ctx: Context,
5562 _request: buffa::view::OwnedView<
5563 exoware_sdk_rs::store::query::v1::RangeRequestView<'static>,
5564 >,
5565 ) -> Result<
5566 (
5567 Pin<Box<dyn Stream<Item = Result<ProtoRangeFrame, ConnectError>> + Send>>,
5568 Context,
5569 ),
5570 ConnectError,
5571 > {
5572 let first_chunk_sent = self.first_chunk_sent.clone();
5573 let release_second_chunk = self.release_second_chunk.clone();
5574 let first_frame = self.first_frame.clone();
5575 let second_frame = self.second_frame.clone();
5576 let stream = stream::try_unfold(0u8, move |state| {
5577 let first_chunk_sent = first_chunk_sent.clone();
5578 let release_second_chunk = release_second_chunk.clone();
5579 let first_frame = first_frame.clone();
5580 let second_frame = second_frame.clone();
5581 async move {
5582 match state {
5583 0 => {
5584 first_chunk_sent.notify_one();
5585 Ok(Some((first_frame, 1)))
5586 }
5587 1 => {
5588 release_second_chunk.notified().await;
5589 Ok(Some((second_frame, 2)))
5590 }
5591 _ => Ok(None),
5592 }
5593 }
5594 });
5595 Ok((Box::pin(stream), query_detail_trailer_ctx(7)))
5596 }
5597
5598 async fn reduce(
5599 &self,
5600 _ctx: Context,
5601 _request: buffa::view::OwnedView<
5602 exoware_sdk_rs::store::query::v1::ReduceRequestView<'static>,
5603 >,
5604 ) -> Result<(ProtoReduceResponse, Context), ConnectError> {
5605 Err(ConnectError::unimplemented("test harness"))
5606 }
5607 }
5608
5609 #[derive(Clone)]
5610 struct ObservedLimitRangeHarness {
5611 release_second_chunk: Arc<Notify>,
5612 observed_limit: Arc<AtomicUsize>,
5613 first_frame: ProtoRangeFrame,
5614 second_frame: ProtoRangeFrame,
5615 }
5616
5617 impl QueryService for ObservedLimitRangeHarness {
5618 async fn get(
5619 &self,
5620 _ctx: Context,
5621 _request: buffa::view::OwnedView<
5622 exoware_sdk_rs::store::query::v1::GetRequestView<'static>,
5623 >,
5624 ) -> Result<(ProtoGetResponse, Context), ConnectError> {
5625 Err(ConnectError::unimplemented("test harness"))
5626 }
5627
5628 async fn get_many(
5629 &self,
5630 _ctx: Context,
5631 _request: buffa::view::OwnedView<
5632 exoware_sdk_rs::store::query::v1::GetManyRequestView<'static>,
5633 >,
5634 ) -> Result<
5635 (
5636 Pin<Box<dyn Stream<Item = Result<ProtoGetManyFrame, ConnectError>> + Send>>,
5637 Context,
5638 ),
5639 ConnectError,
5640 > {
5641 Err(ConnectError::unimplemented("test harness"))
5642 }
5643
5644 async fn range(
5645 &self,
5646 _ctx: Context,
5647 request: buffa::view::OwnedView<
5648 exoware_sdk_rs::store::query::v1::RangeRequestView<'static>,
5649 >,
5650 ) -> Result<
5651 (
5652 Pin<Box<dyn Stream<Item = Result<ProtoRangeFrame, ConnectError>> + Send>>,
5653 Context,
5654 ),
5655 ConnectError,
5656 > {
5657 let limit = request.limit.map(|v| v as usize).unwrap_or(usize::MAX);
5658 self.observed_limit.store(limit, AtomicOrdering::SeqCst);
5659 let release_second_chunk = self.release_second_chunk.clone();
5660 let first_frame = self.first_frame.clone();
5661 let second_frame = self.second_frame.clone();
5662 let stream = stream::try_unfold(0u8, move |state| {
5663 let release_second_chunk = release_second_chunk.clone();
5664 let first_frame = first_frame.clone();
5665 let second_frame = second_frame.clone();
5666 async move {
5667 match state {
5668 0 => Ok(Some((first_frame, 1))),
5669 1 => {
5670 if limit > 1 {
5671 release_second_chunk.notified().await;
5672 Ok(Some((second_frame, 2)))
5673 } else {
5674 Ok(None)
5675 }
5676 }
5677 2 => Ok(None),
5678 _ => Ok(None),
5679 }
5680 }
5681 });
5682 Ok((Box::pin(stream), query_detail_trailer_ctx(7)))
5683 }
5684
5685 async fn reduce(
5686 &self,
5687 _ctx: Context,
5688 _request: buffa::view::OwnedView<
5689 exoware_sdk_rs::store::query::v1::ReduceRequestView<'static>,
5690 >,
5691 ) -> Result<(ProtoReduceResponse, Context), ConnectError> {
5692 Err(ConnectError::unimplemented("test harness"))
5693 }
5694 }
5695
5696 #[derive(Clone)]
5697 struct ObservedLimitIndexRangeHarness {
5698 observed_limit: Arc<AtomicUsize>,
5699 entries_frame: ProtoRangeFrame,
5700 }
5701
5702 impl QueryService for ObservedLimitIndexRangeHarness {
5703 async fn get(
5704 &self,
5705 _ctx: Context,
5706 _request: buffa::view::OwnedView<
5707 exoware_sdk_rs::store::query::v1::GetRequestView<'static>,
5708 >,
5709 ) -> Result<(ProtoGetResponse, Context), ConnectError> {
5710 Err(ConnectError::unimplemented("test harness"))
5711 }
5712
5713 async fn get_many(
5714 &self,
5715 _ctx: Context,
5716 _request: buffa::view::OwnedView<
5717 exoware_sdk_rs::store::query::v1::GetManyRequestView<'static>,
5718 >,
5719 ) -> Result<
5720 (
5721 Pin<Box<dyn Stream<Item = Result<ProtoGetManyFrame, ConnectError>> + Send>>,
5722 Context,
5723 ),
5724 ConnectError,
5725 > {
5726 Err(ConnectError::unimplemented("test harness"))
5727 }
5728
5729 async fn range(
5730 &self,
5731 _ctx: Context,
5732 request: buffa::view::OwnedView<
5733 exoware_sdk_rs::store::query::v1::RangeRequestView<'static>,
5734 >,
5735 ) -> Result<
5736 (
5737 Pin<Box<dyn Stream<Item = Result<ProtoRangeFrame, ConnectError>> + Send>>,
5738 Context,
5739 ),
5740 ConnectError,
5741 > {
5742 let limit = request
5743 .limit
5744 .map(|v| {
5745 if v == u32::MAX {
5746 usize::MAX
5747 } else {
5748 v as usize
5749 }
5750 })
5751 .unwrap_or(usize::MAX);
5752 self.observed_limit.store(limit, AtomicOrdering::SeqCst);
5753 let entries_frame = self.entries_frame.clone();
5754 Ok((
5755 Box::pin(stream::iter(vec![Ok(entries_frame)])),
5756 query_detail_trailer_ctx(7),
5757 ))
5758 }
5759
5760 async fn reduce(
5761 &self,
5762 _ctx: Context,
5763 _request: buffa::view::OwnedView<
5764 exoware_sdk_rs::store::query::v1::ReduceRequestView<'static>,
5765 >,
5766 ) -> Result<(ProtoReduceResponse, Context), ConnectError> {
5767 Err(ConnectError::unimplemented("test harness"))
5768 }
5769 }
5770
5771 #[tokio::test]
5772 async fn kv_scan_streaming_range_reads_emit_first_batch_before_full_range_completes() {
5773 let model = Arc::new(simple_int64_model(0));
5774 let first_chunk_sent = Arc::new(Notify::new());
5775 let release_second_chunk = Arc::new(Notify::new());
5776
5777 let encoded_row = (StoredRow { values: vec![None] }).encode().to_vec();
5778
5779 let first_results = {
5780 let mut results = Vec::with_capacity(BATCH_FLUSH_ROWS);
5781 for id in 0..BATCH_FLUSH_ROWS {
5782 let key =
5783 encode_primary_key(model.table_prefix, &[&CellValue::Int64(id as i64)], &model)
5784 .expect("primary key");
5785 results.push((key, encoded_row.clone()));
5786 }
5787 results
5788 };
5789 let first_frame = proto_range_entries_frame(first_results);
5790
5791 let second_results = {
5792 let key = encode_primary_key(
5793 model.table_prefix,
5794 &[&CellValue::Int64(BATCH_FLUSH_ROWS as i64)],
5795 &model,
5796 )
5797 .expect("primary key");
5798 vec![(key, encoded_row)]
5799 };
5800 let second_frame = proto_range_entries_frame(second_results);
5801
5802 let harness = DeferredChunkRangeHarness {
5803 first_chunk_sent: first_chunk_sent.clone(),
5804 release_second_chunk: release_second_chunk.clone(),
5805 first_frame,
5806 second_frame,
5807 };
5808 let connect = ConnectRpcService::new(QueryServiceServer::new(harness))
5809 .with_compression(connect_compression_registry());
5810 let app = Router::new().fallback_service(connect);
5811
5812 let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
5813 .await
5814 .expect("bind test listener");
5815 let url = format!("http://{}", listener.local_addr().expect("listener addr"));
5816 tokio::spawn(async move {
5817 axum::serve(listener, app).await.expect("serve test app");
5818 });
5819
5820 let client = StoreClient::new(&url);
5821 let scan = KvScanExec::new(
5822 client,
5823 model.clone(),
5824 Arc::new(Vec::new()),
5825 QueryPredicate::default(),
5826 None,
5827 model.schema.clone(),
5828 None,
5829 );
5830
5831 let session_ctx = SessionContext::new();
5832 let mut stream = scan
5833 .execute(0, session_ctx.task_ctx())
5834 .expect("scan execute should start");
5835
5836 tokio::time::timeout(Duration::from_secs(1), first_chunk_sent.notified())
5837 .await
5838 .expect("server should send first range frame");
5839 let first_batch = tokio::time::timeout(Duration::from_millis(200), stream.try_next())
5840 .await
5841 .expect("first record batch should arrive before the second stream chunk is released")
5842 .expect("stream poll should succeed")
5843 .expect("expected first record batch");
5844 assert_eq!(first_batch.num_rows(), BATCH_FLUSH_ROWS);
5845
5846 release_second_chunk.notify_one();
5847
5848 let second_batch = stream
5849 .try_next()
5850 .await
5851 .expect("second poll should succeed")
5852 .expect("expected second record batch");
5853 assert_eq!(second_batch.num_rows(), 1);
5854 assert!(
5855 stream
5856 .try_next()
5857 .await
5858 .expect("stream completion poll")
5859 .is_none(),
5860 "stream should finish after the second batch"
5861 );
5862 }
5863
5864 #[tokio::test]
5865 async fn kv_scan_sql_limit_is_pushed_upstream_on_exact_streaming_scan() {
5866 let release_second_chunk = Arc::new(Notify::new());
5867 let observed_limit = Arc::new(AtomicUsize::new(0));
5868 let model = simple_int64_model(0);
5869
5870 let encoded_row = (StoredRow { values: vec![None] }).encode().to_vec();
5871
5872 let first_key = encode_primary_key(model.table_prefix, &[&CellValue::Int64(1)], &model)
5873 .expect("first primary key");
5874 let second_key = encode_primary_key(model.table_prefix, &[&CellValue::Int64(2)], &model)
5875 .expect("second primary key");
5876
5877 let first_frame = proto_range_entries_frame(vec![(first_key, encoded_row.clone())]);
5878 let second_frame = proto_range_entries_frame(vec![(second_key, encoded_row)]);
5879
5880 let harness = ObservedLimitRangeHarness {
5881 release_second_chunk: release_second_chunk.clone(),
5882 observed_limit: observed_limit.clone(),
5883 first_frame,
5884 second_frame,
5885 };
5886 let connect = ConnectRpcService::new(QueryServiceServer::new(harness))
5887 .with_compression(connect_compression_registry());
5888 let app = Router::new().fallback_service(connect);
5889
5890 let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
5891 .await
5892 .expect("bind test listener");
5893 let url = format!("http://{}", listener.local_addr().expect("listener addr"));
5894 tokio::spawn(async move {
5895 axum::serve(listener, app).await.expect("serve test app");
5896 });
5897
5898 let client = StoreClient::new(&url);
5899 let schema = KvSchema::new(client)
5900 .table(
5901 "items",
5902 vec![TableColumnConfig::new("id", DataType::Int64, false)],
5903 vec!["id".to_string()],
5904 vec![],
5905 )
5906 .expect("schema");
5907 let ctx = SessionContext::new();
5908 schema.register_all(&ctx).expect("register");
5909
5910 let batches = tokio::time::timeout(Duration::from_millis(200), async {
5911 ctx.sql("SELECT id FROM items LIMIT 1")
5912 .await
5913 .expect("query")
5914 .collect()
5915 .await
5916 .expect("collect")
5917 })
5918 .await
5919 .expect("query with LIMIT 1 should finish without waiting for a delayed second chunk");
5920
5921 assert_eq!(
5922 batches.iter().map(|batch| batch.num_rows()).sum::<usize>(),
5923 1
5924 );
5925 assert_eq!(
5926 observed_limit.load(AtomicOrdering::SeqCst),
5927 1,
5928 "exact streaming scan should push SQL LIMIT upstream"
5929 );
5930 release_second_chunk.notify_one();
5931 }
5932
5933 #[tokio::test]
5934 async fn kv_scan_index_limit_does_not_push_upstream_when_seen_dedup_can_drop_entries() {
5935 let observed_limit = Arc::new(AtomicUsize::new(0));
5936 let config = KvTableConfig::new(
5937 0,
5938 vec![
5939 TableColumnConfig::new("id", DataType::Int64, false),
5940 TableColumnConfig::new("status", DataType::Utf8, false),
5941 TableColumnConfig::new("amount_cents", DataType::Int64, false),
5942 ],
5943 vec!["id".to_string()],
5944 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
5945 .expect("valid")
5946 .with_cover_columns(vec!["status".to_string(), "amount_cents".to_string()])],
5947 )
5948 .expect("config");
5949 let model = TableModel::from_config(&config).expect("model");
5950 let spec = model
5951 .resolve_index_specs(&config.index_specs)
5952 .expect("specs")
5953 .into_iter()
5954 .next()
5955 .expect("status index spec");
5956 let stale_row = KvRow {
5957 values: vec![
5958 CellValue::Int64(7),
5959 CellValue::Utf8("closed".to_string()),
5960 CellValue::Int64(10),
5961 ],
5962 };
5963 let current_row = KvRow {
5964 values: vec![
5965 CellValue::Int64(7),
5966 CellValue::Utf8("open".to_string()),
5967 CellValue::Int64(10),
5968 ],
5969 };
5970 let unique_row = KvRow {
5971 values: vec![
5972 CellValue::Int64(8),
5973 CellValue::Utf8("open".to_string()),
5974 CellValue::Int64(20),
5975 ],
5976 };
5977 let stale_key = encode_secondary_index_key(model.table_prefix, &spec, &model, &stale_row)
5978 .expect("stale index key");
5979 let current_key =
5980 encode_secondary_index_key(model.table_prefix, &spec, &model, ¤t_row)
5981 .expect("current index key");
5982 let unique_key = encode_secondary_index_key(model.table_prefix, &spec, &model, &unique_row)
5983 .expect("unique index key");
5984 let stale_payload =
5985 encode_secondary_index_value(&stale_row, &model, &spec).expect("stale payload");
5986 let current_payload =
5987 encode_secondary_index_value(¤t_row, &model, &spec).expect("current payload");
5988 let unique_payload =
5989 encode_secondary_index_value(&unique_row, &model, &spec).expect("unique payload");
5990
5991 let entries_frame = proto_range_entries_frame(vec![
5992 (stale_key, stale_payload),
5993 (current_key, current_payload),
5994 (unique_key, unique_payload),
5995 ]);
5996 let harness = ObservedLimitIndexRangeHarness {
5997 observed_limit: observed_limit.clone(),
5998 entries_frame,
5999 };
6000 let connect = ConnectRpcService::new(QueryServiceServer::new(harness))
6001 .with_compression(connect_compression_registry());
6002 let app = Router::new().fallback_service(connect);
6003
6004 let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
6005 .await
6006 .expect("bind test listener");
6007 let url = format!("http://{}", listener.local_addr().expect("listener addr"));
6008 tokio::spawn(async move {
6009 axum::serve(listener, app).await.expect("serve test app");
6010 });
6011
6012 let client = StoreClient::new(&url);
6013 let schema = KvSchema::new(client)
6014 .table(
6015 "orders",
6016 vec![
6017 TableColumnConfig::new("id", DataType::Int64, false),
6018 TableColumnConfig::new("status", DataType::Utf8, false),
6019 TableColumnConfig::new("amount_cents", DataType::Int64, false),
6020 ],
6021 vec!["id".to_string()],
6022 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6023 .expect("valid")
6024 .with_cover_columns(vec!["status".to_string(), "amount_cents".to_string()])],
6025 )
6026 .expect("schema");
6027 let ctx = SessionContext::new();
6028 schema.register_all(&ctx).expect("register");
6029
6030 let batches = ctx
6031 .sql(
6032 "SELECT id, amount_cents \
6033 FROM orders \
6034 WHERE status IN ('open', 'closed') \
6035 LIMIT 2",
6036 )
6037 .await
6038 .expect("query")
6039 .collect()
6040 .await
6041 .expect("collect");
6042
6043 assert_eq!(
6044 batches.iter().map(|batch| batch.num_rows()).sum::<usize>(),
6045 2
6046 );
6047 assert_eq!(
6048 observed_limit.load(AtomicOrdering::SeqCst),
6049 usize::MAX,
6050 "index streaming scans should not push SQL LIMIT upstream while seen-dedup can drop duplicate primary keys"
6051 );
6052 }
6053
6054 #[tokio::test]
6055 async fn zorder_covering_index_scan_filters_false_positive_morton_span_rows() {
6056 let state = MockState {
6057 kv: Arc::new(Mutex::new(BTreeMap::new())),
6058 range_calls: Arc::new(AtomicUsize::new(0)),
6059 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6060 sequence_number: Arc::new(AtomicU64::new(0)),
6061 };
6062 let (base_url, shutdown_tx) = spawn_mock_server(state).await;
6063 let client = StoreClient::new(&base_url);
6064
6065 let schema = KvSchema::new(client)
6066 .table(
6067 "points",
6068 vec![
6069 TableColumnConfig::new("x", DataType::Int64, false),
6070 TableColumnConfig::new("y", DataType::Int64, false),
6071 TableColumnConfig::new("id", DataType::Int64, false),
6072 TableColumnConfig::new("value", DataType::Int64, false),
6073 ],
6074 vec!["id".to_string()],
6075 vec![
6076 IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
6077 .expect("valid")
6078 .with_cover_columns(vec!["value".to_string()]),
6079 ],
6080 )
6081 .expect("schema");
6082
6083 let mut writer = schema.batch_writer();
6084 for (x, y, id, value) in [
6085 (0, 2, 2, 20),
6086 (1, 1, 11, 110),
6087 (1, 2, 12, 120),
6088 (2, 1, 21, 210),
6089 (2, 2, 22, 220),
6090 (3, 0, 30, 300),
6091 ] {
6092 writer
6093 .insert(
6094 "points",
6095 vec![
6096 CellValue::Int64(x),
6097 CellValue::Int64(y),
6098 CellValue::Int64(id),
6099 CellValue::Int64(value),
6100 ],
6101 )
6102 .expect("row");
6103 }
6104 writer.flush().await.expect("flush");
6105
6106 let ctx = SessionContext::new();
6107 schema.register_all(&ctx).expect("register");
6108
6109 let batches = ctx
6110 .sql(
6111 "SELECT id, value FROM points \
6112 WHERE x >= 1 AND x <= 2 AND y >= 1 AND y <= 2 \
6113 ORDER BY id",
6114 )
6115 .await
6116 .expect("query")
6117 .collect()
6118 .await
6119 .expect("collect");
6120
6121 let mut rows = Vec::new();
6122 for batch in &batches {
6123 let ids = batch
6124 .column(0)
6125 .as_any()
6126 .downcast_ref::<Int64Array>()
6127 .expect("id int64");
6128 let values = batch
6129 .column(1)
6130 .as_any()
6131 .downcast_ref::<Int64Array>()
6132 .expect("value int64");
6133 for row_idx in 0..batch.num_rows() {
6134 rows.push((ids.value(row_idx), values.value(row_idx)));
6135 }
6136 }
6137 assert_eq!(rows, vec![(11, 110), (12, 120), (21, 210), (22, 220)]);
6138
6139 let _ = shutdown_tx.send(());
6140 }
6141
6142 #[tokio::test]
6143 async fn aggregate_pushdown_uses_range_reduce_for_supported_global_aggregates() {
6144 let state = MockState {
6145 kv: Arc::new(Mutex::new(BTreeMap::new())),
6146 range_calls: Arc::new(AtomicUsize::new(0)),
6147 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6148 sequence_number: Arc::new(AtomicU64::new(0)),
6149 };
6150 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6151 let client = StoreClient::new(&base_url);
6152
6153 let schema = KvSchema::new(client)
6154 .table(
6155 "orders",
6156 vec![
6157 TableColumnConfig::new("id", DataType::Int64, false),
6158 TableColumnConfig::new("status", DataType::Utf8, false),
6159 TableColumnConfig::new("amount_cents", DataType::Int64, false),
6160 ],
6161 vec!["id".to_string()],
6162 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6163 .expect("valid")
6164 .with_cover_columns(vec!["amount_cents".to_string()])],
6165 )
6166 .expect("schema");
6167
6168 let mut writer = schema.batch_writer();
6169 for (id, status, amount) in [
6170 (1, "open", 10),
6171 (2, "closed", 15),
6172 (3, "open", 30),
6173 (4, "closed", 40),
6174 ] {
6175 writer
6176 .insert(
6177 "orders",
6178 vec![
6179 CellValue::Int64(id),
6180 CellValue::Utf8(status.to_string()),
6181 CellValue::Int64(amount),
6182 ],
6183 )
6184 .expect("row");
6185 }
6186 writer.flush().await.expect("flush");
6187
6188 let ctx = SessionContext::new();
6189 schema.register_all(&ctx).expect("register");
6190
6191 state.range_calls.store(0, AtomicOrdering::SeqCst);
6192 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6193
6194 let df = ctx
6195 .sql(
6196 "SELECT COUNT(*) AS row_count, SUM(amount_cents) AS total_cents, \
6197 AVG(amount_cents) AS avg_cents \
6198 FROM orders WHERE status = 'open'",
6199 )
6200 .await
6201 .expect("query");
6202 let batches = df.collect().await.expect("collect");
6203
6204 assert_eq!(batches.len(), 1);
6205 let batch = &batches[0];
6206 let row_count = ScalarValue::try_from_array(batch.column(0), 0).expect("row_count scalar");
6207 let total = batch
6208 .column(1)
6209 .as_any()
6210 .downcast_ref::<Int64Array>()
6211 .expect("sum int64")
6212 .value(0);
6213 let avg = batch
6214 .column(2)
6215 .as_any()
6216 .downcast_ref::<Float64Array>()
6217 .expect("avg float64")
6218 .value(0);
6219 assert!(matches!(
6220 row_count,
6221 ScalarValue::UInt64(Some(2)) | ScalarValue::Int64(Some(2))
6222 ));
6223 assert_eq!(total, 40);
6224 assert_eq!(avg, 20.0);
6225 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6226 assert!(
6227 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6228 "supported aggregate should use range reduction path"
6229 );
6230
6231 let _ = shutdown_tx.send(());
6232 }
6233
6234 #[tokio::test]
6236 async fn primary_key_inclusive_upper_bound_streaming_scan_uses_range() {
6237 let state = MockState {
6238 kv: Arc::new(Mutex::new(BTreeMap::new())),
6239 range_calls: Arc::new(AtomicUsize::new(0)),
6240 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6241 sequence_number: Arc::new(AtomicU64::new(0)),
6242 };
6243 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6244 let client = StoreClient::new(&base_url);
6245
6246 let schema = KvSchema::new(client)
6247 .table(
6248 "inc_pk",
6249 vec![
6250 TableColumnConfig::new("id", DataType::Int64, false),
6251 TableColumnConfig::new("amount", DataType::Int64, false),
6252 ],
6253 vec!["id".to_string()],
6254 vec![],
6255 )
6256 .expect("schema");
6257
6258 let mut writer = schema.batch_writer();
6259 for id in 1i64..=5i64 {
6260 writer
6261 .insert(
6262 "inc_pk",
6263 vec![CellValue::Int64(id), CellValue::Int64(id * 100)],
6264 )
6265 .expect("row");
6266 }
6267 writer.flush().await.expect("flush");
6268
6269 let ctx = SessionContext::new();
6270 schema.register_all(&ctx).expect("register");
6271
6272 state.range_calls.store(0, AtomicOrdering::SeqCst);
6273 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6274
6275 let batches = ctx
6276 .sql("SELECT id FROM inc_pk WHERE id <= 3 ORDER BY id")
6277 .await
6278 .expect("lte query")
6279 .collect()
6280 .await
6281 .expect("collect");
6282 let mut ids = Vec::new();
6283 for batch in &batches {
6284 let col = batch
6285 .column(0)
6286 .as_any()
6287 .downcast_ref::<Int64Array>()
6288 .expect("id");
6289 for i in 0..batch.num_rows() {
6290 ids.push(col.value(i));
6291 }
6292 }
6293 assert_eq!(ids, vec![1, 2, 3], "id <= 3 must include id 3");
6294 assert!(
6295 state.range_calls.load(AtomicOrdering::SeqCst) >= 1,
6296 "PK bounded scan should call range"
6297 );
6298 assert_eq!(
6299 state.range_reduce_calls.load(AtomicOrdering::SeqCst),
6300 0,
6301 "streaming scan must not use range_reduce"
6302 );
6303
6304 state.range_calls.store(0, AtomicOrdering::SeqCst);
6305 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6306
6307 let batches = ctx
6308 .sql("SELECT id FROM inc_pk WHERE id BETWEEN 2 AND 4 ORDER BY id")
6309 .await
6310 .expect("between query")
6311 .collect()
6312 .await
6313 .expect("collect");
6314 ids.clear();
6315 for batch in &batches {
6316 let col = batch
6317 .column(0)
6318 .as_any()
6319 .downcast_ref::<Int64Array>()
6320 .expect("id");
6321 for i in 0..batch.num_rows() {
6322 ids.push(col.value(i));
6323 }
6324 }
6325 assert_eq!(ids, vec![2, 3, 4], "BETWEEN must include both endpoints");
6326 assert!(state.range_calls.load(AtomicOrdering::SeqCst) >= 1);
6327 assert_eq!(state.range_reduce_calls.load(AtomicOrdering::SeqCst), 0);
6328
6329 let _ = shutdown_tx.send(());
6330 }
6331
6332 #[tokio::test]
6333 async fn primary_key_inclusive_upper_bound_scalar_aggregates_use_range_reduce() {
6334 let state = MockState {
6335 kv: Arc::new(Mutex::new(BTreeMap::new())),
6336 range_calls: Arc::new(AtomicUsize::new(0)),
6337 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6338 sequence_number: Arc::new(AtomicU64::new(0)),
6339 };
6340 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6341 let client = StoreClient::new(&base_url);
6342
6343 let schema = KvSchema::new(client)
6344 .table(
6345 "inc_pk",
6346 vec![
6347 TableColumnConfig::new("id", DataType::Int64, false),
6348 TableColumnConfig::new("amount", DataType::Int64, false),
6349 ],
6350 vec!["id".to_string()],
6351 vec![],
6352 )
6353 .expect("schema");
6354
6355 let mut writer = schema.batch_writer();
6356 for id in 1i64..=5i64 {
6357 writer
6358 .insert(
6359 "inc_pk",
6360 vec![CellValue::Int64(id), CellValue::Int64(id * 100)],
6361 )
6362 .expect("row");
6363 }
6364 writer.flush().await.expect("flush");
6365
6366 let ctx = SessionContext::new();
6367 schema.register_all(&ctx).expect("register");
6368
6369 state.range_calls.store(0, AtomicOrdering::SeqCst);
6370 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6371
6372 let batches = ctx
6373 .sql("SELECT COUNT(*) AS c, SUM(amount) AS s FROM inc_pk WHERE id <= 3")
6374 .await
6375 .expect("lte agg")
6376 .collect()
6377 .await
6378 .expect("collect");
6379 assert_eq!(batches.len(), 1);
6380 let batch = &batches[0];
6381 let c = ScalarValue::try_from_array(batch.column(0), 0).expect("count");
6382 assert!(
6383 matches!(
6384 c,
6385 ScalarValue::UInt64(Some(3)) | ScalarValue::Int64(Some(3))
6386 ),
6387 "count should include id=3"
6388 );
6389 assert_eq!(
6390 batch
6391 .column(1)
6392 .as_any()
6393 .downcast_ref::<Int64Array>()
6394 .expect("sum")
6395 .value(0),
6396 100 + 200 + 300
6397 );
6398 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6399 assert!(
6400 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6401 "scalar aggregate on PK range should use range_reduce"
6402 );
6403
6404 state.range_calls.store(0, AtomicOrdering::SeqCst);
6405 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6406
6407 let batches = ctx
6408 .sql("SELECT SUM(amount) AS s FROM inc_pk WHERE id BETWEEN 2 AND 4")
6409 .await
6410 .expect("between agg")
6411 .collect()
6412 .await
6413 .expect("collect");
6414 assert_eq!(batches.len(), 1);
6415 let batch = &batches[0];
6416 assert_eq!(
6417 batch
6418 .column(0)
6419 .as_any()
6420 .downcast_ref::<Int64Array>()
6421 .expect("sum")
6422 .value(0),
6423 200 + 300 + 400
6424 );
6425 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6426 assert!(
6427 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6428 "BETWEEN aggregate should use range_reduce"
6429 );
6430
6431 let _ = shutdown_tx.send(());
6432 }
6433
6434 #[tokio::test]
6435 async fn aggregate_pushdown_uses_zorder_index_with_worker_filter() {
6436 let state = MockState {
6437 kv: Arc::new(Mutex::new(BTreeMap::new())),
6438 range_calls: Arc::new(AtomicUsize::new(0)),
6439 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6440 sequence_number: Arc::new(AtomicU64::new(0)),
6441 };
6442 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6443 let client = StoreClient::new(&base_url);
6444
6445 let schema = KvSchema::new(client)
6446 .table(
6447 "points",
6448 vec![
6449 TableColumnConfig::new("x", DataType::Int64, false),
6450 TableColumnConfig::new("y", DataType::Int64, false),
6451 TableColumnConfig::new("id", DataType::Int64, false),
6452 TableColumnConfig::new("value", DataType::Int64, false),
6453 ],
6454 vec!["id".to_string()],
6455 vec![
6456 IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
6457 .expect("valid")
6458 .with_cover_columns(vec!["value".to_string()]),
6459 ],
6460 )
6461 .expect("schema");
6462
6463 let mut writer = schema.batch_writer();
6464 for (x, y, id, value) in [
6465 (0, 2, 2, 20),
6466 (1, 1, 11, 110),
6467 (1, 2, 12, 120),
6468 (2, 1, 21, 210),
6469 (2, 2, 22, 220),
6470 (3, 0, 30, 300),
6471 ] {
6472 writer
6473 .insert(
6474 "points",
6475 vec![
6476 CellValue::Int64(x),
6477 CellValue::Int64(y),
6478 CellValue::Int64(id),
6479 CellValue::Int64(value),
6480 ],
6481 )
6482 .expect("row");
6483 }
6484 writer.flush().await.expect("flush");
6485
6486 let ctx = SessionContext::new();
6487 schema.register_all(&ctx).expect("register");
6488
6489 state.range_calls.store(0, AtomicOrdering::SeqCst);
6490 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6491
6492 let batches = ctx
6493 .sql(
6494 "SELECT COUNT(*) AS row_count, SUM(value) AS total_value \
6495 FROM points \
6496 WHERE x >= 1 AND x <= 2 AND y >= 1 AND y <= 2",
6497 )
6498 .await
6499 .expect("query")
6500 .collect()
6501 .await
6502 .expect("collect");
6503
6504 assert_eq!(batches.len(), 1);
6505 let batch = &batches[0];
6506 let row_count = ScalarValue::try_from_array(batch.column(0), 0).expect("row_count scalar");
6507 let total = batch
6508 .column(1)
6509 .as_any()
6510 .downcast_ref::<Int64Array>()
6511 .expect("sum int64")
6512 .value(0);
6513 assert!(matches!(
6514 row_count,
6515 ScalarValue::UInt64(Some(4)) | ScalarValue::Int64(Some(4))
6516 ));
6517 assert_eq!(total, 660);
6518 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6519 assert!(
6520 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6521 "z-order aggregate should use range reduction path"
6522 );
6523
6524 let _ = shutdown_tx.send(());
6525 }
6526
6527 #[tokio::test]
6528 async fn aggregate_pushdown_avg_merges_sum_and_count_across_multiple_ranges() {
6529 let state = MockState {
6530 kv: Arc::new(Mutex::new(BTreeMap::new())),
6531 range_calls: Arc::new(AtomicUsize::new(0)),
6532 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6533 sequence_number: Arc::new(AtomicU64::new(0)),
6534 };
6535 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6536 let client = StoreClient::new(&base_url);
6537
6538 let schema = KvSchema::new(client)
6539 .table(
6540 "orders",
6541 vec![
6542 TableColumnConfig::new("id", DataType::Int64, false),
6543 TableColumnConfig::new("status", DataType::Utf8, false),
6544 TableColumnConfig::new("amount_cents", DataType::Int64, false),
6545 ],
6546 vec!["id".to_string()],
6547 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6548 .expect("valid")
6549 .with_cover_columns(vec!["amount_cents".to_string()])],
6550 )
6551 .expect("schema");
6552
6553 let mut writer = schema.batch_writer();
6554 for (id, status, amount) in [
6555 (1, "open", 10),
6556 (2, "open", 20),
6557 (3, "closed", 100),
6558 (4, "pending", 1_000),
6559 ] {
6560 writer
6561 .insert(
6562 "orders",
6563 vec![
6564 CellValue::Int64(id),
6565 CellValue::Utf8(status.to_string()),
6566 CellValue::Int64(amount),
6567 ],
6568 )
6569 .expect("row");
6570 }
6571 writer.flush().await.expect("flush");
6572
6573 let ctx = SessionContext::new();
6574 schema.register_all(&ctx).expect("register");
6575
6576 state.range_calls.store(0, AtomicOrdering::SeqCst);
6577 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6578
6579 let batches = ctx
6580 .sql(
6581 "SELECT AVG(amount_cents) AS avg_cents \
6582 FROM orders \
6583 WHERE status IN ('open', 'closed')",
6584 )
6585 .await
6586 .expect("query")
6587 .collect()
6588 .await
6589 .expect("collect");
6590
6591 assert_eq!(batches.len(), 1);
6592 let batch = &batches[0];
6593 let avg = batch
6594 .column(0)
6595 .as_any()
6596 .downcast_ref::<Float64Array>()
6597 .expect("avg float64")
6598 .value(0);
6599 let expected = 130.0 / 3.0;
6600 assert!(
6601 (avg - expected).abs() < 1e-12,
6602 "AVG should merge SUM+COUNT across unequal-count ranges: got {avg}, expected {expected}"
6603 );
6604 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6605 assert_eq!(
6606 state.range_reduce_calls.load(AtomicOrdering::SeqCst),
6607 2,
6608 "status IN (...) should expand to two pushed reduction ranges"
6609 );
6610
6611 let _ = shutdown_tx.send(());
6612 }
6613
6614 #[tokio::test]
6615 async fn aggregate_pushdown_supports_filtered_global_aggregates() {
6616 let state = MockState {
6617 kv: Arc::new(Mutex::new(BTreeMap::new())),
6618 range_calls: Arc::new(AtomicUsize::new(0)),
6619 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6620 sequence_number: Arc::new(AtomicU64::new(0)),
6621 };
6622 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6623 let client = StoreClient::new(&base_url);
6624
6625 let schema = KvSchema::new(client)
6626 .table(
6627 "orders",
6628 vec![
6629 TableColumnConfig::new("id", DataType::Int64, false),
6630 TableColumnConfig::new("status", DataType::Utf8, false),
6631 TableColumnConfig::new("amount_cents", DataType::Int64, false),
6632 ],
6633 vec!["id".to_string()],
6634 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6635 .expect("valid")
6636 .with_cover_columns(vec!["amount_cents".to_string()])],
6637 )
6638 .expect("schema");
6639
6640 let mut writer = schema.batch_writer();
6641 for (id, status, amount) in [
6642 (1, "open", 10),
6643 (2, "closed", 15),
6644 (3, "open", 30),
6645 (4, "closed", 40),
6646 ] {
6647 writer
6648 .insert(
6649 "orders",
6650 vec![
6651 CellValue::Int64(id),
6652 CellValue::Utf8(status.to_string()),
6653 CellValue::Int64(amount),
6654 ],
6655 )
6656 .expect("row");
6657 }
6658 writer.flush().await.expect("flush");
6659
6660 let ctx = SessionContext::new();
6661 schema.register_all(&ctx).expect("register");
6662
6663 state.range_calls.store(0, AtomicOrdering::SeqCst);
6664 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6665
6666 let query = "SELECT COUNT(*) FILTER (WHERE status = 'open') AS open_count, \
6667 COUNT(*) FILTER (WHERE status = 'closed') AS closed_count, \
6668 AVG(amount_cents) FILTER (WHERE status = 'closed') AS closed_avg \
6669 FROM orders";
6670 let batches = ctx
6671 .sql(query)
6672 .await
6673 .expect("query")
6674 .collect()
6675 .await
6676 .expect("collect");
6677
6678 assert_eq!(batches.len(), 1);
6679 let batch = &batches[0];
6680 assert_count_scalar(batch, 0, 0, 2);
6681 assert_count_scalar(batch, 1, 0, 2);
6682 let closed_avg = batch
6683 .column(2)
6684 .as_any()
6685 .downcast_ref::<Float64Array>()
6686 .expect("avg float64")
6687 .value(0);
6688 assert_eq!(closed_avg, 27.5);
6689 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6690 assert!(
6691 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 3,
6692 "filtered aggregate pushdown should use dedicated reduction jobs"
6693 );
6694
6695 let _ = shutdown_tx.send(());
6696 }
6697
6698 #[tokio::test]
6699 async fn aggregate_pushdown_supports_case_filtered_global_aggregates() {
6700 let state = MockState {
6701 kv: Arc::new(Mutex::new(BTreeMap::new())),
6702 range_calls: Arc::new(AtomicUsize::new(0)),
6703 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6704 sequence_number: Arc::new(AtomicU64::new(0)),
6705 };
6706 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6707 let client = StoreClient::new(&base_url);
6708
6709 let schema = KvSchema::new(client)
6710 .table(
6711 "orders",
6712 vec![
6713 TableColumnConfig::new("id", DataType::Int64, false),
6714 TableColumnConfig::new("status", DataType::Utf8, false),
6715 TableColumnConfig::new("amount_cents", DataType::Int64, false),
6716 ],
6717 vec!["id".to_string()],
6718 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6719 .expect("valid")
6720 .with_cover_columns(vec!["amount_cents".to_string()])],
6721 )
6722 .expect("schema");
6723
6724 let mut writer = schema.batch_writer();
6725 for (id, status, amount) in [
6726 (1, "open", 10),
6727 (2, "closed", 15),
6728 (3, "open", 30),
6729 (4, "closed", 40),
6730 ] {
6731 writer
6732 .insert(
6733 "orders",
6734 vec![
6735 CellValue::Int64(id),
6736 CellValue::Utf8(status.to_string()),
6737 CellValue::Int64(amount),
6738 ],
6739 )
6740 .expect("row");
6741 }
6742 writer.flush().await.expect("flush");
6743
6744 let ctx = SessionContext::new();
6745 schema.register_all(&ctx).expect("register");
6746
6747 state.range_calls.store(0, AtomicOrdering::SeqCst);
6748 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6749
6750 let query = "SELECT SUM(CASE status WHEN 'open' THEN amount_cents END) AS open_total, \
6751 COUNT(CASE status WHEN 'closed' THEN 1 END) AS closed_count, \
6752 AVG(CASE WHEN status = 'closed' THEN amount_cents END) AS closed_avg \
6753 FROM orders";
6754 let batches = ctx
6755 .sql(query)
6756 .await
6757 .expect("query")
6758 .collect()
6759 .await
6760 .expect("collect");
6761
6762 assert_eq!(batches.len(), 1);
6763 let batch = &batches[0];
6764 assert_eq!(
6765 ScalarValue::try_from_array(batch.column(0), 0).expect("sum scalar"),
6766 ScalarValue::Int64(Some(40))
6767 );
6768 assert_count_scalar(batch, 1, 0, 2);
6769 let closed_avg = batch
6770 .column(2)
6771 .as_any()
6772 .downcast_ref::<Float64Array>()
6773 .expect("avg float64")
6774 .value(0);
6775 assert_eq!(closed_avg, 27.5);
6776 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6777 assert!(
6778 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 3,
6779 "case-based conditional aggregates should use reduction jobs"
6780 );
6781
6782 let _ = shutdown_tx.send(());
6783 }
6784
6785 #[tokio::test]
6786 async fn aggregate_pushdown_supports_casted_group_and_aggregate_expressions() {
6787 let state = MockState {
6788 kv: Arc::new(Mutex::new(BTreeMap::new())),
6789 range_calls: Arc::new(AtomicUsize::new(0)),
6790 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6791 sequence_number: Arc::new(AtomicU64::new(0)),
6792 };
6793 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6794 let client = StoreClient::new(&base_url);
6795
6796 let schema = KvSchema::new(client)
6797 .table(
6798 "orders",
6799 vec![
6800 TableColumnConfig::new("id", DataType::Int64, false),
6801 TableColumnConfig::new("status", DataType::Utf8, false),
6802 TableColumnConfig::new("amount_cents", DataType::Int64, false),
6803 ],
6804 vec!["id".to_string()],
6805 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6806 .expect("valid")
6807 .with_cover_columns(vec!["amount_cents".to_string()])],
6808 )
6809 .expect("schema");
6810
6811 let mut writer = schema.batch_writer();
6812 for (id, status, amount) in [
6813 (1, "open", 10),
6814 (2, "open", 30),
6815 (3, "closed", 15),
6816 (4, "closed", 40),
6817 ] {
6818 writer
6819 .insert(
6820 "orders",
6821 vec![
6822 CellValue::Int64(id),
6823 CellValue::Utf8(status.to_string()),
6824 CellValue::Int64(amount),
6825 ],
6826 )
6827 .expect("row");
6828 }
6829 writer.flush().await.expect("flush");
6830
6831 let ctx = SessionContext::new();
6832 schema.register_all(&ctx).expect("register");
6833
6834 state.range_calls.store(0, AtomicOrdering::SeqCst);
6835 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6836
6837 let batches = ctx
6838 .sql(
6839 "SELECT CAST(status AS VARCHAR) AS status_text, \
6840 SUM(CAST(amount_cents AS DOUBLE)) AS total_cents \
6841 FROM orders \
6842 GROUP BY CAST(status AS VARCHAR) \
6843 ORDER BY status_text",
6844 )
6845 .await
6846 .expect("query")
6847 .collect()
6848 .await
6849 .expect("collect");
6850
6851 assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
6852 let batch = &batches[0];
6853 let status = ScalarValue::try_from_array(batch.column(0), 0).expect("status scalar");
6854 assert_eq!(scalar_to_string(&status).as_deref(), Some("closed"));
6855 let closed_total = batch
6856 .column(1)
6857 .as_any()
6858 .downcast_ref::<Float64Array>()
6859 .expect("sum float64")
6860 .value(0);
6861 let open_total = batch
6862 .column(1)
6863 .as_any()
6864 .downcast_ref::<Float64Array>()
6865 .expect("sum float64")
6866 .value(1);
6867 assert_eq!(closed_total, 55.0);
6868 assert_eq!(open_total, 40.0);
6869 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6870 assert!(
6871 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6872 "casted grouped aggregates should stay on the reduction path"
6873 );
6874
6875 let _ = shutdown_tx.send(());
6876 }
6877
6878 #[tokio::test]
6879 async fn aggregate_pushdown_supports_computed_aggregate_inputs() {
6880 let state = MockState {
6881 kv: Arc::new(Mutex::new(BTreeMap::new())),
6882 range_calls: Arc::new(AtomicUsize::new(0)),
6883 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6884 sequence_number: Arc::new(AtomicU64::new(0)),
6885 };
6886 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6887 let client = StoreClient::new(&base_url);
6888
6889 let schema = KvSchema::new(client)
6890 .table(
6891 "orders",
6892 vec![
6893 TableColumnConfig::new("id", DataType::Int64, false),
6894 TableColumnConfig::new("price_cents", DataType::Int64, false),
6895 TableColumnConfig::new("qty", DataType::Int64, false),
6896 TableColumnConfig::new("duration_ms", DataType::Int64, false),
6897 ],
6898 vec!["id".to_string()],
6899 vec![],
6900 )
6901 .expect("schema");
6902
6903 let mut writer = schema.batch_writer();
6904 for (id, price, qty, duration_ms) in [(1, 10, 2, 500), (2, 15, 3, 2500), (3, 7, 4, 1000)] {
6905 writer
6906 .insert(
6907 "orders",
6908 vec![
6909 CellValue::Int64(id),
6910 CellValue::Int64(price),
6911 CellValue::Int64(qty),
6912 CellValue::Int64(duration_ms),
6913 ],
6914 )
6915 .expect("row");
6916 }
6917 writer.flush().await.expect("flush");
6918
6919 let ctx = SessionContext::new();
6920 schema.register_all(&ctx).expect("register");
6921
6922 state.range_calls.store(0, AtomicOrdering::SeqCst);
6923 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6924
6925 let batches = ctx
6926 .sql(
6927 "SELECT SUM(price_cents * qty) AS total_revenue, \
6928 AVG(duration_ms / 1e3) AS avg_seconds \
6929 FROM orders",
6930 )
6931 .await
6932 .expect("query")
6933 .collect()
6934 .await
6935 .expect("collect");
6936
6937 assert_eq!(batches.len(), 1);
6938 let batch = &batches[0];
6939 assert_eq!(
6940 ScalarValue::try_from_array(batch.column(0), 0).expect("sum scalar"),
6941 ScalarValue::Int64(Some(93))
6942 );
6943 let avg_seconds = batch
6944 .column(1)
6945 .as_any()
6946 .downcast_ref::<Float64Array>()
6947 .expect("avg float64")
6948 .value(0);
6949 assert!((avg_seconds - (4.0 / 3.0)).abs() < 1e-12);
6950 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6951 assert!(
6952 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 2,
6953 "computed aggregate inputs should use reduction jobs"
6954 );
6955
6956 let _ = shutdown_tx.send(());
6957 }
6958
6959 #[tokio::test]
6960 async fn aggregate_pushdown_supports_add_and_subtract_inputs() {
6961 let state = MockState {
6962 kv: Arc::new(Mutex::new(BTreeMap::new())),
6963 range_calls: Arc::new(AtomicUsize::new(0)),
6964 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6965 sequence_number: Arc::new(AtomicU64::new(0)),
6966 };
6967 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6968 let client = StoreClient::new(&base_url);
6969
6970 let schema = KvSchema::new(client)
6971 .table(
6972 "orders",
6973 vec![
6974 TableColumnConfig::new("id", DataType::Int64, false),
6975 TableColumnConfig::new("price_cents", DataType::Int64, false),
6976 TableColumnConfig::new("fee_cents", DataType::Int64, false),
6977 TableColumnConfig::new("discount_cents", DataType::Int64, false),
6978 ],
6979 vec!["id".to_string()],
6980 vec![],
6981 )
6982 .expect("schema");
6983
6984 let mut writer = schema.batch_writer();
6985 for (id, price, fee, discount) in [(1, 10, 2, 1), (2, 15, 3, 4), (3, 7, 1, 2)] {
6986 writer
6987 .insert(
6988 "orders",
6989 vec![
6990 CellValue::Int64(id),
6991 CellValue::Int64(price),
6992 CellValue::Int64(fee),
6993 CellValue::Int64(discount),
6994 ],
6995 )
6996 .expect("row");
6997 }
6998 writer.flush().await.expect("flush");
6999
7000 let ctx = SessionContext::new();
7001 schema.register_all(&ctx).expect("register");
7002
7003 state.range_calls.store(0, AtomicOrdering::SeqCst);
7004 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7005
7006 let batches = ctx
7007 .sql(
7008 "SELECT SUM(price_cents + fee_cents) AS gross_plus_fee, \
7009 SUM(price_cents - discount_cents) AS net_total \
7010 FROM orders",
7011 )
7012 .await
7013 .expect("query")
7014 .collect()
7015 .await
7016 .expect("collect");
7017
7018 assert_eq!(batches.len(), 1);
7019 let batch = &batches[0];
7020 assert_eq!(
7021 ScalarValue::try_from_array(batch.column(0), 0).expect("sum scalar"),
7022 ScalarValue::Int64(Some(38))
7023 );
7024 assert_eq!(
7025 ScalarValue::try_from_array(batch.column(1), 0).expect("sum scalar"),
7026 ScalarValue::Int64(Some(25))
7027 );
7028 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7029 assert!(
7030 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 2,
7031 "add/sub aggregate inputs should use reduction jobs"
7032 );
7033
7034 let _ = shutdown_tx.send(());
7035 }
7036
7037 #[tokio::test]
7038 async fn aggregate_pushdown_supports_case_filtered_computed_aggregates() {
7039 let state = MockState {
7040 kv: Arc::new(Mutex::new(BTreeMap::new())),
7041 range_calls: Arc::new(AtomicUsize::new(0)),
7042 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7043 sequence_number: Arc::new(AtomicU64::new(0)),
7044 };
7045 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7046 let client = StoreClient::new(&base_url);
7047
7048 let schema = KvSchema::new(client)
7049 .table(
7050 "orders",
7051 vec![
7052 TableColumnConfig::new("id", DataType::Int64, false),
7053 TableColumnConfig::new("status", DataType::Utf8, false),
7054 TableColumnConfig::new("price_cents", DataType::Int64, false),
7055 TableColumnConfig::new("qty", DataType::Int64, false),
7056 ],
7057 vec!["id".to_string()],
7058 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7059 .expect("valid")
7060 .with_cover_columns(vec!["price_cents".to_string(), "qty".to_string()])],
7061 )
7062 .expect("schema");
7063
7064 let mut writer = schema.batch_writer();
7065 for (id, status, price, qty) in [
7066 (1, "open", 10, 2),
7067 (2, "closed", 99, 1),
7068 (3, "open", 15, 3),
7069 (4, "closed", 7, 4),
7070 ] {
7071 writer
7072 .insert(
7073 "orders",
7074 vec![
7075 CellValue::Int64(id),
7076 CellValue::Utf8(status.to_string()),
7077 CellValue::Int64(price),
7078 CellValue::Int64(qty),
7079 ],
7080 )
7081 .expect("row");
7082 }
7083 writer.flush().await.expect("flush");
7084
7085 let ctx = SessionContext::new();
7086 schema.register_all(&ctx).expect("register");
7087
7088 state.range_calls.store(0, AtomicOrdering::SeqCst);
7089 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7090
7091 let batches = ctx
7092 .sql(
7093 "SELECT SUM(CASE WHEN status = 'open' THEN price_cents * qty END) \
7094 AS open_revenue \
7095 FROM orders",
7096 )
7097 .await
7098 .expect("query")
7099 .collect()
7100 .await
7101 .expect("collect");
7102
7103 assert_eq!(batches.len(), 1);
7104 let batch = &batches[0];
7105 assert_eq!(
7106 ScalarValue::try_from_array(batch.column(0), 0).expect("sum scalar"),
7107 ScalarValue::Int64(Some(65))
7108 );
7109 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7110 assert!(
7111 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
7112 "case-filtered computed aggregate should use reduction jobs"
7113 );
7114
7115 let _ = shutdown_tx.send(());
7116 }
7117
7118 #[tokio::test]
7119 async fn aggregate_pushdown_does_not_rewrite_sum_case_else_zero_semantics() {
7120 let state = MockState {
7121 kv: Arc::new(Mutex::new(BTreeMap::new())),
7122 range_calls: Arc::new(AtomicUsize::new(0)),
7123 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7124 sequence_number: Arc::new(AtomicU64::new(0)),
7125 };
7126 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7127 let client = StoreClient::new(&base_url);
7128
7129 let schema = KvSchema::new(client)
7130 .table(
7131 "orders",
7132 vec![
7133 TableColumnConfig::new("id", DataType::Int64, false),
7134 TableColumnConfig::new("region", DataType::Utf8, false),
7135 TableColumnConfig::new("status", DataType::Utf8, false),
7136 TableColumnConfig::new("amount_cents", DataType::Int64, false),
7137 ],
7138 vec!["id".to_string()],
7139 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7140 .expect("valid")
7141 .with_cover_columns(vec!["region".to_string(), "amount_cents".to_string()])],
7142 )
7143 .expect("schema");
7144
7145 let mut writer = schema.batch_writer();
7146 for (id, region, status, amount) in [
7147 (1, "east", "open", 10),
7148 (2, "east", "closed", 20),
7149 (3, "west", "closed", 30),
7150 ] {
7151 writer
7152 .insert(
7153 "orders",
7154 vec![
7155 CellValue::Int64(id),
7156 CellValue::Utf8(region.to_string()),
7157 CellValue::Utf8(status.to_string()),
7158 CellValue::Int64(amount),
7159 ],
7160 )
7161 .expect("row");
7162 }
7163 writer.flush().await.expect("flush");
7164
7165 let ctx = SessionContext::new();
7166 schema.register_all(&ctx).expect("register");
7167
7168 state.range_calls.store(0, AtomicOrdering::SeqCst);
7169 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7170
7171 let batches = ctx
7172 .sql(
7173 "SELECT region, \
7174 SUM(CASE WHEN status = 'open' THEN amount_cents ELSE 0 END) AS open_total \
7175 FROM orders \
7176 GROUP BY region \
7177 ORDER BY region",
7178 )
7179 .await
7180 .expect("query")
7181 .collect()
7182 .await
7183 .expect("collect");
7184
7185 assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
7186 let batch = &batches[0];
7187 assert_eq!(
7188 ScalarValue::try_from_array(batch.column(0), 0).expect("region scalar"),
7189 ScalarValue::Utf8(Some("east".to_string()))
7190 );
7191 assert_eq!(
7192 ScalarValue::try_from_array(batch.column(1), 0).expect("sum scalar"),
7193 ScalarValue::Int64(Some(10))
7194 );
7195 assert_eq!(
7196 ScalarValue::try_from_array(batch.column(0), 1).expect("region scalar"),
7197 ScalarValue::Utf8(Some("west".to_string()))
7198 );
7199 assert_eq!(
7200 ScalarValue::try_from_array(batch.column(1), 1).expect("sum scalar"),
7201 ScalarValue::Int64(Some(0))
7202 );
7203 assert_eq!(
7204 state.range_reduce_calls.load(AtomicOrdering::SeqCst),
7205 0,
7206 "SUM(CASE ... ELSE 0 END) must not push down because FILTER rewrite changes semantics"
7207 );
7208
7209 let _ = shutdown_tx.send(());
7210 }
7211
7212 #[tokio::test]
7213 async fn aggregate_pushdown_supports_computed_group_keys() {
7214 let state = MockState {
7215 kv: Arc::new(Mutex::new(BTreeMap::new())),
7216 range_calls: Arc::new(AtomicUsize::new(0)),
7217 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7218 sequence_number: Arc::new(AtomicU64::new(0)),
7219 };
7220 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7221 let client = StoreClient::new(&base_url);
7222
7223 let schema = KvSchema::new(client)
7224 .table(
7225 "events",
7226 vec![
7227 TableColumnConfig::new("id", DataType::Int64, false),
7228 TableColumnConfig::new("country", DataType::Utf8, false),
7229 TableColumnConfig::new(
7230 "occurred_at",
7231 DataType::Timestamp(TimeUnit::Microsecond, None),
7232 false,
7233 ),
7234 TableColumnConfig::new("amount_cents", DataType::Int64, false),
7235 ],
7236 vec!["id".to_string()],
7237 vec![],
7238 )
7239 .expect("schema");
7240
7241 let day_micros = 86_400_000_000i64;
7242 let day0 = 1_700_000_000_000_000i64;
7243 let day1 = day0 + day_micros;
7244 let day0_bucket = day0.div_euclid(day_micros) * day_micros;
7245 let day1_bucket = day1.div_euclid(day_micros) * day_micros;
7246 let mut writer = schema.batch_writer();
7247 for (id, country, occurred_at, amount) in [
7248 (1, "East", day0 + 111, 10),
7249 (2, "east", day0 + 222, 30),
7250 (3, "West", day1 + 333, 7),
7251 ] {
7252 writer
7253 .insert(
7254 "events",
7255 vec![
7256 CellValue::Int64(id),
7257 CellValue::Utf8(country.to_string()),
7258 CellValue::Timestamp(occurred_at),
7259 CellValue::Int64(amount),
7260 ],
7261 )
7262 .expect("row");
7263 }
7264 writer.flush().await.expect("flush");
7265
7266 let ctx = SessionContext::new();
7267 schema.register_all(&ctx).expect("register");
7268
7269 state.range_calls.store(0, AtomicOrdering::SeqCst);
7270 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7271
7272 let batches = ctx
7273 .sql(
7274 "SELECT lower(country) AS country_norm, \
7275 date_trunc('day', occurred_at) AS day_bucket, \
7276 SUM(amount_cents) AS total_cents \
7277 FROM events \
7278 GROUP BY lower(country), date_trunc('day', occurred_at) \
7279 ORDER BY country_norm, day_bucket",
7280 )
7281 .await
7282 .expect("query")
7283 .collect()
7284 .await
7285 .expect("collect");
7286
7287 assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
7288 let batch = &batches[0];
7289 assert_eq!(
7290 scalar_to_string(
7291 &ScalarValue::try_from_array(batch.column(0), 0).expect("country scalar")
7292 )
7293 .as_deref(),
7294 Some("east")
7295 );
7296 assert_eq!(
7297 ScalarValue::try_from_array(batch.column(1), 0).expect("day scalar"),
7298 ScalarValue::TimestampMicrosecond(Some(day0_bucket), None)
7299 );
7300 assert_eq!(
7301 ScalarValue::try_from_array(batch.column(2), 0).expect("sum scalar"),
7302 ScalarValue::Int64(Some(40))
7303 );
7304 assert_eq!(
7305 scalar_to_string(
7306 &ScalarValue::try_from_array(batch.column(0), 1).expect("country scalar")
7307 )
7308 .as_deref(),
7309 Some("west")
7310 );
7311 assert_eq!(
7312 ScalarValue::try_from_array(batch.column(1), 1).expect("day scalar"),
7313 ScalarValue::TimestampMicrosecond(Some(day1_bucket), None)
7314 );
7315 assert_eq!(
7316 ScalarValue::try_from_array(batch.column(2), 1).expect("sum scalar"),
7317 ScalarValue::Int64(Some(7))
7318 );
7319 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7320 assert!(
7321 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
7322 "computed group keys should use grouped reduction path"
7323 );
7324
7325 let _ = shutdown_tx.send(());
7326 }
7327
7328 #[tokio::test]
7329 async fn aggregate_pushdown_supports_group_by_queries() {
7330 let state = MockState {
7331 kv: Arc::new(Mutex::new(BTreeMap::new())),
7332 range_calls: Arc::new(AtomicUsize::new(0)),
7333 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7334 sequence_number: Arc::new(AtomicU64::new(0)),
7335 };
7336 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7337 let client = StoreClient::new(&base_url);
7338
7339 let schema = KvSchema::new(client)
7340 .table(
7341 "orders",
7342 vec![
7343 TableColumnConfig::new("id", DataType::Int64, false),
7344 TableColumnConfig::new("status", DataType::Utf8, false),
7345 TableColumnConfig::new("amount_cents", DataType::Int64, false),
7346 ],
7347 vec!["id".to_string()],
7348 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7349 .expect("valid")
7350 .with_cover_columns(vec!["amount_cents".to_string()])],
7351 )
7352 .expect("schema");
7353
7354 let mut writer = schema.batch_writer();
7355 for (id, status, amount) in [
7356 (1, "open", 10),
7357 (2, "open", 30),
7358 (3, "closed", 15),
7359 (4, "closed", 40),
7360 ] {
7361 writer
7362 .insert(
7363 "orders",
7364 vec![
7365 CellValue::Int64(id),
7366 CellValue::Utf8(status.to_string()),
7367 CellValue::Int64(amount),
7368 ],
7369 )
7370 .expect("row");
7371 }
7372 writer.flush().await.expect("flush");
7373
7374 let ctx = SessionContext::new();
7375 schema.register_all(&ctx).expect("register");
7376
7377 state.range_calls.store(0, AtomicOrdering::SeqCst);
7378 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7379
7380 let batches = ctx
7381 .sql(
7382 "SELECT status, COUNT(*) AS row_count, SUM(amount_cents) AS total_cents \
7383 FROM orders GROUP BY status ORDER BY status",
7384 )
7385 .await
7386 .expect("query")
7387 .collect()
7388 .await
7389 .expect("collect");
7390
7391 assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
7392 let batch = &batches[0];
7393 assert_eq!(
7394 ScalarValue::try_from_array(batch.column(0), 0).expect("status scalar"),
7395 ScalarValue::Utf8(Some("closed".to_string()))
7396 );
7397 assert_count_scalar(batch, 1, 0, 2);
7398 assert_eq!(
7399 ScalarValue::try_from_array(batch.column(2), 0).expect("sum scalar"),
7400 ScalarValue::Int64(Some(55))
7401 );
7402 assert_eq!(
7403 ScalarValue::try_from_array(batch.column(0), 1).expect("status scalar"),
7404 ScalarValue::Utf8(Some("open".to_string()))
7405 );
7406 assert_count_scalar(batch, 1, 1, 2);
7407 assert_eq!(
7408 ScalarValue::try_from_array(batch.column(2), 1).expect("sum scalar"),
7409 ScalarValue::Int64(Some(40))
7410 );
7411 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7412 assert!(
7413 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 2,
7414 "group-by aggregate should use grouped range reduction path"
7415 );
7416
7417 let _ = shutdown_tx.send(());
7418 }
7419
7420 #[tokio::test]
7421 async fn aggregate_pushdown_group_by_float_canonicalizes_signed_zero() {
7422 let state = MockState {
7423 kv: Arc::new(Mutex::new(BTreeMap::new())),
7424 range_calls: Arc::new(AtomicUsize::new(0)),
7425 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7426 sequence_number: Arc::new(AtomicU64::new(0)),
7427 };
7428 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7429 let client = StoreClient::new(&base_url);
7430
7431 let schema = KvSchema::new(client)
7432 .table(
7433 "metrics",
7434 vec![
7435 TableColumnConfig::new("id", DataType::Int64, false),
7436 TableColumnConfig::new("score", DataType::Float64, false),
7437 ],
7438 vec!["id".to_string()],
7439 vec![],
7440 )
7441 .expect("schema");
7442
7443 let mut writer = schema.batch_writer();
7444 for (id, score) in [(1, -0.0), (2, 0.0), (3, 1.5)] {
7445 writer
7446 .insert(
7447 "metrics",
7448 vec![CellValue::Int64(id), CellValue::Float64(score)],
7449 )
7450 .expect("row");
7451 }
7452 writer.flush().await.expect("flush");
7453
7454 let ctx = SessionContext::new();
7455 schema.register_all(&ctx).expect("register");
7456
7457 state.range_calls.store(0, AtomicOrdering::SeqCst);
7458 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7459
7460 let batches = ctx
7461 .sql(
7462 "SELECT score, COUNT(*) AS row_count \
7463 FROM metrics GROUP BY score ORDER BY row_count DESC, score",
7464 )
7465 .await
7466 .expect("query")
7467 .collect()
7468 .await
7469 .expect("collect");
7470
7471 assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
7472 let batch = &batches[0];
7473 let top_score = batch
7474 .column(0)
7475 .as_any()
7476 .downcast_ref::<Float64Array>()
7477 .expect("score float64")
7478 .value(0);
7479 assert_eq!(top_score.to_bits(), 0.0f64.to_bits());
7480 assert_count_scalar(batch, 1, 0, 2);
7481 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7482 assert!(
7483 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
7484 "float group-by aggregate should stay on grouped reduction path"
7485 );
7486
7487 let _ = shutdown_tx.send(());
7488 }
7489
7490 #[tokio::test]
7491 async fn aggregate_pushdown_supports_filtered_group_by_queries() {
7492 let state = MockState {
7493 kv: Arc::new(Mutex::new(BTreeMap::new())),
7494 range_calls: Arc::new(AtomicUsize::new(0)),
7495 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7496 sequence_number: Arc::new(AtomicU64::new(0)),
7497 };
7498 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7499 let client = StoreClient::new(&base_url);
7500
7501 let schema = KvSchema::new(client)
7502 .table(
7503 "orders",
7504 vec![
7505 TableColumnConfig::new("id", DataType::Int64, false),
7506 TableColumnConfig::new("region", DataType::Utf8, false),
7507 TableColumnConfig::new("status", DataType::Utf8, false),
7508 TableColumnConfig::new("amount_cents", DataType::Int64, false),
7509 ],
7510 vec!["id".to_string()],
7511 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7512 .expect("valid")
7513 .with_cover_columns(vec!["region".to_string(), "amount_cents".to_string()])],
7514 )
7515 .expect("schema");
7516
7517 let mut writer = schema.batch_writer();
7518 for (id, region, status, amount) in [
7519 (1, "east", "open", 10),
7520 (2, "east", "closed", 20),
7521 (3, "west", "open", 30),
7522 (4, "north", "closed", 40),
7523 ] {
7524 writer
7525 .insert(
7526 "orders",
7527 vec![
7528 CellValue::Int64(id),
7529 CellValue::Utf8(region.to_string()),
7530 CellValue::Utf8(status.to_string()),
7531 CellValue::Int64(amount),
7532 ],
7533 )
7534 .expect("row");
7535 }
7536 writer.flush().await.expect("flush");
7537
7538 let ctx = SessionContext::new();
7539 schema.register_all(&ctx).expect("register");
7540
7541 state.range_calls.store(0, AtomicOrdering::SeqCst);
7542 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7543
7544 let batches = ctx
7545 .sql(
7546 "SELECT region, \
7547 COUNT(*) FILTER (WHERE status = 'open') AS open_count, \
7548 SUM(amount_cents) FILTER (WHERE status = 'closed') AS closed_total \
7549 FROM orders \
7550 GROUP BY region \
7551 ORDER BY region",
7552 )
7553 .await
7554 .expect("query")
7555 .collect()
7556 .await
7557 .expect("collect");
7558
7559 assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
7560 let batch = &batches[0];
7561
7562 assert_eq!(
7563 ScalarValue::try_from_array(batch.column(0), 0).expect("region scalar"),
7564 ScalarValue::Utf8(Some("east".to_string()))
7565 );
7566 assert_count_scalar(batch, 1, 0, 1);
7567 assert_eq!(
7568 ScalarValue::try_from_array(batch.column(2), 0).expect("sum scalar"),
7569 ScalarValue::Int64(Some(20))
7570 );
7571
7572 assert_eq!(
7573 ScalarValue::try_from_array(batch.column(0), 1).expect("region scalar"),
7574 ScalarValue::Utf8(Some("north".to_string()))
7575 );
7576 assert_count_scalar(batch, 1, 1, 0);
7577 assert_eq!(
7578 ScalarValue::try_from_array(batch.column(2), 1).expect("sum scalar"),
7579 ScalarValue::Int64(Some(40))
7580 );
7581
7582 assert_eq!(
7583 ScalarValue::try_from_array(batch.column(0), 2).expect("region scalar"),
7584 ScalarValue::Utf8(Some("west".to_string()))
7585 );
7586 assert_count_scalar(batch, 1, 2, 1);
7587 assert_eq!(
7588 ScalarValue::try_from_array(batch.column(2), 2).expect("sum scalar"),
7589 ScalarValue::Int64(None)
7590 );
7591
7592 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7593 assert!(
7594 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 3,
7595 "filtered group-by aggregate should use grouped reduction plus seed job"
7596 );
7597
7598 let _ = shutdown_tx.send(());
7599 }
7600
7601 mod e2e {
7602 use super::*;
7603 use axum::{routing::get, Router};
7604 use datafusion::prelude::SessionContext;
7605 use exoware_sdk_rs::StoreClient;
7606 use exoware_server::{connect_stack, AppState};
7607 use exoware_simulator::RocksStore;
7608 use tempfile::tempdir;
7609
7610 struct TestServers {
7611 ingest_url: String,
7612 query_url: String,
7613 }
7614
7615 impl TestServers {
7616 fn client(&self) -> StoreClient {
7617 StoreClient::with_split_urls(
7618 &self.query_url,
7619 &self.ingest_url,
7620 &self.query_url,
7621 &self.ingest_url,
7622 )
7623 }
7624 }
7625
7626 async fn spawn_e2e_servers() -> TestServers {
7627 let dir = tempdir().expect("tempdir");
7628 let db = RocksStore::open(dir.path()).expect("db");
7629 let state = AppState::new(std::sync::Arc::new(db));
7630 let connect = connect_stack(state);
7631 let app = Router::new()
7632 .route("/health", get(|| async { "ok" }))
7633 .fallback_service(connect);
7634 let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
7635 .await
7636 .expect("bind");
7637 let url = format!("http://{}", listener.local_addr().unwrap());
7638 tokio::spawn(async move {
7639 axum::serve(listener, app).await.expect("serve");
7640 });
7641 for _ in 0..200 {
7642 if reqwest::get(format!("{url}/health"))
7643 .await
7644 .ok()
7645 .is_some_and(|r| r.status().is_success())
7646 {
7647 return TestServers {
7648 ingest_url: url.clone(),
7649 query_url: url,
7650 };
7651 }
7652 tokio::time::sleep(std::time::Duration::from_millis(25)).await;
7653 }
7654 panic!("e2e simulator did not become ready");
7655 }
7656
7657 #[tokio::test]
7658 async fn sql_insert_and_select_through_real_ingest_query_workers() {
7659 let servers = spawn_e2e_servers().await;
7660 let client = servers.client();
7661
7662 let schema = KvSchema::new(client)
7663 .table(
7664 "orders",
7665 vec![
7666 TableColumnConfig::new("id", DataType::Int64, false),
7667 TableColumnConfig::new("status", DataType::Utf8, false),
7668 TableColumnConfig::new("amount_cents", DataType::Int64, false),
7669 ],
7670 vec!["id".to_string()],
7671 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7672 .expect("valid")
7673 .with_cover_columns(vec!["amount_cents".to_string()])],
7674 )
7675 .expect("schema");
7676
7677 let mut writer = schema.batch_writer();
7678 for (id, status, amount) in [
7679 (1i64, "open", 100i64),
7680 (2, "closed", 200),
7681 (3, "open", 300),
7682 (4, "closed", 400),
7683 (5, "open", 500),
7684 ] {
7685 writer
7686 .insert(
7687 "orders",
7688 vec![
7689 CellValue::Int64(id),
7690 CellValue::Utf8(status.to_string()),
7691 CellValue::Int64(amount),
7692 ],
7693 )
7694 .expect("insert row");
7695 }
7696 writer.flush().await.expect("flush batch");
7697
7698 let ctx = SessionContext::new();
7699 schema.register_all(&ctx).expect("register tables");
7700
7701 let batches = ctx
7702 .sql("SELECT id, amount_cents FROM orders ORDER BY id")
7703 .await
7704 .expect("full scan query")
7705 .collect()
7706 .await
7707 .expect("collect full scan");
7708 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
7709 assert_eq!(total_rows, 5, "all 5 rows returned from full scan");
7710
7711 let mut ids = Vec::new();
7712 let mut amounts = Vec::new();
7713 for batch in &batches {
7714 let id_col = batch
7715 .column(0)
7716 .as_any()
7717 .downcast_ref::<Int64Array>()
7718 .expect("id column");
7719 let amt_col = batch
7720 .column(1)
7721 .as_any()
7722 .downcast_ref::<Int64Array>()
7723 .expect("amount column");
7724 for i in 0..batch.num_rows() {
7725 ids.push(id_col.value(i));
7726 amounts.push(amt_col.value(i));
7727 }
7728 }
7729 assert_eq!(ids, vec![1, 2, 3, 4, 5]);
7730 assert_eq!(amounts, vec![100, 200, 300, 400, 500]);
7731
7732 let filtered = ctx
7733 .sql(
7734 "SELECT id, amount_cents FROM orders \
7735 WHERE status = 'open' ORDER BY id",
7736 )
7737 .await
7738 .expect("filtered query")
7739 .collect()
7740 .await
7741 .expect("collect filtered");
7742 let mut filtered_ids = Vec::new();
7743 let mut filtered_amounts = Vec::new();
7744 for batch in &filtered {
7745 let id_col = batch
7746 .column(0)
7747 .as_any()
7748 .downcast_ref::<Int64Array>()
7749 .expect("id column");
7750 let amt_col = batch
7751 .column(1)
7752 .as_any()
7753 .downcast_ref::<Int64Array>()
7754 .expect("amount column");
7755 for i in 0..batch.num_rows() {
7756 filtered_ids.push(id_col.value(i));
7757 filtered_amounts.push(amt_col.value(i));
7758 }
7759 }
7760 assert_eq!(filtered_ids, vec![1, 3, 5]);
7761 assert_eq!(filtered_amounts, vec![100, 300, 500]);
7762
7763 let agg = ctx
7764 .sql(
7765 "SELECT COUNT(*) AS cnt, SUM(amount_cents) AS total \
7766 FROM orders WHERE status = 'open'",
7767 )
7768 .await
7769 .expect("aggregate query")
7770 .collect()
7771 .await
7772 .expect("collect aggregate");
7773 assert_eq!(agg.len(), 1);
7774 let batch = &agg[0];
7775 assert_eq!(batch.num_rows(), 1);
7776 assert_count_scalar(batch, 0, 0, 3);
7777 let total = ScalarValue::try_from_array(batch.column(1), 0).expect("sum scalar");
7778 match total {
7779 ScalarValue::Int64(Some(v)) => assert_eq!(v, 900),
7780 other => panic!("unexpected sum type: {other:?}"),
7781 }
7782 }
7783 }
7784}