mongodb_gridfs/bucket/download.rs
1use crate::{bucket::GridFSBucket, GridFSError};
2use bson::{doc, oid::ObjectId, Document};
3#[cfg(feature = "async-std-runtime")]
4use futures::{Stream, StreamExt};
5use mongodb::options::{FindOneOptions, FindOptions, SelectionCriteria};
6#[cfg(any(feature = "default", feature = "tokio-runtime"))]
7use tokio_stream::{Stream, StreamExt};
8
9impl GridFSBucket {
10 /// Opens a Stream from which the application can read the contents of the stored file
11 /// specified by @id.
12 /// [Spec](https://github.com/mongodb/specifications/blob/master/source/gridfs/gridfs-spec.rst#file-download)
13 ///
14 /// Returns a [`Stream`].
15 ///
16 /// # Examples
17 ///
18 /// ```rust
19 /// # #[cfg(feature = "async-std-runtime")]
20 /// # use futures::stream::StreamExt;
21 /// # #[cfg(any(feature = "default", feature = "tokio-runtime"))]
22 /// use tokio_stream::StreamExt;
23 /// # use mongodb::Client;
24 /// # use mongodb::Database;
25 /// use mongodb_gridfs::{options::GridFSBucketOptions, GridFSBucket, GridFSError};
26 /// # use uuid::Uuid;
27 /// # fn db_name_new() -> String {
28 /// # "test_".to_owned()
29 /// # + Uuid::new_v4()
30 /// # .hyphenated()
31 /// # .encode_lower(&mut Uuid::encode_buffer())
32 /// # }
33 /// #
34 /// # #[tokio::main]
35 /// # async fn main() -> Result<(), GridFSError> {
36 /// # let client = Client::with_uri_str(
37 /// # &std::env::var("MONGO_URI").unwrap_or("mongodb://localhost:27017/".to_string()),
38 /// # )
39 /// # .await?;
40 /// # let dbname = db_name_new();
41 /// # let db: Database = client.database(&dbname);
42 /// let bucket = GridFSBucket::new(db.clone(), Some(GridFSBucketOptions::default()));
43 /// # let id = bucket
44 /// # .clone()
45 /// # .upload_from_stream("test.txt", "test data".as_bytes(), None)
46 /// # .await?;
47 /// # println!("{}", id);
48 /// #
49 /// let (mut cursor, filename) = bucket.open_download_stream_with_filename(id).await?;
50 /// assert_eq!(filename, "test.txt");
51 /// let buffer = cursor.next().await.unwrap();
52 /// # println!("{:?}", buffer);
53 /// #
54 /// # db.drop(None).await?;
55 /// # Ok(())
56 /// # }
57 /// ```
58 ///
59 /// # Errors
60 ///
61 /// Raise [`GridFSError::FileNotFound`] when the requested id doesn't exists.
62 ///
63 pub async fn open_download_stream_with_filename(
64 &self,
65 id: ObjectId,
66 ) -> Result<(impl Stream<Item = Vec<u8>>, String), GridFSError> {
67 let dboptions = self.options.clone().unwrap_or_default();
68 let bucket_name = dboptions.bucket_name;
69 let file_collection = bucket_name.clone() + ".files";
70 let files = self.db.collection::<Document>(&file_collection);
71 let chunk_collection = bucket_name + ".chunks";
72 let chunks = self.db.collection::<Document>(&chunk_collection);
73
74 let mut find_one_options = FindOneOptions::default();
75 let mut find_options = FindOptions::builder().sort(doc! {"n":1}).build();
76
77 if let Some(read_concern) = dboptions.read_concern {
78 find_one_options.read_concern = Some(read_concern.clone());
79 find_options.read_concern = Some(read_concern);
80 }
81 if let Some(read_preference) = dboptions.read_preference {
82 find_one_options.selection_criteria =
83 Some(SelectionCriteria::ReadPreference(read_preference.clone()));
84 find_options.selection_criteria =
85 Some(SelectionCriteria::ReadPreference(read_preference));
86 }
87
88 /*
89 Drivers must first retrieve the files collection document for this
90 file. If there is no files collection document, the file either never
91 existed, is in the process of being deleted, or has been corrupted,
92 and the driver MUST raise an error.
93 */
94 let file = files.find_one(doc! {"_id":id}, find_one_options).await?;
95
96 if let Some(file) = file {
97 let filename = file.get_str("filename").unwrap().to_string();
98 let stream = chunks
99 .find(doc! {"files_id":id}, find_options.clone())
100 .await
101 .unwrap()
102 .map(|item| {
103 let i = item.unwrap();
104 i.get_binary_generic("data").unwrap().clone()
105 });
106 Ok((stream, filename))
107 } else {
108 Err(GridFSError::FileNotFound())
109 }
110 }
111
112 /**
113 Opens a Stream from which the application can read the contents of the stored file
114 specified by @id.
115 [Spec](https://github.com/mongodb/specifications/blob/master/source/gridfs/gridfs-spec.rst#file-download)
116
117 Returns a [`Stream`].
118
119 # Examples
120
121 ```rust
122 # #[cfg(feature = "async-std-runtime")]
123 # use futures::stream::StreamExt;
124 # #[cfg(any(feature = "default", feature = "tokio-runtime"))]
125 use tokio_stream::StreamExt;
126 # use mongodb::Client;
127 # use mongodb::Database;
128 use mongodb_gridfs::{options::GridFSBucketOptions, GridFSBucket, GridFSError};
129 # use uuid::Uuid;
130 # fn db_name_new() -> String {
131 # "test_".to_owned()
132 # + Uuid::new_v4()
133 # .hyphenated()
134 # .encode_lower(&mut Uuid::encode_buffer())
135 # }
136 #
137 # #[tokio::main]
138 # async fn main() -> Result<(), GridFSError> {
139 # let client = Client::with_uri_str(
140 # &std::env::var("MONGO_URI").unwrap_or("mongodb://localhost:27017/".to_string()),
141 # )
142 # .await?;
143 # let dbname = db_name_new();
144 # let db: Database = client.database(&dbname);
145 let bucket = GridFSBucket::new(db.clone(), Some(GridFSBucketOptions::default()));
146 # let id = bucket
147 # .clone()
148 # .upload_from_stream("test.txt", "test data".as_bytes(), None)
149 # .await?;
150 # println!("{}", id);
151 #
152 let mut cursor = bucket.open_download_stream(id).await?;
153 let buffer = cursor.next().await.unwrap();
154 # println!("{:?}", buffer);
155 #
156 # db.drop(None).await?;
157 # Ok(())
158 # }
159 ```
160
161 # Errors
162
163 Raise [`GridFSError::FileNotFound`] when the requested id doesn't exists.
164 */
165 pub async fn open_download_stream(
166 &self,
167 id: ObjectId,
168 ) -> Result<impl Stream<Item = Vec<u8>>, GridFSError> {
169 let (stream, _) = self.open_download_stream_with_filename(id).await?;
170 Ok(stream)
171 }
172}
173
#[cfg(test)]
mod tests {
    //! Integration tests for the download API. They need a reachable MongoDB
    //! server, taken from the `MONGO_URI` environment variable and defaulting to
    //! `mongodb://localhost:27017/`.

