1pub mod prune;
2pub mod server;
3
4mod aggregate;
5mod builder;
6mod codec;
7mod diagnostics;
8mod filter;
9mod predicate;
10mod scan;
11mod schema;
12mod types;
13mod writer;
14
15pub use schema::KvSchema;
16pub use server::{sql_connect_stack, SqlConnect, SqlServer};
17pub use types::default_orders_index_specs;
18pub use types::{
19 CellValue, IndexBackfillEvent, IndexBackfillOptions, IndexBackfillReport, IndexLayout,
20 IndexSpec, TableColumnConfig,
21};
22pub use writer::{BatchReceipt, BatchWriter, PreparedBatch, TableWriter};
23
24#[cfg(test)]
25mod tests {
26 use super::aggregate::*;
27 use super::builder::*;
28 use super::codec::*;
29 use super::diagnostics::*;
30 use super::filter::*;
31 use super::predicate::*;
32 use super::scan::*;
33 use super::types::*;
34 use super::writer::*;
35 use super::*;
36 use commonware_codec::Encode;
37 use datafusion::arrow::array::{Float64Array, Int64Array, LargeStringArray, StringViewArray};
38 use datafusion::arrow::datatypes::{i256, DataType, TimeUnit};
39 use datafusion::arrow::record_batch::RecordBatch;
40 use datafusion::common::ScalarValue;
41 use datafusion::logical_expr::{Expr, Operator};
42 use datafusion::physical_plan::ExecutionPlan;
43 use datafusion::prelude::SessionContext;
44 use exoware_sdk::keys::{Key, KeyCodec};
45 use exoware_sdk::kv_codec::{
46 canonicalize_reduced_group_values, decode_stored_row, encode_reduced_group_key,
47 eval_predicate, KvReducedValue, StoredRow,
48 };
49 use exoware_sdk::{RangeReduceOp, RangeReduceRequest, StoreBatchUpload, StoreClient};
50 use std::collections::{BTreeMap, HashSet};
51 use std::ops::Bound::{Included, Unbounded};
52 use std::pin::Pin;
53 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
54 use std::sync::{Arc, Mutex};
55 use std::time::Duration;
56
57 use axum::Router;
58 use bytes::Bytes;
59 use connectrpc::{Chain, ConnectError, ConnectRpcService, Context};
60 use exoware_sdk::connect_compression_registry;
61 use exoware_sdk::kv_codec::{eval_expr, expr_needs_value};
62 use exoware_sdk::store::common::v1::KvEntry as ProtoKvEntry;
63 use exoware_sdk::store::ingest::v1::{
64 PutResponse as ProtoPutResponse, Service as IngestService,
65 ServiceServer as IngestServiceServer,
66 };
67 use exoware_sdk::store::query::v1::{
68 GetManyEntry as ProtoGetManyEntry, GetManyFrame as ProtoGetManyFrame,
69 GetResponse as ProtoGetResponse, RangeFrame as ProtoRangeFrame,
70 ReduceResponse as ProtoReduceResponse, Service as QueryService,
71 ServiceServer as QueryServiceServer,
72 };
73 use exoware_sdk::RangeMode;
74 use exoware_sdk::{
75 parse_range_traversal_direction, to_domain_reduce_request, to_proto_optional_reduced_value,
76 to_proto_reduced_value, RangeTraversalDirection, RangeTraversalModeError,
77 };
78 use exoware_sdk::{RangeReduceGroup, RangeReduceResponse, RangeReduceResult};
79 use futures::{stream, Stream, TryStreamExt};
80 use tokio::sync::{mpsc, oneshot, Notify};
81
82 fn assert_explain_includes_query_stats_surface(
84 explain: &str,
85 surface: QueryStatsExplainSurface,
86 ) {
87 let expected = format!("query_stats={}", format_query_stats_explain(surface));
88 assert!(
89 explain.contains(&expected),
90 "expected EXPLAIN output to include `{expected}`\n{explain}"
91 );
92 }
93
94 fn simple_int64_model(prefix: u8) -> TableModel {
95 let config = KvTableConfig::new(
96 prefix,
97 vec![TableColumnConfig::new("id", DataType::Int64, false)],
98 vec!["id".to_string()],
99 vec![],
100 )
101 .unwrap();
102 TableModel::from_config(&config).unwrap()
103 }
104
105 fn codec_payload(codec: KeyCodec, key: &Key, offset: usize, len: usize) -> Vec<u8> {
106 codec.read_payload(key, offset, len).expect("codec payload")
107 }
108
109 fn primary_payload(model: &TableModel, key: &Key, offset: usize, len: usize) -> Vec<u8> {
110 codec_payload(model.primary_key_codec, key, offset, len)
111 }
112
113 fn index_payload(spec: &ResolvedIndexSpec, key: &Key, offset: usize, len: usize) -> Vec<u8> {
114 codec_payload(spec.codec, key, offset, len)
115 }
116
117 fn matches_primary_key(table_prefix: u8, key: &Key) -> bool {
118 primary_key_codec(table_prefix)
119 .expect("primary codec")
120 .matches(key)
121 }
122
123 fn matches_secondary_index_key(table_prefix: u8, index_id: u8, key: &Key) -> bool {
124 secondary_index_codec(table_prefix, index_id)
125 .expect("secondary codec")
126 .matches(key)
127 }
128
129 fn test_model() -> (TableModel, Vec<ResolvedIndexSpec>) {
130 let config = KvTableConfig::new(
131 0,
132 vec![
133 TableColumnConfig::new("region", DataType::Utf8, false),
134 TableColumnConfig::new("customer_id", DataType::Int64, false),
135 TableColumnConfig::new("order_id", DataType::Int64, false),
136 TableColumnConfig::new("amount_cents", DataType::Int64, false),
137 TableColumnConfig::new("status", DataType::Utf8, false),
138 ],
139 vec!["order_id".to_string()],
140 vec![
141 IndexSpec::new(
142 "region_customer",
143 vec!["region".to_string(), "customer_id".to_string()],
144 )
145 .expect("valid"),
146 IndexSpec::new(
147 "status_customer",
148 vec!["status".to_string(), "customer_id".to_string()],
149 )
150 .expect("valid"),
151 ],
152 )
153 .expect("valid config");
154 let model = TableModel::from_config(&config).expect("model");
155 let specs = model
156 .resolve_index_specs(&config.index_specs)
157 .expect("specs");
158 (model, specs)
159 }
160
161 fn zorder_test_model() -> (TableModel, Vec<ResolvedIndexSpec>) {
162 let config = KvTableConfig::new(
163 0,
164 vec![
165 TableColumnConfig::new("x", DataType::Int64, false),
166 TableColumnConfig::new("y", DataType::Int64, false),
167 TableColumnConfig::new("id", DataType::Int64, false),
168 TableColumnConfig::new("value", DataType::Int64, false),
169 ],
170 vec!["id".to_string()],
171 vec![
172 IndexSpec::new("xy_lex", vec!["x".to_string(), "y".to_string()])
173 .expect("valid")
174 .with_cover_columns(vec!["value".to_string()]),
175 IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
176 .expect("valid")
177 .with_cover_columns(vec!["value".to_string()]),
178 ],
179 )
180 .expect("valid config");
181 let model = TableModel::from_config(&config).expect("model");
182 let specs = model
183 .resolve_index_specs(&config.index_specs)
184 .expect("specs");
185 (model, specs)
186 }
187
    /// Shared state behind the mock ingest and query services.
    ///
    /// Every field is behind an `Arc`, so clones observe the same store and counters.
    #[derive(Clone)]
    struct MockState {
        /// In-memory key/value store; ordered by key so the mock can serve range scans.
        kv: Arc<Mutex<BTreeMap<Key, Bytes>>>,
        /// Number of `range` RPCs the mock query service has handled.
        range_calls: Arc<AtomicUsize>,
        /// Number of `reduce` RPCs the mock query service has handled.
        range_reduce_calls: Arc<AtomicUsize>,
        /// Current sequence number; incremented once per `put`.
        sequence_number: Arc<AtomicU64>,
    }
195
    /// Accumulator for a single group during a grouped mock `reduce`.
    #[derive(Debug)]
    struct MockGroupedReduceState {
        /// Group-by values identifying this group (as returned by `extract_mock_reduce_row`).
        group_values: Vec<Option<KvReducedValue>>,
        /// One partial aggregate per reducer, in request order.
        states: Vec<PartialAggregateState>,
    }
201
    /// One row's contribution to a mock reduce: `(group-by values, per-reducer values)`.
    type MockReduceRow = (Vec<Option<KvReducedValue>>, Vec<Option<KvReducedValue>>);
203
204 fn extract_mock_reduce_row(
205 key: &Key,
206 value: &Bytes,
207 request: &RangeReduceRequest,
208 ) -> Option<MockReduceRow> {
209 let needs_value = request
210 .group_by
211 .iter()
212 .chain(
213 request
214 .reducers
215 .iter()
216 .filter_map(|reducer| reducer.expr.as_ref()),
217 )
218 .any(expr_needs_value)
219 || request
220 .filter
221 .as_ref()
222 .is_some_and(exoware_sdk::kv_codec::predicate_needs_value);
223 let archived = if needs_value {
224 decode_stored_row(value.as_ref()).ok()
225 } else {
226 None
227 };
228
229 if let Some(filter) = &request.filter {
230 if !eval_predicate(key, archived.as_ref(), filter).ok()? {
231 return None;
232 }
233 }
234
235 let mut group_values = Vec::with_capacity(request.group_by.len());
236 for expr in &request.group_by {
237 let extracted_value = eval_expr(key, archived.as_ref(), expr).ok()?;
238 group_values.push(extracted_value);
239 }
240 canonicalize_reduced_group_values(&mut group_values);
241
242 let mut reducer_values = Vec::with_capacity(request.reducers.len());
243 for reducer in &request.reducers {
244 let extracted_value = match (&reducer.expr, archived.as_ref()) {
245 (None, _) => None,
246 (Some(expr), _) => eval_expr(key, archived.as_ref(), expr).ok()?,
247 };
248 reducer_values.push(extracted_value);
249 }
250
251 Some((group_values, reducer_values))
252 }
253
254 #[allow(clippy::result_large_err)]
255 fn ensure_min_sequence_number(
256 token: &Arc<AtomicU64>,
257 required: Option<u64>,
258 ) -> Result<(), ConnectError> {
259 let current = token.load(AtomicOrdering::Relaxed);
260 if let Some(required) = required {
261 if current < required {
262 return Err(ConnectError::aborted(format!(
263 "consistency_not_ready: required={required}, current={current}"
264 )));
265 }
266 }
267 Ok(())
268 }
269
270 fn proto_range_entries_frame(results: Vec<(Key, Vec<u8>)>) -> ProtoRangeFrame {
271 ProtoRangeFrame {
272 results: results
273 .into_iter()
274 .map(|(key, value)| ProtoKvEntry {
275 key: key.to_vec(),
276 value,
277 ..Default::default()
278 })
279 .collect(),
280 ..Default::default()
281 }
282 }
283
284 fn query_detail_trailer_ctx(sequence_number: u64) -> Context {
285 let detail = exoware_sdk::store::query::v1::Detail {
286 sequence_number,
287 read_stats: Default::default(),
288 ..Default::default()
289 };
290 exoware_sdk::with_query_detail_trailer(Context::default(), &detail)
291 }
292
    /// Mock ingest service; `put` writes into the shared `MockState` store.
    #[derive(Clone)]
    struct MockIngestConnect {
        state: MockState,
    }
297
298 impl IngestService for MockIngestConnect {
299 async fn put(
300 &self,
301 ctx: Context,
302 request: buffa::view::OwnedView<
303 exoware_sdk::store::ingest::v1::PutRequestView<'static>,
304 >,
305 ) -> Result<(ProtoPutResponse, Context), ConnectError> {
306 let mut parsed = Vec::<(Key, Bytes)>::new();
307 for kv in request.kvs.iter() {
308 parsed.push((kv.key.to_vec().into(), Bytes::copy_from_slice(kv.value)));
309 }
310 let mut guard = self.state.kv.lock().expect("kv mutex poisoned");
311 for (key, value) in parsed.iter() {
312 guard.insert(key.clone(), value.clone());
313 }
314 let seq = self
315 .state
316 .sequence_number
317 .fetch_add(1, AtomicOrdering::SeqCst)
318 + 1;
319 Ok((
320 ProtoPutResponse {
321 sequence_number: seq,
322 ..Default::default()
323 },
324 ctx,
325 ))
326 }
327 }
328
    /// Mock query service; serves `get`, `range`, `get_many` and `reduce` from the shared
    /// `MockState` store.
    #[derive(Clone)]
    struct MockQueryConnect {
        state: MockState,
    }
333
334 impl QueryService for MockQueryConnect {
335 async fn get(
336 &self,
337 _ctx: Context,
338 request: buffa::view::OwnedView<exoware_sdk::store::query::v1::GetRequestView<'static>>,
339 ) -> Result<(ProtoGetResponse, Context), ConnectError> {
340 ensure_min_sequence_number(&self.state.sequence_number, request.min_sequence_number)?;
341 let key: Key = request.key.to_vec().into();
342 let guard = self.state.kv.lock().expect("kv mutex poisoned");
343 let value = guard.get(&key).cloned();
344 let token = self.state.sequence_number.load(AtomicOrdering::Relaxed);
345 let detail = exoware_sdk::store::query::v1::Detail {
346 sequence_number: token,
347 read_stats: Default::default(),
348 ..Default::default()
349 };
350 Ok((
351 ProtoGetResponse {
352 value: value.map(|v| v.to_vec()),
353 ..Default::default()
354 },
355 exoware_sdk::with_query_detail_response_header(Context::default(), &detail),
356 ))
357 }
358
359 async fn range(
360 &self,
361 _ctx: Context,
362 request: buffa::view::OwnedView<
363 exoware_sdk::store::query::v1::RangeRequestView<'static>,
364 >,
365 ) -> Result<
366 (
367 Pin<Box<dyn Stream<Item = Result<ProtoRangeFrame, ConnectError>> + Send>>,
368 Context,
369 ),
370 ConnectError,
371 > {
372 ensure_min_sequence_number(&self.state.sequence_number, request.min_sequence_number)?;
373 self.state.range_calls.fetch_add(1, AtomicOrdering::SeqCst);
374
375 let start_key: Key = request.start.to_vec().into();
376 let end_key: Key = request.end.to_vec().into();
377 let limit = request.limit.map(|v| v as usize).unwrap_or(usize::MAX);
378 let batch_size = usize::try_from(request.batch_size).unwrap_or(usize::MAX);
379 if batch_size == 0 {
380 return Err(ConnectError::invalid_argument(
381 "invalid batch_size: expected positive integer",
382 ));
383 }
384
385 let mode = match parse_range_traversal_direction(request.mode) {
386 Ok(RangeTraversalDirection::Forward) => RangeMode::Forward,
387 Ok(RangeTraversalDirection::Reverse) => RangeMode::Reverse,
388 Err(RangeTraversalModeError::UnknownWireValue(v)) => {
389 return Err(ConnectError::invalid_argument(format!(
390 "unknown TraversalMode enum value {v}"
391 )));
392 }
393 };
394
395 let state = self.state.clone();
396 let guard = state.kv.lock().expect("kv mutex poisoned");
397 let range: (std::ops::Bound<&Key>, std::ops::Bound<&Key>) = (
399 Included(&start_key),
400 if end_key.is_empty() {
401 Unbounded
402 } else {
403 Included(&end_key)
404 },
405 );
406 let range_iter = guard.range::<Key, _>(range);
407 let iter: Box<dyn Iterator<Item = (&Key, &Bytes)> + Send> = match mode {
408 RangeMode::Forward => Box::new(range_iter),
409 RangeMode::Reverse => Box::new(range_iter.rev()),
410 };
411 let mut results: Vec<ProtoKvEntry> = Vec::new();
412 for (key, value) in iter.take(limit) {
413 results.push(ProtoKvEntry {
414 key: key.to_vec(),
415 value: value.to_vec(),
416 ..Default::default()
417 });
418 }
419 drop(guard);
420 let token = state.sequence_number.load(AtomicOrdering::Relaxed);
421 let batch = batch_size.max(1);
422 let mut frames: Vec<Result<ProtoRangeFrame, ConnectError>> = Vec::new();
423 for chunk in results.chunks(batch) {
424 frames.push(Ok(ProtoRangeFrame {
425 results: chunk.to_vec(),
426 ..Default::default()
427 }));
428 }
429 let detail = exoware_sdk::store::query::v1::Detail {
430 sequence_number: token,
431 read_stats: Default::default(),
432 ..Default::default()
433 };
434 Ok((
435 Box::pin(stream::iter(frames)),
436 exoware_sdk::with_query_detail_trailer(Context::default(), &detail),
437 ))
438 }
439
440 async fn get_many(
441 &self,
442 _ctx: Context,
443 request: buffa::view::OwnedView<
444 exoware_sdk::store::query::v1::GetManyRequestView<'static>,
445 >,
446 ) -> Result<
447 (
448 Pin<Box<dyn Stream<Item = Result<ProtoGetManyFrame, ConnectError>> + Send>>,
449 Context,
450 ),
451 ConnectError,
452 > {
453 ensure_min_sequence_number(&self.state.sequence_number, request.min_sequence_number)?;
454 let batch_size = usize::try_from(request.batch_size)
455 .unwrap_or(usize::MAX)
456 .max(1);
457 let guard = self.state.kv.lock().expect("kv mutex poisoned");
458 let mut entries: Vec<ProtoGetManyEntry> = Vec::new();
459 for key_bytes in request.keys.iter() {
460 let key: Key = key_bytes.to_vec().into();
461 let value = guard.get(&key).cloned();
462 entries.push(ProtoGetManyEntry {
463 key: key.to_vec(),
464 value: value.map(|v| v.to_vec()),
465 ..Default::default()
466 });
467 }
468 drop(guard);
469 let token = self.state.sequence_number.load(AtomicOrdering::Relaxed);
470 let mut frames: Vec<Result<ProtoGetManyFrame, ConnectError>> = Vec::new();
471 for chunk in entries.chunks(batch_size) {
472 frames.push(Ok(ProtoGetManyFrame {
473 results: chunk.to_vec(),
474 ..Default::default()
475 }));
476 }
477 let detail = exoware_sdk::store::query::v1::Detail {
478 sequence_number: token,
479 read_stats: Default::default(),
480 ..Default::default()
481 };
482 Ok((
483 Box::pin(stream::iter(frames)),
484 exoware_sdk::with_query_detail_trailer(Context::default(), &detail),
485 ))
486 }
487
488 async fn reduce(
489 &self,
490 _ctx: Context,
491 request: buffa::view::OwnedView<
492 exoware_sdk::store::query::v1::ReduceRequestView<'static>,
493 >,
494 ) -> Result<(ProtoReduceResponse, Context), ConnectError> {
495 ensure_min_sequence_number(&self.state.sequence_number, request.min_sequence_number)?;
496 self.state
497 .range_reduce_calls
498 .fetch_add(1, AtomicOrdering::SeqCst);
499 let owned = request.to_owned_message();
500 let start_key: Key = owned.start.clone().into();
501 let end_key: Key = owned.end.clone().into();
502 let reduce_req = owned
503 .params
504 .as_option()
505 .ok_or_else(|| ConnectError::invalid_argument("missing range reduce params"))?;
506 let domain_request =
507 to_domain_reduce_request(reduce_req).map_err(ConnectError::invalid_argument)?;
508
509 let state = self.state.clone();
510 let guard = state.kv.lock().expect("kv mutex poisoned");
511 let mut states = domain_request.group_by.is_empty().then(|| {
512 domain_request
513 .reducers
514 .iter()
515 .map(|reducer| PartialAggregateState::from_op(reducer.op))
516 .collect::<Vec<_>>()
517 });
518 let mut grouped = BTreeMap::<Vec<u8>, MockGroupedReduceState>::new();
519
520 let range: (std::ops::Bound<&Key>, std::ops::Bound<&Key>) = (
521 Included(&start_key),
522 if end_key.is_empty() {
523 Unbounded
524 } else {
525 Included(&end_key)
526 },
527 );
528 for (key, value) in guard.range::<Key, _>(range) {
529 let Some((group_values, reducer_values)) =
530 extract_mock_reduce_row(key, value, &domain_request)
531 else {
532 continue;
533 };
534 if domain_request.group_by.is_empty() {
535 let states = states.as_mut().expect("scalar states");
536 for ((state, reducer), value) in states
537 .iter_mut()
538 .zip(domain_request.reducers.iter())
539 .zip(reducer_values)
540 {
541 match reducer.op {
542 RangeReduceOp::CountAll => state
543 .merge_partial(reducer.op, Some(&KvReducedValue::UInt64(1)))
544 .map_err(|e| ConnectError::internal(e.to_string()))?,
545 RangeReduceOp::CountField => {
546 let partial =
547 KvReducedValue::UInt64(if value.is_some() { 1 } else { 0 });
548 state
549 .merge_partial(reducer.op, Some(&partial))
550 .map_err(|e| ConnectError::internal(e.to_string()))?
551 }
552 _ => state
553 .merge_partial(reducer.op, value.as_ref())
554 .map_err(|e| ConnectError::internal(e.to_string()))?,
555 }
556 }
557 } else {
558 let group_key = encode_reduced_group_key(&group_values);
559 let group =
560 grouped
561 .entry(group_key)
562 .or_insert_with(|| MockGroupedReduceState {
563 group_values: group_values.clone(),
564 states: domain_request
565 .reducers
566 .iter()
567 .map(|reducer| PartialAggregateState::from_op(reducer.op))
568 .collect(),
569 });
570 for ((state, reducer), value) in group
571 .states
572 .iter_mut()
573 .zip(domain_request.reducers.iter())
574 .zip(reducer_values)
575 {
576 match reducer.op {
577 RangeReduceOp::CountAll => state
578 .merge_partial(reducer.op, Some(&KvReducedValue::UInt64(1)))
579 .map_err(|e| ConnectError::internal(e.to_string()))?,
580 RangeReduceOp::CountField => {
581 let partial =
582 KvReducedValue::UInt64(if value.is_some() { 1 } else { 0 });
583 state
584 .merge_partial(reducer.op, Some(&partial))
585 .map_err(|e| ConnectError::internal(e.to_string()))?
586 }
587 _ => state
588 .merge_partial(reducer.op, value.as_ref())
589 .map_err(|e| ConnectError::internal(e.to_string()))?,
590 }
591 }
592 }
593 }
594
595 let response = if let Some(states) = states {
596 RangeReduceResponse {
597 results: states
598 .iter()
599 .map(|state| RangeReduceResult {
600 value: match state {
601 PartialAggregateState::Count(count) => {
602 Some(KvReducedValue::UInt64(*count))
603 }
604 PartialAggregateState::Sum(value)
605 | PartialAggregateState::Min(value)
606 | PartialAggregateState::Max(value) => value.clone(),
607 },
608 })
609 .collect(),
610 groups: Vec::new(),
611 }
612 } else {
613 RangeReduceResponse {
614 results: Vec::new(),
615 groups: grouped
616 .into_values()
617 .map(|group| RangeReduceGroup {
618 group_values: group.group_values,
619 results: group
620 .states
621 .into_iter()
622 .map(|state| RangeReduceResult {
623 value: match state {
624 PartialAggregateState::Count(count) => {
625 Some(KvReducedValue::UInt64(count))
626 }
627 PartialAggregateState::Sum(value)
628 | PartialAggregateState::Min(value)
629 | PartialAggregateState::Max(value) => value,
630 },
631 })
632 .collect(),
633 })
634 .collect(),
635 }
636 };
637 drop(guard);
638 let token = state.sequence_number.load(AtomicOrdering::Relaxed);
639 let detail = exoware_sdk::store::query::v1::Detail {
640 sequence_number: token,
641 read_stats: Default::default(),
642 ..Default::default()
643 };
644 Ok((
645 ProtoReduceResponse {
646 results: response
647 .results
648 .into_iter()
649 .map(|result| exoware_sdk::store::query::v1::RangeReduceResult {
650 value: result.value.map(to_proto_reduced_value).into(),
651 ..Default::default()
652 })
653 .collect(),
654 groups: response
655 .groups
656 .into_iter()
657 .map(|group| {
658 let group_values_present: Vec<bool> =
659 group.group_values.iter().map(|v| v.is_some()).collect();
660 exoware_sdk::store::query::v1::RangeReduceGroup {
661 group_values: group
662 .group_values
663 .into_iter()
664 .map(to_proto_optional_reduced_value)
665 .collect(),
666 group_values_present,
667 results: group
668 .results
669 .into_iter()
670 .map(|result| {
671 exoware_sdk::store::query::v1::RangeReduceResult {
672 value: result.value.map(to_proto_reduced_value).into(),
673 ..Default::default()
674 }
675 })
676 .collect(),
677 ..Default::default()
678 }
679 })
680 .collect(),
681 ..Default::default()
682 },
683 exoware_sdk::with_query_detail_response_header(Context::default(), &detail),
684 ))
685 }
686 }
687
688 async fn spawn_mock_server(state: MockState) -> (String, oneshot::Sender<()>) {
689 let connect = ConnectRpcService::new(Chain(
690 IngestServiceServer::new(MockIngestConnect {
691 state: state.clone(),
692 }),
693 QueryServiceServer::new(MockQueryConnect { state }),
694 ))
695 .with_compression(connect_compression_registry());
696 let app = Router::new().fallback_service(connect);
697
698 let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
699 .await
700 .expect("bind mock server");
701 let addr = listener.local_addr().expect("local addr");
702 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
703 tokio::spawn(async move {
704 axum::serve(listener, app)
705 .with_graceful_shutdown(async move {
706 let _ = shutdown_rx.await;
707 })
708 .await
709 .expect("mock server should run");
710 });
711 (format!("http://{addr}"), shutdown_tx)
712 }
713
714 fn assert_count_scalar(batch: &RecordBatch, col_idx: usize, row_idx: usize, expected: u64) {
715 let scalar = ScalarValue::try_from_array(batch.column(col_idx), row_idx)
716 .expect("count scalar should decode");
717 match scalar {
718 ScalarValue::UInt64(Some(value)) => assert_eq!(value, expected),
719 ScalarValue::Int64(Some(value)) => assert_eq!(value, expected as i64),
720 other => panic!("unexpected count scalar: {other:?}"),
721 }
722 }
723
724 async fn explain_plan_rows(ctx: &SessionContext, sql: &str) -> Vec<(String, String)> {
725 let batches = ctx
726 .sql(&format!("EXPLAIN {sql}"))
727 .await
728 .expect("explain query")
729 .collect()
730 .await
731 .expect("explain collect");
732 let mut rows = Vec::new();
733 for batch in batches {
734 for row_idx in 0..batch.num_rows() {
735 let plan_type = scalar_to_string(
736 &ScalarValue::try_from_array(batch.column(0), row_idx).expect("plan type"),
737 )
738 .expect("plan type string");
739 let plan = scalar_to_string(
740 &ScalarValue::try_from_array(batch.column(1), row_idx).expect("plan"),
741 )
742 .expect("plan string");
743 rows.push((plan_type, plan));
744 }
745 }
746 rows
747 }
748
749 fn physical_plan_text(rows: &[(String, String)]) -> String {
750 rows.iter()
751 .filter(|(plan_type, _)| plan_type.contains("physical_plan"))
752 .map(|(_, plan)| plan.as_str())
753 .collect::<Vec<_>>()
754 .join("\n")
755 }
756
757 #[tokio::test]
758 async fn explain_reports_full_scan_like_primary_key_scan() {
759 let state = MockState {
760 kv: Arc::new(Mutex::new(BTreeMap::new())),
761 range_calls: Arc::new(AtomicUsize::new(0)),
762 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
763 sequence_number: Arc::new(AtomicU64::new(0)),
764 };
765 let (base_url, shutdown_tx) = spawn_mock_server(state).await;
766 let client = StoreClient::new(&base_url);
767
768 let schema = KvSchema::new(client)
769 .table(
770 "orders",
771 vec![
772 TableColumnConfig::new("id", DataType::Int64, false),
773 TableColumnConfig::new("status", DataType::Utf8, false),
774 TableColumnConfig::new("amount_cents", DataType::Int64, false),
775 ],
776 vec!["id".to_string()],
777 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
778 .expect("valid")
779 .with_cover_columns(vec!["amount_cents".to_string()])],
780 )
781 .expect("schema");
782 let ctx = SessionContext::new();
783 schema.register_all(&ctx).expect("register");
784
785 let explain =
786 physical_plan_text(&explain_plan_rows(&ctx, "SELECT id, status FROM orders").await);
787 assert!(explain.contains("KvScanExec:"));
788 assert!(explain.contains("mode=primary_key"));
789 assert!(explain.contains("predicate=<none>"));
790 assert!(explain.contains("row_recheck=false"));
791 assert!(explain.contains("full_scan_like=true"));
792 assert_explain_includes_query_stats_surface(
793 &explain,
794 QueryStatsExplainSurface::StreamedRangeTrailer,
795 );
796
797 let _ = shutdown_tx.send(());
798 }
799
800 #[tokio::test]
801 async fn explain_reports_secondary_index_scan_and_row_recheck() {
802 let state = MockState {
803 kv: Arc::new(Mutex::new(BTreeMap::new())),
804 range_calls: Arc::new(AtomicUsize::new(0)),
805 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
806 sequence_number: Arc::new(AtomicU64::new(0)),
807 };
808 let (base_url, shutdown_tx) = spawn_mock_server(state).await;
809 let client = StoreClient::new(&base_url);
810
811 let schema = KvSchema::new(client)
812 .table(
813 "orders",
814 vec![
815 TableColumnConfig::new("id", DataType::Int64, false),
816 TableColumnConfig::new("status", DataType::Utf8, false),
817 TableColumnConfig::new("amount_cents", DataType::Int64, false),
818 ],
819 vec!["id".to_string()],
820 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
821 .expect("valid")
822 .with_cover_columns(vec!["amount_cents".to_string()])],
823 )
824 .expect("schema");
825 let ctx = SessionContext::new();
826 schema.register_all(&ctx).expect("register");
827
828 let explain = physical_plan_text(
829 &explain_plan_rows(
830 &ctx,
831 "SELECT id, status, amount_cents FROM orders \
832 WHERE status = 'open' AND amount_cents >= 5",
833 )
834 .await,
835 );
836 assert!(explain.contains("KvScanExec:"));
837 assert!(explain.contains("mode=secondary_index(status_idx, lexicographic)"));
838 assert!(explain.contains("predicate=status = 'open' AND amount_cents >= 5"));
839 assert!(explain.contains("exact=false"));
840 assert!(explain.contains("row_recheck=true"));
841 assert!(explain.contains("full_scan_like=false"));
842
843 let _ = shutdown_tx.send(());
844 }
845
846 #[tokio::test]
847 async fn explain_reports_zorder_secondary_index_scan() {
848 let state = MockState {
849 kv: Arc::new(Mutex::new(BTreeMap::new())),
850 range_calls: Arc::new(AtomicUsize::new(0)),
851 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
852 sequence_number: Arc::new(AtomicU64::new(0)),
853 };
854 let (base_url, shutdown_tx) = spawn_mock_server(state).await;
855 let client = StoreClient::new(&base_url);
856
857 let schema = KvSchema::new(client)
858 .table(
859 "points",
860 vec![
861 TableColumnConfig::new("x", DataType::Int64, false),
862 TableColumnConfig::new("y", DataType::Int64, false),
863 TableColumnConfig::new("id", DataType::Int64, false),
864 TableColumnConfig::new("value", DataType::Int64, false),
865 ],
866 vec!["id".to_string()],
867 vec![
868 IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
869 .expect("valid")
870 .with_cover_columns(vec!["value".to_string()]),
871 ],
872 )
873 .expect("schema");
874 let ctx = SessionContext::new();
875 schema.register_all(&ctx).expect("register");
876
877 let explain = physical_plan_text(
878 &explain_plan_rows(
879 &ctx,
880 "SELECT id, value FROM points \
881 WHERE x >= 1 AND x <= 2 AND y >= 1 AND y <= 2",
882 )
883 .await,
884 );
885 assert!(explain.contains("KvScanExec:"));
886 assert!(explain.contains("mode=secondary_index(xy_z, z_order)"));
887 assert!(explain.contains("exact=false"));
888 assert!(explain.contains("row_recheck=true"));
889
890 let _ = shutdown_tx.send(());
891 }
892
893 #[tokio::test]
894 async fn explain_reports_aggregate_pushdown_access_path_details() {
895 let state = MockState {
896 kv: Arc::new(Mutex::new(BTreeMap::new())),
897 range_calls: Arc::new(AtomicUsize::new(0)),
898 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
899 sequence_number: Arc::new(AtomicU64::new(0)),
900 };
901 let (base_url, shutdown_tx) = spawn_mock_server(state).await;
902 let client = StoreClient::new(&base_url);
903
904 let schema = KvSchema::new(client)
905 .table(
906 "orders",
907 vec![
908 TableColumnConfig::new("id", DataType::Int64, false),
909 TableColumnConfig::new("status", DataType::Utf8, false),
910 TableColumnConfig::new("amount_cents", DataType::Int64, false),
911 ],
912 vec!["id".to_string()],
913 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
914 .expect("valid")
915 .with_cover_columns(vec!["amount_cents".to_string()])],
916 )
917 .expect("schema");
918 let ctx = SessionContext::new();
919 schema.register_all(&ctx).expect("register");
920
921 let explain = physical_plan_text(
922 &explain_plan_rows(
923 &ctx,
924 "SELECT status, SUM(amount_cents) AS total_cents \
925 FROM orders WHERE status = 'open' GROUP BY status",
926 )
927 .await,
928 );
929 assert!(explain.contains("KvAggregateExec:"));
930 assert!(explain.contains("grouped=true"));
931 assert!(explain.contains("job0{mode=secondary_index(status_idx, lexicographic)"));
932 assert!(explain.contains("predicate=status = 'open'"));
933 assert!(explain.contains("exact=true"));
934 assert!(explain.contains("row_recheck=false"));
935 assert_explain_includes_query_stats_surface(
936 &explain,
937 QueryStatsExplainSurface::RangeReduceHeader,
938 );
939
940 let _ = shutdown_tx.send(());
941 }
942
943 #[test]
944 fn index_spec_constructor_sets_name_and_keys() {
945 let spec = IndexSpec::new(
946 "status_customer",
947 vec!["status".to_string(), "customer_id".to_string()],
948 )
949 .expect("valid index spec");
950 assert_eq!(spec.name(), "status_customer");
951 assert_eq!(spec.key_columns(), &["status", "customer_id"]);
952 assert!(spec.cover_columns().is_empty());
953 }
954
955 #[test]
956 fn index_spec_cover_columns_are_configurable_in_code() {
957 let spec = IndexSpec::new("status_customer", vec!["status".to_string()])
958 .expect("valid")
959 .with_cover_columns(vec!["amount_cents".to_string()]);
960 assert_eq!(spec.key_columns(), &["status"]);
961 assert_eq!(spec.cover_columns(), &["amount_cents"]);
962 }
963
964 #[test]
965 fn describe_in_list_places_truncation_ellipsis_inside_parentheses() {
966 let rendered = describe_in_list((1..=6).map(|v| v.to_string()));
967 assert_eq!(rendered, "IN (1, 2, 3, 4, 5, ...)");
968 }
969
970 #[test]
971 fn normalize_sum_case_then_one_uses_countall_optimization() {
972 let (model, _) = test_model();
973 let argument = normalize_case_then_expr(
974 AggregatePushdownFunction::Sum,
975 &Expr::Literal(ScalarValue::Int64(Some(1)), None),
976 &model,
977 )
978 .expect("normalize");
979 assert_eq!(argument, AggregatePushdownArgument::CountAll);
980 }
981
982 #[test]
983 fn normalize_count_case_then_literal_uses_countall_optimization() {
984 use datafusion::logical_expr::col;
985
986 let (model, _) = test_model();
987 let case_expr = Expr::Case(datafusion::logical_expr::expr::Case {
988 expr: None,
989 when_then_expr: vec![(
990 Box::new(col("status").eq(Expr::Literal(
991 ScalarValue::Utf8(Some("open".to_string())),
992 None,
993 ))),
994 Box::new(Expr::Literal(
995 ScalarValue::Utf8(Some("yes".to_string())),
996 None,
997 )),
998 )],
999 else_expr: Some(Box::new(Expr::Literal(ScalarValue::Utf8(None), None))),
1000 });
1001
1002 let (func, argument, filter) =
1003 normalize_count_aggregate_argument(&case_expr, &model).expect("normalize");
1004 assert_eq!(func, AggregatePushdownFunction::Count);
1005 assert_eq!(argument, AggregatePushdownArgument::CountAll);
1006 assert!(filter.is_some());
1007 }
1008
1009 #[test]
1010 fn reduced_value_to_scalar_preserves_timestamp_timezone_label() {
1011 let tz: Arc<str> = Arc::from("America/New_York");
1012 let scalar = reduced_value_to_scalar(
1013 Some(KvReducedValue::Timestamp(1_700_000_000_000_000)),
1014 &DataType::Timestamp(TimeUnit::Microsecond, Some(tz.clone())),
1015 )
1016 .expect("timestamp scalar");
1017 assert_eq!(
1018 scalar,
1019 ScalarValue::TimestampMicrosecond(Some(1_700_000_000_000_000), Some(tz))
1020 );
1021 }
1022
1023 #[test]
1024 fn index_spec_cover_pk_column_is_rejected() {
1025 let config = KvTableConfig::new(
1026 0,
1027 vec![
1028 TableColumnConfig::new("id", DataType::Int64, false),
1029 TableColumnConfig::new("status", DataType::Utf8, false),
1030 ],
1031 vec!["id".to_string()],
1032 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
1033 .expect("valid")
1034 .with_cover_columns(vec!["id".to_string()])],
1035 )
1036 .expect("valid config");
1037 let model = TableModel::from_config(&config).expect("model");
1038 let err = model
1039 .resolve_index_specs(&config.index_specs)
1040 .expect_err("covering a PK column must be rejected");
1041 assert!(err.contains("primary key column"));
1042 }
1043
1044 #[test]
1045 fn access_plan_requires_cover_columns_for_index_scan() {
1046 let (model, _) = test_model();
1047 let predicate = QueryPredicate::default();
1048 let projection = Some(vec![
1049 *model.columns_by_name.get("order_id").unwrap(),
1050 *model.columns_by_name.get("amount_cents").unwrap(),
1051 ]);
1052 let plan = ScanAccessPlan::new(&model, &projection, &predicate);
1053
1054 let no_cover = IndexSpec::new("status_idx", vec!["status".to_string()]).unwrap();
1055 let with_cover = IndexSpec::new("status_idx", vec!["status".to_string()])
1056 .unwrap()
1057 .with_cover_columns(vec!["amount_cents".to_string()]);
1058 let no_cover_resolved = model.resolve_index_specs(&[no_cover]).unwrap();
1059 let with_cover_resolved = model.resolve_index_specs(&[with_cover]).unwrap();
1060
1061 assert!(!plan.index_covers_required_non_pk(&no_cover_resolved[0]));
1062 assert!(plan.index_covers_required_non_pk(&with_cover_resolved[0]));
1063 }
1064
1065 #[test]
1066 fn choose_index_plan_prefers_longer_prefix() {
1067 let (model, specs) = test_model();
1068 let region_idx = *model.columns_by_name.get("region").unwrap();
1069 let customer_idx = *model.columns_by_name.get("customer_id").unwrap();
1070 let mut predicate = QueryPredicate::default();
1071 predicate.constraints.insert(
1072 region_idx,
1073 PredicateConstraint::StringEq("us-east".to_string()),
1074 );
1075 predicate.constraints.insert(
1076 customer_idx,
1077 PredicateConstraint::IntRange {
1078 min: Some(10),
1079 max: Some(20),
1080 },
1081 );
1082 let plan = predicate
1083 .choose_index_plan(&model, &specs)
1084 .expect("plan")
1085 .expect("exists");
1086 assert_eq!(plan.spec_idx, 0);
1087 assert_eq!(plan.constrained_prefix_len, 2);
1088 }
1089
1090 #[test]
1091 fn choose_index_plan_prefers_covering_index_when_prefix_strength_ties() {
1092 let config = KvTableConfig::new(
1093 0,
1094 vec![
1095 TableColumnConfig::new("id", DataType::Int64, false),
1096 TableColumnConfig::new("status", DataType::Utf8, false),
1097 TableColumnConfig::new("amount_cents", DataType::Int64, false),
1098 ],
1099 vec!["id".to_string()],
1100 vec![
1101 IndexSpec::new("status_plain", vec!["status".to_string()]).expect("valid"),
1102 IndexSpec::new("status_covering", vec!["status".to_string()])
1103 .expect("valid")
1104 .with_cover_columns(vec!["amount_cents".to_string()]),
1105 ],
1106 )
1107 .expect("config");
1108 let model = TableModel::from_config(&config).expect("model");
1109 let specs = model
1110 .resolve_index_specs(&config.index_specs)
1111 .expect("specs");
1112 let status_idx = *model.columns_by_name.get("status").unwrap();
1113 let amount_idx = *model.columns_by_name.get("amount_cents").unwrap();
1114 let mut predicate = QueryPredicate::default();
1115 predicate.constraints.insert(
1116 status_idx,
1117 PredicateConstraint::StringEq("open".to_string()),
1118 );
1119 predicate.constraints.insert(
1120 amount_idx,
1121 PredicateConstraint::IntRange {
1122 min: Some(10),
1123 max: None,
1124 },
1125 );
1126
1127 let plan = predicate
1128 .choose_index_plan(&model, &specs)
1129 .expect("plan")
1130 .expect("exists");
1131 assert_eq!(specs[plan.spec_idx].name, "status_covering");
1132 }
1133
1134 #[test]
1135 fn choose_index_plan_prefers_zorder_for_multi_column_box_constraints() {
1136 let (model, specs) = zorder_test_model();
1137 let x_idx = *model.columns_by_name.get("x").unwrap();
1138 let y_idx = *model.columns_by_name.get("y").unwrap();
1139 let mut predicate = QueryPredicate::default();
1140 predicate.constraints.insert(
1141 x_idx,
1142 PredicateConstraint::IntRange {
1143 min: Some(1),
1144 max: Some(2),
1145 },
1146 );
1147 predicate.constraints.insert(
1148 y_idx,
1149 PredicateConstraint::IntRange {
1150 min: Some(1),
1151 max: Some(2),
1152 },
1153 );
1154
1155 let plan = predicate
1156 .choose_index_plan(&model, &specs)
1157 .expect("plan")
1158 .expect("exists");
1159 assert_eq!(specs[plan.spec_idx].name, "xy_z");
1160 assert_eq!(specs[plan.spec_idx].layout, IndexLayout::ZOrder);
1161 assert_eq!(plan.constrained_column_count, 2);
1162 }
1163
1164 #[test]
1165 fn secondary_index_key_round_trip() {
1166 let (model, specs) = test_model();
1167 let row = KvRow {
1168 values: vec![
1169 CellValue::Utf8("us-east".to_string()),
1170 CellValue::Int64(42),
1171 CellValue::Int64(9001),
1172 CellValue::Int64(1500),
1173 CellValue::Utf8("open".to_string()),
1174 ],
1175 };
1176 let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row)
1177 .expect("encode");
1178 let decoded = decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key)
1179 .expect("decode");
1180 let region_idx = *model.columns_by_name.get("region").unwrap();
1181 let customer_idx = *model.columns_by_name.get("customer_id").unwrap();
1182 assert!(matches!(
1183 decoded.values.get(®ion_idx),
1184 Some(CellValue::Utf8(v)) if v == "us-east"
1185 ));
1186 assert!(matches!(
1187 decoded.values.get(&customer_idx),
1188 Some(CellValue::Int64(v)) if *v == 42
1189 ));
1190 assert!(matches!(
1191 &decoded.primary_key_values[0],
1192 CellValue::Int64(9001)
1193 ));
1194 let expected_pk = encode_primary_key_from_row(model.table_prefix, &row, &model)
1195 .expect("primary key should encode");
1196 assert_eq!(decoded.primary_key, expected_pk);
1197 }
1198
1199 #[test]
1200 fn zorder_secondary_index_key_round_trip() {
1201 let (model, specs) = zorder_test_model();
1202 let row = KvRow {
1203 values: vec![
1204 CellValue::Int64(2),
1205 CellValue::Int64(1),
1206 CellValue::Int64(42),
1207 CellValue::Int64(900),
1208 ],
1209 };
1210 let key = encode_secondary_index_key(model.table_prefix, &specs[1], &model, &row)
1211 .expect("encode");
1212 let decoded = decode_secondary_index_key(model.table_prefix, &specs[1], &model, &key)
1213 .expect("decode");
1214 let x_idx = *model.columns_by_name.get("x").unwrap();
1215 let y_idx = *model.columns_by_name.get("y").unwrap();
1216 assert!(matches!(
1217 decoded.values.get(&x_idx),
1218 Some(CellValue::Int64(v)) if *v == 2
1219 ));
1220 assert!(matches!(
1221 decoded.values.get(&y_idx),
1222 Some(CellValue::Int64(v)) if *v == 1
1223 ));
1224 assert!(matches!(
1225 &decoded.primary_key_values[0],
1226 CellValue::Int64(42)
1227 ));
1228 }
1229
1230 #[test]
1231 fn table_config_supports_non_orders_schema() {
1232 let config = KvTableConfig::new(
1233 0,
1234 vec![
1235 TableColumnConfig::new("tenant", DataType::Utf8, false),
1236 TableColumnConfig::new("id", DataType::Int64, false),
1237 TableColumnConfig::new("score", DataType::Int64, false),
1238 ],
1239 vec!["id".to_string()],
1240 vec![IndexSpec::new(
1241 "tenant_score",
1242 vec!["tenant".to_string(), "score".to_string()],
1243 )
1244 .expect("valid")],
1245 )
1246 .expect("schema agnostic config should be valid");
1247 assert_eq!(config.primary_key_columns, vec!["id".to_string()]);
1248 assert_eq!(config.columns.len(), 3);
1249 }
1250
1251 #[test]
1252 fn table_config_accepts_float64_column() {
1253 let config = KvTableConfig::new(
1254 0,
1255 vec![
1256 TableColumnConfig::new("id", DataType::Int64, false),
1257 TableColumnConfig::new("price", DataType::Float64, false),
1258 ],
1259 vec!["id".to_string()],
1260 vec![],
1261 )
1262 .expect("Float64 column should be accepted");
1263 assert_eq!(config.columns.len(), 2);
1264 }
1265
1266 #[test]
1267 fn table_config_accepts_boolean_column() {
1268 let config = KvTableConfig::new(
1269 0,
1270 vec![
1271 TableColumnConfig::new("id", DataType::Int64, false),
1272 TableColumnConfig::new("active", DataType::Boolean, false),
1273 ],
1274 vec!["id".to_string()],
1275 vec![],
1276 )
1277 .expect("Boolean column should be accepted");
1278 assert_eq!(config.columns.len(), 2);
1279 }
1280
1281 #[test]
1282 fn build_projected_batch_uses_large_utf8_type() {
1283 let config = KvTableConfig::new(
1284 0,
1285 vec![
1286 TableColumnConfig::new("id", DataType::Int64, false),
1287 TableColumnConfig::new("name", DataType::LargeUtf8, false),
1288 ],
1289 vec!["id".to_string()],
1290 vec![],
1291 )
1292 .unwrap();
1293 let model = TableModel::from_config(&config).unwrap();
1294 let rows = vec![KvRow {
1295 values: vec![CellValue::Int64(1), CellValue::Utf8("hello".to_string())],
1296 }];
1297 let batch = build_projected_batch(&rows, &model, &model.schema, &None).unwrap();
1298 assert_eq!(batch.column(1).data_type(), &DataType::LargeUtf8);
1299 let values = batch
1300 .column(1)
1301 .as_any()
1302 .downcast_ref::<LargeStringArray>()
1303 .expect("must build LargeStringArray");
1304 assert_eq!(values.value(0), "hello");
1305 }
1306
1307 #[test]
1308 fn build_projected_batch_uses_utf8_view_type() {
1309 let config = KvTableConfig::new(
1310 0,
1311 vec![
1312 TableColumnConfig::new("id", DataType::Int64, false),
1313 TableColumnConfig::new("name", DataType::Utf8View, false),
1314 ],
1315 vec!["id".to_string()],
1316 vec![],
1317 )
1318 .unwrap();
1319 let model = TableModel::from_config(&config).unwrap();
1320 let rows = vec![KvRow {
1321 values: vec![CellValue::Int64(1), CellValue::Utf8("hello".to_string())],
1322 }];
1323 let batch = build_projected_batch(&rows, &model, &model.schema, &None).unwrap();
1324 assert_eq!(batch.column(1).data_type(), &DataType::Utf8View);
1325 let values = batch
1326 .column(1)
1327 .as_any()
1328 .downcast_ref::<StringViewArray>()
1329 .expect("must build StringViewArray");
1330 assert_eq!(values.value(0), "hello");
1331 }
1332
1333 #[test]
1334 fn f64_ordered_encoding_preserves_order() {
1335 let values = [
1336 f64::NEG_INFINITY,
1337 f64::MIN,
1338 -1000.0,
1339 -1.0,
1340 -0.001,
1341 0.0,
1342 0.001,
1343 1.0,
1344 1000.0,
1345 f64::MAX,
1346 f64::INFINITY,
1347 ];
1348 let encoded: Vec<[u8; 8]> = values.iter().map(|v| encode_f64_ordered(*v)).collect();
1349 for i in 0..encoded.len() - 1 {
1350 assert!(
1351 encoded[i] < encoded[i + 1],
1352 "encode_f64_ordered({}) >= encode_f64_ordered({})",
1353 values[i],
1354 values[i + 1]
1355 );
1356 }
1357 }
1358
1359 #[test]
1360 fn f64_ordered_encoding_round_trip() {
1361 let values = [
1362 f64::MIN,
1363 -42.5,
1364 -0.0,
1365 0.0,
1366 3.125,
1367 f64::MAX,
1368 f64::INFINITY,
1369 f64::NEG_INFINITY,
1370 ];
1371 for v in values {
1372 let encoded = encode_f64_ordered(v);
1373 let decoded = decode_f64_ordered(encoded);
1374 assert!(
1375 v.to_bits() == decoded.to_bits(),
1376 "round-trip failed for {v}: got {decoded}"
1377 );
1378 }
1379 }
1380
1381 fn mixed_model() -> (TableModel, Vec<ResolvedIndexSpec>) {
1382 let config = KvTableConfig::new(
1383 0,
1384 vec![
1385 TableColumnConfig::new("id", DataType::Int64, false),
1386 TableColumnConfig::new("label", DataType::Utf8, false),
1387 TableColumnConfig::new("score", DataType::Float64, false),
1388 TableColumnConfig::new("active", DataType::Boolean, false),
1389 ],
1390 vec!["id".to_string()],
1391 vec![
1392 IndexSpec::new(
1393 "active_score",
1394 vec!["active".to_string(), "score".to_string()],
1395 )
1396 .expect("valid"),
1397 IndexSpec::new("label_idx", vec!["label".to_string()]).expect("valid"),
1398 ],
1399 )
1400 .expect("valid config");
1401 let model = TableModel::from_config(&config).expect("model");
1402 let specs = model
1403 .resolve_index_specs(&config.index_specs)
1404 .expect("specs");
1405 (model, specs)
1406 }
1407
1408 #[test]
1409 fn secondary_index_key_round_trip_with_float64_and_boolean() {
1410 let (model, specs) = mixed_model();
1411 let row = KvRow {
1412 values: vec![
1413 CellValue::Int64(100),
1414 CellValue::Utf8("hello".to_string()),
1415 CellValue::Float64(3.125),
1416 CellValue::Boolean(true),
1417 ],
1418 };
1419 let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row)
1420 .expect("encode");
1421 let decoded = decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key)
1422 .expect("decode");
1423 let active_idx = *model.columns_by_name.get("active").unwrap();
1424 let score_idx = *model.columns_by_name.get("score").unwrap();
1425 assert!(matches!(
1426 decoded.values.get(&active_idx),
1427 Some(CellValue::Boolean(true))
1428 ));
1429 assert!(
1430 matches!(decoded.values.get(&score_idx), Some(CellValue::Float64(v)) if (*v - 3.125).abs() < f64::EPSILON)
1431 );
1432 assert!(matches!(
1433 &decoded.primary_key_values[0],
1434 CellValue::Int64(100)
1435 ));
1436 }
1437
1438 #[test]
1439 fn base_row_round_trip_with_float64_and_boolean() {
1440 let (model, _specs) = mixed_model();
1441 let row = KvRow {
1442 values: vec![
1443 CellValue::Int64(42),
1444 CellValue::Utf8("world".to_string()),
1445 CellValue::Float64(-99.5),
1446 CellValue::Boolean(false),
1447 ],
1448 };
1449 let encoded = encode_base_row_value(&row, &model).expect("encode");
1450 let decoded =
1451 decode_base_row(vec![CellValue::Int64(42)], &encoded, &model).expect("decode");
1452 assert!(matches!(&decoded.values[0], CellValue::Int64(42)));
1453 assert!(matches!(&decoded.values[1], CellValue::Utf8(v) if v == "world"));
1454 assert!(
1455 matches!(&decoded.values[2], CellValue::Float64(v) if (*v - (-99.5)).abs() < f64::EPSILON)
1456 );
1457 assert!(matches!(&decoded.values[3], CellValue::Boolean(false)));
1458 }
1459
1460 #[test]
1461 fn predicate_bool_eq_matches() {
1462 let (model, _specs) = mixed_model();
1463 let active_idx = *model.columns_by_name.get("active").unwrap();
1464 let mut pred = QueryPredicate::default();
1465 pred.constraints
1466 .insert(active_idx, PredicateConstraint::BoolEq(true));
1467 let row_true = KvRow {
1468 values: vec![
1469 CellValue::Int64(1),
1470 CellValue::Utf8("a".to_string()),
1471 CellValue::Float64(1.0),
1472 CellValue::Boolean(true),
1473 ],
1474 };
1475 let row_false = KvRow {
1476 values: vec![
1477 CellValue::Int64(2),
1478 CellValue::Utf8("b".to_string()),
1479 CellValue::Float64(2.0),
1480 CellValue::Boolean(false),
1481 ],
1482 };
1483 assert!(pred.matches_row(&row_true));
1484 assert!(!pred.matches_row(&row_false));
1485 }
1486
1487 #[test]
1488 fn predicate_float_range_matches() {
1489 let (model, _specs) = mixed_model();
1490 let score_idx = *model.columns_by_name.get("score").unwrap();
1491 let mut pred = QueryPredicate::default();
1492 pred.constraints.insert(
1493 score_idx,
1494 PredicateConstraint::FloatRange {
1495 min: Some((2.0, true)),
1496 max: Some((5.0, false)),
1497 },
1498 );
1499 let make_row = |score: f64| KvRow {
1500 values: vec![
1501 CellValue::Int64(1),
1502 CellValue::Utf8("a".to_string()),
1503 CellValue::Float64(score),
1504 CellValue::Boolean(true),
1505 ],
1506 };
1507 assert!(!pred.matches_row(&make_row(1.99)));
1508 assert!(pred.matches_row(&make_row(2.0)));
1509 assert!(pred.matches_row(&make_row(3.5)));
1510 assert!(pred.matches_row(&make_row(4.99)));
1511 assert!(!pred.matches_row(&make_row(5.0)));
1512 assert!(!pred.matches_row(&make_row(5.01)));
1513 }
1514
1515 #[test]
1516 fn float_range_rejects_nan_row_value() {
1517 let constraint = PredicateConstraint::FloatRange {
1518 min: Some((0.0, true)),
1519 max: Some((10.0, true)),
1520 };
1521 assert!(!matches_constraint(
1522 &CellValue::Float64(f64::NAN),
1523 &constraint
1524 ));
1525 }
1526
1527 #[test]
1528 fn index_plan_with_boolean_prefix() {
1529 let (model, specs) = mixed_model();
1530 let active_idx = *model.columns_by_name.get("active").unwrap();
1531 let mut pred = QueryPredicate::default();
1532 pred.constraints
1533 .insert(active_idx, PredicateConstraint::BoolEq(true));
1534 let plan = pred
1535 .choose_index_plan(&model, &specs)
1536 .expect("plan")
1537 .expect("should find index");
1538 assert_eq!(plan.spec_idx, 0);
1539 assert_eq!(plan.constrained_prefix_len, 1);
1540 }
1541
1542 #[test]
1543 fn float_constraint_contradiction() {
1544 let mut lo: Option<(f64, bool)> = None;
1545 let mut hi: Option<(f64, bool)> = None;
1546 let mut contradiction = false;
1547 apply_float_constraint(&mut lo, &mut hi, Operator::Gt, 10.0, &mut contradiction);
1548 assert!(!contradiction);
1549 apply_float_constraint(&mut lo, &mut hi, Operator::Lt, 5.0, &mut contradiction);
1550 assert!(contradiction);
1551 }
1552
1553 #[test]
1554 fn float_constraint_eq_then_range_contradicts() {
1555 let mut lo: Option<(f64, bool)> = None;
1556 let mut hi: Option<(f64, bool)> = None;
1557 let mut contradiction = false;
1558 apply_float_constraint(&mut lo, &mut hi, Operator::Eq, 5.0, &mut contradiction);
1559 assert!(!contradiction);
1560 apply_float_constraint(&mut lo, &mut hi, Operator::Gt, 5.0, &mut contradiction);
1561 assert!(contradiction);
1562 }
1563
1564 #[test]
1565 fn float_nan_literal_comparison_marks_contradiction() {
1566 let config = KvTableConfig::new(
1567 0,
1568 vec![
1569 TableColumnConfig::new("id", DataType::Int64, false),
1570 TableColumnConfig::new("score", DataType::Float64, false),
1571 ],
1572 vec!["id".to_string()],
1573 vec![],
1574 )
1575 .unwrap();
1576 let model = TableModel::from_config(&config).unwrap();
1577
1578 use datafusion::logical_expr::col;
1579 let filter = col("score").gt(Expr::Literal(ScalarValue::Float64(Some(f64::NAN)), None));
1580 assert!(QueryPredicate::supports_filter(&filter, &model));
1581
1582 let pred = QueryPredicate::from_filters(&[filter], &model);
1583 assert!(
1584 pred.contradiction,
1585 "comparison with NaN literal must produce contradiction"
1586 );
1587 }
1588
1589 #[test]
1590 fn table_config_accepts_date32_column() {
1591 let config = KvTableConfig::new(
1592 0,
1593 vec![
1594 TableColumnConfig::new("id", DataType::Int64, false),
1595 TableColumnConfig::new("created", DataType::Date32, false),
1596 ],
1597 vec!["id".to_string()],
1598 vec![],
1599 )
1600 .expect("Date32 column should be accepted");
1601 assert_eq!(config.columns.len(), 2);
1602 }
1603
1604 #[test]
1605 fn table_config_accepts_timestamp_column() {
1606 let config = KvTableConfig::new(
1607 0,
1608 vec![
1609 TableColumnConfig::new("id", DataType::Int64, false),
1610 TableColumnConfig::new(
1611 "ts",
1612 DataType::Timestamp(TimeUnit::Microsecond, None),
1613 false,
1614 ),
1615 ],
1616 vec!["id".to_string()],
1617 vec![],
1618 )
1619 .expect("Timestamp column should be accepted");
1620 let schema = config.to_schema();
1621 assert!(matches!(
1622 schema.field(1).data_type(),
1623 DataType::Timestamp(TimeUnit::Microsecond, _)
1624 ));
1625 }
1626
1627 #[test]
1628 fn table_config_normalizes_timestamp_to_microsecond() {
1629 let config = KvTableConfig::new(
1630 0,
1631 vec![
1632 TableColumnConfig::new("id", DataType::Int64, false),
1633 TableColumnConfig::new(
1634 "ts",
1635 DataType::Timestamp(TimeUnit::Nanosecond, None),
1636 false,
1637 ),
1638 ],
1639 vec!["id".to_string()],
1640 vec![],
1641 )
1642 .expect("Nanosecond timestamp should be accepted");
1643 let schema = config.to_schema();
1644 assert!(matches!(
1645 schema.field(1).data_type(),
1646 DataType::Timestamp(TimeUnit::Microsecond, _)
1647 ));
1648 }
1649
1650 #[test]
1651 fn table_config_accepts_decimal128_column() {
1652 let config = KvTableConfig::new(
1653 0,
1654 vec![
1655 TableColumnConfig::new("id", DataType::Int64, false),
1656 TableColumnConfig::new("price", DataType::Decimal128(10, 2), false),
1657 ],
1658 vec!["id".to_string()],
1659 vec![],
1660 )
1661 .expect("Decimal128 column should be accepted");
1662 assert_eq!(config.columns.len(), 2);
1663 }
1664
1665 #[test]
1666 fn table_config_accepts_list_column() {
1667 use datafusion::arrow::datatypes::Field;
1668
1669 let config = KvTableConfig::new(
1670 0,
1671 vec![
1672 TableColumnConfig::new("id", DataType::Int64, false),
1673 TableColumnConfig::new(
1674 "tags",
1675 DataType::List(Arc::new(Field::new("item", DataType::Utf8, false))),
1676 false,
1677 ),
1678 ],
1679 vec!["id".to_string()],
1680 vec![],
1681 )
1682 .expect("List<Utf8> column should be accepted");
1683 assert_eq!(config.columns.len(), 2);
1684 }
1685
1686 #[test]
1687 fn list_column_rejected_in_index() {
1688 use datafusion::arrow::datatypes::Field;
1689
1690 let result = KvTableConfig::new(
1691 0,
1692 vec![
1693 TableColumnConfig::new("id", DataType::Int64, false),
1694 TableColumnConfig::new(
1695 "tags",
1696 DataType::List(Arc::new(Field::new("item", DataType::Utf8, false))),
1697 false,
1698 ),
1699 ],
1700 vec!["id".to_string()],
1701 vec![IndexSpec::new("tags_idx", vec!["tags".to_string()]).unwrap()],
1702 );
1703 assert!(
1704 result.is_err() || {
1705 let config = result.unwrap();
1706 let model = TableModel::from_config(&config).unwrap();
1707 model.resolve_index_specs(&config.index_specs).is_err()
1708 }
1709 );
1710 }
1711
1712 #[test]
1713 fn i32_ordered_encoding_round_trip() {
1714 let values = [i32::MIN, -1000, -1, 0, 1, 1000, i32::MAX];
1715 for v in values {
1716 assert_eq!(decode_i32_ordered(encode_i32_ordered(v)), v);
1717 }
1718 let encoded: Vec<[u8; 4]> = values.iter().map(|v| encode_i32_ordered(*v)).collect();
1719 for i in 0..encoded.len() - 1 {
1720 assert!(encoded[i] < encoded[i + 1]);
1721 }
1722 }
1723
1724 #[test]
1725 fn i128_ordered_encoding_round_trip() {
1726 let values = [i128::MIN, -1, 0, 1, 1234567890123456789, i128::MAX];
1727 for v in values {
1728 assert_eq!(decode_i128_ordered(encode_i128_ordered(v)), v);
1729 }
1730 let encoded: Vec<[u8; 16]> = values.iter().map(|v| encode_i128_ordered(*v)).collect();
1731 for i in 0..encoded.len() - 1 {
1732 assert!(encoded[i] < encoded[i + 1]);
1733 }
1734 }
1735
1736 fn extended_model() -> (TableModel, Vec<ResolvedIndexSpec>) {
1737 let config = KvTableConfig::new(
1738 0,
1739 vec![
1740 TableColumnConfig::new("id", DataType::Int64, false),
1741 TableColumnConfig::new("created", DataType::Date32, false),
1742 TableColumnConfig::new(
1743 "ts",
1744 DataType::Timestamp(TimeUnit::Microsecond, None),
1745 false,
1746 ),
1747 TableColumnConfig::new("price", DataType::Decimal128(10, 2), false),
1748 TableColumnConfig::new("label", DataType::Utf8, false),
1749 ],
1750 vec!["id".to_string()],
1751 vec![
1752 IndexSpec::new(
1753 "date_label",
1754 vec!["created".to_string(), "label".to_string()],
1755 )
1756 .expect("valid"),
1757 IndexSpec::new("price_idx", vec!["price".to_string()]).expect("valid"),
1758 ],
1759 )
1760 .expect("valid config");
1761 let model = TableModel::from_config(&config).expect("model");
1762 let specs = model
1763 .resolve_index_specs(&config.index_specs)
1764 .expect("specs");
1765 (model, specs)
1766 }
1767
1768 #[test]
1769 fn secondary_index_key_round_trip_date32_and_decimal128() {
1770 let (model, specs) = extended_model();
1771 let row = KvRow {
1772 values: vec![
1773 CellValue::Int64(42),
1774 CellValue::Date32(19000),
1775 CellValue::Timestamp(1_700_000_000_000_000),
1776 CellValue::Decimal128(123456),
1777 CellValue::Utf8("hello".to_string()),
1778 ],
1779 };
1780 let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row)
1781 .expect("encode");
1782 let decoded = decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key)
1783 .expect("decode");
1784 let created_idx = *model.columns_by_name.get("created").unwrap();
1785 let label_idx = *model.columns_by_name.get("label").unwrap();
1786 assert!(matches!(
1787 decoded.values.get(&created_idx),
1788 Some(CellValue::Date32(19000))
1789 ));
1790 assert!(matches!(
1791 decoded.values.get(&label_idx),
1792 Some(CellValue::Utf8(v)) if v == "hello"
1793 ));
1794 assert!(matches!(
1795 &decoded.primary_key_values[0],
1796 CellValue::Int64(42)
1797 ));
1798
1799 let key2 = encode_secondary_index_key(model.table_prefix, &specs[1], &model, &row)
1800 .expect("encode");
1801 let decoded2 = decode_secondary_index_key(model.table_prefix, &specs[1], &model, &key2)
1802 .expect("decode");
1803 let price_idx = *model.columns_by_name.get("price").unwrap();
1804 assert!(matches!(
1805 decoded2.values.get(&price_idx),
1806 Some(CellValue::Decimal128(123456))
1807 ));
1808 assert!(matches!(
1809 &decoded2.primary_key_values[0],
1810 CellValue::Int64(42)
1811 ));
1812 }
1813
1814 #[test]
1815 fn base_row_round_trip_with_date32_timestamp_decimal128() {
1816 let (model, _specs) = extended_model();
1817 let row = KvRow {
1818 values: vec![
1819 CellValue::Int64(7),
1820 CellValue::Date32(19500),
1821 CellValue::Timestamp(1_700_000_000_000_000),
1822 CellValue::Decimal128(-9876543),
1823 CellValue::Utf8("world".to_string()),
1824 ],
1825 };
1826 let encoded = encode_base_row_value(&row, &model).expect("encode");
1827 let decoded = decode_base_row(vec![CellValue::Int64(7)], &encoded, &model).expect("decode");
1828 assert!(matches!(&decoded.values[0], CellValue::Int64(7)));
1829 assert!(matches!(&decoded.values[1], CellValue::Date32(19500)));
1830 assert!(matches!(
1831 &decoded.values[2],
1832 CellValue::Timestamp(1_700_000_000_000_000)
1833 ));
1834 assert!(matches!(
1835 &decoded.values[3],
1836 CellValue::Decimal128(-9876543)
1837 ));
1838 assert!(matches!(&decoded.values[4], CellValue::Utf8(v) if v == "world"));
1839 }
1840
1841 #[test]
1842 fn base_row_round_trip_with_list() {
1843 use datafusion::arrow::datatypes::Field;
1844
1845 let config = KvTableConfig::new(
1846 0,
1847 vec![
1848 TableColumnConfig::new("id", DataType::Int64, false),
1849 TableColumnConfig::new(
1850 "tags",
1851 DataType::List(Arc::new(Field::new("item", DataType::Utf8, false))),
1852 false,
1853 ),
1854 TableColumnConfig::new(
1855 "scores",
1856 DataType::List(Arc::new(Field::new("item", DataType::Int64, false))),
1857 false,
1858 ),
1859 ],
1860 vec!["id".to_string()],
1861 vec![],
1862 )
1863 .expect("valid");
1864 let model = TableModel::from_config(&config).expect("model");
1865 let row = KvRow {
1866 values: vec![
1867 CellValue::Int64(1),
1868 CellValue::List(vec![
1869 CellValue::Utf8("a".to_string()),
1870 CellValue::Utf8("b".to_string()),
1871 ]),
1872 CellValue::List(vec![CellValue::Int64(10), CellValue::Int64(20)]),
1873 ],
1874 };
1875 let encoded = encode_base_row_value(&row, &model).expect("encode");
1876 let decoded = decode_base_row(vec![CellValue::Int64(1)], &encoded, &model).expect("decode");
1877 assert!(matches!(&decoded.values[0], CellValue::Int64(1)));
1878 match &decoded.values[1] {
1879 CellValue::List(items) => {
1880 assert_eq!(items.len(), 2);
1881 assert!(matches!(&items[0], CellValue::Utf8(v) if v == "a"));
1882 assert!(matches!(&items[1], CellValue::Utf8(v) if v == "b"));
1883 }
1884 _ => panic!("expected List"),
1885 }
1886 match &decoded.values[2] {
1887 CellValue::List(items) => {
1888 assert_eq!(items.len(), 2);
1889 assert!(matches!(&items[0], CellValue::Int64(10)));
1890 assert!(matches!(&items[1], CellValue::Int64(20)));
1891 }
1892 _ => panic!("expected List"),
1893 }
1894 }
1895
1896 #[test]
1897 fn decimal128_constraint_range() {
1898 let mut min: Option<i128> = None;
1899 let mut max: Option<i128> = None;
1900 let mut contradiction = false;
1901 apply_decimal128_constraint(&mut min, &mut max, Operator::GtEq, 100, &mut contradiction);
1902 assert!(!contradiction);
1903 apply_decimal128_constraint(&mut min, &mut max, Operator::LtEq, 200, &mut contradiction);
1904 assert!(!contradiction);
1905 assert_eq!(min, Some(100));
1906 assert_eq!(max, Some(200));
1907 assert!(in_i128_bounds(150, min, max));
1908 assert!(!in_i128_bounds(99, min, max));
1909 assert!(!in_i128_bounds(201, min, max));
1910 }
1911
1912 #[test]
1913 fn decimal256_gt_max_is_contradiction() {
1914 let mut min: Option<i256> = None;
1915 let mut max: Option<i256> = None;
1916 let mut contradiction = false;
1917 apply_i256_constraint(
1918 &mut min,
1919 &mut max,
1920 Operator::Gt,
1921 i256::MAX,
1922 &mut contradiction,
1923 );
1924 assert!(contradiction);
1925 assert_eq!(min, None);
1926 assert_eq!(max, None);
1927 }
1928
1929 #[test]
1930 fn decimal256_lt_min_is_contradiction() {
1931 let mut min: Option<i256> = None;
1932 let mut max: Option<i256> = None;
1933 let mut contradiction = false;
1934 apply_i256_constraint(
1935 &mut min,
1936 &mut max,
1937 Operator::Lt,
1938 i256::MIN,
1939 &mut contradiction,
1940 );
1941 assert!(contradiction);
1942 assert_eq!(min, None);
1943 assert_eq!(max, None);
1944 }
1945
1946 #[test]
1947 fn date32_index_bound_clamps_on_i64_overflow() {
1948 let config = KvTableConfig::new(
1949 0,
1950 vec![
1951 TableColumnConfig::new("id", DataType::Int64, false),
1952 TableColumnConfig::new("created", DataType::Date32, false),
1953 ],
1954 vec!["id".to_string()],
1955 vec![IndexSpec::new("created_idx", vec!["created".to_string()]).unwrap()],
1956 )
1957 .unwrap();
1958 let model = TableModel::from_config(&config).unwrap();
1959 let specs = model.resolve_index_specs(&config.index_specs).unwrap();
1960
1961 let created_idx = *model.columns_by_name.get("created").unwrap();
1962 let mut pred = QueryPredicate::default();
1963 pred.constraints.insert(
1964 created_idx,
1965 PredicateConstraint::IntRange {
1966 min: Some(i32::MAX as i64 + 1),
1967 max: None,
1968 },
1969 );
1970
1971 let start = pred
1972 .encode_index_bound_key(model.table_prefix, &model, &specs[0], 1, false)
1973 .unwrap();
1974 let end = pred
1975 .encode_index_bound_key(model.table_prefix, &model, &specs[0], 1, true)
1976 .unwrap();
1977
1978 assert!(
1979 start <= end,
1980 "lower bound must not exceed upper bound (was wrapping via as i32)"
1981 );
1982
1983 let encoded_lower = specs[0].codec.read_payload_exact::<4>(&start, 0).unwrap();
1984 let decoded_lower = decode_i32_ordered(encoded_lower);
1985 assert_eq!(
1986 decoded_lower,
1987 i32::MAX,
1988 "out-of-range i64 must clamp to i32::MAX, not wrap"
1989 );
1990 }
1991
1992 #[test]
1993 fn timestamp_nanos_gt_uses_floor_division() {
1994 let micros = timestamp_scalar_to_micros_for_op(
1995 &ScalarValue::TimestampNanosecond(Some(-1500), None),
1996 Operator::Gt,
1997 )
1998 .unwrap();
1999 assert_eq!(micros, -2, "Gt on -1500ns should floor to -2us");
2000
2001 let mut min: Option<i64> = None;
2002 let mut max: Option<i64> = None;
2003 let mut contradiction = false;
2004 apply_int_constraint(&mut min, &mut max, Operator::Gt, micros, &mut contradiction);
2005 assert_eq!(min, Some(-1), "Gt(-2us) + 1 = min -1us");
2006
2007 let row_at_minus_1 = CellValue::Timestamp(-1);
2008 assert!(matches_constraint(
2009 &row_at_minus_1,
2010 &PredicateConstraint::IntRange { min, max }
2011 ));
2012 }
2013
2014 #[test]
2015 fn timestamp_nanos_lteq_uses_floor_division() {
2016 let micros = timestamp_scalar_to_micros_for_op(
2017 &ScalarValue::TimestampNanosecond(Some(-1500), None),
2018 Operator::LtEq,
2019 )
2020 .unwrap();
2021 assert_eq!(micros, -2, "LtEq on -1500ns should floor to -2us");
2022
2023 let row_at_minus_1 = CellValue::Timestamp(-1);
2024 assert!(
2025 !matches_constraint(
2026 &row_at_minus_1,
2027 &PredicateConstraint::IntRange {
2028 min: None,
2029 max: Some(micros)
2030 }
2031 ),
2032 "-1us (-1000ns) > -1500ns, must not satisfy <= -1500ns"
2033 );
2034 }
2035
2036 #[test]
2037 fn timestamp_nanos_gteq_uses_ceil_division() {
2038 let micros = timestamp_scalar_to_micros_for_op(
2039 &ScalarValue::TimestampNanosecond(Some(-1500), None),
2040 Operator::GtEq,
2041 )
2042 .unwrap();
2043 assert_eq!(micros, -1, "GtEq on -1500ns should ceil to -1us");
2044 }
2045
2046 #[test]
2047 fn timestamp_nanos_lt_uses_ceil_division() {
2048 let micros = timestamp_scalar_to_micros_for_op(
2049 &ScalarValue::TimestampNanosecond(Some(-1500), None),
2050 Operator::Lt,
2051 )
2052 .unwrap();
2053 assert_eq!(micros, -1, "Lt on -1500ns should ceil to -1us");
2054
2055 let mut min: Option<i64> = None;
2056 let mut max: Option<i64> = None;
2057 let mut contradiction = false;
2058 apply_int_constraint(&mut min, &mut max, Operator::Lt, micros, &mut contradiction);
2059 assert_eq!(max, Some(-2), "Lt(-1us) - 1 = max -2us");
2060 }
2061
2062 #[test]
2063 fn timestamp_nanos_eq_non_aligned_is_contradiction() {
2064 let result = timestamp_scalar_to_micros_for_op(
2065 &ScalarValue::TimestampNanosecond(Some(-1500), None),
2066 Operator::Eq,
2067 );
2068 assert!(
2069 result.is_none(),
2070 "non-aligned ns Eq must produce contradiction"
2071 );
2072 }
2073
2074 #[test]
2075 fn timestamp_nanos_exact_multiple_is_unchanged() {
2076 for op in [
2077 Operator::Eq,
2078 Operator::Gt,
2079 Operator::GtEq,
2080 Operator::Lt,
2081 Operator::LtEq,
2082 ] {
2083 let micros = timestamp_scalar_to_micros_for_op(
2084 &ScalarValue::TimestampNanosecond(Some(-2000), None),
2085 op,
2086 )
2087 .unwrap();
2088 assert_eq!(micros, -2, "exact multiple -2000ns = -2us for {op:?}");
2089 }
2090 }
2091
2092 #[test]
2093 fn float64_index_bounds_include_infinity() {
2094 let config = KvTableConfig::new(
2095 0,
2096 vec![
2097 TableColumnConfig::new("id", DataType::Int64, false),
2098 TableColumnConfig::new("val", DataType::Float64, false),
2099 ],
2100 vec!["id".to_string()],
2101 vec![IndexSpec::new("val_idx", vec!["val".to_string()]).unwrap()],
2102 )
2103 .unwrap();
2104 let model = TableModel::from_config(&config).unwrap();
2105 let specs = model.resolve_index_specs(&config.index_specs).unwrap();
2106
2107 let pred = QueryPredicate::default();
2108 let start = pred
2109 .encode_index_bound_key(model.table_prefix, &model, &specs[0], 0, false)
2110 .unwrap();
2111 let end = pred
2112 .encode_index_bound_key(model.table_prefix, &model, &specs[0], 0, true)
2113 .unwrap();
2114
2115 let neg_inf_row = KvRow {
2116 values: vec![CellValue::Int64(1), CellValue::Float64(f64::NEG_INFINITY)],
2117 };
2118 let pos_inf_row = KvRow {
2119 values: vec![CellValue::Int64(2), CellValue::Float64(f64::INFINITY)],
2120 };
2121
2122 let neg_inf_key =
2123 encode_secondary_index_key(model.table_prefix, &specs[0], &model, &neg_inf_row)
2124 .unwrap();
2125 let pos_inf_key =
2126 encode_secondary_index_key(model.table_prefix, &specs[0], &model, &pos_inf_row)
2127 .unwrap();
2128
2129 assert!(
2130 neg_inf_key >= start,
2131 "NEG_INFINITY row key must be within scan start bound"
2132 );
2133 assert!(
2134 pos_inf_key <= end,
2135 "INFINITY row key must be within scan end bound"
2136 );
2137 }
2138
2139 #[test]
2140 fn distinct_table_prefixes_produce_non_overlapping_pk_ranges() {
2141 let range_a = primary_key_prefix_range(1);
2142 let range_b = primary_key_prefix_range(2);
2143 assert!(
2144 range_a.end < range_b.start,
2145 "table prefix 1 pk range must be entirely below table prefix 2"
2146 );
2147 }
2148
2149 #[test]
2150 fn distinct_table_prefixes_isolate_primary_keys() {
2151 let model_1 = simple_int64_model(1);
2152 let model_2 = simple_int64_model(2);
2153 let pk = CellValue::Int64(42);
2154 let key_a = encode_primary_key(1, &[&pk], &model_1).expect("pk key encodes");
2155 let key_b = encode_primary_key(2, &[&pk], &model_2).expect("pk key encodes");
2156 assert_ne!(key_a, key_b, "same PK under different prefixes must differ");
2157 assert!(
2158 decode_primary_key(1, &key_a, &model_1).is_some(),
2159 "key_a must decode under prefix 1"
2160 );
2161 assert!(
2162 decode_primary_key(2, &key_a, &model_2).is_none(),
2163 "key_a must NOT decode under prefix 2"
2164 );
2165 assert!(
2166 decode_primary_key(2, &key_b, &model_2).is_some(),
2167 "key_b must decode under prefix 2"
2168 );
2169 }
2170
2171 #[test]
2172 fn distinct_table_prefixes_isolate_secondary_keys() {
2173 let config_a = KvTableConfig::new(
2174 10,
2175 vec![
2176 TableColumnConfig::new("id", DataType::Int64, false),
2177 TableColumnConfig::new("name", DataType::Utf8, false),
2178 ],
2179 vec!["id".to_string()],
2180 vec![IndexSpec::new("name_idx", vec!["name".to_string()]).unwrap()],
2181 )
2182 .unwrap();
2183 let config_b = KvTableConfig::new(
2184 11,
2185 vec![
2186 TableColumnConfig::new("id", DataType::Int64, false),
2187 TableColumnConfig::new("name", DataType::Utf8, false),
2188 ],
2189 vec!["id".to_string()],
2190 vec![IndexSpec::new("name_idx", vec!["name".to_string()]).unwrap()],
2191 )
2192 .unwrap();
2193
2194 let model_a = TableModel::from_config(&config_a).unwrap();
2195 let specs_a = model_a.resolve_index_specs(&config_a.index_specs).unwrap();
2196 let model_b = TableModel::from_config(&config_b).unwrap();
2197 let specs_b = model_b.resolve_index_specs(&config_b.index_specs).unwrap();
2198
2199 let row = KvRow {
2200 values: vec![CellValue::Int64(1), CellValue::Utf8("alice".to_string())],
2201 };
2202 let key_a =
2203 encode_secondary_index_key(model_a.table_prefix, &specs_a[0], &model_a, &row).unwrap();
2204 let key_b =
2205 encode_secondary_index_key(model_b.table_prefix, &specs_b[0], &model_b, &row).unwrap();
2206
2207 assert_ne!(
2208 key_a, key_b,
2209 "same row under different prefixes must differ"
2210 );
2211 assert!(
2212 decode_secondary_index_key(model_a.table_prefix, &specs_a[0], &model_a, &key_a)
2213 .is_some()
2214 );
2215 assert!(
2216 decode_secondary_index_key(model_a.table_prefix, &specs_a[0], &model_a, &key_b)
2217 .is_none(),
2218 "key from table B must not decode under table A's prefix"
2219 );
2220 }
2221
2222 #[test]
2223 fn table_prefix_stored_in_model() {
2224 let config = KvTableConfig::new(
2225 12,
2226 vec![TableColumnConfig::new("id", DataType::Int64, false)],
2227 vec!["id".to_string()],
2228 vec![],
2229 )
2230 .unwrap();
2231 let model = TableModel::from_config(&config).unwrap();
2232 assert_eq!(model.table_prefix, 12);
2233 }
2234
2235 #[test]
2236 fn codec_layout_exposes_payload_bits_under_reserved_family_bits() {
2237 let config = KvTableConfig::new(
2238 0,
2239 vec![
2240 TableColumnConfig::new("id", DataType::FixedSizeBinary(16), false),
2241 TableColumnConfig::new("bucket", DataType::FixedSizeBinary(16), false),
2242 ],
2243 vec!["id".to_string()],
2244 vec![IndexSpec::new("bucket_idx", vec!["bucket".to_string()]).unwrap()],
2245 )
2246 .unwrap();
2247 let model = TableModel::from_config(&config).unwrap();
2248 let spec = model
2249 .resolve_index_specs(&config.index_specs)
2250 .unwrap()
2251 .remove(0);
2252
2253 let mut current_primary = HashSet::new();
2254 let mut current_secondary = HashSet::new();
2255
2256 fn first_twelve_bits_of_key(key: &[u8]) -> u16 {
2257 let first = u16::from(*key.first().unwrap_or(&0));
2258 let second = u16::from(*key.get(1).unwrap_or(&0));
2259 (first << 4) | (second >> 4)
2260 }
2261
2262 for first_byte in 0u8..=255 {
2263 let mut id = vec![0u8; 16];
2264 id[0] = first_byte;
2265 let mut bucket = vec![0u8; 16];
2266 bucket[0] = first_byte;
2267
2268 let pk = CellValue::FixedBinary(id.clone());
2269 let current_pk = encode_primary_key(model.table_prefix, &[&pk], &model).unwrap();
2270 current_primary.insert(first_twelve_bits_of_key(¤t_pk));
2271
2272 let row = KvRow {
2273 values: vec![
2274 CellValue::FixedBinary(id),
2275 CellValue::FixedBinary(bucket.clone()),
2276 ],
2277 };
2278 let current_index =
2279 encode_secondary_index_key(model.table_prefix, &spec, &model, &row).unwrap();
2280 current_secondary.insert(first_twelve_bits_of_key(¤t_index));
2281 }
2282
2283 assert_eq!(current_primary.len(), 128);
2286
2287 assert_eq!(current_secondary.len(), 8);
2290 }
2291
2292 #[test]
2293 fn kv_schema_auto_assigns_sequential_prefixes() {
2294 let client = StoreClient::new("http://localhost:10000");
2295 let schema = KvSchema::new(client)
2296 .table(
2297 "alpha",
2298 vec![TableColumnConfig::new("id", DataType::Int64, false)],
2299 vec!["id".to_string()],
2300 vec![],
2301 )
2302 .unwrap()
2303 .table(
2304 "beta",
2305 vec![
2306 TableColumnConfig::new("id", DataType::Int64, false),
2307 TableColumnConfig::new("name", DataType::Utf8, false),
2308 ],
2309 vec!["id".to_string()],
2310 vec![],
2311 )
2312 .unwrap()
2313 .table(
2314 "gamma",
2315 vec![TableColumnConfig::new("id", DataType::Int64, false)],
2316 vec!["id".to_string()],
2317 vec![],
2318 )
2319 .unwrap();
2320
2321 assert_eq!(schema.table_count(), 3);
2322 }
2323
2324 #[test]
2325 fn kv_schema_allows_max_codec_table_count_and_rejects_overflow() {
2326 let client = StoreClient::new("http://localhost:10000");
2327 let mut schema = KvSchema::new(client);
2328 for idx in 0..MAX_TABLES {
2329 schema = schema
2330 .table(
2331 format!("t{idx}"),
2332 vec![TableColumnConfig::new("id", DataType::Int64, false)],
2333 vec!["id".to_string()],
2334 vec![],
2335 )
2336 .expect("tables up to codec capacity should be accepted");
2337 }
2338 assert_eq!(schema.table_count(), MAX_TABLES);
2339
2340 let overflow = schema.table(
2341 "overflow",
2342 vec![TableColumnConfig::new("id", DataType::Int64, false)],
2343 vec!["id".to_string()],
2344 vec![],
2345 );
2346 match overflow {
2347 Ok(_) => panic!("overflow table should be rejected"),
2348 Err(err) => assert!(
2349 err.contains(&format!(
2350 "too many tables for codec layout (max {MAX_TABLES})"
2351 )),
2352 "overflow table should be rejected with codec-capacity error"
2353 ),
2354 }
2355 }
2356
2357 #[test]
2358 fn sequential_prefixes_produce_non_overlapping_pk_ranges() {
2359 let range_a = primary_key_prefix_range(0);
2360 let range_b = primary_key_prefix_range(1);
2361 let range_c = primary_key_prefix_range(2);
2362 assert!(range_a.end < range_b.start);
2363 assert!(range_b.end < range_c.start);
2364 }
2365
2366 #[test]
2367 fn sequential_prefixes_isolate_primary_keys() {
2368 let model_0 = simple_int64_model(0);
2369 let model_1 = simple_int64_model(1);
2370 let pk = CellValue::Int64(42);
2371 let key_a = encode_primary_key(0, &[&pk], &model_0).expect("pk key encodes");
2372 let key_b = encode_primary_key(1, &[&pk], &model_1).expect("pk key encodes");
2373 assert_ne!(key_a, key_b);
2374 assert!(decode_primary_key(0, &key_a, &model_0).is_some());
2375 assert!(decode_primary_key(1, &key_a, &model_1).is_none());
2376 assert!(decode_primary_key(0, &key_b, &model_0).is_none());
2377 assert!(decode_primary_key(1, &key_b, &model_1).is_some());
2378 }
2379
2380 #[test]
2381 fn sequential_prefixes_isolate_secondary_keys() {
2382 let config_a = KvTableConfig::new(
2383 0,
2384 vec![
2385 TableColumnConfig::new("id", DataType::Int64, false),
2386 TableColumnConfig::new("name", DataType::Utf8, false),
2387 ],
2388 vec!["id".to_string()],
2389 vec![IndexSpec::new("name_idx", vec!["name".to_string()]).unwrap()],
2390 )
2391 .unwrap();
2392 let config_b = KvTableConfig::new(
2393 1,
2394 vec![
2395 TableColumnConfig::new("id", DataType::Int64, false),
2396 TableColumnConfig::new("name", DataType::Utf8, false),
2397 ],
2398 vec!["id".to_string()],
2399 vec![IndexSpec::new("name_idx", vec!["name".to_string()]).unwrap()],
2400 )
2401 .unwrap();
2402
2403 let model_a = TableModel::from_config(&config_a).unwrap();
2404 let specs_a = model_a.resolve_index_specs(&config_a.index_specs).unwrap();
2405 let model_b = TableModel::from_config(&config_b).unwrap();
2406 let specs_b = model_b.resolve_index_specs(&config_b.index_specs).unwrap();
2407
2408 let row = KvRow {
2409 values: vec![CellValue::Int64(1), CellValue::Utf8("alice".to_string())],
2410 };
2411 let key_a =
2412 encode_secondary_index_key(model_a.table_prefix, &specs_a[0], &model_a, &row).unwrap();
2413 let key_b =
2414 encode_secondary_index_key(model_b.table_prefix, &specs_b[0], &model_b, &row).unwrap();
2415 assert_ne!(key_a, key_b);
2416 assert!(
2417 decode_secondary_index_key(model_a.table_prefix, &specs_a[0], &model_a, &key_b)
2418 .is_none(),
2419 "key from prefix 1 must not decode under prefix 0"
2420 );
2421 }
2422
2423 #[tokio::test]
2424 async fn kv_schema_register_all_enables_join() {
2425 let ctx = SessionContext::new();
2426 let client = StoreClient::new("http://localhost:10000");
2427
2428 let result = KvSchema::new(client)
2429 .table(
2430 "customers",
2431 vec![
2432 TableColumnConfig::new("customer_id", DataType::Int64, false),
2433 TableColumnConfig::new("name", DataType::Utf8, false),
2434 ],
2435 vec!["customer_id".to_string()],
2436 vec![],
2437 )
2438 .unwrap()
2439 .table(
2440 "orders",
2441 vec![
2442 TableColumnConfig::new("order_id", DataType::Int64, false),
2443 TableColumnConfig::new("customer_id", DataType::Int64, false),
2444 TableColumnConfig::new("amount", DataType::Int64, false),
2445 ],
2446 vec!["order_id".to_string()],
2447 vec![IndexSpec::new("cust_idx", vec!["customer_id".to_string()]).unwrap()],
2448 )
2449 .unwrap()
2450 .register_all(&ctx);
2451
2452 assert!(
2453 result.is_ok(),
2454 "register_all must succeed: {:?}",
2455 result.err()
2456 );
2457
2458 let plan = ctx
2459 .sql(
2460 "SELECT c.name, o.order_id, o.amount \
2461 FROM orders o \
2462 JOIN customers c ON o.customer_id = c.customer_id",
2463 )
2464 .await;
2465 assert!(
2466 plan.is_ok(),
2467 "JOIN query must plan successfully: {:?}",
2468 plan.err()
2469 );
2470 }
2471
2472 #[tokio::test]
2473 async fn kv_schema_three_way_join() {
2474 let ctx = SessionContext::new();
2475 let client = StoreClient::new("http://localhost:10000");
2476
2477 KvSchema::new(client)
2478 .table(
2479 "products",
2480 vec![
2481 TableColumnConfig::new("product_id", DataType::Int64, false),
2482 TableColumnConfig::new("name", DataType::Utf8, false),
2483 TableColumnConfig::new("price", DataType::Int64, false),
2484 ],
2485 vec!["product_id".to_string()],
2486 vec![],
2487 )
2488 .unwrap()
2489 .table(
2490 "line_items",
2491 vec![
2492 TableColumnConfig::new("item_id", DataType::Int64, false),
2493 TableColumnConfig::new("order_id", DataType::Int64, false),
2494 TableColumnConfig::new("product_id", DataType::Int64, false),
2495 TableColumnConfig::new("qty", DataType::Int64, false),
2496 ],
2497 vec!["item_id".to_string()],
2498 vec![
2499 IndexSpec::new("prod_idx", vec!["product_id".to_string()]).unwrap(),
2500 IndexSpec::new("order_idx", vec!["order_id".to_string()]).unwrap(),
2501 ],
2502 )
2503 .unwrap()
2504 .table(
2505 "orders",
2506 vec![
2507 TableColumnConfig::new("order_id", DataType::Int64, false),
2508 TableColumnConfig::new("customer", DataType::Utf8, false),
2509 ],
2510 vec!["order_id".to_string()],
2511 vec![],
2512 )
2513 .unwrap()
2514 .register_all(&ctx)
2515 .unwrap();
2516
2517 let plan = ctx
2518 .sql(
2519 "SELECT o.customer, p.name, li.qty \
2520 FROM line_items li \
2521 JOIN products p ON li.product_id = p.product_id \
2522 JOIN orders o ON li.order_id = o.order_id",
2523 )
2524 .await;
2525 assert!(plan.is_ok(), "three-way JOIN must plan: {:?}", plan.err());
2526 }
2527
2528 #[test]
2529 fn kv_schema_orders_table_convenience() {
2530 let client = StoreClient::new("http://localhost:10000");
2531 let schema = KvSchema::new(client)
2532 .orders_table(
2533 "my_orders",
2534 vec![IndexSpec::new(
2535 "region_customer",
2536 vec!["region".to_string(), "customer_id".to_string()],
2537 )
2538 .unwrap()],
2539 )
2540 .unwrap();
2541 assert_eq!(schema.table_count(), 1);
2542 }
2543
2544 #[test]
2545 fn nullable_column_accepted_in_config() {
2546 let config = KvTableConfig::new(
2547 0,
2548 vec![
2549 TableColumnConfig::new("id", DataType::Int64, false),
2550 TableColumnConfig::new("name", DataType::Utf8, true),
2551 ],
2552 vec!["id".to_string()],
2553 vec![],
2554 );
2555 assert!(config.is_ok());
2556 }
2557
2558 #[test]
2559 fn nullable_column_rejected_in_index() {
2560 let config = KvTableConfig::new(
2561 0,
2562 vec![
2563 TableColumnConfig::new("id", DataType::Int64, false),
2564 TableColumnConfig::new("name", DataType::Utf8, true),
2565 ],
2566 vec!["id".to_string()],
2567 vec![IndexSpec::new("name_idx", vec!["name".to_string()]).unwrap()],
2568 )
2569 .unwrap();
2570 let model = TableModel::from_config(&config).unwrap();
2571 let result = model.resolve_index_specs(&config.index_specs);
2572 assert!(result.is_err());
2573 assert!(result.unwrap_err().contains("nullable"));
2574 }
2575
2576 #[test]
2577 fn base_row_round_trip_with_null() {
2578 let config = KvTableConfig::new(
2579 0,
2580 vec![
2581 TableColumnConfig::new("id", DataType::Int64, false),
2582 TableColumnConfig::new("label", DataType::Utf8, true),
2583 TableColumnConfig::new("score", DataType::Int64, true),
2584 ],
2585 vec!["id".to_string()],
2586 vec![],
2587 )
2588 .unwrap();
2589 let model = TableModel::from_config(&config).unwrap();
2590 let row = KvRow {
2591 values: vec![CellValue::Int64(1), CellValue::Null, CellValue::Int64(42)],
2592 };
2593 let encoded = encode_base_row_value(&row, &model).unwrap();
2594 let decoded = decode_base_row(vec![CellValue::Int64(1)], &encoded, &model).unwrap();
2595 assert!(matches!(&decoded.values[0], CellValue::Int64(1)));
2596 assert!(matches!(&decoded.values[1], CellValue::Null));
2597 assert!(matches!(&decoded.values[2], CellValue::Int64(42)));
2598 }
2599
2600 #[test]
2601 fn null_does_not_match_equality_constraint() {
2602 assert!(!matches_constraint(
2603 &CellValue::Null,
2604 &PredicateConstraint::StringEq("x".to_string())
2605 ));
2606 assert!(!matches_constraint(
2607 &CellValue::Null,
2608 &PredicateConstraint::IntRange {
2609 min: Some(0),
2610 max: Some(10)
2611 }
2612 ));
2613 }
2614
2615 #[test]
2616 fn is_null_constraint_matches() {
2617 assert!(matches_constraint(
2618 &CellValue::Null,
2619 &PredicateConstraint::IsNull
2620 ));
2621 assert!(!matches_constraint(
2622 &CellValue::Utf8("x".to_string()),
2623 &PredicateConstraint::IsNull
2624 ));
2625 assert!(!matches_constraint(
2626 &CellValue::Null,
2627 &PredicateConstraint::IsNotNull
2628 ));
2629 assert!(matches_constraint(
2630 &CellValue::Int64(5),
2631 &PredicateConstraint::IsNotNull
2632 ));
2633 }
2634
2635 #[test]
2636 fn string_in_constraint_matches() {
2637 let constraint =
2638 PredicateConstraint::StringIn(vec!["us-east".to_string(), "us-west".to_string()]);
2639 assert!(matches_constraint(
2640 &CellValue::Utf8("us-east".to_string()),
2641 &constraint,
2642 ));
2643 assert!(matches_constraint(
2644 &CellValue::Utf8("us-west".to_string()),
2645 &constraint,
2646 ));
2647 assert!(!matches_constraint(
2648 &CellValue::Utf8("eu-central".to_string()),
2649 &constraint,
2650 ));
2651 }
2652
2653 #[test]
2654 fn int_in_constraint_matches() {
2655 let constraint = PredicateConstraint::IntIn(vec![1, 2, 3]);
2656 assert!(matches_constraint(&CellValue::Int64(1), &constraint));
2657 assert!(matches_constraint(&CellValue::Int64(3), &constraint));
2658 assert!(!matches_constraint(&CellValue::Int64(4), &constraint));
2659 }
2660
2661 #[test]
2662 fn in_predicate_generates_multiple_index_ranges() {
2663 let (model, specs) = test_model();
2664 let region_idx = *model.columns_by_name.get("region").unwrap();
2665 let mut pred = QueryPredicate::default();
2666 pred.constraints.insert(
2667 region_idx,
2668 PredicateConstraint::StringIn(vec!["us-east".to_string(), "us-west".to_string()]),
2669 );
2670 let plan = pred
2671 .choose_index_plan(&model, &specs)
2672 .expect("plan")
2673 .expect("should find index");
2674 assert_eq!(plan.ranges.len(), 2);
2675 }
2676
2677 #[test]
2678 fn int_in_generates_multiple_pk_ranges() {
2679 let (model, _specs) = test_model();
2680 let mut pred = QueryPredicate::default();
2681 pred.constraints.insert(
2682 model.primary_key_indices[0],
2683 PredicateConstraint::IntIn(vec![100, 200, 300]),
2684 );
2685 let ranges = pred.primary_key_ranges(&model).unwrap();
2686 assert_eq!(ranges.len(), 3);
2687 }
2688
2689 #[test]
2690 fn duplicate_int_in_values_deduplicated() {
2691 let (model, _specs) = test_model();
2692 let filter = Expr::InList(datafusion::logical_expr::expr::InList {
2694 expr: Box::new(Expr::Column(datafusion::common::Column::new_unqualified(
2695 "order_id",
2696 ))),
2697 list: vec![
2698 Expr::Literal(ScalarValue::Int64(Some(5)), None),
2699 Expr::Literal(ScalarValue::Int64(Some(5)), None),
2700 Expr::Literal(ScalarValue::Int64(Some(10)), None),
2701 ],
2702 negated: false,
2703 });
2704 let pred = QueryPredicate::from_filters(&[filter], &model);
2705 let ranges = pred.primary_key_ranges(&model).unwrap();
2706 assert_eq!(
2707 ranges.len(),
2708 2,
2709 "duplicate IN values must be deduped, producing 2 ranges not 3"
2710 );
2711 }
2712
2713 #[test]
2714 fn duplicate_uint64_in_values_deduplicated() {
2715 let config = KvTableConfig::new(
2716 0,
2717 vec![
2718 TableColumnConfig::new("id", DataType::UInt64, false),
2719 TableColumnConfig::new("name", DataType::Utf8, false),
2720 ],
2721 vec!["id".to_string()],
2722 vec![],
2723 )
2724 .unwrap();
2725 let model = TableModel::from_config(&config).unwrap();
2726 let filter = Expr::InList(datafusion::logical_expr::expr::InList {
2727 expr: Box::new(Expr::Column(datafusion::common::Column::new_unqualified(
2728 "id",
2729 ))),
2730 list: vec![
2731 Expr::Literal(ScalarValue::UInt64(Some(100)), None),
2732 Expr::Literal(ScalarValue::UInt64(Some(100)), None),
2733 Expr::Literal(ScalarValue::UInt64(Some(200)), None),
2734 ],
2735 negated: false,
2736 });
2737 let pred = QueryPredicate::from_filters(&[filter], &model);
2738 let ranges = pred.primary_key_ranges(&model).unwrap();
2739 assert_eq!(
2740 ranges.len(),
2741 2,
2742 "duplicate UInt64 IN values must be deduped"
2743 );
2744 }
2745
2746 #[test]
2747 fn duplicate_fixed_binary_in_values_deduplicated() {
2748 let config = KvTableConfig::new(
2749 0,
2750 vec![
2751 TableColumnConfig::new("hash", DataType::FixedSizeBinary(16), false),
2752 TableColumnConfig::new("val", DataType::Int64, false),
2753 ],
2754 vec!["hash".to_string()],
2755 vec![],
2756 )
2757 .unwrap();
2758 let model = TableModel::from_config(&config).unwrap();
2759 let dup_val = vec![0xAA; 16];
2760 let other_val = vec![0xBB; 16];
2761 let filter = Expr::InList(datafusion::logical_expr::expr::InList {
2762 expr: Box::new(Expr::Column(datafusion::common::Column::new_unqualified(
2763 "hash",
2764 ))),
2765 list: vec![
2766 Expr::Literal(
2767 ScalarValue::FixedSizeBinary(16, Some(dup_val.clone())),
2768 None,
2769 ),
2770 Expr::Literal(ScalarValue::FixedSizeBinary(16, Some(dup_val)), None),
2771 Expr::Literal(ScalarValue::FixedSizeBinary(16, Some(other_val)), None),
2772 ],
2773 negated: false,
2774 });
2775 let pred = QueryPredicate::from_filters(&[filter], &model);
2776 let ranges = pred.primary_key_ranges(&model).unwrap();
2777 assert_eq!(
2778 ranges.len(),
2779 2,
2780 "duplicate FixedBinary IN values must be deduped"
2781 );
2782 }
2783
2784 #[test]
2785 fn or_equalities_extracted_as_in_list() {
2786 let (model, _) = test_model();
2787 let expr = Expr::BinaryExpr(datafusion::logical_expr::BinaryExpr {
2788 left: Box::new(Expr::BinaryExpr(datafusion::logical_expr::BinaryExpr {
2789 left: Box::new(Expr::Column(datafusion::common::Column::new_unqualified(
2790 "region",
2791 ))),
2792 op: Operator::Eq,
2793 right: Box::new(Expr::Literal(
2794 ScalarValue::Utf8(Some("us-east".to_string())),
2795 None,
2796 )),
2797 })),
2798 op: Operator::Or,
2799 right: Box::new(Expr::BinaryExpr(datafusion::logical_expr::BinaryExpr {
2800 left: Box::new(Expr::Column(datafusion::common::Column::new_unqualified(
2801 "region",
2802 ))),
2803 op: Operator::Eq,
2804 right: Box::new(Expr::Literal(
2805 ScalarValue::Utf8(Some("us-west".to_string())),
2806 None,
2807 )),
2808 })),
2809 });
2810 let result = extract_or_in_column(&expr, &model);
2811 assert!(result.is_some());
2812 let (col, vals) = result.unwrap();
2813 assert_eq!(col, "region");
2814 assert_eq!(vals.len(), 2);
2815 }
2816
2817 #[test]
2818 fn or_equalities_on_float64_are_not_pushdown_supported() {
2819 let config = KvTableConfig::new(
2820 0,
2821 vec![
2822 TableColumnConfig::new("id", DataType::Int64, false),
2823 TableColumnConfig::new("score", DataType::Float64, false),
2824 ],
2825 vec!["id".to_string()],
2826 vec![],
2827 )
2828 .unwrap();
2829 let model = TableModel::from_config(&config).unwrap();
2830
2831 use datafusion::logical_expr::col;
2832 let filter = col("score")
2833 .eq(Expr::Literal(ScalarValue::Float64(Some(1.0)), None))
2834 .or(col("score").eq(Expr::Literal(ScalarValue::Float64(Some(2.0)), None)));
2835
2836 assert!(
2837 !QueryPredicate::supports_filter(&filter, &model),
2838 "OR-equality pushdown should be disabled for Float64 because apply_in_list cannot enforce it"
2839 );
2840
2841 let pred = QueryPredicate::from_filters(&[filter], &model);
2842 assert!(!pred.contradiction);
2843 assert!(
2844 pred.constraints.is_empty(),
2845 "unsupported OR predicate must not contribute pushdown constraints"
2846 );
2847 }
2848
2849 #[test]
2850 fn batch_writer_encodes_rows_across_tables() {
2851 let client = StoreClient::new("http://localhost:10000");
2852 let schema = KvSchema::new(client)
2853 .table(
2854 "customers",
2855 vec![
2856 TableColumnConfig::new("customer_id", DataType::Int64, false),
2857 TableColumnConfig::new("name", DataType::Utf8, false),
2858 ],
2859 vec!["customer_id".to_string()],
2860 vec![],
2861 )
2862 .unwrap()
2863 .table(
2864 "orders",
2865 vec![
2866 TableColumnConfig::new("order_id", DataType::Int64, false),
2867 TableColumnConfig::new("customer_id", DataType::Int64, false),
2868 TableColumnConfig::new("amount", DataType::Int64, false),
2869 ],
2870 vec!["order_id".to_string()],
2871 vec![IndexSpec::new("cust_idx", vec!["customer_id".to_string()]).unwrap()],
2872 )
2873 .unwrap();
2874
2875 let mut batch = schema.batch_writer();
2876 batch
2877 .insert(
2878 "customers",
2879 vec![CellValue::Int64(1), CellValue::Utf8("Alice".to_string())],
2880 )
2881 .unwrap();
2882 batch
2883 .insert(
2884 "orders",
2885 vec![
2886 CellValue::Int64(100),
2887 CellValue::Int64(1),
2888 CellValue::Int64(4999),
2889 ],
2890 )
2891 .unwrap();
2892 batch
2893 .insert(
2894 "orders",
2895 vec![
2896 CellValue::Int64(101),
2897 CellValue::Int64(1),
2898 CellValue::Int64(2999),
2899 ],
2900 )
2901 .unwrap();
2902
2903 assert_eq!(batch.pending_count(), 5);
2905 }
2906
2907 #[test]
2908 fn batch_writer_rejects_unknown_table() {
2909 let client = StoreClient::new("http://localhost:10000");
2910 let schema = KvSchema::new(client)
2911 .table(
2912 "t1",
2913 vec![TableColumnConfig::new("id", DataType::Int64, false)],
2914 vec!["id".to_string()],
2915 vec![],
2916 )
2917 .unwrap();
2918
2919 let mut batch = schema.batch_writer();
2920 let result = batch.insert("nonexistent", vec![CellValue::Int64(1)]);
2921 assert!(result.is_err());
2922 assert!(result.unwrap_err().contains("unknown table"));
2923 }
2924
2925 #[test]
2926 fn batch_writer_rejects_wrong_column_count() {
2927 let client = StoreClient::new("http://localhost:10000");
2928 let schema = KvSchema::new(client)
2929 .table(
2930 "t1",
2931 vec![
2932 TableColumnConfig::new("id", DataType::Int64, false),
2933 TableColumnConfig::new("name", DataType::Utf8, false),
2934 ],
2935 vec!["id".to_string()],
2936 vec![],
2937 )
2938 .unwrap();
2939
2940 let mut batch = schema.batch_writer();
2941 let result = batch.insert("t1", vec![CellValue::Int64(1)]);
2942 assert!(result.is_err());
2943 assert!(result.unwrap_err().contains("expected 2"));
2944 }
2945
2946 #[test]
2947 fn batch_writer_rejects_non_pk_type_mismatch() {
2948 let client = StoreClient::new("http://localhost:10000");
2949 let schema = KvSchema::new(client)
2950 .table(
2951 "t1",
2952 vec![
2953 TableColumnConfig::new("id", DataType::Int64, false),
2954 TableColumnConfig::new("amount", DataType::Int64, false),
2955 ],
2956 vec!["id".to_string()],
2957 vec![],
2958 )
2959 .unwrap();
2960
2961 let mut batch = schema.batch_writer();
2962 let result = batch.insert(
2963 "t1",
2964 vec![CellValue::Int64(1), CellValue::Utf8("bad".to_string())],
2965 );
2966 assert!(result.is_err());
2967 assert!(
2968 result.unwrap_err().contains("type mismatch"),
2969 "non-PK schema-invalid values must be rejected at insert-time"
2970 );
2971 }
2972
2973 #[test]
2974 fn batch_writer_entries_use_distinct_table_prefixes() {
2975 let client = StoreClient::new("http://localhost:10000");
2976 let schema = KvSchema::new(client)
2977 .table(
2978 "a",
2979 vec![TableColumnConfig::new("id", DataType::Int64, false)],
2980 vec!["id".to_string()],
2981 vec![],
2982 )
2983 .unwrap()
2984 .table(
2985 "b",
2986 vec![TableColumnConfig::new("id", DataType::Int64, false)],
2987 vec!["id".to_string()],
2988 vec![],
2989 )
2990 .unwrap();
2991
2992 let mut batch = schema.batch_writer();
2993 batch.insert("a", vec![CellValue::Int64(42)]).unwrap();
2994 batch.insert("b", vec![CellValue::Int64(42)]).unwrap();
2995
2996 assert_eq!(batch.pending_count(), 2);
2997 assert_ne!(
2998 batch.pending_keys[0], batch.pending_keys[1],
2999 "same PK in different tables must produce different keys"
3000 );
3001 assert_ne!(
3002 batch.pending_keys[0][0], batch.pending_keys[1][0],
3003 "table prefix byte must differ"
3004 );
3005 }
3006
    /// A prepared flush whose upload is reported as failed (via the `StoreBatchUpload`
    /// trait) must be requeued ahead of rows inserted after it was prepared. The retry
    /// keeps its original request id (0), and the newer row gets the next id (1).
    #[tokio::test]
    async fn batch_writer_trait_failure_requeues_prepared_before_new_pending() {
        let client = StoreClient::new("http://localhost:10000");
        let schema = KvSchema::new(client)
            .table(
                "t",
                vec![TableColumnConfig::new("id", DataType::Int64, false)],
                vec!["id".to_string()],
                vec![],
            )
            .unwrap();

        let mut batch = schema.batch_writer();
        batch.insert("t", vec![CellValue::Int64(1)]).unwrap();
        // Take row 1 out of the pending set as request 0.
        let prepared = batch.prepare_flush().unwrap().expect("prepared row");
        assert_eq!(prepared.request_id(), 0);
        assert_eq!(prepared.entry_count(), 1);

        // Insert row 2 BEFORE reporting the failure, so ordering between the requeued
        // prepared batch and newer pending rows is what is being tested.
        batch.insert("t", vec![CellValue::Int64(2)]).unwrap();
        StoreBatchUpload::mark_upload_failed(&batch, prepared, "commit failed".to_string()).await;
        // Both the requeued row and the newly inserted row are now pending.
        assert_eq!(batch.pending_count(), 2);

        // The failed request is retried first, with its original id...
        let retry = batch.prepare_flush().unwrap().expect("retry row");
        assert_eq!(retry.request_id(), 0);
        assert_eq!(retry.entry_count(), 1);
        // ...and only then is the newer row flushed under a fresh id.
        let next = batch.prepare_flush().unwrap().expect("new pending row");
        assert_eq!(next.request_id(), 1);
        assert_eq!(next.entry_count(), 1);
    }
