1use async_trait::async_trait;
2use dco3_crypto::{DracoonCrypto, DracoonRSACrypto};
3use reqwest::StatusCode;
4use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
5use tracing::{error, warn};
6
7use crate::{
8 constants::{
9 DEFAULT_UPLOAD_CHUNK_SIZE, DRACOON_API_PREFIX, FILES_S3_COMPLETE, FILES_S3_URLS,
10 POLLING_START_DELAY, PUBLIC_BASE, PUBLIC_SHARES_BASE, PUBLIC_UPLOAD_SHARES,
11 },
12 nodes::{
13 models::StreamingEncryptedUpload,
14 upload::{calculate_s3_url_count, StreamUploadInternal},
15 CloneableUploadProgressCallback, GeneratePresignedUrlsRequest, PresignedUrlList,
16 S3FileUploadPart, S3UploadStatus, UploadOptions, UploadProgressCallback,
17 },
18 utils::{build_s3_protocol_error, FromResponse},
19 DracoonClientError, Public,
20};
21
22use super::{
23 CompleteS3ShareUploadRequest, CreateShareUploadChannelRequest,
24 CreateShareUploadChannelResponse, FileName, PublicEndpoint, PublicUpload, PublicUploadShare,
25 PublicUploadedFileData, S3ShareUploadStatus, UserFileKey, UserFileKeyList,
26};
27
28fn missing_presigned_url_error() -> DracoonClientError {
29 build_s3_protocol_error(
30 StatusCode::BAD_GATEWAY,
31 "missing_presigned_url",
32 "Presigned URL response contained no URLs",
33 )
34}
35
36fn missing_upload_error_details_error() -> DracoonClientError {
37 build_s3_protocol_error(
38 StatusCode::BAD_GATEWAY,
39 "missing_upload_error_details",
40 "Upload status 'error' did not include error details",
41 )
42}
43
44#[async_trait]
45impl<S: Send + Sync, R: AsyncRead + Send + Sync + Unpin + 'static> PublicUpload<R>
46 for PublicEndpoint<S>
47{
48 async fn upload<'r>(
49 &'r self,
50 access_key: impl Into<String> + Send + Sync,
51 share: PublicUploadShare,
52 upload_options: UploadOptions,
53 reader: BufReader<R>,
54 callback: Option<UploadProgressCallback>,
55 chunk_size: Option<usize>,
56 ) -> Result<FileName, DracoonClientError> {
57 let use_s3_storage = self.get_system_info().await?.use_s3_storage;
58 let is_encrypted = share.is_encrypted.unwrap_or(false);
59
60 let upload_fn = match (use_s3_storage, is_encrypted) {
61 (true, true) => PublicUploadInternal::upload_to_s3_encrypted,
62 (true, false) => PublicUploadInternal::upload_to_s3_unencrypted,
63 (false, true) => PublicUploadInternalNfs::upload_to_nfs_encrypted,
64 (false, false) => PublicUploadInternalNfs::upload_to_nfs_unencrypted,
65 };
66
67 upload_fn(
68 self,
69 access_key.into(),
70 &share,
71 upload_options,
72 reader,
73 callback,
74 chunk_size,
75 )
76 .await
77 }
78}
79
// Empty impl body: `PublicEndpoint` overrides nothing here, so every item of
// `StreamUploadInternal` it uses comes from the trait's provided defaults.
// NOTE(review): `upload_stream_to_s3` / `upload_stream_to_nfs` called in this file are presumably
// among those provided methods — confirm against the trait definition.
impl<S> StreamUploadInternal<S> for PublicEndpoint<S> {}
81
82#[async_trait]
83impl<S: Send + Sync, R: AsyncRead + Send + Sync + Unpin + 'static> PublicUploadInternal<R, S>
84 for PublicEndpoint<S>
85{
86 async fn create_upload_channel(
87 &self,
88 access_key: String,
89 create_file_upload_req: CreateShareUploadChannelRequest,
90 ) -> Result<CreateShareUploadChannelResponse, DracoonClientError> {
91 let url_part = format!(
92 "{DRACOON_API_PREFIX}/{PUBLIC_BASE}/{PUBLIC_SHARES_BASE}/{PUBLIC_UPLOAD_SHARES}/{}",
93 access_key
94 );
95
96 let url = self.client().build_api_url(&url_part);
97
98 let response = self
99 .client()
100 .http
101 .post(url)
102 .json(&create_file_upload_req)
103 .send()
104 .await?;
105
106 CreateShareUploadChannelResponse::from_response(response).await
107 }
108
109 async fn create_s3_upload_urls(
110 &self,
111 access_key: String,
112 upload_id: String,
113 generate_urls_req: GeneratePresignedUrlsRequest,
114 ) -> Result<PresignedUrlList, DracoonClientError> {
115 let url_part = format!(
116 "{DRACOON_API_PREFIX}/{PUBLIC_BASE}/{PUBLIC_SHARES_BASE}/{PUBLIC_UPLOAD_SHARES}/{}/{}/{FILES_S3_URLS}",
117 access_key, upload_id
118 );
119
120 let url = self.client().build_api_url(&url_part);
121
122 let response = self
123 .client()
124 .http
125 .post(url)
126 .json(&generate_urls_req)
127 .send()
128 .await?;
129
130 PresignedUrlList::from_response(response).await
131 }
132
133 async fn upload_to_s3_unencrypted(
134 &self,
135 access_key: String,
136 share: &PublicUploadShare,
137 upload_options: UploadOptions,
138 mut reader: BufReader<R>,
139 callback: Option<UploadProgressCallback>,
140 chunk_size: Option<usize>,
141 ) -> Result<FileName, DracoonClientError> {
142 let fm = upload_options.file_meta.clone();
143
144 let chunk_size = chunk_size.unwrap_or(DEFAULT_UPLOAD_CHUNK_SIZE);
145
146 let file_upload_req =
148 CreateShareUploadChannelRequest::from_upload_options(&upload_options, Some(true), None);
149
150 let upload_channel =
151 <PublicEndpoint<S> as PublicUploadInternal<R, S>>::create_upload_channel(
152 self,
153 access_key.clone(),
154 file_upload_req,
155 )
156 .await?;
157
158 let mut s3_parts = Vec::new();
159
160 let (count_urls, last_chunk_size) = calculate_s3_url_count(fm.size, chunk_size as u64);
161 let mut url_part: u32 = 1;
162
163 let cloneable_callback = callback.map(CloneableUploadProgressCallback::new);
164
165 if count_urls > 1 {
166 while url_part < count_urls {
167 let mut buffer = vec![0; chunk_size];
168 let cb = cloneable_callback.clone();
169 let fm = fm.clone();
170
171 match reader.read_exact(&mut buffer).await {
172 Ok(0) => break,
173 Ok(n) => {
174 buffer.truncate(n);
175 let chunk = bytes::Bytes::from(buffer);
176
177 let stream: async_stream::__private::AsyncStream<
178 Result<bytes::Bytes, std::io::Error>,
179 _,
180 > = async_stream::stream! {
181 let mut buffer = Vec::new();
182 let mut bytes_read = 0;
183
184 for byte in chunk.iter() {
185 buffer.push(*byte);
186 bytes_read += 1;
187 if buffer.len() == 1024 || bytes_read == chunk.len() {
188 if let Some(callback) = cb.clone() {
189 callback.call(buffer.len() as u64, fm.size);
190 }
191 yield Ok(bytes::Bytes::from(buffer.clone()));
192 buffer.clear();
193 }
194 }
195 };
196
197 let url_req = GeneratePresignedUrlsRequest::new(
198 n.try_into().map_err(|_| DracoonClientError::IoError)?,
199 url_part,
200 url_part,
201 );
202 let url =
203 <PublicEndpoint<S> as PublicUploadInternal<R, S>>::
204 create_s3_upload_urls(self, access_key.clone(), upload_channel.upload_id.clone(), url_req)
205 .await?;
206 let url = url.urls.first().ok_or_else(missing_presigned_url_error)?;
207
208 #[allow(clippy::cast_possible_truncation, clippy::cast_lossless)]
210 let curr_pos: u64 = ((url_part - 1) * (chunk_size as u32)) as u64;
211
212 let e_tag = self
213 .upload_stream_to_s3(
214 Box::pin(stream),
215 url,
216 chunk_size
217 .try_into()
218 .map_err(|_| DracoonClientError::IoError)?,
219 )
220 .await?;
221
222 s3_parts.push(S3FileUploadPart::new(url_part, e_tag));
223 url_part += 1;
224 }
225 Err(err) => {
226 error!("Error reading file: {}", err);
227 return Err(DracoonClientError::IoError);
228 }
229 }
230 }
231 }
232
233 let mut buffer = vec![
235 0;
236 last_chunk_size
237 .try_into()
238 .map_err(|_| DracoonClientError::IoError)?
239 ];
240 let cb = cloneable_callback.clone();
241 match reader.read_exact(&mut buffer).await {
242 Ok(n) => {
243 buffer.truncate(n);
244 let chunk = bytes::Bytes::from(buffer);
245 let stream: async_stream::__private::AsyncStream<
246 Result<bytes::Bytes, std::io::Error>,
247 _,
248 > = async_stream::stream! {
249 let mut buffer = Vec::new();
250 let mut bytes_read = 0;
251
252 for byte in chunk.iter() {
253 buffer.push(*byte);
254 bytes_read += 1;
255 if buffer.len() == 1024 || bytes_read == chunk.len() {
256 if let Some(callback) = cb.clone() {
257 callback.call(buffer.len() as u64, fm.size);
258 }
259 yield Ok(bytes::Bytes::from(buffer.clone()));
260 buffer.clear();
261 }
262 }
263
264 };
265
266 let url_req = GeneratePresignedUrlsRequest::new(
267 n.try_into().map_err(|_| DracoonClientError::IoError)?,
268 url_part,
269 url_part,
270 );
271 let url = <PublicEndpoint<S> as PublicUploadInternal<R, S>>::create_s3_upload_urls(
272 self,
273 access_key.clone(),
274 upload_channel.upload_id.clone(),
275 url_req,
276 )
277 .await?;
278
279 let url = url.urls.first().ok_or_else(missing_presigned_url_error)?;
280
281 let curr_pos: u64 = (url_part - 1) as u64 * (DEFAULT_UPLOAD_CHUNK_SIZE as u64);
282
283 let e_tag = self
284 .upload_stream_to_s3(
285 Box::pin(stream),
286 url,
287 n.try_into().map_err(|_| DracoonClientError::IoError)?,
288 )
289 .await?;
290
291 s3_parts.push(S3FileUploadPart::new(url_part, e_tag));
292 }
293 Err(err) => {
294 error!("Error reading file: {}", err);
295 return Err(DracoonClientError::IoError);
296 }
297 }
298
299 let complete_upload_req = CompleteS3ShareUploadRequest::new(s3_parts, None);
301
302 <PublicEndpoint<S> as PublicUploadInternal<R, S>>::finalize_s3_upload(
303 self,
304 access_key.clone(),
305 upload_channel.upload_id.clone(),
306 complete_upload_req,
307 )
308 .await?;
309
310 let mut sleep_duration = POLLING_START_DELAY;
315 loop {
316 let status_response =
317 <PublicEndpoint<S> as PublicUploadInternal<R, S>>::get_upload_status(
318 self,
319 access_key.clone(),
320 upload_channel.upload_id.clone(),
321 )
322 .await?;
323
324 match status_response.status {
325 S3UploadStatus::Done => {
326 return Ok(status_response.file_name);
327 }
328 S3UploadStatus::Error => {
329 let response = status_response
330 .error_details
331 .ok_or_else(missing_upload_error_details_error)?;
332 error!("Error uploading file: {}", response);
333 return Err(DracoonClientError::Http(response));
334 }
335 _ => {
336 tokio::time::sleep(tokio::time::Duration::from_millis(sleep_duration)).await;
337 sleep_duration *= 2;
338 }
339 }
340 }
341 }
342 async fn upload_to_s3_encrypted(
343 &self,
344 access_key: String,
345 share: &PublicUploadShare,
346 upload_options: UploadOptions,
347 reader: BufReader<R>,
348 callback: Option<UploadProgressCallback>,
349 chunk_size: Option<usize>,
350 ) -> Result<FileName, DracoonClientError> {
351 let chunk_size = chunk_size.unwrap_or(DEFAULT_UPLOAD_CHUNK_SIZE);
352 let mut encrypted_upload = StreamingEncryptedUpload::new(reader, chunk_size)?;
353
354 let fm = upload_options.file_meta.clone();
355
356 let file_upload_req =
358 CreateShareUploadChannelRequest::from_upload_options(&upload_options, Some(true), None);
359
360 let upload_channel =
361 <PublicEndpoint<S> as PublicUploadInternal<R, S>>::create_upload_channel(
362 self,
363 access_key.clone(),
364 file_upload_req,
365 )
366 .await
367 .map_err(|err| {
368 error!("Error creating upload channel: {}", err);
369 err
370 })?;
371
372 let mut s3_parts = Vec::new();
373 let mut url_part: u32 = 1;
374
375 let cloneable_callback = callback.map(CloneableUploadProgressCallback::new);
376
377 while let Some(chunk) = encrypted_upload.next_chunk(chunk_size).await? {
378 let cb = cloneable_callback.clone();
379 let fm = fm.clone();
380 let chunk_len = chunk.len();
381 let stream: async_stream::__private::AsyncStream<
382 Result<bytes::Bytes, std::io::Error>,
383 _,
384 > = async_stream::stream! {
385 let mut buffer = Vec::new();
386 let mut bytes_read = 0;
387
388 for byte in chunk.iter() {
389 buffer.push(*byte);
390 bytes_read += 1;
391 if buffer.len() == 1024 || bytes_read == chunk.len() {
392 if let Some(callback) = cb.clone() {
393 callback.call(buffer.len() as u64, fm.size);
394 }
395 yield Ok(bytes::Bytes::from(buffer.clone()));
396 buffer.clear();
397 }
398 }
399 };
400
401 let url_req = GeneratePresignedUrlsRequest::new(
402 chunk_len
403 .try_into()
404 .map_err(|_| DracoonClientError::IoError)?,
405 url_part,
406 url_part,
407 );
408 let url =
409 <PublicEndpoint<S> as PublicUploadInternal<R, S>>::create_s3_upload_urls::<'_, '_>(
410 self,
411 access_key.clone(),
412 upload_channel.upload_id.clone(),
413 url_req,
414 )
415 .await
416 .map_err(|err| {
417 error!("Error creating S3 upload urls: {}", err);
418 err
419 })?;
420 let url = url.urls.first().ok_or_else(missing_presigned_url_error)?;
421
422 let e_tag = self
423 .upload_stream_to_s3(
424 Box::pin(stream),
425 url,
426 chunk_len
427 .try_into()
428 .map_err(|_| DracoonClientError::IoError)?,
429 )
430 .await
431 .map_err(|err| {
432 error!("Error uploading stream to S3: {}", err);
433 err
434 })?;
435
436 s3_parts.push(S3FileUploadPart::new(url_part, e_tag));
437 url_part += 1;
438 }
439
440 let plain_file_key = encrypted_upload.into_plain_file_key()?;
441 let public_keys = share.user_user_public_key_list.clone().unwrap_or_default();
442
443 let mut user_file_keys = Vec::new();
444 for key in public_keys.items {
445 let user_id = key.id;
446 match DracoonCrypto::encrypt_file_key(
447 plain_file_key.clone(),
448 key.public_key_container.clone(),
449 ) {
450 Ok(file_key) => user_file_keys.push(UserFileKey::new(user_id, file_key)),
451 Err(err) => warn!(
452 user_id,
453 access_key = %access_key,
454 file_name = %upload_options.file_meta.name,
455 error = ?err,
456 "Skipping public upload recipient key distribution",
457 ),
458 }
459 }
460
461 let complete_upload_req = CompleteS3ShareUploadRequest::new(s3_parts, Some(user_file_keys));
463
464 <PublicEndpoint<S> as PublicUploadInternal<R, S>>::finalize_s3_upload::<'_, '_>(
465 self,
466 access_key.clone(),
467 upload_channel.upload_id.clone(),
468 complete_upload_req,
469 )
470 .await
471 .map_err(|err| {
472 error!("Error finalizing upload: {}", err);
473 err
474 })?;
475
476 let mut sleep_duration = POLLING_START_DELAY;
481 loop {
482 let status_response =
483 <PublicEndpoint<S> as PublicUploadInternal<R, S>>::get_upload_status(
484 self,
485 access_key.clone(),
486 upload_channel.upload_id.clone(),
487 )
488 .await
489 .map_err(|err| {
490 error!("Error getting upload status: {}", err);
491 err
492 })?;
493
494 match status_response.status {
495 S3UploadStatus::Done => {
496 return Ok(status_response.file_name);
497 }
498 S3UploadStatus::Error => {
499 return Err(DracoonClientError::Http(
500 status_response
501 .error_details
502 .ok_or_else(missing_upload_error_details_error)?,
503 ));
504 }
505 _ => {
506 tokio::time::sleep(tokio::time::Duration::from_millis(sleep_duration)).await;
507 sleep_duration *= 2;
508 }
509 }
510 }
511 }
512
513 async fn finalize_s3_upload(
514 &self,
515 access_key: String,
516 upload_id: String,
517 complete_file_upload_req: CompleteS3ShareUploadRequest,
518 ) -> Result<(), DracoonClientError> {
519 let url_part = format!(
520 "{DRACOON_API_PREFIX}/{PUBLIC_BASE}/{PUBLIC_SHARES_BASE}/{PUBLIC_UPLOAD_SHARES}/{}/{}/{FILES_S3_COMPLETE}",
521 access_key, upload_id
522 );
523
524 let url = self.client().build_api_url(&url_part);
525
526 let response = self
527 .client()
528 .http
529 .put(url)
530 .json(&complete_file_upload_req)
531 .send()
532 .await?;
533
534 if response.status().is_success() {
535 Ok(())
536 } else {
537 Err(DracoonClientError::from_response(response).await?)
538 }
539 }
540
541 async fn get_upload_status(
542 &self,
543 access_key: String,
544 upload_id: String,
545 ) -> Result<S3ShareUploadStatus, DracoonClientError> {
546 let url_part = format!(
547 "{DRACOON_API_PREFIX}/{PUBLIC_BASE}/{PUBLIC_SHARES_BASE}/{PUBLIC_UPLOAD_SHARES}/{}/{}",
548 access_key, upload_id
549 );
550
551 let url = self.client().build_api_url(&url_part);
552
553 let response = self.client().http.get(url).send().await?;
554
555 S3ShareUploadStatus::from_response(response).await
556 }
557}
558
/// Internal building blocks for uploading a file to a public upload share.
///
/// `R` is the reader type wrapped in a `BufReader` by the upload methods; `S` is the type
/// parameter forwarded to [`StreamUploadInternal`].
#[async_trait]
trait PublicUploadInternal<R: AsyncRead, S>: StreamUploadInternal<S> {
    /// Creates an upload channel on the public upload share identified by `access_key`.
    async fn create_upload_channel(
        &self,
        access_key: String,
        create_file_upload_req: CreateShareUploadChannelRequest,
    ) -> Result<CreateShareUploadChannelResponse, DracoonClientError>;

