1#[cfg(test)]
2use std::cell::RefCell;
3use std::collections::hash_map::Iter as HashMapIter;
4use std::collections::BTreeMap;
5
6use crate::catalog::Catalog;
7use crate::db::scalar_at_path;
8use crate::error::{DbError, SchemaError};
9use crate::index::IndexState;
10use crate::record::RowValue;
11use crate::schema::{CollectionId, IndexKind};
12use crate::storage::{FileStore, Store};
13use crate::ScalarValue;
14
15use super::ast::{OrderBy, OrderDirection};
16use super::ast::{Predicate, Query};
17use super::operators::{LimitOp, RowKey, RowSource};
18
19fn row_for_index_pk(
20 latest: &crate::db::LatestMap,
21 collection_id: u32,
22 pk_key: Vec<u8>,
23 index_name: &str,
24) -> Result<BTreeMap<String, RowValue>, DbError> {
25 latest
26 .get(&(collection_id, pk_key))
27 .cloned()
28 .ok_or(DbError::Schema(SchemaError::IndexRowMissing {
29 collection_id,
30 index_name: index_name.to_string(),
31 }))
32}
33
/// Physical plan produced by `plan_query` for a `Query`.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Lookup through a secondary index using an equality predicate.
    IndexLookup {
        collection_id: u32,
        index_name: String,
        kind: IndexKind,
        // Lookup key: `canonical_key_bytes()` of the equality value chosen by `plan_query`.
        key: Vec<u8>,
        // Part of the predicate not satisfied by the index; applied to fetched rows.
        residual: Option<Predicate>,
        limit: Option<usize>,
        order_by: Option<OrderBy>,
    },
    /// Scan of every row of the collection in the latest-row map.
    CollectionScan {
        collection_id: u32,
        // Filter applied to each scanned row; `None` keeps every row.
        predicate: Option<Predicate>,
        limit: Option<usize>,
        order_by: Option<OrderBy>,
    },
}
52
53pub fn explain_query(catalog: &Catalog, query: &Query) -> Result<String, DbError> {
54 let col =
55 catalog
56 .get(query.collection)
57 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
58 id: query.collection.0,
59 }))?;
60 let plan = plan_query(col.id, &col.indexes, query);
61 #[cfg(feature = "tracing")]
62 tracing::debug!(plan = ?plan, "explain_query");
63 Ok(match plan {
64 Plan::IndexLookup {
65 index_name,
66 kind,
67 residual,
68 limit,
69 order_by,
70 ..
71 } => {
72 let mut s = String::new();
73 s.push_str("Plan:\n");
74 s.push_str(&format!(
75 " IndexLookup index={index_name:?} kind={kind:?}\n"
76 ));
77 if let Some(r) = residual {
78 s.push_str(&format!(" ResidualFilter {r:?}\n"));
79 }
80 if let Some(n) = limit {
81 s.push_str(&format!(" Limit {n}\n"));
82 }
83 if let Some(ob) = order_by {
84 s.push_str(&format!(" OrderBy {:?} {:?}\n", ob.path, ob.direction));
85 }
86 s
87 }
88 Plan::CollectionScan {
89 predicate,
90 limit,
91 order_by,
92 ..
93 } => {
94 let mut s = String::new();
95 s.push_str("Plan:\n");
96 s.push_str(" CollectionScan\n");
97 if let Some(p) = predicate {
98 s.push_str(&format!(" Filter {p:?}\n"));
99 }
100 if let Some(n) = limit {
101 s.push_str(&format!(" Limit {n}\n"));
102 }
103 if let Some(ob) = order_by {
104 s.push_str(&format!(" OrderBy {:?} {:?}\n", ob.path, ob.direction));
105 }
106 s
107 }
108 })
109}
110
111pub fn execute_query(
112 catalog: &Catalog,
113 indexes: &IndexState,
114 latest: &crate::db::LatestMap,
115 query: &Query,
116) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
117 let col =
118 catalog
119 .get(query.collection)
120 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
121 id: query.collection.0,
122 }))?;
123 let plan = plan_query(col.id, &col.indexes, query);
124
125 #[cfg(feature = "tracing")]
126 tracing::debug!(plan = ?plan, "execute_query");
127
128 match plan {
129 Plan::IndexLookup {
130 collection_id,
131 index_name,
132 kind,
133 key,
134 residual,
135 limit,
136 order_by,
137 } => {
138 let mut out = Vec::new();
139
140 match kind {
141 IndexKind::Unique => {
142 if let Some(pk) = indexes.unique_lookup(collection_id, &index_name, &key) {
143 out.push(row_for_index_pk(
144 latest,
145 collection_id,
146 pk.to_vec(),
147 &index_name,
148 )?);
149 }
150 }
151 IndexKind::NonUnique => {
152 if let Some(pks) = indexes.non_unique_lookup(collection_id, &index_name, &key) {
153 for pk in pks {
154 out.push(row_for_index_pk(latest, collection_id, pk, &index_name)?);
155 }
156 }
157 }
158 }
159
160 if let Some(pred) = residual {
161 out.retain(|row| eval_predicate(row, &pred));
162 }
163 apply_order_by_and_limit(&mut out, order_by.as_ref(), limit);
164 Ok(out)
165 }
166 Plan::CollectionScan {
167 collection_id,
168 predicate,
169 limit,
170 order_by,
171 } => {
172 let mut out = Vec::new();
173 for ((cid, _pk), row) in latest.iter() {
174 if *cid != collection_id {
175 continue;
176 }
177 if let Some(ref p) = predicate {
178 if !eval_predicate(row, p) {
179 continue;
180 }
181 }
182 out.push(row.clone());
183 }
184 apply_order_by_and_limit(&mut out, order_by.as_ref(), limit);
185 Ok(out)
186 }
187 }
188}
189
/// Iterator over query results, returned by `execute_query_iter` and
/// `execute_query_iter_with_spill_path`.
///
/// Each item is `Ok(row)`, or `Err` when the underlying row source reports an error.
pub struct QueryRowIter<'a> {
    state: QueryRowIterState<'a>,
}
198
/// Backing state of a `QueryRowIter`.
enum QueryRowIterState<'a> {
    /// Rows already materialised in memory; `pos` is the index of the next row to yield.
    Vec {
        rows: Vec<BTreeMap<String, RowValue>>,
        pos: usize,
    },
    /// Row keys pulled lazily from `source`, each resolved against `latest`.
    Source {
        latest: &'a crate::db::LatestMap,
        source: Box<dyn RowSource + 'a>,
    },
}
209
210impl<'a> Iterator for QueryRowIter<'a> {
211 type Item = Result<BTreeMap<String, RowValue>, DbError>;
212
213 fn next(&mut self) -> Option<Self::Item> {
214 match &mut self.state {
215 QueryRowIterState::Vec { rows, pos } => {
216 if *pos >= rows.len() {
217 None
218 } else {
219 let out = rows[*pos].clone();
220 *pos += 1;
221 Some(Ok(out))
222 }
223 }
224 QueryRowIterState::Source { latest, source } => loop {
225 let rk = source.next_key()?;
226 match rk {
227 Err(e) => return Some(Err(e)),
228 Ok((cid, pk_key)) => {
229 if let Some(row) = latest.get(&(cid.0, pk_key)).cloned() {
230 return Some(Ok(row));
231 }
232 continue;
233 }
234 }
235 },
236 }
237 }
238}
239
/// Row source for a unique-index lookup; yields at most one key.
struct IndexUniqueSource<'a> {
    latest: &'a crate::db::LatestMap,
    collection_id: u32,
    // Index name, reported in `IndexRowMissing` if the row is absent.
    index_name: String,
    // Primary key found by the index lookup; taken on the first `next_key` call.
    pk: Option<Vec<u8>>,
    // Extra filter the row must pass before its key is yielded.
    residual: Option<Predicate>,
    // Set by the first `next_key` call; later calls return `None`.
    done: bool,
}
248
249impl RowSource for IndexUniqueSource<'_> {
250 fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
251 if self.done {
252 return None;
253 }
254 self.done = true;
255 let pk_key = self.pk.take()?;
256 let row = match row_for_index_pk(
257 self.latest,
258 self.collection_id,
259 pk_key.clone(),
260 &self.index_name,
261 ) {
262 Ok(r) => r,
263 Err(e) => return Some(Err(e)),
264 };
265 if let Some(pred) = &self.residual {
266 if !eval_predicate(&row, pred) {
267 return None;
268 }
269 }
270 Some(Ok((CollectionId(self.collection_id), pk_key)))
271 }
272}
273
/// Row source for a non-unique index lookup; yields each indexed primary key in turn.
struct IndexNonUniqueSource<'a> {
    latest: &'a crate::db::LatestMap,
    collection_id: u32,
    // Index name, reported in `IndexRowMissing` if a row is absent.
    index_name: String,
    // Remaining primary keys returned by the index lookup.
    pks: std::vec::IntoIter<Vec<u8>>,
    // Extra filter; keys whose row fails it are skipped.
    residual: Option<Predicate>,
}
281
282impl RowSource for IndexNonUniqueSource<'_> {
283 fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
284 for pk_key in self.pks.by_ref() {
285 let row = match row_for_index_pk(
286 self.latest,
287 self.collection_id,
288 pk_key.clone(),
289 &self.index_name,
290 ) {
291 Ok(r) => r,
292 Err(e) => return Some(Err(e)),
293 };
294 if let Some(pred) = &self.residual {
295 if !eval_predicate(&row, pred) {
296 continue;
297 }
298 }
299 return Some(Ok((CollectionId(self.collection_id), pk_key)));
300 }
301 None
302 }
303}
304
/// Row source for a full collection scan over the latest-row map.
struct ScanSource<'a> {
    // Iterator over every `(collection_id, pk) -> row` entry, across all collections.
    it: HashMapIter<'a, (u32, Vec<u8>), BTreeMap<String, RowValue>>,
    // Only entries belonging to this collection are yielded.
    collection_id: u32,
    // Optional filter applied to each row of the collection.
    predicate: Option<Predicate>,
}
310
311impl RowSource for ScanSource<'_> {
312 fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
313 for (&(cid, ref pk_key), row) in self.it.by_ref() {
314 if cid != self.collection_id {
315 continue;
316 }
317 if let Some(p) = &self.predicate {
318 if !eval_predicate(row, p) {
319 continue;
320 }
321 }
322 return Some(Ok((CollectionId(self.collection_id), pk_key.clone())));
323 }
324 None
325 }
326}
327
328pub fn execute_query_iter<'a>(
330 catalog: &'a Catalog,
331 indexes: &'a IndexState,
332 latest: &'a crate::db::LatestMap,
333 query: &Query,
334) -> Result<QueryRowIter<'a>, DbError> {
335 if query.order_by.is_some() {
336 return Ok(QueryRowIter {
337 state: QueryRowIterState::Vec {
338 rows: execute_query(catalog, indexes, latest, query)?,
339 pos: 0,
340 },
341 });
342 }
343 let col =
344 catalog
345 .get(query.collection)
346 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
347 id: query.collection.0,
348 }))?;
349 let plan = plan_query(col.id, &col.indexes, query);
350 let mut source: Box<dyn RowSource + 'a> = match plan {
351 Plan::IndexLookup {
352 collection_id,
353 index_name,
354 kind,
355 key,
356 residual,
357 ..
358 } => match kind {
359 IndexKind::Unique => {
360 let pk = indexes
361 .unique_lookup(collection_id, &index_name, &key)
362 .map(|p| p.to_vec());
363 Box::new(IndexUniqueSource {
364 latest,
365 collection_id,
366 index_name,
367 pk,
368 residual,
369 done: false,
370 })
371 }
372 IndexKind::NonUnique => {
373 let pks = indexes
374 .non_unique_lookup(collection_id, &index_name, &key)
375 .unwrap_or_default()
376 .into_iter();
377 Box::new(IndexNonUniqueSource {
378 latest,
379 collection_id,
380 index_name,
381 pks,
382 residual,
383 })
384 }
385 },
386 Plan::CollectionScan {
387 collection_id,
388 predicate,
389 ..
390 } => Box::new(ScanSource {
391 it: latest.iter(),
392 collection_id,
393 predicate,
394 }),
395 };
396
397 if let Some(n) = query.limit {
398 source = Box::new(LimitOp::new(source, n));
399 }
400
401 Ok(QueryRowIter {
402 state: QueryRowIterState::Source { latest, source },
403 })
404}
405
/// Test-only hook that replaces how the spill `FileStore` is opened for a given path.
#[cfg(test)]
type SortedQuerySpillStoreOpenHook = Box<dyn FnMut(&std::path::Path) -> Result<FileStore, DbError>>;
/// Test-only hook that replaces the whole spill store for a given path (for example with
/// `SortedQuerySpillStore::FailLen`).
#[cfg(test)]
type SortedQuerySpillStoreOverrideHook =
    Box<dyn FnMut(&std::path::Path) -> Result<SortedQuerySpillStore, DbError>>;
