parsql_tokio_postgres/transaction_ops.rs
1use postgres::types::FromSql;
2use tokio_postgres::{Error, Row, Client, Transaction};
3use std::sync::OnceLock;
4use crate::traits::{CrudOps, FromRow, SqlParams, SqlQuery, UpdateParams};
5
6/// Creates and begins a new transaction.
7///
8/// This function is a wrapper around the tokio-postgres `transaction()` method.
9/// It allows starting a new database transaction for performing multiple operations atomically.
10///
11/// # Return Value
12/// * `Result<Transaction<'_>, Error>` - On success, returns the new transaction; on failure, returns Error
13///
14/// # Example
15/// ```rust,no_run
16/// # use tokio_postgres::{NoTls, Error};
17/// # use parsql::tokio_postgres::transactional;
18/// #
19/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
20/// # let (mut client, connection) = tokio_postgres::connect("", NoTls).await?;
21/// # tokio::spawn(async move { connection.await; });
22/// let transaction = transactional::begin(&mut client).await?;
23///
24/// // Transaction işlemlerini gerçekleştir
25///
26/// transaction.commit().await?;
27/// # Ok(())
28/// # }
29/// ```
30pub async fn begin(client: &mut Client) -> Result<Transaction<'_>, Error> {
31 client.transaction().await
32}
33
34/// Inserts a record within a transaction.
35///
36/// This function executes an INSERT SQL query within the given transaction.
37/// It returns the transaction object, allowing for method chaining.
38///
39/// # Arguments
40/// * `transaction` - An active transaction
41/// * `entity` - Data object to be inserted (must implement SqlQuery and SqlParams traits)
42///
43/// # Return Value
44/// * `Result<(Transaction<'_>, u64), Error>` - On success, returns the transaction and the number of affected rows; on failure, returns Error
45///
46/// # Example
47/// ```rust,no_run
48/// # use tokio_postgres::{NoTls, Error};
49/// # use parsql::tokio_postgres::transactional;
50/// # use parsql::macros::{Insertable, SqlParams};
51/// #
52/// #[derive(Insertable, SqlParams)]
53/// #[table("users")]
54/// struct InsertUser {
55/// name: String,
56/// email: String,
57/// }
58///
59/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
60/// # let (client, connection) = tokio_postgres::connect("", NoTls).await?;
61/// # tokio::spawn(async move { connection.await; });
62/// let user = InsertUser {
63/// name: "John".to_string(),
64/// email: "john@example.com".to_string(),
65/// };
66///
67/// let transaction = transactional::begin(&client).await?;
68/// let (transaction, rows_affected) = transactional::tx_insert(transaction, user).await?;
69/// transaction.commit().await?;
70/// # Ok(())
71/// # }
72/// ```
73pub async fn tx_insert<T>(
74 transaction: Transaction<'_>,
75 entity: T,
76) -> Result<(Transaction<'_>, u64), Error>
77where
78 T: SqlQuery + SqlParams + Send + Sync + 'static
79{
80 let sql = T::query();
81
82 static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
83 let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
84 std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
85 });
86
87 if is_trace_enabled {
88 println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
89 }
90
91 let params = entity.params();
92 let result = transaction.execute(&sql, ¶ms).await?;
93 Ok((transaction, result))
94}
95
96/// Updates a record within a transaction.
97///
98/// # Arguments
99/// * `transaction` - An active transaction
100/// * `entity` - Data object containing the update information (must implement SqlQuery and UpdateParams traits)
101///
102/// # Return Value
103/// * `Result<(Transaction<'_>, bool), Error>` - On success, returns the transaction and whether any record was updated
104///
105/// # Example
106/// ```rust,no_run
107/// # use tokio_postgres::{NoTls, Error};
108/// # use parsql::tokio_postgres::transactional;
109/// # use parsql::macros::{Updateable, UpdateParams};
110/// #
111/// #[derive(Updateable, UpdateParams)]
112/// #[table("users")]
113/// #[update("name, email")]
114/// #[where_clause("id = $")]
115/// struct UpdateUser {
116/// id: i64,
117/// name: String,
118/// email: String,
119/// }
120///
121/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
122/// # let (client, connection) = tokio_postgres::connect("", NoTls).await?;
123/// # tokio::spawn(async move { connection.await; });
124/// let user = UpdateUser {
125/// id: 1,
126/// name: "John Smith".to_string(),
127/// email: "john.smith@example.com".to_string(),
128/// };
129///
130/// let transaction = transactional::begin(&client).await?;
131/// let (transaction, updated) = transactional::tx_update(transaction, user).await?;
132/// transaction.commit().await?;
133/// # Ok(())
134/// # }
135/// ```
136pub async fn tx_update<T>(
137 transaction: Transaction<'_>,
138 entity: T,
139) -> Result<(Transaction<'_>, bool), Error>
140where
141 T: SqlQuery + UpdateParams + Send + Sync + 'static
142{
143 let sql = T::query();
144
145 static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
146 let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
147 std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
148 });
149
150 if is_trace_enabled {
151 println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
152 }
153
154 let params = entity.params();
155 let result = transaction.execute(&sql, ¶ms).await?;
156 Ok((transaction, result > 0))
157}
158
159/// Deletes a record within a transaction.
160///
161/// # Arguments
162/// * `transaction` - An active transaction
163/// * `entity` - Data object containing delete conditions (must implement SqlQuery and SqlParams traits)
164///
165/// # Return Value
166/// * `Result<(Transaction<'_>, u64), Error>` - On success, returns the transaction and number of deleted records
167///
168/// # Example
169/// ```rust,no_run
170/// # use tokio_postgres::{NoTls, Error};
171/// # use parsql::tokio_postgres::transactional;
172/// # use parsql::macros::{Deletable, SqlParams};
173/// #
174/// #[derive(Deletable, SqlParams)]
175/// #[table("users")]
176/// #[where_clause("id = $")]
177/// struct DeleteUser {
178/// id: i64,
179/// }
180///
181/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
182/// # let (client, connection) = tokio_postgres::connect("", NoTls).await?;
183/// # tokio::spawn(async move { connection.await; });
184/// let user = DeleteUser { id: 1 };
185///
186/// let transaction = transactional::begin(&client).await?;
187/// let (transaction, deleted) = transactional::tx_delete(transaction, user).await?;
188/// transaction.commit().await?;
189/// # Ok(())
190/// # }
191/// ```
192pub async fn tx_delete<T>(
193 transaction: Transaction<'_>,
194 entity: T,
195) -> Result<(Transaction<'_>, u64), Error>
196where
197 T: SqlQuery + SqlParams + Send + Sync + 'static
198{
199 let sql = T::query();
200
201 static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
202 let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
203 std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
204 });
205
206 if is_trace_enabled {
207 println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
208 }
209
210 let params = entity.params();
211 let result = transaction.execute(&sql, ¶ms).await?;
212 Ok((transaction, result))
213}
214
215/// Retrieves a single record within a transaction.
216///
217/// # Arguments
218/// * `transaction` - An active transaction
219/// * `params` - Data object containing query parameters (must implement SqlQuery, FromRow, and SqlParams traits)
220///
221/// # Return Value
222/// * `Result<(Transaction<'_>, T), Error>` - On success, returns the transaction and the record
223///
224/// # Example
225/// ```rust,no_run
226/// # use tokio_postgres::{NoTls, Error};
227/// # use parsql::tokio_postgres::transactional;
228/// # use parsql::macros::{Queryable, FromRow, SqlParams};
229/// #
230/// #[derive(Queryable, FromRow, SqlParams, Debug)]
231/// #[table("users")]
232/// #[where_clause("id = $")]
233/// struct GetUser {
234/// id: i64,
235/// name: String,
236/// email: String,
237/// }
238///
239/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
240/// # let (client, connection) = tokio_postgres::connect("", NoTls).await?;
241/// # tokio::spawn(async move { connection.await; });
242/// let query = GetUser {
243/// id: 1,
244/// name: Default::default(),
245/// email: Default::default(),
246/// };
247///
248/// let transaction = transactional::begin(&client).await?;
249/// let (transaction, user) = transactional::tx_fetch(transaction, query).await?;
250/// transaction.commit().await?;
251/// # Ok(())
252/// # }
253/// ```
254pub async fn tx_fetch<T>(
255 transaction: Transaction<'_>,
256 params: T,
257) -> Result<(Transaction<'_>, T), Error>
258where
259 T: SqlQuery + FromRow + SqlParams + Send + Sync + 'static
260{
261 let sql = T::query();
262
263 static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
264 let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
265 std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
266 });
267
268 if is_trace_enabled {
269 println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
270 }
271
272 let query_params = params.params();
273 let row = transaction.query_one(&sql, &query_params).await?;
274 let result = T::from_row(&row)?;
275 Ok((transaction, result))
276}
277
278/// Retrieves multiple records within a transaction.
279///
280/// # Arguments
281/// * `transaction` - An active transaction
282/// * `params` - Data object containing query parameters (must implement SqlQuery, FromRow, and SqlParams traits)
283///
284/// # Return Value
285/// * `Result<(Transaction<'_>, Vec<T>), Error>` - On success, returns the transaction and a vector of records
286///
287/// # Example
288/// ```rust,no_run
289/// # use tokio_postgres::{NoTls, Error};
290/// # use parsql::tokio_postgres::transactional;
291/// # use parsql::macros::{Queryable, FromRow, SqlParams};
292/// #
293/// #[derive(Queryable, FromRow, SqlParams, Debug)]
294/// #[table("users")]
295/// #[where_clause("state = $")]
296/// struct GetActiveUsers {
297/// id: i64,
298/// name: String,
299/// email: String,
300/// state: i16,
301/// }
302///
303/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
304/// # let (client, connection) = tokio_postgres::connect("", NoTls).await?;
305/// # tokio::spawn(async move { connection.await; });
306/// let query = GetActiveUsers {
307/// id: 0,
308/// name: Default::default(),
309/// email: Default::default(),
310/// state: 1, // active users
311/// };
312///
313/// let transaction = transactional::begin(&client).await?;
314/// let (transaction, users) = transactional::tx_fetch_all(transaction, query).await?;
315/// transaction.commit().await?;
316/// # Ok(())
317/// # }
318/// ```
319pub async fn tx_fetch_all<T>(
320 transaction: Transaction<'_>,
321 params: T,
322) -> Result<(Transaction<'_>, Vec<T>), Error>
323where
324 T: SqlQuery + FromRow + SqlParams + Send + Sync + 'static
325{
326 let sql = T::query();
327
328 static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
329 let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
330 std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
331 });
332
333 if is_trace_enabled {
334 println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
335 }
336
337 let query_params = params.params();
338 let rows = transaction.query(&sql, &query_params).await?;
339
340 let mut results = Vec::with_capacity(rows.len());
341 for row in rows {
342 results.push(T::from_row(&row)?);
343 }
344
345 Ok((transaction, results))
346}
347
348/// Retrieves a single record within a transaction.
349///
350/// # Deprecated
351/// This function has been renamed to `tx_fetch`. Please use `tx_fetch` instead.
352///
353/// # Arguments
354/// * `transaction` - An active transaction
355/// * `params` - Data object containing query parameters (must implement SqlQuery, FromRow, and SqlParams traits)
356///
357/// # Return Value
358/// * `Result<(Transaction<'_>, T), Error>` - On success, returns the transaction and the record
359#[deprecated(
360 since = "0.2.0",
361 note = "Renamed to `tx_fetch`. Please use `tx_fetch` function instead."
362)]
363pub async fn tx_get<T>(
364 transaction: Transaction<'_>,
365 params: T,
366) -> Result<(Transaction<'_>, T), Error>
367where
368 T: SqlQuery + FromRow + SqlParams + Send + Sync + 'static
369{
370 tx_fetch(transaction, params).await
371}
372
373/// Retrieves multiple records within a transaction.
374///
375/// # Deprecated
376/// This function has been renamed to `tx_fetch_all`. Please use `tx_fetch_all` instead.
377///
378/// # Arguments
379/// * `transaction` - An active transaction
380/// * `params` - Data object containing query parameters (must implement SqlQuery, FromRow, and SqlParams traits)
381///
382/// # Return Value
383/// * `Result<(Transaction<'_>, Vec<T>), Error>` - On success, returns the transaction and a vector of records
384#[deprecated(
385 since = "0.2.0",
386 note = "Renamed to `tx_fetch_all`. Please use `tx_fetch_all` function instead."
387)]
388pub async fn tx_get_all<T>(
389 transaction: Transaction<'_>,
390 params: T,
391) -> Result<(Transaction<'_>, Vec<T>), Error>
392where
393 T: SqlQuery + FromRow + SqlParams + Send + Sync + 'static
394{
395 tx_fetch_all(transaction, params).await
396}
397
398/// Implementation of the CrudOps trait for Transactions
399///
400/// This implementation allows using the `CrudOps` trait methods directly on
401/// `Transaction<'a>` objects, similar to how they are used on `Client` objects.
402/// This provides a consistent API for both regular client operations and transaction operations.
403///
404/// # Examples
405///
406/// ```rust,no_run
407/// # use tokio_postgres::{NoTls, Error};
408/// # use parsql::tokio_postgres::{CrudOps};
409/// # use parsql::macros::{Insertable, SqlParams};
410/// #
411/// # #[derive(Insertable, SqlParams)]
412/// # #[table("users")]
413/// # struct InsertUser {
414/// # name: String,
415/// # email: String,
416/// # }
417/// #
418/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
419/// # let (mut client, connection) = tokio_postgres::connect("", NoTls).await?;
420/// # tokio::spawn(async move { connection.await; });
421/// let transaction = client.transaction().await?;
422///
423/// // Using CrudOps trait method directly on transaction
424/// let user = InsertUser {
425/// name: "John".to_string(),
426/// email: "john@example.com".to_string(),
427/// };
428///
429/// let rows_affected = transaction.insert(user).await?;
430/// println!("Rows affected: {}", rows_affected);
431///
432/// transaction.commit().await?;
433/// # Ok(())
434/// # }
435/// ```
436#[async_trait::async_trait]
437impl<'a> CrudOps for Transaction<'a> {
438 async fn insert<T, P:for<'b> FromSql<'b> + Send + Sync>(&self, entity: T) -> Result<P, Error>
439 where
440 T: SqlQuery + SqlParams + Send + Sync + 'static,
441 {
442 let sql = T::query();
443
444 static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
445 let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
446 std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
447 });
448
449 if is_trace_enabled {
450 println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
451 }
452
453 let params = entity.params();
454 let row = self.query_one(&sql, ¶ms).await?;
455 row.try_get::<_, P>(0)
456 }
457
458 async fn update<T>(&self, entity: T) -> Result<bool, Error>
459 where
460 T: SqlQuery + UpdateParams + Send + Sync + 'static,
461 {
462 let sql = T::query();
463
464 static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
465 let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
466 std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
467 });
468
469 if is_trace_enabled {
470 println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
471 }
472
473 let params = entity.params();
474 let result = self.execute(&sql, ¶ms).await?;
475 Ok(result > 0)
476 }
477
478 async fn delete<T>(&self, entity: T) -> Result<u64, Error>
479 where
480 T: SqlQuery + SqlParams + Send + Sync + 'static,
481 {
482 let sql = T::query();
483
484 static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
485 let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
486 std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
487 });
488
489 if is_trace_enabled {
490 println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
491 }
492
493 let params = entity.params();
494 self.execute(&sql, ¶ms).await
495 }
496
497 async fn fetch<T>(&self, params: T) -> Result<T, Error>
498 where
499 T: SqlQuery + FromRow + SqlParams + Send + Sync + 'static,
500 {
501 let sql = T::query();
502
503 static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
504 let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
505 std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
506 });
507
508 if is_trace_enabled {
509 println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
510 }
511
512 let query_params = params.params();
513 let row = self.query_one(&sql, &query_params).await?;
514 T::from_row(&row)
515 }
516
517 async fn fetch_all<T>(&self, params: T) -> Result<Vec<T>, Error>
518 where
519 T: SqlQuery + FromRow + SqlParams + Send + Sync + 'static,
520 {
521 let sql = T::query();
522
523 static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
524 let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
525 std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
526 });
527
528 if is_trace_enabled {
529 println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
530 }
531
532 let query_params = params.params();
533 let rows = self.query(&sql, &query_params).await?;
534
535 let mut results = Vec::with_capacity(rows.len());
536 for row in rows {
537 results.push(T::from_row(&row)?);
538 }
539
540 Ok(results)
541 }
542
543 // Use #[allow(deprecated)] to suppress warnings when implementing deprecated methods
544 #[allow(deprecated)]
545 async fn get<T>(&self, params: T) -> Result<T, Error>
546 where
547 T: SqlQuery + FromRow + SqlParams + Send + Sync + 'static,
548 {
549 self.fetch(params).await
550 }
551
552 // Use #[allow(deprecated)] to suppress warnings when implementing deprecated methods
553 #[allow(deprecated)]
554 async fn get_all<T>(&self, params: T) -> Result<Vec<T>, Error>
555 where
556 T: SqlQuery + FromRow + SqlParams + Send + Sync + 'static,
557 {
558 self.fetch_all(params).await
559 }
560
561 async fn select<T, F, R>(&self, entity: T, to_model: F) -> Result<R, Error>
562 where
563 T: SqlQuery + SqlParams + Send + Sync + 'static,
564 F: Fn(&Row) -> Result<R, Error> + Send + Sync + 'static,
565 R: Send + 'static,
566 {
567 let sql = T::query();
568
569 static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
570 let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
571 std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
572 });
573
574 if is_trace_enabled {
575 println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
576 }
577
578 let params = entity.params();
579 let row = self.query_one(&sql, ¶ms).await?;
580 to_model(&row)
581 }
582
583 async fn select_all<T, F, R>(&self, entity: T, to_model: F) -> Result<Vec<R>, Error>
584 where
585 T: SqlQuery + SqlParams + Send + Sync + 'static,
586 F: Fn(&Row) -> R + Send + Sync + 'static,
587 R: Send + 'static,
588 {
589 let sql = T::query();
590
591 static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
592 let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
593 std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
594 });
595
596 if is_trace_enabled {
597 println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
598 }
599
600 let params = entity.params();
601 let rows = self.query(&sql, ¶ms).await?;
602
603 let mut results = Vec::with_capacity(rows.len());
604 for row in &rows {
605 results.push(to_model(row));
606 }
607
608 Ok(results)
609 }
610}