1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

use azure::prelude::*;
use azure_sdk_core::errors::AzureError;
use azure_sdk_core::prelude::*;
use azure_sdk_storage_core::prelude::*;

use byteorder::{LittleEndian, WriteBytesExt};
use retry::{delay::jitter, delay::Exponential, retry, OperationResult};
use std::cmp;
use std::error::Error;
use std::fs::File;
use std::io::prelude::*;
use tokio_core::reactor::Core;
use url::Url;

/// Millisecond value handed to `Exponential::from_millis` when building the
/// retry delay sequence for upload requests.
// NOTE(review): in the `retry` crate this value appears to act as the growth
// factor as well as the first delay, so delays may grow 100x per attempt —
// confirm against the crate version in use.
const BACKOFF: u64 = 100;
/// Maximum number of delays taken from the backoff sequence, i.e. the cap on
/// retries of a single request.
const BACKOFF_COUNT: usize = 100;
/// Upper bound, in bytes, applied to the caller-supplied block size (100 MiB).
const MAX_BLOCK_SIZE: usize = 1024 * 1024 * 100;

/// Converts the block index into an block_id
fn to_id(count: u64) -> Result<Vec<u8>, Box<dyn Error>> {
    let mut bytes = vec![];
    bytes.write_u64::<LittleEndian>(count)?;
    Ok(bytes)
}

/// Splits a blob SAS URL into its storage account, container and blob path.
///
/// The account is the first dot-separated label of the URL host, the
/// container is the first path segment, and the blob path is every remaining
/// path segment re-joined with `/` (empty when the URL names only a
/// container). The query string is not inspected.
///
/// # Errors
///
/// Returns an error when `sas` is not a valid URL, when it has no host to
/// take the account name from, or when its path has no container segment.
fn parse(sas: &str) -> Result<(String, String, String), Box<dyn Error>> {
    let parsed = Url::parse(sas)?;

    // `next()` instead of indexing: an empty host yields no labels, which is
    // reported as an error rather than a panic.
    let account = parsed
        .host_str()
        .and_then(|host| host.split_terminator('.').next())
        .ok_or("invalid sas token (no account)")?;

    // An absolute path starts with '/', so the first split item is the empty
    // string in front of it; skip it before reading the container.
    let mut segments = parsed.path().split_terminator('/').skip(1);
    let container = segments
        .next()
        .ok_or("invalid sas token (no container)")?;
    let blob_path = segments.collect::<Vec<_>>().join("/");

    Ok((account.to_string(), container.to_string(), blob_path))
}

/// Upload a file to Azure Blob Store using a fully qualified SAS token
pub fn upload_sas(filename: &str, sas: &str, block_size: usize) -> Result<(), Box<dyn Error>> {
    let block_size = cmp::min(block_size, MAX_BLOCK_SIZE);
    let (account, container, path) = parse(sas)?;
    let client = Client::azure_sas(&account, sas)?;

    let mut core = Core::new()?;
    let mut file = File::open(filename)?;
    let size = file.metadata()?.len() as usize;
    let mut sent = 0;
    let mut blocks = BlockList { blocks: Vec::new() };
    let mut data = vec![0; block_size];
    while sent < size {
        let send_size = cmp::min(block_size, size - sent);
        let block_id = to_id(sent as u64)?;
        data.resize(send_size, 0);
        file.read_exact(&mut data)?;

        retry(
            Exponential::from_millis(BACKOFF)
                .map(jitter)
                .take(BACKOFF_COUNT),
            || {
                let response = core.run(
                    client
                        .put_block()
                        .with_container_name(&container)
                        .with_blob_name(&path)
                        .with_body(&data)
                        .with_block_id(&block_id)
                        .finalize(),
                );

                match response {
                    Ok(x) => OperationResult::Ok(x),
                    Err(x) => match x {
                        AzureError::HyperError(_) => OperationResult::Retry(x),
                        _ => OperationResult::Err(x),
                    },
                }
            },
        )?;

        blocks.blocks.push(BlobBlockType::Uncommitted(block_id));
        sent += send_size;
    }

    retry(
        Exponential::from_millis(BACKOFF)
            .map(jitter)
            .take(BACKOFF_COUNT),
        || {
            let response = core.run(
                client
                    .put_block_list()
                    .with_container_name(&container)
                    .with_blob_name(&path)
                    .with_block_list(&blocks)
                    .finalize(),
            );

            match response {
                Ok(x) => OperationResult::Ok(x),
                Err(x) => match x {
                    AzureError::HyperError(_) => OperationResult::Retry(x),
                    _ => OperationResult::Err(x),
                },
            }
        },
    )?;

    Ok(())
}