Skip to main content

sea_orm/executor/
paginator.rs

1use crate::{
2    ConnectionTrait, EntityTrait, FromQueryResult, Select, SelectModel, SelectTwo, SelectTwoModel,
3    Selector, SelectorRaw, SelectorTrait, error::*,
4};
5use sea_query::{Expr, SelectStatement};
6use std::marker::PhantomData;
7
/// Page stream type returned by `Paginator::into_stream` in the non-`sync` build:
/// a pinned, boxed `Stream` whose contents may borrow for `'db`.
// NOTE(review): `Pin` and `Stream` are not imported in the visible part of this file;
// confirm they are in scope when the `sync` feature is disabled.
#[cfg(not(feature = "sync"))]
type PinBoxStream<'db, Item> = Pin<Box<dyn Stream<Item = Item> + 'db>>;
/// Page stream type returned by `Paginator::into_stream` in the `sync` build:
/// a boxed blocking `Iterator` whose contents may borrow for `'db`.
#[cfg(feature = "sync")]
type PinBoxStream<'db, Item> = Box<dyn Iterator<Item = Item> + 'db>;
12
/// Defines a structure to handle pagination of a result from a query operation on a Model
///
/// A paginator keeps the base select statement, the page size and a page cursor that
/// starts at zero; every fetch re-runs the base statement with a limit and offset.
#[derive(Clone, Debug)]
pub struct Paginator<'db, C, S>
where
    C: ConnectionTrait,
    S: SelectorTrait + 'db,
{
    /// Base select statement; it is cloned and given a limit/offset for each page fetch
    pub(crate) query: SelectStatement,
    /// Zero-based index of the current page
    pub(crate) page: u64,
    /// Number of rows requested per page
    pub(crate) page_size: u64,
    /// Connection on which the page and count queries are executed
    pub(crate) db: &'db C,
    /// Marker for the selector `S` used to convert raw rows into items
    pub(crate) selector: PhantomData<S>,
}
26
/// Defines a structure containing the number of items and the number of pages of a Paginator
///
/// Returned by `Paginator::num_items_and_pages`.
#[derive(Clone, Debug)]
pub struct ItemsAndPagesNumber {
    /// The total number of items of a paginator
    pub number_of_items: u64,
    /// The total number of pages of a paginator
    pub number_of_pages: u64,
}
35
36// LINT: warn if paginator is used without an order by clause
37
38impl<'db, C, S> Paginator<'db, C, S>
39where
40    C: ConnectionTrait,
41    S: SelectorTrait + 'db,
42{
43    /// Fetch a specific page; page index starts from zero
44    pub fn fetch_page(&self, page: u64) -> Result<Vec<S::Item>, DbErr> {
45        let query = self
46            .query
47            .clone()
48            .limit(self.page_size)
49            .offset(self.page_size * page)
50            .to_owned();
51        let rows = self.db.query_all(&query)?;
52        let mut buffer = Vec::with_capacity(rows.len());
53        for row in rows.into_iter() {
54            buffer.push(S::from_raw_query_result(row)?);
55        }
56        Ok(buffer)
57    }
58
    /// Fetch the current page
    ///
    /// Equivalent to calling `fetch_page` with the value reported by `cur_page`;
    /// the page counter itself is left untouched.
    pub fn fetch(&self) -> Result<Vec<S::Item>, DbErr> {
        self.fetch_page(self.page)
    }
63
64    /// Get the total number of items
65    pub fn num_items(&self) -> Result<u64, DbErr> {
66        let query = SelectStatement::new()
67            .expr(Expr::cust("COUNT(*) AS num_items"))
68            .from_subquery(
69                self.query
70                    .clone()
71                    .reset_limit()
72                    .reset_offset()
73                    .clear_order_by()
74                    .to_owned(),
75                "sub_query",
76            )
77            .to_owned();
78        let result = match self.db.query_one(&query)? {
79            Some(res) => res,
80            None => return Ok(0),
81        };
82        #[allow(clippy::match_single_binding)]
83        let num_items = match self.db.get_database_backend() {
84            _ => result.try_get::<i64>("", "num_items")? as u64,
85        };
86        Ok(num_items)
87    }
88
89    /// Get the total number of pages
90    pub fn num_pages(&self) -> Result<u64, DbErr> {
91        let num_items = self.num_items()?;
92        let num_pages = self.compute_pages_number(num_items);
93        Ok(num_pages)
94    }
95
96    /// Get the total number of items and pages
97    pub fn num_items_and_pages(&self) -> Result<ItemsAndPagesNumber, DbErr> {
98        let number_of_items = self.num_items()?;
99        let number_of_pages = self.compute_pages_number(number_of_items);
100
101        Ok(ItemsAndPagesNumber {
102            number_of_items,
103            number_of_pages,
104        })
105    }
106
107    /// Compute the number of pages for the current page
108    fn compute_pages_number(&self, num_items: u64) -> u64 {
109        (num_items / self.page_size) + (num_items % self.page_size > 0) as u64
110    }
111
    /// Increment the page counter
    ///
    /// Only the in-memory cursor is advanced; no query is executed.
    pub fn next(&mut self) {
        self.page += 1;
    }
116
    /// Get current page number
    ///
    /// The counter starts at zero and only changes through `next` or `fetch_and_next`.
    pub fn cur_page(&self) -> u64 {
        self.page
    }
121
122    /// Fetch one page and increment the page counter
123    ///
124    /// ```
125    /// # use sea_orm::{error::*, tests_cfg::*, *};
126    /// #
127    /// # #[cfg(feature = "mock")]
128    /// # pub fn main() -> Result<(), DbErr> {
129    /// #
130    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
131    /// #     .append_query_results([
132    /// #         vec![cake::Model {
133    /// #             id: 1,
134    /// #             name: "Cake".to_owned(),
135    /// #         }],
136    /// #         vec![],
137    /// #     ])
138    /// #     .into_connection();
139    /// # let db = &owned_db;
140    /// #
141    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
142    /// let mut cake_pages = cake::Entity::find()
143    ///     .order_by_asc(cake::Column::Id)
144    ///     .paginate(db, 50);
145    ///
146    /// while let Some(cakes) = cake_pages.fetch_and_next()? {
147    ///     // Do something on cakes: Vec<cake::Model>
148    /// }
149    /// #
150    /// # Ok(())
151    /// # }
152    /// ```
153    pub fn fetch_and_next(&mut self) -> Result<Option<Vec<S::Item>>, DbErr> {
154        let vec = self.fetch()?;
155        self.next();
156        let opt = if !vec.is_empty() { Some(vec) } else { None };
157        Ok(opt)
158    }
159
160    /// Convert self into an stream
161    ///
162    /// ```
163    /// # use sea_orm::{error::*, tests_cfg::*, *};
164    /// #
165    /// # #[cfg(all(feature = "mock", not(feature = "sync")))]
166    /// # pub fn main() -> Result<(), DbErr> {
167    /// #
168    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
169    /// #     .append_query_results([
170    /// #         vec![cake::Model {
171    /// #             id: 1,
172    /// #             name: "Cake".to_owned(),
173    /// #         }],
174    /// #         vec![],
175    /// #     ])
176    /// #     .into_connection();
177    /// # let db = &owned_db;
178    /// #
179    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
180    /// let mut cake_stream = cake::Entity::find()
181    ///     .order_by_asc(cake::Column::Id)
182    ///     .paginate(db, 50)
183    ///     .into_stream();
184    ///
185    /// while let Some(cakes) = cake_stream.try_next()? {
186    ///     // Do something on cakes: Vec<cake::Model>
187    /// }
188    /// #
189    /// # Ok(())
190    /// # }
191    /// # #[cfg(all(feature = "mock", feature = "sync"))]
192    /// # fn main() {}
193    /// ```
194    /// (for SeaORM Sync)
195    /// ```
196    /// # use sea_orm::{error::*, tests_cfg::*, *};
197    /// # #[cfg(all(feature = "mock", feature = "sync"))]
198    /// # fn example() -> Result<(), DbErr> {
199    /// #
200    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
201    /// #     .append_query_results([
202    /// #         vec![cake::Model {
203    /// #             id: 1,
204    /// #             name: "Cake".to_owned(),
205    /// #         }],
206    /// #         vec![],
207    /// #     ])
208    /// #     .into_connection();
209    /// # let db = &owned_db;
210    /// #
211    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
212    /// let mut cake_stream = cake::Entity::find()
213    ///     .order_by_asc(cake::Column::Id)
214    ///     .paginate(db, 50)
215    ///     .into_stream();
216    ///
217    /// while let Some(cakes) = cake_stream.next() {
218    ///     // Do something on cakes: Vec<cake::Model>
219    /// }
220    /// #
221    /// # Ok(())
222    /// # }
223    /// ```
224    pub fn into_stream(self) -> PinBoxStream<'db, Result<Vec<S::Item>, DbErr>> {
225        #[cfg(not(feature = "sync"))]
226        {
227            let mut streamer = self;
228            Box::new(stream! {
229                while let Some(vec) = streamer.fetch_and_next()? {
230                    yield Ok(vec);
231                }
232            })
233        }
234        #[cfg(feature = "sync")]
235        {
236            Box::new(PaginatorStream { paginator: self })
237        }
238    }
239}
240
#[cfg(feature = "sync")]
#[derive(Debug)]
/// Stream items by page
///
/// Blocking adapter that owns a [`Paginator`] and is returned, boxed, by
/// `Paginator::into_stream` when the `sync` feature is enabled.
pub struct PaginatorStream<'db, C, S>
where
    C: ConnectionTrait,
    S: SelectorTrait + 'db,
{
    /// Paginator whose pages are fetched one per iteration step
    paginator: Paginator<'db, C, S>,
}
251
252/// A Trait for any type that can paginate results
253pub trait PaginatorTrait<'db, C>
254where
255    C: ConnectionTrait,
256{
257    /// Select operation
258    type Selector: SelectorTrait + 'db;
259
260    /// Paginate the result of a select operation.
261    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector>;
262
263    /// Perform a count on the paginated results
264    fn count(self, db: &'db C) -> Result<u64, DbErr>
265    where
266        Self: Sized,
267    {
268        self.paginate(db, 1).num_items()
269    }
270}
271
impl<'db, C, S> PaginatorTrait<'db, C> for Selector<S>
where
    C: ConnectionTrait,
    S: SelectorTrait + 'db,
{
    type Selector = S;

    /// Build a paginator over this selector's query, positioned on page zero.
    ///
    /// # Panics
    ///
    /// Panics if `page_size` is zero.
    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
        assert!(page_size != 0, "page_size should not be zero");
        Paginator {
            query: self.query,
            page: 0,
            page_size,
            db,
            selector: PhantomData,
        }
    }
}
290
291impl<'db, C, S> PaginatorTrait<'db, C> for SelectorRaw<S>
292where
293    C: ConnectionTrait,
294    S: SelectorTrait + 'db,
295{
296    type Selector = S;
297    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
298        assert!(page_size != 0, "page_size should not be zero");
299        let sql = self.stmt.sql.trim()[6..].trim().to_owned();
300        let mut query = SelectStatement::new();
301        query.expr(if let Some(values) = self.stmt.values {
302            Expr::cust_with_values(sql, values.0)
303        } else {
304            Expr::cust(sql)
305        });
306
307        Paginator {
308            query,
309            page: 0,
310            page_size,
311            db,
312            selector: PhantomData,
313        }
314    }
315}
316
impl<'db, C, M, E> PaginatorTrait<'db, C> for Select<E>
where
    C: ConnectionTrait,
    E: EntityTrait<Model = M>,
    M: FromQueryResult + Sized + 'db,
{
    type Selector = SelectModel<M>;

    /// Convert the select with `into_model()` and delegate to the resulting
    /// value's `paginate`.
    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
        self.into_model().paginate(db, page_size)
    }
}
329
impl<'db, C, M, N, E, F> PaginatorTrait<'db, C> for SelectTwo<E, F>
where
    C: ConnectionTrait,
    E: EntityTrait<Model = M>,
    F: EntityTrait<Model = N>,
    M: FromQueryResult + Sized + 'db,
    N: FromQueryResult + Sized + 'db,
{
    type Selector = SelectTwoModel<M, N>;

    /// Convert the two-entity select with `into_model()` and delegate to the
    /// resulting value's `paginate`.
    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
        self.into_model().paginate(db, page_size)
    }
}
344
#[cfg(feature = "sync")]
impl<'db, C, S> Iterator for PaginatorStream<'db, C, S>
where
    C: ConnectionTrait,
    S: SelectorTrait + 'db,
{
    type Item = Result<Vec<S::Item>, DbErr>;

    /// Fetch the next page: `Some(Ok(page))` for a non-empty page, `None` once an
    /// empty page is reached, and `Some(Err(_))` when the fetch fails.
    fn next(&mut self) -> Option<Self::Item> {
        // `Result<Option<T>, E>` -> `Option<Result<T, E>>` is exactly the mapping needed.
        self.paginator.fetch_and_next().transpose()
    }
}
361
#[cfg(test)]
#[cfg(feature = "mock")]
mod tests {
    use super::*;
    use crate::entity::prelude::*;
    #[cfg(feature = "sync")]
    use crate::util::StreamShim;
    use crate::{DatabaseConnection, DbBackend, MockDatabase, Transaction};
    use crate::{QueryOrder, QuerySelect};
    use crate::{Statement, tests_cfg::*};
    use pretty_assertions::assert_eq;
    use sea_query::{Expr, SelectStatement, Value};
    use std::sync::LazyLock;

