mongodb_gridfs/bucket/
download.rs

1use crate::{bucket::GridFSBucket, GridFSError};
2use bson::{doc, oid::ObjectId, Document};
3#[cfg(feature = "async-std-runtime")]
4use futures::{Stream, StreamExt};
5use mongodb::options::{FindOneOptions, FindOptions, SelectionCriteria};
6#[cfg(any(feature = "default", feature = "tokio-runtime"))]
7use tokio_stream::{Stream, StreamExt};
8
9impl GridFSBucket {
10    /// Opens a Stream from which the application can read the contents of the stored file
11    /// specified by @id.
12    /// [Spec](https://github.com/mongodb/specifications/blob/master/source/gridfs/gridfs-spec.rst#file-download)
13    ///
14    /// Returns a [`Stream`].
15    ///
16    /// # Examples
17    ///
18    ///  ```rust
19    ///  # #[cfg(feature = "async-std-runtime")]
20    ///  # use futures::stream::StreamExt;
21    ///  # #[cfg(any(feature = "default", feature = "tokio-runtime"))]
22    ///  use tokio_stream::StreamExt;
23    ///  # use mongodb::Client;
24    ///  # use mongodb::Database;
25    ///  use mongodb_gridfs::{options::GridFSBucketOptions, GridFSBucket, GridFSError};
26    ///  # use uuid::Uuid;
27    ///  # fn db_name_new() -> String {
28    ///  #     "test_".to_owned()
29    ///  #         + Uuid::new_v4()
30    ///  #             .hyphenated()
31    ///  #             .encode_lower(&mut Uuid::encode_buffer())
32    ///  # }
33    ///  #
34    ///  # #[tokio::main]
35    ///  # async fn main() -> Result<(), GridFSError> {
36    ///  #     let client = Client::with_uri_str(
37    ///  #         &std::env::var("MONGO_URI").unwrap_or("mongodb://localhost:27017/".to_string()),
38    ///  #     )
39    ///  #     .await?;
40    ///  #     let dbname = db_name_new();
41    ///  #     let db: Database = client.database(&dbname);
42    ///  let bucket = GridFSBucket::new(db.clone(), Some(GridFSBucketOptions::default()));
43    ///  #     let id = bucket
44    ///  #         .clone()
45    ///  #         .upload_from_stream("test.txt", "test data".as_bytes(), None)
46    ///  #         .await?;
47    ///  #     println!("{}", id);
48    ///  #
49    ///  let (mut cursor, filename) = bucket.open_download_stream_with_filename(id).await?;
50    ///  assert_eq!(filename, "test.txt");
51    ///  let buffer = cursor.next().await.unwrap();
52    ///  #     println!("{:?}", buffer);
53    ///  #
54    ///  #     db.drop(None).await?;
55    ///  #     Ok(())
56    ///  # }
57    ///  ```
58    ///
59    ///  # Errors
60    ///
61    ///  Raise [`GridFSError::FileNotFound`] when the requested id doesn't exists.
62    ///
63    pub async fn open_download_stream_with_filename(
64        &self,
65        id: ObjectId,
66    ) -> Result<(impl Stream<Item = Vec<u8>>, String), GridFSError> {
67        let dboptions = self.options.clone().unwrap_or_default();
68        let bucket_name = dboptions.bucket_name;
69        let file_collection = bucket_name.clone() + ".files";
70        let files = self.db.collection::<Document>(&file_collection);
71        let chunk_collection = bucket_name + ".chunks";
72        let chunks = self.db.collection::<Document>(&chunk_collection);
73
74        let mut find_one_options = FindOneOptions::default();
75        let mut find_options = FindOptions::builder().sort(doc! {"n":1}).build();
76
77        if let Some(read_concern) = dboptions.read_concern {
78            find_one_options.read_concern = Some(read_concern.clone());
79            find_options.read_concern = Some(read_concern);
80        }
81        if let Some(read_preference) = dboptions.read_preference {
82            find_one_options.selection_criteria =
83                Some(SelectionCriteria::ReadPreference(read_preference.clone()));
84            find_options.selection_criteria =
85                Some(SelectionCriteria::ReadPreference(read_preference));
86        }
87
88        /*
89        Drivers must first retrieve the files collection document for this
90        file. If there is no files collection document, the file either never
91        existed, is in the process of being deleted, or has been corrupted,
92        and the driver MUST raise an error.
93        */
94        let file = files.find_one(doc! {"_id":id}, find_one_options).await?;
95
96        if let Some(file) = file {
97            let filename = file.get_str("filename").unwrap().to_string();
98            let stream = chunks
99                .find(doc! {"files_id":id}, find_options.clone())
100                .await
101                .unwrap()
102                .map(|item| {
103                    let i = item.unwrap();
104                    i.get_binary_generic("data").unwrap().clone()
105                });
106            Ok((stream, filename))
107        } else {
108            Err(GridFSError::FileNotFound())
109        }
110    }
111
112    /**
113     Opens a Stream from which the application can read the contents of the stored file
114     specified by @id.
115     [Spec](https://github.com/mongodb/specifications/blob/master/source/gridfs/gridfs-spec.rst#file-download)
116
117     Returns a [`Stream`].
118
119     # Examples
120
121     ```rust
122     # #[cfg(feature = "async-std-runtime")]
123     # use futures::stream::StreamExt;
124     # #[cfg(any(feature = "default", feature = "tokio-runtime"))]
125     use tokio_stream::StreamExt;
126     # use mongodb::Client;
127     # use mongodb::Database;
128     use mongodb_gridfs::{options::GridFSBucketOptions, GridFSBucket, GridFSError};
129     # use uuid::Uuid;
130     # fn db_name_new() -> String {
131     #     "test_".to_owned()
132     #         + Uuid::new_v4()
133     #             .hyphenated()
134     #             .encode_lower(&mut Uuid::encode_buffer())
135     # }
136     #
137     # #[tokio::main]
138     # async fn main() -> Result<(), GridFSError> {
139     #     let client = Client::with_uri_str(
140     #         &std::env::var("MONGO_URI").unwrap_or("mongodb://localhost:27017/".to_string()),
141     #     )
142     #     .await?;
143     #     let dbname = db_name_new();
144     #     let db: Database = client.database(&dbname);
145     let bucket = GridFSBucket::new(db.clone(), Some(GridFSBucketOptions::default()));
146     #     let id = bucket
147     #         .clone()
148     #         .upload_from_stream("test.txt", "test data".as_bytes(), None)
149     #         .await?;
150     #     println!("{}", id);
151     #
152     let mut cursor = bucket.open_download_stream(id).await?;
153     let buffer = cursor.next().await.unwrap();
154     #     println!("{:?}", buffer);
155     #
156     #     db.drop(None).await?;
157     #     Ok(())
158     # }
159     ```
160
161     # Errors
162
163     Raise [`GridFSError::FileNotFound`] when the requested id doesn't exists.
164    */
165    pub async fn open_download_stream(
166        &self,
167        id: ObjectId,
168    ) -> Result<impl Stream<Item = Vec<u8>>, GridFSError> {
169        let (stream, _) = self.open_download_stream_with_filename(id).await?;
170        Ok(stream)
171    }
172}
173
#[cfg(test)]
mod tests {
    //! Integration-style tests for the download API. They need a reachable
    //! MongoDB server: the URI is read from `MONGO_URI`, falling back to
    //! `mongodb://localhost:27017/`.
    use super::GridFSBucket;
    use crate::{options::GridFSBucketOptions, GridFSError};
    use bson::oid::ObjectId;
    #[cfg(feature = "async-std-runtime")]
    use futures::stream::StreamExt;
    use mongodb::{Client, Database};
    #[cfg(any(feature = "default", feature = "tokio-runtime"))]
    use tokio_stream::StreamExt;
    use uuid::Uuid;

