use std::sync::Arc;
use crate::common::{IdGenerator, to_model};
use crate::error::into_c3p0_error;
use crate::mysql::queries::build_mysql_queries;
use crate::mysql::{Db, DbRow};
use c3p0_common::json::Queries;
use c3p0_common::time::utils::get_current_epoch_millis;
use c3p0_common::*;
use sqlx::query::Query;
use sqlx::{Database, IntoArguments, MySqlConnection, Row};
pub trait MySqlIdGenerator<Id: IdType>: IdGenerator<Id, Db = Db, Row = DbRow> {
fn inserted_id_to_id(&self, inserted_id: u64) -> Result<Id, C3p0Error>;
fn upcast(&self) -> &dyn IdGenerator<Id, Db = Db, Row = DbRow>;
}
#[derive(Debug, Clone)]
pub struct MySqlAutogeneratedIdGenerator {}
impl MySqlIdGenerator<u64> for MySqlAutogeneratedIdGenerator {
fn inserted_id_to_id(&self, inserted_id: u64) -> Result<u64, C3p0Error> {
Ok(inserted_id)
}
fn upcast(&self) -> &dyn IdGenerator<u64, Db = Db, Row = DbRow> {
self
}
}
impl IdGenerator<u64> for MySqlAutogeneratedIdGenerator {
type Db = Db;
type Row = DbRow;
fn create_statement_column_type(&self) -> &str {
"BIGINT primary key NOT NULL AUTO_INCREMENT"
}
fn generate_id(&self) -> Option<u64> {
None
}
fn id_from_row(
&self,
row: &Self::Row,
index: &(dyn sqlx::ColumnIndex<Self::Row>),
) -> Result<u64, C3p0Error> {
row.try_get(index)
.map_err(|err| C3p0Error::RowMapperError {
cause: format!("Row contains no values for id index. Err: {:?}", err),
})
.map(|id: i64| id as u64)
}
fn id_to_query<'a>(
&self,
id: &'a u64,
query: Query<'a, Db, <Db as Database>::Arguments<'a>>,
) -> Query<'a, Db, <Db as Database>::Arguments<'a>> {
query.bind(*id as i64)
}
}
#[derive(Debug, Clone)]
pub struct MySqlUuidIdGenerator {}
impl MySqlIdGenerator<uuid::Uuid> for MySqlUuidIdGenerator {
fn inserted_id_to_id(&self, _inserted_id: u64) -> Result<uuid::Uuid, C3p0Error> {
Err(C3p0Error::RowMapperError {
cause: "Cannot convert inserted id to Uuid: Unexpected type".into(),
})
}
fn upcast(&self) -> &dyn IdGenerator<uuid::Uuid, Db = Db, Row = DbRow> {
self
}
}
impl IdGenerator<uuid::Uuid> for MySqlUuidIdGenerator {
type Db = Db;
type Row = DbRow;
fn create_statement_column_type(&self) -> &str {
"binary(16) primary key NOT NULL"
}
fn generate_id(&self) -> Option<uuid::Uuid> {
Some(uuid::Uuid::new_v4())
}
fn id_to_query<'a>(
&self,
id: &'a uuid::Uuid,
query: Query<'a, Db, <Db as Database>::Arguments<'a>>,
) -> Query<'a, Db, <Db as Database>::Arguments<'a>> {
query.bind(id)
}
fn id_from_row(
&self,
row: &Self::Row,
index: &(dyn sqlx::ColumnIndex<Self::Row>),
) -> Result<uuid::Uuid, C3p0Error> {
row.try_get(index).map_err(|err| C3p0Error::RowMapperError {
cause: format!("Row contains no values for id index. Err: {:?}", err),
})
}
}
#[derive(Clone)]
pub struct SqlxMySqlC3p0JsonBuilder<Id: IdType> {
phantom_id: std::marker::PhantomData<Id>,
pub id_generator: Arc<dyn MySqlIdGenerator<Id>>,
pub id_field_name: String,
pub version_field_name: String,
pub create_epoch_millis_field_name: String,
pub update_epoch_millis_field_name: String,
pub data_field_name: String,
pub table_name: String,
pub schema_name: Option<String>,
}
impl SqlxMySqlC3p0JsonBuilder<u64> {
pub fn new<T: Into<String>>(table_name: T) -> Self {
let table_name = table_name.into();
SqlxMySqlC3p0JsonBuilder {
phantom_id: std::marker::PhantomData,
id_generator: Arc::new(MySqlAutogeneratedIdGenerator {}),
table_name,
id_field_name: "id".to_owned(),
version_field_name: "version".to_owned(),
create_epoch_millis_field_name: "create_epoch_millis".to_owned(),
update_epoch_millis_field_name: "update_epoch_millis".to_owned(),
data_field_name: "data".to_owned(),
schema_name: None,
}
}
}
impl<Id: IdType> SqlxMySqlC3p0JsonBuilder<Id> {
pub fn with_id_field_name<T: Into<String>>(mut self, id_field_name: T) -> Self {
self.id_field_name = id_field_name.into();
self
}
pub fn with_version_field_name<T: Into<String>>(mut self, version_field_name: T) -> Self {
self.version_field_name = version_field_name.into();
self
}
pub fn with_create_epoch_millis_field_name<T: Into<String>>(
mut self,
create_epoch_millis_field_name: T,
) -> Self {
self.create_epoch_millis_field_name = create_epoch_millis_field_name.into();
self
}
pub fn with_update_epoch_millis_field_name<T: Into<String>>(
mut self,
update_epoch_millis_field_name: T,
) -> Self {
self.update_epoch_millis_field_name = update_epoch_millis_field_name.into();
self
}
pub fn with_data_field_name<T: Into<String>>(mut self, data_field_name: T) -> Self {
self.data_field_name = data_field_name.into();
self
}
pub fn with_schema_name<O: Into<Option<String>>>(mut self, schema_name: O) -> Self {
self.schema_name = schema_name.into();
self
}
pub fn with_id_generator<NewId: IdType>(
self,
id_generator: Arc<dyn MySqlIdGenerator<NewId>>,
) -> SqlxMySqlC3p0JsonBuilder<NewId> {
SqlxMySqlC3p0JsonBuilder {
phantom_id: std::marker::PhantomData,
id_generator,
id_field_name: self.id_field_name,
version_field_name: self.version_field_name,
create_epoch_millis_field_name: self.create_epoch_millis_field_name,
update_epoch_millis_field_name: self.update_epoch_millis_field_name,
data_field_name: self.data_field_name,
table_name: self.table_name,
schema_name: self.schema_name,
}
}
pub fn build<Data: DataType>(self) -> SqlxMySqlC3p0Json<Id, Data, DefaultJsonCodec> {
self.build_with_codec(DefaultJsonCodec {})
}
pub fn build_with_codec<Data: DataType, CODEC: JsonCodec<Data>>(
self,
codec: CODEC,
) -> SqlxMySqlC3p0Json<Id, Data, CODEC> {
SqlxMySqlC3p0Json {
phantom_data: std::marker::PhantomData,
phantom_id: std::marker::PhantomData,
id_generator: self.id_generator.clone(),
codec,
queries: build_mysql_queries(self),
}
}
}
#[derive(Clone)]
pub struct SqlxMySqlC3p0Json<Id: IdType, Data: DataType, CODEC: JsonCodec<Data>> {
phantom_data: std::marker::PhantomData<Data>,
phantom_id: std::marker::PhantomData<Id>,
id_generator: Arc<dyn MySqlIdGenerator<Id>>,
codec: CODEC,
queries: Queries,
}
impl<Id: IdType, Data: DataType, CODEC: JsonCodec<Data>> SqlxMySqlC3p0Json<Id, Data, CODEC> {
pub fn queries(&self) -> &Queries {
&self.queries
}
pub fn query_with_id<'a>(
&self,
sql: &'a str,
id: &'a Id,
) -> Query<'a, Db, <Db as Database>::Arguments<'a>> {
let query = sqlx::query(sql);
self.id_generator.id_to_query(id, query)
}
#[inline]
pub fn to_model(&self, row: &DbRow) -> Result<Model<Id, Data>, C3p0Error> {
to_model(&self.codec, self.id_generator.upcast(), row)
}
pub async fn fetch_one_optional_with_sql<'a, A: 'a + Send + IntoArguments<'a, Db>>(
&self,
tx: &mut MySqlConnection,
sql: Query<'a, Db, A>,
) -> Result<Option<Model<Id, Data>>, C3p0Error> {
sql.fetch_optional(tx)
.await
.map_err(into_c3p0_error)?
.map(|row| to_model(&self.codec, self.id_generator.upcast(), &row))
.transpose()
}
pub async fn fetch_one_with_sql<'a, A: 'a + Send + IntoArguments<'a, Db>>(
&self,
tx: &mut MySqlConnection,
sql: Query<'a, Db, A>,
) -> Result<Model<Id, Data>, C3p0Error> {
sql.fetch_one(tx)
.await
.map_err(into_c3p0_error)
.and_then(|row| to_model(&self.codec, self.id_generator.upcast(), &row))
}
pub async fn fetch_all_with_sql<'a, A: 'a + Send + IntoArguments<'a, Db>>(
&self,
tx: &mut MySqlConnection,
sql: Query<'a, Db, A>,
) -> Result<Vec<Model<Id, Data>>, C3p0Error> {
sql.fetch_all(tx)
.await
.map_err(into_c3p0_error)?
.iter()
.map(|row| to_model(&self.codec, self.id_generator.upcast(), row))
.collect::<Result<Vec<_>, C3p0Error>>()
}
}
impl<Id: IdType, Data: DataType, CODEC: JsonCodec<Data>> C3p0Json<Id, Data, CODEC>
for SqlxMySqlC3p0Json<Id, Data, CODEC>
{
type Tx<'a> = MySqlConnection;
fn codec(&self) -> &CODEC {
&self.codec
}
async fn create_table_if_not_exists(&self, tx: &mut Self::Tx<'_>) -> Result<(), C3p0Error> {
sqlx::query(&self.queries.create_table_sql_query)
.execute(tx)
.await
.map_err(into_c3p0_error)
.map(|_| ())
}
async fn drop_table_if_exists(
&self,
tx: &mut Self::Tx<'_>,
cascade: bool,
) -> Result<(), C3p0Error> {
let query = if cascade {
&self.queries.drop_table_sql_query_cascade
} else {
&self.queries.drop_table_sql_query
};
sqlx::query(query)
.execute(tx)
.await
.map_err(into_c3p0_error)
.map(|_| ())
}
async fn count_all(&self, tx: &mut Self::Tx<'_>) -> Result<u64, C3p0Error> {
sqlx::query(&self.queries.count_all_sql_query)
.fetch_one(tx)
.await
.and_then(|row| row.try_get(0))
.map_err(into_c3p0_error)
.map(|val: i64| val as u64)
}
async fn exists_by_id(&self, tx: &mut Self::Tx<'_>, id: &Id) -> Result<bool, C3p0Error> {
self.query_with_id(&self.queries.exists_by_id_sql_query, id)
.fetch_one(tx)
.await
.and_then(|row| row.try_get(0))
.map_err(into_c3p0_error)
}
async fn fetch_all(&self, tx: &mut Self::Tx<'_>) -> Result<Vec<Model<Id, Data>>, C3p0Error> {
self.fetch_all_with_sql(tx, sqlx::query(&self.queries.find_all_sql_query))
.await
}
async fn fetch_one_optional_by_id(
&self,
tx: &mut Self::Tx<'_>,
id: &Id,
) -> Result<Option<Model<Id, Data>>, C3p0Error> {
let query = self.query_with_id(&self.queries.find_by_id_sql_query, id);
self.fetch_one_optional_with_sql(tx, query).await
}
async fn fetch_one_by_id(
&self,
tx: &mut Self::Tx<'_>,
id: &Id,
) -> Result<Model<Id, Data>, C3p0Error> {
let query = self.query_with_id(&self.queries.find_by_id_sql_query, id);
self.fetch_one_with_sql(tx, query).await
}
async fn delete(
&self,
tx: &mut Self::Tx<'_>,
obj: Model<Id, Data>,
) -> Result<Model<Id, Data>, C3p0Error> {
let result = self
.query_with_id(&self.queries.delete_sql_query, &obj.id)
.bind(obj.version)
.execute(tx)
.await
.map_err(into_c3p0_error)?
.rows_affected();
if result == 0 {
return Err(C3p0Error::OptimisticLockError {
cause: format!(
"Cannot delete data in table [{}] with id [{:?}], version [{}]: data was changed!",
&self.queries.qualified_table_name, &obj.id, &obj.version
),
});
}
Ok(obj)
}
async fn delete_all(&self, tx: &mut Self::Tx<'_>) -> Result<u64, C3p0Error> {
sqlx::query(&self.queries.delete_all_sql_query)
.execute(tx)
.await
.map_err(into_c3p0_error)
.map(|done| done.rows_affected())
}
async fn delete_by_id(&self, tx: &mut Self::Tx<'_>, id: &Id) -> Result<u64, C3p0Error> {
self.query_with_id(&self.queries.delete_by_id_sql_query, id)
.execute(tx)
.await
.map_err(into_c3p0_error)
.map(|done| done.rows_affected())
}
async fn save(
&self,
tx: &mut Self::Tx<'_>,
obj: NewModel<Data>,
) -> Result<Model<Id, Data>, C3p0Error> {
let json_data = &self.codec.data_to_value(&obj.data)?;
let create_epoch_millis = get_current_epoch_millis();
let id = if let Some(id) = self.id_generator.generate_id() {
let query = sqlx::query(&self.queries.save_sql_query_with_id)
.bind(obj.version)
.bind(create_epoch_millis)
.bind(create_epoch_millis)
.bind(json_data);
self.id_generator
.id_to_query(&id, query)
.execute(tx)
.await
.map_err(into_c3p0_error)?;
id
} else {
let id = sqlx::query(&self.queries.save_sql_query)
.bind(obj.version)
.bind(create_epoch_millis)
.bind(create_epoch_millis)
.bind(json_data)
.execute(tx)
.await
.map(|done| done.last_insert_id())
.map_err(into_c3p0_error)?;
self.id_generator.inserted_id_to_id(id)?
};
Ok(Model {
id,
version: obj.version,
data: obj.data,
create_epoch_millis,
update_epoch_millis: create_epoch_millis,
})
}
async fn update(
&self,
tx: &mut Self::Tx<'_>,
obj: Model<Id, Data>,
) -> Result<Model<Id, Data>, C3p0Error> {
let json_data = self.codec.data_to_value(&obj.data)?;
let previous_version = obj.version;
let updated_model = obj.into_new_version(get_current_epoch_millis());
let result = {
let query = sqlx::query(&self.queries.update_sql_query)
.bind(updated_model.version)
.bind(updated_model.update_epoch_millis)
.bind(json_data);
self.id_generator
.id_to_query(&updated_model.id, query)
.bind(previous_version)
.execute(tx)
.await
.map_err(into_c3p0_error)
.map(|done| done.rows_affected())?
};
if result == 0 {
return Err(C3p0Error::OptimisticLockError {
cause: format!(
"Cannot update data in table [{}] with id [{:?}], version [{}]: data was changed!",
self.queries.qualified_table_name, updated_model.id, &previous_version
),
});
}
Ok(updated_model)
}
}