use super::ItemStore;
use crate::error::KumoError;
use async_trait::async_trait;
use sqlx::{AssertSqlSafe, MySqlPool};
pub struct MySqlStore {
pool: MySqlPool,
table: String,
extra_columns: Vec<String>,
}
pub struct MySqlStoreBuilder {
database_url: String,
table: String,
create_table: bool,
extra_columns: Vec<(String, String)>,
}
impl MySqlStore {
pub async fn connect(database_url: &str) -> Result<Self, KumoError> {
Self::builder(database_url).connect().await
}
pub fn builder(database_url: impl Into<String>) -> MySqlStoreBuilder {
MySqlStoreBuilder {
database_url: database_url.into(),
table: "kumo_items".into(),
create_table: true,
extra_columns: Vec::new(),
}
}
}
impl MySqlStoreBuilder {
pub fn table(mut self, name: impl Into<String>) -> Self {
self.table = name.into();
self
}
pub fn create_table(mut self, yes: bool) -> Self {
self.create_table = yes;
self
}
pub fn add_column(
mut self,
name: impl Into<String>,
sql_type: impl Into<String>,
) -> Result<Self, KumoError> {
let name = name.into();
super::validate_table_name(&name)?;
self.extra_columns.push((name, sql_type.into()));
Ok(self)
}
pub async fn connect(self) -> Result<MySqlStore, KumoError> {
super::validate_table_name(&self.table)?;
let pool = MySqlPool::connect(&self.database_url)
.await
.map_err(|e| KumoError::store("mysql store", e))?;
if self.create_table {
let extra = self
.extra_columns
.iter()
.map(|(name, ty)| format!(",\n `{}` {}", name, ty))
.collect::<String>();
let sql = format!(
r#"CREATE TABLE IF NOT EXISTS `{}` (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
data JSON NOT NULL,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP{}
)"#,
self.table, extra
);
sqlx::query(AssertSqlSafe(sql))
.execute(&pool)
.await
.map_err(|e| KumoError::store("mysql store", e))?;
}
Ok(MySqlStore {
pool,
table: self.table,
extra_columns: self.extra_columns.into_iter().map(|(n, _)| n).collect(),
})
}
}
#[async_trait]
impl ItemStore for MySqlStore {
async fn store(&self, item: &serde_json::Value) -> Result<(), KumoError> {
let col_list: String = self
.extra_columns
.iter()
.map(|n| format!(", `{}`", n))
.collect();
let param_list: String = self.extra_columns.iter().map(|_| ", ?").collect();
let sql = format!(
r#"INSERT INTO `{}` (data{}) VALUES (?{})"#,
self.table, col_list, param_list
);
let mut q = sqlx::query(AssertSqlSafe(sql)).bind(item.to_string());
for name in &self.extra_columns {
q = q.bind(super::json_val_to_sql_string(item.get(name)));
}
q.execute(&self.pool)
.await
.map_err(|e| KumoError::store("mysql store", e))?;
Ok(())
}
}