1use crate::{
7 db::{
8 PersistedRow,
9 executor::{
10 ExecutablePlan, LoadExecutor, ScalarNumericFieldBoundaryRequest,
11 ScalarProjectionBoundaryRequest, ScalarTerminalBoundaryRequest,
12 },
13 query::{
14 api::ResponseCardinalityExt,
15 builder::{
16 AggregateExpr,
17 aggregate::{exists, first, last, max, min},
18 },
19 explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
20 fluent::load::FluentLoadQuery,
21 intent::QueryError,
22 plan::AggregateKind,
23 },
24 response::EntityResponse,
25 },
26 error::InternalError,
27 traits::EntityValue,
28 types::{Decimal, Id},
29 value::Value,
30};
31
/// Optional pair of entity ids; this is the success payload of `min_max_by`.
/// NOTE(review): presumably ordered as (min, max) — confirm against `into_id_pair`.
32type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
33
34impl<E> FluentLoadQuery<'_, E>
35where
36 E: PersistedRow,
37{
38 pub fn execute(&self) -> Result<EntityResponse<E>, QueryError>
44 where
45 E: EntityValue,
46 {
47 self.ensure_non_paged_mode_ready()?;
48
49 self.session.execute_query(self.query())
50 }
51
52 fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
55 where
56 E: EntityValue,
57 F: FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
58 {
59 self.ensure_non_paged_mode_ready()?;
60
61 self.session.execute_load_query_with(self.query(), execute)
62 }
63
64 fn explain_scalar_non_paged_terminal(
67 &self,
68 aggregate: AggregateExpr,
69 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
70 where
71 E: EntityValue,
72 {
73 self.ensure_non_paged_mode_ready()?;
74
75 self.session
76 .explain_query_aggregate_terminal_with_visible_indexes(self.query(), aggregate)
77 }
78
79 pub fn is_empty(&self) -> Result<bool, QueryError>
85 where
86 E: EntityValue,
87 {
88 self.not_exists()
89 }
90
91 pub fn not_exists(&self) -> Result<bool, QueryError>
93 where
94 E: EntityValue,
95 {
96 Ok(!self.exists()?)
97 }
98
99 pub fn exists(&self) -> Result<bool, QueryError>
101 where
102 E: EntityValue,
103 {
104 self.execute_scalar_non_paged_terminal(|load, plan| {
105 load.execute_scalar_terminal_request(plan, ScalarTerminalBoundaryRequest::Exists)?
106 .into_exists()
107 })
108 }
109
110 pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
112 where
113 E: EntityValue,
114 {
115 self.explain_scalar_non_paged_terminal(exists())
116 }
117
118 pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
122 where
123 E: EntityValue,
124 {
125 self.explain_exists()
126 }
127
128 pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
130 where
131 E: EntityValue,
132 {
133 self.session
134 .explain_query_execution_with_visible_indexes(self.query())
135 }
136
137 pub fn explain_execution_text(&self) -> Result<String, QueryError>
139 where
140 E: EntityValue,
141 {
142 self.session
143 .explain_query_execution_text_with_visible_indexes(self.query())
144 }
145
146 pub fn explain_execution_json(&self) -> Result<String, QueryError>
148 where
149 E: EntityValue,
150 {
151 self.session
152 .explain_query_execution_json_with_visible_indexes(self.query())
153 }
154
155 pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
157 where
158 E: EntityValue,
159 {
160 self.session
161 .explain_query_execution_verbose_with_visible_indexes(self.query())
162 }
163
164 pub fn count(&self) -> Result<u32, QueryError>
166 where
167 E: EntityValue,
168 {
169 self.execute_scalar_non_paged_terminal(|load, plan| {
170 load.execute_scalar_terminal_request(plan, ScalarTerminalBoundaryRequest::Count)?
171 .into_count()
172 })
173 }
174
175 pub fn bytes(&self) -> Result<u64, QueryError>
178 where
179 E: EntityValue,
180 {
181 self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
182 }
183
184 pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
187 where
188 E: EntityValue,
189 {
190 self.ensure_non_paged_mode_ready()?;
191
192 Self::with_slot(field, |target_slot| {
193 self.session
194 .execute_load_query_with(self.query(), move |load, plan| {
195 load.bytes_by_slot(plan, target_slot)
196 })
197 })
198 }
199
200 pub fn explain_bytes_by(
202 &self,
203 field: impl AsRef<str>,
204 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
205 where
206 E: EntityValue,
207 {
208 self.ensure_non_paged_mode_ready()?;
209
210 Self::with_slot(field, |target_slot| {
211 self.session
212 .explain_query_bytes_by_with_visible_indexes(self.query(), target_slot.field())
213 })
214 }
215
216 pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
218 where
219 E: EntityValue,
220 {
221 self.execute_scalar_non_paged_terminal(|load, plan| {
222 load.execute_scalar_terminal_request(
223 plan,
224 ScalarTerminalBoundaryRequest::IdTerminal {
225 kind: AggregateKind::Min,
226 },
227 )?
228 .into_id()
229 })
230 }
231
232 pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
234 where
235 E: EntityValue,
236 {
237 self.explain_scalar_non_paged_terminal(min())
238 }
239
240 pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
244 where
245 E: EntityValue,
246 {
247 self.ensure_non_paged_mode_ready()?;
248
249 Self::with_slot(field, |target_slot| {
250 self.session
251 .execute_load_query_with(self.query(), move |load, plan| {
252 load.execute_scalar_terminal_request(
253 plan,
254 ScalarTerminalBoundaryRequest::IdBySlot {
255 kind: AggregateKind::Min,
256 target_field: target_slot,
257 },
258 )?
259 .into_id()
260 })
261 })
262 }
263
264 pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
266 where
267 E: EntityValue,
268 {
269 self.execute_scalar_non_paged_terminal(|load, plan| {
270 load.execute_scalar_terminal_request(
271 plan,
272 ScalarTerminalBoundaryRequest::IdTerminal {
273 kind: AggregateKind::Max,
274 },
275 )?
276 .into_id()
277 })
278 }
279
280 pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
282 where
283 E: EntityValue,
284 {
285 self.explain_scalar_non_paged_terminal(max())
286 }
287
288 pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
292 where
293 E: EntityValue,
294 {
295 self.ensure_non_paged_mode_ready()?;
296
297 Self::with_slot(field, |target_slot| {
298 self.session
299 .execute_load_query_with(self.query(), move |load, plan| {
300 load.execute_scalar_terminal_request(
301 plan,
302 ScalarTerminalBoundaryRequest::IdBySlot {
303 kind: AggregateKind::Max,
304 target_field: target_slot,
305 },
306 )?
307 .into_id()
308 })
309 })
310 }
311
312 pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
315 where
316 E: EntityValue,
317 {
318 self.ensure_non_paged_mode_ready()?;
319
320 Self::with_slot(field, |target_slot| {
321 self.session
322 .execute_load_query_with(self.query(), move |load, plan| {
323 load.execute_scalar_terminal_request(
324 plan,
325 ScalarTerminalBoundaryRequest::NthBySlot {
326 target_field: target_slot,
327 nth,
328 },
329 )?
330 .into_id()
331 })
332 })
333 }
334
335 pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
337 where
338 E: EntityValue,
339 {
340 self.ensure_non_paged_mode_ready()?;
341
342 Self::with_slot(field, |target_slot| {
343 self.session
344 .execute_load_query_with(self.query(), move |load, plan| {
345 load.execute_numeric_field_boundary(
346 plan,
347 target_slot,
348 ScalarNumericFieldBoundaryRequest::Sum,
349 )
350 })
351 })
352 }
353
354 pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
356 where
357 E: EntityValue,
358 {
359 self.ensure_non_paged_mode_ready()?;
360
361 Self::with_slot(field, |target_slot| {
362 self.session
363 .execute_load_query_with(self.query(), move |load, plan| {
364 load.execute_numeric_field_boundary(
365 plan,
366 target_slot,
367 ScalarNumericFieldBoundaryRequest::SumDistinct,
368 )
369 })
370 })
371 }
372
373 pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
375 where
376 E: EntityValue,
377 {
378 self.ensure_non_paged_mode_ready()?;
379
380 Self::with_slot(field, |target_slot| {
381 self.session
382 .execute_load_query_with(self.query(), move |load, plan| {
383 load.execute_numeric_field_boundary(
384 plan,
385 target_slot,
386 ScalarNumericFieldBoundaryRequest::Avg,
387 )
388 })
389 })
390 }
391
392 pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
394 where
395 E: EntityValue,
396 {
397 self.ensure_non_paged_mode_ready()?;
398
399 Self::with_slot(field, |target_slot| {
400 self.session
401 .execute_load_query_with(self.query(), move |load, plan| {
402 load.execute_numeric_field_boundary(
403 plan,
404 target_slot,
405 ScalarNumericFieldBoundaryRequest::AvgDistinct,
406 )
407 })
408 })
409 }
410
411 pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
416 where
417 E: EntityValue,
418 {
419 self.ensure_non_paged_mode_ready()?;
420
421 Self::with_slot(field, |target_slot| {
422 self.session
423 .execute_load_query_with(self.query(), move |load, plan| {
424 load.execute_scalar_terminal_request(
425 plan,
426 ScalarTerminalBoundaryRequest::MedianBySlot {
427 target_field: target_slot,
428 },
429 )?
430 .into_id()
431 })
432 })
433 }
434
435 pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
438 where
439 E: EntityValue,
440 {
441 self.ensure_non_paged_mode_ready()?;
442
443 Self::with_slot(field, |target_slot| {
444 self.session
445 .execute_load_query_with(self.query(), move |load, plan| {
446 load.execute_scalar_projection_boundary(
447 plan,
448 target_slot,
449 ScalarProjectionBoundaryRequest::CountDistinct,
450 )?
451 .into_count()
452 })
453 })
454 }
455
456 pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
460 where
461 E: EntityValue,
462 {
463 self.ensure_non_paged_mode_ready()?;
464
465 Self::with_slot(field, |target_slot| {
466 self.session
467 .execute_load_query_with(self.query(), move |load, plan| {
468 load.execute_scalar_terminal_request(
469 plan,
470 ScalarTerminalBoundaryRequest::MinMaxBySlot {
471 target_field: target_slot,
472 },
473 )?
474 .into_id_pair()
475 })
476 })
477 }
478
479 pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
481 where
482 E: EntityValue,
483 {
484 self.ensure_non_paged_mode_ready()?;
485
486 Self::with_slot(field, |target_slot| {
487 self.session
488 .execute_load_query_with(self.query(), move |load, plan| {
489 load.execute_scalar_projection_boundary(
490 plan,
491 target_slot,
492 ScalarProjectionBoundaryRequest::Values,
493 )?
494 .into_values()
495 })
496 })
497 }
498
499 pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
501 where
502 E: EntityValue,
503 {
504 self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
505 }
506
507 pub fn top_k_by(
515 &self,
516 field: impl AsRef<str>,
517 take_count: u32,
518 ) -> Result<EntityResponse<E>, QueryError>
519 where
520 E: EntityValue,
521 {
522 self.ensure_non_paged_mode_ready()?;
523
524 Self::with_slot(field, |target_slot| {
525 self.session
526 .execute_load_query_with(self.query(), move |load, plan| {
527 load.top_k_by_slot(plan, target_slot, take_count)
528 })
529 })
530 }
531
532 pub fn bottom_k_by(
540 &self,
541 field: impl AsRef<str>,
542 take_count: u32,
543 ) -> Result<EntityResponse<E>, QueryError>
544 where
545 E: EntityValue,
546 {
547 self.ensure_non_paged_mode_ready()?;
548
549 Self::with_slot(field, |target_slot| {
550 self.session
551 .execute_load_query_with(self.query(), move |load, plan| {
552 load.bottom_k_by_slot(plan, target_slot, take_count)
553 })
554 })
555 }
556
557 pub fn top_k_by_values(
565 &self,
566 field: impl AsRef<str>,
567 take_count: u32,
568 ) -> Result<Vec<Value>, QueryError>
569 where
570 E: EntityValue,
571 {
572 self.ensure_non_paged_mode_ready()?;
573
574 Self::with_slot(field, |target_slot| {
575 self.session
576 .execute_load_query_with(self.query(), move |load, plan| {
577 load.top_k_by_values_slot(plan, target_slot, take_count)
578 })
579 })
580 }
581
582 pub fn bottom_k_by_values(
590 &self,
591 field: impl AsRef<str>,
592 take_count: u32,
593 ) -> Result<Vec<Value>, QueryError>
594 where
595 E: EntityValue,
596 {
597 self.ensure_non_paged_mode_ready()?;
598
599 Self::with_slot(field, |target_slot| {
600 self.session
601 .execute_load_query_with(self.query(), move |load, plan| {
602 load.bottom_k_by_values_slot(plan, target_slot, take_count)
603 })
604 })
605 }
606
607 pub fn top_k_by_with_ids(
615 &self,
616 field: impl AsRef<str>,
617 take_count: u32,
618 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
619 where
620 E: EntityValue,
621 {
622 self.ensure_non_paged_mode_ready()?;
623
624 Self::with_slot(field, |target_slot| {
625 self.session
626 .execute_load_query_with(self.query(), move |load, plan| {
627 load.top_k_by_with_ids_slot(plan, target_slot, take_count)
628 })
629 })
630 }
631
632 pub fn bottom_k_by_with_ids(
640 &self,
641 field: impl AsRef<str>,
642 take_count: u32,
643 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
644 where
645 E: EntityValue,
646 {
647 self.ensure_non_paged_mode_ready()?;
648
649 Self::with_slot(field, |target_slot| {
650 self.session
651 .execute_load_query_with(self.query(), move |load, plan| {
652 load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
653 })
654 })
655 }
656
657 pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
660 where
661 E: EntityValue,
662 {
663 self.ensure_non_paged_mode_ready()?;
664
665 Self::with_slot(field, |target_slot| {
666 self.session
667 .execute_load_query_with(self.query(), move |load, plan| {
668 load.execute_scalar_projection_boundary(
669 plan,
670 target_slot,
671 ScalarProjectionBoundaryRequest::DistinctValues,
672 )?
673 .into_values()
674 })
675 })
676 }
677
678 pub fn values_by_with_ids(
681 &self,
682 field: impl AsRef<str>,
683 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
684 where
685 E: EntityValue,
686 {
687 self.ensure_non_paged_mode_ready()?;
688
689 Self::with_slot(field, |target_slot| {
690 self.session
691 .execute_load_query_with(self.query(), move |load, plan| {
692 load.execute_scalar_projection_boundary(
693 plan,
694 target_slot,
695 ScalarProjectionBoundaryRequest::ValuesWithIds,
696 )?
697 .into_values_with_ids()
698 })
699 })
700 }
701
702 pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
705 where
706 E: EntityValue,
707 {
708 self.ensure_non_paged_mode_ready()?;
709
710 Self::with_slot(field, |target_slot| {
711 self.session
712 .execute_load_query_with(self.query(), move |load, plan| {
713 load.execute_scalar_projection_boundary(
714 plan,
715 target_slot,
716 ScalarProjectionBoundaryRequest::TerminalValue {
717 terminal_kind: AggregateKind::First,
718 },
719 )?
720 .into_terminal_value()
721 })
722 })
723 }
724
725 pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
728 where
729 E: EntityValue,
730 {
731 self.ensure_non_paged_mode_ready()?;
732
733 Self::with_slot(field, |target_slot| {
734 self.session
735 .execute_load_query_with(self.query(), move |load, plan| {
736 load.execute_scalar_projection_boundary(
737 plan,
738 target_slot,
739 ScalarProjectionBoundaryRequest::TerminalValue {
740 terminal_kind: AggregateKind::Last,
741 },
742 )?
743 .into_terminal_value()
744 })
745 })
746 }
747
748 pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
750 where
751 E: EntityValue,
752 {
753 self.execute_scalar_non_paged_terminal(|load, plan| {
754 load.execute_scalar_terminal_request(
755 plan,
756 ScalarTerminalBoundaryRequest::IdTerminal {
757 kind: AggregateKind::First,
758 },
759 )?
760 .into_id()
761 })
762 }
763
764 pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
766 where
767 E: EntityValue,
768 {
769 self.explain_scalar_non_paged_terminal(first())
770 }
771
772 pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
774 where
775 E: EntityValue,
776 {
777 self.execute_scalar_non_paged_terminal(|load, plan| {
778 load.execute_scalar_terminal_request(
779 plan,
780 ScalarTerminalBoundaryRequest::IdTerminal {
781 kind: AggregateKind::Last,
782 },
783 )?
784 .into_id()
785 })
786 }
787
788 pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
790 where
791 E: EntityValue,
792 {
793 self.explain_scalar_non_paged_terminal(last())
794 }
795
796 pub fn require_one(&self) -> Result<(), QueryError>
798 where
799 E: EntityValue,
800 {
801 self.execute()?.require_one()?;
802 Ok(())
803 }
804
805 pub fn require_some(&self) -> Result<(), QueryError>
807 where
808 E: EntityValue,
809 {
810 self.execute()?.require_some()?;
811 Ok(())
812 }
813}