1use crate::{
2 db::{
3 DbSession, PagedLoadExecution, PagedLoadExecutionWithTrace,
4 contracts::Predicate,
5 policy,
6 query::{
7 explain::ExplainPlan,
8 expr::{FilterExpr, SortExpr},
9 intent::{IntentError, PlannedQuery, Query, QueryError},
10 },
11 response::Response,
12 },
13 traits::{EntityKind, EntityValue, SingletonEntity},
14 types::{Decimal, Id},
15 value::Value,
16};
17
18type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
19
20pub struct FluentLoadQuery<'a, E>
29where
30 E: EntityKind,
31{
32 session: &'a DbSession<E::Canister>,
33 query: Query<E>,
34 cursor_token: Option<String>,
35}
36
37pub struct PagedLoadQuery<'a, E>
45where
46 E: EntityKind,
47{
48 inner: FluentLoadQuery<'a, E>,
49}
50
51impl<'a, E> FluentLoadQuery<'a, E>
52where
53 E: EntityKind,
54{
55 pub(crate) const fn new(session: &'a DbSession<E::Canister>, query: Query<E>) -> Self {
56 Self {
57 session,
58 query,
59 cursor_token: None,
60 }
61 }
62
63 #[must_use]
68 pub const fn query(&self) -> &Query<E> {
69 &self.query
70 }
71
72 fn map_query(mut self, map: impl FnOnce(Query<E>) -> Query<E>) -> Self {
73 self.query = map(self.query);
74 self
75 }
76
77 fn try_map_query(
78 mut self,
79 map: impl FnOnce(Query<E>) -> Result<Query<E>, QueryError>,
80 ) -> Result<Self, QueryError> {
81 self.query = map(self.query)?;
82 Ok(self)
83 }
84
85 #[must_use]
93 pub fn by_id(mut self, id: Id<E>) -> Self {
94 self.query = self.query.by_id(id.key());
95 self
96 }
97
98 #[must_use]
102 pub fn by_ids<I>(mut self, ids: I) -> Self
103 where
104 I: IntoIterator<Item = Id<E>>,
105 {
106 self.query = self.query.by_ids(ids.into_iter().map(|id| id.key()));
107 self
108 }
109
110 #[must_use]
115 pub fn filter(self, predicate: Predicate) -> Self {
116 self.map_query(|query| query.filter(predicate))
117 }
118
119 pub fn filter_expr(self, expr: FilterExpr) -> Result<Self, QueryError> {
120 self.try_map_query(|query| query.filter_expr(expr))
121 }
122
123 pub fn sort_expr(self, expr: SortExpr) -> Result<Self, QueryError> {
124 self.try_map_query(|query| query.sort_expr(expr))
125 }
126
127 #[must_use]
128 pub fn order_by(self, field: impl AsRef<str>) -> Self {
129 self.map_query(|query| query.order_by(field))
130 }
131
132 #[must_use]
133 pub fn order_by_desc(self, field: impl AsRef<str>) -> Self {
134 self.map_query(|query| query.order_by_desc(field))
135 }
136
137 #[must_use]
142 pub fn limit(self, limit: u32) -> Self {
143 self.map_query(|query| query.limit(limit))
144 }
145
146 #[must_use]
151 pub fn offset(self, offset: u32) -> Self {
152 self.map_query(|query| query.offset(offset))
153 }
154
155 #[must_use]
161 pub fn cursor(mut self, token: impl Into<String>) -> Self {
162 self.cursor_token = Some(token.into());
163 self
164 }
165
166 pub fn explain(&self) -> Result<ExplainPlan, QueryError> {
171 self.query.explain()
172 }
173
174 pub fn planned(&self) -> Result<PlannedQuery<E>, QueryError> {
175 if let Some(err) = self.cursor_intent_error() {
176 return Err(QueryError::Intent(err));
177 }
178
179 self.query.planned()
180 }
181
182 pub fn execute(&self) -> Result<Response<E>, QueryError>
188 where
189 E: EntityValue,
190 {
191 self.ensure_non_paged_mode_ready()?;
192
193 self.session.execute_query(self.query())
194 }
195
196 pub fn page(self) -> Result<PagedLoadQuery<'a, E>, QueryError> {
207 self.ensure_paged_mode_ready()?;
208
209 Ok(PagedLoadQuery { inner: self })
210 }
211
212 pub fn execute_paged(self) -> Result<PagedLoadExecution<E>, QueryError>
216 where
217 E: EntityValue,
218 {
219 self.page()?.execute()
220 }
221
222 pub fn is_empty(&self) -> Result<bool, QueryError>
228 where
229 E: EntityValue,
230 {
231 Ok(!self.exists()?)
232 }
233
234 pub fn exists(&self) -> Result<bool, QueryError>
236 where
237 E: EntityValue,
238 {
239 self.ensure_non_paged_mode_ready()?;
240
241 self.session
242 .execute_load_query_with(self.query(), |load, plan| load.aggregate_exists(plan))
243 }
244
245 pub fn count(&self) -> Result<u32, QueryError>
247 where
248 E: EntityValue,
249 {
250 self.ensure_non_paged_mode_ready()?;
251
252 self.session
253 .execute_load_query_with(self.query(), |load, plan| load.aggregate_count(plan))
254 }
255
256 pub fn min(&self) -> Result<Option<Id<E>>, QueryError>
258 where
259 E: EntityValue,
260 {
261 self.ensure_non_paged_mode_ready()?;
262
263 self.session
264 .execute_load_query_with(self.query(), |load, plan| load.aggregate_min(plan))
265 }
266
267 pub fn min_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
271 where
272 E: EntityValue,
273 {
274 self.ensure_non_paged_mode_ready()?;
275
276 self.session
277 .execute_load_query_with(self.query(), |load, plan| {
278 load.aggregate_min_by(plan, field.as_ref())
279 })
280 }
281
282 pub fn max(&self) -> Result<Option<Id<E>>, QueryError>
284 where
285 E: EntityValue,
286 {
287 self.ensure_non_paged_mode_ready()?;
288
289 self.session
290 .execute_load_query_with(self.query(), |load, plan| load.aggregate_max(plan))
291 }
292
293 pub fn max_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
297 where
298 E: EntityValue,
299 {
300 self.ensure_non_paged_mode_ready()?;
301
302 self.session
303 .execute_load_query_with(self.query(), |load, plan| {
304 load.aggregate_max_by(plan, field.as_ref())
305 })
306 }
307
308 pub fn nth_by(&self, field: impl AsRef<str>, nth: usize) -> Result<Option<Id<E>>, QueryError>
311 where
312 E: EntityValue,
313 {
314 self.ensure_non_paged_mode_ready()?;
315
316 self.session
317 .execute_load_query_with(self.query(), |load, plan| {
318 load.aggregate_nth_by(plan, field.as_ref(), nth)
319 })
320 }
321
322 pub fn sum_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
324 where
325 E: EntityValue,
326 {
327 self.ensure_non_paged_mode_ready()?;
328
329 self.session
330 .execute_load_query_with(self.query(), |load, plan| {
331 load.aggregate_sum_by(plan, field.as_ref())
332 })
333 }
334
335 pub fn avg_by(&self, field: impl AsRef<str>) -> Result<Option<Decimal>, QueryError>
337 where
338 E: EntityValue,
339 {
340 self.ensure_non_paged_mode_ready()?;
341
342 self.session
343 .execute_load_query_with(self.query(), |load, plan| {
344 load.aggregate_avg_by(plan, field.as_ref())
345 })
346 }
347
348 pub fn median_by(&self, field: impl AsRef<str>) -> Result<Option<Id<E>>, QueryError>
353 where
354 E: EntityValue,
355 {
356 self.ensure_non_paged_mode_ready()?;
357
358 self.session
359 .execute_load_query_with(self.query(), |load, plan| {
360 load.aggregate_median_by(plan, field.as_ref())
361 })
362 }
363
364 pub fn count_distinct_by(&self, field: impl AsRef<str>) -> Result<u32, QueryError>
367 where
368 E: EntityValue,
369 {
370 self.ensure_non_paged_mode_ready()?;
371
372 self.session
373 .execute_load_query_with(self.query(), |load, plan| {
374 load.aggregate_count_distinct_by(plan, field.as_ref())
375 })
376 }
377
378 pub fn min_max_by(&self, field: impl AsRef<str>) -> Result<MinMaxByIds<E>, QueryError>
382 where
383 E: EntityValue,
384 {
385 self.ensure_non_paged_mode_ready()?;
386
387 self.session
388 .execute_load_query_with(self.query(), |load, plan| {
389 load.aggregate_min_max_by(plan, field.as_ref())
390 })
391 }
392
393 pub fn values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
395 where
396 E: EntityValue,
397 {
398 self.ensure_non_paged_mode_ready()?;
399
400 self.session
401 .execute_load_query_with(self.query(), |load, plan| {
402 load.values_by(plan, field.as_ref())
403 })
404 }
405
406 pub fn take(&self, take_count: u32) -> Result<Response<E>, QueryError>
408 where
409 E: EntityValue,
410 {
411 self.ensure_non_paged_mode_ready()?;
412
413 self.session
414 .execute_load_query_with(self.query(), |load, plan| load.take(plan, take_count))
415 }
416
417 pub fn top_k_by(
425 &self,
426 field: impl AsRef<str>,
427 take_count: u32,
428 ) -> Result<Response<E>, QueryError>
429 where
430 E: EntityValue,
431 {
432 self.ensure_non_paged_mode_ready()?;
433
434 self.session
435 .execute_load_query_with(self.query(), |load, plan| {
436 load.top_k_by(plan, field.as_ref(), take_count)
437 })
438 }
439
440 pub fn bottom_k_by(
448 &self,
449 field: impl AsRef<str>,
450 take_count: u32,
451 ) -> Result<Response<E>, QueryError>
452 where
453 E: EntityValue,
454 {
455 self.ensure_non_paged_mode_ready()?;
456
457 self.session
458 .execute_load_query_with(self.query(), |load, plan| {
459 load.bottom_k_by(plan, field.as_ref(), take_count)
460 })
461 }
462
463 pub fn top_k_by_values(
471 &self,
472 field: impl AsRef<str>,
473 take_count: u32,
474 ) -> Result<Vec<Value>, QueryError>
475 where
476 E: EntityValue,
477 {
478 self.ensure_non_paged_mode_ready()?;
479
480 self.session
481 .execute_load_query_with(self.query(), |load, plan| {
482 load.top_k_by_values(plan, field.as_ref(), take_count)
483 })
484 }
485
486 pub fn bottom_k_by_values(
494 &self,
495 field: impl AsRef<str>,
496 take_count: u32,
497 ) -> Result<Vec<Value>, QueryError>
498 where
499 E: EntityValue,
500 {
501 self.ensure_non_paged_mode_ready()?;
502
503 self.session
504 .execute_load_query_with(self.query(), |load, plan| {
505 load.bottom_k_by_values(plan, field.as_ref(), take_count)
506 })
507 }
508
509 pub fn top_k_by_with_ids(
517 &self,
518 field: impl AsRef<str>,
519 take_count: u32,
520 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
521 where
522 E: EntityValue,
523 {
524 self.ensure_non_paged_mode_ready()?;
525
526 self.session
527 .execute_load_query_with(self.query(), |load, plan| {
528 load.top_k_by_with_ids(plan, field.as_ref(), take_count)
529 })
530 }
531
532 pub fn bottom_k_by_with_ids(
540 &self,
541 field: impl AsRef<str>,
542 take_count: u32,
543 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
544 where
545 E: EntityValue,
546 {
547 self.ensure_non_paged_mode_ready()?;
548
549 self.session
550 .execute_load_query_with(self.query(), |load, plan| {
551 load.bottom_k_by_with_ids(plan, field.as_ref(), take_count)
552 })
553 }
554
555 pub fn distinct_values_by(&self, field: impl AsRef<str>) -> Result<Vec<Value>, QueryError>
558 where
559 E: EntityValue,
560 {
561 self.ensure_non_paged_mode_ready()?;
562
563 self.session
564 .execute_load_query_with(self.query(), |load, plan| {
565 load.distinct_values_by(plan, field.as_ref())
566 })
567 }
568
569 pub fn values_by_with_ids(
572 &self,
573 field: impl AsRef<str>,
574 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
575 where
576 E: EntityValue,
577 {
578 self.ensure_non_paged_mode_ready()?;
579
580 self.session
581 .execute_load_query_with(self.query(), |load, plan| {
582 load.values_by_with_ids(plan, field.as_ref())
583 })
584 }
585
586 pub fn first_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
589 where
590 E: EntityValue,
591 {
592 self.ensure_non_paged_mode_ready()?;
593
594 self.session
595 .execute_load_query_with(self.query(), |load, plan| {
596 load.first_value_by(plan, field.as_ref())
597 })
598 }
599
600 pub fn last_value_by(&self, field: impl AsRef<str>) -> Result<Option<Value>, QueryError>
603 where
604 E: EntityValue,
605 {
606 self.ensure_non_paged_mode_ready()?;
607
608 self.session
609 .execute_load_query_with(self.query(), |load, plan| {
610 load.last_value_by(plan, field.as_ref())
611 })
612 }
613
614 pub fn first(&self) -> Result<Option<Id<E>>, QueryError>
616 where
617 E: EntityValue,
618 {
619 self.ensure_non_paged_mode_ready()?;
620
621 self.session
622 .execute_load_query_with(self.query(), |load, plan| load.aggregate_first(plan))
623 }
624
625 pub fn last(&self) -> Result<Option<Id<E>>, QueryError>
627 where
628 E: EntityValue,
629 {
630 self.ensure_non_paged_mode_ready()?;
631
632 self.session
633 .execute_load_query_with(self.query(), |load, plan| load.aggregate_last(plan))
634 }
635
636 pub fn require_one(&self) -> Result<(), QueryError>
638 where
639 E: EntityValue,
640 {
641 self.execute()?.require_one()?;
642 Ok(())
643 }
644
645 pub fn require_some(&self) -> Result<(), QueryError>
647 where
648 E: EntityValue,
649 {
650 self.execute()?.require_some()?;
651 Ok(())
652 }
653}
654
655impl<E> FluentLoadQuery<'_, E>
656where
657 E: EntityKind,
658{
659 fn non_paged_intent_error(&self) -> Option<IntentError> {
660 self.cursor_token
661 .as_ref()
662 .map(|_| IntentError::CursorRequiresPagedExecution)
663 }
664
665 fn cursor_intent_error(&self) -> Option<IntentError> {
666 self.cursor_token
667 .as_ref()
668 .and_then(|_| self.paged_intent_error())
669 }
670
671 fn paged_intent_error(&self) -> Option<IntentError> {
672 let spec = self.query.load_spec()?;
673
674 policy::validate_cursor_paging_requirements(self.query.has_explicit_order(), spec)
675 .err()
676 .map(IntentError::from)
677 }
678
679 fn ensure_paged_mode_ready(&self) -> Result<(), QueryError> {
680 if let Some(err) = self.paged_intent_error() {
681 return Err(QueryError::Intent(err));
682 }
683
684 Ok(())
685 }
686
687 fn ensure_non_paged_mode_ready(&self) -> Result<(), QueryError> {
688 if let Some(err) = self.non_paged_intent_error() {
689 return Err(QueryError::Intent(err));
690 }
691
692 Ok(())
693 }
694}
695
696impl<E> FluentLoadQuery<'_, E>
697where
698 E: EntityKind + SingletonEntity,
699 E::Key: Default,
700{
701 #[must_use]
702 pub fn only(self) -> Self {
703 self.map_query(Query::only)
704 }
705}
706
707impl<E> PagedLoadQuery<'_, E>
708where
709 E: EntityKind,
710{
711 #[must_use]
716 pub const fn query(&self) -> &Query<E> {
717 self.inner.query()
718 }
719
720 #[must_use]
726 pub fn cursor(mut self, token: impl Into<String>) -> Self {
727 self.inner = self.inner.cursor(token);
728 self
729 }
730
731 pub fn execute(self) -> Result<PagedLoadExecution<E>, QueryError>
741 where
742 E: EntityValue,
743 {
744 self.execute_with_trace()
745 .map(PagedLoadExecutionWithTrace::into_execution)
746 }
747
748 pub fn execute_with_trace(self) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
754 where
755 E: EntityValue,
756 {
757 self.inner.ensure_paged_mode_ready()?;
758
759 self.inner.session.execute_load_query_paged_with_trace(
760 self.inner.query(),
761 self.inner.cursor_token.as_deref(),
762 )
763 }
764}