    /// Builds a unique database name (`test_<uuid>`) so tests do not share data.
    fn db_name_new() -> String {
        "test_".to_owned()
            + Uuid::new_v4()
                .hyphenated()
                .encode_lower(&mut Uuid::encode_buffer())
    }

    /// Connects to the test server and returns a handle to a freshly named database.
    async fn connect_test_db() -> Result<Database, GridFSError> {
        let uri = std::env::var("MONGO_URI")
            .unwrap_or_else(|_| "mongodb://localhost:27017/".to_string());
        let client = Client::with_uri_str(&uri).await?;
        Ok(client.database(&db_name_new()))
    }

    /// A file smaller than the default chunk size comes back as a single item.
    #[tokio::test]
    async fn open_download_stream() -> Result<(), GridFSError> {
        let db = connect_test_db().await?;
        let bucket = &GridFSBucket::new(db.clone(), Some(GridFSBucketOptions::default()));
        let id = bucket
            .clone()
            .upload_from_stream("test.txt", "test data".as_bytes(), None)
            .await?;

        let mut cursor = bucket.open_download_stream(id).await?;
        let buffer = cursor.next().await.unwrap();
        assert_eq!(buffer, b"test data".to_vec());

        db.drop(None).await?;
        Ok(())
    }

    /// The filename stored at upload time is returned alongside the stream.
    #[tokio::test]
    async fn open_download_stream_with_filename() -> Result<(), GridFSError> {
        let db = connect_test_db().await?;
        let bucket = &GridFSBucket::new(db.clone(), Some(GridFSBucketOptions::default()));
        let id = bucket
            .clone()
            .upload_from_stream("test.txt", "test data".as_bytes(), None)
            .await?;

        let (mut cursor, filename) = bucket.open_download_stream_with_filename(id).await?;
        assert_eq!(filename, "test.txt");
        let buffer = cursor.next().await.unwrap();
        assert_eq!(buffer, b"test data".to_vec());

        db.drop(None).await?;
        Ok(())
    }

    /// With a 4-byte chunk size, 9 bytes are streamed as 4 + 4 + 1 and then the
    /// stream ends.
    #[tokio::test]
    async fn open_download_stream_chunk_size() -> Result<(), GridFSError> {
        let db = connect_test_db().await?;
        let bucket = &GridFSBucket::new(
            db.clone(),
            Some(GridFSBucketOptions::builder().chunk_size_bytes(4).build()),
        );
        let id = bucket
            .clone()
            .upload_from_stream("test.txt", "test data".as_bytes(), None)
            .await?;

        let mut cursor = bucket.open_download_stream(id).await?;

        let buffer = cursor.next().await.unwrap();
        assert_eq!(buffer, b"test".to_vec());

        let buffer = cursor.next().await.unwrap();
        assert_eq!(buffer, b" dat".to_vec());

        let buffer = cursor.next().await.unwrap();
        assert_eq!(buffer, b"a".to_vec());

        let buffer = cursor.next().await;
        assert_eq!(buffer, None);

        db.drop(None).await?;
        Ok(())
    }

    /// Requesting an id that was never uploaded yields `FileNotFound`.
    #[tokio::test]
    async fn open_download_stream_not_existing_file() -> Result<(), GridFSError> {
        let db = connect_test_db().await?;
        let bucket = &GridFSBucket::new(db.clone(), Some(GridFSBucketOptions::default()));
        let id = ObjectId::new();

        let cursor = bucket.open_download_stream(id).await;
        assert!(matches!(cursor, Err(GridFSError::FileNotFound())));

        db.drop(None).await?;
        Ok(())
    }
}
268}