use super::schema;
use diesel::prelude::*;
use diesel::sqlite::SqliteConnection;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use syncular_runtime::app_schema::DieselTableAdapter;
use syncular_runtime::error::{Result, SyncularError};
use syncular_runtime::protocol::{ScopeValues, SyncChange};
/// Singleton adapter instances, one per table handled in this file. Each
/// adapter is a unit struct, so these statics carry no data of their own.
static COMMENTS_ADAPTER: CommentsTableAdapter = CommentsTableAdapter;
static PROJECTS_ADAPTER: ProjectsTableAdapter = ProjectsTableAdapter;
static TASKS_ADAPTER: TasksTableAdapter = TasksTableAdapter;
/// Registry of every adapter as a trait object; `adapter_for` scans it
/// linearly and matches on `DieselTableAdapter::name`.
static TABLE_ADAPTERS: [&dyn DieselTableAdapter; 3] =
[&COMMENTS_ADAPTER, &PROJECTS_ADAPTER, &TASKS_ADAPTER];
/// Looks up the registered adapter whose `name()` equals `table`.
///
/// # Errors
/// Returns a `SyncularError::codegen` error when no adapter in
/// `TABLE_ADAPTERS` reports that table name.
pub fn adapter_for(table: &str) -> Result<&'static dyn DieselTableAdapter> {
    for &candidate in TABLE_ADAPTERS.iter() {
        if candidate.name() == table {
            return Ok(candidate);
        }
    }
    Err(SyncularError::codegen(format!(
        "no Diesel table adapter registered for {table}"
    )))
}
/// One row of the `comments` table (`schema::comments`).
///
/// Derives the Diesel traits needed to select and insert it, plus serde
/// traits so `list_rows_json` can turn it into a JSON value.
#[derive(Debug, Clone, Queryable, Selectable, Insertable, Serialize, Deserialize)]
#[diesel(table_name = schema::comments)]
pub struct CommentRow {
/// Row identifier; used as the conflict target when upserting.
pub id: String,
/// Identifier of the related task (presumably a `tasks.id` — confirm against the schema).
pub task_id: String,
/// Optional project identifier; compared with the `project_id` scope in `clear_for_scopes`.
pub project_id: Option<String>,
/// Comment text.
pub body: String,
/// Author identifier; compared with the `user_id` scope in `clear_for_scopes`.
pub author_id: String,
/// Integer flag (presumably 0/1 soft-delete marker — TODO confirm).
pub deleted: i32,
/// Version written by `upsert_row`: the supplied change version when present,
/// otherwise the row's own `server_version` field, otherwise 0.
pub server_version: i64,
}
/// Diesel-backed [`DieselTableAdapter`] for the `comments` table.
struct CommentsTableAdapter;

