c3p0_postgres/
json.rs

1use std::borrow::Cow;
2use std::sync::Arc;
3
4use crate::tokio_postgres::{row::Row, types::ToSql};
5use crate::*;
6use ::tokio_postgres::types::FromSqlOwned;
7use c3p0_common::json::Queries;
8use c3p0_common::time::utils::get_current_epoch_millis;
9use c3p0_common::*;
10
11pub trait PostgresIdType: IdType + FromSqlOwned + ToSql {}
12impl<T: IdType + FromSqlOwned + ToSql> PostgresIdType for T {}
13
14pub type PostgresVersionType = i32;
15
16/// A trait that allows the creation of an Id
17pub trait IdGenerator<Id: IdType, DbId: PostgresIdType>: Send + Sync {
18    /// Returns the column type for the id in the create statement
19    fn create_statement_column_type(&self) -> &str;
20    /// Generates a new id
21    fn generate_id(&self) -> Option<DbId>;
22    /// Converts an Id to a DbId
23    fn id_to_db_id<'a>(&self, id: Cow<'a, Id>) -> Result<Cow<'a, DbId>, C3p0Error>;
24    /// Converts a DbId to an Id
25    fn db_id_to_id<'a>(&self, id: Cow<'a, DbId>) -> Result<Cow<'a, Id>, C3p0Error>;
26}
27
28/// An IdGenerator that uses the auto-increment feature of the database
29pub struct AutogeneratedIdGenerator {}
30
31impl IdGenerator<u64, i64> for AutogeneratedIdGenerator {
32    fn create_statement_column_type(&self) -> &str {
33        "bigserial"
34    }
35
36    fn generate_id(&self) -> Option<i64> {
37        None
38    }
39
40    fn id_to_db_id<'a>(&self, id: Cow<'a, u64>) -> Result<Cow<'a, i64>, C3p0Error> {
41        Ok(Cow::Owned(id.into_owned() as i64))
42    }
43
44    fn db_id_to_id<'a>(&self, id: Cow<'a, i64>) -> Result<Cow<'a, u64>, C3p0Error> {
45        Ok(Cow::Owned(id.into_owned() as u64))
46    }
47}
48
49/// An IdGenerator that uses the uuid crate to generate a random uuid
50pub struct UuidIdGenerator {}
51
52impl IdGenerator<uuid::Uuid, uuid::Uuid> for UuidIdGenerator {
53    fn create_statement_column_type(&self) -> &str {
54        "uuid"
55    }
56
57    fn generate_id(&self) -> Option<uuid::Uuid> {
58        Some(uuid::Uuid::new_v4())
59    }
60
61    fn id_to_db_id<'a>(&self, id: Cow<'a, uuid::Uuid>) -> Result<Cow<'a, uuid::Uuid>, C3p0Error> {
62        Ok(id)
63    }
64
65    fn db_id_to_id<'a>(&self, id: Cow<'a, uuid::Uuid>) -> Result<Cow<'a, uuid::Uuid>, C3p0Error> {
66        Ok(id)
67    }
68}
69
70/// A builder for a PgC3p0Json
71#[derive(Clone)]
72pub struct PgC3p0JsonBuilder<Id: IdType, DbId: PostgresIdType> {
73    pub id_generator: Arc<dyn IdGenerator<Id, DbId>>,
74    pub id_field_name: String,
75    pub version_field_name: String,
76    pub create_epoch_millis_field_name: String,
77    pub update_epoch_millis_field_name: String,
78    pub data_field_name: String,
79    pub table_name: String,
80    pub schema_name: Option<String>,
81}
82
83impl PgC3p0JsonBuilder<u64, i64> {
84    /// Creates a new PgC3p0JsonBuilder for a table with the given name
85    pub fn new<T: Into<String>>(table_name: T) -> Self {
86        let table_name = table_name.into();
87        PgC3p0JsonBuilder {
88            id_generator: Arc::new(AutogeneratedIdGenerator {}),
89            table_name,
90            id_field_name: "id".to_owned(),
91            version_field_name: "version".to_owned(),
92            create_epoch_millis_field_name: "create_epoch_millis".to_owned(),
93            update_epoch_millis_field_name: "update_epoch_millis".to_owned(),
94            data_field_name: "data".to_owned(),
95            schema_name: None,
96        }
97    }
98}
99
100impl<Id: IdType, DbId: PostgresIdType> PgC3p0JsonBuilder<Id, DbId> {
101    /// Sets the id field name
102    pub fn with_id_field_name<T: Into<String>>(mut self, id_field_name: T) -> Self {
103        self.id_field_name = id_field_name.into();
104        self
105    }
106
107    /// Sets the version field name
108    pub fn with_version_field_name<T: Into<String>>(mut self, version_field_name: T) -> Self {
109        self.version_field_name = version_field_name.into();
110        self
111    }
112
113    /// Sets the create_epoch_millis field name
114    pub fn with_create_epoch_millis_field_name<T: Into<String>>(
115        mut self,
116        create_epoch_millis_field_name: T,
117    ) -> Self {
118        self.create_epoch_millis_field_name = create_epoch_millis_field_name.into();
119        self
120    }
121
122    /// Sets the update_epoch_millis field name
123    pub fn with_update_epoch_millis_field_name<T: Into<String>>(
124        mut self,
125        update_epoch_millis_field_name: T,
126    ) -> Self {
127        self.update_epoch_millis_field_name = update_epoch_millis_field_name.into();
128        self
129    }
130
131    /// Sets the data field name
132    pub fn with_data_field_name<T: Into<String>>(mut self, data_field_name: T) -> Self {
133        self.data_field_name = data_field_name.into();
134        self
135    }
136
137    /// Sets the schema name
138    pub fn with_schema_name<O: Into<Option<String>>>(mut self, schema_name: O) -> Self {
139        self.schema_name = schema_name.into();
140        self
141    }
142
143    /// Sets the id generator
144    pub fn with_id_generator<
145        NewId: IdType,
146        NewDbId: PostgresIdType,
147        T: 'static + IdGenerator<NewId, NewDbId> + Send + Sync,
148    >(
149        self,
150        id_generator: T,
151    ) -> PgC3p0JsonBuilder<NewId, NewDbId> {
152        PgC3p0JsonBuilder {
153            id_generator: Arc::new(id_generator),
154            id_field_name: self.id_field_name,
155            version_field_name: self.version_field_name,
156            create_epoch_millis_field_name: self.create_epoch_millis_field_name,
157            update_epoch_millis_field_name: self.update_epoch_millis_field_name,
158            data_field_name: self.data_field_name,
159            table_name: self.table_name,
160            schema_name: self.schema_name,
161        }
162    }
163
164    /// Builds a PgC3p0Json
165    pub fn build<Data: DataType>(self) -> PgC3p0Json<Id, DbId, Data, DefaultJsonCodec> {
166        self.build_with_codec(DefaultJsonCodec {})
167    }
168
169    /// Builds a PgC3p0Json with the given codec
170    pub fn build_with_codec<Data: DataType, CODEC: JsonCodec<Data>>(
171        self,
172        codec: CODEC,
173    ) -> PgC3p0Json<Id, DbId, Data, CODEC> {
174        PgC3p0Json {
175            phantom_data: std::marker::PhantomData,
176            id_generator: self.id_generator.clone(),
177            codec,
178            queries: build_pg_queries(self),
179        }
180    }
181}
182
183/// A C3p0Json implementation for Postgres
184#[derive(Clone)]
185pub struct PgC3p0Json<Id: IdType, DbId: PostgresIdType, Data: DataType, CODEC: JsonCodec<Data>> {
186    phantom_data: std::marker::PhantomData<Data>,
187    id_generator: Arc<dyn IdGenerator<Id, DbId>>,
188    codec: CODEC,
189    queries: Queries,
190}
191
192impl<Id: IdType, DbId: PostgresIdType, Data: DataType, CODEC: JsonCodec<Data>>
193    PgC3p0Json<Id, DbId, Data, CODEC>
194{
195    /// Returns the Postgres specific queries for this C3p0Json
196    pub fn queries(&self) -> &Queries {
197        &self.queries
198    }
199
200    /// Converts a Postgres row to a Model
201    #[inline]
202    pub fn to_model(&self, row: &Row) -> Result<Model<Id, Data>, Box<dyn std::error::Error>> {
203        to_model(&self.codec, self.id_generator.as_ref(), row, 0, 1, 2, 3, 4)
204    }
205
206    /// Allows the execution of a custom sql query and returns the first entry in the result set.
207    /// For this to work, the sql query:
208    /// - must be a SELECT
209    /// - must declare the ID, VERSION and Data fields in this exact order
210    pub async fn fetch_one_optional_with_sql(
211        &self,
212        tx: &mut PgTx<'_>,
213        sql: &str,
214        params: &[&(dyn ToSql + Sync)],
215    ) -> Result<Option<Model<Id, Data>>, C3p0Error> {
216        tx.fetch_one_optional(sql, params, |row| self.to_model(row))
217            .await
218    }
219
220    /// Allows the execution of a custom sql query and returns the first entry in the result set.
221    /// For this to work, the sql query:
222    /// - must be a SELECT
223    /// - must declare the ID, VERSION and Data fields in this exact order
224    pub async fn fetch_one_with_sql(
225        &self,
226        tx: &mut PgTx<'_>,
227        sql: &str,
228        params: &[&(dyn ToSql + Sync)],
229    ) -> Result<Model<Id, Data>, C3p0Error> {
230        tx.fetch_one(sql, params, |row| self.to_model(row)).await
231    }
232
233    /// Allows the execution of a custom sql query and returns all the entries in the result set.
234    /// For this to work, the sql query:
235    /// - must be a SELECT
236    /// - must declare the ID, VERSION and Data fields in this exact order
237    pub async fn fetch_all_with_sql(
238        &self,
239        tx: &mut PgTx<'_>,
240        sql: &str,
241        params: &[&(dyn ToSql + Sync)],
242    ) -> Result<Vec<Model<Id, Data>>, C3p0Error> {
243        tx.fetch_all(sql, params, |row| self.to_model(row)).await
244    }
245}
246
247impl<Id: IdType, DbId: PostgresIdType, Data: DataType, CODEC: JsonCodec<Data>>
248    C3p0Json<Id, Data, CODEC> for PgC3p0Json<Id, DbId, Data, CODEC>
249{
250    type Tx<'a> = PgTx<'a>;
251
252    fn codec(&self) -> &CODEC {
253        &self.codec
254    }
255
256    async fn create_table_if_not_exists(&self, tx: &mut Self::Tx<'_>) -> Result<(), C3p0Error> {
257        tx.execute(&self.queries.create_table_sql_query, &[])
258            .await?;
259        Ok(())
260    }
261
262    async fn drop_table_if_exists(
263        &self,
264        tx: &mut Self::Tx<'_>,
265        cascade: bool,
266    ) -> Result<(), C3p0Error> {
267        let query = if cascade {
268            &self.queries.drop_table_sql_query_cascade
269        } else {
270            &self.queries.drop_table_sql_query
271        };
272        tx.execute(query, &[]).await?;
273        Ok(())
274    }
275
276    async fn count_all(&self, tx: &mut Self::Tx<'_>) -> Result<u64, C3p0Error> {
277        tx.fetch_one_value(&self.queries.count_all_sql_query, &[])
278            .await
279            .map(|val: i64| val as u64)
280    }
281
282    async fn exists_by_id(&self, tx: &mut Self::Tx<'_>, id: &Id) -> Result<bool, C3p0Error> {
283        let id = self.id_generator.id_to_db_id(Cow::Borrowed(id))?;
284        tx.fetch_one_value(&self.queries.exists_by_id_sql_query, &[id.as_ref()])
285            .await
286    }
287
288    async fn fetch_all(&self, tx: &mut Self::Tx<'_>) -> Result<Vec<Model<Id, Data>>, C3p0Error> {
289        tx.fetch_all(&self.queries.find_all_sql_query, &[], |row| {
290            self.to_model(row)
291        })
292        .await
293    }
294
295    async fn fetch_one_optional_by_id(
296        &self,
297        tx: &mut Self::Tx<'_>,
298        id: &Id,
299    ) -> Result<Option<Model<Id, Data>>, C3p0Error> {
300        let id = self.id_generator.id_to_db_id(Cow::Borrowed(id))?;
301        tx.fetch_one_optional(&self.queries.find_by_id_sql_query, &[id.as_ref()], |row| {
302            self.to_model(row)
303        })
304        .await
305    }
306
307    async fn fetch_one_by_id(
308        &self,
309        tx: &mut Self::Tx<'_>,
310        id: &Id,
311    ) -> Result<Model<Id, Data>, C3p0Error> {
312        self.fetch_one_optional_by_id(tx, id)
313            .await
314            .and_then(|result| result.ok_or(C3p0Error::ResultNotFoundError))
315    }
316
317    async fn delete(
318        &self,
319        tx: &mut Self::Tx<'_>,
320        obj: Model<Id, Data>,
321    ) -> Result<Model<Id, Data>, C3p0Error> {
322        let id = self.id_generator.id_to_db_id(Cow::Borrowed(&obj.id))?;
323        let result = tx
324            .execute(
325                &self.queries.delete_sql_query,
326                &[id.as_ref(), &(obj.version as PostgresVersionType)],
327            )
328            .await?;
329
330        if result == 0 {
331            return Err(C3p0Error::OptimisticLockError {
332                cause: format!(
333                    "Cannot delete data in table [{}] with id [{:?}], version [{}]: data was changed!",
334                    &self.queries.qualified_table_name, &obj.id, &obj.version
335                ),
336            });
337        }
338
339        Ok(obj)
340    }
341
342    async fn delete_all(&self, tx: &mut Self::Tx<'_>) -> Result<u64, C3p0Error> {
343        tx.execute(&self.queries.delete_all_sql_query, &[]).await
344    }
345
346    async fn delete_by_id(&self, tx: &mut Self::Tx<'_>, id: &Id) -> Result<u64, C3p0Error> {
347        let id = self.id_generator.id_to_db_id(Cow::Borrowed(id))?;
348        tx.execute(&self.queries.delete_by_id_sql_query, &[id.as_ref()])
349            .await
350    }
351
352    async fn save(
353        &self,
354        tx: &mut Self::Tx<'_>,
355        obj: NewModel<Data>,
356    ) -> Result<Model<Id, Data>, C3p0Error> {
357        let json_data = &self.codec.data_to_value(&obj.data)?;
358        let create_epoch_millis = get_current_epoch_millis();
359
360        let id = match self.id_generator.generate_id() {
361            Some(id) => {
362                tx.execute(
363                    &self.queries.save_sql_query_with_id,
364                    &[
365                        &(obj.version as PostgresVersionType),
366                        &create_epoch_millis,
367                        &json_data,
368                        &id,
369                    ],
370                )
371                .await?;
372                id
373            }
374            _ => {
375                tx.fetch_one_value(
376                    &self.queries.save_sql_query,
377                    &[
378                        &(obj.version as PostgresVersionType),
379                        &create_epoch_millis,
380                        &json_data,
381                    ],
382                )
383                .await?
384            }
385        };
386
387        Ok(Model {
388            id: self.id_generator.db_id_to_id(Cow::Owned(id))?.into_owned(),
389            version: obj.version,
390            data: obj.data,
391            create_epoch_millis,
392            update_epoch_millis: create_epoch_millis,
393        })
394    }
395
396    async fn update(
397        &self,
398        tx: &mut Self::Tx<'_>,
399        obj: Model<Id, Data>,
400    ) -> Result<Model<Id, Data>, C3p0Error> {
401        let json_data = &self.codec.data_to_value(&obj.data)?;
402        let previous_version = obj.version;
403        let updated_model = obj.into_new_version(get_current_epoch_millis());
404        let updated_model_id = self
405            .id_generator
406            .id_to_db_id(Cow::Borrowed(&updated_model.id))?;
407        let result = tx
408            .execute(
409                &self.queries.update_sql_query,
410                &[
411                    &(updated_model.version as PostgresVersionType),
412                    &updated_model.update_epoch_millis,
413                    &json_data,
414                    updated_model_id.as_ref(),
415                    &(previous_version as PostgresVersionType),
416                ],
417            )
418            .await?;
419
420        if result == 0 {
421            return Err(C3p0Error::OptimisticLockError {
422                cause: format!(
423                    "Cannot update data in table [{}] with id [{:?}], version [{}]: data was changed!",
424                    &self.queries.qualified_table_name, &updated_model.id, &previous_version
425                ),
426            });
427        }
428
429        Ok(updated_model)
430    }
431}