3036
3037 #[test]
3038 fn batch_writer_supports_nullable_columns() {
3039 let client = StoreClient::new("http://localhost:10000");
3040 let schema = KvSchema::new(client)
3041 .table(
3042 "t",
3043 vec![
3044 TableColumnConfig::new("id", DataType::Int64, false),
3045 TableColumnConfig::new("note", DataType::Utf8, true),
3046 ],
3047 vec!["id".to_string()],
3048 vec![],
3049 )
3050 .unwrap();
3051
3052 let mut batch = schema.batch_writer();
3053 batch
3054 .insert("t", vec![CellValue::Int64(1), CellValue::Null])
3055 .unwrap();
3056 assert_eq!(batch.pending_count(), 1);
3057 }
3058
3059 #[test]
3060 fn non_nullable_column_rejects_null_in_batch_writer() {
3061 let client = StoreClient::new("http://localhost:10000");
3062 let schema = KvSchema::new(client)
3063 .table(
3064 "t",
3065 vec![
3066 TableColumnConfig::new("id", DataType::Int64, false),
3067 TableColumnConfig::new("name", DataType::Utf8, false),
3068 TableColumnConfig::new("note", DataType::Utf8, true),
3069 ],
3070 vec!["id".to_string()],
3071 vec![],
3072 )
3073 .unwrap();
3074
3075 let mut batch = schema.batch_writer();
3077 let result = batch.insert(
3078 "t",
3079 vec![
3080 CellValue::Int64(1),
3081 CellValue::Null,
3082 CellValue::Utf8("ok".to_string()),
3083 ],
3084 );
3085 assert!(result.is_err());
3086 assert!(
3087 result.unwrap_err().contains("not nullable"),
3088 "error should mention non-nullable constraint"
3089 );
3090
3091 let mut batch = schema.batch_writer();
3093 batch
3094 .insert(
3095 "t",
3096 vec![
3097 CellValue::Int64(1),
3098 CellValue::Utf8("Alice".to_string()),
3099 CellValue::Null,
3100 ],
3101 )
3102 .unwrap();
3103 assert_eq!(batch.pending_count(), 1);
3104
3105 let mut batch = schema.batch_writer();
3107 batch
3108 .insert(
3109 "t",
3110 vec![
3111 CellValue::Int64(1),
3112 CellValue::Utf8("Alice".to_string()),
3113 CellValue::Utf8("hello".to_string()),
3114 ],
3115 )
3116 .unwrap();
3117 assert_eq!(batch.pending_count(), 1);
3118 }
3119
3120 #[test]
3121 fn uint64_column_accepted() {
3122 let config = KvTableConfig::new(
3123 0,
3124 vec![
3125 TableColumnConfig::new("id", DataType::UInt64, false),
3126 TableColumnConfig::new("name", DataType::Utf8, false),
3127 ],
3128 vec!["id".to_string()],
3129 vec![],
3130 );
3131 assert!(config.is_ok());
3132 }
3133
3134 #[test]
3135 fn uint64_primary_key_round_trip() {
3136 let config = KvTableConfig::new(
3137 0,
3138 vec![
3139 TableColumnConfig::new("id", DataType::UInt64, false),
3140 TableColumnConfig::new("label", DataType::Utf8, false),
3141 ],
3142 vec!["id".to_string()],
3143 vec![],
3144 )
3145 .unwrap();
3146 let model = TableModel::from_config(&config).unwrap();
3147 let row = KvRow {
3148 values: vec![
3149 CellValue::UInt64(u64::MAX),
3150 CellValue::Utf8("max".to_string()),
3151 ],
3152 };
3153 let encoded = encode_base_row_value(&row, &model).unwrap();
3154 let pk = row
3155 .primary_key_values(&model)
3156 .into_iter()
3157 .cloned()
3158 .collect::<Vec<_>>();
3159 let decoded = decode_base_row(pk, &encoded, &model).unwrap();
3160 assert!(matches!(&decoded.values[0], CellValue::UInt64(v) if *v == u64::MAX));
3161 assert!(matches!(&decoded.values[1], CellValue::Utf8(v) if v == "max"));
3162 }
3163
3164 #[test]
3165 fn string_primary_key_accepted() {
3166 let config = KvTableConfig::new(
3167 0,
3168 vec![
3169 TableColumnConfig::new("code", DataType::Utf8, false),
3170 TableColumnConfig::new("value", DataType::Int64, false),
3171 ],
3172 vec!["code".to_string()],
3173 vec![],
3174 );
3175 assert!(config.is_ok());
3176 }
3177
3178 #[test]
3179 fn fixed_binary_primary_key_round_trip() {
3180 let config = KvTableConfig::new(
3181 0,
3182 vec![
3183 TableColumnConfig::new("hash", DataType::FixedSizeBinary(32), false),
3184 TableColumnConfig::new("amount", DataType::Int64, false),
3185 ],
3186 vec!["hash".to_string()],
3187 vec![],
3188 )
3189 .unwrap();
3190 let model = TableModel::from_config(&config).unwrap();
3191 let hash_val = vec![0xABu8; 32];
3192 let row = KvRow {
3193 values: vec![
3194 CellValue::FixedBinary(hash_val.clone()),
3195 CellValue::Int64(100),
3196 ],
3197 };
3198 let encoded = encode_base_row_value(&row, &model).unwrap();
3199 let pk = row
3200 .primary_key_values(&model)
3201 .into_iter()
3202 .cloned()
3203 .collect::<Vec<_>>();
3204 let decoded = decode_base_row(pk, &encoded, &model).unwrap();
3205 assert!(matches!(&decoded.values[0], CellValue::FixedBinary(v) if *v == hash_val));
3206 }
3207
3208 #[test]
3209 fn fixed_binary_key_rejects_wrong_length() {
3210 let config = KvTableConfig::new(
3211 0,
3212 vec![
3213 TableColumnConfig::new("hash", DataType::FixedSizeBinary(16), false),
3214 TableColumnConfig::new("amount", DataType::Int64, false),
3215 ],
3216 vec!["hash".to_string()],
3217 vec![],
3218 )
3219 .unwrap();
3220 let model = TableModel::from_config(&config).unwrap();
3221
3222 let short_row = KvRow {
3224 values: vec![CellValue::FixedBinary(vec![0xAB; 10]), CellValue::Int64(1)],
3225 };
3226 let result = encode_primary_key_from_row(model.table_prefix, &short_row, &model);
3227 assert!(result.is_err());
3228 assert!(
3229 result.unwrap_err().contains("requires exactly 16 bytes"),
3230 "should mention exact width requirement"
3231 );
3232
3233 let long_row = KvRow {
3235 values: vec![CellValue::FixedBinary(vec![0xCD; 20]), CellValue::Int64(2)],
3236 };
3237 let result = encode_primary_key_from_row(model.table_prefix, &long_row, &model);
3238 assert!(result.is_err());
3239 assert!(result.unwrap_err().contains("requires exactly 16 bytes"));
3240
3241 let ok_row = KvRow {
3243 values: vec![CellValue::FixedBinary(vec![0xEF; 16]), CellValue::Int64(3)],
3244 };
3245 assert!(encode_primary_key_from_row(model.table_prefix, &ok_row, &model).is_ok());
3246 }
3247
3248 #[test]
3249 fn fixed_binary_index_key_rejects_wrong_length() {
3250 let config = KvTableConfig::new(
3251 0,
3252 vec![
3253 TableColumnConfig::new("id", DataType::Int64, false),
3254 TableColumnConfig::new("tag", DataType::FixedSizeBinary(8), false),
3255 ],
3256 vec!["id".to_string()],
3257 vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
3258 )
3259 .unwrap();
3260 let model = TableModel::from_config(&config).unwrap();
3261 let specs = model.resolve_index_specs(&config.index_specs).unwrap();
3262
3263 let bad_row = KvRow {
3265 values: vec![CellValue::Int64(1), CellValue::FixedBinary(vec![0x01; 4])],
3266 };
3267 let result = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &bad_row);
3268 assert!(result.is_err());
3269 assert!(result.unwrap_err().contains("requires exactly 8 bytes"));
3270
3271 let ok_row = KvRow {
3273 values: vec![CellValue::Int64(1), CellValue::FixedBinary(vec![0x02; 8])],
3274 };
3275 assert!(encode_secondary_index_key(model.table_prefix, &specs[0], &model, &ok_row).is_ok());
3276 }
3277
3278 #[test]
3279 fn decimal256_column_round_trip() {
3280 let config = KvTableConfig::new(
3281 0,
3282 vec![
3283 TableColumnConfig::new("id", DataType::Int64, false),
3284 TableColumnConfig::new("balance", DataType::Decimal256(76, 0), false),
3285 ],
3286 vec!["id".to_string()],
3287 vec![],
3288 )
3289 .unwrap();
3290 let model = TableModel::from_config(&config).unwrap();
3291 let big_val = i256::from(123456789012345i64);
3292 let row = KvRow {
3293 values: vec![CellValue::Int64(1), CellValue::Decimal256(big_val)],
3294 };
3295 let encoded = encode_base_row_value(&row, &model).unwrap();
3296 let pk = row
3297 .primary_key_values(&model)
3298 .into_iter()
3299 .cloned()
3300 .collect::<Vec<_>>();
3301 let decoded = decode_base_row(pk, &encoded, &model).unwrap();
3302 assert!(matches!(&decoded.values[1], CellValue::Decimal256(v) if *v == big_val));
3303 }
3304
3305 #[test]
3306 fn float64_primary_key_rejected() {
3307 let config = KvTableConfig::new(
3308 0,
3309 vec![TableColumnConfig::new("id", DataType::Float64, false)],
3310 vec!["id".to_string()],
3311 vec![],
3312 );
3313 assert!(config.is_err());
3314 }
3315
3316 #[test]
3317 fn i256_ordered_encoding_round_trip() {
3318 let values = [
3319 i256::from_i128(i128::MIN),
3320 i256::from(-1i64),
3321 i256::from(0i64),
3322 i256::from(1i64),
3323 i256::from_i128(i128::MAX),
3324 ];
3325 for v in values {
3326 assert_eq!(decode_i256_ordered(encode_i256_ordered(v)), v);
3327 }
3328 let encoded: Vec<[u8; 32]> = values.iter().map(|v| encode_i256_ordered(*v)).collect();
3329 for i in 0..encoded.len() - 1 {
3330 assert!(encoded[i] < encoded[i + 1]);
3331 }
3332 }
3333
3334 #[test]
3335 fn uint64_primary_key_encode_decode() {
3336 let config = KvTableConfig::new(
3337 5,
3338 vec![
3339 TableColumnConfig::new("id", DataType::UInt64, false),
3340 TableColumnConfig::new("name", DataType::Utf8, false),
3341 ],
3342 vec!["id".to_string()],
3343 vec![],
3344 )
3345 .unwrap();
3346 let model = TableModel::from_config(&config).unwrap();
3347 let pk = CellValue::UInt64(12345);
3348 let key = encode_primary_key(5, &[&pk], &model).expect("pk key encodes");
3349 let decoded = decode_primary_key(5, &key, &model).unwrap();
3350 assert!(matches!(&decoded[0], CellValue::UInt64(12345)));
3351 }
3352
3353 #[test]
3354 fn utf8_primary_key_encode_decode() {
3355 let config = KvTableConfig::new(
3356 3,
3357 vec![
3358 TableColumnConfig::new("code", DataType::Utf8, false),
3359 TableColumnConfig::new("val", DataType::Int64, false),
3360 ],
3361 vec!["code".to_string()],
3362 vec![],
3363 )
3364 .unwrap();
3365 let model = TableModel::from_config(&config).unwrap();
3366 let pk = CellValue::Utf8("HELLO".to_string());
3367 let key = encode_primary_key(3, &[&pk], &model).expect("pk key encodes");
3368 let decoded = decode_primary_key(3, &key, &model).unwrap();
3369 assert!(matches!(&decoded[0], CellValue::Utf8(v) if v == "HELLO"));
3370 }
3371
3372 #[test]
3373 fn fixed_binary_primary_key_encode_decode() {
3374 let config = KvTableConfig::new(
3375 7,
3376 vec![
3377 TableColumnConfig::new("hash", DataType::FixedSizeBinary(16), false),
3378 TableColumnConfig::new("val", DataType::Int64, false),
3379 ],
3380 vec!["hash".to_string()],
3381 vec![],
3382 )
3383 .unwrap();
3384 let model = TableModel::from_config(&config).unwrap();
3385 let data = vec![0xDE, 0xAD, 0xBE, 0xEF, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
3386 let pk = CellValue::FixedBinary(data.clone());
3387 let key = encode_primary_key(7, &[&pk], &model).expect("pk key encodes");
3388 let decoded = decode_primary_key(7, &key, &model).unwrap();
3389 assert!(matches!(&decoded[0], CellValue::FixedBinary(v) if *v == data));
3390 }
3391
3392 #[test]
3393 fn secondary_index_with_uint64_column() {
3394 let config = KvTableConfig::new(
3395 0,
3396 vec![
3397 TableColumnConfig::new("id", DataType::Int64, false),
3398 TableColumnConfig::new("counter", DataType::UInt64, false),
3399 ],
3400 vec!["id".to_string()],
3401 vec![IndexSpec::new("counter_idx", vec!["counter".to_string()]).unwrap()],
3402 )
3403 .unwrap();
3404 let model = TableModel::from_config(&config).unwrap();
3405 let specs = model.resolve_index_specs(&config.index_specs).unwrap();
3406 let row = KvRow {
3407 values: vec![CellValue::Int64(1), CellValue::UInt64(999)],
3408 };
3409 let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row).unwrap();
3410 let decoded =
3411 decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key).unwrap();
3412 let counter_idx = *model.columns_by_name.get("counter").unwrap();
3413 assert!(matches!(
3414 decoded.values.get(&counter_idx),
3415 Some(CellValue::UInt64(999))
3416 ));
3417 assert!(matches!(
3418 &decoded.primary_key_values[0],
3419 CellValue::Int64(1)
3420 ));
3421 }
3422
3423 #[test]
3424 fn secondary_index_with_decimal256_column() {
3425 let config = KvTableConfig::new(
3426 0,
3427 vec![
3428 TableColumnConfig::new("id", DataType::Int64, false),
3429 TableColumnConfig::new("big_val", DataType::Decimal256(76, 0), false),
3430 ],
3431 vec!["id".to_string()],
3432 vec![IndexSpec::new("big_idx", vec!["big_val".to_string()]).unwrap()],
3433 )
3434 .unwrap();
3435 let model = TableModel::from_config(&config).unwrap();
3436 let specs = model.resolve_index_specs(&config.index_specs).unwrap();
3437 let val = i256::from(42i64);
3438 let row = KvRow {
3439 values: vec![CellValue::Int64(1), CellValue::Decimal256(val)],
3440 };
3441 let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row).unwrap();
3442 let decoded =
3443 decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key).unwrap();
3444 let big_idx = *model.columns_by_name.get("big_val").unwrap();
3445 assert!(matches!(
3446 decoded.values.get(&big_idx),
3447 Some(CellValue::Decimal256(v)) if *v == val
3448 ));
3449 }
3450
3451 #[test]
3456 fn composite_pk_config_accepted() {
3457 let config = KvTableConfig::new(
3458 0,
3459 vec![
3460 TableColumnConfig::new("entity", DataType::FixedSizeBinary(32), false),
3461 TableColumnConfig::new("version", DataType::UInt64, false),
3462 TableColumnConfig::new("data", DataType::Utf8, true),
3463 ],
3464 vec!["entity".to_string(), "version".to_string()],
3465 vec![],
3466 );
3467 assert!(config.is_ok());
3468 let c = config.unwrap();
3469 assert_eq!(c.primary_key_columns, vec!["entity", "version"]);
3470 }
3471
3472 #[test]
3473 fn composite_pk_rejects_unsupported_type() {
3474 let result = KvTableConfig::new(
3475 0,
3476 vec![
3477 TableColumnConfig::new("entity", DataType::FixedSizeBinary(32), false),
3478 TableColumnConfig::new("score", DataType::Float64, false),
3479 ],
3480 vec!["entity".to_string(), "score".to_string()],
3481 vec![],
3482 );
3483 assert!(result.is_err());
3484 let err = result.unwrap_err();
3485 assert!(
3486 err.contains("must be Int64") || err.contains("must be"),
3487 "expected PK type error, got: {err}"
3488 );
3489 }
3490
3491 #[test]
3492 fn composite_pk_rejects_too_wide() {
3493 let config = KvTableConfig::new(
3494 0,
3495 vec![
3496 TableColumnConfig::new("big", DataType::FixedSizeBinary(60), false),
3497 TableColumnConfig::new("ver", DataType::UInt64, false),
3498 ],
3499 vec!["big".to_string(), "ver".to_string()],
3500 vec![],
3501 )
3502 .expect("variable-length keys should allow wider composite PKs");
3503 let model = TableModel::from_config(&config).expect("model");
3504 assert_eq!(model.primary_key_width, 68);
3505 }
3506
3507 #[test]
3508 fn composite_pk_encode_decode_round_trip() {
3509 let config = KvTableConfig::new(
3510 1,
3511 vec![
3512 TableColumnConfig::new("entity", DataType::FixedSizeBinary(32), false),
3513 TableColumnConfig::new("version", DataType::UInt64, false),
3514 TableColumnConfig::new("title", DataType::Utf8, true),
3515 ],
3516 vec!["entity".to_string(), "version".to_string()],
3517 vec![],
3518 )
3519 .unwrap();
3520 let model = TableModel::from_config(&config).unwrap();
3521
3522 let entity = vec![0xAA; 32];
3523 let pk_entity = CellValue::FixedBinary(entity.clone());
3524 let pk_version = CellValue::UInt64(42);
3525 let key =
3526 encode_primary_key(1, &[&pk_entity, &pk_version], &model).expect("pk key encodes");
3527
3528 let decoded = decode_primary_key(1, &key, &model).unwrap();
3529 assert_eq!(decoded.len(), 2);
3530 assert!(matches!(&decoded[0], CellValue::FixedBinary(v) if *v == entity));
3531 assert!(matches!(&decoded[1], CellValue::UInt64(42)));
3532 }
3533
3534 #[test]
3535 fn composite_pk_version_sort_order() {
3536 let config = KvTableConfig::new(
3537 0,
3538 vec![
3539 TableColumnConfig::new("entity", DataType::FixedSizeBinary(32), false),
3540 TableColumnConfig::new("version", DataType::UInt64, false),
3541 ],
3542 vec!["entity".to_string(), "version".to_string()],
3543 vec![],
3544 )
3545 .unwrap();
3546 let model = TableModel::from_config(&config).unwrap();
3547
3548 let entity = vec![0xBB; 32];
3549 let pk_entity = CellValue::FixedBinary(entity.clone());
3550
3551 let key_v1 = encode_primary_key(0, &[&pk_entity, &CellValue::UInt64(1)], &model)
3552 .expect("pk key encodes");
3553 let key_v10 = encode_primary_key(0, &[&pk_entity, &CellValue::UInt64(10)], &model)
3554 .expect("pk key encodes");
3555 let key_v100 = encode_primary_key(0, &[&pk_entity, &CellValue::UInt64(100)], &model)
3556 .expect("pk key encodes");
3557
3558 assert!(key_v1 < key_v10);
3560 assert!(key_v10 < key_v100);
3561 }
3562
3563 #[test]
3564 fn composite_pk_value_excludes_all_pk_columns() {
3565 let config = KvTableConfig::new(
3566 0,
3567 vec![
3568 TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
3569 TableColumnConfig::new("version", DataType::UInt64, false),
3570 TableColumnConfig::new("data", DataType::Utf8, true),
3571 ],
3572 vec!["entity".to_string(), "version".to_string()],
3573 vec![],
3574 )
3575 .unwrap();
3576 let model = TableModel::from_config(&config).unwrap();
3577
3578 let row = KvRow {
3579 values: vec![
3580 CellValue::FixedBinary(vec![0xCC; 16]),
3581 CellValue::UInt64(7),
3582 CellValue::Utf8("hello".to_string()),
3583 ],
3584 };
3585 let encoded = encode_base_row_value(&row, &model).unwrap();
3586 let decoded = decode_base_row(
3588 vec![CellValue::FixedBinary(vec![0xCC; 16]), CellValue::UInt64(7)],
3589 &encoded,
3590 &model,
3591 )
3592 .unwrap();
3593 assert!(matches!(&decoded.values[0], CellValue::FixedBinary(v) if v.len() == 16));
3594 assert!(matches!(&decoded.values[1], CellValue::UInt64(7)));
3595 assert!(matches!(&decoded.values[2], CellValue::Utf8(v) if v == "hello"));
3596 }
3597
3598 #[test]
3599 fn composite_pk_secondary_index_appends_all_pk_columns() {
3600 let config = KvTableConfig::new(
3601 0,
3602 vec![
3603 TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
3604 TableColumnConfig::new("version", DataType::UInt64, false),
3605 TableColumnConfig::new("tag", DataType::Int64, false),
3606 ],
3607 vec!["entity".to_string(), "version".to_string()],
3608 vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
3609 )
3610 .unwrap();
3611 let model = TableModel::from_config(&config).unwrap();
3612 let specs = model.resolve_index_specs(&config.index_specs).unwrap();
3613
3614 let entity_data = vec![0xDD; 16];
3615 let row = KvRow {
3616 values: vec![
3617 CellValue::FixedBinary(entity_data.clone()),
3618 CellValue::UInt64(99),
3619 CellValue::Int64(42),
3620 ],
3621 };
3622 let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row).unwrap();
3623 let decoded =
3624 decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key).unwrap();
3625
3626 assert_eq!(decoded.primary_key_values.len(), 2);
3627 assert!(matches!(
3628 &decoded.primary_key_values[0],
3629 CellValue::FixedBinary(v) if *v == entity_data
3630 ));
3631 assert!(matches!(
3632 &decoded.primary_key_values[1],
3633 CellValue::UInt64(99)
3634 ));
3635 let tag_idx = *model.columns_by_name.get("tag").unwrap();
3636 assert!(matches!(
3637 decoded.values.get(&tag_idx),
3638 Some(CellValue::Int64(42))
3639 ));
3640 }
3641
3642 #[test]
3643 fn table_versioned_convenience() {
3644 let client = StoreClient::new("http://localhost:10000");
3645 let schema = KvSchema::new(client)
3646 .table_versioned(
3647 "documents",
3648 vec![
3649 TableColumnConfig::new("doc_id", DataType::FixedSizeBinary(32), false),
3650 TableColumnConfig::new("version", DataType::UInt64, false),
3651 TableColumnConfig::new("title", DataType::Utf8, false),
3652 ],
3653 "doc_id",
3654 "version",
3655 vec![],
3656 )
3657 .unwrap();
3658 assert_eq!(schema.table_count(), 1);
3659 }
3660
3661 #[test]
3662 fn single_column_pk_backward_compat() {
3663 let config = KvTableConfig::new(
3665 0,
3666 vec![
3667 TableColumnConfig::new("id", DataType::Int64, false),
3668 TableColumnConfig::new("name", DataType::Utf8, true),
3669 ],
3670 vec!["id".to_string()],
3671 vec![],
3672 )
3673 .unwrap();
3674 let model = TableModel::from_config(&config).unwrap();
3675 assert_eq!(model.primary_key_indices.len(), 1);
3676 assert_eq!(model.primary_key_indices[0], 0);
3677 assert_eq!(model.primary_key_width, 8);
3678
3679 let pk = CellValue::Int64(42);
3680 let key = encode_primary_key(0, &[&pk], &model).expect("pk key encodes");
3681 let decoded = decode_primary_key(0, &key, &model).unwrap();
3682 assert_eq!(decoded.len(), 1);
3683 assert!(matches!(&decoded[0], CellValue::Int64(42)));
3684 }
3685
3686 #[test]
3687 fn partial_prefix_upper_bound_fills_trailing_pk_bytes() {
3688 let config = KvTableConfig::new(
3693 0,
3694 vec![
3695 TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
3696 TableColumnConfig::new("version", DataType::UInt64, false),
3697 ],
3698 vec!["entity".to_string(), "version".to_string()],
3699 vec![],
3700 )
3701 .unwrap();
3702 let model = TableModel::from_config(&config).unwrap();
3703 assert_eq!(model.primary_key_width, 24); let entity = CellValue::FixedBinary(vec![0xAA; 16]);
3706 let upper =
3708 encode_primary_key_bound(0, &[&entity], &model, true).expect("pk bound encodes");
3709
3710 assert_eq!(primary_payload(&model, &upper, 0, 16), vec![0xAA; 16]);
3712 assert_eq!(
3714 primary_payload(&model, &upper, 16, 8),
3715 vec![0xFF; 8],
3716 "trailing PK column (version) must be 0xFF for upper bound"
3717 );
3718 assert!(primary_payload(
3720 &model,
3721 &upper,
3722 24,
3723 model.primary_key_codec.payload_capacity_bytes() - 24
3724 )
3725 .iter()
3726 .all(|&b| b == 0xFF));
3727
3728 let lower =
3730 encode_primary_key_bound(0, &[&entity], &model, false).expect("pk bound encodes");
3731 assert_eq!(primary_payload(&model, &lower, 0, 16), vec![0xAA; 16]);
3732 assert_eq!(
3733 primary_payload(&model, &lower, 16, 8),
3734 vec![0x00; 8],
3735 "trailing PK column (version) must be 0x00 for lower bound"
3736 );
3737 }
3738
3739 #[test]
3744 fn composite_pk_range_pushdown_entity_eq_version_lte() {
3745 let config = KvTableConfig::new(
3749 0,
3750 vec![
3751 TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
3752 TableColumnConfig::new("version", DataType::UInt64, false),
3753 TableColumnConfig::new("data", DataType::Utf8, true),
3754 ],
3755 vec!["entity".to_string(), "version".to_string()],
3756 vec![],
3757 )
3758 .unwrap();
3759 let model = TableModel::from_config(&config).unwrap();
3760
3761 let mut pred = QueryPredicate::default();
3763 pred.constraints
3764 .insert(0, PredicateConstraint::FixedBinaryEq(vec![0xCC; 16]));
3765 pred.constraints.insert(
3766 1,
3767 PredicateConstraint::UInt64Range {
3768 min: None,
3769 max: Some(42),
3770 },
3771 );
3772
3773 let ranges = pred.primary_key_ranges(&model).unwrap();
3774 assert_eq!(ranges.len(), 1, "should produce exactly one range");
3775
3776 let range = &ranges[0];
3777
3778 let expected_start = encode_primary_key(
3780 0,
3781 &[
3782 &CellValue::FixedBinary(vec![0xCC; 16]),
3783 &CellValue::UInt64(0),
3784 ],
3785 &model,
3786 )
3787 .expect("pk key encodes");
3788 assert_eq!(
3789 range.start, expected_start,
3790 "start should be entity=CC..CC, version=0"
3791 );
3792
3793 let expected_end_prefix = encode_primary_key(
3795 0,
3796 &[
3797 &CellValue::FixedBinary(vec![0xCC; 16]),
3798 &CellValue::UInt64(42),
3799 ],
3800 &model,
3801 )
3802 .expect("pk key encodes");
3803 assert_eq!(
3805 primary_payload(&model, &range.end, 0, model.primary_key_width),
3806 primary_payload(&model, &expected_end_prefix, 0, model.primary_key_width),
3807 "end prefix should be entity=CC..CC, version=42"
3808 );
3809 assert!(
3811 primary_payload(
3812 &model,
3813 &range.end,
3814 model.primary_key_width,
3815 model.primary_key_codec.payload_capacity_bytes() - model.primary_key_width
3816 )
3817 .iter()
3818 .all(|&b| b == 0xFF),
3819 "end trailing bytes should be 0xFF"
3820 );
3821
3822 let full_range = primary_key_prefix_range(0);
3824 assert_ne!(
3825 range.start, full_range.start,
3826 "range must not be a full table scan"
3827 );
3828 }
3829
3830 #[test]
3831 fn composite_pk_range_pushdown_entity_eq_only() {
3832 let config = KvTableConfig::new(
3836 0,
3837 vec![
3838 TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
3839 TableColumnConfig::new("version", DataType::UInt64, false),
3840 ],
3841 vec!["entity".to_string(), "version".to_string()],
3842 vec![],
3843 )
3844 .unwrap();
3845 let model = TableModel::from_config(&config).unwrap();
3846
3847 let mut pred = QueryPredicate::default();
3848 pred.constraints
3849 .insert(0, PredicateConstraint::FixedBinaryEq(vec![0xDD; 16]));
3850
3851 let ranges = pred.primary_key_ranges(&model).unwrap();
3852 assert_eq!(ranges.len(), 1);
3853
3854 let range = &ranges[0];
3855 assert_eq!(primary_payload(&model, &range.start, 0, 16), vec![0xDD; 16]);
3857 assert_eq!(primary_payload(&model, &range.end, 0, 16), vec![0xDD; 16]);
3859 assert!(
3860 primary_payload(
3861 &model,
3862 &range.end,
3863 16,
3864 model.primary_key_codec.payload_capacity_bytes() - 16
3865 )
3866 .iter()
3867 .all(|&b| b == 0xFF),
3868 "after entity bytes, everything should be 0xFF"
3869 );
3870 }
3871
3872 #[test]
3873 fn fixed_binary_eq_constraint_extracted() {
3874 let config = KvTableConfig::new(
3875 0,
3876 vec![
3877 TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
3878 TableColumnConfig::new("version", DataType::UInt64, false),
3879 ],
3880 vec!["entity".to_string(), "version".to_string()],
3881 vec![],
3882 )
3883 .unwrap();
3884 let model = TableModel::from_config(&config).unwrap();
3885
3886 use datafusion::logical_expr::col;
3888 let entity_literal =
3889 Expr::Literal(ScalarValue::FixedSizeBinary(16, Some(vec![0xAA; 16])), None);
3890 let filter = col("entity").eq(entity_literal);
3891
3892 assert!(
3893 QueryPredicate::supports_filter(&filter, &model),
3894 "FixedSizeBinary equality should be supported"
3895 );
3896
3897 let pred = QueryPredicate::from_filters(&[filter], &model);
3898 assert!(
3899 matches!(
3900 pred.constraints.get(&0),
3901 Some(PredicateConstraint::FixedBinaryEq(v)) if *v == vec![0xAA; 16]
3902 ),
3903 "should extract FixedBinaryEq constraint"
3904 );
3905 }
3906
3907 #[test]
3908 fn uint64_range_constraint_extracted() {
3909 let config = KvTableConfig::new(
3910 0,
3911 vec![
3912 TableColumnConfig::new("version", DataType::UInt64, false),
3913 TableColumnConfig::new("data", DataType::Utf8, true),
3914 ],
3915 vec!["version".to_string()],
3916 vec![],
3917 )
3918 .unwrap();
3919 let model = TableModel::from_config(&config).unwrap();
3920
3921 use datafusion::logical_expr::col;
3922 let filter = col("version").lt_eq(Expr::Literal(ScalarValue::UInt64(Some(42)), None));
3923
3924 assert!(
3925 QueryPredicate::supports_filter(&filter, &model),
3926 "UInt64 range should be supported"
3927 );
3928
3929 let pred = QueryPredicate::from_filters(&[filter], &model);
3930 assert!(
3931 matches!(
3932 pred.constraints.get(&0),
3933 Some(PredicateConstraint::UInt64Range {
3934 min: None,
3935 max: Some(42)
3936 })
3937 ),
3938 "should extract UInt64Range with max=42"
3939 );
3940 }
3941
3942 #[test]
3943 fn uint64_range_constraint_supports_values_above_i64_max() {
3944 let config = KvTableConfig::new(
3945 0,
3946 vec![
3947 TableColumnConfig::new("version", DataType::UInt64, false),
3948 TableColumnConfig::new("data", DataType::Utf8, true),
3949 ],
3950 vec!["version".to_string()],
3951 vec![],
3952 )
3953 .unwrap();
3954 let model = TableModel::from_config(&config).unwrap();
3955
3956 let threshold = (1u64 << 63) + 5;
3957 use datafusion::logical_expr::col;
3958 let filter =
3959 col("version").gt_eq(Expr::Literal(ScalarValue::UInt64(Some(threshold)), None));
3960
3961 assert!(QueryPredicate::supports_filter(&filter, &model));
3962
3963 let pred = QueryPredicate::from_filters(&[filter], &model);
3964 assert!(matches!(
3965 pred.constraints.get(&0),
3966 Some(PredicateConstraint::UInt64Range {
3967 min: Some(v),
3968 max: None
3969 }) if *v == threshold
3970 ));
3971 }
3972
3973 #[test]
3974 fn unsupported_uint64_comparison_does_not_force_contradiction() {
3975 let config = KvTableConfig::new(
3976 0,
3977 vec![
3978 TableColumnConfig::new("version", DataType::UInt64, false),
3979 TableColumnConfig::new("data", DataType::Utf8, true),
3980 ],
3981 vec!["version".to_string()],
3982 vec![],
3983 )
3984 .unwrap();
3985 let model = TableModel::from_config(&config).unwrap();
3986
3987 use datafusion::logical_expr::col;
3988 let unsupported = col("version").gt(Expr::Literal(ScalarValue::Int64(Some(-1)), None));
3989
3990 assert!(
3991 !QueryPredicate::supports_filter(&unsupported, &model),
3992 "negative Int64 literal on UInt64 column should not be pushdown-supported"
3993 );
3994
3995 let pred = QueryPredicate::from_filters(&[unsupported], &model);
3996 assert!(
3997 !pred.contradiction,
3998 "unsupported filter must not collapse scan to empty result"
3999 );
4000 assert!(
4001 pred.constraints.is_empty(),
4002 "unsupported filter must not contribute pushed constraints"
4003 );
4004 }
4005
4006 #[test]
4007 fn unsupported_uint64_comparison_in_and_keeps_supported_sibling() {
4008 let config = KvTableConfig::new(
4009 0,
4010 vec![
4011 TableColumnConfig::new("version", DataType::UInt64, false),
4012 TableColumnConfig::new("data", DataType::Utf8, true),
4013 ],
4014 vec!["version".to_string()],
4015 vec![],
4016 )
4017 .unwrap();
4018 let model = TableModel::from_config(&config).unwrap();
4019
4020 use datafusion::logical_expr::col;
4021 let supported = col("version").gt_eq(Expr::Literal(ScalarValue::UInt64(Some(10)), None));
4022 let unsupported = col("version").gt(Expr::Literal(ScalarValue::Int64(Some(-1)), None));
4023 let filter = supported.and(unsupported);
4024
4025 assert!(
4026 !QueryPredicate::supports_filter(&filter, &model),
4027 "mixed AND should not be marked fully pushdown-supported"
4028 );
4029
4030 let pred = QueryPredicate::from_filters(&[filter], &model);
4031 assert!(!pred.contradiction);
4032 assert!(matches!(
4033 pred.constraints.get(&0),
4034 Some(PredicateConstraint::UInt64Range {
4035 min: Some(10),
4036 max: None
4037 })
4038 ));
4039 }
4040
4041 #[test]
4042 fn uint64_in_list_pushdown() {
4043 let config = KvTableConfig::new(
4044 0,
4045 vec![
4046 TableColumnConfig::new("version", DataType::UInt64, false),
4047 TableColumnConfig::new("data", DataType::Utf8, true),
4048 ],
4049 vec!["version".to_string()],
4050 vec![],
4051 )
4052 .unwrap();
4053 let model = TableModel::from_config(&config).unwrap();
4054
4055 use datafusion::logical_expr::{col, in_list};
4056 let filter = in_list(
4057 col("version"),
4058 vec![
4059 Expr::Literal(ScalarValue::UInt64(Some(1)), None),
4060 Expr::Literal(ScalarValue::UInt64(Some(5)), None),
4061 Expr::Literal(ScalarValue::UInt64(Some(10)), None),
4062 ],
4063 false,
4064 );
4065
4066 assert!(QueryPredicate::supports_filter(&filter, &model));
4067 let pred = QueryPredicate::from_filters(&[filter], &model);
4068 assert!(
4069 matches!(pred.constraints.get(&0), Some(PredicateConstraint::UInt64In(v)) if v.len() == 3),
4070 "should extract UInt64In with 3 values"
4071 );
4072 }
4073
4074 #[test]
4075 fn uint64_in_list_pushdown_supports_values_above_i64_max() {
4076 let config = KvTableConfig::new(
4077 0,
4078 vec![
4079 TableColumnConfig::new("version", DataType::UInt64, false),
4080 TableColumnConfig::new("data", DataType::Utf8, true),
4081 ],
4082 vec!["version".to_string()],
4083 vec![],
4084 )
4085 .unwrap();
4086 let model = TableModel::from_config(&config).unwrap();
4087
4088 let huge = 1u64 << 63;
4089 use datafusion::logical_expr::{col, in_list};
4090 let filter = in_list(
4091 col("version"),
4092 vec![
4093 Expr::Literal(ScalarValue::UInt64(Some(1)), None),
4094 Expr::Literal(ScalarValue::UInt64(Some(huge)), None),
4095 ],
4096 false,
4097 );
4098
4099 assert!(QueryPredicate::supports_filter(&filter, &model));
4100 let pred = QueryPredicate::from_filters(&[filter], &model);
4101 assert!(matches!(
4102 pred.constraints.get(&0),
4103 Some(PredicateConstraint::UInt64In(v)) if v.contains(&huge) && v.len() == 2
4104 ));
4105 }
4106
4107 #[test]
4108 fn fixed_binary_in_list_pushdown() {
4109 let config = KvTableConfig::new(
4110 0,
4111 vec![
4112 TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
4113 TableColumnConfig::new("data", DataType::Utf8, true),
4114 ],
4115 vec!["entity".to_string()],
4116 vec![],
4117 )
4118 .unwrap();
4119 let model = TableModel::from_config(&config).unwrap();
4120
4121 use datafusion::logical_expr::{col, in_list};
4122 let filter = in_list(
4123 col("entity"),
4124 vec![
4125 Expr::Literal(ScalarValue::FixedSizeBinary(16, Some(vec![0xAA; 16])), None),
4126 Expr::Literal(ScalarValue::FixedSizeBinary(16, Some(vec![0xBB; 16])), None),
4127 ],
4128 false,
4129 );
4130
4131 assert!(QueryPredicate::supports_filter(&filter, &model));
4132 let pred = QueryPredicate::from_filters(&[filter], &model);
4133 assert!(
4134 matches!(
4135 pred.constraints.get(&0),
4136 Some(PredicateConstraint::FixedBinaryIn(v)) if v.len() == 2
4137 ),
4138 "should extract FixedBinaryIn with 2 values"
4139 );
4140
4141 let ranges = pred.primary_key_ranges(&model).unwrap();
4143 assert_eq!(ranges.len(), 2, "should produce one range per entity");
4144 }
4145
4146 #[test]
4147 fn decimal256_range_pushdown() {
4148 let config = KvTableConfig::new(
4149 0,
4150 vec![
4151 TableColumnConfig::new("id", DataType::Int64, false),
4152 TableColumnConfig::new("big_val", DataType::Decimal256(76, 0), false),
4153 ],
4154 vec!["id".to_string()],
4155 vec![IndexSpec::new("big_idx", vec!["big_val".to_string()]).unwrap()],
4156 )
4157 .unwrap();
4158 let model = TableModel::from_config(&config).unwrap();
4159
4160 use datafusion::logical_expr::col;
4161 let filter = col("big_val").gt_eq(Expr::Literal(
4162 ScalarValue::Decimal256(Some(i256::from(100i64)), 76, 0),
4163 None,
4164 ));
4165
4166 assert!(
4167 QueryPredicate::supports_filter(&filter, &model),
4168 "Decimal256 range should be supported"
4169 );
4170
4171 let pred = QueryPredicate::from_filters(&[filter], &model);
4172 let big_idx = *model.columns_by_name.get("big_val").unwrap();
4173 assert!(
4174 matches!(
4175 pred.constraints.get(&big_idx),
4176 Some(PredicateConstraint::Decimal256Range {
4177 min: Some(_),
4178 max: None
4179 })
4180 ),
4181 "should extract Decimal256Range with min=100, no max"
4182 );
4183
4184 let val_in = CellValue::Decimal256(i256::from(200i64));
4186 let val_out = CellValue::Decimal256(i256::from(50i64));
4187 let constraint = pred.constraints.get(&big_idx).unwrap();
4188 assert!(matches_constraint(&val_in, constraint));
4189 assert!(!matches_constraint(&val_out, constraint));
4190 }
4191
4192 #[test]
4193 fn uint64_constraint_matching_does_not_wrap_large_values() {
4194 let gt_zero = PredicateConstraint::UInt64Range {
4195 min: Some(1),
4196 max: None,
4197 };
4198 assert!(matches_constraint(&CellValue::UInt64(1u64 << 63), >_zero));
4199 assert!(!matches_constraint(&CellValue::UInt64(0), >_zero));
4200
4201 let in_list = PredicateConstraint::UInt64In(vec![1, 2, 3]);
4202 assert!(matches_constraint(&CellValue::UInt64(2), &in_list));
4203 assert!(!matches_constraint(
4204 &CellValue::UInt64(1u64 << 63),
4205 &in_list
4206 ));
4207 }
4208
4209 #[test]
4210 fn uint64_empty_range_produces_no_pk_ranges() {
4211 let config = KvTableConfig::new(
4212 0,
4213 vec![TableColumnConfig::new("version", DataType::UInt64, false)],
4214 vec!["version".to_string()],
4215 vec![],
4216 )
4217 .unwrap();
4218 let model = TableModel::from_config(&config).unwrap();
4219 let mut pred = QueryPredicate::default();
4220 pred.constraints.insert(
4221 0,
4222 PredicateConstraint::UInt64Range {
4223 min: Some(10),
4224 max: Some(9),
4225 },
4226 );
4227
4228 let ranges = pred.primary_key_ranges(&model).unwrap();
4229 assert!(ranges.is_empty());
4230 }
4231
4232 #[test]
4233 fn utf8_primary_key_encoding_supports_unicode_and_long_values() {
4234 let config = KvTableConfig::new(
4235 0,
4236 vec![TableColumnConfig::new("id", DataType::Utf8, false)],
4237 vec!["id".to_string()],
4238 vec![],
4239 )
4240 .unwrap();
4241 let model = TableModel::from_config(&config).unwrap();
4242
4243 let row_non_ascii = KvRow {
4244 values: vec![CellValue::Utf8("naive-cafe-e9".replace("e9", "\u{00E9}"))],
4245 };
4246 let key_non_ascii = encode_primary_key_from_row(model.table_prefix, &row_non_ascii, &model)
4247 .expect("non-ascii PK should encode");
4248 let decoded_non_ascii = decode_primary_key(model.table_prefix, &key_non_ascii, &model)
4249 .expect("non-ascii PK should decode");
4250 assert!(matches!(
4251 decoded_non_ascii.as_slice(),
4252 [CellValue::Utf8(value)] if value == "naive-cafe-\u{00E9}"
4253 ));
4254
4255 let row_too_long = KvRow {
4256 values: vec![CellValue::Utf8("abcdefghijklmnopq".to_string())],
4257 };
4258 let key_too_long = encode_primary_key_from_row(model.table_prefix, &row_too_long, &model)
4259 .expect("long UTF-8 PK should encode");
4260 let decoded_too_long = decode_primary_key(model.table_prefix, &key_too_long, &model)
4261 .expect("long UTF-8 PK should decode");
4262 assert!(matches!(
4263 decoded_too_long.as_slice(),
4264 [CellValue::Utf8(value)] if value == "abcdefghijklmnopq"
4265 ));
4266 }
4267
4268 #[test]
4269 fn utf8_primary_key_encodes_at_max_codec_payload_and_rejects_overflow() {
4270 let config = KvTableConfig::new(
4271 0,
4272 vec![TableColumnConfig::new("id", DataType::Utf8, false)],
4273 vec!["id".to_string()],
4274 vec![],
4275 )
4276 .unwrap();
4277 let model = TableModel::from_config(&config).unwrap();
4278 let max_payload = model.primary_key_codec.payload_capacity_bytes();
4279 let max_value = "a".repeat(max_payload - 1);
4280 let overflow_value = "a".repeat(max_payload);
4281
4282 let key = encode_primary_key_from_row(
4283 model.table_prefix,
4284 &KvRow {
4285 values: vec![CellValue::Utf8(max_value.clone())],
4286 },
4287 &model,
4288 )
4289 .expect("max-length UTF-8 PK should encode");
4290 assert_eq!(key.len(), exoware_sdk::keys::MAX_KEY_LEN);
4291 let decoded = decode_primary_key(model.table_prefix, &key, &model)
4292 .expect("max-length PK should decode");
4293 assert!(matches!(
4294 decoded.as_slice(),
4295 [CellValue::Utf8(value)] if value == &max_value
4296 ));
4297
4298 let err = encode_primary_key_from_row(
4299 model.table_prefix,
4300 &KvRow {
4301 values: vec![CellValue::Utf8(overflow_value)],
4302 },
4303 &model,
4304 )
4305 .expect_err("UTF-8 PK exceeding codec payload should be rejected");
4306 assert!(err.contains("primary key payload exceeds codec payload capacity 253 bytes"));
4307 }
4308
4309 #[test]
4310 fn utf8_primary_key_round_trips_embedded_nul() {
4311 let config = KvTableConfig::new(
4312 0,
4313 vec![TableColumnConfig::new("id", DataType::Utf8, false)],
4314 vec!["id".to_string()],
4315 vec![],
4316 )
4317 .unwrap();
4318 let model = TableModel::from_config(&config).unwrap();
4319 let row = KvRow {
4320 values: vec![CellValue::Utf8("AB\0CD".to_string())],
4321 };
4322
4323 let key = encode_primary_key_from_row(model.table_prefix, &row, &model)
4324 .expect("embedded NUL in key text must encode");
4325 let decoded =
4326 decode_primary_key(model.table_prefix, &key, &model).expect("embedded NUL must decode");
4327 assert!(matches!(
4328 decoded.as_slice(),
4329 [CellValue::Utf8(value)] if value == "AB\0CD"
4330 ));
4331 }
4332
4333 #[test]
4334 fn utf8_index_key_round_trips_embedded_nul() {
4335 let config = KvTableConfig::new(
4336 0,
4337 vec![
4338 TableColumnConfig::new("id", DataType::Int64, false),
4339 TableColumnConfig::new("tag", DataType::Utf8, false),
4340 ],
4341 vec!["id".to_string()],
4342 vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
4343 )
4344 .unwrap();
4345 let model = TableModel::from_config(&config).unwrap();
4346 let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4347 let row = KvRow {
4348 values: vec![CellValue::Int64(1), CellValue::Utf8("AB\0CD".to_string())],
4349 };
4350
4351 let key = encode_secondary_index_key(model.table_prefix, &specs[0], &model, &row)
4352 .expect("embedded NUL in index key text must encode");
4353 let decoded = decode_secondary_index_key(model.table_prefix, &specs[0], &model, &key)
4354 .expect("embedded NUL index key must decode");
4355 assert!(matches!(
4356 decoded.values.get(&1),
4357 Some(CellValue::Utf8(value)) if value == "AB\0CD"
4358 ));
4359 }
4360
4361 #[test]
4362 fn secondary_index_with_long_utf8_primary_key_encodes_at_max_payload_and_rejects_overflow() {
4363 let config = KvTableConfig::new(
4364 0,
4365 vec![
4366 TableColumnConfig::new("id", DataType::Utf8, false),
4367 TableColumnConfig::new("tag", DataType::Utf8, false),
4368 ],
4369 vec!["id".to_string()],
4370 vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
4371 )
4372 .unwrap();
4373 let model = TableModel::from_config(&config).unwrap();
4374 let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4375 let spec = &specs[0];
4376 let max_payload = spec.codec.payload_capacity_bytes();
4377 let max_tag = "t".to_string();
4378 let max_id = "i".repeat(max_payload - encode_string_variable(&max_tag).unwrap().len() - 1);
4379 let overflow_id = format!("{max_id}x");
4380
4381 let key = encode_secondary_index_key(
4382 model.table_prefix,
4383 spec,
4384 &model,
4385 &KvRow {
4386 values: vec![
4387 CellValue::Utf8(max_id.clone()),
4388 CellValue::Utf8(max_tag.clone()),
4389 ],
4390 },
4391 )
4392 .expect("secondary key at max payload should encode");
4393 assert_eq!(key.len(), exoware_sdk::keys::MAX_KEY_LEN);
4394 let decoded =
4395 decode_secondary_index_key(model.table_prefix, spec, &model, &key).expect("decode");
4396 assert!(matches!(
4397 decoded.values.get(&1),
4398 Some(CellValue::Utf8(value)) if value == &max_tag
4399 ));
4400 assert!(matches!(
4401 decoded.primary_key_values.as_slice(),
4402 [CellValue::Utf8(value)] if value == &max_id
4403 ));
4404
4405 let err = encode_secondary_index_key(
4406 model.table_prefix,
4407 spec,
4408 &model,
4409 &KvRow {
4410 values: vec![CellValue::Utf8(overflow_id), CellValue::Utf8(max_tag)],
4411 },
4412 )
4413 .expect_err("secondary key exceeding max payload should be rejected");
4414 assert!(err.contains("index 'tag_idx' payload exceeds codec payload capacity 252 bytes"));
4415 }
4416
4417 #[test]
4418 fn secondary_index_from_parts_with_long_utf8_primary_key_rejects_overflow() {
4419 let config = KvTableConfig::new(
4420 0,
4421 vec![
4422 TableColumnConfig::new("id", DataType::Utf8, false),
4423 TableColumnConfig::new("tag", DataType::Utf8, false),
4424 ],
4425 vec!["id".to_string()],
4426 vec![IndexSpec::new("tag_idx", vec!["tag".to_string()]).unwrap()],
4427 )
4428 .unwrap();
4429 let model = TableModel::from_config(&config).unwrap();
4430 let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4431 let spec = &specs[0];
4432 let max_payload = spec.codec.payload_capacity_bytes();
4433 let max_tag = "t".to_string();
4434 let max_id = "i".repeat(max_payload - encode_string_variable(&max_tag).unwrap().len() - 1);
4435 let overflow_id = format!("{max_id}x");
4436 let max_row = KvRow {
4437 values: vec![
4438 CellValue::Utf8(max_id.clone()),
4439 CellValue::Utf8(max_tag.clone()),
4440 ],
4441 };
4442 let encoded_row = encode_base_row_value(&max_row, &model).expect("encode row");
4443 let archived = decode_stored_row(&encoded_row).expect("archive row");
4444
4445 let key = encode_secondary_index_key_from_parts(
4446 model.table_prefix,
4447 spec,
4448 &model,
4449 &[CellValue::Utf8(max_id.clone())],
4450 &archived,
4451 )
4452 .expect("backfill path should encode max payload");
4453 assert_eq!(key.len(), exoware_sdk::keys::MAX_KEY_LEN);
4454
4455 let err = encode_secondary_index_key_from_parts(
4456 model.table_prefix,
4457 spec,
4458 &model,
4459 &[CellValue::Utf8(overflow_id)],
4460 &archived,
4461 )
4462 .expect_err("backfill path overflow should be rejected");
4463 assert!(err
4464 .to_string()
4465 .contains("index 'tag_idx' payload exceeds codec payload capacity 252 bytes"));
4466 }
4467
4468 #[test]
4469 fn primary_key_type_mismatch_returns_error_instead_of_panicking() {
4470 let config = KvTableConfig::new(
4471 0,
4472 vec![TableColumnConfig::new("id", DataType::UInt64, false)],
4473 vec!["id".to_string()],
4474 vec![],
4475 )
4476 .unwrap();
4477 let model = TableModel::from_config(&config).unwrap();
4478 let row = KvRow {
4479 values: vec![CellValue::Int64(7)],
4480 };
4481
4482 let err = encode_primary_key_from_row(model.table_prefix, &row, &model)
4483 .expect_err("mismatched PK type should return an error");
4484 assert!(err.contains("type mismatch while encoding key value"));
4485 }
4486
4487 #[test]
4488 fn choose_index_plan_uses_fixed_binary_leading_constraint() {
4489 let config = KvTableConfig::new(
4490 0,
4491 vec![
4492 TableColumnConfig::new("id", DataType::Int64, false),
4493 TableColumnConfig::new("entity", DataType::FixedSizeBinary(16), false),
4494 ],
4495 vec!["id".to_string()],
4496 vec![IndexSpec::new("entity_idx", vec!["entity".to_string()]).unwrap()],
4497 )
4498 .unwrap();
4499 let model = TableModel::from_config(&config).unwrap();
4500 let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4501
4502 use datafusion::logical_expr::col;
4503 let filter = col("entity").eq(Expr::Literal(
4504 ScalarValue::FixedSizeBinary(16, Some(vec![0xAB; 16])),
4505 None,
4506 ));
4507 let pred = QueryPredicate::from_filters(&[filter], &model);
4508 let plan = pred
4509 .choose_index_plan(&model, &specs)
4510 .unwrap()
4511 .expect("fixed-binary equality should choose an index");
4512
4513 assert_eq!(plan.constrained_prefix_len, 1);
4514 assert_eq!(plan.ranges.len(), 1);
4515 let range = &plan.ranges[0];
4516 assert_eq!(
4517 index_payload(&specs[0], &range.start, 0, 16),
4518 vec![0xAB; 16]
4519 );
4520 assert_eq!(index_payload(&specs[0], &range.end, 0, 16), vec![0xAB; 16]);
4521 }
4522
4523 #[test]
4524 fn choose_index_plan_uses_decimal256_leading_constraint() {
4525 let config = KvTableConfig::new(
4526 0,
4527 vec![
4528 TableColumnConfig::new("id", DataType::Int64, false),
4529 TableColumnConfig::new("big_val", DataType::Decimal256(76, 0), false),
4530 ],
4531 vec!["id".to_string()],
4532 vec![IndexSpec::new("big_idx", vec!["big_val".to_string()]).unwrap()],
4533 )
4534 .unwrap();
4535 let model = TableModel::from_config(&config).unwrap();
4536 let specs = model.resolve_index_specs(&config.index_specs).unwrap();
4537
4538 use datafusion::logical_expr::col;
4539 let filter = col("big_val").gt_eq(Expr::Literal(
4540 ScalarValue::Decimal256(Some(i256::from(100i64)), 76, 0),
4541 None,
4542 ));
4543 let pred = QueryPredicate::from_filters(&[filter], &model);
4544 let plan = pred
4545 .choose_index_plan(&model, &specs)
4546 .unwrap()
4547 .expect("decimal256 range should choose an index");
4548
4549 assert_eq!(plan.constrained_prefix_len, 1);
4550 assert_eq!(plan.ranges.len(), 1);
4551 let range = &plan.ranges[0];
4552 assert_eq!(
4553 index_payload(&specs[0], &range.start, 0, 32),
4554 encode_i256_ordered(i256::from(100i64)).to_vec()
4555 );
4556 }
4557
4558 #[tokio::test]
4559 async fn backfill_added_indexes_writes_entries_for_existing_rows() {
4560 let state = MockState {
4561 kv: Arc::new(Mutex::new(BTreeMap::new())),
4562 range_calls: Arc::new(AtomicUsize::new(0)),
4563 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
4564 sequence_number: Arc::new(AtomicU64::new(0)),
4565 };
4566 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
4567 let client = StoreClient::new(&base_url);
4568
4569 let seed_schema = KvSchema::new(client.clone())
4570 .table(
4571 "orders",
4572 vec![
4573 TableColumnConfig::new("id", DataType::Int64, false),
4574 TableColumnConfig::new("status", DataType::Utf8, false),
4575 TableColumnConfig::new("amount_cents", DataType::Int64, false),
4576 ],
4577 vec!["id".to_string()],
4578 vec![],
4579 )
4580 .expect("seed schema");
4581 let mut writer = seed_schema.batch_writer();
4582 for i in 0..6i64 {
4583 writer
4584 .insert(
4585 "orders",
4586 vec![
4587 CellValue::Int64(i),
4588 CellValue::Utf8(if i % 2 == 0 { "open" } else { "closed" }.to_string()),
4589 CellValue::Int64(i * 10),
4590 ],
4591 )
4592 .expect("seed row");
4593 }
4594 writer.flush().await.expect("seed flush");
4595
4596 {
4597 let guard = state.kv.lock().expect("kv mutex poisoned");
4598 let base_rows = guard
4599 .keys()
4600 .filter(|key| matches_primary_key(0, key))
4601 .count();
4602 let index_rows = guard
4603 .keys()
4604 .filter(|key| matches_secondary_index_key(0, 1, key))
4605 .count();
4606 assert_eq!(base_rows, 6);
4607 assert_eq!(index_rows, 0);
4608 }
4609
4610 let backfill_schema = KvSchema::new(client.clone())
4611 .table(
4612 "orders",
4613 vec![
4614 TableColumnConfig::new("id", DataType::Int64, false),
4615 TableColumnConfig::new("status", DataType::Utf8, false),
4616 TableColumnConfig::new("amount_cents", DataType::Int64, false),
4617 ],
4618 vec!["id".to_string()],
4619 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
4620 .expect("valid index")
4621 .with_cover_columns(vec!["amount_cents".to_string()])],
4622 )
4623 .expect("backfill schema");
4624 let report = backfill_schema
4625 .backfill_added_indexes_with_options(
4626 "orders",
4627 &[],
4628 IndexBackfillOptions {
4629 row_batch_size: 2,
4630 start_from_primary_key: None,
4631 },
4632 )
4633 .await
4634 .expect("backfill should succeed");
4635 assert_eq!(report.scanned_rows, 6);
4636 assert_eq!(report.indexes_backfilled, 1);
4637 assert_eq!(report.index_entries_written, 6);
4638
4639 {
4640 let guard = state.kv.lock().expect("kv mutex poisoned");
4641 let index_rows = guard
4642 .keys()
4643 .filter(|key| matches_secondary_index_key(0, 1, key))
4644 .count();
4645 assert_eq!(index_rows, 6);
4646 let (_, sample_value) = guard
4647 .iter()
4648 .find(|(key, _)| matches_secondary_index_key(0, 1, key))
4649 .expect("backfill should create index entry");
4650 let archived = decode_stored_row(sample_value.as_ref())
4651 .expect("covering value must be valid codec");
4652 assert_eq!(archived.values.len(), 3);
4653 }
4654
4655 let _ = shutdown_tx.send(());
4656 }
4657
4658 #[tokio::test]
4659 async fn backfill_added_indexes_writes_zorder_entries_for_existing_rows() {
4660 let state = MockState {
4661 kv: Arc::new(Mutex::new(BTreeMap::new())),
4662 range_calls: Arc::new(AtomicUsize::new(0)),
4663 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
4664 sequence_number: Arc::new(AtomicU64::new(0)),
4665 };
4666 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
4667 let client = StoreClient::new(&base_url);
4668
4669 let seed_schema = KvSchema::new(client.clone())
4670 .table(
4671 "points",
4672 vec![
4673 TableColumnConfig::new("x", DataType::Int64, false),
4674 TableColumnConfig::new("y", DataType::Int64, false),
4675 TableColumnConfig::new("id", DataType::Int64, false),
4676 TableColumnConfig::new("value", DataType::Int64, false),
4677 ],
4678 vec!["id".to_string()],
4679 vec![],
4680 )
4681 .expect("seed schema");
4682 let mut writer = seed_schema.batch_writer();
4683 for (x, y, id, value) in [(1, 1, 11, 110), (1, 2, 12, 120), (2, 1, 21, 210)] {
4684 writer
4685 .insert(
4686 "points",
4687 vec![
4688 CellValue::Int64(x),
4689 CellValue::Int64(y),
4690 CellValue::Int64(id),
4691 CellValue::Int64(value),
4692 ],
4693 )
4694 .expect("seed row");
4695 }
4696 writer.flush().await.expect("seed flush");
4697
4698 let backfill_schema = KvSchema::new(client.clone())
4699 .table(
4700 "points",
4701 vec![
4702 TableColumnConfig::new("x", DataType::Int64, false),
4703 TableColumnConfig::new("y", DataType::Int64, false),
4704 TableColumnConfig::new("id", DataType::Int64, false),
4705 TableColumnConfig::new("value", DataType::Int64, false),
4706 ],
4707 vec!["id".to_string()],
4708 vec![
4709 IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
4710 .expect("valid index")
4711 .with_cover_columns(vec!["value".to_string()]),
4712 ],
4713 )
4714 .expect("backfill schema");
4715 let report = backfill_schema
4716 .backfill_added_indexes_with_options(
4717 "points",
4718 &[],
4719 IndexBackfillOptions {
4720 row_batch_size: 2,
4721 start_from_primary_key: None,
4722 },
4723 )
4724 .await
4725 .expect("backfill should succeed");
4726 assert_eq!(report.scanned_rows, 3);
4727 assert_eq!(report.index_entries_written, 3);
4728
4729 let guard = state.kv.lock().expect("kv mutex poisoned");
4730 let index_entry = guard
4731 .keys()
4732 .find(|key| matches_secondary_index_key(0, 1, key))
4733 .cloned()
4734 .expect("z-order backfill should create index entry");
4735 let config = KvTableConfig::new(
4736 0,
4737 vec![
4738 TableColumnConfig::new("x", DataType::Int64, false),
4739 TableColumnConfig::new("y", DataType::Int64, false),
4740 TableColumnConfig::new("id", DataType::Int64, false),
4741 TableColumnConfig::new("value", DataType::Int64, false),
4742 ],
4743 vec!["id".to_string()],
4744 vec![
4745 IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()]).expect("valid"),
4746 ],
4747 )
4748 .expect("config");
4749 let model = TableModel::from_config(&config).expect("model");
4750 let spec = model
4751 .resolve_index_specs(&config.index_specs)
4752 .expect("specs")
4753 .remove(0);
4754 let decoded = decode_secondary_index_key(model.table_prefix, &spec, &model, &index_entry)
4755 .expect("decode z-order key");
4756 let x_idx = *model.columns_by_name.get("x").unwrap();
4757 let y_idx = *model.columns_by_name.get("y").unwrap();
4758 assert!(matches!(
4759 decoded.values.get(&x_idx),
4760 Some(CellValue::Int64(_))
4761 ));
4762 assert!(matches!(
4763 decoded.values.get(&y_idx),
4764 Some(CellValue::Int64(_))
4765 ));
4766
4767 let _ = shutdown_tx.send(());
4768 }
4769
4770 #[tokio::test]
4771 async fn backfill_added_indexes_requires_append_only_index_evolution() {
4772 let client = StoreClient::new("http://127.0.0.1:1");
4773 let schema = KvSchema::new(client)
4774 .table(
4775 "orders",
4776 vec![
4777 TableColumnConfig::new("id", DataType::Int64, false),
4778 TableColumnConfig::new("status", DataType::Utf8, false),
4779 TableColumnConfig::new("amount_cents", DataType::Int64, false),
4780 ],
4781 vec!["id".to_string()],
4782 vec![
4783 IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid"),
4784 IndexSpec::new("amount_idx", vec!["amount_cents".to_string()]).expect("valid"),
4785 ],
4786 )
4787 .expect("schema");
4788
4789 let previous_specs =
4790 vec![IndexSpec::new("amount_idx", vec!["amount_cents".to_string()]).expect("valid")];
4791 let err = schema
4792 .backfill_added_indexes("orders", &previous_specs)
4793 .await
4794 .expect_err("non-append-only evolution should be rejected");
4795 assert!(err
4796 .to_string()
4797 .contains("index evolution must be append-only"));
4798 }
4799
4800 #[tokio::test]
4801 async fn backfill_added_indexes_is_noop_when_no_new_indexes() {
4802 let client = StoreClient::new("http://127.0.0.1:1");
4803 let existing = IndexSpec::new("status_idx", vec!["status".to_string()])
4804 .expect("valid")
4805 .with_cover_columns(vec!["amount_cents".to_string()]);
4806 let schema = KvSchema::new(client)
4807 .table(
4808 "orders",
4809 vec![
4810 TableColumnConfig::new("id", DataType::Int64, false),
4811 TableColumnConfig::new("status", DataType::Utf8, false),
4812 TableColumnConfig::new("amount_cents", DataType::Int64, false),
4813 ],
4814 vec!["id".to_string()],
4815 vec![existing.clone()],
4816 )
4817 .expect("schema");
4818
4819 let report = schema
4820 .backfill_added_indexes("orders", &[existing])
4821 .await
4822 .expect("no-op backfill should succeed");
4823 assert_eq!(report, IndexBackfillReport::default());
4824 }
4825
4826 #[tokio::test]
4827 async fn backfill_added_indexes_rejects_zero_row_batch_size() {
4828 let client = StoreClient::new("http://127.0.0.1:1");
4829 let schema = KvSchema::new(client)
4830 .table(
4831 "orders",
4832 vec![
4833 TableColumnConfig::new("id", DataType::Int64, false),
4834 TableColumnConfig::new("status", DataType::Utf8, false),
4835 ],
4836 vec!["id".to_string()],
4837 vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
4838 )
4839 .expect("schema");
4840 let err = schema
4841 .backfill_added_indexes_with_options(
4842 "orders",
4843 &[],
4844 IndexBackfillOptions {
4845 row_batch_size: 0,
4846 start_from_primary_key: None,
4847 },
4848 )
4849 .await
4850 .expect_err("row_batch_size=0 should fail");
4851 assert!(err.to_string().contains("row_batch_size must be > 0"));
4852 }
4853
4854 #[tokio::test]
4855 async fn backfill_added_indexes_emits_progress_events() {
4856 let state = MockState {
4857 kv: Arc::new(Mutex::new(BTreeMap::new())),
4858 range_calls: Arc::new(AtomicUsize::new(0)),
4859 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
4860 sequence_number: Arc::new(AtomicU64::new(0)),
4861 };
4862 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
4863 let client = StoreClient::new(&base_url);
4864
4865 let seed_schema = KvSchema::new(client.clone())
4866 .table(
4867 "orders",
4868 vec![
4869 TableColumnConfig::new("id", DataType::Int64, false),
4870 TableColumnConfig::new("status", DataType::Utf8, false),
4871 TableColumnConfig::new("amount_cents", DataType::Int64, false),
4872 ],
4873 vec!["id".to_string()],
4874 vec![],
4875 )
4876 .expect("seed schema");
4877 let mut writer = seed_schema.batch_writer();
4878 for i in 0..5i64 {
4879 writer
4880 .insert(
4881 "orders",
4882 vec![
4883 CellValue::Int64(i),
4884 CellValue::Utf8("open".to_string()),
4885 CellValue::Int64(i * 10),
4886 ],
4887 )
4888 .expect("seed row");
4889 }
4890 writer.flush().await.expect("seed flush");
4891
4892 let backfill_schema = KvSchema::new(client.clone())
4893 .table(
4894 "orders",
4895 vec![
4896 TableColumnConfig::new("id", DataType::Int64, false),
4897 TableColumnConfig::new("status", DataType::Utf8, false),
4898 TableColumnConfig::new("amount_cents", DataType::Int64, false),
4899 ],
4900 vec!["id".to_string()],
4901 vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
4902 )
4903 .expect("backfill schema");
4904
4905 let (progress_tx, mut progress_rx) = mpsc::unbounded_channel();
4906 let report = backfill_schema
4907 .backfill_added_indexes_with_options_and_progress(
4908 "orders",
4909 &[],
4910 IndexBackfillOptions {
4911 row_batch_size: 2,
4912 start_from_primary_key: None,
4913 },
4914 Some(&progress_tx),
4915 )
4916 .await
4917 .expect("backfill should succeed");
4918 drop(progress_tx);
4919
4920 let mut saw_started = false;
4921 let mut saw_completed = false;
4922 let mut progress_events = 0usize;
4923 while let Some(event) = progress_rx.recv().await {
4924 match event {
4925 IndexBackfillEvent::Started {
4926 table_name,
4927 indexes_backfilled,
4928 row_batch_size,
4929 ..
4930 } => {
4931 saw_started = true;
4932 assert_eq!(table_name, "orders");
4933 assert_eq!(indexes_backfilled, 1);
4934 assert_eq!(row_batch_size, 2);
4935 }
4936 IndexBackfillEvent::Progress {
4937 scanned_rows,
4938 index_entries_written,
4939 ..
4940 } => {
4941 progress_events += 1;
4942 assert!(scanned_rows >= 1);
4943 assert_eq!(scanned_rows, index_entries_written);
4944 }
4945 IndexBackfillEvent::Completed {
4946 report: completed_report,
4947 } => {
4948 saw_completed = true;
4949 assert_eq!(completed_report, report);
4950 }
4951 }
4952 }
4953 assert!(saw_started);
4954 assert!(saw_completed);
4955 assert!(progress_events >= 1);
4956
4957 let _ = shutdown_tx.send(());
4958 }
4959
4960 #[tokio::test]
4961 async fn backfill_added_indexes_can_resume_from_primary_key() {
4962 let state = MockState {
4963 kv: Arc::new(Mutex::new(BTreeMap::new())),
4964 range_calls: Arc::new(AtomicUsize::new(0)),
4965 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
4966 sequence_number: Arc::new(AtomicU64::new(0)),
4967 };
4968 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
4969 let client = StoreClient::new(&base_url);
4970
4971 let seed_schema = KvSchema::new(client.clone())
4972 .table(
4973 "orders",
4974 vec![
4975 TableColumnConfig::new("id", DataType::Int64, false),
4976 TableColumnConfig::new("status", DataType::Utf8, false),
4977 TableColumnConfig::new("amount_cents", DataType::Int64, false),
4978 ],
4979 vec!["id".to_string()],
4980 vec![],
4981 )
4982 .expect("seed schema");
4983 let mut writer = seed_schema.batch_writer();
4984 for i in 0..6i64 {
4985 writer
4986 .insert(
4987 "orders",
4988 vec![
4989 CellValue::Int64(i),
4990 CellValue::Utf8("open".to_string()),
4991 CellValue::Int64(i * 10),
4992 ],
4993 )
4994 .expect("seed row");
4995 }
4996 writer.flush().await.expect("seed flush");
4997
4998 let backfill_schema = KvSchema::new(client.clone())
4999 .table(
5000 "orders",
5001 vec![
5002 TableColumnConfig::new("id", DataType::Int64, false),
5003 TableColumnConfig::new("status", DataType::Utf8, false),
5004 TableColumnConfig::new("amount_cents", DataType::Int64, false),
5005 ],
5006 vec!["id".to_string()],
5007 vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
5008 )
5009 .expect("backfill schema");
5010
5011 let config = KvTableConfig::new(
5012 0,
5013 vec![
5014 TableColumnConfig::new("id", DataType::Int64, false),
5015 TableColumnConfig::new("status", DataType::Utf8, false),
5016 TableColumnConfig::new("amount_cents", DataType::Int64, false),
5017 ],
5018 vec!["id".to_string()],
5019 vec![],
5020 )
5021 .expect("valid config");
5022 let model = TableModel::from_config(&config).expect("model");
5023 let resume_value = CellValue::Int64(3);
5024 let resume_key =
5025 encode_primary_key(model.table_prefix, &[&resume_value], &model).expect("resume key");
5026
5027 let report = backfill_schema
5028 .backfill_added_indexes_with_options(
5029 "orders",
5030 &[],
5031 IndexBackfillOptions {
5032 row_batch_size: 2,
5033 start_from_primary_key: Some(resume_key.clone()),
5034 },
5035 )
5036 .await
5037 .expect("resume backfill should succeed");
5038 assert_eq!(report.scanned_rows, 3);
5039 assert_eq!(report.index_entries_written, 3);
5040
5041 {
5042 let guard = state.kv.lock().expect("kv mutex poisoned");
5043 let index_rows = guard
5044 .keys()
5045 .filter(|key| matches_secondary_index_key(0, 1, key))
5046 .count();
5047 assert_eq!(index_rows, 3);
5048 }
5049
5050 let resume_payload = model
5051 .primary_key_codec
5052 .read_payload(&resume_key, 0, model.primary_key_width)
5053 .expect("resume payload");
5054 let wrong_prefix = secondary_index_codec(model.table_prefix, 1)
5055 .expect("secondary codec")
5056 .encode(&resume_payload)
5057 .expect("wrong prefix key");
5058 let err = backfill_schema
5059 .backfill_added_indexes_with_options(
5060 "orders",
5061 &[],
5062 IndexBackfillOptions {
5063 row_batch_size: 2,
5064 start_from_primary_key: Some(wrong_prefix),
5065 },
5066 )
5067 .await
5068 .expect_err("wrong key prefix must be rejected");
5069 assert!(err.to_string().contains("primary-key prefix"));
5070
5071 let _ = shutdown_tx.send(());
5072 }
5073
5074 #[tokio::test]
5075 async fn covering_index_scan_fails_closed_when_covering_payload_missing() {
5076 let state = MockState {
5077 kv: Arc::new(Mutex::new(BTreeMap::new())),
5078 range_calls: Arc::new(AtomicUsize::new(0)),
5079 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
5080 sequence_number: Arc::new(AtomicU64::new(0)),
5081 };
5082 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
5083 let client = StoreClient::new(&base_url);
5084
5085 let schema = KvSchema::new(client.clone())
5086 .table(
5087 "orders",
5088 vec![
5089 TableColumnConfig::new("id", DataType::Int64, false),
5090 TableColumnConfig::new("status", DataType::Utf8, false),
5091 TableColumnConfig::new("amount_cents", DataType::Int64, false),
5092 ],
5093 vec!["id".to_string()],
5094 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
5095 .expect("valid")
5096 .with_cover_columns(vec!["amount_cents".to_string()])],
5097 )
5098 .expect("schema");
5099 let mut writer = schema.batch_writer();
5100 for id in 0..4i64 {
5101 writer
5102 .insert(
5103 "orders",
5104 vec![
5105 CellValue::Int64(id),
5106 CellValue::Utf8("open".to_string()),
5107 CellValue::Int64(id * 10),
5108 ],
5109 )
5110 .expect("row");
5111 }
5112 writer.flush().await.expect("flush");
5113
5114 {
5115 let mut guard = state.kv.lock().expect("kv mutex poisoned");
5116 let key = guard
5117 .keys()
5118 .find(|key| matches_secondary_index_key(0, 1, key))
5119 .expect("index row should exist")
5120 .clone();
5121 guard.insert(key, Bytes::new());
5122 }
5123
5124 let ctx = SessionContext::new();
5125 schema.register_all(&ctx).expect("register");
5126 let df = ctx
5127 .sql("SELECT amount_cents FROM orders WHERE status = 'open'")
5128 .await
5129 .expect("query should plan");
5130 let err = df
5131 .collect()
5132 .await
5133 .expect_err("missing covering payload must fail closed");
5134 assert!(err
5135 .to_string()
5136 .contains("secondary index entry missing covering payload"));
5137
5138 let _ = shutdown_tx.send(());
5139 }
5140
5141 #[tokio::test]
5142 async fn covering_index_scan_fails_closed_when_covering_payload_is_corrupt() {
5143 let state = MockState {
5144 kv: Arc::new(Mutex::new(BTreeMap::new())),
5145 range_calls: Arc::new(AtomicUsize::new(0)),
5146 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
5147 sequence_number: Arc::new(AtomicU64::new(0)),
5148 };
5149 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
5150 let client = StoreClient::new(&base_url);
5151
5152 let schema = KvSchema::new(client.clone())
5153 .table(
5154 "orders",
5155 vec![
5156 TableColumnConfig::new("id", DataType::Int64, false),
5157 TableColumnConfig::new("status", DataType::Utf8, false),
5158 TableColumnConfig::new("amount_cents", DataType::Int64, false),
5159 ],
5160 vec!["id".to_string()],
5161 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
5162 .expect("valid")
5163 .with_cover_columns(vec!["amount_cents".to_string()])],
5164 )
5165 .expect("schema");
5166 let mut writer = schema.batch_writer();
5167 for id in 0..4i64 {
5168 writer
5169 .insert(
5170 "orders",
5171 vec![
5172 CellValue::Int64(id),
5173 CellValue::Utf8("open".to_string()),
5174 CellValue::Int64(id * 10),
5175 ],
5176 )
5177 .expect("row");
5178 }
5179 writer.flush().await.expect("flush");
5180
5181 {
5182 let mut guard = state.kv.lock().expect("kv mutex poisoned");
5183 let key = guard
5184 .keys()
5185 .find(|key| matches_secondary_index_key(0, 1, key))
5186 .expect("index row should exist")
5187 .clone();
5188 guard.insert(key, Bytes::from_static(b"not-codec"));
5189 }
5190
5191 let ctx = SessionContext::new();
5192 schema.register_all(&ctx).expect("register");
5193 let df = ctx
5194 .sql("SELECT amount_cents FROM orders WHERE status = 'open'")
5195 .await
5196 .expect("query should plan");
5197 let err = df
5198 .collect()
5199 .await
5200 .expect_err("corrupt covering payload must fail closed");
5201 assert!(err.to_string().contains("invalid covering index payload"));
5202
5203 let _ = shutdown_tx.send(());
5204 }
5205
5206 #[tokio::test]
5207 async fn non_covering_index_uses_point_lookup_instead_of_full_scan() {
5208 let state = MockState {
5209 kv: Arc::new(Mutex::new(BTreeMap::new())),
5210 range_calls: Arc::new(AtomicUsize::new(0)),
5211 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
5212 sequence_number: Arc::new(AtomicU64::new(0)),
5213 };
5214 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
5215 let client = StoreClient::new(&base_url);
5216
5217 let schema = KvSchema::new(client.clone())
5218 .table(
5219 "orders",
5220 vec![
5221 TableColumnConfig::new("id", DataType::Int64, false),
5222 TableColumnConfig::new("status", DataType::Utf8, false),
5223 TableColumnConfig::new("amount_cents", DataType::Int64, false),
5224 TableColumnConfig::new("notes", DataType::Utf8, true),
5225 ],
5226 vec!["id".to_string()],
5227 vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
5228 )
5229 .expect("schema");
5230 let mut writer = schema.batch_writer();
5231 writer
5232 .insert(
5233 "orders",
5234 vec![
5235 CellValue::Int64(1),
5236 CellValue::Utf8("open".to_string()),
5237 CellValue::Int64(100),
5238 CellValue::Utf8("first".to_string()),
5239 ],
5240 )
5241 .expect("row");
5242 writer
5243 .insert(
5244 "orders",
5245 vec![
5246 CellValue::Int64(2),
5247 CellValue::Utf8("closed".to_string()),
5248 CellValue::Int64(200),
5249 CellValue::Utf8("second".to_string()),
5250 ],
5251 )
5252 .expect("row");
5253 writer
5254 .insert(
5255 "orders",
5256 vec![
5257 CellValue::Int64(3),
5258 CellValue::Utf8("open".to_string()),
5259 CellValue::Int64(300),
5260 CellValue::Utf8("third".to_string()),
5261 ],
5262 )
5263 .expect("row");
5264 writer.flush().await.expect("flush");
5265
5266 let ctx = SessionContext::new();
5267 schema.register_all(&ctx).expect("register");
5268
5269 let df = ctx
5270 .sql("SELECT id, notes FROM orders WHERE status = 'open' ORDER BY id")
5271 .await
5272 .expect("plan");
5273 let batches = df.collect().await.expect("non-covering index lookup");
5274 let ids: Vec<i64> = batches
5275 .iter()
5276 .flat_map(|b| {
5277 b.column(0)
5278 .as_any()
5279 .downcast_ref::<datafusion::arrow::array::Int64Array>()
5280 .unwrap()
5281 .iter()
5282 .map(|v| v.unwrap())
5283 })
5284 .collect();
5285 let notes: Vec<String> = batches
5286 .iter()
5287 .flat_map(|b| {
5288 b.column(1)
5289 .as_any()
5290 .downcast_ref::<datafusion::arrow::array::StringArray>()
5291 .unwrap()
5292 .iter()
5293 .map(|v| v.unwrap().to_string())
5294 })
5295 .collect();
5296 assert_eq!(ids, vec![1, 3]);
5297 assert_eq!(notes, vec!["first", "third"]);
5298
5299 let _ = shutdown_tx.send(());
5300 }
5301
    /// Interrupts an index backfill after its first progress report, then resumes it
    /// from the reported cursor. The resumed run must scan only the remaining rows,
    /// and the store must end up with exactly one index entry per base row.
    #[tokio::test]
    async fn backfill_resume_cursor_can_continue_without_skips_or_duplicates() {
        let state = MockState {
            kv: Arc::new(Mutex::new(BTreeMap::new())),
            range_calls: Arc::new(AtomicUsize::new(0)),
            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
            sequence_number: Arc::new(AtomicU64::new(0)),
        };
        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
        let client = StoreClient::new(&base_url);

        // Seed 8 base rows through a schema that declares no secondary indexes, so
        // (presumably) no `status_idx` entries exist before the backfill.
        let seed_schema = KvSchema::new(client.clone())
            .table(
                "orders",
                vec![
                    TableColumnConfig::new("id", DataType::Int64, false),
                    TableColumnConfig::new("status", DataType::Utf8, false),
                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
                ],
                vec!["id".to_string()],
                vec![],
            )
            .expect("seed schema");
        let mut writer = seed_schema.batch_writer();
        for i in 0..8i64 {
            writer
                .insert(
                    "orders",
                    vec![
                        CellValue::Int64(i),
                        CellValue::Utf8("open".to_string()),
                        CellValue::Int64(i * 10),
                    ],
                )
                .expect("seed row");
        }
        writer.flush().await.expect("seed flush");

        // Two identical schemas that add `status_idx`. `backfill_schema` runs the
        // resumed backfill on this task; `task_schema` is moved into the background
        // task that gets interrupted.
        let backfill_schema = KvSchema::new(client.clone())
            .table(
                "orders",
                vec![
                    TableColumnConfig::new("id", DataType::Int64, false),
                    TableColumnConfig::new("status", DataType::Utf8, false),
                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
                ],
                vec!["id".to_string()],
                vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
            )
            .expect("backfill schema");

        let task_schema = KvSchema::new(client.clone())
            .table(
                "orders",
                vec![
                    TableColumnConfig::new("id", DataType::Int64, false),
                    TableColumnConfig::new("status", DataType::Utf8, false),
                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
                ],
                vec!["id".to_string()],
                vec![IndexSpec::new("status_idx", vec!["status".to_string()]).expect("valid")],
            )
            .expect("task schema");
        // Run the first backfill in the background with a small batch size so that
        // progress is reported before the whole table has been processed.
        let (progress_tx, mut progress_rx) = mpsc::unbounded_channel();
        let handle = tokio::spawn(async move {
            task_schema
                .backfill_added_indexes_with_options_and_progress(
                    "orders",
                    &[],
                    IndexBackfillOptions {
                        row_batch_size: 2,
                        start_from_primary_key: None,
                    },
                    Some(&progress_tx),
                )
                .await
        });

        // Take the cursor from the first `Progress` event, ignoring any other event
        // kinds, then abort the background task to simulate an interrupted backfill.
        let mut resume_cursor = None;
        while let Some(event) = progress_rx.recv().await {
            if let IndexBackfillEvent::Progress { next_cursor, .. } = event {
                resume_cursor = next_cursor;
                break;
            }
        }
        handle.abort();
        let resume_cursor =
            resume_cursor.expect("first progress event should provide resume cursor");

        // Resume from the captured cursor. The expected 6 scanned rows imply that the
        // first progress event covered the first 2 of the 8 seeded rows.
        let report = backfill_schema
            .backfill_added_indexes_with_options(
                "orders",
                &[],
                IndexBackfillOptions {
                    row_batch_size: 2,
                    start_from_primary_key: Some(resume_cursor),
                },
            )
            .await
            .expect("resume backfill should succeed");
        assert_eq!(report.scanned_rows, 6);

        // Count base rows and `status_idx` entries directly in the mock store.
        // NOTE(review): the aborted task may already have written entries past the
        // cursor before cancellation took effect. The count stays at 8 only if
        // rewriting an existing index key is idempotent; confirm.
        let guard = state.kv.lock().expect("kv mutex poisoned");
        let base_rows = guard
            .keys()
            .filter(|key| matches_primary_key(0, key))
            .count();
        let index_rows = guard
            .keys()
            .filter(|key| matches_secondary_index_key(0, 1, key))
            .count();
        assert_eq!(base_rows, 8);
        assert_eq!(
            index_rows, 8,
            "resume should backfill each row exactly once"
        );

        let _ = shutdown_tx.send(());
    }
