1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use futures::io::Cursor;
use time::Duration;

use crate::raw::*;
use crate::*;

/// ObjectMultipart represent an ongoing multipart upload.
///
/// # Process
///
/// ```txt
/// create
///     -> write
///         -> complete to build a normal Object
///         -> abort to cancel upload and delete all existing parts
/// ```
///
/// # Notes
///
/// Before [`ObjectMultipart::complete`] has been called, we can't read any content from this multipart object.
pub struct ObjectMultipart {
    acc: Arc<dyn Accessor>,
    path: String,
    upload_id: String,
}

impl ObjectMultipart {
    /// Build a new MultipartObject.
    pub fn new(op: Operator, path: &str, upload_id: &str) -> Self {
        Self {
            acc: op.inner(),
            path: normalize_path(path),
            upload_id: upload_id.to_string(),
        }
    }

    /// Fetch the operator that used by this object.
    pub fn operatoer(&self) -> Operator {
        self.acc.clone().into()
    }

    /// Write a new [`ObjectPart`] with specified part number.
    pub async fn write(&self, part_number: usize, bs: impl Into<Vec<u8>>) -> Result<ObjectPart> {
        let bs = bs.into();

        let op = OpWriteMultipart::new(self.upload_id.clone(), part_number, bs.len() as u64);
        let r = Cursor::new(bs);
        let rp = self
            .acc
            .write_multipart(&self.path, op, Box::new(r))
            .await?;
        Ok(rp.into_object_part())
    }

    /// Complete multipart uploads with specified parts.
    ///
    /// # Notes
    ///
    /// - This operation will complete and finish this upload.
    /// - This operation will concat input parts to build a new object.
    /// - Input parts order is **SENSITIVE**, please make sure the order is correct.
    pub async fn complete(&self, parts: Vec<ObjectPart>) -> Result<Object> {
        let op = OpCompleteMultipart::new(self.upload_id.clone(), parts);
        self.acc.complete_multipart(&self.path, op).await?;

        Ok(Object::new(self.acc.clone().into(), &self.path))
    }

    /// Abort multipart uploads.
    ///
    /// # Notes
    ///
    /// - This operation will cancel this upload.
    /// - This operation will remove all parts that already uploaded.
    /// - This operation will return `succeeded` even when object or upload_id not exist.
    pub async fn abort(&self) -> Result<()> {
        let op = OpAbortMultipart::new(self.upload_id.clone());
        let _ = self.acc.abort_multipart(&self.path, op).await?;

        Ok(())
    }

    /// Presign an operation for write multipart.
    ///
    /// # TODO
    ///
    /// User need to handle the response by self which may differ for different platforms.
    pub fn presign_write(&self, part_number: usize, expire: Duration) -> Result<PresignedRequest> {
        let op = OpPresign::new(
            OpWriteMultipart::new(self.upload_id.clone(), part_number, 0),
            expire,
        );

        let rp = self.acc.presign(&self.path, op)?;
        Ok(rp.into_presigned_request())
    }
}

/// ObjectPart is generated by `write_multipart` operation, carries
/// required information for `complete_multipart`.
#[derive(Debug, Clone, Default)]
pub struct ObjectPart {
    part_number: usize,
    etag: String,
}

impl ObjectPart {
    /// Create a new part
    pub fn new(part_number: usize, etag: &str) -> Self {
        Self {
            part_number,
            etag: etag.to_string(),
        }
    }

    /// Get part_number from part.
    pub fn part_number(&self) -> usize {
        self.part_number
    }

    /// Get etag from part.
    pub fn etag(&self) -> &str {
        &self.etag
    }
}