Skip to main content

modelvault_core/query/
planner.rs

1#[cfg(test)]
2use std::cell::RefCell;
3use std::collections::hash_map::Iter as HashMapIter;
4use std::collections::BTreeMap;
5
6use crate::catalog::Catalog;
7use crate::db::scalar_at_path;
8use crate::error::{DbError, SchemaError};
9use crate::index::IndexState;
10use crate::record::RowValue;
11use crate::schema::{CollectionId, IndexKind};
12use crate::storage::{FileStore, Store};
13use crate::ScalarValue;
14
15use super::ast::{OrderBy, OrderDirection};
16use super::ast::{Predicate, Query};
17use super::operators::{LimitOp, RowKey, RowSource};
18
19fn row_for_index_pk(
20    latest: &crate::db::LatestMap,
21    collection_id: u32,
22    pk_key: Vec<u8>,
23    index_name: &str,
24) -> Result<BTreeMap<String, RowValue>, DbError> {
25    latest
26        .get(&(collection_id, pk_key))
27        .cloned()
28        .ok_or(DbError::Schema(SchemaError::IndexRowMissing {
29            collection_id,
30            index_name: index_name.to_string(),
31        }))
32}
33
34#[derive(Debug, Clone, PartialEq)]
35enum Plan {
36    IndexLookup {
37        collection_id: u32,
38        index_name: String,
39        kind: IndexKind,
40        key: Vec<u8>,
41        residual: Option<Predicate>,
42        limit: Option<usize>,
43        order_by: Option<OrderBy>,
44    },
45    CollectionScan {
46        collection_id: u32,
47        predicate: Option<Predicate>,
48        limit: Option<usize>,
49        order_by: Option<OrderBy>,
50    },
51}
52
53pub fn explain_query(catalog: &Catalog, query: &Query) -> Result<String, DbError> {
54    let col =
55        catalog
56            .get(query.collection)
57            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
58                id: query.collection.0,
59            }))?;
60    let plan = plan_query(col.id, &col.indexes, query);
61    #[cfg(feature = "tracing")]
62    tracing::debug!(plan = ?plan, "explain_query");
63    Ok(match plan {
64        Plan::IndexLookup {
65            index_name,
66            kind,
67            residual,
68            limit,
69            order_by,
70            ..
71        } => {
72            let mut s = String::new();
73            s.push_str("Plan:\n");
74            s.push_str(&format!(
75                "  IndexLookup index={index_name:?} kind={kind:?}\n"
76            ));
77            if let Some(r) = residual {
78                s.push_str(&format!("  ResidualFilter {r:?}\n"));
79            }
80            if let Some(n) = limit {
81                s.push_str(&format!("  Limit {n}\n"));
82            }
83            if let Some(ob) = order_by {
84                s.push_str(&format!("  OrderBy {:?} {:?}\n", ob.path, ob.direction));
85            }
86            s
87        }
88        Plan::CollectionScan {
89            predicate,
90            limit,
91            order_by,
92            ..
93        } => {
94            let mut s = String::new();
95            s.push_str("Plan:\n");
96            s.push_str("  CollectionScan\n");
97            if let Some(p) = predicate {
98                s.push_str(&format!("  Filter {p:?}\n"));
99            }
100            if let Some(n) = limit {
101                s.push_str(&format!("  Limit {n}\n"));
102            }
103            if let Some(ob) = order_by {
104                s.push_str(&format!("  OrderBy {:?} {:?}\n", ob.path, ob.direction));
105            }
106            s
107        }
108    })
109}
110
111pub fn execute_query(
112    catalog: &Catalog,
113    indexes: &IndexState,
114    latest: &crate::db::LatestMap,
115    query: &Query,
116) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
117    let col =
118        catalog
119            .get(query.collection)
120            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
121                id: query.collection.0,
122            }))?;
123    let plan = plan_query(col.id, &col.indexes, query);
124
125    #[cfg(feature = "tracing")]
126    tracing::debug!(plan = ?plan, "execute_query");
127
128    match plan {
129        Plan::IndexLookup {
130            collection_id,
131            index_name,
132            kind,
133            key,
134            residual,
135            limit,
136            order_by,
137        } => {
138            let mut out = Vec::new();
139
140            match kind {
141                IndexKind::Unique => {
142                    if let Some(pk) = indexes.unique_lookup(collection_id, &index_name, &key) {
143                        out.push(row_for_index_pk(
144                            latest,
145                            collection_id,
146                            pk.to_vec(),
147                            &index_name,
148                        )?);
149                    }
150                }
151                IndexKind::NonUnique => {
152                    if let Some(pks) = indexes.non_unique_lookup(collection_id, &index_name, &key) {
153                        for pk in pks {
154                            out.push(row_for_index_pk(latest, collection_id, pk, &index_name)?);
155                        }
156                    }
157                }
158            }
159
160            if let Some(pred) = residual {
161                out.retain(|row| eval_predicate(row, &pred));
162            }
163            apply_order_by_and_limit(&mut out, order_by.as_ref(), limit);
164            Ok(out)
165        }
166        Plan::CollectionScan {
167            collection_id,
168            predicate,
169            limit,
170            order_by,
171        } => {
172            let mut out = Vec::new();
173            for ((cid, _pk), row) in latest.iter() {
174                if *cid != collection_id {
175                    continue;
176                }
177                if let Some(ref p) = predicate {
178                    if !eval_predicate(row, p) {
179                        continue;
180                    }
181                }
182                out.push(row.clone());
183            }
184            apply_order_by_and_limit(&mut out, order_by.as_ref(), limit);
185            Ok(out)
186        }
187    }
188}
189
190/// Pull-based row iterator for simple queries (0.7 execution boundary).
191///
192/// This is **not** a full Volcano-style operator engine yet (no joins / async), but it does
193/// establish an internal streaming operator boundary by yielding `(collection_id, pk_key)` and
194/// materializing rows from `latest` at the edge.
195pub struct QueryRowIter<'a> {
196    state: QueryRowIterState<'a>,
197}
198
199enum QueryRowIterState<'a> {
200    Vec {
201        rows: Vec<BTreeMap<String, RowValue>>,
202        pos: usize,
203    },
204    Source {
205        latest: &'a crate::db::LatestMap,
206        source: Box<dyn RowSource + 'a>,
207    },
208}
209
210impl<'a> Iterator for QueryRowIter<'a> {
211    type Item = Result<BTreeMap<String, RowValue>, DbError>;
212
213    fn next(&mut self) -> Option<Self::Item> {
214        match &mut self.state {
215            QueryRowIterState::Vec { rows, pos } => {
216                if *pos >= rows.len() {
217                    None
218                } else {
219                    let out = rows[*pos].clone();
220                    *pos += 1;
221                    Some(Ok(out))
222                }
223            }
224            QueryRowIterState::Source { latest, source } => match source.next_key() {
225                None => None,
226                Some(Err(e)) => Some(Err(e)),
227                Some(Ok((cid, pk_key))) => Some(row_for_index_pk(latest, cid.0, pk_key, "")),
228            },
229        }
230    }
231}
232
233struct IndexUniqueSource<'a> {
234    latest: &'a crate::db::LatestMap,
235    collection_id: u32,
236    index_name: String,
237    pk: Option<Vec<u8>>,
238    residual: Option<Predicate>,
239    done: bool,
240}
241
242impl RowSource for IndexUniqueSource<'_> {
243    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
244        if self.done {
245            return None;
246        }
247        self.done = true;
248        let pk_key = self.pk.take()?;
249        let row = match row_for_index_pk(
250            self.latest,
251            self.collection_id,
252            pk_key.clone(),
253            &self.index_name,
254        ) {
255            Ok(r) => r,
256            Err(e) => return Some(Err(e)),
257        };
258        if let Some(pred) = &self.residual {
259            if !eval_predicate(&row, pred) {
260                return None;
261            }
262        }
263        Some(Ok((CollectionId(self.collection_id), pk_key)))
264    }
265}
266
267struct IndexNonUniqueSource<'a> {
268    latest: &'a crate::db::LatestMap,
269    collection_id: u32,
270    index_name: String,
271    pks: std::vec::IntoIter<Vec<u8>>,
272    residual: Option<Predicate>,
273}
274
275impl RowSource for IndexNonUniqueSource<'_> {
276    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
277        for pk_key in self.pks.by_ref() {
278            let row = match row_for_index_pk(
279                self.latest,
280                self.collection_id,
281                pk_key.clone(),
282                &self.index_name,
283            ) {
284                Ok(r) => r,
285                Err(e) => return Some(Err(e)),
286            };
287            if let Some(pred) = &self.residual {
288                if !eval_predicate(&row, pred) {
289                    continue;
290                }
291            }
292            return Some(Ok((CollectionId(self.collection_id), pk_key)));
293        }
294        None
295    }
296}
297
298struct ScanSource<'a> {
299    it: HashMapIter<'a, (u32, Vec<u8>), BTreeMap<String, RowValue>>,
300    collection_id: u32,
301    predicate: Option<Predicate>,
302}
303
304impl RowSource for ScanSource<'_> {
305    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
306        for (&(cid, ref pk_key), row) in self.it.by_ref() {
307            if cid != self.collection_id {
308                continue;
309            }
310            if let Some(p) = &self.predicate {
311                if !eval_predicate(row, p) {
312                    continue;
313                }
314            }
315            return Some(Ok((CollectionId(self.collection_id), pk_key.clone())));
316        }
317        None
318    }
319}
320
321/// Same planning and row sources as [`execute_query`], but as a lazy iterator.
322pub fn execute_query_iter<'a>(
323    catalog: &'a Catalog,
324    indexes: &'a IndexState,
325    latest: &'a crate::db::LatestMap,
326    query: &Query,
327) -> Result<QueryRowIter<'a>, DbError> {
328    if query.order_by.is_some() {
329        return Ok(QueryRowIter {
330            state: QueryRowIterState::Vec {
331                rows: execute_query(catalog, indexes, latest, query)?,
332                pos: 0,
333            },
334        });
335    }
336    let col =
337        catalog
338            .get(query.collection)
339            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
340                id: query.collection.0,
341            }))?;
342    let plan = plan_query(col.id, &col.indexes, query);
343    let mut source: Box<dyn RowSource + 'a> = match plan {
344        Plan::IndexLookup {
345            collection_id,
346            index_name,
347            kind,
348            key,
349            residual,
350            ..
351        } => match kind {
352            IndexKind::Unique => {
353                let pk = indexes
354                    .unique_lookup(collection_id, &index_name, &key)
355                    .map(|p| p.to_vec());
356                Box::new(IndexUniqueSource {
357                    latest,
358                    collection_id,
359                    index_name,
360                    pk,
361                    residual,
362                    done: false,
363                })
364            }
365            IndexKind::NonUnique => {
366                let pks = indexes
367                    .non_unique_lookup(collection_id, &index_name, &key)
368                    .unwrap_or_default()
369                    .into_iter();
370                Box::new(IndexNonUniqueSource {
371                    latest,
372                    collection_id,
373                    index_name,
374                    pks,
375                    residual,
376                })
377            }
378        },
379        Plan::CollectionScan {
380            collection_id,
381            predicate,
382            ..
383        } => Box::new(ScanSource {
384            it: latest.iter(),
385            collection_id,
386            predicate,
387        }),
388    };
389
390    if let Some(n) = query.limit {
391        source = Box::new(LimitOp::new(source, n));
392    }
393
394    Ok(QueryRowIter {
395        state: QueryRowIterState::Source { latest, source },
396    })
397}
398
/// Test-only hook that opens the `FileStore` backing the sorted-query spill.
#[cfg(test)]
type SortedQuerySpillStoreOpenHook =
    Box<dyn FnMut(&std::path::Path) -> Result<FileStore, DbError>>;

