1use std::borrow::Cow;
2#[cfg(test)]
3use std::cell::RefCell;
4use std::collections::hash_map::Iter as HashMapIter;
5use std::collections::BTreeMap;
6use std::sync::Arc;
7
8use crate::catalog::Catalog;
9use crate::db::scalar_at_path;
10use crate::db::SharedDbState;
11use crate::error::{DbError, QueryError, SchemaError};
12use crate::file_format::MAX_QUERY_LIMIT;
13use crate::index::IndexState;
14use crate::record::RowValue;
15use crate::schema::{CollectionId, FieldPath, IndexKind};
16use crate::storage::{FileStore, Store};
17use crate::ScalarValue;
18
19use super::ast::{OrderBy, OrderDirection};
20use super::ast::{Predicate, Query};
21use super::operators::{LimitOp, RowKey, RowSource};
22
23fn row_for_index_pk(
24 latest: &crate::db::LatestMap,
25 collection_id: u32,
26 pk_key: Vec<u8>,
27 index_name: &str,
28) -> Result<BTreeMap<String, RowValue>, DbError> {
29 latest
30 .get(&(collection_id, pk_key))
31 .cloned()
32 .ok_or(DbError::Schema(SchemaError::IndexRowMissing {
33 collection_id,
34 index_name: index_name.to_string(),
35 }))
36}
37
/// Bounds for a `Plan::IndexRangeLookup` probe of an index.
///
/// Either bound may be absent (`None`); `explain_query` only reports the bounds
/// that are `Some`, and the `*_inclusive` flags select `>=`/`>` and `<=`/`<`.
38#[derive(Debug, Clone, PartialEq)]
39struct IndexKeyRange {
    /// Lower bound, if any.
40 lo: Option<ScalarValue>,
    /// `true` when `lo` itself is included (`>=`), `false` for `>`.
41 lo_inclusive: bool,
    /// Upper bound, if any.
42 hi: Option<ScalarValue>,
    /// `true` when `hi` itself is included (`<=`), `false` for `<`.
43 hi_inclusive: bool,
44}
45
/// Execution strategy produced by `plan_query` for a single [`Query`].
///
/// Every variant carries the query's `limit` and `order_by`; the executors in
/// this file apply them after the candidate rows have been gathered and filtered.
46#[derive(Debug, Clone, PartialEq)]
47enum Plan {
    /// Equality probe of index `index_name` with `key`; `kind` selects
    /// `unique_lookup` vs `non_unique_lookup`.
48 IndexLookup {
49 collection_id: u32,
50 index_name: String,
51 kind: IndexKind,
    /// Key bytes passed verbatim to the index lookup.
52 key: Vec<u8>,
    /// Predicate re-checked against every row the index returns (presumably
    /// the part of the filter the index cannot answer — confirm in `plan_query`).
53 residual: Option<Predicate>,
54 limit: Option<usize>,
55 order_by: Option<OrderBy>,
56 },
    /// Range probe of index `index_name` over `key_range`.
57 IndexRangeLookup {
58 collection_id: u32,
59 index_name: String,
60 kind: IndexKind,
61 key_range: IndexKeyRange,
    /// Predicate re-checked against every row the range probe returns.
62 residual: Option<Predicate>,
63 limit: Option<usize>,
64 order_by: Option<OrderBy>,
65 },
    /// Full scan of the latest rows of `collection_id`, keeping those that
    /// satisfy `predicate` (every row when it is `None`).
66 CollectionScan {
67 collection_id: u32,
68 predicate: Option<Predicate>,
69 limit: Option<usize>,
70 order_by: Option<OrderBy>,
71 },
72}
73
74pub fn explain_query(catalog: &Catalog, query: &Query) -> Result<String, DbError> {
75 validate_query_limit(query)?;
76 let col =
77 catalog
78 .get(query.collection)
79 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
80 id: query.collection.0,
81 }))?;
82 let plan = plan_query(col.id, &col.indexes, query);
83 #[cfg(feature = "tracing")]
84 tracing::debug!(plan = ?plan, "explain_query");
85 Ok(match plan {
86 Plan::IndexLookup {
87 index_name,
88 kind,
89 residual,
90 limit,
91 order_by,
92 ..
93 } => {
94 let mut s = String::new();
95 s.push_str("Plan:\n");
96 s.push_str(&format!(
97 " IndexLookup index={index_name:?} kind={kind:?}\n"
98 ));
99 if let Some(r) = residual {
100 s.push_str(&format!(" ResidualFilter {r:?}\n"));
101 }
102 if let Some(n) = limit {
103 s.push_str(&format!(" Limit {n}\n"));
104 }
105 if let Some(ob) = order_by {
106 s.push_str(&format!(" OrderBy {:?} {:?}\n", ob.path, ob.direction));
107 }
108 s
109 }
110 Plan::IndexRangeLookup {
111 index_name,
112 kind,
113 key_range,
114 residual,
115 limit,
116 order_by,
117 ..
118 } => {
119 let mut s = String::new();
120 s.push_str("Plan:\n");
121 s.push_str(&format!(
122 " IndexRangeLookup index={index_name:?} kind={kind:?}\n"
123 ));
124 if let Some(ref lo) = key_range.lo {
125 let op = if key_range.lo_inclusive { ">=" } else { ">" };
126 s.push_str(&format!(" KeyRange lo {op} {lo:?}\n"));
127 }
128 if let Some(ref hi) = key_range.hi {
129 let op = if key_range.hi_inclusive { "<=" } else { "<" };
130 s.push_str(&format!(" KeyRange hi {op} {hi:?}\n"));
131 }
132 if let Some(r) = residual {
133 s.push_str(&format!(" ResidualFilter {r:?}\n"));
134 }
135 if let Some(n) = limit {
136 s.push_str(&format!(" Limit {n}\n"));
137 }
138 if let Some(ob) = order_by {
139 s.push_str(&format!(" OrderBy {:?} {:?}\n", ob.path, ob.direction));
140 }
141 s
142 }
143 Plan::CollectionScan {
144 predicate,
145 limit,
146 order_by,
147 ..
148 } => {
149 let mut s = String::new();
150 s.push_str("Plan:\n");
151 s.push_str(" CollectionScan\n");
152 if let Some(p) = predicate {
153 s.push_str(&format!(" Filter {p:?}\n"));
154 }
155 if let Some(n) = limit {
156 s.push_str(&format!(" Limit {n}\n"));
157 }
158 if let Some(ob) = order_by {
159 s.push_str(&format!(" OrderBy {:?} {:?}\n", ob.path, ob.direction));
160 }
161 s
162 }
163 })
164}
165
166fn validate_query_limit(query: &Query) -> Result<(), DbError> {
167 if let Some(n) = query.limit {
168 if n > MAX_QUERY_LIMIT {
169 return Err(DbError::Query(QueryError {
170 message: format!("query limit {n} exceeds maximum {MAX_QUERY_LIMIT}"),
171 }));
172 }
173 }
174 Ok(())
175}
176
177pub fn execute_query(
178 catalog: &Catalog,
179 indexes: &IndexState,
180 latest: &crate::db::LatestMap,
181 query: &Query,
182) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
183 validate_query_limit(query)?;
184 let col =
185 catalog
186 .get(query.collection)
187 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
188 id: query.collection.0,
189 }))?;
190 let plan = plan_query(col.id, &col.indexes, query);
191
192 #[cfg(feature = "tracing")]
193 tracing::debug!(plan = ?plan, "execute_query");
194
195 match plan {
196 Plan::IndexLookup {
197 collection_id,
198 index_name,
199 kind,
200 key,
201 residual,
202 limit,
203 order_by,
204 } => {
205 let mut out = Vec::new();
206
207 match kind {
208 IndexKind::Unique => {
209 if let Some(pk) = indexes.unique_lookup(collection_id, &index_name, &key) {
210 out.push(row_for_index_pk(
211 latest,
212 collection_id,
213 pk.to_vec(),
214 &index_name,
215 )?);
216 }
217 }
218 IndexKind::NonUnique => {
219 if let Some(pks) = indexes.non_unique_lookup(collection_id, &index_name, &key) {
220 for pk in pks {
221 out.push(row_for_index_pk(latest, collection_id, pk, &index_name)?);
222 }
223 }
224 }
225 }
226
227 if let Some(pred) = residual {
228 out.retain(|row| eval_predicate(row, &pred));
229 }
230 apply_order_by_and_limit(
231 &mut out,
232 order_by.as_ref(),
233 limit,
234 col.primary_field.as_deref(),
235 );
236 Ok(out)
237 }
238 Plan::IndexRangeLookup {
239 collection_id,
240 index_name,
241 kind,
242 key_range,
243 residual,
244 limit,
245 order_by,
246 } => {
247 let mut out = collect_index_range_rows(
248 indexes,
249 latest,
250 collection_id,
251 &index_name,
252 kind,
253 &key_range,
254 )?;
255 if let Some(pred) = residual {
256 out.retain(|row| eval_predicate(row, &pred));
257 }
258 apply_order_by_and_limit(
259 &mut out,
260 order_by.as_ref(),
261 limit,
262 col.primary_field.as_deref(),
263 );
264 Ok(out)
265 }
266 Plan::CollectionScan {
267 collection_id,
268 predicate,
269 limit,
270 order_by,
271 } => {
272 let mut out = Vec::new();
273 for ((cid, _pk), row) in latest.iter() {
274 if *cid != collection_id {
275 continue;
276 }
277 if let Some(ref p) = predicate {
278 if !eval_predicate(row, p) {
279 continue;
280 }
281 }
282 out.push(row.clone());
283 }
284 apply_order_by_and_limit(
285 &mut out,
286 order_by.as_ref(),
287 limit,
288 col.primary_field.as_deref(),
289 );
290 Ok(out)
291 }
292 }
293}
294
/// Iterator over query results returned by the `execute_query_iter*` functions.
///
/// Each item is `Ok(row)` for a result row, or `Err` if a row could not be
/// produced (for example, a key whose row is missing from the row map).
295pub struct QueryRowIter<'a> {
    // Which strategy backs this iterator: materialised rows or a streaming source.
