use crate::CollectionConfig;
use mongodb::bson::{doc, from_bson, Bson, Document};
use mongodb::options::*;
use mongodb::Database;
use serde::Deserialize;
use std::borrow::Cow;
use std::collections::HashMap;
/// Sort direction of an index key.
///
/// Added `Eq` and `Hash` (a fieldless `Copy` enum trivially supports both) so
/// the type can be used as a map/set key and compared without caveats.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SortOrder {
    /// Ascending order — serialized as `1` in the index specification.
    Ascending,
    /// Descending order — serialized as `-1` in the index specification.
    Descending,
}
impl From<SortOrder> for Bson {
fn from(v: SortOrder) -> Self {
match v {
SortOrder::Ascending => Self::Int32(1),
SortOrder::Descending => Self::Int32(-1),
}
}
}
/// One (field name, sort direction) component of an index specification.
#[derive(Debug, Clone)]
struct IndexKey {
    /// Field to index; `Cow` accepts both `&'static str` and owned `String`.
    name: Cow<'static, str>,
    /// Sort direction of this field within the index.
    direction: SortOrder,
}
/// Specification of a single MongoDB index: its keys plus creation options.
#[derive(Default, Clone, Debug)]
pub struct Index {
    /// Ordered list of indexed fields (order matters for compound indexes).
    keys: Vec<IndexKey>,
    /// Options applied at creation time (unique, TTL, name, ...).
    options: Vec<IndexOption>,
}
impl Index {
    /// Creates an index over `key`, sorted ascending.
    pub fn new(key: impl Into<Cow<'static, str>>) -> Self {
        Self::new_with_direction(key, SortOrder::Ascending)
    }

    /// Creates an index over `key` with an explicit sort `direction`.
    pub fn new_with_direction(key: impl Into<Cow<'static, str>>, direction: SortOrder) -> Self {
        Self {
            keys: vec![IndexKey {
                name: key.into(),
                direction,
            }],
            options: Vec::new(),
        }
    }

    /// Appends an ascending key to the index.
    pub fn add_key(&mut self, key: impl Into<Cow<'static, str>>) {
        self.add_key_with_direction(key, SortOrder::Ascending)
    }

    /// Builder-style variant of [`Index::add_key`].
    pub fn with_key(mut self, key: impl Into<Cow<'static, str>>) -> Self {
        self.add_key(key);
        self
    }

    /// Appends a key with an explicit sort direction.
    pub fn add_key_with_direction(
        &mut self,
        key: impl Into<Cow<'static, str>>,
        direction: SortOrder,
    ) {
        let entry = IndexKey {
            name: key.into(),
            direction,
        };
        self.keys.push(entry);
    }

    /// Builder-style variant of [`Index::add_key_with_direction`].
    pub fn with_key_with_direction(
        mut self,
        key: impl Into<Cow<'static, str>>,
        direction: SortOrder,
    ) -> Self {
        self.add_key_with_direction(key, direction);
        self
    }

    /// Appends a creation option.
    pub fn add_option(&mut self, option: IndexOption) {
        self.options.push(option);
    }

    /// Builder-style variant of [`Index::add_option`].
    pub fn with_option(mut self, option: IndexOption) -> Self {
        self.add_option(option);
        self
    }

