sea_orm/executor/
paginator.rs

1use crate::{
2    error::*, ConnectionTrait, DbBackend, EntityTrait, FromQueryResult, Select, SelectModel,
3    SelectThree, SelectThreeModel, SelectTwo, SelectTwoModel, Selector, SelectorRaw, SelectorTrait,
4};
5use async_stream::stream;
6use futures_util::Stream;
7use sea_query::{Alias, Expr, SelectStatement};
8use std::{marker::PhantomData, pin::Pin};
9
/// A pinned, heap-allocated [`Stream`] trait object that yields `Item`s and may borrow data
/// for the `'db` lifetime
pub type PinBoxStream<'db, Item> = Pin<Box<dyn Stream<Item = Item> + 'db>>;
12
13/// Defined a structure to handle pagination of a result from a query operation on a Model
14#[derive(Clone, Debug)]
15pub struct Paginator<'db, C, S>
16where
17    C: ConnectionTrait,
18    S: SelectorTrait + 'db,
19{
20    pub(crate) query: SelectStatement,
21    pub(crate) page: u64,
22    pub(crate) page_size: u64,
23    pub(crate) db: &'db C,
24    pub(crate) selector: PhantomData<S>,
25}
26
/// The total number of items and pages of a Paginator, as returned by
/// `Paginator::num_items_and_pages`
///
/// The type is comparable so results can be checked with `==` / `assert_eq!`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ItemsAndPagesNumber {
    /// The total number of items of a paginator
    pub number_of_items: u64,
    /// The total number of pages of a paginator
    pub number_of_pages: u64,
}
35
36// LINT: warn if paginator is used without an order by clause
37
38impl<'db, C, S> Paginator<'db, C, S>
39where
40    C: ConnectionTrait,
41    S: SelectorTrait + 'db,
42{
43    /// Fetch a specific page; page index starts from zero
44    pub async fn fetch_page(&self, page: u64) -> Result<Vec<S::Item>, DbErr> {
45        let query = self
46            .query
47            .clone()
48            .limit(self.page_size)
49            .offset(self.page_size * page)
50            .to_owned();
51        let builder = self.db.get_database_backend();
52        let stmt = builder.build(&query);
53        let rows = self.db.query_all(stmt).await?;
54        let mut buffer = Vec::with_capacity(rows.len());
55        for row in rows.into_iter() {
56            // TODO: Error handling
57            buffer.push(S::from_raw_query_result(row)?);
58        }
59        Ok(buffer)
60    }
61
62    /// Fetch the current page
63    pub async fn fetch(&self) -> Result<Vec<S::Item>, DbErr> {
64        self.fetch_page(self.page).await
65    }
66
67    /// Get the total number of items
68    pub async fn num_items(&self) -> Result<u64, DbErr> {
69        let builder = self.db.get_database_backend();
70        let stmt = SelectStatement::new()
71            .expr(Expr::cust("COUNT(*) AS num_items"))
72            .from_subquery(
73                self.query
74                    .clone()
75                    .reset_limit()
76                    .reset_offset()
77                    .clear_order_by()
78                    .to_owned(),
79                Alias::new("sub_query"),
80            )
81            .to_owned();
82        let stmt = builder.build(&stmt);
83        let result = match self.db.query_one(stmt).await? {
84            Some(res) => res,
85            None => return Ok(0),
86        };
87        let num_items = match builder {
88            DbBackend::Postgres => result.try_get::<i64>("", "num_items")? as u64,
89            _ => result.try_get::<i32>("", "num_items")? as u64,
90        };
91        Ok(num_items)
92    }
93
94    /// Get the total number of pages
95    pub async fn num_pages(&self) -> Result<u64, DbErr> {
96        let num_items = self.num_items().await?;
97        let num_pages = self.compute_pages_number(num_items);
98        Ok(num_pages)
99    }
100
101    /// Get the total number of items and pages
102    pub async fn num_items_and_pages(&self) -> Result<ItemsAndPagesNumber, DbErr> {
103        let number_of_items = self.num_items().await?;
104        let number_of_pages = self.compute_pages_number(number_of_items);
105
106        Ok(ItemsAndPagesNumber {
107            number_of_items,
108            number_of_pages,
109        })
110    }
111
112    /// Compute the number of pages for the current page
113    fn compute_pages_number(&self, num_items: u64) -> u64 {
114        (num_items / self.page_size) + (num_items % self.page_size > 0) as u64
115    }
116
117    /// Increment the page counter
118    pub fn next(&mut self) {
119        self.page += 1;
120    }
121
122    /// Get current page number
123    pub fn cur_page(&self) -> u64 {
124        self.page
125    }
126
127    /// Fetch one page and increment the page counter
128    ///
129    /// ```
130    /// # use sea_orm::{error::*, tests_cfg::*, *};
131    /// #
132    /// # #[smol_potat::main]
133    /// # #[cfg(feature = "mock")]
134    /// # pub async fn main() -> Result<(), DbErr> {
135    /// #
136    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
137    /// #     .append_query_results([
138    /// #         vec![cake::Model {
139    /// #             id: 1,
140    /// #             name: "Cake".to_owned(),
141    /// #         }],
142    /// #         vec![],
143    /// #     ])
144    /// #     .into_connection();
145    /// # let db = &owned_db;
146    /// #
147    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
148    /// let mut cake_pages = cake::Entity::find()
149    ///     .order_by_asc(cake::Column::Id)
150    ///     .paginate(db, 50);
151    ///
152    /// while let Some(cakes) = cake_pages.fetch_and_next().await? {
153    ///     // Do something on cakes: Vec<cake::Model>
154    /// }
155    /// #
156    /// # Ok(())
157    /// # }
158    /// ```
159    pub async fn fetch_and_next(&mut self) -> Result<Option<Vec<S::Item>>, DbErr> {
160        let vec = self.fetch().await?;
161        self.next();
162        let opt = if !vec.is_empty() { Some(vec) } else { None };
163        Ok(opt)
164    }
165
166    /// Convert self into an async stream
167    ///
168    /// ```
169    /// # use sea_orm::{error::*, tests_cfg::*, *};
170    /// #
171    /// # #[smol_potat::main]
172    /// # #[cfg(feature = "mock")]
173    /// # pub async fn main() -> Result<(), DbErr> {
174    /// #
175    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
176    /// #     .append_query_results([
177    /// #         vec![cake::Model {
178    /// #             id: 1,
179    /// #             name: "Cake".to_owned(),
180    /// #         }],
181    /// #         vec![],
182    /// #     ])
183    /// #     .into_connection();
184    /// # let db = &owned_db;
185    /// #
186    /// use futures::TryStreamExt;
187    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
188    /// let mut cake_stream = cake::Entity::find()
189    ///     .order_by_asc(cake::Column::Id)
190    ///     .paginate(db, 50)
191    ///     .into_stream();
192    ///
193    /// while let Some(cakes) = cake_stream.try_next().await? {
194    ///     // Do something on cakes: Vec<cake::Model>
195    /// }
196    /// #
197    /// # Ok(())
198    /// # }
199    /// ```
200    pub fn into_stream(mut self) -> PinBoxStream<'db, Result<Vec<S::Item>, DbErr>> {
201        Box::pin(stream! {
202            while let Some(vec) = self.fetch_and_next().await? {
203                yield Ok(vec);
204            }
205        })
206    }
207}
208
209#[async_trait::async_trait]
210/// A Trait for any type that can paginate results
211pub trait PaginatorTrait<'db, C>
212where
213    C: ConnectionTrait,
214{
215    /// Select operation
216    type Selector: SelectorTrait + Send + Sync + 'db;
217
218    /// Paginate the result of a select operation.
219    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector>;
220
221    /// Perform a count on the paginated results
222    async fn count(self, db: &'db C) -> Result<u64, DbErr>
223    where
224        Self: Send + Sized,
225    {
226        self.paginate(db, 1).num_items().await
227    }
228}
229
230impl<'db, C, S> PaginatorTrait<'db, C> for Selector<S>
231where
232    C: ConnectionTrait,
233    S: SelectorTrait + Send + Sync + 'db,
234{
235    type Selector = S;
236
237    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
238        assert!(page_size != 0, "page_size should not be zero");
239        Paginator {
240            query: self.query,
241            page: 0,
242            page_size,
243            db,
244            selector: PhantomData,
245        }
246    }
247}
248
249impl<'db, C, S> PaginatorTrait<'db, C> for SelectorRaw<S>
250where
251    C: ConnectionTrait,
252    S: SelectorTrait + Send + Sync + 'db,
253{
254    type Selector = S;
255    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
256        assert!(page_size != 0, "page_size should not be zero");
257        let sql = self.stmt.sql.trim()[6..].trim();
258        let mut query = SelectStatement::new();
259        query.expr(if let Some(values) = self.stmt.values {
260            Expr::cust_with_values(sql, values.0)
261        } else {
262            Expr::cust(sql)
263        });
264
265        Paginator {
266            query,
267            page: 0,
268            page_size,
269            db,
270            selector: PhantomData,
271        }
272    }
273}
274
275impl<'db, C, M, E> PaginatorTrait<'db, C> for Select<E>
276where
277    C: ConnectionTrait,
278    E: EntityTrait<Model = M>,
279    M: FromQueryResult + Sized + Send + Sync + 'db,
280{
281    type Selector = SelectModel<M>;
282
283    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
284        self.into_model().paginate(db, page_size)
285    }
286}
287
288impl<'db, C, M, N, E, F> PaginatorTrait<'db, C> for SelectTwo<E, F>
289where
290    C: ConnectionTrait,
291    E: EntityTrait<Model = M>,
292    F: EntityTrait<Model = N>,
293    M: FromQueryResult + Sized + Send + Sync + 'db,
294    N: FromQueryResult + Sized + Send + Sync + 'db,
295{
296    type Selector = SelectTwoModel<M, N>;
297
298    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
299        self.into_model().paginate(db, page_size)
300    }
301}
302
303impl<'db, C, M, N, O, E, F, G> PaginatorTrait<'db, C> for SelectThree<E, F, G>
304where
305    C: ConnectionTrait,
306    E: EntityTrait<Model = M>,
307    F: EntityTrait<Model = N>,
308    G: EntityTrait<Model = O>,
309    M: FromQueryResult + Sized + Send + Sync + 'db,
310    N: FromQueryResult + Sized + Send + Sync + 'db,
311    O: FromQueryResult + Sized + Send + Sync + 'db,
312{
313    type Selector = SelectThreeModel<M, N, O>;
314
315    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
316        self.into_model().paginate(db, page_size)
317    }
318}
319
#[cfg(test)]
#[cfg(feature = "mock")]
mod tests {
    use super::*;
    use crate::entity::prelude::*;
    use crate::{tests_cfg::*, ConnectionTrait, Statement};
    use crate::{DatabaseConnection, DbBackend, MockDatabase, Transaction};
    use futures_util::TryStreamExt;
    use once_cell::sync::Lazy;
    use pretty_assertions::assert_eq;
    use sea_query::{Alias, Expr, SelectStatement, Value};

