1#[cfg(test)]
2use std::cell::RefCell;
3use std::collections::hash_map::Iter as HashMapIter;
4use std::collections::BTreeMap;
5
6use crate::catalog::Catalog;
7use crate::db::scalar_at_path;
8use crate::error::{DbError, SchemaError};
9use crate::index::IndexState;
10use crate::record::RowValue;
11use crate::schema::{CollectionId, IndexKind};
12use crate::storage::{FileStore, Store};
13use crate::ScalarValue;
14
15use super::ast::{OrderBy, OrderDirection};
16use super::ast::{Predicate, Query};
17use super::operators::{LimitOp, RowKey, RowSource};
18
19fn row_for_index_pk(
20 latest: &crate::db::LatestMap,
21 collection_id: u32,
22 pk_key: Vec<u8>,
23 index_name: &str,
24) -> Result<BTreeMap<String, RowValue>, DbError> {
25 latest
26 .get(&(collection_id, pk_key))
27 .cloned()
28 .ok_or(DbError::Schema(SchemaError::IndexRowMissing {
29 collection_id,
30 index_name: index_name.to_string(),
31 }))
32}
33
/// Physical execution strategy that `plan_query` selects for one `Query`.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Resolve candidate rows through a secondary index using an equality key, then apply
    /// any predicate conjuncts the index did not cover (`residual`) to the fetched rows.
    IndexLookup {
        collection_id: u32,
        /// Name of the index picked by `choose_index`.
        index_name: String,
        kind: IndexKind,
        /// Lookup key, produced by `canonical_key_bytes()` on the equality value in `plan_query`.
        key: Vec<u8>,
        /// Remaining predicate to evaluate on each fetched row, if any.
        residual: Option<Predicate>,
        limit: Option<usize>,
        order_by: Option<OrderBy>,
    },
    /// Visit every row of the collection, keeping those that satisfy `predicate` (all rows
    /// when it is `None`).
    CollectionScan {
        collection_id: u32,
        predicate: Option<Predicate>,
        limit: Option<usize>,
        order_by: Option<OrderBy>,
    },
}
52
53pub fn explain_query(catalog: &Catalog, query: &Query) -> Result<String, DbError> {
54 let col =
55 catalog
56 .get(query.collection)
57 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
58 id: query.collection.0,
59 }))?;
60 let plan = plan_query(col.id, &col.indexes, query);
61 #[cfg(feature = "tracing")]
62 tracing::debug!(plan = ?plan, "explain_query");
63 Ok(match plan {
64 Plan::IndexLookup {
65 index_name,
66 kind,
67 residual,
68 limit,
69 order_by,
70 ..
71 } => {
72 let mut s = String::new();
73 s.push_str("Plan:\n");
74 s.push_str(&format!(
75 " IndexLookup index={index_name:?} kind={kind:?}\n"
76 ));
77 if let Some(r) = residual {
78 s.push_str(&format!(" ResidualFilter {r:?}\n"));
79 }
80 if let Some(n) = limit {
81 s.push_str(&format!(" Limit {n}\n"));
82 }
83 if let Some(ob) = order_by {
84 s.push_str(&format!(" OrderBy {:?} {:?}\n", ob.path, ob.direction));
85 }
86 s
87 }
88 Plan::CollectionScan {
89 predicate,
90 limit,
91 order_by,
92 ..
93 } => {
94 let mut s = String::new();
95 s.push_str("Plan:\n");
96 s.push_str(" CollectionScan\n");
97 if let Some(p) = predicate {
98 s.push_str(&format!(" Filter {p:?}\n"));
99 }
100 if let Some(n) = limit {
101 s.push_str(&format!(" Limit {n}\n"));
102 }
103 if let Some(ob) = order_by {
104 s.push_str(&format!(" OrderBy {:?} {:?}\n", ob.path, ob.direction));
105 }
106 s
107 }
108 })
109}
110
111pub fn execute_query(
112 catalog: &Catalog,
113 indexes: &IndexState,
114 latest: &crate::db::LatestMap,
115 query: &Query,
116) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
117 let col =
118 catalog
119 .get(query.collection)
120 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
121 id: query.collection.0,
122 }))?;
123 let plan = plan_query(col.id, &col.indexes, query);
124
125 #[cfg(feature = "tracing")]
126 tracing::debug!(plan = ?plan, "execute_query");
127
128 match plan {
129 Plan::IndexLookup {
130 collection_id,
131 index_name,
132 kind,
133 key,
134 residual,
135 limit,
136 order_by,
137 } => {
138 let mut out = Vec::new();
139
140 match kind {
141 IndexKind::Unique => {
142 if let Some(pk) = indexes.unique_lookup(collection_id, &index_name, &key) {
143 out.push(row_for_index_pk(
144 latest,
145 collection_id,
146 pk.to_vec(),
147 &index_name,
148 )?);
149 }
150 }
151 IndexKind::NonUnique => {
152 if let Some(pks) = indexes.non_unique_lookup(collection_id, &index_name, &key) {
153 for pk in pks {
154 out.push(row_for_index_pk(latest, collection_id, pk, &index_name)?);
155 }
156 }
157 }
158 }
159
160 if let Some(pred) = residual {
161 out.retain(|row| eval_predicate(row, &pred));
162 }
163 apply_order_by_and_limit(&mut out, order_by.as_ref(), limit);
164 Ok(out)
165 }
166 Plan::CollectionScan {
167 collection_id,
168 predicate,
169 limit,
170 order_by,
171 } => {
172 let mut out = Vec::new();
173 for ((cid, _pk), row) in latest.iter() {
174 if *cid != collection_id {
175 continue;
176 }
177 if let Some(ref p) = predicate {
178 if !eval_predicate(row, p) {
179 continue;
180 }
181 }
182 out.push(row.clone());
183 }
184 apply_order_by_and_limit(&mut out, order_by.as_ref(), limit);
185 Ok(out)
186 }
187 }
188}
189
/// Iterator over query result rows, produced by `execute_query_iter` and
/// `execute_query_iter_with_spill_path`.
///
/// Each item is a `Result`, so row-resolution failures surface per row instead of aborting
/// the whole iteration up front.
pub struct QueryRowIter<'a> {
    state: QueryRowIterState<'a>,
}
198
/// Backing strategy for `QueryRowIter`.
enum QueryRowIterState<'a> {
    /// Fully materialized rows (already filtered, ordered and limited by `execute_query`),
    /// handed out in order; `pos` is the index of the next row to yield.
    Vec {
        rows: Vec<BTreeMap<String, RowValue>>,
        pos: usize,
    },
    /// A lazy `RowSource` of row keys, each resolved against `latest` as the iterator advances.
    Source {
        latest: &'a crate::db::LatestMap,
        source: Box<dyn RowSource + 'a>,
    },
}
209
210impl<'a> Iterator for QueryRowIter<'a> {
211 type Item = Result<BTreeMap<String, RowValue>, DbError>;
212
213 fn next(&mut self) -> Option<Self::Item> {
214 match &mut self.state {
215 QueryRowIterState::Vec { rows, pos } => {
216 if *pos >= rows.len() {
217 None
218 } else {
219 let out = rows[*pos].clone();
220 *pos += 1;
221 Some(Ok(out))
222 }
223 }
224 QueryRowIterState::Source { latest, source } => match source.next_key() {
225 None => None,
226 Some(Err(e)) => Some(Err(e)),
227 Some(Ok((cid, pk_key))) => Some(row_for_index_pk(latest, cid.0, pk_key, "")),
228 },
229 }
230 }
231}
232
/// `RowSource` that yields at most one key: the primary key returned by a unique-index
/// lookup, provided its row passes the residual predicate.
struct IndexUniqueSource<'a> {
    latest: &'a crate::db::LatestMap,
    collection_id: u32,
    /// Reported in `IndexRowMissing` errors.
    index_name: String,
    /// Primary key found by the lookup, or `None` when the lookup matched nothing.
    pk: Option<Vec<u8>>,
    /// Predicate conjuncts not covered by the index, checked against the fetched row.
    residual: Option<Predicate>,
    /// Set on the first `next_key` call, so later calls return `None`.
    done: bool,
}
241
242impl RowSource for IndexUniqueSource<'_> {
243 fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
244 if self.done {
245 return None;
246 }
247 self.done = true;
248 let pk_key = self.pk.take()?;
249 let row = match row_for_index_pk(
250 self.latest,
251 self.collection_id,
252 pk_key.clone(),
253 &self.index_name,
254 ) {
255 Ok(r) => r,
256 Err(e) => return Some(Err(e)),
257 };
258 if let Some(pred) = &self.residual {
259 if !eval_predicate(&row, pred) {
260 return None;
261 }
262 }
263 Some(Ok((CollectionId(self.collection_id), pk_key)))
264 }
265}
266
/// `RowSource` over the primary keys a non-unique index returned for one lookup key.
/// Rows failing the residual predicate are skipped.
struct IndexNonUniqueSource<'a> {
    latest: &'a crate::db::LatestMap,
    collection_id: u32,
    /// Reported in `IndexRowMissing` errors.
    index_name: String,
    /// Remaining candidate primary keys (empty when the lookup found none).
    pks: std::vec::IntoIter<Vec<u8>>,
    /// Predicate conjuncts not covered by the index, checked against each fetched row.
    residual: Option<Predicate>,
}
274
275impl RowSource for IndexNonUniqueSource<'_> {
276 fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
277 for pk_key in self.pks.by_ref() {
278 let row = match row_for_index_pk(
279 self.latest,
280 self.collection_id,
281 pk_key.clone(),
282 &self.index_name,
283 ) {
284 Ok(r) => r,
285 Err(e) => return Some(Err(e)),
286 };
287 if let Some(pred) = &self.residual {
288 if !eval_predicate(&row, pred) {
289 continue;
290 }
291 }
292 return Some(Ok((CollectionId(self.collection_id), pk_key)));
293 }
294 None
295 }
296}
297
/// `RowSource` that walks every entry of the latest-row map. It yields the keys of rows that
/// belong to `collection_id` and satisfy `predicate`, in the map's iteration order.
struct ScanSource<'a> {
    it: HashMapIter<'a, (u32, Vec<u8>), BTreeMap<String, RowValue>>,
    collection_id: u32,
    /// Filter applied to each row of the collection; `None` keeps every row.
    predicate: Option<Predicate>,
}
303
304impl RowSource for ScanSource<'_> {
305 fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
306 for (&(cid, ref pk_key), row) in self.it.by_ref() {
307 if cid != self.collection_id {
308 continue;
309 }
310 if let Some(p) = &self.predicate {
311 if !eval_predicate(row, p) {
312 continue;
313 }
314 }
315 return Some(Ok((CollectionId(self.collection_id), pk_key.clone())));
316 }
317 None
318 }
319}
320
321pub fn execute_query_iter<'a>(
323 catalog: &'a Catalog,
324 indexes: &'a IndexState,
325 latest: &'a crate::db::LatestMap,
326 query: &Query,
327) -> Result<QueryRowIter<'a>, DbError> {
328 if query.order_by.is_some() {
329 return Ok(QueryRowIter {
330 state: QueryRowIterState::Vec {
331 rows: execute_query(catalog, indexes, latest, query)?,
332 pos: 0,
333 },
334 });
335 }
336 let col =
337 catalog
338 .get(query.collection)
339 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
340 id: query.collection.0,
341 }))?;
342 let plan = plan_query(col.id, &col.indexes, query);
343 let mut source: Box<dyn RowSource + 'a> = match plan {
344 Plan::IndexLookup {
345 collection_id,
346 index_name,
347 kind,
348 key,
349 residual,
350 ..
351 } => match kind {
352 IndexKind::Unique => {
353 let pk = indexes
354 .unique_lookup(collection_id, &index_name, &key)
355 .map(|p| p.to_vec());
356 Box::new(IndexUniqueSource {
357 latest,
358 collection_id,
359 index_name,
360 pk,
361 residual,
362 done: false,
363 })
364 }
365 IndexKind::NonUnique => {
366 let pks = indexes
367 .non_unique_lookup(collection_id, &index_name, &key)
368 .unwrap_or_default()
369 .into_iter();
370 Box::new(IndexNonUniqueSource {
371 latest,
372 collection_id,
373 index_name,
374 pks,
375 residual,
376 })
377 }
378 },
379 Plan::CollectionScan {
380 collection_id,
381 predicate,
382 ..
383 } => Box::new(ScanSource {
384 it: latest.iter(),
385 collection_id,
386 predicate,
387 }),
388 };
389
390 if let Some(n) = query.limit {
391 source = Box::new(LimitOp::new(source, n));
392 }
393
394 Ok(QueryRowIter {
395 state: QueryRowIterState::Source { latest, source },
396 })
397}
398
/// Test-only replacement for opening the spill `FileStore`. It receives the path passed to
/// `open_sorted_query_spill_store`.
#[cfg(test)]
type SortedQuerySpillStoreOpenHook = Box<dyn FnMut(&std::path::Path) -> Result<FileStore, DbError>>;
/// Test-only replacement that returns a complete `SortedQuerySpillStore` (for example the
/// always-failing `FailLen` variant).
#[cfg(test)]
type SortedQuerySpillStoreOverrideHook =
    Box<dyn FnMut(&std::path::Path) -> Result<SortedQuerySpillStore, DbError>>;