/// Test-only hook that supplies the entire sorted-query spill store.
#[cfg(test)]
type SortedQuerySpillStoreOverrideHook =
    Box<dyn FnMut(&std::path::Path) -> Result<SortedQuerySpillStore, DbError>>;

// Per-thread hook slots read by `open_sorted_query_spill_store`; `None` means "no hook".
#[cfg(test)]
thread_local! {
    static QUERY_SORT_SPILL_STORE_OPEN_HOOK: RefCell<Option<SortedQuerySpillStoreOpenHook>> =
        const { RefCell::new(None) };

    static QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK: RefCell<Option<SortedQuerySpillStoreOverrideHook>> =
        const { RefCell::new(None) };
}
413
/// Test-only: installs (`Some`) or clears (`None`) the hook that opens the sorted-query spill
/// `FileStore`, so unit tests can exercise its construction error paths.
///
/// The hook lives in a thread-local slot, so it affects only the calling thread.
#[cfg(test)]
pub(crate) fn test_set_sorted_query_spill_store_open_hook(
    hook: Option<SortedQuerySpillStoreOpenHook>,
) {
    QUERY_SORT_SPILL_STORE_OPEN_HOOK.set(hook);
}
423
/// Test-only: installs (`Some`) or clears (`None`) a hook that replaces the whole spill store
/// implementation. When set, it takes precedence over the open hook.
///
/// The hook lives in a thread-local slot, so it affects only the calling thread.
#[cfg(test)]
pub(crate) fn test_set_sorted_query_spill_store_override_hook(
    hook: Option<SortedQuerySpillStoreOverrideHook>,
) {
    QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK.set(hook);
}
433
434pub(crate) enum SortedQuerySpillStore {
435    File(FileStore),
436    #[cfg(test)]
437    FailLen,
438}
439
440impl Store for SortedQuerySpillStore {
441    fn len(&self) -> Result<u64, DbError> {
442        match self {
443            Self::File(f) => f.len(),
444            #[cfg(test)]
445            Self::FailLen => Err(DbError::Io(std::io::Error::other(
446                "sorted query spill store synthetic len() failure (test override)",
447            ))),
448        }
449    }
450
451    fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
452        match self {
453            Self::File(f) => f.read_exact_at(offset, buf),
454            #[cfg(test)]
455            Self::FailLen => Err(DbError::Io(std::io::Error::other(
456                "sorted query spill store synthetic read failure (test override)",
457            ))),
458        }
459    }
460
461    fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
462        match self {
463            Self::File(f) => f.write_all_at(offset, buf),
464            #[cfg(test)]
465            Self::FailLen => Err(DbError::Io(std::io::Error::other(
466                "sorted query spill store synthetic write failure (test override)",
467            ))),
468        }
469    }
470
471    fn sync(&mut self) -> Result<(), DbError> {
472        match self {
473            Self::File(f) => f.sync(),
474            #[cfg(test)]
475            Self::FailLen => Ok(()),
476        }
477    }
478
479    fn truncate(&mut self, len: u64) -> Result<(), DbError> {
480        match self {
481            Self::File(f) => f.truncate(len),
482            #[cfg(test)]
483            Self::FailLen => Ok(()),
484        }
485    }
486}
487
488fn open_sorted_query_spill_store(path: &std::path::Path) -> Result<SortedQuerySpillStore, DbError> {
489    #[cfg(test)]
490    {
491        let overridden = QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK.with(|c| {
492            let mut bm = c.borrow_mut();
493            bm.as_mut().map(|hook| hook(path))
494        });
495        if let Some(r) = overridden {
496            return r;
497        }
498
499        let hooked = QUERY_SORT_SPILL_STORE_OPEN_HOOK.with(|c| {
500            let mut bm = c.borrow_mut();
501            bm.as_mut().map(|hook| hook(path))
502        });
503        if let Some(r) = hooked {
504            return r.map(SortedQuerySpillStore::File);
505        }
506    }
507    let _ = path;
508    // Never spill into the live DB file: a second handle would race with truncate-on-drop.
509    let spill_file = tempfile::tempfile().map_err(DbError::Io)?;
510    Ok(SortedQuerySpillStore::File(FileStore::new(spill_file)))
511}
512
513/// Like [`execute_query_iter`], but when `q.order_by` is set this will attempt a bounded-memory
514/// external sort by spilling ephemeral `Temp` segments to a dedicated temporary file.
515///
516/// If `db_path` is `None` (e.g. in-memory), this falls back to the in-memory sort path.
517pub fn execute_query_iter_with_spill_path<'a>(
518    catalog: &'a Catalog,
519    indexes: &'a IndexState,
520    latest: &'a crate::db::LatestMap,
521    q: &Query,
522    db_path: Option<&std::path::Path>,
523) -> Result<QueryRowIter<'a>, DbError> {
524    if q.order_by.is_none() {
525        return execute_query_iter(catalog, indexes, latest, q);
526    }
527    let order_by = q
528        .order_by
529        .clone()
530        .expect("order_by is Some when this function continues");
531
532    // If we don't have a file path to spill into, fall back to the existing in-memory behavior.
533    let Some(path) = db_path else {
534        return Ok(QueryRowIter {
535            state: QueryRowIterState::Vec {
536                rows: execute_query(catalog, indexes, latest, q)?,
537                pos: 0,
538            },
539        });
540    };
541
542    let col = catalog
543        .get(q.collection)
544        .ok_or(DbError::Schema(SchemaError::UnknownCollection {
545            id: q.collection.0,
546        }))?;
547    let plan = plan_query(col.id, &col.indexes, q);
548
549    let base: Box<dyn RowSource + 'a> = match plan.clone() {
550        Plan::IndexLookup {
551            collection_id,
552            index_name,
553            kind,
554            key,
555            residual,
556            ..
557        } => match kind {
558            IndexKind::Unique => Box::new(IndexUniqueSource {
559                latest,
560                collection_id,
561                index_name: index_name.clone(),
562                pk: indexes
563                    .unique_lookup(collection_id, &index_name, &key)
564                    .map(|p| p.to_vec()),
565                residual,
566                done: false,
567            }),
568            IndexKind::NonUnique => Box::new(IndexNonUniqueSource {
569                latest,
570                collection_id,
571                index_name: index_name.clone(),
572                pks: indexes
573                    .non_unique_lookup(collection_id, &index_name, &key)
574                    .unwrap_or_default()
575                    .into_iter(),
576                residual,
577            }),
578        },
579        Plan::CollectionScan {
580            collection_id,
581            predicate,
582            ..
583        } => Box::new(ScanSource {
584            it: latest.iter(),
585            collection_id,
586            predicate,
587        }),
588    };
589
590    // Build a sorted key source (potentially spilling to Temp segments).
591    let spill_store = open_sorted_query_spill_store(path)?;
592    #[cfg(feature = "tracing")]
593    tracing::debug!(spill_path = %path.display(), "execute_query_order_by_spill");
594    let spill = crate::spill::TempSpillFile::new(spill_store)?;
595    let index_name_for_sort = match &plan {
596        Plan::IndexLookup { index_name, .. } => index_name.as_str(),
597        Plan::CollectionScan { .. } => "",
598    };
599    let sort_source = Box::new(ExternalSortSource::new(
600        spill,
601        latest,
602        base,
603        col.id.0,
604        order_by,
605        index_name_for_sort,
606    )?);
607
608    let mut source: Box<dyn RowSource + 'a> = sort_source;
609    if let Some(n) = q.limit {
610        source = Box::new(LimitOp::new(source, n));
611    }
612
613    Ok(QueryRowIter {
614        state: QueryRowIterState::Source { latest, source },
615    })
616}
617
618#[derive(Clone)]
619struct SortItem {
620    // `none_flag`: 0 for Some, 1 for None (so None sorts last on ascending).
621    none_flag: u8,
622    sort_key: Vec<u8>,
623    key: RowKey,
624}
625
626#[allow(dead_code)]
627fn sort_item_for(
628    latest: &crate::db::LatestMap,
629    key: &RowKey,
630    order_by: &OrderBy,
631) -> Option<SortItem> {
632    sort_item_for_result(latest, key, order_by, "").ok()
633}
634
635fn sort_item_for_result(
636    latest: &crate::db::LatestMap,
637    key: &RowKey,
638    order_by: &OrderBy,
639    index_name: &str,
640) -> Result<SortItem, DbError> {
641    let (cid, pk) = key;
642    let row =
643        latest
644            .get(&(cid.0, pk.clone()))
645            .ok_or(DbError::Schema(SchemaError::IndexRowMissing {
646                collection_id: cid.0,
647                index_name: index_name.to_string(),
648            }))?;
649    let (none_flag, sort_key) = match scalar_at_path(row, &order_by.path) {
650        None => (1u8, Vec::new()),
651        Some(s) => (0u8, scalar_sort_key_bytes(&s)),
652    };
653    Ok(SortItem {
654        none_flag,
655        sort_key,
656        key: (CollectionId(cid.0), pk.clone()),
657    })
658}
659
660fn scalar_sort_key_bytes(s: &ScalarValue) -> Vec<u8> {
661    match s {
662        ScalarValue::Bool(b) => vec![0, if *b { 1 } else { 0 }],
663        ScalarValue::Int64(v) => {
664            let u = (*v as u64) ^ 0x8000_0000_0000_0000u64;
665            let mut out = vec![1];
666            out.extend_from_slice(&u.to_be_bytes());
667            out
668        }
669        ScalarValue::Uint64(v) => {
670            let mut out = vec![2];
671            out.extend_from_slice(&v.to_be_bytes());
672            out
673        }
674        ScalarValue::Float64(v) => {
675            let mut bits = v.to_bits();
676            if bits & (1u64 << 63) != 0 {
677                bits = !bits;
678            } else {
679                bits ^= 1u64 << 63;
680            }
681            let mut out = vec![3];
682            out.extend_from_slice(&bits.to_be_bytes());
683            out
684        }
685        ScalarValue::String(st) => {
686            let mut out = vec![4];
687            out.extend_from_slice(st.as_bytes());
688            out
689        }
690        ScalarValue::Bytes(b) => {
691            let mut out = vec![5];
692            out.extend_from_slice(b);
693            out
694        }
695        ScalarValue::Uuid(u) => {
696            let mut out = vec![6];
697            out.extend_from_slice(u);
698            out
699        }
700        ScalarValue::Timestamp(t) => {
701            let u = (*t as u64) ^ 0x8000_0000_0000_0000u64;
702            let mut out = vec![7];
703            out.extend_from_slice(&u.to_be_bytes());
704            out
705        }
706    }
707}
708
709fn cmp_sort_item(a: &SortItem, b: &SortItem, dir: OrderDirection) -> std::cmp::Ordering {
710    let ord = a
711        .none_flag
712        .cmp(&b.none_flag)
713        .then_with(|| a.sort_key.cmp(&b.sort_key))
714        .then_with(|| a.key.1.cmp(&b.key.1));
715    match dir {
716        OrderDirection::Asc => ord,
717        OrderDirection::Desc => ord.reverse(),
718    }
719}
720
721// Simple external sort: sort fixed-size runs, spill each run as one Temp segment,
722// then k-way merge those runs.
723struct ExternalSortSource<'a, S: Store = FileStore> {
724    _spill: crate::spill::TempSpillFile<S>,
725    collection_id: u32,
726    dir: OrderDirection,
727    heap: std::collections::BinaryHeap<HeapItem>,
728    runs: Vec<RunReader>,
729    _latest: &'a crate::db::LatestMap,
730}
731
/// Location of one spilled run inside the temporary spill file.
#[derive(Clone)]
struct RunMeta {
    /// Offset returned by `append_temp_segment` for this run's segment.
    offset: u64,
    /// Length in bytes of the encoded run payload.
    payload_len: u64,
}
737
/// Sequential decoder over one run payload produced by `encode_run`.
struct RunReader {
    /// The whole encoded run.
    buf: Vec<u8>,
    /// Byte offset of the next unread field.
    pos: usize,
}

