use std::{
    cmp,
    collections::HashMap,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};

use anyhow::Result;
use async_trait::async_trait;
use console::style;
use futures::future::select_all;
pub use indicatif::ProgressBar;
use tokio::task::JoinHandle;

use crate::{
    cache::Cache,
    config::{ConfigData, SugarConfig, UploadMethod},
    constants::PARALLEL_LIMIT,
    upload::{
        assets::{AssetPair, DataType},
        methods::*,
        UploadError,
    },
};

// Size of the mock media URI for cost calculations.
pub const MOCK_URI_SIZE: usize = 100;

/// Struct representing an asset ready for upload. An `AssetInfo` can represent
/// a physical file, in which case `content` holds the path of the file; or an
/// in-memory asset, in which case `content` holds the content of the asset itself.
///
/// For example, for image files, the `content` contains the path of the file on the
/// file system. In the case of json metadata files, the `content` contains the string
/// representation of the json metadata.
///
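/// # Example
///
/// A minimal sketch of building an `AssetInfo` for an image file (the id, file name
/// and path are illustrative):
///
/// ```ignore
/// let asset = AssetInfo {
///     asset_id: "0".to_string(),
///     name: "0.png".to_string(),
///     // for a physical file, `content` holds the file path
///     content: "assets/0.png".to_string(),
///     data_type: DataType::Image,
///     content_type: "image/png".to_string(),
/// };
/// ```
///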
pub struct AssetInfo {
    /// Id of the asset in the cache.
    pub asset_id: String,
    /// Name (file name) of the asset.
    pub name: String,
    /// Content of the asset - either a file path or the string representation of the content.
    pub content: String,
    /// Type of the asset.
    pub data_type: DataType,
    /// MIME content type.
    pub content_type: String,
}

/// Types that can be prepared to upload assets (files).
///
/// All implementations of [`Uploader`](Uploader) must also implement this trait.
#[async_trait]
pub trait Prepare {
    /// Prepare the upload of the specified media/metadata files, e.g.:
    /// - check if any file exceeds a size limit;
    /// - check if there is storage space for the upload;
    /// - check/add funds for the upload.
    ///
    /// The `prepare` method receives the information of all files that will be uploaded.
    ///
    /// # Arguments
    ///
    /// * `sugar_config` - The current sugar configuration
    /// * `asset_pairs` - Mapping of `index` to an `AssetPair`
    /// * `asset_indices` - Vector indicating which asset pair indices will be uploaded, grouped by type.
    ///
    /// The `asset_pairs` contain the complete information of the assets, but only the assets specified in
    /// `asset_indices` will be uploaded. E.g., if index `1` is only present in the `DataType::Image` indices
    /// array, only the image of asset `1` will be uploaded (see the example below).
    ///
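    /// # Example
    ///
    /// A minimal sketch of a call to `prepare` (the indices are illustrative, and `uploader`,
    /// `sugar_config` and `asset_pairs` are assumed to be already in scope):
    ///
    /// ```ignore
    /// // indices of the asset pairs whose files still need to be uploaded, grouped by type
    /// let image_indices: Vec<isize> = vec![0, 1, 2];
    /// let metadata_indices: Vec<isize> = vec![0, 1, 2];
    ///
    /// uploader
    ///     .prepare(
    ///         &sugar_config,
    ///         &asset_pairs,
    ///         vec![
    ///             (DataType::Image, image_indices.as_slice()),
    ///             (DataType::Metadata, metadata_indices.as_slice()),
    ///         ],
    ///     )
    ///     .await?;
    /// ```
    ///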
    async fn prepare(
        &self,
        sugar_config: &SugarConfig,
        asset_pairs: &HashMap<isize, AssetPair>,
        asset_indices: Vec<(DataType, &[isize])>,
    ) -> Result<()>;
}

/// Types that can upload assets (files).
///
/// This trait should be implemented directly by upload methods that require full control over how the upload
/// is performed. For methods that support parallel uploads (threading), consider implementing
/// [`ParallelUploader`](ParallelUploader) instead.
///
#[async_trait]
pub trait Uploader: Prepare {
    /// Returns a vector [`UploadError`](super::errors::UploadError) with the errors (if any) after uploading all
    /// assets to the storage.
    ///
    /// This function will be called to upload each type of asset separately.
    ///
    /// # Arguments
    ///
    /// * `sugar_config` - The current sugar configuration
    /// * `cache` - Asset [`cache`](crate::cache::Cache) object (mutable)
    /// * `data_type` - Type of the asset being uploaded
    /// * `assets` - Vector of [`assets`](AssetInfo) to upload (mutable)
    /// * `progress` - Reference to the [`progress bar`](indicatif::ProgressBar) to provide feedback to
    ///                the console
    /// * `interrupted` - Reference to the shared interruption handler [`flag`](std::sync::atomic::AtomicBool)
    ///                   to receive notifications
    ///
    /// # Examples
    ///
    /// Implementations are expected to use the `interrupted` flag to detect when the user aborts the upload
    /// process. In general, this involves using it as the condition of a loop:
    ///
    /// ```ignore
    /// while !interrupted.load(Ordering::SeqCst) {
    ///     // continue with the upload
    /// }
    /// ```
    ///
    /// After uploading an asset, its information needs to be updated in the cache and the cache
    /// [`sync`](crate::cache::Cache#method.sync_file)ed to the file system. Syncing the cache to the file system
    /// might be slow for large collections, so it should be done at regular intervals rather than after every
    /// upload, balancing the speed of the upload process against the chance of losing information in case
    /// the user aborts the upload.
    ///
    /// ```ignore
    /// ...
    /// // once an asset has been uploaded
    ///
    /// let id = asset_info.asset_id.clone();
    /// let uri = "URI of the asset after upload";
    /// // cache item to update
    /// let item = cache.items.get_mut(&id).unwrap();
    ///
    /// match data_type {
    ///     DataType::Image => item.image_link = uri,
    ///     DataType::Metadata => item.metadata_link = uri,
    ///     DataType::Animation => item.animation_link = Some(uri),
    /// }
    /// // updates the progress bar
    /// progress.inc(1);
    ///
    /// ...
    ///
    /// // after several uploads
    /// cache.sync_file()?;
    /// ```
    ///
    async fn upload(
        &self,
        sugar_config: &SugarConfig,
        cache: &mut Cache,
        data_type: DataType,
        assets: &mut Vec<AssetInfo>,
        progress: &ProgressBar,
        interrupted: Arc<AtomicBool>,
    ) -> Result<Vec<UploadError>>;
}

/// Types that can upload assets in parallel.
///
/// This trait abstracts the threading logic and allows methods to focus on the logic of uploading a single
/// asset (file).
#[async_trait]
pub trait ParallelUploader: Uploader + Send + Sync {
    /// Returns a [`JoinHandle`](tokio::task::JoinHandle) to the task responsible for uploading the specified asset.
    ///
    /// # Arguments
    ///
    /// * `asset` - The [`asset`](AssetInfo) to upload
    ///
    /// # Example
    ///
    /// In most cases, the function will return the value from [`tokio::spawn`](tokio::spawn):
    ///
    /// ```ignore
    /// tokio::spawn(async move {
    ///     // code responsible for uploading a single asset;
    ///     // on success, it returns the pair (asset id, uri)
    ///     Ok((asset_id, uri))
    /// })
    /// ```
    ///
    fn upload_asset(&self, asset: AssetInfo) -> JoinHandle<Result<(String, String)>>;
}

/// Default implementation of the [`Uploader`](Uploader) trait for all types implementing [`ParallelUploader`](ParallelUploader).
///
#[async_trait]
impl<T: ParallelUploader> Uploader for T {
    /// Uploads assets in parallel. It spawns at most [`PARALLEL_LIMIT`](PARALLEL_LIMIT) tasks at a time to avoid
    /// reaching the limit of concurrently open files, and it syncs the cache file each time the task queue is
    /// refilled (roughly every `PARALLEL_LIMIT / 2` completed uploads).
    ///
    async fn upload(
        &self,
        _sugar_config: &SugarConfig,
        cache: &mut Cache,
        data_type: DataType,
        assets: &mut Vec<AssetInfo>,
        progress: &ProgressBar,
        interrupted: Arc<AtomicBool>,
    ) -> Result<Vec<UploadError>> {
        let mut handles = Vec::new();

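        // spawns the first batch of upload tasks (at most PARALLEL_LIMIT)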
        for task in assets.drain(0..cmp::min(assets.len(), PARALLEL_LIMIT)) {
            handles.push(self.upload_asset(task));
        }

        let mut errors = Vec::new();

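        // processes completed tasks until all uploads finish or the user interrupts the process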
        while !interrupted.load(Ordering::SeqCst) && !handles.is_empty() {
            match select_all(handles).await {
                (Ok(res), _index, remaining) => {
                    // whether or not the upload was successful,
                    // we continue with the remaining ones
                    handles = remaining;
                    match res {
                        Ok((asset_id, link)) => {
                            // cache item to update
                            let item = cache.items.0.get_mut(&asset_id).unwrap();
                            match data_type {
                                DataType::Image => item.image_link = link,
                                DataType::Metadata => item.metadata_link = link,
                                DataType::Animation => item.animation_link = Some(link),
                            }
                            // updates the progress bar
                            progress.inc(1);
                        }
                        Err(err) => {
                            // user will need to retry the upload
                            errors.push(UploadError::SendDataFailed(format!(
                                "Upload error: {:?}",
                                err
                            )));
                        }
                    }
                }
                (Err(err), _index, remaining) => {
                    errors.push(UploadError::SendDataFailed(format!(
                        "Upload error: {:?}",
                        err
                    )));
                    // continue with the remaining tasks
                    handles = remaining;
                }
            }
            if !assets.is_empty() {
                // if more than half of the tasks have completed, spawn more tasks
                if (PARALLEL_LIMIT - handles.len()) > (PARALLEL_LIMIT / 2) {
                    // syncs cache (checkpoint)
                    cache.sync_file()?;
                    for task in assets.drain(0..cmp::min(assets.len(), PARALLEL_LIMIT / 2)) {
                        handles.push(self.upload_asset(task));
                    }
                }
            }
        }

        if errors.is_empty() && !assets.is_empty() {
            progress.abandon_with_message(format!("{}", style("Upload aborted ").red().bold()));
            return Err(
                UploadError::SendDataFailed("Not all files were uploaded.".to_string()).into(),
            );
        }

        Ok(errors)
    }
}

/// Returns a new uploader trait object based on the configuration `uploadMethod`.
///
/// This function acts as a *factory* function for uploader objects.
///
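/// # Example
///
/// A minimal sketch, assuming `sugar_config`, `config_data`, `asset_pairs` and `asset_indices`
/// have already been loaded/built:
///
/// ```ignore
/// // creates the uploader object for the configured upload method
/// let uploader = initialize(&sugar_config, &config_data).await?;
/// // the uploader can then prepare (and later upload) the assets
/// uploader.prepare(&sugar_config, &asset_pairs, asset_indices).await?;
/// ```
///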
pub async fn initialize(
    sugar_config: &SugarConfig,
    config_data: &ConfigData,
) -> Result<Box<dyn Uploader>> {
    Ok(match config_data.upload_method {
        UploadMethod::AWS => Box::new(AWSMethod::new(config_data).await?) as Box<dyn Uploader>,
        UploadMethod::Bundlr => {
            Box::new(BundlrMethod::new(sugar_config, config_data).await?) as Box<dyn Uploader>
        }
        UploadMethod::NftStorage => {
            Box::new(NftStorageMethod::new(config_data).await?) as Box<dyn Uploader>
        }
        UploadMethod::SHDW => {
            Box::new(SHDWMethod::new(sugar_config, config_data).await?) as Box<dyn Uploader>
        }
    })
}