    use super::GridFSBucket;
    use crate::{options::GridFSBucketOptions, GridFSError};
    use bson::oid::ObjectId;
    #[cfg(feature = "async-std-runtime")]
    use futures::stream::StreamExt;
    use mongodb::{Client, Database};
    #[cfg(any(feature = "default", feature = "tokio-runtime"))]
    use tokio_stream::StreamExt;
    use uuid::Uuid;

    /// Builds a unique database name so each test works on its own database.
    fn db_name_new() -> String {
        "test_".to_owned()
            + Uuid::new_v4()
                .hyphenated()
                .encode_lower(&mut Uuid::encode_buffer())
    }

    /// Connects to the test server and returns a handle on a uniquely named database.
    async fn fresh_db() -> Result<Database, GridFSError> {
        // `unwrap_or_else` only allocates the fallback URI when the variable is unset.
        let uri = std::env::var("MONGO_URI")
            .unwrap_or_else(|_| "mongodb://localhost:27017/".to_string());
        let client = Client::with_uri_str(&uri).await?;
        Ok(client.database(&db_name_new()))
    }

    /// A file stored with the default options comes back as a single chunk.
    #[tokio::test]
    async fn open_download_stream() -> Result<(), GridFSError> {
        let db = fresh_db().await?;
        let bucket = GridFSBucket::new(db.clone(), Some(GridFSBucketOptions::default()));
        let id = bucket
            .clone()
            .upload_from_stream("test.txt", "test data".as_bytes(), None)
            .await?;

        let mut cursor = bucket.open_download_stream(id).await?;
        let buffer = cursor.next().await.unwrap();
        assert_eq!(buffer, [116, 101, 115, 116, 32, 100, 97, 116, 97]);
        db.drop(None).await?;
        Ok(())
    }

    /// With a 4-byte chunk size the 9-byte payload is streamed as 4 + 4 + 1 bytes,
    /// in order, and the stream then ends.
    #[tokio::test]
    async fn open_download_stream_chunk_size() -> Result<(), GridFSError> {
        let db = fresh_db().await?;
        let bucket = GridFSBucket::new(
            db.clone(),
            Some(GridFSBucketOptions::builder().chunk_size_bytes(4).build()),
        );
        let id = bucket
            .clone()
            .upload_from_stream("test.txt", "test data".as_bytes(), None)
            .await?;

        let mut cursor = bucket.open_download_stream(id).await?;
        let buffer = cursor.next().await.unwrap();
        assert_eq!(buffer, [116, 101, 115, 116]);

        let buffer = cursor.next().await.unwrap();
        assert_eq!(buffer, [32, 100, 97, 116]);

        let buffer = cursor.next().await.unwrap();
        assert_eq!(buffer, [97]);

        let buffer = cursor.next().await;
        assert_eq!(buffer, None);

        db.drop(None).await?;
        Ok(())
    }

    /// Asking for an id that was never uploaded reports `FileNotFound`.
    #[tokio::test]
    async fn open_download_stream_not_existing_file() -> Result<(), GridFSError> {
        let db = fresh_db().await?;
        let bucket = GridFSBucket::new(db.clone(), Some(GridFSBucketOptions::default()));
        let id = ObjectId::new();

        let cursor = bucket.open_download_stream(id).await;
        // Check the specific variant, not merely "some error".
        assert!(matches!(cursor, Err(GridFSError::FileNotFound())));

        db.drop(None).await?;
        Ok(())
    }
}
268}