5421
    /// Writes issued through an index-aware schema while a backfill is running must
    /// not break the index. After both finish, each base row (historical or
    /// concurrent) has exactly one `status_idx` entry.
    #[tokio::test]
    async fn concurrent_writes_during_backfill_preserve_index_correctness() {
        let state = MockState {
            kv: Arc::new(Mutex::new(BTreeMap::new())),
            range_calls: Arc::new(AtomicUsize::new(0)),
            range_reduce_calls: Arc::new(AtomicUsize::new(0)),
            sequence_number: Arc::new(AtomicU64::new(0)),
        };
        let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
        let client = StoreClient::new(&base_url);

        // Seed 40 historical rows (even ids "open", odd ids "closed") through a
        // schema with no secondary indexes.
        let seed_schema = KvSchema::new(client.clone())
            .table(
                "orders",
                vec![
                    TableColumnConfig::new("id", DataType::Int64, false),
                    TableColumnConfig::new("status", DataType::Utf8, false),
                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
                ],
                vec!["id".to_string()],
                vec![],
            )
            .expect("seed schema");
        let mut seed_writer = seed_schema.batch_writer();
        for i in 0..40i64 {
            seed_writer
                .insert(
                    "orders",
                    vec![
                        CellValue::Int64(i),
                        CellValue::Utf8(if i % 2 == 0 { "open" } else { "closed" }.to_string()),
                        CellValue::Int64(i * 10),
                    ],
                )
                .expect("seed row");
        }
        seed_writer.flush().await.expect("seed flush");

        // Both schemas declare the covering `status_idx`. `backfill_schema` issues the
        // concurrent writes; `task_schema` is moved into the background backfill.
        let backfill_schema = KvSchema::new(client.clone())
            .table(
                "orders",
                vec![
                    TableColumnConfig::new("id", DataType::Int64, false),
                    TableColumnConfig::new("status", DataType::Utf8, false),
                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
                ],
                vec!["id".to_string()],
                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
                    .expect("valid")
                    .with_cover_columns(vec!["amount_cents".to_string()])],
            )
            .expect("backfill schema");

        let task_schema = KvSchema::new(client.clone())
            .table(
                "orders",
                vec![
                    TableColumnConfig::new("id", DataType::Int64, false),
                    TableColumnConfig::new("status", DataType::Utf8, false),
                    TableColumnConfig::new("amount_cents", DataType::Int64, false),
                ],
                vec!["id".to_string()],
                vec![IndexSpec::new("status_idx", vec!["status".to_string()])
                    .expect("valid")
                    .with_cover_columns(vec!["amount_cents".to_string()])],
            )
            .expect("task schema");
        let (progress_tx, mut progress_rx) = mpsc::unbounded_channel();
        let handle = tokio::spawn(async move {
            task_schema
                .backfill_added_indexes_with_options_and_progress(
                    "orders",
                    &[],
                    IndexBackfillOptions {
                        row_batch_size: 5,
                        start_from_primary_key: None,
                    },
                    Some(&progress_tx),
                )
                .await
        });

        // Wait for the first `Progress` event so the concurrent writes below are
        // issued after the backfill has started (and, typically, before it finishes).
        // Later events stay queued in the unbounded channel and are never read.
        while let Some(event) = progress_rx.recv().await {
            if matches!(event, IndexBackfillEvent::Progress { .. }) {
                break;
            }
        }

        // Insert two new "open" rows through the index-aware schema while the
        // backfill may still be running.
        let mut concurrent_writer = backfill_schema.batch_writer();
        for id in [100i64, 101i64] {
            concurrent_writer
                .insert(
                    "orders",
                    vec![
                        CellValue::Int64(id),
                        CellValue::Utf8("open".to_string()),
                        CellValue::Int64(id * 10),
                    ],
                )
                .expect("concurrent row");
        }
        concurrent_writer.flush().await.expect("concurrent flush");

        // The backfill may or may not see the concurrent rows, so only a lower bound
        // on the scanned count can be asserted.
        let report = handle
            .await
            .expect("backfill task join")
            .expect("backfill result");
        assert!(
            report.scanned_rows >= 40,
            "backfill should at least scan the original historical rows"
        );

        // 40 historical + 2 concurrent rows, each with exactly one index entry.
        let guard = state.kv.lock().expect("kv mutex poisoned");
        let base_rows = guard
            .keys()
            .filter(|key| matches_primary_key(0, key))
            .count();
        let index_rows = guard
            .keys()
            .filter(|key| matches_secondary_index_key(0, 1, key))
            .count();
        assert_eq!(base_rows, 42);
        assert_eq!(
            index_rows, 42,
            "historical backfill plus concurrent indexed writes should leave one index row per base row"
        );

        let _ = shutdown_tx.send(());
    }