impl DieselTableAdapter for CommentsTableAdapter {
    /// Name under which this adapter is found by `adapter_for`.
    fn name(&self) -> &'static str {
        "comments"
    }

    /// Loads every `comments` row and serializes each one to a JSON value.
    ///
    /// # Errors
    /// Propagates the Diesel query error or the serde serialization error.
    fn list_rows_json(&self, conn: &mut SqliteConnection) -> Result<Vec<Value>> {
        use schema::comments::dsl as t;

        let rows: Vec<CommentRow> = t::comments.select(CommentRow::as_select()).load(conn)?;
        rows.into_iter()
            .map(serde_json::to_value)
            .collect::<serde_json::Result<Vec<_>>>()
            .map_err(Into::into)
    }

    /// Deletes the rows covered by `scopes`.
    ///
    /// Only string-valued `user_id` and `project_id` scopes are considered:
    /// * both present: rows matching `author_id` AND `project_id` are removed;
    /// * only `user_id`: rows matching `author_id` are removed;
    /// * no usable `user_id`: every row is removed.
    ///
    /// # Errors
    /// Propagates the Diesel error of the executed `DELETE`.
    fn clear_for_scopes(&self, conn: &mut SqliteConnection, scopes: &ScopeValues) -> Result<()> {
        use schema::comments::dsl as t;

        let user_scope = scopes.get("user_id").and_then(Value::as_str);
        let project_scope = scopes.get("project_id").and_then(Value::as_str);

        match (user_scope, project_scope) {
            (Some(user), Some(project)) => {
                diesel::delete(
                    t::comments
                        .filter(t::author_id.eq(user))
                        .filter(t::project_id.eq(project)),
                )
                .execute(conn)?;
            }
            (Some(user), None) => {
                diesel::delete(t::comments.filter(t::author_id.eq(user))).execute(conn)?;
            }
            // NOTE(review): a `project_id` scope without a string `user_id`
            // (or a non-string scope value) also lands here and clears the
            // whole table — confirm this is the intended behavior.
            (None, _) => {
                diesel::delete(t::comments).execute(conn)?;
            }
        }
        Ok(())
    }

    /// Inserts the JSON `row`, or overwrites the existing row with the same `id`.
    ///
    /// Missing or non-string `task_id`, `body` and `author_id` become `""`;
    /// a missing or non-integer `deleted` becomes `0`.
    ///
    /// NOTE(review): despite its name, `fallback_version` takes precedence
    /// over the row's own `server_version` when both are present — confirm.
    ///
    /// # Errors
    /// Returns a protocol error when `row` is not a JSON object, when `id` is
    /// missing or not a string, or when `deleted` does not fit in an `i32`.
    /// Diesel errors from the upsert are propagated.
    fn upsert_row(
        &self,
        conn: &mut SqliteConnection,
        row: &Value,
        fallback_version: Option<i64>,
    ) -> Result<()> {
        use schema::comments::dsl as t;

        let obj = row.as_object().ok_or_else(|| {
            SyncularError::protocol_message(format!("row is not an object: {row}"))
        })?;

        let id = obj
            .get("id")
            .and_then(Value::as_str)
            .ok_or_else(|| SyncularError::protocol_message("id missing"))?
            .to_string();

        // Checked narrowing: a plain `as i32` would silently wrap values
        // outside the i32 range and store a corrupted flag.
        let deleted = match obj.get("deleted").and_then(Value::as_i64) {
            Some(raw) => i32::try_from(raw).map_err(|_| {
                SyncularError::protocol_message(format!("deleted out of range for i32: {raw}"))
            })?,
            None => 0,
        };

        let record = CommentRow {
            id,
            task_id: obj
                .get("task_id")
                .and_then(Value::as_str)
                .unwrap_or_default()
                .to_string(),
            project_id: obj
                .get("project_id")
                .and_then(Value::as_str)
                .map(str::to_string),
            body: obj
                .get("body")
                .and_then(Value::as_str)
                .unwrap_or_default()
                .to_string(),
            author_id: obj
                .get("author_id")
                .and_then(Value::as_str)
                .unwrap_or_default()
                .to_string(),
            deleted,
            server_version: fallback_version
                .or_else(|| obj.get("server_version").and_then(Value::as_i64))
                .unwrap_or(0),
        };

        diesel::insert_into(t::comments)
            .values(&record)
            .on_conflict(t::id)
            .do_update()
            .set((
                t::task_id.eq(&record.task_id),
                t::project_id.eq(&record.project_id),
                t::body.eq(&record.body),
                t::author_id.eq(&record.author_id),
                t::deleted.eq(&record.deleted),
                t::server_version.eq(&record.server_version),
            ))
            .execute(conn)?;
        Ok(())
    }

    /// Applies one sync change to `comments`.
    ///
    /// A `"delete"` op removes the row with `change.row_id`; any other op is
    /// treated as an upsert of `change.row_json` stamped with
    /// `change.row_version`.
    ///
    /// # Errors
    /// Returns a codegen error when the change targets another table, a
    /// protocol error when an upsert carries no `row_json`, and otherwise
    /// whatever the delete or `upsert_row` returns.
    fn apply_change(&self, conn: &mut SqliteConnection, change: &SyncChange) -> Result<()> {
        use schema::comments::dsl as t;

        if change.table != "comments" {
            return Err(SyncularError::codegen(format!(
                "adapter cannot apply change for table {}",
                change.table
            )));
        }

        if change.op == "delete" {
            diesel::delete(t::comments.filter(t::id.eq(&change.row_id))).execute(conn)?;
            return Ok(());
        }

        let row = change.row_json.as_ref().ok_or_else(|| {
            SyncularError::protocol_message(format!(
                "upsert change missing row_json for {}",
                change.row_id
            ))
        })?;
        self.upsert_row(conn, row, change.row_version)
    }
}
/// One row of the `projects` table (`schema::projects`).
///
/// Derives the Diesel traits needed to select and insert it, plus serde
/// traits so `list_rows_json` can turn it into a JSON value.
#[derive(Debug, Clone, Queryable, Selectable, Insertable, Serialize, Deserialize)]
#[diesel(table_name = schema::projects)]
pub struct ProjectRow {
/// Row identifier; used as the conflict target when upserting.
pub id: String,
/// Project name.
pub name: String,
/// Owner identifier; compared with the `user_id` scope in `clear_for_scopes`.
pub owner_id: String,
/// Integer flag (presumably 0/1 archived marker — TODO confirm).
pub archived: i32,
/// Version written by `upsert_row`: the supplied change version when present,
/// otherwise the row's own `server_version` field, otherwise 0.
pub server_version: i64,
}
/// Diesel-backed [`DieselTableAdapter`] for the `projects` table.
struct ProjectsTableAdapter;

