lsor_core/
exec.rs

1use async_graphql::{
2    connection::{Connection, Edge, PageInfo},
3    OutputType,
4};
5use sqlx::{postgres::PgRow, Executor, FromRow, Postgres};
6
7use crate::{
8    cursor::Cursor,
9    driver::{Driver, PushPrql},
10    from::from,
11    page::{select_page_info, select_page_items, Pagination, TotalCount},
12    row::{upsert, Row},
13    table::Table,
14    Sorting,
15};
16
17pub async fn save_one<'c, E, R>(executor: E, row: R) -> sqlx::Result<()>
18where
19    E: Executor<'c, Database = Postgres>,
20    R: Row + Table,
21{
22    let mut driver = Driver::new();
23    upsert(row).push_to_driver(&mut driver);
24    driver.execute_without_compilation(executor).await?;
25    Ok(())
26}
27
28pub async fn load_one<'c, E, F, R>(executor: E, filter: F) -> sqlx::Result<Option<R>>
29where
30    E: Executor<'c, Database = Postgres>,
31    F: PushPrql,
32    for<'r> R: FromRow<'r, PgRow> + Table,
33{
34    let mut driver = Driver::new();
35
36    from(R::table_name())
37        .filter(filter)
38        .take(1)
39        .push_to_driver(&mut driver);
40
41    driver
42        .fetch_optional(executor)
43        .await
44        .and_then(|row| row.as_ref().map(R::from_row).transpose())
45}
46
47pub async fn load_page<'c, E, F, S, R>(
48    executor: E,
49    filter: F,
50    sort: S,
51    pagination: Pagination,
52) -> sqlx::Result<Connection<String, R, TotalCount>>
53where
54    E: Copy + Executor<'c, Database = Postgres>,
55    F: PushPrql,
56    S: PushPrql + Sorting,
57    for<'r> R: FromRow<'r, PgRow> + OutputType + Table,
58{
59    use sqlx::Row;
60
61    let cursor = pagination.cursor;
62    let subquery = from(R::table_name()).filter(filter);
63    let subquery = subquery.sort(sort);
64
65    let mut driver = Driver::new();
66    select_page_items(&subquery, pagination).push_to_driver(&mut driver);
67
68    let rows = driver.fetch_all(executor).await?;
69    let edges = rows
70        .into_iter()
71        .map(|row| {
72            Ok(Edge::new(
73                Cursor::infer(row.try_get_raw("cursor")?)?,
74                R::from_row(&row)?,
75            ))
76        })
77        .collect::<sqlx::Result<Vec<_>>>()?;
78
79    let start = edges
80        .first()
81        .map(|edge| edge.cursor.clone())
82        .unwrap_or(Cursor::encode(&cursor.min()));
83    let end = edges
84        .last()
85        .map(|edge| edge.cursor.clone())
86        .unwrap_or(Cursor::encode(&cursor.max()));
87
88    let mut driver = Driver::new();
89    select_page_info(subquery, cursor, start.clone(), end.clone()).push_to_driver(&mut driver);
90    let row = driver.fetch_one(executor).await?;
91    let page_info = PageInfo {
92        has_next_page: row.try_get("has_next_page")?,
93        has_previous_page: row.try_get("has_prev_page")?,
94        start_cursor: Some(start),
95        end_cursor: Some(end),
96    };
97    let total_count = TotalCount {
98        total_count: row.try_get("total_count")?,
99    };
100
101    let mut conn = Connection::with_additional_fields(
102        page_info.has_previous_page,
103        page_info.has_next_page,
104        total_count,
105    );
106    conn.edges = edges;
107    Ok(conn)
108}