    /// Requests presigned S3 URLs for the upload `upload_id` of the share `access_key`.
    async fn create_s3_upload_urls(
        &self,
        access_key: String,
        upload_id: String,
        generate_urls_req: GeneratePresignedUrlsRequest,
    ) -> Result<PresignedUrlList, DracoonClientError>;

    /// Uploads the content of `reader` without encryption via S3 and returns the file name
    /// reported by the upload status.
    async fn upload_to_s3_unencrypted(
        &self,
        access_key: String,
        share: &PublicUploadShare,
        upload_options: UploadOptions,
        reader: BufReader<R>,
        callback: Option<UploadProgressCallback>,
        chunk_size: Option<usize>,
    ) -> Result<FileName, DracoonClientError>;
    /// Uploads the content of `reader` with encryption via S3 and returns the file name
    /// reported by the upload status.
    async fn upload_to_s3_encrypted(
        &self,
        access_key: String,
        share: &PublicUploadShare,
        upload_options: UploadOptions,
        reader: BufReader<R>,
        callback: Option<UploadProgressCallback>,
        chunk_size: Option<usize>,
    ) -> Result<FileName, DracoonClientError>;

    /// Completes the S3 upload `upload_id` using the data in `complete_file_upload_req`.
    async fn finalize_s3_upload(
        &self,
        access_key: String,
        upload_id: String,
        complete_file_upload_req: CompleteS3ShareUploadRequest,
    ) -> Result<(), DracoonClientError>;