    /// Raw SQL equivalent of `fruit::Entity::find()`, used by the `*_raw` tests.
    static RAW_STMT: LazyLock<Statement> = LazyLock::new(|| {
        Statement::from_sql_and_values(
            DbBackend::Postgres,
            r#"SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit""#,
            [],
        )
    });

    /// Mock connection preloaded with three pages of fruit (2 rows, 1 row, empty),
    /// returned together with those pages for comparison.
    fn setup() -> (DatabaseConnection, Vec<Vec<fruit::Model>>) {
        let page1 = vec![
            fruit::Model {
                id: 1,
                name: "Blueberry".into(),
                cake_id: Some(1),
            },
            fruit::Model {
                id: 2,
                name: "Raspberry".into(),
                cake_id: Some(1),
            },
        ];

        let page2 = vec![fruit::Model {
            id: 3,
            name: "Strawberry".into(),
            cake_id: Some(2),
        }];

        let page3 = Vec::<fruit::Model>::new();

        let db = MockDatabase::new(DbBackend::Postgres)
            .append_query_results([page1.clone(), page2.clone(), page3.clone()])
            .into_connection();

        (db, vec![page1, page2, page3])
    }

    /// Mock connection preloaded with a single `num_items` row, returned together
    /// with the count it reports.
    fn setup_num_items() -> (DatabaseConnection, i64) {
        let num_items = 3;
        let db = MockDatabase::new(DbBackend::Postgres)
            .append_query_results([[maplit::btreemap! {
                "num_items" => Into::<Value>::into(num_items),
            }]])
            .into_connection();

        (db, num_items)
    }