5551
5552 #[derive(Clone)]
5553 struct DeferredChunkRangeHarness {
5554 first_chunk_sent: Arc<Notify>,
5555 release_second_chunk: Arc<Notify>,
5556 first_frame: ProtoRangeFrame,
5557 second_frame: ProtoRangeFrame,
5558 }
5559
5560 impl QueryService for DeferredChunkRangeHarness {
5561 async fn get(
5562 &self,
5563 _ctx: Context,
5564 _request: buffa::view::OwnedView<
5565 exoware_sdk::store::query::v1::GetRequestView<'static>,
5566 >,
5567 ) -> Result<(ProtoGetResponse, Context), ConnectError> {
5568 Err(ConnectError::unimplemented("test harness"))
5569 }
5570
5571 async fn get_many(
5572 &self,
5573 _ctx: Context,
5574 _request: buffa::view::OwnedView<
5575 exoware_sdk::store::query::v1::GetManyRequestView<'static>,
5576 >,
5577 ) -> Result<
5578 (
5579 Pin<Box<dyn Stream<Item = Result<ProtoGetManyFrame, ConnectError>> + Send>>,
5580 Context,
5581 ),
5582 ConnectError,
5583 > {
5584 Err(ConnectError::unimplemented("test harness"))
5585 }
5586
5587 async fn range(
5588 &self,
5589 _ctx: Context,
5590 _request: buffa::view::OwnedView<
5591 exoware_sdk::store::query::v1::RangeRequestView<'static>,
5592 >,
5593 ) -> Result<
5594 (
5595 Pin<Box<dyn Stream<Item = Result<ProtoRangeFrame, ConnectError>> + Send>>,
5596 Context,
5597 ),
5598 ConnectError,
5599 > {
5600 let first_chunk_sent = self.first_chunk_sent.clone();
5601 let release_second_chunk = self.release_second_chunk.clone();
5602 let first_frame = self.first_frame.clone();
5603 let second_frame = self.second_frame.clone();
5604 let stream = stream::try_unfold(0u8, move |state| {
5605 let first_chunk_sent = first_chunk_sent.clone();
5606 let release_second_chunk = release_second_chunk.clone();
5607 let first_frame = first_frame.clone();
5608 let second_frame = second_frame.clone();
5609 async move {
5610 match state {
5611 0 => {
5612 first_chunk_sent.notify_one();
5613 Ok(Some((first_frame, 1)))
5614 }
5615 1 => {
5616 release_second_chunk.notified().await;
5617 Ok(Some((second_frame, 2)))
5618 }
5619 _ => Ok(None),
5620 }
5621 }
5622 });
5623 Ok((Box::pin(stream), query_detail_trailer_ctx(7)))
5624 }
5625
5626 async fn reduce(
5627 &self,
5628 _ctx: Context,
5629 _request: buffa::view::OwnedView<
5630 exoware_sdk::store::query::v1::ReduceRequestView<'static>,
5631 >,
5632 ) -> Result<(ProtoReduceResponse, Context), ConnectError> {
5633 Err(ConnectError::unimplemented("test harness"))
5634 }
5635 }
5636
5637 #[derive(Clone)]
5638 struct ObservedLimitRangeHarness {
5639 release_second_chunk: Arc<Notify>,
5640 observed_limit: Arc<AtomicUsize>,
5641 first_frame: ProtoRangeFrame,
5642 second_frame: ProtoRangeFrame,
5643 }
5644
5645 impl QueryService for ObservedLimitRangeHarness {
5646 async fn get(
5647 &self,
5648 _ctx: Context,
5649 _request: buffa::view::OwnedView<
5650 exoware_sdk::store::query::v1::GetRequestView<'static>,
5651 >,
5652 ) -> Result<(ProtoGetResponse, Context), ConnectError> {
5653 Err(ConnectError::unimplemented("test harness"))
5654 }
5655
5656 async fn get_many(
5657 &self,
5658 _ctx: Context,
5659 _request: buffa::view::OwnedView<
5660 exoware_sdk::store::query::v1::GetManyRequestView<'static>,
5661 >,
5662 ) -> Result<
5663 (
5664 Pin<Box<dyn Stream<Item = Result<ProtoGetManyFrame, ConnectError>> + Send>>,
5665 Context,
5666 ),
5667 ConnectError,
5668 > {
5669 Err(ConnectError::unimplemented("test harness"))
5670 }
5671
5672 async fn range(
5673 &self,
5674 _ctx: Context,
5675 request: buffa::view::OwnedView<
5676 exoware_sdk::store::query::v1::RangeRequestView<'static>,
5677 >,
5678 ) -> Result<
5679 (
5680 Pin<Box<dyn Stream<Item = Result<ProtoRangeFrame, ConnectError>> + Send>>,
5681 Context,
5682 ),
5683 ConnectError,
5684 > {
5685 let limit = request.limit.map(|v| v as usize).unwrap_or(usize::MAX);
5686 self.observed_limit.store(limit, AtomicOrdering::SeqCst);
5687 let release_second_chunk = self.release_second_chunk.clone();
5688 let first_frame = self.first_frame.clone();
5689 let second_frame = self.second_frame.clone();
5690 let stream = stream::try_unfold(0u8, move |state| {
5691 let release_second_chunk = release_second_chunk.clone();
5692 let first_frame = first_frame.clone();
5693 let second_frame = second_frame.clone();
5694 async move {
5695 match state {
5696 0 => Ok(Some((first_frame, 1))),
5697 1 => {
5698 if limit > 1 {
5699 release_second_chunk.notified().await;
5700 Ok(Some((second_frame, 2)))
5701 } else {
5702 Ok(None)
5703 }
5704 }
5705 2 => Ok(None),
5706 _ => Ok(None),
5707 }
5708 }
5709 });
5710 Ok((Box::pin(stream), query_detail_trailer_ctx(7)))
5711 }
5712
5713 async fn reduce(
5714 &self,
5715 _ctx: Context,
5716 _request: buffa::view::OwnedView<
5717 exoware_sdk::store::query::v1::ReduceRequestView<'static>,
5718 >,
5719 ) -> Result<(ProtoReduceResponse, Context), ConnectError> {
5720 Err(ConnectError::unimplemented("test harness"))
5721 }
5722 }
5723
5724 #[derive(Clone)]
5725 struct ObservedLimitIndexRangeHarness {
5726 observed_limit: Arc<AtomicUsize>,
5727 entries_frame: ProtoRangeFrame,
5728 }
5729
5730 impl QueryService for ObservedLimitIndexRangeHarness {
5731 async fn get(
5732 &self,
5733 _ctx: Context,
5734 _request: buffa::view::OwnedView<
5735 exoware_sdk::store::query::v1::GetRequestView<'static>,
5736 >,
5737 ) -> Result<(ProtoGetResponse, Context), ConnectError> {
5738 Err(ConnectError::unimplemented("test harness"))
5739 }
5740
5741 async fn get_many(
5742 &self,
5743 _ctx: Context,
5744 _request: buffa::view::OwnedView<
5745 exoware_sdk::store::query::v1::GetManyRequestView<'static>,
5746 >,
5747 ) -> Result<
5748 (
5749 Pin<Box<dyn Stream<Item = Result<ProtoGetManyFrame, ConnectError>> + Send>>,
5750 Context,
5751 ),
5752 ConnectError,
5753 > {
5754 Err(ConnectError::unimplemented("test harness"))
5755 }
5756
5757 async fn range(
5758 &self,
5759 _ctx: Context,
5760 request: buffa::view::OwnedView<
5761 exoware_sdk::store::query::v1::RangeRequestView<'static>,
5762 >,
5763 ) -> Result<
5764 (
5765 Pin<Box<dyn Stream<Item = Result<ProtoRangeFrame, ConnectError>> + Send>>,
5766 Context,
5767 ),
5768 ConnectError,
5769 > {
5770 let limit = request
5771 .limit
5772 .map(|v| {
5773 if v == u32::MAX {
5774 usize::MAX
5775 } else {
5776 v as usize
5777 }
5778 })
5779 .unwrap_or(usize::MAX);
5780 self.observed_limit.store(limit, AtomicOrdering::SeqCst);
5781 let entries_frame = self.entries_frame.clone();
5782 Ok((
5783 Box::pin(stream::iter(vec![Ok(entries_frame)])),
5784 query_detail_trailer_ctx(7),
5785 ))
5786 }
5787
5788 async fn reduce(
5789 &self,
5790 _ctx: Context,
5791 _request: buffa::view::OwnedView<
5792 exoware_sdk::store::query::v1::ReduceRequestView<'static>,
5793 >,
5794 ) -> Result<(ProtoReduceResponse, Context), ConnectError> {
5795 Err(ConnectError::unimplemented("test harness"))
5796 }
5797 }
5798
    /// `KvScanExec` must emit its first record batch as soon as the first range frame
    /// has been received, without waiting for the rest of the range stream.
    #[tokio::test]
    async fn kv_scan_streaming_range_reads_emit_first_batch_before_full_range_completes() {
        let model = Arc::new(simple_int64_model(0));
        let first_chunk_sent = Arc::new(Notify::new());
        let release_second_chunk = Arc::new(Notify::new());

        // One encoded `StoredRow` whose only value is `None`, reused for every key.
        let encoded_row = (StoredRow { values: vec![None] }).encode().to_vec();

        // First frame: exactly `BATCH_FLUSH_ROWS` rows (ids 0..BATCH_FLUSH_ROWS), which
        // the assertion below expects to come out as one full record batch.
        let first_results = {
            let mut results = Vec::with_capacity(BATCH_FLUSH_ROWS);
            for id in 0..BATCH_FLUSH_ROWS {
                let key =
                    encode_primary_key(model.table_prefix, &[&CellValue::Int64(id as i64)], &model)
                        .expect("primary key");
                results.push((key, encoded_row.clone()));
            }
            results
        };
        let first_frame = proto_range_entries_frame(first_results);

        // Second frame: a single row with the next id.
        let second_results = {
            let key = encode_primary_key(
                model.table_prefix,
                &[&CellValue::Int64(BATCH_FLUSH_ROWS as i64)],
                &model,
            )
            .expect("primary key");
            vec![(key, encoded_row)]
        };
        let second_frame = proto_range_entries_frame(second_results);

        // Serve the deferred-chunk harness on an ephemeral local port.
        let harness = DeferredChunkRangeHarness {
            first_chunk_sent: first_chunk_sent.clone(),
            release_second_chunk: release_second_chunk.clone(),
            first_frame,
            second_frame,
        };
        let connect = ConnectRpcService::new(QueryServiceServer::new(harness))
            .with_compression(connect_compression_registry());
        let app = Router::new().fallback_service(connect);

        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind test listener");
        let url = format!("http://{}", listener.local_addr().expect("listener addr"));
        tokio::spawn(async move {
            axum::serve(listener, app).await.expect("serve test app");
        });

        // Build the scan operator directly, with a default (empty) query predicate.
        let client = StoreClient::new(&url);
        let scan = KvScanExec::new(
            client,
            model.clone(),
            Arc::new(Vec::new()),
            QueryPredicate::default(),
            None,
            model.schema.clone(),
            None,
        );

        let session_ctx = SessionContext::new();
        let mut stream = scan
            .execute(0, session_ctx.task_ctx())
            .expect("scan execute should start");

        // Once the server has sent frame one, a full batch must arrive while frame
        // two is still withheld.
        tokio::time::timeout(Duration::from_secs(1), first_chunk_sent.notified())
            .await
            .expect("server should send first range frame");
        let first_batch = tokio::time::timeout(Duration::from_millis(200), stream.try_next())
            .await
            .expect("first record batch should arrive before the second stream chunk is released")
            .expect("stream poll should succeed")
            .expect("expected first record batch");
        assert_eq!(first_batch.num_rows(), BATCH_FLUSH_ROWS);

        // Release frame two. It must produce a one-row batch, after which the stream
        // ends.
        release_second_chunk.notify_one();

        let second_batch = stream
            .try_next()
            .await
            .expect("second poll should succeed")
            .expect("expected second record batch");
        assert_eq!(second_batch.num_rows(), 1);
        assert!(
            stream
                .try_next()
                .await
                .expect("stream completion poll")
                .is_none(),
            "stream should finish after the second batch"
        );
    }
