Skip to main content

sea_orm/executor/
paginator.rs

1use crate::{
2    ConnectionTrait, EntityTrait, FromQueryResult, Select, SelectModel, SelectTwo, SelectTwoModel,
3    Selector, SelectorRaw, SelectorTrait, error::*,
4};
5use async_stream::stream;
6use futures_util::Stream;
7use sea_query::{Expr, SelectStatement};
8use std::{marker::PhantomData, pin::Pin};
9
10#[cfg(not(feature = "sync"))]
11type PinBoxStream<'db, Item> = Pin<Box<dyn Stream<Item = Item> + 'db>>;
12#[cfg(feature = "sync")]
13type PinBoxStream<'db, Item> = Box<dyn Iterator<Item = Item> + 'db>;
14
15/// Defined a structure to handle pagination of a result from a query operation on a Model
16#[derive(Clone, Debug)]
17pub struct Paginator<'db, C, S>
18where
19    C: ConnectionTrait,
20    S: SelectorTrait + 'db,
21{
22    pub(crate) query: SelectStatement,
23    pub(crate) page: u64,
24    pub(crate) page_size: u64,
25    pub(crate) db: &'db C,
26    pub(crate) selector: PhantomData<S>,
27}
28
29/// Define a structure containing the numbers of items and pages of a Paginator
30#[derive(Clone, Debug)]
31pub struct ItemsAndPagesNumber {
32    /// The total number of items of a paginator
33    pub number_of_items: u64,
34    /// The total number of pages of a paginator
35    pub number_of_pages: u64,
36}
37
38// LINT: warn if paginator is used without an order by clause
39
40impl<'db, C, S> Paginator<'db, C, S>
41where
42    C: ConnectionTrait,
43    S: SelectorTrait + 'db,
44{
45    /// Fetch a specific page; page index starts from zero
46    pub async fn fetch_page(&self, page: u64) -> Result<Vec<S::Item>, DbErr> {
47        let query = self
48            .query
49            .clone()
50            .limit(self.page_size)
51            .offset(self.page_size * page)
52            .to_owned();
53        let rows = self.db.query_all(&query).await?;
54        let mut buffer = Vec::with_capacity(rows.len());
55        for row in rows.into_iter() {
56            buffer.push(S::from_raw_query_result(row)?);
57        }
58        Ok(buffer)
59    }
60
61    /// Fetch the current page
62    pub async fn fetch(&self) -> Result<Vec<S::Item>, DbErr> {
63        self.fetch_page(self.page).await
64    }
65
66    /// Get the total number of items
67    pub async fn num_items(&self) -> Result<u64, DbErr> {
68        let query = SelectStatement::new()
69            .expr(Expr::cust("COUNT(*) AS num_items"))
70            .from_subquery(
71                self.query
72                    .clone()
73                    .reset_limit()
74                    .reset_offset()
75                    .clear_order_by()
76                    .to_owned(),
77                "sub_query",
78            )
79            .to_owned();
80        let result = match self.db.query_one(&query).await? {
81            Some(res) => res,
82            None => return Ok(0),
83        };
84        let num_items = result.try_get::<i64>("", "num_items")? as u64;
85        Ok(num_items)
86    }
87
88    /// Get the total number of pages
89    pub async fn num_pages(&self) -> Result<u64, DbErr> {
90        let num_items = self.num_items().await?;
91        let num_pages = self.compute_pages_number(num_items);
92        Ok(num_pages)
93    }
94
95    /// Get the total number of items and pages
96    pub async fn num_items_and_pages(&self) -> Result<ItemsAndPagesNumber, DbErr> {
97        let number_of_items = self.num_items().await?;
98        let number_of_pages = self.compute_pages_number(number_of_items);
99
100        Ok(ItemsAndPagesNumber {
101            number_of_items,
102            number_of_pages,
103        })
104    }
105
106    /// Compute the number of pages for the current page
107    fn compute_pages_number(&self, num_items: u64) -> u64 {
108        (num_items / self.page_size) + (num_items % self.page_size > 0) as u64
109    }
110
111    /// Increment the page counter
112    pub fn next(&mut self) {
113        self.page += 1;
114    }
115
116    /// Get current page number
117    pub fn cur_page(&self) -> u64 {
118        self.page
119    }
120
121    /// Fetch one page and increment the page counter
122    ///
123    /// ```
124    /// # use sea_orm::{error::*, tests_cfg::*, *};
125    /// #
126    /// # #[smol_potat::main]
127    /// # #[cfg(feature = "mock")]
128    /// # pub async fn main() -> Result<(), DbErr> {
129    /// #
130    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
131    /// #     .append_query_results([
132    /// #         vec![cake::Model {
133    /// #             id: 1,
134    /// #             name: "Cake".to_owned(),
135    /// #         }],
136    /// #         vec![],
137    /// #     ])
138    /// #     .into_connection();
139    /// # let db = &owned_db;
140    /// #
141    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
142    /// let mut cake_pages = cake::Entity::find()
143    ///     .order_by_asc(cake::Column::Id)
144    ///     .paginate(db, 50);
145    ///
146    /// while let Some(cakes) = cake_pages.fetch_and_next().await? {
147    ///     // Do something on cakes: Vec<cake::Model>
148    /// }
149    /// #
150    /// # Ok(())
151    /// # }
152    /// ```
153    pub async fn fetch_and_next(&mut self) -> Result<Option<Vec<S::Item>>, DbErr> {
154        let vec = self.fetch().await?;
155        self.next();
156        let opt = if !vec.is_empty() { Some(vec) } else { None };
157        Ok(opt)
158    }
159
160    /// Convert self into an async stream
161    ///
162    /// ```
163    /// # use sea_orm::{error::*, tests_cfg::*, *};
164    /// #
165    /// # #[smol_potat::main]
166    /// # #[cfg(all(feature = "mock", not(feature = "sync")))]
167    /// # pub async fn main() -> Result<(), DbErr> {
168    /// #
169    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
170    /// #     .append_query_results([
171    /// #         vec![cake::Model {
172    /// #             id: 1,
173    /// #             name: "Cake".to_owned(),
174    /// #         }],
175    /// #         vec![],
176    /// #     ])
177    /// #     .into_connection();
178    /// # let db = &owned_db;
179    /// #
180    /// use futures_util::TryStreamExt;
181    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
182    /// let mut cake_stream = cake::Entity::find()
183    ///     .order_by_asc(cake::Column::Id)
184    ///     .paginate(db, 50)
185    ///     .into_stream();
186    ///
187    /// while let Some(cakes) = cake_stream.try_next().await? {
188    ///     // Do something on cakes: Vec<cake::Model>
189    /// }
190    /// #
191    /// # Ok(())
192    /// # }
193    /// # #[cfg(all(feature = "mock", feature = "sync"))]
194    /// # fn main() {}
195    /// ```
196    /// (for SeaORM Sync)
197    /// ```
198    /// # use sea_orm::{error::*, tests_cfg::*, *};
199    /// # #[cfg(all(feature = "mock", feature = "sync"))]
200    /// # fn example() -> Result<(), DbErr> {
201    /// #
202    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
203    /// #     .append_query_results([
204    /// #         vec![cake::Model {
205    /// #             id: 1,
206    /// #             name: "Cake".to_owned(),
207    /// #         }],
208    /// #         vec![],
209    /// #     ])
210    /// #     .into_connection();
211    /// # let db = &owned_db;
212    /// #
213    /// use futures_util::TryStreamExt;
214    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
215    /// let mut cake_stream = cake::Entity::find()
216    ///     .order_by_asc(cake::Column::Id)
217    ///     .paginate(db, 50)
218    ///     .into_stream();
219    ///
220    /// while let Some(cakes) = cake_stream.next() {
221    ///     // Do something on cakes: Vec<cake::Model>
222    /// }
223    /// #
224    /// # Ok(())
225    /// # }
226    /// ```
227    pub fn into_stream(self) -> PinBoxStream<'db, Result<Vec<S::Item>, DbErr>> {
228        #[cfg(not(feature = "sync"))]
229        {
230            let mut streamer = self;
231            Box::pin(stream! {
232                while let Some(vec) = streamer.fetch_and_next().await? {
233                    yield Ok(vec);
234                }
235            })
236        }
237        #[cfg(feature = "sync")]
238        {
239            Box::new(PaginatorStream { paginator: self })
240        }
241    }
242}
243
244#[cfg(feature = "sync")]
245#[derive(Debug)]
246/// Stream items by page
247pub struct PaginatorStream<'db, C, S>
248where
249    C: ConnectionTrait,
250    S: SelectorTrait + 'db,
251{
252    paginator: Paginator<'db, C, S>,
253}
254
255#[async_trait::async_trait]
256/// A Trait for any type that can paginate results
257pub trait PaginatorTrait<'db, C>
258where
259    C: ConnectionTrait,
260{
261    /// Select operation
262    type Selector: SelectorTrait + Send + Sync + 'db;
263
264    /// Paginate the result of a select operation.
265    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector>;
266
267    /// Perform a count on the paginated results
268    async fn count(self, db: &'db C) -> Result<u64, DbErr>
269    where
270        Self: Send + Sized,
271    {
272        self.paginate(db, 1).num_items().await
273    }
274}
275
276impl<'db, C, S> PaginatorTrait<'db, C> for Selector<S>
277where
278    C: ConnectionTrait,
279    S: SelectorTrait + Send + Sync + 'db,
280{
281    type Selector = S;
282
283    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
284        assert!(page_size != 0, "page_size should not be zero");
285        Paginator {
286            query: self.query,
287            page: 0,
288            page_size,
289            db,
290            selector: PhantomData,
291        }
292    }
293}
294
295impl<'db, C, S> PaginatorTrait<'db, C> for SelectorRaw<S>
296where
297    C: ConnectionTrait,
298    S: SelectorTrait + Send + Sync + 'db,
299{
300    type Selector = S;
301    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
302        assert!(page_size != 0, "page_size should not be zero");
303        let sql = self.stmt.sql.trim()[6..].trim().to_owned();
304        let mut query = SelectStatement::new();
305        query.expr(if let Some(values) = self.stmt.values {
306            Expr::cust_with_values(sql, values.0)
307        } else {
308            Expr::cust(sql)
309        });
310
311        Paginator {
312            query,
313            page: 0,
314            page_size,
315            db,
316            selector: PhantomData,
317        }
318    }
319}
320
321impl<'db, C, M, E> PaginatorTrait<'db, C> for Select<E>
322where
323    C: ConnectionTrait,
324    E: EntityTrait<Model = M>,
325    M: FromQueryResult + Sized + Send + Sync + 'db,
326{
327    type Selector = SelectModel<M>;
328
329    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
330        self.into_model().paginate(db, page_size)
331    }
332}
333
334impl<'db, C, M, N, E, F> PaginatorTrait<'db, C> for SelectTwo<E, F>
335where
336    C: ConnectionTrait,
337    E: EntityTrait<Model = M>,
338    F: EntityTrait<Model = N>,
339    M: FromQueryResult + Sized + Send + Sync + 'db,
340    N: FromQueryResult + Sized + Send + Sync + 'db,
341{
342    type Selector = SelectTwoModel<M, N>;
343
344    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
345        self.into_model().paginate(db, page_size)
346    }
347}
348
349#[cfg(feature = "sync")]
350impl<'db, C, S> Iterator for PaginatorStream<'db, C, S>
351where
352    C: ConnectionTrait,
353    S: SelectorTrait + 'db,
354{
355    type Item = Result<Vec<S::Item>, DbErr>;
356
357    fn next(&mut self) -> Option<Self::Item> {
358        match self.paginator.fetch_and_next() {
359            Ok(Some(vec)) => Some(Ok(vec)),
360            Ok(None) => None,
361            Err(e) => Some(Err(e)),
362        }
363    }
364}
365
366#[cfg(test)]
367#[cfg(feature = "mock")]
368mod tests {
369    use super::*;
370    use crate::entity::prelude::*;
371    #[cfg(feature = "sync")]
372    use crate::util::StreamShim;
373    use crate::{DatabaseConnection, DbBackend, MockDatabase, Transaction};
374    use crate::{QueryOrder, QuerySelect};
375    use crate::{Statement, tests_cfg::*};
376    use futures_util::{TryStreamExt, stream::TryNext};
377    use pretty_assertions::assert_eq;
378    use sea_query::{Expr, SelectStatement, Value};
379    use std::sync::LazyLock;
380
381    static RAW_STMT: LazyLock<Statement> = LazyLock::new(|| {
382        Statement::from_sql_and_values(
383            DbBackend::Postgres,
384            r#"SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit""#,
385            [],
386        )
387    });
388
389    fn setup() -> (DatabaseConnection, Vec<Vec<fruit::Model>>) {
390        let page1 = vec![
391            fruit::Model {
392                id: 1,
393                name: "Blueberry".into(),
394                cake_id: Some(1),
395            },
396            fruit::Model {
397                id: 2,
398                name: "Raspberry".into(),
399                cake_id: Some(1),
400            },
401        ];
402
403        let page2 = vec![fruit::Model {
404            id: 3,
405            name: "Strawberry".into(),
406            cake_id: Some(2),
407        }];
408
409        let page3 = Vec::<fruit::Model>::new();
410
411        let db = MockDatabase::new(DbBackend::Postgres)
412            .append_query_results([page1.clone(), page2.clone(), page3.clone()])
413            .into_connection();
414
415        (db, vec![page1, page2, page3])
416    }
417
418    fn setup_num_items() -> (DatabaseConnection, i64) {
419        let num_items = 3;
420        let db = MockDatabase::new(DbBackend::Postgres)
421            .append_query_results([[maplit::btreemap! {
422                "num_items" => Into::<Value>::into(num_items),
423            }]])
424            .into_connection();
425
426        (db, num_items)
427    }
428
429    #[smol_potat::test]
430    async fn fetch_page() -> Result<(), DbErr> {
431        let (db, pages) = setup();
432
433        let paginator = fruit::Entity::find().paginate(&db, 2);
434
435        assert_eq!(paginator.fetch_page(0).await?, pages[0].clone());
436        assert_eq!(paginator.fetch_page(1).await?, pages[1].clone());
437        assert_eq!(paginator.fetch_page(2).await?, pages[2].clone());
438
439        let mut select = SelectStatement::new()
440            .exprs([
441                Expr::col((fruit::Entity, fruit::Column::Id)),
442                Expr::col((fruit::Entity, fruit::Column::Name)),
443                Expr::col((fruit::Entity, fruit::Column::CakeId)),
444            ])
445            .from(fruit::Entity)
446            .to_owned();
447
448        let query_builder = db.get_database_backend();
449        let stmts = [
450            query_builder.build(select.clone().offset(0).limit(2)),
451            query_builder.build(select.clone().offset(2).limit(2)),
452            query_builder.build(select.offset(4).limit(2)),
453        ];
454
455        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
456        Ok(())
457    }
458
459    #[smol_potat::test]
460    async fn fetch_page_raw() -> Result<(), DbErr> {
461        let (db, pages) = setup();
462
463        let paginator = fruit::Entity::find()
464            .from_raw_sql(RAW_STMT.clone())
465            .paginate(&db, 2);
466
467        assert_eq!(paginator.fetch_page(0).await?, pages[0].clone());
468        assert_eq!(paginator.fetch_page(1).await?, pages[1].clone());
469        assert_eq!(paginator.fetch_page(2).await?, pages[2].clone());
470
471        let mut select = SelectStatement::new()
472            .exprs([
473                Expr::col((fruit::Entity, fruit::Column::Id)),
474                Expr::col((fruit::Entity, fruit::Column::Name)),
475                Expr::col((fruit::Entity, fruit::Column::CakeId)),
476            ])
477            .from(fruit::Entity)
478            .to_owned();
479
480        let query_builder = db.get_database_backend();
481        let stmts = [
482            query_builder.build(select.clone().offset(0).limit(2)),
483            query_builder.build(select.clone().offset(2).limit(2)),
484            query_builder.build(select.offset(4).limit(2)),
485        ];
486
487        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
488        Ok(())
489    }
490
491    #[smol_potat::test]
492    async fn fetch() -> Result<(), DbErr> {
493        let (db, pages) = setup();
494
495        let mut paginator = fruit::Entity::find().paginate(&db, 2);
496
497        assert_eq!(paginator.fetch().await?, pages[0].clone());
498        paginator.next();
499
500        assert_eq!(paginator.fetch().await?, pages[1].clone());
501        paginator.next();
502
503        assert_eq!(paginator.fetch().await?, pages[2].clone());
504
505        let mut select = SelectStatement::new()
506            .exprs([
507                Expr::col((fruit::Entity, fruit::Column::Id)),
508                Expr::col((fruit::Entity, fruit::Column::Name)),
509                Expr::col((fruit::Entity, fruit::Column::CakeId)),
510            ])
511            .from(fruit::Entity)
512            .to_owned();
513
514        let query_builder = db.get_database_backend();
515        let stmts = [
516            query_builder.build(select.clone().offset(0).limit(2)),
517            query_builder.build(select.clone().offset(2).limit(2)),
518            query_builder.build(select.offset(4).limit(2)),
519        ];
520
521        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
522        Ok(())
523    }
524
525    #[smol_potat::test]
526    async fn fetch_raw() -> Result<(), DbErr> {
527        let (db, pages) = setup();
528
529        let mut paginator = fruit::Entity::find()
530            .from_raw_sql(RAW_STMT.clone())
531            .paginate(&db, 2);
532
533        assert_eq!(paginator.fetch().await?, pages[0].clone());
534        paginator.next();
535
536        assert_eq!(paginator.fetch().await?, pages[1].clone());
537        paginator.next();
538
539        assert_eq!(paginator.fetch().await?, pages[2].clone());
540
541        let mut select = SelectStatement::new()
542            .exprs([
543                Expr::col((fruit::Entity, fruit::Column::Id)),
544                Expr::col((fruit::Entity, fruit::Column::Name)),
545                Expr::col((fruit::Entity, fruit::Column::CakeId)),
546            ])
547            .from(fruit::Entity)
548            .to_owned();
549
550        let query_builder = db.get_database_backend();
551        let stmts = [
552            query_builder.build(select.clone().offset(0).limit(2)),
553            query_builder.build(select.clone().offset(2).limit(2)),
554            query_builder.build(select.offset(4).limit(2)),
555        ];
556
557        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
558        Ok(())
559    }
560
561    #[smol_potat::test]
562    async fn num_pages() -> Result<(), DbErr> {
563        let (db, num_items) = setup_num_items();
564
565        let num_items = num_items as u64;
566        let page_size = 2_u64;
567        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
568        let paginator = fruit::Entity::find().paginate(&db, page_size);
569
570        assert_eq!(paginator.num_pages().await?, num_pages);
571
572        let sub_query = SelectStatement::new()
573            .exprs([
574                Expr::col((fruit::Entity, fruit::Column::Id)),
575                Expr::col((fruit::Entity, fruit::Column::Name)),
576                Expr::col((fruit::Entity, fruit::Column::CakeId)),
577            ])
578            .from(fruit::Entity)
579            .to_owned();
580
581        let select = SelectStatement::new()
582            .expr(Expr::cust("COUNT(*) AS num_items"))
583            .from_subquery(sub_query, "sub_query")
584            .to_owned();
585
586        let query_builder = db.get_database_backend();
587        let stmts = [query_builder.build(&select)];
588
589        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
590        Ok(())
591    }
592
593    #[smol_potat::test]
594    async fn num_pages_raw() -> Result<(), DbErr> {
595        let (db, num_items) = setup_num_items();
596
597        let num_items = num_items as u64;
598        let page_size = 2_u64;
599        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
600        let paginator = fruit::Entity::find()
601            .from_raw_sql(RAW_STMT.clone())
602            .paginate(&db, page_size);
603
604        assert_eq!(paginator.num_pages().await?, num_pages);
605
606        let sub_query = SelectStatement::new()
607            .exprs([
608                Expr::col((fruit::Entity, fruit::Column::Id)),
609                Expr::col((fruit::Entity, fruit::Column::Name)),
610                Expr::col((fruit::Entity, fruit::Column::CakeId)),
611            ])
612            .from(fruit::Entity)
613            .to_owned();
614
615        let select = SelectStatement::new()
616            .expr(Expr::cust("COUNT(*) AS num_items"))
617            .from_subquery(sub_query, "sub_query")
618            .to_owned();
619
620        let query_builder = db.get_database_backend();
621        let stmts = [query_builder.build(&select)];
622
623        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
624        Ok(())
625    }
626
627    #[smol_potat::test]
628    async fn next_and_cur_page() -> Result<(), DbErr> {
629        let (db, _) = setup();
630
631        let mut paginator = fruit::Entity::find().paginate(&db, 2);
632
633        assert_eq!(paginator.cur_page(), 0);
634        paginator.next();
635
636        assert_eq!(paginator.cur_page(), 1);
637        paginator.next();
638
639        assert_eq!(paginator.cur_page(), 2);
640        Ok(())
641    }
642
643    #[smol_potat::test]
644    async fn next_and_cur_page_raw() -> Result<(), DbErr> {
645        let (db, _) = setup();
646
647        let mut paginator = fruit::Entity::find()
648            .from_raw_sql(RAW_STMT.clone())
649            .paginate(&db, 2);
650
651        assert_eq!(paginator.cur_page(), 0);
652        paginator.next();
653
654        assert_eq!(paginator.cur_page(), 1);
655        paginator.next();
656
657        assert_eq!(paginator.cur_page(), 2);
658        Ok(())
659    }
660
661    #[smol_potat::test]
662    async fn fetch_and_next() -> Result<(), DbErr> {
663        let (db, pages) = setup();
664
665        let mut paginator = fruit::Entity::find().paginate(&db, 2);
666
667        assert_eq!(paginator.cur_page(), 0);
668        assert_eq!(paginator.fetch_and_next().await?, Some(pages[0].clone()));
669
670        assert_eq!(paginator.cur_page(), 1);
671        assert_eq!(paginator.fetch_and_next().await?, Some(pages[1].clone()));
672
673        assert_eq!(paginator.cur_page(), 2);
674        assert_eq!(paginator.fetch_and_next().await?, None);
675
676        let mut select = SelectStatement::new()
677            .exprs([
678                Expr::col((fruit::Entity, fruit::Column::Id)),
679                Expr::col((fruit::Entity, fruit::Column::Name)),
680                Expr::col((fruit::Entity, fruit::Column::CakeId)),
681            ])
682            .from(fruit::Entity)
683            .to_owned();
684
685        let query_builder = db.get_database_backend();
686        let stmts = [
687            query_builder.build(select.clone().offset(0).limit(2)),
688            query_builder.build(select.clone().offset(2).limit(2)),
689            query_builder.build(select.offset(4).limit(2)),
690        ];
691
692        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
693        Ok(())
694    }
695
696    #[smol_potat::test]
697    async fn fetch_and_next_raw() -> Result<(), DbErr> {
698        let (db, pages) = setup();
699
700        let mut paginator = fruit::Entity::find()
701            .from_raw_sql(RAW_STMT.clone())
702            .paginate(&db, 2);
703
704        assert_eq!(paginator.cur_page(), 0);
705        assert_eq!(paginator.fetch_and_next().await?, Some(pages[0].clone()));
706
707        assert_eq!(paginator.cur_page(), 1);
708        assert_eq!(paginator.fetch_and_next().await?, Some(pages[1].clone()));
709
710        assert_eq!(paginator.cur_page(), 2);
711        assert_eq!(paginator.fetch_and_next().await?, None);
712
713        let mut select = SelectStatement::new()
714            .exprs([
715                Expr::col((fruit::Entity, fruit::Column::Id)),
716                Expr::col((fruit::Entity, fruit::Column::Name)),
717                Expr::col((fruit::Entity, fruit::Column::CakeId)),
718            ])
719            .from(fruit::Entity)
720            .to_owned();
721
722        let query_builder = db.get_database_backend();
723        let stmts = [
724            query_builder.build(select.clone().offset(0).limit(2)),
725            query_builder.build(select.clone().offset(2).limit(2)),
726            query_builder.build(select.offset(4).limit(2)),
727        ];
728
729        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
730        Ok(())
731    }
732
733    #[smol_potat::test]
734    async fn into_stream() -> Result<(), DbErr> {
735        let (db, pages) = setup();
736
737        let mut fruit_stream = fruit::Entity::find().paginate(&db, 2).into_stream();
738
739        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
740        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
741        assert_eq!(fruit_stream.try_next().await?, None);
742
743        drop(fruit_stream);
744
745        let mut select = SelectStatement::new()
746            .exprs([
747                Expr::col((fruit::Entity, fruit::Column::Id)),
748                Expr::col((fruit::Entity, fruit::Column::Name)),
749                Expr::col((fruit::Entity, fruit::Column::CakeId)),
750            ])
751            .from(fruit::Entity)
752            .to_owned();
753
754        let query_builder = db.get_database_backend();
755        let stmts = [
756            query_builder.build(select.clone().offset(0).limit(2)),
757            query_builder.build(select.clone().offset(2).limit(2)),
758            query_builder.build(select.offset(4).limit(2)),
759        ];
760
761        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
762        Ok(())
763    }
764
765    #[smol_potat::test]
766    async fn into_stream_raw() -> Result<(), DbErr> {
767        let (db, pages) = setup();
768
769        let mut fruit_stream = fruit::Entity::find()
770            .from_raw_sql(RAW_STMT.clone())
771            .paginate(&db, 2)
772            .into_stream();
773
774        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
775        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
776        assert_eq!(fruit_stream.try_next().await?, None);
777
778        drop(fruit_stream);
779
780        let mut select = SelectStatement::new()
781            .exprs([
782                Expr::col((fruit::Entity, fruit::Column::Id)),
783                Expr::col((fruit::Entity, fruit::Column::Name)),
784                Expr::col((fruit::Entity, fruit::Column::CakeId)),
785            ])
786            .from(fruit::Entity)
787            .to_owned();
788
789        let query_builder = db.get_database_backend();
790        let stmts = [
791            query_builder.build(select.clone().offset(0).limit(2)),
792            query_builder.build(select.clone().offset(2).limit(2)),
793            query_builder.build(select.offset(4).limit(2)),
794        ];
795
796        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
797        Ok(())
798    }
799
800    #[smol_potat::test]
801    async fn into_stream_raw_leading_spaces() -> Result<(), DbErr> {
802        let (db, pages) = setup();
803
804        let raw_stmt = Statement::from_sql_and_values(
805            DbBackend::Postgres,
806            r#"  SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit"  "#,
807            [],
808        );
809
810        let mut fruit_stream = fruit::Entity::find()
811            .from_raw_sql(raw_stmt.clone())
812            .paginate(&db, 2)
813            .into_stream();
814
815        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
816        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
817        assert_eq!(fruit_stream.try_next().await?, None);
818
819        drop(fruit_stream);
820
821        let mut select = SelectStatement::new()
822            .exprs([
823                Expr::col((fruit::Entity, fruit::Column::Id)),
824                Expr::col((fruit::Entity, fruit::Column::Name)),
825                Expr::col((fruit::Entity, fruit::Column::CakeId)),
826            ])
827            .from(fruit::Entity)
828            .to_owned();
829
830        let query_builder = db.get_database_backend();
831        let stmts = [
832            query_builder.build(select.clone().offset(0).limit(2)),
833            query_builder.build(select.clone().offset(2).limit(2)),
834            query_builder.build(select.offset(4).limit(2)),
835        ];
836
837        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
838        Ok(())
839    }
840
841    #[smol_potat::test]
842    #[should_panic]
843    async fn error() {
844        let (db, _pages) = setup();
845
846        fruit::Entity::find().paginate(&db, 0);
847    }
848}