1use std::borrow::Cow;
2use std::sync::Arc;
3
4use crate::tokio_postgres::{row::Row, types::ToSql};
5use crate::*;
6use ::tokio_postgres::types::FromSqlOwned;
7use c3p0_common::json::Queries;
8use c3p0_common::time::utils::get_current_epoch_millis;
9use c3p0_common::*;
10
11pub trait PostgresIdType: IdType + FromSqlOwned + ToSql {}
12impl<T: IdType + FromSqlOwned + ToSql> PostgresIdType for T {}
13
14pub type PostgresVersionType = i32;
15
16pub trait IdGenerator<Id: IdType, DbId: PostgresIdType>: Send + Sync {
18 fn create_statement_column_type(&self) -> &str;
20 fn generate_id(&self) -> Option<DbId>;
22 fn id_to_db_id<'a>(&self, id: Cow<'a, Id>) -> Result<Cow<'a, DbId>, C3p0Error>;
24 fn db_id_to_id<'a>(&self, id: Cow<'a, DbId>) -> Result<Cow<'a, Id>, C3p0Error>;
26}
27
28pub struct AutogeneratedIdGenerator {}
30
31impl IdGenerator<u64, i64> for AutogeneratedIdGenerator {
32 fn create_statement_column_type(&self) -> &str {
33 "bigserial"
34 }
35
36 fn generate_id(&self) -> Option<i64> {
37 None
38 }
39
40 fn id_to_db_id<'a>(&self, id: Cow<'a, u64>) -> Result<Cow<'a, i64>, C3p0Error> {
41 Ok(Cow::Owned(id.into_owned() as i64))
42 }
43
44 fn db_id_to_id<'a>(&self, id: Cow<'a, i64>) -> Result<Cow<'a, u64>, C3p0Error> {
45 Ok(Cow::Owned(id.into_owned() as u64))
46 }
47}
48
49pub struct UuidIdGenerator {}
51
52impl IdGenerator<uuid::Uuid, uuid::Uuid> for UuidIdGenerator {
53 fn create_statement_column_type(&self) -> &str {
54 "uuid"
55 }
56
57 fn generate_id(&self) -> Option<uuid::Uuid> {
58 Some(uuid::Uuid::new_v4())
59 }
60
61 fn id_to_db_id<'a>(&self, id: Cow<'a, uuid::Uuid>) -> Result<Cow<'a, uuid::Uuid>, C3p0Error> {
62 Ok(id)
63 }
64
65 fn db_id_to_id<'a>(&self, id: Cow<'a, uuid::Uuid>) -> Result<Cow<'a, uuid::Uuid>, C3p0Error> {
66 Ok(id)
67 }
68}
69
70#[derive(Clone)]
72pub struct PgC3p0JsonBuilder<Id: IdType, DbId: PostgresIdType> {
73 pub id_generator: Arc<dyn IdGenerator<Id, DbId>>,
74 pub id_field_name: String,
75 pub version_field_name: String,
76 pub create_epoch_millis_field_name: String,
77 pub update_epoch_millis_field_name: String,
78 pub data_field_name: String,
79 pub table_name: String,
80 pub schema_name: Option<String>,
81}
82
83impl PgC3p0JsonBuilder<u64, i64> {
84 pub fn new<T: Into<String>>(table_name: T) -> Self {
86 let table_name = table_name.into();
87 PgC3p0JsonBuilder {
88 id_generator: Arc::new(AutogeneratedIdGenerator {}),
89 table_name,
90 id_field_name: "id".to_owned(),
91 version_field_name: "version".to_owned(),
92 create_epoch_millis_field_name: "create_epoch_millis".to_owned(),
93 update_epoch_millis_field_name: "update_epoch_millis".to_owned(),
94 data_field_name: "data".to_owned(),
95 schema_name: None,
96 }
97 }
98}
99
100impl<Id: IdType, DbId: PostgresIdType> PgC3p0JsonBuilder<Id, DbId> {
101 pub fn with_id_field_name<T: Into<String>>(mut self, id_field_name: T) -> Self {
103 self.id_field_name = id_field_name.into();
104 self
105 }
106
107 pub fn with_version_field_name<T: Into<String>>(mut self, version_field_name: T) -> Self {
109 self.version_field_name = version_field_name.into();
110 self
111 }
112
113 pub fn with_create_epoch_millis_field_name<T: Into<String>>(
115 mut self,
116 create_epoch_millis_field_name: T,
117 ) -> Self {
118 self.create_epoch_millis_field_name = create_epoch_millis_field_name.into();
119 self
120 }
121
122 pub fn with_update_epoch_millis_field_name<T: Into<String>>(
124 mut self,
125 update_epoch_millis_field_name: T,
126 ) -> Self {
127 self.update_epoch_millis_field_name = update_epoch_millis_field_name.into();
128 self
129 }
130
131 pub fn with_data_field_name<T: Into<String>>(mut self, data_field_name: T) -> Self {
133 self.data_field_name = data_field_name.into();
134 self
135 }
136
137 pub fn with_schema_name<O: Into<Option<String>>>(mut self, schema_name: O) -> Self {
139 self.schema_name = schema_name.into();
140 self
141 }
142
143 pub fn with_id_generator<
145 NewId: IdType,
146 NewDbId: PostgresIdType,
147 T: 'static + IdGenerator<NewId, NewDbId> + Send + Sync,
148 >(
149 self,
150 id_generator: T,
151 ) -> PgC3p0JsonBuilder<NewId, NewDbId> {
152 PgC3p0JsonBuilder {
153 id_generator: Arc::new(id_generator),
154 id_field_name: self.id_field_name,
155 version_field_name: self.version_field_name,
156 create_epoch_millis_field_name: self.create_epoch_millis_field_name,
157 update_epoch_millis_field_name: self.update_epoch_millis_field_name,
158 data_field_name: self.data_field_name,
159 table_name: self.table_name,
160 schema_name: self.schema_name,
161 }
162 }
163
164 pub fn build<Data: DataType>(self) -> PgC3p0Json<Id, DbId, Data, DefaultJsonCodec> {
166 self.build_with_codec(DefaultJsonCodec {})
167 }
168
169 pub fn build_with_codec<Data: DataType, CODEC: JsonCodec<Data>>(
171 self,
172 codec: CODEC,
173 ) -> PgC3p0Json<Id, DbId, Data, CODEC> {
174 PgC3p0Json {
175 phantom_data: std::marker::PhantomData,
176 id_generator: self.id_generator.clone(),
177 codec,
178 queries: build_pg_queries(self),
179 }
180 }
181}
182
183#[derive(Clone)]
185pub struct PgC3p0Json<Id: IdType, DbId: PostgresIdType, Data: DataType, CODEC: JsonCodec<Data>> {
186 phantom_data: std::marker::PhantomData<Data>,
187 id_generator: Arc<dyn IdGenerator<Id, DbId>>,
188 codec: CODEC,
189 queries: Queries,
190}
191
192impl<Id: IdType, DbId: PostgresIdType, Data: DataType, CODEC: JsonCodec<Data>>
193 PgC3p0Json<Id, DbId, Data, CODEC>
194{
195 pub fn queries(&self) -> &Queries {
197 &self.queries
198 }
199
200 #[inline]
202 pub fn to_model(&self, row: &Row) -> Result<Model<Id, Data>, Box<dyn std::error::Error>> {
203 to_model(&self.codec, self.id_generator.as_ref(), row, 0, 1, 2, 3, 4)
204 }
205
206 pub async fn fetch_one_optional_with_sql(
211 &self,
212 tx: &mut PgTx<'_>,
213 sql: &str,
214 params: &[&(dyn ToSql + Sync)],
215 ) -> Result<Option<Model<Id, Data>>, C3p0Error> {
216 tx.fetch_one_optional(sql, params, |row| self.to_model(row))
217 .await
218 }
219
220 pub async fn fetch_one_with_sql(
225 &self,
226 tx: &mut PgTx<'_>,
227 sql: &str,
228 params: &[&(dyn ToSql + Sync)],
229 ) -> Result<Model<Id, Data>, C3p0Error> {
230 tx.fetch_one(sql, params, |row| self.to_model(row)).await
231 }
232
233 pub async fn fetch_all_with_sql(
238 &self,
239 tx: &mut PgTx<'_>,
240 sql: &str,
241 params: &[&(dyn ToSql + Sync)],
242 ) -> Result<Vec<Model<Id, Data>>, C3p0Error> {
243 tx.fetch_all(sql, params, |row| self.to_model(row)).await
244 }
245}
246
247impl<Id: IdType, DbId: PostgresIdType, Data: DataType, CODEC: JsonCodec<Data>>
248 C3p0Json<Id, Data, CODEC> for PgC3p0Json<Id, DbId, Data, CODEC>
249{
250 type Tx<'a> = PgTx<'a>;
251
252 fn codec(&self) -> &CODEC {
253 &self.codec
254 }
255
256 async fn create_table_if_not_exists(&self, tx: &mut Self::Tx<'_>) -> Result<(), C3p0Error> {
257 tx.execute(&self.queries.create_table_sql_query, &[])
258 .await?;
259 Ok(())
260 }
261
262 async fn drop_table_if_exists(
263 &self,
264 tx: &mut Self::Tx<'_>,
265 cascade: bool,
266 ) -> Result<(), C3p0Error> {
267 let query = if cascade {
268 &self.queries.drop_table_sql_query_cascade
269 } else {
270 &self.queries.drop_table_sql_query
271 };
272 tx.execute(query, &[]).await?;
273 Ok(())
274 }
275
276 async fn count_all(&self, tx: &mut Self::Tx<'_>) -> Result<u64, C3p0Error> {
277 tx.fetch_one_value(&self.queries.count_all_sql_query, &[])
278 .await
279 .map(|val: i64| val as u64)
280 }
281
282 async fn exists_by_id(&self, tx: &mut Self::Tx<'_>, id: &Id) -> Result<bool, C3p0Error> {
283 let id = self.id_generator.id_to_db_id(Cow::Borrowed(id))?;
284 tx.fetch_one_value(&self.queries.exists_by_id_sql_query, &[id.as_ref()])
285 .await
286 }
287
288 async fn fetch_all(&self, tx: &mut Self::Tx<'_>) -> Result<Vec<Model<Id, Data>>, C3p0Error> {
289 tx.fetch_all(&self.queries.find_all_sql_query, &[], |row| {
290 self.to_model(row)
291 })
292 .await
293 }
294
295 async fn fetch_one_optional_by_id(
296 &self,
297 tx: &mut Self::Tx<'_>,
298 id: &Id,
299 ) -> Result<Option<Model<Id, Data>>, C3p0Error> {
300 let id = self.id_generator.id_to_db_id(Cow::Borrowed(id))?;
301 tx.fetch_one_optional(&self.queries.find_by_id_sql_query, &[id.as_ref()], |row| {
302 self.to_model(row)
303 })
304 .await
305 }
306
307 async fn fetch_one_by_id(
308 &self,
309 tx: &mut Self::Tx<'_>,
310 id: &Id,
311 ) -> Result<Model<Id, Data>, C3p0Error> {
312 self.fetch_one_optional_by_id(tx, id)
313 .await
314 .and_then(|result| result.ok_or(C3p0Error::ResultNotFoundError))
315 }
316
317 async fn delete(
318 &self,
319 tx: &mut Self::Tx<'_>,
320 obj: Model<Id, Data>,
321 ) -> Result<Model<Id, Data>, C3p0Error> {
322 let id = self.id_generator.id_to_db_id(Cow::Borrowed(&obj.id))?;
323 let result = tx
324 .execute(
325 &self.queries.delete_sql_query,
326 &[id.as_ref(), &(obj.version as PostgresVersionType)],
327 )
328 .await?;
329
330 if result == 0 {
331 return Err(C3p0Error::OptimisticLockError {
332 cause: format!(
333 "Cannot delete data in table [{}] with id [{:?}], version [{}]: data was changed!",
334 &self.queries.qualified_table_name, &obj.id, &obj.version
335 ),
336 });
337 }
338
339 Ok(obj)
340 }
341
342 async fn delete_all(&self, tx: &mut Self::Tx<'_>) -> Result<u64, C3p0Error> {
343 tx.execute(&self.queries.delete_all_sql_query, &[]).await
344 }
345
346 async fn delete_by_id(&self, tx: &mut Self::Tx<'_>, id: &Id) -> Result<u64, C3p0Error> {
347 let id = self.id_generator.id_to_db_id(Cow::Borrowed(id))?;
348 tx.execute(&self.queries.delete_by_id_sql_query, &[id.as_ref()])
349 .await
350 }
351
352 async fn save(
353 &self,
354 tx: &mut Self::Tx<'_>,
355 obj: NewModel<Data>,
356 ) -> Result<Model<Id, Data>, C3p0Error> {
357 let json_data = &self.codec.data_to_value(&obj.data)?;
358 let create_epoch_millis = get_current_epoch_millis();
359
360 let id = match self.id_generator.generate_id() {
361 Some(id) => {
362 tx.execute(
363 &self.queries.save_sql_query_with_id,
364 &[
365 &(obj.version as PostgresVersionType),
366 &create_epoch_millis,
367 &json_data,
368 &id,
369 ],
370 )
371 .await?;
372 id
373 }
374 _ => {
375 tx.fetch_one_value(
376 &self.queries.save_sql_query,
377 &[
378 &(obj.version as PostgresVersionType),
379 &create_epoch_millis,
380 &json_data,
381 ],
382 )
383 .await?
384 }
385 };
386
387 Ok(Model {
388 id: self.id_generator.db_id_to_id(Cow::Owned(id))?.into_owned(),
389 version: obj.version,
390 data: obj.data,
391 create_epoch_millis,
392 update_epoch_millis: create_epoch_millis,
393 })
394 }
395
396 async fn update(
397 &self,
398 tx: &mut Self::Tx<'_>,
399 obj: Model<Id, Data>,
400 ) -> Result<Model<Id, Data>, C3p0Error> {
401 let json_data = &self.codec.data_to_value(&obj.data)?;
402 let previous_version = obj.version;
403 let updated_model = obj.into_new_version(get_current_epoch_millis());
404 let updated_model_id = self
405 .id_generator
406 .id_to_db_id(Cow::Borrowed(&updated_model.id))?;
407 let result = tx
408 .execute(
409 &self.queries.update_sql_query,
410 &[
411 &(updated_model.version as PostgresVersionType),
412 &updated_model.update_epoch_millis,
413 &json_data,
414 updated_model_id.as_ref(),
415 &(previous_version as PostgresVersionType),
416 ],
417 )
418 .await?;
419
420 if result == 0 {
421 return Err(C3p0Error::OptimisticLockError {
422 cause: format!(
423 "Cannot update data in table [{}] with id [{:?}], version [{}]: data was changed!",
424 &self.queries.qualified_table_name, &updated_model.id, &previous_version
425 ),
426 });
427 }
428
429 Ok(updated_model)
430 }
431}