5891
    /// For a plain primary-key scan, a SQL `LIMIT` must reach the store's range
    /// request, so the query completes without waiting for data it does not need.
    #[tokio::test]
    async fn kv_scan_sql_limit_is_pushed_upstream_on_exact_streaming_scan() {
        let release_second_chunk = Arc::new(Notify::new());
        let observed_limit = Arc::new(AtomicUsize::new(0));
        // NOTE(review): assumes prefix 0 matches the prefix of the first table
        // registered below ("items"); confirm against simple_int64_model.
        let model = simple_int64_model(0);

        let encoded_row = (StoredRow { values: vec![None] }).encode().to_vec();

        let first_key = encode_primary_key(model.table_prefix, &[&CellValue::Int64(1)], &model)
            .expect("first primary key");
        let second_key = encode_primary_key(model.table_prefix, &[&CellValue::Int64(2)], &model)
            .expect("second primary key");

        // One row per frame. The harness withholds the second frame until released,
        // but only when the requested limit is greater than 1.
        let first_frame = proto_range_entries_frame(vec![(first_key, encoded_row.clone())]);
        let second_frame = proto_range_entries_frame(vec![(second_key, encoded_row)]);

        let harness = ObservedLimitRangeHarness {
            release_second_chunk: release_second_chunk.clone(),
            observed_limit: observed_limit.clone(),
            first_frame,
            second_frame,
        };
        let connect = ConnectRpcService::new(QueryServiceServer::new(harness))
            .with_compression(connect_compression_registry());
        let app = Router::new().fallback_service(connect);

        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
            .await
            .expect("bind test listener");
        let url = format!("http://{}", listener.local_addr().expect("listener addr"));
        tokio::spawn(async move {
            axum::serve(listener, app).await.expect("serve test app");
        });

        let client = StoreClient::new(&url);
        let schema = KvSchema::new(client)
            .table(
                "items",
                vec![TableColumnConfig::new("id", DataType::Int64, false)],
                vec!["id".to_string()],
                vec![],
            )
            .expect("schema");
        let ctx = SessionContext::new();
        schema.register_all(&ctx).expect("register");

        // The second chunk is never released before this point, so the query must
        // complete from the first frame alone within the timeout.
        let batches = tokio::time::timeout(Duration::from_millis(200), async {
            ctx.sql("SELECT id FROM items LIMIT 1")
                .await
                .expect("query")
                .collect()
                .await
                .expect("collect")
        })
        .await
        .expect("query with LIMIT 1 should finish without waiting for a delayed second chunk");

        assert_eq!(
            batches.iter().map(|batch| batch.num_rows()).sum::<usize>(),
            1
        );
        assert_eq!(
            observed_limit.load(AtomicOrdering::SeqCst),
            1,
            "exact streaming scan should push SQL LIMIT upstream"
        );
        // Unblock any harness stream still waiting on the gate.
        release_second_chunk.notify_one();
    }
