Skip to main content

modelvault_core/query/
planner.rs

1#[cfg(test)]
2use std::cell::RefCell;
3use std::collections::hash_map::Iter as HashMapIter;
4use std::collections::BTreeMap;
5
6use crate::catalog::Catalog;
7use crate::db::scalar_at_path;
8use crate::error::{DbError, SchemaError};
9use crate::index::IndexState;
10use crate::record::RowValue;
11use crate::schema::{CollectionId, IndexKind};
12use crate::storage::{FileStore, Store};
13use crate::ScalarValue;
14
15use super::ast::{OrderBy, OrderDirection};
16use super::ast::{Predicate, Query};
17use super::operators::{LimitOp, RowKey, RowSource};
18
19fn row_for_index_pk(
20    latest: &crate::db::LatestMap,
21    collection_id: u32,
22    pk_key: Vec<u8>,
23    index_name: &str,
24) -> Result<BTreeMap<String, RowValue>, DbError> {
25    latest
26        .get(&(collection_id, pk_key))
27        .cloned()
28        .ok_or(DbError::Schema(SchemaError::IndexRowMissing {
29            collection_id,
30            index_name: index_name.to_string(),
31        }))
32}
33
/// Access path chosen by `plan_query` for a single-collection query.
///
/// Both variants carry the query's `limit` and `order_by` through unchanged; the executors
/// apply ordering first and the limit afterwards.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Equality lookup on one of the collection's indexes.
    IndexLookup {
        collection_id: u32,
        index_name: String,
        kind: IndexKind,
        /// Canonical key bytes of the equality value being looked up.
        key: Vec<u8>,
        /// Part of the predicate the index does not enforce; re-checked against each row.
        residual: Option<Predicate>,
        limit: Option<usize>,
        order_by: Option<OrderBy>,
    },
    /// Scan of every row in `latest` that belongs to the collection, filtered by `predicate`.
    CollectionScan {
        collection_id: u32,
        predicate: Option<Predicate>,
        limit: Option<usize>,
        order_by: Option<OrderBy>,
    },
}
52
53pub fn explain_query(catalog: &Catalog, query: &Query) -> Result<String, DbError> {
54    let col =
55        catalog
56            .get(query.collection)
57            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
58                id: query.collection.0,
59            }))?;
60    let plan = plan_query(col.id, &col.indexes, query);
61    #[cfg(feature = "tracing")]
62    tracing::debug!(plan = ?plan, "explain_query");
63    Ok(match plan {
64        Plan::IndexLookup {
65            index_name,
66            kind,
67            residual,
68            limit,
69            order_by,
70            ..
71        } => {
72            let mut s = String::new();
73            s.push_str("Plan:\n");
74            s.push_str(&format!(
75                "  IndexLookup index={index_name:?} kind={kind:?}\n"
76            ));
77            if let Some(r) = residual {
78                s.push_str(&format!("  ResidualFilter {r:?}\n"));
79            }
80            if let Some(n) = limit {
81                s.push_str(&format!("  Limit {n}\n"));
82            }
83            if let Some(ob) = order_by {
84                s.push_str(&format!("  OrderBy {:?} {:?}\n", ob.path, ob.direction));
85            }
86            s
87        }
88        Plan::CollectionScan {
89            predicate,
90            limit,
91            order_by,
92            ..
93        } => {
94            let mut s = String::new();
95            s.push_str("Plan:\n");
96            s.push_str("  CollectionScan\n");
97            if let Some(p) = predicate {
98                s.push_str(&format!("  Filter {p:?}\n"));
99            }
100            if let Some(n) = limit {
101                s.push_str(&format!("  Limit {n}\n"));
102            }
103            if let Some(ob) = order_by {
104                s.push_str(&format!("  OrderBy {:?} {:?}\n", ob.path, ob.direction));
105            }
106            s
107        }
108    })
109}
110
111pub fn execute_query(
112    catalog: &Catalog,
113    indexes: &IndexState,
114    latest: &crate::db::LatestMap,
115    query: &Query,
116) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
117    let col =
118        catalog
119            .get(query.collection)
120            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
121                id: query.collection.0,
122            }))?;
123    let plan = plan_query(col.id, &col.indexes, query);
124
125    #[cfg(feature = "tracing")]
126    tracing::debug!(plan = ?plan, "execute_query");
127
128    match plan {
129        Plan::IndexLookup {
130            collection_id,
131            index_name,
132            kind,
133            key,
134            residual,
135            limit,
136            order_by,
137        } => {
138            let mut out = Vec::new();
139
140            match kind {
141                IndexKind::Unique => {
142                    if let Some(pk) = indexes.unique_lookup(collection_id, &index_name, &key) {
143                        out.push(row_for_index_pk(
144                            latest,
145                            collection_id,
146                            pk.to_vec(),
147                            &index_name,
148                        )?);
149                    }
150                }
151                IndexKind::NonUnique => {
152                    if let Some(pks) = indexes.non_unique_lookup(collection_id, &index_name, &key) {
153                        for pk in pks {
154                            out.push(row_for_index_pk(latest, collection_id, pk, &index_name)?);
155                        }
156                    }
157                }
158            }
159
160            if let Some(pred) = residual {
161                out.retain(|row| eval_predicate(row, &pred));
162            }
163            apply_order_by_and_limit(&mut out, order_by.as_ref(), limit);
164            Ok(out)
165        }
166        Plan::CollectionScan {
167            collection_id,
168            predicate,
169            limit,
170            order_by,
171        } => {
172            let mut out = Vec::new();
173            for ((cid, _pk), row) in latest.iter() {
174                if *cid != collection_id {
175                    continue;
176                }
177                if let Some(ref p) = predicate {
178                    if !eval_predicate(row, p) {
179                        continue;
180                    }
181                }
182                out.push(row.clone());
183            }
184            apply_order_by_and_limit(&mut out, order_by.as_ref(), limit);
185            Ok(out)
186        }
187    }
188}
189
190/// Pull-based row iterator for simple queries (0.7 execution boundary).
191///
192/// This is **not** a full Volcano-style operator engine yet (no joins / async), but it does
193/// establish an internal streaming operator boundary by yielding `(collection_id, pk_key)` and
194/// materializing rows from `latest` at the edge.
195pub struct QueryRowIter<'a> {
196    state: QueryRowIterState<'a>,
197}
198
199enum QueryRowIterState<'a> {
200    Vec {
201        rows: Vec<BTreeMap<String, RowValue>>,
202        pos: usize,
203    },
204    Source {
205        latest: &'a crate::db::LatestMap,
206        source: Box<dyn RowSource + 'a>,
207    },
208}
209
210impl<'a> Iterator for QueryRowIter<'a> {
211    type Item = Result<BTreeMap<String, RowValue>, DbError>;
212
213    fn next(&mut self) -> Option<Self::Item> {
214        match &mut self.state {
215            QueryRowIterState::Vec { rows, pos } => {
216                if *pos >= rows.len() {
217                    None
218                } else {
219                    let out = rows[*pos].clone();
220                    *pos += 1;
221                    Some(Ok(out))
222                }
223            }
224            QueryRowIterState::Source { latest, source } => loop {
225                let rk = source.next_key()?;
226                match rk {
227                    Err(e) => return Some(Err(e)),
228                    Ok((cid, pk_key)) => {
229                        if let Some(row) = latest.get(&(cid.0, pk_key)).cloned() {
230                            return Some(Ok(row));
231                        }
232                        continue;
233                    }
234                }
235            },
236        }
237    }
238}
239
240struct IndexUniqueSource<'a> {
241    latest: &'a crate::db::LatestMap,
242    collection_id: u32,
243    index_name: String,
244    pk: Option<Vec<u8>>,
245    residual: Option<Predicate>,
246    done: bool,
247}
248
249impl RowSource for IndexUniqueSource<'_> {
250    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
251        if self.done {
252            return None;
253        }
254        self.done = true;
255        let pk_key = self.pk.take()?;
256        let row = match row_for_index_pk(
257            self.latest,
258            self.collection_id,
259            pk_key.clone(),
260            &self.index_name,
261        ) {
262            Ok(r) => r,
263            Err(e) => return Some(Err(e)),
264        };
265        if let Some(pred) = &self.residual {
266            if !eval_predicate(&row, pred) {
267                return None;
268            }
269        }
270        Some(Ok((CollectionId(self.collection_id), pk_key)))
271    }
272}
273
274struct IndexNonUniqueSource<'a> {
275    latest: &'a crate::db::LatestMap,
276    collection_id: u32,
277    index_name: String,
278    pks: std::vec::IntoIter<Vec<u8>>,
279    residual: Option<Predicate>,
280}
281
282impl RowSource for IndexNonUniqueSource<'_> {
283    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
284        for pk_key in self.pks.by_ref() {
285            let row = match row_for_index_pk(
286                self.latest,
287                self.collection_id,
288                pk_key.clone(),
289                &self.index_name,
290            ) {
291                Ok(r) => r,
292                Err(e) => return Some(Err(e)),
293            };
294            if let Some(pred) = &self.residual {
295                if !eval_predicate(&row, pred) {
296                    continue;
297                }
298            }
299            return Some(Ok((CollectionId(self.collection_id), pk_key)));
300        }
301        None
302    }
303}
304
305struct ScanSource<'a> {
306    it: HashMapIter<'a, (u32, Vec<u8>), BTreeMap<String, RowValue>>,
307    collection_id: u32,
308    predicate: Option<Predicate>,
309}
310
311impl RowSource for ScanSource<'_> {
312    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
313        for (&(cid, ref pk_key), row) in self.it.by_ref() {
314            if cid != self.collection_id {
315                continue;
316            }
317            if let Some(p) = &self.predicate {
318                if !eval_predicate(row, p) {
319                    continue;
320                }
321            }
322            return Some(Ok((CollectionId(self.collection_id), pk_key.clone())));
323        }
324        None
325    }
326}
327
328/// Same planning and row sources as [`execute_query`], but as a lazy iterator.
329pub fn execute_query_iter<'a>(
330    catalog: &'a Catalog,
331    indexes: &'a IndexState,
332    latest: &'a crate::db::LatestMap,
333    query: &Query,
334) -> Result<QueryRowIter<'a>, DbError> {
335    if query.order_by.is_some() {
336        return Ok(QueryRowIter {
337            state: QueryRowIterState::Vec {
338                rows: execute_query(catalog, indexes, latest, query)?,
339                pos: 0,
340            },
341        });
342    }
343    let col =
344        catalog
345            .get(query.collection)
346            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
347                id: query.collection.0,
348            }))?;
349    let plan = plan_query(col.id, &col.indexes, query);
350    let mut source: Box<dyn RowSource + 'a> = match plan {
351        Plan::IndexLookup {
352            collection_id,
353            index_name,
354            kind,
355            key,
356            residual,
357            ..
358        } => match kind {
359            IndexKind::Unique => {
360                let pk = indexes
361                    .unique_lookup(collection_id, &index_name, &key)
362                    .map(|p| p.to_vec());
363                Box::new(IndexUniqueSource {
364                    latest,
365                    collection_id,
366                    index_name,
367                    pk,
368                    residual,
369                    done: false,
370                })
371            }
372            IndexKind::NonUnique => {
373                let pks = indexes
374                    .non_unique_lookup(collection_id, &index_name, &key)
375                    .unwrap_or_default()
376                    .into_iter();
377                Box::new(IndexNonUniqueSource {
378                    latest,
379                    collection_id,
380                    index_name,
381                    pks,
382                    residual,
383                })
384            }
385        },
386        Plan::CollectionScan {
387            collection_id,
388            predicate,
389            ..
390        } => Box::new(ScanSource {
391            it: latest.iter(),
392            collection_id,
393            predicate,
394        }),
395    };
396
397    if let Some(n) = query.limit {
398        source = Box::new(LimitOp::new(source, n));
399    }
400
401    Ok(QueryRowIter {
402        state: QueryRowIterState::Source { latest, source },
403    })
404}
405
/// Test-only hook that builds the raw spill `FileStore` for a path; the caller wraps the result
/// as `SortedQuerySpillStore::File`.
#[cfg(test)]
type SortedQuerySpillStoreOpenHook = Box<dyn FnMut(&std::path::Path) -> Result<FileStore, DbError>>;
/// Test-only hook that supplies the complete `SortedQuerySpillStore` (e.g. the `FailLen`
/// variant) for a path.
#[cfg(test)]
type SortedQuerySpillStoreOverrideHook =
    Box<dyn FnMut(&std::path::Path) -> Result<SortedQuerySpillStore, DbError>>;

