use crate::{
MyError,
db::{activity::insert_activity, actor::find_agent_id_from_str, schema::TState},
emit_db_error,
};
use chrono::{DateTime, Utc};
use core::fmt;
use sqlx::PgPool;
use tracing::{debug, error};
use uuid::Uuid;
use xapi_data::{Activity, DataError};
#[derive(Debug)]
pub(crate) struct SingleResourceParams<'a> {
activity_id: i32,
agent_id: i32,
registration: Uuid,
state_id: &'a str,
}
impl fmt::Display for SingleResourceParams<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{{ activity: #{}, agent: #{}, registration: {}, state: '{}' }}",
self.activity_id, self.agent_id, self.registration, self.state_id
)
}
}
#[derive(Debug)]
pub(crate) struct MultiResourceParams {
activity_id: i32,
agent_id: i32,
registration: Uuid,
since: Option<DateTime<Utc>>,
}
impl fmt::Display for MultiResourceParams {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if let Some(z_since) = self.since.as_ref() {
write!(
f,
"{{ activity: #{}, agent: #{}, registration: {}, since: {} }}",
self.activity_id, self.agent_id, self.registration, z_since
)
} else {
write!(
f,
"{{ activity: #{}, agent: #{} }}, registration: {}",
self.activity_id, self.agent_id, self.registration
)
}
}
}
const UPSERT: &str = r#"
INSERT INTO state (activity_id, agent_id, registration, state_id, document)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (activity_id, agent_id, registration, state_id)
DO UPDATE SET document = $5"#;
pub(crate) async fn upsert(
conn: &PgPool,
s: &SingleResourceParams<'_>,
document: &str,
) -> Result<(), MyError> {
sqlx::query(UPSERT)
.bind(s.activity_id)
.bind(s.agent_id)
.bind(s.registration)
.bind(s.state_id)
.bind(document)
.execute(conn)
.await?;
Ok(())
}
const FIND: &str = r#"SELECT * FROM state
WHERE activity_id = $1 AND agent_id = $2 AND registration = $3 AND state_id = $4"#;
pub(crate) async fn find(
conn: &PgPool,
s: &SingleResourceParams<'_>,
) -> Result<(Option<String>, DateTime<Utc>), MyError> {
match sqlx::query_as::<_, TState>(FIND)
.bind(s.activity_id)
.bind(s.agent_id)
.bind(s.registration)
.bind(s.state_id)
.fetch_one(conn)
.await
{
Ok(x) => Ok((Some(x.document), x.updated)),
Err(x) => match x {
sqlx::Error::RowNotFound => Ok((None, DateTime::UNIX_EPOCH)),
x => emit_db_error!(x, "Failed find State w/ {}", s),
},
}
}
const FIND_IDS_SINCE: &str = r#"
SELECT * FROM state
WHERE activity_id = $1 AND agent_id = $2 AND registration = $3 AND updated > $4"#;
const FIND_IDS: &str = r#"
SELECT * FROM state
WHERE activity_id = $1 AND agent_id = $2 AND registration = $3"#;
pub(crate) async fn find_ids(
conn: &PgPool,
s: &MultiResourceParams,
) -> Result<Vec<String>, MyError> {
let query = if let Some(z_since) = s.since {
sqlx::query_as::<_, TState>(FIND_IDS_SINCE)
.bind(s.activity_id)
.bind(s.agent_id)
.bind(s.registration)
.bind(z_since)
.fetch_all(conn)
} else {
sqlx::query_as::<_, TState>(FIND_IDS)
.bind(s.activity_id)
.bind(s.agent_id)
.bind(s.registration)
.fetch_all(conn)
};
match query.await {
Ok(x) => {
let vec = x.iter().map(|x| x.state_id.to_owned()).collect::<Vec<_>>();
Ok(vec)
}
Err(x) => match x {
sqlx::Error::RowNotFound => Ok(vec![]),
x => emit_db_error!(x, "Failed find State ID(s) w/ {}", s),
},
}
}
const DELETE: &str = r#"DELETE FROM state
WHERE activity_id = $1 AND agent_id = $2 AND registration = $3 AND state_id = $4"#;
pub(crate) async fn remove(conn: &PgPool, s: &SingleResourceParams<'_>) -> Result<(), MyError> {
match sqlx::query(DELETE)
.bind(s.activity_id)
.bind(s.agent_id)
.bind(s.registration)
.bind(s.state_id)
.execute(conn)
.await
{
Ok(_) => Ok(()),
Err(x) => emit_db_error!(x, "Failed remove State w/ {}", s),
}
}
const DELETE_MANY: &str = r#"DELETE FROM state
WHERE activity_id = $1 AND agent_id = $2 AND registration = $3"#;
pub(crate) async fn remove_many(
conn: &PgPool,
s: &SingleResourceParams<'_>,
) -> Result<(), MyError> {
match sqlx::query(DELETE_MANY)
.bind(s.activity_id)
.bind(s.agent_id)
.bind(s.registration)
.execute(conn)
.await
{
Ok(_) => Ok(()),
Err(x) => emit_db_error!(x, "Failed remove State(s) w/ {}", s),
}
}
pub(crate) async fn as_single<'a>(
conn: &PgPool,
activity_iri: &'a str,
agent: &'a str,
registration: Option<&'a str>,
state_id: &'a str,
) -> Result<SingleResourceParams<'a>, MyError> {
debug!("----- as_single -----");
let activity = Activity::from_iri_str(activity_iri).map_err(|x| {
error!("Failed parse Activity ({})", activity_iri);
MyError::Data(x)
})?;
let activity_id = insert_activity(conn, &activity).await?;
debug!("activity_id = {}", activity_id);
let agent_id = find_agent_id_from_str(conn, agent).await?;
debug!("agent_id = {}", agent_id);
let registration = if let Some(z_uuid) = registration {
Uuid::parse_str(z_uuid).map_err(|x| {
error!("Failed parse registration ({})", z_uuid);
MyError::Data(DataError::UUID(x))
})?
} else {
Uuid::nil()
};
Ok(SingleResourceParams {
activity_id,
agent_id,
registration,
state_id,
})
}
pub(crate) async fn as_many<'a>(
conn: &PgPool,
activity_iri: &'a str,
agent: &'a str,
registration: Option<&'a str>,
since: Option<&'a str>,
) -> Result<MultiResourceParams, MyError> {
debug!("----- as_many -----");
let activity = Activity::from_iri_str(activity_iri).map_err(|x| {
error!("Failed parse Activity ({})", activity_iri);
MyError::Data(x)
})?;
let activity_id = insert_activity(conn, &activity).await?;
debug!("activity_id = {}", activity_id);
let agent_id = find_agent_id_from_str(conn, agent).await?;
debug!("agent_id = {}", agent_id);
let registration = if let Some(z_uuid) = registration {
Uuid::parse_str(z_uuid).map_err(|x| {
error!("Failed parse registration ({})", z_uuid);
MyError::Data(DataError::UUID(x))
})?
} else {
Uuid::nil()
};
let since = if let Some(z_str) = since {
let dt = DateTime::parse_from_rfc3339(z_str).map_err(|x| {
error!("Failed parse since ({})", z_str);
MyError::Data(DataError::Time(x))
})?;
Some(dt.with_timezone(&Utc))
} else {
None
};
Ok(MultiResourceParams {
activity_id,
agent_id,
registration,
since,
})
}