Skip to main content

sea_orm/executor/
paginator.rs

1use crate::{
2    error::*, ConnectionTrait, DbBackend, EntityTrait, FromQueryResult, Select, SelectModel,
3    SelectThree, SelectThreeModel, SelectTwo, SelectTwoModel, Selector, SelectorRaw, SelectorTrait,
4};
5use async_stream::stream;
6use futures_util::Stream;
7use sea_query::{Expr, SelectStatement};
8use std::{marker::PhantomData, pin::Pin};
9
10/// Pin a Model so that stream operations can be performed on the model
11pub type PinBoxStream<'db, Item> = Pin<Box<dyn Stream<Item = Item> + 'db>>;
12
13/// Defined a structure to handle pagination of a result from a query operation on a Model
14#[derive(Clone, Debug)]
15pub struct Paginator<'db, C, S>
16where
17    C: ConnectionTrait,
18    S: SelectorTrait + 'db,
19{
20    pub(crate) query: SelectStatement,
21    pub(crate) page: u64,
22    pub(crate) page_size: u64,
23    pub(crate) db: &'db C,
24    pub(crate) selector: PhantomData<S>,
25}
26
/// Item and page totals of a [`Paginator`], as returned by
/// [`Paginator::num_items_and_pages`].
#[derive(Clone, Debug)]
pub struct ItemsAndPagesNumber {
    /// Total number of items matched by the paginated query
    pub number_of_items: u64,
    /// Total number of pages needed to hold all of those items
    pub number_of_pages: u64,
}
35
36// LINT: warn if paginator is used without an order by clause
37
38impl<'db, C, S> Paginator<'db, C, S>
39where
40    C: ConnectionTrait,
41    S: SelectorTrait + 'db,
42{
43    /// Fetch a specific page; page index starts from zero
44    pub async fn fetch_page(&self, page: u64) -> Result<Vec<S::Item>, DbErr> {
45        let query = self
46            .query
47            .clone()
48            .limit(self.page_size)
49            .offset(self.page_size * page)
50            .to_owned();
51        let builder = self.db.get_database_backend();
52        let stmt = builder.build(&query);
53        let rows = self.db.query_all(stmt).await?;
54        let mut buffer = Vec::with_capacity(rows.len());
55        for row in rows.into_iter() {
56            // TODO: Error handling
57            buffer.push(S::from_raw_query_result(row)?);
58        }
59        Ok(buffer)
60    }
61
62    /// Fetch the current page
63    pub async fn fetch(&self) -> Result<Vec<S::Item>, DbErr> {
64        self.fetch_page(self.page).await
65    }
66
67    /// Get the total number of items
68    pub async fn num_items(&self) -> Result<u64, DbErr> {
69        let builder = self.db.get_database_backend();
70        let stmt = SelectStatement::new()
71            .expr(Expr::cust("COUNT(*) AS num_items"))
72            .from_subquery(
73                self.query
74                    .clone()
75                    .reset_limit()
76                    .reset_offset()
77                    .clear_order_by()
78                    .to_owned(),
79                "sub_query",
80            )
81            .to_owned();
82        let stmt = builder.build(&stmt);
83        let result = match self.db.query_one(stmt).await? {
84            Some(res) => res,
85            None => return Ok(0),
86        };
87        let num_items = match builder {
88            DbBackend::Postgres => result.try_get::<i64>("", "num_items")? as u64,
89            _ => result.try_get::<i32>("", "num_items")? as u64,
90        };
91        Ok(num_items)
92    }
93
94    /// Get the total number of pages
95    pub async fn num_pages(&self) -> Result<u64, DbErr> {
96        let num_items = self.num_items().await?;
97        let num_pages = self.compute_pages_number(num_items);
98        Ok(num_pages)
99    }
100
101    /// Get the total number of items and pages
102    pub async fn num_items_and_pages(&self) -> Result<ItemsAndPagesNumber, DbErr> {
103        let number_of_items = self.num_items().await?;
104        let number_of_pages = self.compute_pages_number(number_of_items);
105
106        Ok(ItemsAndPagesNumber {
107            number_of_items,
108            number_of_pages,
109        })
110    }
111
112    /// Compute the number of pages for the current page
113    fn compute_pages_number(&self, num_items: u64) -> u64 {
114        (num_items / self.page_size) + (num_items % self.page_size > 0) as u64
115    }
116
117    /// Increment the page counter
118    pub fn next(&mut self) {
119        self.page += 1;
120    }
121
122    /// Get current page number
123    pub fn cur_page(&self) -> u64 {
124        self.page
125    }
126
127    /// Fetch one page and increment the page counter
128    ///
129    /// ```
130    /// # use sea_orm::{error::*, tests_cfg::*, *};
131    /// #
132    /// # #[smol_potat::main]
133    /// # #[cfg(feature = "mock")]
134    /// # pub async fn main() -> Result<(), DbErr> {
135    /// #
136    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
137    /// #     .append_query_results([
138    /// #         vec![cake::Model {
139    /// #             id: 1,
140    /// #             name: "Cake".to_owned(),
141    /// #         }],
142    /// #         vec![],
143    /// #     ])
144    /// #     .into_connection();
145    /// # let db = &owned_db;
146    /// #
147    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
148    /// let mut cake_pages = cake::Entity::find()
149    ///     .order_by_asc(cake::Column::Id)
150    ///     .paginate(db, 50);
151    ///
152    /// while let Some(cakes) = cake_pages.fetch_and_next().await? {
153    ///     // Do something on cakes: Vec<cake::Model>
154    /// }
155    /// #
156    /// # Ok(())
157    /// # }
158    /// ```
159    pub async fn fetch_and_next(&mut self) -> Result<Option<Vec<S::Item>>, DbErr> {
160        let vec = self.fetch().await?;
161        self.next();
162        let opt = if !vec.is_empty() { Some(vec) } else { None };
163        Ok(opt)
164    }
165
166    /// Convert self into an async stream
167    ///
168    /// ```
169    /// # use sea_orm::{error::*, tests_cfg::*, *};
170    /// #
171    /// # #[smol_potat::main]
172    /// # #[cfg(feature = "mock")]
173    /// # pub async fn main() -> Result<(), DbErr> {
174    /// #
175    /// # let owned_db = MockDatabase::new(DbBackend::Postgres)
176    /// #     .append_query_results([
177    /// #         vec![cake::Model {
178    /// #             id: 1,
179    /// #             name: "Cake".to_owned(),
180    /// #         }],
181    /// #         vec![],
182    /// #     ])
183    /// #     .into_connection();
184    /// # let db = &owned_db;
185    /// #
186    /// use futures::TryStreamExt;
187    /// use sea_orm::{entity::*, query::*, tests_cfg::cake};
188    /// let mut cake_stream = cake::Entity::find()
189    ///     .order_by_asc(cake::Column::Id)
190    ///     .paginate(db, 50)
191    ///     .into_stream();
192    ///
193    /// while let Some(cakes) = cake_stream.try_next().await? {
194    ///     // Do something on cakes: Vec<cake::Model>
195    /// }
196    /// #
197    /// # Ok(())
198    /// # }
199    /// ```
200    pub fn into_stream(mut self) -> PinBoxStream<'db, Result<Vec<S::Item>, DbErr>> {
201        Box::pin(stream! {
202            while let Some(vec) = self.fetch_and_next().await? {
203                yield Ok(vec);
204            }
205        })
206    }
207}
208
209#[async_trait::async_trait]
210/// A Trait for any type that can paginate results
211pub trait PaginatorTrait<'db, C>
212where
213    C: ConnectionTrait,
214{
215    /// Select operation
216    type Selector: SelectorTrait + Send + Sync + 'db;
217
218    /// Paginate the result of a select operation.
219    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector>;
220
221    /// Perform a count on the paginated results
222    async fn count(self, db: &'db C) -> Result<u64, DbErr>
223    where
224        Self: Send + Sized,
225    {
226        self.paginate(db, 1).num_items().await
227    }
228
229    /// Check if any records exist
230    async fn exists(self, db: &'db C) -> Result<bool, DbErr>
231    where
232        Self: Send + Sized,
233    {
234        let paginator = self.paginate(db, 1);
235        let stmt = SelectStatement::new()
236            .expr(Expr::cust("1"))
237            .from_subquery(
238                paginator
239                    .query
240                    .clone()
241                    .reset_limit()
242                    .reset_offset()
243                    .clear_order_by()
244                    .limit(1)
245                    .to_owned(),
246                "sub_query",
247            )
248            .limit(1)
249            .to_owned();
250        let stmt = db.get_database_backend().build(&stmt);
251        let result = db.query_one(stmt).await?;
252        Ok(result.is_some())
253    }
254}
255
256impl<'db, C, S> PaginatorTrait<'db, C> for Selector<S>
257where
258    C: ConnectionTrait,
259    S: SelectorTrait + Send + Sync + 'db,
260{
261    type Selector = S;
262
263    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
264        assert!(page_size != 0, "page_size should not be zero");
265        Paginator {
266            query: self.query,
267            page: 0,
268            page_size,
269            db,
270            selector: PhantomData,
271        }
272    }
273}
274
275impl<'db, C, S> PaginatorTrait<'db, C> for SelectorRaw<S>
276where
277    C: ConnectionTrait,
278    S: SelectorTrait + Send + Sync + 'db,
279{
280    type Selector = S;
281    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, S> {
282        assert!(page_size != 0, "page_size should not be zero");
283        let sql = self.stmt.sql.trim()[6..].trim();
284        let mut query = SelectStatement::new();
285        query.expr(if let Some(values) = self.stmt.values {
286            Expr::cust_with_values(sql, values.0)
287        } else {
288            Expr::cust(sql)
289        });
290
291        Paginator {
292            query,
293            page: 0,
294            page_size,
295            db,
296            selector: PhantomData,
297        }
298    }
299}
300
301impl<'db, C, M, E> PaginatorTrait<'db, C> for Select<E>
302where
303    C: ConnectionTrait,
304    E: EntityTrait<Model = M>,
305    M: FromQueryResult + Sized + Send + Sync + 'db,
306{
307    type Selector = SelectModel<M>;
308
309    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
310        self.into_model().paginate(db, page_size)
311    }
312}
313
314impl<'db, C, M, N, E, F> PaginatorTrait<'db, C> for SelectTwo<E, F>
315where
316    C: ConnectionTrait,
317    E: EntityTrait<Model = M>,
318    F: EntityTrait<Model = N>,
319    M: FromQueryResult + Sized + Send + Sync + 'db,
320    N: FromQueryResult + Sized + Send + Sync + 'db,
321{
322    type Selector = SelectTwoModel<M, N>;
323
324    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
325        self.into_model().paginate(db, page_size)
326    }
327}
328
329impl<'db, C, M, N, O, E, F, G> PaginatorTrait<'db, C> for SelectThree<E, F, G>
330where
331    C: ConnectionTrait,
332    E: EntityTrait<Model = M>,
333    F: EntityTrait<Model = N>,
334    G: EntityTrait<Model = O>,
335    M: FromQueryResult + Sized + Send + Sync + 'db,
336    N: FromQueryResult + Sized + Send + Sync + 'db,
337    O: FromQueryResult + Sized + Send + Sync + 'db,
338{
339    type Selector = SelectThreeModel<M, N, O>;
340
341    fn paginate(self, db: &'db C, page_size: u64) -> Paginator<'db, C, Self::Selector> {
342        self.into_model().paginate(db, page_size)
343    }
344}
345
#[cfg(test)]
#[cfg(feature = "mock")]
mod tests {
    use super::*;
    use crate::entity::prelude::*;
    use crate::{tests_cfg::*, ConnectionTrait, Statement};
    use crate::{DatabaseConnection, DbBackend, MockDatabase, Transaction};
    use futures_util::TryStreamExt;
    use pretty_assertions::assert_eq;
    use sea_query::{Expr, SelectStatement, Value};
    use std::sync::LazyLock;

    /// Raw SQL counterpart of `fruit::Entity::find()`, shared by the `*_raw` tests.
    static RAW_STMT: LazyLock<Statement> = LazyLock::new(|| {
        Statement::from_sql_and_values(
            DbBackend::Postgres,
            r#"SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit""#,
            [],
        )
    });

    /// Mock connection preloaded with three query results: two rows, one row, no rows.
    fn setup() -> (DatabaseConnection, Vec<Vec<fruit::Model>>) {
        let page1 = vec![
            fruit::Model {
                id: 1,
                name: "Blueberry".into(),
                cake_id: Some(1),
            },
            fruit::Model {
                id: 2,
                name: "Raspberry".into(),
                cake_id: Some(1),
            },
        ];

        let page2 = vec![fruit::Model {
            id: 3,
            name: "Strawberry".into(),
            cake_id: Some(2),
        }];

        let page3 = Vec::<fruit::Model>::new();

        let db = MockDatabase::new(DbBackend::Postgres)
            .append_query_results([page1.clone(), page2.clone(), page3.clone()])
            .into_connection();

        (db, vec![page1, page2, page3])
    }

    /// Mock connection preloaded with a single row whose `num_items` column is 3.
    fn setup_num_items() -> (DatabaseConnection, i64) {
        let num_items = 3;
        let db = MockDatabase::new(DbBackend::Postgres)
            .append_query_results([[maplit::btreemap! {
                "num_items" => Into::<Value>::into(num_items),
            }]])
            .into_connection();

        (db, num_items)
    }

    /// The un-paginated select over all three `fruit` columns that every test expects
    /// to find at the core of the logged statements.
    fn fruit_select() -> SelectStatement {
        SelectStatement::new()
            .exprs([
                Expr::col((fruit::Entity, fruit::Column::Id)),
                Expr::col((fruit::Entity, fruit::Column::Name)),
                Expr::col((fruit::Entity, fruit::Column::CakeId)),
            ])
            .from(fruit::Entity)
            .to_owned()
    }

    /// Number of pages expected for `num_items` items at `page_size` items per page.
    fn expected_num_pages(num_items: u64, page_size: u64) -> u64 {
        (num_items / page_size) + (num_items % page_size > 0) as u64
    }

    /// Assert that `db` logged exactly three page queries: limit 2 at offsets 0, 2 and 4.
    #[track_caller]
    fn assert_three_pages_logged(db: DatabaseConnection) {
        let query_builder = db.get_database_backend();
        let mut select = fruit_select();
        let stmts = [
            query_builder.build(select.clone().offset(0).limit(2)),
            query_builder.build(select.clone().offset(2).limit(2)),
            query_builder.build(select.offset(4).limit(2)),
        ];

        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
    }

    /// Assert that `db` logged exactly one `COUNT(*)` query wrapping the fruit select.
    #[track_caller]
    fn assert_count_logged(db: DatabaseConnection) {
        let select = SelectStatement::new()
            .expr(Expr::cust("COUNT(*) AS num_items"))
            .from_subquery(fruit_select(), "sub_query")
            .to_owned();

        let query_builder = db.get_database_backend();
        let stmts = [query_builder.build(&select)];

        assert_eq!(db.into_transaction_log(), Transaction::wrap(stmts));
    }

    #[smol_potat::test]
    async fn fetch_page() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let paginator = fruit::Entity::find().paginate(&db, 2);

        assert_eq!(paginator.fetch_page(0).await?, pages[0].clone());
        assert_eq!(paginator.fetch_page(1).await?, pages[1].clone());
        assert_eq!(paginator.fetch_page(2).await?, pages[2].clone());

        assert_three_pages_logged(db);
        Ok(())
    }

    #[smol_potat::test]
    async fn fetch_page_raw() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let paginator = fruit::Entity::find()
            .from_raw_sql(RAW_STMT.clone())
            .paginate(&db, 2);

        assert_eq!(paginator.fetch_page(0).await?, pages[0].clone());
        assert_eq!(paginator.fetch_page(1).await?, pages[1].clone());
        assert_eq!(paginator.fetch_page(2).await?, pages[2].clone());

        assert_three_pages_logged(db);
        Ok(())
    }

    #[smol_potat::test]
    async fn fetch() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let mut paginator = fruit::Entity::find().paginate(&db, 2);

        assert_eq!(paginator.fetch().await?, pages[0].clone());
        paginator.next();

        assert_eq!(paginator.fetch().await?, pages[1].clone());
        paginator.next();

        assert_eq!(paginator.fetch().await?, pages[2].clone());

        assert_three_pages_logged(db);
        Ok(())
    }

    #[smol_potat::test]
    async fn fetch_raw() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let mut paginator = fruit::Entity::find()
            .from_raw_sql(RAW_STMT.clone())
            .paginate(&db, 2);

        assert_eq!(paginator.fetch().await?, pages[0].clone());
        paginator.next();

        assert_eq!(paginator.fetch().await?, pages[1].clone());
        paginator.next();

        assert_eq!(paginator.fetch().await?, pages[2].clone());

        assert_three_pages_logged(db);
        Ok(())
    }

    #[smol_potat::test]
    async fn num_pages() -> Result<(), DbErr> {
        let (db, num_items) = setup_num_items();

        let page_size = 2_u64;
        let num_pages = expected_num_pages(num_items as u64, page_size);
        let paginator = fruit::Entity::find().paginate(&db, page_size);

        assert_eq!(paginator.num_pages().await?, num_pages);

        assert_count_logged(db);
        Ok(())
    }

    #[smol_potat::test]
    async fn num_pages_raw() -> Result<(), DbErr> {
        let (db, num_items) = setup_num_items();

        let page_size = 2_u64;
        let num_pages = expected_num_pages(num_items as u64, page_size);
        let paginator = fruit::Entity::find()
            .from_raw_sql(RAW_STMT.clone())
            .paginate(&db, page_size);

        assert_eq!(paginator.num_pages().await?, num_pages);

        assert_count_logged(db);
        Ok(())
    }

    #[smol_potat::test]
    async fn next_and_cur_page() -> Result<(), DbErr> {
        let (db, _) = setup();

        let mut paginator = fruit::Entity::find().paginate(&db, 2);

        assert_eq!(paginator.cur_page(), 0);
        paginator.next();

        assert_eq!(paginator.cur_page(), 1);
        paginator.next();

        assert_eq!(paginator.cur_page(), 2);
        Ok(())
    }

    #[smol_potat::test]
    async fn next_and_cur_page_raw() -> Result<(), DbErr> {
        let (db, _) = setup();

        let mut paginator = fruit::Entity::find()
            .from_raw_sql(RAW_STMT.clone())
            .paginate(&db, 2);

        assert_eq!(paginator.cur_page(), 0);
        paginator.next();

        assert_eq!(paginator.cur_page(), 1);
        paginator.next();

        assert_eq!(paginator.cur_page(), 2);
        Ok(())
    }

    #[smol_potat::test]
    async fn fetch_and_next() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let mut paginator = fruit::Entity::find().paginate(&db, 2);

        assert_eq!(paginator.cur_page(), 0);
        assert_eq!(paginator.fetch_and_next().await?, Some(pages[0].clone()));

        assert_eq!(paginator.cur_page(), 1);
        assert_eq!(paginator.fetch_and_next().await?, Some(pages[1].clone()));

        assert_eq!(paginator.cur_page(), 2);
        assert_eq!(paginator.fetch_and_next().await?, None);

        assert_three_pages_logged(db);
        Ok(())
    }

    #[smol_potat::test]
    async fn fetch_and_next_raw() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let mut paginator = fruit::Entity::find()
            .from_raw_sql(RAW_STMT.clone())
            .paginate(&db, 2);

        assert_eq!(paginator.cur_page(), 0);
        assert_eq!(paginator.fetch_and_next().await?, Some(pages[0].clone()));

        assert_eq!(paginator.cur_page(), 1);
        assert_eq!(paginator.fetch_and_next().await?, Some(pages[1].clone()));

        assert_eq!(paginator.cur_page(), 2);
        assert_eq!(paginator.fetch_and_next().await?, None);

        assert_three_pages_logged(db);
        Ok(())
    }

    #[smol_potat::test]
    async fn into_stream() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let mut fruit_stream = fruit::Entity::find().paginate(&db, 2).into_stream();

        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
        assert_eq!(fruit_stream.try_next().await?, None);

        // The stream borrows `db`; release it before the connection is consumed.
        drop(fruit_stream);

        assert_three_pages_logged(db);
        Ok(())
    }

    #[smol_potat::test]
    async fn into_stream_raw() -> Result<(), DbErr> {
        let (db, pages) = setup();

        let mut fruit_stream = fruit::Entity::find()
            .from_raw_sql(RAW_STMT.clone())
            .paginate(&db, 2)
            .into_stream();

        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
        assert_eq!(fruit_stream.try_next().await?, None);

        // The stream borrows `db`; release it before the connection is consumed.
        drop(fruit_stream);

        assert_three_pages_logged(db);
        Ok(())
    }

    #[smol_potat::test]
    async fn into_stream_raw_leading_spaces() -> Result<(), DbErr> {
        let (db, pages) = setup();

        // Same statement as `RAW_STMT`, padded with whitespace on both ends.
        let raw_stmt = Statement::from_sql_and_values(
            DbBackend::Postgres,
            r#"  SELECT "fruit"."id", "fruit"."name", "fruit"."cake_id" FROM "fruit"  "#,
            [],
        );

        let mut fruit_stream = fruit::Entity::find()
            .from_raw_sql(raw_stmt)
            .paginate(&db, 2)
            .into_stream();

        assert_eq!(fruit_stream.try_next().await?, Some(pages[0].clone()));
        assert_eq!(fruit_stream.try_next().await?, Some(pages[1].clone()));
        assert_eq!(fruit_stream.try_next().await?, None);

        // The stream borrows `db`; release it before the connection is consumed.
        drop(fruit_stream);

        assert_three_pages_logged(db);
        Ok(())
    }

    #[smol_potat::test]
    #[should_panic]
    async fn error() {
        let (db, _pages) = setup();

        // A page size of zero must trip the assertion inside `paginate`.
        fruit::Entity::find().paginate(&db, 0);
    }
}