use bson::Document;
use futures::StreamExt;
use iso8601_timestamp::Timestamp;
use mongodb::options::ReadConcern;
use revolt_result::Result;
use crate::{FieldsMember, Member, MemberCompositeKey, PartialMember};
use crate::{IntoDocumentPath, MongoDb};
use super::{AbstractServerMembers, ChunkedServerMembersGenerator};
static COL: &str = "server_members";
#[async_trait]
impl AbstractServerMembers for MongoDb {
/// Insert a new server member, or revive the existing record if one is pending deletion.
///
/// Looks for a document with the same server/user pair that carries a
/// `pending_deletion_at` marker. When one exists, its `joined_at` is overwritten,
/// the marker is removed, and the updated document is returned. Otherwise
/// `member` is inserted as a fresh document and `Ok(None)` is returned.
///
/// # Errors
///
/// Returns a database error if the lookup, the update, or the insert fails.
async fn insert_or_merge_member(&self, member: &Member) -> Result<Option<Member>> {
    let existing: Result<Option<Document>> = query!(
        self,
        find_one,
        COL,
        doc! {
            "_id.server": &member.id.server,
            "_id.user": &member.id.user,
            "pending_deletion_at": {"$exists": true}
        }
    );

    // A failed lookup is reported to the caller instead of being treated as
    // "no record found", which would otherwise fall through to a blind insert.
    if existing?.is_none() {
        query!(self, insert_one, COL, &member).map(|_| ())?;
        return Ok(None);
    }

    // NOTE(review): `joined_at` is written as whole seconds since the Unix epoch;
    // confirm this matches the representation `insert_one` stores for this field.
    self.col::<Member>(COL)
        .find_one_and_update(
            doc! {
                "_id.server": &member.id.server,
                "_id.user": &member.id.user,
            },
            doc! {
                "$set": {
                    "joined_at": member.joined_at.duration_since(Timestamp::UNIX_EPOCH).whole_seconds(),
                },
                "$unset": {
                    "pending_deletion_at": ""
                }
            },
        )
        // Hand back the document as it looks after the merge.
        .return_document(mongodb::options::ReturnDocument::After)
        .await
        .map_err(|_| create_database_error!("find_one_and_update", COL))
}
async fn fetch_member(&self, server_id: &str, user_id: &str) -> Result<Member> {
query!(
self,
find_one,
COL,
doc! {
"_id.server": server_id,
"_id.user": user_id,
"pending_deletion_at": {"$exists": false}
}
)?
.ok_or_else(|| create_error!(NotFound))
}
/// Fetch every member of a server that is not pending deletion.
///
/// Items the cursor yields as errors are dropped in release builds; in builds
/// with debug assertions enabled they cause a panic instead.
///
/// # Errors
///
/// Returns a database error if the `find` query cannot be started.
async fn fetch_all_members(&self, server_id: &str) -> Result<Vec<Member>> {
    let filter = doc! {
        "_id.server": server_id,
        "pending_deletion_at": {"$exists": false}
    };

    let cursor = self
        .col::<Member>(COL)
        .find(filter)
        .await
        .map_err(|_| create_database_error!("find", COL))?;

    let members: Vec<Member> = cursor
        .filter_map(|entry| async move {
            if !cfg!(debug_assertions) {
                return entry.ok();
            }
            // Surface broken records loudly while developing.
            Some(entry.unwrap())
        })
        .collect()
        .await;

    Ok(members)
}
/// Fetch every member of a server that is not pending deletion, as a chunked generator.
///
/// Opens a session, starts a transaction with snapshot read concern, and runs
/// the `find` inside it. The cursor batch size is taken from
/// `pushd.mass_mention_chunk_size` in the loaded configuration. Both the session
/// and the cursor are handed to [`ChunkedServerMembersGenerator::new_mongo`].
///
/// # Errors
///
/// Returns a database error if the session, the transaction, or the query
/// cannot be started.
async fn fetch_all_members_chunked(
    &self,
    server_id: &str,
) -> Result<ChunkedServerMembersGenerator> {
    let config = revolt_config::config().await;

    let mut session = self
        .start_session()
        .await
        .map_err(|_| create_database_error!("start_session", COL))?;

    session
        .start_transaction()
        .read_concern(ReadConcern::snapshot())
        .await
        .map_err(|_| create_database_error!("start_transaction", COL))?;

    let cursor = self
        .col::<Member>(COL)
        .find(doc! {
            "_id.server": server_id,
            // Skip soft-deleted records, matching `fetch_all_members`: they have
            // had `joined_at` and other member fields removed.
            "pending_deletion_at": {"$exists": false}
        })
        .session(&mut session)
        .batch_size(config.pushd.mass_mention_chunk_size as u32)
        .await
        .map_err(|_| create_database_error!("find", COL))?;

    Ok(ChunkedServerMembersGenerator::new_mongo(session, cursor))
}
/// Fetch every member of a server that holds at least one of the given roles.
///
/// Items the cursor yields as errors are dropped in release builds; in builds
/// with debug assertions enabled they cause a panic instead.
///
/// # Errors
///
/// Returns a database error if the `find` query cannot be started.
async fn fetch_all_members_with_roles(
    &self,
    server_id: &str,
    roles: &[String],
) -> Result<Vec<Member>> {
    let filter = doc! {
        "_id.server": server_id,
        "roles": {"$in": roles}
    };

    let cursor = self
        .col::<Member>(COL)
        .find(filter)
        .await
        .map_err(|_| create_database_error!("find", COL))?;

    let members: Vec<Member> = cursor
        .filter_map(|entry| async move {
            if !cfg!(debug_assertions) {
                return entry.ok();
            }
            // Surface broken records loudly while developing.
            Some(entry.unwrap())
        })
        .collect()
        .await;

    Ok(members)
}
/// Fetch every member of a server holding at least one of the given roles, as a
/// chunked generator.
///
/// Opens a session, starts a transaction with snapshot read concern, and runs
/// the `find` inside it. The cursor batch size is taken from
/// `pushd.mass_mention_chunk_size` in the loaded configuration. Both the session
/// and the cursor are handed to [`ChunkedServerMembersGenerator::new_mongo`].
///
/// # Errors
///
/// Returns a database error if the session, the transaction, or the query
/// cannot be started.
async fn fetch_all_members_with_roles_chunked(
    &self,
    server_id: &str,
    roles: &[String],
) -> Result<ChunkedServerMembersGenerator> {
    let config = revolt_config::config().await;

    let mut session = self
        .start_session()
        .await
        .map_err(|_| create_database_error!("start_session", COL))?;

    session
        .start_transaction()
        .read_concern(ReadConcern::snapshot())
        .await
        .map_err(|_| create_database_error!("start_transaction", COL))?;

    let cursor = self
        .col::<Member>(COL)
        .find(doc! {
            "_id.server": server_id,
            "roles": {"$in": roles}
        })
        .session(&mut session)
        .batch_size(config.pushd.mass_mention_chunk_size as u32)
        .await
        .map_err(|_| create_database_error!("find", COL))?;

    // Tail expression rather than an explicit `return`, like the sibling
    // `fetch_all_members_chunked`.
    Ok(ChunkedServerMembersGenerator::new_mongo(session, cursor))
}
/// Fetch every membership record of a user that is not pending deletion.
///
/// Items the cursor yields as errors are dropped in release builds; in builds
/// with debug assertions enabled they cause a panic instead.
///
/// # Errors
///
/// Returns a database error if the `find` query cannot be started.
async fn fetch_all_memberships(&self, user_id: &str) -> Result<Vec<Member>> {
    let filter = doc! {
        "_id.user": user_id,
        "pending_deletion_at": {"$exists": false}
    };

    let cursor = self
        .col::<Member>(COL)
        .find(filter)
        .await
        .map_err(|_| create_database_error!("find", COL))?;

    let memberships: Vec<Member> = cursor
        .filter_map(|entry| async move {
            if !cfg!(debug_assertions) {
                return entry.ok();
            }
            // Surface broken records loudly while developing.
            Some(entry.unwrap())
        })
        .collect()
        .await;

    Ok(memberships)
}
/// Fetch the members of a server whose user id is in `ids`, skipping records
/// that are pending deletion.
///
/// Items the cursor yields as errors are dropped in release builds; in builds
/// with debug assertions enabled they cause a panic instead.
///
/// # Errors
///
/// Returns a database error if the `find` query cannot be started.
async fn fetch_members(&self, server_id: &str, ids: &[String]) -> Result<Vec<Member>> {
    let filter = doc! {
        "_id.server": server_id,
        "pending_deletion_at": {"$exists": false},
        "_id.user": {
            "$in": ids
        }
    };

    let cursor = self
        .col::<Member>(COL)
        .find(filter)
        .await
        .map_err(|_| create_database_error!("find", COL))?;

    let members: Vec<Member> = cursor
        .filter_map(|entry| async move {
            if !cfg!(debug_assertions) {
                return entry.ok();
            }
            // Surface broken records loudly while developing.
            Some(entry.unwrap())
        })
        .collect()
        .await;

    Ok(members)
}
/// Count the members of a server that are not pending deletion.
///
/// # Errors
///
/// Returns a database error if the `count_documents` query fails.
async fn fetch_member_count(&self, server_id: &str) -> Result<usize> {
    let filter = doc! {
        "_id.server": server_id,
        "pending_deletion_at": {"$exists": false}
    };

    match self.col::<Member>(COL).count_documents(filter).await {
        Ok(total) => Ok(total as usize),
        Err(_) => Err(create_database_error!("count_documents", COL)),
    }
}
/// Count the membership records of a user that are not pending deletion.
///
/// # Errors
///
/// Returns a database error if the `count_documents` query fails.
async fn fetch_server_count(&self, user_id: &str) -> Result<usize> {
    let filter = doc! {
        "_id.user": user_id,
        "pending_deletion_at": {"$exists": false}
    };

    match self.col::<Member>(COL).count_documents(filter).await {
        Ok(total) => Ok(total as usize),
        Err(_) => Err(create_database_error!("count_documents", COL)),
    }
}
async fn update_member(
&self,
id: &MemberCompositeKey,
partial: &PartialMember,
remove: Vec<FieldsMember>,
) -> Result<()> {
query!(
self,
update_one,
COL,
doc! {
"_id.server": &id.server,
"_id.user": &id.user
},
partial,
remove.iter().map(|x| x as &dyn IntoDocumentPath).collect(),
None
)
.map(|_| ())
}
/// Remove a member, keeping a marker record if they are currently timed out.
///
/// The member is fetched first. If `in_timeout()` reports true and a timeout
/// timestamp is present, the record is kept: `pending_deletion_at` is set to
/// the formatted timeout timestamp and `joined_at`, `avatar`, `nickname` and
/// `roles` are removed. In every other case the record is deleted outright
/// via `force_delete_member`.
///
/// # Errors
///
/// Returns a database error if the member cannot be fetched or if the
/// update/delete fails.
async fn soft_delete_member(&self, id: &MemberCompositeKey) -> Result<()> {
    // NOTE(review): any fetch failure, including "not found", is reported as a
    // generic database error here; the original cause is discarded.
    let Ok(member) = self.fetch_member(&id.server, &id.user).await else {
        return Err(create_database_error!("fetch_member", COL));
    };

    match member.timeout {
        // Binding the timestamp in the pattern avoids unwrapping it after the
        // `in_timeout()` check, so a missing value can never panic.
        Some(timeout) if member.in_timeout() => {
            self.col::<Document>(COL)
                .update_many(
                    doc! {
                        "_id.server": &id.server,
                        "_id.user": &id.user,
                    },
                    doc! {
                        "$set": {"pending_deletion_at": timeout.format().to_string()},
                        "$unset": {
                            "joined_at": "",
                            "avatar": "",
                            "nickname": "",
                            "roles": ""
                        }
                    },
                )
                .await
                .map(|_| ())
                .map_err(|_| create_database_error!("update_many", COL))
        }
        // Not timed out (or no timestamp to preserve): nothing worth keeping.
        _ => self.force_delete_member(id).await,
    }
}
async fn force_delete_member(&self, id: &MemberCompositeKey) -> Result<()> {
query!(
self,
delete_one,
COL,
doc! {
"_id.server": &id.server,
"_id.user": &id.user
}
)
.map(|_| ())
}
/// Delete every record whose `pending_deletion_at` value is less than the
/// current UTC time.
///
/// # Panics
///
/// Panics if the current timestamp cannot be serialized to BSON.
///
/// # Errors
///
/// Returns a database error if the `delete_many` query fails.
async fn remove_dangling_members(&self) -> Result<()> {
    let now = Timestamp::now_utc();
    let cutoff = bson::to_bson(&now).expect("Failed to serialize timestamp");

    self.col::<Document>(COL)
        .delete_many(doc! {
            "pending_deletion_at": {"$lt": cutoff}
        })
        .await
        .map(|_| ())
        // Label the failure with the operation that actually ran.
        .map_err(|_| create_database_error!("delete_many", COL))
}
}
impl IntoDocumentPath for FieldsMember {
    /// Map a removable member field to its document path.
    ///
    /// Returns `None` for `VoiceChannel`, which has no path in this mapping.
    fn as_path(&self) -> Option<&'static str> {
        let path = match self {
            Self::VoiceChannel => return None,
            Self::JoinedAt => "joined_at",
            Self::Avatar => "avatar",
            Self::Nickname => "nickname",
            Self::Roles => "roles",
            Self::Timeout => "timeout",
            Self::CanPublish => "can_publish",
            Self::CanReceive => "can_receive",
        };
        Some(path)
    }
}