    /// Converts the index into the document shape the `createIndexes`
    /// command expects. When no explicit `name` option was supplied, a
    /// default name is derived from the keys (e.g. `"id_-1_last_seen_1"`).
    pub fn into_document(self) -> Document {
        let mut keys_doc = Document::new();
        let mut name_parts = Vec::with_capacity(self.keys.len());
        for IndexKey { name, direction } in self.keys {
            let suffix = match direction {
                SortOrder::Ascending => "1",
                SortOrder::Descending => "-1",
            };
            name_parts.push(format!("{}_{}", name, suffix));
            keys_doc.insert(name, direction);
        }
        let mut index_doc = doc! { "key": keys_doc };
        for option in self.options {
            let (key, value) = option.into_key_value();
            index_doc.insert(key, value);
        }
        // Respect an explicit IndexOption::Name; otherwise fall back to the
        // key-derived default.
        if !index_doc.contains_key("name") {
            index_doc.insert("name", name_parts.join("_"));
        }
        index_doc
    }
}
/// Set of [`Index`] specifications for a single collection.
#[derive(Debug, Clone)]
pub struct Indexes(pub(crate) Vec<Index>);
impl Default for Indexes {
fn default() -> Self {
Self::new()
}
}
impl From<Vec<Index>> for Indexes {
fn from(indexes: Vec<Index>) -> Self {
Self(indexes)
}
}
impl Indexes {
pub fn new() -> Self {
Self(Vec::new())
}
pub fn with(mut self, index: Index) -> Self {
self.0.push(index);
self
}
pub fn create_indexes_command(self, collection_name: &str) -> Document {
let mut indexes = Vec::with_capacity(self.0.len());
for index in self.0 {
indexes.push(index.into_document());
}
doc! {
"createIndexes": collection_name,
"indexes": indexes
}
}
}
/// An option attached to an index at creation time.
///
/// Each variant maps to one key/value pair in the index document; see
/// [`IndexOption::name`] and [`IndexOption::into_value`].
#[derive(Debug, Clone)]
pub enum IndexOption {
    /// Build the index in the background (`background: true`).
    Background,
    /// Reject duplicate values for the indexed field(s) (`unique: true`).
    Unique,
    /// Explicit index name, overriding the key-derived default.
    Name(String),
    /// Only index documents matching this filter expression.
    PartialFilterExpression(Document),
    /// Skip documents that lack the indexed field (`sparse: true`).
    Sparse,
    /// TTL: delete documents this many seconds after the indexed date field.
    ExpireAfterSeconds(i32),
    /// Storage-engine-specific configuration document.
    StorageEngine(Document),
    /// Collation (language-specific string comparison rules).
    Collation(Document),
    /// Escape hatch for any option not covered by the variants above.
    Custom { name: String, value: Bson },
}
impl IndexOption {
    /// Returns the option's key as it appears in the index document.
    pub fn name(&self) -> &str {
        match self {
            Self::Background => "background",
            Self::Unique => "unique",
            Self::Name(..) => "name",
            Self::PartialFilterExpression(..) => "partialFilterExpression",
            Self::Sparse => "sparse",
            Self::ExpireAfterSeconds(..) => "expireAfterSeconds",
            Self::StorageEngine(..) => "storageEngine",
            Self::Collation(..) => "collation",
            Self::Custom { name, .. } => name.as_str(),
        }
    }

    /// Consumes the option and returns its BSON value.
    pub fn into_value(self) -> Bson {
        match self {
            // Flag-like options serialize as a bare `true`.
            Self::Background | Self::Unique | Self::Sparse => Bson::Boolean(true),
            Self::Name(name) => Bson::String(name),
            Self::ExpireAfterSeconds(seconds) => Bson::Int32(seconds),
            Self::PartialFilterExpression(document)
            | Self::StorageEngine(document)
            | Self::Collation(document) => Bson::Document(document),
            Self::Custom { value, .. } => value,
        }
    }