// Per-thread hook slots consulted by `open_sorted_query_spill_store` (override first, then
// open). Because they are thread-local, a hook installed on one test thread is not visible to
// tests running on other threads.
#[cfg(test)]
thread_local! {
    static QUERY_SORT_SPILL_STORE_OPEN_HOOK: RefCell<Option<SortedQuerySpillStoreOpenHook>> =
        RefCell::new(None);

    static QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK: RefCell<Option<SortedQuerySpillStoreOverrideHook>> =
        RefCell::new(None);
}
420
/// Test-only: installs (`Some`) or removes (`None`) the current thread's hook that
/// `open_sorted_query_spill_store` uses to build the spill `FileStore`, so unit tests can drive
/// its construction error paths.
#[cfg(test)]
pub(crate) fn test_set_sorted_query_spill_store_open_hook(
    hook: Option<SortedQuerySpillStoreOpenHook>,
) {
    QUERY_SORT_SPILL_STORE_OPEN_HOOK.with(|slot| {
        let mut current = slot.borrow_mut();
        *current = hook;
    });
}
430
/// Test-only: installs (`Some`) or removes (`None`) the current thread's hook that supplies the
/// whole `SortedQuerySpillStore`; when set it takes precedence over the open hook.
#[cfg(test)]
pub(crate) fn test_set_sorted_query_spill_store_override_hook(
    hook: Option<SortedQuerySpillStoreOverrideHook>,
) {
    QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK.with(|slot| {
        let mut current = slot.borrow_mut();
        *current = hook;
    });
}
440
441pub(crate) enum SortedQuerySpillStore {
442    File(FileStore),
443    #[cfg(test)]
444    FailLen,
445}
446
447impl Store for SortedQuerySpillStore {
448    fn len(&self) -> Result<u64, DbError> {
449        match self {
450            Self::File(f) => f.len(),
451            #[cfg(test)]
452            Self::FailLen => Err(DbError::Io(std::io::Error::other(
453                "sorted query spill store synthetic len() failure (test override)",
454            ))),
455        }
456    }
457
458    fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
459        match self {
460            Self::File(f) => f.read_exact_at(offset, buf),
461            #[cfg(test)]
462            Self::FailLen => Err(DbError::Io(std::io::Error::other(
463                "sorted query spill store synthetic read failure (test override)",
464            ))),
465        }
466    }
467
468    fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
469        match self {
470            Self::File(f) => f.write_all_at(offset, buf),
471            #[cfg(test)]
472            Self::FailLen => Err(DbError::Io(std::io::Error::other(
473                "sorted query spill store synthetic write failure (test override)",
474            ))),
475        }
476    }
477
478    fn sync(&mut self) -> Result<(), DbError> {
479        match self {
480            Self::File(f) => f.sync(),
481            #[cfg(test)]
482            Self::FailLen => Ok(()),
483        }
484    }
485
486    fn truncate(&mut self, len: u64) -> Result<(), DbError> {
487        match self {
488            Self::File(f) => f.truncate(len),
489            #[cfg(test)]
490            Self::FailLen => Ok(()),
491        }
492    }
493}
494
495fn open_sorted_query_spill_store(path: &std::path::Path) -> Result<SortedQuerySpillStore, DbError> {
496    #[cfg(test)]
497    {
498        let overridden = QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK.with(|c| {
499            let mut bm = c.borrow_mut();
500            bm.as_mut().map(|hook| hook(path))
501        });
502        if let Some(r) = overridden {
503            return r;
504        }
505
506        let hooked = QUERY_SORT_SPILL_STORE_OPEN_HOOK.with(|c| {
507            let mut bm = c.borrow_mut();
508            bm.as_mut().map(|hook| hook(path))
509        });
510        if let Some(r) = hooked {
511            return r.map(SortedQuerySpillStore::File);
512        }
513    }
514    let _ = path;
515    // Never spill into the live DB file: a second handle would race with truncate-on-drop.
516    let spill_file = tempfile::tempfile().map_err(DbError::Io)?;
517    Ok(SortedQuerySpillStore::File(FileStore::new(spill_file)))
518}
519
520/// Like [`execute_query_iter`], but when `q.order_by` is set this will attempt a bounded-memory
521/// external sort by spilling ephemeral `Temp` segments to a dedicated temporary file.
522///
523/// If `db_path` is `None` (e.g. in-memory), this falls back to the in-memory sort path.
524pub fn execute_query_iter_with_spill_path<'a>(
525    catalog: &'a Catalog,
526    indexes: &'a IndexState,
527    latest: &'a crate::db::LatestMap,
528    q: &Query,
529    db_path: Option<&std::path::Path>,
530) -> Result<QueryRowIter<'a>, DbError> {
531    if q.order_by.is_none() {
532        return execute_query_iter(catalog, indexes, latest, q);
533    }
534    let order_by = q
535        .order_by
536        .clone()
537        .expect("order_by is Some when this function continues");
538
539    // If we don't have a file path to spill into, fall back to the existing in-memory behavior.
540    let Some(path) = db_path else {
541        return Ok(QueryRowIter {
542            state: QueryRowIterState::Vec {
543                rows: execute_query(catalog, indexes, latest, q)?,
544                pos: 0,
545            },
546        });
547    };
548
549    let col = catalog
550        .get(q.collection)
551        .ok_or(DbError::Schema(SchemaError::UnknownCollection {
552            id: q.collection.0,
553        }))?;
554    let plan = plan_query(col.id, &col.indexes, q);
555
556    let base: Box<dyn RowSource + 'a> = match plan.clone() {
557        Plan::IndexLookup {
558            collection_id,
559            index_name,
560            kind,
561            key,
562            residual,
563            ..
564        } => match kind {
565            IndexKind::Unique => Box::new(IndexUniqueSource {
566                latest,
567                collection_id,
568                index_name: index_name.clone(),
569                pk: indexes
570                    .unique_lookup(collection_id, &index_name, &key)
571                    .map(|p| p.to_vec()),
572                residual,
573                done: false,
574            }),
575            IndexKind::NonUnique => Box::new(IndexNonUniqueSource {
576                latest,
577                collection_id,
578                index_name: index_name.clone(),
579                pks: indexes
580                    .non_unique_lookup(collection_id, &index_name, &key)
581                    .unwrap_or_default()
582                    .into_iter(),
583                residual,
584            }),
585        },
586        Plan::CollectionScan {
587            collection_id,
588            predicate,
589            ..
590        } => Box::new(ScanSource {
591            it: latest.iter(),
592            collection_id,
593            predicate,
594        }),
595    };
596
597    // Build a sorted key source (potentially spilling to Temp segments).
598    let spill_store = open_sorted_query_spill_store(path)?;
599    #[cfg(feature = "tracing")]
600    tracing::debug!(spill_path = %path.display(), "execute_query_order_by_spill");
601    let spill = crate::spill::TempSpillFile::new(spill_store)?;
602    let sort_source = Box::new(ExternalSortSource::new(
603        spill, latest, base, col.id.0, order_by,
604    )?);
605
606    let mut source: Box<dyn RowSource + 'a> = sort_source;
607    if let Some(n) = q.limit {
608        source = Box::new(LimitOp::new(source, n));
609    }
610
611    Ok(QueryRowIter {
612        state: QueryRowIterState::Source { latest, source },
613    })
614}
615
616#[derive(Clone)]
617struct SortItem {
618    // `none_flag`: 0 for Some, 1 for None (so None sorts last on ascending).
619    none_flag: u8,
620    sort_key: Vec<u8>,
621    key: RowKey,
622}
623
624fn sort_item_for(
625    latest: &crate::db::LatestMap,
626    key: &RowKey,
627    order_by: &OrderBy,
628) -> Option<SortItem> {
629    let (cid, pk) = key;
630    let row = latest.get(&(cid.0, pk.clone()))?;
631    let (none_flag, sort_key) = match scalar_at_path(row, &order_by.path) {
632        None => (1u8, Vec::new()),
633        Some(s) => (0u8, scalar_sort_key_bytes(&s)),
634    };
635    Some(SortItem {
636        none_flag,
637        sort_key,
638        key: (CollectionId(cid.0), pk.clone()),
639    })
640}
641
642fn scalar_sort_key_bytes(s: &ScalarValue) -> Vec<u8> {
643    match s {
644        ScalarValue::Bool(b) => vec![0, if *b { 1 } else { 0 }],
645        ScalarValue::Int64(v) => {
646            let u = (*v as u64) ^ 0x8000_0000_0000_0000u64;
647            let mut out = vec![1];
648            out.extend_from_slice(&u.to_be_bytes());
649            out
650        }
651        ScalarValue::Uint64(v) => {
652            let mut out = vec![2];
653            out.extend_from_slice(&v.to_be_bytes());
654            out
655        }
656        ScalarValue::Float64(v) => {
657            let mut bits = v.to_bits();
658            if bits & (1u64 << 63) != 0 {
659                bits = !bits;
660            } else {
661                bits ^= 1u64 << 63;
662            }
663            let mut out = vec![3];
664            out.extend_from_slice(&bits.to_be_bytes());
665            out
666        }
667        ScalarValue::String(st) => {
668            let mut out = vec![4];
669            out.extend_from_slice(st.as_bytes());
670            out
671        }
672        ScalarValue::Bytes(b) => {
673            let mut out = vec![5];
674            out.extend_from_slice(b);
675            out
676        }
677        ScalarValue::Uuid(u) => {
678            let mut out = vec![6];
679            out.extend_from_slice(u);
680            out
681        }
682        ScalarValue::Timestamp(t) => {
683            let u = (*t as u64) ^ 0x8000_0000_0000_0000u64;
684            let mut out = vec![7];
685            out.extend_from_slice(&u.to_be_bytes());
686            out
687        }
688    }
689}
690
691fn cmp_sort_item(a: &SortItem, b: &SortItem, dir: OrderDirection) -> std::cmp::Ordering {
692    let ord = a
693        .none_flag
694        .cmp(&b.none_flag)
695        .then_with(|| a.sort_key.cmp(&b.sort_key))
696        .then_with(|| a.key.1.cmp(&b.key.1));
697    match dir {
698        OrderDirection::Asc => ord,
699        OrderDirection::Desc => ord.reverse(),
700    }
701}
702
// Simple external sort: sort fixed-size runs, spill each run as one Temp segment,
// then k-way merge those runs.
//
// `heap` holds the current head entry of each run that still has entries (see `HeapItem`'s
// inverted `Ord`), and `runs[i]` decodes the remaining entries of run `i`. `_spill` is not read
// after construction; it is held so the spill file is owned by (and dropped with) this source.
// `_latest` is stored but not read by `next_key`.
struct ExternalSortSource<'a, S: Store = FileStore> {
    _spill: crate::spill::TempSpillFile<S>,
    collection_id: u32,
    dir: OrderDirection,
    heap: std::collections::BinaryHeap<HeapItem>,
    runs: Vec<RunReader>,
    _latest: &'a crate::db::LatestMap,
}

