Skip to main content

sea_orm/executor/
paginator.rs

1use crate::{
2    ConnectionTrait, EntityTrait, FromQueryResult, Select, SelectModel, SelectTwo, SelectTwoModel,
3    Selector, SelectorRaw, SelectorTrait, error::*,
4};
5use sea_query::{Expr, SelectStatement};
6use std::marker::PhantomData;
7
8#[cfg(not(feature = "sync"))]
9type PinBoxStream<'db, Item> = Pin<Box<dyn Stream<Item = Item> + 'db>>;
10#[cfg(feature = "sync")]
11type PinBoxStream<'db, Item> = Box<dyn Iterator<Item = Item> + 'db>;
12
13/// Defined a structure to handle pagination of a result from a query operation on a Model
14#[derive(Clone, Debug)]
15pub struct Paginator<'db, C, S>
16where
17    C: ConnectionTrait,
18    S: SelectorTrait + 'db,
19{
20    pub(crate) query: SelectStatement,
21    pub(crate) page: u64,
22    pub(crate) page_size: u64,
23    pub(crate) db: &'db C,
24    pub(crate) selector: PhantomData<S>,
25}
26
27/// Define a structure containing the numbers of items and pages of a Paginator
28#[derive(Clone, Debug)]
29pub struct ItemsAndPagesNumber {
30    /// The total number of items of a paginator
31    pub number_of_items: u64,
32    /// The total number of pages of a paginator
33    pub number_of_pages: u64,
34}
35
36// LINT: warn if paginator is used without an order by clause
37
38impl<'db, C, S> Paginator<'db, C, S>
39where
40    C: ConnectionTrait,
41    S: SelectorTrait + 'db,
42{
43    /// Fetch a specific page; page index starts from zero
44    pub fn fetch_page(&self, page: u64) -> Result<Vec<S::Item>, DbErr> {
45        let query = self
46            .query
47            .clone()
48            .limit(self.page_size)
49            .offset(self.page_size * page)
50            .to_owned();
51        let rows = self.db.query_all(&query)?;
52        let mut buffer = Vec::with_capacity(rows.len());
53        for row in rows.into_iter() {
54            buffer.push(S::from_raw_query_result(row)?);
55        }
56        Ok(buffer)
57    }
58
59    /// Fetch the current page
60    pub fn fetch(&self) -> Result<Vec<S::Item>, DbErr> {
61        self.fetch_page(self.page)
62    }
63
64    /// Get the total number of items
65    pub fn num_items(&self) -> Result<u64, DbErr> {
66        let query = SelectStatement::new()
67            .expr(Expr::cust("COUNT(*) AS num_items"))
68            .from_subquery(
69                self.query
70                    .clone()
71                    .reset_limit()
72                    .reset_offset()
73                    .clear_order_by()
74                    .to_owned(),
75                "sub_query",
76            )
77            .to_owned();
78        let result = match self.db.query_one(&query)? {
79            Some(res) => res,
80            None => return Ok(0),
81        };
82        let num_items = result.try_get::<i64>("", "num_items")? as u64;
83        Ok(num_items)
84    }
85
86    /// Get the total number of pages
87    pub fn num_pages(&self) -> Result<u64, DbErr> {
88        let num_items = self.num_items()?;
89        let num_pages = self.compute_pages_number(num_items);
90        Ok(num_pages)
91    }
92
93    /// Get the total number of items and pages
94    pub fn num_items_and_pages(&self) -> Result<ItemsAndPagesNumber, DbErr> {
95        let number_of_items = self.num_items()?;
96        let number_of_pages = self.compute_pages_number(number_of_items);
97
98        Ok(ItemsAndPagesNumber {
99            number_of_items,
100            number_of_pages,
101        })
102    }
103
104    /// Compute the number of pages for the current page
105    fn compute_pages_number(&self, num_items: u64) -> u64 {
106        (num_items / self.page_size) + (num_items % self.page_size > 0) as u64
107    }
108
109    /// Increment the page counter
110    pub fn next(&mut self) {
111        self.page += 1;
112    }
113
114    /// Get current page number
115    pub fn cur_page(&self) -> u64 {
116        self.page
117    }
118
119    /// Fetch one page and increment the page counter
120    ///
121    /// ```
122    /// # use sea_orm::{error::*, tests_cfg::*, *};
123    /// #
124    /// # #[cfg(feature = "mock")]
125    /// # pub fn main() -> Result<(), DbErr> {
126    /// #
127    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
128    /// #     .append_query_results([
129    /// #         vec![cake::Model {
130    /// #             id: 1,
131    /// #             name: "Cake".to_owned(),
132    /// #         }],
133    /// #         vec![],
134    /// #     ])
135    /// #     .into_connection();
136    /// # let db = &owned_db;
137    /// #
138    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
139    /// let mut cake_pages = cake::Entity::find()
140    ///     .order_by_asc(cake::Column::Id)
141    ///     .paginate(db, 50);
142    ///
143    /// while let Some(cakes) = cake_pages.fetch_and_next()? {
144    ///     // Do something on cakes: Vec<cake::Model>
145    /// }
146    /// #
147    /// # Ok(())
148    /// # }
149    /// ```
150    pub fn fetch_and_next(&mut self) -> Result<Option<Vec<S::Item>>, DbErr> {
151        let vec = self.fetch()?;
152        self.next();
153        let opt = if !vec.is_empty() { Some(vec) } else { None };
154        Ok(opt)
155    }
156
157    /// Convert self into an stream
158    ///
159    /// ```
160    /// # use sea_orm::{error::*, tests_cfg::*, *};
161    /// #
162    /// # #[cfg(all(feature = "mock", not(feature = "sync")))]
163    /// # pub fn main() -> Result<(), DbErr> {
164    /// #
165    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
166    /// #     .append_query_results([
167    /// #         vec![cake::Model {
168    /// #             id: 1,
169    /// #             name: "Cake".to_owned(),
170    /// #         }],
171    /// #         vec![],
172    /// #     ])
173    /// #     .into_connection();
174    /// # let db = &owned_db;
175    /// #
176    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
177    /// let mut cake_stream = cake::Entity::find()
178    ///     .order_by_asc(cake::Column::Id)
179    ///     .paginate(db, 50)
180    ///     .into_stream();
181    ///
182    /// while let Some(cakes) = cake_stream.try_next()? {
183    ///     // Do something on cakes: Vec<cake::Model>
184    /// }
185    /// #
186    /// # Ok(())
187    /// # }
188    /// # #[cfg(all(feature = "mock", feature = "sync"))]
189    /// # fn main() {}
190    /// ```
191    /// (for SeaORM Sync)
192    /// ```
193    /// # use sea_orm::{error::*, tests_cfg::*, *};
194    /// # #[cfg(all(feature = "mock", feature = "sync"))]
195    /// # fn example() -> Result<(), DbErr> {
196    /// #
197    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
198    /// #     .append_query_results([
199    /// #         vec![cake::Model {
200    /// #             id: 1,
201    /// #             name: "Cake".to_owned(),
202    /// #         }],
203    /// #         vec![],
204    /// #     ])
205    /// #     .into_connection();
206    /// # let db = &owned_db;
207    /// #
208    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
209    /// let mut cake_stream = cake::Entity::find()
210    ///     .order_by_asc(cake::Column::Id)
211    ///     .paginate(db, 50)
212    ///     .into_stream();
213    ///
214    /// while let Some(cakes) = cake_stream.next() {
215    ///     // Do something on cakes: Vec<cake::Model>
216    /// }
217    /// #
218    /// # Ok(())
219    /// # }
220    /// ```
221    pub fn into_stream(self) -> PinBoxStream<'db, Result<Vec<S::Item>, DbErr>> {
222        #[cfg(not(feature = "sync"))]
223        {
224            let mut streamer = self;
225            Box::new(stream! {
226                while let Some(vec) = streamer.fetch_and_next()? {
227                    yield Ok(vec);
228                }
229            })
230        }
231        #[cfg(feature = "sync")]
232        {
233            Box::new(PaginatorStream { paginator: self })
234        }
235    }
236}
237
238#[cfg(feature = "sync")]
239#[derive(Debug)]
240/// Stream items by page
241pub struct PaginatorStream<'db, C, S>
242where
243    C: ConnectionTrait,
244    S: SelectorTrait + 'db,
245{
246    paginator: Paginator<'db, C, S>,
247}
248
249/// A Trait for any type that can paginate results
250pub trait PaginatorTrait<'db, C>
251where
252    C: ConnectionTrait,
253{
254    /// Select operation
255    type Selector: SelectorTrait + 'db;
256
257    /// Paginate the result of a select operation.
258    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector>;
259
260    /// Perform a count on the paginated results
261    fn count(self, db: &'db C) -> Result<u64, DbErr>
262    where
263        Self: Sized,
264    {
265        self.paginate(db, 1).num_items()
266    }
267}
268
269impl<'db, C, S> PaginatorTrait<'db, C> for Selector<S>
270where
271    C: ConnectionTrait,
272    S: SelectorTrait + 'db,
273{
274    type Selector = S;
275
276    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
277        assert!(page_size != 0, "page_size should not be zero");
278        Paginator {
279            query: self.query,
280            page: 0,
281            page_size,
282            db,
283            selector: PhantomData,
284        }
285    }
286}
287
288impl<'db, C, S> PaginatorTrait<'db, C> for SelectorRaw<S>
289where
290    C: ConnectionTrait,
291    S: SelectorTrait + 'db,
292{
293    type Selector = S;
294    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
295        assert!(page_size != 0, "page_size should not be zero");
296        let sql = self.stmt.sql.trim()[6..].trim().to_owned();
297        let mut query = SelectStatement::new();
298        query.expr(if let Some(values) = self.stmt.values {
299            Expr::cust_with_values(sql, values.0)
300        } else {
301            Expr::cust(sql)
302        });
303
304        Paginator {
305            query,
306            page: 0,
307            page_size,
308            db,
309            selector: PhantomData,
310        }
311    }
312}
313
314impl<'db, C, M, E> PaginatorTrait<'db, C> for Select<E>
315where
316    C: ConnectionTrait,
317    E: EntityTrait<Model = M>,
318    M: FromQueryResult + Sized + 'db,
319{
320    type Selector = SelectModel<M>;
321
322    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
323        self.into_model().paginate(db, page_size)
324    }
325}
326
327impl<'db, C, M, N, E, F> PaginatorTrait<'db, C> for SelectTwo<E, F>
328where
329    C: ConnectionTrait,
330    E: EntityTrait<Model = M>,
331    F: EntityTrait<Model = N>,
332    M: FromQueryResult + Sized + 'db,
333    N: FromQueryResult + Sized + 'db,
334{
335    type Selector = SelectTwoModel<M, N>;
336
337    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
338        self.into_model().paginate(db, page_size)
339    }
340}
341
342#[cfg(feature = "sync")]
343impl<'db, C, S> Iterator for PaginatorStream<'db, C, S>
344where
345    C: ConnectionTrait,
346    S: SelectorTrait + 'db,
347{
348    type Item = Result<Vec<S::Item>, DbErr>;
349
350    fn next(&mut self) -> Option<Self::Item> {
351        match self.paginator.fetch_and_next() {
352            Ok(Some(vec)) => Some(Ok(vec)),
353            Ok(None) => None,
354            Err(e) => Some(Err(e)),
355        }
356    }
357}
358
359#[cfg(test)]
360#[cfg(feature = "mock")]
361mod tests {
362    use super::*;
363    use crate::entity::prelude::*;
364    #[cfg(feature = "sync")]
365    use crate::util::StreamShim;
366    use crate::{DatabaseConnection, DbBackend, MockDatabase, Transaction};
367    use crate::{QueryOrder, QuerySelect};
368    use crate::{Statement, tests_cfg::*};
369    use pretty_assertions::assert_eq;
370    use sea_query::{Expr, SelectStatement, Value};
371    use std::sync::LazyLock;
372
373    static RAW_STMT: LazyLock<Statement> = LazyLock::new(|| {
374        Statement::from_sql_and_values(
375            DbBackend::Postgres,
376            r#"SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit""#,
377            [],
378        )
379    });
380
381    fn setup() -> (DatabaseConnection, Vec<Vec<fruit::Model>>) {
382        let page1 = vec![
383            fruit::Model {
384                id: 1,
385                name: "Blueberry".into(),
386                cake_id: Some(1),
387            },
388            fruit::Model {
389                id: 2,
390                name: "Raspberry".into(),
391                cake_id: Some(1),
392            },
393        ];
394
395        let page2 = vec![fruit::Model {
396            id: 3,
397            name: "Strawberry".into(),
398            cake_id: Some(2),
399        }];
400
401        let page3 = Vec::<fruit::Model>::new();
402
403        let db = MockDatabase::new(DbBackend::Postgres)
404            .append_query_results([page1.clone(), page2.clone(), page3.clone()])
405            .into_connection();
406
407        (db, vec![page1, page2, page3])
408    }
409
410    fn setup_num_items() -> (DatabaseConnection, i64) {
411        let num_items = 3;
412        let db = MockDatabase::new(DbBackend::Postgres)
413            .append_query_results([[maplit::btreemap! {
414                "num_items" => Into::<Value>::into(num_items),
415            }]])
416            .into_connection();
417
418        (db, num_items)
419    }
420
421    #[test]
422    fn fetch_page() -> Result<(), DbErr> {
423        let (db, pages) = setup();
424
425        let paginator = fruit::Entity::find().paginate(&db, 2);
426
427        assert_eq!(paginator.fetch_page(0)?, pages[0].clone());
428        assert_eq!(paginator.fetch_page(1)?, pages[1].clone());
429        assert_eq!(paginator.fetch_page(2)?, pages[2].clone());
430
431        let mut select = SelectStatement::new()
432            .exprs([
433                Expr::col((fruit::Entity, fruit::Column::Id)),
434                Expr::col((fruit::Entity, fruit::Column::Name)),
435                Expr::col((fruit::Entity, fruit::Column::CakeId)),
436            ])
437            .from(fruit::Entity)
438            .to_owned();
439
440        let query_builder = db.get_database_backend();
441        let stmts = [
442            query_builder.build(select.clone().offset(0).limit(2)),
443            query_builder.build(select.clone().offset(2).limit(2)),
444            query_builder.build(select.offset(4).limit(2)),
445        ];
446
447        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
448        Ok(())
449    }
450
451    #[test]
452    fn fetch_page_raw() -> Result<(), DbErr> {
453        let (db, pages) = setup();
454
455        let paginator = fruit::Entity::find()
456            .from_raw_sql(RAW_STMT.clone())
457            .paginate(&db, 2);
458
459        assert_eq!(paginator.fetch_page(0)?, pages[0].clone());
460        assert_eq!(paginator.fetch_page(1)?, pages[1].clone());
461        assert_eq!(paginator.fetch_page(2)?, pages[2].clone());
462
463        let mut select = SelectStatement::new()
464            .exprs([
465                Expr::col((fruit::Entity, fruit::Column::Id)),
466                Expr::col((fruit::Entity, fruit::Column::Name)),
467                Expr::col((fruit::Entity, fruit::Column::CakeId)),
468            ])
469            .from(fruit::Entity)
470            .to_owned();
471
472        let query_builder = db.get_database_backend();
473        let stmts = [
474            query_builder.build(select.clone().offset(0).limit(2)),
475            query_builder.build(select.clone().offset(2).limit(2)),
476            query_builder.build(select.offset(4).limit(2)),
477        ];
478
479        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
480        Ok(())
481    }
482
483    #[test]
484    fn fetch() -> Result<(), DbErr> {
485        let (db, pages) = setup();
486
487        let mut paginator = fruit::Entity::find().paginate(&db, 2);
488
489        assert_eq!(paginator.fetch()?, pages[0].clone());
490        paginator.next();
491
492        assert_eq!(paginator.fetch()?, pages[1].clone());
493        paginator.next();
494
495        assert_eq!(paginator.fetch()?, pages[2].clone());
496
497        let mut select = SelectStatement::new()
498            .exprs([
499                Expr::col((fruit::Entity, fruit::Column::Id)),
500                Expr::col((fruit::Entity, fruit::Column::Name)),
501                Expr::col((fruit::Entity, fruit::Column::CakeId)),
502            ])
503            .from(fruit::Entity)
504            .to_owned();
505
506        let query_builder = db.get_database_backend();
507        let stmts = [
508            query_builder.build(select.clone().offset(0).limit(2)),
509            query_builder.build(select.clone().offset(2).limit(2)),
510            query_builder.build(select.offset(4).limit(2)),
511        ];
512
513        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
514        Ok(())
515    }
516
517    #[test]
518    fn fetch_raw() -> Result<(), DbErr> {
519        let (db, pages) = setup();
520
521        let mut paginator = fruit::Entity::find()
522            .from_raw_sql(RAW_STMT.clone())
523            .paginate(&db, 2);
524
525        assert_eq!(paginator.fetch()?, pages[0].clone());
526        paginator.next();
527
528        assert_eq!(paginator.fetch()?, pages[1].clone());
529        paginator.next();
530
531        assert_eq!(paginator.fetch()?, pages[2].clone());
532
533        let mut select = SelectStatement::new()
534            .exprs([
535                Expr::col((fruit::Entity, fruit::Column::Id)),
536                Expr::col((fruit::Entity, fruit::Column::Name)),
537                Expr::col((fruit::Entity, fruit::Column::CakeId)),
538            ])
539            .from(fruit::Entity)
540            .to_owned();
541
542        let query_builder = db.get_database_backend();
543        let stmts = [
544            query_builder.build(select.clone().offset(0).limit(2)),
545            query_builder.build(select.clone().offset(2).limit(2)),
546            query_builder.build(select.offset(4).limit(2)),
547        ];
548
549        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
550        Ok(())
551    }
552
553    #[test]
554    fn num_pages() -> Result<(), DbErr> {
555        let (db, num_items) = setup_num_items();
556
557        let num_items = num_items as u64;
558        let page_size = 2_u64;
559        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
560        let paginator = fruit::Entity::find().paginate(&db, page_size);
561
562        assert_eq!(paginator.num_pages()?, num_pages);
563
564        let sub_query = SelectStatement::new()
565            .exprs([
566                Expr::col((fruit::Entity, fruit::Column::Id)),
567                Expr::col((fruit::Entity, fruit::Column::Name)),
568                Expr::col((fruit::Entity, fruit::Column::CakeId)),
569            ])
570            .from(fruit::Entity)
571            .to_owned();
572
573        let select = SelectStatement::new()
574            .expr(Expr::cust("COUNT(*) AS num_items"))
575            .from_subquery(sub_query, "sub_query")
576            .to_owned();
577
578        let query_builder = db.get_database_backend();
579        let stmts = [query_builder.build(&select)];
580
581        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
582        Ok(())
583    }
584
585    #[test]
586    fn num_pages_raw() -> Result<(), DbErr> {
587        let (db, num_items) = setup_num_items();
588
589        let num_items = num_items as u64;
590        let page_size = 2_u64;
591        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
592        let paginator = fruit::Entity::find()
593            .from_raw_sql(RAW_STMT.clone())
594            .paginate(&db, page_size);
595
596        assert_eq!(paginator.num_pages()?, num_pages);
597
598        let sub_query = SelectStatement::new()
599            .exprs([
600                Expr::col((fruit::Entity, fruit::Column::Id)),
601                Expr::col((fruit::Entity, fruit::Column::Name)),
602                Expr::col((fruit::Entity, fruit::Column::CakeId)),
603            ])
604            .from(fruit::Entity)
605            .to_owned();
606
607        let select = SelectStatement::new()
608            .expr(Expr::cust("COUNT(*) AS num_items"))
609            .from_subquery(sub_query, "sub_query")
610            .to_owned();
611
612        let query_builder = db.get_database_backend();
613        let stmts = [query_builder.build(&select)];
614
615        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
616        Ok(())
617    }
618
619    #[test]
620    fn next_and_cur_page() -> Result<(), DbErr> {
621        let (db, _) = setup();
622
623        let mut paginator = fruit::Entity::find().paginate(&db, 2);
624
625        assert_eq!(paginator.cur_page(), 0);
626        paginator.next();
627
628        assert_eq!(paginator.cur_page(), 1);
629        paginator.next();
630
631        assert_eq!(paginator.cur_page(), 2);
632        Ok(())
633    }
634
635    #[test]
636    fn next_and_cur_page_raw() -> Result<(), DbErr> {
637        let (db, _) = setup();
638
639        let mut paginator = fruit::Entity::find()
640            .from_raw_sql(RAW_STMT.clone())
641            .paginate(&db, 2);
642
643        assert_eq!(paginator.cur_page(), 0);
644        paginator.next();
645
646        assert_eq!(paginator.cur_page(), 1);
647        paginator.next();
648
649        assert_eq!(paginator.cur_page(), 2);
650        Ok(())
651    }
652
653    #[test]
654    fn fetch_and_next() -> Result<(), DbErr> {
655        let (db, pages) = setup();
656
657        let mut paginator = fruit::Entity::find().paginate(&db, 2);
658
659        assert_eq!(paginator.cur_page(), 0);
660        assert_eq!(paginator.fetch_and_next()?, Some(pages[0].clone()));
661
662        assert_eq!(paginator.cur_page(), 1);
663        assert_eq!(paginator.fetch_and_next()?, Some(pages[1].clone()));
664
665        assert_eq!(paginator.cur_page(), 2);
666        assert_eq!(paginator.fetch_and_next()?, None);
667
668        let mut select = SelectStatement::new()
669            .exprs([
670                Expr::col((fruit::Entity, fruit::Column::Id)),
671                Expr::col((fruit::Entity, fruit::Column::Name)),
672                Expr::col((fruit::Entity, fruit::Column::CakeId)),
673            ])
674            .from(fruit::Entity)
675            .to_owned();
676
677        let query_builder = db.get_database_backend();
678        let stmts = [
679            query_builder.build(select.clone().offset(0).limit(2)),
680            query_builder.build(select.clone().offset(2).limit(2)),
681            query_builder.build(select.offset(4).limit(2)),
682        ];
683
684        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
685        Ok(())
686    }
687
688    #[test]
689    fn fetch_and_next_raw() -> Result<(), DbErr> {
690        let (db, pages) = setup();
691
692        let mut paginator = fruit::Entity::find()
693            .from_raw_sql(RAW_STMT.clone())
694            .paginate(&db, 2);
695
696        assert_eq!(paginator.cur_page(), 0);
697        assert_eq!(paginator.fetch_and_next()?, Some(pages[0].clone()));
698
699        assert_eq!(paginator.cur_page(), 1);
700        assert_eq!(paginator.fetch_and_next()?, Some(pages[1].clone()));
701
702        assert_eq!(paginator.cur_page(), 2);
703        assert_eq!(paginator.fetch_and_next()?, None);
704
705        let mut select = SelectStatement::new()
706            .exprs([
707                Expr::col((fruit::Entity, fruit::Column::Id)),
708                Expr::col((fruit::Entity, fruit::Column::Name)),
709                Expr::col((fruit::Entity, fruit::Column::CakeId)),
710            ])
711            .from(fruit::Entity)
712            .to_owned();
713
714        let query_builder = db.get_database_backend();
715        let stmts = [
716            query_builder.build(select.clone().offset(0).limit(2)),
717            query_builder.build(select.clone().offset(2).limit(2)),
718            query_builder.build(select.offset(4).limit(2)),
719        ];
720
721        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
722        Ok(())
723    }
724
725    #[test]
726    fn into_stream() -> Result<(), DbErr> {
727        let (db, pages) = setup();
728
729        let mut fruit_stream = fruit::Entity::find().paginate(&db, 2).into_stream();
730
731        assert_eq!(fruit_stream.try_next()?, Some(pages[0].clone()));
732        assert_eq!(fruit_stream.try_next()?, Some(pages[1].clone()));
733        assert_eq!(fruit_stream.try_next()?, None);
734
735        drop(fruit_stream);
736
737        let mut select = SelectStatement::new()
738            .exprs([
739                Expr::col((fruit::Entity, fruit::Column::Id)),
740                Expr::col((fruit::Entity, fruit::Column::Name)),
741                Expr::col((fruit::Entity, fruit::Column::CakeId)),
742            ])
743            .from(fruit::Entity)
744            .to_owned();
745
746        let query_builder = db.get_database_backend();
747        let stmts = [
748            query_builder.build(select.clone().offset(0).limit(2)),
749            query_builder.build(select.clone().offset(2).limit(2)),
750            query_builder.build(select.offset(4).limit(2)),
751        ];
752
753        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
754        Ok(())
755    }
756
757    #[test]
758    fn into_stream_raw() -> Result<(), DbErr> {
759        let (db, pages) = setup();
760
761        let mut fruit_stream = fruit::Entity::find()
762            .from_raw_sql(RAW_STMT.clone())
763            .paginate(&db, 2)
764            .into_stream();
765
766        assert_eq!(fruit_stream.try_next()?, Some(pages[0].clone()));
767        assert_eq!(fruit_stream.try_next()?, Some(pages[1].clone()));
768        assert_eq!(fruit_stream.try_next()?, None);
769
770        drop(fruit_stream);
771
772        let mut select = SelectStatement::new()
773            .exprs([
774                Expr::col((fruit::Entity, fruit::Column::Id)),
775                Expr::col((fruit::Entity, fruit::Column::Name)),
776                Expr::col((fruit::Entity, fruit::Column::CakeId)),
777            ])
778            .from(fruit::Entity)
779            .to_owned();
780
781        let query_builder = db.get_database_backend();
782        let stmts = [
783            query_builder.build(select.clone().offset(0).limit(2)),
784            query_builder.build(select.clone().offset(2).limit(2)),
785            query_builder.build(select.offset(4).limit(2)),
786        ];
787
788        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
789        Ok(())
790    }
791
792    #[test]
793    fn into_stream_raw_leading_spaces() -> Result<(), DbErr> {
794        let (db, pages) = setup();
795
796        let raw_stmt = Statement::from_sql_and_values(
797            DbBackend::Postgres,
798            r#"  SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit"  "#,
799            [],
800        );
801
802        let mut fruit_stream = fruit::Entity::find()
803            .from_raw_sql(raw_stmt.clone())
804            .paginate(&db, 2)
805            .into_stream();
806
807        assert_eq!(fruit_stream.try_next()?, Some(pages[0].clone()));
808        assert_eq!(fruit_stream.try_next()?, Some(pages[1].clone()));
809        assert_eq!(fruit_stream.try_next()?, None);
810
811        drop(fruit_stream);
812
813        let mut select = SelectStatement::new()
814            .exprs([
815                Expr::col((fruit::Entity, fruit::Column::Id)),
816                Expr::col((fruit::Entity, fruit::Column::Name)),
817                Expr::col((fruit::Entity, fruit::Column::CakeId)),
818            ])
819            .from(fruit::Entity)
820            .to_owned();
821
822        let query_builder = db.get_database_backend();
823        let stmts = [
824            query_builder.build(select.clone().offset(0).limit(2)),
825            query_builder.build(select.clone().offset(2).limit(2)),
826            query_builder.build(select.offset(4).limit(2)),
827        ];
828
829        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
830        Ok(())
831    }
832
833    #[test]
834    #[should_panic]
835    fn error() {
836        let (db, _pages) = setup();
837
838        fruit::Entity::find().paginate(&db, 0);
839    }
840}