301 state: QueryRowIterState<'a>,
302}
303
/// Backing state of a [`QueryRowIter`].
304enum QueryRowIterState<'a> {
    /// Fully materialised (and already ordered/limited) rows handed out in
    /// order; `pos` is the index of the next row to yield.
305 Vec {
306 rows: Vec<BTreeMap<String, RowValue>>,
307 pos: usize,
308 },
    /// Streams row keys from `source` and resolves each against the borrowed
    /// `latest` map.
309 Source {
310 latest: &'a crate::db::LatestMap,
311 source: Box<dyn RowSource + 'a>,
312 },
    /// Like `Source`, but resolves keys against `snapshot.latest` and owns the
    /// `Arc` snapshot, so the iterator does not borrow caller data.
313 Owned {
314 snapshot: Arc<SharedDbState>,
315 source: Box<dyn RowSource + 'static>,
316 },
317}
318
319impl<'a> Iterator for QueryRowIter<'a> {
320 type Item = Result<BTreeMap<String, RowValue>, DbError>;
321
322 fn next(&mut self) -> Option<Self::Item> {
323 match &mut self.state {
324 QueryRowIterState::Vec { rows, pos } => {
325 if *pos >= rows.len() {
326 None
327 } else {
328 let out = rows[*pos].clone();
329 *pos += 1;
330 Some(Ok(out))
331 }
332 }
333 QueryRowIterState::Source { latest, source } => match source.next_key() {
334 None => None,
335 Some(Err(e)) => Some(Err(e)),
336 Some(Ok((cid, pk_key))) => Some(row_for_index_pk(latest, cid.0, pk_key, "")),
337 },
338 QueryRowIterState::Owned { snapshot, source } => match source.next_key() {
339 None => None,
340 Some(Err(e)) => Some(Err(e)),
341 Some(Ok((cid, pk_key))) => {
342 Some(row_for_index_pk(&snapshot.latest, cid.0, pk_key, ""))
343 }
344 },
345 }
346 }
347}
348
349struct IndexUniqueSource<'a> {
350 latest: &'a crate::db::LatestMap,
351 collection_id: u32,
352 index_name: String,
353 pk: Option<Vec<u8>>,
354 residual: Option<Predicate>,
355 done: bool,
356}
357
358impl RowSource for IndexUniqueSource<'_> {
359 fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
360 if self.done {
361 return None;
362 }
363 self.done = true;
364 let pk_key = self.pk.take()?;
365 let row = match row_for_index_pk(
366 self.latest,
367 self.collection_id,
368 pk_key.clone(),
369 &self.index_name,
370 ) {
371 Ok(r) => r,
372 Err(e) => return Some(Err(e)),
373 };
374 if let Some(pred) = &self.residual {
375 if !eval_predicate(&row, pred) {
376 return None;
377 }
378 }
379 Some(Ok((CollectionId(self.collection_id), pk_key)))
380 }
381}
382
383struct IndexNonUniqueSource<'a> {
384 latest: &'a crate::db::LatestMap,
385 collection_id: u32,
386 index_name: String,
387 pks: std::vec::IntoIter<Vec<u8>>,
388 residual: Option<Predicate>,
389}
390
391impl RowSource for IndexNonUniqueSource<'_> {
392 fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
393 for pk_key in self.pks.by_ref() {
394 let row = match row_for_index_pk(
395 self.latest,
396 self.collection_id,
397 pk_key.clone(),
398 &self.index_name,
399 ) {
400 Ok(r) => r,
401 Err(e) => return Some(Err(e)),
402 };
403 if let Some(pred) = &self.residual {
404 if !eval_predicate(&row, pred) {
405 continue;
406 }
407 }
408 return Some(Ok((CollectionId(self.collection_id), pk_key)));
409 }
410 None
411 }
412}
413
414struct IndexRangeSource<'a> {
415 latest: &'a crate::db::LatestMap,
416 collection_id: u32,
417 index_name: String,
418 pks: std::vec::IntoIter<Vec<u8>>,
419 residual: Option<Predicate>,
420}
421
422impl RowSource for IndexRangeSource<'_> {
423 fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
424 for pk_key in self.pks.by_ref() {
425 let row = match row_for_index_pk(
426 self.latest,
427 self.collection_id,
428 pk_key.clone(),
429 &self.index_name,
430 ) {
431 Ok(r) => r,
432 Err(e) => return Some(Err(e)),
433 };
434 if let Some(pred) = &self.residual {
435 if !eval_predicate(&row, pred) {
436 continue;
437 }
438 }
439 return Some(Ok((CollectionId(self.collection_id), pk_key)));
440 }
441 None
442 }
443}
444
445fn collect_index_range_rows(
446 indexes: &IndexState,
447 latest: &crate::db::LatestMap,
448 collection_id: u32,
449 index_name: &str,
450 kind: IndexKind,
451 key_range: &IndexKeyRange,
452) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
453 let lo = key_range.lo.as_ref();
454 let hi = key_range.hi.as_ref();
455 let pks = match kind {
456 IndexKind::Unique => indexes.unique_range_lookup(
457 collection_id,
458 index_name,
459 lo,
460 key_range.lo_inclusive,
461 hi,
462 key_range.hi_inclusive,
463 ),
464 IndexKind::NonUnique => indexes.non_unique_range_lookup(
465 collection_id,
466 index_name,
467 lo,
468 key_range.lo_inclusive,
469 hi,
470 key_range.hi_inclusive,
471 ),
472 };
473 let mut out = Vec::with_capacity(pks.len());
474 for pk in pks {
475 out.push(row_for_index_pk(latest, collection_id, pk, index_name)?);
476 }
477 Ok(out)
478}
479
480fn index_range_source<'a>(
481 indexes: &'a IndexState,
482 latest: &'a crate::db::LatestMap,
483 collection_id: u32,
484 index_name: String,
485 kind: IndexKind,
486 key_range: &IndexKeyRange,
487 residual: Option<Predicate>,
488) -> IndexRangeSource<'a> {
489 let lo = key_range.lo.as_ref();
490 let hi = key_range.hi.as_ref();
491 let pks = match kind {
492 IndexKind::Unique => indexes.unique_range_lookup(
493 collection_id,
494 &index_name,
495 lo,
496 key_range.lo_inclusive,
497 hi,
498 key_range.hi_inclusive,
499 ),
500 IndexKind::NonUnique => indexes.non_unique_range_lookup(
501 collection_id,
502 &index_name,
503 lo,
504 key_range.lo_inclusive,
505 hi,
506 key_range.hi_inclusive,
507 ),
508 };
509 IndexRangeSource {
510 latest,
511 collection_id,
512 index_name,
513 pks: pks.into_iter(),
514 residual,
515 }
516}
517
518struct ScanSource<'a> {
519 it: HashMapIter<'a, (u32, Vec<u8>), BTreeMap<String, RowValue>>,
520 collection_id: u32,
521 predicate: Option<Predicate>,
522}
523
524impl RowSource for ScanSource<'_> {
525 fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
526 for (&(cid, ref pk_key), row) in self.it.by_ref() {
527 if cid != self.collection_id {
528 continue;
529 }
530 if let Some(p) = &self.predicate {
531 if !eval_predicate(row, p) {
532 continue;
533 }
534 }
535 return Some(Ok((CollectionId(self.collection_id), pk_key.clone())));
536 }
537 None
538 }
539}
540
541struct OwnedIndexUniqueSource {
542 snapshot: Arc<SharedDbState>,
543 collection_id: u32,
544 index_name: String,
545 pk: Option<Vec<u8>>,
546 residual: Option<Predicate>,
547 done: bool,
548}
549
550impl RowSource for OwnedIndexUniqueSource {
551 fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
552 if self.done {
553 return None;
554 }
555 self.done = true;
556 let pk_key = self.pk.take()?;
557 let row = match row_for_index_pk(
558 &self.snapshot.latest,
559 self.collection_id,
560 pk_key.clone(),
561 &self.index_name,
562 ) {
563 Ok(r) => r,
564 Err(e) => return Some(Err(e)),
565 };
566 if let Some(pred) = &self.residual {
567 if !eval_predicate(&row, pred) {
568 return None;
569 }
570 }
571 Some(Ok((CollectionId(self.collection_id), pk_key)))
572 }
573}
574
575struct OwnedIndexNonUniqueSource {
576 snapshot: Arc<SharedDbState>,
577 collection_id: u32,
578 index_name: String,
579 pks: std::vec::IntoIter<Vec<u8>>,
580 residual: Option<Predicate>,
581}
582
583impl RowSource for OwnedIndexNonUniqueSource {
584 fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
585 for pk_key in self.pks.by_ref() {
586 let row = match row_for_index_pk(
587 &self.snapshot.latest,
588 self.collection_id,
589 pk_key.clone(),
590 &self.index_name,
591 ) {
592 Ok(r) => r,
593 Err(e) => return Some(Err(e)),
594 };
595 if let Some(pred) = &self.residual {
596 if !eval_predicate(&row, pred) {
597 continue;
598 }
599 }
600 return Some(Ok((CollectionId(self.collection_id), pk_key)));
601 }
602 None
603 }
604}
605
606struct OwnedScanSource {
607 snapshot: Arc<SharedDbState>,
608 collection_id: u32,
609 predicate: Option<Predicate>,
610 pos: usize,
611 keys: Vec<(u32, Vec<u8>)>,
612}
613
614impl OwnedScanSource {
615 fn new(snapshot: Arc<SharedDbState>, collection_id: u32, predicate: Option<Predicate>) -> Self {
616 let mut keys: Vec<(u32, Vec<u8>)> = snapshot
617 .latest
618 .keys()
619 .filter(|(cid, _)| *cid == collection_id)
620 .cloned()
621 .collect();
622 keys.sort_by(|a, b| a.1.cmp(&b.1));
623 Self {
624 snapshot,
625 collection_id,
626 predicate,
627 pos: 0,
628 keys,
629 }
630 }
631}
632
633impl RowSource for OwnedScanSource {
634 fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
635 while self.pos < self.keys.len() {
636 let (cid, pk_key) = self.keys[self.pos].clone();
637 self.pos += 1;
638 if cid != self.collection_id {
639 continue;
640 }
641 let row = match self.snapshot.latest.get(&(cid, pk_key.clone())) {
642 Some(r) => r,
643 None => continue,
644 };
645 if let Some(p) = &self.predicate {
646 if !eval_predicate(row, p) {
647 continue;
648 }
649 }
650 return Some(Ok((CollectionId(self.collection_id), pk_key)));
651 }
652 None
653 }
654}
655
656fn owned_row_source_for_plan(
657 snapshot: Arc<SharedDbState>,
658 plan: Plan,
659) -> Box<dyn RowSource + 'static> {
660 match plan {
661 Plan::IndexLookup {
662 collection_id,
663 index_name,
664 kind,
665 key,
666 residual,
667 ..
668 } => match kind {
669 IndexKind::Unique => {
670 let pk: Option<Vec<u8>> = snapshot
671 .indexes
672 .unique_lookup(collection_id, &index_name, &key)
673 .map(|p| p.to_vec());
674 Box::new(OwnedIndexUniqueSource {
675 snapshot,
676 collection_id,
677 index_name,
678 pk,
679 residual,
680 done: false,
681 })
682 }
683 IndexKind::NonUnique => {
684 let pks = snapshot
685 .indexes
686 .non_unique_lookup(collection_id, &index_name, &key)
687 .unwrap_or_default()
688 .into_iter();
689 Box::new(OwnedIndexNonUniqueSource {
690 snapshot,
691 collection_id,
692 index_name,
693 pks,
694 residual,
695 })
696 }
697 },
698 Plan::IndexRangeLookup {
699 collection_id,
700 index_name,
701 kind,
702 key_range,
703 residual,
704 ..
705 } => {
706 let lo = key_range.lo.as_ref();
707 let hi = key_range.hi.as_ref();
708 let pks = match kind {
709 IndexKind::Unique => snapshot.indexes.unique_range_lookup(
710 collection_id,
711 &index_name,
712 lo,
713 key_range.lo_inclusive,
714 hi,
715 key_range.hi_inclusive,
716 ),
717 IndexKind::NonUnique => snapshot.indexes.non_unique_range_lookup(
718 collection_id,
719 &index_name,
720 lo,
721 key_range.lo_inclusive,
722 hi,
723 key_range.hi_inclusive,
724 ),
725 };
726 Box::new(OwnedIndexNonUniqueSource {
727 snapshot,
728 collection_id,
729 index_name,
730 pks: pks.into_iter(),
731 residual,
732 })
733 }
734 Plan::CollectionScan {
735 collection_id,
736 predicate,
737 ..
738 } => Box::new(OwnedScanSource::new(snapshot, collection_id, predicate)),
739 }
740}
741
742pub fn execute_query_iter_owned(
744 snapshot: Arc<SharedDbState>,
745 query: &Query,
746 db_path: Option<&std::path::Path>,
747) -> Result<QueryRowIter<'static>, DbError> {
748 if query.order_by.is_none() {
749 validate_query_limit(query)?;
750 let col = snapshot
751 .catalog
752 .get(query.collection)
753 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
754 id: query.collection.0,
755 }))?;
756 let plan = plan_query(col.id, &col.indexes, query);
757 let mut source = owned_row_source_for_plan(snapshot.clone(), plan);
758 if let Some(n) = query.limit {
759 source = Box::new(LimitOp::new(source, n));
760 }
761 return Ok(QueryRowIter {
762 state: QueryRowIterState::Owned { snapshot, source },
763 });
764 }
765
766 let order_by = query
767 .order_by
768 .clone()
769 .expect("order_by is Some when this function continues");
770 let Some(path) = db_path else {
771 return Ok(QueryRowIter {
772 state: QueryRowIterState::Vec {
773 rows: execute_query(
774 &snapshot.catalog,
775 &snapshot.indexes,
776 &snapshot.latest,
777 query,
778 )?,
779 pos: 0,
780 },
781 });
782 };
783
784 validate_query_limit(query)?;
785 let col = snapshot
786 .catalog
787 .get(query.collection)
788 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
789 id: query.collection.0,
790 }))?;
791 let plan = plan_query(col.id, &col.indexes, query);
792 let base = owned_row_source_for_plan(snapshot.clone(), plan.clone());
793 let spill_store = open_sorted_query_spill_store(path)?;
794 #[cfg(feature = "tracing")]
795 tracing::debug!(spill_path = %path.display(), "execute_query_iter_owned_spill");
796 let spill = crate::spill::TempSpillFile::new(spill_store)?;
797 let index_name_for_sort = match &plan {
798 Plan::IndexLookup { index_name, .. } | Plan::IndexRangeLookup { index_name, .. } => {
799 index_name.as_str()
800 }
801 Plan::CollectionScan { .. } => "",
802 };
803 let sort_source = Box::new(ExternalSortSourceOwned::new(
804 spill,
805 snapshot.clone(),
806 base,
807 col.id.0,
808 order_by,
809 index_name_for_sort,
810 )?);
811 let mut source: Box<dyn RowSource + 'static> = sort_source;
812 if let Some(n) = query.limit {
813 source = Box::new(LimitOp::new(source, n));
814 }
815 Ok(QueryRowIter {
816 state: QueryRowIterState::Owned { snapshot, source },
817 })
818}
819
820pub fn execute_query_iter<'a>(
822 catalog: &'a Catalog,
823 indexes: &'a IndexState,
824 latest: &'a crate::db::LatestMap,
825 query: &Query,
826) -> Result<QueryRowIter<'a>, DbError> {
827 if query.order_by.is_some() {
828 return Ok(QueryRowIter {
829 state: QueryRowIterState::Vec {
830 rows: execute_query(catalog, indexes, latest, query)?,
831 pos: 0,
832 },
833 });
834 }
835 let col =
836 catalog
837 .get(query.collection)
838 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
839 id: query.collection.0,
840 }))?;
841 let plan = plan_query(col.id, &col.indexes, query);
842 let mut source: Box<dyn RowSource + 'a> = match plan {
843 Plan::IndexLookup {
844 collection_id,
845 index_name,
846 kind,
847 key,
848 residual,
849 ..
850 } => match kind {
851 IndexKind::Unique => {
852 let pk = indexes
853 .unique_lookup(collection_id, &index_name, &key)
854 .map(|p| p.to_vec());
855 Box::new(IndexUniqueSource {
856 latest,
857 collection_id,
858 index_name,
859 pk,
860 residual,
861 done: false,
862 })
863 }
864 IndexKind::NonUnique => {
865 let pks = indexes
866 .non_unique_lookup(collection_id, &index_name, &key)
867 .unwrap_or_default()
868 .into_iter();
869 Box::new(IndexNonUniqueSource {
870 latest,
871 collection_id,
872 index_name,
873 pks,
874 residual,
875 })
876 }
877 },
878 Plan::IndexRangeLookup {
879 collection_id,
880 index_name,
881 kind,
882 key_range,
883 residual,
884 ..
885 } => Box::new(index_range_source(
886 indexes,
887 latest,
888 collection_id,
889 index_name,
890 kind,
891 &key_range,
892 residual,
893 )),
894 Plan::CollectionScan {
895 collection_id,
896 predicate,
897 ..
898 } => Box::new(ScanSource {
899 it: latest.iter(),
900 collection_id,
901 predicate,
902 }),
903 };
904
905 if let Some(n) = query.limit {
906 source = Box::new(LimitOp::new(source, n));
907 }
908
909 Ok(QueryRowIter {
910 state: QueryRowIterState::Source { latest, source },
911 })
912}
913
/// Test hook called with the database path to open the `FileStore` that
/// backs a sorted-query spill (consulted by `open_sorted_query_spill_store`).
914#[cfg(test)]
915type SortedQuerySpillStoreOpenHook = Box<dyn FnMut(&std::path::Path) -> Result<FileStore, DbError>>;
/// Test hook that returns the complete `SortedQuerySpillStore` to use, letting
/// tests substitute non-file stores such as `SortedQuerySpillStore::FailLen`.
916#[cfg(test)]
917type SortedQuerySpillStoreOverrideHook =
918 Box<dyn FnMut(&std::path::Path) -> Result<SortedQuerySpillStore, DbError>>;
919
// Per-thread hook slots; `None` means "no hook installed". They are set via the
// `test_set_sorted_query_spill_store_*_hook` functions.
920#[cfg(test)]
921thread_local! {
    // Consulted second, only when no override hook is installed.
922 static QUERY_SORT_SPILL_STORE_OPEN_HOOK: RefCell<Option<SortedQuerySpillStoreOpenHook>> =
923 RefCell::new(None);
924
    // Consulted first; when present its result is returned as-is.
925 static QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK: RefCell<Option<SortedQuerySpillStoreOverrideHook>> =
926 RefCell::new(None);
927}
928
/// Test-only: installs (`Some`) or clears (`None`) the current thread's hook
/// that `open_sorted_query_spill_store` uses to open the spill `FileStore`.
///
/// An installed override hook takes precedence over this one.
#[cfg(test)]
pub(crate) fn test_set_sorted_query_spill_store_open_hook(
    hook: Option<SortedQuerySpillStoreOpenHook>,
) {
    QUERY_SORT_SPILL_STORE_OPEN_HOOK.with(|slot| {
        let mut installed = slot.borrow_mut();
        *installed = hook;
    });
}
937}
938
/// Test-only: installs (`Some`) or clears (`None`) the current thread's hook
/// that supplies the whole `SortedQuerySpillStore` returned by
/// `open_sorted_query_spill_store`, e.g. the always-failing `FailLen` store.
#[cfg(test)]
pub(crate) fn test_set_sorted_query_spill_store_override_hook(
    hook: Option<SortedQuerySpillStoreOverrideHook>,
) {
    QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK.with(|slot| {
        let mut installed = slot.borrow_mut();
        *installed = hook;
    });
}
947}
948
/// Store backing the temporary spill file used when an ORDER BY query is
/// executed with an external sort.
949pub(crate) enum SortedQuerySpillStore {
    /// Regular file-backed store.
950 File(FileStore),
    /// Test-only store whose `len`, reads and writes always fail, while `sync`
    /// and `truncate` succeed.
951 #[cfg(test)]
952 FailLen,
953}
954
/// Delegates every operation to the wrapped `FileStore`. The test-only
/// `FailLen` variant reports I/O errors for `len`, reads and writes, and treats
/// `sync` and `truncate` as successful no-ops.
955impl Store for SortedQuerySpillStore {
    /// Current length of the spill store.
956 fn len(&self) -> Result<u64, DbError> {
957 match self {
958 Self::File(f) => f.len(),
959 #[cfg(test)]
960 Self::FailLen => Err(DbError::Io(std::io::Error::other(
961 "sorted query spill store synthetic len() failure (test override)",
962 ))),
963 }
964 }
965
    /// Fills `buf` with bytes starting at `offset`.
966 fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
967 match self {
968 Self::File(f) => f.read_exact_at(offset, buf),
969 #[cfg(test)]
970 Self::FailLen => Err(DbError::Io(std::io::Error::other(
971 "sorted query spill store synthetic read failure (test override)",
972 ))),
973 }
974 }
975
    /// Writes all of `buf` starting at `offset`.