    /// The un-paginated fruit select that every expected statement is derived from.
    fn fruit_select() -> SelectStatement {
        SelectStatement::new()
            .exprs([
                Expr::col((fruit::Entity, fruit::Column::Id)),
                Expr::col((fruit::Entity, fruit::Column::Name)),
                Expr::col((fruit::Entity, fruit::Column::CakeId)),
            ])
            .from(fruit::Entity)
            .to_owned()
    }

    /// Assert that the transaction log holds exactly the three page queries
    /// (limit 2 at offsets 0, 2 and 4) over the fruit select.
    #[track_caller]
    fn assert_three_pages_logged(db: DatabaseConnection) {
        let mut select = fruit_select();

        let query_builder = db.get_database_backend();
        let stmts = [
            query_builder.build(select.clone().offset(0).limit(2)),
            query_builder.build(select.clone().offset(2).limit(2)),
            query_builder.build(select.offset(4).limit(2)),
        ];

        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
    }

    /// Assert that the transaction log holds exactly one `COUNT(*)` query wrapping
    /// the fruit select as `sub_query`.
    #[track_caller]
    fn assert_count_logged(db: DatabaseConnection) {
        let select = SelectStatement::new()
            .expr(Expr::cust("COUNT(*) AS num_items"))
            .from_subquery(fruit_select(), "sub_query")
            .to_owned();

        let query_builder = db.get_database_backend();
        let stmts = [query_builder.build(&select)];

        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
    }

