1use crate::error::LunaOrmError;
2use crate::sql_generator::SqlGenerator;
3use crate::LunaOrmResult;
4use sqlx::any::AnyArguments;
5use sqlx::any::AnyQueryResult;
6use sqlx::any::AnyRow;
7use tracing::debug;
8
9use crate::command_executor::CommandExecutor;
10use crate::sql_executor::SqlExecutor;
11use luna_orm_trait::{
13 Entity, Location, Mutation, Primary, SelectedEntity, Selection, WriteCommand,
14};
15
16#[derive(Debug)]
17pub struct Transaction<'a, G>
18where
19 G: SqlGenerator + Sync + std::fmt::Debug,
20{
21 transaction: sqlx::Transaction<'a, sqlx::Any>,
22 sql_generator: &'a G,
23}
24
25impl<'a, G> CommandExecutor for Transaction<'a, G>
26where
27 G: SqlGenerator + Sync + std::fmt::Debug,
28{
29 type G = G;
30 fn get_generator(&self) -> &Self::G {
31 self.sql_generator
32 }
33}
34
35impl<'a, G> SqlExecutor for Transaction<'a, G>
36where
37 G: SqlGenerator + Sync + std::fmt::Debug,
38{
39 async fn fetch_optional<SE>(
40 &mut self,
41 stmt: &str,
42 args: AnyArguments<'_>,
43 ) -> LunaOrmResult<Option<SE>>
44 where
45 SE: SelectedEntity + Send + Unpin,
46 {
47 let query = sqlx::query_with(stmt, args).try_map(|row: AnyRow| SE::from_any_row(row));
48 let result_opt: Option<SE> = query.fetch_optional(&mut *self.transaction).await?;
49 Ok(result_opt)
50 }
51
52 async fn fetch_all<SE>(&mut self, stmt: &str, args: AnyArguments<'_>) -> LunaOrmResult<Vec<SE>>
53 where
54 SE: SelectedEntity + Send + Unpin,
55 {
56 let query = sqlx::query_with(stmt, args).try_map(|row: AnyRow| SE::from_any_row(row));
57 let result_vec: Vec<SE> = query.fetch_all(&mut *self.transaction).await?;
58 Ok(result_vec)
59 }
60
61 async fn fetch_all_plain<SE>(&mut self, stmt: &str) -> LunaOrmResult<Vec<SE>>
62 where
63 SE: SelectedEntity + Send + Unpin,
64 {
65 let query = sqlx::query(stmt).try_map(|row: AnyRow| SE::from_any_row(row));
66 let result_vec: Vec<SE> = query.fetch_all(&mut *self.transaction).await?;
67 Ok(result_vec)
68 }
69
70 async fn execute(
71 &mut self,
72 stmt: &str,
73 args: AnyArguments<'_>,
74 ) -> LunaOrmResult<AnyQueryResult> {
75 Ok(sqlx::query_with(stmt, args)
76 .execute(&mut *self.transaction)
77 .await?)
78 }
79
80 async fn execute_plain(&mut self, stmt: &str) -> LunaOrmResult<AnyQueryResult> {
81 Ok(sqlx::query(stmt).execute(&mut *self.transaction).await?)
82 }
83}
84
85impl<'a, G> Transaction<'a, G>
86where
87 G: SqlGenerator + Sync + std::fmt::Debug,
88{
89 pub fn new(trx: sqlx::Transaction<'a, sqlx::Any>, sql_generator: &'a G) -> Self {
90 Self {
91 transaction: trx,
92 sql_generator,
93 }
94 }
95
96 #[inline]
97 pub async fn commit(self) -> Result<(), LunaOrmError> {
98 Ok(self.transaction.commit().await?)
99 }
100
101 #[inline]
102 pub async fn rollback(self) -> Result<(), LunaOrmError> {
103 Ok(self.transaction.rollback().await?)
104 }
105
106 pub async fn query(&mut self, sql: &str) -> Result<usize, LunaOrmError> {
107 debug!(target: "luna_orm", command = "query", sql = sql);
108 let result = sqlx::query(sql).execute(&mut *self.transaction).await?;
109 debug!(target: "luna_orm", command = "query", result = ?result);
110 Ok(result.rows_affected() as usize)
111 }
112
113 pub async fn remove<SE>(
114 &mut self,
115 primary: &dyn Primary,
116 selection: &dyn Selection,
117 ) -> LunaOrmResult<Option<SE>>
118 where
119 SE: SelectedEntity + Send + Unpin,
120 {
121 debug!(target: "luna_orm", command = "remove", primary = ?primary, selection = ?selection);
122 let selected_entity: Option<SE> = self.select(primary, selection).await?;
123 let sql = self.get_generator().get_delete_sql(primary);
124 debug!(target: "luna_orm", command = "remove", sql = sql);
125 let args = primary.any_arguments();
126 let result = sqlx::query_with(&sql, args)
127 .execute(&mut *self.transaction)
128 .await?;
129
130 debug!(target: "luna_orm", command = "remove", result = ?result);
131 if result.rows_affected() > 0 {
132 Ok(selected_entity)
133 } else {
134 Ok(None)
135 }
136 }
137
138 async fn transact(&mut self, commands: &[WriteCommand]) -> LunaOrmResult<bool> {
139 debug!(target: "luna_orm", command = "transact", commands = ?commands);
140 for command in commands {
141 match command {
142 WriteCommand::Insert { entity } => {
143 self.insert(entity.as_ref()).await?;
144 }
145 WriteCommand::Upsert { entity } => {
146 self.upsert(entity.as_ref()).await?;
147 }
148 WriteCommand::Update { mutation, primary } => {
149 self.update(mutation.as_ref(), primary.as_ref()).await?;
150 }
151 WriteCommand::Change { mutation, location } => {
152 self.change(mutation.as_ref(), location.as_ref()).await?;
153 }
154 WriteCommand::Delete { primary } => {
155 self.delete(primary.as_ref()).await?;
156 }
157 WriteCommand::Purify { location } => {
158 self.purify(location.as_ref()).await?;
159 }
160 }
161 }
162 debug!(target: "luna_orm", command = "transact", result = true);
163 Ok(true)
164 }
165}