Skip to main content

modelvault_core/query/
planner.rs

1use std::borrow::Cow;
2#[cfg(test)]
3use std::cell::RefCell;
4use std::collections::hash_map::Iter as HashMapIter;
5use std::collections::BTreeMap;
6use std::sync::Arc;
7
8use crate::catalog::Catalog;
9use crate::db::scalar_at_path;
10use crate::db::SharedDbState;
11use crate::error::{DbError, QueryError, SchemaError};
12use crate::file_format::MAX_QUERY_LIMIT;
13use crate::index::IndexState;
14use crate::record::RowValue;
15use crate::schema::{CollectionId, FieldPath, IndexKind};
16use crate::storage::{FileStore, Store};
17use crate::ScalarValue;
18
19use super::ast::{OrderBy, OrderDirection};
20use super::ast::{Predicate, Query};
21use super::operators::{LimitOp, RowKey, RowSource};
22
23fn row_for_index_pk(
24    latest: &crate::db::LatestMap,
25    collection_id: u32,
26    pk_key: Vec<u8>,
27    index_name: &str,
28) -> Result<BTreeMap<String, RowValue>, DbError> {
29    latest
30        .get(&(collection_id, pk_key))
31        .cloned()
32        .ok_or(DbError::Schema(SchemaError::IndexRowMissing {
33            collection_id,
34            index_name: index_name.to_string(),
35        }))
36}
37
38#[derive(Debug, Clone, PartialEq)]
39struct IndexKeyRange {
40    lo: Option<ScalarValue>,
41    lo_inclusive: bool,
42    hi: Option<ScalarValue>,
43    hi_inclusive: bool,
44}
45
46#[derive(Debug, Clone, PartialEq)]
47enum Plan {
48    IndexLookup {
49        collection_id: u32,
50        index_name: String,
51        kind: IndexKind,
52        key: Vec<u8>,
53        residual: Option<Predicate>,
54        limit: Option<usize>,
55        order_by: Option<OrderBy>,
56    },
57    IndexRangeLookup {
58        collection_id: u32,
59        index_name: String,
60        kind: IndexKind,
61        key_range: IndexKeyRange,
62        residual: Option<Predicate>,
63        limit: Option<usize>,
64        order_by: Option<OrderBy>,
65    },
66    CollectionScan {
67        collection_id: u32,
68        predicate: Option<Predicate>,
69        limit: Option<usize>,
70        order_by: Option<OrderBy>,
71    },
72}
73
74pub fn explain_query(catalog: &Catalog, query: &Query) -> Result<String, DbError> {
75    validate_query_limit(query)?;
76    let col =
77        catalog
78            .get(query.collection)
79            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
80                id: query.collection.0,
81            }))?;
82    let plan = plan_query(col.id, &col.indexes, query);
83    #[cfg(feature = "tracing")]
84    tracing::debug!(plan = ?plan, "explain_query");
85    Ok(match plan {
86        Plan::IndexLookup {
87            index_name,
88            kind,
89            residual,
90            limit,
91            order_by,
92            ..
93        } => {
94            let mut s = String::new();
95            s.push_str("Plan:\n");
96            s.push_str(&format!(
97                "  IndexLookup index={index_name:?} kind={kind:?}\n"
98            ));
99            if let Some(r) = residual {
100                s.push_str(&format!("  ResidualFilter {r:?}\n"));
101            }
102            if let Some(n) = limit {
103                s.push_str(&format!("  Limit {n}\n"));
104            }
105            if let Some(ob) = order_by {
106                s.push_str(&format!("  OrderBy {:?} {:?}\n", ob.path, ob.direction));
107            }
108            s
109        }
110        Plan::IndexRangeLookup {
111            index_name,
112            kind,
113            key_range,
114            residual,
115            limit,
116            order_by,
117            ..
118        } => {
119            let mut s = String::new();
120            s.push_str("Plan:\n");
121            s.push_str(&format!(
122                "  IndexRangeLookup index={index_name:?} kind={kind:?}\n"
123            ));
124            if let Some(ref lo) = key_range.lo {
125                let op = if key_range.lo_inclusive { ">=" } else { ">" };
126                s.push_str(&format!("  KeyRange lo {op} {lo:?}\n"));
127            }
128            if let Some(ref hi) = key_range.hi {
129                let op = if key_range.hi_inclusive { "<=" } else { "<" };
130                s.push_str(&format!("  KeyRange hi {op} {hi:?}\n"));
131            }
132            if let Some(r) = residual {
133                s.push_str(&format!("  ResidualFilter {r:?}\n"));
134            }
135            if let Some(n) = limit {
136                s.push_str(&format!("  Limit {n}\n"));
137            }
138            if let Some(ob) = order_by {
139                s.push_str(&format!("  OrderBy {:?} {:?}\n", ob.path, ob.direction));
140            }
141            s
142        }
143        Plan::CollectionScan {
144            predicate,
145            limit,
146            order_by,
147            ..
148        } => {
149            let mut s = String::new();
150            s.push_str("Plan:\n");
151            s.push_str("  CollectionScan\n");
152            if let Some(p) = predicate {
153                s.push_str(&format!("  Filter {p:?}\n"));
154            }
155            if let Some(n) = limit {
156                s.push_str(&format!("  Limit {n}\n"));
157            }
158            if let Some(ob) = order_by {
159                s.push_str(&format!("  OrderBy {:?} {:?}\n", ob.path, ob.direction));
160            }
161            s
162        }
163    })
164}
165
166fn validate_query_limit(query: &Query) -> Result<(), DbError> {
167    if let Some(n) = query.limit {
168        if n > MAX_QUERY_LIMIT {
169            return Err(DbError::Query(QueryError {
170                message: format!("query limit {n} exceeds maximum {MAX_QUERY_LIMIT}"),
171            }));
172        }
173    }
174    Ok(())
175}
176
177pub fn execute_query(
178    catalog: &Catalog,
179    indexes: &IndexState,
180    latest: &crate::db::LatestMap,
181    query: &Query,
182) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
183    validate_query_limit(query)?;
184    let col =
185        catalog
186            .get(query.collection)
187            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
188                id: query.collection.0,
189            }))?;
190    let plan = plan_query(col.id, &col.indexes, query);
191
192    #[cfg(feature = "tracing")]
193    tracing::debug!(plan = ?plan, "execute_query");
194
195    match plan {
196        Plan::IndexLookup {
197            collection_id,
198            index_name,
199            kind,
200            key,
201            residual,
202            limit,
203            order_by,
204        } => {
205            let mut out = Vec::new();
206
207            match kind {
208                IndexKind::Unique => {
209                    if let Some(pk) = indexes.unique_lookup(collection_id, &index_name, &key) {
210                        out.push(row_for_index_pk(
211                            latest,
212                            collection_id,
213                            pk.to_vec(),
214                            &index_name,
215                        )?);
216                    }
217                }
218                IndexKind::NonUnique => {
219                    if let Some(pks) = indexes.non_unique_lookup(collection_id, &index_name, &key) {
220                        for pk in pks {
221                            out.push(row_for_index_pk(latest, collection_id, pk, &index_name)?);
222                        }
223                    }
224                }
225            }
226
227            if let Some(pred) = residual {
228                out.retain(|row| eval_predicate(row, &pred));
229            }
230            apply_order_by_and_limit(
231                &mut out,
232                order_by.as_ref(),
233                limit,
234                col.primary_field.as_deref(),
235            );
236            Ok(out)
237        }
238        Plan::IndexRangeLookup {
239            collection_id,
240            index_name,
241            kind,
242            key_range,
243            residual,
244            limit,
245            order_by,
246        } => {
247            let mut out = collect_index_range_rows(
248                indexes,
249                latest,
250                collection_id,
251                &index_name,
252                kind,
253                &key_range,
254            )?;
255            if let Some(pred) = residual {
256                out.retain(|row| eval_predicate(row, &pred));
257            }
258            apply_order_by_and_limit(
259                &mut out,
260                order_by.as_ref(),
261                limit,
262                col.primary_field.as_deref(),
263            );
264            Ok(out)
265        }
266        Plan::CollectionScan {
267            collection_id,
268            predicate,
269            limit,
270            order_by,
271        } => {
272            let mut out = Vec::new();
273            for ((cid, _pk), row) in latest.iter() {
274                if *cid != collection_id {
275                    continue;
276                }
277                if let Some(ref p) = predicate {
278                    if !eval_predicate(row, p) {
279                        continue;
280                    }
281                }
282                out.push(row.clone());
283            }
284            apply_order_by_and_limit(
285                &mut out,
286                order_by.as_ref(),
287                limit,
288                col.primary_field.as_deref(),
289            );
290            Ok(out)
291        }
292    }
293}
294
295/// Pull-based row iterator for simple queries (0.7 execution boundary).
296///
297/// This is **not** a full Volcano-style operator engine yet (no joins / async), but it does
298/// establish an internal streaming operator boundary by yielding `(collection_id, pk_key)` and
299/// materializing rows from `latest` at the edge.
300pub struct QueryRowIter<'a> {
301    state: QueryRowIterState<'a>,
302}
303
304enum QueryRowIterState<'a> {
305    Vec {
306        rows: Vec<BTreeMap<String, RowValue>>,
307        pos: usize,
308    },
309    Source {
310        latest: &'a crate::db::LatestMap,
311        source: Box<dyn RowSource + 'a>,
312    },
313    Owned {
314        snapshot: Arc<SharedDbState>,
315        source: Box<dyn RowSource + 'static>,
316    },
317}
318
319impl<'a> Iterator for QueryRowIter<'a> {
320    type Item = Result<BTreeMap<String, RowValue>, DbError>;
321
322    fn next(&mut self) -> Option<Self::Item> {
323        match &mut self.state {
324            QueryRowIterState::Vec { rows, pos } => {
325                if *pos >= rows.len() {
326                    None
327                } else {
328                    let out = rows[*pos].clone();
329                    *pos += 1;
330                    Some(Ok(out))
331                }
332            }
333            QueryRowIterState::Source { latest, source } => match source.next_key() {
334                None => None,
335                Some(Err(e)) => Some(Err(e)),
336                Some(Ok((cid, pk_key))) => Some(row_for_index_pk(latest, cid.0, pk_key, "")),
337            },
338            QueryRowIterState::Owned { snapshot, source } => match source.next_key() {
339                None => None,
340                Some(Err(e)) => Some(Err(e)),
341                Some(Ok((cid, pk_key))) => {
342                    Some(row_for_index_pk(&snapshot.latest, cid.0, pk_key, ""))
343                }
344            },
345        }
346    }
347}
348
349struct IndexUniqueSource<'a> {
350    latest: &'a crate::db::LatestMap,
351    collection_id: u32,
352    index_name: String,
353    pk: Option<Vec<u8>>,
354    residual: Option<Predicate>,
355    done: bool,
356}
357
358impl RowSource for IndexUniqueSource<'_> {
359    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
360        if self.done {
361            return None;
362        }
363        self.done = true;
364        let pk_key = self.pk.take()?;
365        let row = match row_for_index_pk(
366            self.latest,
367            self.collection_id,
368            pk_key.clone(),
369            &self.index_name,
370        ) {
371            Ok(r) => r,
372            Err(e) => return Some(Err(e)),
373        };
374        if let Some(pred) = &self.residual {
375            if !eval_predicate(&row, pred) {
376                return None;
377            }
378        }
379        Some(Ok((CollectionId(self.collection_id), pk_key)))
380    }
381}
382
383struct IndexNonUniqueSource<'a> {
384    latest: &'a crate::db::LatestMap,
385    collection_id: u32,
386    index_name: String,
387    pks: std::vec::IntoIter<Vec<u8>>,
388    residual: Option<Predicate>,
389}
390
391impl RowSource for IndexNonUniqueSource<'_> {
392    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
393        for pk_key in self.pks.by_ref() {
394            let row = match row_for_index_pk(
395                self.latest,
396                self.collection_id,
397                pk_key.clone(),
398                &self.index_name,
399            ) {
400                Ok(r) => r,
401                Err(e) => return Some(Err(e)),
402            };
403            if let Some(pred) = &self.residual {
404                if !eval_predicate(&row, pred) {
405                    continue;
406                }
407            }
408            return Some(Ok((CollectionId(self.collection_id), pk_key)));
409        }
410        None
411    }
412}
413
414struct IndexRangeSource<'a> {
415    latest: &'a crate::db::LatestMap,
416    collection_id: u32,
417    index_name: String,
418    pks: std::vec::IntoIter<Vec<u8>>,
419    residual: Option<Predicate>,
420}
421
422impl RowSource for IndexRangeSource<'_> {
423    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
424        for pk_key in self.pks.by_ref() {
425            let row = match row_for_index_pk(
426                self.latest,
427                self.collection_id,
428                pk_key.clone(),
429                &self.index_name,
430            ) {
431                Ok(r) => r,
432                Err(e) => return Some(Err(e)),
433            };
434            if let Some(pred) = &self.residual {
435                if !eval_predicate(&row, pred) {
436                    continue;
437                }
438            }
439            return Some(Ok((CollectionId(self.collection_id), pk_key)));
440        }
441        None
442    }
443}
444
445fn collect_index_range_rows(
446    indexes: &IndexState,
447    latest: &crate::db::LatestMap,
448    collection_id: u32,
449    index_name: &str,
450    kind: IndexKind,
451    key_range: &IndexKeyRange,
452) -> Result<Vec<BTreeMap<String, RowValue>>, DbError> {
453    let lo = key_range.lo.as_ref();
454    let hi = key_range.hi.as_ref();
455    let pks = match kind {
456        IndexKind::Unique => indexes.unique_range_lookup(
457            collection_id,
458            index_name,
459            lo,
460            key_range.lo_inclusive,
461            hi,
462            key_range.hi_inclusive,
463        ),
464        IndexKind::NonUnique => indexes.non_unique_range_lookup(
465            collection_id,
466            index_name,
467            lo,
468            key_range.lo_inclusive,
469            hi,
470            key_range.hi_inclusive,
471        ),
472    };
473    let mut out = Vec::with_capacity(pks.len());
474    for pk in pks {
475        out.push(row_for_index_pk(latest, collection_id, pk, index_name)?);
476    }
477    Ok(out)
478}
479
480fn index_range_source<'a>(
481    indexes: &'a IndexState,
482    latest: &'a crate::db::LatestMap,
483    collection_id: u32,
484    index_name: String,
485    kind: IndexKind,
486    key_range: &IndexKeyRange,
487    residual: Option<Predicate>,
488) -> IndexRangeSource<'a> {
489    let lo = key_range.lo.as_ref();
490    let hi = key_range.hi.as_ref();
491    let pks = match kind {
492        IndexKind::Unique => indexes.unique_range_lookup(
493            collection_id,
494            &index_name,
495            lo,
496            key_range.lo_inclusive,
497            hi,
498            key_range.hi_inclusive,
499        ),
500        IndexKind::NonUnique => indexes.non_unique_range_lookup(
501            collection_id,
502            &index_name,
503            lo,
504            key_range.lo_inclusive,
505            hi,
506            key_range.hi_inclusive,
507        ),
508    };
509    IndexRangeSource {
510        latest,
511        collection_id,
512        index_name,
513        pks: pks.into_iter(),
514        residual,
515    }
516}
517
518struct ScanSource<'a> {
519    it: HashMapIter<'a, (u32, Vec<u8>), BTreeMap<String, RowValue>>,
520    collection_id: u32,
521    predicate: Option<Predicate>,
522}
523
524impl RowSource for ScanSource<'_> {
525    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
526        for (&(cid, ref pk_key), row) in self.it.by_ref() {
527            if cid != self.collection_id {
528                continue;
529            }
530            if let Some(p) = &self.predicate {
531                if !eval_predicate(row, p) {
532                    continue;
533                }
534            }
535            return Some(Ok((CollectionId(self.collection_id), pk_key.clone())));
536        }
537        None
538    }
539}
540
541struct OwnedIndexUniqueSource {
542    snapshot: Arc<SharedDbState>,
543    collection_id: u32,
544    index_name: String,
545    pk: Option<Vec<u8>>,
546    residual: Option<Predicate>,
547    done: bool,
548}
549
550impl RowSource for OwnedIndexUniqueSource {
551    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
552        if self.done {
553            return None;
554        }
555        self.done = true;
556        let pk_key = self.pk.take()?;
557        let row = match row_for_index_pk(
558            &self.snapshot.latest,
559            self.collection_id,
560            pk_key.clone(),
561            &self.index_name,
562        ) {
563            Ok(r) => r,
564            Err(e) => return Some(Err(e)),
565        };
566        if let Some(pred) = &self.residual {
567            if !eval_predicate(&row, pred) {
568                return None;
569            }
570        }
571        Some(Ok((CollectionId(self.collection_id), pk_key)))
572    }
573}
574
575struct OwnedIndexNonUniqueSource {
576    snapshot: Arc<SharedDbState>,
577    collection_id: u32,
578    index_name: String,
579    pks: std::vec::IntoIter<Vec<u8>>,
580    residual: Option<Predicate>,
581}
582
583impl RowSource for OwnedIndexNonUniqueSource {
584    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
585        for pk_key in self.pks.by_ref() {
586            let row = match row_for_index_pk(
587                &self.snapshot.latest,
588                self.collection_id,
589                pk_key.clone(),
590                &self.index_name,
591            ) {
592                Ok(r) => r,
593                Err(e) => return Some(Err(e)),
594            };
595            if let Some(pred) = &self.residual {
596                if !eval_predicate(&row, pred) {
597                    continue;
598                }
599            }
600            return Some(Ok((CollectionId(self.collection_id), pk_key)));
601        }
602        None
603    }
604}
605
606struct OwnedScanSource {
607    snapshot: Arc<SharedDbState>,
608    collection_id: u32,
609    predicate: Option<Predicate>,
610    pos: usize,
611    keys: Vec<(u32, Vec<u8>)>,
612}
613
614impl OwnedScanSource {
615    fn new(snapshot: Arc<SharedDbState>, collection_id: u32, predicate: Option<Predicate>) -> Self {
616        let mut keys: Vec<(u32, Vec<u8>)> = snapshot
617            .latest
618            .keys()
619            .filter(|(cid, _)| *cid == collection_id)
620            .cloned()
621            .collect();
622        keys.sort_by(|a, b| a.1.cmp(&b.1));
623        Self {
624            snapshot,
625            collection_id,
626            predicate,
627            pos: 0,
628            keys,
629        }
630    }
631}
632
633impl RowSource for OwnedScanSource {
634    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
635        while self.pos < self.keys.len() {
636            let (cid, pk_key) = self.keys[self.pos].clone();
637            self.pos += 1;
638            if cid != self.collection_id {
639                continue;
640            }
641            let row = match self.snapshot.latest.get(&(cid, pk_key.clone())) {
642                Some(r) => r,
643                None => continue,
644            };
645            if let Some(p) = &self.predicate {
646                if !eval_predicate(row, p) {
647                    continue;
648                }
649            }
650            return Some(Ok((CollectionId(self.collection_id), pk_key)));
651        }
652        None
653    }
654}
655
656fn owned_row_source_for_plan(
657    snapshot: Arc<SharedDbState>,
658    plan: Plan,
659) -> Box<dyn RowSource + 'static> {
660    match plan {
661        Plan::IndexLookup {
662            collection_id,
663            index_name,
664            kind,
665            key,
666            residual,
667            ..
668        } => match kind {
669            IndexKind::Unique => {
670                let pk: Option<Vec<u8>> = snapshot
671                    .indexes
672                    .unique_lookup(collection_id, &index_name, &key)
673                    .map(|p| p.to_vec());
674                Box::new(OwnedIndexUniqueSource {
675                    snapshot,
676                    collection_id,
677                    index_name,
678                    pk,
679                    residual,
680                    done: false,
681                })
682            }
683            IndexKind::NonUnique => {
684                let pks = snapshot
685                    .indexes
686                    .non_unique_lookup(collection_id, &index_name, &key)
687                    .unwrap_or_default()
688                    .into_iter();
689                Box::new(OwnedIndexNonUniqueSource {
690                    snapshot,
691                    collection_id,
692                    index_name,
693                    pks,
694                    residual,
695                })
696            }
697        },
698        Plan::IndexRangeLookup {
699            collection_id,
700            index_name,
701            kind,
702            key_range,
703            residual,
704            ..
705        } => {
706            let lo = key_range.lo.as_ref();
707            let hi = key_range.hi.as_ref();
708            let pks = match kind {
709                IndexKind::Unique => snapshot.indexes.unique_range_lookup(
710                    collection_id,
711                    &index_name,
712                    lo,
713                    key_range.lo_inclusive,
714                    hi,
715                    key_range.hi_inclusive,
716                ),
717                IndexKind::NonUnique => snapshot.indexes.non_unique_range_lookup(
718                    collection_id,
719                    &index_name,
720                    lo,
721                    key_range.lo_inclusive,
722                    hi,
723                    key_range.hi_inclusive,
724                ),
725            };
726            Box::new(OwnedIndexNonUniqueSource {
727                snapshot,
728                collection_id,
729                index_name,
730                pks: pks.into_iter(),
731                residual,
732            })
733        }
734        Plan::CollectionScan {
735            collection_id,
736            predicate,
737            ..
738        } => Box::new(OwnedScanSource::new(snapshot, collection_id, predicate)),
739    }
740}
741
742/// Like [`execute_query_iter`], but holds an owned live snapshot (for attached read-only handles).
743pub fn execute_query_iter_owned(
744    snapshot: Arc<SharedDbState>,
745    query: &Query,
746    db_path: Option<&std::path::Path>,
747) -> Result<QueryRowIter<'static>, DbError> {
748    if query.order_by.is_none() {
749        validate_query_limit(query)?;
750        let col = snapshot
751            .catalog
752            .get(query.collection)
753            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
754                id: query.collection.0,
755            }))?;
756        let plan = plan_query(col.id, &col.indexes, query);
757        let mut source = owned_row_source_for_plan(snapshot.clone(), plan);
758        if let Some(n) = query.limit {
759            source = Box::new(LimitOp::new(source, n));
760        }
761        return Ok(QueryRowIter {
762            state: QueryRowIterState::Owned { snapshot, source },
763        });
764    }
765
766    let order_by = query
767        .order_by
768        .clone()
769        .expect("order_by is Some when this function continues");
770    let Some(path) = db_path else {
771        return Ok(QueryRowIter {
772            state: QueryRowIterState::Vec {
773                rows: execute_query(
774                    &snapshot.catalog,
775                    &snapshot.indexes,
776                    &snapshot.latest,
777                    query,
778                )?,
779                pos: 0,
780            },
781        });
782    };
783
784    validate_query_limit(query)?;
785    let col = snapshot
786        .catalog
787        .get(query.collection)
788        .ok_or(DbError::Schema(SchemaError::UnknownCollection {
789            id: query.collection.0,
790        }))?;
791    let plan = plan_query(col.id, &col.indexes, query);
792    let base = owned_row_source_for_plan(snapshot.clone(), plan.clone());
793    let spill_store = open_sorted_query_spill_store(path)?;
794    #[cfg(feature = "tracing")]
795    tracing::debug!(spill_path = %path.display(), "execute_query_iter_owned_spill");
796    let spill = crate::spill::TempSpillFile::new(spill_store)?;
797    let index_name_for_sort = match &plan {
798        Plan::IndexLookup { index_name, .. } | Plan::IndexRangeLookup { index_name, .. } => {
799            index_name.as_str()
800        }
801        Plan::CollectionScan { .. } => "",
802    };
803    let sort_source = Box::new(ExternalSortSourceOwned::new(
804        spill,
805        snapshot.clone(),
806        base,
807        col.id.0,
808        order_by,
809        index_name_for_sort,
810    )?);
811    let mut source: Box<dyn RowSource + 'static> = sort_source;
812    if let Some(n) = query.limit {
813        source = Box::new(LimitOp::new(source, n));
814    }
815    Ok(QueryRowIter {
816        state: QueryRowIterState::Owned { snapshot, source },
817    })
818}
819
820/// Same planning and row sources as [`execute_query`], but as a lazy iterator.
821pub fn execute_query_iter<'a>(
822    catalog: &'a Catalog,
823    indexes: &'a IndexState,
824    latest: &'a crate::db::LatestMap,
825    query: &Query,
826) -> Result<QueryRowIter<'a>, DbError> {
827    if query.order_by.is_some() {
828        return Ok(QueryRowIter {
829            state: QueryRowIterState::Vec {
830                rows: execute_query(catalog, indexes, latest, query)?,
831                pos: 0,
832            },
833        });
834    }
835    let col =
836        catalog
837            .get(query.collection)
838            .ok_or(DbError::Schema(SchemaError::UnknownCollection {
839                id: query.collection.0,
840            }))?;
841    let plan = plan_query(col.id, &col.indexes, query);
842    let mut source: Box<dyn RowSource + 'a> = match plan {
843        Plan::IndexLookup {
844            collection_id,
845            index_name,
846            kind,
847            key,
848            residual,
849            ..
850        } => match kind {
851            IndexKind::Unique => {
852                let pk = indexes
853                    .unique_lookup(collection_id, &index_name, &key)
854                    .map(|p| p.to_vec());
855                Box::new(IndexUniqueSource {
856                    latest,
857                    collection_id,
858                    index_name,
859                    pk,
860                    residual,
861                    done: false,
862                })
863            }
864            IndexKind::NonUnique => {
865                let pks = indexes
866                    .non_unique_lookup(collection_id, &index_name, &key)
867                    .unwrap_or_default()
868                    .into_iter();
869                Box::new(IndexNonUniqueSource {
870                    latest,
871                    collection_id,
872                    index_name,
873                    pks,
874                    residual,
875                })
876            }
877        },
878        Plan::IndexRangeLookup {
879            collection_id,
880            index_name,
881            kind,
882            key_range,
883            residual,
884            ..
885        } => Box::new(index_range_source(
886            indexes,
887            latest,
888            collection_id,
889            index_name,
890            kind,
891            &key_range,
892            residual,
893        )),
894        Plan::CollectionScan {
895            collection_id,
896            predicate,
897            ..
898        } => Box::new(ScanSource {
899            it: latest.iter(),
900            collection_id,
901            predicate,
902        }),
903    };
904
905    if let Some(n) = query.limit {
906        source = Box::new(LimitOp::new(source, n));
907    }
908
909    Ok(QueryRowIter {
910        state: QueryRowIterState::Source { latest, source },
911    })
912}
913
/// Test-only hook: given the spill path, opens the `FileStore` used for sorted-query spills.
#[cfg(test)]
type SortedQuerySpillStoreOpenHook = Box<dyn FnMut(&std::path::Path) -> Result<FileStore, DbError>>;
/// Test-only hook: given the spill path, supplies the whole sorted-query spill store (e.g. the
/// test-only `FailLen` variant).
#[cfg(test)]
type SortedQuerySpillStoreOverrideHook =
    Box<dyn FnMut(&std::path::Path) -> Result<SortedQuerySpillStore, DbError>>;

