redact_crypto/storage/
mongodb.rs1use crate::{
2 CryptoError, Entry, IndexedStorer, IndexedTypeStorer, StorableType, Storer, TypeStorer,
3};
4use async_trait::async_trait;
5use futures::StreamExt;
6use mongodb::{
7 bson::{self, Bson, Document},
8 options::ClientOptions,
9 options::{FindOneOptions, FindOptions},
10 Client,
11};
12use once_cell::sync::OnceCell;
13use serde::{Deserialize, Serialize};
14use std::{
15 error::Error,
16 fmt::{self, Display, Formatter},
17};
18
/// Errors produced by the MongoDB-backed storer.
#[derive(Debug)]
pub enum MongoStorerError {
    /// A lower-level failure; the originating error is kept in `source`.
    InternalError {
        /// The underlying error that caused this failure.
        source: Box<dyn Error + Send + Sync + 'static>,
    },

    /// No document matched the requested lookup.
    NotFound,
}
29
30impl Error for MongoStorerError {
31 fn source(&self) -> Option<&(dyn Error + 'static)> {
32 match *self {
33 MongoStorerError::InternalError { ref source } => Some(source.as_ref()),
34 MongoStorerError::NotFound => None,
35 }
36 }
37}
38
39impl Display for MongoStorerError {
40 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
41 match *self {
42 MongoStorerError::InternalError { .. } => {
43 write!(f, "Internal error occurred")
44 }
45 MongoStorerError::NotFound => {
46 write!(f, "Requested document not found")
47 }
48 }
49 }
50}
51
52impl From<MongoStorerError> for CryptoError {
53 fn from(mse: MongoStorerError) -> Self {
54 match mse {
55 MongoStorerError::InternalError { .. } => CryptoError::InternalError {
56 source: Box::new(mse),
57 },
58 MongoStorerError::NotFound => CryptoError::NotFound {
59 source: Box::new(mse),
60 },
61 }
62 }
63}
64
65#[derive(Serialize, Deserialize, Debug, Clone)]
67pub struct MongoStorer {
68 url: String,
69 db_name: String,
70 #[serde(skip)]
71 client: OnceCell<Client>,
72}
73
74impl From<MongoStorer> for IndexedTypeStorer {
75 fn from(ms: MongoStorer) -> Self {
76 IndexedTypeStorer::Mongo(ms)
77 }
78}
79
80impl From<MongoStorer> for TypeStorer {
81 fn from(ms: MongoStorer) -> Self {
82 TypeStorer::Indexed(IndexedTypeStorer::Mongo(ms))
83 }
84}
85
86impl MongoStorer {
87 pub fn new(url: &str, db_name: &str) -> Self {
90 MongoStorer {
91 url: url.to_owned(),
92 db_name: db_name.to_owned(),
93 client: OnceCell::new(),
94 }
95 }
96}
97
98impl MongoStorer {
99 async fn get_client(&self) -> Result<&Client, MongoStorerError> {
100 match self.client.get() {
101 Some(c) => Ok(c),
102 None => {
103 let db_client_options = ClientOptions::parse_with_resolver_config(
104 &self.url,
105 mongodb::options::ResolverConfig::cloudflare(),
106 )
107 .await
108 .map_err(|e| MongoStorerError::InternalError {
109 source: Box::new(e),
110 })?;
111 self.client.get_or_try_init(|| {
112 Client::with_options(db_client_options).map_err(|e| {
113 MongoStorerError::InternalError {
114 source: Box::new(e),
115 }
116 })
117 })
118 }
119 }
120 }
121}
122
123#[async_trait]
124impl IndexedStorer for MongoStorer {
125 async fn get_indexed<T: StorableType>(
126 &self,
127 path: &str,
128 index: &Option<Document>,
129 ) -> Result<Entry<T>, CryptoError> {
130 let mut filter = bson::doc! { "path": path };
131 if let Some(i) = index {
132 filter.insert("value", i);
133 }
134
135 let filter_options = FindOneOptions::builder().build();
136 let client = self.get_client().await?;
137
138 client
139 .database(&self.db_name)
140 .collection("entries")
141 .find_one(filter, filter_options)
142 .await
143 .map_err(|e| -> CryptoError {
144 MongoStorerError::InternalError {
145 source: Box::new(e),
146 }
147 .into()
148 })
149 .and_then(|doc| match doc {
150 Some(doc) => bson::from_bson(Bson::Document(doc)).map_err(|e| {
151 MongoStorerError::InternalError {
152 source: Box::new(e),
153 }
154 .into()
155 }),
156 None => Err(MongoStorerError::NotFound.into()),
157 })
158 }
159
160 async fn list_indexed<T: StorableType>(
161 &self,
162 path: &str,
163 skip: u64,
164 page_size: i64,
165 index: &Option<Document>,
166 ) -> Result<Vec<Entry<T>>, CryptoError> {
167 let mut filter = bson::doc! { "path": path };
168 if let Some(i) = index {
169 filter.insert("value", i);
170 }
171 let filter_options = FindOptions::builder().skip(skip).limit(page_size).build();
172
173 let cursor = self
174 .get_client()
175 .await?
176 .database(&self.db_name)
177 .collection("entries")
178 .find(filter, filter_options)
179 .await
180 .map_err(|e| -> CryptoError {
181 MongoStorerError::InternalError {
182 source: Box::new(e),
183 }
184 .into()
185 })?;
186
187 Ok(cursor
188 .filter_map(|doc| async move {
189 match doc {
190 Ok(doc) => Some(doc),
191 Err(_) => None,
192 }
193 })
194 .collect::<Vec<Document>>()
195 .await
196 .into_iter()
197 .filter_map(|doc| -> Option<Entry<T>> {
198 match bson::from_bson(Bson::Document(doc)) {
199 Ok(entry) => Some(entry),
200 Err(_) => None,
201 }
202 })
203 .collect::<Vec<Entry<T>>>())
204 }
205}
206
207#[async_trait]
208impl Storer for MongoStorer {
209 async fn get<T: StorableType>(&self, path: &str) -> Result<Entry<T>, CryptoError> {
210 self.get_indexed::<T>(path, &T::get_index()).await
211 }
212
213 async fn create<T: StorableType>(&self, entry: Entry<T>) -> Result<Entry<T>, CryptoError> {
214 let filter = bson::doc! { "path": &entry.path };
215 let filter_options = mongodb::options::ReplaceOptions::builder()
216 .upsert(true)
217 .build();
218 let doc = bson::to_document(&entry).map_err(|e| MongoStorerError::InternalError {
219 source: Box::new(e),
220 })?;
221
222 match self
223 .get_client()
224 .await?
225 .database(&self.db_name)
226 .collection("entries")
227 .replace_one(filter, doc, filter_options)
228 .await
229 {
230 Ok(_) => Ok(entry),
231 Err(e) => Err(MongoStorerError::InternalError {
232 source: Box::new(e),
233 }
234 .into()),
235 }
236 }
237}