    /// Retrieves the current status of the upload `upload_id`.
    async fn get_upload_status(
        &self,
        access_key: String,
        upload_id: String,
    ) -> Result<S3ShareUploadStatus, DracoonClientError>;
}
606
/// Internal building blocks for uploading a file to a public upload share when the upload is
/// streamed to the channel's upload URL instead of presigned S3 URLs.
///
/// Requires [`PublicUploadInternal`] as a supertrait.
#[async_trait]
trait PublicUploadInternalNfs<R: AsyncRead, S>:
    StreamUploadInternal<S> + PublicUploadInternal<R, S>
{
    /// Uploads the content of `reader` without encryption and returns the name of the
    /// uploaded file.
    async fn upload_to_nfs_unencrypted(
        &self,
        access_key: String,
        share: &PublicUploadShare,
        upload_options: UploadOptions,
        reader: BufReader<R>,
        callback: Option<UploadProgressCallback>,
        chunk_size: Option<usize>,
    ) -> Result<FileName, DracoonClientError>;
    /// Uploads the content of `reader` with encryption and returns the name of the
    /// uploaded file.
    async fn upload_to_nfs_encrypted(
        &self,
        access_key: String,
        share: &PublicUploadShare,
        upload_options: UploadOptions,
        reader: BufReader<R>,
        callback: Option<UploadProgressCallback>,
        chunk_size: Option<usize>,
    ) -> Result<FileName, DracoonClientError>;
    /// Completes the upload `upload_id`, optionally sending `user_file_key_list` as the
    /// request body.
    async fn finalize_nfs_upload(
        &self,
        access_key: String,
        upload_id: String,
        user_file_key_list: Option<UserFileKeyList>,
    ) -> Result<PublicUploadedFileData, DracoonClientError>;
}
636
637#[async_trait]
638impl<R: AsyncRead + Send + Sync + Unpin + 'static, S: Send + Sync> PublicUploadInternalNfs<R, S>
639 for PublicEndpoint<S>
640{
641 async fn upload_to_nfs_unencrypted(
642 &self,
643 access_key: String,
644 share: &PublicUploadShare,
645 upload_options: UploadOptions,
646 mut reader: BufReader<R>,
647 callback: Option<UploadProgressCallback>,
648 chunk_size: Option<usize>,
649 ) -> Result<FileName, DracoonClientError> {
650 let fm = upload_options.file_meta.clone();
651
652 let chunk_size = chunk_size.unwrap_or(DEFAULT_UPLOAD_CHUNK_SIZE);
653
654 let file_upload_req =
656 CreateShareUploadChannelRequest::from_upload_options(&upload_options, None, None);
657
658 let upload_channel =
659 <PublicEndpoint<S> as PublicUploadInternal<R, S>>::create_upload_channel::<'_, '_>(
660 self,
661 access_key.clone(),
662 file_upload_req,
663 )
664 .await
665 .map_err(|err| {
666 error!("Error creating upload channel: {}", err);
667 err
668 })?;
669
670 let (count_chunks, last_chunk_size) = calculate_s3_url_count(fm.size, chunk_size as u64);
671 let mut chunk_part: u32 = 1;
672
673 let cloneable_callback = callback.map(CloneableUploadProgressCallback::new);
674
675 if count_chunks > 1 {
676 while chunk_part < count_chunks {
677 let mut buffer = vec![0; chunk_size];
678 let cb = cloneable_callback.clone();
679 let fm = fm.clone();
680
681 match reader.read_exact(&mut buffer).await {
682 Ok(0) => break,
683 Ok(n) => {
684 buffer.truncate(n);
685 let chunk = bytes::Bytes::from(buffer);
686
687 let stream: async_stream::__private::AsyncStream<
688 Result<bytes::Bytes, std::io::Error>,
689 _,
690 > = async_stream::stream! {
691 let mut buffer = Vec::new();
692 let mut bytes_read = 0;
693
694 for byte in chunk.iter() {
695 buffer.push(*byte);
696 bytes_read += 1;
697 if buffer.len() == 1024 || bytes_read == chunk.len() {
698 if let Some(callback) = cb.clone() {
699 callback.call(buffer.len() as u64, fm.size);
700 }
701 yield Ok(bytes::Bytes::from(buffer.clone()));
702 buffer.clear();
703 }
704 }
705 };
706
707 let url = upload_channel.upload_url.clone();
708
709 #[allow(clippy::cast_possible_truncation, clippy::cast_lossless)]
711 let curr_pos: u64 = ((chunk_part - 1) * (chunk_size as u32)) as u64;
712
713 self.upload_stream_to_nfs(
714 Box::pin(stream),
715 &url,
716 upload_options.file_meta.size,
717 n,
718 Some(curr_pos),
719 )
720 .await?;
721
722 chunk_part += 1;
723 }
724 Err(err) => {
725 error!("Error reading file: {}", err);
726 return Err(DracoonClientError::IoError);
727 }
728 }
729 }
730 }
731
732 let mut buffer = vec![
734 0;
735 last_chunk_size
736 .try_into()
737 .map_err(|_| DracoonClientError::IoError)?
738 ];
739 let cb = cloneable_callback.clone();
740 match reader.read_exact(&mut buffer).await {
741 Ok(n) => {
742 buffer.truncate(n);
743 let chunk = bytes::Bytes::from(buffer);
744 let stream: async_stream::__private::AsyncStream<
745 Result<bytes::Bytes, std::io::Error>,
746 _,
747 > = async_stream::stream! {
748 let mut buffer = Vec::new();
749 let mut bytes_read = 0;
750
751 for byte in chunk.iter() {
752 buffer.push(*byte);
753 bytes_read += 1;
754 if buffer.len() == 1024 || bytes_read == chunk.len() {
755 if let Some(callback) = cb.clone() {
756 callback.call(buffer.len() as u64, fm.size);
757 }
758 yield Ok(bytes::Bytes::from(buffer.clone()));
759 buffer.clear();
760 }
761 }
762
763 };
764
765 let url = upload_channel.upload_url.clone();
766
767 let curr_pos: u64 = (chunk_part - 1) as u64 * (DEFAULT_UPLOAD_CHUNK_SIZE as u64);
768
769 let e_tag = self
770 .upload_stream_to_nfs(
771 Box::pin(stream),
772 &url,
773 upload_options.file_meta.size,
774 n,
775 Some(curr_pos),
776 )
777 .await?;
778 }
779 Err(err) => {
780 error!("Error reading file: {}", err);
781 return Err(DracoonClientError::IoError);
782 }
783 }
784
785 let public_upload =
786 <PublicEndpoint<S> as PublicUploadInternalNfs<R, S>>::finalize_nfs_upload::<'_, '_>(
787 self,
788 access_key.clone(),
789 upload_channel.upload_id.clone(),
790 None,
791 )
792 .await
793 .map_err(|err| {
794 error!("Error finalizing upload: {}", err);
795 err
796 })?;
797
798 Ok(public_upload.name)
799 }
800 async fn upload_to_nfs_encrypted(
801 &self,
802 access_key: String,
803 share: &PublicUploadShare,
804 upload_options: UploadOptions,
805 reader: BufReader<R>,
806 callback: Option<UploadProgressCallback>,
807 chunk_size: Option<usize>,
808 ) -> Result<FileName, DracoonClientError> {
809 let chunk_size = chunk_size.unwrap_or(DEFAULT_UPLOAD_CHUNK_SIZE);
810 let mut encrypted_upload = StreamingEncryptedUpload::new(reader, chunk_size)?;
811
812 let fm = upload_options.file_meta.clone();
813
814 let file_upload_req =
816 CreateShareUploadChannelRequest::from_upload_options(&upload_options, None, None);
817
818 let upload_channel =
819 <PublicEndpoint<S> as PublicUploadInternal<R, S>>::create_upload_channel::<'_, '_>(
820 self,
821 access_key.clone(),
822 file_upload_req,
823 )
824 .await
825 .map_err(|err| {
826 error!("Error creating upload channel: {}", err);
827 err
828 })?;
829
830 let cloneable_callback = callback.map(CloneableUploadProgressCallback::new);
831 let mut curr_pos = 0u64;
832
833 while let Some(chunk) = encrypted_upload.next_chunk(chunk_size).await? {
834 let cb = cloneable_callback.clone();
835 let fm = fm.clone();
836 let chunk_len = chunk.len();
837 let stream: async_stream::__private::AsyncStream<
838 Result<bytes::Bytes, std::io::Error>,
839 _,
840 > = async_stream::stream! {
841 let mut buffer = Vec::new();
842 let mut bytes_read = 0;
843
844 for byte in chunk.iter() {
845 buffer.push(*byte);
846 bytes_read += 1;
847 if buffer.len() == 1024 || bytes_read == chunk.len() {
848 if let Some(callback) = cb.clone() {
849 callback.call(buffer.len() as u64, fm.size);
850 }
851 yield Ok(bytes::Bytes::from(buffer.clone()));
852 buffer.clear();
853 }
854 }
855 };
856
857 let url = upload_channel.upload_url.clone();
858
859 self.upload_stream_to_nfs(
860 Box::pin(stream),
861 &url,
862 upload_options.file_meta.size,
863 chunk_len,
864 Some(curr_pos),
865 )
866 .await
867 .map_err(|err| {
868 error!("Error uploading stream to NFS: {}", err);
869 err
870 })?;
871
872 curr_pos += chunk_len as u64;
873 }
874
875 let plain_file_key = encrypted_upload.into_plain_file_key()?;
876 let public_keys = share.user_user_public_key_list.clone().unwrap_or_default();
877
878 let mut user_file_keys = Vec::new();
879 for key in public_keys.items {
880 let user_id = key.id;
881 match DracoonCrypto::encrypt_file_key(
882 plain_file_key.clone(),
883 key.public_key_container.clone(),
884 ) {
885 Ok(file_key) => user_file_keys.push(UserFileKey::new(user_id, file_key)),
886 Err(err) => warn!(
887 user_id,
888 access_key = %access_key,
889 file_name = %upload_options.file_meta.name,
890 error = ?err,
891 "Skipping public upload recipient key distribution",
892 ),
893 }
894 }
895
896 let user_file_keys = UserFileKeyList::from(user_file_keys);
897
898 let public_upload =
899 <PublicEndpoint<S> as PublicUploadInternalNfs<R, S>>::finalize_nfs_upload::<'_, '_>(
900 self,
901 access_key.clone(),
902 upload_channel.upload_id.clone(),
903 Some(user_file_keys),
904 )
905 .await
906 .map_err(|err| {
907 error!("Error finalizing upload: {}", err);
908 err
909 })?;
910
911 Ok(public_upload.name)
912 }
913 async fn finalize_nfs_upload(
914 &self,
915 access_key: String,
916 upload_id: String,
917 user_file_key_list: Option<UserFileKeyList>,
918 ) -> Result<PublicUploadedFileData, DracoonClientError> {
919 let url_part = format!(
920 "{DRACOON_API_PREFIX}/{PUBLIC_BASE}/{PUBLIC_SHARES_BASE}/{PUBLIC_UPLOAD_SHARES}/{}/{}",
921 access_key, upload_id
922 );
923
924 let url = self.client().build_api_url(&url_part);
925
926 let response = match user_file_key_list {
927 Some(user_file_keys) => {
928 self.client()
929 .http
930 .put(url)
931 .json(&user_file_keys)
932 .send()
933 .await?
934 }
935 None => self.client().http.put(url).send().await?,
936 };
937
938 PublicUploadedFileData::from_response(response).await
939 }
940}
941
942#[cfg(test)]
943mod tests {
944 use std::io::Cursor;
945
946 use dco3_crypto::{DracoonCrypto, DracoonRSACrypto, UserKeyPairVersion};
947 use mockito::Matcher;
948
949 use crate::{
950 nodes::{FileMeta, UploadOptions, UserUserPublicKey},
951 public::UserUserPublicKeyList,
952 Dracoon,
953 };
954
955 use super::*;
956
957 fn public_upload_share(is_encrypted: bool) -> PublicUploadShare {
958 let mut share: PublicUploadShare = serde_json::from_str(include_str!(
959 "../tests/responses/public/upload_share_ok.json"
960 ))
961 .unwrap();
962 share.is_encrypted = Some(is_encrypted);
963 share
964 }
965
966 fn encrypted_public_upload_share() -> PublicUploadShare {
967 let mut share = public_upload_share(true);
968 let keypair = DracoonCrypto::create_plain_user_keypair(UserKeyPairVersion::RSA4096)
969 .expect("public key generation should succeed");
970
971 share.user_user_public_key_list = Some(UserUserPublicKeyList {
972 items: vec![UserUserPublicKey {
973 id: 7,
974 public_key_container: keypair.public_key_container.clone(),
975 }],
976 });
977
978 share
979 }
980
981 fn s3_urls_response(base_url: &str) -> String {
982 include_str!("../tests/responses/upload/s3_urls_ok_with_placeholder.json")
983 .replace("$base_url", base_url)
984 }
985
986 fn upload_status_done(file_name: &str) -> String {
987 format!(r#"{{"status":"done","fileName":"{file_name}"}}"#)
988 }
989
990 fn nfs_upload_channel_response(base_url: &str) -> String {
991 let base_url = base_url.trim_end_matches('/');
992 format!(r#"{{"uploadUrl":"{base_url}/upload_url","uploadId":"string"}}"#)
993 }
994
995 fn uploaded_file_response(name: &str, size: u64) -> String {
996 format!(r#"{{"name":"{name}","size":{size},"createdAt":"2021-01-01T00:00:00.000Z"}}"#)
997 }
998
    /// Runs `upload_to_s3_unencrypted` against a mock server with a 16-byte payload and the
    /// default chunk size, and checks the returned file name and every mocked endpoint.
    #[tokio::test]
    async fn test_upload_to_s3_unencrypted() {
        let mut mock_server = mockito::Server::new_async().await;
        let base_url = mock_server.url();

        let client = Dracoon::builder()
            .with_base_url(base_url.clone())
            .with_client_id("client_id")
            .with_client_secret("client_secret")
            .build()
            .unwrap();
        let public = client.public();

        // 16 bytes of payload; the declared file size below matches.
        let reader = BufReader::new(Cursor::new(vec![
            0, 12, 33, 44, 55, 66, 77, 88, 99, 111, 222, 255, 0, 12, 33, 44,
        ]));
        let upload_options =
            UploadOptions::builder(FileMeta::builder("test.txt", 16).build()).build();
        let share = public_upload_share(false);
        let access_key = "test-access-key";
        // NOTE(review): the `string` path segment presumably equals the upload id contained in
        // the `upload_channel_ok.json` fixture — confirm against the fixture.
        let upload_path = format!("/api/v4/public/shares/uploads/{access_key}");
        let s3_urls_path = format!("/api/v4/public/shares/uploads/{access_key}/string/s3_urls");
        let finalize_path = format!("/api/v4/public/shares/uploads/{access_key}/string/s3");
        let status_path = format!("/api/v4/public/shares/uploads/{access_key}/string");

        // Upload channel creation.
        let upload_channel_mock = mock_server
            .mock("POST", upload_path.as_str())
            .with_status(201)
            .with_body(include_str!(
                "../tests/responses/upload/upload_channel_ok.json"
            ))
            .with_header("content-type", "application/json")
            .create();

        // Presigned URL generation; the fixture's placeholder is replaced with the mock
        // server's base URL.
        let s3_urls_mock = mock_server
            .mock("POST", s3_urls_path.as_str())
            .with_status(201)
            .with_body(s3_urls_response(&base_url))
            .with_header("content-type", "application/json")
            .create();

        // Part upload; responds with an `etag` header.
        let upload_mock = mock_server
            .mock("PUT", "/upload_url")
            .with_status(202)
            .with_header("etag", "string")
            .create();

        // Upload completion.
        let finalize_mock = mock_server
            .mock("PUT", finalize_path.as_str())
            .with_status(202)
            .create();

        // Status polling; reports `done` right away.
        let status_mock = mock_server
            .mock("GET", status_path.as_str())
            .with_status(200)
            .with_body(upload_status_done("test.txt"))
            .with_header("content-type", "application/json")
            .create();

        let file_name = public
            .upload_to_s3_unencrypted(
                access_key.into(),
                &share,
                upload_options,
                reader,
                None,
                None,
            )
            .await
            .unwrap();

        assert_eq!(file_name, "test.txt");
        // Verify the expectation of every mocked endpoint.
        upload_channel_mock.assert();
        s3_urls_mock.assert();
        upload_mock.assert();
        finalize_mock.assert();
        status_mock.assert();
    }
1077
    /// Runs `upload_to_s3_encrypted` against a mock server with a 16-byte payload and the
    /// default chunk size, and checks the returned file name, every mocked endpoint, and that
    /// the completion request carries a user file key for user id 7.
    #[tokio::test]
    async fn test_upload_to_s3_encrypted() {
        let mut mock_server = mockito::Server::new_async().await;
        let base_url = mock_server.url();

        let client = Dracoon::builder()
            .with_base_url(base_url.clone())
            .with_client_id("client_id")
            .with_client_secret("client_secret")
            .build()
            .unwrap();
        let public = client.public();

        // 16 bytes of payload; the declared file size below matches.
        let reader = BufReader::new(Cursor::new(vec![
            0, 12, 33, 44, 55, 66, 77, 88, 99, 111, 222, 255, 0, 12, 33, 44,
        ]));
        let upload_options =
            UploadOptions::builder(FileMeta::builder("test.txt", 16).build()).build();
        // Encrypted share with a single recipient public key (user id 7).
        let share = encrypted_public_upload_share();
        let access_key = "test-access-key";
        // NOTE(review): the `string` path segment presumably equals the upload id contained in
        // the `upload_channel_ok.json` fixture — confirm against the fixture.
        let upload_path = format!("/api/v4/public/shares/uploads/{access_key}");
        let s3_urls_path = format!("/api/v4/public/shares/uploads/{access_key}/string/s3_urls");
        let finalize_path = format!("/api/v4/public/shares/uploads/{access_key}/string/s3");
        let status_path = format!("/api/v4/public/shares/uploads/{access_key}/string");

        // Upload channel creation.
        let upload_channel_mock = mock_server
            .mock("POST", upload_path.as_str())
            .with_status(201)
            .with_body(include_str!(
                "../tests/responses/upload/upload_channel_ok.json"
            ))
            .with_header("content-type", "application/json")
            .create();

        // Presigned URL generation; the fixture's placeholder is replaced with the mock
        // server's base URL.
        let s3_urls_mock = mock_server
            .mock("POST", s3_urls_path.as_str())
            .with_status(201)
            .with_body(s3_urls_response(&base_url))
            .with_header("content-type", "application/json")
            .create();

        // Part upload; responds with an `etag` header.
        let upload_mock = mock_server
            .mock("PUT", "/upload_url")
            .with_status(202)
            .with_header("etag", "string")
            .create();

        // Upload completion; only matches when the body contains `userFileKeyList` followed
        // by `"userId":7`.
        let finalize_mock = mock_server
            .mock("PUT", finalize_path.as_str())
            .match_body(Matcher::Regex(
                r#"(?s).*userFileKeyList.*"userId":7.*"#.to_string(),
            ))
            .with_status(202)
            .create();

        // Status polling; reports `done` right away.
        let status_mock = mock_server
            .mock("GET", status_path.as_str())
            .with_status(200)
            .with_body(upload_status_done("test.txt"))
            .with_header("content-type", "application/json")
            .create();

        let file_name = public
            .upload_to_s3_encrypted(
                access_key.into(),
                &share,
                upload_options,
                reader,
                None,
                None,
            )
            .await
            .unwrap();

        assert_eq!(file_name, "test.txt");
        // Verify the expectation of every mocked endpoint.
        upload_channel_mock.assert();
        s3_urls_mock.assert();
        upload_mock.assert();
        finalize_mock.assert();
        status_mock.assert();
    }
1159
1160 #[tokio::test]
1161 async fn test_upload_to_s3_encrypted_streams_multiple_parts() {
1162 let mut mock_server = mockito::Server::new_async().await;
1163 let base_url = mock_server.url();
1164
1165 let client = Dracoon::builder()
1166 .with_base_url(base_url.clone())
1167 .with_client_id("client_id")
1168 .with_client_secret("client_secret")
1169 .build()
1170 .unwrap();
1171 let public = client.public();
1172
1173 let reader = BufReader::new(Cursor::new(vec![
1174 0, 12, 33, 44, 55, 66, 77, 88, 99, 111, 222, 255,
1175 ]));
1176 let upload_options =
1177 UploadOptions::builder(FileMeta::builder("test.txt", 12).build()).build();
1178 let share = encrypted_public_upload_share();
1179 let access_key = "test-access-key";
1180 let upload_path = format!("/api/v4/public/shares/uploads/{access_key}");
1181 let s3_urls_path = format!("/api/v4/public/shares/uploads/{access_key}/string/s3_urls");
1182 let finalize_path = format!("/api/v4/public/shares/uploads/{access_key}/string/s3");
1183 let status_path = format!("/api/v4/public/shares/uploads/{access_key}/string");
1184
1185 let upload_channel_mock = mock_server
1186 .mock("POST", upload_path.as_str())
1187 .with_status(201)
1188 .with_body(include_str!(
1189 "../tests/responses/upload/upload_channel_ok.json"
1190 ))
1191 .with_header("content-type", "application/json")
1192 .create();
1193
1194 let s3_urls_mock = mock_server
1195 .mock("POST", s3_urls_path.as_str())
1196 .with_status(201)
1197 .with_body(s3_urls_response(&base_url))
1198 .with_header("content-type", "application/json")
1199 .expect(3)
1200 .create();
1201
1202 let upload_mock = mock_server
1203 .mock("PUT", "/upload_url")
1204 .with_status(202)
1205 .with_header("etag", "string")
1206 .expect(3)
1207 .create();
1208
1209 let finalize_mock = mock_server
1210 .mock("PUT", finalize_path.as_str())
1211 .match_body(Matcher::Regex(
1212 r#"(?s).*"partNumber":1.*"partNumber":2.*"partNumber":3.*userFileKeyList.*"userId":7.*"#
1213 .to_string(),
1214 ))
1215 .with_status(202)
1216 .create();
1217
1218 let status_mock = mock_server
1219 .mock("GET", status_path.as_str())
1220 .with_status(200)
1221 .with_body(upload_status_done("test.txt"))
1222 .with_header("content-type", "application/json")
1223 .create();
1224
1225 let file_name = public
1226 .upload_to_s3_encrypted(
1227 access_key.into(),
1228 &share,
1229 upload_options,
1230 reader,
1231 None,
1232 Some(4),
1233 )
1234 .await
1235 .unwrap();
1236
1237 assert_eq!(file_name, "test.txt");
1238 upload_channel_mock.assert();
1239 s3_urls_mock.assert();
1240 upload_mock.assert();
1241 finalize_mock.assert();
1242 status_mock.assert();
1243 }
1244
1245 #[tokio::test]
1246 async fn test_upload_to_nfs_unencrypted() {
1247 let mut mock_server = mockito::Server::new_async().await;
1248 let base_url = mock_server.url();
1249
1250 let client = Dracoon::builder()
1251 .with_base_url(base_url.clone())
1252 .with_client_id("client_id")
1253 .with_client_secret("client_secret")
1254 .build()
1255 .unwrap();
1256 let public = client.public();
1257
1258 let reader = BufReader::new(Cursor::new(vec![
1259 0, 12, 33, 44, 55, 66, 77, 88, 99, 111, 222, 255, 0, 12, 33, 44,
1260 ]));
1261 let upload_options =
1262 UploadOptions::builder(FileMeta::builder("test.txt", 16).build()).build();
1263 let share = public_upload_share(false);
1264 let access_key = "test-access-key";
1265 let upload_path = format!("/api/v4/public/shares/uploads/{access_key}");
1266 let finalize_path = format!("/api/v4/public/shares/uploads/{access_key}/string");
1267
1268 let upload_channel_mock = mock_server
1269 .mock("POST", upload_path.as_str())
1270 .with_status(201)
1271 .with_body(nfs_upload_channel_response(&base_url))
1272 .with_header("content-type", "application/json")
1273 .create();
1274
1275 let upload_mock = mock_server
1276 .mock("POST", "/upload_url")
1277 .with_status(202)
1278 .create();
1279
1280 let finalize_mock = mock_server
1281 .mock("PUT", finalize_path.as_str())
1282 .with_status(200)
1283 .with_body(uploaded_file_response("test.txt", 16))
1284 .with_header("content-type", "application/json")
1285 .create();
1286
1287 let file_name = public
1288 .upload_to_nfs_unencrypted(
1289 access_key.into(),
1290 &share,
1291 upload_options,
1292 reader,
1293 None,
1294 None,
1295 )
1296 .await
1297 .unwrap();
1298
1299 assert_eq!(file_name, "test.txt");
1300 upload_channel_mock.assert();
1301 upload_mock.assert();
1302 finalize_mock.assert();
1303 }
1304
1305 #[tokio::test]
1306 async fn test_upload_to_nfs_encrypted() {
1307 let mut mock_server = mockito::Server::new_async().await;
1308 let base_url = mock_server.url();
1309
1310 let client = Dracoon::builder()
1311 .with_base_url(base_url.clone())
1312 .with_client_id("client_id")
1313 .with_client_secret("client_secret")
1314 .build()
1315 .unwrap();
1316 let public = client.public();
1317
1318 let reader = BufReader::new(Cursor::new(vec![
1319 0, 12, 33, 44, 55, 66, 77, 88, 99, 111, 222, 255, 0, 12, 33, 44,
1320 ]));
1321 let upload_options =
1322 UploadOptions::builder(FileMeta::builder("test.txt", 16).build()).build();
1323 let share = encrypted_public_upload_share();
1324 let access_key = "test-access-key";
1325 let upload_path = format!("/api/v4/public/shares/uploads/{access_key}");
1326 let finalize_path = format!("/api/v4/public/shares/uploads/{access_key}/string");
1327
1328 let upload_channel_mock = mock_server
1329 .mock("POST", upload_path.as_str())
1330 .with_status(201)
1331 .with_body(nfs_upload_channel_response(&base_url))
1332 .with_header("content-type", "application/json")
1333 .create();
1334
1335 let upload_mock = mock_server
1336 .mock("POST", "/upload_url")
1337 .with_status(202)
1338 .create();
1339
1340 let finalize_mock = mock_server
1341 .mock("PUT", finalize_path.as_str())
1342 .match_body(Matcher::Regex(
1343 r#"(?s).*"items":\[.*"userId":7.*"#.to_string(),
1344 ))
1345 .with_status(200)
1346 .with_body(uploaded_file_response("test.txt", 16))
1347 .with_header("content-type", "application/json")
1348 .create();
1349
1350 let file_name = public
1351 .upload_to_nfs_encrypted(
1352 access_key.into(),
1353 &share,
1354 upload_options,
1355 reader,
1356 None,
1357 None,
1358 )
1359 .await
1360 .unwrap();
1361
1362 assert_eq!(file_name, "test.txt");
1363 upload_channel_mock.assert();
1364 upload_mock.assert();
1365 finalize_mock.assert();
1366 }
1367
1368 #[tokio::test]
1369 async fn test_upload_to_nfs_encrypted_streams_multiple_chunks_with_offsets() {
1370 let mut mock_server = mockito::Server::new_async().await;
1371 let base_url = mock_server.url();
1372
1373 let client = Dracoon::builder()
1374 .with_base_url(base_url.clone())
1375 .with_client_id("client_id")
1376 .with_client_secret("client_secret")
1377 .build()
1378 .unwrap();
1379 let public = client.public();
1380
1381 let reader = BufReader::new(Cursor::new(vec![
1382 0, 12, 33, 44, 55, 66, 77, 88, 99, 111, 222, 255,
1383 ]));
1384 let upload_options =
1385 UploadOptions::builder(FileMeta::builder("test.txt", 12).build()).build();
1386 let share = encrypted_public_upload_share();
1387 let access_key = "test-access-key";
1388 let upload_path = format!("/api/v4/public/shares/uploads/{access_key}");
1389 let finalize_path = format!("/api/v4/public/shares/uploads/{access_key}/string");
1390
1391 let upload_channel_mock = mock_server
1392 .mock("POST", upload_path.as_str())
1393 .with_status(201)
1394 .with_body(nfs_upload_channel_response(&base_url))
1395 .with_header("content-type", "application/json")
1396 .create();
1397
1398 let upload_mock_1 = mock_server
1399 .mock("POST", "/upload_url")
1400 .match_header("content-range", "bytes 0-4/12")
1401 .with_status(202)
1402 .expect(1)
1403 .create();
1404
1405 let upload_mock_2 = mock_server
1406 .mock("POST", "/upload_url")
1407 .match_header("content-range", "bytes 4-8/12")
1408 .with_status(202)
1409 .expect(1)
1410 .create();
1411
1412 let upload_mock_3 = mock_server
1413 .mock("POST", "/upload_url")
1414 .match_header("content-range", "bytes 8-12/12")
1415 .with_status(202)
1416 .expect(1)
1417 .create();
1418
1419 let finalize_mock = mock_server
1420 .mock("PUT", finalize_path.as_str())
1421 .match_body(Matcher::Regex(
1422 r#"(?s).*"items":\[.*"userId":7.*"#.to_string(),
1423 ))
1424 .with_status(200)
1425 .with_body(uploaded_file_response("test.txt", 12))
1426 .with_header("content-type", "application/json")
1427 .create();
1428
1429 let file_name = public
1430 .upload_to_nfs_encrypted(
1431 access_key.into(),
1432 &share,
1433 upload_options,
1434 reader,
1435 None,
1436 Some(4),
1437 )
1438 .await
1439 .unwrap();
1440
1441 assert_eq!(file_name, "test.txt");
1442 upload_channel_mock.assert();
1443 upload_mock_1.assert();
1444 upload_mock_2.assert();
1445 upload_mock_3.assert();
1446 finalize_mock.assert();
1447 }
1448}