// The hooks live in thread-locals, so a hook installed on one test thread is not visible to
// tests running on other threads. Both start out empty (`None`).
#[cfg(test)]
thread_local! {
    // Consulted by `open_sorted_query_spill_store` only when no override hook is installed.
    static QUERY_SORT_SPILL_STORE_OPEN_HOOK: RefCell<Option<SortedQuerySpillStoreOpenHook>> =
        RefCell::new(None);

    // Consulted first by `open_sorted_query_spill_store`; takes precedence when installed.
    static QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK: RefCell<Option<SortedQuerySpillStoreOverrideHook>> =
        RefCell::new(None);
}
928
/// Covers sorted-query spill `FileStore` construction error paths during unit tests only.
///
/// Installs `hook` for the current thread, replacing any previous hook. Passing `None`
/// clears it. The hook is ignored while an override hook is installed.
#[cfg(test)]
pub(crate) fn test_set_sorted_query_spill_store_open_hook(
    hook: Option<SortedQuerySpillStoreOpenHook>,
) {
    QUERY_SORT_SPILL_STORE_OPEN_HOOK.set(hook);
}
938
/// Test-only: override the underlying spill store implementation.
///
/// Installs `hook` for the current thread, replacing any previous hook. Passing `None`
/// clears it. While installed, this hook takes precedence over the `FileStore` open hook.
#[cfg(test)]
pub(crate) fn test_set_sorted_query_spill_store_override_hook(
    hook: Option<SortedQuerySpillStoreOverrideHook>,
) {
    QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK.set(hook);
}
948
949pub(crate) enum SortedQuerySpillStore {
950    File(FileStore),
951    #[cfg(test)]
952    FailLen,
953}
954
955impl Store for SortedQuerySpillStore {
956    fn len(&self) -> Result<u64, DbError> {
957        match self {
958            Self::File(f) => f.len(),
959            #[cfg(test)]
960            Self::FailLen => Err(DbError::Io(std::io::Error::other(
961                "sorted query spill store synthetic len() failure (test override)",
962            ))),
963        }
964    }
965
966    fn read_exact_at(&mut self, offset: u64, buf: &mut [u8]) -> Result<(), DbError> {
967        match self {
968            Self::File(f) => f.read_exact_at(offset, buf),
969            #[cfg(test)]
970            Self::FailLen => Err(DbError::Io(std::io::Error::other(
971                "sorted query spill store synthetic read failure (test override)",
972            ))),
973        }
974    }
975
976    fn write_all_at(&mut self, offset: u64, buf: &[u8]) -> Result<(), DbError> {
977        match self {
978            Self::File(f) => f.write_all_at(offset, buf),
979            #[cfg(test)]
980            Self::FailLen => Err(DbError::Io(std::io::Error::other(
981                "sorted query spill store synthetic write failure (test override)",
982            ))),
983        }
984    }
985
986    fn sync(&mut self) -> Result<(), DbError> {
987        match self {
988            Self::File(f) => f.sync(),
989            #[cfg(test)]
990            Self::FailLen => Ok(()),
991        }
992    }
993
994    fn truncate(&mut self, len: u64) -> Result<(), DbError> {
995        match self {
996            Self::File(f) => f.truncate(len),
997            #[cfg(test)]
998            Self::FailLen => Ok(()),
999        }
1000    }
1001}
1002
1003fn open_sorted_query_spill_store(path: &std::path::Path) -> Result<SortedQuerySpillStore, DbError> {
1004    #[cfg(test)]
1005    {
1006        let overridden = QUERY_SORT_SPILL_STORE_OVERRIDE_HOOK.with(|c| {
1007            let mut bm = c.borrow_mut();
1008            bm.as_mut().map(|hook| hook(path))
1009        });
1010        if let Some(r) = overridden {
1011            return r;
1012        }
1013
1014        let hooked = QUERY_SORT_SPILL_STORE_OPEN_HOOK.with(|c| {
1015            let mut bm = c.borrow_mut();
1016            bm.as_mut().map(|hook| hook(path))
1017        });
1018        if let Some(r) = hooked {
1019            return r.map(SortedQuerySpillStore::File);
1020        }
1021    }
1022    let _ = path;
1023    // Never spill into the live DB file: a second handle would race with truncate-on-drop.
1024    let spill_file = tempfile::tempfile().map_err(DbError::Io)?;
1025    Ok(SortedQuerySpillStore::File(FileStore::new(spill_file)))
1026}
1027
1028/// Like [`execute_query_iter`], but when `q.order_by` is set this will attempt a bounded-memory
1029/// external sort by spilling ephemeral `Temp` segments to a dedicated temporary file.
1030///
1031/// If `db_path` is `None` (e.g. in-memory), this falls back to the in-memory sort path.
1032pub fn execute_query_iter_with_spill_path<'a>(
1033    catalog: &'a Catalog,
1034    indexes: &'a IndexState,
1035    latest: &'a crate::db::LatestMap,
1036    q: &Query,
1037    db_path: Option<&std::path::Path>,
1038) -> Result<QueryRowIter<'a>, DbError> {
1039    if q.order_by.is_none() {
1040        return execute_query_iter(catalog, indexes, latest, q);
1041    }
1042    let order_by = q
1043        .order_by
1044        .clone()
1045        .expect("order_by is Some when this function continues");
1046
1047    // If we don't have a file path to spill into, fall back to the existing in-memory behavior.
1048    let Some(path) = db_path else {
1049        return Ok(QueryRowIter {
1050            state: QueryRowIterState::Vec {
1051                rows: execute_query(catalog, indexes, latest, q)?,
1052                pos: 0,
1053            },
1054        });
1055    };
1056
1057    let col = catalog
1058        .get(q.collection)
1059        .ok_or(DbError::Schema(SchemaError::UnknownCollection {
1060            id: q.collection.0,
1061        }))?;
1062    let plan = plan_query(col.id, &col.indexes, q);
1063
1064    let base: Box<dyn RowSource + 'a> = match plan.clone() {
1065        Plan::IndexLookup {
1066            collection_id,
1067            index_name,
1068            kind,
1069            key,
1070            residual,
1071            ..
1072        } => match kind {
1073            IndexKind::Unique => Box::new(IndexUniqueSource {
1074                latest,
1075                collection_id,
1076                index_name: index_name.clone(),
1077                pk: indexes
1078                    .unique_lookup(collection_id, &index_name, &key)
1079                    .map(|p| p.to_vec()),
1080                residual,
1081                done: false,
1082            }),
1083            IndexKind::NonUnique => Box::new(IndexNonUniqueSource {
1084                latest,
1085                collection_id,
1086                index_name: index_name.clone(),
1087                pks: indexes
1088                    .non_unique_lookup(collection_id, &index_name, &key)
1089                    .unwrap_or_default()
1090                    .into_iter(),
1091                residual,
1092            }),
1093        },
1094        Plan::IndexRangeLookup {
1095            collection_id,
1096            index_name,
1097            kind,
1098            key_range,
1099            residual,
1100            ..
1101        } => Box::new(index_range_source(
1102            indexes,
1103            latest,
1104            collection_id,
1105            index_name,
1106            kind,
1107            &key_range,
1108            residual,
1109        )),
1110        Plan::CollectionScan {
1111            collection_id,
1112            predicate,
1113            ..
1114        } => Box::new(ScanSource {
1115            it: latest.iter(),
1116            collection_id,
1117            predicate,
1118        }),
1119    };
1120
1121    // Build a sorted key source (potentially spilling to Temp segments).
1122    let spill_store = open_sorted_query_spill_store(path)?;
1123    #[cfg(feature = "tracing")]
1124    tracing::debug!(spill_path = %path.display(), "execute_query_order_by_spill");
1125    let spill = crate::spill::TempSpillFile::new(spill_store)?;
1126    let index_name_for_sort = match &plan {
1127        Plan::IndexLookup { index_name, .. } | Plan::IndexRangeLookup { index_name, .. } => {
1128            index_name.as_str()
1129        }
1130        Plan::CollectionScan { .. } => "",
1131    };
1132    let sort_source = Box::new(ExternalSortSource::new(
1133        spill,
1134        latest,
1135        base,
1136        col.id.0,
1137        order_by,
1138        index_name_for_sort,
1139    )?);
1140
1141    let mut source: Box<dyn RowSource + 'a> = sort_source;
1142    if let Some(n) = q.limit {
1143        source = Box::new(LimitOp::new(source, n));
1144    }
1145
1146    Ok(QueryRowIter {
1147        state: QueryRowIterState::Source { latest, source },
1148    })
1149}
1150
/// One row's precomputed sort position, buffered in an in-memory run before it is spilled.
#[derive(Clone)]
struct SortItem {
    // `none_flag`: 0 for Some, 1 for None (so None sorts last on ascending).
    none_flag: u8,
    // Order-preserving byte encoding of the ORDER BY value (`scalar_sort_key_bytes`);
    // empty when the value is missing.
    sort_key: Vec<u8>,
    // Identifies the row; its pk bytes (`key.1`) are the final tie-breaker in `cmp_sort_item`
    // and the only part of the key that `encode_run` writes to the spill file.
    key: RowKey,
}
1158
/// Test helper: computes a row's [`SortItem`] without naming an index, mapping any error
/// (e.g. the row being absent from `latest`) to `None`.
#[cfg(test)]
fn sort_item_for(
    latest: &crate::db::LatestMap,
    key: &RowKey,
    order_by: &OrderBy,
) -> Option<SortItem> {
    let no_index_name = "";
    let computed = sort_item_for_result(latest, key, order_by, no_index_name);
    computed.ok()
}
1167
1168fn sort_item_for_result(
1169    latest: &crate::db::LatestMap,
1170    key: &RowKey,
1171    order_by: &OrderBy,
1172    index_name: &str,
1173) -> Result<SortItem, DbError> {
1174    let (cid, pk) = key;
1175    let row =
1176        latest
1177            .get(&(cid.0, pk.clone()))
1178            .ok_or(DbError::Schema(SchemaError::IndexRowMissing {
1179                collection_id: cid.0,
1180                index_name: index_name.to_string(),
1181            }))?;
1182    let (none_flag, sort_key) = match scalar_at_path(row, &order_by.path) {
1183        None => (1u8, Vec::new()),
1184        Some(s) => (0u8, scalar_sort_key_bytes(&s)),
1185    };
1186    Ok(SortItem {
1187        none_flag,
1188        sort_key,
1189        key: (CollectionId(cid.0), pk.clone()),
1190    })
1191}
1192
1193fn scalar_sort_key_bytes(s: &ScalarValue) -> Vec<u8> {
1194    match s {
1195        ScalarValue::Bool(b) => vec![0, if *b { 1 } else { 0 }],
1196        ScalarValue::Int64(v) => {
1197            let u = (*v as u64) ^ 0x8000_0000_0000_0000u64;
1198            let mut out = vec![1];
1199            out.extend_from_slice(&u.to_be_bytes());
1200            out
1201        }
1202        ScalarValue::Uint64(v) => {
1203            let mut out = vec![2];
1204            out.extend_from_slice(&v.to_be_bytes());
1205            out
1206        }
1207        ScalarValue::Float64(v) => {
1208            let n = if *v == 0.0 { 0.0f64 } else { *v };
1209            let mut bits = n.to_bits();
1210            if bits & (1u64 << 63) != 0 {
1211                bits = !bits;
1212            } else {
1213                bits ^= 1u64 << 63;
1214            }
1215            let mut out = vec![3];
1216            out.extend_from_slice(&bits.to_be_bytes());
1217            out
1218        }
1219        ScalarValue::String(st) => {
1220            let mut out = vec![4];
1221            out.extend_from_slice(st.as_bytes());
1222            out
1223        }
1224        ScalarValue::Bytes(b) => {
1225            let mut out = vec![5];
1226            out.extend_from_slice(b);
1227            out
1228        }
1229        ScalarValue::Uuid(u) => {
1230            let mut out = vec![6];
1231            out.extend_from_slice(u);
1232            out
1233        }
1234        ScalarValue::Timestamp(t) => {
1235            let u = (*t as u64) ^ 0x8000_0000_0000_0000u64;
1236            let mut out = vec![7];
1237            out.extend_from_slice(&u.to_be_bytes());
1238            out
1239        }
1240    }
1241}
1242
1243fn cmp_sort_item(a: &SortItem, b: &SortItem, dir: OrderDirection) -> std::cmp::Ordering {
1244    let ord = a
1245        .none_flag
1246        .cmp(&b.none_flag)
1247        .then_with(|| a.sort_key.cmp(&b.sort_key))
1248        .then_with(|| a.key.1.cmp(&b.key.1));
1249    match dir {
1250        OrderDirection::Asc => ord,
1251        OrderDirection::Desc => ord.reverse(),
1252    }
1253}
1254
/// Simple external sort: sort fixed-size runs, spill each run as one Temp segment,
/// then k-way merge those runs without loading full run payloads into RAM.
///
/// `ExternalSortSource::new` drains its input eagerly into the spilled runs. `next_key` then
/// yields row keys in merged order, reading one record at a time from the spill file.
struct ExternalSortSource<'a, S: Store = FileStore> {
    /// Spill file holding one Temp segment per sorted run.
    spill: crate::spill::TempSpillFile<S>,
    /// Collection id attached to every emitted `RowKey`.
    collection_id: u32,
    /// Requested ORDER BY direction.
    dir: OrderDirection,
    /// Merge frontier: at most one `HeapItem` (the next unread record) per run.
    heap: std::collections::BinaryHeap<HeapItem>,
    /// Location of each spilled run, indexed by run number.
    runs_meta: Vec<RunMeta>,
    /// Per-run byte cursor into that run's payload (parallel to `runs_meta`).
    run_cursors: Vec<usize>,
    /// Ties the source to the borrowed `LatestMap` lifetime; not read after construction.
    _latest: &'a crate::db::LatestMap,
}
1266
/// Location of one spilled sorted run inside the spill file.
#[derive(Clone)]
struct RunMeta {
    /// Value returned by `append_temp_segment` for this run; passed back to
    /// `read_temp_payload_into` to address the segment.
    offset: u64,
    /// Length in bytes of the encoded run payload (the `encode_run` output).
    payload_len: u64,
}

