use crate::bucket::GridFSBucket;
use crate::options::GridFSUploadOptions;
use bson::{doc, oid::ObjectId, DateTime, Document};
#[cfg(feature = "async-std-runtime")]
use futures::io::{AsyncRead, AsyncReadExt};
use md5::{Digest, Md5};
use mongodb::{
    error::Error,
    options::{FindOneOptions, InsertOneOptions, UpdateOptions},
    Collection,
};
#[cfg(any(feature = "default", feature = "tokio-runtime"))]
use tokio::io::{AsyncRead, AsyncReadExt};

impl GridFSBucket {
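    /// Creates the `{ filename: 1, uploadDate: 1 }` index on the files
    /// collection, named after the collection with an `_index` suffix.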
    async fn create_files_index(&self, collection_name: &str) -> Result<Document, Error> {
        self.db
            .run_command(
                doc! {
                    "createIndexes": collection_name,
                    "indexes": [{
                        "key": {
                            "filename": 1,
                            "uploadDate": 1.0
                        },
                        "name": collection_name.to_owned() + "_index",
                    }]
                },
                None,
            )
            .await
    }

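    /// Creates the `{ files_id: 1, n: 1 }` index on the chunks collection,
    /// named after the collection with an `_index` suffix.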
    async fn create_chunks_index(&self, collection_name: &str) -> Result<Document, Error> {
        self.db
            .run_command(
                doc! {
                    "createIndexes": collection_name,
                    "indexes": [{
                        "key": {
                            "files_id": 1,
                            "n": 1
                        },
                        "name": collection_name.to_owned() + "_index",
                    }]
                },
                None,
            )
            .await
    }

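    /// Checks, once per bucket (guarded by `never_write`), that the files and
    /// chunks collections exist and carry the indexes required by the GridFS
    /// spec, creating them only while the files collection is still empty.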
    async fn ensure_file_index(
        &mut self,
        files: &Collection<Document>,
        file_collection: &str,
        chunk_collection: &str,
    ) -> Result<(), Error> {
        if self.never_write {
            // Per the GridFS spec, indexes are only created while the files
            // collection is empty.
            if let Ok(None) = files
                .find_one(
                    doc! {},
                    FindOneOptions::builder()
                        .projection(doc! { "_id": 1 })
                        .build(),
                )
                .await
            {
                // Files collection.
                {
                    let existing_collections = self
                        .db
                        .list_collection_names(doc! { "name": file_collection })
                        .await?;
                    if existing_collections.is_empty() {
                        self.db.create_collection(file_collection, None).await?;
                    }

                    let indexes = self
                        .db
                        .run_command(doc! { "listIndexes": file_collection }, None)
                        .await?;
                    let mut have_index = false;
                    for index in indexes
                        .get_document("cursor")
                        .unwrap()
                        .get_array("firstBatch")
                        .unwrap()
                    {
                        let key = index.as_document().unwrap().get_document("key").unwrap();
                        // The server may report index key values as i32 or
                        // f64; accept either encoding.
                        let filename = key.get_i32("filename");
                        let upload_date = key.get_i32("uploadDate");
                        let filename_f = key.get_f64("filename");
                        let upload_date_f = key.get_f64("uploadDate");

                        match (filename, upload_date, filename_f, upload_date_f) {
                            (Ok(1), Ok(1), _, _) => {
                                have_index = true;
                            }
                            (_, _, Ok(x), Ok(y))
                                if (x - 1.0).abs() < 0.0001 && (y - 1.0).abs() < 0.0001 =>
                            {
                                have_index = true;
                            }
                            (Ok(1), _, _, Ok(x)) if (x - 1.0).abs() < 0.0001 => {
                                have_index = true;
                            }
                            (_, Ok(1), Ok(x), _) if (x - 1.0).abs() < 0.0001 => {
                                have_index = true;
                            }
                            _ => {}
                        }
                    }
                    if !have_index {
                        self.create_files_index(file_collection).await?;
                    }
                }
                // Chunks collection.
                {
                    let existing_collections = self
                        .db
                        .list_collection_names(doc! { "name": chunk_collection })
                        .await?;
                    if existing_collections.is_empty() {
                        self.db.create_collection(chunk_collection, None).await?;
                    }

                    let indexes = self
                        .db
                        .run_command(doc! { "listIndexes": chunk_collection }, None)
                        .await?;
                    let mut have_index = false;
                    for index in indexes
                        .get_document("cursor")
                        .unwrap()
                        .get_array("firstBatch")
                        .unwrap()
                    {
                        let key = index.as_document().unwrap().get_document("key").unwrap();
                        let files_id = key.get_i32("files_id");
                        let n = key.get_i32("n");
                        let files_id_f = key.get_f64("files_id");
                        let n_f = key.get_f64("n");

                        match (files_id, n, files_id_f, n_f) {
                            (Ok(1), Ok(1), _, _) => {
                                have_index = true;
                            }
                            (_, _, Ok(x), Ok(y))
                                if (x - 1.0).abs() < 0.0001 && (y - 1.0).abs() < 0.0001 =>
                            {
                                have_index = true;
                            }
                            (Ok(1), _, _, Ok(x)) if (x - 1.0).abs() < 0.0001 => {
                                have_index = true;
                            }
                            (_, Ok(1), Ok(x), _) if (x - 1.0).abs() < 0.0001 => {
                                have_index = true;
                            }
                            _ => {}
                        }
                    }
                    if !have_index {
                        self.create_chunks_index(chunk_collection).await?;
                    }
                }
            }
            self.never_write = false;
        }
        Ok(())
    }

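    /// Uploads `source` to the bucket as `filename`, splitting it into chunks
    /// of `chunk_size_bytes` and returning the `ObjectId` of the new files
    /// document. The total length, upload date and (unless `disable_md5` is
    /// set) an MD5 digest are written to the files document once all chunks
    /// are stored.
    ///
    /// A minimal usage sketch (not from the original docs; it assumes
    /// `GridFSBucket` and `options::GridFSBucketOptions` are reachable at the
    /// crate root as `mongodb_gridfs`, and a MongoDB deployment listening on
    /// `localhost:27017`):
    ///
    /// ```no_run
    /// use mongodb::Client;
    /// use mongodb_gridfs::{options::GridFSBucketOptions, GridFSBucket};
    ///
    /// # async fn demo() -> Result<(), mongodb::error::Error> {
    /// let client = Client::with_uri_str("mongodb://localhost:27017/").await?;
    /// let db = client.database("demo");
    /// let mut bucket = GridFSBucket::new(db, Some(GridFSBucketOptions::default()));
    /// // A `&[u8]` implements `AsyncRead`, so any in-memory buffer works.
    /// let id = bucket
    ///     .upload_from_stream("hello.txt", "hello world".as_bytes(), None)
    ///     .await?;
    /// println!("stored as {}", id.to_hex());
    /// # Ok(())
    /// # }
    /// ```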
    pub async fn upload_from_stream(
        &mut self,
        filename: &str,
        mut source: impl AsyncRead + Unpin,
        options: Option<GridFSUploadOptions>,
    ) -> Result<ObjectId, Error> {
        let dboptions = self.options.clone().unwrap_or_default();
        let mut chunk_size: u32 = dboptions.chunk_size_bytes;
        let bucket_name = dboptions.bucket_name;
        let file_collection = bucket_name.clone() + ".files";
        let disable_md5 = dboptions.disable_md5;
        let chunk_collection = bucket_name + ".chunks";
        let mut progress_tick = None;
        // Per-upload options override the bucket defaults.
        if let Some(options) = options.clone() {
            if let Some(chunk_size_bytes) = options.chunk_size_bytes {
                chunk_size = chunk_size_bytes;
            }
            progress_tick = options.progress_tick;
        }
        let files = self.db.collection(&file_collection);

        self.ensure_file_index(&files, &file_collection, &chunk_collection)
            .await?;

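        // The files document is inserted first with only filename/chunkSize;
        // length, uploadDate and md5 are filled in by an update after all
        // chunks have been written.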
        let mut file_document = doc! { "filename": filename, "chunkSize": chunk_size };
        if let Some(options) = options {
            if let Some(metadata) = options.metadata {
                file_document.insert("metadata", metadata);
            }
        }
        let mut insert_option = InsertOneOptions::default();
        if let Some(write_concern) = dboptions.write_concern.clone() {
            insert_option.write_concern = Some(write_concern);
        }
        let insert_file_result = files
            .insert_one(file_document, Some(insert_option.clone()))
            .await?;

        let files_id = insert_file_result.inserted_id.as_object_id().unwrap();

        let mut md5 = Md5::default();
        let chunks = self.db.collection(&chunk_collection);
        let mut vecbuf: Vec<u8> = vec![0; chunk_size as usize];
        let mut length: usize = 0;
        let mut n: u32 = 0;
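        // Fill the buffer completely before writing each chunk: a single
        // read() may return fewer bytes than requested, so keep reading until
        // the chunk is full or the stream is exhausted.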
        loop {
            let chunk_read_size = {
                let mut chunk_read_size = 0;
                loop {
                    let buffer = &mut vecbuf[chunk_read_size..];
                    let step_read_size = source.read(buffer).await?;
                    if step_read_size == 0 {
                        // Either the chunk buffer is full or the stream hit EOF.
                        break;
                    }
                    chunk_read_size += step_read_size;
                }
                if chunk_read_size == 0 {
                    break;
                }
                chunk_read_size
            };
            let bin: Vec<u8> = Vec::from(&vecbuf[..chunk_read_size]);
            md5.update(&bin);
            chunks
                .insert_one(
                    doc! {
                        "files_id": files_id,
                        "n": n,
                        "data": bson::Binary {
                            subtype: bson::spec::BinarySubtype::Generic,
                            bytes: bin,
                        },
                    },
                    Some(insert_option.clone()),
                )
                .await?;
            length += chunk_read_size;
            n += 1;
            if let Some(ref progress_tick) = progress_tick {
                progress_tick.update(length);
            }
        }

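        // All chunks are written; finalize the files document with the total
        // length, the upload date and, unless disabled, the MD5 digest.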
        let mut update = doc! { "length": length as i64, "uploadDate": DateTime::now() };
        if !disable_md5 {
            update.insert("md5", format!("{:02x}", md5.finalize()));
        }
        let mut update_option = UpdateOptions::default();
        if let Some(write_concern) = dboptions.write_concern {
            update_option.write_concern = Some(write_concern);
        }
        files
            .update_one(
                doc! { "_id": files_id },
                doc! { "$set": update },
                Some(update_option),
            )
            .await?;

        Ok(files_id)
    }
}

#[cfg(test)]
mod tests {
    use super::GridFSBucket;
    use crate::options::GridFSBucketOptions;
    use bson::{doc, Document};
    #[cfg(feature = "async-std-runtime")]
    use futures::StreamExt;
    use mongodb::{error::Error, Client, Database};
    #[cfg(any(feature = "default", feature = "tokio-runtime"))]
    use std::io::Write;
    #[cfg(any(feature = "default", feature = "tokio-runtime"))]
    use tokio_stream::StreamExt;
    use uuid::Uuid;

    /// Returns a unique database name so tests can run concurrently.
    fn db_name_new() -> String {
        "test_".to_owned()
            + Uuid::new_v4()
                .hyphenated()
                .encode_lower(&mut Uuid::encode_buffer())
    }

    #[cfg(any(feature = "default", feature = "tokio-runtime"))]
    fn generate_large_text(size: usize) -> Vec<u8> {
        let mut buffer = Vec::with_capacity(size);
        for i in 0..size {
            buffer.push((i % 8) as u8);
        }
        buffer
    }

    #[tokio::test]
    async fn upload_from_stream() -> Result<(), Error> {
        let client = Client::with_uri_str(
            &std::env::var("MONGO_URI").unwrap_or("mongodb://localhost:27017/".to_string()),
        )
        .await?;
        let dbname = db_name_new();
        let db: Database = client.database(&dbname);
        let mut bucket = GridFSBucket::new(db.clone(), Some(GridFSBucketOptions::default()));
        let id = bucket
            .upload_from_stream("test.txt", "test data".as_bytes(), None)
            .await?;

        // A valid ObjectId always serializes to 24 hex characters.
        assert_eq!(id.to_hex().len(), 24);
        let file = db
            .collection::<Document>("fs.files")
            .find_one(doc! { "_id": id }, None)
            .await?
            .unwrap();
        assert_eq!(file.get_str("filename").unwrap(), "test.txt");
        assert_eq!(file.get_i32("chunkSize").unwrap(), 261120);
        assert_eq!(file.get_i64("length").unwrap(), 9);
        assert_eq!(
            file.get_str("md5").unwrap(),
            "eb733a00c0c9d336e65691a37ab54293"
        );

        let chunks: Vec<Result<Document, Error>> = db
            .collection::<Document>("fs.chunks")
            .find(doc! { "files_id": id }, None)
            .await?
            .collect()
            .await;

        assert_eq!(chunks[0].as_ref().unwrap().get_i32("n").unwrap(), 0);
        assert_eq!(
            chunks[0]
                .as_ref()
                .unwrap()
                .get_binary_generic("data")
                .unwrap(),
            b"test data"
        );

        db.drop(None).await
    }

    #[tokio::test]
    async fn upload_from_stream_chunk_size() -> Result<(), Error> {
        let client = Client::with_uri_str(
            &std::env::var("MONGO_URI").unwrap_or("mongodb://localhost:27017/".to_string()),
        )
        .await?;
        let dbname = db_name_new();
        let db: Database = client.database(&dbname);
        let mut bucket = GridFSBucket::new(
            db.clone(),
            Some(GridFSBucketOptions::builder().chunk_size_bytes(8).build()),
        );
        let id = bucket
            .upload_from_stream("test.txt", "test data 1234567890".as_bytes(), None)
            .await?;

        // A valid ObjectId always serializes to 24 hex characters.
        assert_eq!(id.to_hex().len(), 24);
        let file = db
            .collection::<Document>("fs.files")
            .find_one(doc! { "_id": id }, None)
            .await?
            .unwrap();
        assert_eq!(file.get_str("filename").unwrap(), "test.txt");
        assert_eq!(file.get_i32("chunkSize").unwrap(), 8);
        assert_eq!(file.get_i64("length").unwrap(), 20);
        assert_eq!(
            file.get_str("md5").unwrap(),
            "5e75d6271a7cfc3d9b79116be261eb21"
        );

        let chunks: Vec<Result<Document, Error>> = db
            .collection::<Document>("fs.chunks")
            .find(doc! { "files_id": id }, None)
            .await?
            .collect()
            .await;

        // The 20-byte payload is split into 8 + 8 + 4 bytes.
        assert_eq!(chunks[0].as_ref().unwrap().get_i32("n").unwrap(), 0);
        assert_eq!(
            chunks[0]
                .as_ref()
                .unwrap()
                .get_binary_generic("data")
                .unwrap(),
            b"test dat"
        );

        assert_eq!(chunks[1].as_ref().unwrap().get_i32("n").unwrap(), 1);
        assert_eq!(
            chunks[1]
                .as_ref()
                .unwrap()
                .get_binary_generic("data")
                .unwrap(),
            b"a 123456"
        );

        assert_eq!(chunks[2].as_ref().unwrap().get_i32("n").unwrap(), 2);
        assert_eq!(
            chunks[2]
                .as_ref()
                .unwrap()
                .get_binary_generic("data")
                .unwrap(),
            b"7890"
        );

        db.drop(None).await
    }

    #[cfg(any(feature = "default", feature = "tokio-runtime"))]
    #[tokio::test]
    async fn upload_from_stream_chunk_size_from_tokio_file() -> Result<(), Error> {
        let client = Client::with_uri_str(
            &std::env::var("MONGO_URI").unwrap_or("mongodb://localhost:27017/".to_string()),
        )
        .await?;
        let chunk_size: usize = 255 * 1024;
        let text_len: usize = 400 * 1024;
        let dbname = db_name_new();
        let db: Database = client.database(&dbname);
        let mut bucket = GridFSBucket::new(
            db.clone(),
            Some(
                GridFSBucketOptions::builder()
                    .chunk_size_bytes(chunk_size as u32)
                    .build(),
            ),
        );
        let large_text: Vec<u8> = generate_large_text(text_len);
        let mut file = tempfile::NamedTempFile::new().unwrap();
        file.write_all(large_text.as_slice()).unwrap();
        file.flush().unwrap();

        let async_file = tokio::fs::File::open(file.path()).await?;
        let id = bucket
            .upload_from_stream("test.txt", async_file, None)
            .await?;

        // A valid ObjectId always serializes to 24 hex characters.
        assert_eq!(id.to_hex().len(), 24);
        let file = db
            .collection::<Document>("fs.files")
            .find_one(doc! { "_id": id }, None)
            .await?
            .unwrap();
        assert_eq!(file.get_str("filename").unwrap(), "test.txt");
        assert_eq!(file.get_i32("chunkSize").unwrap(), chunk_size as i32);
        assert_eq!(file.get_i64("length").unwrap(), text_len as i64);
        assert_eq!(
            file.get_str("md5").unwrap(),
            "324fc7dfb78e561b7dd47e9503bc2dfa"
        );

        let chunks: Vec<Result<Document, Error>> = db
            .collection::<Document>("fs.chunks")
            .find(doc! { "files_id": id }, None)
            .await?
            .collect()
            .await;

        let n_chunks: usize = chunks.len();
        assert_eq!(n_chunks, (text_len - 1) / chunk_size + 1);

        // Each stored chunk must match the corresponding slice of the input.
        for (i, chunk) in chunks.iter().enumerate() {
            let chunk = chunk.as_ref().unwrap();
            assert_eq!(chunk.get_i32("n").unwrap(), i as i32);

            let data = chunk.get_binary_generic("data").unwrap();
            let start = i * chunk_size;
            let end = start + std::cmp::min(data.len(), chunk_size);
            assert_eq!(data, &large_text[start..end]);
        }

        db.drop(None).await
    }

    #[cfg(any(feature = "default", feature = "tokio-runtime"))]
    #[tokio::test]
    async fn upload_from_stream_chunk_size_from_align_tokio_file() -> Result<(), Error> {
        let client = Client::with_uri_str(
            &std::env::var("MONGO_URI").unwrap_or("mongodb://localhost:27017/".to_string()),
        )
        .await?;
        // The input length is an exact multiple of the chunk size.
        let chunk_size: usize = 255 * 1024;
        let text_len: usize = 255 * 1024;
        let dbname = db_name_new();
        let db: Database = client.database(&dbname);
        let mut bucket = GridFSBucket::new(
            db.clone(),
            Some(
                GridFSBucketOptions::builder()
                    .chunk_size_bytes(chunk_size as u32)
                    .build(),
            ),
        );
        let large_text: Vec<u8> = generate_large_text(text_len);
        let mut file = tempfile::NamedTempFile::new().unwrap();
        file.write_all(large_text.as_slice()).unwrap();
        file.flush().unwrap();

        let async_file = tokio::fs::File::open(file.path()).await?;
        let id = bucket
            .upload_from_stream("test.txt", async_file, None)
            .await?;

        // A valid ObjectId always serializes to 24 hex characters.
        assert_eq!(id.to_hex().len(), 24);
        let file = db
            .collection::<Document>("fs.files")
            .find_one(doc! { "_id": id }, None)
            .await?
            .unwrap();
        assert_eq!(file.get_str("filename").unwrap(), "test.txt");
        assert_eq!(file.get_i32("chunkSize").unwrap(), chunk_size as i32);
        assert_eq!(file.get_i64("length").unwrap(), text_len as i64);
        assert_eq!(
            file.get_str("md5").unwrap(),
            "62f27a894992f3b7f532631a2aafcdc4"
        );

        let chunks: Vec<Result<Document, Error>> = db
            .collection::<Document>("fs.chunks")
            .find(doc! { "files_id": id }, None)
            .await?
            .collect()
            .await;

        let n_chunks: usize = chunks.len();
        assert_eq!(n_chunks, (text_len - 1) / chunk_size + 1);

        // Each stored chunk must match the corresponding slice of the input.
        for (i, chunk) in chunks.iter().enumerate() {
            let chunk = chunk.as_ref().unwrap();
            assert_eq!(chunk.get_i32("n").unwrap(), i as i32);

            let data = chunk.get_binary_generic("data").unwrap();
            let start = i * chunk_size;
            let end = start + std::cmp::min(data.len(), chunk_size);
            assert_eq!(data, &large_text[start..end]);
        }

        db.drop(None).await
    }

    #[tokio::test]
    async fn ensure_files_index_before_write() -> Result<(), Error> {
        let client = Client::with_uri_str(
            &std::env::var("MONGO_URI").unwrap_or("mongodb://localhost:27017/".to_string()),
        )
        .await?;
        let dbname = db_name_new();
        let db: Database = client.database(&dbname);
        let mut bucket = GridFSBucket::new(db.clone(), Some(GridFSBucketOptions::default()));

        let indexes = db
            .run_command(doc! { "listIndexes": "fs.files" }, None)
            .await
            .ok();

        assert_eq!(indexes, None, "no index expected before the first write");

        bucket
            .upload_from_stream("test.txt", "test data".as_bytes(), None)
            .await?;

        let indexes = db
            .run_command(doc! { "listIndexes": "fs.files" }, None)
            .await?;

        let mut have_index = false;
        for index in indexes
            .get_document("cursor")
            .unwrap()
            .get_array("firstBatch")
            .unwrap()
        {
            let key = index.as_document().unwrap().get_document("key").unwrap();
            let filename = key.get_i32("filename");
            let upload_date = key.get_i32("uploadDate");
            let filename_f = key.get_f64("filename");
            let upload_date_f = key.get_f64("uploadDate");

            match (filename, upload_date, filename_f, upload_date_f) {
                (Ok(1), Ok(1), _, _) => {
                    have_index = true;
                }
                (_, _, Ok(x), Ok(y)) if x == 1.0 && y == 1.0 => {
                    have_index = true;
                }
                (Ok(1), _, _, Ok(x)) if x == 1.0 => {
                    have_index = true;
                }
                (_, Ok(1), Ok(x), _) if x == 1.0 => {
                    have_index = true;
                }
                _ => {}
            }
        }

        assert!(have_index, "should have found a files index");

        db.drop(None).await
    }

    #[tokio::test]
    async fn ensure_chunks_index_before_write() -> Result<(), Error> {
        let client = Client::with_uri_str(
            &std::env::var("MONGO_URI").unwrap_or("mongodb://localhost:27017/".to_string()),
        )
        .await?;
        let dbname = db_name_new();
        let db: Database = client.database(&dbname);
        let mut bucket = GridFSBucket::new(db.clone(), Some(GridFSBucketOptions::default()));

        let indexes = db
            .run_command(doc! { "listIndexes": "fs.chunks" }, None)
            .await
            .ok();

        assert_eq!(indexes, None, "no index expected before the first write");

        bucket
            .upload_from_stream("test.txt", "test data".as_bytes(), None)
            .await?;

        let indexes = db
            .run_command(doc! { "listIndexes": "fs.chunks" }, None)
            .await?;

        let mut have_chunks_index = false;
        for index in indexes
            .get_document("cursor")
            .unwrap()
            .get_array("firstBatch")
            .unwrap()
        {
            let key = index.as_document().unwrap().get_document("key").unwrap();

            let files_id = key.get_i32("files_id");
            let n = key.get_i32("n");
            let files_id_f = key.get_f64("files_id");
            let n_f = key.get_f64("n");

            match (files_id, n, files_id_f, n_f) {
                (Ok(1), Ok(1), _, _) => {
                    have_chunks_index = true;
                }
                (_, _, Ok(x), Ok(y)) if x == 1.0 && y == 1.0 => {
                    have_chunks_index = true;
                }
                (Ok(1), _, _, Ok(x)) if x == 1.0 => {
                    have_chunks_index = true;
                }
                (_, Ok(1), Ok(x), _) if x == 1.0 => {
                    have_chunks_index = true;
                }
                _ => {}
            }
        }
        assert!(have_chunks_index, "should have found a chunks index");
        db.drop(None).await
    }
}