Skip to main content

sea_orm/executor/
paginator.rs

1use crate::{
2    ConnectionTrait, DbBackend, EntityTrait, FromQueryResult, Select, SelectModel, SelectTwo,
3    SelectTwoModel, Selector, SelectorRaw, SelectorTrait, error::*,
4};
5use async_stream::stream;
6use futures_util::Stream;
7use sea_query::{Expr, SelectStatement};
8use std::{marker::PhantomData, pin::Pin};
9
10#[cfg(not(feature = "sync"))]
11type PinBoxStream<'db, Item> = Pin<Box<dyn Stream<Item = Item> + 'db>>;
12#[cfg(feature = "sync")]
13type PinBoxStream<'db, Item> = Box<dyn Iterator<Item = Item> + 'db>;
14
15/// Defined a structure to handle pagination of a result from a query operation on a Model
16#[derive(Clone, Debug)]
17pub struct Paginator<'db, C, S>
18where
19    C: ConnectionTrait,
20    S: SelectorTrait + 'db,
21{
22    pub(crate) query: SelectStatement,
23    pub(crate) page: u64,
24    pub(crate) page_size: u64,
25    pub(crate) db: &'db C,
26    pub(crate) selector: PhantomData<S>,
27}
28
29/// Define a structure containing the numbers of items and pages of a Paginator
30#[derive(Clone, Debug)]
31pub struct ItemsAndPagesNumber {
32    /// The total number of items of a paginator
33    pub number_of_items: u64,
34    /// The total number of pages of a paginator
35    pub number_of_pages: u64,
36}
37
38// LINT: warn if paginator is used without an order by clause
39
40impl<'db, C, S> Paginator<'db, C, S>
41where
42    C: ConnectionTrait,
43    S: SelectorTrait + 'db,
44{
45    /// Fetch a specific page; page index starts from zero
46    pub async fn fetch_page(&self, page: u64) -> Result<Vec<S::Item>, DbErr> {
47        let query = self
48            .query
49            .clone()
50            .limit(self.page_size)
51            .offset(self.page_size * page)
52            .to_owned();
53        let rows = self.db.query_all(&query).await?;
54        let mut buffer = Vec::with_capacity(rows.len());
55        for row in rows.into_iter() {
56            buffer.push(S::from_raw_query_result(row)?);
57        }
58        Ok(buffer)
59    }
60
61    /// Fetch the current page
62    pub async fn fetch(&self) -> Result<Vec<S::Item>, DbErr> {
63        self.fetch_page(self.page).await
64    }
65
66    /// Get the total number of items
67    pub async fn num_items(&self) -> Result<u64, DbErr> {
68        let db_backend = self.db.get_database_backend();
69        let query = SelectStatement::new()
70            .expr(Expr::cust("COUNT(*) AS num_items"))
71            .from_subquery(
72                self.query
73                    .clone()
74                    .reset_limit()
75                    .reset_offset()
76                    .clear_order_by()
77                    .to_owned(),
78                "sub_query",
79            )
80            .to_owned();
81        let result = match self.db.query_one(&query).await? {
82            Some(res) => res,
83            None => return Ok(0),
84        };
85        let num_items = match db_backend {
86            DbBackend::Postgres => result.try_get::<i64>("", "num_items")? as u64,
87            _ => result.try_get::<i32>("", "num_items")? as u64,
88        };
89        Ok(num_items)
90    }
91
92    /// Get the total number of pages
93    pub async fn num_pages(&self) -> Result<u64, DbErr> {
94        let num_items = self.num_items().await?;
95        let num_pages = self.compute_pages_number(num_items);
96        Ok(num_pages)
97    }
98
99    /// Get the total number of items and pages
100    pub async fn num_items_and_pages(&self) -> Result<ItemsAndPagesNumber, DbErr> {
101        let number_of_items = self.num_items().await?;
102        let number_of_pages = self.compute_pages_number(number_of_items);
103
104        Ok(ItemsAndPagesNumber {
105            number_of_items,
106            number_of_pages,
107        })
108    }
109
110    /// Compute the number of pages for the current page
111    fn compute_pages_number(&self, num_items: u64) -> u64 {
112        (num_items / self.page_size) + (num_items % self.page_size > 0) as u64
113    }
114
115    /// Increment the page counter
116    pub fn next(&mut self) {
117        self.page += 1;
118    }
119
120    /// Get current page number
121    pub fn cur_page(&self) -> u64 {
122        self.page
123    }
124
125    /// Fetch one page and increment the page counter
126    ///
127    /// ```
128    /// # use sea_orm::{error::*, tests_cfg::*, *};
129    /// #
130    /// # #[smol_potat::main]
131    /// # #[cfg(feature = "mock")]
132    /// # pub async fn main() -> Result<(), DbErr> {
133    /// #
134    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
135    /// #     .append_query_results([
136    /// #         vec![cake::Model {
137    /// #             id: 1,
138    /// #             name: "Cake".to_owned(),
139    /// #         }],
140    /// #         vec![],
141    /// #     ])
142    /// #     .into_connection();
143    /// # let db = &owned_db;
144    /// #
145    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
146    /// let mut cake_pages = cake::Entity::find()
147    ///     .order_by_asc(cake::Column::Id)
148    ///     .paginate(db, 50);
149    ///
150    /// while let Some(cakes) = cake_pages.fetch_and_next().await? {
151    ///     // Do something on cakes: Vec<cake::Model>
152    /// }
153    /// #
154    /// # Ok(())
155    /// # }
156    /// ```
157    pub async fn fetch_and_next(&mut self) -> Result<Option<Vec<S::Item>>, DbErr> {
158        let vec = self.fetch().await?;
159        self.next();
160        let opt = if !vec.is_empty() { Some(vec) } else { None };
161        Ok(opt)
162    }
163
164    /// Convert self into an async stream
165    ///
166    /// ```
167    /// # use sea_orm::{error::*, tests_cfg::*, *};
168    /// #
169    /// # #[smol_potat::main]
170    /// # #[cfg(all(feature = "mock", not(feature = "sync")))]
171    /// # pub async fn main() -> Result<(), DbErr> {
172    /// #
173    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
174    /// #     .append_query_results([
175    /// #         vec![cake::Model {
176    /// #             id: 1,
177    /// #             name: "Cake".to_owned(),
178    /// #         }],
179    /// #         vec![],
180    /// #     ])
181    /// #     .into_connection();
182    /// # let db = &owned_db;
183    /// #
184    /// use futures_util::TryStreamExt;
185    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
186    /// let mut cake_stream = cake::Entity::find()
187    ///     .order_by_asc(cake::Column::Id)
188    ///     .paginate(db, 50)
189    ///     .into_stream();
190    ///
191    /// while let Some(cakes) = cake_stream.try_next().await? {
192    ///     // Do something on cakes: Vec<cake::Model>
193    /// }
194    /// #
195    /// # Ok(())
196    /// # }
197    /// # #[cfg(all(feature = "mock", feature = "sync"))]
198    /// # fn main() {}
199    /// ```
200    /// (for SeaORM Sync)
201    /// ```
202    /// # use sea_orm::{error::*, tests_cfg::*, *};
203    /// # #[cfg(all(feature = "mock", feature = "sync"))]
204    /// # fn example() -> Result<(), DbErr> {
205    /// #
206    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
207    /// #     .append_query_results([
208    /// #         vec![cake::Model {
209    /// #             id: 1,
210    /// #             name: "Cake".to_owned(),
211    /// #         }],
212    /// #         vec![],
213    /// #     ])
214    /// #     .into_connection();
215    /// # let db = &owned_db;
216    /// #
217    /// use futures_util::TryStreamExt;
218    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
219    /// let mut cake_stream = cake::Entity::find()
220    ///     .order_by_asc(cake::Column::Id)
221    ///     .paginate(db, 50)
222    ///     .into_stream();
223    ///
224    /// while let Some(cakes) = cake_stream.next() {
225    ///     // Do something on cakes: Vec<cake::Model>
226    /// }
227    /// #
228    /// # Ok(())
229    /// # }
230    /// ```
231    pub fn into_stream(self) -> PinBoxStream<'db, Result<Vec<S::Item>, DbErr>> {
232        #[cfg(not(feature = "sync"))]
233        {
234            let mut streamer = self;
235            Box::pin(stream! {
236                while let Some(vec) = streamer.fetch_and_next().await? {
237                    yield Ok(vec);
238                }
239            })
240        }
241        #[cfg(feature = "sync")]
242        {
243            Box::new(PaginatorStream { paginator: self })
244        }
245    }
246}
247
248#[cfg(feature = "sync")]
249#[derive(Debug)]
250/// Stream items by page
251pub struct PaginatorStream<'db, C, S>
252where
253    C: ConnectionTrait,
254    S: SelectorTrait + 'db,
255{
256    paginator: Paginator<'db, C, S>,
257}
258
259#[async_trait::async_trait]
260/// A Trait for any type that can paginate results
261pub trait PaginatorTrait<'db, C>
262where
263    C: ConnectionTrait,
264{
265    /// Select operation
266    type Selector: SelectorTrait + Send + Sync + 'db;
267
268    /// Paginate the result of a select operation.
269    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector>;
270
271    /// Perform a count on the paginated results
272    async fn count(self, db: &'db C) -> Result<u64, DbErr>
273    where
274        Self: Send + Sized,
275    {
276        self.paginate(db, 1).num_items().await
277    }
278}
279
280impl<'db, C, S> PaginatorTrait<'db, C> for Selector<S>
281where
282    C: ConnectionTrait,
283    S: SelectorTrait + Send + Sync + 'db,
284{
285    type Selector = S;
286
287    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
288        assert!(page_size != 0, "page_size should not be zero");
289        Paginator {
290            query: self.query,
291            page: 0,
292            page_size,
293            db,
294            selector: PhantomData,
295        }
296    }
297}
298
299impl<'db, C, S> PaginatorTrait<'db, C> for SelectorRaw<S>
300where
301    C: ConnectionTrait,
302    S: SelectorTrait + Send + Sync + 'db,
303{
304    type Selector = S;
305    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
306        assert!(page_size != 0, "page_size should not be zero");
307        let sql = self.stmt.sql.trim()[6..].trim().to_owned();
308        let mut query = SelectStatement::new();
309        query.expr(if let Some(values) = self.stmt.values {
310            Expr::cust_with_values(sql, values.0)
311        } else {
312            Expr::cust(sql)
313        });
314
315        Paginator {
316            query,
317            page: 0,
318            page_size,
319            db,
320            selector: PhantomData,
321        }
322    }
323}
324
325impl<'db, C, M, E> PaginatorTrait<'db, C> for Select<E>
326where
327    C: ConnectionTrait,
328    E: EntityTrait<Model = M>,
329    M: FromQueryResult + Sized + Send + Sync + 'db,
330{
331    type Selector = SelectModel<M>;
332
333    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
334        self.into_model().paginate(db, page_size)
335    }
336}
337
338impl<'db, C, M, N, E, F> PaginatorTrait<'db, C> for SelectTwo<E, F>
339where
340    C: ConnectionTrait,
341    E: EntityTrait<Model = M>,
342    F: EntityTrait<Model = N>,
343    M: FromQueryResult + Sized + Send + Sync + 'db,
344    N: FromQueryResult + Sized + Send + Sync + 'db,
345{
346    type Selector = SelectTwoModel<M, N>;
347
348    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
349        self.into_model().paginate(db, page_size)
350    }
351}
352
353#[cfg(feature = "sync")]
354impl<'db, C, S> Iterator for PaginatorStream<'db, C, S>
355where
356    C: ConnectionTrait,
357    S: SelectorTrait + 'db,
358{
359    type Item = Result<Vec<S::Item>, DbErr>;
360
361    fn next(&mut self) -> Option<Self::Item> {
362        match self.paginator.fetch_and_next() {
363            Ok(Some(vec)) => Some(Ok(vec)),
364            Ok(None) => None,
365            Err(e) => Some(Err(e)),
366        }
367    }
368}
369
370#[cfg(test)]
371#[cfg(feature = "mock")]
372mod tests {
373    use super::*;
374    use crate::entity::prelude::*;
375    #[cfg(feature = "sync")]
376    use crate::util::StreamShim;
377    use crate::{DatabaseConnection, DbBackend, MockDatabase, Transaction};
378    use crate::{QueryOrder, QuerySelect};
379    use crate::{Statement, tests_cfg::*};
380    use futures_util::{TryStreamExt, stream::TryNext};
381    use pretty_assertions::assert_eq;
382    use sea_query::{Expr, SelectStatement, Value};
383    use std::sync::LazyLock;
384
385    static RAW_STMT: LazyLock<Statement> = LazyLock::new(|| {
386        Statement::from_sql_and_values(
387            DbBackend::Postgres,
388            r#"SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit""#,
389            [],
390        )
391    });
392
393    fn setup() -> (DatabaseConnection, Vec<Vec<fruit::Model>>) {
394        let page1 = vec![
395            fruit::Model {
396                id: 1,
397                name: "Blueberry".into(),
398                cake_id: Some(1),
399            },
400            fruit::Model {
401                id: 2,
402                name: "Raspberry".into(),
403                cake_id: Some(1),
404            },
405        ];
406
407        let page2 = vec![fruit::Model {
408            id: 3,
409            name: "Strawberry".into(),
410            cake_id: Some(2),
411        }];
412
413        let page3 = Vec::<fruit::Model>::new();
414
415        let db = MockDatabase::new(DbBackend::Postgres)
416            .append_query_results([page1.clone(), page2.clone(), page3.clone()])
417            .into_connection();
418
419        (db, vec![page1, page2, page3])
420    }
421
422    fn setup_num_items() -> (DatabaseConnection, i64) {
423        let num_items = 3;
424        let db = MockDatabase::new(DbBackend::Postgres)
425            .append_query_results([[maplit::btreemap! {
426                "num_items" => Into::<Value>::into(num_items),
427            }]])
428            .into_connection();
429
430        (db, num_items)
431    }
432
433    #[smol_potat::test]
434    async fn fetch_page() -> Result<(), DbErr> {
435        let (db, pages) = setup();
436
437        let paginator = fruit::Entity::find().paginate(&db, 2);
438
439        assert_eq!(paginator.fetch_page(0).await?, pages[0].clone());
440        assert_eq!(paginator.fetch_page(1).await?, pages[1].clone());
441        assert_eq!(paginator.fetch_page(2).await?, pages[2].clone());
442
443        let mut select = SelectStatement::new()
444            .exprs([
445                Expr::col((fruit::Entity, fruit::Column::Id)),
446                Expr::col((fruit::Entity, fruit::Column::Name)),
447                Expr::col((fruit::Entity, fruit::Column::CakeId)),
448            ])
449            .from(fruit::Entity)
450            .to_owned();
451
452        let query_builder = db.get_database_backend();
453        let stmts = [
454            query_builder.build(select.clone().offset(0).limit(2)),
455            query_builder.build(select.clone().offset(2).limit(2)),
456            query_builder.build(select.offset(4).limit(2)),
457        ];
458
459        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
460        Ok(())
461    }
462
463    #[smol_potat::test]
464    async fn fetch_page_raw() -> Result<(), DbErr> {
465        let (db, pages) = setup();
466
467        let paginator = fruit::Entity::find()
468            .from_raw_sql(RAW_STMT.clone())
469            .paginate(&db, 2);
470
471        assert_eq!(paginator.fetch_page(0).await?, pages[0].clone());
472        assert_eq!(paginator.fetch_page(1).await?, pages[1].clone());
473        assert_eq!(paginator.fetch_page(2).await?, pages[2].clone());
474
475        let mut select = SelectStatement::new()
476            .exprs([
477                Expr::col((fruit::Entity, fruit::Column::Id)),
478                Expr::col((fruit::Entity, fruit::Column::Name)),
479                Expr::col((fruit::Entity, fruit::Column::CakeId)),
480            ])
481            .from(fruit::Entity)
482            .to_owned();
483
484        let query_builder = db.get_database_backend();
485        let stmts = [
486            query_builder.build(select.clone().offset(0).limit(2)),
487            query_builder.build(select.clone().offset(2).limit(2)),
488            query_builder.build(select.offset(4).limit(2)),
489        ];
490
491        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
492        Ok(())
493    }
494
495    #[smol_potat::test]
496    async fn fetch() -> Result<(), DbErr> {
497        let (db, pages) = setup();
498
499        let mut paginator = fruit::Entity::find().paginate(&db, 2);
500
501        assert_eq!(paginator.fetch().await?, pages[0].clone());
502        paginator.next();
503
504        assert_eq!(paginator.fetch().await?, pages[1].clone());
505        paginator.next();
506
507        assert_eq!(paginator.fetch().await?, pages[2].clone());
508
509        let mut select = SelectStatement::new()
510            .exprs([
511                Expr::col((fruit::Entity, fruit::Column::Id)),
512                Expr::col((fruit::Entity, fruit::Column::Name)),
513                Expr::col((fruit::Entity, fruit::Column::CakeId)),
514            ])
515            .from(fruit::Entity)
516            .to_owned();
517
518        let query_builder = db.get_database_backend();
519        let stmts = [
520            query_builder.build(select.clone().offset(0).limit(2)),
521            query_builder.build(select.clone().offset(2).limit(2)),
522            query_builder.build(select.offset(4).limit(2)),
523        ];
524
525        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
526        Ok(())
527    }
528
529    #[smol_potat::test]
530    async fn fetch_raw() -> Result<(), DbErr> {
531        let (db, pages) = setup();
532
533        let mut paginator = fruit::Entity::find()
534            .from_raw_sql(RAW_STMT.clone())
535            .paginate(&db, 2);
536
537        assert_eq!(paginator.fetch().await?, pages[0].clone());
538        paginator.next();
539
540        assert_eq!(paginator.fetch().await?, pages[1].clone());
541        paginator.next();
542
543        assert_eq!(paginator.fetch().await?, pages[2].clone());
544
545        let mut select = SelectStatement::new()
546            .exprs([
547                Expr::col((fruit::Entity, fruit::Column::Id)),
548                Expr::col((fruit::Entity, fruit::Column::Name)),
549                Expr::col((fruit::Entity, fruit::Column::CakeId)),
550            ])
551            .from(fruit::Entity)
552            .to_owned();
553
554        let query_builder = db.get_database_backend();
555        let stmts = [
556            query_builder.build(select.clone().offset(0).limit(2)),
557            query_builder.build(select.clone().offset(2).limit(2)),
558            query_builder.build(select.offset(4).limit(2)),
559        ];
560
561        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
562        Ok(())
563    }
564
565    #[smol_potat::test]
566    async fn num_pages() -> Result<(), DbErr> {
567        let (db, num_items) = setup_num_items();
568
569        let num_items = num_items as u64;
570        let page_size = 2_u64;
571        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
572        let paginator = fruit::Entity::find().paginate(&db, page_size);
573
574        assert_eq!(paginator.num_pages().await?, num_pages);
575
576        let sub_query = SelectStatement::new()
577            .exprs([
578                Expr::col((fruit::Entity, fruit::Column::Id)),
579                Expr::col((fruit::Entity, fruit::Column::Name)),
580                Expr::col((fruit::Entity, fruit::Column::CakeId)),
581            ])
582            .from(fruit::Entity)
583            .to_owned();
584
585        let select = SelectStatement::new()
586            .expr(Expr::cust("COUNT(*) AS num_items"))
587            .from_subquery(sub_query, "sub_query")
588            .to_owned();
589
590        let query_builder = db.get_database_backend();
591        let stmts = [query_builder.build(&select)];
592
593        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
594        Ok(())
595    }
596
597    #[smol_potat::test]
598    async fn num_pages_raw() -> Result<(), DbErr> {
599        let (db, num_items) = setup_num_items();
600
601        let num_items = num_items as u64;
602        let page_size = 2_u64;
603        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
604        let paginator = fruit::Entity::find()
605            .from_raw_sql(RAW_STMT.clone())
606            .paginate(&db, page_size);
607
608        assert_eq!(paginator.num_pages().await?, num_pages);
609
610        let sub_query = SelectStatement::new()
611            .exprs([
612                Expr::col((fruit::Entity, fruit::Column::Id)),
613                Expr::col((fruit::Entity, fruit::Column::Name)),
614                Expr::col((fruit::Entity, fruit::Column::CakeId)),
615            ])
616            .from(fruit::Entity)
617            .to_owned();
618
619        let select = SelectStatement::new()
620            .expr(Expr::cust("COUNT(*) AS num_items"))
621            .from_subquery(sub_query, "sub_query")
622            .to_owned();
623
624        let query_builder = db.get_database_backend();
625        let stmts = [query_builder.build(&select)];
626
627        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
628        Ok(())
629    }
630
631    #[smol_potat::test]
632    async fn next_and_cur_page() -> Result<(), DbErr> {
633        let (db, _) = setup();
634
635        let mut paginator = fruit::Entity::find().paginate(&db, 2);
636
637        assert_eq!(paginator.cur_page(), 0);
638        paginator.next();
639
640        assert_eq!(paginator.cur_page(), 1);
641        paginator.next();
642
643        assert_eq!(paginator.cur_page(), 2);
644        Ok(())
645    }
646
647    #[smol_potat::test]
648    async fn next_and_cur_page_raw() -> Result<(), DbErr> {
649        let (db, _) = setup();
650
651        let mut paginator = fruit::Entity::find()
652            .from_raw_sql(RAW_STMT.clone())
653            .paginate(&db, 2);
654
655        assert_eq!(paginator.cur_page(), 0);
656        paginator.next();
657
658        assert_eq!(paginator.cur_page(), 1);
659        paginator.next();
660
661        assert_eq!(paginator.cur_page(), 2);
662        Ok(())
663    }
664
665    #[smol_potat::test]
666    async fn fetch_and_next() -> Result<(), DbErr> {
667        let (db, pages) = setup();
668
669        let mut paginator = fruit::Entity::find().paginate(&db, 2);
670
671        assert_eq!(paginator.cur_page(), 0);
672        assert_eq!(paginator.fetch_and_next().await?, Some(pages[0].clone()));
673
674        assert_eq!(paginator.cur_page(), 1);
675        assert_eq!(paginator.fetch_and_next().await?, Some(pages[1].clone()));
676
677        assert_eq!(paginator.cur_page(), 2);
678        assert_eq!(paginator.fetch_and_next().await?, None);
679
680        let mut select = SelectStatement::new()
681            .exprs([
682                Expr::col((fruit::Entity, fruit::Column::Id)),
683                Expr::col((fruit::Entity, fruit::Column::Name)),
684                Expr::col((fruit::Entity, fruit::Column::CakeId)),
685            ])
686            .from(fruit::Entity)
687            .to_owned();
688
689        let query_builder = db.get_database_backend();
690        let stmts = [
691            query_builder.build(select.clone().offset(0).limit(2)),
692            query_builder.build(select.clone().offset(2).limit(2)),
693            query_builder.build(select.offset(4).limit(2)),
694        ];
695
696        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
697        Ok(())
698    }
699
700    #[smol_potat::test]
701    async fn fetch_and_next_raw() -> Result<(), DbErr> {
702        let (db, pages) = setup();
703
704        let mut paginator = fruit::Entity::find()
705            .from_raw_sql(RAW_STMT.clone())
706            .paginate(&db, 2);
707
708        assert_eq!(paginator.cur_page(), 0);
709        assert_eq!(paginator.fetch_and_next().await?, Some(pages[0].clone()));
710
711        assert_eq!(paginator.cur_page(), 1);
712        assert_eq!(paginator.fetch_and_next().await?, Some(pages[1].clone()));
713
714        assert_eq!(paginator.cur_page(), 2);
715        assert_eq!(paginator.fetch_and_next().await?, None);
716
717        let mut select = SelectStatement::new()
718            .exprs([
719                Expr::col((fruit::Entity, fruit::Column::Id)),
720                Expr::col((fruit::Entity, fruit::Column::Name)),
721                Expr::col((fruit::Entity, fruit::Column::CakeId)),
722            ])
723            .from(fruit::Entity)
724            .to_owned();
725
726        let query_builder = db.get_database_backend();
727        let stmts = [
728            query_builder.build(select.clone().offset(0).limit(2)),
729            query_builder.build(select.clone().offset(2).limit(2)),
730            query_builder.build(select.offset(4).limit(2)),
731        ];
732
733        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
734        Ok(())
735    }
736
737    #[smol_potat::test]
738    async fn into_stream() -> Result<(), DbErr> {
739        let (db, pages) = setup();
740
741        let mut fruit_stream = fruit::Entity::find().paginate(&db, 2).into_stream();
742
743        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
744        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
745        assert_eq!(fruit_stream.try_next().await?, None);
746
747        drop(fruit_stream);
748
749        let mut select = SelectStatement::new()
750            .exprs([
751                Expr::col((fruit::Entity, fruit::Column::Id)),
752                Expr::col((fruit::Entity, fruit::Column::Name)),
753                Expr::col((fruit::Entity, fruit::Column::CakeId)),
754            ])
755            .from(fruit::Entity)
756            .to_owned();
757
758        let query_builder = db.get_database_backend();
759        let stmts = [
760            query_builder.build(select.clone().offset(0).limit(2)),
761            query_builder.build(select.clone().offset(2).limit(2)),
762            query_builder.build(select.offset(4).limit(2)),
763        ];
764
765        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
766        Ok(())
767    }
768
769    #[smol_potat::test]
770    async fn into_stream_raw() -> Result<(), DbErr> {
771        let (db, pages) = setup();
772
773        let mut fruit_stream = fruit::Entity::find()
774            .from_raw_sql(RAW_STMT.clone())
775            .paginate(&db, 2)
776            .into_stream();
777
778        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
779        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
780        assert_eq!(fruit_stream.try_next().await?, None);
781
782        drop(fruit_stream);
783
784        let mut select = SelectStatement::new()
785            .exprs([
786                Expr::col((fruit::Entity, fruit::Column::Id)),
787                Expr::col((fruit::Entity, fruit::Column::Name)),
788                Expr::col((fruit::Entity, fruit::Column::CakeId)),
789            ])
790            .from(fruit::Entity)
791            .to_owned();
792
793        let query_builder = db.get_database_backend();
794        let stmts = [
795            query_builder.build(select.clone().offset(0).limit(2)),
796            query_builder.build(select.clone().offset(2).limit(2)),
797            query_builder.build(select.offset(4).limit(2)),
798        ];
799
800        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
801        Ok(())
802    }
803
804    #[smol_potat::test]
805    async fn into_stream_raw_leading_spaces() -> Result<(), DbErr> {
806        let (db, pages) = setup();
807
808        let raw_stmt = Statement::from_sql_and_values(
809            DbBackend::Postgres,
810            r#"  SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit"  "#,
811            [],
812        );
813
814        let mut fruit_stream = fruit::Entity::find()
815            .from_raw_sql(raw_stmt.clone())
816            .paginate(&db, 2)
817            .into_stream();
818
819        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
820        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
821        assert_eq!(fruit_stream.try_next().await?, None);
822
823        drop(fruit_stream);
824
825        let mut select = SelectStatement::new()
826            .exprs([
827                Expr::col((fruit::Entity, fruit::Column::Id)),
828                Expr::col((fruit::Entity, fruit::Column::Name)),
829                Expr::col((fruit::Entity, fruit::Column::CakeId)),
830            ])
831            .from(fruit::Entity)
832            .to_owned();
833
834        let query_builder = db.get_database_backend();
835        let stmts = [
836            query_builder.build(select.clone().offset(0).limit(2)),
837            query_builder.build(select.clone().offset(2).limit(2)),
838            query_builder.build(select.offset(4).limit(2)),
839        ];
840
841        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
842        Ok(())
843    }
844
845    #[smol_potat::test]
846    #[should_panic]
847    async fn error() {
848        let (db, _pages) = setup();
849
850        fruit::Entity::find().paginate(&db, 0);
851    }
852}