sea_orm/executor/
paginator.rs

1use crate::{
2    ConnectionTrait, DbBackend, EntityTrait, FromQueryResult, Select, SelectModel, SelectThree,
3    SelectThreeModel, SelectTwo, SelectTwoModel, Selector, SelectorRaw, SelectorTrait, Topology,
4    error::*,
5};
6use async_stream::stream;
7use futures_util::Stream;
8use sea_query::{Expr, SelectStatement};
9use std::{marker::PhantomData, pin::Pin};
10
11/// Pin a Model so that stream operations can be performed on the model
12pub type PinBoxStream<'db, Item> = Pin<Box<dyn Stream<Item = Item> + 'db>>;
13
14/// Defined a structure to handle pagination of a result from a query operation on a Model
15#[derive(Clone, Debug)]
16pub struct Paginator<'db, C, S>
17where
18    C: ConnectionTrait,
19    S: SelectorTrait + 'db,
20{
21    pub(crate) query: SelectStatement,
22    pub(crate) page: u64,
23    pub(crate) page_size: u64,
24    pub(crate) db: &'db C,
25    pub(crate) selector: PhantomData<S>,
26}
27
28/// Define a structure containing the numbers of items and pages of a Paginator
29#[derive(Clone, Debug)]
30pub struct ItemsAndPagesNumber {
31    /// The total number of items of a paginator
32    pub number_of_items: u64,
33    /// The total number of pages of a paginator
34    pub number_of_pages: u64,
35}
36
37// LINT: warn if paginator is used without an order by clause
38
39impl<'db, C, S> Paginator<'db, C, S>
40where
41    C: ConnectionTrait,
42    S: SelectorTrait + 'db,
43{
44    /// Fetch a specific page; page index starts from zero
45    pub async fn fetch_page(&self, page: u64) -> Result<Vec<S::Item>, DbErr> {
46        let query = self
47            .query
48            .clone()
49            .limit(self.page_size)
50            .offset(self.page_size * page)
51            .to_owned();
52        let rows = self.db.query_all(&query).await?;
53        let mut buffer = Vec::with_capacity(rows.len());
54        for row in rows.into_iter() {
55            // TODO: Error handling
56            buffer.push(S::from_raw_query_result(row)?);
57        }
58        Ok(buffer)
59    }
60
61    /// Fetch the current page
62    pub async fn fetch(&self) -> Result<Vec<S::Item>, DbErr> {
63        self.fetch_page(self.page).await
64    }
65
66    /// Get the total number of items
67    pub async fn num_items(&self) -> Result<u64, DbErr> {
68        let db_backend = self.db.get_database_backend();
69        let query = SelectStatement::new()
70            .expr(Expr::cust("COUNT(*) AS num_items"))
71            .from_subquery(
72                self.query
73                    .clone()
74                    .reset_limit()
75                    .reset_offset()
76                    .clear_order_by()
77                    .to_owned(),
78                "sub_query",
79            )
80            .to_owned();
81        let result = match self.db.query_one(&query).await? {
82            Some(res) => res,
83            None => return Ok(0),
84        };
85        let num_items = match db_backend {
86            DbBackend::Postgres => result.try_get::<i64>("", "num_items")? as u64,
87            _ => result.try_get::<i32>("", "num_items")? as u64,
88        };
89        Ok(num_items)
90    }
91
92    /// Get the total number of pages
93    pub async fn num_pages(&self) -> Result<u64, DbErr> {
94        let num_items = self.num_items().await?;
95        let num_pages = self.compute_pages_number(num_items);
96        Ok(num_pages)
97    }
98
99    /// Get the total number of items and pages
100    pub async fn num_items_and_pages(&self) -> Result<ItemsAndPagesNumber, DbErr> {
101        let number_of_items = self.num_items().await?;
102        let number_of_pages = self.compute_pages_number(number_of_items);
103
104        Ok(ItemsAndPagesNumber {
105            number_of_items,
106            number_of_pages,
107        })
108    }
109
110    /// Compute the number of pages for the current page
111    fn compute_pages_number(&self, num_items: u64) -> u64 {
112        (num_items / self.page_size) + (num_items % self.page_size > 0) as u64
113    }
114
115    /// Increment the page counter
116    pub fn next(&mut self) {
117        self.page += 1;
118    }
119
120    /// Get current page number
121    pub fn cur_page(&self) -> u64 {
122        self.page
123    }
124
125    /// Fetch one page and increment the page counter
126    ///
127    /// ```
128    /// # use sea_orm::{error::*, tests_cfg::*, *};
129    /// #
130    /// # #[smol_potat::main]
131    /// # #[cfg(feature = "mock")]
132    /// # pub async fn main() -> Result<(), DbErr> {
133    /// #
134    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
135    /// #     .append_query_results([
136    /// #         vec![cake::Model {
137    /// #             id: 1,
138    /// #             name: "Cake".to_owned(),
139    /// #         }],
140    /// #         vec![],
141    /// #     ])
142    /// #     .into_connection();
143    /// # let db = &owned_db;
144    /// #
145    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
146    /// let mut cake_pages = cake::Entity::find()
147    ///     .order_by_asc(cake::Column::Id)
148    ///     .paginate(db, 50);
149    ///
150    /// while let Some(cakes) = cake_pages.fetch_and_next().await? {
151    ///     // Do something on cakes: Vec<cake::Model>
152    /// }
153    /// #
154    /// # Ok(())
155    /// # }
156    /// ```
157    pub async fn fetch_and_next(&mut self) -> Result<Option<Vec<S::Item>>, DbErr> {
158        let vec = self.fetch().await?;
159        self.next();
160        let opt = if !vec.is_empty() { Some(vec) } else { None };
161        Ok(opt)
162    }
163
164    /// Convert self into an async stream
165    ///
166    /// ```
167    /// # use sea_orm::{error::*, tests_cfg::*, *};
168    /// #
169    /// # #[smol_potat::main]
170    /// # #[cfg(feature = "mock")]
171    /// # pub async fn main() -> Result<(), DbErr> {
172    /// #
173    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
174    /// #     .append_query_results([
175    /// #         vec![cake::Model {
176    /// #             id: 1,
177    /// #             name: "Cake".to_owned(),
178    /// #         }],
179    /// #         vec![],
180    /// #     ])
181    /// #     .into_connection();
182    /// # let db = &owned_db;
183    /// #
184    /// use futures::TryStreamExt;
185    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
186    /// let mut cake_stream = cake::Entity::find()
187    ///     .order_by_asc(cake::Column::Id)
188    ///     .paginate(db, 50)
189    ///     .into_stream();
190    ///
191    /// while let Some(cakes) = cake_stream.try_next().await? {
192    ///     // Do something on cakes: Vec<cake::Model>
193    /// }
194    /// #
195    /// # Ok(())
196    /// # }
197    /// ```
198    pub fn into_stream(mut self) -> PinBoxStream<'db, Result<Vec<S::Item>, DbErr>> {
199        Box::pin(stream! {
200            while let Some(vec) = self.fetch_and_next().await? {
201                yield Ok(vec);
202            }
203        })
204    }
205}
206
207#[async_trait::async_trait]
208/// A Trait for any type that can paginate results
209pub trait PaginatorTrait<'db, C>
210where
211    C: ConnectionTrait,
212{
213    /// Select operation
214    type Selector: SelectorTrait + Send + Sync + 'db;
215
216    /// Paginate the result of a select operation.
217    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector>;
218
219    /// Perform a count on the paginated results
220    async fn count(self, db: &'db C) -> Result<u64, DbErr>
221    where
222        Self: Send + Sized,
223    {
224        self.paginate(db, 1).num_items().await
225    }
226}
227
228impl<'db, C, S> PaginatorTrait<'db, C> for Selector<S>
229where
230    C: ConnectionTrait,
231    S: SelectorTrait + Send + Sync + 'db,
232{
233    type Selector = S;
234
235    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
236        assert!(page_size != 0, "page_size should not be zero");
237        Paginator {
238            query: self.query,
239            page: 0,
240            page_size,
241            db,
242            selector: PhantomData,
243        }
244    }
245}
246
247impl<'db, C, S> PaginatorTrait<'db, C> for SelectorRaw<S>
248where
249    C: ConnectionTrait,
250    S: SelectorTrait + Send + Sync + 'db,
251{
252    type Selector = S;
253    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
254        assert!(page_size != 0, "page_size should not be zero");
255        let sql = self.stmt.sql.trim()[6..].trim().to_owned();
256        let mut query = SelectStatement::new();
257        query.expr(if let Some(values) = self.stmt.values {
258            Expr::cust_with_values(sql, values.0)
259        } else {
260            Expr::cust(sql)
261        });
262
263        Paginator {
264            query,
265            page: 0,
266            page_size,
267            db,
268            selector: PhantomData,
269        }
270    }
271}
272
273impl<'db, C, M, E> PaginatorTrait<'db, C> for Select<E>
274where
275    C: ConnectionTrait,
276    E: EntityTrait<Model = M>,
277    M: FromQueryResult + Sized + Send + Sync + 'db,
278{
279    type Selector = SelectModel<M>;
280
281    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
282        self.into_model().paginate(db, page_size)
283    }
284}
285
286impl<'db, C, M, N, E, F> PaginatorTrait<'db, C> for SelectTwo<E, F>
287where
288    C: ConnectionTrait,
289    E: EntityTrait<Model = M>,
290    F: EntityTrait<Model = N>,
291    M: FromQueryResult + Sized + Send + Sync + 'db,
292    N: FromQueryResult + Sized + Send + Sync + 'db,
293{
294    type Selector = SelectTwoModel<M, N>;
295
296    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
297        self.into_model().paginate(db, page_size)
298    }
299}
300
301impl<'db, C, M, N, O, E, F, G, TOP> PaginatorTrait<'db, C> for SelectThree<E, F, G, TOP>
302where
303    C: ConnectionTrait,
304    E: EntityTrait<Model = M>,
305    F: EntityTrait<Model = N>,
306    G: EntityTrait<Model = O>,
307    M: FromQueryResult + Sized + Send + Sync + 'db,
308    N: FromQueryResult + Sized + Send + Sync + 'db,
309    O: FromQueryResult + Sized + Send + Sync + 'db,
310    TOP: Topology,
311{
312    type Selector = SelectThreeModel<M, N, O>;
313
314    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
315        self.into_model().paginate(db, page_size)
316    }
317}
318
319#[cfg(test)]
320#[cfg(feature = "mock")]
321mod tests {
322    use super::*;
323    use crate::entity::prelude::*;
324    use crate::{DatabaseConnection, DbBackend, MockDatabase, Transaction};
325    use crate::{Statement, tests_cfg::*};
326    use futures_util::TryStreamExt;
327    use pretty_assertions::assert_eq;
328    use sea_query::{Expr, SelectStatement, Value};
329    use std::sync::LazyLock;
330
331    static RAW_STMT: LazyLock<Statement> = LazyLock::new(|| {
332        Statement::from_sql_and_values(
333            DbBackend::Postgres,
334            r#"SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit""#,
335            [],
336        )
337    });
338
339    fn setup() -> (DatabaseConnection, Vec<Vec<fruit::Model>>) {
340        let page1 = vec![
341            fruit::Model {
342                id: 1,
343                name: "Blueberry".into(),
344                cake_id: Some(1),
345            },
346            fruit::Model {
347                id: 2,
348                name: "Raspberry".into(),
349                cake_id: Some(1),
350            },
351        ];
352
353        let page2 = vec![fruit::Model {
354            id: 3,
355            name: "Strawberry".into(),
356            cake_id: Some(2),
357        }];
358
359        let page3 = Vec::<fruit::Model>::new();
360
361        let db = MockDatabase::new(DbBackend::Postgres)
362            .append_query_results([page1.clone(), page2.clone(), page3.clone()])
363            .into_connection();
364
365        (db, vec![page1, page2, page3])
366    }
367
368    fn setup_num_items() -> (DatabaseConnection, i64) {
369        let num_items = 3;
370        let db = MockDatabase::new(DbBackend::Postgres)
371            .append_query_results([[maplit::btreemap! {
372                "num_items" => Into::<Value>::into(num_items),
373            }]])
374            .into_connection();
375
376        (db, num_items)
377    }
378
379    #[smol_potat::test]
380    async fn fetch_page() -> Result<(), DbErr> {
381        let (db, pages) = setup();
382
383        let paginator = fruit::Entity::find().paginate(&db, 2);
384
385        assert_eq!(paginator.fetch_page(0).await?, pages[0].clone());
386        assert_eq!(paginator.fetch_page(1).await?, pages[1].clone());
387        assert_eq!(paginator.fetch_page(2).await?, pages[2].clone());
388
389        let mut select = SelectStatement::new()
390            .exprs([
391                Expr::col((fruit::Entity, fruit::Column::Id)),
392                Expr::col((fruit::Entity, fruit::Column::Name)),
393                Expr::col((fruit::Entity, fruit::Column::CakeId)),
394            ])
395            .from(fruit::Entity)
396            .to_owned();
397
398        let query_builder = db.get_database_backend();
399        let stmts = [
400            query_builder.build(select.clone().offset(0).limit(2)),
401            query_builder.build(select.clone().offset(2).limit(2)),
402            query_builder.build(select.offset(4).limit(2)),
403        ];
404
405        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
406        Ok(())
407    }
408
409    #[smol_potat::test]
410    async fn fetch_page_raw() -> Result<(), DbErr> {
411        let (db, pages) = setup();
412
413        let paginator = fruit::Entity::find()
414            .from_raw_sql(RAW_STMT.clone())
415            .paginate(&db, 2);
416
417        assert_eq!(paginator.fetch_page(0).await?, pages[0].clone());
418        assert_eq!(paginator.fetch_page(1).await?, pages[1].clone());
419        assert_eq!(paginator.fetch_page(2).await?, pages[2].clone());
420
421        let mut select = SelectStatement::new()
422            .exprs([
423                Expr::col((fruit::Entity, fruit::Column::Id)),
424                Expr::col((fruit::Entity, fruit::Column::Name)),
425                Expr::col((fruit::Entity, fruit::Column::CakeId)),
426            ])
427            .from(fruit::Entity)
428            .to_owned();
429
430        let query_builder = db.get_database_backend();
431        let stmts = [
432            query_builder.build(select.clone().offset(0).limit(2)),
433            query_builder.build(select.clone().offset(2).limit(2)),
434            query_builder.build(select.offset(4).limit(2)),
435        ];
436
437        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
438        Ok(())
439    }
440
441    #[smol_potat::test]
442    async fn fetch() -> Result<(), DbErr> {
443        let (db, pages) = setup();
444
445        let mut paginator = fruit::Entity::find().paginate(&db, 2);
446
447        assert_eq!(paginator.fetch().await?, pages[0].clone());
448        paginator.next();
449
450        assert_eq!(paginator.fetch().await?, pages[1].clone());
451        paginator.next();
452
453        assert_eq!(paginator.fetch().await?, pages[2].clone());
454
455        let mut select = SelectStatement::new()
456            .exprs([
457                Expr::col((fruit::Entity, fruit::Column::Id)),
458                Expr::col((fruit::Entity, fruit::Column::Name)),
459                Expr::col((fruit::Entity, fruit::Column::CakeId)),
460            ])
461            .from(fruit::Entity)
462            .to_owned();
463
464        let query_builder = db.get_database_backend();
465        let stmts = [
466            query_builder.build(select.clone().offset(0).limit(2)),
467            query_builder.build(select.clone().offset(2).limit(2)),
468            query_builder.build(select.offset(4).limit(2)),
469        ];
470
471        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
472        Ok(())
473    }
474
475    #[smol_potat::test]
476    async fn fetch_raw() -> Result<(), DbErr> {
477        let (db, pages) = setup();
478
479        let mut paginator = fruit::Entity::find()
480            .from_raw_sql(RAW_STMT.clone())
481            .paginate(&db, 2);
482
483        assert_eq!(paginator.fetch().await?, pages[0].clone());
484        paginator.next();
485
486        assert_eq!(paginator.fetch().await?, pages[1].clone());
487        paginator.next();
488
489        assert_eq!(paginator.fetch().await?, pages[2].clone());
490
491        let mut select = SelectStatement::new()
492            .exprs([
493                Expr::col((fruit::Entity, fruit::Column::Id)),
494                Expr::col((fruit::Entity, fruit::Column::Name)),
495                Expr::col((fruit::Entity, fruit::Column::CakeId)),
496            ])
497            .from(fruit::Entity)
498            .to_owned();
499
500        let query_builder = db.get_database_backend();
501        let stmts = [
502            query_builder.build(select.clone().offset(0).limit(2)),
503            query_builder.build(select.clone().offset(2).limit(2)),
504            query_builder.build(select.offset(4).limit(2)),
505        ];
506
507        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
508        Ok(())
509    }
510
511    #[smol_potat::test]
512    async fn num_pages() -> Result<(), DbErr> {
513        let (db, num_items) = setup_num_items();
514
515        let num_items = num_items as u64;
516        let page_size = 2_u64;
517        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
518        let paginator = fruit::Entity::find().paginate(&db, page_size);
519
520        assert_eq!(paginator.num_pages().await?, num_pages);
521
522        let sub_query = SelectStatement::new()
523            .exprs([
524                Expr::col((fruit::Entity, fruit::Column::Id)),
525                Expr::col((fruit::Entity, fruit::Column::Name)),
526                Expr::col((fruit::Entity, fruit::Column::CakeId)),
527            ])
528            .from(fruit::Entity)
529            .to_owned();
530
531        let select = SelectStatement::new()
532            .expr(Expr::cust("COUNT(*) AS num_items"))
533            .from_subquery(sub_query, "sub_query")
534            .to_owned();
535
536        let query_builder = db.get_database_backend();
537        let stmts = [query_builder.build(&select)];
538
539        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
540        Ok(())
541    }
542
543    #[smol_potat::test]
544    async fn num_pages_raw() -> Result<(), DbErr> {
545        let (db, num_items) = setup_num_items();
546
547        let num_items = num_items as u64;
548        let page_size = 2_u64;
549        let num_pages = (num_items / page_size) + (num_items % page_size > 0) as u64;
550        let paginator = fruit::Entity::find()
551            .from_raw_sql(RAW_STMT.clone())
552            .paginate(&db, page_size);
553
554        assert_eq!(paginator.num_pages().await?, num_pages);
555
556        let sub_query = SelectStatement::new()
557            .exprs([
558                Expr::col((fruit::Entity, fruit::Column::Id)),
559                Expr::col((fruit::Entity, fruit::Column::Name)),
560                Expr::col((fruit::Entity, fruit::Column::CakeId)),
561            ])
562            .from(fruit::Entity)
563            .to_owned();
564
565        let select = SelectStatement::new()
566            .expr(Expr::cust("COUNT(*) AS num_items"))
567            .from_subquery(sub_query, "sub_query")
568            .to_owned();
569
570        let query_builder = db.get_database_backend();
571        let stmts = [query_builder.build(&select)];
572
573        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
574        Ok(())
575    }
576
577    #[smol_potat::test]
578    async fn next_and_cur_page() -> Result<(), DbErr> {
579        let (db, _) = setup();
580
581        let mut paginator = fruit::Entity::find().paginate(&db, 2);
582
583        assert_eq!(paginator.cur_page(), 0);
584        paginator.next();
585
586        assert_eq!(paginator.cur_page(), 1);
587        paginator.next();
588
589        assert_eq!(paginator.cur_page(), 2);
590        Ok(())
591    }
592
593    #[smol_potat::test]
594    async fn next_and_cur_page_raw() -> Result<(), DbErr> {
595        let (db, _) = setup();
596
597        let mut paginator = fruit::Entity::find()
598            .from_raw_sql(RAW_STMT.clone())
599            .paginate(&db, 2);
600
601        assert_eq!(paginator.cur_page(), 0);
602        paginator.next();
603
604        assert_eq!(paginator.cur_page(), 1);
605        paginator.next();
606
607        assert_eq!(paginator.cur_page(), 2);
608        Ok(())
609    }
610
611    #[smol_potat::test]
612    async fn fetch_and_next() -> Result<(), DbErr> {
613        let (db, pages) = setup();
614
615        let mut paginator = fruit::Entity::find().paginate(&db, 2);
616
617        assert_eq!(paginator.cur_page(), 0);
618        assert_eq!(paginator.fetch_and_next().await?, Some(pages[0].clone()));
619
620        assert_eq!(paginator.cur_page(), 1);
621        assert_eq!(paginator.fetch_and_next().await?, Some(pages[1].clone()));
622
623        assert_eq!(paginator.cur_page(), 2);
624        assert_eq!(paginator.fetch_and_next().await?, None);
625
626        let mut select = SelectStatement::new()
627            .exprs([
628                Expr::col((fruit::Entity, fruit::Column::Id)),
629                Expr::col((fruit::Entity, fruit::Column::Name)),
630                Expr::col((fruit::Entity, fruit::Column::CakeId)),
631            ])
632            .from(fruit::Entity)
633            .to_owned();
634
635        let query_builder = db.get_database_backend();
636        let stmts = [
637            query_builder.build(select.clone().offset(0).limit(2)),
638            query_builder.build(select.clone().offset(2).limit(2)),
639            query_builder.build(select.offset(4).limit(2)),
640        ];
641
642        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
643        Ok(())
644    }
645
646    #[smol_potat::test]
647    async fn fetch_and_next_raw() -> Result<(), DbErr> {
648        let (db, pages) = setup();
649
650        let mut paginator = fruit::Entity::find()
651            .from_raw_sql(RAW_STMT.clone())
652            .paginate(&db, 2);
653
654        assert_eq!(paginator.cur_page(), 0);
655        assert_eq!(paginator.fetch_and_next().await?, Some(pages[0].clone()));
656
657        assert_eq!(paginator.cur_page(), 1);
658        assert_eq!(paginator.fetch_and_next().await?, Some(pages[1].clone()));
659
660        assert_eq!(paginator.cur_page(), 2);
661        assert_eq!(paginator.fetch_and_next().await?, None);
662
663        let mut select = SelectStatement::new()
664            .exprs([
665                Expr::col((fruit::Entity, fruit::Column::Id)),
666                Expr::col((fruit::Entity, fruit::Column::Name)),
667                Expr::col((fruit::Entity, fruit::Column::CakeId)),
668            ])
669            .from(fruit::Entity)
670            .to_owned();
671
672        let query_builder = db.get_database_backend();
673        let stmts = [
674            query_builder.build(select.clone().offset(0).limit(2)),
675            query_builder.build(select.clone().offset(2).limit(2)),
676            query_builder.build(select.offset(4).limit(2)),
677        ];
678
679        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
680        Ok(())
681    }
682
683    #[smol_potat::test]
684    async fn into_stream() -> Result<(), DbErr> {
685        let (db, pages) = setup();
686
687        let mut fruit_stream = fruit::Entity::find().paginate(&db, 2).into_stream();
688
689        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
690        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
691        assert_eq!(fruit_stream.try_next().await?, None);
692
693        drop(fruit_stream);
694
695        let mut select = SelectStatement::new()
696            .exprs([
697                Expr::col((fruit::Entity, fruit::Column::Id)),
698                Expr::col((fruit::Entity, fruit::Column::Name)),
699                Expr::col((fruit::Entity, fruit::Column::CakeId)),
700            ])
701            .from(fruit::Entity)
702            .to_owned();
703
704        let query_builder = db.get_database_backend();
705        let stmts = [
706            query_builder.build(select.clone().offset(0).limit(2)),
707            query_builder.build(select.clone().offset(2).limit(2)),
708            query_builder.build(select.offset(4).limit(2)),
709        ];
710
711        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
712        Ok(())
713    }
714
715    #[smol_potat::test]
716    async fn into_stream_raw() -> Result<(), DbErr> {
717        let (db, pages) = setup();
718
719        let mut fruit_stream = fruit::Entity::find()
720            .from_raw_sql(RAW_STMT.clone())
721            .paginate(&db, 2)
722            .into_stream();
723
724        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
725        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
726        assert_eq!(fruit_stream.try_next().await?, None);
727
728        drop(fruit_stream);
729
730        let mut select = SelectStatement::new()
731            .exprs([
732                Expr::col((fruit::Entity, fruit::Column::Id)),
733                Expr::col((fruit::Entity, fruit::Column::Name)),
734                Expr::col((fruit::Entity, fruit::Column::CakeId)),
735            ])
736            .from(fruit::Entity)
737            .to_owned();
738
739        let query_builder = db.get_database_backend();
740        let stmts = [
741            query_builder.build(select.clone().offset(0).limit(2)),
742            query_builder.build(select.clone().offset(2).limit(2)),
743            query_builder.build(select.offset(4).limit(2)),
744        ];
745
746        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
747        Ok(())
748    }
749
750    #[smol_potat::test]
751    async fn into_stream_raw_leading_spaces() -> Result<(), DbErr> {
752        let (db, pages) = setup();
753
754        let raw_stmt = Statement::from_sql_and_values(
755            DbBackend::Postgres,
756            r#"  SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit"  "#,
757            [],
758        );
759
760        let mut fruit_stream = fruit::Entity::find()
761            .from_raw_sql(raw_stmt.clone())
762            .paginate(&db, 2)
763            .into_stream();
764
765        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
766        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
767        assert_eq!(fruit_stream.try_next().await?, None);
768
769        drop(fruit_stream);
770
771        let mut select = SelectStatement::new()
772            .exprs([
773                Expr::col((fruit::Entity, fruit::Column::Id)),
774                Expr::col((fruit::Entity, fruit::Column::Name)),
775                Expr::col((fruit::Entity, fruit::Column::CakeId)),
776            ])
777            .from(fruit::Entity)
778            .to_owned();
779
780        let query_builder = db.get_database_backend();
781        let stmts = [
782            query_builder.build(select.clone().offset(0).limit(2)),
783            query_builder.build(select.clone().offset(2).limit(2)),
784            query_builder.build(select.offset(4).limit(2)),
785        ];
786
787        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
788        Ok(())
789    }
790
791    #[smol_potat::test]
792    #[should_panic]
793    async fn error() {
794        let (db, _pages) = setup();
795
796        fruit::Entity::find().paginate(&db, 0);
797    }
798}