    #[test]
    fn fetch_page() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let paginator = fruit::Entity::find().paginate(&db, 2);

        assert_eq!(paginator.fetch_page(0)?, pages[0].clone());
        assert_eq!(paginator.fetch_page(1)?, pages[1].clone());
        assert_eq!(paginator.fetch_page(2)?, pages[2].clone());

        assert_three_pages_logged(db);
        Ok(())
    }

    #[test]
    fn fetch_page_raw() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let paginator = fruit::Entity::find()
            .from_raw_sql(RAW_STMT.clone())
            .paginate(&db, 2);

        assert_eq!(paginator.fetch_page(0)?, pages[0].clone());
        assert_eq!(paginator.fetch_page(1)?, pages[1].clone());
        assert_eq!(paginator.fetch_page(2)?, pages[2].clone());

        assert_three_pages_logged(db);
        Ok(())
    }

    #[test]
    fn fetch() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let mut paginator = fruit::Entity::find().paginate(&db, 2);

        assert_eq!(paginator.fetch()?, pages[0].clone());
        paginator.next();

        assert_eq!(paginator.fetch()?, pages[1].clone());
        paginator.next();

        assert_eq!(paginator.fetch()?, pages[2].clone());

        assert_three_pages_logged(db);
        Ok(())
    }

    #[test]
    fn fetch_raw() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let mut paginator = fruit::Entity::find()
            .from_raw_sql(RAW_STMT.clone())
            .paginate(&db, 2);

        assert_eq!(paginator.fetch()?, pages[0].clone());
        paginator.next();

        assert_eq!(paginator.fetch()?, pages[1].clone());
        paginator.next();

        assert_eq!(paginator.fetch()?, pages[2].clone());

        assert_three_pages_logged(db);
        Ok(())
    }

    #[test]
    fn num_pages() -> Result<(), DbErr> {
        let (db, num_items) = setup_num_items();

        let num_items = num_items as u64;
        let page_size = 2_u64;
        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
        let paginator = fruit::Entity::find().paginate(&db, page_size);

        assert_eq!(paginator.num_pages()?, num_pages);

        assert_count_logged(db);
        Ok(())
    }

    #[test]
    fn num_pages_raw() -> Result<(), DbErr> {
        let (db, num_items) = setup_num_items();

        let num_items = num_items as u64;
        let page_size = 2_u64;
        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
        let paginator = fruit::Entity::find()
            .from_raw_sql(RAW_STMT.clone())
            .paginate(&db, page_size);

        assert_eq!(paginator.num_pages()?, num_pages);

        assert_count_logged(db);
        Ok(())
    }

    #[test]
    fn next_and_cur_page() -> Result<(), DbErr> {
        let (db, _) = setup();

        let mut paginator = fruit::Entity::find().paginate(&db, 2);

        assert_eq!(paginator.cur_page(), 0);
        paginator.next();

        assert_eq!(paginator.cur_page(), 1);
        paginator.next();

        assert_eq!(paginator.cur_page(), 2);
        Ok(())
    }

    #[test]
    fn next_and_cur_page_raw() -> Result<(), DbErr> {
        let (db, _) = setup();

        let mut paginator = fruit::Entity::find()
            .from_raw_sql(RAW_STMT.clone())
            .paginate(&db, 2);

        assert_eq!(paginator.cur_page(), 0);
        paginator.next();

        assert_eq!(paginator.cur_page(), 1);
        paginator.next();

        assert_eq!(paginator.cur_page(), 2);
        Ok(())
    }

    #[test]
    fn fetch_and_next() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let mut paginator = fruit::Entity::find().paginate(&db, 2);

        assert_eq!(paginator.cur_page(), 0);
        assert_eq!(paginator.fetch_and_next()?, Some(pages[0].clone()));

        assert_eq!(paginator.cur_page(), 1);
        assert_eq!(paginator.fetch_and_next()?, Some(pages[1].clone()));

        assert_eq!(paginator.cur_page(), 2);
        assert_eq!(paginator.fetch_and_next()?, None);

        assert_three_pages_logged(db);
        Ok(())
    }

    #[test]
    fn fetch_and_next_raw() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let mut paginator = fruit::Entity::find()
            .from_raw_sql(RAW_STMT.clone())
            .paginate(&db, 2);

        assert_eq!(paginator.cur_page(), 0);
        assert_eq!(paginator.fetch_and_next()?, Some(pages[0].clone()));

        assert_eq!(paginator.cur_page(), 1);
        assert_eq!(paginator.fetch_and_next()?, Some(pages[1].clone()));

        assert_eq!(paginator.cur_page(), 2);
        assert_eq!(paginator.fetch_and_next()?, None);

        assert_three_pages_logged(db);
        Ok(())
    }

    #[test]
    fn into_stream() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let mut fruit_stream = fruit::Entity::find().paginate(&db, 2).into_stream();

        assert_eq!(fruit_stream.try_next()?, Some(pages[0].clone()));
        assert_eq!(fruit_stream.try_next()?, Some(pages[1].clone()));
        assert_eq!(fruit_stream.try_next()?, None);

        // The stream borrows `db`; release it before `db` is consumed below.
        drop(fruit_stream);

        assert_three_pages_logged(db);
        Ok(())
    }

    #[test]
    fn into_stream_raw() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let mut fruit_stream = fruit::Entity::find()
            .from_raw_sql(RAW_STMT.clone())
            .paginate(&db, 2)
            .into_stream();

        assert_eq!(fruit_stream.try_next()?, Some(pages[0].clone()));
        assert_eq!(fruit_stream.try_next()?, Some(pages[1].clone()));
        assert_eq!(fruit_stream.try_next()?, None);

        // The stream borrows `db`; release it before `db` is consumed below.
        drop(fruit_stream);

        assert_three_pages_logged(db);
        Ok(())
    }

    #[test]
    fn into_stream_raw_leading_spaces() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let raw_stmt = Statement::from_sql_and_values(
            DbBackend::Postgres,
            r#"  SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit"  "#,
            [],
        );

        let mut fruit_stream = fruit::Entity::find()
            .from_raw_sql(raw_stmt.clone())
            .paginate(&db, 2)
            .into_stream();

        assert_eq!(fruit_stream.try_next()?, Some(pages[0].clone()));
        assert_eq!(fruit_stream.try_next()?, Some(pages[1].clone()));
        assert_eq!(fruit_stream.try_next()?, None);

        // The stream borrows `db`; release it before `db` is consumed below.
        drop(fruit_stream);

        assert_three_pages_logged(db);
        Ok(())
    }

    #[test]
    #[should_panic(expected = "page_size should not be zero")]
    fn error() {
        let (db, _pages) = setup();

        fruit::Entity::find().paginate(&db, 0);
    }
}
843}