976 fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
977 match self {
978 Self::File(f) => f.write_all_at(offset, buf),
979 #[cfg(test)]
980 Self::FailLen => Err(DbError::Io(std::io::Error::other(
981 "sorted query spill store synthetic write failure (test override)",
982 ))),
983 }
984 }
985
    /// Flushes the store; always succeeds for `FailLen`.
986 fn sync(&mut self) -> Result<(), DbError> {
987 match self {
988 Self::File(f) => f.sync(),
989 #[cfg(test)]
990 Self::FailLen => Ok(()),
991 }
992 }
993
    /// Truncates the store to `len` bytes; always succeeds for `FailLen`.
994 fn truncate(&mut self, len: u64) -> Result<(), DbError> {
995 match self {
996 Self::File(f) => f.truncate(len),
997 #[cfg(test)]
998 Self::FailLen => Ok(()),
999 }
1000 }
1001}
1002
1003fn open_sorted_query_spill_store(path: &std::path::Path) -> Result<SortedQuerySpillStore, DbError> {
1004 #[cfg(test)]
1005 {
1006 let overridden = QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK.with(|c| {
1007 let mut bm = c.borrow_mut();
1008 bm.as_mut().map(|hook| hook(path))
1009 });
1010 if let Some(r) = overridden {
1011 return r;
1012 }
1013
1014 let hooked = QUERY_SORT_SPILL_STORE_OPEN_HOOK.with(|c| {
1015 let mut bm = c.borrow_mut();
1016 bm.as_mut().map(|hook| hook(path))
1017 });
1018 if let Some(r) = hooked {
1019 return r.map(SortedQuerySpillStore::File);
1020 }
1021 }
1022 let _ = path;
1023 let spill_file = tempfile::tempfile().map_err(DbError::Io)?;
1025 Ok(SortedQuerySpillStore::File(FileStore::new(spill_file)))
1026}
1027
1028pub fn execute_query_iter_with_spill_path<'a>(
1033 catalog: &'a Catalog,
1034 indexes: &'a IndexState,
1035 latest: &'a crate::db::LatestMap,
1036 q: &Query,
1037 db_path: Option<&std::path::Path>,
1038) -> Result<QueryRowIter<'a>, DbError> {
1039 if q.order_by.is_none() {
1040 return execute_query_iter(catalog, indexes, latest, q);
1041 }
1042 let order_by = q
1043 .order_by
1044 .clone()
1045 .expect("order_by is Some when this function continues");
1046
1047 let Some(path) = db_path else {
1049 return Ok(QueryRowIter {
1050 state: QueryRowIterState::Vec {
1051 rows: execute_query(catalog, indexes, latest, q)?,
1052 pos: 0,
1053 },
1054 });
1055 };
1056
1057 let col = catalog
1058 .get(q.collection)
1059 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
1060 id: q.collection.0,
1061 }))?;
1062 let plan = plan_query(col.id, &col.indexes, q);
1063
1064 let base: Box<dyn RowSource + 'a> = match plan.clone() {
1065 Plan::IndexLookup {
1066 collection_id,
1067 index_name,
1068 kind,
1069 key,
1070 residual,
1071 ..
1072 } => match kind {
1073 IndexKind::Unique => Box::new(IndexUniqueSource {
1074 latest,
1075 collection_id,
1076 index_name: index_name.clone(),
1077 pk: indexes
1078 .unique_lookup(collection_id, &index_name, &key)
1079 .map(|p| p.to_vec()),
1080 residual,
1081 done: false,
1082 }),
1083 IndexKind::NonUnique => Box::new(IndexNonUniqueSource {
1084 latest,
1085 collection_id,
1086 index_name: index_name.clone(),
1087 pks: indexes
1088 .non_unique_lookup(collection_id, &index_name, &key)
1089 .unwrap_or_default()
1090 .into_iter(),
1091 residual,
1092 }),
1093 },
1094 Plan::IndexRangeLookup {
1095 collection_id,
1096 index_name,
1097 kind,
1098 key_range,
1099 residual,
1100 ..
1101 } => Box::new(index_range_source(
1102 indexes,
1103 latest,
1104 collection_id,
1105 index_name,
1106 kind,
1107 &key_range,
1108 residual,
1109 )),
1110 Plan::CollectionScan {
1111 collection_id,
1112 predicate,
1113 ..
1114 } => Box::new(ScanSource {
1115 it: latest.iter(),
1116 collection_id,
1117 predicate,
1118 }),
1119 };
1120
1121 let spill_store = open_sorted_query_spill_store(path)?;
1123 #[cfg(feature = "tracing")]
1124 tracing::debug!(spill_path = %path.display(), "execute_query_order_by_spill");
1125 let spill = crate::spill::TempSpillFile::new(spill_store)?;
1126 let index_name_for_sort = match &plan {
1127 Plan::IndexLookup { index_name, .. } | Plan::IndexRangeLookup { index_name, .. } => {
1128 index_name.as_str()
1129 }
1130 Plan::CollectionScan { .. } => "",
1131 };
1132 let sort_source = Box::new(ExternalSortSource::new(
1133 spill,
1134 latest,
1135 base,
1136 col.id.0,
1137 order_by,
1138 index_name_for_sort,
1139 )?);
1140
1141 let mut source: Box<dyn RowSource + 'a> = sort_source;
1142 if let Some(n) = q.limit {
1143 source = Box::new(LimitOp::new(source, n));
1144 }
1145
1146 Ok(QueryRowIter {
1147 state: QueryRowIterState::Source { latest, source },
1148 })
1149}
1150
/// Sort key for one row under an ORDER BY; built by `sort_item_for_result` and
/// compared by `cmp_sort_item`.
1151#[derive(Clone)]
1152struct SortItem {
    // 1 when the ORDER BY path has no value in the row, 0 otherwise; compared
    // before `sort_key`.
1153 none_flag: u8,
    // Order-preserving encoding from `scalar_sort_key_bytes` (empty when
    // `none_flag` is 1).
1155 sort_key: Vec<u8>,
    // Row identity; its pk bytes break ties.
1156 key: RowKey,
1157}
1158
/// Test-only wrapper around `sort_item_for_result` that passes an empty index
/// name and returns `None` instead of the missing-row error.
#[cfg(test)]
fn sort_item_for(
    latest: &crate::db::LatestMap,
    key: &RowKey,
    order_by: &OrderBy,
) -> Option<SortItem> {
    let item = sort_item_for_result(latest, key, order_by, "");
    item.ok()
}
1167
1168fn sort_item_for_result(
1169 latest: &crate::db::LatestMap,
1170 key: &RowKey,
1171 order_by: &OrderBy,
1172 index_name: &str,
1173) -> Result<SortItem, DbError> {
1174 let (cid, pk) = key;
1175 let row =
1176 latest
1177 .get(&(cid.0, pk.clone()))
1178 .ok_or(DbError::Schema(SchemaError::IndexRowMissing {
1179 collection_id: cid.0,
1180 index_name: index_name.to_string(),
1181 }))?;
1182 let (none_flag, sort_key) = match scalar_at_path(row, &order_by.path) {
1183 None => (1u8, Vec::new()),
1184 Some(s) => (0u8, scalar_sort_key_bytes(&s)),
1185 };
1186 Ok(SortItem {
1187 none_flag,
1188 sort_key,
1189 key: (CollectionId(cid.0), pk.clone()),
1190 })
1191}
1192
1193fn scalar_sort_key_bytes(s: &ScalarValue) -> Vec<u8> {
1194 match s {
1195 ScalarValue::Bool(b) => vec![0, if *b { 1 } else { 0 }],
1196 ScalarValue::Int64(v) => {
1197 let u = (*v as u64) ^ 0x8000_0000_0000_0000u64;
1198 let mut out = vec![1];
1199 out.extend_from_slice(&u.to_be_bytes());
1200 out
1201 }
1202 ScalarValue::Uint64(v) => {
1203 let mut out = vec![2];
1204 out.extend_from_slice(&v.to_be_bytes());
1205 out
1206 }
1207 ScalarValue::Float64(v) => {
1208 let n = if *v == 0.0 { 0.0f64 } else { *v };
1209 let mut bits = n.to_bits();
1210 if bits & (1u64 << 63) != 0 {
1211 bits = !bits;
1212 } else {
1213 bits ^= 1u64 << 63;
1214 }
1215 let mut out = vec![3];
1216 out.extend_from_slice(&bits.to_be_bytes());
1217 out
1218 }
1219 ScalarValue::String(st) => {
1220 let mut out = vec![4];
1221 out.extend_from_slice(st.as_bytes());
1222 out
1223 }
1224 ScalarValue::Bytes(b) => {
1225 let mut out = vec![5];
1226 out.extend_from_slice(b);
1227 out
1228 }
1229 ScalarValue::Uuid(u) => {
1230 let mut out = vec![6];
1231 out.extend_from_slice(u);
1232 out
1233 }
1234 ScalarValue::Timestamp(t) => {
1235 let u = (*t as u64) ^ 0x8000_0000_0000_0000u64;
1236 let mut out = vec![7];
1237 out.extend_from_slice(&u.to_be_bytes());
1238 out
1239 }
1240 }
1241}
1242
1243fn cmp_sort_item(a: &SortItem, b: &SortItem, dir: OrderDirection) -> std::cmp::Ordering {
1244 let ord = a
1245 .none_flag
1246 .cmp(&b.none_flag)
1247 .then_with(|| a.sort_key.cmp(&b.sort_key))
1248 .then_with(|| a.key.1.cmp(&b.key.1));
1249 match dir {
1250 OrderDirection::Asc => ord,
1251 OrderDirection::Desc => ord.reverse(),
1252 }
1253}
1254
/// Row source that yields row keys in ORDER BY order using a spill-backed
/// external sort over a base source; used by
/// `execute_query_iter_with_spill_path`.
///
/// NOTE(review): the heap/run fields suggest a k-way merge of sorted runs stored
/// in `spill`. Construction and iteration are not visible here, so confirm
/// against `ExternalSortSource::new`.
1255struct ExternalSortSource<'a, S: Store = FileStore> {
    // Temporary spill storage holding the sorted runs (presumably).
