use crate::db::runner::DbRunner;
use crate::db::DbType;
use crate::schema::instances;
use crate::schema::instances::dsl;
use chrono::{DateTime, Utc};
use deadpool_diesel::postgres::Object as DbConnection;
use diesel::prelude::*;
use diesel::OptionalExtension;
use diesel::{AsChangeset, Identifiable, Insertable, Queryable};
use jdt_activity_pub::ApAddress;
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortField {
DomainName,
Blocked,
LastMessageAt,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortDirection {
Asc,
Desc,
}
#[derive(Debug, Clone)]
pub struct SortParam {
pub field: SortField,
pub direction: SortDirection,
}
#[derive(Serialize, Deserialize, Insertable, AsChangeset, Default, Debug, Clone)]
#[diesel(table_name = instances)]
pub struct NewInstance {
pub domain_name: String,
pub json: Option<Value>,
pub blocked: bool,
pub shared_inbox: Option<String>,
}
#[derive(Identifiable, Queryable, AsChangeset, Serialize, Clone, Default, Debug)]
#[diesel(table_name = instances)]
pub struct Instance {
#[serde(skip_serializing)]
pub id: i32,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub domain_name: String,
pub json: Option<Value>,
pub blocked: bool,
pub last_message_at: DateTime<Utc>,
pub shared_inbox: Option<String>,
}
pub async fn create_or_update_instance<C: DbRunner>(
conn: &C,
instance: NewInstance,
) -> Result<Instance, anyhow::Error> {
conn.run(move |c| {
diesel::insert_into(instances::table)
.values(&instance)
.on_conflict(instances::domain_name)
.do_update()
.set((instances::last_message_at.eq(Utc::now()), &instance))
.get_result::<Instance>(c)
})
.await
}
pub type DomainInbox = (String, Option<String>);
impl From<DomainInbox> for NewInstance {
fn from((domain_name, shared_inbox): DomainInbox) -> Self {
NewInstance {
domain_name,
json: None,
blocked: false,
shared_inbox,
}
}
}
pub async fn get_instance_inboxes<C: DbRunner>(conn: &C) -> Result<Vec<ApAddress>, anyhow::Error> {
let operation = move |c: &mut PgConnection| {
let cutoff = Utc::now().naive_utc() - chrono::Duration::days(14);
instances::table
.filter(instances::blocked.eq(false))
.filter(instances::shared_inbox.is_not_null())
.filter(instances::last_message_at.gt(cutoff))
.select(instances::shared_inbox.assume_not_null())
.get_results::<String>(c)
};
let results = conn.run(operation).await?;
Ok(results.into_iter().map(ApAddress::from).collect())
}
pub async fn get_instance_by_domain_name<C: DbRunner>(
conn: &C,
domain_name_val: String,
) -> Result<Option<Instance>, anyhow::Error> {
let query = move |c: &mut _| {
dsl::instances
.filter(dsl::domain_name.eq(domain_name_val))
.first::<Instance>(c)
.optional()
};
conn.run(query).await
}
pub async fn get_all_instances_paginated<C: DbRunner>(
conn: &C,
page: i64,
page_size: i64,
sort_params: Option<Vec<SortParam>>,
) -> Result<Vec<Instance>, anyhow::Error> {
use crate::schema::instances::BoxedQuery;
let offset = (page - 1).max(0) * page_size;
let query_builder_fn = move |c: &mut _| {
let mut query: BoxedQuery<'_, DbType> = dsl::instances.into_boxed();
if let Some(params) = sort_params {
if !params.is_empty() {
for p in params {
query = match p.field {
SortField::DomainName => match p.direction {
SortDirection::Asc => query.then_order_by(dsl::domain_name.asc()),
SortDirection::Desc => query.then_order_by(dsl::domain_name.desc()),
},
SortField::Blocked => match p.direction {
SortDirection::Asc => query.then_order_by(dsl::blocked.asc()),
SortDirection::Desc => query.then_order_by(dsl::blocked.desc()),
},
SortField::LastMessageAt => match p.direction {
SortDirection::Asc => query.then_order_by(dsl::last_message_at.asc()),
SortDirection::Desc => query.then_order_by(dsl::last_message_at.desc()),
},
};
}
} else {
query = query.order(dsl::domain_name.asc());
}
} else {
query = query.order(dsl::domain_name.asc());
}
query.limit(page_size).offset(offset).load::<Instance>(c)
};
conn.run(query_builder_fn).await
}
pub async fn set_block_status<C: DbRunner>(
conn: &C,
domain_name_val: String,
should_be_blocked: bool,
) -> Result<Instance, anyhow::Error> {
match get_instance_by_domain_name(conn, domain_name_val.clone()).await? {
Some(instance) => {
if instance.blocked == should_be_blocked {
Ok(instance)
} else {
let query_update = move |c: &mut _| {
diesel::update(dsl::instances.find(instance.id))
.set((
dsl::blocked.eq(should_be_blocked),
dsl::updated_at.eq(Utc::now()),
))
.get_result::<Instance>(c)
};
conn.run(query_update).await
}
}
None => {
if should_be_blocked {
let new_instance_data = NewInstance {
domain_name: domain_name_val,
blocked: true,
json: None,
shared_inbox: None,
};
let query_upsert = move |c: &mut _| {
diesel::insert_into(dsl::instances)
.values(&new_instance_data)
.on_conflict(dsl::domain_name)
.do_update()
.set((
dsl::blocked.eq(true), dsl::updated_at.eq(Utc::now()),
))
.get_result::<Instance>(c)
};
conn.run(query_upsert).await
} else {
Err(anyhow::anyhow!(
"Instance {domain_name_val} not found. Cannot unblock a non-existent instance."
))
}
}
}
}
pub async fn get_blocked_instances<C: DbRunner>(conn: &C) -> Vec<Instance> {
conn.run(move |c| {
instances::table
.filter(instances::blocked.eq(true))
.get_results::<Instance>(c)
})
.await
.unwrap_or(vec![])
}
pub async fn create_or_update_instance_axum(
conn: &DbConnection,
instance: NewInstance,
) -> Result<Instance, anyhow::Error> {
conn.interact(move |c| {
use crate::schema::instances::dsl::*;
diesel::insert_into(instances)
.values(&instance)
.on_conflict(domain_name)
.do_update()
.set((last_message_at.eq(Utc::now()), &instance))
.get_result::<Instance>(c)
})
.await
.map_err(|e| anyhow::anyhow!("Interact error: {:?}", e))?
.map_err(anyhow::Error::from)
}