sea_orm/executor/
paginator.rs

1use crate::{
2    ConnectionTrait, DbBackend, EntityTrait, FromQueryResult, Select, SelectModel, SelectTwo,
3    SelectTwoModel, Selector, SelectorRaw, SelectorTrait, error::*,
4};
5use async_stream::stream;
6use futures_util::Stream;
7use sea_query::{Expr, SelectStatement};
8use std::{marker::PhantomData, pin::Pin};
9
10#[cfg(not(feature = "sync"))]
11type PinBoxStream<'db, Item> = Pin<Box<dyn Stream<Item = Item> + 'db>>;
12#[cfg(feature = "sync")]
13type PinBoxStream<'db, Item> = Box<dyn Iterator<Item = Item> + 'db>;
14
15/// Defined a structure to handle pagination of a result from a query operation on a Model
16#[derive(Clone, Debug)]
17pub struct Paginator<'db, C, S>
18where
19    C: ConnectionTrait,
20    S: SelectorTrait + 'db,
21{
22    pub(crate) query: SelectStatement,
23    pub(crate) page: u64,
24    pub(crate) page_size: u64,
25    pub(crate) db: &'db C,
26    pub(crate) selector: PhantomData<S>,
27}
28
29/// Define a structure containing the numbers of items and pages of a Paginator
30#[derive(Clone, Debug)]
31pub struct ItemsAndPagesNumber {
32    /// The total number of items of a paginator
33    pub number_of_items: u64,
34    /// The total number of pages of a paginator
35    pub number_of_pages: u64,
36}
37
38// LINT: warn if paginator is used without an order by clause
39
40impl<'db, C, S> Paginator<'db, C, S>
41where
42    C: ConnectionTrait,
43    S: SelectorTrait + 'db,
44{
45    /// Fetch a specific page; page index starts from zero
46    pub async fn fetch_page(&self, page: u64) -> Result<Vec<S::Item>, DbErr> {
47        let query = self
48            .query
49            .clone()
50            .limit(self.page_size)
51            .offset(self.page_size * page)
52            .to_owned();
53        let rows = self.db.query_all(&query).await?;
54        let mut buffer = Vec::with_capacity(rows.len());
55        for row in rows.into_iter() {
56            buffer.push(S::from_raw_query_result(row)?);
57        }
58        Ok(buffer)
59    }
60
61    /// Fetch the current page
62    pub async fn fetch(&self) -> Result<Vec<S::Item>, DbErr> {
63        self.fetch_page(self.page).await
64    }
65
66    /// Get the total number of items
67    pub async fn num_items(&self) -> Result<u64, DbErr> {
68        let db_backend = self.db.get_database_backend();
69        let query = SelectStatement::new()
70            .expr(Expr::cust("COUNT(*) AS num_items"))
71            .from_subquery(
72                self.query
73                    .clone()
74                    .reset_limit()
75                    .reset_offset()
76                    .clear_order_by()
77                    .to_owned(),
78                "sub_query",
79            )
80            .to_owned();
81        let result = match self.db.query_one(&query).await? {
82            Some(res) => res,
83            None => return Ok(0),
84        };
85        let num_items = match db_backend {
86            DbBackend::Postgres => result.try_get::<i64>("", "num_items")? as u64,
87            _ => result.try_get::<i32>("", "num_items")? as u64,
88        };
89        Ok(num_items)
90    }
91
92    /// Get the total number of pages
93    pub async fn num_pages(&self) -> Result<u64, DbErr> {
94        let num_items = self.num_items().await?;
95        let num_pages = self.compute_pages_number(num_items);
96        Ok(num_pages)
97    }
98
99    /// Get the total number of items and pages
100    pub async fn num_items_and_pages(&self) -> Result<ItemsAndPagesNumber, DbErr> {
101        let number_of_items = self.num_items().await?;
102        let number_of_pages = self.compute_pages_number(number_of_items);
103
104        Ok(ItemsAndPagesNumber {
105            number_of_items,
106            number_of_pages,
107        })
108    }
109
110    /// Compute the number of pages for the current page
111    fn compute_pages_number(&self, num_items: u64) -> u64 {
112        (num_items / self.page_size) + (num_items % self.page_size > 0) as u64
113    }
114
115    /// Increment the page counter
116    pub fn next(&mut self) {
117        self.page += 1;
118    }
119
120    /// Get current page number
121    pub fn cur_page(&self) -> u64 {
122        self.page
123    }
124
125    /// Fetch one page and increment the page counter
126    ///
127    /// ```
128    /// # use sea_orm::{error::*, tests_cfg::*, *};
129    /// #
130    /// # #[smol_potat::main]
131    /// # #[cfg(feature = "mock")]
132    /// # pub async fn main() -> Result<(), DbErr> {
133    /// #
134    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
135    /// #     .append_query_results([
136    /// #         vec![cake::Model {
137    /// #             id: 1,
138    /// #             name: "Cake".to_owned(),
139    /// #         }],
140    /// #         vec![],
141    /// #     ])
142    /// #     .into_connection();
143    /// # let db = &owned_db;
144    /// #
145    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
146    /// let mut cake_pages = cake::Entity::find()
147    ///     .order_by_asc(cake::Column::Id)
148    ///     .paginate(db, 50);
149    ///
150    /// while let Some(cakes) = cake_pages.fetch_and_next().await? {
151    ///     // Do something on cakes: Vec<cake::Model>
152    /// }
153    /// #
154    /// # Ok(())
155    /// # }
156    /// ```
157    pub async fn fetch_and_next(&mut self) -> Result<Option<Vec<S::Item>>, DbErr> {
158        let vec = self.fetch().await?;
159        self.next();
160        let opt = if !vec.is_empty() { Some(vec) } else { None };
161        Ok(opt)
162    }
163
164    /// Convert self into an async stream
165    ///
166    /// ```
167    /// # use sea_orm::{error::*, tests_cfg::*, *};
168    /// #
169    /// # #[smol_potat::main]
170    /// # #[cfg(all(feature = "mock", not(feature = "sync")))]
171    /// # pub async fn main() -> Result<(), DbErr> {
172    /// #
173    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
174    /// #     .append_query_results([
175    /// #         vec![cake::Model {
176    /// #             id: 1,
177    /// #             name: "Cake".to_owned(),
178    /// #         }],
179    /// #         vec![],
180    /// #     ])
181    /// #     .into_connection();
182    /// # let db = &owned_db;
183    /// #
184    /// use futures_util::TryStreamExt;
185    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
186    /// let mut cake_stream = cake::Entity::find()
187    ///     .order_by_asc(cake::Column::Id)
188    ///     .paginate(db, 50)
189    ///     .into_stream();
190    ///
191    /// while let Some(cakes) = cake_stream.try_next().await? {
192    ///     // Do something on cakes: Vec<cake::Model>
193    /// }
194    /// #
195    /// # Ok(())
196    /// # }
197    /// # #[cfg(all(feature = "mock", feature = "sync"))]
198    /// # fn main() {}
199    /// ```
200    /// (for SeaORM Sync)
201    /// ```
202    /// # use sea_orm::{error::*, tests_cfg::*, *};
203    /// # #[cfg(all(feature = "mock", feature = "sync"))]
204    /// # fn example() -> Result<(), DbErr> {
205    /// #
206    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
207    /// #     .append_query_results([
208    /// #         vec![cake::Model {
209    /// #             id: 1,
210    /// #             name: "Cake".to_owned(),
211    /// #         }],
212    /// #         vec![],
213    /// #     ])
214    /// #     .into_connection();
215    /// # let db = &owned_db;
216    /// #
217    /// use futures_util::TryStreamExt;
218    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
219    /// let mut cake_stream = cake::Entity::find()
220    ///     .order_by_asc(cake::Column::Id)
221    ///     .paginate(db, 50)
222    ///     .into_stream();
223    ///
224    /// while let Some(cakes) = cake_stream.next() {
225    ///     // Do something on cakes: Vec<cake::Model>
226    /// }
227    /// #
228    /// # Ok(())
229    /// # }
230    /// ```
231    pub fn into_stream(self) -> PinBoxStream<'db, Result<Vec<S::Item>, DbErr>> {
232        #[cfg(not(feature = "sync"))]
233        {
234            let mut streamer = self;
235            Box::pin(stream! {
236                while let Some(vec) = streamer.fetch_and_next().await? {
237                    yield Ok(vec);
238                }
239            })
240        }
241        #[cfg(feature = "sync")]
242        {
243            Box::new(PaginatorStream { paginator: self })
244        }
245    }
246}
247
248#[cfg(feature = "sync")]
249#[derive(Debug)]
250/// Stream items by page
251pub struct PaginatorStream<'db, C, S>
252where
253    C: ConnectionTrait,
254    S: SelectorTrait + 'db,
255{
256    paginator: Paginator<'db, C, S>,
257}
258
259#[async_trait::async_trait]
260/// A Trait for any type that can paginate results
261pub trait PaginatorTrait<'db, C>
262where
263    C: ConnectionTrait,
264{
265    /// Select operation
266    type Selector: SelectorTrait + Send + Sync + 'db;
267
268    /// Paginate the result of a select operation.
269    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector>;
270
271    /// Perform a count on the paginated results
272    async fn count(self, db: &'db C) -> Result<u64, DbErr>
273    where
274        Self: Send + Sized,
275    {
276        self.paginate(db, 1).num_items().await
277    }
278
279    /// Check if any records exist
280    async fn exists(self, db: &'db C) -> Result<bool, DbErr>
281    where
282        Self: Send + Sized,
283    {
284        let paginator = self.paginate(db, 1);
285        let stmt = SelectStatement::new()
286            .expr(Expr::cust("1"))
287            .from_subquery(
288                paginator
289                    .query
290                    .clone()
291                    .reset_limit()
292                    .reset_offset()
293                    .clear_order_by()
294                    .limit(1)
295                    .to_owned(),
296                "sub_query",
297            )
298            .limit(1)
299            .to_owned();
300        let result = db.query_one(&stmt).await?;
301        Ok(result.is_some())
302    }
303}
304
305impl<'db, C, S> PaginatorTrait<'db, C> for Selector<S>
306where
307    C: ConnectionTrait,
308    S: SelectorTrait + Send + Sync + 'db,
309{
310    type Selector = S;
311
312    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
313        assert!(page_size != 0, "page_size should not be zero");
314        Paginator {
315            query: self.query,
316            page: 0,
317            page_size,
318            db,
319            selector: PhantomData,
320        }
321    }
322}
323
324impl<'db, C, S> PaginatorTrait<'db, C> for SelectorRaw<S>
325where
326    C: ConnectionTrait,
327    S: SelectorTrait + Send + Sync + 'db,
328{
329    type Selector = S;
330    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
331        assert!(page_size != 0, "page_size should not be zero");
332        let sql = self.stmt.sql.trim()[6..].trim().to_owned();
333        let mut query = SelectStatement::new();
334        query.expr(if let Some(values) = self.stmt.values {
335            Expr::cust_with_values(sql, values.0)
336        } else {
337            Expr::cust(sql)
338        });
339
340        Paginator {
341            query,
342            page: 0,
343            page_size,
344            db,
345            selector: PhantomData,
346        }
347    }
348}
349
350impl<'db, C, M, E> PaginatorTrait<'db, C> for Select<E>
351where
352    C: ConnectionTrait,
353    E: EntityTrait<Model = M>,
354    M: FromQueryResult + Sized + Send + Sync + 'db,
355{
356    type Selector = SelectModel<M>;
357
358    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
359        self.into_model().paginate(db, page_size)
360    }
361}
362
363impl<'db, C, M, N, E, F> PaginatorTrait<'db, C> for SelectTwo<E, F>
364where
365    C: ConnectionTrait,
366    E: EntityTrait<Model = M>,
367    F: EntityTrait<Model = N>,
368    M: FromQueryResult + Sized + Send + Sync + 'db,
369    N: FromQueryResult + Sized + Send + Sync + 'db,
370{
371    type Selector = SelectTwoModel<M, N>;
372
373    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
374        self.into_model().paginate(db, page_size)
375    }
376}
377
378#[cfg(feature = "sync")]
379impl<'db, C, S> Iterator for PaginatorStream<'db, C, S>
380where
381    C: ConnectionTrait,
382    S: SelectorTrait + 'db,
383{
384    type Item = Result<Vec<S::Item>, DbErr>;
385
386    fn next(&mut self) -> Option<Self::Item> {
387        match self.paginator.fetch_and_next() {
388            Ok(Some(vec)) => Some(Ok(vec)),
389            Ok(None) => None,
390            Err(e) => Some(Err(e)),
391        }
392    }
393}
394
395#[cfg(test)]
396#[cfg(feature = "mock")]
397mod tests {
398    use super::*;
399    use crate::entity::prelude::*;
400    #[cfg(feature = "sync")]
401    use crate::util::StreamShim;
402    use crate::{DatabaseConnection, DbBackend, MockDatabase, Transaction};
403    use crate::{Statement, tests_cfg::*};
404    use futures_util::{TryStreamExt, stream::TryNext};
405    use pretty_assertions::assert_eq;
406    use sea_query::{Expr, SelectStatement, Value};
407    use std::sync::LazyLock;
408
409    static RAW_STMT: LazyLock<Statement> = LazyLock::new(|| {
410        Statement::from_sql_and_values(
411            DbBackend::Postgres,
412            r#"SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit""#,
413            [],
414        )
415    });
416
417    fn setup() -> (DatabaseConnection, Vec<Vec<fruit::Model>>) {
418        let page1 = vec![
419            fruit::Model {
420                id: 1,
421                name: "Blueberry".into(),
422                cake_id: Some(1),
423            },
424            fruit::Model {
425                id: 2,
426                name: "Raspberry".into(),
427                cake_id: Some(1),
428            },
429        ];
430
431        let page2 = vec![fruit::Model {
432            id: 3,
433            name: "Strawberry".into(),
434            cake_id: Some(2),
435        }];
436
437        let page3 = Vec::<fruit::Model>::new();
438
439        let db = MockDatabase::new(DbBackend::Postgres)
440            .append_query_results([page1.clone(), page2.clone(), page3.clone()])
441            .into_connection();
442
443        (db, vec![page1, page2, page3])
444    }
445
446    fn setup_num_items() -> (DatabaseConnection, i64) {
447        let num_items = 3;
448        let db = MockDatabase::new(DbBackend::Postgres)
449            .append_query_results([[maplit::btreemap! {
450                "num_items" => Into::<Value>::into(num_items),
451            }]])
452            .into_connection();
453
454        (db, num_items)
455    }
456
457    #[smol_potat::test]
458    async fn fetch_page() -> Result<(), DbErr> {
459        let (db, pages) = setup();
460
461        let paginator = fruit::Entity::find().paginate(&db, 2);
462
463        assert_eq!(paginator.fetch_page(0).await?, pages[0].clone());
464        assert_eq!(paginator.fetch_page(1).await?, pages[1].clone());
465        assert_eq!(paginator.fetch_page(2).await?, pages[2].clone());
466
467        let mut select = SelectStatement::new()
468            .exprs([
469                Expr::col((fruit::Entity, fruit::Column::Id)),
470                Expr::col((fruit::Entity, fruit::Column::Name)),
471                Expr::col((fruit::Entity, fruit::Column::CakeId)),
472            ])
473            .from(fruit::Entity)
474            .to_owned();
475
476        let query_builder = db.get_database_backend();
477        let stmts = [
478            query_builder.build(select.clone().offset(0).limit(2)),
479            query_builder.build(select.clone().offset(2).limit(2)),
480            query_builder.build(select.offset(4).limit(2)),
481        ];
482
483        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
484        Ok(())
485    }
486
487    #[smol_potat::test]
488    async fn fetch_page_raw() -> Result<(), DbErr> {
489        let (db, pages) = setup();
490
491        let paginator = fruit::Entity::find()
492            .from_raw_sql(RAW_STMT.clone())
493            .paginate(&db, 2);
494
495        assert_eq!(paginator.fetch_page(0).await?, pages[0].clone());
496        assert_eq!(paginator.fetch_page(1).await?, pages[1].clone());
497        assert_eq!(paginator.fetch_page(2).await?, pages[2].clone());
498
499        let mut select = SelectStatement::new()
500            .exprs([
501                Expr::col((fruit::Entity, fruit::Column::Id)),
502                Expr::col((fruit::Entity, fruit::Column::Name)),
503                Expr::col((fruit::Entity, fruit::Column::CakeId)),
504            ])
505            .from(fruit::Entity)
506            .to_owned();
507
508        let query_builder = db.get_database_backend();
509        let stmts = [
510            query_builder.build(select.clone().offset(0).limit(2)),
511            query_builder.build(select.clone().offset(2).limit(2)),
512            query_builder.build(select.offset(4).limit(2)),
513        ];
514
515        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
516        Ok(())
517    }
518
519    #[smol_potat::test]
520    async fn fetch() -> Result<(), DbErr> {
521        let (db, pages) = setup();
522
523        let mut paginator = fruit::Entity::find().paginate(&db, 2);
524
525        assert_eq!(paginator.fetch().await?, pages[0].clone());
526        paginator.next();
527
528        assert_eq!(paginator.fetch().await?, pages[1].clone());
529        paginator.next();
530
531        assert_eq!(paginator.fetch().await?, pages[2].clone());
532
533        let mut select = SelectStatement::new()
534            .exprs([
535                Expr::col((fruit::Entity, fruit::Column::Id)),
536                Expr::col((fruit::Entity, fruit::Column::Name)),
537                Expr::col((fruit::Entity, fruit::Column::CakeId)),
538            ])
539            .from(fruit::Entity)
540            .to_owned();
541
542        let query_builder = db.get_database_backend();
543        let stmts = [
544            query_builder.build(select.clone().offset(0).limit(2)),
545            query_builder.build(select.clone().offset(2).limit(2)),
546            query_builder.build(select.offset(4).limit(2)),
547        ];
548
549        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
550        Ok(())
551    }
552
553    #[smol_potat::test]
554    async fn fetch_raw() -> Result<(), DbErr> {
555        let (db, pages) = setup();
556
557        let mut paginator = fruit::Entity::find()
558            .from_raw_sql(RAW_STMT.clone())
559            .paginate(&db, 2);
560
561        assert_eq!(paginator.fetch().await?, pages[0].clone());
562        paginator.next();
563
564        assert_eq!(paginator.fetch().await?, pages[1].clone());
565        paginator.next();
566
567        assert_eq!(paginator.fetch().await?, pages[2].clone());
568
569        let mut select = SelectStatement::new()
570            .exprs([
571                Expr::col((fruit::Entity, fruit::Column::Id)),
572                Expr::col((fruit::Entity, fruit::Column::Name)),
573                Expr::col((fruit::Entity, fruit::Column::CakeId)),
574            ])
575            .from(fruit::Entity)
576            .to_owned();
577
578        let query_builder = db.get_database_backend();
579        let stmts = [
580            query_builder.build(select.clone().offset(0).limit(2)),
581            query_builder.build(select.clone().offset(2).limit(2)),
582            query_builder.build(select.offset(4).limit(2)),
583        ];
584
585        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
586        Ok(())
587    }
588
589    #[smol_potat::test]
590    async fn num_pages() -> Result<(), DbErr> {
591        let (db, num_items) = setup_num_items();
592
593        let num_items = num_items as u64;
594        let page_size = 2_u64;
595        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
596        let paginator = fruit::Entity::find().paginate(&db, page_size);
597
598        assert_eq!(paginator.num_pages().await?, num_pages);
599
600        let sub_query = SelectStatement::new()
601            .exprs([
602                Expr::col((fruit::Entity, fruit::Column::Id)),
603                Expr::col((fruit::Entity, fruit::Column::Name)),
604                Expr::col((fruit::Entity, fruit::Column::CakeId)),
605            ])
606            .from(fruit::Entity)
607            .to_owned();
608
609        let select = SelectStatement::new()
610            .expr(Expr::cust("COUNT(*) AS num_items"))
611            .from_subquery(sub_query, "sub_query")
612            .to_owned();
613
614        let query_builder = db.get_database_backend();
615        let stmts = [query_builder.build(&select)];
616
617        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
618        Ok(())
619    }
620
621    #[smol_potat::test]
622    async fn num_pages_raw() -> Result<(), DbErr> {
623        let (db, num_items) = setup_num_items();
624
625        let num_items = num_items as u64;
626        let page_size = 2_u64;
627        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
628        let paginator = fruit::Entity::find()
629            .from_raw_sql(RAW_STMT.clone())
630            .paginate(&db, page_size);
631
632        assert_eq!(paginator.num_pages().await?, num_pages);
633
634        let sub_query = SelectStatement::new()
635            .exprs([
636                Expr::col((fruit::Entity, fruit::Column::Id)),
637                Expr::col((fruit::Entity, fruit::Column::Name)),
638                Expr::col((fruit::Entity, fruit::Column::CakeId)),
639            ])
640            .from(fruit::Entity)
641            .to_owned();
642
643        let select = SelectStatement::new()
644            .expr(Expr::cust("COUNT(*) AS num_items"))
645            .from_subquery(sub_query, "sub_query")
646            .to_owned();
647
648        let query_builder = db.get_database_backend();
649        let stmts = [query_builder.build(&select)];
650
651        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
652        Ok(())
653    }
654
655    #[smol_potat::test]
656    async fn next_and_cur_page() -> Result<(), DbErr> {
657        let (db, _) = setup();
658
659        let mut paginator = fruit::Entity::find().paginate(&db, 2);
660
661        assert_eq!(paginator.cur_page(), 0);
662        paginator.next();
663
664        assert_eq!(paginator.cur_page(), 1);
665        paginator.next();
666
667        assert_eq!(paginator.cur_page(), 2);
668        Ok(())
669    }
670
671    #[smol_potat::test]
672    async fn next_and_cur_page_raw() -> Result<(), DbErr> {
673        let (db, _) = setup();
674
675        let mut paginator = fruit::Entity::find()
676            .from_raw_sql(RAW_STMT.clone())
677            .paginate(&db, 2);
678
679        assert_eq!(paginator.cur_page(), 0);
680        paginator.next();
681
682        assert_eq!(paginator.cur_page(), 1);
683        paginator.next();
684
685        assert_eq!(paginator.cur_page(), 2);
686        Ok(())
687    }
688
689    #[smol_potat::test]
690    async fn fetch_and_next() -> Result<(), DbErr> {
691        let (db, pages) = setup();
692
693        let mut paginator = fruit::Entity::find().paginate(&db, 2);
694
695        assert_eq!(paginator.cur_page(), 0);
696        assert_eq!(paginator.fetch_and_next().await?, Some(pages[0].clone()));
697
698        assert_eq!(paginator.cur_page(), 1);
699        assert_eq!(paginator.fetch_and_next().await?, Some(pages[1].clone()));
700
701        assert_eq!(paginator.cur_page(), 2);
702        assert_eq!(paginator.fetch_and_next().await?, None);
703
704        let mut select = SelectStatement::new()
705            .exprs([
706                Expr::col((fruit::Entity, fruit::Column::Id)),
707                Expr::col((fruit::Entity, fruit::Column::Name)),
708                Expr::col((fruit::Entity, fruit::Column::CakeId)),
709            ])
710            .from(fruit::Entity)
711            .to_owned();
712
713        let query_builder = db.get_database_backend();
714        let stmts = [
715            query_builder.build(select.clone().offset(0).limit(2)),
716            query_builder.build(select.clone().offset(2).limit(2)),
717            query_builder.build(select.offset(4).limit(2)),
718        ];
719
720        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
721        Ok(())
722    }
723
724    #[smol_potat::test]
725    async fn fetch_and_next_raw() -> Result<(), DbErr> {
726        let (db, pages) = setup();
727
728        let mut paginator = fruit::Entity::find()
729            .from_raw_sql(RAW_STMT.clone())
730            .paginate(&db, 2);
731
732        assert_eq!(paginator.cur_page(), 0);
733        assert_eq!(paginator.fetch_and_next().await?, Some(pages[0].clone()));
734
735        assert_eq!(paginator.cur_page(), 1);
736        assert_eq!(paginator.fetch_and_next().await?, Some(pages[1].clone()));
737
738        assert_eq!(paginator.cur_page(), 2);
739        assert_eq!(paginator.fetch_and_next().await?, None);
740
741        let mut select = SelectStatement::new()
742            .exprs([
743                Expr::col((fruit::Entity, fruit::Column::Id)),
744                Expr::col((fruit::Entity, fruit::Column::Name)),
745                Expr::col((fruit::Entity, fruit::Column::CakeId)),
746            ])
747            .from(fruit::Entity)
748            .to_owned();
749
750        let query_builder = db.get_database_backend();
751        let stmts = [
752            query_builder.build(select.clone().offset(0).limit(2)),
753            query_builder.build(select.clone().offset(2).limit(2)),
754            query_builder.build(select.offset(4).limit(2)),
755        ];
756
757        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
758        Ok(())
759    }
760
761    #[smol_potat::test]
762    async fn into_stream() -> Result<(), DbErr> {
763        let (db, pages) = setup();
764
765        let mut fruit_stream = fruit::Entity::find().paginate(&db, 2).into_stream();
766
767        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
768        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
769        assert_eq!(fruit_stream.try_next().await?, None);
770
771        drop(fruit_stream);
772
773        let mut select = SelectStatement::new()
774            .exprs([
775                Expr::col((fruit::Entity, fruit::Column::Id)),
776                Expr::col((fruit::Entity, fruit::Column::Name)),
777                Expr::col((fruit::Entity, fruit::Column::CakeId)),
778            ])
779            .from(fruit::Entity)
780            .to_owned();
781
782        let query_builder = db.get_database_backend();
783        let stmts = [
784            query_builder.build(select.clone().offset(0).limit(2)),
785            query_builder.build(select.clone().offset(2).limit(2)),
786            query_builder.build(select.offset(4).limit(2)),
787        ];
788
789        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
790        Ok(())
791    }
792
793    #[smol_potat::test]
794    async fn into_stream_raw() -> Result<(), DbErr> {
795        let (db, pages) = setup();
796
797        let mut fruit_stream = fruit::Entity::find()
798            .from_raw_sql(RAW_STMT.clone())
799            .paginate(&db, 2)
800            .into_stream();
801
802        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
803        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
804        assert_eq!(fruit_stream.try_next().await?, None);
805
806        drop(fruit_stream);
807
808        let mut select = SelectStatement::new()
809            .exprs([
810                Expr::col((fruit::Entity, fruit::Column::Id)),
811                Expr::col((fruit::Entity, fruit::Column::Name)),
812                Expr::col((fruit::Entity, fruit::Column::CakeId)),
813            ])
814            .from(fruit::Entity)
815            .to_owned();
816
817        let query_builder = db.get_database_backend();
818        let stmts = [
819            query_builder.build(select.clone().offset(0).limit(2)),
820            query_builder.build(select.clone().offset(2).limit(2)),
821            query_builder.build(select.offset(4).limit(2)),
822        ];
823
824        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
825        Ok(())
826    }
827
828    #[smol_potat::test]
829    async fn into_stream_raw_leading_spaces() -> Result<(), DbErr> {
830        let (db, pages) = setup();
831
832        let raw_stmt = Statement::from_sql_and_values(
833            DbBackend::Postgres,
834            r#"  SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit"  "#,
835            [],
836        );
837
838        let mut fruit_stream = fruit::Entity::find()
839            .from_raw_sql(raw_stmt.clone())
840            .paginate(&db, 2)
841            .into_stream();
842
843        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
844        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
845        assert_eq!(fruit_stream.try_next().await?, None);
846
847        drop(fruit_stream);
848
849        let mut select = SelectStatement::new()
850            .exprs([
851                Expr::col((fruit::Entity, fruit::Column::Id)),
852                Expr::col((fruit::Entity, fruit::Column::Name)),
853                Expr::col((fruit::Entity, fruit::Column::CakeId)),
854            ])
855            .from(fruit::Entity)
856            .to_owned();
857
858        let query_builder = db.get_database_backend();
859        let stmts = [
860            query_builder.build(select.clone().offset(0).limit(2)),
861            query_builder.build(select.clone().offset(2).limit(2)),
862            query_builder.build(select.offset(4).limit(2)),
863        ];
864
865        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
866        Ok(())
867    }
868
869    #[smol_potat::test]
870    #[should_panic]
871    async fn error() {
872        let (db, _pages) = setup();
873
874        fruit::Entity::find().paginate(&db, 0);
875    }
876}