Skip to main content

sea_orm/executor/
paginator.rs

1use crate::{
2    ConnectionTrait, DbBackend, EntityTrait, FromQueryResult, Select, SelectModel, SelectTwo,
3    SelectTwoModel, Selector, SelectorRaw, SelectorTrait, error::*,
4};
5use sea_query::{Expr, SelectStatement};
6use std::marker::PhantomData;
7
8#[cfg(not(feature = "sync"))]
9type PinBoxStream<'db, Item> = Pin<Box<dyn Stream<Item = Item> + 'db>>;
10#[cfg(feature = "sync")]
11type PinBoxStream<'db, Item> = Box<dyn Iterator<Item = Item> + 'db>;
12
13/// Defined a structure to handle pagination of a result from a query operation on a Model
14#[derive(Clone, Debug)]
15pub struct Paginator<'db, C, S>
16where
17    C: ConnectionTrait,
18    S: SelectorTrait + 'db,
19{
20    pub(crate) query: SelectStatement,
21    pub(crate) page: u64,
22    pub(crate) page_size: u64,
23    pub(crate) db: &'db C,
24    pub(crate) selector: PhantomData<S>,
25}
26
27/// Define a structure containing the numbers of items and pages of a Paginator
28#[derive(Clone, Debug)]
29pub struct ItemsAndPagesNumber {
30    /// The total number of items of a paginator
31    pub number_of_items: u64,
32    /// The total number of pages of a paginator
33    pub number_of_pages: u64,
34}
35
36// LINT: warn if paginator is used without an order by clause
37
38impl<'db, C, S> Paginator<'db, C, S>
39where
40    C: ConnectionTrait,
41    S: SelectorTrait + 'db,
42{
43    /// Fetch a specific page; page index starts from zero
44    pub fn fetch_page(&self, page: u64) -> Result<Vec<S::Item>, DbErr> {
45        let query = self
46            .query
47            .clone()
48            .limit(self.page_size)
49            .offset(self.page_size * page)
50            .to_owned();
51        let rows = self.db.query_all(&query)?;
52        let mut buffer = Vec::with_capacity(rows.len());
53        for row in rows.into_iter() {
54            buffer.push(S::from_raw_query_result(row)?);
55        }
56        Ok(buffer)
57    }
58
59    /// Fetch the current page
60    pub fn fetch(&self) -> Result<Vec<S::Item>, DbErr> {
61        self.fetch_page(self.page)
62    }
63
64    /// Get the total number of items
65    pub fn num_items(&self) -> Result<u64, DbErr> {
66        let db_backend = self.db.get_database_backend();
67        let query = SelectStatement::new()
68            .expr(Expr::cust("COUNT(*) AS num_items"))
69            .from_subquery(
70                self.query
71                    .clone()
72                    .reset_limit()
73                    .reset_offset()
74                    .clear_order_by()
75                    .to_owned(),
76                "sub_query",
77            )
78            .to_owned();
79        let result = match self.db.query_one(&query)? {
80            Some(res) => res,
81            None => return Ok(0),
82        };
83        let num_items = match db_backend {
84            DbBackend::Postgres => result.try_get::<i64>("", "num_items")? as u64,
85            _ => result.try_get::<i32>("", "num_items")? as u64,
86        };
87        Ok(num_items)
88    }
89
90    /// Get the total number of pages
91    pub fn num_pages(&self) -> Result<u64, DbErr> {
92        let num_items = self.num_items()?;
93        let num_pages = self.compute_pages_number(num_items);
94        Ok(num_pages)
95    }
96
97    /// Get the total number of items and pages
98    pub fn num_items_and_pages(&self) -> Result<ItemsAndPagesNumber, DbErr> {
99        let number_of_items = self.num_items()?;
100        let number_of_pages = self.compute_pages_number(number_of_items);
101
102        Ok(ItemsAndPagesNumber {
103            number_of_items,
104            number_of_pages,
105        })
106    }
107
108    /// Compute the number of pages for the current page
109    fn compute_pages_number(&self, num_items: u64) -> u64 {
110        (num_items / self.page_size) + (num_items % self.page_size > 0) as u64
111    }
112
113    /// Increment the page counter
114    pub fn next(&mut self) {
115        self.page += 1;
116    }
117
118    /// Get current page number
119    pub fn cur_page(&self) -> u64 {
120        self.page
121    }
122
123    /// Fetch one page and increment the page counter
124    ///
125    /// ```
126    /// # use sea_orm::{error::*, tests_cfg::*, *};
127    /// #
128    /// # #[cfg(feature = "mock")]
129    /// # pub fn main() -> Result<(), DbErr> {
130    /// #
131    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
132    /// #     .append_query_results([
133    /// #         vec![cake::Model {
134    /// #             id: 1,
135    /// #             name: "Cake".to_owned(),
136    /// #         }],
137    /// #         vec![],
138    /// #     ])
139    /// #     .into_connection();
140    /// # let db = &owned_db;
141    /// #
142    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
143    /// let mut cake_pages = cake::Entity::find()
144    ///     .order_by_asc(cake::Column::Id)
145    ///     .paginate(db, 50);
146    ///
147    /// while let Some(cakes) = cake_pages.fetch_and_next()? {
148    ///     // Do something on cakes: Vec<cake::Model>
149    /// }
150    /// #
151    /// # Ok(())
152    /// # }
153    /// ```
154    pub fn fetch_and_next(&mut self) -> Result<Option<Vec<S::Item>>, DbErr> {
155        let vec = self.fetch()?;
156        self.next();
157        let opt = if !vec.is_empty() { Some(vec) } else { None };
158        Ok(opt)
159    }
160
161    /// Convert self into an stream
162    ///
163    /// ```
164    /// # use sea_orm::{error::*, tests_cfg::*, *};
165    /// #
166    /// # #[cfg(all(feature = "mock", not(feature = "sync")))]
167    /// # pub fn main() -> Result<(), DbErr> {
168    /// #
169    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
170    /// #     .append_query_results([
171    /// #         vec![cake::Model {
172    /// #             id: 1,
173    /// #             name: "Cake".to_owned(),
174    /// #         }],
175    /// #         vec![],
176    /// #     ])
177    /// #     .into_connection();
178    /// # let db = &owned_db;
179    /// #
180    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
181    /// let mut cake_stream = cake::Entity::find()
182    ///     .order_by_asc(cake::Column::Id)
183    ///     .paginate(db, 50)
184    ///     .into_stream();
185    ///
186    /// while let Some(cakes) = cake_stream.try_next()? {
187    ///     // Do something on cakes: Vec<cake::Model>
188    /// }
189    /// #
190    /// # Ok(())
191    /// # }
192    /// # #[cfg(all(feature = "mock", feature = "sync"))]
193    /// # fn main() {}
194    /// ```
195    /// (for SeaORM Sync)
196    /// ```
197    /// # use sea_orm::{error::*, tests_cfg::*, *};
198    /// # #[cfg(all(feature = "mock", feature = "sync"))]
199    /// # fn example() -> Result<(), DbErr> {
200    /// #
201    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
202    /// #     .append_query_results([
203    /// #         vec![cake::Model {
204    /// #             id: 1,
205    /// #             name: "Cake".to_owned(),
206    /// #         }],
207    /// #         vec![],
208    /// #     ])
209    /// #     .into_connection();
210    /// # let db = &owned_db;
211    /// #
212    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
213    /// let mut cake_stream = cake::Entity::find()
214    ///     .order_by_asc(cake::Column::Id)
215    ///     .paginate(db, 50)
216    ///     .into_stream();
217    ///
218    /// while let Some(cakes) = cake_stream.next() {
219    ///     // Do something on cakes: Vec<cake::Model>
220    /// }
221    /// #
222    /// # Ok(())
223    /// # }
224    /// ```
225    pub fn into_stream(self) -> PinBoxStream<'db, Result<Vec<S::Item>, DbErr>> {
226        #[cfg(not(feature = "sync"))]
227        {
228            let mut streamer = self;
229            Box::new(stream! {
230                while let Some(vec) = streamer.fetch_and_next()? {
231                    yield Ok(vec);
232                }
233            })
234        }
235        #[cfg(feature = "sync")]
236        {
237            Box::new(PaginatorStream { paginator: self })
238        }
239    }
240}
241
242#[cfg(feature = "sync")]
243#[derive(Debug)]
244/// Stream items by page
245pub struct PaginatorStream<'db, C, S>
246where
247    C: ConnectionTrait,
248    S: SelectorTrait + 'db,
249{
250    paginator: Paginator<'db, C, S>,
251}
252
253/// A Trait for any type that can paginate results
254pub trait PaginatorTrait<'db, C>
255where
256    C: ConnectionTrait,
257{
258    /// Select operation
259    type Selector: SelectorTrait + 'db;
260
261    /// Paginate the result of a select operation.
262    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector>;
263
264    /// Perform a count on the paginated results
265    fn count(self, db: &'db C) -> Result<u64, DbErr>
266    where
267        Self: Sized,
268    {
269        self.paginate(db, 1).num_items()
270    }
271}
272
273impl<'db, C, S> PaginatorTrait<'db, C> for Selector<S>
274where
275    C: ConnectionTrait,
276    S: SelectorTrait + 'db,
277{
278    type Selector = S;
279
280    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
281        assert!(page_size != 0, "page_size should not be zero");
282        Paginator {
283            query: self.query,
284            page: 0,
285            page_size,
286            db,
287            selector: PhantomData,
288        }
289    }
290}
291
292impl<'db, C, S> PaginatorTrait<'db, C> for SelectorRaw<S>
293where
294    C: ConnectionTrait,
295    S: SelectorTrait + 'db,
296{
297    type Selector = S;
298    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
299        assert!(page_size != 0, "page_size should not be zero");
300        let sql = self.stmt.sql.trim()[6..].trim().to_owned();
301        let mut query = SelectStatement::new();
302        query.expr(if let Some(values) = self.stmt.values {
303            Expr::cust_with_values(sql, values.0)
304        } else {
305            Expr::cust(sql)
306        });
307
308        Paginator {
309            query,
310            page: 0,
311            page_size,
312            db,
313            selector: PhantomData,
314        }
315    }
316}
317
318impl<'db, C, M, E> PaginatorTrait<'db, C> for Select<E>
319where
320    C: ConnectionTrait,
321    E: EntityTrait<Model = M>,
322    M: FromQueryResult + Sized + 'db,
323{
324    type Selector = SelectModel<M>;
325
326    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
327        self.into_model().paginate(db, page_size)
328    }
329}
330
331impl<'db, C, M, N, E, F> PaginatorTrait<'db, C> for SelectTwo<E, F>
332where
333    C: ConnectionTrait,
334    E: EntityTrait<Model = M>,
335    F: EntityTrait<Model = N>,
336    M: FromQueryResult + Sized + 'db,
337    N: FromQueryResult + Sized + 'db,
338{
339    type Selector = SelectTwoModel<M, N>;
340
341    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
342        self.into_model().paginate(db, page_size)
343    }
344}
345
346#[cfg(feature = "sync")]
347impl<'db, C, S> Iterator for PaginatorStream<'db, C, S>
348where
349    C: ConnectionTrait,
350    S: SelectorTrait + 'db,
351{
352    type Item = Result<Vec<S::Item>, DbErr>;
353
354    fn next(&mut self) -> Option<Self::Item> {
355        match self.paginator.fetch_and_next() {
356            Ok(Some(vec)) => Some(Ok(vec)),
357            Ok(None) => None,
358            Err(e) => Some(Err(e)),
359        }
360    }
361}
362
363#[cfg(test)]
364#[cfg(feature = "mock")]
365mod tests {
366    use super::*;
367    use crate::entity::prelude::*;
368    #[cfg(feature = "sync")]
369    use crate::util::StreamShim;
370    use crate::{DatabaseConnection, DbBackend, MockDatabase, Transaction};
371    use crate::{QueryOrder, QuerySelect};
372    use crate::{Statement, tests_cfg::*};
373    use pretty_assertions::assert_eq;
374    use sea_query::{Expr, SelectStatement, Value};
375    use std::sync::LazyLock;
376
377    static RAW_STMT: LazyLock<Statement> = LazyLock::new(|| {
378        Statement::from_sql_and_values(
379            DbBackend::Postgres,
380            r#"SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit""#,
381            [],
382        )
383    });
384
385    fn setup() -> (DatabaseConnection, Vec<Vec<fruit::Model>>) {
386        let page1 = vec![
387            fruit::Model {
388                id: 1,
389                name: "Blueberry".into(),
390                cake_id: Some(1),
391            },
392            fruit::Model {
393                id: 2,
394                name: "Raspberry".into(),
395                cake_id: Some(1),
396            },
397        ];
398
399        let page2 = vec![fruit::Model {
400            id: 3,
401            name: "Strawberry".into(),
402            cake_id: Some(2),
403        }];
404
405        let page3 = Vec::<fruit::Model>::new();
406
407        let db = MockDatabase::new(DbBackend::Postgres)
408            .append_query_results([page1.clone(), page2.clone(), page3.clone()])
409            .into_connection();
410
411        (db, vec![page1, page2, page3])
412    }
413
414    fn setup_num_items() -> (DatabaseConnection, i64) {
415        let num_items = 3;
416        let db = MockDatabase::new(DbBackend::Postgres)
417            .append_query_results([[maplit::btreemap! {
418                "num_items" => Into::<Value>::into(num_items),
419            }]])
420            .into_connection();
421
422        (db, num_items)
423    }
424
425    #[test]
426    fn fetch_page() -> Result<(), DbErr> {
427        let (db, pages) = setup();
428
429        let paginator = fruit::Entity::find().paginate(&db, 2);
430
431        assert_eq!(paginator.fetch_page(0)?, pages[0].clone());
432        assert_eq!(paginator.fetch_page(1)?, pages[1].clone());
433        assert_eq!(paginator.fetch_page(2)?, pages[2].clone());
434
435        let mut select = SelectStatement::new()
436            .exprs([
437                Expr::col((fruit::Entity, fruit::Column::Id)),
438                Expr::col((fruit::Entity, fruit::Column::Name)),
439                Expr::col((fruit::Entity, fruit::Column::CakeId)),
440            ])
441            .from(fruit::Entity)
442            .to_owned();
443
444        let query_builder = db.get_database_backend();
445        let stmts = [
446            query_builder.build(select.clone().offset(0).limit(2)),
447            query_builder.build(select.clone().offset(2).limit(2)),
448            query_builder.build(select.offset(4).limit(2)),
449        ];
450
451        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
452        Ok(())
453    }
454
455    #[test]
456    fn fetch_page_raw() -> Result<(), DbErr> {
457        let (db, pages) = setup();
458
459        let paginator = fruit::Entity::find()
460            .from_raw_sql(RAW_STMT.clone())
461            .paginate(&db, 2);
462
463        assert_eq!(paginator.fetch_page(0)?, pages[0].clone());
464        assert_eq!(paginator.fetch_page(1)?, pages[1].clone());
465        assert_eq!(paginator.fetch_page(2)?, pages[2].clone());
466
467        let mut select = SelectStatement::new()
468            .exprs([
469                Expr::col((fruit::Entity, fruit::Column::Id)),
470                Expr::col((fruit::Entity, fruit::Column::Name)),
471                Expr::col((fruit::Entity, fruit::Column::CakeId)),
472            ])
473            .from(fruit::Entity)
474            .to_owned();
475
476        let query_builder = db.get_database_backend();
477        let stmts = [
478            query_builder.build(select.clone().offset(0).limit(2)),
479            query_builder.build(select.clone().offset(2).limit(2)),
480            query_builder.build(select.offset(4).limit(2)),
481        ];
482
483        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
484        Ok(())
485    }
486
487    #[test]
488    fn fetch() -> Result<(), DbErr> {
489        let (db, pages) = setup();
490
491        let mut paginator = fruit::Entity::find().paginate(&db, 2);
492
493        assert_eq!(paginator.fetch()?, pages[0].clone());
494        paginator.next();
495
496        assert_eq!(paginator.fetch()?, pages[1].clone());
497        paginator.next();
498
499        assert_eq!(paginator.fetch()?, pages[2].clone());
500
501        let mut select = SelectStatement::new()
502            .exprs([
503                Expr::col((fruit::Entity, fruit::Column::Id)),
504                Expr::col((fruit::Entity, fruit::Column::Name)),
505                Expr::col((fruit::Entity, fruit::Column::CakeId)),
506            ])
507            .from(fruit::Entity)
508            .to_owned();
509
510        let query_builder = db.get_database_backend();
511        let stmts = [
512            query_builder.build(select.clone().offset(0).limit(2)),
513            query_builder.build(select.clone().offset(2).limit(2)),
514            query_builder.build(select.offset(4).limit(2)),
515        ];
516
517        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
518        Ok(())
519    }
520
521    #[test]
522    fn fetch_raw() -> Result<(), DbErr> {
523        let (db, pages) = setup();
524
525        let mut paginator = fruit::Entity::find()
526            .from_raw_sql(RAW_STMT.clone())
527            .paginate(&db, 2);
528
529        assert_eq!(paginator.fetch()?, pages[0].clone());
530        paginator.next();
531
532        assert_eq!(paginator.fetch()?, pages[1].clone());
533        paginator.next();
534
535        assert_eq!(paginator.fetch()?, pages[2].clone());
536
537        let mut select = SelectStatement::new()
538            .exprs([
539                Expr::col((fruit::Entity, fruit::Column::Id)),
540                Expr::col((fruit::Entity, fruit::Column::Name)),
541                Expr::col((fruit::Entity, fruit::Column::CakeId)),
542            ])
543            .from(fruit::Entity)
544            .to_owned();
545
546        let query_builder = db.get_database_backend();
547        let stmts = [
548            query_builder.build(select.clone().offset(0).limit(2)),
549            query_builder.build(select.clone().offset(2).limit(2)),
550            query_builder.build(select.offset(4).limit(2)),
551        ];
552
553        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
554        Ok(())
555    }
556
557    #[test]
558    fn num_pages() -> Result<(), DbErr> {
559        let (db, num_items) = setup_num_items();
560
561        let num_items = num_items as u64;
562        let page_size = 2_u64;
563        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
564        let paginator = fruit::Entity::find().paginate(&db, page_size);
565
566        assert_eq!(paginator.num_pages()?, num_pages);
567
568        let sub_query = SelectStatement::new()
569            .exprs([
570                Expr::col((fruit::Entity, fruit::Column::Id)),
571                Expr::col((fruit::Entity, fruit::Column::Name)),
572                Expr::col((fruit::Entity, fruit::Column::CakeId)),
573            ])
574            .from(fruit::Entity)
575            .to_owned();
576
577        let select = SelectStatement::new()
578            .expr(Expr::cust("COUNT(*) AS num_items"))
579            .from_subquery(sub_query, "sub_query")
580            .to_owned();
581
582        let query_builder = db.get_database_backend();
583        let stmts = [query_builder.build(&select)];
584
585        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
586        Ok(())
587    }
588
589    #[test]
590    fn num_pages_raw() -> Result<(), DbErr> {
591        let (db, num_items) = setup_num_items();
592
593        let num_items = num_items as u64;
594        let page_size = 2_u64;
595        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
596        let paginator = fruit::Entity::find()
597            .from_raw_sql(RAW_STMT.clone())
598            .paginate(&db, page_size);
599
600        assert_eq!(paginator.num_pages()?, num_pages);
601
602        let sub_query = SelectStatement::new()
603            .exprs([
604                Expr::col((fruit::Entity, fruit::Column::Id)),
605                Expr::col((fruit::Entity, fruit::Column::Name)),
606                Expr::col((fruit::Entity, fruit::Column::CakeId)),
607            ])
608            .from(fruit::Entity)
609            .to_owned();
610
611        let select = SelectStatement::new()
612            .expr(Expr::cust("COUNT(*) AS num_items"))
613            .from_subquery(sub_query, "sub_query")
614            .to_owned();
615
616        let query_builder = db.get_database_backend();
617        let stmts = [query_builder.build(&select)];
618
619        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
620        Ok(())
621    }
622
623    #[test]
624    fn next_and_cur_page() -> Result<(), DbErr> {
625        let (db, _) = setup();
626
627        let mut paginator = fruit::Entity::find().paginate(&db, 2);
628
629        assert_eq!(paginator.cur_page(), 0);
630        paginator.next();
631
632        assert_eq!(paginator.cur_page(), 1);
633        paginator.next();
634
635        assert_eq!(paginator.cur_page(), 2);
636        Ok(())
637    }
638
639    #[test]
640    fn next_and_cur_page_raw() -> Result<(), DbErr> {
641        let (db, _) = setup();
642
643        let mut paginator = fruit::Entity::find()
644            .from_raw_sql(RAW_STMT.clone())
645            .paginate(&db, 2);
646
647        assert_eq!(paginator.cur_page(), 0);
648        paginator.next();
649
650        assert_eq!(paginator.cur_page(), 1);
651        paginator.next();
652
653        assert_eq!(paginator.cur_page(), 2);
654        Ok(())
655    }
656
657    #[test]
658    fn fetch_and_next() -> Result<(), DbErr> {
659        let (db, pages) = setup();
660
661        let mut paginator = fruit::Entity::find().paginate(&db, 2);
662
663        assert_eq!(paginator.cur_page(), 0);
664        assert_eq!(paginator.fetch_and_next()?, Some(pages[0].clone()));
665
666        assert_eq!(paginator.cur_page(), 1);
667        assert_eq!(paginator.fetch_and_next()?, Some(pages[1].clone()));
668
669        assert_eq!(paginator.cur_page(), 2);
670        assert_eq!(paginator.fetch_and_next()?, None);
671
672        let mut select = SelectStatement::new()
673            .exprs([
674                Expr::col((fruit::Entity, fruit::Column::Id)),
675                Expr::col((fruit::Entity, fruit::Column::Name)),
676                Expr::col((fruit::Entity, fruit::Column::CakeId)),
677            ])
678            .from(fruit::Entity)
679            .to_owned();
680
681        let query_builder = db.get_database_backend();
682        let stmts = [
683            query_builder.build(select.clone().offset(0).limit(2)),
684            query_builder.build(select.clone().offset(2).limit(2)),
685            query_builder.build(select.offset(4).limit(2)),
686        ];
687
688        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
689        Ok(())
690    }
691
692    #[test]
693    fn fetch_and_next_raw() -> Result<(), DbErr> {
694        let (db, pages) = setup();
695
696        let mut paginator = fruit::Entity::find()
697            .from_raw_sql(RAW_STMT.clone())
698            .paginate(&db, 2);
699
700        assert_eq!(paginator.cur_page(), 0);
701        assert_eq!(paginator.fetch_and_next()?, Some(pages[0].clone()));
702
703        assert_eq!(paginator.cur_page(), 1);
704        assert_eq!(paginator.fetch_and_next()?, Some(pages[1].clone()));
705
706        assert_eq!(paginator.cur_page(), 2);
707        assert_eq!(paginator.fetch_and_next()?, None);
708
709        let mut select = SelectStatement::new()
710            .exprs([
711                Expr::col((fruit::Entity, fruit::Column::Id)),
712                Expr::col((fruit::Entity, fruit::Column::Name)),
713                Expr::col((fruit::Entity, fruit::Column::CakeId)),
714            ])
715            .from(fruit::Entity)
716            .to_owned();
717
718        let query_builder = db.get_database_backend();
719        let stmts = [
720            query_builder.build(select.clone().offset(0).limit(2)),
721            query_builder.build(select.clone().offset(2).limit(2)),
722            query_builder.build(select.offset(4).limit(2)),
723        ];
724
725        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
726        Ok(())
727    }
728
729    #[test]
730    fn into_stream() -> Result<(), DbErr> {
731        let (db, pages) = setup();
732
733        let mut fruit_stream = fruit::Entity::find().paginate(&db, 2).into_stream();
734
735        assert_eq!(fruit_stream.try_next()?, Some(pages[0].clone()));
736        assert_eq!(fruit_stream.try_next()?, Some(pages[1].clone()));
737        assert_eq!(fruit_stream.try_next()?, None);
738
739        drop(fruit_stream);
740
741        let mut select = SelectStatement::new()
742            .exprs([
743                Expr::col((fruit::Entity, fruit::Column::Id)),
744                Expr::col((fruit::Entity, fruit::Column::Name)),
745                Expr::col((fruit::Entity, fruit::Column::CakeId)),
746            ])
747            .from(fruit::Entity)
748            .to_owned();
749
750        let query_builder = db.get_database_backend();
751        let stmts = [
752            query_builder.build(select.clone().offset(0).limit(2)),
753            query_builder.build(select.clone().offset(2).limit(2)),
754            query_builder.build(select.offset(4).limit(2)),
755        ];
756
757        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
758        Ok(())
759    }
760
761    #[test]
762    fn into_stream_raw() -> Result<(), DbErr> {
763        let (db, pages) = setup();
764
765        let mut fruit_stream = fruit::Entity::find()
766            .from_raw_sql(RAW_STMT.clone())
767            .paginate(&db, 2)
768            .into_stream();
769
770        assert_eq!(fruit_stream.try_next()?, Some(pages[0].clone()));
771        assert_eq!(fruit_stream.try_next()?, Some(pages[1].clone()));
772        assert_eq!(fruit_stream.try_next()?, None);
773
774        drop(fruit_stream);
775
776        let mut select = SelectStatement::new()
777            .exprs([
778                Expr::col((fruit::Entity, fruit::Column::Id)),
779                Expr::col((fruit::Entity, fruit::Column::Name)),
780                Expr::col((fruit::Entity, fruit::Column::CakeId)),
781            ])
782            .from(fruit::Entity)
783            .to_owned();
784
785        let query_builder = db.get_database_backend();
786        let stmts = [
787            query_builder.build(select.clone().offset(0).limit(2)),
788            query_builder.build(select.clone().offset(2).limit(2)),
789            query_builder.build(select.offset(4).limit(2)),
790        ];
791
792        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
793        Ok(())
794    }
795
796    #[test]
797    fn into_stream_raw_leading_spaces() -> Result<(), DbErr> {
798        let (db, pages) = setup();
799
800        let raw_stmt = Statement::from_sql_and_values(
801            DbBackend::Postgres,
802            r#"  SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit"  "#,
803            [],
804        );
805
806        let mut fruit_stream = fruit::Entity::find()
807            .from_raw_sql(raw_stmt.clone())
808            .paginate(&db, 2)
809            .into_stream();
810
811        assert_eq!(fruit_stream.try_next()?, Some(pages[0].clone()));
812        assert_eq!(fruit_stream.try_next()?, Some(pages[1].clone()));
813        assert_eq!(fruit_stream.try_next()?, None);
814
815        drop(fruit_stream);
816
817        let mut select = SelectStatement::new()
818            .exprs([
819                Expr::col((fruit::Entity, fruit::Column::Id)),
820                Expr::col((fruit::Entity, fruit::Column::Name)),
821                Expr::col((fruit::Entity, fruit::Column::CakeId)),
822            ])
823            .from(fruit::Entity)
824            .to_owned();
825
826        let query_builder = db.get_database_backend();
827        let stmts = [
828            query_builder.build(select.clone().offset(0).limit(2)),
829            query_builder.build(select.clone().offset(2).limit(2)),
830            query_builder.build(select.offset(4).limit(2)),
831        ];
832
833        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
834        Ok(())
835    }
836
837    #[test]
838    #[should_panic]
839    fn error() {
840        let (db, _pages) = setup();
841
842        fruit::Entity::find().paginate(&db, 0);
843    }
844}