sea_orm/executor/
paginator.rs

1use crate::{
2    ConnectionTrait, DbBackend, EntityTrait, FromQueryResult, Select, SelectModel, SelectTwo,
3    SelectTwoModel, Selector, SelectorRaw, SelectorTrait, error::*,
4};
5use sea_query::{Expr, SelectStatement};
6use std::marker::PhantomData;
7
8#[cfg(not(feature = "sync"))]
9type PinBoxStream<'db, Item> = Pin<Box<dyn Stream<Item = Item> + 'db>>;
10#[cfg(feature = "sync")]
11type PinBoxStream<'db, Item> = Box<dyn Iterator<Item = Item> + 'db>;
12
13/// Defined a structure to handle pagination of a result from a query operation on a Model
14#[derive(Clone, Debug)]
15pub struct Paginator<'db, C, S>
16where
17    C: ConnectionTrait,
18    S: SelectorTrait + 'db,
19{
20    pub(crate) query: SelectStatement,
21    pub(crate) page: u64,
22    pub(crate) page_size: u64,
23    pub(crate) db: &'db C,
24    pub(crate) selector: PhantomData<S>,
25}
26
27/// Define a structure containing the numbers of items and pages of a Paginator
28#[derive(Clone, Debug)]
29pub struct ItemsAndPagesNumber {
30    /// The total number of items of a paginator
31    pub number_of_items: u64,
32    /// The total number of pages of a paginator
33    pub number_of_pages: u64,
34}
35
36// LINT: warn if paginator is used without an order by clause
37
38impl<'db, C, S> Paginator<'db, C, S>
39where
40    C: ConnectionTrait,
41    S: SelectorTrait + 'db,
42{
43    /// Fetch a specific page; page index starts from zero
44    pub fn fetch_page(&self, page: u64) -> Result<Vec<S::Item>, DbErr> {
45        let query = self
46            .query
47            .clone()
48            .limit(self.page_size)
49            .offset(self.page_size * page)
50            .to_owned();
51        let rows = self.db.query_all(&query)?;
52        let mut buffer = Vec::with_capacity(rows.len());
53        for row in rows.into_iter() {
54            buffer.push(S::from_raw_query_result(row)?);
55        }
56        Ok(buffer)
57    }
58
59    /// Fetch the current page
60    pub fn fetch(&self) -> Result<Vec<S::Item>, DbErr> {
61        self.fetch_page(self.page)
62    }
63
64    /// Get the total number of items
65    pub fn num_items(&self) -> Result<u64, DbErr> {
66        let db_backend = self.db.get_database_backend();
67        let query = SelectStatement::new()
68            .expr(Expr::cust("COUNT(*) AS num_items"))
69            .from_subquery(
70                self.query
71                    .clone()
72                    .reset_limit()
73                    .reset_offset()
74                    .clear_order_by()
75                    .to_owned(),
76                "sub_query",
77            )
78            .to_owned();
79        let result = match self.db.query_one(&query)? {
80            Some(res) => res,
81            None => return Ok(0),
82        };
83        let num_items = match db_backend {
84            DbBackend::Postgres => result.try_get::<i64>("", "num_items")? as u64,
85            _ => result.try_get::<i32>("", "num_items")? as u64,
86        };
87        Ok(num_items)
88    }
89
90    /// Get the total number of pages
91    pub fn num_pages(&self) -> Result<u64, DbErr> {
92        let num_items = self.num_items()?;
93        let num_pages = self.compute_pages_number(num_items);
94        Ok(num_pages)
95    }
96
97    /// Get the total number of items and pages
98    pub fn num_items_and_pages(&self) -> Result<ItemsAndPagesNumber, DbErr> {
99        let number_of_items = self.num_items()?;
100        let number_of_pages = self.compute_pages_number(number_of_items);
101
102        Ok(ItemsAndPagesNumber {
103            number_of_items,
104            number_of_pages,
105        })
106    }
107
108    /// Compute the number of pages for the current page
109    fn compute_pages_number(&self, num_items: u64) -> u64 {
110        (num_items / self.page_size) + (num_items % self.page_size > 0) as u64
111    }
112
113    /// Increment the page counter
114    pub fn next(&mut self) {
115        self.page += 1;
116    }
117
118    /// Get current page number
119    pub fn cur_page(&self) -> u64 {
120        self.page
121    }
122
123    /// Fetch one page and increment the page counter
124    ///
125    /// ```
126    /// # use sea_orm::{error::*, tests_cfg::*, *};
127    /// #
128    /// # #[cfg(feature = "mock")]
129    /// # pub fn main() -> Result<(), DbErr> {
130    /// #
131    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
132    /// #     .append_query_results([
133    /// #         vec![cake::Model {
134    /// #             id: 1,
135    /// #             name: "Cake".to_owned(),
136    /// #         }],
137    /// #         vec![],
138    /// #     ])
139    /// #     .into_connection();
140    /// # let db = &owned_db;
141    /// #
142    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
143    /// let mut cake_pages = cake::Entity::find()
144    ///     .order_by_asc(cake::Column::Id)
145    ///     .paginate(db, 50);
146    ///
147    /// while let Some(cakes) = cake_pages.fetch_and_next()? {
148    ///     // Do something on cakes: Vec<cake::Model>
149    /// }
150    /// #
151    /// # Ok(())
152    /// # }
153    /// ```
154    pub fn fetch_and_next(&mut self) -> Result<Option<Vec<S::Item>>, DbErr> {
155        let vec = self.fetch()?;
156        self.next();
157        let opt = if !vec.is_empty() { Some(vec) } else { None };
158        Ok(opt)
159    }
160
161    /// Convert self into an stream
162    ///
163    /// ```
164    /// # use sea_orm::{error::*, tests_cfg::*, *};
165    /// #
166    /// # #[cfg(all(feature = "mock", not(feature = "sync")))]
167    /// # pub fn main() -> Result<(), DbErr> {
168    /// #
169    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
170    /// #     .append_query_results([
171    /// #         vec![cake::Model {
172    /// #             id: 1,
173    /// #             name: "Cake".to_owned(),
174    /// #         }],
175    /// #         vec![],
176    /// #     ])
177    /// #     .into_connection();
178    /// # let db = &owned_db;
179    /// #
180    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
181    /// let mut cake_stream = cake::Entity::find()
182    ///     .order_by_asc(cake::Column::Id)
183    ///     .paginate(db, 50)
184    ///     .into_stream();
185    ///
186    /// while let Some(cakes) = cake_stream.try_next()? {
187    ///     // Do something on cakes: Vec<cake::Model>
188    /// }
189    /// #
190    /// # Ok(())
191    /// # }
192    /// # #[cfg(all(feature = "mock", feature = "sync"))]
193    /// # fn main() {}
194    /// ```
195    /// (for SeaORM Sync)
196    /// ```
197    /// # use sea_orm::{error::*, tests_cfg::*, *};
198    /// # #[cfg(all(feature = "mock", feature = "sync"))]
199    /// # fn example() -> Result<(), DbErr> {
200    /// #
201    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
202    /// #     .append_query_results([
203    /// #         vec![cake::Model {
204    /// #             id: 1,
205    /// #             name: "Cake".to_owned(),
206    /// #         }],
207    /// #         vec![],
208    /// #     ])
209    /// #     .into_connection();
210    /// # let db = &owned_db;
211    /// #
212    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
213    /// let mut cake_stream = cake::Entity::find()
214    ///     .order_by_asc(cake::Column::Id)
215    ///     .paginate(db, 50)
216    ///     .into_stream();
217    ///
218    /// while let Some(cakes) = cake_stream.next() {
219    ///     // Do something on cakes: Vec<cake::Model>
220    /// }
221    /// #
222    /// # Ok(())
223    /// # }
224    /// ```
225    pub fn into_stream(self) -> PinBoxStream<'db, Result<Vec<S::Item>, DbErr>> {
226        #[cfg(not(feature = "sync"))]
227        {
228            let mut streamer = self;
229            Box::new(stream! {
230                while let Some(vec) = streamer.fetch_and_next()? {
231                    yield Ok(vec);
232                }
233            })
234        }
235        #[cfg(feature = "sync")]
236        {
237            Box::new(PaginatorStream { paginator: self })
238        }
239    }
240}
241
242#[cfg(feature = "sync")]
243#[derive(Debug)]
244/// Stream items by page
245pub struct PaginatorStream<'db, C, S>
246where
247    C: ConnectionTrait,
248    S: SelectorTrait + 'db,
249{
250    paginator: Paginator<'db, C, S>,
251}
252
253/// A Trait for any type that can paginate results
254pub trait PaginatorTrait<'db, C>
255where
256    C: ConnectionTrait,
257{
258    /// Select operation
259    type Selector: SelectorTrait + 'db;
260
261    /// Paginate the result of a select operation.
262    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector>;
263
264    /// Perform a count on the paginated results
265    fn count(self, db: &'db C) -> Result<u64, DbErr>
266    where
267        Self: Sized,
268    {
269        self.paginate(db, 1).num_items()
270    }
271
272    /// Check if any records exist
273    fn exists(self, db: &'db C) -> Result<bool, DbErr>
274    where
275        Self: Sized,
276    {
277        let paginator = self.paginate(db, 1);
278        let stmt = SelectStatement::new()
279            .expr(Expr::cust("1"))
280            .from_subquery(
281                paginator
282                    .query
283                    .clone()
284                    .reset_limit()
285                    .reset_offset()
286                    .clear_order_by()
287                    .limit(1)
288                    .to_owned(),
289                "sub_query",
290            )
291            .limit(1)
292            .to_owned();
293        let result = db.query_one(&stmt)?;
294        Ok(result.is_some())
295    }
296}
297
298impl<'db, C, S> PaginatorTrait<'db, C> for Selector<S>
299where
300    C: ConnectionTrait,
301    S: SelectorTrait + 'db,
302{
303    type Selector = S;
304
305    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
306        assert!(page_size != 0, "page_size should not be zero");
307        Paginator {
308            query: self.query,
309            page: 0,
310            page_size,
311            db,
312            selector: PhantomData,
313        }
314    }
315}
316
317impl<'db, C, S> PaginatorTrait<'db, C> for SelectorRaw<S>
318where
319    C: ConnectionTrait,
320    S: SelectorTrait + 'db,
321{
322    type Selector = S;
323    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
324        assert!(page_size != 0, "page_size should not be zero");
325        let sql = self.stmt.sql.trim()[6..].trim().to_owned();
326        let mut query = SelectStatement::new();
327        query.expr(if let Some(values) = self.stmt.values {
328            Expr::cust_with_values(sql, values.0)
329        } else {
330            Expr::cust(sql)
331        });
332
333        Paginator {
334            query,
335            page: 0,
336            page_size,
337            db,
338            selector: PhantomData,
339        }
340    }
341}
342
343impl<'db, C, M, E> PaginatorTrait<'db, C> for Select<E>
344where
345    C: ConnectionTrait,
346    E: EntityTrait<Model = M>,
347    M: FromQueryResult + Sized + 'db,
348{
349    type Selector = SelectModel<M>;
350
351    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
352        self.into_model().paginate(db, page_size)
353    }
354}
355
356impl<'db, C, M, N, E, F> PaginatorTrait<'db, C> for SelectTwo<E, F>
357where
358    C: ConnectionTrait,
359    E: EntityTrait<Model = M>,
360    F: EntityTrait<Model = N>,
361    M: FromQueryResult + Sized + 'db,
362    N: FromQueryResult + Sized + 'db,
363{
364    type Selector = SelectTwoModel<M, N>;
365
366    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
367        self.into_model().paginate(db, page_size)
368    }
369}
370
371#[cfg(feature = "sync")]
372impl<'db, C, S> Iterator for PaginatorStream<'db, C, S>
373where
374    C: ConnectionTrait,
375    S: SelectorTrait + 'db,
376{
377    type Item = Result<Vec<S::Item>, DbErr>;
378
379    fn next(&mut self) -> Option<Self::Item> {
380        match self.paginator.fetch_and_next() {
381            Ok(Some(vec)) => Some(Ok(vec)),
382            Ok(None) => None,
383            Err(e) => Some(Err(e)),
384        }
385    }
386}
387
388#[cfg(test)]
389#[cfg(feature = "mock")]
390mod tests {
391    use super::*;
392    use crate::entity::prelude::*;
393    #[cfg(feature = "sync")]
394    use crate::util::StreamShim;
395    use crate::{DatabaseConnection, DbBackend, MockDatabase, Transaction};
396    use crate::{Statement, tests_cfg::*};
397    use pretty_assertions::assert_eq;
398    use sea_query::{Expr, SelectStatement, Value};
399    use std::sync::LazyLock;
400
401    static RAW_STMT: LazyLock<Statement> = LazyLock::new(|| {
402        Statement::from_sql_and_values(
403            DbBackend::Postgres,
404            r#"SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit""#,
405            [],
406        )
407    });
408
409    fn setup() -> (DatabaseConnection, Vec<Vec<fruit::Model>>) {
410        let page1 = vec![
411            fruit::Model {
412                id: 1,
413                name: "Blueberry".into(),
414                cake_id: Some(1),
415            },
416            fruit::Model {
417                id: 2,
418                name: "Raspberry".into(),
419                cake_id: Some(1),
420            },
421        ];
422
423        let page2 = vec![fruit::Model {
424            id: 3,
425            name: "Strawberry".into(),
426            cake_id: Some(2),
427        }];
428
429        let page3 = Vec::<fruit::Model>::new();
430
431        let db = MockDatabase::new(DbBackend::Postgres)
432            .append_query_results([page1.clone(), page2.clone(), page3.clone()])
433            .into_connection();
434
435        (db, vec![page1, page2, page3])
436    }
437
438    fn setup_num_items() -> (DatabaseConnection, i64) {
439        let num_items = 3;
440        let db = MockDatabase::new(DbBackend::Postgres)
441            .append_query_results([[maplit::btreemap! {
442                "num_items" => Into::<Value>::into(num_items),
443            }]])
444            .into_connection();
445
446        (db, num_items)
447    }
448
449    #[test]
450    fn fetch_page() -> Result<(), DbErr> {
451        let (db, pages) = setup();
452
453        let paginator = fruit::Entity::find().paginate(&db, 2);
454
455        assert_eq!(paginator.fetch_page(0)?, pages[0].clone());
456        assert_eq!(paginator.fetch_page(1)?, pages[1].clone());
457        assert_eq!(paginator.fetch_page(2)?, pages[2].clone());
458
459        let mut select = SelectStatement::new()
460            .exprs([
461                Expr::col((fruit::Entity, fruit::Column::Id)),
462                Expr::col((fruit::Entity, fruit::Column::Name)),
463                Expr::col((fruit::Entity, fruit::Column::CakeId)),
464            ])
465            .from(fruit::Entity)
466            .to_owned();
467
468        let query_builder = db.get_database_backend();
469        let stmts = [
470            query_builder.build(select.clone().offset(0).limit(2)),
471            query_builder.build(select.clone().offset(2).limit(2)),
472            query_builder.build(select.offset(4).limit(2)),
473        ];
474
475        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
476        Ok(())
477    }
478
479    #[test]
480    fn fetch_page_raw() -> Result<(), DbErr> {
481        let (db, pages) = setup();
482
483        let paginator = fruit::Entity::find()
484            .from_raw_sql(RAW_STMT.clone())
485            .paginate(&db, 2);
486
487        assert_eq!(paginator.fetch_page(0)?, pages[0].clone());
488        assert_eq!(paginator.fetch_page(1)?, pages[1].clone());
489        assert_eq!(paginator.fetch_page(2)?, pages[2].clone());
490
491        let mut select = SelectStatement::new()
492            .exprs([
493                Expr::col((fruit::Entity, fruit::Column::Id)),
494                Expr::col((fruit::Entity, fruit::Column::Name)),
495                Expr::col((fruit::Entity, fruit::Column::CakeId)),
496            ])
497            .from(fruit::Entity)
498            .to_owned();
499
500        let query_builder = db.get_database_backend();
501        let stmts = [
502            query_builder.build(select.clone().offset(0).limit(2)),
503            query_builder.build(select.clone().offset(2).limit(2)),
504            query_builder.build(select.offset(4).limit(2)),
505        ];
506
507        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
508        Ok(())
509    }
510
511    #[test]
512    fn fetch() -> Result<(), DbErr> {
513        let (db, pages) = setup();
514
515        let mut paginator = fruit::Entity::find().paginate(&db, 2);
516
517        assert_eq!(paginator.fetch()?, pages[0].clone());
518        paginator.next();
519
520        assert_eq!(paginator.fetch()?, pages[1].clone());
521        paginator.next();
522
523        assert_eq!(paginator.fetch()?, pages[2].clone());
524
525        let mut select = SelectStatement::new()
526            .exprs([
527                Expr::col((fruit::Entity, fruit::Column::Id)),
528                Expr::col((fruit::Entity, fruit::Column::Name)),
529                Expr::col((fruit::Entity, fruit::Column::CakeId)),
530            ])
531            .from(fruit::Entity)
532            .to_owned();
533
534        let query_builder = db.get_database_backend();
535        let stmts = [
536            query_builder.build(select.clone().offset(0).limit(2)),
537            query_builder.build(select.clone().offset(2).limit(2)),
538            query_builder.build(select.offset(4).limit(2)),
539        ];
540
541        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
542        Ok(())
543    }
544
545    #[test]
546    fn fetch_raw() -> Result<(), DbErr> {
547        let (db, pages) = setup();
548
549        let mut paginator = fruit::Entity::find()
550            .from_raw_sql(RAW_STMT.clone())
551            .paginate(&db, 2);
552
553        assert_eq!(paginator.fetch()?, pages[0].clone());
554        paginator.next();
555
556        assert_eq!(paginator.fetch()?, pages[1].clone());
557        paginator.next();
558
559        assert_eq!(paginator.fetch()?, pages[2].clone());
560
561        let mut select = SelectStatement::new()
562            .exprs([
563                Expr::col((fruit::Entity, fruit::Column::Id)),
564                Expr::col((fruit::Entity, fruit::Column::Name)),
565                Expr::col((fruit::Entity, fruit::Column::CakeId)),
566            ])
567            .from(fruit::Entity)
568            .to_owned();
569
570        let query_builder = db.get_database_backend();
571        let stmts = [
572            query_builder.build(select.clone().offset(0).limit(2)),
573            query_builder.build(select.clone().offset(2).limit(2)),
574            query_builder.build(select.offset(4).limit(2)),
575        ];
576
577        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
578        Ok(())
579    }
580
581    #[test]
582    fn num_pages() -> Result<(), DbErr> {
583        let (db, num_items) = setup_num_items();
584
585        let num_items = num_items as u64;
586        let page_size = 2_u64;
587        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
588        let paginator = fruit::Entity::find().paginate(&db, page_size);
589
590        assert_eq!(paginator.num_pages()?, num_pages);
591
592        let sub_query = SelectStatement::new()
593            .exprs([
594                Expr::col((fruit::Entity, fruit::Column::Id)),
595                Expr::col((fruit::Entity, fruit::Column::Name)),
596                Expr::col((fruit::Entity, fruit::Column::CakeId)),
597            ])
598            .from(fruit::Entity)
599            .to_owned();
600
601        let select = SelectStatement::new()
602            .expr(Expr::cust("COUNT(*) AS num_items"))
603            .from_subquery(sub_query, "sub_query")
604            .to_owned();
605
606        let query_builder = db.get_database_backend();
607        let stmts = [query_builder.build(&select)];
608
609        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
610        Ok(())
611    }
612
613    #[test]
614    fn num_pages_raw() -> Result<(), DbErr> {
615        let (db, num_items) = setup_num_items();
616
617        let num_items = num_items as u64;
618        let page_size = 2_u64;
619        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
620        let paginator = fruit::Entity::find()
621            .from_raw_sql(RAW_STMT.clone())
622            .paginate(&db, page_size);
623
624        assert_eq!(paginator.num_pages()?, num_pages);
625
626        let sub_query = SelectStatement::new()
627            .exprs([
628                Expr::col((fruit::Entity, fruit::Column::Id)),
629                Expr::col((fruit::Entity, fruit::Column::Name)),
630                Expr::col((fruit::Entity, fruit::Column::CakeId)),
631            ])
632            .from(fruit::Entity)
633            .to_owned();
634
635        let select = SelectStatement::new()
636            .expr(Expr::cust("COUNT(*) AS num_items"))
637            .from_subquery(sub_query, "sub_query")
638            .to_owned();
639
640        let query_builder = db.get_database_backend();
641        let stmts = [query_builder.build(&select)];
642
643        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
644        Ok(())
645    }
646
647    #[test]
648    fn next_and_cur_page() -> Result<(), DbErr> {
649        let (db, _) = setup();
650
651        let mut paginator = fruit::Entity::find().paginate(&db, 2);
652
653        assert_eq!(paginator.cur_page(), 0);
654        paginator.next();
655
656        assert_eq!(paginator.cur_page(), 1);
657        paginator.next();
658
659        assert_eq!(paginator.cur_page(), 2);
660        Ok(())
661    }
662
663    #[test]
664    fn next_and_cur_page_raw() -> Result<(), DbErr> {
665        let (db, _) = setup();
666
667        let mut paginator = fruit::Entity::find()
668            .from_raw_sql(RAW_STMT.clone())
669            .paginate(&db, 2);
670
671        assert_eq!(paginator.cur_page(), 0);
672        paginator.next();
673
674        assert_eq!(paginator.cur_page(), 1);
675        paginator.next();
676
677        assert_eq!(paginator.cur_page(), 2);
678        Ok(())
679    }
680
681    #[test]
682    fn fetch_and_next() -> Result<(), DbErr> {
683        let (db, pages) = setup();
684
685        let mut paginator = fruit::Entity::find().paginate(&db, 2);
686
687        assert_eq!(paginator.cur_page(), 0);
688        assert_eq!(paginator.fetch_and_next()?, Some(pages[0].clone()));
689
690        assert_eq!(paginator.cur_page(), 1);
691        assert_eq!(paginator.fetch_and_next()?, Some(pages[1].clone()));
692
693        assert_eq!(paginator.cur_page(), 2);
694        assert_eq!(paginator.fetch_and_next()?, None);
695
696        let mut select = SelectStatement::new()
697            .exprs([
698                Expr::col((fruit::Entity, fruit::Column::Id)),
699                Expr::col((fruit::Entity, fruit::Column::Name)),
700                Expr::col((fruit::Entity, fruit::Column::CakeId)),
701            ])
702            .from(fruit::Entity)
703            .to_owned();
704
705        let query_builder = db.get_database_backend();
706        let stmts = [
707            query_builder.build(select.clone().offset(0).limit(2)),
708            query_builder.build(select.clone().offset(2).limit(2)),
709            query_builder.build(select.offset(4).limit(2)),
710        ];
711
712        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
713        Ok(())
714    }
715
716    #[test]
717    fn fetch_and_next_raw() -> Result<(), DbErr> {
718        let (db, pages) = setup();
719
720        let mut paginator = fruit::Entity::find()
721            .from_raw_sql(RAW_STMT.clone())
722            .paginate(&db, 2);
723
724        assert_eq!(paginator.cur_page(), 0);
725        assert_eq!(paginator.fetch_and_next()?, Some(pages[0].clone()));
726
727        assert_eq!(paginator.cur_page(), 1);
728        assert_eq!(paginator.fetch_and_next()?, Some(pages[1].clone()));
729
730        assert_eq!(paginator.cur_page(), 2);
731        assert_eq!(paginator.fetch_and_next()?, None);
732
733        let mut select = SelectStatement::new()
734            .exprs([
735                Expr::col((fruit::Entity, fruit::Column::Id)),
736                Expr::col((fruit::Entity, fruit::Column::Name)),
737                Expr::col((fruit::Entity, fruit::Column::CakeId)),
738            ])
739            .from(fruit::Entity)
740            .to_owned();
741
742        let query_builder = db.get_database_backend();
743        let stmts = [
744            query_builder.build(select.clone().offset(0).limit(2)),
745            query_builder.build(select.clone().offset(2).limit(2)),
746            query_builder.build(select.offset(4).limit(2)),
747        ];
748
749        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
750        Ok(())
751    }
752
753    #[test]
754    fn into_stream() -> Result<(), DbErr> {
755        let (db, pages) = setup();
756
757        let mut fruit_stream = fruit::Entity::find().paginate(&db, 2).into_stream();
758
759        assert_eq!(fruit_stream.try_next()?, Some(pages[0].clone()));
760        assert_eq!(fruit_stream.try_next()?, Some(pages[1].clone()));
761        assert_eq!(fruit_stream.try_next()?, None);
762
763        drop(fruit_stream);
764
765        let mut select = SelectStatement::new()
766            .exprs([
767                Expr::col((fruit::Entity, fruit::Column::Id)),
768                Expr::col((fruit::Entity, fruit::Column::Name)),
769                Expr::col((fruit::Entity, fruit::Column::CakeId)),
770            ])
771            .from(fruit::Entity)
772            .to_owned();
773
774        let query_builder = db.get_database_backend();
775        let stmts = [
776            query_builder.build(select.clone().offset(0).limit(2)),
777            query_builder.build(select.clone().offset(2).limit(2)),
778            query_builder.build(select.offset(4).limit(2)),
779        ];
780
781        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
782        Ok(())
783    }
784
785    #[test]
786    fn into_stream_raw() -> Result<(), DbErr> {
787        let (db, pages) = setup();
788
789        let mut fruit_stream = fruit::Entity::find()
790            .from_raw_sql(RAW_STMT.clone())
791            .paginate(&db, 2)
792            .into_stream();
793
794        assert_eq!(fruit_stream.try_next()?, Some(pages[0].clone()));
795        assert_eq!(fruit_stream.try_next()?, Some(pages[1].clone()));
796        assert_eq!(fruit_stream.try_next()?, None);
797
798        drop(fruit_stream);
799
800        let mut select = SelectStatement::new()
801            .exprs([
802                Expr::col((fruit::Entity, fruit::Column::Id)),
803                Expr::col((fruit::Entity, fruit::Column::Name)),
804                Expr::col((fruit::Entity, fruit::Column::CakeId)),
805            ])
806            .from(fruit::Entity)
807            .to_owned();
808
809        let query_builder = db.get_database_backend();
810        let stmts = [
811            query_builder.build(select.clone().offset(0).limit(2)),
812            query_builder.build(select.clone().offset(2).limit(2)),
813            query_builder.build(select.offset(4).limit(2)),
814        ];
815
816        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
817        Ok(())
818    }
819
820    #[test]
821    fn into_stream_raw_leading_spaces() -> Result<(), DbErr> {
822        let (db, pages) = setup();
823
824        let raw_stmt = Statement::from_sql_and_values(
825            DbBackend::Postgres,
826            r#"  SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit"  "#,
827            [],
828        );
829
830        let mut fruit_stream = fruit::Entity::find()
831            .from_raw_sql(raw_stmt.clone())
832            .paginate(&db, 2)
833            .into_stream();
834
835        assert_eq!(fruit_stream.try_next()?, Some(pages[0].clone()));
836        assert_eq!(fruit_stream.try_next()?, Some(pages[1].clone()));
837        assert_eq!(fruit_stream.try_next()?, None);
838
839        drop(fruit_stream);
840
841        let mut select = SelectStatement::new()
842            .exprs([
843                Expr::col((fruit::Entity, fruit::Column::Id)),
844                Expr::col((fruit::Entity, fruit::Column::Name)),
845                Expr::col((fruit::Entity, fruit::Column::CakeId)),
846            ])
847            .from(fruit::Entity)
848            .to_owned();
849
850        let query_builder = db.get_database_backend();
851        let stmts = [
852            query_builder.build(select.clone().offset(0).limit(2)),
853            query_builder.build(select.clone().offset(2).limit(2)),
854            query_builder.build(select.offset(4).limit(2)),
855        ];
856
857        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
858        Ok(())
859    }
860
861    #[test]
862    #[should_panic]
863    fn error() {
864        let (db, _pages) = setup();
865
866        fruit::Entity::find().paginate(&db, 0);
867    }
868}