remi_gridfs/
service.rs

1// ๐Ÿปโ€โ„๏ธ๐Ÿงถ remi-rs: Asynchronous Rust crate to handle communication between applications and object storage providers
2// Copyright (c) 2022-2025 Noelware, LLC. <team@noelware.org>
3//
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10//
11// The above copyright notice and this permission notice shall be included in all
12// copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20// SOFTWARE.
21
22use crate::StorageConfig;
23use async_trait::async_trait;
24use bytes::{Bytes, BytesMut};
25use futures_util::{AsyncWriteExt, StreamExt};
26use mongodb::{
27    bson::{doc, raw::ValueAccessErrorKind, Bson, Document, RawDocument},
28    gridfs::GridFsBucket,
29    options::GridFsUploadOptions,
30    Client, Database,
31};
32use remi::{Blob, File, ListBlobsRequest, UploadRequest};
33use std::{borrow::Cow, collections::HashMap, io, path::Path};
34use tokio_util::{compat::FuturesAsyncReadCompatExt, io::ReaderStream};
35
36fn value_access_err_to_error(error: mongodb::bson::raw::ValueAccessError) -> mongodb::error::Error {
37    match error.kind {
38        ValueAccessErrorKind::NotPresent => {
39            mongodb::error::Error::custom(format!("key [{}] was not found", error.key()))
40        }
41
42        ValueAccessErrorKind::UnexpectedType { expected, actual, .. } => mongodb::error::Error::custom(format!(
43            "expected BSON type '{expected:?}', actual type for key [{}] is '{actual:?}'",
44            error.key()
45        )),
46
47        ValueAccessErrorKind::InvalidBson(err) => err.into(),
48        _ => unimplemented!(
49            "`ValueAccessErrorKind` was unhandled, please report it: https://github.com/Noelware/remi-rs/issues/new"
50        ),
51    }
52}
53
54fn document_to_blob(bytes: Bytes, doc: &RawDocument) -> Result<File, mongodb::error::Error> {
55    let filename = doc.get_str("filename").map_err(value_access_err_to_error)?;
56    let length = doc.get_i64("length").map_err(value_access_err_to_error)?;
57    let created_at = doc.get_datetime("uploadDate").map_err(value_access_err_to_error)?;
58    let metadata = doc.get_document("metadata").map_err(value_access_err_to_error)?;
59
60    let content_type = match metadata.get_str("contentType") {
61        Ok(res) => Some(res),
62        Err(e) => match e.kind {
63            ValueAccessErrorKind::NotPresent => match metadata.get_str("contentType") {
64                Ok(res) => Some(res),
65                Err(e) => return Err(value_access_err_to_error(e)),
66            },
67            _ => return Err(value_access_err_to_error(e)),
68        },
69    };
70
71    // Convert `doc` into a HashMap that doesn't contain the properties we expect
72    // in a GridFS object.
73    //
74    // For brevity and compatibility with other storage services, we only use strings
75    // when including metadata.
76    let mut map = HashMap::new();
77    for ref_ in metadata.into_iter() {
78        let (name, doc) = ref_?;
79        if name != "contentType" {
80            if let Some(s) = doc.as_str() {
81                map.insert(name.into(), s.into());
82            }
83        }
84    }
85
86    Ok(File {
87        last_modified_at: None,
88        content_type: content_type.map(String::from),
89        metadata: map,
90        created_at: if created_at.timestamp_millis() < 0 {
91            #[cfg(feature = "tracing")]
92            ::tracing::warn!(%filename, "`created_at` timestamp was negative");
93
94            #[cfg(feature = "log")]
95            ::log::warn!("`created_at` for file {filename} was negative");
96
97            None
98        } else {
99            Some(
100                u128::try_from(created_at.timestamp_millis())
101                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
102            )
103        },
104
105        is_symlink: false,
106        data: bytes,
107        name: filename.to_owned(),
108        path: format!("gridfs://{filename}"),
109        size: if length < 0 {
110            0
111        } else {
112            length
113                .try_into()
114                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?
115        },
116    })
117}
118
119fn resolve_path(path: &Path) -> Result<String, mongodb::error::Error> {
120    let path = path.to_str().ok_or_else(|| {
121        <mongodb::error::Error as From<io::Error>>::from(io::Error::new(
122            io::ErrorKind::InvalidData,
123            "expected valid utf-8 string",
124        ))
125    })?;
126
127    // trim `./` and `~/` since Gridfs doesn't accept ./ or ~/ as valid paths
128    let path = path.trim_start_matches("~/").trim_start_matches("./");
129
130    Ok(path.to_owned())
131}
132
/// A storage service backed by a MongoDB GridFS bucket.
#[derive(Debug, Clone)]
pub struct StorageService {
    /// Configuration the service was created with; `None` when the service was built
    /// directly from an existing bucket via [`StorageService::with_bucket`].
    config: Option<StorageConfig>,

