1use crate::StorageConfig;
23use async_trait::async_trait;
24use bytes::{Bytes, BytesMut};
25use futures_util::{AsyncWriteExt, StreamExt};
26use mongodb::{
27 bson::{doc, raw::ValueAccessErrorKind, Bson, Document, RawDocument},
28 gridfs::GridFsBucket,
29 options::GridFsUploadOptions,
30 Client, Database,
31};
32use remi::{Blob, File, ListBlobsRequest, UploadRequest};
33use std::{borrow::Cow, collections::HashMap, io, path::Path};
34use tokio_util::{compat::FuturesAsyncReadCompatExt, io::ReaderStream};
35
36fn value_access_err_to_error(error: mongodb::bson::raw::ValueAccessError) -> mongodb::error::Error {
37 match error.kind {
38 ValueAccessErrorKind::NotPresent => {
39 mongodb::error::Error::custom(format!("key [{}] was not found", error.key()))
40 }
41
42 ValueAccessErrorKind::UnexpectedType { expected, actual, .. } => mongodb::error::Error::custom(format!(
43 "expected BSON type '{expected:?}', actual type for key [{}] is '{actual:?}'",
44 error.key()
45 )),
46
47 ValueAccessErrorKind::InvalidBson(err) => err.into(),
48 _ => unimplemented!(
49 "`ValueAccessErrorKind` was unhandled, please report it: https://github.com/Noelware/remi-rs/issues/new"
50 ),
51 }
52}
53
54fn document_to_blob(bytes: Bytes, doc: &RawDocument) -> Result<File, mongodb::error::Error> {
55 let filename = doc.get_str("filename").map_err(value_access_err_to_error)?;
56 let length = doc.get_i64("length").map_err(value_access_err_to_error)?;
57 let created_at = doc.get_datetime("uploadDate").map_err(value_access_err_to_error)?;
58 let metadata = doc.get_document("metadata").map_err(value_access_err_to_error)?;
59
60 let content_type = match metadata.get_str("contentType") {
61 Ok(res) => Some(res),
62 Err(e) => match e.kind {
63 ValueAccessErrorKind::NotPresent => match metadata.get_str("contentType") {
64 Ok(res) => Some(res),
65 Err(e) => return Err(value_access_err_to_error(e)),
66 },
67 _ => return Err(value_access_err_to_error(e)),
68 },
69 };
70
71 let mut map = HashMap::new();
77 for ref_ in metadata.into_iter() {
78 let (name, doc) = ref_?;
79 if name != "contentType" {
80 if let Some(s) = doc.as_str() {
81 map.insert(name.into(), s.into());
82 }
83 }
84 }
85
86 Ok(File {
87 last_modified_at: None,
88 content_type: content_type.map(String::from),
89 metadata: map,
90 created_at: if created_at.timestamp_millis() < 0 {
91 #[cfg(feature = "tracing")]
92 ::tracing::warn!(%filename, "`created_at` timestamp was negative");
93
94 #[cfg(feature = "log")]
95 ::log::warn!("`created_at` for file {filename} was negative");
96
97 None
98 } else {
99 Some(
100 u128::try_from(created_at.timestamp_millis())
101 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
102 )
103 },
104
105 is_symlink: false,
106 data: bytes,
107 name: filename.to_owned(),
108 path: format!("gridfs://{filename}"),
109 size: if length < 0 {
110 0
111 } else {
112 length
113 .try_into()
114 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
115 },
116 })
117}
118
119fn resolve_path(path: &Path) -> Result<String, mongodb::error::Error> {
120 let path = path.to_str().ok_or_else(|| {
121 <mongodb::error::Error as From<io::Error>>::from(io::Error::new(
122 io::ErrorKind::InvalidData,
123 "expected valid utf-8 string",
124 ))
125 })?;
126
127 let path = path.trim_start_matches("~/").trim_start_matches("./");
129
130 Ok(path.to_owned())
131}
132
133#[derive(Debug, Clone)]
134pub struct StorageService {
135 config: Option<StorageConfig>,
136 bucket: GridFsBucket,
137}
138
139impl StorageService {
140 pub fn new(db: Database, config: StorageConfig) -> StorageService {
143 let bucket = db.gridfs_bucket(Some(config.clone().into()));
144 StorageService {
145 config: Some(config),
146 bucket,
147 }
148 }
149
150 pub fn from_client(client: &Client, config: StorageConfig) -> StorageService {
152 Self::new(
153 client.database(&config.clone().database.unwrap_or(String::from("mydb"))),
154 config,
155 )
156 }
157
158 pub async fn from_conn_string<C: AsRef<str>>(
160 conn_string: C,
161 config: StorageConfig,
162 ) -> Result<StorageService, mongodb::error::Error> {
163 let client = Client::with_uri_str(conn_string).await?;
164 Ok(Self::from_client(&client, config))
165 }
166
167 pub fn with_bucket(bucket: GridFsBucket) -> StorageService {
169 StorageService { config: None, bucket }
170 }
171
172 fn resolve_path<P: AsRef<Path>>(&self, path: P) -> Result<String, mongodb::error::Error> {
173 resolve_path(path.as_ref())
174 }
175}
176
177#[async_trait]
178impl remi::StorageService for StorageService {
179 type Error = mongodb::error::Error;
180
181 fn name(&self) -> Cow<'static, str> {
182 Cow::Borrowed("remi:gridfs")
183 }
184
185 #[cfg_attr(
186 feature = "tracing",
187 tracing::instrument(
188 name = "remi.gridfs.open",
189 skip_all,
190 fields(
191 remi.service = "gridfs",
192 path = %path.as_ref().display()
193 )
194 )
195 )]
196 async fn open<P: AsRef<Path> + Send>(&self, path: P) -> Result<Option<Bytes>, Self::Error> {
197 let path = self.resolve_path(path)?;
198
199 #[cfg(feature = "tracing")]
200 ::tracing::info!(file = %path, "opening file");
201
202 #[cfg(feature = "log")]
203 ::log::info!("opening file [{}]", path);
204
205 let mut cursor = self.bucket.find(doc! { "filename": &path }).await?;
206 let advanced = cursor.advance().await?;
207 if !advanced {
208 #[cfg(feature = "tracing")]
209 ::tracing::warn!(
210 file = %path,
211 "file doesn't exist in GridFS"
212 );
213
214 #[cfg(feature = "log")]
215 ::log::warn!("file [{}] doesn't exist in GridFS", path);
216
217 return Ok(None);
218 }
219
220 let doc = cursor.current();
221 let stream = self
222 .bucket
223 .open_download_stream(Bson::ObjectId(
224 doc.get_object_id("_id").map_err(value_access_err_to_error)?,
225 ))
226 .await?;
227
228 let mut bytes = BytesMut::new();
229 let mut reader = ReaderStream::new(stream.compat());
230 while let Some(raw) = reader.next().await {
231 match raw {
232 Ok(b) => bytes.extend(b),
233 Err(e) => return Err(e.into()),
234 }
235 }
236
237 Ok(Some(bytes.into()))
238 }
239
240 #[cfg_attr(
241 feature = "tracing",
242 tracing::instrument(
243 name = "remi.gridfs.blob",
244 skip_all,
245 fields(
246 remi.service = "gridfs",
247 path = %path.as_ref().display()
248 )
249 )
250 )]
251 async fn blob<P: AsRef<Path> + Send>(&self, path: P) -> Result<Option<Blob>, Self::Error> {
252 let path = self.resolve_path(path)?;
253 let Some(bytes) = self.open(&path).await? else {
254 return Ok(None);
255 };
256
257 #[cfg(feature = "tracing")]
258 ::tracing::info!(
259 file = %path,
260 "getting file metadata for file"
261 );
262
263 #[cfg(feature = "log")]
264 ::log::info!("getting file metadata for file [{}]", path);
265
266 let mut cursor = self
267 .bucket
268 .find(doc! {
269 "filename": &path,
270 })
271 .await?;
272
273 let has_advanced = cursor.advance().await?;
275 if !has_advanced {
276 #[cfg(feature = "tracing")]
277 ::tracing::warn!(file = %path, "file doesn't exist");
278
279 #[cfg(feature = "log")]
280 ::log::warn!("file [{}] doesn't exist", path);
281
282 return Ok(None);
283 }
284
285 let doc = cursor.current();
286 document_to_blob(bytes, doc).map(|doc| Some(Blob::File(doc)))
287 }
288
289 #[cfg_attr(
290 feature = "tracing",
291 tracing::instrument(
292 name = "remi.gridfs.blobs",
293 skip_all,
294 fields(
295 remi.service = "gridfs"
296 )
297 )
298 )]
299 async fn blobs<P: AsRef<Path> + Send>(
300 &self,
301 path: Option<P>,
302 _request: Option<ListBlobsRequest>,
303 ) -> Result<Vec<Blob>, Self::Error> {
304 #[allow(unused)]
307 if let Some(path) = path {
308 #[cfg(feature = "tracing")]
309 ::tracing::warn!(
310 file = %path.as_ref().display(),
311 "using blobs() with a given file name is not supported",
312 );
313
314 #[cfg(feature = "log")]
315 ::log::warn!(
316 "using blobs() with a given file name [{}] is not supported",
317 path.as_ref().display()
318 );
319
320 return Ok(vec![]);
321 }
322
323 let mut cursor = self.bucket.find(doc!()).await?;
324 let mut blobs = vec![];
325 while cursor.advance().await? {
326 let doc = cursor.current();
327 let stream = self
328 .bucket
329 .open_download_stream(Bson::ObjectId(
330 doc.get_object_id("_id").map_err(value_access_err_to_error)?,
331 ))
332 .await?;
333
334 let mut bytes = BytesMut::new();
335 let mut reader = ReaderStream::new(stream.compat());
336 while let Some(raw) = reader.next().await {
337 match raw {
338 Ok(b) => bytes.extend(b),
339 Err(e) => return Err(e.into()),
340 }
341 }
342
343 match document_to_blob(bytes.into(), doc) {
344 Ok(blob) => blobs.push(Blob::File(blob)),
345
346 #[cfg(any(feature = "tracing", feature = "log"))]
347 Err(e) => {
348 #[cfg(feature = "tracing")]
349 ::tracing::error!(error = %e, "unable to convert to a file");
350
351 #[cfg(feature = "log")]
352 ::log::error!("unable to convert to a file: {e}");
353 }
354
355 #[cfg(not(any(feature = "tracing", feature = "log")))]
356 Err(_e) => {}
357 }
358 }
359
360 Ok(blobs)
361 }
362
363 #[cfg_attr(
364 feature = "tracing",
365 tracing::instrument(
366 name = "remi.gridfs.delete",
367 skip_all,
368 fields(
369 remi.service = "gridfs",
370 path = %path.as_ref().display()
371 )
372 )
373 )]
374 async fn delete<P: AsRef<Path> + Send>(&self, path: P) -> Result<(), Self::Error> {
375 let path = self.resolve_path(path)?;
376
377 #[cfg(feature = "tracing")]
378 ::tracing::info!(file = %path, "deleting file");
379
380 #[cfg(feature = "log")]
381 ::log::info!("deleting file [{}]", path);
382
383 let mut cursor = self
384 .bucket
385 .find(doc! {
386 "filename": &path,
387 })
388 .await?;
389
390 let has_advanced = cursor.advance().await?;
392 if !has_advanced {
393 #[cfg(feature = "tracing")]
394 ::tracing::warn!(file = %path, "file doesn't exist");
395
396 #[cfg(feature = "log")]
397 ::log::warn!("file [{}] doesn't exist", path);
398
399 return Ok(());
400 }
401
402 let doc = cursor.current();
403 let oid = doc.get_object_id("_id").map_err(value_access_err_to_error)?;
404
405 self.bucket.delete(Bson::ObjectId(oid)).await
406 }
407
408 #[cfg_attr(
409 feature = "tracing",
410 tracing::instrument(
411 name = "remi.gridfs.exists",
412 skip_all,
413 fields(
414 remi.service = "gridfs",
415 path = %path.as_ref().display()
416 )
417 )
418 )]
419 async fn exists<P: AsRef<Path> + Send>(&self, path: P) -> Result<bool, Self::Error> {
420 match self.open(path).await {
421 Ok(Some(_)) => Ok(true),
422 Ok(None) => Ok(false),
423 Err(e) => Err(e),
424 }
425 }
426
427 #[cfg_attr(
428 feature = "tracing",
429 tracing::instrument(
430 name = "remi.gridfs.blob",
431 skip_all,
432 fields(
433 remi.service = "gridfs",
434 path = %path.as_ref().display()
435 )
436 )
437 )]
438 async fn upload<P: AsRef<Path> + Send>(&self, path: P, options: UploadRequest) -> Result<(), Self::Error> {
439 let path = self.resolve_path(path)?;
440
441 #[cfg(feature = "tracing")]
442 ::tracing::info!(
443 file = %path,
444 "uploading file to GridFS..."
445 );
446
447 #[cfg(feature = "log")]
448 ::log::info!("uploading file [{}] to GridFS", path);
449
450 let mut metadata = options
451 .metadata
452 .into_iter()
453 .map(|(key, value)| (key, Bson::String(value)))
454 .collect::<Document>();
455
456 if let Some(ct) = options.content_type {
457 metadata.insert("contentType", ct);
458 }
459
460 let opts = GridFsUploadOptions::builder()
461 .chunk_size_bytes(Some(
462 self.config.clone().unwrap_or_default().chunk_size.unwrap_or(255 * 1024),
463 ))
464 .metadata(metadata)
465 .build();
466
467 let mut stream = self.bucket.open_upload_stream(path).with_options(opts).await?;
468 stream.write_all(&options.data[..]).await?;
469 stream.close().await.map_err(From::from)
470 }
471}
472
473