    /// Raw SQL equivalent of the select built by `fruit::Entity::find()`.
    static RAW_STMT: Lazy<Statement> = Lazy::new(|| {
        Statement::from_sql_and_values(
            DbBackend::Postgres,
            r#"SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit""#,
            [],
        )
    });

    /// Mock connection primed with three pages of fruits (2 items, 1 item, empty),
    /// returned together with those pages.
    fn setup() -> (DatabaseConnection, Vec<Vec<fruit::Model>>) {
        let page1 = vec![
            fruit::Model {
                id: 1,
                name: "Blueberry".into(),
                cake_id: Some(1),
            },
            fruit::Model {
                id: 2,
                name: "Raspberry".into(),
                cake_id: Some(1),
            },
        ];

        let page2 = vec![fruit::Model {
            id: 3,
            name: "Strawberry".into(),
            cake_id: Some(2),
        }];

        let page3 = Vec::<fruit::Model>::new();

        let db = MockDatabase::new(DbBackend::Postgres)
            .append_query_results([page1.clone(), page2.clone(), page3.clone()])
            .into_connection();

        (db, vec![page1, page2, page3])
    }

    /// Mock connection primed with a single `num_items` row, returned together with
    /// the count it holds.
    fn setup_num_items() -> (DatabaseConnection, i64) {
        let num_items = 3;
        let db = MockDatabase::new(DbBackend::Postgres)
            .append_query_results([[maplit::btreemap! {
                "num_items" => Into::<Value>::into(num_items),
            }]])
            .into_connection();

        (db, num_items)
    }

