luna_orm/
transaction.rs

1use crate::error::LunaOrmError;
2use crate::sql_generator::SqlGenerator;
3use crate::LunaOrmResult;
4use sqlx::any::AnyArguments;
5use sqlx::any::AnyQueryResult;
6use sqlx::any::AnyRow;
7use tracing::debug;
8
9use crate::command_executor::CommandExecutor;
10use crate::sql_executor::SqlExecutor;
11//use async_trait::async_trait;
12use luna_orm_trait::{
13    Entity, Location, Mutation, Primary, SelectedEntity, Selection, WriteCommand,
14};
15
16#[derive(Debug)]
17pub struct Transaction<'a, G>
18where
19    G: SqlGenerator + Sync + std::fmt::Debug,
20{
21    transaction: sqlx::Transaction<'a, sqlx::Any>,
22    sql_generator: &'a G,
23}
24
25impl<'a, G> CommandExecutor for Transaction<'a, G>
26where
27    G: SqlGenerator + Sync + std::fmt::Debug,
28{
29    type G = G;
30    fn get_generator(&self) -> &Self::G {
31        self.sql_generator
32    }
33}
34
35impl<'a, G> SqlExecutor for Transaction<'a, G>
36where
37    G: SqlGenerator + Sync + std::fmt::Debug,
38{
39    async fn fetch_optional<SE>(
40        &mut self,
41        stmt: &str,
42        args: AnyArguments<'_>,
43    ) -> LunaOrmResult<Option<SE>>
44    where
45        SE: SelectedEntity + Send + Unpin,
46    {
47        let query = sqlx::query_with(stmt, args).try_map(|row: AnyRow| SE::from_any_row(row));
48        let result_opt: Option<SE> = query.fetch_optional(&mut *self.transaction).await?;
49        Ok(result_opt)
50    }
51
52    async fn fetch_all<SE>(&mut self, stmt: &str, args: AnyArguments<'_>) -> LunaOrmResult<Vec<SE>>
53    where
54        SE: SelectedEntity + Send + Unpin,
55    {
56        let query = sqlx::query_with(stmt, args).try_map(|row: AnyRow| SE::from_any_row(row));
57        let result_vec: Vec<SE> = query.fetch_all(&mut *self.transaction).await?;
58        Ok(result_vec)
59    }
60
61    async fn fetch_all_plain<SE>(&mut self, stmt: &str) -> LunaOrmResult<Vec<SE>>
62    where
63        SE: SelectedEntity + Send + Unpin,
64    {
65        let query = sqlx::query(stmt).try_map(|row: AnyRow| SE::from_any_row(row));
66        let result_vec: Vec<SE> = query.fetch_all(&mut *self.transaction).await?;
67        Ok(result_vec)
68    }
69
70    async fn execute(
71        &mut self,
72        stmt: &str,
73        args: AnyArguments<'_>,
74    ) -> LunaOrmResult<AnyQueryResult> {
75        Ok(sqlx::query_with(stmt, args)
76            .execute(&mut *self.transaction)
77            .await?)
78    }
79
80    async fn execute_plain(&mut self, stmt: &str) -> LunaOrmResult<AnyQueryResult> {
81        Ok(sqlx::query(stmt).execute(&mut *self.transaction).await?)
82    }
83}
84
85impl<'a, G> Transaction<'a, G>
86where
87    G: SqlGenerator + Sync + std::fmt::Debug,
88{
89    pub fn new(trx: sqlx::Transaction<'a, sqlx::Any>, sql_generator: &'a G) -> Self {
90        Self {
91            transaction: trx,
92            sql_generator,
93        }
94    }
95
96    #[inline]
97    pub async fn commit(self) -> Result<(), LunaOrmError> {
98        Ok(self.transaction.commit().await?)
99    }
100
101    #[inline]
102    pub async fn rollback(self) -> Result<(), LunaOrmError> {
103        Ok(self.transaction.rollback().await?)
104    }
105
106    pub async fn query(&mut self, sql: &str) -> Result<usize, LunaOrmError> {
107        debug!(target: "luna_orm", command = "query",  sql = sql);
108        let result = sqlx::query(sql).execute(&mut *self.transaction).await?;
109        debug!(target: "luna_orm", command = "query",  result = ?result);
110        Ok(result.rows_affected() as usize)
111    }
112
113    pub async fn remove<SE>(
114        &mut self,
115        primary: &dyn Primary,
116        selection: &dyn Selection,
117    ) -> LunaOrmResult<Option<SE>>
118    where
119        SE: SelectedEntity + Send + Unpin,
120    {
121        debug!(target: "luna_orm", command = "remove",  primary = ?primary, selection = ?selection);
122        let selected_entity: Option<SE> = self.select(primary, selection).await?;
123        let sql = self.get_generator().get_delete_sql(primary);
124        debug!(target: "luna_orm", command = "remove",  sql = sql);
125        let args = primary.any_arguments();
126        let result = sqlx::query_with(&sql, args)
127            .execute(&mut *self.transaction)
128            .await?;
129
130        debug!(target: "luna_orm", command = "remove",  result = ?result);
131        if result.rows_affected() > 0 {
132            Ok(selected_entity)
133        } else {
134            Ok(None)
135        }
136    }
137
138    async fn transact(&mut self, commands: &[WriteCommand]) -> LunaOrmResult<bool> {
139        debug!(target: "luna_orm", command = "transact",  commands = ?commands);
140        for command in commands {
141            match command {
142                WriteCommand::Insert { entity } => {
143                    self.insert(entity.as_ref()).await?;
144                }
145                WriteCommand::Upsert { entity } => {
146                    self.upsert(entity.as_ref()).await?;
147                }
148                WriteCommand::Update { mutation, primary } => {
149                    self.update(mutation.as_ref(), primary.as_ref()).await?;
150                }
151                WriteCommand::Change { mutation, location } => {
152                    self.change(mutation.as_ref(), location.as_ref()).await?;
153                }
154                WriteCommand::Delete { primary } => {
155                    self.delete(primary.as_ref()).await?;
156                }
157                WriteCommand::Purify { location } => {
158                    self.purify(location.as_ref()).await?;
159                }
160            }
161        }
162        debug!(target: "luna_orm", command = "transact",  result = true);
163        Ok(true)
164    }
165}