#[cfg(test)]
thread_local! {
    // Hook slots are thread-local: a hook installed on one thread is not visible to others.
    // `open_sorted_query_spill_store` consults the override hook before the open hook.
    static QUERY_SORT_SPILL_STORE_OPEN_HOOK: RefCell<Option<SortedQuerySpillStoreOpenHook>> =
        RefCell::new(None);

    static QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK: RefCell<Option<SortedQuerySpillStoreOverrideHook>> =
        RefCell::new(None);
}
413
/// Installs, or clears when `hook` is `None`, this thread's hook that opens the `FileStore`
/// used for ORDER BY spills. Any previously installed hook is dropped.
#[cfg(test)]
pub(crate) fn test_set_sorted_query_spill_store_open_hook(
    hook: Option<SortedQuerySpillStoreOpenHook>,
) {
    QUERY_SORT_SPILL_STORE_OPEN_HOOK.with(move |slot| {
        let mut installed = slot.borrow_mut();
        *installed = hook;
    });
}
423
/// Installs, or clears when `hook` is `None`, this thread's hook that supplies the whole
/// `SortedQuerySpillStore` for ORDER BY spills. Any previously installed hook is dropped.
#[cfg(test)]
pub(crate) fn test_set_sorted_query_spill_store_override_hook(
    hook: Option<SortedQuerySpillStoreOverrideHook>,
) {
    QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK.with(move |slot| {
        let mut installed = slot.borrow_mut();
        *installed = hook;
    });
}
433
/// Backing store for the temporary spill file used by the external ORDER BY sort.
pub(crate) enum SortedQuerySpillStore {
    /// A real file-backed store.
    File(FileStore),
    /// Test-only store: `len`, `read_exact_at` and `write_all_at` always return an I/O
    /// error, while `sync` and `truncate` succeed.
    #[cfg(test)]
    FailLen,
}
439
// Every operation delegates to the wrapped `FileStore`. The test-only `FailLen` variant
// fails reads, writes and `len` with synthetic I/O errors and treats `sync`/`truncate` as no-ops.
impl Store for SortedQuerySpillStore {
    fn len(&self) -> Result<u64, DbError> {
        match self {
            Self::File(f) => f.len(),
            #[cfg(test)]
            Self::FailLen => Err(DbError::Io(std::io::Error::other(
                "sorted query spill store synthetic len() failure (test override)",
            ))),
        }
    }

    fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
        match self {
            Self::File(f) => f.read_exact_at(offset, buf),
            #[cfg(test)]
            Self::FailLen => Err(DbError::Io(std::io::Error::other(
                "sorted query spill store synthetic read failure (test override)",
            ))),
        }
    }

    fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
        match self {
            Self::File(f) => f.write_all_at(offset, buf),
            #[cfg(test)]
            Self::FailLen => Err(DbError::Io(std::io::Error::other(
                "sorted query spill store synthetic write failure (test override)",
            ))),
        }
    }

    fn sync(&mut self) -> Result<(), DbError> {
        match self {
            Self::File(f) => f.sync(),
            // Nothing to flush for the synthetic store.
            #[cfg(test)]
            Self::FailLen => Ok(()),
        }
    }

    fn truncate(&mut self, len: u64) -> Result<(), DbError> {
        match self {
            Self::File(f) => f.truncate(len),
            // Nothing to truncate for the synthetic store.
            #[cfg(test)]
            Self::FailLen => Ok(()),
        }
    }
}
487
488fn open_sorted_query_spill_store(path: &std::path::Path) -> Result<SortedQuerySpillStore, DbError> {
489 #[cfg(test)]
490 {
491 let overridden = QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK.with(|c| {
492 let mut bm = c.borrow_mut();
493 bm.as_mut().map(|hook| hook(path))
494 });
495 if let Some(r) = overridden {
496 return r;
497 }
498
499 let hooked = QUERY_SORT_SPILL_STORE_OPEN_HOOK.with(|c| {
500 let mut bm = c.borrow_mut();
501 bm.as_mut().map(|hook| hook(path))
502 });
503 if let Some(r) = hooked {
504 return r.map(SortedQuerySpillStore::File);
505 }
506 }
507 let _ = path;
508 let spill_file = tempfile::tempfile().map_err(DbError::Io)?;
510 Ok(SortedQuerySpillStore::File(FileStore::new(spill_file)))
511}
512
513pub fn execute_query_iter_with_spill_path<'a>(
518 catalog: &'a Catalog,
519 indexes: &'a IndexState,
520 latest: &'a crate::db::LatestMap,
521 q: &Query,
522 db_path: Option<&std::path::Path>,
523) -> Result<QueryRowIter<'a>, DbError> {
524 if q.order_by.is_none() {
525 return execute_query_iter(catalog, indexes, latest, q);
526 }
527 let order_by = q
528 .order_by
529 .clone()
530 .expect("order_by is Some when this function continues");
531
532 let Some(path) = db_path else {
534 return Ok(QueryRowIter {
535 state: QueryRowIterState::Vec {
536 rows: execute_query(catalog, indexes, latest, q)?,
537 pos: 0,
538 },
539 });
540 };
541
542 let col = catalog
543 .get(q.collection)
544 .ok_or(DbError::Schema(SchemaError::UnknownCollection {
545 id: q.collection.0,
546 }))?;
547 let plan = plan_query(col.id, &col.indexes, q);
548
549 let base: Box<dyn RowSource + 'a> = match plan.clone() {
550 Plan::IndexLookup {
551 collection_id,
552 index_name,
553 kind,
554 key,
555 residual,
556 ..
557 } => match kind {
558 IndexKind::Unique => Box::new(IndexUniqueSource {
559 latest,
560 collection_id,
561 index_name: index_name.clone(),
562 pk: indexes
563 .unique_lookup(collection_id, &index_name, &key)
564 .map(|p| p.to_vec()),
565 residual,
566 done: false,
567 }),
568 IndexKind::NonUnique => Box::new(IndexNonUniqueSource {
569 latest,
570 collection_id,
571 index_name: index_name.clone(),
572 pks: indexes
573 .non_unique_lookup(collection_id, &index_name, &key)
574 .unwrap_or_default()
575 .into_iter(),
576 residual,
577 }),
578 },
579 Plan::CollectionScan {
580 collection_id,
581 predicate,
582 ..
583 } => Box::new(ScanSource {
584 it: latest.iter(),
585 collection_id,
586 predicate,
587 }),
588 };
589
590 let spill_store = open_sorted_query_spill_store(path)?;
592 #[cfg(feature = "tracing")]
593 tracing::debug!(spill_path = %path.display(), "execute_query_order_by_spill");
594 let spill = crate::spill::TempSpillFile::new(spill_store)?;
595 let index_name_for_sort = match &plan {
596 Plan::IndexLookup { index_name, .. } => index_name.as_str(),
597 Plan::CollectionScan { .. } => "",
598 };
599 let sort_source = Box::new(ExternalSortSource::new(
600 spill,
601 latest,
602 base,
603 col.id.0,
604 order_by,
605 index_name_for_sort,
606 )?);
607
608 let mut source: Box<dyn RowSource + 'a> = sort_source;
609 if let Some(n) = q.limit {
610 source = Box::new(LimitOp::new(source, n));
611 }
612
613 Ok(QueryRowIter {
614 state: QueryRowIterState::Source { latest, source },
615 })
616}
617
/// One entry of an ORDER BY sort run: the encoded sort key plus the row it belongs to.
#[derive(Clone)]
struct SortItem {
    /// 1 when the ORDER BY path has no scalar value in the row, 0 otherwise. It is compared
    /// before `sort_key`, so rows without a value sort after the others in ascending order.
    none_flag: u8,
    /// Encoding from `scalar_sort_key_bytes` (empty when `none_flag` is 1).
    sort_key: Vec<u8>,
    /// Collection id and primary-key bytes of the row; the pk bytes break ties.
    key: RowKey,
}
625
626#[allow(dead_code)]
627fn sort_item_for(
628 latest: &crate::db::LatestMap,
629 key: &RowKey,
630 order_by: &OrderBy,
631) -> Option<SortItem> {
632 sort_item_for_result(latest, key, order_by, "").ok()
633}
634
635fn sort_item_for_result(
636 latest: &crate::db::LatestMap,
637 key: &RowKey,
638 order_by: &OrderBy,
639 index_name: &str,
640) -> Result<SortItem, DbError> {
641 let (cid, pk) = key;
642 let row =
643 latest
644 .get(&(cid.0, pk.clone()))
645 .ok_or(DbError::Schema(SchemaError::IndexRowMissing {
646 collection_id: cid.0,
647 index_name: index_name.to_string(),
648 }))?;
649 let (none_flag, sort_key) = match scalar_at_path(row, &order_by.path) {
650 None => (1u8, Vec::new()),
651 Some(s) => (0u8, scalar_sort_key_bytes(&s)),
652 };
653 Ok(SortItem {
654 none_flag,
655 sort_key,
656 key: (CollectionId(cid.0), pk.clone()),
657 })
658}
659
660fn scalar_sort_key_bytes(s: &ScalarValue) -> Vec<u8> {
661 match s {
662 ScalarValue::Bool(b) => vec![0, if *b { 1 } else { 0 }],
663 ScalarValue::Int64(v) => {
664 let u = (*v as u64) ^ 0x8000_0000_0000_0000u64;
665 let mut out = vec![1];
666 out.extend_from_slice(&u.to_be_bytes());
667 out
668 }
669 ScalarValue::Uint64(v) => {
670 let mut out = vec![2];
671 out.extend_from_slice(&v.to_be_bytes());
672 out
673 }
674 ScalarValue::Float64(v) => {
675 let mut bits = v.to_bits();
676 if bits & (1u64 << 63) != 0 {
677 bits = !bits;
678 } else {
679 bits ^= 1u64 << 63;
680 }
681 let mut out = vec![3];
682 out.extend_from_slice(&bits.to_be_bytes());
683 out
684 }
685 ScalarValue::String(st) => {
686 let mut out = vec![4];
687 out.extend_from_slice(st.as_bytes());
688 out
689 }
690 ScalarValue::Bytes(b) => {
691 let mut out = vec![5];
692 out.extend_from_slice(b);
693 out
694 }
695 ScalarValue::Uuid(u) => {
696 let mut out = vec![6];
697 out.extend_from_slice(u);
698 out
699 }
700 ScalarValue::Timestamp(t) => {
701 let u = (*t as u64) ^ 0x8000_0000_0000_0000u64;
702 let mut out = vec![7];
703 out.extend_from_slice(&u.to_be_bytes());
704 out
705 }
706 }
707}
708
709fn cmp_sort_item(a: &SortItem, b: &SortItem, dir: OrderDirection) -> std::cmp::Ordering {
710 let ord = a
711 .none_flag
712 .cmp(&b.none_flag)
713 .then_with(|| a.sort_key.cmp(&b.sort_key))
714 .then_with(|| a.key.1.cmp(&b.key.1));
715 match dir {
716 OrderDirection::Asc => ord,
717 OrderDirection::Desc => ord.reverse(),
718 }
719}
720
/// `RowSource` that yields row keys in ORDER BY order using an external merge sort.
///
/// Sort items are sorted in fixed-size runs, each run is written to a spill store, and the
/// runs are then merged with a heap.
struct ExternalSortSource<'a, S: Store = FileStore> {
    /// Spill store holding the encoded runs; owned for the lifetime of the source.
    _spill: crate::spill::TempSpillFile<S>,
    /// Collection id attached to every yielded key.
    collection_id: u32,
    /// Query sort direction, copied into every `HeapItem`.
    dir: OrderDirection,
    /// Current head record of every run that still has records.
    heap: std::collections::BinaryHeap<HeapItem>,
    /// Decoders over each run's payload, indexed by `HeapItem::run_idx`.
    runs: Vec<RunReader>,
    /// Held but not read after construction.
    _latest: &'a crate::db::LatestMap,
}
731
/// Location of one spilled sort run inside the spill store.
#[derive(Clone)]
struct RunMeta {
    /// Offset returned by `append_temp_segment` for the run's payload.
    offset: u64,
    /// Length of the encoded payload (see `encode_run`).
    payload_len: u64,
}
737
/// Sequential decoder over one spilled run payload, in the record format written by
/// `encode_run`: `none_flag: u8`, `u32` LE key length, key bytes, `u32` LE pk length,
/// pk bytes.
struct RunReader {
    /// Whole encoded run.
    buf: Vec<u8>,
    /// Offset of the next unread byte.
    pos: usize,
}