1258 spill: crate::spill::TempSpillFile<S>,
1259 collection_id: u32,
    // Direction in which items are merged and emitted.
1260 dir: OrderDirection,
    // Merge heap of candidate items; each `HeapItem` records its source run.
1261 heap: std::collections::BinaryHeap<HeapItem>,
    // Where each run lives inside `spill`.
1262 runs_meta: Vec<RunMeta>,
    // Per-run read position (presumably a byte offset into the run payload, as
    // taken by `read_spill_run_item`) — TODO confirm.
1263 run_cursors: Vec<usize>,
    // Unused at runtime; ties the source to the lifetime of the row map.
1264 _latest: &'a crate::db::LatestMap,
1265}
1266
/// Location of one sorted run inside the spill file.
1267#[derive(Clone)]
1268struct RunMeta {
    // Passed as the payload offset to `TempSpillFile::read_temp_payload_into`.
1269 offset: u64,
    // Number of payload bytes in the run; reads never go past it.
1270 payload_len: u64,
1271}
1272
/// One decoded run item: `(none_flag, sort_key, pk)`.
1273type SpillSortRunItem = (u8, Vec<u8>, Vec<u8>);
1274
/// Test-only cursor over an in-memory buffer of encoded run items.
#[cfg(test)]
struct RunReader {
    buf: Vec<u8>,
    pos: usize,
}

