use crate::CollectionConfig;
#[cfg(feature = "bson-3")]
use mongodb::bson::deserialize_from_bson;
#[cfg(feature = "compat-3-0-0")]
use mongodb::bson::from_bson as deserialize_from_bson;
use mongodb::Database;
use mongodb::bson::{Bson, Document, doc};
use mongodb::options::{ReadPreference, RunCommandOptions, SelectionCriteria};
use serde::Deserialize;
use std::borrow::Cow;
use std::collections::HashMap;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortOrder {
Ascending,
Descending,
}
impl From<SortOrder> for Bson {
fn from(v: SortOrder) -> Self {
match v {
SortOrder::Ascending => Self::Int32(1),
SortOrder::Descending => Self::Int32(-1),
}
}
}
#[derive(Clone, Debug)]
enum IndexKey {
SortIndex(SortIndexKey),
TextIndex(TextIndexKey),
}
impl IndexKey {
fn get_key_name(&self) -> String {
match self {
Self::SortIndex(s) => match s.direction {
SortOrder::Ascending => format!("{}_1", s.name),
SortOrder::Descending => format!("{}_-1", s.name),
},
Self::TextIndex(t) => format!("{}_text", t.name),
}
}
fn get_name(&self) -> String {
match self {
Self::SortIndex(s) => s.name.to_string(),
Self::TextIndex(t) => t.name.to_string(),
}
}
fn get_value(&self) -> Bson {
match self {
Self::SortIndex(s) => s.direction.into(),
Self::TextIndex(_) => "text".into(),
}
}
}
#[derive(Debug, Clone)]
struct SortIndexKey {
name: Cow<'static, str>,
direction: SortOrder,
}
#[derive(Debug, Clone)]
struct TextIndexKey {
name: Cow<'static, str>,
}
#[derive(Default, Clone, Debug)]
pub struct Index {
keys: Vec<IndexKey>,
options: Vec<IndexOption>,
}
impl Index {
pub fn new(key: impl Into<Cow<'static, str>>) -> Self {
Self::new_with_direction(key, SortOrder::Ascending)
}
pub fn new_with_direction(key: impl Into<Cow<'static, str>>, direction: SortOrder) -> Self {
let mut index = Self::default();
index.add_key_with_direction(key, direction);
index
}
pub fn new_with_text(key: impl Into<Cow<'static, str>>) -> Self {
let mut index = Self::default();
index.add_key_with_text(key);
index
}
pub fn add_key(&mut self, key: impl Into<Cow<'static, str>>) {
self.add_key_with_direction(key, SortOrder::Ascending);
}
pub fn with_key(mut self, key: impl Into<Cow<'static, str>>) -> Self {
self.add_key(key);
self
}
pub fn add_key_with_direction(
&mut self,
key: impl Into<Cow<'static, str>>,
direction: SortOrder,
) {
self.keys.push(IndexKey::SortIndex(SortIndexKey {
name: key.into(),
direction,
}));
}
pub fn add_key_with_text(&mut self, key: impl Into<Cow<'static, str>>) {
self.keys
.push(IndexKey::TextIndex(TextIndexKey { name: key.into() }));
}
pub fn with_key_with_direction(
mut self,
key: impl Into<Cow<'static, str>>,
direction: SortOrder,
) -> Self {
self.add_key_with_direction(key, direction);
self
}
pub fn add_option(&mut self, option: IndexOption) {
self.options.push(option);
}
pub fn with_option(mut self, option: IndexOption) -> Self {
self.add_option(option);
self
}
pub fn into_document(self) -> Document {
let mut names = Vec::with_capacity(self.keys.len());
let mut keys_doc = Document::new();
for key in self.keys {
names.push(key.get_key_name());
keys_doc.insert(key.get_name(), key.get_value());
}
let mut index_doc = doc! { "key": keys_doc };
for option in self.options {
let (key, value) = option.into_key_value();
index_doc.insert(key, value);
}
if !index_doc.contains_key("name") {
let name = names.join("_");
index_doc.insert("name", name);
}
index_doc
}
}
#[derive(Debug, Clone)]
pub struct Indexes(pub(crate) Vec<Index>);
impl Default for Indexes {
fn default() -> Self {
Self::new()
}
}
impl From<Vec<Index>> for Indexes {
fn from(indexes: Vec<Index>) -> Self {
Self(indexes)
}
}
impl Indexes {
pub fn new() -> Self {
Self(Vec::new())
}
pub fn with(mut self, index: Index) -> Self {
self.0.push(index);
self
}
pub fn create_indexes_command(self, collection_name: &str) -> Document {
let mut indexes = Vec::with_capacity(self.0.len());
for index in self.0 {
indexes.push(index.into_document());
}
doc! {
"createIndexes": collection_name,
"indexes": indexes
}
}
}
#[derive(Debug, Clone)]
pub enum IndexOption {
Background,
Unique,
Name(String),
PartialFilterExpression(Document),
Sparse,
ExpireAfterSeconds(i32),
StorageEngine(Document),
Collation(Document),
Weights(Vec<(String, i32)>),
Custom { name: String, value: Bson },
}
impl IndexOption {
pub fn name(&self) -> &str {
match self {
Self::Background => "background",
Self::Unique => "unique",
Self::Name(..) => "name",
Self::PartialFilterExpression(..) => "partialFilterExpression",
Self::Sparse => "sparse",
Self::ExpireAfterSeconds(..) => "expireAfterSeconds",
Self::StorageEngine(..) => "storageEngine",
Self::Collation(..) => "collation",
Self::Weights(..) => "weights",
Self::Custom { name, .. } => name.as_str(),
}
}
pub fn into_value(self) -> Bson {
match self {
Self::Background | Self::Unique | Self::Sparse => Bson::Boolean(true),
Self::Name(val) => Bson::String(val),
Self::ExpireAfterSeconds(val) => Bson::Int32(val),
Self::PartialFilterExpression(doc)
| Self::StorageEngine(doc)
| Self::Collation(doc) => Bson::Document(doc),
Self::Weights(w) => {
let mut doc = Document::new();
for (k, v) in w {
doc.insert(k, Bson::from(v));
}
Bson::Document(doc)
}
Self::Custom { value, .. } => value,
}
}
pub fn into_key_value(self) -> (String, Bson) {
let name = self.name().to_owned();
let value = self.into_value();
(name, value)
}
}
pub async fn sync_indexes<CollConf: CollectionConfig>(
db: &Database,
) -> Result<(), mongodb::error::Error> {
let mut indexes = CollConf::indexes();
match h_run_command(db, doc! { "listIndexes": CollConf::collection_name() }).await {
Ok(ret) => {
let parsed_ret: ListIndexesRet =
deserialize_from_bson(Bson::Document(ret)).map_err(std::io::Error::other)?;
if parsed_ret.cursor.id != 0 {
return Err(std::io::Error::other(format!(
"couldn't list all indexes from '{}'",
CollConf::collection_name()
))
.into());
}
let mut existing_indexes = HashMap::new();
for index in parsed_ret.cursor.first_batch {
if let Some(key) = index.get("key") {
existing_indexes.insert(key.to_string(), index);
}
}
let mut already_sync = Vec::new();
let mut to_drop = Vec::new();
for (i, index) in indexes.0.clone().into_iter().enumerate() {
let mut text_index_keys = None;
let index_doc = if index
.keys
.iter()
.any(|ind| matches!(ind, IndexKey::TextIndex(_)))
{
let mut doc = index.into_document();
text_index_keys = doc.get("key").cloned();
doc.insert("key", doc! { "_fts": "text", "_ftsx": 1 });
doc
} else {
index.into_document()
};
let key = index_doc
.get("key")
.ok_or_else(|| std::io::Error::other("index doc is missing 'key'"))?;
if let Some(mut existing_index) = existing_indexes.remove(&key.to_string()) {
existing_index.remove("ns");
existing_index.remove("v");
if let Some(Bson::Document(mut keys_to_set)) = text_index_keys
&& let Some(Bson::Document(existing_weights)) =
existing_index.get("weights")
{
for keys in keys_to_set.iter_mut() {
match keys.1 {
Bson::String(t) if t == "text" => {
*keys.1 = Bson::Int32(1);
}
_ => (),
}
}
if existing_weights.eq(&keys_to_set) {
already_sync.push(i);
} else {
to_drop.push(
index_doc
.get_str("name")
.map_err(std::io::Error::other)?
.to_owned(),
);
}
continue;
}
if doc_are_eq(&index_doc, &existing_index) {
already_sync.push(i);
} else {
to_drop.push(
index_doc
.get_str("name")
.map_err(std::io::Error::other)?
.to_owned(),
);
}
}
}
for existing_index in existing_indexes.values() {
let name = existing_index
.get_str("name")
.map_err(std::io::Error::other)?
.to_owned();
if name != "_id_" {
to_drop.push(name);
}
}
if !to_drop.is_empty() {
if h_run_command(
db,
doc! { "dropIndexes": CollConf::collection_name(), "index": &to_drop },
)
.await
.is_err()
{
for index_name in to_drop {
h_run_command(
db,
doc! { "dropIndexes": CollConf::collection_name(), "index": index_name },
)
.await?;
}
}
}
for i in already_sync.into_iter().rev() {
indexes.0.remove(i);
}
}
Err(e) => {
match e.kind.as_ref() {
mongodb::error::ErrorKind::Command(err) if err.code == 26 => {
}
_ => return Err(e),
}
}
}
if !indexes.0.is_empty() {
h_run_command(
db,
indexes.create_indexes_command(CollConf::collection_name()),
)
.await?;
}
Ok(())
}
async fn h_run_command(
db: &Database,
command_doc: Document,
) -> Result<Document, mongodb::error::Error> {
let primary_options = RunCommandOptions::builder()
.selection_criteria(SelectionCriteria::ReadPreference(ReadPreference::Primary))
.build();
let ret = db
.run_command(command_doc)
.with_options(primary_options)
.await?;
deserialize_from_bson::<mongodb::error::CommandError>(Bson::Document(ret.clone())).map_or_else(
|_| Ok(ret),
|err| {
Err(mongodb::error::Error::from(
mongodb::error::ErrorKind::Command(err),
))
},
)
}
#[derive(Deserialize)]
struct ListIndexesRet {
pub cursor: Cursor,
}
#[derive(Deserialize)]
struct Cursor {
pub id: i64,
#[serde(rename = "firstBatch", default)]
pub first_batch: Vec<Document>,
}
fn doc_are_eq(a: &Document, b: &Document) -> bool {
if a.len() != b.len() {
return false;
}
for (key, a_val) in a {
match b.get(key) {
Some(b_val) if a_val != b_val => {
return false;
}
Some(_) => {}
None => {
return false;
}
}
}
true
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn create_indexes_command() {
let index = Index::new_with_direction("id", SortOrder::Descending)
.with_key("last_seen")
.with_option(IndexOption::Background)
.with_option(IndexOption::Unique);
let index_2 = Index::new("last_seen").with_option(IndexOption::ExpireAfterSeconds(60));
let indexes = Indexes::from(vec![index, index_2]);
assert_eq!(
indexes.create_indexes_command("my_collection"),
doc! {
"createIndexes": "my_collection",
"indexes": [
{
"key": { "id": -1, "last_seen": 1 },
"background": true,
"unique": true,
"name": "id_-1_last_seen_1",
},
{
"key": { "last_seen": 1 },
"expireAfterSeconds": 60,
"name": "last_seen_1",
},
]
}
);
}
}