411
// Test-only hooks consulted by `open_sorted_query_spill_store`: the override hook is checked
// first, then the open hook. They are thread-local, so a hook installed on one thread is not
// seen by other threads.
#[cfg(test)]
thread_local! {
    static QUERY_SORT_SPILL_STORE_OPEN_HOOK: RefCell<Option<SortedQuerySpillStoreOpenHook>> =
        RefCell::new(None);

    static QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK: RefCell<Option<SortedQuerySpillStoreOverrideHook>> =
        RefCell::new(None);
}
420
/// Installs (or, with `None`, clears) the current thread's hook for opening the spill
/// `FileStore` used by ORDER BY queries. Any previously installed hook is dropped.
#[cfg(test)]
pub(crate) fn test_set_sorted_query_spill_store_open_hook(
    hook: Option<SortedQuerySpillStoreOpenHook>,
) {
    QUERY_SORT_SPILL_STORE_OPEN_HOOK.with(|slot| {
        let mut current = slot.borrow_mut();
        *current = hook;
    });
}
430
/// Installs (or, with `None`, clears) the current thread's hook that replaces the whole spill
/// store used by ORDER BY queries. Any previously installed hook is dropped.
#[cfg(test)]
pub(crate) fn test_set_sorted_query_spill_store_override_hook(
    hook: Option<SortedQuerySpillStoreOverrideHook>,
) {
    QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK.with(|slot| {
        let mut current = slot.borrow_mut();
        *current = hook;
    });
}
440
/// Store backing the temporary spill file used by external ORDER BY sorting.
pub(crate) enum SortedQuerySpillStore {
    /// Real file-backed store.
    File(FileStore),
    /// Test-only store whose `len`, `read_exact_at` and `write_all_at` always fail.
    #[cfg(test)]
    FailLen,
}
446
/// Delegates every operation to the wrapped `FileStore`. The test-only `FailLen` variant
/// returns a synthetic I/O error from `len`, `read_exact_at` and `write_all_at`, while
/// `sync` and `truncate` succeed.
impl Store for SortedQuerySpillStore {
    fn len(&self) -> Result<u64, DbError> {
        match self {
            Self::File(f) => f.len(),
            #[cfg(test)]
            Self::FailLen => Err(DbError::Io(std::io::Error::other(
                "sorted query spill store synthetic len() failure (test override)",
            ))),
        }
    }

    fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
        match self {
            Self::File(f) => f.read_exact_at(offset, buf),
            #[cfg(test)]
            Self::FailLen => Err(DbError::Io(std::io::Error::other(
                "sorted query spill store synthetic read failure (test override)",
            ))),
        }
    }

    fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
        match self {
            Self::File(f) => f.write_all_at(offset, buf),
            #[cfg(test)]
            Self::FailLen => Err(DbError::Io(std::io::Error::other(
                "sorted query spill store synthetic write failure (test override)",
            ))),
        }
    }

    fn sync(&mut self) -> Result<(), DbError> {
        match self {
            Self::File(f) => f.sync(),
            // Nothing to flush for the synthetic store.
            #[cfg(test)]
            Self::FailLen => Ok(()),
        }
    }

    fn truncate(&mut self, len: u64) -> Result<(), DbError> {
        match self {
            Self::File(f) => f.truncate(len),
            #[cfg(test)]
            Self::FailLen => Ok(()),
        }
    }
}
494
495fn open_sorted_query_spill_store(path: &std::path::Path) -> Result<SortedQuerySpillStore, DbError> {
496 #[cfg(test)]
497 {
498 let overridden = QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK.with(|c| {
499 let mut bm = c.borrow_mut();
500 bm.as_mut().map(|hook| hook(path))
501 });
502 if let Some(r) = overridden {
503 return r;
504 }
505
506 let hooked = QUERY_SORT_SPILL_STORE_OPEN_HOOK.with(|c| {
507 let mut bm = c.borrow_mut();
508 bm.as_mut().map(|hook| hook(path))
509 });
510 if let Some(r) = hooked {
511 return r.map(SortedQuerySpillStore::File);
512 }
513 }
514 let _ = path;
515 let spill_file = tempfile::tempfile().map_err(DbError::Io)?;
517 Ok(SortedQuerySpillStore::File(FileStore::new(spill_file)))
518}
519
520pub fn execute_query_iter_with_spill_path<'a>(
525 catalog: &'a Catalog,
526 indexes: &'a IndexState,
527 latest: &'a crate::db::LatestMap,
528 q: &Query,
529 db_path: Option<&std::path::Path>,
530) -> Result<QueryRowIter<'a>, DbError> {
531 if q.order_by.is_none() {
532 return execute_query_iter(catalog, indexes, latest, q);
533 }
534 let order_by = q
535 .order_by
536 .clone()
537 .expect("order_by is Some when this function continues");
538
539 let Some(path) = db_path else {
541 return Ok(QueryRowIter {
542 state: QueryRowIterState::Vec {
543 rows: execute_query(catalog, indexes, latest, q)?,
544 pos: 0,
545 },
546 });
547 };
548
549 let col = catalog
550 .get(q.collection)
551 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
552 id: q.collection.0,
553 }))?;
554 let plan = plan_query(col.id, &col.indexes, q);
555
556 let base: Box<dyn RowSource + 'a> = match plan.clone() {
557 Plan::IndexLookup {
558 collection_id,
559 index_name,
560 kind,
561 key,
562 residual,
563 ..
564 } => match kind {
565 IndexKind::Unique => Box::new(IndexUniqueSource {
566 latest,
567 collection_id,
568 index_name: index_name.clone(),
569 pk: indexes
570 .unique_lookup(collection_id, &index_name, &key)
571 .map(|p| p.to_vec()),
572 residual,
573 done: false,
574 }),
575 IndexKind::NonUnique => Box::new(IndexNonUniqueSource {
576 latest,
577 collection_id,
578 index_name: index_name.clone(),
579 pks: indexes
580 .non_unique_lookup(collection_id, &index_name, &key)
581 .unwrap_or_default()
582 .into_iter(),
583 residual,
584 }),
585 },
586 Plan::CollectionScan {
587 collection_id,
588 predicate,
589 ..
590 } => Box::new(ScanSource {
591 it: latest.iter(),
592 collection_id,
593 predicate,
594 }),
595 };
596
597 let spill_store = open_sorted_query_spill_store(path)?;
599 #[cfg(feature = "tracing")]
600 tracing::debug!(spill_path = %path.display(), "execute_query_order_by_spill");
601 let spill = crate::spill::TempSpillFile::new(spill_store)?;
602 let sort_source = Box::new(ExternalSortSource::new(
603 spill, latest, base, col.id.0, order_by,
604 )?);
605
606 let mut source: Box<dyn RowSource + 'a> = sort_source;
607 if let Some(n) = q.limit {
608 source = Box::new(LimitOp::new(source, n));
609 }
610
611 Ok(QueryRowIter {
612 state: QueryRowIterState::Source { latest, source },
613 })
614}
615
/// A row key paired with its encoded ORDER BY key, buffered while building sorted runs.
#[derive(Clone)]
struct SortItem {
    // 1 when the row has no value at the ORDER BY path, 0 otherwise. It is compared first,
    // so missing values sort after present ones in ascending order.
    none_flag: u8,
    // Order-preserving encoding from `scalar_sort_key_bytes`; empty when `none_flag` is 1.
    sort_key: Vec<u8>,
    key: RowKey,
}
623
624fn sort_item_for(
625 latest: &crate::db::LatestMap,
626 key: &RowKey,
627 order_by: &OrderBy,
628) -> Option<SortItem> {
629 let (cid, pk) = key;
630 let row = latest.get(&(cid.0, pk.clone()))?;
631 let (none_flag, sort_key) = match scalar_at_path(row, &order_by.path) {
632 None => (1u8, Vec::new()),
633 Some(s) => (0u8, scalar_sort_key_bytes(&s)),
634 };
635 Some(SortItem {
636 none_flag,
637 sort_key,
638 key: (CollectionId(cid.0), pk.clone()),
639 })
640}
641
642fn scalar_sort_key_bytes(s: &ScalarValue) -> Vec<u8> {
643 match s {
644 ScalarValue::Bool(b) => vec![0, if *b { 1 } else { 0 }],
645 ScalarValue::Int64(v) => {
646 let u = (*v as u64) ^ 0x8000_0000_0000_0000u64;
647 let mut out = vec![1];
648 out.extend_from_slice(&u.to_be_bytes());
649 out
650 }
651 ScalarValue::Uint64(v) => {
652 let mut out = vec![2];
653 out.extend_from_slice(&v.to_be_bytes());
654 out
655 }
656 ScalarValue::Float64(v) => {
657 let mut bits = v.to_bits();
658 if bits & (1u64 << 63) != 0 {
659 bits = !bits;
660 } else {
661 bits ^= 1u64 << 63;
662 }
663 let mut out = vec![3];
664 out.extend_from_slice(&bits.to_be_bytes());
665 out
666 }
667 ScalarValue::String(st) => {
668 let mut out = vec![4];
669 out.extend_from_slice(st.as_bytes());
670 out
671 }
672 ScalarValue::Bytes(b) => {
673 let mut out = vec![5];
674 out.extend_from_slice(b);
675 out
676 }
677 ScalarValue::Uuid(u) => {
678 let mut out = vec![6];
679 out.extend_from_slice(u);
680 out
681 }
682 ScalarValue::Timestamp(t) => {
683 let u = (*t as u64) ^ 0x8000_0000_0000_0000u64;
684 let mut out = vec![7];
685 out.extend_from_slice(&u.to_be_bytes());
686 out
687 }
688 }
689}
690
691fn cmp_sort_item(a: &SortItem, b: &SortItem, dir: OrderDirection) -> std::cmp::Ordering {
692 let ord = a
693 .none_flag
694 .cmp(&b.none_flag)
695 .then_with(|| a.sort_key.cmp(&b.sort_key))
696 .then_with(|| a.key.1.cmp(&b.key.1));
697 match dir {
698 OrderDirection::Asc => ord,
699 OrderDirection::Desc => ord.reverse(),
700 }
701}
702
/// Row source that yields keys in ORDER BY order through a k-way merge of sorted runs.
///
/// Runs are written to the spill file while the source is built, then read back into
/// `RunReader`s. `heap` holds the current head record of each non-exhausted run.
struct ExternalSortSource<'a, S: Store = FileStore> {
    // Owned for the lifetime of the source; not read after construction.
    _spill: crate::spill::TempSpillFile<S>,
    collection_id: u32,
    dir: OrderDirection,
    heap: std::collections::BinaryHeap<HeapItem>,
    runs: Vec<RunReader>,
    // Borrow of the latest-row map; not read after construction.
    _latest: &'a crate::db::LatestMap,
}
713
/// Location of one sorted run inside the spill file.
#[derive(Clone)]
struct RunMeta {
    // Offset returned by `append_temp_segment` when the run was written.
    offset: u64,
    // Length in bytes of the encoded run payload.
    payload_len: u64,
}
719
/// Sequential decoder over one run payload, in the record layout written by `encode_run`.
struct RunReader {
    buf: Vec<u8>,
    // Byte offset of the next unread record in `buf`.
    pos: usize,
}
724
725impl RunReader {
726 fn new(buf: Vec<u8>) -> Self {
727 Self { buf, pos: 0 }
728 }
729
730 fn next_item(&mut self) -> Option<(u8, Vec<u8>, Vec<u8>)> {
731 fn read_u32(buf: &[u8], pos: &mut usize) -> Option<u32> {
732 let b = buf.get(*pos..*pos + 4)?;
733 *pos += 4;
734 Some(u32::from_le_bytes([b[0], b[1], b[2], b[3]]))
735 }
736 let none_flag = *self.buf.get(self.pos)?;
737 self.pos += 1;
738 let key_len = read_u32(&self.buf, &mut self.pos)? as usize;
739 let key = self.buf.get(self.pos..self.pos + key_len)?.to_vec();
740 self.pos += key_len;
741 let pk_len = read_u32(&self.buf, &mut self.pos)? as usize;
742 let pk = self.buf.get(self.pos..self.pos + pk_len)?.to_vec();
743 self.pos += pk_len;
744 Some((none_flag, key, pk))
745 }
746}
747
/// Current head record of one run inside the merge heap.
#[derive(Clone)]
struct HeapItem {
    // Index into `ExternalSortSource::runs` of the run this record came from.
    run_idx: usize,
    none_flag: u8,
    sort_key: Vec<u8>,
    pk: Vec<u8>,
    // Sort direction, stored on every item so that `Ord` can honour it.
    dir: OrderDirection,
}
756
757impl PartialEq for HeapItem {
758 fn eq(&self, other: &Self) -> bool {
759 (self.none_flag, &self.sort_key, &self.pk) == (other.none_flag, &other.sort_key, &other.pk)
760 }
761}
762impl Eq for HeapItem {}
763
764impl PartialOrd for HeapItem {
765 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
766 Some(self.cmp(other))
767 }
768}
769impl Ord for HeapItem {
770 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
771 let a = SortItem {
773 none_flag: self.none_flag,
774 sort_key: self.sort_key.clone(),
775 key: (CollectionId(0), self.pk.clone()),
776 };
777 let b = SortItem {
778 none_flag: other.none_flag,
779 sort_key: other.sort_key.clone(),
780 key: (CollectionId(0), other.pk.clone()),
781 };
782 cmp_sort_item(&a, &b, self.dir).reverse()
783 }
784}
785
786impl<'a, S: Store> ExternalSortSource<'a, S> {
787 fn flush_sorted_run(
788 spill: &mut crate::spill::TempSpillFile<S>,
789 runs_meta: &mut Vec<RunMeta>,
790 run: &mut Vec<SortItem>,
791 dir: OrderDirection,
792 ) -> Result<(), DbError> {
793 if run.is_empty() {
794 return Ok(());
795 }
796 run.sort_by(|a, b| cmp_sort_item(a, b, dir));
797 let payload = encode_run(run, dir);
798 let off = spill.append_temp_segment(&payload)?;
799 runs_meta.push(RunMeta {
800 offset: off,
801 payload_len: payload.len() as u64,
802 });
803 run.clear();
804 Ok(())
805 }
806
807 fn new(
808 mut spill: crate::spill::TempSpillFile<S>,
809 latest: &'a crate::db::LatestMap,
810 mut input: Box<dyn RowSource + 'a>,
811 collection_id: u32,
812 order_by: OrderBy,
813 ) -> Result<Self, DbError> {
814 const RUN_KEYS: usize = 2048;
815
816 let dir = order_by.direction;
817 let mut runs_meta: Vec<RunMeta> = Vec::new();
818 let mut run: Vec<SortItem> = Vec::with_capacity(RUN_KEYS);
819
820 while let Some(rk) = input.next_key() {
821 let rk = rk?;
822 if let Some(item) = sort_item_for(latest, &rk, &order_by) {
823 run.push(item);
824 }
825 if run.len() >= RUN_KEYS {
826 Self::flush_sorted_run(&mut spill, &mut runs_meta, &mut run, dir)?;
827 }
828 }
829
830 Self::flush_sorted_run(&mut spill, &mut runs_meta, &mut run, dir)?;
831
832 let mut runs: Vec<RunReader> = Vec::new();
834 let mut heap = std::collections::BinaryHeap::new();
835 for (i, m) in runs_meta.into_iter().enumerate() {
836 let buf = spill.read_temp_payload(m.offset, m.payload_len)?;
837 let mut rr = RunReader::new(buf);
838 if let Some((none_flag, sort_key, pk)) = rr.next_item() {
839 heap.push(HeapItem {
840 run_idx: i,
841 none_flag,
842 sort_key,
843 pk: pk.clone(),
844 dir,
845 });
846 }
847 runs.push(rr);
848 }
849
850 Ok(Self {
851 _spill: spill,
852 collection_id,
853 dir,
854 heap,
855 runs,
856 _latest: latest,
857 })
858 }
859}
860
861fn encode_run(run: &[SortItem], _dir: OrderDirection) -> Vec<u8> {
862 let mut out = Vec::new();
863 for it in run {
864 out.push(it.none_flag);
865 out.extend_from_slice(&(it.sort_key.len() as u32).to_le_bytes());
866 out.extend_from_slice(&it.sort_key);
867 out.extend_from_slice(&(it.key.1.len() as u32).to_le_bytes());
868 out.extend_from_slice(&it.key.1);
869 }
870 out
871}
872
873impl<'a, S: Store> RowSource for ExternalSortSource<'a, S> {
874 fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
875 let top = self.heap.pop()?;
876 let run_idx = top.run_idx;
877 if let Some((none_flag, sort_key, pk)) = self.runs[run_idx].next_item() {
879 self.heap.push(HeapItem {
880 run_idx,
881 none_flag,
882 sort_key,
883 pk: pk.clone(),
884 dir: self.dir,
885 });
886 }
887 Some(Ok((CollectionId(self.collection_id), top.pk)))
888 }
889}
890
891fn plan_query(
892 collection: CollectionId,
893 indexes: &[crate::schema::IndexDef],
894 query: &Query,
895) -> Plan {
896 let Some(pred) = query.predicate.clone() else {
897 return Plan::CollectionScan {
898 collection_id: collection.0,
899 predicate: None,
900 limit: query.limit,
901 order_by: query.order_by.clone(),
902 };
903 };
904
905 let (best, residual) = match choose_index(indexes, &pred) {
906 None => (None, Some(pred)),
907 Some((idx, value, used_pred)) => {
908 let residual = remove_used_predicate(pred, used_pred);
909 (Some((idx, value)), residual)
910 }
911 };
912
913 if let Some((idx, value)) = best {
914 Plan::IndexLookup {
915 collection_id: collection.0,
916 index_name: idx.name.clone(),
917 kind: idx.kind,
918 key: value.canonical_key_bytes(),
919 residual,
920 limit: query.limit,
921 order_by: query.order_by.clone(),
922 }
923 } else {
924 Plan::CollectionScan {
925 collection_id: collection.0,
926 predicate: residual,
927 limit: query.limit,
928 order_by: query.order_by.clone(),
929 }
930 }
931}
932
933fn choose_index<'a>(
934 indexes: &'a [crate::schema::IndexDef],
935 pred: &Predicate,
936) -> Option<(&'a crate::schema::IndexDef, ScalarValue, Predicate)> {
937 match pred {
938 Predicate::Eq { path, value } => indexes
939 .iter()
940 .find(|idx| &idx.path == path)
941 .map(|idx| (idx, value.clone(), pred.clone())),
942 Predicate::Lt { .. }
943 | Predicate::Lte { .. }
944 | Predicate::Gt { .. }
945 | Predicate::Gte { .. }
946 | Predicate::Or(_) => None,
947 Predicate::And(items) => {
948 let mut best: Option<(&crate::schema::IndexDef, ScalarValue, Predicate)> = None;
950 for p in items {
951 if let Some((idx, v, used)) = choose_index(indexes, p) {
952 match best {
953 None => best = Some((idx, v, used)),
954 Some((best_idx, _, _)) => {
955 if best_idx.kind != IndexKind::Unique && idx.kind == IndexKind::Unique {
956 best = Some((idx, v, used));
957 }
958 }
959 }
960 }
961 }
962 best
963 }
964 }
965}
966
967fn remove_used_predicate(pred: Predicate, used: Predicate) -> Option<Predicate> {
968 if pred == used {
969 return None;
970 }
971 match pred {
972 Predicate::And(items) => {
973 let mut out: Vec<Predicate> = items.into_iter().filter(|p| p != &used).collect();
974 match out.len() {
975 0 => None,
976 1 => Some(out.remove(0)),
977 _ => Some(Predicate::And(out)),
978 }
979 }
980 _ => Some(pred),
981 }
982}
983
984fn eval_predicate(row: &BTreeMap<String, RowValue>, pred: &Predicate) -> bool {
985 match pred {
986 Predicate::Eq { path, value } => scalar_at_path(row, path)
987 .map(|s| &s == value)
988 .unwrap_or(false),
989 Predicate::Lt { path, value } => scalar_at_path(row, path)
990 .and_then(|s| scalar_partial_cmp(&s, value))
991 .map(|o| o.is_lt())
992 .unwrap_or(false),
993 Predicate::Lte { path, value } => scalar_at_path(row, path)
994 .and_then(|s| scalar_partial_cmp(&s, value))
995 .map(|o| o.is_lt() || o.is_eq())
996 .unwrap_or(false),
997 Predicate::Gt { path, value } => scalar_at_path(row, path)
998 .and_then(|s| scalar_partial_cmp(&s, value))
999 .map(|o| o.is_gt())
1000 .unwrap_or(false),
1001 Predicate::Gte { path, value } => scalar_at_path(row, path)
1002 .and_then(|s| scalar_partial_cmp(&s, value))
1003 .map(|o| o.is_gt() || o.is_eq())
1004 .unwrap_or(false),
1005 Predicate::And(items) => items.iter().all(|p| eval_predicate(row, p)),
1006 Predicate::Or(items) => items.iter().any(|p| eval_predicate(row, p)),
1007 }
1008}
1009
1010fn apply_order_by_and_limit(
1011 rows: &mut Vec<BTreeMap<String, RowValue>>,
1012 order_by: Option<&OrderBy>,
1013 limit: Option<usize>,
1014) {
1015 if let Some(ob) = order_by {
1016 rows.sort_by(|a, b| {
1017 let av = scalar_at_path(a, &ob.path);
1018 let bv = scalar_at_path(b, &ob.path);
1019 let ord = match (av, bv) {
1020 (None, None) => std::cmp::Ordering::Equal,
1021 (None, Some(_)) => std::cmp::Ordering::Greater,
1022 (Some(_), None) => std::cmp::Ordering::Less,
1023 (Some(x), Some(y)) => {
1024 scalar_partial_cmp(&x, &y).unwrap_or(std::cmp::Ordering::Equal)
1025 }
1026 };
1027 match ob.direction {
1028 OrderDirection::Asc => ord,
1029 OrderDirection::Desc => ord.reverse(),
1030 }
1031 });
1032 }
1033 if let Some(n) = limit {
1034 rows.truncate(n);
1035 }
1036}
1037
1038fn scalar_partial_cmp(a: &ScalarValue, b: &ScalarValue) -> Option<std::cmp::Ordering> {
1039 use ScalarValue::*;
1040 match (a, b) {
1041 (Bool(x), Bool(y)) => Some(x.cmp(y)),
1042 (Int64(x), Int64(y)) => Some(x.cmp(y)),
1043 (Uint64(x), Uint64(y)) => Some(x.cmp(y)),
1044 (Float64(x), Float64(y)) => x.partial_cmp(y),
1045 (String(x), String(y)) => Some(x.cmp(y)),
1046 (Bytes(x), Bytes(y)) => Some(x.cmp(y)),
1047 (Uuid(x), Uuid(y)) => Some(x.cmp(y)),
1048 (Timestamp(x), Timestamp(y)) => Some(x.cmp(y)),
1049 _ => None,
1050 }
1051}
1052
// Unit tests live in a separate file. That file is textually included into this child
// module, which can reach this module's private items through `super::`.
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_query_planner_tests.rs"
    ));
}