use crate::{
ActivityDefinition, MyError,
data::{Activity, Canonical, Format},
db::{
RowID,
schema::{TActivity, TObjActivity},
},
emit_db_error,
};
use iri_string::types::IriStr;
use sqlx::{PgPool, types::Json};
use std::mem;
use tracing::debug;
const FIND: &str = r#"SELECT * FROM activity WHERE id = $1"#;
pub(crate) async fn find_activity(
conn: &PgPool,
id: i32,
format: &Format,
) -> Result<Activity, MyError> {
match sqlx::query_as::<_, TActivity>(FIND)
.bind(id)
.fetch_one(conn)
.await
{
Ok(x) => build_activity(x, format),
Err(x) => emit_db_error!(x, "Failed finding Activity #{}", id),
}
}
const FIND_BY_IRI: &str = r#"SELECT * FROM activity WHERE iri = $1"#;
pub(crate) async fn find_activity_by_iri(
conn: &PgPool,
iri: &IriStr,
format: &Format,
) -> Result<Option<Activity>, MyError> {
match sqlx::query_as::<_, TActivity>(FIND_BY_IRI)
.bind(iri.normalize().to_string().as_str())
.fetch_one(conn)
.await
{
Ok(x) => Ok(Some(build_activity(x, format)?)),
Err(x) => match x {
sqlx::Error::RowNotFound => Ok(None),
x => emit_db_error!(x, "Failed finding Activity ({})", iri),
},
}
}
const FIND_ID: &str = r#"SELECT id FROM activity WHERE iri = $1"#;
pub(crate) async fn find_activity_id(conn: &PgPool, iri: &IriStr) -> Result<Option<i32>, MyError> {
match sqlx::query_as::<_, RowID>(FIND_ID)
.bind(iri.normalize().to_string().as_str())
.fetch_one(conn)
.await
{
Ok(x) => Ok(Some(x.0)),
Err(x) => match x {
sqlx::Error::RowNotFound => Ok(None),
x => emit_db_error!(x, "Failed finding row # of Activity ({})", iri),
},
}
}
const INSERT_IRI: &str = r#"INSERT INTO activity (iri) VALUES ($1)
ON CONFLICT (iri) DO UPDATE SET iri = EXCLUDED.iri
RETURNING id"#;
pub(crate) async fn insert_activity_iri(conn: &PgPool, iri: &IriStr) -> Result<i32, MyError> {
match sqlx::query_as::<_, RowID>(INSERT_IRI)
.bind(iri.as_str())
.fetch_one(conn)
.await
{
Ok(x) => Ok(x.0),
Err(x) => emit_db_error!(x, "Failed inserting Activity ({})", iri),
}
}
const UPDATE: &str = r#"UPDATE activity SET definition = $2 WHERE id = $1"#;
const INSERT: &str = r#"INSERT INTO activity (iri, definition) VALUES ($1, $2) RETURNING id"#;
pub(crate) async fn insert_activity(conn: &PgPool, activity: &Activity) -> Result<i32, MyError> {
debug!("activity = {}", activity);
if activity.definition().is_none() {
insert_activity_iri(conn, activity.id()).await
} else {
let new_definition = activity.definition().unwrap().to_owned();
match sqlx::query_as::<_, TActivity>(FIND_BY_IRI)
.bind(activity.id_as_str())
.fetch_one(conn)
.await
{
Ok(row) => {
debug!("row = {:?}", row);
let activity_id = row.id;
let merged_definition = if let Some(z_definition) = row.definition {
let mut old_definition = z_definition.0;
let mut merged = mem::take(&mut old_definition);
merged.merge(new_definition);
merged
} else {
new_definition
};
debug!("merged_definition = {}", merged_definition);
match update_definition(conn, activity_id, &merged_definition).await {
Ok(_) => Ok(activity_id),
Err(x) => Err(x),
}
}
Err(x) => match x {
sqlx::Error::RowNotFound => {
match sqlx::query_as::<_, RowID>(INSERT)
.bind(activity.id_as_str())
.bind(Json(new_definition))
.fetch_one(conn)
.await
{
Ok(x) => Ok(x.0),
Err(x) => emit_db_error!(x, "Failed inserting Activity"),
}
}
x => emit_db_error!(x, "Failed finding Activity"),
},
}
}
}
async fn update_definition(conn: &PgPool, id: i32, ad: &ActivityDefinition) -> Result<(), MyError> {
match sqlx::query(UPDATE)
.bind(id)
.bind(Json(ad))
.execute(conn)
.await
{
Ok(_) => Ok(()),
Err(x) => emit_db_error!(x, "Failed updating ActivityDefinition ({})", ad),
}
}
const FIND_OBJECT: &str = r#"SELECT * FROM obj_activity WHERE statement_id = $1"#;
pub(crate) async fn find_obj_activity(
conn: &PgPool,
sid: i32,
format: &Format,
) -> Result<Activity, MyError> {
match sqlx::query_as::<_, TObjActivity>(FIND_OBJECT)
.bind(sid)
.fetch_one(conn)
.await
{
Ok(x) => Ok(find_activity(conn, x.activity_id, format).await?),
Err(x) => emit_db_error!(x, "Failed finding Activity object for Statement #{}", sid),
}
}
fn build_activity(row: TActivity, format: &Format) -> Result<Activity, MyError> {
debug!("row = {:?}", row);
debug!("format = {:?}", format);
let builder = Activity::builder().with_object_type().id(&row.iri)?;
if let Some(z_definition) = row.definition {
if format.is_ids() {
Ok(builder.build()?)
} else {
let mut res = builder.definition(z_definition.0)?.build()?;
if format.is_canonical() {
res.canonicalize(format.tags());
}
Ok(res)
}
} else {
Ok(builder.build()?)
}
}