1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use mongodb::db::ThreadedDatabase;
use crate::{
coll::Collection,
doc::Doc,
ext::DocumentExt,
error::{ ErrorKind, Result, ResultExt },
};
#[cfg(feature = "schema_validation")]
use magnet_schema::BsonSchema;
#[cfg(feature = "schema_validation")]
use crate::uid::Uid;
/// Extension methods on top of `ThreadedDatabase` that hand out typed
/// `Collection<T>` wrappers for `Doc` entity types.
pub trait DatabaseExt: ThreadedDatabase {
    /// Wraps the driver-level collection named `T::NAME` in a typed
    /// `Collection<T>`.
    ///
    /// This method itself neither drops the collection nor creates indexes.
    fn existing_collection<T: Doc>(&self) -> Collection<T> {
        let raw = self.collection(T::NAME);
        raw.into()
    }

    /// Drops the collection named `T::NAME` and re-creates it with a
    /// `$jsonSchema` validator built from `T::bson_schema()`.
    ///
    /// The `_id` entry of the schema's `properties` is handled specially:
    /// when absent, the schema of `Uid<T>` is inserted for it; when present,
    /// it must equal the schema of either `Uid<T>` or `Option<Uid<T>>`.
    /// Once the collection has been created, `create_indexes()` is called
    /// on it before it is returned.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    ///
    /// * dropping the collection fails (chained with the message
    ///   `"error dropping collection"`);
    /// * the `properties` entry of the schema cannot be extracted as a
    ///   document, or an existing `_id` entry is not a document;
    /// * an existing `_id` schema matches neither accepted schema
    ///   (`ErrorKind::BsonSchema`);
    /// * the create command fails, or its reply has an `ok` field that is
    ///   missing, not readable as a boolean, or false
    ///   (`ErrorKind::MongoDbError` for the reply-related cases);
    /// * `create_indexes()` fails.
    #[cfg(feature = "schema_validation")]
    fn empty_collection<T>(&self) -> Result<Collection<T>>
    where
        T: Doc + BsonSchema,
        Uid<T>: BsonSchema,
    {
        use bson::Bson;
        use mongodb::CommandType;
        use crate::bsn::BsonExt;
        use crate::error::Error;

        // Start from a clean slate: any previous collection goes away first.
        self.drop_collection(T::NAME).chain("error dropping collection")?;

        // Pull `properties` out of the schema so that `_id` can be
        // validated or filled in, then put it back afterwards.
        let mut validator_schema = T::bson_schema();
        let mut props = validator_schema
            .remove_document("properties")
            .and_then(Bson::try_into_doc)?;

        if !props.contains_key("_id") {
            props.insert("_id", Uid::<T>::bson_schema());
        } else {
            let declared_id = props.get_document("_id")?;
            let is_acceptable =
                *declared_id == Uid::<T>::bson_schema()
                || *declared_id == Option::<Uid<T>>::bson_schema();

            if !is_acceptable {
                return Err(Error::new(ErrorKind::BsonSchema, "BSON schema mismatch for _id"));
            }
        }

        validator_schema.insert("properties", props);

        let create_cmd = doc! {
            "create": T::NAME,
            "validator": { "$jsonSchema": validator_schema },
        };
        let reply = self.command(create_cmd, CommandType::CreateCollection, None)?;

        // Anything other than an explicit, truthy `ok` counts as failure.
        match reply.get("ok").and_then(Bson::try_as_bool) {
            Some(true) => {
                let fresh: Collection<T> = self.existing_collection();
                fresh.create_indexes()?;
                Ok(fresh)
            }
            Some(false) | None => Err(Error::new(
                ErrorKind::MongoDbError,
                format!("couldn't create {}: {}", T::NAME, reply)
            )),
        }
    }

    /// Drops the collection named `T::NAME`, then obtains a typed handle to
    /// it and calls `create_indexes()` on that handle.
    ///
    /// Unlike the schema-validating variant, no create command is issued
    /// here and no validator is installed.
    ///
    /// # Errors
    ///
    /// Returns an error when dropping the collection fails (chained with the
    /// message `"error dropping collection"`) or when `create_indexes()`
    /// fails.
    fn empty_collection_novalidate<T: Doc>(&self) -> Result<Collection<T>> {
        let dropped = self.drop_collection(T::NAME);
        dropped.chain("error dropping collection")?;

        let fresh: Collection<T> = self.existing_collection();
        fresh.create_indexes()?;
        Ok(fresh)
    }
}
/// Blanket implementation: every `ThreadedDatabase` gets the `DatabaseExt`
/// methods through their default bodies.
impl<D> DatabaseExt for D where D: ThreadedDatabase {}