parsql_tokio_postgres/
transaction_ops.rs

1use postgres::types::FromSql;
2use tokio_postgres::{Error, Row, Client, Transaction};
3use std::sync::OnceLock;
4use crate::traits::{CrudOps, FromRow, SqlParams, SqlQuery, UpdateParams};
5
6/// Creates and begins a new transaction.
7/// 
8/// This function is a wrapper around the tokio-postgres `transaction()` method.
9/// It allows starting a new database transaction for performing multiple operations atomically.
10/// 
11/// # Return Value
12/// * `Result<Transaction<'_>, Error>` - On success, returns the new transaction; on failure, returns Error
13/// 
14/// # Example
15/// ```rust,no_run
16/// # use tokio_postgres::{NoTls, Error};
17/// # use parsql::tokio_postgres::transactional;
18/// # 
19/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
20/// # let (mut client, connection) = tokio_postgres::connect("", NoTls).await?;
21/// # tokio::spawn(async move { connection.await; });
22/// let transaction = transactional::begin(&mut client).await?;
23/// 
24/// // Transaction işlemlerini gerçekleştir
25/// 
26/// transaction.commit().await?;
27/// # Ok(())
28/// # }
29/// ```
30pub async fn begin(client: &mut Client) -> Result<Transaction<'_>, Error> {
31    client.transaction().await
32}
33
34/// Inserts a record within a transaction.
35/// 
36/// This function executes an INSERT SQL query within the given transaction.
37/// It returns the transaction object, allowing for method chaining.
38/// 
39/// # Arguments
40/// * `transaction` - An active transaction
41/// * `entity` - Data object to be inserted (must implement SqlQuery and SqlParams traits)
42/// 
43/// # Return Value
44/// * `Result<(Transaction<'_>, u64), Error>` - On success, returns the transaction and the number of affected rows; on failure, returns Error
45///
46/// # Example
47/// ```rust,no_run
48/// # use tokio_postgres::{NoTls, Error};
49/// # use parsql::tokio_postgres::transactional;
50/// # use parsql::macros::{Insertable, SqlParams};
51/// #
52/// #[derive(Insertable, SqlParams)]
53/// #[table("users")]
54/// struct InsertUser {
55///     name: String,
56///     email: String,
57/// }
58///
59/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
60/// # let (client, connection) = tokio_postgres::connect("", NoTls).await?;
61/// # tokio::spawn(async move { connection.await; });
62/// let user = InsertUser {
63///     name: "John".to_string(),
64///     email: "john@example.com".to_string(),
65/// };
66///
67/// let transaction = transactional::begin(&client).await?;
68/// let (transaction, rows_affected) = transactional::tx_insert(transaction, user).await?;
69/// transaction.commit().await?;
70/// # Ok(())
71/// # }
72/// ```
73pub async fn tx_insert<T>(
74    transaction: Transaction<'_>,
75    entity: T,
76) -> Result<(Transaction<'_>, u64), Error>
77where
78    T: SqlQuery + SqlParams + Send + Sync + 'static
79{
80    let sql = T::query();
81    
82    static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
83    let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
84        std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
85    });
86    
87    if is_trace_enabled {
88        println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
89    }
90
91    let params = entity.params();
92    let result = transaction.execute(&sql, &params).await?;
93    Ok((transaction, result))
94}
95
96/// Updates a record within a transaction.
97/// 
98/// # Arguments
99/// * `transaction` - An active transaction
100/// * `entity` - Data object containing the update information (must implement SqlQuery and UpdateParams traits)
101/// 
102/// # Return Value
103/// * `Result<(Transaction<'_>, bool), Error>` - On success, returns the transaction and whether any record was updated
104///
105/// # Example
106/// ```rust,no_run
107/// # use tokio_postgres::{NoTls, Error};
108/// # use parsql::tokio_postgres::transactional;
109/// # use parsql::macros::{Updateable, UpdateParams};
110/// #
111/// #[derive(Updateable, UpdateParams)]
112/// #[table("users")]
113/// #[update("name, email")]
114/// #[where_clause("id = $")]
115/// struct UpdateUser {
116///     id: i64,
117///     name: String,
118///     email: String,
119/// }
120///
121/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
122/// # let (client, connection) = tokio_postgres::connect("", NoTls).await?;
123/// # tokio::spawn(async move { connection.await; });
124/// let user = UpdateUser {
125///     id: 1,
126///     name: "John Smith".to_string(),
127///     email: "john.smith@example.com".to_string(),
128/// };
129///
130/// let transaction = transactional::begin(&client).await?;
131/// let (transaction, updated) = transactional::tx_update(transaction, user).await?;
132/// transaction.commit().await?;
133/// # Ok(())
134/// # }
135/// ```
136pub async fn tx_update<T>(
137    transaction: Transaction<'_>,
138    entity: T,
139) -> Result<(Transaction<'_>, bool), Error>
140where
141    T: SqlQuery + UpdateParams + Send + Sync + 'static
142{
143    let sql = T::query();
144    
145    static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
146    let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
147        std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
148    });
149    
150    if is_trace_enabled {
151        println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
152    }
153
154    let params = entity.params();
155    let result = transaction.execute(&sql, &params).await?;
156    Ok((transaction, result > 0))
157}
158
159/// Deletes a record within a transaction.
160/// 
161/// # Arguments
162/// * `transaction` - An active transaction
163/// * `entity` - Data object containing delete conditions (must implement SqlQuery and SqlParams traits)
164/// 
165/// # Return Value
166/// * `Result<(Transaction<'_>, u64), Error>` - On success, returns the transaction and number of deleted records
167///
168/// # Example
169/// ```rust,no_run
170/// # use tokio_postgres::{NoTls, Error};
171/// # use parsql::tokio_postgres::transactional;
172/// # use parsql::macros::{Deletable, SqlParams};
173/// #
174/// #[derive(Deletable, SqlParams)]
175/// #[table("users")]
176/// #[where_clause("id = $")]
177/// struct DeleteUser {
178///     id: i64,
179/// }
180///
181/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
182/// # let (client, connection) = tokio_postgres::connect("", NoTls).await?;
183/// # tokio::spawn(async move { connection.await; });
184/// let user = DeleteUser { id: 1 };
185///
186/// let transaction = transactional::begin(&client).await?;
187/// let (transaction, deleted) = transactional::tx_delete(transaction, user).await?;
188/// transaction.commit().await?;
189/// # Ok(())
190/// # }
191/// ```
192pub async fn tx_delete<T>(
193    transaction: Transaction<'_>,
194    entity: T,
195) -> Result<(Transaction<'_>, u64), Error>
196where
197    T: SqlQuery + SqlParams + Send + Sync + 'static
198{
199    let sql = T::query();
200    
201    static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
202    let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
203        std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
204    });
205    
206    if is_trace_enabled {
207        println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
208    }
209
210    let params = entity.params();
211    let result = transaction.execute(&sql, &params).await?;
212    Ok((transaction, result))
213}
214
215/// Retrieves a single record within a transaction.
216/// 
217/// # Arguments
218/// * `transaction` - An active transaction
219/// * `params` - Data object containing query parameters (must implement SqlQuery, FromRow, and SqlParams traits)
220/// 
221/// # Return Value
222/// * `Result<(Transaction<'_>, T), Error>` - On success, returns the transaction and the record
223///
224/// # Example
225/// ```rust,no_run
226/// # use tokio_postgres::{NoTls, Error};
227/// # use parsql::tokio_postgres::transactional;
228/// # use parsql::macros::{Queryable, FromRow, SqlParams};
229/// #
230/// #[derive(Queryable, FromRow, SqlParams, Debug)]
231/// #[table("users")]
232/// #[where_clause("id = $")]
233/// struct GetUser {
234///     id: i64,
235///     name: String,
236///     email: String,
237/// }
238///
239/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
240/// # let (client, connection) = tokio_postgres::connect("", NoTls).await?;
241/// # tokio::spawn(async move { connection.await; });
242/// let query = GetUser {
243///     id: 1,
244///     name: Default::default(),
245///     email: Default::default(),
246/// };
247///
248/// let transaction = transactional::begin(&client).await?;
249/// let (transaction, user) = transactional::tx_fetch(transaction, query).await?;
250/// transaction.commit().await?;
251/// # Ok(())
252/// # }
253/// ```
254pub async fn tx_fetch<T>(
255    transaction: Transaction<'_>,
256    params: T,
257) -> Result<(Transaction<'_>, T), Error>
258where
259    T: SqlQuery + FromRow + SqlParams + Send + Sync + 'static
260{
261    let sql = T::query();
262    
263    static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
264    let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
265        std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
266    });
267    
268    if is_trace_enabled {
269        println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
270    }
271
272    let query_params = params.params();
273    let row = transaction.query_one(&sql, &query_params).await?;
274    let result = T::from_row(&row)?;
275    Ok((transaction, result))
276}
277
278/// Retrieves multiple records within a transaction.
279/// 
280/// # Arguments
281/// * `transaction` - An active transaction
282/// * `params` - Data object containing query parameters (must implement SqlQuery, FromRow, and SqlParams traits)
283/// 
284/// # Return Value
285/// * `Result<(Transaction<'_>, Vec<T>), Error>` - On success, returns the transaction and a vector of records
286///
287/// # Example
288/// ```rust,no_run
289/// # use tokio_postgres::{NoTls, Error};
290/// # use parsql::tokio_postgres::transactional;
291/// # use parsql::macros::{Queryable, FromRow, SqlParams};
292/// #
293/// #[derive(Queryable, FromRow, SqlParams, Debug)]
294/// #[table("users")]
295/// #[where_clause("state = $")]
296/// struct GetActiveUsers {
297///     id: i64,
298///     name: String,
299///     email: String,
300///     state: i16,
301/// }
302///
303/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
304/// # let (client, connection) = tokio_postgres::connect("", NoTls).await?;
305/// # tokio::spawn(async move { connection.await; });
306/// let query = GetActiveUsers {
307///     id: 0,
308///     name: Default::default(),
309///     email: Default::default(),
310///     state: 1, // active users
311/// };
312///
313/// let transaction = transactional::begin(&client).await?;
314/// let (transaction, users) = transactional::tx_fetch_all(transaction, query).await?;
315/// transaction.commit().await?;
316/// # Ok(())
317/// # }
318/// ```
319pub async fn tx_fetch_all<T>(
320    transaction: Transaction<'_>,
321    params: T,
322) -> Result<(Transaction<'_>, Vec<T>), Error>
323where
324    T: SqlQuery + FromRow + SqlParams + Send + Sync + 'static
325{
326    let sql = T::query();
327    
328    static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
329    let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
330        std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
331    });
332    
333    if is_trace_enabled {
334        println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
335    }
336
337    let query_params = params.params();
338    let rows = transaction.query(&sql, &query_params).await?;
339    
340    let mut results = Vec::with_capacity(rows.len());
341    for row in rows {
342        results.push(T::from_row(&row)?);
343    }
344    
345    Ok((transaction, results))
346}
347
348/// Retrieves a single record within a transaction.
349/// 
350/// # Deprecated
351/// This function has been renamed to `tx_fetch`. Please use `tx_fetch` instead.
352///
353/// # Arguments
354/// * `transaction` - An active transaction
355/// * `params` - Data object containing query parameters (must implement SqlQuery, FromRow, and SqlParams traits)
356/// 
357/// # Return Value
358/// * `Result<(Transaction<'_>, T), Error>` - On success, returns the transaction and the record
359#[deprecated(
360    since = "0.2.0",
361    note = "Renamed to `tx_fetch`. Please use `tx_fetch` function instead."
362)]
363pub async fn tx_get<T>(
364    transaction: Transaction<'_>,
365    params: T,
366) -> Result<(Transaction<'_>, T), Error>
367where
368    T: SqlQuery + FromRow + SqlParams + Send + Sync + 'static
369{
370    tx_fetch(transaction, params).await
371}
372
373/// Retrieves multiple records within a transaction.
374/// 
375/// # Deprecated
376/// This function has been renamed to `tx_fetch_all`. Please use `tx_fetch_all` instead.
377///
378/// # Arguments
379/// * `transaction` - An active transaction
380/// * `params` - Data object containing query parameters (must implement SqlQuery, FromRow, and SqlParams traits)
381/// 
382/// # Return Value
383/// * `Result<(Transaction<'_>, Vec<T>), Error>` - On success, returns the transaction and a vector of records
384#[deprecated(
385    since = "0.2.0",
386    note = "Renamed to `tx_fetch_all`. Please use `tx_fetch_all` function instead."
387)]
388pub async fn tx_get_all<T>(
389    transaction: Transaction<'_>,
390    params: T,
391) -> Result<(Transaction<'_>, Vec<T>), Error>
392where
393    T: SqlQuery + FromRow + SqlParams + Send + Sync + 'static
394{
395    tx_fetch_all(transaction, params).await
396}
397
398/// Implementation of the CrudOps trait for Transactions
399///
400/// This implementation allows using the `CrudOps` trait methods directly on 
401/// `Transaction<'a>` objects, similar to how they are used on `Client` objects.
402/// This provides a consistent API for both regular client operations and transaction operations.
403///
404/// # Examples
405///
406/// ```rust,no_run
407/// # use tokio_postgres::{NoTls, Error};
408/// # use parsql::tokio_postgres::{CrudOps};
409/// # use parsql::macros::{Insertable, SqlParams};
410/// #
411/// # #[derive(Insertable, SqlParams)]
412/// # #[table("users")]
413/// # struct InsertUser {
414/// #     name: String,
415/// #     email: String,
416/// # }
417/// # 
418/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
419/// # let (mut client, connection) = tokio_postgres::connect("", NoTls).await?;
420/// # tokio::spawn(async move { connection.await; });
421/// let transaction = client.transaction().await?;
422///
423/// // Using CrudOps trait method directly on transaction
424/// let user = InsertUser {
425///     name: "John".to_string(),
426///     email: "john@example.com".to_string(),
427/// };
428///
429/// let rows_affected = transaction.insert(user).await?;
430/// println!("Rows affected: {}", rows_affected);
431///
432/// transaction.commit().await?;
433/// # Ok(())
434/// # }
435/// ```
436#[async_trait::async_trait]
437impl<'a> CrudOps for Transaction<'a> {
438    async fn insert<T, P:for<'b> FromSql<'b> + Send + Sync>(&self, entity: T) -> Result<P, Error>
439    where
440        T: SqlQuery + SqlParams + Send + Sync + 'static,
441    {
442        let sql = T::query();
443        
444        static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
445        let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
446            std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
447        });
448        
449        if is_trace_enabled {
450            println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
451        }
452
453        let params = entity.params();
454        let row = self.query_one(&sql, &params).await?;
455        row.try_get::<_, P>(0)
456    }
457
458    async fn update<T>(&self, entity: T) -> Result<bool, Error>
459    where
460        T: SqlQuery + UpdateParams + Send + Sync + 'static,
461    {
462        let sql = T::query();
463        
464        static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
465        let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
466            std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
467        });
468        
469        if is_trace_enabled {
470            println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
471        }
472
473        let params = entity.params();
474        let result = self.execute(&sql, &params).await?;
475        Ok(result > 0)
476    }
477
478    async fn delete<T>(&self, entity: T) -> Result<u64, Error>
479    where
480        T: SqlQuery + SqlParams + Send + Sync + 'static,
481    {
482        let sql = T::query();
483        
484        static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
485        let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
486            std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
487        });
488        
489        if is_trace_enabled {
490            println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
491        }
492
493        let params = entity.params();
494        self.execute(&sql, &params).await
495    }
496
497    async fn fetch<T>(&self, params: T) -> Result<T, Error>
498    where
499        T: SqlQuery + FromRow + SqlParams + Send + Sync + 'static,
500    {
501        let sql = T::query();
502        
503        static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
504        let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
505            std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
506        });
507        
508        if is_trace_enabled {
509            println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
510        }
511
512        let query_params = params.params();
513        let row = self.query_one(&sql, &query_params).await?;
514        T::from_row(&row)
515    }
516
517    async fn fetch_all<T>(&self, params: T) -> Result<Vec<T>, Error>
518    where
519        T: SqlQuery + FromRow + SqlParams + Send + Sync + 'static,
520    {
521        let sql = T::query();
522        
523        static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
524        let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
525            std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
526        });
527        
528        if is_trace_enabled {
529            println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
530        }
531
532        let query_params = params.params();
533        let rows = self.query(&sql, &query_params).await?;
534        
535        let mut results = Vec::with_capacity(rows.len());
536        for row in rows {
537            results.push(T::from_row(&row)?);
538        }
539        
540        Ok(results)
541    }
542    
543    // Use #[allow(deprecated)] to suppress warnings when implementing deprecated methods
544    #[allow(deprecated)]
545    async fn get<T>(&self, params: T) -> Result<T, Error>
546    where
547        T: SqlQuery + FromRow + SqlParams + Send + Sync + 'static,
548    {
549        self.fetch(params).await
550    }
551
552    // Use #[allow(deprecated)] to suppress warnings when implementing deprecated methods
553    #[allow(deprecated)]
554    async fn get_all<T>(&self, params: T) -> Result<Vec<T>, Error>
555    where
556        T: SqlQuery + FromRow + SqlParams + Send + Sync + 'static,
557    {
558        self.fetch_all(params).await
559    }
560
561    async fn select<T, F, R>(&self, entity: T, to_model: F) -> Result<R, Error>
562    where
563        T: SqlQuery + SqlParams + Send + Sync + 'static,
564        F: Fn(&Row) -> Result<R, Error> + Send + Sync + 'static,
565        R: Send + 'static,
566    {
567        let sql = T::query();
568        
569        static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
570        let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
571            std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
572        });
573        
574        if is_trace_enabled {
575            println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
576        }
577
578        let params = entity.params();
579        let row = self.query_one(&sql, &params).await?;
580        to_model(&row)
581    }
582
583    async fn select_all<T, F, R>(&self, entity: T, to_model: F) -> Result<Vec<R>, Error>
584    where
585        T: SqlQuery + SqlParams + Send + Sync + 'static,
586        F: Fn(&Row) -> R + Send + Sync + 'static,
587        R: Send + 'static,
588    {
589        let sql = T::query();
590        
591        static TRACE_ENABLED: OnceLock<bool> = OnceLock::new();
592        let is_trace_enabled = *TRACE_ENABLED.get_or_init(|| {
593            std::env::var("PARSQL_TRACE").unwrap_or_default() == "1"
594        });
595        
596        if is_trace_enabled {
597            println!("[PARSQL-TOKIO-POSTGRES-TX] Execute SQL: {}", sql);
598        }
599
600        let params = entity.params();
601        let rows = self.query(&sql, &params).await?;
602        
603        let mut results = Vec::with_capacity(rows.len());
604        for row in &rows {
605            results.push(to_model(row));
606        }
607        
608        Ok(results)
609    }
610}