/// Location of one spilled run: the offset returned by `append_temp_segment` and the encoded
/// payload length in bytes.
#[derive(Clone)]
struct RunMeta {
    offset: u64,
    payload_len: u64,
}
719
/// Cursor over one encoded run (see `encode_run`): records of `none_flag: u8`,
/// `sort_key_len: u32 LE`, `sort_key`, `pk_len: u32 LE`, `pk`.
struct RunReader {
    buf: Vec<u8>,
    pos: usize,
}

impl RunReader {
    fn new(buf: Vec<u8>) -> Self {
        RunReader { pos: 0, buf }
    }

    /// Returns the next `n` bytes and advances past them, or `None` (cursor unchanged) if
    /// fewer than `n` bytes remain.
    fn take_bytes(&mut self, n: usize) -> Option<&[u8]> {
        let start = self.pos;
        let chunk = self.buf.get(start..start + n)?;
        self.pos = start + n;
        Some(chunk)
    }

    /// Reads a little-endian `u32` length prefix.
    fn read_len(&mut self) -> Option<usize> {
        let raw = self.take_bytes(4)?;
        Some(u32::from_le_bytes([raw[0], raw[1], raw[2], raw[3]]) as usize)
    }

    /// Decodes the next `(none_flag, sort_key, pk)` record, or `None` at the end of the buffer
    /// or when the remaining bytes do not form a complete record.
    fn next_item(&mut self) -> Option<(u8, Vec<u8>, Vec<u8>)> {
        let none_flag = self.take_bytes(1)?[0];
        let key_len = self.read_len()?;
        let sort_key = self.take_bytes(key_len)?.to_vec();
        let pk_len = self.read_len()?;
        let pk = self.take_bytes(pk_len)?.to_vec();
        Some((none_flag, sort_key, pk))
    }
}
747
748#[derive(Clone)]
749struct HeapItem {
750    run_idx: usize,
751    none_flag: u8,
752    sort_key: Vec<u8>,
753    pk: Vec<u8>,
754    dir: OrderDirection,
755}
756
757impl PartialEq for HeapItem {
758    fn eq(&self, other: &Self) -> bool {
759        (self.none_flag, &self.sort_key, &self.pk) == (other.none_flag, &other.sort_key, &other.pk)
760    }
761}
762impl Eq for HeapItem {}
763
764impl PartialOrd for HeapItem {
765    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
766        Some(self.cmp(other))
767    }
768}
769impl Ord for HeapItem {
770    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
771        // BinaryHeap is max-heap; invert to get min-heap behavior.
772        let a = SortItem {
773            none_flag: self.none_flag,
774            sort_key: self.sort_key.clone(),
775            key: (CollectionId(0), self.pk.clone()),
776        };
777        let b = SortItem {
778            none_flag: other.none_flag,
779            sort_key: other.sort_key.clone(),
780            key: (CollectionId(0), other.pk.clone()),
781        };
782        cmp_sort_item(&a, &b, self.dir).reverse()
783    }
784}
785
786impl<'a, S: Store> ExternalSortSource<'a, S> {
787    fn flush_sorted_run(
788        spill: &mut crate::spill::TempSpillFile<S>,
789        runs_meta: &mut Vec<RunMeta>,
790        run: &mut Vec<SortItem>,
791        dir: OrderDirection,
792    ) -> Result<(), DbError> {
793        if run.is_empty() {
794            return Ok(());
795        }
796        run.sort_by(|a, b| cmp_sort_item(a, b, dir));
797        let payload = encode_run(run, dir);
798        let off = spill.append_temp_segment(&payload)?;
799        runs_meta.push(RunMeta {
800            offset: off,
801            payload_len: payload.len() as u64,
802        });
803        run.clear();
804        Ok(())
805    }
806
807    fn new(
808        mut spill: crate::spill::TempSpillFile<S>,
809        latest: &'a crate::db::LatestMap,
810        mut input: Box<dyn RowSource + 'a>,
811        collection_id: u32,
812        order_by: OrderBy,
813    ) -> Result<Self, DbError> {
814        const RUN_KEYS: usize = 2048;
815
816        let dir = order_by.direction;
817        let mut runs_meta: Vec<RunMeta> = Vec::new();
818        let mut run: Vec<SortItem> = Vec::with_capacity(RUN_KEYS);
819
820        while let Some(rk) = input.next_key() {
821            let rk = rk?;
822            if let Some(item) = sort_item_for(latest, &rk, &order_by) {
823                run.push(item);
824            }
825            if run.len() >= RUN_KEYS {
826                Self::flush_sorted_run(&mut spill, &mut runs_meta, &mut run, dir)?;
827            }
828        }
829
830        Self::flush_sorted_run(&mut spill, &mut runs_meta, &mut run, dir)?;
831
832        // Load run buffers and seed heap.
833        let mut runs: Vec<RunReader> = Vec::new();
834        let mut heap = std::collections::BinaryHeap::new();
835        for (i, m) in runs_meta.into_iter().enumerate() {
836            let buf = spill.read_temp_payload(m.offset, m.payload_len)?;
837            let mut rr = RunReader::new(buf);
838            if let Some((none_flag, sort_key, pk)) = rr.next_item() {
839                heap.push(HeapItem {
840                    run_idx: i,
841                    none_flag,
842                    sort_key,
843                    pk: pk.clone(),
844                    dir,
845                });
846            }
847            runs.push(rr);
848        }
849
850        Ok(Self {
851            _spill: spill,
852            collection_id,
853            dir,
854            heap,
855            runs,
856            _latest: latest,
857        })
858    }
859}
860
861fn encode_run(run: &[SortItem], _dir: OrderDirection) -> Vec<u8> {
862    let mut out = Vec::new();
863    for it in run {
864        out.push(it.none_flag);
865        out.extend_from_slice(&(it.sort_key.len() as u32).to_le_bytes());
866        out.extend_from_slice(&it.sort_key);
867        out.extend_from_slice(&(it.key.1.len() as u32).to_le_bytes());
868        out.extend_from_slice(&it.key.1);
869    }
870    out
871}
872
873impl<'a, S: Store> RowSource for ExternalSortSource<'a, S> {
874    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
875        let top = self.heap.pop()?;
876        let run_idx = top.run_idx;
877        // refill from same run
878        if let Some((none_flag, sort_key, pk)) = self.runs[run_idx].next_item() {
879            self.heap.push(HeapItem {
880                run_idx,
881                none_flag,
882                sort_key,
883                pk: pk.clone(),
884                dir: self.dir,
885            });
886        }
887        Some(Ok((CollectionId(self.collection_id), top.pk)))
888    }
889}
890
891fn plan_query(
892    collection: CollectionId,
893    indexes: &[crate::schema::IndexDef],
894    query: &Query,
895) -> Plan {
896    let Some(pred) = query.predicate.clone() else {
897        return Plan::CollectionScan {
898            collection_id: collection.0,
899            predicate: None,
900            limit: query.limit,
901            order_by: query.order_by.clone(),
902        };
903    };
904
905    let (best, residual) = match choose_index(indexes, &pred) {
906        None => (None, Some(pred)),
907        Some((idx, value, used_pred)) => {
908            let residual = remove_used_predicate(pred, used_pred);
909            (Some((idx, value)), residual)
910        }
911    };
912
913    if let Some((idx, value)) = best {
914        Plan::IndexLookup {
915            collection_id: collection.0,
916            index_name: idx.name.clone(),
917            kind: idx.kind,
918            key: value.canonical_key_bytes(),
919            residual,
920            limit: query.limit,
921            order_by: query.order_by.clone(),
922        }
923    } else {
924        Plan::CollectionScan {
925            collection_id: collection.0,
926            predicate: residual,
927            limit: query.limit,
928            order_by: query.order_by.clone(),
929        }
930    }
931}
932
933fn choose_index<'a>(
934    indexes: &'a [crate::schema::IndexDef],
935    pred: &Predicate,
936) -> Option<(&'a crate::schema::IndexDef, ScalarValue, Predicate)> {
937    match pred {
938        Predicate::Eq { path, value } => indexes
939            .iter()
940            .find(|idx| &idx.path == path)
941            .map(|idx| (idx, value.clone(), pred.clone())),
942        Predicate::Lt { .. }
943        | Predicate::Lte { .. }
944        | Predicate::Gt { .. }
945        | Predicate::Gte { .. }
946        | Predicate::Or(_) => None,
947        Predicate::And(items) => {
948            // Prefer unique index predicates, else first indexed predicate.
949            let mut best: Option<(&crate::schema::IndexDef, ScalarValue, Predicate)> = None;
950            for p in items {
951                if let Some((idx, v, used)) = choose_index(indexes, p) {
952                    match best {
953                        None => best = Some((idx, v, used)),
954                        Some((best_idx, _, _)) => {
955                            if best_idx.kind != IndexKind::Unique && idx.kind == IndexKind::Unique {
956                                best = Some((idx, v, used));
957                            }
958                        }
959                    }
960                }
961            }
962            best
963        }
964    }
965}
966
967fn remove_used_predicate(pred: Predicate, used: Predicate) -> Option<Predicate> {
968    if pred == used {
969        return None;
970    }
971    match pred {
972        Predicate::And(items) => {
973            let mut out: Vec<Predicate> = items.into_iter().filter(|p| p != &used).collect();
974            match out.len() {
975                0 => None,
976                1 => Some(out.remove(0)),
977                _ => Some(Predicate::And(out)),
978            }
979        }
980        _ => Some(pred),
981    }
982}
983
984fn eval_predicate(row: &BTreeMap<String, RowValue>, pred: &Predicate) -> bool {
985    match pred {
986        Predicate::Eq { path, value } => scalar_at_path(row, path)
987            .map(|s| &s == value)
988            .unwrap_or(false),
989        Predicate::Lt { path, value } => scalar_at_path(row, path)
990            .and_then(|s| scalar_partial_cmp(&s, value))
991            .map(|o| o.is_lt())
992            .unwrap_or(false),
993        Predicate::Lte { path, value } => scalar_at_path(row, path)
994            .and_then(|s| scalar_partial_cmp(&s, value))
995            .map(|o| o.is_lt() || o.is_eq())
996            .unwrap_or(false),
997        Predicate::Gt { path, value } => scalar_at_path(row, path)
998            .and_then(|s| scalar_partial_cmp(&s, value))
999            .map(|o| o.is_gt())
1000            .unwrap_or(false),
1001        Predicate::Gte { path, value } => scalar_at_path(row, path)
1002            .and_then(|s| scalar_partial_cmp(&s, value))
1003            .map(|o| o.is_gt() || o.is_eq())
1004            .unwrap_or(false),
1005        Predicate::And(items) => items.iter().all(|p| eval_predicate(row, p)),
1006        Predicate::Or(items) => items.iter().any(|p| eval_predicate(row, p)),
1007    }
1008}
1009
1010fn apply_order_by_and_limit(
1011    rows: &mut Vec<BTreeMap<String, RowValue>>,
1012    order_by: Option<&OrderBy>,
1013    limit: Option<usize>,
1014) {
1015    if let Some(ob) = order_by {
1016        rows.sort_by(|a, b| {
1017            let av = scalar_at_path(a, &ob.path);
1018            let bv = scalar_at_path(b, &ob.path);
1019            let ord = match (av, bv) {
1020                (None, None) => std::cmp::Ordering::Equal,
1021                (None, Some(_)) => std::cmp::Ordering::Greater,
1022                (Some(_), None) => std::cmp::Ordering::Less,
1023                (Some(x), Some(y)) => {
1024                    scalar_partial_cmp(&x, &y).unwrap_or(std::cmp::Ordering::Equal)
1025                }
1026            };
1027            match ob.direction {
1028                OrderDirection::Asc => ord,
1029                OrderDirection::Desc => ord.reverse(),
1030            }
1031        });
1032    }
1033    if let Some(n) = limit {
1034        rows.truncate(n);
1035    }
1036}
1037
1038fn scalar_partial_cmp(a: &ScalarValue, b: &ScalarValue) -> Option<std::cmp::Ordering> {
1039    use ScalarValue::*;
1040    match (a, b) {
1041        (Bool(x), Bool(y)) => Some(x.cmp(y)),
1042        (Int64(x), Int64(y)) => Some(x.cmp(y)),
1043        (Uint64(x), Uint64(y)) => Some(x.cmp(y)),
1044        (Float64(x), Float64(y)) => x.partial_cmp(y),
1045        (String(x), String(y)) => Some(x.cmp(y)),
1046        (Bytes(x), Bytes(y)) => Some(x.cmp(y)),
1047        (Uuid(x), Uuid(y)) => Some(x.cmp(y)),
1048        (Timestamp(x), Timestamp(y)) => Some(x.cmp(y)),
1049        _ => None,
1050    }
1051}
1052
#[cfg(test)]
mod tests {
    // The unit tests live in a separate file under the crate's `tests/unit/` directory and are
    // textually included into this child module, which gives them access to this file's
    // private items through `super::`.
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_query_planner_tests.rs"
    ));
}