use c3p0_common::error::C3p0Error;
use c3p0_common::json::{
codec::JsonCodec, model::IdType, model::Model, model::NewModel, C3p0Json, Queries,
};
use crate::rusqlite::{
types::{FromSql, ToSql},
Row,
};
use crate::{C3p0PoolSqlite, SqliteConnection};
use c3p0_common::json::builder::C3p0JsonBuilder;
use c3p0_common::json::codec::DefaultJsonCodec;
/// Finishing step that turns a builder value into a [`C3p0JsonSqlite`].
///
/// Both methods consume `self` and differ only in which codec the resulting
/// [`C3p0JsonSqlite`] carries.
pub trait C3p0JsonBuilderSqlite {
    /// Consumes the builder and returns a [`C3p0JsonSqlite`] parameterized with
    /// [`DefaultJsonCodec`].
    fn build<DATA>(self) -> C3p0JsonSqlite<DATA, DefaultJsonCodec>
    where
        DATA: Clone + serde::ser::Serialize + serde::de::DeserializeOwned;

    /// Consumes the builder and returns a [`C3p0JsonSqlite`] that uses the
    /// caller-supplied `codec`.
    fn build_with_codec<DATA, CODEC>(self, codec: CODEC) -> C3p0JsonSqlite<DATA, CODEC>
    where
        DATA: Clone + serde::ser::Serialize + serde::de::DeserializeOwned,
        CODEC: JsonCodec<DATA>;
}
impl C3p0JsonBuilderSqlite for C3p0JsonBuilder<C3p0PoolSqlite> {
    /// Delegates to [`build_with_codec`](C3p0JsonBuilderSqlite::build_with_codec)
    /// with a freshly created [`DefaultJsonCodec`].
    fn build<DATA>(self) -> C3p0JsonSqlite<DATA, DefaultJsonCodec>
    where
        DATA: Clone + serde::ser::Serialize + serde::de::DeserializeOwned,
    {
        let default_codec = DefaultJsonCodec {};
        self.build_with_codec(default_codec)
    }

