use crate::{
crypto,
file_chunk_pos::{FileChunkPosition, FileChunkPositions},
queries, utils,
v1::{
bool_from_int, bool_to_int, response_payload, Expire, FileChunkLocation, FileProperties, LocationNameMetadata,
PlainResponsePayload,
},
FilenSettings, SettingsBundle,
};
use secstr::SecUtf8;
use serde::{Deserialize, Serialize};
use snafu::{ensure, Backtrace, ResultExt, Snafu};
use std::{
cmp::{Eq, PartialEq},
convert::TryInto,
io::{BufReader, Read, Seek, SeekFrom},
};
use url::Url;
use uuid::Uuid;
type Result<T, E = Error> = std::result::Result<T, E>;

/// Size of a single upload chunk, in bytes (1 MiB).
const FILE_CHUNK_SIZE: u32 = 1024 * 1024;
// Filen v1 API endpoints used by this module.
const UPLOAD_PATH: &str = "/v1/upload";
const UPLOAD_DONE_PATH: &str = "/v1/upload/done";
const UPLOAD_STOP_PATH: &str = "/v1/upload/stop";
const USER_UNFINISHED_DELETE_PATH: &str = "/v1/user/unfinished/delete";
/// Errors produced by the upload helpers in this module.
#[derive(Snafu, Debug)]
pub enum Error {
    #[snafu(display("Caller provided invalid argument: {}", message))]
    BadArgument { message: String, backtrace: Backtrace },