/// One decoded spill record: `(none_flag, sort_key, pk)`.
type SpillSortRunItem = (u8, Vec<u8>, Vec<u8>);
1274
/// Test-only cursor over a fully materialized run payload (the `encode_run` format).
#[cfg(test)]
struct RunReader {
    buf: Vec<u8>,
    pos: usize,
}

#[cfg(test)]
impl RunReader {
    /// Starts reading `buf` from its first byte.
    fn new(buf: Vec<u8>) -> Self {
        RunReader { pos: 0, buf }
    }

    /// Decodes the next record, or `None` at the end of the buffer or on a truncated record.
    fn next_item(&mut self) -> Option<(u8, Vec<u8>, Vec<u8>)> {
        let Self { buf, pos } = self;
        read_run_item_from_buf(buf, pos)
    }
}

/// Decodes one record from `buf` at `*pos` and advances `*pos` past it.
///
/// Record layout: `none_flag: u8`, `key_len: u32 LE`, `key`, `pk_len: u32 LE`, `pk`.
/// Returns `None` as soon as a field does not fit in `buf`. In that case `*pos` has already
/// moved past the fields that were read successfully.
#[cfg(test)]
fn read_run_item_from_buf(buf: &[u8], pos: &mut usize) -> Option<(u8, Vec<u8>, Vec<u8>)> {
    // Returns the `n` bytes at `*pos` and advances the cursor; on `None` the cursor is untouched.
    fn take<'b>(buf: &'b [u8], pos: &mut usize, n: usize) -> Option<&'b [u8]> {
        let bytes = buf.get(*pos..*pos + n)?;
        *pos += n;
        Some(bytes)
    }

    // Reads a little-endian `u32` length prefix.
    fn take_len(buf: &[u8], pos: &mut usize) -> Option<usize> {
        let mut raw = [0u8; 4];
        raw.copy_from_slice(take(buf, pos, 4)?);
        Some(u32::from_le_bytes(raw) as usize)
    }

    let none_flag = *buf.get(*pos)?;
    *pos += 1;
    let key_len = take_len(buf, pos)?;
    let key = take(buf, pos, key_len)?.to_vec();
    let pk_len = take_len(buf, pos)?;
    let pk = take(buf, pos, pk_len)?.to_vec();
    Some((none_flag, key, pk))
}
1309
1310fn read_spill_run_item<S: Store>(
1311    spill: &mut crate::spill::TempSpillFile<S>,
1312    meta: &RunMeta,
1313    pos: &mut usize,
1314) -> Result<Option<SpillSortRunItem>, DbError> {
1315    let payload_len = meta.payload_len as usize;
1316    if *pos >= payload_len {
1317        return Ok(None);
1318    }
1319    let mut one = [0u8; 1];
1320    spill.read_temp_payload_into(meta.offset, *pos as u64, &mut one)?;
1321    *pos += 1;
1322    let none_flag = one[0];
1323
1324    if *pos + 4 > payload_len {
1325        return Err(DbError::Query(crate::error::QueryError {
1326            message: "external sort spill segment truncated".into(),
1327        }));
1328    }
1329    let mut len_buf = [0u8; 4];
1330    spill.read_temp_payload_into(meta.offset, *pos as u64, &mut len_buf)?;
1331    *pos += 4;
1332    let key_len = u32::from_le_bytes(len_buf) as usize;
1333    crate::file_format::check_field_bytes_len(key_len).map_err(|e| match e {
1334        DbError::Format(fe) => DbError::Query(crate::error::QueryError {
1335            message: format!("external sort spill key: {fe}"),
1336        }),
1337        other => other,
1338    })?;
1339    if *pos + key_len > payload_len {
1340        return Err(DbError::Query(crate::error::QueryError {
1341            message: "external sort spill segment truncated".into(),
1342        }));
1343    }
1344
1345    let mut key = vec![0u8; key_len];
1346    spill.read_temp_payload_into(meta.offset, *pos as u64, &mut key)?;
1347    *pos += key_len;
1348
1349    if *pos + 4 > payload_len {
1350        return Err(DbError::Query(crate::error::QueryError {
1351            message: "external sort spill segment truncated".into(),
1352        }));
1353    }
1354    spill.read_temp_payload_into(meta.offset, *pos as u64, &mut len_buf)?;
1355    *pos += 4;
1356    let pk_len = u32::from_le_bytes(len_buf) as usize;
1357    crate::file_format::check_field_bytes_len(pk_len).map_err(|e| match e {
1358        DbError::Format(fe) => DbError::Query(crate::error::QueryError {
1359            message: format!("external sort spill pk: {fe}"),
1360        }),
1361        other => other,
1362    })?;
1363    if *pos + pk_len > payload_len {
1364        return Err(DbError::Query(crate::error::QueryError {
1365            message: "external sort spill segment truncated".into(),
1366        }));
1367    }
1368
1369    let mut pk = vec![0u8; pk_len];
1370    spill.read_temp_payload_into(meta.offset, *pos as u64, &mut pk)?;
1371    *pos += pk_len;
1372
1373    Ok(Some((none_flag, key, pk)))
1374}
1375
1376#[derive(Clone)]
1377struct HeapItem {
1378    run_idx: usize,
1379    none_flag: u8,
1380    sort_key: Vec<u8>,
1381    pk: Vec<u8>,
1382    dir: OrderDirection,
1383}
1384
1385impl PartialEq for HeapItem {
1386    fn eq(&self, other: &Self) -> bool {
1387        (self.none_flag, &self.sort_key, &self.pk) == (other.none_flag, &other.sort_key, &other.pk)
1388    }
1389}
1390impl Eq for HeapItem {}
1391
1392impl PartialOrd for HeapItem {
1393    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1394        Some(self.cmp(other))
1395    }
1396}
1397impl Ord for HeapItem {
1398    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1399        // BinaryHeap is max-heap; invert to get min-heap behavior.
1400        let a = SortItem {
1401            none_flag: self.none_flag,
1402            sort_key: self.sort_key.clone(),
1403            key: (CollectionId(0), self.pk.clone()),
1404        };
1405        let b = SortItem {
1406            none_flag: other.none_flag,
1407            sort_key: other.sort_key.clone(),
1408            key: (CollectionId(0), other.pk.clone()),
1409        };
1410        cmp_sort_item(&a, &b, self.dir).reverse()
1411    }
1412}
1413
1414impl<'a, S: Store> ExternalSortSource<'a, S> {
1415    fn flush_sorted_run(
1416        spill: &mut crate::spill::TempSpillFile<S>,
1417        runs_meta: &mut Vec<RunMeta>,
1418        run: &mut Vec<SortItem>,
1419        dir: OrderDirection,
1420    ) -> Result<(), DbError> {
1421        if run.is_empty() {
1422            return Ok(());
1423        }
1424        run.sort_by(|a, b| cmp_sort_item(a, b, dir));
1425        let payload = encode_run(run, dir);
1426        let off = spill.append_temp_segment(&payload)?;
1427        runs_meta.push(RunMeta {
1428            offset: off,
1429            payload_len: payload.len() as u64,
1430        });
1431        run.clear();
1432        Ok(())
1433    }
1434
1435    fn new(
1436        mut spill: crate::spill::TempSpillFile<S>,
1437        latest: &'a crate::db::LatestMap,
1438        mut input: Box<dyn RowSource + 'a>,
1439        collection_id: u32,
1440        order_by: OrderBy,
1441        index_name: &str,
1442    ) -> Result<Self, DbError> {
1443        const RUN_KEYS: usize = 2048;
1444
1445        let dir = order_by.direction;
1446        let mut runs_meta: Vec<RunMeta> = Vec::new();
1447        let mut run: Vec<SortItem> = Vec::with_capacity(RUN_KEYS);
1448
1449        while let Some(rk) = input.next_key() {
1450            let rk = rk?;
1451            let item = sort_item_for_result(latest, &rk, &order_by, index_name)?;
1452            run.push(item);
1453            if run.len() >= RUN_KEYS {
1454                Self::flush_sorted_run(&mut spill, &mut runs_meta, &mut run, dir)?;
1455            }
1456        }
1457
1458        Self::flush_sorted_run(&mut spill, &mut runs_meta, &mut run, dir)?;
1459
1460        // Seed merge heap by reading only the first item from each spilled run.
1461        let mut run_cursors = vec![0usize; runs_meta.len()];
1462        let mut heap = std::collections::BinaryHeap::new();
1463        for (i, m) in runs_meta.iter().enumerate() {
1464            match read_spill_run_item(&mut spill, m, &mut run_cursors[i]) {
1465                Ok(Some((none_flag, sort_key, pk))) => {
1466                    heap.push(HeapItem {
1467                        run_idx: i,
1468                        none_flag,
1469                        sort_key,
1470                        pk: pk.clone(),
1471                        dir,
1472                    });
1473                }
1474                Ok(None) => {}
1475                Err(e) => return Err(e),
1476            }
1477        }
1478
1479        Ok(Self {
1480            spill,
1481            collection_id,
1482            dir,
1483            heap,
1484            runs_meta,
1485            run_cursors,
1486            _latest: latest,
1487        })
1488    }
1489}
1490
1491fn encode_run(run: &[SortItem], _dir: OrderDirection) -> Vec<u8> {
1492    let mut out = Vec::new();
1493    for it in run {
1494        out.push(it.none_flag);
1495        out.extend_from_slice(&(it.sort_key.len() as u32).to_le_bytes());
1496        out.extend_from_slice(&it.sort_key);
1497        out.extend_from_slice(&(it.key.1.len() as u32).to_le_bytes());
1498        out.extend_from_slice(&it.key.1);
1499    }
1500    out
1501}
1502
1503impl<'a, S: Store> RowSource for ExternalSortSource<'a, S> {
1504    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
1505        let top = self.heap.pop()?;
1506        let run_idx = top.run_idx;
1507        let meta = self.runs_meta[run_idx].clone();
1508        match read_spill_run_item(&mut self.spill, &meta, &mut self.run_cursors[run_idx]) {
1509            Ok(Some((none_flag, sort_key, pk))) => {
1510                self.heap.push(HeapItem {
1511                    run_idx,
1512                    none_flag,
1513                    sort_key,
1514                    pk: pk.clone(),
1515                    dir: self.dir,
1516                });
1517            }
1518            Ok(None) => {}
1519            Err(e) => return Some(Err(e)),
1520        }
1521        Some(Ok((CollectionId(self.collection_id), top.pk)))
1522    }
1523}
1524
/// Owning counterpart of `ExternalSortSource`, with the same merge state.
///
/// Instead of borrowing a `LatestMap`, it holds an `Arc<SharedDbState>` snapshot, so the
/// type carries no borrowed lifetime.
struct ExternalSortSourceOwned<S: Store = FileStore> {
    /// Spill file holding one Temp segment per sorted run.
    spill: crate::spill::TempSpillFile<S>,
    /// Collection id attached to every emitted `RowKey`.
    collection_id: u32,
    /// Requested ORDER BY direction.
    dir: OrderDirection,
    /// Merge frontier: at most one `HeapItem` (the next unread record) per run.
    heap: std::collections::BinaryHeap<HeapItem>,
    /// Location of each spilled run, indexed by run number.
    runs_meta: Vec<RunMeta>,
    /// Per-run byte cursor into that run's payload (parallel to `runs_meta`).
    run_cursors: Vec<usize>,
    /// Keeps the snapshot whose `latest` map was used to build the runs alive.
    _snapshot: Arc<SharedDbState>,
}
1534
1535impl<S: Store> ExternalSortSourceOwned<S> {
1536    fn new(
1537        mut spill: crate::spill::TempSpillFile<S>,
1538        snapshot: Arc<SharedDbState>,
1539        mut input: Box<dyn RowSource + 'static>,
1540        collection_id: u32,
1541        order_by: OrderBy,
1542        index_name: &str,
1543    ) -> Result<Self, DbError> {
1544        const RUN_KEYS: usize = 2048;
1545
1546        let dir = order_by.direction;
1547        let mut runs_meta: Vec<RunMeta> = Vec::new();
1548        let mut run: Vec<SortItem> = Vec::with_capacity(RUN_KEYS);
1549        let latest = &snapshot.latest;
1550
1551        while let Some(rk) = input.next_key() {
1552            let rk = rk?;
1553            let item = sort_item_for_result(latest, &rk, &order_by, index_name)?;
1554            run.push(item);
1555            if run.len() >= RUN_KEYS {
1556                ExternalSortSource::<S>::flush_sorted_run(
1557                    &mut spill,
1558                    &mut runs_meta,
1559                    &mut run,
1560                    dir,
1561                )?;
1562            }
1563        }
1564
1565        ExternalSortSource::<S>::flush_sorted_run(&mut spill, &mut runs_meta, &mut run, dir)?;
1566
1567        let mut run_cursors = vec![0usize; runs_meta.len()];
1568        let mut heap = std::collections::BinaryHeap::new();
1569        for (i, m) in runs_meta.iter().enumerate() {
1570            match read_spill_run_item(&mut spill, m, &mut run_cursors[i]) {
1571                Ok(Some((none_flag, sort_key, pk))) => {
1572                    heap.push(HeapItem {
1573                        run_idx: i,
1574                        none_flag,
1575                        sort_key,
1576                        pk: pk.clone(),
1577                        dir,
1578                    });
1579                }
1580                Ok(None) => {}
1581                Err(e) => return Err(e),
1582            }
1583        }
1584
1585        Ok(Self {
1586            spill,
1587            collection_id,
1588            dir,
1589            heap,
1590            runs_meta,
1591            run_cursors,
1592            _snapshot: snapshot,
1593        })
1594    }
1595}
1596
1597impl<S: Store> RowSource for ExternalSortSourceOwned<S> {
1598    fn next_key(&mut self) -> Option<Result<RowKey, DbError>> {
1599        let top = self.heap.pop()?;
1600        let run_idx = top.run_idx;
1601        let meta = self.runs_meta[run_idx].clone();
1602        match read_spill_run_item(&mut self.spill, &meta, &mut self.run_cursors[run_idx]) {
1603            Ok(Some((none_flag, sort_key, pk))) => {
1604                self.heap.push(HeapItem {
1605                    run_idx,
1606                    none_flag,
1607                    sort_key,
1608                    pk: pk.clone(),
1609                    dir: self.dir,
1610                });
1611            }
1612            Ok(None) => {}
1613            Err(e) => return Some(Err(e)),
1614        }
1615        Some(Ok((CollectionId(self.collection_id), top.pk)))
1616    }
1617}
1618
1619fn plan_query(
1620    collection: CollectionId,
1621    indexes: &[crate::schema::IndexDef],
1622    query: &Query,
1623) -> Plan {
1624    let Some(pred) = query.predicate.clone() else {
1625        return Plan::CollectionScan {
1626            collection_id: collection.0,
1627            predicate: None,
1628            limit: query.limit,
1629            order_by: query.order_by.clone(),
1630        };
1631    };
1632
1633    let (best, residual) = match choose_index(indexes, &pred) {
1634        None => (None, Some(pred)),
1635        Some(choice) => {
1636            let used = match &choice {
1637                IndexChoice::Eq { used, .. } | IndexChoice::Range { used, .. } => used.clone(),
1638            };
1639            let residual = remove_used_predicate(pred, used);
1640            (Some(choice), residual)
1641        }
1642    };
1643
1644    if let Some(choice) = best {
1645        match choice {
1646            IndexChoice::Eq { idx, value, .. } => Plan::IndexLookup {
1647                collection_id: collection.0,
1648                index_name: idx.name.clone(),
1649                kind: idx.kind,
1650                key: value.canonical_key_bytes(),
1651                residual,
1652                limit: query.limit,
1653                order_by: query.order_by.clone(),
1654            },
1655            IndexChoice::Range { idx, key_range, .. } => Plan::IndexRangeLookup {
1656                collection_id: collection.0,
1657                index_name: idx.name.clone(),
1658                kind: idx.kind,
1659                key_range,
1660                residual,
1661                limit: query.limit,
1662                order_by: query.order_by.clone(),
1663            },
1664        }
1665    } else {
1666        Plan::CollectionScan {
1667            collection_id: collection.0,
1668            predicate: residual,
1669            limit: query.limit,
1670            order_by: query.order_by.clone(),
1671        }
1672    }
1673}
1674
/// An index access path selected by `choose_index` for (part of) a predicate.
#[derive(Debug, Clone, PartialEq)]
enum IndexChoice<'a> {
    /// Point lookup of `value` on `idx`.
    Eq {
        idx: &'a crate::schema::IndexDef,
        /// Looked up by its canonical key bytes (see `plan_query`).
        value: ScalarValue,
        /// The predicate this lookup satisfies; removed from the residual filter.
        used: Predicate,
    },
    /// Range scan over `idx` bounded by `key_range`.
    Range {
        idx: &'a crate::schema::IndexDef,
        key_range: IndexKeyRange,
        /// The comparison(s) folded into `key_range`; removed from the residual filter.
        used: Predicate,
    },
}
1688
1689fn is_range_indexable(value: &ScalarValue) -> bool {
1690    matches!(value, ScalarValue::Int64(_) | ScalarValue::String(_))
1691}
1692
1693fn merge_lo_bound(current: &mut (Option<ScalarValue>, bool), value: ScalarValue, inclusive: bool) {
1694    match &current.0 {
1695        None => *current = (Some(value), inclusive),
1696        Some(existing) => match scalar_partial_cmp(&value, existing) {
1697            Some(std::cmp::Ordering::Greater) => *current = (Some(value), inclusive),
1698            Some(std::cmp::Ordering::Equal) if !inclusive => current.1 = false,
1699            _ => {}
1700        },
1701    }
1702}
1703
1704fn merge_hi_bound(current: &mut (Option<ScalarValue>, bool), value: ScalarValue, inclusive: bool) {
1705    match &current.0 {
1706        None => *current = (Some(value), inclusive),
1707        Some(existing) => match scalar_partial_cmp(&value, existing) {
1708            Some(std::cmp::Ordering::Less) => *current = (Some(value), inclusive),
1709            Some(std::cmp::Ordering::Equal) if !inclusive => current.1 = false,
1710            _ => {}
1711        },
1712    }
1713}
1714
1715fn extract_range_on_path(path: &FieldPath, pred: &Predicate) -> Option<IndexKeyRange> {
1716    let mut lo: (Option<ScalarValue>, bool) = (None, true);
1717    let mut hi: (Option<ScalarValue>, bool) = (None, true);
1718    let mut any = false;
1719
1720    let mut visit = |p: &Predicate| match p {
1721        Predicate::Gte { path: pp, value } if pp == path && is_range_indexable(value) => {
1722            merge_lo_bound(&mut lo, value.clone(), true);
1723            any = true;
1724        }
1725        Predicate::Gt { path: pp, value } if pp == path && is_range_indexable(value) => {
1726            merge_lo_bound(&mut lo, value.clone(), false);
1727            any = true;
1728        }
1729        Predicate::Lte { path: pp, value } if pp == path && is_range_indexable(value) => {
1730            merge_hi_bound(&mut hi, value.clone(), true);
1731            any = true;
1732        }
1733        Predicate::Lt { path: pp, value } if pp == path && is_range_indexable(value) => {
1734            merge_hi_bound(&mut hi, value.clone(), false);
1735            any = true;
1736        }
1737        _ => {}
1738    };
1739
1740    match pred {
1741        Predicate::Gte { .. }
1742        | Predicate::Gt { .. }
1743        | Predicate::Lte { .. }
1744        | Predicate::Lt { .. } => visit(pred),
1745        Predicate::And(items) => {
1746            for item in items {
1747                visit(item);
1748            }
1749        }
1750        _ => return None,
1751    }
1752
1753    if !any {
1754        return None;
1755    }
1756    Some(IndexKeyRange {
1757        lo: lo.0,
1758        lo_inclusive: lo.1,
1759        hi: hi.0,
1760        hi_inclusive: hi.1,
1761    })
1762}
1763
1764fn range_used_predicate(path: &FieldPath, pred: &Predicate) -> Option<Predicate> {
1765    match pred {
1766        Predicate::Gte { path: p, .. }
1767        | Predicate::Gt { path: p, .. }
1768        | Predicate::Lte { path: p, .. }
1769        | Predicate::Lt { path: p, .. }
1770            if p == path =>
1771        {
1772            Some(pred.clone())
1773        }
1774        Predicate::And(items) => {
1775            let used: Vec<Predicate> = items
1776                .iter()
1777                .filter(|p| {
1778                    matches!(
1779                        p,
1780                        Predicate::Gte { path: pp, .. }
1781                            | Predicate::Gt { path: pp, .. }
1782                            | Predicate::Lte { path: pp, .. }
1783                            | Predicate::Lt { path: pp, .. } if pp == path
1784                    )
1785                })
1786                .cloned()
1787                .collect();
1788            match used.len() {
1789                0 => None,
1790                1 => Some(used.into_iter().next().unwrap()),
1791                _ => Some(Predicate::And(used)),
1792            }
1793        }
1794        _ => None,
1795    }
1796}
1797
1798fn try_range_index<'a>(
1799    indexes: &'a [crate::schema::IndexDef],
1800    pred: &Predicate,
1801) -> Option<IndexChoice<'a>> {
1802    for idx in indexes {
1803        if let Some(range) = extract_range_on_path(&idx.path, pred) {
1804            if let Some(used) = range_used_predicate(&idx.path, pred) {
1805                return Some(IndexChoice::Range {
1806                    idx,
1807                    key_range: range,
1808                    used,
1809                });
1810            }
1811        }
1812    }
1813    None
1814}
1815
1816fn choose_index<'a>(
1817    indexes: &'a [crate::schema::IndexDef],
1818    pred: &Predicate,
1819) -> Option<IndexChoice<'a>> {
1820    match pred {
1821        Predicate::Eq { path, value } => {
1822            indexes
1823                .iter()
1824                .find(|idx| &idx.path == path)
1825                .map(|idx| IndexChoice::Eq {
1826                    idx,
1827                    value: value.clone(),
1828                    used: pred.clone(),
1829                })
1830        }
1831        Predicate::Lt { .. }
1832        | Predicate::Lte { .. }
1833        | Predicate::Gt { .. }
1834        | Predicate::Gte { .. } => try_range_index(indexes, pred),
1835        Predicate::Or(_) => None,
1836        Predicate::And(items) => {
1837            let range = try_range_index(indexes, pred);
1838            let mut unique_eq: Option<IndexChoice<'a>> = None;
1839            let mut any_eq: Option<IndexChoice<'a>> = None;
1840            for p in items {
1841                if let Some(IndexChoice::Eq { idx, value, used }) = choose_index(indexes, p) {
1842                    if idx.kind == IndexKind::Unique {
1843                        unique_eq = Some(IndexChoice::Eq { idx, value, used });
1844                    } else if any_eq.is_none() {
1845                        any_eq = Some(IndexChoice::Eq { idx, value, used });
1846                    }
1847                }
1848            }
1849            if let Some(u) = unique_eq {
1850                return Some(u);
1851            }
1852            if let Some(r) = range {
1853                return Some(r);
1854            }
1855            any_eq
1856        }
1857    }
1858}
1859
1860fn remove_used_predicate(pred: Predicate, used: Predicate) -> Option<Predicate> {
1861    if pred == used {
1862        return None;
1863    }
1864    match pred {
1865        Predicate::And(items) => {
1866            let mut out: Vec<Predicate> = items.into_iter().filter(|p| p != &used).collect();
1867            match out.len() {
1868                0 => None,
1869                1 => Some(out.remove(0)),
1870                _ => Some(Predicate::And(out)),
1871            }
1872        }
1873        _ => Some(pred),
1874    }
1875}
1876
1877fn eval_predicate(row: &BTreeMap<String, RowValue>, pred: &Predicate) -> bool {
1878    match pred {
1879        Predicate::Eq { path, value } => scalar_at_path(row, path)
1880            .map(|s| &s == value)
1881            .unwrap_or(false),
1882        Predicate::Lt { path, value } => scalar_at_path(row, path)
1883            .and_then(|s| scalar_partial_cmp(&s, value))
1884            .map(|o| o.is_lt())
1885            .unwrap_or(false),
1886        Predicate::Lte { path, value } => scalar_at_path(row, path)
1887            .and_then(|s| scalar_partial_cmp(&s, value))
1888            .map(|o| o.is_lt() || o.is_eq())
1889            .unwrap_or(false),
1890        Predicate::Gt { path, value } => scalar_at_path(row, path)
1891            .and_then(|s| scalar_partial_cmp(&s, value))
1892            .map(|o| o.is_gt())
1893            .unwrap_or(false),
1894        Predicate::Gte { path, value } => scalar_at_path(row, path)
1895            .and_then(|s| scalar_partial_cmp(&s, value))
1896            .map(|o| o.is_gt() || o.is_eq())
1897            .unwrap_or(false),
1898        Predicate::And(items) => items.iter().all(|p| eval_predicate(row, p)),
1899        Predicate::Or(items) => items.iter().any(|p| eval_predicate(row, p)),
1900    }
1901}
1902
1903fn apply_order_by_and_limit(
1904    rows: &mut Vec<BTreeMap<String, RowValue>>,
1905    order_by: Option<&OrderBy>,
1906    limit: Option<usize>,
1907    pk_field: Option<&str>,
1908) {
1909    const TOPK_ORDER_BY_LIMIT: usize = 1024;
1910
1911    if let Some(ob) = order_by {
1912        let pk_path: Option<FieldPath> =
1913            pk_field.map(|name| FieldPath(vec![Cow::Owned(name.to_string())]));
1914
1915        if let Some(n) = limit {
1916            if n <= TOPK_ORDER_BY_LIMIT && rows.len() > n {
1917                topk_order_by(rows, ob, n, pk_path.as_ref());
1918                return;
1919            }
1920        }
1921
1922        rows.sort_by(|a, b| compare_rows_for_order(a, b, ob, pk_path.as_ref()));
1923    }
1924    if let Some(n) = limit {
1925        rows.truncate(n);
1926    }
1927}
1928
1929fn compare_rows_for_order(
1930    a: &BTreeMap<String, RowValue>,
1931    b: &BTreeMap<String, RowValue>,
1932    ob: &OrderBy,
1933    pk_path: Option<&FieldPath>,
1934) -> std::cmp::Ordering {
1935    let av = scalar_at_path(a, &ob.path);
1936    let bv = scalar_at_path(b, &ob.path);
1937    let mut ord = match (av, bv) {
1938        (None, None) => std::cmp::Ordering::Equal,
1939        (None, Some(_)) => std::cmp::Ordering::Greater,
1940        (Some(_), None) => std::cmp::Ordering::Less,
1941        (Some(x), Some(y)) => scalar_sort_key_bytes(&x).cmp(&scalar_sort_key_bytes(&y)),
1942    };
1943    if ord == std::cmp::Ordering::Equal {
1944        if let Some(path) = pk_path {
1945            let apk = scalar_at_path(a, path);
1946            let bpk = scalar_at_path(b, path);
1947            ord = match (apk, bpk) {
1948                (None, None) => std::cmp::Ordering::Equal,
1949                (None, Some(_)) => std::cmp::Ordering::Greater,
1950                (Some(_), None) => std::cmp::Ordering::Less,
1951                (Some(x), Some(y)) => scalar_sort_key_bytes(&x).cmp(&scalar_sort_key_bytes(&y)),
1952            };
1953        }
1954    }
1955    match ob.direction {
1956        OrderDirection::Asc => ord,
1957        OrderDirection::Desc => ord.reverse(),
1958    }
1959}
1960
1961fn topk_order_by(
1962    rows: &mut Vec<BTreeMap<String, RowValue>>,
1963    ob: &OrderBy,
1964    k: usize,
1965    pk_path: Option<&FieldPath>,
1966) {
1967    if rows.len() <= k {
1968        rows.sort_by(|a, b| compare_rows_for_order(a, b, ob, pk_path));
1969        return;
1970    }
1971    rows.select_nth_unstable_by(k - 1, |a, b| compare_rows_for_order(a, b, ob, pk_path));
1972    rows.truncate(k);
1973    rows.sort_by(|a, b| compare_rows_for_order(a, b, ob, pk_path));
1974}
1975
1976fn scalar_partial_cmp(a: &ScalarValue, b: &ScalarValue) -> Option<std::cmp::Ordering> {
1977    use ScalarValue::*;
1978    match (a, b) {
1979        (Bool(x), Bool(y)) => Some(x.cmp(y)),
1980        (Int64(x), Int64(y)) => Some(x.cmp(y)),
1981        (Uint64(x), Uint64(y)) => Some(x.cmp(y)),
1982        (Float64(x), Float64(y)) => x.partial_cmp(y),
1983        (String(x), String(y)) => Some(x.cmp(y)),
1984        (Bytes(x), Bytes(y)) => Some(x.cmp(y)),
1985        (Uuid(x), Uuid(y)) => Some(x.cmp(y)),
1986        (Timestamp(x), Timestamp(y)) => Some(x.cmp(y)),
1987        _ => None,
1988    }
1989}
1990
// Planner unit tests live in a separate file and are spliced in with
// `include!`. Because the included code becomes the body of this child
// module, it can use this module's private items (e.g. via `super::`).
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_query_planner_tests.rs"
    ));
}