impl DieselTableAdapter for ProjectsTableAdapter {
    /// Name under which this adapter is found by `adapter_for`.
    fn name(&self) -> &'static str {
        "projects"
    }

    /// Loads every `projects` row and serializes each one to a JSON value.
    ///
    /// # Errors
    /// Propagates the Diesel query error or the serde serialization error.
    fn list_rows_json(&self, conn: &mut SqliteConnection) -> Result<Vec<Value>> {
        use schema::projects::dsl as t;

        let rows: Vec<ProjectRow> = t::projects.select(ProjectRow::as_select()).load(conn)?;
        rows.into_iter()
            .map(serde_json::to_value)
            .collect::<serde_json::Result<Vec<_>>>()
            .map_err(Into::into)
    }

    /// Deletes the rows covered by `scopes`.
    ///
    /// With a string-valued `user_id` scope only rows whose `owner_id`
    /// matches are removed; otherwise every row is removed.
    ///
    /// # Errors
    /// Propagates the Diesel error of the executed `DELETE`.
    fn clear_for_scopes(&self, conn: &mut SqliteConnection, scopes: &ScopeValues) -> Result<()> {
        use schema::projects::dsl as t;

        match scopes.get("user_id").and_then(Value::as_str) {
            Some(owner) => {
                diesel::delete(t::projects.filter(t::owner_id.eq(owner))).execute(conn)?;
            }
            // NOTE(review): a missing or non-string `user_id` scope clears
            // the whole table — confirm this is the intended behavior.
            None => {
                diesel::delete(t::projects).execute(conn)?;
            }
        }
        Ok(())
    }

    /// Inserts the JSON `row`, or overwrites the existing row with the same `id`.
    ///
    /// Missing or non-string `name` and `owner_id` become `""`; a missing or
    /// non-integer `archived` becomes `0`.
    ///
    /// NOTE(review): despite its name, `fallback_version` takes precedence
    /// over the row's own `server_version` when both are present — confirm.
    ///
    /// # Errors
    /// Returns a protocol error when `row` is not a JSON object, when `id` is
    /// missing or not a string, or when `archived` does not fit in an `i32`.
    /// Diesel errors from the upsert are propagated.
    fn upsert_row(
        &self,
        conn: &mut SqliteConnection,
        row: &Value,
        fallback_version: Option<i64>,
    ) -> Result<()> {
        use schema::projects::dsl as t;

        let obj = row.as_object().ok_or_else(|| {
            SyncularError::protocol_message(format!("row is not an object: {row}"))
        })?;

        let id = obj
            .get("id")
            .and_then(Value::as_str)
            .ok_or_else(|| SyncularError::protocol_message("id missing"))?
            .to_string();

        // Checked narrowing: a plain `as i32` would silently wrap values
        // outside the i32 range and store a corrupted flag.
        let archived = match obj.get("archived").and_then(Value::as_i64) {
            Some(raw) => i32::try_from(raw).map_err(|_| {
                SyncularError::protocol_message(format!("archived out of range for i32: {raw}"))
            })?,
            None => 0,
        };

        let record = ProjectRow {
            id,
            name: obj
                .get("name")
                .and_then(Value::as_str)
                .unwrap_or_default()
                .to_string(),
            owner_id: obj
                .get("owner_id")
                .and_then(Value::as_str)
                .unwrap_or_default()
                .to_string(),
            archived,
            server_version: fallback_version
                .or_else(|| obj.get("server_version").and_then(Value::as_i64))
                .unwrap_or(0),
        };

        diesel::insert_into(t::projects)
            .values(&record)
            .on_conflict(t::id)
            .do_update()
            .set((
                t::name.eq(&record.name),
                t::owner_id.eq(&record.owner_id),
                t::archived.eq(&record.archived),
                t::server_version.eq(&record.server_version),
            ))
            .execute(conn)?;
        Ok(())
    }

    /// Applies one sync change to `projects`.
    ///
    /// A `"delete"` op removes the row with `change.row_id`; any other op is
    /// treated as an upsert of `change.row_json` stamped with
    /// `change.row_version`.
    ///
    /// # Errors
    /// Returns a codegen error when the change targets another table, a
    /// protocol error when an upsert carries no `row_json`, and otherwise
    /// whatever the delete or `upsert_row` returns.
    fn apply_change(&self, conn: &mut SqliteConnection, change: &SyncChange) -> Result<()> {
        use schema::projects::dsl as t;

        if change.table != "projects" {
            return Err(SyncularError::codegen(format!(
                "adapter cannot apply change for table {}",
                change.table
            )));
        }

        if change.op == "delete" {
            diesel::delete(t::projects.filter(t::id.eq(&change.row_id))).execute(conn)?;
            return Ok(());
        }

        let row = change.row_json.as_ref().ok_or_else(|| {
            SyncularError::protocol_message(format!(
                "upsert change missing row_json for {}",
                change.row_id
            ))
        })?;
        self.upsert_row(conn, row, change.row_version)
    }
}
/// One row of the `tasks` table (`schema::tasks`).
///
/// Derives the Diesel traits needed to select and insert it, plus serde
/// traits so `list_rows_json` can turn it into a JSON value.
#[derive(Debug, Clone, Queryable, Selectable, Insertable, Serialize, Deserialize)]
#[diesel(table_name = schema::tasks)]
pub struct TaskRow {
/// Row identifier; used as the conflict target when upserting.
pub id: String,
/// Task title.
pub title: String,
/// Integer flag (presumably 0/1 completion marker — TODO confirm).
pub completed: i32,
/// Owning user identifier; compared with the `user_id` scope in `clear_for_scopes`.
pub user_id: String,
/// Optional project identifier; compared with the `project_id` scope in `clear_for_scopes`.
pub project_id: Option<String>,
/// Version written by `upsert_row`: the supplied change version when present,
/// otherwise the row's own `server_version` field, otherwise 0.
pub server_version: i64,
/// Image payload stored as text. `upsert_row` keeps a JSON string verbatim and
/// stores a JSON array/object as its serialized JSON text; other values become `None`.
pub image: Option<String>,
/// Optional text column (presumably encoded Yjs state for the title — TODO confirm encoding).
pub title_yjs_state: Option<String>,
/// Optional task description.
pub description: Option<String>,
}
/// Diesel-backed [`DieselTableAdapter`] for the `tasks` table.
struct TasksTableAdapter;