#[cfg(test)]
impl RunReader {
    /// Starts reading `buf` from its first byte.
    fn new(buf: Vec<u8>) -> Self {
        RunReader { pos: 0, buf }
    }

    /// Decodes the next item and advances the cursor. Returns `None` once the
    /// buffer is exhausted or the remaining bytes do not hold a complete item.
    fn next_item(&mut self) -> Option<SpillSortRunItem> {
        let Self { buf, pos } = self;
        read_run_item_from_buf(buf, pos)
    }
}
1291
/// Test-only decoder for one run item starting at `*pos`, laid out as
/// `none_flag: u8 | key_len: u32 LE | key | pk_len: u32 LE | pk`.
///
/// On success `*pos` is advanced past the item. Returns `None` if the buffer
/// ends before the item is complete, or if a corrupt length would overflow
/// `usize`. In that case `*pos` may already be past the fields that were read.
#[cfg(test)]
fn read_run_item_from_buf(buf: &[u8], pos: &mut usize) -> Option<(u8, Vec<u8>, Vec<u8>)> {
    /// Returns the `len` bytes at `*pos` and advances past them. The end offset
    /// is computed with `checked_add` so a bogus length cannot overflow (and
    /// panic in debug builds) on 32-bit targets.
    fn take<'b>(buf: &'b [u8], pos: &mut usize, len: usize) -> Option<&'b [u8]> {
        let end = pos.checked_add(len)?;
        let bytes = buf.get(*pos..end)?;
        *pos = end;
        Some(bytes)
    }

    /// Reads a little-endian `u32` length prefix.
    fn read_u32(buf: &[u8], pos: &mut usize) -> Option<u32> {
        let b = take(buf, pos, 4)?;
        Some(u32::from_le_bytes([b[0], b[1], b[2], b[3]]))
    }

    let none_flag = take(buf, pos, 1)?[0];
    let key_len = read_u32(buf, pos)? as usize;
    let key = take(buf, pos, key_len)?.to_vec();
    let pk_len = read_u32(buf, pos)? as usize;
    let pk = take(buf, pos, pk_len)?.to_vec();
    Some((none_flag, key, pk))
}
1309
1310fn read_spill_run_item<S: Store>(
1311 spill: &mut crate::spill::TempSpillFile<S>,
1312 meta: &RunMeta,
1313 pos: &mut usize,
1314) -> Result<Option<SpillSortRunItem>, DbError> {
1315 let payload_len = meta.payload_len as usize;
1316 if *pos >= payload_len {
1317 return Ok(None);
1318 }
1319 let mut one = [0u8; 1];
1320 spill.read_temp_payload_into(meta.offset, *pos as u64, &mut one)?;
1321 *pos += 1;
1322 let none_flag = one[0];
1323
1324 if *pos + 4 > payload_len {
1325 return Err(DbError::Query(crate::error::QueryError {
1326 message: "external sort spill segment truncated".into(),
1327 }));
1328 }
1329 let mut len_buf = [0u8; 4];
1330 spill.read_temp_payload_into(meta.offset, *pos as u64, &mut len_buf)?;
1331 *pos += 4;
1332 let key_len = u32::from_le_bytes(len_buf) as usize;
1333 crate::file_format::check_field_bytes_len(key_len).map_err(|e| match e {
1334 DbError::Format(fe) => DbError::Query(crate::error::QueryError {
1335 message: format!("external sort spill key: {fe}"),
1336 }),
1337 other => other,
1338 })?;
1339 if *pos + key_len > payload_len {
1340 return Err(DbError::Query(crate::error::QueryError {
1341 message: "external sort spill segment truncated".into(),
1342 }));
1343 }
1344
1345 let mut key = vec![0u8; key_len];
1346 spill.read_temp_payload_into(meta.offset, *pos as u64, &mut key)?;
1347 *pos += key_len;
1348
1349 if *pos + 4 > payload_len {
1350 return Err(DbError::Query(crate::error::QueryError {
1351 message: "external sort spill segment truncated".into(),
1352 }));
1353 }
1354 spill.read_temp_payload_into(meta.offset, *pos as u64, &mut len_buf)?;
1355 *pos += 4;
1356 let pk_len = u32::from_le_bytes(len_buf) as usize;
1357 crate::file_format::check_field_bytes_len(pk_len).map_err(|e| match e {
1358 DbError::Format(fe) => DbError::Query(crate::error::QueryError {
1359 message: format!("external sort spill pk: {fe}"),
1360 }),
1361 other => other,
1362 })?;
1363 if *pos + pk_len > payload_len {
1364 return Err(DbError::Query(crate::error::QueryError {
1365 message: "external sort spill segment truncated".into(),
1366 }));
1367 }
1368
1369 let mut pk = vec![0u8; pk_len];
1370 spill.read_temp_payload_into(meta.offset, *pos as u64, &mut pk)?;
1371 *pos += pk_len;
1372
1373 Ok(Some((none_flag, key, pk)))
1374}
1375
1376#[derive(Clone)]
1377struct HeapItem {
1378 run_idx: usize,
1379 none_flag: u8,
1380 sort_key: Vec<u8>,
1381 pk: Vec<u8>,
1382 dir: OrderDirection,
1383}
1384
1385impl PartialEq for HeapItem {
1386 fn eq(&self, other: &Self) -> bool {
1387 (self.none_flag, &self.sort_key, &self.pk) == (other.none_flag, &other.sort_key, &other.pk)
1388 }
1389}
1390impl Eq for HeapItem {}
1391
1392impl PartialOrd for HeapItem {
1393 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1394 Some(self.cmp(other))
1395 }
1396}
1397impl Ord for HeapItem {
1398 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1399 let a = SortItem {
1401 none_flag: self.none_flag,
1402 sort_key: self.sort_key.clone(),
1403 key: (CollectionId(0), self.pk.clone()),
1404 };
1405 let b = SortItem {
1406 none_flag: other.none_flag,
1407 sort_key: other.sort_key.clone(),
1408 key: (CollectionId(0), other.pk.clone()),
1409 };
1410 cmp_sort_item(&a, &b, self.dir).reverse()
1411 }
1412}
1413
1414impl<'a, S: Store> ExternalSortSource<'a, S> {
1415 fn flush_sorted_run(
1416 spill: &mut crate::spill::TempSpillFile<S>,
1417 runs_meta: &mut Vec<RunMeta>,
1418 run: &mut Vec<SortItem>,
1419 dir: OrderDirection,
1420 ) -> Result<(), DbError> {
1421 if run.is_empty() {
1422 return Ok(());
1423 }
1424 run.sort_by(|a, b| cmp_sort_item(a, b, dir));
1425 let payload = encode_run(run, dir);
1426 let off = spill.append_temp_segment(&payload)?;
1427 runs_meta.push(RunMeta {
1428 offset: off,
1429 payload_len: payload.len() as u64,
1430 });
1431 run.clear();
1432 Ok(())
1433 }
1434
1435 fn new(
1436 mut spill: crate::spill::TempSpillFile<S>,
1437 latest: &'a crate::db::LatestMap,
1438 mut input: Box<dyn RowSource + 'a>,
1439 collection_id: u32,
1440 order_by: OrderBy,
1441 index_name: &str,
1442 ) -> Result<Self, DbError> {
1443 const RUN_KEYS: usize = 2048;
1444
1445 let dir = order_by.direction;
1446 let mut runs_meta: Vec<RunMeta> = Vec::new();
1447 let mut run: Vec<SortItem> = Vec::with_capacity(RUN_KEYS);
1448
1449 while let Some(rk) = input.next_key() {
1450 let rk = rk?;
1451 let item = sort_item_for_result(latest, &rk, &order_by, index_name)?;
1452 run.push(item);
1453 if run.len() >= RUN_KEYS {
1454 Self::flush_sorted_run(&mut spill, &mut runs_meta, &mut run, dir)?;
1455 }
1456 }
1457
1458 Self::flush_sorted_run(&mut spill, &mut runs_meta, &mut run, dir)?;
1459
1460 let mut run_cursors = vec![0usize; runs_meta.len()];
1462 let mut heap = std::collections::BinaryHeap::new();
1463 for (i, m) in runs_meta.iter().enumerate() {
1464 match read_spill_run_item(&mut spill, m, &mut run_cursors[i]) {
1465 Ok(Some((none_flag, sort_key, pk))) => {
1466 heap.push(HeapItem {
1467 run_idx: i,
1468 none_flag,
1469 sort_key,
1470 pk: pk.clone(),
1471 dir,
1472 });
1473 }
1474 Ok(None) => {}
1475 Err(e) => return Err(e),
1476 }
1477 }
1478
1479 Ok(Self {
1480 spill,
1481 collection_id,
1482 dir,
1483 heap,
1484 runs_meta,
1485 run_cursors,
1486 _latest: latest,
1487 })
1488 }
1489}
1490
1491fn encode_run(run: &[SortItem], _dir: OrderDirection) -> Vec<u8> {
1492 let mut out = Vec::new();
1493 for it in run {
1494 out.push(it.none_flag);
1495 out.extend_from_slice(&(it.sort_key.len() as u32).to_le_bytes());
1496 out.extend_from_slice(&it.sort_key);
1497 out.extend_from_slice(&(it.key.1.len() as u32).to_le_bytes());
1498 out.extend_from_slice(&it.key.1);
1499 }
1500 out
1501}
1502
1503impl<'a, S: Store> RowSource for ExternalSortSource<'a, S> {
1504 fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
1505 let top = self.heap.pop()?;
1506 let run_idx = top.run_idx;
1507 let meta = self.runs_meta[run_idx].clone();
1508 match read_spill_run_item(&mut self.spill, &meta, &mut self.run_cursors[run_idx]) {
1509 Ok(Some((none_flag, sort_key, pk))) => {
1510 self.heap.push(HeapItem {
1511 run_idx,
1512 none_flag,
1513 sort_key,
1514 pk: pk.clone(),
1515 dir: self.dir,
1516 });
1517 }
1518 Ok(None) => {}
1519 Err(e) => return Some(Err(e)),
1520 }
1521 Some(Ok((CollectionId(self.collection_id), top.pk)))
1522 }
1523}
1524
/// Variant of `ExternalSortSource` that owns an `Arc` to the database state instead of
/// borrowing the latest-row map, so it carries no lifetime parameter.
struct ExternalSortSourceOwned<S: Store = FileStore> {
    /// Temporary spill file holding every encoded sorted run.
    spill: crate::spill::TempSpillFile<S>,
    /// Collection id stamped onto every emitted `RowKey`.
    collection_id: u32,
    /// Sort direction; copied into each `HeapItem` pushed onto `heap`.
    dir: OrderDirection,
    /// Merge frontier: holds at most one pending entry per run.
    heap: std::collections::BinaryHeap<HeapItem>,
    /// Spill location of each run, indexed by run number.
    runs_meta: Vec<RunMeta>,
    /// Byte position of the next unread entry within each run's payload, indexed by run number.
    run_cursors: Vec<usize>,
    /// Never read by the merge; presumably held to keep the snapshot the rows were
    /// sorted against alive for this source's lifetime (confirm).
    _snapshot: Arc<SharedDbState>,
}
1534
1535impl<S: Store> ExternalSortSourceOwned<S> {
1536 fn new(
1537 mut spill: crate::spill::TempSpillFile<S>,
1538 snapshot: Arc<SharedDbState>,
1539 mut input: Box<dyn RowSource + 'static>,
1540 collection_id: u32,
1541 order_by: OrderBy,
1542 index_name: &str,
1543 ) -> Result<Self, DbError> {
1544 const RUN_KEYS: usize = 2048;
1545
1546 let dir = order_by.direction;
1547 let mut runs_meta: Vec<RunMeta> = Vec::new();
1548 let mut run: Vec<SortItem> = Vec::with_capacity(RUN_KEYS);
1549 let latest = &snapshot.latest;
1550
1551 while let Some(rk) = input.next_key() {
1552 let rk = rk?;
1553 let item = sort_item_for_result(latest, &rk, &order_by, index_name)?;
1554 run.push(item);
1555 if run.len() >= RUN_KEYS {
1556 ExternalSortSource::<S>::flush_sorted_run(
1557 &mut spill,
1558 &mut runs_meta,
1559 &mut run,
1560 dir,
1561 )?;
1562 }
1563 }
1564
1565 ExternalSortSource::<S>::flush_sorted_run(&mut spill, &mut runs_meta, &mut run, dir)?;
1566
1567 let mut run_cursors = vec![0usize; runs_meta.len()];
1568 let mut heap = std::collections::BinaryHeap::new();
1569 for (i, m) in runs_meta.iter().enumerate() {
1570 match read_spill_run_item(&mut spill, m, &mut run_cursors[i]) {
1571 Ok(Some((none_flag, sort_key, pk))) => {
1572 heap.push(HeapItem {
1573 run_idx: i,
1574 none_flag,
1575 sort_key,
1576 pk: pk.clone(),
1577 dir,
1578 });
1579 }
1580 Ok(None) => {}
1581 Err(e) => return Err(e),
1582 }
1583 }
1584
1585 Ok(Self {
1586 spill,
1587 collection_id,
1588 dir,
1589 heap,
1590 runs_meta,
1591 run_cursors,
1592 _snapshot: snapshot,
1593 })
1594 }
1595}
1596
1597impl<S: Store> RowSource for ExternalSortSourceOwned<S> {
1598 fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
1599 let top = self.heap.pop()?;
1600 let run_idx = top.run_idx;
1601 let meta = self.runs_meta[run_idx].clone();
1602 match read_spill_run_item(&mut self.spill, &meta, &mut self.run_cursors[run_idx]) {
1603 Ok(Some((none_flag, sort_key, pk))) => {
1604 self.heap.push(HeapItem {
1605 run_idx,
1606 none_flag,
1607 sort_key,
1608 pk: pk.clone(),
1609 dir: self.dir,
1610 });
1611 }
1612 Ok(None) => {}
1613 Err(e) => return Some(Err(e)),
1614 }
1615 Some(Ok((CollectionId(self.collection_id), top.pk)))
1616 }
1617}
1618
1619fn plan_query(
1620 collection: CollectionId,
1621 indexes: &[crate::schema::IndexDef],
1622 query: &Query,
1623) -> Plan {
1624 let Some(pred) = query.predicate.clone() else {
1625 return Plan::CollectionScan {
1626 collection_id: collection.0,
1627 predicate: None,
1628 limit: query.limit,
1629 order_by: query.order_by.clone(),
1630 };
1631 };
1632
1633 let (best, residual) = match choose_index(indexes, &pred) {
1634 None => (None, Some(pred)),
1635 Some(choice) => {
1636 let used = match &choice {
1637 IndexChoice::Eq { used, .. } | IndexChoice::Range { used, .. } => used.clone(),
1638 };
1639 let residual = remove_used_predicate(pred, used);
1640 (Some(choice), residual)
1641 }
1642 };
1643
1644 if let Some(choice) = best {
1645 match choice {
1646 IndexChoice::Eq { idx, value, .. } => Plan::IndexLookup {
1647 collection_id: collection.0,
1648 index_name: idx.name.clone(),
1649 kind: idx.kind,
1650 key: value.canonical_key_bytes(),
1651 residual,
1652 limit: query.limit,
1653 order_by: query.order_by.clone(),
1654 },
1655 IndexChoice::Range { idx, key_range, .. } => Plan::IndexRangeLookup {
1656 collection_id: collection.0,
1657 index_name: idx.name.clone(),
1658 kind: idx.kind,
1659 key_range,
1660 residual,
1661 limit: query.limit,
1662 order_by: query.order_by.clone(),
1663 },
1664 }
1665 } else {
1666 Plan::CollectionScan {
1667 collection_id: collection.0,
1668 predicate: residual,
1669 limit: query.limit,
1670 order_by: query.order_by.clone(),
1671 }
1672 }
1673}
1674
/// Index access path selected by `choose_index`.
///
/// `used` is the part of the predicate that the index lookup is meant to satisfy;
/// `plan_query` passes it to `remove_used_predicate` to compute the residual filter.
#[derive(Debug, Clone, PartialEq)]
enum IndexChoice<'a> {
    /// Point lookup on `idx` for rows whose indexed path equals `value`.
    Eq {
        idx: &'a crate::schema::IndexDef,
        value: ScalarValue,
        used: Predicate,
    },
    /// Range scan over `idx` bounded by `key_range`.
    Range {
        idx: &'a crate::schema::IndexDef,
        key_range: IndexKeyRange,
        used: Predicate,
    },
}
1688
1689fn is_range_indexable(value: &ScalarValue) -> bool {
1690 matches!(value, ScalarValue::Int64(_) | ScalarValue::String(_))
1691}
1692
1693fn merge_lo_bound(current: &mut (Option<ScalarValue>, bool), value: ScalarValue, inclusive: bool) {
1694 match ¤t.0 {
1695 None => *current = (Some(value), inclusive),
1696 Some(existing) => match scalar_partial_cmp(&value, existing) {
1697 Some(std::cmp::Ordering::Greater) => *current = (Some(value), inclusive),
1698 Some(std::cmp::Ordering::Equal) if !inclusive => current.1 = false,
1699 _ => {}
1700 },
1701 }
1702}
1703
1704fn merge_hi_bound(current: &mut (Option<ScalarValue>, bool), value: ScalarValue, inclusive: bool) {
1705 match ¤t.0 {
1706 None => *current = (Some(value), inclusive),
1707 Some(existing) => match scalar_partial_cmp(&value, existing) {
1708 Some(std::cmp::Ordering::Less) => *current = (Some(value), inclusive),
1709 Some(std::cmp::Ordering::Equal) if !inclusive => current.1 = false,
1710 _ => {}
1711 },
1712 }
1713}
1714
1715fn extract_range_on_path(path: &FieldPath, pred: &Predicate) -> Option<IndexKeyRange> {
1716 let mut lo: (Option<ScalarValue>, bool) = (None, true);
1717 let mut hi: (Option<ScalarValue>, bool) = (None, true);
1718 let mut any = false;
1719
1720 let mut visit = |p: &Predicate| match p {
1721 Predicate::Gte { path: pp, value } if pp == path && is_range_indexable(value) => {
1722 merge_lo_bound(&mut lo, value.clone(), true);
1723 any = true;
1724 }
1725 Predicate::Gt { path: pp, value } if pp == path && is_range_indexable(value) => {
1726 merge_lo_bound(&mut lo, value.clone(), false);
1727 any = true;
1728 }
1729 Predicate::Lte { path: pp, value } if pp == path && is_range_indexable(value) => {
1730 merge_hi_bound(&mut hi, value.clone(), true);
1731 any = true;
1732 }
1733 Predicate::Lt { path: pp, value } if pp == path && is_range_indexable(value) => {
1734 merge_hi_bound(&mut hi, value.clone(), false);
1735 any = true;
1736 }
1737 _ => {}
1738 };
1739
1740 match pred {
1741 Predicate::Gte { .. }
1742 | Predicate::Gt { .. }
1743 | Predicate::Lte { .. }
1744 | Predicate::Lt { .. } => visit(pred),
1745 Predicate::And(items) => {
1746 for item in items {
1747 visit(item);
1748 }
1749 }
1750 _ => return None,
1751 }
1752
1753 if !any {
1754 return None;
1755 }
1756 Some(IndexKeyRange {
1757 lo: lo.0,
1758 lo_inclusive: lo.1,
1759 hi: hi.0,
1760 hi_inclusive: hi.1,
1761 })
1762}
1763
1764fn range_used_predicate(path: &FieldPath, pred: &Predicate) -> Option<Predicate> {
1765 match pred {
1766 Predicate::Gte { path: p, .. }
1767 | Predicate::Gt { path: p, .. }
1768 | Predicate::Lte { path: p, .. }
1769 | Predicate::Lt { path: p, .. }
1770 if p == path =>
1771 {
1772 Some(pred.clone())
1773 }
1774 Predicate::And(items) => {
1775 let used: Vec<Predicate> = items
1776 .iter()
1777 .filter(|p| {
1778 matches!(
1779 p,
1780 Predicate::Gte { path: pp, .. }
1781 | Predicate::Gt { path: pp, .. }
1782 | Predicate::Lte { path: pp, .. }
1783 | Predicate::Lt { path: pp, .. } if pp == path
1784 )
1785 })
1786 .cloned()
1787 .collect();
1788 match used.len() {
1789 0 => None,
1790 1 => Some(used.into_iter().next().unwrap()),
1791 _ => Some(Predicate::And(used)),
1792 }
1793 }
1794 _ => None,
1795 }
1796}
1797
1798fn try_range_index<'a>(
1799 indexes: &'a [crate::schema::IndexDef],
1800 pred: &Predicate,
1801) -> Option<IndexChoice<'a>> {
1802 for idx in indexes {
1803 if let Some(range) = extract_range_on_path(&idx.path, pred) {
1804 if let Some(used) = range_used_predicate(&idx.path, pred) {
1805 return Some(IndexChoice::Range {
1806 idx,
1807 key_range: range,
1808 used,
1809 });
1810 }
1811 }
1812 }
1813 None
1814}
1815
1816fn choose_index<'a>(
1817 indexes: &'a [crate::schema::IndexDef],
1818 pred: &Predicate,
1819) -> Option<IndexChoice<'a>> {
1820 match pred {
1821 Predicate::Eq { path, value } => {
1822 indexes
1823 .iter()
1824 .find(|idx| &idx.path == path)
1825 .map(|idx| IndexChoice::Eq {
1826 idx,
1827 value: value.clone(),
1828 used: pred.clone(),
1829 })
1830 }
1831 Predicate::Lt { .. }
1832 | Predicate::Lte { .. }
1833 | Predicate::Gt { .. }
1834 | Predicate::Gte { .. } => try_range_index(indexes, pred),
1835 Predicate::Or(_) => None,
1836 Predicate::And(items) => {
1837 let range = try_range_index(indexes, pred);
1838 let mut unique_eq: Option<IndexChoice<'a>> = None;
1839 let mut any_eq: Option<IndexChoice<'a>> = None;
1840 for p in items {
1841 if let Some(IndexChoice::Eq { idx, value, used }) = choose_index(indexes, p) {
1842 if idx.kind == IndexKind::Unique {
1843 unique_eq = Some(IndexChoice::Eq { idx, value, used });
1844 } else if any_eq.is_none() {
1845 any_eq = Some(IndexChoice::Eq { idx, value, used });
1846 }
1847 }
1848 }
1849 if let Some(u) = unique_eq {
1850 return Some(u);
1851 }
1852 if let Some(r) = range {
1853 return Some(r);
1854 }
1855 any_eq
1856 }
1857 }
1858}
1859
1860fn remove_used_predicate(pred: Predicate, used: Predicate) -> Option<Predicate> {
1861 if pred == used {
1862 return None;
1863 }
1864 match pred {
1865 Predicate::And(items) => {
1866 let mut out: Vec<Predicate> = items.into_iter().filter(|p| p != &used).collect();
1867 match out.len() {
1868 0 => None,
1869 1 => Some(out.remove(0)),
1870 _ => Some(Predicate::And(out)),
1871 }
1872 }
1873 _ => Some(pred),
1874 }
1875}
1876
1877fn eval_predicate(row: &BTreeMap<String, RowValue>, pred: &Predicate) -> bool {
1878 match pred {
1879 Predicate::Eq { path, value } => scalar_at_path(row, path)
1880 .map(|s| &s == value)
1881 .unwrap_or(false),
1882 Predicate::Lt { path, value } => scalar_at_path(row, path)
1883 .and_then(|s| scalar_partial_cmp(&s, value))
1884 .map(|o| o.is_lt())
1885 .unwrap_or(false),
1886 Predicate::Lte { path, value } => scalar_at_path(row, path)
1887 .and_then(|s| scalar_partial_cmp(&s, value))
1888 .map(|o| o.is_lt() || o.is_eq())
1889 .unwrap_or(false),
1890 Predicate::Gt { path, value } => scalar_at_path(row, path)
1891 .and_then(|s| scalar_partial_cmp(&s, value))
1892 .map(|o| o.is_gt())
1893 .unwrap_or(false),
1894 Predicate::Gte { path, value } => scalar_at_path(row, path)
1895 .and_then(|s| scalar_partial_cmp(&s, value))
1896 .map(|o| o.is_gt() || o.is_eq())
1897 .unwrap_or(false),
1898 Predicate::And(items) => items.iter().all(|p| eval_predicate(row, p)),
1899 Predicate::Or(items) => items.iter().any(|p| eval_predicate(row, p)),
1900 }
1901}
1902
1903fn apply_order_by_and_limit(
1904 rows: &mut Vec<BTreeMap<String, RowValue>>,
1905 order_by: Option<&OrderBy>,
1906 limit: Option<usize>,
1907 pk_field: Option<&str>,
1908) {
1909 const TOPK_ORDER_BY_LIMIT: usize = 1024;
1910
1911 if let Some(ob) = order_by {
1912 let pk_path: Option<FieldPath> =
1913 pk_field.map(|name| FieldPath(vec![Cow::Owned(name.to_string())]));
1914
1915 if let Some(n) = limit {
1916 if n <= TOPK_ORDER_BY_LIMIT && rows.len() > n {
1917 topk_order_by(rows, ob, n, pk_path.as_ref());
1918 return;
1919 }
1920 }
1921
1922 rows.sort_by(|a, b| compare_rows_for_order(a, b, ob, pk_path.as_ref()));
1923 }
1924 if let Some(n) = limit {
1925 rows.truncate(n);
1926 }
1927}
1928
1929fn compare_rows_for_order(
1930 a: &BTreeMap<String, RowValue>,
1931 b: &BTreeMap<String, RowValue>,
1932 ob: &OrderBy,
1933 pk_path: Option<&FieldPath>,
1934) -> std::cmp::Ordering {
1935 let av = scalar_at_path(a, &ob.path);
1936 let bv = scalar_at_path(b, &ob.path);
1937 let mut ord = match (av, bv) {
1938 (None, None) => std::cmp::Ordering::Equal,
1939 (None, Some(_)) => std::cmp::Ordering::Greater,
1940 (Some(_), None) => std::cmp::Ordering::Less,
1941 (Some(x), Some(y)) => scalar_sort_key_bytes(&x).cmp(&scalar_sort_key_bytes(&y)),
1942 };
1943 if ord == std::cmp::Ordering::Equal {
1944 if let Some(path) = pk_path {
1945 let apk = scalar_at_path(a, path);
1946 let bpk = scalar_at_path(b, path);
1947 ord = match (apk, bpk) {
1948 (None, None) => std::cmp::Ordering::Equal,
1949 (None, Some(_)) => std::cmp::Ordering::Greater,
1950 (Some(_), None) => std::cmp::Ordering::Less,
1951 (Some(x), Some(y)) => scalar_sort_key_bytes(&x).cmp(&scalar_sort_key_bytes(&y)),
1952 };
1953 }
1954 }
1955 match ob.direction {
1956 OrderDirection::Asc => ord,
1957 OrderDirection::Desc => ord.reverse(),
1958 }
1959}
1960
1961fn topk_order_by(
1962 rows: &mut Vec<BTreeMap<String, RowValue>>,
1963 ob: &OrderBy,
1964 k: usize,
1965 pk_path: Option<&FieldPath>,
1966) {
1967 if rows.len() <= k {
1968 rows.sort_by(|a, b| compare_rows_for_order(a, b, ob, pk_path));
1969 return;
1970 }
1971 rows.select_nth_unstable_by(k - 1, |a, b| compare_rows_for_order(a, b, ob, pk_path));
1972 rows.truncate(k);
1973 rows.sort_by(|a, b| compare_rows_for_order(a, b, ob, pk_path));
1974}
1975
1976fn scalar_partial_cmp(a: &ScalarValue, b: &ScalarValue) -> Option<std::cmp::Ordering> {
1977 use ScalarValue::*;
1978 match (a, b) {
1979 (Bool(x), Bool(y)) => Some(x.cmp(y)),
1980 (Int64(x), Int64(y)) => Some(x.cmp(y)),
1981 (Uint64(x), Uint64(y)) => Some(x.cmp(y)),
1982 (Float64(x), Float64(y)) => x.partial_cmp(y),
1983 (String(x), String(y)) => Some(x.cmp(y)),
1984 (Bytes(x), Bytes(y)) => Some(x.cmp(y)),
1985 (Uuid(x), Uuid(y)) => Some(x.cmp(y)),
1986 (Timestamp(x), Timestamp(y)) => Some(x.cmp(y)),
1987 _ => None,
1988 }
1989}
1990
// Unit tests for this module live in tests/unit/src_query_planner_tests.rs under the
// crate root and are spliced in here textually with `include!`.
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_query_planner_tests.rs"
    ));
}