    /// The GridFS bucket that every operation of this service runs against.
    bucket: GridFsBucket,
}
138
139impl StorageService {
140    /// Creates a new [`StorageService`] which uses the [`StorageConfig`] as a way to create
141    /// the inner [`GridFsBucket`].
142    pub fn new(db: Database, config: StorageConfig) -> StorageService {
143        let bucket = db.gridfs_bucket(Some(config.clone().into()));
144        StorageService {
145            config: Some(config),
146            bucket,
147        }
148    }
149
150    /// Return a new [`StorageService`] from a constructed [`Client`].
151    pub fn from_client(client: &Client, config: StorageConfig) -> StorageService {
152        Self::new(
153            client.database(&config.clone().database.unwrap_or(String::from("mydb"))),
154            config,
155        )
156    }
157
158    /// Creates a MongoDB client from a connection string and creates a new [`StorageService`] interface.
159    pub async fn from_conn_string<C: AsRef<str>>(
160        conn_string: C,
161        config: StorageConfig,
162    ) -> Result<StorageService, mongodb::error::Error> {
163        let client = Client::with_uri_str(conn_string).await?;
164        Ok(Self::from_client(&client, config))
165    }
166
167    /// Uses a preconfigured [`GridFsBucket`] as the underlying bucket.
168    pub fn with_bucket(bucket: GridFsBucket) -> StorageService {
169        StorageService { config: None, bucket }
170    }
171
172    fn resolve_path<P: AsRef<Path>>(&self, path: P) -> Result<String, mongodb::error::Error> {
173        resolve_path(path.as_ref())
174    }
175}
176
177#[async_trait]
178impl remi::StorageService for StorageService {
179    type Error = mongodb::error::Error;
180
181    fn name(&self) -> Cow<'static, str> {
182        Cow::Borrowed("remi:gridfs")
183    }
184
185    #[cfg_attr(
186        feature = "tracing",
187        tracing::instrument(
188            name = "remi.gridfs.open",
189            skip_all,
190            fields(
191                remi.service = "gridfs",
192                path = %path.as_ref().display()
193            )
194        )
195    )]
196    async fn open<P: AsRef<Path> + Send>(&self, path: P) -> Result<Option<Bytes>, Self::Error> {
197        let path = self.resolve_path(path)?;
198
199        #[cfg(feature = "tracing")]
200        ::tracing::info!(file = %path, "opening file");
201
202        #[cfg(feature = "log")]
203        ::log::info!("opening file [{}]", path);
204
205        let mut cursor = self.bucket.find(doc! { "filename": &path }).await?;
206        let advanced = cursor.advance().await?;
207        if !advanced {
208            #[cfg(feature = "tracing")]
209            ::tracing::warn!(
210                file = %path,
211                "file doesn't exist in GridFS"
212            );
213
214            #[cfg(feature = "log")]
215            ::log::warn!("file [{}] doesn't exist in GridFS", path);
216
217            return Ok(None);
218        }
219
220        let doc = cursor.current();
221        let stream = self
222            .bucket
223            .open_download_stream(Bson::ObjectId(
224                doc.get_object_id("_id").map_err(value_access_err_to_error)?,
225            ))
226            .await?;
227
228        let mut bytes = BytesMut::new();
229        let mut reader = ReaderStream::new(stream.compat());
230        while let Some(raw) = reader.next().await {
231            match raw {
232                Ok(b) => bytes.extend(b),
233                Err(e) => return Err(e.into()),
234            }
235        }
236
237        Ok(Some(bytes.into()))
238    }
239
240    #[cfg_attr(
241        feature = "tracing",
242        tracing::instrument(
243            name = "remi.gridfs.blob",
244            skip_all,
245            fields(
246                remi.service = "gridfs",
247                path = %path.as_ref().display()
248            )
249        )
250    )]
251    async fn blob<P: AsRef<Path> + Send>(&self, path: P) -> Result<Option<Blob>, Self::Error> {
252        let path = self.resolve_path(path)?;
253        let Some(bytes) = self.open(&path).await? else {
254            return Ok(None);
255        };
256
257        #[cfg(feature = "tracing")]
258        ::tracing::info!(
259            file = %path,
260            "getting file metadata for file"
261        );
262
263        #[cfg(feature = "log")]
264        ::log::info!("getting file metadata for file [{}]", path);
265
266        let mut cursor = self
267            .bucket
268            .find(doc! {
269                "filename": &path,
270            })
271            .await?;
272
273        // has_advanced returns false if there is no entries that have that filename
274        let has_advanced = cursor.advance().await?;
275        if !has_advanced {
276            #[cfg(feature = "tracing")]
277            ::tracing::warn!(file = %path, "file doesn't exist");
278
279            #[cfg(feature = "log")]
280            ::log::warn!("file [{}] doesn't exist", path);
281
282            return Ok(None);
283        }
284
285        let doc = cursor.current();
286        document_to_blob(bytes, doc).map(|doc| Some(Blob::File(doc)))
287    }
288
289    #[cfg_attr(
290        feature = "tracing",
291        tracing::instrument(
292            name = "remi.gridfs.blobs",
293            skip_all,
294            fields(
295                remi.service = "gridfs"
296            )
297        )
298    )]
299    async fn blobs<P: AsRef<Path> + Send>(
300        &self,
301        path: Option<P>,
302        _request: Option<ListBlobsRequest>,
303    ) -> Result<Vec<Blob>, Self::Error> {
304        // TODO(@auguwu): support filtering files, for now we should probably
305        // heavily test this
306        #[allow(unused)]
307        if let Some(path) = path {
308            #[cfg(feature = "tracing")]
309            ::tracing::warn!(
310                file = %path.as_ref().display(),
311                "using blobs() with a given file name is not supported",
312            );
313
314            #[cfg(feature = "log")]
315            ::log::warn!(
316                "using blobs() with a given file name [{}] is not supported",
317                path.as_ref().display()
318            );
319
320            return Ok(vec![]);
321        }
322
323        let mut cursor = self.bucket.find(doc!()).await?;
324        let mut blobs = vec![];
325        while cursor.advance().await? {
326            let doc = cursor.current();
327            let stream = self
328                .bucket
329                .open_download_stream(Bson::ObjectId(
330                    doc.get_object_id("_id").map_err(value_access_err_to_error)?,
331                ))
332                .await?;
333
334            let mut bytes = BytesMut::new();
335            let mut reader = ReaderStream::new(stream.compat());
336            while let Some(raw) = reader.next().await {
337                match raw {
338                    Ok(b) => bytes.extend(b),
339                    Err(e) => return Err(e.into()),
340                }
341            }
342
343            match document_to_blob(bytes.into(), doc) {
344                Ok(blob) => blobs.push(Blob::File(blob)),
345
346                #[cfg(any(feature = "tracing", feature = "log"))]
347                Err(e) => {
348                    #[cfg(feature = "tracing")]
349                    ::tracing::error!(error = %e, "unable to convert to a file");
350
351                    #[cfg(feature = "log")]
352                    ::log::error!("unable to convert to a file: {e}");
353                }
354
355                #[cfg(not(any(feature = "tracing", feature = "log")))]
356                Err(_e) => {}
357            }
358        }
359
360        Ok(blobs)
361    }
362
363    #[cfg_attr(
364        feature = "tracing",
365        tracing::instrument(
366            name = "remi.gridfs.delete",
367            skip_all,
368            fields(
369                remi.service = "gridfs",
370                path = %path.as_ref().display()
371            )
372        )
373    )]
374    async fn delete<P: AsRef<Path> + Send>(&self, path: P) -> Result<(), Self::Error> {
375        let path = self.resolve_path(path)?;
376
377        #[cfg(feature = "tracing")]
378        ::tracing::info!(file = %path, "deleting file");
379
380        #[cfg(feature = "log")]
381        ::log::info!("deleting file [{}]", path);
382
383        let mut cursor = self
384            .bucket
385            .find(doc! {
386                "filename": &path,
387            })
388            .await?;
389
390        // has_advanced returns false if there is no entries that have that filename
391        let has_advanced = cursor.advance().await?;
392        if !has_advanced {
393            #[cfg(feature = "tracing")]
394            ::tracing::warn!(file = %path, "file doesn't exist");
395
396            #[cfg(feature = "log")]
397            ::log::warn!("file [{}] doesn't exist", path);
398
399            return Ok(());
400        }
401
402        let doc = cursor.current();
403        let oid = doc.get_object_id("_id").map_err(value_access_err_to_error)?;
404
405        self.bucket.delete(Bson::ObjectId(oid)).await
406    }
407
408    #[cfg_attr(
409        feature = "tracing",
410        tracing::instrument(
411            name = "remi.gridfs.exists",
412            skip_all,
413            fields(
414                remi.service = "gridfs",
415                path = %path.as_ref().display()
416            )
417        )
418    )]
419    async fn exists<P: AsRef<Path> + Send>(&self, path: P) -> Result<bool, Self::Error> {
420        match self.open(path).await {
421            Ok(Some(_)) => Ok(true),
422            Ok(None) => Ok(false),
423            Err(e) => Err(e),
424        }
425    }
426
427    #[cfg_attr(
428        feature = "tracing",
429        tracing::instrument(
430            name = "remi.gridfs.blob",
431            skip_all,
432            fields(
433                remi.service = "gridfs",
434                path = %path.as_ref().display()
435            )
436        )
437    )]
438    async fn upload<P: AsRef<Path> + Send>(&self, path: P, options: UploadRequest) -> Result<(), Self::Error> {
439        let path = self.resolve_path(path)?;
440
441        #[cfg(feature = "tracing")]
442        ::tracing::info!(
443            file = %path,
444            "uploading file to GridFS..."
445        );
446
447        #[cfg(feature = "log")]
448        ::log::info!("uploading file [{}] to GridFS", path);
449
450        let mut metadata = options
451            .metadata
452            .into_iter()
453            .map(|(key, value)| (key, Bson::String(value)))
454            .collect::<Document>();
455
456        if let Some(ct) = options.content_type {
457            metadata.insert("contentType", ct);
458        }
459
460        let opts = GridFsUploadOptions::builder()
461            .chunk_size_bytes(Some(
462                self.config.clone().unwrap_or_default().chunk_size.unwrap_or(255 * 1024),
463            ))
464            .metadata(metadata)
465            .build();
466
467        let mut stream = self.bucket.open_upload_stream(path).with_options(opts).await?;
468        stream.write_all(&options.data[..]).await?;
469        stream.close().await.map_err(From::from)
470    }
471}
472
473// #[cfg(test)]
474// #[cfg_attr(not(target_os = "linux"), allow(dead_code))]
475// mod tests {
476//     use crate::service::resolve_path;
477//     use remi::{StorageService, UploadRequest};
478//     use std::path::Path;
479//     use testcontainers::{runners::AsyncRunner, GenericImage};
480//     use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
481
482//     const IMAGE: &str = "mongo";
483
484//     // renovate: image="mongo"
485//     const TAG: &str = "7.0.9";
486
487//     fn container() -> GenericImage {
488//         GenericImage::new(IMAGE, TAG)
489//     }
490
491//     #[test]
492//     fn test_resolve_paths() {
493//         assert_eq!(resolve_path(Path::new("./weow.txt")).unwrap(), String::from("weow.txt"));
494//         assert_eq!(resolve_path(Path::new("~/weow.txt")).unwrap(), String::from("weow.txt"));
495//         assert_eq!(resolve_path(Path::new("weow.txt")).unwrap(), String::from("weow.txt"));
496//         assert_eq!(
497//             resolve_path(Path::new("~/weow/fluff/mooo.exe")).unwrap(),
498//             String::from("weow/fluff/mooo.exe")
499//         );
500//     }
501
502//     macro_rules! build_testcases {
503//         (
504//             $(
505//                 $(#[$meta:meta])*
506//                 async fn $name:ident($storage:ident) $code:block
507//             )*
508//         ) => {
509//             $(
510//                 #[cfg_attr(target_os = "linux", tokio::test)]
511//                 #[cfg_attr(not(target_os = "linux"), ignore = "`mongo` image can be only used on Linux")]
512//                 $(#[$meta])*
513//                 async fn $name() {
514//                     if ::bollard::Docker::connect_with_defaults().is_err() {
515//                         eprintln!("[remi-gridfs] `docker` cannot be probed by default settings; skipping test");
516//                         return;
517//                     }
518
519//                     let _guard = tracing_subscriber::registry()
520//                         .with(tracing_subscriber::fmt::layer())
521//                         .set_default();
522
523//                     let container = container().start().await.expect("failed to start container");
524//                     let $storage = crate::StorageService::from_conn_string(
525//                         format!(
526//                             "mongodb://{}:{}",
527//                             container.get_host().await.expect("failed to get host ip"),
528//                             container.get_host_port_ipv4(27017).await.expect("failed to get port mapping: 27017")
529//                         ),
530//                         $crate::StorageConfig {
531//                             database: Some(String::from("remi")),
532//                             bucket: String::from("fs"),
533
534//                             ..Default::default()
535//                         }
536//                     ).await.expect("failed to create storage service");
537
538//                     ($storage).init().await.expect("failed to initialize storage service");
539
540//                     let __ret = $code;
541//                     __ret
542//                 }
543//             )*
544//         };
545//     }
546
547//     build_testcases! {
548//         async fn prepare_mongo_container_usage(_storage) {}
549
550//         async fn test_uploading_file(storage) {
551//             let contents: remi::Bytes = "{\"wuff\":true}".into();
552//             storage.upload("./wuff.json", UploadRequest::default()
553//                 .with_content_type(Some("application/json"))
554//                 .with_data(contents.clone())
555//             ).await.expect("failed to upload");
556
557//             assert!(storage.exists("./wuff.json").await.expect("failed to query ./wuff.json"));
558//             assert_eq!(contents, storage.open("./wuff.json").await.expect("failed to open ./wuff.json").expect("it should exist"));
559//         }
560
561//         async fn list_blobs(storage) {
562//             for i in 0..100 {
563//                 let contents: remi::Bytes = format!("{{\"blob\":{i}}}").into();
564//                 storage.upload(format!("./wuff.{i}.json"), UploadRequest::default()
565//                     .with_content_type(Some("application/json"))
566//                     .with_data(contents)
567//                 ).await.expect("failed to upload blob");
568//             }
569
570//             let blobs = storage.blobs(None::<&str>, None).await.expect("failed to list all blobs");
571//             let mut iter = blobs.iter().filter_map(|x| match x {
572//                 remi::Blob::File(file) => Some(file),
573//                 _ => None
574//             });
575
576//             assert!(iter.all(|x|
577//                 x.content_type == Some(String::from("application/json")) &&
578//                 !x.is_symlink &&
579//                 x.data.starts_with(&[/* b"{" */ 123])
580//             ));
581//         }
582
583//         async fn query_single_blob(storage) {
584//             for i in 0..100 {
585//                 let contents: remi::Bytes = format!("{{\"blob\":{i}}}").into();
586//                 storage.upload(format!("./wuff.{i}.json"), UploadRequest::default()
587//                     .with_content_type(Some("application/json"))
588//                     .with_data(contents)
589//                 ).await.expect("failed to upload blob");
590//             }
591
592//             assert!(storage.blob("./wuff.98.json").await.expect("failed to query single blob").is_some());
593//             assert!(storage.blob("./wuff.95.json").await.expect("failed to query single blob").is_some());
594//             assert!(storage.blob("~/doesnt/exist").await.expect("failed to query single blob").is_none());
595//         }
596//     }
597// }