impl DieselTableAdapter for TasksTableAdapter {
    /// Name under which this adapter is found by `adapter_for`.
    fn name(&self) -> &'static str {
        "tasks"
    }

    /// Loads every `tasks` row and serializes each one to a JSON value.
    ///
    /// # Errors
    /// Propagates the Diesel query error or the serde serialization error.
    fn list_rows_json(&self, conn: &mut SqliteConnection) -> Result<Vec<Value>> {
        use schema::tasks::dsl as t;

        let rows: Vec<TaskRow> = t::tasks.select(TaskRow::as_select()).load(conn)?;
        rows.into_iter()
            .map(serde_json::to_value)
            .collect::<serde_json::Result<Vec<_>>>()
            .map_err(Into::into)
    }

    /// Deletes the rows covered by `scopes`.
    ///
    /// Only string-valued `user_id` and `project_id` scopes are considered:
    /// * both present: rows matching `user_id` AND `project_id` are removed;
    /// * only `user_id`: rows matching `user_id` are removed;
    /// * no usable `user_id`: every row is removed.
    ///
    /// # Errors
    /// Propagates the Diesel error of the executed `DELETE`.
    fn clear_for_scopes(&self, conn: &mut SqliteConnection, scopes: &ScopeValues) -> Result<()> {
        use schema::tasks::dsl as t;

        let user_scope = scopes.get("user_id").and_then(Value::as_str);
        let project_scope = scopes.get("project_id").and_then(Value::as_str);

        match (user_scope, project_scope) {
            (Some(user), Some(project)) => {
                diesel::delete(
                    t::tasks
                        .filter(t::user_id.eq(user))
                        .filter(t::project_id.eq(project)),
                )
                .execute(conn)?;
            }
            (Some(user), None) => {
                diesel::delete(t::tasks.filter(t::user_id.eq(user))).execute(conn)?;
            }
            // NOTE(review): a `project_id` scope without a string `user_id`
            // (or a non-string scope value) also lands here and clears the
            // whole table — confirm this is the intended behavior.
            (None, _) => {
                diesel::delete(t::tasks).execute(conn)?;
            }
        }
        Ok(())
    }

    /// Inserts the JSON `row`, or overwrites the existing row with the same `id`.
    ///
    /// Missing or non-string `title` and `user_id` become `""`; a missing or
    /// non-integer `completed` becomes `0`. `image` keeps a JSON string
    /// verbatim, stores a JSON array/object as its serialized JSON text, and
    /// is `None` for any other value.
    ///
    /// NOTE(review): despite its name, `fallback_version` takes precedence
    /// over the row's own `server_version` when both are present — confirm.
    ///
    /// # Errors
    /// Returns a protocol error when `row` is not a JSON object, when `id` is
    /// missing or not a string, or when `completed` does not fit in an `i32`.
    /// Diesel errors from the upsert are propagated.
    fn upsert_row(
        &self,
        conn: &mut SqliteConnection,
        row: &Value,
        fallback_version: Option<i64>,
    ) -> Result<()> {
        use schema::tasks::dsl as t;

        let obj = row.as_object().ok_or_else(|| {
            SyncularError::protocol_message(format!("row is not an object: {row}"))
        })?;

        let id = obj
            .get("id")
            .and_then(Value::as_str)
            .ok_or_else(|| SyncularError::protocol_message("id missing"))?
            .to_string();

        // Checked narrowing: a plain `as i32` would silently wrap values
        // outside the i32 range and store a corrupted flag.
        let completed = match obj.get("completed").and_then(Value::as_i64) {
            Some(raw) => i32::try_from(raw).map_err(|_| {
                SyncularError::protocol_message(format!("completed out of range for i32: {raw}"))
            })?,
            None => 0,
        };

        let record = TaskRow {
            id,
            title: obj
                .get("title")
                .and_then(Value::as_str)
                .unwrap_or_default()
                .to_string(),
            completed,
            user_id: obj
                .get("user_id")
                .and_then(Value::as_str)
                .unwrap_or_default()
                .to_string(),
            project_id: obj
                .get("project_id")
                .and_then(Value::as_str)
                .map(str::to_string),
            server_version: fallback_version
                .or_else(|| obj.get("server_version").and_then(Value::as_i64))
                .unwrap_or(0),
            // Structured image payloads are flattened to their JSON text so
            // they fit the text column; scalars other than strings are dropped.
            image: obj.get("image").and_then(|value| match value {
                Value::String(text) => Some(text.clone()),
                Value::Array(_) | Value::Object(_) => Some(value.to_string()),
                _ => None,
            }),
            title_yjs_state: obj
                .get("title_yjs_state")
                .and_then(Value::as_str)
                .map(str::to_string),
            description: obj
                .get("description")
                .and_then(Value::as_str)
                .map(str::to_string),
        };

        diesel::insert_into(t::tasks)
            .values(&record)
            .on_conflict(t::id)
            .do_update()
            .set((
                t::title.eq(&record.title),
                t::completed.eq(&record.completed),
                t::user_id.eq(&record.user_id),
                t::project_id.eq(&record.project_id),
                t::server_version.eq(&record.server_version),
                t::image.eq(&record.image),
                t::title_yjs_state.eq(&record.title_yjs_state),
                t::description.eq(&record.description),
            ))
            .execute(conn)?;
        Ok(())
    }

    /// Applies one sync change to `tasks`.
    ///
    /// A `"delete"` op removes the row with `change.row_id`; any other op is
    /// treated as an upsert of `change.row_json` stamped with
    /// `change.row_version`.
    ///
    /// # Errors
    /// Returns a codegen error when the change targets another table, a
    /// protocol error when an upsert carries no `row_json`, and otherwise
    /// whatever the delete or `upsert_row` returns.
    fn apply_change(&self, conn: &mut SqliteConnection, change: &SyncChange) -> Result<()> {
        use schema::tasks::dsl as t;

        if change.table != "tasks" {
            return Err(SyncularError::codegen(format!(
                "adapter cannot apply change for table {}",
                change.table
            )));
        }

        if change.op == "delete" {
            diesel::delete(t::tasks.filter(t::id.eq(&change.row_id))).execute(conn)?;
            return Ok(());
        }

        let row = change.row_json.as_ref().ok_or_else(|| {
            SyncularError::protocol_message(format!(
                "upsert change missing row_json for {}",
                change.row_id
            ))
        })?;
        self.upsert_row(conn, row, change.row_version)
    }
}