1use crate::{
7 db::{
8 DbSession, PagedLoadExecution, PagedLoadExecutionWithTrace,
9 predicate::Predicate,
10 query::{
11 explain::ExplainPlan,
12 expr::{FilterExpr, SortExpr},
13 intent::{CompiledQuery, IntentError, PlannedQuery, Query, QueryError},
14 policy,
15 },
16 response::Response,
17 },
18 traits::{EntityKind, EntityValue, SingletonEntity},
19 types::{Decimal, Id},
20 value::Value,
21};
22
23type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
24
25pub struct FluentLoadQuery<'a, E>
34where
35 E: EntityKind,
36{
37 session: &'a DbSession<E::Canister>,
38 query: Query<E>,
39 cursor_token: Option<String>,
40}
41
42pub struct PagedLoadQuery<'a, E>
50where
51 E: EntityKind,
52{
53 inner: FluentLoadQuery<'a, E>,
54}
55
56impl<'a, E> FluentLoadQuery<'a, E>
57where
58 E: EntityKind,
59{
60 pub(crate) const fn new(session: &'a DbSession<E::Canister>, query: Query<E>) -> Self {
61 Self {
62 session,
63 query,
64 cursor_token: None,
65 }
66 }
67
68 #[must_use]
73 pub const fn query(&self) -> &Query<E> {
74 &self.query
75 }
76
77 fn map_query(mut self, map: impl FnOnce(Query<E>) -> Query<E>) -> Self {
78 self.query = map(self.query);
79 self
80 }
81
82 fn try_map_query(
83 mut self,
84 map: impl FnOnce(Query<E>) -> Result<Query<E>, QueryError>,
85 ) -> Result<Self, QueryError> {
86 self.query = map(self.query)?;
87 Ok(self)
88 }
89
90 #[must_use]
98 pub fn by_id(mut self, id: Id<E>) -> Self {
99 self.query = self.query.by_id(id.key());
100 self
101 }
102
103 #[must_use]
107 pub fn by_ids<I>(mut self, ids: I) -> Self
108 where
109 I: IntoIterator<Item = Id<E>>,
110 {
111 self.query = self.query.by_ids(ids.into_iter().map(|id| id.key()));
112 self
113 }
114
115 #[must_use]
120 pub fn filter(self, predicate: Predicate) -> Self {
121 self.map_query(|query| query.filter(predicate))
122 }
123
124 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
125 self.try_map_query(|query| query.filter_expr(expr))
126 }
127
128 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
129 self.try_map_query(|query| query.sort_expr(expr))
130 }
131
132 #[must_use]
133 pub fn order_by(self, field: impl AsRef<str>) -> Self {
134 self.map_query(|query| query.order_by(field))
135 }
136
137 #[must_use]
138 pub fn order_by_desc(self, field: impl AsRef<str>) -> Self {
139 self.map_query(|query| query.order_by_desc(field))
140 }
141
142 #[must_use]
147 pub fn limit(self, limit: u32) -> Self {
148 self.map_query(|query| query.limit(limit))
149 }
150
151 #[must_use]
156 pub fn offset(self, offset: u32) -> Self {
157 self.map_query(|query| query.offset(offset))
158 }
159
160 #[must_use]
166 pub fn cursor(mut self, token: impl Into<String>) -> Self {
167 self.cursor_token = Some(token.into());
168 self
169 }
170
171 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
176 self.query.explain()
177 }
178
179 pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
180 if let Some(err) = self.cursor_intent_error() {
181 return Err(QueryError::Intent(err));
182 }
183
184 self.query.planned()
185 }
186
187 pub fn plan(&self) -> Result<CompiledQuery<E>, QueryError> {
188 if let Some(err) = self.cursor_intent_error() {
189 return Err(QueryError::Intent(err));
190 }
191
192 self.query.plan()
193 }
194
195 pub fn execute(&self) -> Result<Response<E>, QueryError>
201 where
202 E: EntityValue,
203 {
204 self.ensure_non_paged_mode_ready()?;
205
206 self.session.execute_query(self.query())
207 }
208
209 pub fn page(self) -> Result<PagedLoadQuery<'a, E>, QueryError> {
220 self.ensure_paged_mode_ready()?;
221
222 Ok(PagedLoadQuery { inner: self })
223 }
224
225 pub fn execute_paged(self) -> Result<PagedLoadExecution<E>, QueryError>
229 where
230 E: EntityValue,
231 {
232 self.page()?.execute()
233 }
234
235 pub fn is_empty(&self) -> Result<bool, QueryError>
241 where
242 E: EntityValue,
243 {
244 Ok(!self.exists()?)
245 }
246
247 pub fn exists(&self) -> Result<bool, QueryError>
249 where
250 E: EntityValue,
251 {
252 self.ensure_non_paged_mode_ready()?;
253
254 self.session
255 .execute_load_query_with(self.query(), |load, plan| load.aggregate_exists(plan))
256 }
257
258 pub fn count(&self) -> Result<u32, QueryError>
260 where
261 E: EntityValue,
262 {
263 self.ensure_non_paged_mode_ready()?;
264
265 self.session
266 .execute_load_query_with(self.query(), |load, plan| load.aggregate_count(plan))
267 }
268
269 pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
271 where
272 E: EntityValue,
273 {
274 self.ensure_non_paged_mode_ready()?;
275
276 self.session
277 .execute_load_query_with(self.query(), |load, plan| load.aggregate_min(plan))
278 }
279
280 pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
284 where
285 E: EntityValue,
286 {
287 self.ensure_non_paged_mode_ready()?;
288
289 self.session
290 .execute_load_query_with(self.query(), |load, plan| {
291 load.aggregate_min_by(plan, field.as_ref())
292 })
293 }
294
295 pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
297 where
298 E: EntityValue,
299 {
300 self.ensure_non_paged_mode_ready()?;
301
302 self.session
303 .execute_load_query_with(self.query(), |load, plan| load.aggregate_max(plan))
304 }
305
306 pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
310 where
311 E: EntityValue,
312 {
313 self.ensure_non_paged_mode_ready()?;
314
315 self.session
316 .execute_load_query_with(self.query(), |load, plan| {
317 load.aggregate_max_by(plan, field.as_ref())
318 })
319 }
320
321 pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
324 where
325 E: EntityValue,
326 {
327 self.ensure_non_paged_mode_ready()?;
328
329 self.session
330 .execute_load_query_with(self.query(), |load, plan| {
331 load.aggregate_nth_by(plan, field.as_ref(), nth)
332 })
333 }
334
335 pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
337 where
338 E: EntityValue,
339 {
340 self.ensure_non_paged_mode_ready()?;
341
342 self.session
343 .execute_load_query_with(self.query(), |load, plan| {
344 load.aggregate_sum_by(plan, field.as_ref())
345 })
346 }
347
348 pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
350 where
351 E: EntityValue,
352 {
353 self.ensure_non_paged_mode_ready()?;
354
355 self.session
356 .execute_load_query_with(self.query(), |load, plan| {
357 load.aggregate_avg_by(plan, field.as_ref())
358 })
359 }
360
361 pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
366 where
367 E: EntityValue,
368 {
369 self.ensure_non_paged_mode_ready()?;
370
371 self.session
372 .execute_load_query_with(self.query(), |load, plan| {
373 load.aggregate_median_by(plan, field.as_ref())
374 })
375 }
376
377 pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
380 where
381 E: EntityValue,
382 {
383 self.ensure_non_paged_mode_ready()?;
384
385 self.session
386 .execute_load_query_with(self.query(), |load, plan| {
387 load.aggregate_count_distinct_by(plan, field.as_ref())
388 })
389 }
390
391 pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
395 where
396 E: EntityValue,
397 {
398 self.ensure_non_paged_mode_ready()?;
399
400 self.session
401 .execute_load_query_with(self.query(), |load, plan| {
402 load.aggregate_min_max_by(plan, field.as_ref())
403 })
404 }
405
406 pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
408 where
409 E: EntityValue,
410 {
411 self.ensure_non_paged_mode_ready()?;
412
413 self.session
414 .execute_load_query_with(self.query(), |load, plan| {
415 load.values_by(plan, field.as_ref())
416 })
417 }
418
419 pub fn take(&self, take_count: u32) -> Result<Response<E>, QueryError>
421 where
422 E: EntityValue,
423 {
424 self.ensure_non_paged_mode_ready()?;
425
426 self.session
427 .execute_load_query_with(self.query(), |load, plan| load.take(plan, take_count))
428 }
429
430 pub fn top_k_by(
438 &self,
439 field: impl AsRef<str>,
440 take_count: u32,
441 ) -> Result<Response<E>, QueryError>
442 where
443 E: EntityValue,
444 {
445 self.ensure_non_paged_mode_ready()?;
446
447 self.session
448 .execute_load_query_with(self.query(), |load, plan| {
449 load.top_k_by(plan, field.as_ref(), take_count)
450 })
451 }
452
453 pub fn bottom_k_by(
461 &self,
462 field: impl AsRef<str>,
463 take_count: u32,
464 ) -> Result<Response<E>, QueryError>
465 where
466 E: EntityValue,
467 {
468 self.ensure_non_paged_mode_ready()?;
469
470 self.session
471 .execute_load_query_with(self.query(), |load, plan| {
472 load.bottom_k_by(plan, field.as_ref(), take_count)
473 })
474 }
475
476 pub fn top_k_by_values(
484 &self,
485 field: impl AsRef<str>,
486 take_count: u32,
487 ) -> Result<Vec<Value>, QueryError>
488 where
489 E: EntityValue,
490 {
491 self.ensure_non_paged_mode_ready()?;
492
493 self.session
494 .execute_load_query_with(self.query(), |load, plan| {
495 load.top_k_by_values(plan, field.as_ref(), take_count)
496 })
497 }
498
499 pub fn bottom_k_by_values(
507 &self,
508 field: impl AsRef<str>,
509 take_count: u32,
510 ) -> Result<Vec<Value>, QueryError>
511 where
512 E: EntityValue,
513 {
514 self.ensure_non_paged_mode_ready()?;
515
516 self.session
517 .execute_load_query_with(self.query(), |load, plan| {
518 load.bottom_k_by_values(plan, field.as_ref(), take_count)
519 })
520 }
521
522 pub fn top_k_by_with_ids(
530 &self,
531 field: impl AsRef<str>,
532 take_count: u32,
533 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
534 where
535 E: EntityValue,
536 {
537 self.ensure_non_paged_mode_ready()?;
538
539 self.session
540 .execute_load_query_with(self.query(), |load, plan| {
541 load.top_k_by_with_ids(plan, field.as_ref(), take_count)
542 })
543 }
544
545 pub fn bottom_k_by_with_ids(
553 &self,
554 field: impl AsRef<str>,
555 take_count: u32,
556 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
557 where
558 E: EntityValue,
559 {
560 self.ensure_non_paged_mode_ready()?;
561
562 self.session
563 .execute_load_query_with(self.query(), |load, plan| {
564 load.bottom_k_by_with_ids(plan, field.as_ref(), take_count)
565 })
566 }
567
568 pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
571 where
572 E: EntityValue,
573 {
574 self.ensure_non_paged_mode_ready()?;
575
576 self.session
577 .execute_load_query_with(self.query(), |load, plan| {
578 load.distinct_values_by(plan, field.as_ref())
579 })
580 }
581
582 pub fn values_by_with_ids(
585 &self,
586 field: impl AsRef<str>,
587 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
588 where
589 E: EntityValue,
590 {
591 self.ensure_non_paged_mode_ready()?;
592
593 self.session
594 .execute_load_query_with(self.query(), |load, plan| {
595 load.values_by_with_ids(plan, field.as_ref())
596 })
597 }
598
599 pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
602 where
603 E: EntityValue,
604 {
605 self.ensure_non_paged_mode_ready()?;
606
607 self.session
608 .execute_load_query_with(self.query(), |load, plan| {
609 load.first_value_by(plan, field.as_ref())
610 })
611 }
612
613 pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
616 where
617 E: EntityValue,
618 {
619 self.ensure_non_paged_mode_ready()?;
620
621 self.session
622 .execute_load_query_with(self.query(), |load, plan| {
623 load.last_value_by(plan, field.as_ref())
624 })
625 }
626
627 pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
629 where
630 E: EntityValue,
631 {
632 self.ensure_non_paged_mode_ready()?;
633
634 self.session
635 .execute_load_query_with(self.query(), |load, plan| load.aggregate_first(plan))
636 }
637
638 pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
640 where
641 E: EntityValue,
642 {
643 self.ensure_non_paged_mode_ready()?;
644
645 self.session
646 .execute_load_query_with(self.query(), |load, plan| load.aggregate_last(plan))
647 }
648
649 pub fn require_one(&self) -> Result<(), QueryError>
651 where
652 E: EntityValue,
653 {
654 self.execute()?.require_one()?;
655 Ok(())
656 }
657
658 pub fn require_some(&self) -> Result<(), QueryError>
660 where
661 E: EntityValue,
662 {
663 self.execute()?.require_some()?;
664 Ok(())
665 }
666}
667
668impl<E> FluentLoadQuery<'_, E>
669where
670 E: EntityKind,
671{
672 fn non_paged_intent_error(&self) -> Option<IntentError> {
673 self.cursor_token
674 .as_ref()
675 .map(|_| IntentError::CursorRequiresPagedExecution)
676 }
677
678 fn cursor_intent_error(&self) -> Option<IntentError> {
679 self.cursor_token
680 .as_ref()
681 .and_then(|_| self.paged_intent_error())
682 }
683
684 fn paged_intent_error(&self) -> Option<IntentError> {
685 let spec = self.query.load_spec()?;
686
687 policy::validate_cursor_paging_requirements(self.query.has_explicit_order(), spec)
688 .err()
689 .map(IntentError::from)
690 }
691
692 fn ensure_paged_mode_ready(&self) -> Result<(), QueryError> {
693 if let Some(err) = self.paged_intent_error() {
694 return Err(QueryError::Intent(err));
695 }
696
697 Ok(())
698 }
699
700 fn ensure_non_paged_mode_ready(&self) -> Result<(), QueryError> {
701 if let Some(err) = self.non_paged_intent_error() {
702 return Err(QueryError::Intent(err));
703 }
704
705 Ok(())
706 }
707}
708
709impl<E> FluentLoadQuery<'_, E>
710where
711 E: EntityKind + SingletonEntity,
712 E::Key: Default,
713{
714 #[must_use]
715 pub fn only(self) -> Self {
716 self.map_query(Query::only)
717 }
718}
719
720impl<E> PagedLoadQuery<'_, E>
721where
722 E: EntityKind,
723{
724 #[must_use]
729 pub const fn query(&self) -> &Query<E> {
730 self.inner.query()
731 }
732
733 #[must_use]
739 pub fn cursor(mut self, token: impl Into<String>) -> Self {
740 self.inner = self.inner.cursor(token);
741 self
742 }
743
744 pub fn execute(self) -> Result<PagedLoadExecution<E>, QueryError>
754 where
755 E: EntityValue,
756 {
757 self.execute_with_trace()
758 .map(PagedLoadExecutionWithTrace::into_execution)
759 }
760
761 pub fn execute_with_trace(self) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
767 where
768 E: EntityValue,
769 {
770 self.inner.ensure_paged_mode_ready()?;
771
772 self.inner.session.execute_load_query_paged_with_trace(
773 self.inner.query(),
774 self.inner.cursor_token.as_deref(),
775 )
776 }
777}