    /// The un-paginated select over all fruit columns that every test expects.
    fn fruit_select() -> SelectStatement {
        SelectStatement::new()
            .exprs([
                Expr::col((fruit::Entity, fruit::Column::Id)),
                Expr::col((fruit::Entity, fruit::Column::Name)),
                Expr::col((fruit::Entity, fruit::Column::CakeId)),
            ])
            .from(fruit::Entity)
            .to_owned()
    }

    /// Statements expected in the transaction log after fetching pages 0, 1 and 2
    /// with a page size of 2.
    fn expected_page_stmts(query_builder: DbBackend) -> [Statement; 3] {
        let mut select = fruit_select();
        [
            query_builder.build(select.clone().offset(0).limit(2)),
            query_builder.build(select.clone().offset(2).limit(2)),
            query_builder.build(select.offset(4).limit(2)),
        ]
    }

    /// Statement expected in the transaction log after a single item count.
    fn expected_count_stmts(query_builder: DbBackend) -> [Statement; 1] {
        let select = SelectStatement::new()
            .expr(Expr::cust("COUNT(*) AS num_items"))
            .from_subquery(fruit_select(), Alias::new("sub_query"))
            .to_owned();
        [query_builder.build(&select)]
    }

    #[smol_potat::test]
    async fn fetch_page() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let paginator = fruit::Entity::find().paginate(&db, 2);