impl RunReader {
    fn new(buf: Vec<u8>) -> Self {
        RunReader { buf, pos: 0 }
    }

    /// Returns the next `len` bytes and moves past them. Returns `None`, leaving the cursor
    /// where it was, if fewer than `len` bytes remain.
    fn take_bytes(&mut self, len: usize) -> Option<&[u8]> {
        let start = self.pos;
        let end = start + len;
        let bytes = self.buf.get(start..end)?;
        self.pos = end;
        Some(bytes)
    }

    /// Reads a little-endian `u32` length prefix.
    fn take_u32_le(&mut self) -> Option<u32> {
        let b = self.take_bytes(4)?;
        Some(u32::from_le_bytes([b[0], b[1], b[2], b[3]]))
    }

    /// Decodes the next `(none_flag, sort_key, pk)` record.
    ///
    /// Returns `None` at the end of the buffer or on a truncated record. Any fields read
    /// before the truncation point stay consumed.
    fn next_item(&mut self) -> Option<(u8, Vec<u8>, Vec<u8>)> {
        let none_flag = self.take_bytes(1)?[0];
        let key_len = self.take_u32_le()? as usize;
        let sort_key = self.take_bytes(key_len)?.to_vec();
        let pk_len = self.take_u32_le()? as usize;
        let pk = self.take_bytes(pk_len)?.to_vec();
        Some((none_flag, sort_key, pk))
    }
}
765
766#[derive(Clone)]
767struct HeapItem {
768 run_idx: usize,
769 none_flag: u8,
770 sort_key: Vec<u8>,
771 pk: Vec<u8>,
772 dir: OrderDirection,
773}
774
775impl PartialEq for HeapItem {
776 fn eq(&self, other: &Self) -> bool {
777 (self.none_flag, &self.sort_key, &self.pk) == (other.none_flag, &other.sort_key, &other.pk)
778 }
779}
780impl Eq for HeapItem {}
781
782impl PartialOrd for HeapItem {
783 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
784 Some(self.cmp(other))
785 }
786}
787impl Ord for HeapItem {
788 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
789 let a = SortItem {
791 none_flag: self.none_flag,
792 sort_key: self.sort_key.clone(),
793 key: (CollectionId(0), self.pk.clone()),
794 };
795 let b = SortItem {
796 none_flag: other.none_flag,
797 sort_key: other.sort_key.clone(),
798 key: (CollectionId(0), other.pk.clone()),
799 };
800 cmp_sort_item(&a, &b, self.dir).reverse()
801 }
802}
803
804impl<'a, S: Store> ExternalSortSource<'a, S> {
805 fn flush_sorted_run(
806 spill: &mut crate::spill::TempSpillFile<S>,
807 runs_meta: &mut Vec<RunMeta>,
808 run: &mut Vec<SortItem>,
809 dir: OrderDirection,
810 ) -> Result<(), DbError> {
811 if run.is_empty() {
812 return Ok(());
813 }
814 run.sort_by(|a, b| cmp_sort_item(a, b, dir));
815 let payload = encode_run(run, dir);
816 let off = spill.append_temp_segment(&payload)?;
817 runs_meta.push(RunMeta {
818 offset: off,
819 payload_len: payload.len() as u64,
820 });
821 run.clear();
822 Ok(())
823 }
824
825 fn new(
826 mut spill: crate::spill::TempSpillFile<S>,
827 latest: &'a crate::db::LatestMap,
828 mut input: Box<dyn RowSource + 'a>,
829 collection_id: u32,
830 order_by: OrderBy,
831 index_name: &str,
832 ) -> Result<Self, DbError> {
833 const RUN_KEYS: usize = 2048;
834
835 let dir = order_by.direction;
836 let mut runs_meta: Vec<RunMeta> = Vec::new();
837 let mut run: Vec<SortItem> = Vec::with_capacity(RUN_KEYS);
838
839 while let Some(rk) = input.next_key() {
840 let rk = rk?;
841 let item = sort_item_for_result(latest, &rk, &order_by, index_name)?;
842 run.push(item);
843 if run.len() >= RUN_KEYS {
844 Self::flush_sorted_run(&mut spill, &mut runs_meta, &mut run, dir)?;
845 }
846 }
847
848 Self::flush_sorted_run(&mut spill, &mut runs_meta, &mut run, dir)?;
849
850 let mut runs: Vec<RunReader> = Vec::new();
852 let mut heap = std::collections::BinaryHeap::new();
853 for (i, m) in runs_meta.into_iter().enumerate() {
854 let buf = spill.read_temp_payload(m.offset, m.payload_len)?;
855 let mut rr = RunReader::new(buf);
856 if let Some((none_flag, sort_key, pk)) = rr.next_item() {
857 heap.push(HeapItem {
858 run_idx: i,
859 none_flag,
860 sort_key,
861 pk: pk.clone(),
862 dir,
863 });
864 }
865 runs.push(rr);
866 }
867
868 Ok(Self {
869 _spill: spill,
870 collection_id,
871 dir,
872 heap,
873 runs,
874 _latest: latest,
875 })
876 }
877}
878
879fn encode_run(run: &[SortItem], _dir: OrderDirection) -> Vec<u8> {
880 let mut out = Vec::new();
881 for it in run {
882 out.push(it.none_flag);
883 out.extend_from_slice(&(it.sort_key.len() as u32).to_le_bytes());
884 out.extend_from_slice(&it.sort_key);
885 out.extend_from_slice(&(it.key.1.len() as u32).to_le_bytes());
886 out.extend_from_slice(&it.key.1);
887 }
888 out
889}
890
891impl<'a, S: Store> RowSource for ExternalSortSource<'a, S> {
892 fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
893 let top = self.heap.pop()?;
894 let run_idx = top.run_idx;
895 if let Some((none_flag, sort_key, pk)) = self.runs[run_idx].next_item() {
897 self.heap.push(HeapItem {
898 run_idx,
899 none_flag,
900 sort_key,
901 pk: pk.clone(),
902 dir: self.dir,
903 });
904 }
905 Some(Ok((CollectionId(self.collection_id), top.pk)))
906 }
907}
908
909fn plan_query(
910 collection: CollectionId,
911 indexes: &[crate::schema::IndexDef],
912 query: &Query,
913) -> Plan {
914 let Some(pred) = query.predicate.clone() else {
915 return Plan::CollectionScan {
916 collection_id: collection.0,
917 predicate: None,
918 limit: query.limit,
919 order_by: query.order_by.clone(),
920 };
921 };
922
923 let (best, residual) = match choose_index(indexes, &pred) {
924 None => (None, Some(pred)),
925 Some((idx, value, used_pred)) => {
926 let residual = remove_used_predicate(pred, used_pred);
927 (Some((idx, value)), residual)
928 }
929 };
930
931 if let Some((idx, value)) = best {
932 Plan::IndexLookup {
933 collection_id: collection.0,
934 index_name: idx.name.clone(),
935 kind: idx.kind,
936 key: value.canonical_key_bytes(),
937 residual,
938 limit: query.limit,
939 order_by: query.order_by.clone(),
940 }
941 } else {
942 Plan::CollectionScan {
943 collection_id: collection.0,
944 predicate: residual,
945 limit: query.limit,
946 order_by: query.order_by.clone(),
947 }
948 }
949}
950
951fn choose_index<'a>(
952 indexes: &'a [crate::schema::IndexDef],
953 pred: &Predicate,
954) -> Option<(&'a crate::schema::IndexDef, ScalarValue, Predicate)> {
955 match pred {
956 Predicate::Eq { path, value } => indexes
957 .iter()
958 .find(|idx| &idx.path == path)
959 .map(|idx| (idx, value.clone(), pred.clone())),
960 Predicate::Lt { .. }
961 | Predicate::Lte { .. }
962 | Predicate::Gt { .. }
963 | Predicate::Gte { .. }
964 | Predicate::Or(_) => None,
965 Predicate::And(items) => {
966 let mut best: Option<(&crate::schema::IndexDef, ScalarValue, Predicate)> = None;
968 for p in items {
969 if let Some((idx, v, used)) = choose_index(indexes, p) {
970 match best {
971 None => best = Some((idx, v, used)),
972 Some((best_idx, _, _)) => {
973 if best_idx.kind != IndexKind::Unique && idx.kind == IndexKind::Unique {
974 best = Some((idx, v, used));
975 }
976 }
977 }
978 }
979 }
980 best
981 }
982 }
983}
984
985fn remove_used_predicate(pred: Predicate, used: Predicate) -> Option<Predicate> {
986 if pred == used {
987 return None;
988 }
989 match pred {
990 Predicate::And(items) => {
991 let mut out: Vec<Predicate> = items.into_iter().filter(|p| p != &used).collect();
992 match out.len() {
993 0 => None,
994 1 => Some(out.remove(0)),
995 _ => Some(Predicate::And(out)),
996 }
997 }
998 _ => Some(pred),
999 }
1000}
1001
1002fn eval_predicate(row: &BTreeMap<String, RowValue>, pred: &Predicate) -> bool {
1003 match pred {
1004 Predicate::Eq { path, value } => scalar_at_path(row, path)
1005 .map(|s| &s == value)
1006 .unwrap_or(false),
1007 Predicate::Lt { path, value } => scalar_at_path(row, path)
1008 .and_then(|s| scalar_partial_cmp(&s, value))
1009 .map(|o| o.is_lt())
1010 .unwrap_or(false),
1011 Predicate::Lte { path, value } => scalar_at_path(row, path)
1012 .and_then(|s| scalar_partial_cmp(&s, value))
1013 .map(|o| o.is_lt() || o.is_eq())
1014 .unwrap_or(false),
1015 Predicate::Gt { path, value } => scalar_at_path(row, path)
1016 .and_then(|s| scalar_partial_cmp(&s, value))
1017 .map(|o| o.is_gt())
1018 .unwrap_or(false),
1019 Predicate::Gte { path, value } => scalar_at_path(row, path)
1020 .and_then(|s| scalar_partial_cmp(&s, value))
1021 .map(|o| o.is_gt() || o.is_eq())
1022 .unwrap_or(false),
1023 Predicate::And(items) => items.iter().all(|p| eval_predicate(row, p)),
1024 Predicate::Or(items) => items.iter().any(|p| eval_predicate(row, p)),
1025 }
1026}
1027
1028fn apply_order_by_and_limit(
1029 rows: &mut Vec<BTreeMap<String, RowValue>>,
1030 order_by: Option<&OrderBy>,
1031 limit: Option<usize>,
1032) {
1033 if let Some(ob) = order_by {
1034 rows.sort_by(|a, b| {
1035 let av = scalar_at_path(a, &ob.path);
1036 let bv = scalar_at_path(b, &ob.path);
1037 let ord = match (av, bv) {
1038 (None, None) => std::cmp::Ordering::Equal,
1039 (None, Some(_)) => std::cmp::Ordering::Greater,
1040 (Some(_), None) => std::cmp::Ordering::Less,
1041 (Some(x), Some(y)) => {
1042 scalar_partial_cmp(&x, &y).unwrap_or(std::cmp::Ordering::Equal)
1043 }
1044 };
1045 match ob.direction {
1046 OrderDirection::Asc => ord,
1047 OrderDirection::Desc => ord.reverse(),
1048 }
1049 });
1050 }
1051 if let Some(n) = limit {
1052 rows.truncate(n);
1053 }
1054}
1055
1056fn scalar_partial_cmp(a: &ScalarValue, b: &ScalarValue) -> Option<std::cmp::Ordering> {
1057 use ScalarValue::*;
1058 match (a, b) {
1059 (Bool(x), Bool(y)) => Some(x.cmp(y)),
1060 (Int64(x), Int64(y)) => Some(x.cmp(y)),
1061 (Uint64(x), Uint64(y)) => Some(x.cmp(y)),
1062 (Float64(x), Float64(y)) => x.partial_cmp(y),
1063 (String(x), String(y)) => Some(x.cmp(y)),
1064 (Bytes(x), Bytes(y)) => Some(x.cmp(y)),
1065 (Uuid(x), Uuid(y)) => Some(x.cmp(y)),
1066 (Timestamp(x), Timestamp(y)) => Some(x.cmp(y)),
1067 _ => None,
1068 }
1069}
1070
// Unit tests live in a separate file and are textually `include!`d into this child module.
// That gives them access to this module's private items.
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_query_planner_tests.rs"
    ));
}