    #[snafu(display(
        "Chunk of size '{}' encryption failed, file key size was '{}' and file version was '{}'",
        chunk_size,
        file_key_size,
        file_version,
    ))]
    ChunkEncryptionError {
        chunk_size: usize,
        file_key_size: usize,
        file_version: u32,
        source: crypto::Error,
    },

    #[snafu(display("Filen did not accept at least one uploaded file chunk: {}", message))]
    ChunkNotAccepted { message: String, backtrace: Backtrace },

    #[snafu(display("Filen could not mark file upload as done: {}", message))]
    CouldNotMarkDone { message: String, backtrace: Backtrace },

    /// At least one chunk response with `status == true` had no `data` payload,
    /// so the chunk's storage location could not be determined.
    #[snafu(display(
        "Not all uploaded chunks with status == true actually had data: {}",
        file_upload_info
    ))]
    ChunkUploadResponseMissingData { file_upload_info: Box<FileUploadInfo> },

    #[snafu(display("Filen did not accept uploaded dummy chunk: {}", message))]
    DummyChunkNotAccepted { message: String, backtrace: Backtrace },

    /// File key could not be converted into a fixed 32-byte AES key.
    #[snafu(display("File key should be an alphanumeric string of 32 chars"))]
    FileKeyShouldHave32Chars { source: std::array::TryFromSliceError },

    #[snafu(display("Cannot read file chunks due to IO error: {}", source))]
    SeekReadError { source: std::io::Error },

    #[snafu(display("{} ({} bytes) query failed: {}", api_endpoint, chunk_size, source))]
    UploadQueryFailed {
        api_endpoint: String,
        chunk_size: usize,
        source: queries::Error,
    },

    #[snafu(display("{} query failed: {}", UPLOAD_DONE_PATH, source))]
    UploadDoneQueryFailed { source: queries::Error },

    #[snafu(display("{} query failed: {}", UPLOAD_STOP_PATH, source))]
    UploadStopQueryFailed { source: queries::Error },

    #[snafu(display("{} query failed: {}", USER_UNFINISHED_DELETE_PATH, source))]
    UserUnfinishedDeleteQueryFailed { source: queries::Error },
}
/// Response data for a single `/v1/upload` chunk request: where the chunk was
/// stored and the file's expiration-related timestamps.
#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub struct UploadFileChunkResponseData {
    // Storage bucket the chunk was written to.
    pub bucket: String,
    // Storage region the chunk was written to.
    pub region: String,
    // Filen transmits this flag as 0/1, hence the custom (de)serializers.
    #[serde(
        rename = "expireSet",
        deserialize_with = "bool_from_int",
        serialize_with = "bool_to_int"
    )]
    pub expire_set: bool,
    // NOTE(review): presumably a Unix timestamp — unit not visible here, confirm against API docs.
    #[serde(rename = "expireTimestamp")]
    pub expire_timestamp: u64,
    // NOTE(review): presumably a Unix timestamp — unit not visible here, confirm against API docs.
    #[serde(rename = "deleteTimestamp")]
    pub delete_timestamp: u64,
}
// Generates `UploadFileChunkResponsePayload`, the full response wrapper around the data above.
response_payload!(
    UploadFileChunkResponsePayload<UploadFileChunkResponseData>
);
/// Request payload for `/v1/upload/done`, used to mark an upload as finished.
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct UploadDoneRequestPayload<'upload_done> {
    // Uuid of the uploaded file.
    pub uuid: Uuid,
    // The random `upload_key` token generated in `FileUploadProperties`.
    #[serde(rename = "uploadKey")]
    pub upload_key: &'upload_done str,
}
utils::display_from_json_with_lifetime!('upload_done, UploadDoneRequestPayload);
/// Request payload for `/v1/upload/stop`, used to stop an upload.
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct UploadStopRequestPayload<'upload_stop> {
    // Uuid of the file whose upload is being stopped.
    pub uuid: Uuid,
    // The random `upload_key` token generated in `FileUploadProperties`.
    #[serde(rename = "uploadKey")]
    pub upload_key: &'upload_stop str,
}
utils::display_from_json_with_lifetime!('upload_stop, UploadStopRequestPayload);
/// Everything needed to upload a file to Filen: identifiers, encrypted metadata,
/// chunk count and one-time upload tokens. Built via `from_file_properties`.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct FileUploadProperties {
    // Uuid assigned to the new file.
    pub uuid: Uuid,
    // File name, encrypted (from `FileProperties::name_encrypted`).
    pub name_metadata: String,
    // Hash of the plain file name (from `LocationNameMetadata::name_hashed`).
    pub name_hashed: String,
    // File size, encrypted (from `FileProperties::size_encrypted`).
    pub size_metadata: String,
    // Number of chunks the file will be split into.
    pub chunks: u32,
    // Mime type, encrypted (from `FileProperties::mime_encrypted`).
    pub mime_metadata: String,
    // Full file metadata string, encrypted with the last master key.
    pub file_metadata: String,
    // Key used to encrypt the file's data chunks.
    pub file_key: SecUtf8,
    // Random 32-char alphanumeric token, sent as the `rm` query parameter.
    pub rm: String,
    // Random 32-char alphanumeric token, required to finalize or stop the upload.
    pub upload_key: String,
    // File expiration; set to `Expire::Never` for new uploads.
    pub expire: Expire,
    // Uuid of the parent folder the file is uploaded into.
    pub parent_uuid: Uuid,
    // File version; passed to chunk encryption.
    pub version: u32,
}
impl FileUploadProperties {
    /// Prepares upload properties for the given file: assigns a fresh file uuid,
    /// generates the one-time `rm`/`upload_key` tokens, encrypts metadata with the
    /// given last master key and computes the chunk count.
    #[must_use]
    pub fn from_file_properties(
        file_properties: &FileProperties,
        version: u32,
        parent_folder_uuid: Uuid,
        last_master_key: &SecUtf8,
    ) -> Self {
        // One-time random tokens Filen uses to authorize finalizing/stopping this upload.
        let rm_token = utils::random_alphanumeric_string(32);
        let upload_key_token = utils::random_alphanumeric_string(32);
        let encrypted_file_metadata = file_properties.to_metadata_string(last_master_key);
        let encrypted_name = file_properties.name_encrypted();
        let encrypted_size = file_properties.size_encrypted();
        let encrypted_mime = file_properties.mime_encrypted();
        let hashed_name = LocationNameMetadata::name_hashed(&file_properties.name);
        let chunk_count = calculate_chunk_count(FILE_CHUNK_SIZE, file_properties.size);
        Self {
            uuid: Uuid::new_v4(),
            name_metadata: encrypted_name,
            name_hashed: hashed_name,
            size_metadata: encrypted_size,
            chunks: chunk_count,
            mime_metadata: encrypted_mime,
            file_metadata: encrypted_file_metadata,
            file_key: file_properties.key.clone(),
            rm: rm_token,
            upload_key: upload_key_token,
            expire: Expire::Never,
            parent_uuid: parent_folder_uuid,
            version,
        }
    }

