Skip to main content

dco3/public/
upload.rs

1use async_trait::async_trait;
2use dco3_crypto::{DracoonCrypto, DracoonRSACrypto};
3use reqwest::StatusCode;
4use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
5use tracing::{error, warn};
6
7use crate::{
8    constants::{
9        DEFAULT_UPLOAD_CHUNK_SIZE, DRACOON_API_PREFIX, FILES_S3_COMPLETE, FILES_S3_URLS,
10        POLLING_START_DELAY, PUBLIC_BASE, PUBLIC_SHARES_BASE, PUBLIC_UPLOAD_SHARES,
11    },
12    nodes::{
13        models::StreamingEncryptedUpload,
14        upload::{calculate_s3_url_count, StreamUploadInternal},
15        CloneableUploadProgressCallback, GeneratePresignedUrlsRequest, PresignedUrlList,
16        S3FileUploadPart, S3UploadStatus, UploadOptions, UploadProgressCallback,
17    },
18    utils::{build_s3_protocol_error, FromResponse},
19    DracoonClientError, Public,
20};
21
22use super::{
23    CompleteS3ShareUploadRequest, CreateShareUploadChannelRequest,
24    CreateShareUploadChannelResponse, FileName, PublicEndpoint, PublicUpload, PublicUploadShare,
25    PublicUploadedFileData, S3ShareUploadStatus, UserFileKey, UserFileKeyList,
26};
27
28fn missing_presigned_url_error() -> DracoonClientError {
29    build_s3_protocol_error(
30        StatusCode::BAD_GATEWAY,
31        "missing_presigned_url",
32        "Presigned URL response contained no URLs",
33    )
34}
35
36fn missing_upload_error_details_error() -> DracoonClientError {
37    build_s3_protocol_error(
38        StatusCode::BAD_GATEWAY,
39        "missing_upload_error_details",
40        "Upload status 'error' did not include error details",
41    )
42}
43
44#[async_trait]
45impl<S: Send + Sync, R: AsyncRead + Send + Sync + Unpin + 'static> PublicUpload<R>
46    for PublicEndpoint<S>
47{
48    async fn upload<'r>(
49        &'r self,
50        access_key: impl Into<String> + Send + Sync,
51        share: PublicUploadShare,
52        upload_options: UploadOptions,
53        reader: BufReader<R>,
54        callback: Option<UploadProgressCallback>,
55        chunk_size: Option<usize>,
56    ) -> Result<FileName, DracoonClientError> {
57        let use_s3_storage = self.get_system_info().await?.use_s3_storage;
58        let is_encrypted = share.is_encrypted.unwrap_or(false);
59
60        let upload_fn = match (use_s3_storage, is_encrypted) {
61            (true, true) => PublicUploadInternal::upload_to_s3_encrypted,
62            (true, false) => PublicUploadInternal::upload_to_s3_unencrypted,
63            (false, true) => PublicUploadInternalNfs::upload_to_nfs_encrypted,
64            (false, false) => PublicUploadInternalNfs::upload_to_nfs_unencrypted,
65        };
66
67        upload_fn(
68            self,
69            access_key.into(),
70            &share,
71            upload_options,
72            reader,
73            callback,
74            chunk_size,
75        )
76        .await
77    }
78}
79
// Empty impl: `PublicEndpoint` overrides nothing and therefore uses only the
// provided (default) items of `StreamUploadInternal`. Presumably this is where
// the `upload_stream_to_s3` / `upload_stream_to_nfs` helpers called in this
// file come from - confirm against the trait definition.
impl<S> StreamUploadInternal<S> for PublicEndpoint<S> {}
81
82#[async_trait]
83impl<S: Send + Sync, R: AsyncRead + Send + Sync + Unpin + 'static> PublicUploadInternal<R, S>
84    for PublicEndpoint<S>
85{
86    async fn create_upload_channel(
87        &self,
88        access_key: String,
89        create_file_upload_req: CreateShareUploadChannelRequest,
90    ) -> Result<CreateShareUploadChannelResponse, DracoonClientError> {
91        let url_part = format!(
92            "{DRACOON_API_PREFIX}/{PUBLIC_BASE}/{PUBLIC_SHARES_BASE}/{PUBLIC_UPLOAD_SHARES}/{}",
93            access_key
94        );
95
96        let url = self.client().build_api_url(&url_part);
97
98        let response = self
99            .client()
100            .http
101            .post(url)
102            .json(&create_file_upload_req)
103            .send()
104            .await?;
105
106        CreateShareUploadChannelResponse::from_response(response).await
107    }
108
109    async fn create_s3_upload_urls(
110        &self,
111        access_key: String,
112        upload_id: String,
113        generate_urls_req: GeneratePresignedUrlsRequest,
114    ) -> Result<PresignedUrlList, DracoonClientError> {
115        let url_part = format!(
116            "{DRACOON_API_PREFIX}/{PUBLIC_BASE}/{PUBLIC_SHARES_BASE}/{PUBLIC_UPLOAD_SHARES}/{}/{}/{FILES_S3_URLS}",
117            access_key, upload_id
118        );
119
120        let url = self.client().build_api_url(&url_part);
121
122        let response = self
123            .client()
124            .http
125            .post(url)
126            .json(&generate_urls_req)
127            .send()
128            .await?;
129
130        PresignedUrlList::from_response(response).await
131    }
132
133    async fn upload_to_s3_unencrypted(
134        &self,
135        access_key: String,
136        share: &PublicUploadShare,
137        upload_options: UploadOptions,
138        mut reader: BufReader<R>,
139        callback: Option<UploadProgressCallback>,
140        chunk_size: Option<usize>,
141    ) -> Result<FileName, DracoonClientError> {
142        let fm = upload_options.file_meta.clone();
143
144        let chunk_size = chunk_size.unwrap_or(DEFAULT_UPLOAD_CHUNK_SIZE);
145
146        // create upload channel
147        let file_upload_req =
148            CreateShareUploadChannelRequest::from_upload_options(&upload_options, Some(true), None);
149
150        let upload_channel =
151            <PublicEndpoint<S> as PublicUploadInternal<R, S>>::create_upload_channel(
152                self,
153                access_key.clone(),
154                file_upload_req,
155            )
156            .await?;
157
158        let mut s3_parts = Vec::new();
159
160        let (count_urls, last_chunk_size) = calculate_s3_url_count(fm.size, chunk_size as u64);
161        let mut url_part: u32 = 1;
162
163        let cloneable_callback = callback.map(CloneableUploadProgressCallback::new);
164
165        if count_urls > 1 {
166            while url_part < count_urls {
167                let mut buffer = vec![0; chunk_size];
168                let cb = cloneable_callback.clone();
169                let fm = fm.clone();
170
171                match reader.read_exact(&mut buffer).await {
172                    Ok(0) => break,
173                    Ok(n) => {
174                        buffer.truncate(n);
175                        let chunk = bytes::Bytes::from(buffer);
176
177                        let stream: async_stream::__private::AsyncStream<
178                            Result<bytes::Bytes, std::io::Error>,
179                            _,
180                        > = async_stream::stream! {
181                            let mut buffer = Vec::new();
182                            let mut bytes_read = 0;
183
184                            for byte in chunk.iter() {
185                            buffer.push(*byte);
186                            bytes_read += 1;
187                            if buffer.len() == 1024 || bytes_read == chunk.len() {
188                            if let Some(callback) = cb.clone() {
189                                callback.call(buffer.len() as u64, fm.size);
190                                        }
191                                yield Ok(bytes::Bytes::from(buffer.clone()));
192                                buffer.clear();
193                                }
194                            }
195                        };
196
197                        let url_req = GeneratePresignedUrlsRequest::new(
198                            n.try_into().map_err(|_| DracoonClientError::IoError)?,
199                            url_part,
200                            url_part,
201                        );
202                        let url =
203                        <PublicEndpoint<S> as PublicUploadInternal<R, S>>::
204                            create_s3_upload_urls(self, access_key.clone(), upload_channel.upload_id.clone(), url_req)
205                            .await?;
206                        let url = url.urls.first().ok_or_else(missing_presigned_url_error)?;
207
208                        // truncation is safe because chunk_size is 32 MB
209                        #[allow(clippy::cast_possible_truncation, clippy::cast_lossless)]
210                        let curr_pos: u64 = ((url_part - 1) * (chunk_size as u32)) as u64;
211
212                        let e_tag = self
213                            .upload_stream_to_s3(
214                                Box::pin(stream),
215                                url,
216                                chunk_size
217                                    .try_into()
218                                    .map_err(|_| DracoonClientError::IoError)?,
219                            )
220                            .await?;
221
222                        s3_parts.push(S3FileUploadPart::new(url_part, e_tag));
223                        url_part += 1;
224                    }
225                    Err(err) => {
226                        error!("Error reading file: {}", err);
227                        return Err(DracoonClientError::IoError);
228                    }
229                }
230            }
231        }
232
233        // upload last chunk
234        let mut buffer = vec![
235            0;
236            last_chunk_size
237                .try_into()
238                .map_err(|_| DracoonClientError::IoError)?
239        ];
240        let cb = cloneable_callback.clone();
241        match reader.read_exact(&mut buffer).await {
242            Ok(n) => {
243                buffer.truncate(n);
244                let chunk = bytes::Bytes::from(buffer);
245                let stream: async_stream::__private::AsyncStream<
246                    Result<bytes::Bytes, std::io::Error>,
247                    _,
248                > = async_stream::stream! {
249                    let mut buffer = Vec::new();
250                    let mut bytes_read = 0;
251
252                    for byte in chunk.iter() {
253                    buffer.push(*byte);
254                    bytes_read += 1;
255                    if buffer.len() == 1024 || bytes_read == chunk.len() {
256                    if let Some(callback) = cb.clone() {
257                        callback.call(buffer.len() as u64, fm.size);
258                                }
259                        yield Ok(bytes::Bytes::from(buffer.clone()));
260                        buffer.clear();
261                        }
262                    }
263
264                };
265
266                let url_req = GeneratePresignedUrlsRequest::new(
267                    n.try_into().map_err(|_| DracoonClientError::IoError)?,
268                    url_part,
269                    url_part,
270                );
271                let url = <PublicEndpoint<S> as PublicUploadInternal<R, S>>::create_s3_upload_urls(
272                    self,
273                    access_key.clone(),
274                    upload_channel.upload_id.clone(),
275                    url_req,
276                )
277                .await?;
278
279                let url = url.urls.first().ok_or_else(missing_presigned_url_error)?;
280
281                let curr_pos: u64 = (url_part - 1) as u64 * (DEFAULT_UPLOAD_CHUNK_SIZE as u64);
282
283                let e_tag = self
284                    .upload_stream_to_s3(
285                        Box::pin(stream),
286                        url,
287                        n.try_into().map_err(|_| DracoonClientError::IoError)?,
288                    )
289                    .await?;
290
291                s3_parts.push(S3FileUploadPart::new(url_part, e_tag));
292            }
293            Err(err) => {
294                error!("Error reading file: {}", err);
295                return Err(DracoonClientError::IoError);
296            }
297        }
298
299        // finalize upload
300        let complete_upload_req = CompleteS3ShareUploadRequest::new(s3_parts, None);
301
302        <PublicEndpoint<S> as PublicUploadInternal<R, S>>::finalize_s3_upload(
303            self,
304            access_key.clone(),
305            upload_channel.upload_id.clone(),
306            complete_upload_req,
307        )
308        .await?;
309
310        // get upload status
311        // return node if upload is done
312        // return error if upload failed
313        // polling with exponential backoff
314        let mut sleep_duration = POLLING_START_DELAY;
315        loop {
316            let status_response =
317                <PublicEndpoint<S> as PublicUploadInternal<R, S>>::get_upload_status(
318                    self,
319                    access_key.clone(),
320                    upload_channel.upload_id.clone(),
321                )
322                .await?;
323
324            match status_response.status {
325                S3UploadStatus::Done => {
326                    return Ok(status_response.file_name);
327                }
328                S3UploadStatus::Error => {
329                    let response = status_response
330                        .error_details
331                        .ok_or_else(missing_upload_error_details_error)?;
332                    error!("Error uploading file: {}", response);
333                    return Err(DracoonClientError::Http(response));
334                }
335                _ => {
336                    tokio::time::sleep(tokio::time::Duration::from_millis(sleep_duration)).await;
337                    sleep_duration *= 2;
338                }
339            }
340        }
341    }
342    async fn upload_to_s3_encrypted(
343        &self,
344        access_key: String,
345        share: &PublicUploadShare,
346        upload_options: UploadOptions,
347        reader: BufReader<R>,
348        callback: Option<UploadProgressCallback>,
349        chunk_size: Option<usize>,
350    ) -> Result<FileName, DracoonClientError> {
351        let chunk_size = chunk_size.unwrap_or(DEFAULT_UPLOAD_CHUNK_SIZE);
352        let mut encrypted_upload = StreamingEncryptedUpload::new(reader, chunk_size)?;
353
354        let fm = upload_options.file_meta.clone();
355
356        // create upload channel
357        let file_upload_req =
358            CreateShareUploadChannelRequest::from_upload_options(&upload_options, Some(true), None);
359
360        let upload_channel =
361            <PublicEndpoint<S> as PublicUploadInternal<R, S>>::create_upload_channel(
362                self,
363                access_key.clone(),
364                file_upload_req,
365            )
366            .await
367            .map_err(|err| {
368                error!("Error creating upload channel: {}", err);
369                err
370            })?;
371
372        let mut s3_parts = Vec::new();
373        let mut url_part: u32 = 1;
374
375        let cloneable_callback = callback.map(CloneableUploadProgressCallback::new);
376
377        while let Some(chunk) = encrypted_upload.next_chunk(chunk_size).await? {
378            let cb = cloneable_callback.clone();
379            let fm = fm.clone();
380            let chunk_len = chunk.len();
381            let stream: async_stream::__private::AsyncStream<
382                Result<bytes::Bytes, std::io::Error>,
383                _,
384            > = async_stream::stream! {
385                let mut buffer = Vec::new();
386                let mut bytes_read = 0;
387
388                for byte in chunk.iter() {
389                    buffer.push(*byte);
390                    bytes_read += 1;
391                    if buffer.len() == 1024 || bytes_read == chunk.len() {
392                        if let Some(callback) = cb.clone() {
393                            callback.call(buffer.len() as u64, fm.size);
394                        }
395                        yield Ok(bytes::Bytes::from(buffer.clone()));
396                        buffer.clear();
397                    }
398                }
399            };
400
401            let url_req = GeneratePresignedUrlsRequest::new(
402                chunk_len
403                    .try_into()
404                    .map_err(|_| DracoonClientError::IoError)?,
405                url_part,
406                url_part,
407            );
408            let url =
409                <PublicEndpoint<S> as PublicUploadInternal<R, S>>::create_s3_upload_urls::<'_, '_>(
410                    self,
411                    access_key.clone(),
412                    upload_channel.upload_id.clone(),
413                    url_req,
414                )
415                .await
416                .map_err(|err| {
417                    error!("Error creating S3 upload urls: {}", err);
418                    err
419                })?;
420            let url = url.urls.first().ok_or_else(missing_presigned_url_error)?;
421
422            let e_tag = self
423                .upload_stream_to_s3(
424                    Box::pin(stream),
425                    url,
426                    chunk_len
427                        .try_into()
428                        .map_err(|_| DracoonClientError::IoError)?,
429                )
430                .await
431                .map_err(|err| {
432                    error!("Error uploading stream to S3: {}", err);
433                    err
434                })?;
435
436            s3_parts.push(S3FileUploadPart::new(url_part, e_tag));
437            url_part += 1;
438        }
439
440        let plain_file_key = encrypted_upload.into_plain_file_key()?;
441        let public_keys = share.user_user_public_key_list.clone().unwrap_or_default();
442
443        let mut user_file_keys = Vec::new();
444        for key in public_keys.items {
445            let user_id = key.id;
446            match DracoonCrypto::encrypt_file_key(
447                plain_file_key.clone(),
448                key.public_key_container.clone(),
449            ) {
450                Ok(file_key) => user_file_keys.push(UserFileKey::new(user_id, file_key)),
451                Err(err) => warn!(
452                    user_id,
453                    access_key = %access_key,
454                    file_name = %upload_options.file_meta.name,
455                    error = ?err,
456                    "Skipping public upload recipient key distribution",
457                ),
458            }
459        }
460
461        // finalize upload
462        let complete_upload_req = CompleteS3ShareUploadRequest::new(s3_parts, Some(user_file_keys));
463
464        <PublicEndpoint<S> as PublicUploadInternal<R, S>>::finalize_s3_upload::<'_, '_>(
465            self,
466            access_key.clone(),
467            upload_channel.upload_id.clone(),
468            complete_upload_req,
469        )
470        .await
471        .map_err(|err| {
472            error!("Error finalizing upload: {}", err);
473            err
474        })?;
475
476        // get upload status
477        // return node if upload is done
478        // return error if upload failed
479        // polling with exponential backoff
480        let mut sleep_duration = POLLING_START_DELAY;
481        loop {
482            let status_response =
483                <PublicEndpoint<S> as PublicUploadInternal<R, S>>::get_upload_status(
484                    self,
485                    access_key.clone(),
486                    upload_channel.upload_id.clone(),
487                )
488                .await
489                .map_err(|err| {
490                    error!("Error getting upload status: {}", err);
491                    err
492                })?;
493
494            match status_response.status {
495                S3UploadStatus::Done => {
496                    return Ok(status_response.file_name);
497                }
498                S3UploadStatus::Error => {
499                    return Err(DracoonClientError::Http(
500                        status_response
501                            .error_details
502                            .ok_or_else(missing_upload_error_details_error)?,
503                    ));
504                }
505                _ => {
506                    tokio::time::sleep(tokio::time::Duration::from_millis(sleep_duration)).await;
507                    sleep_duration *= 2;
508                }
509            }
510        }
511    }
512
513    async fn finalize_s3_upload(
514        &self,
515        access_key: String,
516        upload_id: String,
517        complete_file_upload_req: CompleteS3ShareUploadRequest,
518    ) -> Result<(), DracoonClientError> {
519        let url_part = format!(
520            "{DRACOON_API_PREFIX}/{PUBLIC_BASE}/{PUBLIC_SHARES_BASE}/{PUBLIC_UPLOAD_SHARES}/{}/{}/{FILES_S3_COMPLETE}",
521            access_key, upload_id
522        );
523
524        let url = self.client().build_api_url(&url_part);
525
526        let response = self
527            .client()
528            .http
529            .put(url)
530            .json(&complete_file_upload_req)
531            .send()
532            .await?;
533
534        if response.status().is_success() {
535            Ok(())
536        } else {
537            Err(DracoonClientError::from_response(response).await?)
538        }
539    }
540
541    async fn get_upload_status(
542        &self,
543        access_key: String,
544        upload_id: String,
545    ) -> Result<S3ShareUploadStatus, DracoonClientError> {
546        let url_part = format!(
547            "{DRACOON_API_PREFIX}/{PUBLIC_BASE}/{PUBLIC_SHARES_BASE}/{PUBLIC_UPLOAD_SHARES}/{}/{}",
548            access_key, upload_id
549        );
550
551        let url = self.client().build_api_url(&url_part);
552
553        let response = self.client().http.get(url).send().await?;
554
555        S3ShareUploadStatus::from_response(response).await
556    }
557}
558
559#[async_trait]
560trait PublicUploadInternal<R: AsyncRead, S>: StreamUploadInternal<S> {
561    async fn create_upload_channel(
562        &self,
563        access_key: String,
564        create_file_upload_req: CreateShareUploadChannelRequest,
565    ) -> Result<CreateShareUploadChannelResponse, DracoonClientError>;
566
567    async fn create_s3_upload_urls(
568        &self,
569        access_key: String,
570        upload_id: String,
571        generate_urls_req: GeneratePresignedUrlsRequest,
572    ) -> Result<PresignedUrlList, DracoonClientError>;
573
574    async fn upload_to_s3_unencrypted(
575        &self,
576        access_key: String,
577        share: &PublicUploadShare,
578        upload_options: UploadOptions,
579        reader: BufReader<R>,
580        callback: Option<UploadProgressCallback>,
581        chunk_size: Option<usize>,
582    ) -> Result<FileName, DracoonClientError>;
583    async fn upload_to_s3_encrypted(
584        &self,
585        access_key: String,
586        share: &PublicUploadShare,
587        upload_options: UploadOptions,
588        reader: BufReader<R>,
589        callback: Option<UploadProgressCallback>,
590        chunk_size: Option<usize>,
591    ) -> Result<FileName, DracoonClientError>;
592
593    async fn finalize_s3_upload(
594        &self,
595        access_key: String,
596        upload_id: String,
597        complete_file_upload_req: CompleteS3ShareUploadRequest,
598    ) -> Result<(), DracoonClientError>;
599
600    async fn get_upload_status(
601        &self,
602        access_key: String,
603        upload_id: String,
604    ) -> Result<S3ShareUploadStatus, DracoonClientError>;
605}
606
607#[async_trait]
608trait PublicUploadInternalNfs<R: AsyncRead, S>:
609    StreamUploadInternal<S> + PublicUploadInternal<R, S>
610{
611    async fn upload_to_nfs_unencrypted(
612        &self,
613        access_key: String,
614        share: &PublicUploadShare,
615        upload_options: UploadOptions,
616        reader: BufReader<R>,
617        callback: Option<UploadProgressCallback>,
618        chunk_size: Option<usize>,
619    ) -> Result<FileName, DracoonClientError>;
620    async fn upload_to_nfs_encrypted(
621        &self,
622        access_key: String,
623        share: &PublicUploadShare,
624        upload_options: UploadOptions,
625        reader: BufReader<R>,
626        callback: Option<UploadProgressCallback>,
627        chunk_size: Option<usize>,
628    ) -> Result<FileName, DracoonClientError>;
629    async fn finalize_nfs_upload(
630        &self,
631        access_key: String,
632        upload_id: String,
633        user_file_key_list: Option<UserFileKeyList>,
634    ) -> Result<PublicUploadedFileData, DracoonClientError>;
635}
636
637#[async_trait]
638impl<R: AsyncRead + Send + Sync + Unpin + 'static, S: Send + Sync> PublicUploadInternalNfs<R, S>
639    for PublicEndpoint<S>
640{
641    async fn upload_to_nfs_unencrypted(
642        &self,
643        access_key: String,
644        share: &PublicUploadShare,
645        upload_options: UploadOptions,
646        mut reader: BufReader<R>,
647        callback: Option<UploadProgressCallback>,
648        chunk_size: Option<usize>,
649    ) -> Result<FileName, DracoonClientError> {
650        let fm = upload_options.file_meta.clone();
651
652        let chunk_size = chunk_size.unwrap_or(DEFAULT_UPLOAD_CHUNK_SIZE);
653
654        // create upload channel
655        let file_upload_req =
656            CreateShareUploadChannelRequest::from_upload_options(&upload_options, None, None);
657
658        let upload_channel =
659            <PublicEndpoint<S> as PublicUploadInternal<R, S>>::create_upload_channel::<'_, '_>(
660                self,
661                access_key.clone(),
662                file_upload_req,
663            )
664            .await
665            .map_err(|err| {
666                error!("Error creating upload channel: {}", err);
667                err
668            })?;
669
670        let (count_chunks, last_chunk_size) = calculate_s3_url_count(fm.size, chunk_size as u64);
671        let mut chunk_part: u32 = 1;
672
673        let cloneable_callback = callback.map(CloneableUploadProgressCallback::new);
674
675        if count_chunks > 1 {
676            while chunk_part < count_chunks {
677                let mut buffer = vec![0; chunk_size];
678                let cb = cloneable_callback.clone();
679                let fm = fm.clone();
680
681                match reader.read_exact(&mut buffer).await {
682                    Ok(0) => break,
683                    Ok(n) => {
684                        buffer.truncate(n);
685                        let chunk = bytes::Bytes::from(buffer);
686
687                        let stream: async_stream::__private::AsyncStream<
688                            Result<bytes::Bytes, std::io::Error>,
689                            _,
690                        > = async_stream::stream! {
691                            let mut buffer = Vec::new();
692                            let mut bytes_read = 0;
693
694                            for byte in chunk.iter() {
695                            buffer.push(*byte);
696                            bytes_read += 1;
697                            if buffer.len() == 1024 || bytes_read == chunk.len() {
698                            if let Some(callback) = cb.clone() {
699                                callback.call(buffer.len() as u64, fm.size);
700                                        }
701                                yield Ok(bytes::Bytes::from(buffer.clone()));
702                                buffer.clear();
703                                }
704                            }
705                        };
706
707                        let url = upload_channel.upload_url.clone();
708
709                        // truncation is safe because chunk_size is 32 MB
710                        #[allow(clippy::cast_possible_truncation, clippy::cast_lossless)]
711                        let curr_pos: u64 = ((chunk_part - 1) * (chunk_size as u32)) as u64;
712
713                        self.upload_stream_to_nfs(
714                            Box::pin(stream),
715                            &url,
716                            upload_options.file_meta.size,
717                            n,
718                            Some(curr_pos),
719                        )
720                        .await?;
721
722                        chunk_part += 1;
723                    }
724                    Err(err) => {
725                        error!("Error reading file: {}", err);
726                        return Err(DracoonClientError::IoError);
727                    }
728                }
729            }
730        }
731
732        // upload last chunk
733        let mut buffer = vec![
734            0;
735            last_chunk_size
736                .try_into()
737                .map_err(|_| DracoonClientError::IoError)?
738        ];
739        let cb = cloneable_callback.clone();
740        match reader.read_exact(&mut buffer).await {
741            Ok(n) => {
742                buffer.truncate(n);
743                let chunk = bytes::Bytes::from(buffer);
744                let stream: async_stream::__private::AsyncStream<
745                    Result<bytes::Bytes, std::io::Error>,
746                    _,
747                > = async_stream::stream! {
748                    let mut buffer = Vec::new();
749                    let mut bytes_read = 0;
750
751                    for byte in chunk.iter() {
752                    buffer.push(*byte);
753                    bytes_read += 1;
754                    if buffer.len() == 1024 || bytes_read == chunk.len() {
755                    if let Some(callback) = cb.clone() {
756                        callback.call(buffer.len() as u64, fm.size);
757                                }
758                        yield Ok(bytes::Bytes::from(buffer.clone()));
759                        buffer.clear();
760                        }
761                    }
762
763                };
764
765                let url = upload_channel.upload_url.clone();
766
767                let curr_pos: u64 = (chunk_part - 1) as u64 * (DEFAULT_UPLOAD_CHUNK_SIZE as u64);
768
769                let e_tag = self
770                    .upload_stream_to_nfs(
771                        Box::pin(stream),
772                        &url,
773                        upload_options.file_meta.size,
774                        n,
775                        Some(curr_pos),
776                    )
777                    .await?;
778            }
779            Err(err) => {
780                error!("Error reading file: {}", err);
781                return Err(DracoonClientError::IoError);
782            }
783        }
784
785        let public_upload =
786            <PublicEndpoint<S> as PublicUploadInternalNfs<R, S>>::finalize_nfs_upload::<'_, '_>(
787                self,
788                access_key.clone(),
789                upload_channel.upload_id.clone(),
790                None,
791            )
792            .await
793            .map_err(|err| {
794                error!("Error finalizing upload: {}", err);
795                err
796            })?;
797
798        Ok(public_upload.name)
799    }
800    async fn upload_to_nfs_encrypted(
801        &self,
802        access_key: String,
803        share: &PublicUploadShare,
804        upload_options: UploadOptions,
805        reader: BufReader<R>,
806        callback: Option<UploadProgressCallback>,
807        chunk_size: Option<usize>,
808    ) -> Result<FileName, DracoonClientError> {
809        let chunk_size = chunk_size.unwrap_or(DEFAULT_UPLOAD_CHUNK_SIZE);
810        let mut encrypted_upload = StreamingEncryptedUpload::new(reader, chunk_size)?;
811
812        let fm = upload_options.file_meta.clone();
813
814        // create upload channel
815        let file_upload_req =
816            CreateShareUploadChannelRequest::from_upload_options(&upload_options, None, None);
817
818        let upload_channel =
819            <PublicEndpoint<S> as PublicUploadInternal<R, S>>::create_upload_channel::<'_, '_>(
820                self,
821                access_key.clone(),
822                file_upload_req,
823            )
824            .await
825            .map_err(|err| {
826                error!("Error creating upload channel: {}", err);
827                err
828            })?;
829
830        let cloneable_callback = callback.map(CloneableUploadProgressCallback::new);
831        let mut curr_pos = 0u64;
832
833        while let Some(chunk) = encrypted_upload.next_chunk(chunk_size).await? {
834            let cb = cloneable_callback.clone();
835            let fm = fm.clone();
836            let chunk_len = chunk.len();
837            let stream: async_stream::__private::AsyncStream<
838                Result<bytes::Bytes, std::io::Error>,
839                _,
840            > = async_stream::stream! {
841                let mut buffer = Vec::new();
842                let mut bytes_read = 0;
843
844                for byte in chunk.iter() {
845                    buffer.push(*byte);
846                    bytes_read += 1;
847                    if buffer.len() == 1024 || bytes_read == chunk.len() {
848                        if let Some(callback) = cb.clone() {
849                            callback.call(buffer.len() as u64, fm.size);
850                        }
851                        yield Ok(bytes::Bytes::from(buffer.clone()));
852                        buffer.clear();
853                    }
854                }
855            };
856
857            let url = upload_channel.upload_url.clone();
858
859            self.upload_stream_to_nfs(
860                Box::pin(stream),
861                &url,
862                upload_options.file_meta.size,
863                chunk_len,
864                Some(curr_pos),
865            )
866            .await
867            .map_err(|err| {
868                error!("Error uploading stream to NFS: {}", err);
869                err
870            })?;
871
872            curr_pos += chunk_len as u64;
873        }
874
875        let plain_file_key = encrypted_upload.into_plain_file_key()?;
876        let public_keys = share.user_user_public_key_list.clone().unwrap_or_default();
877
878        let mut user_file_keys = Vec::new();
879        for key in public_keys.items {
880            let user_id = key.id;
881            match DracoonCrypto::encrypt_file_key(
882                plain_file_key.clone(),
883                key.public_key_container.clone(),
884            ) {
885                Ok(file_key) => user_file_keys.push(UserFileKey::new(user_id, file_key)),
886                Err(err) => warn!(
887                    user_id,
888                    access_key = %access_key,
889                    file_name = %upload_options.file_meta.name,
890                    error = ?err,
891                    "Skipping public upload recipient key distribution",
892                ),
893            }
894        }
895
896        let user_file_keys = UserFileKeyList::from(user_file_keys);
897
898        let public_upload =
899            <PublicEndpoint<S> as PublicUploadInternalNfs<R, S>>::finalize_nfs_upload::<'_, '_>(
900                self,
901                access_key.clone(),
902                upload_channel.upload_id.clone(),
903                Some(user_file_keys),
904            )
905            .await
906            .map_err(|err| {
907                error!("Error finalizing upload: {}", err);
908                err
909            })?;
910
911        Ok(public_upload.name)
912    }
913    async fn finalize_nfs_upload(
914        &self,
915        access_key: String,
916        upload_id: String,
917        user_file_key_list: Option<UserFileKeyList>,
918    ) -> Result<PublicUploadedFileData, DracoonClientError> {
919        let url_part = format!(
920            "{DRACOON_API_PREFIX}/{PUBLIC_BASE}/{PUBLIC_SHARES_BASE}/{PUBLIC_UPLOAD_SHARES}/{}/{}",
921            access_key, upload_id
922        );
923
924        let url = self.client().build_api_url(&url_part);
925
926        let response = match user_file_key_list {
927            Some(user_file_keys) => {
928                self.client()
929                    .http
930                    .put(url)
931                    .json(&user_file_keys)
932                    .send()
933                    .await?
934            }
935            None => self.client().http.put(url).send().await?,
936        };
937
938        PublicUploadedFileData::from_response(response).await
939    }
940}
941
942#[cfg(test)]
943mod tests {
944    use std::io::Cursor;
945
946    use dco3_crypto::{DracoonCrypto, DracoonRSACrypto, UserKeyPairVersion};
947    use mockito::Matcher;
948
949    use crate::{
950        nodes::{FileMeta, UploadOptions, UserUserPublicKey},
951        public::UserUserPublicKeyList,
952        Dracoon,
953    };
954
955    use super::*;
956
957    fn public_upload_share(is_encrypted: bool) -> PublicUploadShare {
958        let mut share: PublicUploadShare = serde_json::from_str(include_str!(
959            "../tests/responses/public/upload_share_ok.json"
960        ))
961        .unwrap();
962        share.is_encrypted = Some(is_encrypted);
963        share
964    }
965
966    fn encrypted_public_upload_share() -> PublicUploadShare {
967        let mut share = public_upload_share(true);
968        let keypair = DracoonCrypto::create_plain_user_keypair(UserKeyPairVersion::RSA4096)
969            .expect("public key generation should succeed");
970
971        share.user_user_public_key_list = Some(UserUserPublicKeyList {
972            items: vec![UserUserPublicKey {
973                id: 7,
974                public_key_container: keypair.public_key_container.clone(),
975            }],
976        });
977
978        share
979    }
980
981    fn s3_urls_response(base_url: &str) -> String {
982        include_str!("../tests/responses/upload/s3_urls_ok_with_placeholder.json")
983            .replace("$base_url", base_url)
984    }
985
986    fn upload_status_done(file_name: &str) -> String {
987        format!(r#"{{"status":"done","fileName":"{file_name}"}}"#)
988    }
989
990    fn nfs_upload_channel_response(base_url: &str) -> String {
991        let base_url = base_url.trim_end_matches('/');
992        format!(r#"{{"uploadUrl":"{base_url}/upload_url","uploadId":"string"}}"#)
993    }
994
995    fn uploaded_file_response(name: &str, size: u64) -> String {
996        format!(r#"{{"name":"{name}","size":{size},"createdAt":"2021-01-01T00:00:00.000Z"}}"#)
997    }
998
999    #[tokio::test]
1000    async fn test_upload_to_s3_unencrypted() {
1001        let mut mock_server = mockito::Server::new_async().await;
1002        let base_url = mock_server.url();
1003
1004        let client = Dracoon::builder()
1005            .with_base_url(base_url.clone())
1006            .with_client_id("client_id")
1007            .with_client_secret("client_secret")
1008            .build()
1009            .unwrap();
1010        let public = client.public();
1011
1012        let reader = BufReader::new(Cursor::new(vec![
1013            0, 12, 33, 44, 55, 66, 77, 88, 99, 111, 222, 255, 0, 12, 33, 44,
1014        ]));
1015        let upload_options =
1016            UploadOptions::builder(FileMeta::builder("test.txt", 16).build()).build();
1017        let share = public_upload_share(false);
1018        let access_key = "test-access-key";
1019        let upload_path = format!("/api/v4/public/shares/uploads/{access_key}");
1020        let s3_urls_path = format!("/api/v4/public/shares/uploads/{access_key}/string/s3_urls");
1021        let finalize_path = format!("/api/v4/public/shares/uploads/{access_key}/string/s3");
1022        let status_path = format!("/api/v4/public/shares/uploads/{access_key}/string");
1023
1024        let upload_channel_mock = mock_server
1025            .mock("POST", upload_path.as_str())
1026            .with_status(201)
1027            .with_body(include_str!(
1028                "../tests/responses/upload/upload_channel_ok.json"
1029            ))
1030            .with_header("content-type", "application/json")
1031            .create();
1032
1033        let s3_urls_mock = mock_server
1034            .mock("POST", s3_urls_path.as_str())
1035            .with_status(201)
1036            .with_body(s3_urls_response(&base_url))
1037            .with_header("content-type", "application/json")
1038            .create();
1039
1040        let upload_mock = mock_server
1041            .mock("PUT", "/upload_url")
1042            .with_status(202)
1043            .with_header("etag", "string")
1044            .create();
1045
1046        let finalize_mock = mock_server
1047            .mock("PUT", finalize_path.as_str())
1048            .with_status(202)
1049            .create();
1050
1051        let status_mock = mock_server
1052            .mock("GET", status_path.as_str())
1053            .with_status(200)
1054            .with_body(upload_status_done("test.txt"))
1055            .with_header("content-type", "application/json")
1056            .create();
1057
1058        let file_name = public
1059            .upload_to_s3_unencrypted(
1060                access_key.into(),
1061                &share,
1062                upload_options,
1063                reader,
1064                None,
1065                None,
1066            )
1067            .await
1068            .unwrap();
1069
1070        assert_eq!(file_name, "test.txt");
1071        upload_channel_mock.assert();
1072        s3_urls_mock.assert();
1073        upload_mock.assert();
1074        finalize_mock.assert();
1075        status_mock.assert();
1076    }
1077
1078    #[tokio::test]
1079    async fn test_upload_to_s3_encrypted() {
1080        let mut mock_server = mockito::Server::new_async().await;
1081        let base_url = mock_server.url();
1082
1083        let client = Dracoon::builder()
1084            .with_base_url(base_url.clone())
1085            .with_client_id("client_id")
1086            .with_client_secret("client_secret")
1087            .build()
1088            .unwrap();
1089        let public = client.public();
1090
1091        let reader = BufReader::new(Cursor::new(vec![
1092            0, 12, 33, 44, 55, 66, 77, 88, 99, 111, 222, 255, 0, 12, 33, 44,
1093        ]));
1094        let upload_options =
1095            UploadOptions::builder(FileMeta::builder("test.txt", 16).build()).build();
1096        let share = encrypted_public_upload_share();
1097        let access_key = "test-access-key";
1098        let upload_path = format!("/api/v4/public/shares/uploads/{access_key}");
1099        let s3_urls_path = format!("/api/v4/public/shares/uploads/{access_key}/string/s3_urls");
1100        let finalize_path = format!("/api/v4/public/shares/uploads/{access_key}/string/s3");
1101        let status_path = format!("/api/v4/public/shares/uploads/{access_key}/string");
1102
1103        let upload_channel_mock = mock_server
1104            .mock("POST", upload_path.as_str())
1105            .with_status(201)
1106            .with_body(include_str!(
1107                "../tests/responses/upload/upload_channel_ok.json"
1108            ))
1109            .with_header("content-type", "application/json")
1110            .create();
1111
1112        let s3_urls_mock = mock_server
1113            .mock("POST", s3_urls_path.as_str())
1114            .with_status(201)
1115            .with_body(s3_urls_response(&base_url))
1116            .with_header("content-type", "application/json")
1117            .create();
1118
1119        let upload_mock = mock_server
1120            .mock("PUT", "/upload_url")
1121            .with_status(202)
1122            .with_header("etag", "string")
1123            .create();
1124
1125        let finalize_mock = mock_server
1126            .mock("PUT", finalize_path.as_str())
1127            .match_body(Matcher::Regex(
1128                r#"(?s).*userFileKeyList.*"userId":7.*"#.to_string(),
1129            ))
1130            .with_status(202)
1131            .create();
1132
1133        let status_mock = mock_server
1134            .mock("GET", status_path.as_str())
1135            .with_status(200)
1136            .with_body(upload_status_done("test.txt"))
1137            .with_header("content-type", "application/json")
1138            .create();
1139
1140        let file_name = public
1141            .upload_to_s3_encrypted(
1142                access_key.into(),
1143                &share,
1144                upload_options,
1145                reader,
1146                None,
1147                None,
1148            )
1149            .await
1150            .unwrap();
1151
1152        assert_eq!(file_name, "test.txt");
1153        upload_channel_mock.assert();
1154        s3_urls_mock.assert();
1155        upload_mock.assert();
1156        finalize_mock.assert();
1157        status_mock.assert();
1158    }
1159
1160    #[tokio::test]
1161    async fn test_upload_to_s3_encrypted_streams_multiple_parts() {
1162        let mut mock_server = mockito::Server::new_async().await;
1163        let base_url = mock_server.url();
1164
1165        let client = Dracoon::builder()
1166            .with_base_url(base_url.clone())
1167            .with_client_id("client_id")
1168            .with_client_secret("client_secret")
1169            .build()
1170            .unwrap();
1171        let public = client.public();
1172
1173        let reader = BufReader::new(Cursor::new(vec![
1174            0, 12, 33, 44, 55, 66, 77, 88, 99, 111, 222, 255,
1175        ]));
1176        let upload_options =
1177            UploadOptions::builder(FileMeta::builder("test.txt", 12).build()).build();
1178        let share = encrypted_public_upload_share();
1179        let access_key = "test-access-key";
1180        let upload_path = format!("/api/v4/public/shares/uploads/{access_key}");
1181        let s3_urls_path = format!("/api/v4/public/shares/uploads/{access_key}/string/s3_urls");
1182        let finalize_path = format!("/api/v4/public/shares/uploads/{access_key}/string/s3");
1183        let status_path = format!("/api/v4/public/shares/uploads/{access_key}/string");
1184
1185        let upload_channel_mock = mock_server
1186            .mock("POST", upload_path.as_str())
1187            .with_status(201)
1188            .with_body(include_str!(
1189                "../tests/responses/upload/upload_channel_ok.json"
1190            ))
1191            .with_header("content-type", "application/json")
1192            .create();
1193
1194        let s3_urls_mock = mock_server
1195            .mock("POST", s3_urls_path.as_str())
1196            .with_status(201)
1197            .with_body(s3_urls_response(&base_url))
1198            .with_header("content-type", "application/json")
1199            .expect(3)
1200            .create();
1201
1202        let upload_mock = mock_server
1203            .mock("PUT", "/upload_url")
1204            .with_status(202)
1205            .with_header("etag", "string")
1206            .expect(3)
1207            .create();
1208
1209        let finalize_mock = mock_server
1210            .mock("PUT", finalize_path.as_str())
1211            .match_body(Matcher::Regex(
1212                r#"(?s).*"partNumber":1.*"partNumber":2.*"partNumber":3.*userFileKeyList.*"userId":7.*"#
1213                    .to_string(),
1214            ))
1215            .with_status(202)
1216            .create();
1217
1218        let status_mock = mock_server
1219            .mock("GET", status_path.as_str())
1220            .with_status(200)
1221            .with_body(upload_status_done("test.txt"))
1222            .with_header("content-type", "application/json")
1223            .create();
1224
1225        let file_name = public
1226            .upload_to_s3_encrypted(
1227                access_key.into(),
1228                &share,
1229                upload_options,
1230                reader,
1231                None,
1232                Some(4),
1233            )
1234            .await
1235            .unwrap();
1236
1237        assert_eq!(file_name, "test.txt");
1238        upload_channel_mock.assert();
1239        s3_urls_mock.assert();
1240        upload_mock.assert();
1241        finalize_mock.assert();
1242        status_mock.assert();
1243    }
1244
1245    #[tokio::test]
1246    async fn test_upload_to_nfs_unencrypted() {
1247        let mut mock_server = mockito::Server::new_async().await;
1248        let base_url = mock_server.url();
1249
1250        let client = Dracoon::builder()
1251            .with_base_url(base_url.clone())
1252            .with_client_id("client_id")
1253            .with_client_secret("client_secret")
1254            .build()
1255            .unwrap();
1256        let public = client.public();
1257
1258        let reader = BufReader::new(Cursor::new(vec![
1259            0, 12, 33, 44, 55, 66, 77, 88, 99, 111, 222, 255, 0, 12, 33, 44,
1260        ]));
1261        let upload_options =
1262            UploadOptions::builder(FileMeta::builder("test.txt", 16).build()).build();
1263        let share = public_upload_share(false);
1264        let access_key = "test-access-key";
1265        let upload_path = format!("/api/v4/public/shares/uploads/{access_key}");
1266        let finalize_path = format!("/api/v4/public/shares/uploads/{access_key}/string");
1267
1268        let upload_channel_mock = mock_server
1269            .mock("POST", upload_path.as_str())
1270            .with_status(201)
1271            .with_body(nfs_upload_channel_response(&base_url))
1272            .with_header("content-type", "application/json")
1273            .create();
1274
1275        let upload_mock = mock_server
1276            .mock("POST", "/upload_url")
1277            .with_status(202)
1278            .create();
1279
1280        let finalize_mock = mock_server
1281            .mock("PUT", finalize_path.as_str())
1282            .with_status(200)
1283            .with_body(uploaded_file_response("test.txt", 16))
1284            .with_header("content-type", "application/json")
1285            .create();
1286
1287        let file_name = public
1288            .upload_to_nfs_unencrypted(
1289                access_key.into(),
1290                &share,
1291                upload_options,
1292                reader,
1293                None,
1294                None,
1295            )
1296            .await
1297            .unwrap();
1298
1299        assert_eq!(file_name, "test.txt");
1300        upload_channel_mock.assert();
1301        upload_mock.assert();
1302        finalize_mock.assert();
1303    }
1304
1305    #[tokio::test]
1306    async fn test_upload_to_nfs_encrypted() {
1307        let mut mock_server = mockito::Server::new_async().await;
1308        let base_url = mock_server.url();
1309
1310        let client = Dracoon::builder()
1311            .with_base_url(base_url.clone())
1312            .with_client_id("client_id")
1313            .with_client_secret("client_secret")
1314            .build()
1315            .unwrap();
1316        let public = client.public();
1317
1318        let reader = BufReader::new(Cursor::new(vec![
1319            0, 12, 33, 44, 55, 66, 77, 88, 99, 111, 222, 255, 0, 12, 33, 44,
1320        ]));
1321        let upload_options =
1322            UploadOptions::builder(FileMeta::builder("test.txt", 16).build()).build();
1323        let share = encrypted_public_upload_share();
1324        let access_key = "test-access-key";
1325        let upload_path = format!("/api/v4/public/shares/uploads/{access_key}");
1326        let finalize_path = format!("/api/v4/public/shares/uploads/{access_key}/string");
1327
1328        let upload_channel_mock = mock_server
1329            .mock("POST", upload_path.as_str())
1330            .with_status(201)
1331            .with_body(nfs_upload_channel_response(&base_url))
1332            .with_header("content-type", "application/json")
1333            .create();
1334
1335        let upload_mock = mock_server
1336            .mock("POST", "/upload_url")
1337            .with_status(202)
1338            .create();
1339
1340        let finalize_mock = mock_server
1341            .mock("PUT", finalize_path.as_str())
1342            .match_body(Matcher::Regex(
1343                r#"(?s).*"items":\[.*"userId":7.*"#.to_string(),
1344            ))
1345            .with_status(200)
1346            .with_body(uploaded_file_response("test.txt", 16))
1347            .with_header("content-type", "application/json")
1348            .create();
1349
1350        let file_name = public
1351            .upload_to_nfs_encrypted(
1352                access_key.into(),
1353                &share,
1354                upload_options,
1355                reader,
1356                None,
1357                None,
1358            )
1359            .await
1360            .unwrap();
1361
1362        assert_eq!(file_name, "test.txt");
1363        upload_channel_mock.assert();
1364        upload_mock.assert();
1365        finalize_mock.assert();
1366    }
1367
1368    #[tokio::test]
1369    async fn test_upload_to_nfs_encrypted_streams_multiple_chunks_with_offsets() {
1370        let mut mock_server = mockito::Server::new_async().await;
1371        let base_url = mock_server.url();
1372
1373        let client = Dracoon::builder()
1374            .with_base_url(base_url.clone())
1375            .with_client_id("client_id")
1376            .with_client_secret("client_secret")
1377            .build()
1378            .unwrap();
1379        let public = client.public();
1380
1381        let reader = BufReader::new(Cursor::new(vec![
1382            0, 12, 33, 44, 55, 66, 77, 88, 99, 111, 222, 255,
1383        ]));
1384        let upload_options =
1385            UploadOptions::builder(FileMeta::builder("test.txt", 12).build()).build();
1386        let share = encrypted_public_upload_share();
1387        let access_key = "test-access-key";
1388        let upload_path = format!("/api/v4/public/shares/uploads/{access_key}");
1389        let finalize_path = format!("/api/v4/public/shares/uploads/{access_key}/string");
1390
1391        let upload_channel_mock = mock_server
1392            .mock("POST", upload_path.as_str())
1393            .with_status(201)
1394            .with_body(nfs_upload_channel_response(&base_url))
1395            .with_header("content-type", "application/json")
1396            .create();
1397
1398        let upload_mock_1 = mock_server
1399            .mock("POST", "/upload_url")
1400            .match_header("content-range", "bytes 0-4/12")
1401            .with_status(202)
1402            .expect(1)
1403            .create();
1404
1405        let upload_mock_2 = mock_server
1406            .mock("POST", "/upload_url")
1407            .match_header("content-range", "bytes 4-8/12")
1408            .with_status(202)
1409            .expect(1)
1410            .create();
1411
1412        let upload_mock_3 = mock_server
1413            .mock("POST", "/upload_url")
1414            .match_header("content-range", "bytes 8-12/12")
1415            .with_status(202)
1416            .expect(1)
1417            .create();
1418
1419        let finalize_mock = mock_server
1420            .mock("PUT", finalize_path.as_str())
1421            .match_body(Matcher::Regex(
1422                r#"(?s).*"items":\[.*"userId":7.*"#.to_string(),
1423            ))
1424            .with_status(200)
1425            .with_body(uploaded_file_response("test.txt", 12))
1426            .with_header("content-type", "application/json")
1427            .create();
1428
1429        let file_name = public
1430            .upload_to_nfs_encrypted(
1431                access_key.into(),
1432                &share,
1433                upload_options,
1434                reader,
1435                None,
1436                Some(4),
1437            )
1438            .await
1439            .unwrap();
1440
1441        assert_eq!(file_name, "test.txt");
1442        upload_channel_mock.assert();
1443        upload_mock_1.assert();
1444        upload_mock_2.assert();
1445        upload_mock_3.assert();
1446        finalize_mock.assert();
1447    }
1448}