impl RunReader {
    fn new(buf: Vec<u8>) -> Self {
        RunReader { pos: 0, buf }
    }

    /// Decodes the next `(none_flag, sort_key, pk)` record.
    ///
    /// Returns `None` when the payload is exhausted or too short for the next field. Fields
    /// already consumed before a short read stay consumed.
    fn next_item(&mut self) -> Option<(u8, Vec<u8>, Vec<u8>)> {
        let none_flag = self.take_bytes(1)?[0];
        let sort_key = self.take_len_prefixed()?;
        let pk = self.take_len_prefixed()?;
        Some((none_flag, sort_key, pk))
    }

    /// Consumes and returns the next `n` bytes; leaves the cursor alone if fewer remain.
    fn take_bytes(&mut self, n: usize) -> Option<&[u8]> {
        let start = self.pos;
        let end = start + n;
        if end > self.buf.len() {
            return None;
        }
        self.pos = end;
        Some(&self.buf[start..end])
    }

    /// Consumes a little-endian `u32` length followed by that many bytes.
    fn take_len_prefixed(&mut self) -> Option<Vec<u8>> {
        let prefix = self.take_bytes(4)?;
        let len = u32::from_le_bytes([prefix[0], prefix[1], prefix[2], prefix[3]]) as usize;
        self.take_bytes(len).map(|bytes| bytes.to_vec())
    }
}
765
766#[derive(Clone)]
767struct HeapItem {
768    run_idx: usize,
769    none_flag: u8,
770    sort_key: Vec<u8>,
771    pk: Vec<u8>,
772    dir: OrderDirection,
773}
774
775impl PartialEq for HeapItem {
776    fn eq(&self, other: &Self) -> bool {
777        (self.none_flag, &self.sort_key, &self.pk) == (other.none_flag, &other.sort_key, &other.pk)
778    }
779}
780impl Eq for HeapItem {}
781
782impl PartialOrd for HeapItem {
783    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
784        Some(self.cmp(other))
785    }
786}
787impl Ord for HeapItem {
788    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
789        // BinaryHeap is max-heap; invert to get min-heap behavior.
790        let a = SortItem {
791            none_flag: self.none_flag,
792            sort_key: self.sort_key.clone(),
793            key: (CollectionId(0), self.pk.clone()),
794        };
795        let b = SortItem {
796            none_flag: other.none_flag,
797            sort_key: other.sort_key.clone(),
798            key: (CollectionId(0), other.pk.clone()),
799        };
800        cmp_sort_item(&a, &b, self.dir).reverse()
801    }
802}
803
804impl<'a, S: Store> ExternalSortSource<'a, S> {
805    fn flush_sorted_run(
806        spill: &mut crate::spill::TempSpillFile<S>,
807        runs_meta: &mut Vec<RunMeta>,
808        run: &mut Vec<SortItem>,
809        dir: OrderDirection,
810    ) -> Result<(), DbError> {
811        if run.is_empty() {
812            return Ok(());
813        }
814        run.sort_by(|a, b| cmp_sort_item(a, b, dir));
815        let payload = encode_run(run, dir);
816        let off = spill.append_temp_segment(&payload)?;
817        runs_meta.push(RunMeta {
818            offset: off,
819            payload_len: payload.len() as u64,
820        });
821        run.clear();
822        Ok(())
823    }
824
825    fn new(
826        mut spill: crate::spill::TempSpillFile<S>,
827        latest: &'a crate::db::LatestMap,
828        mut input: Box<dyn RowSource + 'a>,
829        collection_id: u32,
830        order_by: OrderBy,
831        index_name: &str,
832    ) -> Result<Self, DbError> {
833        const RUN_KEYS: usize = 2048;
834
835        let dir = order_by.direction;
836        let mut runs_meta: Vec<RunMeta> = Vec::new();
837        let mut run: Vec<SortItem> = Vec::with_capacity(RUN_KEYS);
838
839        while let Some(rk) = input.next_key() {
840            let rk = rk?;
841            let item = sort_item_for_result(latest, &rk, &order_by, index_name)?;
842            run.push(item);
843            if run.len() >= RUN_KEYS {
844                Self::flush_sorted_run(&mut spill, &mut runs_meta, &mut run, dir)?;
845            }
846        }
847
848        Self::flush_sorted_run(&mut spill, &mut runs_meta, &mut run, dir)?;
849
850        // Load run buffers and seed heap.
851        let mut runs: Vec<RunReader> = Vec::new();
852        let mut heap = std::collections::BinaryHeap::new();
853        for (i, m) in runs_meta.into_iter().enumerate() {
854            let buf = spill.read_temp_payload(m.offset, m.payload_len)?;
855            let mut rr = RunReader::new(buf);
856            if let Some((none_flag, sort_key, pk)) = rr.next_item() {
857                heap.push(HeapItem {
858                    run_idx: i,
859                    none_flag,
860                    sort_key,
861                    pk: pk.clone(),
862                    dir,
863                });
864            }
865            runs.push(rr);
866        }
867
868        Ok(Self {
869            _spill: spill,
870            collection_id,
871            dir,
872            heap,
873            runs,
874            _latest: latest,
875        })
876    }
877}
878
879fn encode_run(run: &[SortItem], _dir: OrderDirection) -> Vec<u8> {
880    let mut out = Vec::new();
881    for it in run {
882        out.push(it.none_flag);
883        out.extend_from_slice(&(it.sort_key.len() as u32).to_le_bytes());
884        out.extend_from_slice(&it.sort_key);
885        out.extend_from_slice(&(it.key.1.len() as u32).to_le_bytes());
886        out.extend_from_slice(&it.key.1);
887    }
888    out
889}
890
891impl<'a, S: Store> RowSource for ExternalSortSource<'a, S> {
892    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
893        let top = self.heap.pop()?;
894        let run_idx = top.run_idx;
895        // refill from same run
896        if let Some((none_flag, sort_key, pk)) = self.runs[run_idx].next_item() {
897            self.heap.push(HeapItem {
898                run_idx,
899                none_flag,
900                sort_key,
901                pk: pk.clone(),
902                dir: self.dir,
903            });
904        }
905        Some(Ok((CollectionId(self.collection_id), top.pk)))
906    }
907}
908
909fn plan_query(
910    collection: CollectionId,
911    indexes: &[crate::schema::IndexDef],
912    query: &Query,
913) -> Plan {
914    let Some(pred) = query.predicate.clone() else {
915        return Plan::CollectionScan {
916            collection_id: collection.0,
917            predicate: None,
918            limit: query.limit,
919            order_by: query.order_by.clone(),
920        };
921    };
922
923    let (best, residual) = match choose_index(indexes, &pred) {
924        None => (None, Some(pred)),
925        Some((idx, value, used_pred)) => {
926            let residual = remove_used_predicate(pred, used_pred);
927            (Some((idx, value)), residual)
928        }
929    };
930
931    if let Some((idx, value)) = best {
932        Plan::IndexLookup {
933            collection_id: collection.0,
934            index_name: idx.name.clone(),
935            kind: idx.kind,
936            key: value.canonical_key_bytes(),
937            residual,
938            limit: query.limit,
939            order_by: query.order_by.clone(),
940        }
941    } else {
942        Plan::CollectionScan {
943            collection_id: collection.0,
944            predicate: residual,
945            limit: query.limit,
946            order_by: query.order_by.clone(),
947        }
948    }
949}
950
951fn choose_index<'a>(
952    indexes: &'a [crate::schema::IndexDef],
953    pred: &Predicate,
954) -> Option<(&'a crate::schema::IndexDef, ScalarValue, Predicate)> {
955    match pred {
956        Predicate::Eq { path, value } => indexes
957            .iter()
958            .find(|idx| &idx.path == path)
959            .map(|idx| (idx, value.clone(), pred.clone())),
960        Predicate::Lt { .. }
961        | Predicate::Lte { .. }
962        | Predicate::Gt { .. }
963        | Predicate::Gte { .. }
964        | Predicate::Or(_) => None,
965        Predicate::And(items) => {
966            // Prefer unique index predicates, else first indexed predicate.
967            let mut best: Option<(&crate::schema::IndexDef, ScalarValue, Predicate)> = None;
968            for p in items {
969                if let Some((idx, v, used)) = choose_index(indexes, p) {
970                    match best {
971                        None => best = Some((idx, v, used)),
972                        Some((best_idx, _, _)) => {
973                            if best_idx.kind != IndexKind::Unique && idx.kind == IndexKind::Unique {
974                                best = Some((idx, v, used));
975                            }
976                        }
977                    }
978                }
979            }
980            best
981        }
982    }
983}
984
985fn remove_used_predicate(pred: Predicate, used: Predicate) -> Option<Predicate> {
986    if pred == used {
987        return None;
988    }
989    match pred {
990        Predicate::And(items) => {
991            let mut out: Vec<Predicate> = items.into_iter().filter(|p| p != &used).collect();
992            match out.len() {
993                0 => None,
994                1 => Some(out.remove(0)),
995                _ => Some(Predicate::And(out)),
996            }
997        }
998        _ => Some(pred),
999    }
1000}
1001
1002fn eval_predicate(row: &BTreeMap<String, RowValue>, pred: &Predicate) -> bool {
1003    match pred {
1004        Predicate::Eq { path, value } => scalar_at_path(row, path)
1005            .map(|s| &s == value)
1006            .unwrap_or(false),
1007        Predicate::Lt { path, value } => scalar_at_path(row, path)
1008            .and_then(|s| scalar_partial_cmp(&s, value))
1009            .map(|o| o.is_lt())
1010            .unwrap_or(false),
1011        Predicate::Lte { path, value } => scalar_at_path(row, path)
1012            .and_then(|s| scalar_partial_cmp(&s, value))
1013            .map(|o| o.is_lt() || o.is_eq())
1014            .unwrap_or(false),
1015        Predicate::Gt { path, value } => scalar_at_path(row, path)
1016            .and_then(|s| scalar_partial_cmp(&s, value))
1017            .map(|o| o.is_gt())
1018            .unwrap_or(false),
1019        Predicate::Gte { path, value } => scalar_at_path(row, path)
1020            .and_then(|s| scalar_partial_cmp(&s, value))
1021            .map(|o| o.is_gt() || o.is_eq())
1022            .unwrap_or(false),
1023        Predicate::And(items) => items.iter().all(|p| eval_predicate(row, p)),
1024        Predicate::Or(items) => items.iter().any(|p| eval_predicate(row, p)),
1025    }
1026}
1027
1028fn apply_order_by_and_limit(
1029    rows: &mut Vec<BTreeMap<String, RowValue>>,
1030    order_by: Option<&OrderBy>,
1031    limit: Option<usize>,
1032) {
1033    if let Some(ob) = order_by {
1034        rows.sort_by(|a, b| {
1035            let av = scalar_at_path(a, &ob.path);
1036            let bv = scalar_at_path(b, &ob.path);
1037            let ord = match (av, bv) {
1038                (None, None) => std::cmp::Ordering::Equal,
1039                (None, Some(_)) => std::cmp::Ordering::Greater,
1040                (Some(_), None) => std::cmp::Ordering::Less,
1041                (Some(x), Some(y)) => {
1042                    scalar_partial_cmp(&x, &y).unwrap_or(std::cmp::Ordering::Equal)
1043                }
1044            };
1045            match ob.direction {
1046                OrderDirection::Asc => ord,
1047                OrderDirection::Desc => ord.reverse(),
1048            }
1049        });
1050    }
1051    if let Some(n) = limit {
1052        rows.truncate(n);
1053    }
1054}
1055
1056fn scalar_partial_cmp(a: &ScalarValue, b: &ScalarValue) -> Option<std::cmp::Ordering> {
1057    use ScalarValue::*;
1058    match (a, b) {
1059        (Bool(x), Bool(y)) => Some(x.cmp(y)),
1060        (Int64(x), Int64(y)) => Some(x.cmp(y)),
1061        (Uint64(x), Uint64(y)) => Some(x.cmp(y)),
1062        (Float64(x), Float64(y)) => x.partial_cmp(y),
1063        (String(x), String(y)) => Some(x.cmp(y)),
1064        (Bytes(x), Bytes(y)) => Some(x.cmp(y)),
1065        (Uuid(x), Uuid(y)) => Some(x.cmp(y)),
1066        (Timestamp(x), Timestamp(y)) => Some(x.cmp(y)),
1067        _ => None,
1068    }
1069}
1070
#[cfg(test)]
mod tests {
    // The unit tests for this module live in an external file. `include!`
    // pastes that file's contents here textually. They therefore compile as a
    // child module of the planner and can reach its private items
    // (e.g. `super::plan_query`).
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_query_planner_tests.rs"
    ));
}