    /// Builds the URL query string for uploading the chunk with the given index,
    /// with all values percent/form-encoded.
    #[allow(clippy::missing_panics_doc)]
    #[must_use]
    pub fn to_query_params(&self, chunk_index: u32, api_key: &SecUtf8) -> String {
        // Stringify the non-string fields up front so the parameter list below is uniform.
        let uuid_param = self.uuid.as_hyphenated().to_string();
        let chunks_param = self.chunks.to_string();
        let index_param = chunk_index.to_string();
        let expire_param = self.expire.to_string();
        let parent_param = self.parent_uuid.as_hyphenated().to_string();
        let version_param = self.version.to_string();
        let params = [
            ("apiKey", api_key.unsecure()),
            ("uuid", uuid_param.as_str()),
            ("name", self.name_metadata.as_str()),
            ("nameHashed", self.name_hashed.as_str()),
            ("size", self.size_metadata.as_str()),
            ("chunks", chunks_param.as_str()),
            ("mime", self.mime_metadata.as_str()),
            ("index", index_param.as_str()),
            ("rm", self.rm.as_str()),
            ("expire", expire_param.as_str()),
            ("uploadKey", self.upload_key.as_str()),
            ("metaData", self.file_metadata.as_str()),
            ("parent", parent_param.as_str()),
            ("version", version_param.as_str()),
        ];
        // The base URL is a throwaway; only the encoded query part is kept.
        let url = Url::parse_with_params("https://localhost?", &params).unwrap();
        url.query().unwrap().to_owned()
    }

    /// Builds the relative `/v1/upload` endpoint (path + query) for the given chunk index.
    #[must_use]
    pub fn to_api_endpoint(&self, chunk_index: u32, api_key: &SecUtf8) -> String {
        format!("{}?{}", UPLOAD_PATH, self.to_query_params(chunk_index, api_key))
    }
}
utils::display_from_json!(FileUploadProperties);
/// Outcome of a file upload: the properties the file was uploaded with and the
/// per-chunk responses returned by Filen.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct FileUploadInfo {
    pub properties: FileUploadProperties,
    // One response per uploaded chunk, in the order the chunks were uploaded.
    pub chunk_responses: Vec<UploadFileChunkResponsePayload>,
}
impl FileUploadInfo {
    /// Bundles upload properties with the chunk responses Filen returned.
    #[must_use]
    pub fn new(upload_properties: FileUploadProperties, chunk_responses: Vec<UploadFileChunkResponsePayload>) -> Self {
        Self {
            properties: upload_properties,
            chunk_responses,
        }
    }

