1use crate::{
7 db::{
8 PersistedRow,
9 executor::{
10 ExecutablePlan, LoadExecutor, ScalarNumericFieldBoundaryRequest,
11 ScalarProjectionBoundaryRequest, ScalarTerminalBoundaryRequest,
12 },
13 query::{
14 api::ResponseCardinalityExt,
15 builder::{
16 AggregateExpr,
17 aggregate::{exists, first, last, max, min},
18 },
19 explain::{ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor},
20 fluent::load::FluentLoadQuery,
21 intent::QueryError,
22 plan::AggregateKind,
23 },
24 response::EntityResponse,
25 },
26 error::InternalError,
27 traits::EntityValue,
28 types::{Decimal, Id},
29 value::Value,
30};
31
32type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
33
34impl<E> FluentLoadQuery<'_, E>
35where
36 E: PersistedRow,
37{
38 pub fn execute(&self) -> Result<EntityResponse<E>, QueryError>
44 where
45 E: EntityValue,
46 {
47 self.ensure_non_paged_mode_ready()?;
48
49 self.session.execute_query(self.query())
50 }
51
52 fn execute_scalar_non_paged_terminal<T, F>(&self, execute: F) -> Result<T, QueryError>
55 where
56 E: EntityValue,
57 F: FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
58 {
59 self.ensure_non_paged_mode_ready()?;
60
61 self.session.execute_load_query_with(self.query(), execute)
62 }
63
64 fn explain_scalar_non_paged_terminal(
67 &self,
68 aggregate: AggregateExpr,
69 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
70 where
71 E: EntityValue,
72 {
73 self.ensure_non_paged_mode_ready()?;
74
75 self.query().explain_aggregate_terminal(aggregate)
76 }
77
78 pub fn is_empty(&self) -> Result<bool, QueryError>
84 where
85 E: EntityValue,
86 {
87 self.not_exists()
88 }
89
90 pub fn not_exists(&self) -> Result<bool, QueryError>
92 where
93 E: EntityValue,
94 {
95 Ok(!self.exists()?)
96 }
97
98 pub fn exists(&self) -> Result<bool, QueryError>
100 where
101 E: EntityValue,
102 {
103 self.execute_scalar_non_paged_terminal(|load, plan| {
104 load.execute_scalar_terminal_request(plan, ScalarTerminalBoundaryRequest::Exists)?
105 .into_exists()
106 })
107 }
108
109 pub fn explain_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
111 where
112 E: EntityValue,
113 {
114 self.explain_scalar_non_paged_terminal(exists())
115 }
116
117 pub fn explain_not_exists(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
121 where
122 E: EntityValue,
123 {
124 self.explain_exists()
125 }
126
127 pub fn explain_execution(&self) -> Result<ExplainExecutionNodeDescriptor, QueryError>
129 where
130 E: EntityValue,
131 {
132 self.query().explain_execution()
133 }
134
135 pub fn explain_execution_text(&self) -> Result<String, QueryError>
137 where
138 E: EntityValue,
139 {
140 self.query().explain_execution_text()
141 }
142
143 pub fn explain_execution_json(&self) -> Result<String, QueryError>
145 where
146 E: EntityValue,
147 {
148 self.query().explain_execution_json()
149 }
150
151 pub fn explain_execution_verbose(&self) -> Result<String, QueryError>
153 where
154 E: EntityValue,
155 {
156 self.query().explain_execution_verbose()
157 }
158
159 pub fn count(&self) -> Result<u32, QueryError>
161 where
162 E: EntityValue,
163 {
164 self.execute_scalar_non_paged_terminal(|load, plan| {
165 load.execute_scalar_terminal_request(plan, ScalarTerminalBoundaryRequest::Count)?
166 .into_count()
167 })
168 }
169
170 pub fn bytes(&self) -> Result<u64, QueryError>
173 where
174 E: EntityValue,
175 {
176 self.execute_scalar_non_paged_terminal(|load, plan| load.bytes(plan))
177 }
178
179 pub fn bytes_by(&self, field: impl AsRef<str>) -> Result<u64, QueryError>
182 where
183 E: EntityValue,
184 {
185 self.ensure_non_paged_mode_ready()?;
186
187 Self::with_slot(field, |target_slot| {
188 self.session
189 .execute_load_query_with(self.query(), move |load, plan| {
190 load.bytes_by_slot(plan, target_slot)
191 })
192 })
193 }
194
195 pub fn explain_bytes_by(
197 &self,
198 field: impl AsRef<str>,
199 ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
200 where
201 E: EntityValue,
202 {
203 self.ensure_non_paged_mode_ready()?;
204
205 Self::with_slot(field, |target_slot| {
206 self.query().explain_bytes_by(target_slot.field())
207 })
208 }
209
210 pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
212 where
213 E: EntityValue,
214 {
215 self.execute_scalar_non_paged_terminal(|load, plan| {
216 load.execute_scalar_terminal_request(
217 plan,
218 ScalarTerminalBoundaryRequest::IdTerminal {
219 kind: AggregateKind::Min,
220 },
221 )?
222 .into_id()
223 })
224 }
225
226 pub fn explain_min(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
228 where
229 E: EntityValue,
230 {
231 self.explain_scalar_non_paged_terminal(min())
232 }
233
234 pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
238 where
239 E: EntityValue,
240 {
241 self.ensure_non_paged_mode_ready()?;
242
243 Self::with_slot(field, |target_slot| {
244 self.session
245 .execute_load_query_with(self.query(), move |load, plan| {
246 load.execute_scalar_terminal_request(
247 plan,
248 ScalarTerminalBoundaryRequest::IdBySlot {
249 kind: AggregateKind::Min,
250 target_field: target_slot,
251 },
252 )?
253 .into_id()
254 })
255 })
256 }
257
258 pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
260 where
261 E: EntityValue,
262 {
263 self.execute_scalar_non_paged_terminal(|load, plan| {
264 load.execute_scalar_terminal_request(
265 plan,
266 ScalarTerminalBoundaryRequest::IdTerminal {
267 kind: AggregateKind::Max,
268 },
269 )?
270 .into_id()
271 })
272 }
273
274 pub fn explain_max(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
276 where
277 E: EntityValue,
278 {
279 self.explain_scalar_non_paged_terminal(max())
280 }
281
282 pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
286 where
287 E: EntityValue,
288 {
289 self.ensure_non_paged_mode_ready()?;
290
291 Self::with_slot(field, |target_slot| {
292 self.session
293 .execute_load_query_with(self.query(), move |load, plan| {
294 load.execute_scalar_terminal_request(
295 plan,
296 ScalarTerminalBoundaryRequest::IdBySlot {
297 kind: AggregateKind::Max,
298 target_field: target_slot,
299 },
300 )?
301 .into_id()
302 })
303 })
304 }
305
306 pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
309 where
310 E: EntityValue,
311 {
312 self.ensure_non_paged_mode_ready()?;
313
314 Self::with_slot(field, |target_slot| {
315 self.session
316 .execute_load_query_with(self.query(), move |load, plan| {
317 load.execute_scalar_terminal_request(
318 plan,
319 ScalarTerminalBoundaryRequest::NthBySlot {
320 target_field: target_slot,
321 nth,
322 },
323 )?
324 .into_id()
325 })
326 })
327 }
328
329 pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
331 where
332 E: EntityValue,
333 {
334 self.ensure_non_paged_mode_ready()?;
335
336 Self::with_slot(field, |target_slot| {
337 self.session
338 .execute_load_query_with(self.query(), move |load, plan| {
339 load.execute_numeric_field_boundary(
340 plan,
341 target_slot,
342 ScalarNumericFieldBoundaryRequest::Sum,
343 )
344 })
345 })
346 }
347
348 pub fn sum_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
350 where
351 E: EntityValue,
352 {
353 self.ensure_non_paged_mode_ready()?;
354
355 Self::with_slot(field, |target_slot| {
356 self.session
357 .execute_load_query_with(self.query(), move |load, plan| {
358 load.execute_numeric_field_boundary(
359 plan,
360 target_slot,
361 ScalarNumericFieldBoundaryRequest::SumDistinct,
362 )
363 })
364 })
365 }
366
367 pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
369 where
370 E: EntityValue,
371 {
372 self.ensure_non_paged_mode_ready()?;
373
374 Self::with_slot(field, |target_slot| {
375 self.session
376 .execute_load_query_with(self.query(), move |load, plan| {
377 load.execute_numeric_field_boundary(
378 plan,
379 target_slot,
380 ScalarNumericFieldBoundaryRequest::Avg,
381 )
382 })
383 })
384 }
385
386 pub fn avg_distinct_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
388 where
389 E: EntityValue,
390 {
391 self.ensure_non_paged_mode_ready()?;
392
393 Self::with_slot(field, |target_slot| {
394 self.session
395 .execute_load_query_with(self.query(), move |load, plan| {
396 load.execute_numeric_field_boundary(
397 plan,
398 target_slot,
399 ScalarNumericFieldBoundaryRequest::AvgDistinct,
400 )
401 })
402 })
403 }
404
405 pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
410 where
411 E: EntityValue,
412 {
413 self.ensure_non_paged_mode_ready()?;
414
415 Self::with_slot(field, |target_slot| {
416 self.session
417 .execute_load_query_with(self.query(), move |load, plan| {
418 load.execute_scalar_terminal_request(
419 plan,
420 ScalarTerminalBoundaryRequest::MedianBySlot {
421 target_field: target_slot,
422 },
423 )?
424 .into_id()
425 })
426 })
427 }
428
429 pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
432 where
433 E: EntityValue,
434 {
435 self.ensure_non_paged_mode_ready()?;
436
437 Self::with_slot(field, |target_slot| {
438 self.session
439 .execute_load_query_with(self.query(), move |load, plan| {
440 load.execute_scalar_projection_boundary(
441 plan,
442 target_slot,
443 ScalarProjectionBoundaryRequest::CountDistinct,
444 )?
445 .into_count()
446 })
447 })
448 }
449
450 pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
454 where
455 E: EntityValue,
456 {
457 self.ensure_non_paged_mode_ready()?;
458
459 Self::with_slot(field, |target_slot| {
460 self.session
461 .execute_load_query_with(self.query(), move |load, plan| {
462 load.execute_scalar_terminal_request(
463 plan,
464 ScalarTerminalBoundaryRequest::MinMaxBySlot {
465 target_field: target_slot,
466 },
467 )?
468 .into_id_pair()
469 })
470 })
471 }
472
473 pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
475 where
476 E: EntityValue,
477 {
478 self.ensure_non_paged_mode_ready()?;
479
480 Self::with_slot(field, |target_slot| {
481 self.session
482 .execute_load_query_with(self.query(), move |load, plan| {
483 load.execute_scalar_projection_boundary(
484 plan,
485 target_slot,
486 ScalarProjectionBoundaryRequest::Values,
487 )?
488 .into_values()
489 })
490 })
491 }
492
493 pub fn take(&self, take_count: u32) -> Result<EntityResponse<E>, QueryError>
495 where
496 E: EntityValue,
497 {
498 self.execute_scalar_non_paged_terminal(|load, plan| load.take(plan, take_count))
499 }
500
501 pub fn top_k_by(
509 &self,
510 field: impl AsRef<str>,
511 take_count: u32,
512 ) -> Result<EntityResponse<E>, QueryError>
513 where
514 E: EntityValue,
515 {
516 self.ensure_non_paged_mode_ready()?;
517
518 Self::with_slot(field, |target_slot| {
519 self.session
520 .execute_load_query_with(self.query(), move |load, plan| {
521 load.top_k_by_slot(plan, target_slot, take_count)
522 })
523 })
524 }
525
526 pub fn bottom_k_by(
534 &self,
535 field: impl AsRef<str>,
536 take_count: u32,
537 ) -> Result<EntityResponse<E>, QueryError>
538 where
539 E: EntityValue,
540 {
541 self.ensure_non_paged_mode_ready()?;
542
543 Self::with_slot(field, |target_slot| {
544 self.session
545 .execute_load_query_with(self.query(), move |load, plan| {
546 load.bottom_k_by_slot(plan, target_slot, take_count)
547 })
548 })
549 }
550
551 pub fn top_k_by_values(
559 &self,
560 field: impl AsRef<str>,
561 take_count: u32,
562 ) -> Result<Vec<Value>, QueryError>
563 where
564 E: EntityValue,
565 {
566 self.ensure_non_paged_mode_ready()?;
567
568 Self::with_slot(field, |target_slot| {
569 self.session
570 .execute_load_query_with(self.query(), move |load, plan| {
571 load.top_k_by_values_slot(plan, target_slot, take_count)
572 })
573 })
574 }
575
576 pub fn bottom_k_by_values(
584 &self,
585 field: impl AsRef<str>,
586 take_count: u32,
587 ) -> Result<Vec<Value>, QueryError>
588 where
589 E: EntityValue,
590 {
591 self.ensure_non_paged_mode_ready()?;
592
593 Self::with_slot(field, |target_slot| {
594 self.session
595 .execute_load_query_with(self.query(), move |load, plan| {
596 load.bottom_k_by_values_slot(plan, target_slot, take_count)
597 })
598 })
599 }
600
601 pub fn top_k_by_with_ids(
609 &self,
610 field: impl AsRef<str>,
611 take_count: u32,
612 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
613 where
614 E: EntityValue,
615 {
616 self.ensure_non_paged_mode_ready()?;
617
618 Self::with_slot(field, |target_slot| {
619 self.session
620 .execute_load_query_with(self.query(), move |load, plan| {
621 load.top_k_by_with_ids_slot(plan, target_slot, take_count)
622 })
623 })
624 }
625
626 pub fn bottom_k_by_with_ids(
634 &self,
635 field: impl AsRef<str>,
636 take_count: u32,
637 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
638 where
639 E: EntityValue,
640 {
641 self.ensure_non_paged_mode_ready()?;
642
643 Self::with_slot(field, |target_slot| {
644 self.session
645 .execute_load_query_with(self.query(), move |load, plan| {
646 load.bottom_k_by_with_ids_slot(plan, target_slot, take_count)
647 })
648 })
649 }
650
651 pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
654 where
655 E: EntityValue,
656 {
657 self.ensure_non_paged_mode_ready()?;
658
659 Self::with_slot(field, |target_slot| {
660 self.session
661 .execute_load_query_with(self.query(), move |load, plan| {
662 load.execute_scalar_projection_boundary(
663 plan,
664 target_slot,
665 ScalarProjectionBoundaryRequest::DistinctValues,
666 )?
667 .into_values()
668 })
669 })
670 }
671
672 pub fn values_by_with_ids(
675 &self,
676 field: impl AsRef<str>,
677 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
678 where
679 E: EntityValue,
680 {
681 self.ensure_non_paged_mode_ready()?;
682
683 Self::with_slot(field, |target_slot| {
684 self.session
685 .execute_load_query_with(self.query(), move |load, plan| {
686 load.execute_scalar_projection_boundary(
687 plan,
688 target_slot,
689 ScalarProjectionBoundaryRequest::ValuesWithIds,
690 )?
691 .into_values_with_ids()
692 })
693 })
694 }
695
696 pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
699 where
700 E: EntityValue,
701 {
702 self.ensure_non_paged_mode_ready()?;
703
704 Self::with_slot(field, |target_slot| {
705 self.session
706 .execute_load_query_with(self.query(), move |load, plan| {
707 load.execute_scalar_projection_boundary(
708 plan,
709 target_slot,
710 ScalarProjectionBoundaryRequest::TerminalValue {
711 terminal_kind: AggregateKind::First,
712 },
713 )?
714 .into_terminal_value()
715 })
716 })
717 }
718
719 pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
722 where
723 E: EntityValue,
724 {
725 self.ensure_non_paged_mode_ready()?;
726
727 Self::with_slot(field, |target_slot| {
728 self.session
729 .execute_load_query_with(self.query(), move |load, plan| {
730 load.execute_scalar_projection_boundary(
731 plan,
732 target_slot,
733 ScalarProjectionBoundaryRequest::TerminalValue {
734 terminal_kind: AggregateKind::Last,
735 },
736 )?
737 .into_terminal_value()
738 })
739 })
740 }
741
742 pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
744 where
745 E: EntityValue,
746 {
747 self.execute_scalar_non_paged_terminal(|load, plan| {
748 load.execute_scalar_terminal_request(
749 plan,
750 ScalarTerminalBoundaryRequest::IdTerminal {
751 kind: AggregateKind::First,
752 },
753 )?
754 .into_id()
755 })
756 }
757
758 pub fn explain_first(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
760 where
761 E: EntityValue,
762 {
763 self.explain_scalar_non_paged_terminal(first())
764 }
765
766 pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
768 where
769 E: EntityValue,
770 {
771 self.execute_scalar_non_paged_terminal(|load, plan| {
772 load.execute_scalar_terminal_request(
773 plan,
774 ScalarTerminalBoundaryRequest::IdTerminal {
775 kind: AggregateKind::Last,
776 },
777 )?
778 .into_id()
779 })
780 }
781
782 pub fn explain_last(&self) -> Result<ExplainAggregateTerminalPlan, QueryError>
784 where
785 E: EntityValue,
786 {
787 self.explain_scalar_non_paged_terminal(last())
788 }
789
790 pub fn require_one(&self) -> Result<(), QueryError>
792 where
793 E: EntityValue,
794 {
795 self.execute()?.require_one()?;
796 Ok(())
797 }
798
799 pub fn require_some(&self) -> Result<(), QueryError>
801 where
802 E: EntityValue,
803 {
804 self.execute()?.require_some()?;
805 Ok(())
806 }
807}