    /// Pre-renders every SQL statement from the builder's table and column
    /// names and stores them, together with `codec`, in a [`C3p0JsonSqlite`].
    ///
    /// Table and column names are interpolated into the SQL text as-is; only
    /// row values are bound through `?` placeholders.
    fn build_with_codec<DATA, CODEC>(self, codec: CODEC) -> C3p0JsonSqlite<DATA, CODEC>
    where
        DATA: Clone + serde::ser::Serialize + serde::de::DeserializeOwned,
        CODEC: JsonCodec<DATA>,
    {
        // With a schema the table name is double-quoted and prefixed by the
        // schema; without one the bare table name is used unchanged.
        let qualified_table_name = if let Some(schema_name) = &self.schema_name {
            format!(r#"{}."{}""#, schema_name, self.table_name)
        } else {
            self.table_name.clone()
        };

        // Read-only statements.
        let count_all_sql_query = format!("SELECT COUNT(*) FROM {}", qualified_table_name,);
        let exists_by_id_sql_query = format!(
            "SELECT EXISTS (SELECT 1 FROM {} WHERE {} = ?)",
            qualified_table_name, self.id_field_name,
        );
        let find_all_sql_query = format!(
            "SELECT {}, {}, {} FROM {} ORDER BY {} ASC",
            self.id_field_name,
            self.version_field_name,
            self.data_field_name,
            qualified_table_name,
            self.id_field_name,
        );
        let find_by_id_sql_query = format!(
            "SELECT {}, {}, {} FROM {} WHERE {} = ? LIMIT 1",
            self.id_field_name,
            self.version_field_name,
            self.data_field_name,
            qualified_table_name,
            self.id_field_name,
        );

        // Row-mutating statements. `delete` and `update` filter on both id and
        // version; `delete_by_id` filters on id only.
        let delete_sql_query = format!(
            "DELETE FROM {} WHERE {} = ? AND {} = ?",
            qualified_table_name, self.id_field_name, self.version_field_name,
        );
        let delete_all_sql_query = format!("DELETE FROM {}", qualified_table_name,);
        let delete_by_id_sql_query = format!(
            "DELETE FROM {} WHERE {} = ?",
            qualified_table_name, self.id_field_name,
        );
        let save_sql_query = format!(
            "INSERT INTO {} ({}, {}) VALUES (?, ?)",
            qualified_table_name, self.version_field_name, self.data_field_name
        );
        let update_sql_query = format!(
            "UPDATE {} SET {} = ?, {} = ? WHERE {} = ? AND {} = ?",
            qualified_table_name,
            self.version_field_name,
            self.data_field_name,
            self.id_field_name,
            self.version_field_name,
        );

        // Schema statements. The raw string body is kept flush-left so the
        // generated SQL text is unchanged.
        let create_table_sql_query = format!(
            r#"
CREATE TABLE IF NOT EXISTS {} (
{} integer primary key autoincrement,
{} integer not null,
{} JSON
)
"#,
            qualified_table_name,
            self.id_field_name,
            self.version_field_name,
            self.data_field_name
        );
        let drop_table_sql_query = format!("DROP TABLE IF EXISTS {}", qualified_table_name);

        // Every statement is rendered; the builder's name fields can now be
        // moved into the result.
        let queries = Queries {
            count_all_sql_query,
            exists_by_id_sql_query,
            find_all_sql_query,
            find_by_id_sql_query,
            delete_sql_query,
            delete_all_sql_query,
            delete_by_id_sql_query,
            save_sql_query,
            update_sql_query,
            create_table_sql_query,
            drop_table_sql_query,
            lock_table_sql_query: None,
            qualified_table_name,
            table_name: self.table_name,
            id_field_name: self.id_field_name,
            version_field_name: self.version_field_name,
            data_field_name: self.data_field_name,
            schema_name: self.schema_name,
        };

        C3p0JsonSqlite {
            phantom_data: std::marker::PhantomData,
            codec,
            queries,
        }
    }
}
/// Holds the codec and the pre-rendered SQL statements used to read and write
/// `Model<DATA>` rows.
///
/// `DATA` is never stored directly; the `PhantomData` field only ties the type
/// parameter to the struct.
#[derive(Clone)]
pub struct C3p0JsonSqlite<DATA, CODEC>
where
    DATA: Clone + serde::ser::Serialize + serde::de::DeserializeOwned,
    CODEC: JsonCodec<DATA>,
{
    /// Marker for the payload type handled by `codec`.
    phantom_data: std::marker::PhantomData<DATA>,
    /// Converts between `DATA` and the value stored in the data column.
    codec: CODEC,
    /// SQL text and table/column names produced by the builder.
    queries: Queries,
}
impl<DATA, CODEC: JsonCodec<DATA>> C3p0JsonSqlite<DATA, CODEC>
where
DATA: Clone + serde::ser::Serialize + serde::de::DeserializeOwned,
{
pub fn to_model(&self, row: &Row) -> Result<Model<DATA>, Box<std::error::Error>> {
let id = get_or_error(&row, 0)?;
let version = get_or_error(&row, 1)?;
let data = self.codec.from_value(get_or_error(&row, 2)?)?;
Ok(Model { id, version, data })
}
pub fn fetch_one_with_sql(
&self,
conn: &SqliteConnection,
sql: &str,
params: &[&ToSql],
) -> Result<Option<Model<DATA>>, C3p0Error> {
conn.fetch_one_option(sql, params, |row| self.to_model(row))
}
pub fn fetch_all_with_sql(
&self,
conn: &SqliteConnection,
sql: &str,
params: &[&ToSql],
) -> Result<Vec<Model<DATA>>, C3p0Error> {
conn.fetch_all(sql, params, |row| self.to_model(row))
}
}
impl<DATA, CODEC: JsonCodec<DATA>> C3p0Json<DATA, CODEC> for C3p0JsonSqlite<DATA, CODEC>
where
    DATA: Clone + serde::ser::Serialize + serde::de::DeserializeOwned,
{
    type CONNECTION = SqliteConnection;

    /// Returns the codec used to convert `DATA` to and from its stored form.
    fn codec(&self) -> &CODEC {
        &self.codec
    }

    /// Returns the pre-rendered SQL statements and table/column names.
    fn queries(&self) -> &Queries {
        &self.queries
    }

    /// Executes the stored `CREATE TABLE IF NOT EXISTS` statement.
    fn create_table_if_not_exists(&self, conn: &SqliteConnection) -> Result<(), C3p0Error> {
        conn.execute(&self.queries.create_table_sql_query, &[])?;
        Ok(())
    }

    /// Executes the stored `DROP TABLE IF EXISTS` statement.
    fn drop_table_if_exists(&self, conn: &SqliteConnection) -> Result<(), C3p0Error> {
        conn.execute(&self.queries.drop_table_sql_query, &[])?;
        Ok(())
    }

    /// Returns the result of the stored `SELECT COUNT(*)` statement.
    fn count_all(&self, conn: &SqliteConnection) -> Result<i64, C3p0Error> {
        conn.fetch_one_value(&self.queries.count_all_sql_query, &[])
    }

    /// Returns the result of the stored `SELECT EXISTS (...)` statement for `id`.
    fn exists_by_id<'a, ID: Into<&'a IdType>>(
        &self,
        conn: &SqliteConnection,
        id: ID,
    ) -> Result<bool, C3p0Error> {
        conn.fetch_one_value(&self.queries.exists_by_id_sql_query, &[&id.into()])
    }

    /// Fetches every row, ordered by id ascending, as `Model<DATA>` values.
    fn fetch_all(&self, conn: &SqliteConnection) -> Result<Vec<Model<DATA>>, C3p0Error> {
        conn.fetch_all(&self.queries.find_all_sql_query, &[], |row| {
            self.to_model(row)
        })
    }

    /// Fetches the row with the given `id`, or `None` when the query yields no row.
    fn fetch_one_by_id<'a, ID: Into<&'a IdType>>(
        &self,
        conn: &SqliteConnection,
        id: ID,
    ) -> Result<Option<Model<DATA>>, C3p0Error> {
        conn.fetch_one_option(&self.queries.find_by_id_sql_query, &[&id.into()], |row| {
            self.to_model(row)
        })
    }

    /// Deletes the row matching both `obj.id` and `obj.version`.
    ///
    /// # Errors
    ///
    /// Returns `C3p0Error::OptimisticLockError` when `execute` reports `0`,
    /// i.e. no row carried that id/version pair.
    fn delete(&self, conn: &SqliteConnection, obj: &Model<DATA>) -> Result<u64, C3p0Error> {
        let result = conn.execute(&self.queries.delete_sql_query, &[&obj.id, &obj.version])?;
        if result == 0 {
            // The message names the operation that failed: this is a delete,
            // not an update.
            return Err(C3p0Error::OptimisticLockError {
                message: format!(
                    "Cannot delete data in table [{}] with id [{}], version [{}]: data was changed!",
                    &self.queries.qualified_table_name, &obj.id, &obj.version
                ),
            });
        }
        Ok(result)
    }

    /// Executes the stored unconditional `DELETE FROM` statement and returns
    /// the value reported by `execute`.
    fn delete_all(&self, conn: &SqliteConnection) -> Result<u64, C3p0Error> {
        conn.execute(&self.queries.delete_all_sql_query, &[])
    }

    /// Deletes by id only, without a version check, and returns the value
    /// reported by `execute`.
    fn delete_by_id<'a, ID: Into<&'a IdType>>(
        &self,
        conn: &SqliteConnection,
        id: ID,
    ) -> Result<u64, C3p0Error> {
        conn.execute(&self.queries.delete_by_id_sql_query, &[id.into()])
    }

    /// Inserts `obj` and returns it as a `Model` carrying the generated id.
    fn save(&self, conn: &SqliteConnection, obj: NewModel<DATA>) -> Result<Model<DATA>, C3p0Error> {
        let json_data = self.codec.to_value(&obj.data)?;
        {
            conn.execute(&self.queries.save_sql_query, &[&obj.version, &json_data])?;
        }
        // The insert statement does not return the new id, so it is read back
        // with SQLite's `last_insert_rowid()`.
        // NOTE(review): this assumes no other insert runs on `conn` between the
        // two statements — confirm against how connections are shared.
        let id = { conn.fetch_one_value("SELECT last_insert_rowid()", &[])? };
        Ok(Model {
            id,
            version: obj.version,
            data: obj.data,
        })
    }

    /// Writes `obj.data` and bumps the version by one, provided the stored
    /// version still equals `obj.version`.
    ///
    /// # Errors
    ///
    /// Returns `C3p0Error::OptimisticLockError` when `execute` reports `0`,
    /// i.e. no row carried that id/version pair.
    fn update(&self, conn: &SqliteConnection, obj: Model<DATA>) -> Result<Model<DATA>, C3p0Error> {
        let json_data = self.codec().to_value(&obj.data)?;
        let updated_model = Model {
            id: obj.id,
            version: obj.version + 1,
            data: obj.data,
        };
        // Parameter order follows the statement: new version, data, id, and
        // finally the *previous* version used in the WHERE clause.
        let result = conn.execute(
            &self.queries.update_sql_query,
            &[
                &updated_model.version,
                &json_data,
                &updated_model.id,
                &obj.version,
            ],
        )?;
        if result == 0 {
            return Err(C3p0Error::OptimisticLockError {
                message: format!(
                    "Cannot update data in table [{}] with id [{}], version [{}]: data was changed!",
                    &self.queries.qualified_table_name, &updated_model.id, &obj.version
                ),
            });
        }
        Ok(updated_model)
    }
}
/// Reads the column at `index` from `row` as a `T`.
///
/// Any error returned by `row.get` is wrapped in `C3p0Error::SqlError`, with
/// the index and the underlying error text in its `cause`.
fn get_or_error<T: FromSql>(row: &Row, index: usize) -> Result<T, C3p0Error> {
    match row.get(index) {
        Ok(value) => Ok(value),
        Err(err) => Err(C3p0Error::SqlError {
            cause: format!("Row contains no values for index {}. Err: {}", index, err),
        }),
    }
}