        assert_eq!(paginator.fetch_page(0).await?, pages[0].clone());
        assert_eq!(paginator.fetch_page(1).await?, pages[1].clone());
        assert_eq!(paginator.fetch_page(2).await?, pages[2].clone());

        let stmts = expected_page_stmts(db.get_database_backend());

        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
        Ok(())
    }

    #[smol_potat::test]
    async fn fetch_page_raw() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let paginator = fruit::Entity::find()
            .from_raw_sql(RAW_STMT.clone())
            .paginate(&db, 2);

        assert_eq!(paginator.fetch_page(0).await?, pages[0].clone());
        assert_eq!(paginator.fetch_page(1).await?, pages[1].clone());
        assert_eq!(paginator.fetch_page(2).await?, pages[2].clone());

        let stmts = expected_page_stmts(db.get_database_backend());

        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
        Ok(())
    }

    #[smol_potat::test]
    async fn fetch() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let mut paginator = fruit::Entity::find().paginate(&db, 2);

        assert_eq!(paginator.fetch().await?, pages[0].clone());
        paginator.next();

        assert_eq!(paginator.fetch().await?, pages[1].clone());
        paginator.next();

        assert_eq!(paginator.fetch().await?, pages[2].clone());

        let stmts = expected_page_stmts(db.get_database_backend());

        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
        Ok(())
    }

    #[smol_potat::test]
    async fn fetch_raw() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let mut paginator = fruit::Entity::find()
            .from_raw_sql(RAW_STMT.clone())
            .paginate(&db, 2);

        assert_eq!(paginator.fetch().await?, pages[0].clone());
        paginator.next();

        assert_eq!(paginator.fetch().await?, pages[1].clone());
        paginator.next();

        assert_eq!(paginator.fetch().await?, pages[2].clone());

        let stmts = expected_page_stmts(db.get_database_backend());

        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
        Ok(())
    }

    #[smol_potat::test]
    async fn num_pages() -> Result<(), DbErr> {
        let (db, num_items) = setup_num_items();

        let num_items = num_items as u64;
        let page_size = 2_u64;
        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
        let paginator = fruit::Entity::find().paginate(&db, page_size);

        assert_eq!(paginator.num_pages().await?, num_pages);

        let stmts = expected_count_stmts(db.get_database_backend());

        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
        Ok(())
    }

    #[smol_potat::test]
    async fn num_pages_raw() -> Result<(), DbErr> {
        let (db, num_items) = setup_num_items();

        let num_items = num_items as u64;
        let page_size = 2_u64;
        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
        let paginator = fruit::Entity::find()
            .from_raw_sql(RAW_STMT.clone())
            .paginate(&db, page_size);

        assert_eq!(paginator.num_pages().await?, num_pages);

        let stmts = expected_count_stmts(db.get_database_backend());

        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
        Ok(())
    }

    #[smol_potat::test]
    async fn next_and_cur_page() -> Result<(), DbErr> {
        let (db, _) = setup();

        let mut paginator = fruit::Entity::find().paginate(&db, 2);

        assert_eq!(paginator.cur_page(), 0);
        paginator.next();

        assert_eq!(paginator.cur_page(), 1);
        paginator.next();

        assert_eq!(paginator.cur_page(), 2);
        Ok(())
    }

    #[smol_potat::test]
    async fn next_and_cur_page_raw() -> Result<(), DbErr> {
        let (db, _) = setup();

        let mut paginator = fruit::Entity::find()
            .from_raw_sql(RAW_STMT.clone())
            .paginate(&db, 2);

        assert_eq!(paginator.cur_page(), 0);
        paginator.next();

        assert_eq!(paginator.cur_page(), 1);
        paginator.next();

        assert_eq!(paginator.cur_page(), 2);
        Ok(())
    }

    #[smol_potat::test]
    async fn fetch_and_next() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let mut paginator = fruit::Entity::find().paginate(&db, 2);

        assert_eq!(paginator.cur_page(), 0);
        assert_eq!(paginator.fetch_and_next().await?, Some(pages[0].clone()));

        assert_eq!(paginator.cur_page(), 1);
        assert_eq!(paginator.fetch_and_next().await?, Some(pages[1].clone()));

        assert_eq!(paginator.cur_page(), 2);
        assert_eq!(paginator.fetch_and_next().await?, None);

        let stmts = expected_page_stmts(db.get_database_backend());

        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
        Ok(())
    }

    #[smol_potat::test]
    async fn fetch_and_next_raw() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let mut paginator = fruit::Entity::find()
            .from_raw_sql(RAW_STMT.clone())
            .paginate(&db, 2);

        assert_eq!(paginator.cur_page(), 0);
        assert_eq!(paginator.fetch_and_next().await?, Some(pages[0].clone()));

        assert_eq!(paginator.cur_page(), 1);
        assert_eq!(paginator.fetch_and_next().await?, Some(pages[1].clone()));

        assert_eq!(paginator.cur_page(), 2);
        assert_eq!(paginator.fetch_and_next().await?, None);

        let stmts = expected_page_stmts(db.get_database_backend());

        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
        Ok(())
    }

    #[smol_potat::test]
    async fn into_stream() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let mut fruit_stream = fruit::Entity::find().paginate(&db, 2).into_stream();

        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
        assert_eq!(fruit_stream.try_next().await?, None);

        // The stream borrows `db`; release it before `db` is consumed below.
        drop(fruit_stream);

        let stmts = expected_page_stmts(db.get_database_backend());

        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
        Ok(())
    }

    #[smol_potat::test]
    async fn into_stream_raw() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let mut fruit_stream = fruit::Entity::find()
            .from_raw_sql(RAW_STMT.clone())
            .paginate(&db, 2)
            .into_stream();

        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
        assert_eq!(fruit_stream.try_next().await?, None);

        // The stream borrows `db`; release it before `db` is consumed below.
        drop(fruit_stream);

        let stmts = expected_page_stmts(db.get_database_backend());

        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
        Ok(())
    }

    #[smol_potat::test]
    async fn into_stream_raw_leading_spaces() -> Result<(), DbErr> {
        let (db, pages) = setup();

        // Same statement as `RAW_STMT`, padded with whitespace on both ends.
        let raw_stmt = Statement::from_sql_and_values(
            DbBackend::Postgres,
            r#"  SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit"  "#,
            [],
        );

        let mut fruit_stream = fruit::Entity::find()
            .from_raw_sql(raw_stmt.clone())
            .paginate(&db, 2)
            .into_stream();

        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
        assert_eq!(fruit_stream.try_next().await?, None);

        // The stream borrows `db`; release it before `db` is consumed below.
        drop(fruit_stream);

        let stmts = expected_page_stmts(db.get_database_backend());

        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
        Ok(())
    }

    // Pin the panic message so an unrelated panic (e.g. in `setup`) cannot make
    // this test pass by accident.
    #[smol_potat::test]
    #[should_panic(expected = "page_size should not be zero")]
    async fn error() {
        let (db, _pages) = setup();

        fruit::Entity::find().paginate(&db, 0);
    }
}