Skip to main content

sea_orm/executor/
paginator.rs

1use crate::{
2    ConnectionTrait, EntityTrait, FromQueryResult, Select, SelectModel, SelectTwo, SelectTwoModel,
3    Selector, SelectorRaw, SelectorTrait, error::*,
4};
5use async_stream::stream;
6use futures_util::Stream;
7use sea_query::{Expr, SelectStatement};
8use std::{marker::PhantomData, pin::Pin};
9
10#[cfg(not(feature = "sync"))]
11type PinBoxStream<'db, Item> = Pin<Box<dyn Stream<Item = Item> + 'db>>;
12#[cfg(feature = "sync")]
13type PinBoxStream<'db, Item> = Box<dyn Iterator<Item = Item> + 'db>;
14
15/// Defined a structure to handle pagination of a result from a query operation on a Model
16#[derive(Clone, Debug)]
17pub struct Paginator<'db, C, S>
18where
19    C: ConnectionTrait,
20    S: SelectorTrait + 'db,
21{
22    pub(crate) query: SelectStatement,
23    pub(crate) page: u64,
24    pub(crate) page_size: u64,
25    pub(crate) db: &'db C,
26    pub(crate) selector: PhantomData<S>,
27}
28
29/// Define a structure containing the numbers of items and pages of a Paginator
30#[derive(Clone, Debug)]
31pub struct ItemsAndPagesNumber {
32    /// The total number of items of a paginator
33    pub number_of_items: u64,
34    /// The total number of pages of a paginator
35    pub number_of_pages: u64,
36}
37
38// LINT: warn if paginator is used without an order by clause
39
40impl<'db, C, S> Paginator<'db, C, S>
41where
42    C: ConnectionTrait,
43    S: SelectorTrait + 'db,
44{
45    /// Fetch a specific page; page index starts from zero
46    pub async fn fetch_page(&self, page: u64) -> Result<Vec<S::Item>, DbErr> {
47        let query = self
48            .query
49            .clone()
50            .limit(self.page_size)
51            .offset(self.page_size * page)
52            .to_owned();
53        let rows = self.db.query_all(&query).await?;
54        let mut buffer = Vec::with_capacity(rows.len());
55        for row in rows.into_iter() {
56            buffer.push(S::from_raw_query_result(row)?);
57        }
58        Ok(buffer)
59    }
60
61    /// Fetch the current page
62    pub async fn fetch(&self) -> Result<Vec<S::Item>, DbErr> {
63        self.fetch_page(self.page).await
64    }
65
66    /// Get the total number of items
67    pub async fn num_items(&self) -> Result<u64, DbErr> {
68        let query = SelectStatement::new()
69            .expr(Expr::cust("COUNT(*) AS num_items"))
70            .from_subquery(
71                self.query
72                    .clone()
73                    .reset_limit()
74                    .reset_offset()
75                    .clear_order_by()
76                    .to_owned(),
77                "sub_query",
78            )
79            .to_owned();
80        let result = match self.db.query_one(&query).await? {
81            Some(res) => res,
82            None => return Ok(0),
83        };
84        #[allow(clippy::match_single_binding)]
85        let num_items = match self.db.get_database_backend() {
86            _ => result.try_get::<i64>("", "num_items")? as u64,
87        };
88        Ok(num_items)
89    }
90
91    /// Get the total number of pages
92    pub async fn num_pages(&self) -> Result<u64, DbErr> {
93        let num_items = self.num_items().await?;
94        let num_pages = self.compute_pages_number(num_items);
95        Ok(num_pages)
96    }
97
98    /// Get the total number of items and pages
99    pub async fn num_items_and_pages(&self) -> Result<ItemsAndPagesNumber, DbErr> {
100        let number_of_items = self.num_items().await?;
101        let number_of_pages = self.compute_pages_number(number_of_items);
102
103        Ok(ItemsAndPagesNumber {
104            number_of_items,
105            number_of_pages,
106        })
107    }
108
109    /// Compute the number of pages for the current page
110    fn compute_pages_number(&self, num_items: u64) -> u64 {
111        (num_items / self.page_size) + (num_items % self.page_size > 0) as u64
112    }
113
114    /// Increment the page counter
115    pub fn next(&mut self) {
116        self.page += 1;
117    }
118
119    /// Get current page number
120    pub fn cur_page(&self) -> u64 {
121        self.page
122    }
123
124    /// Fetch one page and increment the page counter
125    ///
126    /// ```
127    /// # use sea_orm::{error::*, tests_cfg::*, *};
128    /// #
129    /// # #[smol_potat::main]
130    /// # #[cfg(feature = "mock")]
131    /// # pub async fn main() -> Result<(), DbErr> {
132    /// #
133    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
134    /// #     .append_query_results([
135    /// #         vec![cake::Model {
136    /// #             id: 1,
137    /// #             name: "Cake".to_owned(),
138    /// #         }],
139    /// #         vec![],
140    /// #     ])
141    /// #     .into_connection();
142    /// # let db = &owned_db;
143    /// #
144    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
145    /// let mut cake_pages = cake::Entity::find()
146    ///     .order_by_asc(cake::Column::Id)
147    ///     .paginate(db, 50);
148    ///
149    /// while let Some(cakes) = cake_pages.fetch_and_next().await? {
150    ///     // Do something on cakes: Vec<cake::Model>
151    /// }
152    /// #
153    /// # Ok(())
154    /// # }
155    /// ```
156    pub async fn fetch_and_next(&mut self) -> Result<Option<Vec<S::Item>>, DbErr> {
157        let vec = self.fetch().await?;
158        self.next();
159        let opt = if !vec.is_empty() { Some(vec) } else { None };
160        Ok(opt)
161    }
162
163    /// Convert self into an async stream
164    ///
165    /// ```
166    /// # use sea_orm::{error::*, tests_cfg::*, *};
167    /// #
168    /// # #[smol_potat::main]
169    /// # #[cfg(all(feature = "mock", not(feature = "sync")))]
170    /// # pub async fn main() -> Result<(), DbErr> {
171    /// #
172    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
173    /// #     .append_query_results([
174    /// #         vec![cake::Model {
175    /// #             id: 1,
176    /// #             name: "Cake".to_owned(),
177    /// #         }],
178    /// #         vec![],
179    /// #     ])
180    /// #     .into_connection();
181    /// # let db = &owned_db;
182    /// #
183    /// use futures_util::TryStreamExt;
184    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
185    /// let mut cake_stream = cake::Entity::find()
186    ///     .order_by_asc(cake::Column::Id)
187    ///     .paginate(db, 50)
188    ///     .into_stream();
189    ///
190    /// while let Some(cakes) = cake_stream.try_next().await? {
191    ///     // Do something on cakes: Vec<cake::Model>
192    /// }
193    /// #
194    /// # Ok(())
195    /// # }
196    /// # #[cfg(all(feature = "mock", feature = "sync"))]
197    /// # fn main() {}
198    /// ```
199    /// (for SeaORM Sync)
200    /// ```
201    /// # use sea_orm::{error::*, tests_cfg::*, *};
202    /// # #[cfg(all(feature = "mock", feature = "sync"))]
203    /// # fn example() -> Result<(), DbErr> {
204    /// #
205    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
206    /// #     .append_query_results([
207    /// #         vec![cake::Model {
208    /// #             id: 1,
209    /// #             name: "Cake".to_owned(),
210    /// #         }],
211    /// #         vec![],
212    /// #     ])
213    /// #     .into_connection();
214    /// # let db = &owned_db;
215    /// #
216    /// use futures_util::TryStreamExt;
217    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
218    /// let mut cake_stream = cake::Entity::find()
219    ///     .order_by_asc(cake::Column::Id)
220    ///     .paginate(db, 50)
221    ///     .into_stream();
222    ///
223    /// while let Some(cakes) = cake_stream.next() {
224    ///     // Do something on cakes: Vec<cake::Model>
225    /// }
226    /// #
227    /// # Ok(())
228    /// # }
229    /// ```
230    pub fn into_stream(self) -> PinBoxStream<'db, Result<Vec<S::Item>, DbErr>> {
231        #[cfg(not(feature = "sync"))]
232        {
233            let mut streamer = self;
234            Box::pin(stream! {
235                while let Some(vec) = streamer.fetch_and_next().await? {
236                    yield Ok(vec);
237                }
238            })
239        }
240        #[cfg(feature = "sync")]
241        {
242            Box::new(PaginatorStream { paginator: self })
243        }
244    }
245}
246
247#[cfg(feature = "sync")]
248#[derive(Debug)]
249/// Stream items by page
250pub struct PaginatorStream<'db, C, S>
251where
252    C: ConnectionTrait,
253    S: SelectorTrait + 'db,
254{
255    paginator: Paginator<'db, C, S>,
256}
257
258#[async_trait::async_trait]
259/// A Trait for any type that can paginate results
260pub trait PaginatorTrait<'db, C>
261where
262    C: ConnectionTrait,
263{
264    /// Select operation
265    type Selector: SelectorTrait + Send + Sync + 'db;
266
267    /// Paginate the result of a select operation.
268    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector>;
269
270    /// Perform a count on the paginated results
271    async fn count(self, db: &'db C) -> Result<u64, DbErr>
272    where
273        Self: Send + Sized,
274    {
275        self.paginate(db, 1).num_items().await
276    }
277}
278
279impl<'db, C, S> PaginatorTrait<'db, C> for Selector<S>
280where
281    C: ConnectionTrait,
282    S: SelectorTrait + Send + Sync + 'db,
283{
284    type Selector = S;
285
286    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
287        assert!(page_size != 0, "page_size should not be zero");
288        Paginator {
289            query: self.query,
290            page: 0,
291            page_size,
292            db,
293            selector: PhantomData,
294        }
295    }
296}
297
298impl<'db, C, S> PaginatorTrait<'db, C> for SelectorRaw<S>
299where
300    C: ConnectionTrait,
301    S: SelectorTrait + Send + Sync + 'db,
302{
303    type Selector = S;
304    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
305        assert!(page_size != 0, "page_size should not be zero");
306        let sql = self.stmt.sql.trim()[6..].trim().to_owned();
307        let mut query = SelectStatement::new();
308        query.expr(if let Some(values) = self.stmt.values {
309            Expr::cust_with_values(sql, values.0)
310        } else {
311            Expr::cust(sql)
312        });
313
314        Paginator {
315            query,
316            page: 0,
317            page_size,
318            db,
319            selector: PhantomData,
320        }
321    }
322}
323
324impl<'db, C, M, E> PaginatorTrait<'db, C> for Select<E>
325where
326    C: ConnectionTrait,
327    E: EntityTrait<Model = M>,
328    M: FromQueryResult + Sized + Send + Sync + 'db,
329{
330    type Selector = SelectModel<M>;
331
332    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
333        self.into_model().paginate(db, page_size)
334    }
335}
336
337impl<'db, C, M, N, E, F> PaginatorTrait<'db, C> for SelectTwo<E, F>
338where
339    C: ConnectionTrait,
340    E: EntityTrait<Model = M>,
341    F: EntityTrait<Model = N>,
342    M: FromQueryResult + Sized + Send + Sync + 'db,
343    N: FromQueryResult + Sized + Send + Sync + 'db,
344{
345    type Selector = SelectTwoModel<M, N>;
346
347    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
348        self.into_model().paginate(db, page_size)
349    }
350}
351
352#[cfg(feature = "sync")]
353impl<'db, C, S> Iterator for PaginatorStream<'db, C, S>
354where
355    C: ConnectionTrait,
356    S: SelectorTrait + 'db,
357{
358    type Item = Result<Vec<S::Item>, DbErr>;
359
360    fn next(&mut self) -> Option<Self::Item> {
361        match self.paginator.fetch_and_next() {
362            Ok(Some(vec)) => Some(Ok(vec)),
363            Ok(None) => None,
364            Err(e) => Some(Err(e)),
365        }
366    }
367}
368
369#[cfg(test)]
370#[cfg(feature = "mock")]
371mod tests {
372    use super::*;
373    use crate::entity::prelude::*;
374    #[cfg(feature = "sync")]
375    use crate::util::StreamShim;
376    use crate::{DatabaseConnection, DbBackend, MockDatabase, Transaction};
377    use crate::{QueryOrder, QuerySelect};
378    use crate::{Statement, tests_cfg::*};
379    use futures_util::{TryStreamExt, stream::TryNext};
380    use pretty_assertions::assert_eq;
381    use sea_query::{Expr, SelectStatement, Value};
382    use std::sync::LazyLock;
383
384    static RAW_STMT: LazyLock<Statement> = LazyLock::new(|| {
385        Statement::from_sql_and_values(
386            DbBackend::Postgres,
387            r#"SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit""#,
388            [],
389        )
390    });
391
392    fn setup() -> (DatabaseConnection, Vec<Vec<fruit::Model>>) {
393        let page1 = vec![
394            fruit::Model {
395                id: 1,
396                name: "Blueberry".into(),
397                cake_id: Some(1),
398            },
399            fruit::Model {
400                id: 2,
401                name: "Raspberry".into(),
402                cake_id: Some(1),
403            },
404        ];
405
406        let page2 = vec![fruit::Model {
407            id: 3,
408            name: "Strawberry".into(),
409            cake_id: Some(2),
410        }];
411
412        let page3 = Vec::<fruit::Model>::new();
413
414        let db = MockDatabase::new(DbBackend::Postgres)
415            .append_query_results([page1.clone(), page2.clone(), page3.clone()])
416            .into_connection();
417
418        (db, vec![page1, page2, page3])
419    }
420
421    fn setup_num_items() -> (DatabaseConnection, i64) {
422        let num_items = 3;
423        let db = MockDatabase::new(DbBackend::Postgres)
424            .append_query_results([[maplit::btreemap! {
425                "num_items" => Into::<Value>::into(num_items),
426            }]])
427            .into_connection();
428
429        (db, num_items)
430    }
431
432    #[smol_potat::test]
433    async fn fetch_page() -> Result<(), DbErr> {
434        let (db, pages) = setup();
435
436        let paginator = fruit::Entity::find().paginate(&db, 2);
437
438        assert_eq!(paginator.fetch_page(0).await?, pages[0].clone());
439        assert_eq!(paginator.fetch_page(1).await?, pages[1].clone());
440        assert_eq!(paginator.fetch_page(2).await?, pages[2].clone());
441
442        let mut select = SelectStatement::new()
443            .exprs([
444                Expr::col((fruit::Entity, fruit::Column::Id)),
445                Expr::col((fruit::Entity, fruit::Column::Name)),
446                Expr::col((fruit::Entity, fruit::Column::CakeId)),
447            ])
448            .from(fruit::Entity)
449            .to_owned();
450
451        let query_builder = db.get_database_backend();
452        let stmts = [
453            query_builder.build(select.clone().offset(0).limit(2)),
454            query_builder.build(select.clone().offset(2).limit(2)),
455            query_builder.build(select.offset(4).limit(2)),
456        ];
457
458        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
459        Ok(())
460    }
461
462    #[smol_potat::test]
463    async fn fetch_page_raw() -> Result<(), DbErr> {
464        let (db, pages) = setup();
465
466        let paginator = fruit::Entity::find()
467            .from_raw_sql(RAW_STMT.clone())
468            .paginate(&db, 2);
469
470        assert_eq!(paginator.fetch_page(0).await?, pages[0].clone());
471        assert_eq!(paginator.fetch_page(1).await?, pages[1].clone());
472        assert_eq!(paginator.fetch_page(2).await?, pages[2].clone());
473
474        let mut select = SelectStatement::new()
475            .exprs([
476                Expr::col((fruit::Entity, fruit::Column::Id)),
477                Expr::col((fruit::Entity, fruit::Column::Name)),
478                Expr::col((fruit::Entity, fruit::Column::CakeId)),
479            ])
480            .from(fruit::Entity)
481            .to_owned();
482
483        let query_builder = db.get_database_backend();
484        let stmts = [
485            query_builder.build(select.clone().offset(0).limit(2)),
486            query_builder.build(select.clone().offset(2).limit(2)),
487            query_builder.build(select.offset(4).limit(2)),
488        ];
489
490        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
491        Ok(())
492    }
493
494    #[smol_potat::test]
495    async fn fetch() -> Result<(), DbErr> {
496        let (db, pages) = setup();
497
498        let mut paginator = fruit::Entity::find().paginate(&db, 2);
499
500        assert_eq!(paginator.fetch().await?, pages[0].clone());
501        paginator.next();
502
503        assert_eq!(paginator.fetch().await?, pages[1].clone());
504        paginator.next();
505
506        assert_eq!(paginator.fetch().await?, pages[2].clone());
507
508        let mut select = SelectStatement::new()
509            .exprs([
510                Expr::col((fruit::Entity, fruit::Column::Id)),
511                Expr::col((fruit::Entity, fruit::Column::Name)),
512                Expr::col((fruit::Entity, fruit::Column::CakeId)),
513            ])
514            .from(fruit::Entity)
515            .to_owned();
516
517        let query_builder = db.get_database_backend();
518        let stmts = [
519            query_builder.build(select.clone().offset(0).limit(2)),
520            query_builder.build(select.clone().offset(2).limit(2)),
521            query_builder.build(select.offset(4).limit(2)),
522        ];
523
524        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
525        Ok(())
526    }
527
528    #[smol_potat::test]
529    async fn fetch_raw() -> Result<(), DbErr> {
530        let (db, pages) = setup();
531
532        let mut paginator = fruit::Entity::find()
533            .from_raw_sql(RAW_STMT.clone())
534            .paginate(&db, 2);
535
536        assert_eq!(paginator.fetch().await?, pages[0].clone());
537        paginator.next();
538
539        assert_eq!(paginator.fetch().await?, pages[1].clone());
540        paginator.next();
541
542        assert_eq!(paginator.fetch().await?, pages[2].clone());
543
544        let mut select = SelectStatement::new()
545            .exprs([
546                Expr::col((fruit::Entity, fruit::Column::Id)),
547                Expr::col((fruit::Entity, fruit::Column::Name)),
548                Expr::col((fruit::Entity, fruit::Column::CakeId)),
549            ])
550            .from(fruit::Entity)
551            .to_owned();
552
553        let query_builder = db.get_database_backend();
554        let stmts = [
555            query_builder.build(select.clone().offset(0).limit(2)),
556            query_builder.build(select.clone().offset(2).limit(2)),
557            query_builder.build(select.offset(4).limit(2)),
558        ];
559
560        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
561        Ok(())
562    }
563
564    #[smol_potat::test]
565    async fn num_pages() -> Result<(), DbErr> {
566        let (db, num_items) = setup_num_items();
567
568        let num_items = num_items as u64;
569        let page_size = 2_u64;
570        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
571        let paginator = fruit::Entity::find().paginate(&db, page_size);
572
573        assert_eq!(paginator.num_pages().await?, num_pages);
574
575        let sub_query = SelectStatement::new()
576            .exprs([
577                Expr::col((fruit::Entity, fruit::Column::Id)),
578                Expr::col((fruit::Entity, fruit::Column::Name)),
579                Expr::col((fruit::Entity, fruit::Column::CakeId)),
580            ])
581            .from(fruit::Entity)
582            .to_owned();
583
584        let select = SelectStatement::new()
585            .expr(Expr::cust("COUNT(*) AS num_items"))
586            .from_subquery(sub_query, "sub_query")
587            .to_owned();
588
589        let query_builder = db.get_database_backend();
590        let stmts = [query_builder.build(&select)];
591
592        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
593        Ok(())
594    }
595
596    #[smol_potat::test]
597    async fn num_pages_raw() -> Result<(), DbErr> {
598        let (db, num_items) = setup_num_items();
599
600        let num_items = num_items as u64;
601        let page_size = 2_u64;
602        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
603        let paginator = fruit::Entity::find()
604            .from_raw_sql(RAW_STMT.clone())
605            .paginate(&db, page_size);
606
607        assert_eq!(paginator.num_pages().await?, num_pages);
608
609        let sub_query = SelectStatement::new()
610            .exprs([
611                Expr::col((fruit::Entity, fruit::Column::Id)),
612                Expr::col((fruit::Entity, fruit::Column::Name)),
613                Expr::col((fruit::Entity, fruit::Column::CakeId)),
614            ])
615            .from(fruit::Entity)
616            .to_owned();
617
618        let select = SelectStatement::new()
619            .expr(Expr::cust("COUNT(*) AS num_items"))
620            .from_subquery(sub_query, "sub_query")
621            .to_owned();
622
623        let query_builder = db.get_database_backend();
624        let stmts = [query_builder.build(&select)];
625
626        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
627        Ok(())
628    }
629
630    #[smol_potat::test]
631    async fn next_and_cur_page() -> Result<(), DbErr> {
632        let (db, _) = setup();
633
634        let mut paginator = fruit::Entity::find().paginate(&db, 2);
635
636        assert_eq!(paginator.cur_page(), 0);
637        paginator.next();
638
639        assert_eq!(paginator.cur_page(), 1);
640        paginator.next();
641
642        assert_eq!(paginator.cur_page(), 2);
643        Ok(())
644    }
645
646    #[smol_potat::test]
647    async fn next_and_cur_page_raw() -> Result<(), DbErr> {
648        let (db, _) = setup();
649
650        let mut paginator = fruit::Entity::find()
651            .from_raw_sql(RAW_STMT.clone())
652            .paginate(&db, 2);
653
654        assert_eq!(paginator.cur_page(), 0);
655        paginator.next();
656
657        assert_eq!(paginator.cur_page(), 1);
658        paginator.next();
659
660        assert_eq!(paginator.cur_page(), 2);
661        Ok(())
662    }
663
664    #[smol_potat::test]
665    async fn fetch_and_next() -> Result<(), DbErr> {
666        let (db, pages) = setup();
667
668        let mut paginator = fruit::Entity::find().paginate(&db, 2);
669
670        assert_eq!(paginator.cur_page(), 0);
671        assert_eq!(paginator.fetch_and_next().await?, Some(pages[0].clone()));
672
673        assert_eq!(paginator.cur_page(), 1);
674        assert_eq!(paginator.fetch_and_next().await?, Some(pages[1].clone()));
675
676        assert_eq!(paginator.cur_page(), 2);
677        assert_eq!(paginator.fetch_and_next().await?, None);
678
679        let mut select = SelectStatement::new()
680            .exprs([
681                Expr::col((fruit::Entity, fruit::Column::Id)),
682                Expr::col((fruit::Entity, fruit::Column::Name)),
683                Expr::col((fruit::Entity, fruit::Column::CakeId)),
684            ])
685            .from(fruit::Entity)
686            .to_owned();
687
688        let query_builder = db.get_database_backend();
689        let stmts = [
690            query_builder.build(select.clone().offset(0).limit(2)),
691            query_builder.build(select.clone().offset(2).limit(2)),
692            query_builder.build(select.offset(4).limit(2)),
693        ];
694
695        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
696        Ok(())
697    }
698
699    #[smol_potat::test]
700    async fn fetch_and_next_raw() -> Result<(), DbErr> {
701        let (db, pages) = setup();
702
703        let mut paginator = fruit::Entity::find()
704            .from_raw_sql(RAW_STMT.clone())
705            .paginate(&db, 2);
706
707        assert_eq!(paginator.cur_page(), 0);
708        assert_eq!(paginator.fetch_and_next().await?, Some(pages[0].clone()));
709
710        assert_eq!(paginator.cur_page(), 1);
711        assert_eq!(paginator.fetch_and_next().await?, Some(pages[1].clone()));
712
713        assert_eq!(paginator.cur_page(), 2);
714        assert_eq!(paginator.fetch_and_next().await?, None);
715
716        let mut select = SelectStatement::new()
717            .exprs([
718                Expr::col((fruit::Entity, fruit::Column::Id)),
719                Expr::col((fruit::Entity, fruit::Column::Name)),
720                Expr::col((fruit::Entity, fruit::Column::CakeId)),
721            ])
722            .from(fruit::Entity)
723            .to_owned();
724
725        let query_builder = db.get_database_backend();
726        let stmts = [
727            query_builder.build(select.clone().offset(0).limit(2)),
728            query_builder.build(select.clone().offset(2).limit(2)),
729            query_builder.build(select.offset(4).limit(2)),
730        ];
731
732        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
733        Ok(())
734    }
735
736    #[smol_potat::test]
737    async fn into_stream() -> Result<(), DbErr> {
738        let (db, pages) = setup();
739
740        let mut fruit_stream = fruit::Entity::find().paginate(&db, 2).into_stream();
741
742        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
743        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
744        assert_eq!(fruit_stream.try_next().await?, None);
745
746        drop(fruit_stream);
747
748        let mut select = SelectStatement::new()
749            .exprs([
750                Expr::col((fruit::Entity, fruit::Column::Id)),
751                Expr::col((fruit::Entity, fruit::Column::Name)),
752                Expr::col((fruit::Entity, fruit::Column::CakeId)),
753            ])
754            .from(fruit::Entity)
755            .to_owned();
756
757        let query_builder = db.get_database_backend();
758        let stmts = [
759            query_builder.build(select.clone().offset(0).limit(2)),
760            query_builder.build(select.clone().offset(2).limit(2)),
761            query_builder.build(select.offset(4).limit(2)),
762        ];
763
764        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
765        Ok(())
766    }
767
768    #[smol_potat::test]
769    async fn into_stream_raw() -> Result<(), DbErr> {
770        let (db, pages) = setup();
771
772        let mut fruit_stream = fruit::Entity::find()
773            .from_raw_sql(RAW_STMT.clone())
774            .paginate(&db, 2)
775            .into_stream();
776
777        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
778        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
779        assert_eq!(fruit_stream.try_next().await?, None);
780
781        drop(fruit_stream);
782
783        let mut select = SelectStatement::new()
784            .exprs([
785                Expr::col((fruit::Entity, fruit::Column::Id)),
786                Expr::col((fruit::Entity, fruit::Column::Name)),
787                Expr::col((fruit::Entity, fruit::Column::CakeId)),
788            ])
789            .from(fruit::Entity)
790            .to_owned();
791
792        let query_builder = db.get_database_backend();
793        let stmts = [
794            query_builder.build(select.clone().offset(0).limit(2)),
795            query_builder.build(select.clone().offset(2).limit(2)),
796            query_builder.build(select.offset(4).limit(2)),
797        ];
798
799        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
800        Ok(())
801    }
802
803    #[smol_potat::test]
804    async fn into_stream_raw_leading_spaces() -> Result<(), DbErr> {
805        let (db, pages) = setup();
806
807        let raw_stmt = Statement::from_sql_and_values(
808            DbBackend::Postgres,
809            r#"  SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit"  "#,
810            [],
811        );
812
813        let mut fruit_stream = fruit::Entity::find()
814            .from_raw_sql(raw_stmt.clone())
815            .paginate(&db, 2)
816            .into_stream();
817
818        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
819        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
820        assert_eq!(fruit_stream.try_next().await?, None);
821
822        drop(fruit_stream);
823
824        let mut select = SelectStatement::new()
825            .exprs([
826                Expr::col((fruit::Entity, fruit::Column::Id)),
827                Expr::col((fruit::Entity, fruit::Column::Name)),
828                Expr::col((fruit::Entity, fruit::Column::CakeId)),
829            ])
830            .from(fruit::Entity)
831            .to_owned();
832
833        let query_builder = db.get_database_backend();
834        let stmts = [
835            query_builder.build(select.clone().offset(0).limit(2)),
836            query_builder.build(select.clone().offset(2).limit(2)),
837            query_builder.build(select.offset(4).limit(2)),
838        ];
839
840        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
841        Ok(())
842    }
843
844    #[smol_potat::test]
845    #[should_panic]
846    async fn error() {
847        let (db, _pages) = setup();
848
849        fruit::Entity::find().paginate(&db, 0);
850    }
851}