1use super::value_compare::total_compare_values;
6use crate::storage::schema::Value;
7use std::cmp::Ordering;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
11pub enum Direction {
12 #[default]
14 Asc,
15 Desc,
17}
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
21pub enum NullsOrder {
22 First,
24 #[default]
26 Last,
27}
28
29#[derive(Debug, Clone)]
31pub struct SortKey {
32 pub column: String,
34 pub direction: Direction,
36 pub nulls: NullsOrder,
38}
39
40impl SortKey {
41 pub fn asc(column: impl Into<String>) -> Self {
43 Self {
44 column: column.into(),
45 direction: Direction::Asc,
46 nulls: NullsOrder::Last,
47 }
48 }
49
50 pub fn desc(column: impl Into<String>) -> Self {
52 Self {
53 column: column.into(),
54 direction: Direction::Desc,
55 nulls: NullsOrder::Last,
56 }
57 }
58
59 pub fn nulls_first(mut self) -> Self {
61 self.nulls = NullsOrder::First;
62 self
63 }
64
65 pub fn nulls_last(mut self) -> Self {
67 self.nulls = NullsOrder::Last;
68 self
69 }
70
71 pub fn compare(&self, a: &Value, b: &Value) -> Ordering {
73 match (a, b) {
75 (Value::Null, Value::Null) => Ordering::Equal,
76 (Value::Null, _) => match self.nulls {
77 NullsOrder::First => Ordering::Less,
78 NullsOrder::Last => Ordering::Greater,
79 },
80 (_, Value::Null) => match self.nulls {
81 NullsOrder::First => Ordering::Greater,
82 NullsOrder::Last => Ordering::Less,
83 },
84 _ => {
85 let base_order = total_compare_values(a, b);
86 match self.direction {
87 Direction::Asc => base_order,
88 Direction::Desc => base_order.reverse(),
89 }
90 }
91 }
92 }
93}
94
95#[derive(Debug, Clone, Default)]
97pub struct OrderBy {
98 keys: Vec<SortKey>,
100}
101
102impl OrderBy {
103 pub fn new() -> Self {
105 Self { keys: Vec::new() }
106 }
107
108 pub fn asc(mut self, column: impl Into<String>) -> Self {
110 self.keys.push(SortKey::asc(column));
111 self
112 }
113
114 pub fn desc(mut self, column: impl Into<String>) -> Self {
116 self.keys.push(SortKey::desc(column));
117 self
118 }
119
120 pub fn then(mut self, key: SortKey) -> Self {
122 self.keys.push(key);
123 self
124 }
125
126 pub fn is_empty(&self) -> bool {
128 self.keys.is_empty()
129 }
130
131 pub fn keys(&self) -> &[SortKey] {
133 &self.keys
134 }
135
136 pub fn compare<R>(&self, a: &R, b: &R, get_value: impl Fn(&R, &str) -> Value) -> Ordering {
140 for key in &self.keys {
141 let val_a = get_value(a, &key.column);
142 let val_b = get_value(b, &key.column);
143 let ord = key.compare(&val_a, &val_b);
144 if ord != Ordering::Equal {
145 return ord;
146 }
147 }
148 Ordering::Equal
149 }
150
151 pub fn sort_rows<R>(&self, rows: &mut [R], get_value: impl Fn(&R, &str) -> Value) {
153 if self.is_empty() {
154 return;
155 }
156 rows.sort_by(|a, b| self.compare(a, b, &get_value));
157 }
158
159 pub fn dispatch_sort<R>(
178 &self,
179 rows: Vec<R>,
180 prefix_keys: usize,
181 limit: Option<usize>,
182 get_value: impl Fn(&R, &str) -> Value,
183 ) -> Vec<R> {
184 if self.is_empty() {
185 return rows;
186 }
187 match (prefix_keys, limit) {
188 (0, None) => {
189 let mut all = rows;
190 self.sort_rows(&mut all, &get_value);
191 all
192 }
193 (0, Some(k)) => {
194 let mut all = rows;
195 self.sort_rows(&mut all, &get_value);
196 all.truncate(k);
197 all
198 }
199 (_, Some(k)) => self.incremental_sort_top_k(rows, prefix_keys, k, get_value),
200 (_, None) => {
201 self.incremental_sort_top_k(rows, prefix_keys, usize::MAX, get_value)
204 }
205 }
206 }
207
208 pub fn incremental_sort_top_k<R>(
236 &self,
237 rows: Vec<R>,
238 prefix_keys: usize,
239 k: usize,
240 get_value: impl Fn(&R, &str) -> Value,
241 ) -> Vec<R> {
242 if k == 0 || rows.is_empty() {
243 return Vec::new();
244 }
245 if prefix_keys == 0 {
247 let mut all = rows;
248 self.sort_rows(&mut all, &get_value);
249 all.truncate(k);
250 return all;
251 }
252 if prefix_keys >= self.keys.len() {
254 let mut out = rows;
255 out.truncate(k);
256 return out;
257 }
258
259 let suffix_keys = &self.keys[prefix_keys..];
260 let prefix_slice = &self.keys[..prefix_keys];
261 let mut out: Vec<R> = Vec::with_capacity(k);
262 let mut group: Vec<R> = Vec::new();
263
264 let flush =
268 |group: &mut Vec<R>, out: &mut Vec<R>, get_value: &dyn Fn(&R, &str) -> Value| -> bool {
269 if group.is_empty() {
270 return false;
271 }
272 group.sort_by(|a, b| {
275 for key in suffix_keys {
276 let ord =
277 key.compare(&get_value(a, &key.column), &get_value(b, &key.column));
278 if ord != Ordering::Equal {
279 return ord;
280 }
281 }
282 Ordering::Equal
283 });
284 let remaining = k - out.len();
285 if group.len() <= remaining {
286 out.append(group);
287 } else {
288 out.extend(group.drain(..remaining));
289 group.clear();
290 }
291 out.len() >= k
292 };
293
294 let prefix_eq = |a: &R, b: &R| -> bool {
297 for key in prefix_slice {
298 if key.compare(&get_value(a, &key.column), &get_value(b, &key.column))
299 != Ordering::Equal
300 {
301 return false;
302 }
303 }
304 true
305 };
306
307 let get_value_dyn: &dyn Fn(&R, &str) -> Value = &get_value;
310
311 for row in rows {
312 if let Some(first) = group.first() {
313 if !prefix_eq(first, &row) && flush(&mut group, &mut out, get_value_dyn) {
314 return out;
315 }
316 }
317 group.push(row);
318 }
319 flush(&mut group, &mut out, get_value_dyn);
321 out
322 }
323
324 pub fn referenced_columns(&self) -> Vec<&str> {
326 self.keys.iter().map(|k| k.column.as_str()).collect()
327 }
328}
329
330#[derive(Debug, Clone, Default)]
332pub struct QueryLimits {
333 pub limit: Option<usize>,
335 pub offset: usize,
337}
338
339impl QueryLimits {
340 pub fn none() -> Self {
342 Self {
343 limit: None,
344 offset: 0,
345 }
346 }
347
348 pub fn limit(mut self, n: usize) -> Self {
350 self.limit = Some(n);
351 self
352 }
353
354 pub fn offset(mut self, n: usize) -> Self {
356 self.offset = n;
357 self
358 }
359
360 pub fn apply<T>(&self, mut items: Vec<T>) -> Vec<T> {
362 if self.offset >= items.len() {
363 return Vec::new();
364 }
365
366 let start = self.offset;
367 let end = match self.limit {
368 Some(limit) => (start + limit).min(items.len()),
369 None => items.len(),
370 };
371
372 items.drain(start..end).collect()
373 }
374
375 pub fn apply_iter<T: 'static, I: Iterator<Item = T> + 'static>(
377 &self,
378 iter: I,
379 ) -> Box<dyn Iterator<Item = T>> {
380 let iter = iter.skip(self.offset);
381 match self.limit {
382 Some(limit) => Box::new(iter.take(limit)),
383 None => Box::new(iter),
384 }
385 }
386
387 pub fn range(&self, total: usize) -> std::ops::Range<usize> {
389 let start = self.offset.min(total);
390 let end = match self.limit {
391 Some(limit) => (start + limit).min(total),
392 None => total,
393 };
394 start..end
395 }
396}
397
398pub struct TopK<T> {
400 k: usize,
402 items: Vec<T>,
404 compare: Box<dyn Fn(&T, &T) -> Ordering>,
406}
407
408impl<T> TopK<T> {
409 pub fn new<F>(k: usize, compare: F) -> Self
411 where
412 F: Fn(&T, &T) -> Ordering + 'static,
413 {
414 Self {
415 k,
416 items: Vec::with_capacity(k + 1),
417 compare: Box::new(compare),
418 }
419 }
420
421 pub fn push(&mut self, item: T) {
423 self.items.push(item);
424 self.items.sort_by(&self.compare);
425 if self.items.len() > self.k {
426 self.items.pop();
427 }
428 }
429
430 pub fn items(&self) -> &[T] {
432 &self.items
433 }
434
435 pub fn into_items(self) -> Vec<T> {
437 self.items
438 }
439
440 pub fn len(&self) -> usize {
442 self.items.len()
443 }
444
445 pub fn is_empty(&self) -> bool {
447 self.items.is_empty()
448 }
449}
450
451#[cfg(test)]
452mod tests {
453 use super::*;
454
455 #[test]
456 fn test_sort_key_asc() {
457 let key = SortKey::asc("age");
458
459 assert_eq!(
460 key.compare(&Value::Integer(25), &Value::Integer(30)),
461 Ordering::Less
462 );
463 assert_eq!(
464 key.compare(&Value::Integer(30), &Value::Integer(25)),
465 Ordering::Greater
466 );
467 assert_eq!(
468 key.compare(&Value::Integer(25), &Value::Integer(25)),
469 Ordering::Equal
470 );
471 }
472
473 #[test]
474 fn test_sort_key_desc() {
475 let key = SortKey::desc("age");
476
477 assert_eq!(
478 key.compare(&Value::Integer(25), &Value::Integer(30)),
479 Ordering::Greater
480 );
481 assert_eq!(
482 key.compare(&Value::Integer(30), &Value::Integer(25)),
483 Ordering::Less
484 );
485 }
486
487 #[test]
488 fn test_sort_key_nulls_first() {
489 let key = SortKey::asc("age").nulls_first();
490
491 assert_eq!(
492 key.compare(&Value::Null, &Value::Integer(30)),
493 Ordering::Less
494 );
495 assert_eq!(
496 key.compare(&Value::Integer(30), &Value::Null),
497 Ordering::Greater
498 );
499 }
500
501 #[test]
502 fn test_sort_key_nulls_last() {
503 let key = SortKey::asc("age").nulls_last();
504
505 assert_eq!(
506 key.compare(&Value::Null, &Value::Integer(30)),
507 Ordering::Greater
508 );
509 assert_eq!(
510 key.compare(&Value::Integer(30), &Value::Null),
511 Ordering::Less
512 );
513 }
514
515 #[test]
516 fn test_order_by_single() {
517 let order = OrderBy::new().asc("age");
518
519 type Row = (String, i64);
521 let get_value = |row: &Row, col: &str| -> Value {
522 match col {
523 "name" => Value::text(row.0.clone()),
524 "age" => Value::Integer(row.1),
525 _ => Value::Null,
526 }
527 };
528
529 let mut rows = vec![
530 ("Charlie".to_string(), 30),
531 ("Alice".to_string(), 25),
532 ("Bob".to_string(), 35),
533 ];
534
535 order.sort_rows(&mut rows, get_value);
536
537 assert_eq!(rows[0].0, "Alice");
538 assert_eq!(rows[1].0, "Charlie");
539 assert_eq!(rows[2].0, "Bob");
540 }
541
542 #[test]
543 fn test_order_by_multiple() {
544 let order = OrderBy::new().asc("department").desc("salary");
545
546 type Row = (String, String, i64); let get_value = |row: &Row, col: &str| -> Value {
548 match col {
549 "name" => Value::text(row.0.clone()),
550 "department" => Value::text(row.1.clone()),
551 "salary" => Value::Integer(row.2),
552 _ => Value::Null,
553 }
554 };
555
556 let mut rows = vec![
557 ("Alice".to_string(), "Engineering".to_string(), 100000),
558 ("Bob".to_string(), "Engineering".to_string(), 120000),
559 ("Charlie".to_string(), "Sales".to_string(), 90000),
560 ("Diana".to_string(), "Engineering".to_string(), 110000),
561 ];
562
563 order.sort_rows(&mut rows, get_value);
564
565 assert_eq!(rows[0].0, "Bob"); assert_eq!(rows[1].0, "Diana"); assert_eq!(rows[2].0, "Alice"); assert_eq!(rows[3].0, "Charlie"); }
571
572 #[test]
573 fn test_query_limits_apply() {
574 let items: Vec<i32> = (0..10).collect();
575
576 let limited = QueryLimits::none().limit(3).apply(items.clone());
578 assert_eq!(limited, vec![0, 1, 2]);
579
580 let offset = QueryLimits::none().offset(3).apply(items.clone());
582 assert_eq!(offset, vec![3, 4, 5, 6, 7, 8, 9]);
583
584 let both = QueryLimits::none().offset(2).limit(3).apply(items.clone());
586 assert_eq!(both, vec![2, 3, 4]);
587
588 let empty = QueryLimits::none().offset(20).apply(items.clone());
590 assert!(empty.is_empty());
591 }
592
593 #[test]
594 fn test_query_limits_range() {
595 let limits = QueryLimits::none().offset(5).limit(10);
596
597 assert_eq!(limits.range(100), 5..15);
598 assert_eq!(limits.range(8), 5..8); assert_eq!(limits.range(3), 3..3); }
601
602 #[test]
603 fn test_top_k() {
604 let mut topk = TopK::new(3, |a: &i32, b: &i32| a.cmp(b));
605
606 topk.push(5);
607 topk.push(2);
608 topk.push(8);
609 topk.push(1);
610 topk.push(9);
611
612 let items = topk.into_items();
613 assert_eq!(items, vec![1, 2, 5]); }
615
616 #[test]
617 fn test_top_k_desc() {
618 let mut topk = TopK::new(3, |a: &i32, b: &i32| b.cmp(a)); topk.push(5);
621 topk.push(2);
622 topk.push(8);
623 topk.push(1);
624 topk.push(9);
625
626 let items = topk.into_items();
627 assert_eq!(items, vec![9, 8, 5]); }
629
630 #[test]
631 fn test_compare_cross_type() {
632 assert_eq!(
633 total_compare_values(&Value::Integer(10), &Value::Float(10.0)),
634 Ordering::Equal
635 );
636 assert_eq!(
637 total_compare_values(&Value::Integer(9), &Value::Float(10.0)),
638 Ordering::Less
639 );
640 }
641
642 #[test]
643 fn test_order_by_empty() {
644 let order = OrderBy::new();
645 assert!(order.is_empty());
646
647 let mut rows = vec![3, 1, 2];
648 order.sort_rows(&mut rows, |r, _| Value::Integer(*r));
649 assert_eq!(rows, vec![3, 1, 2]);
651 }
652
653 #[test]
654 fn test_sort_key_text() {
655 let key = SortKey::asc("name");
656
657 assert_eq!(
658 key.compare(
659 &Value::text("Alice".to_string()),
660 &Value::text("Bob".to_string())
661 ),
662 Ordering::Less
663 );
664 }
665
666 #[test]
667 fn test_sort_key_timestamp() {
668 let key = SortKey::desc("created_at");
669
670 assert_eq!(
672 key.compare(&Value::Timestamp(1000), &Value::Timestamp(500)),
673 Ordering::Less );
675 }
676}