5960
5961 #[tokio::test]
5962 async fn kv_scan_index_limit_does_not_push_upstream_when_seen_dedup_can_drop_entries() {
5963 let observed_limit = Arc::new(AtomicUsize::new(0));
5964 let config = KvTableConfig::new(
5965 0,
5966 vec![
5967 TableColumnConfig::new("id", DataType::Int64, false),
5968 TableColumnConfig::new("status", DataType::Utf8, false),
5969 TableColumnConfig::new("amount_cents", DataType::Int64, false),
5970 ],
5971 vec!["id".to_string()],
5972 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
5973 .expect("valid")
5974 .with_cover_columns(vec!["status".to_string(), "amount_cents".to_string()])],
5975 )
5976 .expect("config");
5977 let model = TableModel::from_config(&config).expect("model");
5978 let spec = model
5979 .resolve_index_specs(&config.index_specs)
5980 .expect("specs")
5981 .into_iter()
5982 .next()
5983 .expect("status index spec");
5984 let stale_row = KvRow {
5985 values: vec![
5986 CellValue::Int64(7),
5987 CellValue::Utf8("closed".to_string()),
5988 CellValue::Int64(10),
5989 ],
5990 };
5991 let current_row = KvRow {
5992 values: vec![
5993 CellValue::Int64(7),
5994 CellValue::Utf8("open".to_string()),
5995 CellValue::Int64(10),
5996 ],
5997 };
5998 let unique_row = KvRow {
5999 values: vec![
6000 CellValue::Int64(8),
6001 CellValue::Utf8("open".to_string()),
6002 CellValue::Int64(20),
6003 ],
6004 };
6005 let stale_key = encode_secondary_index_key(model.table_prefix, &spec, &model, &stale_row)
6006 .expect("stale index key");
6007 let current_key =
6008 encode_secondary_index_key(model.table_prefix, &spec, &model, ¤t_row)
6009 .expect("current index key");
6010 let unique_key = encode_secondary_index_key(model.table_prefix, &spec, &model, &unique_row)
6011 .expect("unique index key");
6012 let stale_payload =
6013 encode_secondary_index_value(&stale_row, &model, &spec).expect("stale payload");
6014 let current_payload =
6015 encode_secondary_index_value(¤t_row, &model, &spec).expect("current payload");
6016 let unique_payload =
6017 encode_secondary_index_value(&unique_row, &model, &spec).expect("unique payload");
6018
6019 let entries_frame = proto_range_entries_frame(vec![
6020 (stale_key, stale_payload),
6021 (current_key, current_payload),
6022 (unique_key, unique_payload),
6023 ]);
6024 let harness = ObservedLimitIndexRangeHarness {
6025 observed_limit: observed_limit.clone(),
6026 entries_frame,
6027 };
6028 let connect = ConnectRpcService::new(QueryServiceServer::new(harness))
6029 .with_compression(connect_compression_registry());
6030 let app = Router::new().fallback_service(connect);
6031
6032 let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
6033 .await
6034 .expect("bind test listener");
6035 let url = format!("http://{}", listener.local_addr().expect("listener addr"));
6036 tokio::spawn(async move {
6037 axum::serve(listener, app).await.expect("serve test app");
6038 });
6039
6040 let client = StoreClient::new(&url);
6041 let schema = KvSchema::new(client)
6042 .table(
6043 "orders",
6044 vec![
6045 TableColumnConfig::new("id", DataType::Int64, false),
6046 TableColumnConfig::new("status", DataType::Utf8, false),
6047 TableColumnConfig::new("amount_cents", DataType::Int64, false),
6048 ],
6049 vec!["id".to_string()],
6050 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6051 .expect("valid")
6052 .with_cover_columns(vec!["status".to_string(), "amount_cents".to_string()])],
6053 )
6054 .expect("schema");
6055 let ctx = SessionContext::new();
6056 schema.register_all(&ctx).expect("register");
6057
6058 let batches = ctx
6059 .sql(
6060 "SELECT id, amount_cents \
6061 FROM orders \
6062 WHERE status IN ('open', 'closed') \
6063 LIMIT 2",
6064 )
6065 .await
6066 .expect("query")
6067 .collect()
6068 .await
6069 .expect("collect");
6070
6071 assert_eq!(
6072 batches.iter().map(|batch| batch.num_rows()).sum::<usize>(),
6073 2
6074 );
6075 assert_eq!(
6076 observed_limit.load(AtomicOrdering::SeqCst),
6077 usize::MAX,
6078 "index streaming scans should not push SQL LIMIT upstream while seen-dedup can drop duplicate primary keys"
6079 );
6080 }
6081
6082 #[tokio::test]
6083 async fn zorder_covering_index_scan_filters_false_positive_morton_span_rows() {
6084 let state = MockState {
6085 kv: Arc::new(Mutex::new(BTreeMap::new())),
6086 range_calls: Arc::new(AtomicUsize::new(0)),
6087 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6088 sequence_number: Arc::new(AtomicU64::new(0)),
6089 };
6090 let (base_url, shutdown_tx) = spawn_mock_server(state).await;
6091 let client = StoreClient::new(&base_url);
6092
6093 let schema = KvSchema::new(client)
6094 .table(
6095 "points",
6096 vec![
6097 TableColumnConfig::new("x", DataType::Int64, false),
6098 TableColumnConfig::new("y", DataType::Int64, false),
6099 TableColumnConfig::new("id", DataType::Int64, false),
6100 TableColumnConfig::new("value", DataType::Int64, false),
6101 ],
6102 vec!["id".to_string()],
6103 vec![
6104 IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
6105 .expect("valid")
6106 .with_cover_columns(vec!["value".to_string()]),
6107 ],
6108 )
6109 .expect("schema");
6110
6111 let mut writer = schema.batch_writer();
6112 for (x, y, id, value) in [
6113 (0, 2, 2, 20),
6114 (1, 1, 11, 110),
6115 (1, 2, 12, 120),
6116 (2, 1, 21, 210),
6117 (2, 2, 22, 220),
6118 (3, 0, 30, 300),
6119 ] {
6120 writer
6121 .insert(
6122 "points",
6123 vec![
6124 CellValue::Int64(x),
6125 CellValue::Int64(y),
6126 CellValue::Int64(id),
6127 CellValue::Int64(value),
6128 ],
6129 )
6130 .expect("row");
6131 }
6132 writer.flush().await.expect("flush");
6133
6134 let ctx = SessionContext::new();
6135 schema.register_all(&ctx).expect("register");
6136
6137 let batches = ctx
6138 .sql(
6139 "SELECT id, value FROM points \
6140 WHERE x >= 1 AND x <= 2 AND y >= 1 AND y <= 2 \
6141 ORDER BY id",
6142 )
6143 .await
6144 .expect("query")
6145 .collect()
6146 .await
6147 .expect("collect");
6148
6149 let mut rows = Vec::new();
6150 for batch in &batches {
6151 let ids = batch
6152 .column(0)
6153 .as_any()
6154 .downcast_ref::<Int64Array>()
6155 .expect("id int64");
6156 let values = batch
6157 .column(1)
6158 .as_any()
6159 .downcast_ref::<Int64Array>()
6160 .expect("value int64");
6161 for row_idx in 0..batch.num_rows() {
6162 rows.push((ids.value(row_idx), values.value(row_idx)));
6163 }
6164 }
6165 assert_eq!(rows, vec![(11, 110), (12, 120), (21, 210), (22, 220)]);
6166
6167 let _ = shutdown_tx.send(());
6168 }
6169
6170 #[tokio::test]
6171 async fn aggregate_pushdown_uses_range_reduce_for_supported_global_aggregates() {
6172 let state = MockState {
6173 kv: Arc::new(Mutex::new(BTreeMap::new())),
6174 range_calls: Arc::new(AtomicUsize::new(0)),
6175 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6176 sequence_number: Arc::new(AtomicU64::new(0)),
6177 };
6178 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6179 let client = StoreClient::new(&base_url);
6180
6181 let schema = KvSchema::new(client)
6182 .table(
6183 "orders",
6184 vec![
6185 TableColumnConfig::new("id", DataType::Int64, false),
6186 TableColumnConfig::new("status", DataType::Utf8, false),
6187 TableColumnConfig::new("amount_cents", DataType::Int64, false),
6188 ],
6189 vec!["id".to_string()],
6190 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6191 .expect("valid")
6192 .with_cover_columns(vec!["amount_cents".to_string()])],
6193 )
6194 .expect("schema");
6195
6196 let mut writer = schema.batch_writer();
6197 for (id, status, amount) in [
6198 (1, "open", 10),
6199 (2, "closed", 15),
6200 (3, "open", 30),
6201 (4, "closed", 40),
6202 ] {
6203 writer
6204 .insert(
6205 "orders",
6206 vec![
6207 CellValue::Int64(id),
6208 CellValue::Utf8(status.to_string()),
6209 CellValue::Int64(amount),
6210 ],
6211 )
6212 .expect("row");
6213 }
6214 writer.flush().await.expect("flush");
6215
6216 let ctx = SessionContext::new();
6217 schema.register_all(&ctx).expect("register");
6218
6219 state.range_calls.store(0, AtomicOrdering::SeqCst);
6220 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6221
6222 let df = ctx
6223 .sql(
6224 "SELECT COUNT(*) AS row_count, SUM(amount_cents) AS total_cents, \
6225 AVG(amount_cents) AS avg_cents \
6226 FROM orders WHERE status = 'open'",
6227 )
6228 .await
6229 .expect("query");
6230 let batches = df.collect().await.expect("collect");
6231
6232 assert_eq!(batches.len(), 1);
6233 let batch = &batches[0];
6234 let row_count = ScalarValue::try_from_array(batch.column(0), 0).expect("row_count scalar");
6235 let total = batch
6236 .column(1)
6237 .as_any()
6238 .downcast_ref::<Int64Array>()
6239 .expect("sum int64")
6240 .value(0);
6241 let avg = batch
6242 .column(2)
6243 .as_any()
6244 .downcast_ref::<Float64Array>()
6245 .expect("avg float64")
6246 .value(0);
6247 assert!(matches!(
6248 row_count,
6249 ScalarValue::UInt64(Some(2)) | ScalarValue::Int64(Some(2))
6250 ));
6251 assert_eq!(total, 40);
6252 assert_eq!(avg, 20.0);
6253 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6254 assert!(
6255 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6256 "supported aggregate should use range reduction path"
6257 );
6258
6259 let _ = shutdown_tx.send(());
6260 }
6261
6262 #[tokio::test]
6264 async fn primary_key_inclusive_upper_bound_streaming_scan_uses_range() {
6265 let state = MockState {
6266 kv: Arc::new(Mutex::new(BTreeMap::new())),
6267 range_calls: Arc::new(AtomicUsize::new(0)),
6268 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6269 sequence_number: Arc::new(AtomicU64::new(0)),
6270 };
6271 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6272 let client = StoreClient::new(&base_url);
6273
6274 let schema = KvSchema::new(client)
6275 .table(
6276 "inc_pk",
6277 vec![
6278 TableColumnConfig::new("id", DataType::Int64, false),
6279 TableColumnConfig::new("amount", DataType::Int64, false),
6280 ],
6281 vec!["id".to_string()],
6282 vec![],
6283 )
6284 .expect("schema");
6285
6286 let mut writer = schema.batch_writer();
6287 for id in 1i64..=5i64 {
6288 writer
6289 .insert(
6290 "inc_pk",
6291 vec![CellValue::Int64(id), CellValue::Int64(id * 100)],
6292 )
6293 .expect("row");
6294 }
6295 writer.flush().await.expect("flush");
6296
6297 let ctx = SessionContext::new();
6298 schema.register_all(&ctx).expect("register");
6299
6300 state.range_calls.store(0, AtomicOrdering::SeqCst);
6301 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6302
6303 let batches = ctx
6304 .sql("SELECT id FROM inc_pk WHERE id <= 3 ORDER BY id")
6305 .await
6306 .expect("lte query")
6307 .collect()
6308 .await
6309 .expect("collect");
6310 let mut ids = Vec::new();
6311 for batch in &batches {
6312 let col = batch
6313 .column(0)
6314 .as_any()
6315 .downcast_ref::<Int64Array>()
6316 .expect("id");
6317 for i in 0..batch.num_rows() {
6318 ids.push(col.value(i));
6319 }
6320 }
6321 assert_eq!(ids, vec![1, 2, 3], "id <= 3 must include id 3");
6322 assert!(
6323 state.range_calls.load(AtomicOrdering::SeqCst) >= 1,
6324 "PK bounded scan should call range"
6325 );
6326 assert_eq!(
6327 state.range_reduce_calls.load(AtomicOrdering::SeqCst),
6328 0,
6329 "streaming scan must not use range_reduce"
6330 );
6331
6332 state.range_calls.store(0, AtomicOrdering::SeqCst);
6333 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6334
6335 let batches = ctx
6336 .sql("SELECT id FROM inc_pk WHERE id BETWEEN 2 AND 4 ORDER BY id")
6337 .await
6338 .expect("between query")
6339 .collect()
6340 .await
6341 .expect("collect");
6342 ids.clear();
6343 for batch in &batches {
6344 let col = batch
6345 .column(0)
6346 .as_any()
6347 .downcast_ref::<Int64Array>()
6348 .expect("id");
6349 for i in 0..batch.num_rows() {
6350 ids.push(col.value(i));
6351 }
6352 }
6353 assert_eq!(ids, vec![2, 3, 4], "BETWEEN must include both endpoints");
6354 assert!(state.range_calls.load(AtomicOrdering::SeqCst) >= 1);
6355 assert_eq!(state.range_reduce_calls.load(AtomicOrdering::SeqCst), 0);
6356
6357 let _ = shutdown_tx.send(());
6358 }
6359
6360 #[tokio::test]
6361 async fn primary_key_inclusive_upper_bound_scalar_aggregates_use_range_reduce() {
6362 let state = MockState {
6363 kv: Arc::new(Mutex::new(BTreeMap::new())),
6364 range_calls: Arc::new(AtomicUsize::new(0)),
6365 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6366 sequence_number: Arc::new(AtomicU64::new(0)),
6367 };
6368 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6369 let client = StoreClient::new(&base_url);
6370
6371 let schema = KvSchema::new(client)
6372 .table(
6373 "inc_pk",
6374 vec![
6375 TableColumnConfig::new("id", DataType::Int64, false),
6376 TableColumnConfig::new("amount", DataType::Int64, false),
6377 ],
6378 vec!["id".to_string()],
6379 vec![],
6380 )
6381 .expect("schema");
6382
6383 let mut writer = schema.batch_writer();
6384 for id in 1i64..=5i64 {
6385 writer
6386 .insert(
6387 "inc_pk",
6388 vec![CellValue::Int64(id), CellValue::Int64(id * 100)],
6389 )
6390 .expect("row");
6391 }
6392 writer.flush().await.expect("flush");
6393
6394 let ctx = SessionContext::new();
6395 schema.register_all(&ctx).expect("register");
6396
6397 state.range_calls.store(0, AtomicOrdering::SeqCst);
6398 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6399
6400 let batches = ctx
6401 .sql("SELECT COUNT(*) AS c, SUM(amount) AS s FROM inc_pk WHERE id <= 3")
6402 .await
6403 .expect("lte agg")
6404 .collect()
6405 .await
6406 .expect("collect");
6407 assert_eq!(batches.len(), 1);
6408 let batch = &batches[0];
6409 let c = ScalarValue::try_from_array(batch.column(0), 0).expect("count");
6410 assert!(
6411 matches!(
6412 c,
6413 ScalarValue::UInt64(Some(3)) | ScalarValue::Int64(Some(3))
6414 ),
6415 "count should include id=3"
6416 );
6417 assert_eq!(
6418 batch
6419 .column(1)
6420 .as_any()
6421 .downcast_ref::<Int64Array>()
6422 .expect("sum")
6423 .value(0),
6424 100 + 200 + 300
6425 );
6426 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6427 assert!(
6428 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6429 "scalar aggregate on PK range should use range_reduce"
6430 );
6431
6432 state.range_calls.store(0, AtomicOrdering::SeqCst);
6433 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6434
6435 let batches = ctx
6436 .sql("SELECT SUM(amount) AS s FROM inc_pk WHERE id BETWEEN 2 AND 4")
6437 .await
6438 .expect("between agg")
6439 .collect()
6440 .await
6441 .expect("collect");
6442 assert_eq!(batches.len(), 1);
6443 let batch = &batches[0];
6444 assert_eq!(
6445 batch
6446 .column(0)
6447 .as_any()
6448 .downcast_ref::<Int64Array>()
6449 .expect("sum")
6450 .value(0),
6451 200 + 300 + 400
6452 );
6453 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6454 assert!(
6455 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6456 "BETWEEN aggregate should use range_reduce"
6457 );
6458
6459 let _ = shutdown_tx.send(());
6460 }
6461
6462 #[tokio::test]
6463 async fn aggregate_pushdown_uses_zorder_index_with_worker_filter() {
6464 let state = MockState {
6465 kv: Arc::new(Mutex::new(BTreeMap::new())),
6466 range_calls: Arc::new(AtomicUsize::new(0)),
6467 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6468 sequence_number: Arc::new(AtomicU64::new(0)),
6469 };
6470 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6471 let client = StoreClient::new(&base_url);
6472
6473 let schema = KvSchema::new(client)
6474 .table(
6475 "points",
6476 vec![
6477 TableColumnConfig::new("x", DataType::Int64, false),
6478 TableColumnConfig::new("y", DataType::Int64, false),
6479 TableColumnConfig::new("id", DataType::Int64, false),
6480 TableColumnConfig::new("value", DataType::Int64, false),
6481 ],
6482 vec!["id".to_string()],
6483 vec![
6484 IndexSpec::z_order("xy_z", vec!["x".to_string(), "y".to_string()])
6485 .expect("valid")
6486 .with_cover_columns(vec!["value".to_string()]),
6487 ],
6488 )
6489 .expect("schema");
6490
6491 let mut writer = schema.batch_writer();
6492 for (x, y, id, value) in [
6493 (0, 2, 2, 20),
6494 (1, 1, 11, 110),
6495 (1, 2, 12, 120),
6496 (2, 1, 21, 210),
6497 (2, 2, 22, 220),
6498 (3, 0, 30, 300),
6499 ] {
6500 writer
6501 .insert(
6502 "points",
6503 vec![
6504 CellValue::Int64(x),
6505 CellValue::Int64(y),
6506 CellValue::Int64(id),
6507 CellValue::Int64(value),
6508 ],
6509 )
6510 .expect("row");
6511 }
6512 writer.flush().await.expect("flush");
6513
6514 let ctx = SessionContext::new();
6515 schema.register_all(&ctx).expect("register");
6516
6517 state.range_calls.store(0, AtomicOrdering::SeqCst);
6518 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6519
6520 let batches = ctx
6521 .sql(
6522 "SELECT COUNT(*) AS row_count, SUM(value) AS total_value \
6523 FROM points \
6524 WHERE x >= 1 AND x <= 2 AND y >= 1 AND y <= 2",
6525 )
6526 .await
6527 .expect("query")
6528 .collect()
6529 .await
6530 .expect("collect");
6531
6532 assert_eq!(batches.len(), 1);
6533 let batch = &batches[0];
6534 let row_count = ScalarValue::try_from_array(batch.column(0), 0).expect("row_count scalar");
6535 let total = batch
6536 .column(1)
6537 .as_any()
6538 .downcast_ref::<Int64Array>()
6539 .expect("sum int64")
6540 .value(0);
6541 assert!(matches!(
6542 row_count,
6543 ScalarValue::UInt64(Some(4)) | ScalarValue::Int64(Some(4))
6544 ));
6545 assert_eq!(total, 660);
6546 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6547 assert!(
6548 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6549 "z-order aggregate should use range reduction path"
6550 );
6551
6552 let _ = shutdown_tx.send(());
6553 }
6554
6555 #[tokio::test]
6556 async fn aggregate_pushdown_avg_merges_sum_and_count_across_multiple_ranges() {
6557 let state = MockState {
6558 kv: Arc::new(Mutex::new(BTreeMap::new())),
6559 range_calls: Arc::new(AtomicUsize::new(0)),
6560 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6561 sequence_number: Arc::new(AtomicU64::new(0)),
6562 };
6563 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6564 let client = StoreClient::new(&base_url);
6565
6566 let schema = KvSchema::new(client)
6567 .table(
6568 "orders",
6569 vec![
6570 TableColumnConfig::new("id", DataType::Int64, false),
6571 TableColumnConfig::new("status", DataType::Utf8, false),
6572 TableColumnConfig::new("amount_cents", DataType::Int64, false),
6573 ],
6574 vec!["id".to_string()],
6575 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6576 .expect("valid")
6577 .with_cover_columns(vec!["amount_cents".to_string()])],
6578 )
6579 .expect("schema");
6580
6581 let mut writer = schema.batch_writer();
6582 for (id, status, amount) in [
6583 (1, "open", 10),
6584 (2, "open", 20),
6585 (3, "closed", 100),
6586 (4, "pending", 1_000),
6587 ] {
6588 writer
6589 .insert(
6590 "orders",
6591 vec![
6592 CellValue::Int64(id),
6593 CellValue::Utf8(status.to_string()),
6594 CellValue::Int64(amount),
6595 ],
6596 )
6597 .expect("row");
6598 }
6599 writer.flush().await.expect("flush");
6600
6601 let ctx = SessionContext::new();
6602 schema.register_all(&ctx).expect("register");
6603
6604 state.range_calls.store(0, AtomicOrdering::SeqCst);
6605 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6606
6607 let batches = ctx
6608 .sql(
6609 "SELECT AVG(amount_cents) AS avg_cents \
6610 FROM orders \
6611 WHERE status IN ('open', 'closed')",
6612 )
6613 .await
6614 .expect("query")
6615 .collect()
6616 .await
6617 .expect("collect");
6618
6619 assert_eq!(batches.len(), 1);
6620 let batch = &batches[0];
6621 let avg = batch
6622 .column(0)
6623 .as_any()
6624 .downcast_ref::<Float64Array>()
6625 .expect("avg float64")
6626 .value(0);
6627 let expected = 130.0 / 3.0;
6628 assert!(
6629 (avg - expected).abs() < 1e-12,
6630 "AVG should merge SUM+COUNT across unequal-count ranges: got {avg}, expected {expected}"
6631 );
6632 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6633 assert_eq!(
6634 state.range_reduce_calls.load(AtomicOrdering::SeqCst),
6635 2,
6636 "status IN (...) should expand to two pushed reduction ranges"
6637 );
6638
6639 let _ = shutdown_tx.send(());
6640 }
6641
6642 #[tokio::test]
6643 async fn aggregate_pushdown_supports_filtered_global_aggregates() {
6644 let state = MockState {
6645 kv: Arc::new(Mutex::new(BTreeMap::new())),
6646 range_calls: Arc::new(AtomicUsize::new(0)),
6647 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6648 sequence_number: Arc::new(AtomicU64::new(0)),
6649 };
6650 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6651 let client = StoreClient::new(&base_url);
6652
6653 let schema = KvSchema::new(client)
6654 .table(
6655 "orders",
6656 vec![
6657 TableColumnConfig::new("id", DataType::Int64, false),
6658 TableColumnConfig::new("status", DataType::Utf8, false),
6659 TableColumnConfig::new("amount_cents", DataType::Int64, false),
6660 ],
6661 vec!["id".to_string()],
6662 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6663 .expect("valid")
6664 .with_cover_columns(vec!["amount_cents".to_string()])],
6665 )
6666 .expect("schema");
6667
6668 let mut writer = schema.batch_writer();
6669 for (id, status, amount) in [
6670 (1, "open", 10),
6671 (2, "closed", 15),
6672 (3, "open", 30),
6673 (4, "closed", 40),
6674 ] {
6675 writer
6676 .insert(
6677 "orders",
6678 vec![
6679 CellValue::Int64(id),
6680 CellValue::Utf8(status.to_string()),
6681 CellValue::Int64(amount),
6682 ],
6683 )
6684 .expect("row");
6685 }
6686 writer.flush().await.expect("flush");
6687
6688 let ctx = SessionContext::new();
6689 schema.register_all(&ctx).expect("register");
6690
6691 state.range_calls.store(0, AtomicOrdering::SeqCst);
6692 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6693
6694 let query = "SELECT COUNT(*) FILTER (WHERE status = 'open') AS open_count, \
6695 COUNT(*) FILTER (WHERE status = 'closed') AS closed_count, \
6696 AVG(amount_cents) FILTER (WHERE status = 'closed') AS closed_avg \
6697 FROM orders";
6698 let batches = ctx
6699 .sql(query)
6700 .await
6701 .expect("query")
6702 .collect()
6703 .await
6704 .expect("collect");
6705
6706 assert_eq!(batches.len(), 1);
6707 let batch = &batches[0];
6708 assert_count_scalar(batch, 0, 0, 2);
6709 assert_count_scalar(batch, 1, 0, 2);
6710 let closed_avg = batch
6711 .column(2)
6712 .as_any()
6713 .downcast_ref::<Float64Array>()
6714 .expect("avg float64")
6715 .value(0);
6716 assert_eq!(closed_avg, 27.5);
6717 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6718 assert!(
6719 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 3,
6720 "filtered aggregate pushdown should use dedicated reduction jobs"
6721 );
6722
6723 let _ = shutdown_tx.send(());
6724 }
6725
6726 #[tokio::test]
6727 async fn aggregate_pushdown_supports_case_filtered_global_aggregates() {
6728 let state = MockState {
6729 kv: Arc::new(Mutex::new(BTreeMap::new())),
6730 range_calls: Arc::new(AtomicUsize::new(0)),
6731 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6732 sequence_number: Arc::new(AtomicU64::new(0)),
6733 };
6734 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6735 let client = StoreClient::new(&base_url);
6736
6737 let schema = KvSchema::new(client)
6738 .table(
6739 "orders",
6740 vec![
6741 TableColumnConfig::new("id", DataType::Int64, false),
6742 TableColumnConfig::new("status", DataType::Utf8, false),
6743 TableColumnConfig::new("amount_cents", DataType::Int64, false),
6744 ],
6745 vec!["id".to_string()],
6746 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6747 .expect("valid")
6748 .with_cover_columns(vec!["amount_cents".to_string()])],
6749 )
6750 .expect("schema");
6751
6752 let mut writer = schema.batch_writer();
6753 for (id, status, amount) in [
6754 (1, "open", 10),
6755 (2, "closed", 15),
6756 (3, "open", 30),
6757 (4, "closed", 40),
6758 ] {
6759 writer
6760 .insert(
6761 "orders",
6762 vec![
6763 CellValue::Int64(id),
6764 CellValue::Utf8(status.to_string()),
6765 CellValue::Int64(amount),
6766 ],
6767 )
6768 .expect("row");
6769 }
6770 writer.flush().await.expect("flush");
6771
6772 let ctx = SessionContext::new();
6773 schema.register_all(&ctx).expect("register");
6774
6775 state.range_calls.store(0, AtomicOrdering::SeqCst);
6776 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6777
6778 let query = "SELECT SUM(CASE status WHEN 'open' THEN amount_cents END) AS open_total, \
6779 COUNT(CASE status WHEN 'closed' THEN 1 END) AS closed_count, \
6780 AVG(CASE WHEN status = 'closed' THEN amount_cents END) AS closed_avg \
6781 FROM orders";
6782 let batches = ctx
6783 .sql(query)
6784 .await
6785 .expect("query")
6786 .collect()
6787 .await
6788 .expect("collect");
6789
6790 assert_eq!(batches.len(), 1);
6791 let batch = &batches[0];
6792 assert_eq!(
6793 ScalarValue::try_from_array(batch.column(0), 0).expect("sum scalar"),
6794 ScalarValue::Int64(Some(40))
6795 );
6796 assert_count_scalar(batch, 1, 0, 2);
6797 let closed_avg = batch
6798 .column(2)
6799 .as_any()
6800 .downcast_ref::<Float64Array>()
6801 .expect("avg float64")
6802 .value(0);
6803 assert_eq!(closed_avg, 27.5);
6804 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6805 assert!(
6806 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 3,
6807 "case-based conditional aggregates should use reduction jobs"
6808 );
6809
6810 let _ = shutdown_tx.send(());
6811 }
6812
6813 #[tokio::test]
6814 async fn aggregate_pushdown_supports_casted_group_and_aggregate_expressions() {
6815 let state = MockState {
6816 kv: Arc::new(Mutex::new(BTreeMap::new())),
6817 range_calls: Arc::new(AtomicUsize::new(0)),
6818 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6819 sequence_number: Arc::new(AtomicU64::new(0)),
6820 };
6821 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6822 let client = StoreClient::new(&base_url);
6823
6824 let schema = KvSchema::new(client)
6825 .table(
6826 "orders",
6827 vec![
6828 TableColumnConfig::new("id", DataType::Int64, false),
6829 TableColumnConfig::new("status", DataType::Utf8, false),
6830 TableColumnConfig::new("amount_cents", DataType::Int64, false),
6831 ],
6832 vec!["id".to_string()],
6833 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
6834 .expect("valid")
6835 .with_cover_columns(vec!["amount_cents".to_string()])],
6836 )
6837 .expect("schema");
6838
6839 let mut writer = schema.batch_writer();
6840 for (id, status, amount) in [
6841 (1, "open", 10),
6842 (2, "open", 30),
6843 (3, "closed", 15),
6844 (4, "closed", 40),
6845 ] {
6846 writer
6847 .insert(
6848 "orders",
6849 vec![
6850 CellValue::Int64(id),
6851 CellValue::Utf8(status.to_string()),
6852 CellValue::Int64(amount),
6853 ],
6854 )
6855 .expect("row");
6856 }
6857 writer.flush().await.expect("flush");
6858
6859 let ctx = SessionContext::new();
6860 schema.register_all(&ctx).expect("register");
6861
6862 state.range_calls.store(0, AtomicOrdering::SeqCst);
6863 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6864
6865 let batches = ctx
6866 .sql(
6867 "SELECT CAST(status AS VARCHAR) AS status_text, \
6868 SUM(CAST(amount_cents AS DOUBLE)) AS total_cents \
6869 FROM orders \
6870 GROUP BY CAST(status AS VARCHAR) \
6871 ORDER BY status_text",
6872 )
6873 .await
6874 .expect("query")
6875 .collect()
6876 .await
6877 .expect("collect");
6878
6879 assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
6880 let batch = &batches[0];
6881 let status = ScalarValue::try_from_array(batch.column(0), 0).expect("status scalar");
6882 assert_eq!(scalar_to_string(&status).as_deref(), Some("closed"));
6883 let closed_total = batch
6884 .column(1)
6885 .as_any()
6886 .downcast_ref::<Float64Array>()
6887 .expect("sum float64")
6888 .value(0);
6889 let open_total = batch
6890 .column(1)
6891 .as_any()
6892 .downcast_ref::<Float64Array>()
6893 .expect("sum float64")
6894 .value(1);
6895 assert_eq!(closed_total, 55.0);
6896 assert_eq!(open_total, 40.0);
6897 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6898 assert!(
6899 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
6900 "casted grouped aggregates should stay on the reduction path"
6901 );
6902
6903 let _ = shutdown_tx.send(());
6904 }
6905
6906 #[tokio::test]
6907 async fn aggregate_pushdown_supports_computed_aggregate_inputs() {
6908 let state = MockState {
6909 kv: Arc::new(Mutex::new(BTreeMap::new())),
6910 range_calls: Arc::new(AtomicUsize::new(0)),
6911 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6912 sequence_number: Arc::new(AtomicU64::new(0)),
6913 };
6914 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6915 let client = StoreClient::new(&base_url);
6916
6917 let schema = KvSchema::new(client)
6918 .table(
6919 "orders",
6920 vec![
6921 TableColumnConfig::new("id", DataType::Int64, false),
6922 TableColumnConfig::new("price_cents", DataType::Int64, false),
6923 TableColumnConfig::new("qty", DataType::Int64, false),
6924 TableColumnConfig::new("duration_ms", DataType::Int64, false),
6925 ],
6926 vec!["id".to_string()],
6927 vec![],
6928 )
6929 .expect("schema");
6930
6931 let mut writer = schema.batch_writer();
6932 for (id, price, qty, duration_ms) in [(1, 10, 2, 500), (2, 15, 3, 2500), (3, 7, 4, 1000)] {
6933 writer
6934 .insert(
6935 "orders",
6936 vec![
6937 CellValue::Int64(id),
6938 CellValue::Int64(price),
6939 CellValue::Int64(qty),
6940 CellValue::Int64(duration_ms),
6941 ],
6942 )
6943 .expect("row");
6944 }
6945 writer.flush().await.expect("flush");
6946
6947 let ctx = SessionContext::new();
6948 schema.register_all(&ctx).expect("register");
6949
6950 state.range_calls.store(0, AtomicOrdering::SeqCst);
6951 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
6952
6953 let batches = ctx
6954 .sql(
6955 "SELECT SUM(price_cents * qty) AS total_revenue, \
6956 AVG(duration_ms / 1e3) AS avg_seconds \
6957 FROM orders",
6958 )
6959 .await
6960 .expect("query")
6961 .collect()
6962 .await
6963 .expect("collect");
6964
6965 assert_eq!(batches.len(), 1);
6966 let batch = &batches[0];
6967 assert_eq!(
6968 ScalarValue::try_from_array(batch.column(0), 0).expect("sum scalar"),
6969 ScalarValue::Int64(Some(93))
6970 );
6971 let avg_seconds = batch
6972 .column(1)
6973 .as_any()
6974 .downcast_ref::<Float64Array>()
6975 .expect("avg float64")
6976 .value(0);
6977 assert!((avg_seconds - (4.0 / 3.0)).abs() < 1e-12);
6978 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
6979 assert!(
6980 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 2,
6981 "computed aggregate inputs should use reduction jobs"
6982 );
6983
6984 let _ = shutdown_tx.send(());
6985 }
6986
6987 #[tokio::test]
6988 async fn aggregate_pushdown_supports_add_and_subtract_inputs() {
6989 let state = MockState {
6990 kv: Arc::new(Mutex::new(BTreeMap::new())),
6991 range_calls: Arc::new(AtomicUsize::new(0)),
6992 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
6993 sequence_number: Arc::new(AtomicU64::new(0)),
6994 };
6995 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
6996 let client = StoreClient::new(&base_url);
6997
6998 let schema = KvSchema::new(client)
6999 .table(
7000 "orders",
7001 vec![
7002 TableColumnConfig::new("id", DataType::Int64, false),
7003 TableColumnConfig::new("price_cents", DataType::Int64, false),
7004 TableColumnConfig::new("fee_cents", DataType::Int64, false),
7005 TableColumnConfig::new("discount_cents", DataType::Int64, false),
7006 ],
7007 vec!["id".to_string()],
7008 vec![],
7009 )
7010 .expect("schema");
7011
7012 let mut writer = schema.batch_writer();
7013 for (id, price, fee, discount) in [(1, 10, 2, 1), (2, 15, 3, 4), (3, 7, 1, 2)] {
7014 writer
7015 .insert(
7016 "orders",
7017 vec![
7018 CellValue::Int64(id),
7019 CellValue::Int64(price),
7020 CellValue::Int64(fee),
7021 CellValue::Int64(discount),
7022 ],
7023 )
7024 .expect("row");
7025 }
7026 writer.flush().await.expect("flush");
7027
7028 let ctx = SessionContext::new();
7029 schema.register_all(&ctx).expect("register");
7030
7031 state.range_calls.store(0, AtomicOrdering::SeqCst);
7032 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7033
7034 let batches = ctx
7035 .sql(
7036 "SELECT SUM(price_cents + fee_cents) AS gross_plus_fee, \
7037 SUM(price_cents - discount_cents) AS net_total \
7038 FROM orders",
7039 )
7040 .await
7041 .expect("query")
7042 .collect()
7043 .await
7044 .expect("collect");
7045
7046 assert_eq!(batches.len(), 1);
7047 let batch = &batches[0];
7048 assert_eq!(
7049 ScalarValue::try_from_array(batch.column(0), 0).expect("sum scalar"),
7050 ScalarValue::Int64(Some(38))
7051 );
7052 assert_eq!(
7053 ScalarValue::try_from_array(batch.column(1), 0).expect("sum scalar"),
7054 ScalarValue::Int64(Some(25))
7055 );
7056 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7057 assert!(
7058 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 2,
7059 "add/sub aggregate inputs should use reduction jobs"
7060 );
7061
7062 let _ = shutdown_tx.send(());
7063 }
7064
7065 #[tokio::test]
7066 async fn aggregate_pushdown_supports_case_filtered_computed_aggregates() {
7067 let state = MockState {
7068 kv: Arc::new(Mutex::new(BTreeMap::new())),
7069 range_calls: Arc::new(AtomicUsize::new(0)),
7070 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7071 sequence_number: Arc::new(AtomicU64::new(0)),
7072 };
7073 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7074 let client = StoreClient::new(&base_url);
7075
7076 let schema = KvSchema::new(client)
7077 .table(
7078 "orders",
7079 vec![
7080 TableColumnConfig::new("id", DataType::Int64, false),
7081 TableColumnConfig::new("status", DataType::Utf8, false),
7082 TableColumnConfig::new("price_cents", DataType::Int64, false),
7083 TableColumnConfig::new("qty", DataType::Int64, false),
7084 ],
7085 vec!["id".to_string()],
7086 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7087 .expect("valid")
7088 .with_cover_columns(vec!["price_cents".to_string(), "qty".to_string()])],
7089 )
7090 .expect("schema");
7091
7092 let mut writer = schema.batch_writer();
7093 for (id, status, price, qty) in [
7094 (1, "open", 10, 2),
7095 (2, "closed", 99, 1),
7096 (3, "open", 15, 3),
7097 (4, "closed", 7, 4),
7098 ] {
7099 writer
7100 .insert(
7101 "orders",
7102 vec![
7103 CellValue::Int64(id),
7104 CellValue::Utf8(status.to_string()),
7105 CellValue::Int64(price),
7106 CellValue::Int64(qty),
7107 ],
7108 )
7109 .expect("row");
7110 }
7111 writer.flush().await.expect("flush");
7112
7113 let ctx = SessionContext::new();
7114 schema.register_all(&ctx).expect("register");
7115
7116 state.range_calls.store(0, AtomicOrdering::SeqCst);
7117 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7118
7119 let batches = ctx
7120 .sql(
7121 "SELECT SUM(CASE WHEN status = 'open' THEN price_cents * qty END) \
7122 AS open_revenue \
7123 FROM orders",
7124 )
7125 .await
7126 .expect("query")
7127 .collect()
7128 .await
7129 .expect("collect");
7130
7131 assert_eq!(batches.len(), 1);
7132 let batch = &batches[0];
7133 assert_eq!(
7134 ScalarValue::try_from_array(batch.column(0), 0).expect("sum scalar"),
7135 ScalarValue::Int64(Some(65))
7136 );
7137 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7138 assert!(
7139 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
7140 "case-filtered computed aggregate should use reduction jobs"
7141 );
7142
7143 let _ = shutdown_tx.send(());
7144 }
7145
7146 #[tokio::test]
7147 async fn aggregate_pushdown_does_not_rewrite_sum_case_else_zero_semantics() {
7148 let state = MockState {
7149 kv: Arc::new(Mutex::new(BTreeMap::new())),
7150 range_calls: Arc::new(AtomicUsize::new(0)),
7151 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7152 sequence_number: Arc::new(AtomicU64::new(0)),
7153 };
7154 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7155 let client = StoreClient::new(&base_url);
7156
7157 let schema = KvSchema::new(client)
7158 .table(
7159 "orders",
7160 vec![
7161 TableColumnConfig::new("id", DataType::Int64, false),
7162 TableColumnConfig::new("region", DataType::Utf8, false),
7163 TableColumnConfig::new("status", DataType::Utf8, false),
7164 TableColumnConfig::new("amount_cents", DataType::Int64, false),
7165 ],
7166 vec!["id".to_string()],
7167 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7168 .expect("valid")
7169 .with_cover_columns(vec!["region".to_string(), "amount_cents".to_string()])],
7170 )
7171 .expect("schema");
7172
7173 let mut writer = schema.batch_writer();
7174 for (id, region, status, amount) in [
7175 (1, "east", "open", 10),
7176 (2, "east", "closed", 20),
7177 (3, "west", "closed", 30),
7178 ] {
7179 writer
7180 .insert(
7181 "orders",
7182 vec![
7183 CellValue::Int64(id),
7184 CellValue::Utf8(region.to_string()),
7185 CellValue::Utf8(status.to_string()),
7186 CellValue::Int64(amount),
7187 ],
7188 )
7189 .expect("row");
7190 }
7191 writer.flush().await.expect("flush");
7192
7193 let ctx = SessionContext::new();
7194 schema.register_all(&ctx).expect("register");
7195
7196 state.range_calls.store(0, AtomicOrdering::SeqCst);
7197 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7198
7199 let batches = ctx
7200 .sql(
7201 "SELECT region, \
7202 SUM(CASE WHEN status = 'open' THEN amount_cents ELSE 0 END) AS open_total \
7203 FROM orders \
7204 GROUP BY region \
7205 ORDER BY region",
7206 )
7207 .await
7208 .expect("query")
7209 .collect()
7210 .await
7211 .expect("collect");
7212
7213 assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
7214 let batch = &batches[0];
7215 assert_eq!(
7216 ScalarValue::try_from_array(batch.column(0), 0).expect("region scalar"),
7217 ScalarValue::Utf8(Some("east".to_string()))
7218 );
7219 assert_eq!(
7220 ScalarValue::try_from_array(batch.column(1), 0).expect("sum scalar"),
7221 ScalarValue::Int64(Some(10))
7222 );
7223 assert_eq!(
7224 ScalarValue::try_from_array(batch.column(0), 1).expect("region scalar"),
7225 ScalarValue::Utf8(Some("west".to_string()))
7226 );
7227 assert_eq!(
7228 ScalarValue::try_from_array(batch.column(1), 1).expect("sum scalar"),
7229 ScalarValue::Int64(Some(0))
7230 );
7231 assert_eq!(
7232 state.range_reduce_calls.load(AtomicOrdering::SeqCst),
7233 0,
7234 "SUM(CASE ... ELSE 0 END) must not push down because FILTER rewrite changes semantics"
7235 );
7236
7237 let _ = shutdown_tx.send(());
7238 }
7239
7240 #[tokio::test]
7241 async fn aggregate_pushdown_supports_computed_group_keys() {
7242 let state = MockState {
7243 kv: Arc::new(Mutex::new(BTreeMap::new())),
7244 range_calls: Arc::new(AtomicUsize::new(0)),
7245 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7246 sequence_number: Arc::new(AtomicU64::new(0)),
7247 };
7248 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7249 let client = StoreClient::new(&base_url);
7250
7251 let schema = KvSchema::new(client)
7252 .table(
7253 "events",
7254 vec![
7255 TableColumnConfig::new("id", DataType::Int64, false),
7256 TableColumnConfig::new("country", DataType::Utf8, false),
7257 TableColumnConfig::new(
7258 "occurred_at",
7259 DataType::Timestamp(TimeUnit::Microsecond, None),
7260 false,
7261 ),
7262 TableColumnConfig::new("amount_cents", DataType::Int64, false),
7263 ],
7264 vec!["id".to_string()],
7265 vec![],
7266 )
7267 .expect("schema");
7268
7269 let day_micros = 86_400_000_000i64;
7270 let day0 = 1_700_000_000_000_000i64;
7271 let day1 = day0 + day_micros;
7272 let day0_bucket = day0.div_euclid(day_micros) * day_micros;
7273 let day1_bucket = day1.div_euclid(day_micros) * day_micros;
7274 let mut writer = schema.batch_writer();
7275 for (id, country, occurred_at, amount) in [
7276 (1, "East", day0 + 111, 10),
7277 (2, "east", day0 + 222, 30),
7278 (3, "West", day1 + 333, 7),
7279 ] {
7280 writer
7281 .insert(
7282 "events",
7283 vec![
7284 CellValue::Int64(id),
7285 CellValue::Utf8(country.to_string()),
7286 CellValue::Timestamp(occurred_at),
7287 CellValue::Int64(amount),
7288 ],
7289 )
7290 .expect("row");
7291 }
7292 writer.flush().await.expect("flush");
7293
7294 let ctx = SessionContext::new();
7295 schema.register_all(&ctx).expect("register");
7296
7297 state.range_calls.store(0, AtomicOrdering::SeqCst);
7298 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7299
7300 let batches = ctx
7301 .sql(
7302 "SELECT lower(country) AS country_norm, \
7303 date_trunc('day', occurred_at) AS day_bucket, \
7304 SUM(amount_cents) AS total_cents \
7305 FROM events \
7306 GROUP BY lower(country), date_trunc('day', occurred_at) \
7307 ORDER BY country_norm, day_bucket",
7308 )
7309 .await
7310 .expect("query")
7311 .collect()
7312 .await
7313 .expect("collect");
7314
7315 assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
7316 let batch = &batches[0];
7317 assert_eq!(
7318 scalar_to_string(
7319 &ScalarValue::try_from_array(batch.column(0), 0).expect("country scalar")
7320 )
7321 .as_deref(),
7322 Some("east")
7323 );
7324 assert_eq!(
7325 ScalarValue::try_from_array(batch.column(1), 0).expect("day scalar"),
7326 ScalarValue::TimestampMicrosecond(Some(day0_bucket), None)
7327 );
7328 assert_eq!(
7329 ScalarValue::try_from_array(batch.column(2), 0).expect("sum scalar"),
7330 ScalarValue::Int64(Some(40))
7331 );
7332 assert_eq!(
7333 scalar_to_string(
7334 &ScalarValue::try_from_array(batch.column(0), 1).expect("country scalar")
7335 )
7336 .as_deref(),
7337 Some("west")
7338 );
7339 assert_eq!(
7340 ScalarValue::try_from_array(batch.column(1), 1).expect("day scalar"),
7341 ScalarValue::TimestampMicrosecond(Some(day1_bucket), None)
7342 );
7343 assert_eq!(
7344 ScalarValue::try_from_array(batch.column(2), 1).expect("sum scalar"),
7345 ScalarValue::Int64(Some(7))
7346 );
7347 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7348 assert!(
7349 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
7350 "computed group keys should use grouped reduction path"
7351 );
7352
7353 let _ = shutdown_tx.send(());
7354 }
7355
7356 #[tokio::test]
7357 async fn aggregate_pushdown_supports_group_by_queries() {
7358 let state = MockState {
7359 kv: Arc::new(Mutex::new(BTreeMap::new())),
7360 range_calls: Arc::new(AtomicUsize::new(0)),
7361 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7362 sequence_number: Arc::new(AtomicU64::new(0)),
7363 };
7364 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7365 let client = StoreClient::new(&base_url);
7366
7367 let schema = KvSchema::new(client)
7368 .table(
7369 "orders",
7370 vec![
7371 TableColumnConfig::new("id", DataType::Int64, false),
7372 TableColumnConfig::new("status", DataType::Utf8, false),
7373 TableColumnConfig::new("amount_cents", DataType::Int64, false),
7374 ],
7375 vec!["id".to_string()],
7376 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7377 .expect("valid")
7378 .with_cover_columns(vec!["amount_cents".to_string()])],
7379 )
7380 .expect("schema");
7381
7382 let mut writer = schema.batch_writer();
7383 for (id, status, amount) in [
7384 (1, "open", 10),
7385 (2, "open", 30),
7386 (3, "closed", 15),
7387 (4, "closed", 40),
7388 ] {
7389 writer
7390 .insert(
7391 "orders",
7392 vec![
7393 CellValue::Int64(id),
7394 CellValue::Utf8(status.to_string()),
7395 CellValue::Int64(amount),
7396 ],
7397 )
7398 .expect("row");
7399 }
7400 writer.flush().await.expect("flush");
7401
7402 let ctx = SessionContext::new();
7403 schema.register_all(&ctx).expect("register");
7404
7405 state.range_calls.store(0, AtomicOrdering::SeqCst);
7406 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7407
7408 let batches = ctx
7409 .sql(
7410 "SELECT status, COUNT(*) AS row_count, SUM(amount_cents) AS total_cents \
7411 FROM orders GROUP BY status ORDER BY status",
7412 )
7413 .await
7414 .expect("query")
7415 .collect()
7416 .await
7417 .expect("collect");
7418
7419 assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
7420 let batch = &batches[0];
7421 assert_eq!(
7422 ScalarValue::try_from_array(batch.column(0), 0).expect("status scalar"),
7423 ScalarValue::Utf8(Some("closed".to_string()))
7424 );
7425 assert_count_scalar(batch, 1, 0, 2);
7426 assert_eq!(
7427 ScalarValue::try_from_array(batch.column(2), 0).expect("sum scalar"),
7428 ScalarValue::Int64(Some(55))
7429 );
7430 assert_eq!(
7431 ScalarValue::try_from_array(batch.column(0), 1).expect("status scalar"),
7432 ScalarValue::Utf8(Some("open".to_string()))
7433 );
7434 assert_count_scalar(batch, 1, 1, 2);
7435 assert_eq!(
7436 ScalarValue::try_from_array(batch.column(2), 1).expect("sum scalar"),
7437 ScalarValue::Int64(Some(40))
7438 );
7439 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7440 assert!(
7441 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 2,
7442 "group-by aggregate should use grouped range reduction path"
7443 );
7444
7445 let _ = shutdown_tx.send(());
7446 }
7447
7448 #[tokio::test]
7449 async fn aggregate_pushdown_group_by_float_canonicalizes_signed_zero() {
7450 let state = MockState {
7451 kv: Arc::new(Mutex::new(BTreeMap::new())),
7452 range_calls: Arc::new(AtomicUsize::new(0)),
7453 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7454 sequence_number: Arc::new(AtomicU64::new(0)),
7455 };
7456 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7457 let client = StoreClient::new(&base_url);
7458
7459 let schema = KvSchema::new(client)
7460 .table(
7461 "metrics",
7462 vec![
7463 TableColumnConfig::new("id", DataType::Int64, false),
7464 TableColumnConfig::new("score", DataType::Float64, false),
7465 ],
7466 vec!["id".to_string()],
7467 vec![],
7468 )
7469 .expect("schema");
7470
7471 let mut writer = schema.batch_writer();
7472 for (id, score) in [(1, -0.0), (2, 0.0), (3, 1.5)] {
7473 writer
7474 .insert(
7475 "metrics",
7476 vec![CellValue::Int64(id), CellValue::Float64(score)],
7477 )
7478 .expect("row");
7479 }
7480 writer.flush().await.expect("flush");
7481
7482 let ctx = SessionContext::new();
7483 schema.register_all(&ctx).expect("register");
7484
7485 state.range_calls.store(0, AtomicOrdering::SeqCst);
7486 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7487
7488 let batches = ctx
7489 .sql(
7490 "SELECT score, COUNT(*) AS row_count \
7491 FROM metrics GROUP BY score ORDER BY row_count DESC, score",
7492 )
7493 .await
7494 .expect("query")
7495 .collect()
7496 .await
7497 .expect("collect");
7498
7499 assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
7500 let batch = &batches[0];
7501 let top_score = batch
7502 .column(0)
7503 .as_any()
7504 .downcast_ref::<Float64Array>()
7505 .expect("score float64")
7506 .value(0);
7507 assert_eq!(top_score.to_bits(), 0.0f64.to_bits());
7508 assert_count_scalar(batch, 1, 0, 2);
7509 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7510 assert!(
7511 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 1,
7512 "float group-by aggregate should stay on grouped reduction path"
7513 );
7514
7515 let _ = shutdown_tx.send(());
7516 }
7517
7518 #[tokio::test]
7519 async fn aggregate_pushdown_supports_filtered_group_by_queries() {
7520 let state = MockState {
7521 kv: Arc::new(Mutex::new(BTreeMap::new())),
7522 range_calls: Arc::new(AtomicUsize::new(0)),
7523 range_reduce_calls: Arc::new(AtomicUsize::new(0)),
7524 sequence_number: Arc::new(AtomicU64::new(0)),
7525 };
7526 let (base_url, shutdown_tx) = spawn_mock_server(state.clone()).await;
7527 let client = StoreClient::new(&base_url);
7528
7529 let schema = KvSchema::new(client)
7530 .table(
7531 "orders",
7532 vec![
7533 TableColumnConfig::new("id", DataType::Int64, false),
7534 TableColumnConfig::new("region", DataType::Utf8, false),
7535 TableColumnConfig::new("status", DataType::Utf8, false),
7536 TableColumnConfig::new("amount_cents", DataType::Int64, false),
7537 ],
7538 vec!["id".to_string()],
7539 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7540 .expect("valid")
7541 .with_cover_columns(vec!["region".to_string(), "amount_cents".to_string()])],
7542 )
7543 .expect("schema");
7544
7545 let mut writer = schema.batch_writer();
7546 for (id, region, status, amount) in [
7547 (1, "east", "open", 10),
7548 (2, "east", "closed", 20),
7549 (3, "west", "open", 30),
7550 (4, "north", "closed", 40),
7551 ] {
7552 writer
7553 .insert(
7554 "orders",
7555 vec![
7556 CellValue::Int64(id),
7557 CellValue::Utf8(region.to_string()),
7558 CellValue::Utf8(status.to_string()),
7559 CellValue::Int64(amount),
7560 ],
7561 )
7562 .expect("row");
7563 }
7564 writer.flush().await.expect("flush");
7565
7566 let ctx = SessionContext::new();
7567 schema.register_all(&ctx).expect("register");
7568
7569 state.range_calls.store(0, AtomicOrdering::SeqCst);
7570 state.range_reduce_calls.store(0, AtomicOrdering::SeqCst);
7571
7572 let batches = ctx
7573 .sql(
7574 "SELECT region, \
7575 COUNT(*) FILTER (WHERE status = 'open') AS open_count, \
7576 SUM(amount_cents) FILTER (WHERE status = 'closed') AS closed_total \
7577 FROM orders \
7578 GROUP BY region \
7579 ORDER BY region",
7580 )
7581 .await
7582 .expect("query")
7583 .collect()
7584 .await
7585 .expect("collect");
7586
7587 assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
7588 let batch = &batches[0];
7589
7590 assert_eq!(
7591 ScalarValue::try_from_array(batch.column(0), 0).expect("region scalar"),
7592 ScalarValue::Utf8(Some("east".to_string()))
7593 );
7594 assert_count_scalar(batch, 1, 0, 1);
7595 assert_eq!(
7596 ScalarValue::try_from_array(batch.column(2), 0).expect("sum scalar"),
7597 ScalarValue::Int64(Some(20))
7598 );
7599
7600 assert_eq!(
7601 ScalarValue::try_from_array(batch.column(0), 1).expect("region scalar"),
7602 ScalarValue::Utf8(Some("north".to_string()))
7603 );
7604 assert_count_scalar(batch, 1, 1, 0);
7605 assert_eq!(
7606 ScalarValue::try_from_array(batch.column(2), 1).expect("sum scalar"),
7607 ScalarValue::Int64(Some(40))
7608 );
7609
7610 assert_eq!(
7611 ScalarValue::try_from_array(batch.column(0), 2).expect("region scalar"),
7612 ScalarValue::Utf8(Some("west".to_string()))
7613 );
7614 assert_count_scalar(batch, 1, 2, 1);
7615 assert_eq!(
7616 ScalarValue::try_from_array(batch.column(2), 2).expect("sum scalar"),
7617 ScalarValue::Int64(None)
7618 );
7619
7620 assert_eq!(state.range_calls.load(AtomicOrdering::SeqCst), 0);
7621 assert!(
7622 state.range_reduce_calls.load(AtomicOrdering::SeqCst) >= 3,
7623 "filtered group-by aggregate should use grouped reduction plus seed job"
7624 );
7625
7626 let _ = shutdown_tx.send(());
7627 }
7628
7629 mod e2e {
7630 use super::*;
7631 use axum::{routing::get, Router};
7632 use datafusion::prelude::SessionContext;
7633 use exoware_sdk::StoreClient;
7634 use exoware_server::{connect_stack, AppState};
7635 use exoware_simulator::RocksStore;
7636 use tempfile::tempdir;
7637
7638 struct TestServers {
7639 ingest_url: String,
7640 query_url: String,
7641 }
7642
7643 impl TestServers {
7644 fn client(&self) -> StoreClient {
7645 StoreClient::with_split_urls(
7646 &self.query_url,
7647 &self.ingest_url,
7648 &self.query_url,
7649 &self.ingest_url,
7650 )
7651 }
7652 }
7653
7654 async fn spawn_e2e_servers() -> TestServers {
7655 let dir = tempdir().expect("tempdir");
7656 let db = RocksStore::open(dir.path()).expect("db");
7657 let state = AppState::new(std::sync::Arc::new(db));
7658 let connect = connect_stack(state);
7659 let app = Router::new()
7660 .route("/health", get(|| async { "ok" }))
7661 .fallback_service(connect);
7662 let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
7663 .await
7664 .expect("bind");
7665 let url = format!("http://{}", listener.local_addr().unwrap());
7666 tokio::spawn(async move {
7667 axum::serve(listener, app).await.expect("serve");
7668 });
7669 for _ in 0..200 {
7670 if reqwest::get(format!("{url}/health"))
7671 .await
7672 .ok()
7673 .is_some_and(|r| r.status().is_success())
7674 {
7675 return TestServers {
7676 ingest_url: url.clone(),
7677 query_url: url,
7678 };
7679 }
7680 tokio::time::sleep(std::time::Duration::from_millis(25)).await;
7681 }
7682 panic!("e2e simulator did not become ready");
7683 }
7684
7685 #[tokio::test]
7686 async fn sql_insert_and_select_through_real_ingest_query_workers() {
7687 let servers = spawn_e2e_servers().await;
7688 let client = servers.client();
7689
7690 let schema = KvSchema::new(client)
7691 .table(
7692 "orders",
7693 vec![
7694 TableColumnConfig::new("id", DataType::Int64, false),
7695 TableColumnConfig::new("status", DataType::Utf8, false),
7696 TableColumnConfig::new("amount_cents", DataType::Int64, false),
7697 ],
7698 vec!["id".to_string()],
7699 vec![IndexSpec::new("status_idx", vec!["status".to_string()])
7700 .expect("valid")
7701 .with_cover_columns(vec!["amount_cents".to_string()])],
7702 )
7703 .expect("schema");
7704
7705 let mut writer = schema.batch_writer();
7706 for (id, status, amount) in [
7707 (1i64, "open", 100i64),
7708 (2, "closed", 200),
7709 (3, "open", 300),
7710 (4, "closed", 400),
7711 (5, "open", 500),
7712 ] {
7713 writer
7714 .insert(
7715 "orders",
7716 vec![
7717 CellValue::Int64(id),
7718 CellValue::Utf8(status.to_string()),
7719 CellValue::Int64(amount),
7720 ],
7721 )
7722 .expect("insert row");
7723 }
7724 writer.flush().await.expect("flush batch");
7725
7726 let ctx = SessionContext::new();
7727 schema.register_all(&ctx).expect("register tables");
7728
7729 let batches = ctx
7730 .sql("SELECT id, amount_cents FROM orders ORDER BY id")
7731 .await
7732 .expect("full scan query")
7733 .collect()
7734 .await
7735 .expect("collect full scan");
7736 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
7737 assert_eq!(total_rows, 5, "all 5 rows returned from full scan");
7738
7739 let mut ids = Vec::new();
7740 let mut amounts = Vec::new();
7741 for batch in &batches {
7742 let id_col = batch
7743 .column(0)
7744 .as_any()
7745 .downcast_ref::<Int64Array>()
7746 .expect("id column");
7747 let amt_col = batch
7748 .column(1)
7749 .as_any()
7750 .downcast_ref::<Int64Array>()
7751 .expect("amount column");
7752 for i in 0..batch.num_rows() {
7753 ids.push(id_col.value(i));
7754 amounts.push(amt_col.value(i));
7755 }
7756 }
7757 assert_eq!(ids, vec![1, 2, 3, 4, 5]);
7758 assert_eq!(amounts, vec![100, 200, 300, 400, 500]);
7759
7760 let filtered = ctx
7761 .sql(
7762 "SELECT id, amount_cents FROM orders \
7763 WHERE status = 'open' ORDER BY id",
7764 )
7765 .await
7766 .expect("filtered query")
7767 .collect()
7768 .await
7769 .expect("collect filtered");
7770 let mut filtered_ids = Vec::new();
7771 let mut filtered_amounts = Vec::new();
7772 for batch in &filtered {
7773 let id_col = batch
7774 .column(0)
7775 .as_any()
7776 .downcast_ref::<Int64Array>()
7777 .expect("id column");
7778 let amt_col = batch
7779 .column(1)
7780 .as_any()
7781 .downcast_ref::<Int64Array>()
7782 .expect("amount column");
7783 for i in 0..batch.num_rows() {
7784 filtered_ids.push(id_col.value(i));
7785 filtered_amounts.push(amt_col.value(i));
7786 }
7787 }
7788 assert_eq!(filtered_ids, vec![1, 3, 5]);
7789 assert_eq!(filtered_amounts, vec![100, 300, 500]);
7790
7791 let agg = ctx
7792 .sql(
7793 "SELECT COUNT(*) AS cnt, SUM(amount_cents) AS total \
7794 FROM orders WHERE status = 'open'",
7795 )
7796 .await
7797 .expect("aggregate query")
7798 .collect()
7799 .await
7800 .expect("collect aggregate");
7801 assert_eq!(agg.len(), 1);
7802 let batch = &agg[0];
7803 assert_eq!(batch.num_rows(), 1);
7804 assert_count_scalar(batch, 0, 0, 3);
7805 let total = ScalarValue::try_from_array(batch.column(1), 0).expect("sum scalar");
7806 match total {
7807 ScalarValue::Int64(Some(v)) => assert_eq!(v, 900),
7808 other => panic!("unexpected sum type: {other:?}"),
7809 }
7810 }
7811 }
7812}