    /// Derives the storage location of every uploaded chunk from the stored
    /// chunk responses.
    ///
    /// # Errors
    ///
    /// Fails with `ChunkUploadResponseMissingData` if any chunk response lacks
    /// its `data` payload.
    pub fn get_file_chunk_locations(&self) -> Result<Vec<FileChunkLocation>> {
        let mut locations = Vec::with_capacity(self.chunk_responses.len());
        for (chunk_index, response) in self.chunk_responses.iter().enumerate() {
            match &response.data {
                Some(data) => locations.push(FileChunkLocation {
                    region: data.region.clone(),
                    bucket: data.bucket.clone(),
                    file_uuid: self.properties.uuid,
                    chunk_index: chunk_index as u32,
                }),
                // A successful chunk response without data means we cannot tell
                // where that chunk ended up.
                None => {
                    return ChunkUploadResponseMissingDataSnafu {
                        file_upload_info: self.clone(),
                    }
                    .fail()
                }
            }
        }
        Ok(locations)
    }
}
utils::display_from_json!(FileUploadInfo);
/// Calls `/v1/upload/done` to mark the upload identified by the payload as finished.
pub fn upload_done_request(
    payload: &UploadDoneRequestPayload,
    filen_settings: &FilenSettings,
) -> Result<PlainResponsePayload> {
    let query_result = queries::query_filen_api(UPLOAD_DONE_PATH, payload, filen_settings);
    query_result.context(UploadDoneQueryFailedSnafu {})
}
/// Asynchronously calls `/v1/upload/done` to mark the upload identified by the payload as finished.
#[cfg(feature = "async")]
pub async fn upload_done_request_async(
    payload: &UploadDoneRequestPayload<'_>,
    filen_settings: &FilenSettings,
) -> Result<PlainResponsePayload> {
    let query_result = queries::query_filen_api_async(UPLOAD_DONE_PATH, payload, filen_settings).await;
    query_result.context(UploadDoneQueryFailedSnafu {})
}
/// Calls `/v1/upload/stop` for the upload identified by the payload.
pub fn upload_stop_request(
    payload: &UploadStopRequestPayload,
    filen_settings: &FilenSettings,
) -> Result<PlainResponsePayload> {
    let query_result = queries::query_filen_api(UPLOAD_STOP_PATH, payload, filen_settings);
    query_result.context(UploadStopQueryFailedSnafu {})
}
/// Asynchronously calls `/v1/upload/stop` for the upload identified by the payload.
#[cfg(feature = "async")]
pub async fn upload_stop_request_async(
    payload: &UploadStopRequestPayload<'_>,
    filen_settings: &FilenSettings,
) -> Result<PlainResponsePayload> {
    let query_result = queries::query_filen_api_async(UPLOAD_STOP_PATH, payload, filen_settings).await;
    query_result.context(UploadStopQueryFailedSnafu {})
}
/// Encrypts one file chunk with the properties' file key and uploads it to `/v1/upload`.
///
/// # Errors
///
/// Fails if the file key is not exactly `crypto::AES_CBC_KEY_LENGTH` bytes,
/// if chunk encryption fails, or if the upload query fails.
pub fn encrypt_and_upload_chunk(
    api_key: &SecUtf8,
    chunk_index: u32,
    chunk: &[u8],
    upload_properties: &FileUploadProperties,
    filen_settings: &FilenSettings,
) -> Result<UploadFileChunkResponsePayload> {
    // The file key has to be reinterpretable as a fixed-size AES-CBC key.
    let key_bytes = upload_properties.file_key.unsecure().as_bytes();
    let file_key: &[u8; crypto::AES_CBC_KEY_LENGTH] =
        key_bytes.try_into().context(FileKeyShouldHave32CharsSnafu {})?;
    let encrypted_chunk = crypto::encrypt_file_chunk(chunk, file_key, upload_properties.version).context(
        ChunkEncryptionSnafu {
            chunk_size: chunk.len(),
            file_key_size: file_key.len(),
            file_version: upload_properties.version,
        },
    )?;
    let encrypted_size = encrypted_chunk.len();
    let api_endpoint = upload_properties.to_api_endpoint(chunk_index, api_key);
    let upload_result = queries::upload_to_filen::<UploadFileChunkResponsePayload>(
        &api_endpoint,
        encrypted_chunk.as_bytes(),
        filen_settings,
    );
    upload_result.context(UploadQueryFailedSnafu {
        api_endpoint,
        chunk_size: encrypted_size,
    })
}
/// Asynchronously encrypts one file chunk with the properties' file key and uploads it
/// to `/v1/upload`.
///
/// # Errors
///
/// Fails if the file key is not exactly `crypto::AES_CBC_KEY_LENGTH` bytes,
/// if chunk encryption fails, or if the upload query fails.
#[cfg(feature = "async")]
pub async fn encrypt_and_upload_chunk_async(
    api_key: &SecUtf8,
    chunk_index: u32,
    chunk: &[u8],
    upload_properties: &FileUploadProperties,
    filen_settings: &FilenSettings,
) -> Result<UploadFileChunkResponsePayload> {
    // The file key has to be reinterpretable as a fixed-size AES-CBC key.
    let key_bytes = upload_properties.file_key.unsecure().as_bytes();
    let file_key: &[u8; crypto::AES_CBC_KEY_LENGTH] =
        key_bytes.try_into().context(FileKeyShouldHave32CharsSnafu {})?;
    let encrypted_chunk = crypto::encrypt_file_chunk(chunk, file_key, upload_properties.version).context(
        ChunkEncryptionSnafu {
            chunk_size: chunk.len(),
            file_key_size: file_key.len(),
            file_version: upload_properties.version,
        },
    )?;
    let encrypted_size = encrypted_chunk.len();
    let api_endpoint = upload_properties.to_api_endpoint(chunk_index, api_key);
    let upload_result = queries::upload_to_filen_async::<UploadFileChunkResponsePayload>(
        &api_endpoint,
        encrypted_chunk.as_bytes(),
        filen_settings,
    )
    .await;
    upload_result.context(UploadQueryFailedSnafu {
        api_endpoint,
        chunk_size: encrypted_size,
    })
}
/// Calls the `/v1/user/unfinished/delete` endpoint with the user's API key.
pub fn user_unfinished_delete_request(
    api_key: &SecUtf8,
    filen_settings: &FilenSettings,
) -> Result<PlainResponsePayload> {
    let payload = utils::api_key_json(api_key);
    queries::query_filen_api(USER_UNFINISHED_DELETE_PATH, &payload, filen_settings)
        .context(UserUnfinishedDeleteQueryFailedSnafu {})
}
/// Asynchronously calls the `/v1/user/unfinished/delete` endpoint with the user's API key.
#[cfg(feature = "async")]
pub async fn user_unfinished_delete_request_async(
    api_key: &SecUtf8,
    filen_settings: &FilenSettings,
) -> Result<PlainResponsePayload> {
    let payload = utils::api_key_json(api_key);
    queries::query_filen_api_async(USER_UNFINISHED_DELETE_PATH, &payload, filen_settings)
        .await
        .context(UserUnfinishedDeleteQueryFailedSnafu {})
}
/// Encrypts the reader's contents chunk-by-chunk and uploads them as a new file under
/// `parent_uuid`, then finalizes the upload.
///
/// Flow: upload all real chunks → send a trailing empty "dummy" chunk → mark the
/// upload done via `/v1/upload/done`. Fails if any chunk, the dummy chunk, or the
/// final mark-done call is not accepted by Filen.
pub fn encrypt_and_upload_file<R: Read + Seek>(
    api_key: &SecUtf8,
    parent_uuid: Uuid,
    file_properties: &FileProperties,
    version: u32,
    last_master_key: &SecUtf8,
    reader: &mut BufReader<R>,
    settings: &SettingsBundle,
) -> Result<FileUploadInfo> {
    let upload_properties =
        FileUploadProperties::from_file_properties(file_properties, version, parent_uuid, last_master_key);
    let chunk_upload_responses = upload_chunks(
        api_key,
        FILE_CHUNK_SIZE,
        file_properties.size,
        &upload_properties,
        reader,
        settings,
    )?;
    // Only executed when every chunk response had status == true
    // (see `finalize_chunks_if_all_uploaded`).
    let finalize_action = |chunk_upload_responses: Vec<UploadFileChunkResponsePayload>| {
        // An empty chunk after the last real one precedes marking the upload done.
        send_dummy_chunk(
            FILE_CHUNK_SIZE,
            file_properties.size,
            api_key,
            &upload_properties,
            settings,
        )
        .and_then(|dummy_chunk_response| {
            if dummy_chunk_response.status {
                let upload_done_payload = UploadDoneRequestPayload {
                    uuid: upload_properties.uuid,
                    upload_key: &upload_properties.upload_key,
                };
                // Tell Filen the upload is complete; retried per `settings.retry`.
                let mark_done_response = settings
                    .retry
                    .call(|| upload_done_request(&upload_done_payload, &settings.filen))?;
                if mark_done_response.status {
                    Ok(FileUploadInfo::new(upload_properties, chunk_upload_responses))
                } else {
                    CouldNotMarkDoneSnafu {
                        message: format!("{:?}", mark_done_response.message),
                    }
                    .fail()
                }
            } else {
                DummyChunkNotAcceptedSnafu {
                    message: dummy_chunk_response
                        .message
                        .unwrap_or_else(|| "unknown reason".to_owned()),
                }
                .fail()
            }
        })
    };
    // `finalize_action` itself returns a Result, so the nested Result is flattened.
    utils::flatten_result(finalize_chunks_if_all_uploaded(chunk_upload_responses, finalize_action))
}
/// Asynchronous version of `encrypt_and_upload_file`: encrypts the reader's contents
/// chunk-by-chunk and uploads them as a new file under `parent_uuid`, then finalizes
/// the upload.
///
/// Flow: upload all real chunks → send a trailing empty "dummy" chunk → mark the
/// upload done via `/v1/upload/done`. Fails if any chunk, the dummy chunk, or the
/// final mark-done call is not accepted by Filen.
#[cfg(feature = "async")]
pub async fn encrypt_and_upload_file_async<R: Read + Seek + Send>(
    api_key: &SecUtf8,
    parent_uuid: Uuid,
    file_properties: &FileProperties,
    version: u32,
    last_master_key: &SecUtf8,
    reader: &mut BufReader<R>,
    settings: &SettingsBundle,
) -> Result<FileUploadInfo> {
    let upload_properties =
        FileUploadProperties::from_file_properties(file_properties, version, parent_uuid, last_master_key);
    let chunk_upload_responses = upload_chunks_async(
        api_key,
        FILE_CHUNK_SIZE,
        file_properties.size,
        &upload_properties,
        reader,
        settings,
    )
    .await?;
    // Builds the finalization future; only run when every chunk response had
    // status == true (see `finalize_chunks_if_all_uploaded`).
    let finalize_action = |chunk_upload_responses: Vec<UploadFileChunkResponsePayload>| async {
        // An empty chunk after the last real one precedes marking the upload done.
        let dummy_chunk_response = send_dummy_chunk_async(
            FILE_CHUNK_SIZE,
            file_properties.size,
            api_key,
            &upload_properties,
            settings,
        )
        .await?;
        if dummy_chunk_response.status {
            let upload_done_payload = UploadDoneRequestPayload {
                uuid: upload_properties.uuid,
                upload_key: &upload_properties.upload_key,
            };
            // Tell Filen the upload is complete; retried per `settings.retry`.
            let mark_done_response = settings
                .retry
                .call_async(|| upload_done_request_async(&upload_done_payload, &settings.filen))
                .await?;
            if mark_done_response.status {
                Ok(FileUploadInfo::new(upload_properties, chunk_upload_responses))
            } else {
                CouldNotMarkDoneSnafu {
                    message: format!("{:?}", mark_done_response.message),
                }
                .fail()
            }
        } else {
            DummyChunkNotAcceptedSnafu {
                message: dummy_chunk_response
                    .message
                    .unwrap_or_else(|| "unknown reason".to_owned()),
            }
            .fail()
        }
    };
    // On success the helper returns the not-yet-awaited finalization future.
    match finalize_chunks_if_all_uploaded(chunk_upload_responses, finalize_action) {
        Ok(future_file_upload_info) => future_file_upload_info.await,
        Err(f_err) => Err(f_err),
    }
}
/// Runs `finalize_action` over the chunk responses, but only if every chunk was
/// accepted (`status == true`); otherwise fails with `ChunkNotAccepted` carrying
/// the first failed chunk's message.
fn finalize_chunks_if_all_uploaded<F, FR>(
    chunk_upload_responses: Vec<UploadFileChunkResponsePayload>,
    finalize_action: F,
) -> Result<FR>
where
    F: FnOnce(Vec<UploadFileChunkResponsePayload>) -> FR,
{
    if let Some(failed_chunk) = chunk_upload_responses.iter().find(|response| !response.status) {
        let failure_reason = failed_chunk.message.as_deref().unwrap_or("unknown reason").to_owned();
        return ChunkNotAcceptedSnafu {
            message: failure_reason,
        }
        .fail();
    }
    Ok(finalize_action(chunk_upload_responses))
}
/// Reads the file sequentially in `file_chunk_size` chunks, encrypting and uploading
/// each chunk (with retries per `settings.retry`) and collecting the responses in order.
///
/// # Errors
///
/// Fails on the first seek/read error or failed chunk upload query.
fn upload_chunks<R: Read + Seek>(
    api_key: &SecUtf8,
    file_chunk_size: u32,
    file_size: u64,
    upload_properties: &FileUploadProperties,
    reader: &mut BufReader<R>,
    settings: &SettingsBundle,
) -> Result<Vec<UploadFileChunkResponsePayload>> {
    let chunk_processor = |chunk_pos: FileChunkPosition, chunk: Vec<u8>| {
        settings
            .retry
            .call(|| encrypt_and_upload_chunk(api_key, chunk_pos.index, &chunk, upload_properties, &settings.filen))
    };
    // Each item is a nested Result: the outer layer is the seek/read outcome, the inner
    // layer the upload outcome. The previous `.flatten()` silently discarded outer `Err`s
    // (Result's IntoIterator yields only the `Ok` value), so IO failures vanished and the
    // upload could be finalized with chunks missing. Flatten explicitly so both error
    // layers propagate, matching `upload_chunks_async`.
    read_into_chunks_and_process(file_chunk_size, file_size, reader, chunk_processor)
        .map(|read_result| read_result.and_then(|upload_result| upload_result))
        .collect()
}
/// Asynchronous version of `upload_chunks`: reads the file sequentially in
/// `file_chunk_size` chunks and uploads the encrypted chunks concurrently.
///
/// # Errors
///
/// Fails if any chunk cannot be read or any chunk upload query fails.
#[cfg(feature = "async")]
async fn upload_chunks_async<R: Read + Seek + Send>(
    api_key: &SecUtf8,
    file_chunk_size: u32,
    file_size: u64,
    upload_properties: &FileUploadProperties,
    reader: &mut BufReader<R>,
    settings: &SettingsBundle,
) -> Result<Vec<UploadFileChunkResponsePayload>> {
    // Each chunk becomes a future that encrypts and uploads it, retried per `settings.retry`.
    let chunk_processor = |chunk_pos: FileChunkPosition, chunk: Vec<u8>| async move {
        settings
            .retry
            .call_async(|| {
                encrypt_and_upload_chunk_async(api_key, chunk_pos.index, &chunk, upload_properties, &settings.filen)
            })
            .await
    };
    // Reading is sequential and short-circuits on the first seek/read error; the
    // resulting upload futures are then awaited together, preserving chunk order.
    let future_chunk_responses: Result<Vec<_>> =
        read_into_chunks_and_process(file_chunk_size, file_size, reader, chunk_processor).collect();
    futures::future::try_join_all(future_chunk_responses?).await
}
/// Lazily splits the reader into `file_chunk_size`-sized chunks (positions come from
/// `FileChunkPositions`) and applies `chunk_processor` to each chunk's position and bytes.
///
/// Each yielded item is `Err(SeekReadError)` if seeking/reading that chunk failed,
/// otherwise `Ok` with the processor's result. Nothing is read until the returned
/// iterator is consumed.
fn read_into_chunks_and_process<'reader, R, ProcType, ProcResult>(
    file_chunk_size: u32,
    file_size: u64,
    reader: &'reader mut BufReader<R>,
    chunk_processor: ProcType,
) -> impl Iterator<Item = Result<ProcResult>> + 'reader
where
    R: Read + Seek,
    ProcType: 'reader + Fn(FileChunkPosition, Vec<u8>) -> ProcResult,
{
    let file_chunk_positions = FileChunkPositions::new(file_chunk_size, file_size);
    file_chunk_positions.map(move |chunk_pos| {
        // Buffer sized exactly to the chunk, so `read_exact` fails on a short read.
        let mut chunk_buf = vec![0_u8; chunk_pos.chunk_size as usize];
        reader
            .seek(SeekFrom::Start(chunk_pos.start_position))
            .and_then(|_| reader.read_exact(&mut chunk_buf))
            .context(SeekReadSnafu {})
            .map(|_| chunk_processor(chunk_pos, chunk_buf))
    })
}
/// Uploads an empty "dummy" chunk right after the file's last real chunk;
/// `encrypt_and_upload_file` does this before marking the upload done.
///
/// # Errors
///
/// Fails with `BadArgument` when `file_size` is 0, or if the upload query fails.
fn send_dummy_chunk(
    chunk_size: u32,
    file_size: u64,
    api_key: &SecUtf8,
    upload_properties: &FileUploadProperties,
    settings: &SettingsBundle,
) -> Result<UploadFileChunkResponsePayload> {
    ensure!(
        file_size > 0,
        BadArgumentSnafu {
            message: "file size should be > 0"
        }
    );
    // The dummy chunk's index is one past the last real chunk's index.
    let dummy_index = ((file_size - 1) / u64::from(chunk_size)) as u32 + 1;
    let empty_chunk: Vec<u8> = Vec::new();
    settings
        .retry
        .call(|| encrypt_and_upload_chunk(api_key, dummy_index, &empty_chunk, upload_properties, &settings.filen))
}
/// Asynchronous version of `send_dummy_chunk`: uploads an empty "dummy" chunk right
/// after the file's last real chunk before the upload is marked done.
///
/// # Errors
///
/// Fails with `BadArgument` when `file_size` is 0, or if the upload query fails.
#[cfg(feature = "async")]
async fn send_dummy_chunk_async(
    chunk_size: u32,
    file_size: u64,
    api_key: &SecUtf8,
    upload_properties: &FileUploadProperties,
    settings: &SettingsBundle,
) -> Result<UploadFileChunkResponsePayload> {
    // Return a proper error instead of panicking via `assert!`, matching the
    // sync `send_dummy_chunk`: a zero file size is a recoverable caller mistake.
    ensure!(
        file_size > 0,
        BadArgumentSnafu {
            message: "file size should be > 0"
        }
    );
    let last_index = ((file_size - 1) / chunk_size as u64) as u32;
    let dummy_buf = vec![0_u8; 0];
    settings
        .retry
        .call_async(|| {
            encrypt_and_upload_chunk_async(api_key, last_index + 1, &dummy_buf, upload_properties, &settings.filen)
        })
        .await
}
/// Number of `chunk_size`-byte chunks needed to cover `file_size` bytes
/// (ceiling division); 0 for an empty file.
///
/// Replaces the previous counting loop (one iteration per chunk — ~1M iterations
/// for a 1 TiB file) with O(1) arithmetic.
const fn calculate_chunk_count(chunk_size: u32, file_size: u64) -> u32 {
    if file_size == 0 {
        0
    } else {
        // `(file_size - 1) / chunk_size + 1` is ceiling division that cannot
        // overflow, unlike the `(file_size + chunk_size - 1) / chunk_size` form.
        ((file_size - 1) / chunk_size as u64 + 1) as u32
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;
    use std::time::SystemTime;

    #[test]
    fn uploaded_file_properties_should_produce_query_string_with_expected_parts() {
        let m_key = SecUtf8::from("b49cadfb92e1d7d54e9dd9d33ba9feb2af1f10ae");
        let file_metadata = FileProperties::from_name_size_modified("test.txt", 128, &SystemTime::now()).unwrap();
        let properties = FileUploadProperties::from_file_properties(&file_metadata, 1, Uuid::nil(), &m_key);
        let query_params = properties.to_query_params(0, &SecUtf8::from("some api key"));
        // Building the query string twice from the same properties must be deterministic.
        let query_params_2 = properties.to_query_params(0, &SecUtf8::from("some api key"));
        assert_eq!(query_params, query_params_2);
        // Spaces are form-encoded as '+' in the query string.
        assert!(query_params.contains("apiKey=some+api+key"));
        // Encrypted/random values vary per run, so only parameter presence is checked for them.
        assert!(query_params.contains("uuid="));
        assert!(query_params.contains("name="));
        assert!(query_params.contains("nameHashed=809a953250a3917a9993645d1ba146348a198fc2"));
        assert!(query_params.contains("size="));
        // 128 bytes fit within one FILE_CHUNK_SIZE chunk.
        assert!(query_params.contains("chunks=1"));
        assert!(query_params.contains("mime="));
        assert!(query_params.contains("index=0"));
        assert!(query_params.contains("rm="));
        assert!(query_params.contains("expire=never"));
        assert!(query_params.contains("uploadKey="));
        assert!(query_params.contains("metaData="));
        assert!(query_params.contains("parent=00000000-0000-0000-0000-000000000000"));
        assert!(query_params.contains("version=1"));
    }
}