    /// Consumes the option and returns its `(key, value)` pair.
    pub fn into_key_value(self) -> (String, Bson) {
        (self.name().to_owned(), self.into_value())
    }
}
/// Synchronizes the indexes declared by `CollConf::indexes()` with the
/// indexes that currently exist on its collection in `db`.
///
/// Existing indexes that exactly match a declared one (ignoring the
/// server-managed `ns` and `v` fields) are left alone; existing indexes that
/// differ or are no longer declared are dropped (the built-in `_id_` index is
/// never dropped); the remaining declared indexes are then created with a
/// single `createIndexes` command.
///
/// # Errors
///
/// Returns any server error, or an I/O-wrapped error when the index listing
/// is truncated or its documents are malformed.
pub async fn sync_indexes<CollConf: CollectionConfig>(
    db: &Database,
) -> Result<(), mongodb::error::Error> {
    let mut indexes = CollConf::indexes();
    match h_run_command(db, doc! { "listIndexes": CollConf::collection_name() }).await {
        Ok(ret) => {
            let parsed_ret: ListIndexesRet = from_bson(Bson::Document(ret))
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
            // A non-zero cursor id means the server truncated the listing to
            // a first batch; fetching further batches isn't implemented, so
            // bail out instead of syncing against an incomplete picture.
            if parsed_ret.cursor.id != 0 {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    format!(
                        "couldn't list all indexes from '{}'",
                        CollConf::collection_name()
                    ),
                )
                .into());
            }
            // Key the existing indexes by the string form of their "key"
            // document so declared indexes can be matched against them.
            let mut existing_indexes = HashMap::new();
            for index in parsed_ret.cursor.first_batch {
                if let Some(key) = index.get("key") {
                    existing_indexes.insert(key.to_string(), index);
                }
            }
            // Positions (in `indexes.0`) of declared indexes already in sync.
            let mut already_sync = Vec::new();
            // Names of server-side indexes that must be dropped.
            let mut to_drop = Vec::new();
            for (i, index) in indexes.0.clone().into_iter().enumerate() {
                let index_doc = index.into_document();
                let key = index_doc.get("key").ok_or_else(|| {
                    std::io::Error::new(std::io::ErrorKind::Other, "index doc is missing 'key'")
                })?;
                if let Some(mut existing_index) = existing_indexes.remove(&key.to_string()) {
                    // "ns" and "v" are added by the server; strip them before
                    // comparing against the declared specification.
                    existing_index.remove("ns");
                    existing_index.remove("v");
                    if doc_are_eq(&index_doc, &existing_index) {
                        already_sync.push(i);
                    } else {
                        // Same key but different options: drop it here and let
                        // the createIndexes call below recreate it.
                        to_drop.push(
                            index_doc
                                .get_str("name")
                                .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
                                .to_owned(),
                        );
                    }
                }
            }
            // Whatever is left in `existing_indexes` is no longer declared;
            // drop it, except the mandatory "_id_" index.
            for existing_index in existing_indexes.values() {
                let name = existing_index
                    .get_str("name")
                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?
                    .to_owned();
                if name != "_id_" {
                    to_drop.push(name);
                }
            }
            if !to_drop.is_empty() {
                // First try one dropIndexes call with an array of names
                // (presumably not accepted by older servers — hence the
                // fallback); on failure, drop them one command at a time.
                if h_run_command(
                    db,
                    doc! { "dropIndexes": CollConf::collection_name(), "index": &to_drop },
                )
                .await
                .is_err()
                {
                    for index_name in to_drop {
                        h_run_command(
                            db,
                            doc! { "dropIndexes": CollConf::collection_name(), "index": index_name },
                        )
                        .await?;
                    }
                }
            }
            // Remove already-synced entries in reverse order so earlier
            // removals don't shift the positions still pending removal.
            for i in already_sync.into_iter().rev() {
                indexes.0.remove(i);
            }
        }
        Err(e) => {
            match e.kind.as_ref() {
                // Error code 26 (NamespaceNotFound): the collection doesn't
                // exist yet, so there is nothing to diff — every declared
                // index will simply be created below.
                mongodb::error::ErrorKind::CommandError(err) if err.code == 26 => {
                }
                _ => return Err(e),
            }
        }
    }
    // Create whatever is still declared and not already in sync.
    if !indexes.0.is_empty() {
        h_run_command(
            db,
            indexes.create_indexes_command(CollConf::collection_name()),
        )
        .await?;
    }
    Ok(())
}
async fn h_run_command(
db: &Database,
command_doc: Document,
) -> Result<Document, mongodb::error::Error> {
let ret = db
.run_command(
command_doc,
Some(SelectionCriteria::ReadPreference(ReadPreference::Primary)),
)
.await?;
if let Ok(err) = from_bson::<mongodb::error::CommandError>(Bson::Document(ret.clone())) {
Err(mongodb::error::Error::from(
mongodb::error::ErrorKind::CommandError(err),
))
} else {
Ok(ret)
}
}
/// Shape of the `listIndexes` command reply (only the fields we read).
#[derive(Deserialize)]
struct ListIndexesRet {
    pub cursor: Cursor,
}
/// Cursor portion of a `listIndexes` reply.
#[derive(Deserialize)]
struct Cursor {
    /// Non-zero when more batches remain on the server.
    pub id: i64,
    /// Index documents contained in the first batch.
    #[serde(rename = "firstBatch", default)]
    pub first_batch: Vec<Document>,
}
/// Field-order-insensitive equality for BSON documents: same number of
/// entries, and every key of `a` maps to an equal value in `b`.
fn doc_are_eq(a: &Document, b: &Document) -> bool {
    a.len() == b.len() && a.iter().all(|(key, value)| b.get(key) == Some(value))
}
#[cfg(test)]
mod tests {
    use super::*;

    // Verifies the full createIndexes command document, including the
    // key-derived default names ("id_-1_last_seen_1", "last_seen_1") and
    // the serialized forms of flag and value options.
    #[test]
    fn create_indexes_command() {
        let index = Index::new_with_direction("id", SortOrder::Descending)
            .with_key("last_seen")
            .with_option(IndexOption::Background)
            .with_option(IndexOption::Unique);
        let index_2 = Index::new("last_seen").with_option(IndexOption::ExpireAfterSeconds(60));
        let indexes = Indexes::from(vec![index, index_2]);
        assert_eq!(
            indexes.create_indexes_command("my_collection"),
            doc! {
                "createIndexes": "my_collection",
                "indexes": [
                    {
                        "key": { "id": -1, "last_seen": 1 },
                        "background": true,
                        "unique": true,
                        "name": "id_-1_last_seen_1",
                    },
                    {
                        "key": { "last_seen": 1 },
                        "expireAfterSeconds": 60,
                        "name": "last_seen_1",
                    },
                ]
            }
        );
    }
}