#![allow(clippy::result_large_err)]
use super::ClientExtraDiskExt;
use crate::Client;
impl ClientExtraDiskExt for Client {
    /// Entry point of the disk-import extension: returns a builder that
    /// collects the project, disk, and upload parameters before the import
    /// is executed.
    fn disk_import(&self) -> builder::DiskImport<'_> {
        builder::DiskImport::new(self)
    }
}
pub mod builder {
use crate::extras::disk::types::{DiskImportError, DiskImportHandle, DiskInfo, ImageInfo};
use crate::types::{Name, NameOrId};
use crate::{Client, Error};
use std::future::Future;
use std::num::NonZeroUsize;
use std::sync::atomic::AtomicBool;
use tokio::sync::{oneshot, watch};
    /// Builder for a disk-import operation.
    ///
    /// Each required field starts as `Err` carrying the "not initialized"
    /// message that is surfaced as `Error::InvalidRequest` if `execute` is
    /// called before the field has been set.
    pub struct DiskImport<'a> {
        client: &'a Client,
        /// Project that will own the disk (name or id).
        project: Result<NameOrId, String>,
        /// Description for the created disk.
        description: Result<String, String>,
        /// Number of parallel upload worker tasks.
        upload_task_ct: Result<NonZeroUsize, String>,
        /// Name of the disk to create.
        disk: Result<Name, String>,
        /// Source-file / disk geometry parameters.
        disk_info: Result<DiskInfo, String>,
        /// Optional snapshot + image to create after finalize.
        image_info: Result<Option<ImageInfo>, String>,
    }
impl<'a> DiskImport<'a> {
pub fn new(client: &'a Client) -> Self {
Self {
client,
project: Err("project was not initialized".to_string()),
description: Err("description was not initialized".to_string()),
upload_task_ct: Err("upload_task_ct was not initialized".to_string()),
disk: Err("disk was not initialized".to_string()),
disk_info: Err("disk_info was not initialized".to_string()),
image_info: Ok(None),
}
}
pub fn project<V>(mut self, value: V) -> Self
where
V: std::convert::TryInto<NameOrId>,
{
self.project = value
.try_into()
.map_err(|_| "conversion to `NameOrId` for project failed".to_string());
self
}
pub fn description<V>(mut self, value: V) -> Self
where
V: std::convert::TryInto<String>,
{
self.description = value
.try_into()
.map_err(|_| "conversion to `String` for description failed".to_string());
self
}
pub fn upload_task_ct<V>(mut self, value: V) -> Self
where
V: std::convert::TryInto<NonZeroUsize>,
{
self.upload_task_ct = value.try_into().map_err(|_| {
"conversion to `non-zero usize` for upload_task_ct failed".to_string()
});
self
}
pub fn disk<V>(mut self, value: V) -> Self
where
V: std::convert::TryInto<Name>,
{
self.disk = value
.try_into()
.map_err(|_| "conversion to `Name` for disk failed".to_string());
self
}
pub fn disk_info(mut self, value: DiskInfo) -> Self {
self.disk_info = Ok(value);
self
}
pub fn image_info(mut self, value: ImageInfo) -> Self {
self.image_info = Ok(Some(value));
self
}
pub fn execute(
self,
) -> Result<
impl Future<Output = Result<(), DiskImportError>> + 'a,
Error<crate::types::Error>,
> {
let (progress_tx, _progress_rx) = watch::channel(0);
let importer = super::types::DiskImport::try_from((self, progress_tx))?;
Ok(importer.run())
}
pub fn execute_with_control(
self,
) -> Result<
(
impl Future<Output = Result<(), DiskImportError>> + 'a,
DiskImportHandle,
),
Error<crate::types::Error>,
> {
let (progress_tx, progress_rx) = watch::channel(0);
let (cancel_tx, cancel_rx) = oneshot::channel();
let importer = super::types::DiskImport::try_from((self, progress_tx))?;
let handle = DiskImportHandle {
progress_rx,
cancel_tx,
};
Ok((importer.run_with_cancel(cancel_rx), handle))
}
}
impl<'a> TryFrom<(DiskImport<'a>, watch::Sender<u64>)> for super::types::DiskImport<'a> {
type Error = Error<crate::types::Error>;
fn try_from(input: (DiskImport<'a>, watch::Sender<u64>)) -> Result<Self, Self::Error> {
let (builder, progress_tx) = input;
let project = builder.project.map_err(Error::InvalidRequest)?;
let description = builder.description.map_err(Error::InvalidRequest)?;
let upload_task_ct = builder.upload_task_ct.map_err(Error::InvalidRequest)?;
let disk = builder.disk.map_err(Error::InvalidRequest)?;
let disk_info = builder.disk_info.map_err(Error::InvalidRequest)?;
let image_info = builder.image_info.map_err(Error::InvalidRequest)?;
Ok(Self {
client: builder.client,
project,
description,
upload_task_ct,
disk,
disk_info,
image_info,
progress_tx,
cleanup_started: AtomicBool::new(false),
})
}
}
}
pub mod types {
use crate::types::{
self, BlockSize, ByteCount, DiskBackend, DiskCreate, DiskSource, DiskState, FinalizeDisk,
ImageCreate, ImageSource, ImportBlocksBulkWrite, Name, NameOrId,
};
use crate::{
Client, ClientDisksExt, ClientImagesExt, ClientSnapshotsExt, Error, ResponseValue,
};
use base64::Engine;
use reqwest::StatusCode;
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::fs::File;
use tokio::io::{self, AsyncReadExt};
use tokio::sync::{oneshot, watch};
const CHUNK_SIZE: u64 = 512 * 1024;
    /// Errors produced while importing a disk image.
    #[derive(thiserror::Error, Debug)]
    pub enum DiskImportError {
        /// A precondition failed (e.g. the disk, snapshot, or image already
        /// exists).
        #[error("{0}")]
        Validation(String),
        /// An error wrapped with an outer context message; `source`
        /// preserves the underlying cause for the error chain.
        #[error("{err}")]
        Wrapped {
            err: Box<dyn std::error::Error + Send + Sync>,
            source: Box<DiskImportError>,
        },
        /// Any other failure with no more specific variant.
        #[error("{0}")]
        Other(Box<dyn std::error::Error + Send + Sync>),
        /// An API call returned an error response.
        #[error(transparent)]
        Api(#[from] crate::Error<types::Error>),
        /// A value failed conversion into a generated API type.
        #[error(transparent)]
        Conversion(#[from] types::error::ConversionError),
        /// Reading the source file failed.
        #[error(transparent)]
        Io(#[from] io::Error),
    }
impl DiskImportError {
pub fn context(
err: impl Into<Box<dyn std::error::Error + Send + Sync>>,
source: impl Into<DiskImportError>,
) -> Self {
DiskImportError::Wrapped {
err: err.into(),
source: Box::new(source.into()),
}
}
pub fn other(err: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> Self {
DiskImportError::Other(err.into())
}
}
    /// Control handle returned by `execute_with_control`: exposes upload
    /// progress and a one-shot cancellation trigger for the import future.
    #[derive(Debug)]
    pub struct DiskImportHandle {
        // Receives the running total of bytes handled so far.
        pub(super) progress_rx: watch::Receiver<u64>,
        // Fires cancellation of the paired import future.
        pub(super) cancel_tx: oneshot::Sender<()>,
    }
impl DiskImportHandle {
pub fn progress(&self) -> watch::Receiver<u64> {
self.progress_rx.clone()
}
pub fn cancel(self) {
let _ = self.cancel_tx.send(());
}
}
    /// Fully-validated import state, produced from the builder via `TryFrom`.
    #[derive(Debug)]
    pub(super) struct DiskImport<'a> {
        pub client: &'a Client,
        /// Project that owns (or will own) the disk.
        pub project: NameOrId,
        /// Description for the created disk.
        pub description: String,
        /// Number of parallel upload worker tasks.
        pub upload_task_ct: NonZeroUsize,
        /// Name of the disk to create.
        pub disk: Name,
        /// Source-file / disk geometry parameters.
        pub disk_info: DiskInfo,
        /// When set, a snapshot + image are created after finalize.
        pub image_info: Option<ImageInfo>,
        /// Watch channel carrying the count of bytes handled so far.
        pub progress_tx: watch::Sender<u64>,
        /// Set once cleanup has begun, so it runs at most once.
        pub cleanup_started: AtomicBool,
    }
impl DiskImport<'_> {
        /// Run the import, racing it against the cancellation signal from
        /// the paired `DiskImportHandle`.
        ///
        /// `biased` makes `select!` poll the cancel channel first, so a
        /// pending cancellation wins over a simultaneously-ready import
        /// step. On any failure (including cancellation) a best-effort
        /// cleanup runs; a cleanup failure is layered on top of the
        /// original error via `Wrapped`.
        pub async fn run_with_cancel(
            self,
            cancel_rx: oneshot::Receiver<()>,
        ) -> Result<(), DiskImportError> {
            // NOTE: the existence check itself is not cancellable.
            self.check_for_existing_disk().await?;
            let result = tokio::select! {
                biased; _ = cancel_rx => {
                    Err(DiskImportError::other("Disk import canceled"))
                }
                result = self.do_disk_import() => result,
            };
            if let Err(e) = result {
                if let Err(cleanup_err) = self.cleanup().await {
                    // Surface the cleanup failure while keeping the import
                    // error as its source.
                    Err(DiskImportError::Wrapped {
                        err: cleanup_err.into(),
                        source: e.into(),
                    })
                } else {
                    Err(e)
                }
            } else {
                Ok(())
            }
        }
pub async fn run(self) -> Result<(), DiskImportError> {
self.check_for_existing_disk().await?;
if let Err(e) = self.do_disk_import().await {
if let Err(cleanup_err) = self.cleanup().await {
Err(DiskImportError::Wrapped {
err: cleanup_err.into(),
source: e.into(),
})
} else {
Err(e)
}
} else {
Ok(())
}
}
async fn do_disk_import(&self) -> Result<(), DiskImportError> {
self.client
.disk_create()
.project(self.project.clone())
.body(DiskCreate {
name: self.disk.clone(),
description: self.description.clone(),
disk_backend: DiskBackend::Distributed(DiskSource::ImportingBlocks {
block_size: self.disk_info.disk_block_size.clone(),
}),
size: self.disk_info.disk_size.clone(),
})
.send()
.await
.map_err(|e| DiskImportError::context("creating the disk failed", e))?;
self.client
.disk_bulk_write_import_start()
.project(self.project.clone())
.disk(self.disk.clone())
.send()
.await
.map_err(|e| {
DiskImportError::context("starting the build write process failed", e)
})?;
let mut handles: Vec<tokio::task::JoinHandle<Result<(), DiskImportError>>> =
Vec::with_capacity(self.upload_task_ct.get());
let (tx, rx) = flume::bounded(64);
let (failed_tx, failed_rx) = flume::bounded(self.upload_task_ct.get());
let (resubmit_tx, resubmit_rx) = flume::bounded(self.upload_task_ct.get());
for _ in 0..self.upload_task_ct.get() {
let mut worker = UploadWorker {
client: self.client.clone(),
disk: self.disk.clone(),
project: self.project.clone(),
progress_tx: self.progress_tx.clone(),
};
let rx = rx.clone();
let failed_tx = failed_tx.clone();
let resubmit_rx = resubmit_rx.clone();
handles.push(tokio::spawn(async move {
while let Ok(chunk) = rx.recv_async().await {
if let Err(e) = worker.upload_chunk(&chunk).await {
let _ = failed_tx.send_async(chunk).await;
return Err(e);
}
}
drop(failed_tx);
while let Ok(chunk) = resubmit_rx.recv_async().await {
worker.upload_chunk(&chunk).await?;
}
Ok(())
}));
}
drop(failed_tx);
let mut buf = Vec::with_capacity(CHUNK_SIZE as usize);
let mut file = File::open(&self.disk_info.file_path).await?;
let mut offset = 0;
let read_result: Result<(), DiskImportError> = loop {
let n = match (&mut file).take(CHUNK_SIZE).read_to_end(&mut buf).await {
Ok(n) => n,
Err(e) => {
return Err(DiskImportError::context(
format!("reading from {} failed", self.disk_info.file_path.display()),
e,
));
}
};
if n == 0 {
break Ok(());
}
let data = &buf[..n];
if !data.iter().all(|x| *x == 0) {
if tx
.send_async(Chunk {
offset,
data: data.to_vec(),
})
.await
.is_err()
{
break Ok(());
}
} else {
self.progress_tx.send_modify(|offset| *offset += n as u64);
}
offset += n as u64;
buf.clear();
};
drop(tx);
while let Ok(failed_chunk) = failed_rx.recv_async().await {
if resubmit_tx.send_async(failed_chunk).await.is_err() {
break;
}
}
drop(resubmit_tx);
let mut errors = Vec::new();
if let Err(e) = read_result {
errors.push(e);
}
for handle in handles {
let result = handle.await.map_err(DiskImportError::other)?;
if let Err(err) = result {
errors.push(err);
}
}
if errors.len() == self.upload_task_ct.get() {
let mut err_set = HashSet::new();
for err in errors {
err_set.insert(format!("\n * {err}"));
}
let mut msg = match err_set.len() {
1 => String::from("Error while uploading the disk image:"),
2.. => String::from("Errors while uploading the disk image:"),
0 => unreachable!("error count was zero"),
};
for err in err_set {
msg += &err;
}
return Err(DiskImportError::Other(msg.into()));
}
self.client
.disk_bulk_write_import_stop()
.project(self.project.clone())
.disk(self.disk.clone())
.send()
.await
.map_err(|e| {
DiskImportError::context("stopping the bulk write process failed", e)
})?;
self.client
.disk_finalize_import()
.project(self.project.clone())
.disk(self.disk.clone())
.body(FinalizeDisk {
snapshot_name: self.image_info.as_ref().map(|ii| ii.snapshot.clone()),
})
.send()
.await
.map_err(|e| DiskImportError::context("finalizing the disk failed", e))?;
if self.image_info.is_some() {
self.create_image().await?;
}
Ok(())
}
async fn create_image(&self) -> Result<(), DiskImportError> {
let Some(image_info) = &self.image_info else {
return Err(DiskImportError::other("no snapshot provided"));
};
let snapshot_id = self
.client
.snapshot_view()
.project(self.project.clone())
.snapshot(image_info.snapshot.clone())
.send()
.await
.map_err(|e| DiskImportError::context("failed to fetch snapshot", e))?;
self.client
.image_create()
.project(self.project.clone())
.body(ImageCreate {
name: image_info.image.clone(),
description: image_info.image_description.clone(),
os: image_info.image_os.clone(),
version: image_info.image_version.clone(),
source: ImageSource::Snapshot(snapshot_id.id),
})
.send()
.await
.map_err(|e| DiskImportError::context("failed to create image", e))?;
Ok(())
}
        /// Best-effort teardown after a failed or canceled import.
        ///
        /// Guarded by `cleanup_started` so cleanup runs at most once even
        /// if multiple failure paths reach it. A 404 while waiting for the
        /// disk means it was never created (or is already gone), so there
        /// is nothing to clean up.
        async fn cleanup(&self) -> Result<(), DiskImportError> {
            if self
                .cleanup_started
                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
                .is_err()
            {
                // Another path already began cleanup; don't run it twice.
                return Ok(());
            }
            let disk_state = match self.wait_for_disk().await {
                Ok(state) => state,
                Err(e) => match e {
                    // Disk doesn't exist: nothing to unwind.
                    Error::ErrorResponse(rv) if rv.status() == 404 => return Ok(()),
                    _ => Err(e)?,
                },
            };
            // Only import-related states need (and allow) unwinding.
            if !matches!(
                disk_state,
                DiskState::ImportReady
                    | DiskState::ImportingFromBulkWrites
                    | DiskState::ImportingFromUrl
                    | DiskState::Finalizing
            ) {
                return Ok(());
            }
            if matches!(disk_state, DiskState::ImportingFromBulkWrites) {
                // Bulk writes must stop before the disk can be finalized.
                self.unwind_disk_bulk_write_stop().await?;
            }
            self.unwind_disk_finalize().await?;
            self.unwind_disk_delete().await
        }
async fn check_for_existing_disk(&self) -> Result<(), DiskImportError> {
err_if_object_exists(
format!("disk \"{}\" exists already", &*self.disk),
self.client
.disk_view()
.project(self.project.clone())
.disk(self.disk.clone())
.send()
.await,
)?;
if let Some(image_info) = &self.image_info {
err_if_object_exists(
format!("snapshot \"{}\" exists already", &*image_info.snapshot),
self.client
.snapshot_view()
.project(self.project.clone())
.snapshot(image_info.snapshot.clone())
.send()
.await,
)?;
err_if_object_exists(
format!("image \"{}\" exists already", &*image_info.image),
self.client
.image_view()
.project(self.project.clone())
.image(image_info.image.clone())
.send()
.await,
)?;
}
Ok(())
}
async fn get_disk_state(&self) -> Result<DiskState, Error<types::Error>> {
let response = self
.client
.disk_view()
.project(self.project.clone())
.disk(self.disk.clone())
.send()
.await?;
Ok(response.into_inner().state)
}
async fn wait_for_disk(&self) -> Result<DiskState, Error<types::Error>> {
const RETRY_CT: usize = 10;
const RETRY_DELAY: Duration = Duration::from_millis(500);
let mut disk_state = self.get_disk_state().await?;
for _ in 0..RETRY_CT {
if !matches!(disk_state, DiskState::Creating) {
return Ok(disk_state);
}
tokio::time::sleep(RETRY_DELAY).await;
disk_state = self.get_disk_state().await?;
}
Err(Error::InvalidRequest(
"disk remained in \"Creating\" state for more than 5 seconds".to_string(),
))
}
async fn unwind_disk_delete(&self) -> Result<(), DiskImportError> {
self.client
.disk_delete()
.project(self.project.clone())
.disk(self.disk.clone())
.send()
.await
.map_err(|e| {
DiskImportError::context(
format!("trying to unwind, deleting \"{}\" failed", &*self.disk),
e,
)
})?;
Ok(())
}
async fn unwind_disk_finalize(&self) -> Result<(), DiskImportError> {
self.client
.disk_finalize_import()
.project(self.project.clone())
.disk(self.disk.clone())
.send()
.await
.map_err(|e| {
DiskImportError::context(
format!("trying to unwind, finalizing \"{}\" failed", &*self.disk),
e,
)
})?;
Ok(())
}
async fn unwind_disk_bulk_write_stop(&self) -> Result<(), DiskImportError> {
self.client
.disk_bulk_write_import_stop()
.project(self.project.clone())
.disk(self.disk.clone())
.send()
.await
.map_err(|e| {
DiskImportError::context(
format!(
"trying to unwind, stopping the bulk write process for \"{}\"",
&*self.disk
),
e,
)
})?;
Ok(())
}
}
    /// Derived parameters of the file backing the import.
    #[derive(Clone, Debug)]
    pub struct DiskInfo {
        /// Path of the source image file.
        pub file_path: PathBuf,
        /// Size of the source file in bytes.
        pub file_size: u64,
        /// Size of the disk to create (the file size rounded up to a whole
        /// GiB unless an explicit size was requested).
        pub disk_size: ByteCount,
        /// Block size of the disk (defaults to 512 bytes).
        pub disk_block_size: BlockSize,
    }
impl DiskInfo {
pub fn calculate(
file_path: PathBuf,
requested_disk_size: Option<&ByteCount>,
requested_disk_block_size: Option<&BlockSize>,
) -> Result<DiskInfo, DiskImportError> {
if !Path::new(&file_path).exists() {
return Err(DiskImportError::other(format!(
"path {} does not exist",
file_path.display()
)));
}
let file_size = std::fs::metadata(&file_path)?.len();
let disk_size = Self::get_disk_size(file_size, requested_disk_size.map(|x| **x)).into();
let disk_block_size = match requested_disk_block_size {
Some(v) => v.clone(),
None => BlockSize::try_from(512)?,
};
if (file_size % *disk_block_size as u64) != 0 {
return Err(DiskImportError::other(format!(
"file size {file_size} is not divisible by block size {}!",
*disk_block_size
)));
}
Ok(DiskInfo {
file_path,
file_size,
disk_size,
disk_block_size,
})
}
fn get_disk_size(file_size: u64, size: Option<u64>) -> u64 {
const ONE_GB: u64 = 1024 * 1024 * 1024;
let disk_size = if let Some(size) = size {
size
} else {
file_size
};
if disk_size % ONE_GB != 0 {
let rounded_down_gb: u64 = disk_size - disk_size % ONE_GB;
assert_eq!(rounded_down_gb % ONE_GB, 0);
rounded_down_gb + ONE_GB
} else {
disk_size
}
}
}
fn err_if_object_exists<T>(
error_msg: String,
send_response: Result<ResponseValue<T>, Error<types::Error>>,
) -> Result<(), DiskImportError> {
match send_response {
Ok(_) => Err(DiskImportError::Validation(error_msg)),
Err(crate::Error::ErrorResponse(response_value))
if response_value.status() == StatusCode::NOT_FOUND =>
{
Ok(())
}
Err(e) => Err(DiskImportError::Api(e)),
}
}
    /// Parameters for the optional snapshot + image created after the disk
    /// is finalized.
    #[derive(Clone, Debug)]
    pub struct ImageInfo {
        /// Name of the snapshot taken when the disk is finalized.
        pub snapshot: Name,
        /// Name of the image created from that snapshot.
        pub image: Name,
        /// Description of the created image.
        pub image_description: String,
        /// OS name recorded on the image.
        pub image_os: String,
        /// OS/image version recorded on the image.
        pub image_version: String,
    }
struct Chunk {
offset: u64,
data: Vec<u8>,
}
    /// Per-task state for one chunk-upload worker.
    struct UploadWorker {
        client: Client,
        disk: Name,
        project: NameOrId,
        // Shared counter of bytes handled, advanced after each upload.
        progress_tx: watch::Sender<u64>,
    }
impl UploadWorker {
async fn upload_chunk(&mut self, chunk: &Chunk) -> Result<(), DiskImportError> {
let base64_encoded_data = base64::engine::general_purpose::STANDARD.encode(&chunk.data);
self.client
.disk_bulk_write_import()
.disk(&*self.disk)
.project(self.project.clone())
.body(ImportBlocksBulkWrite {
offset: chunk.offset,
base64_encoded_data,
})
.send()
.await?;
self.progress_tx
.send_modify(|offset| *offset += chunk.data.len() as u64);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_disk_size() {
let file_size = 1;
assert_eq!(
DiskInfo::get_disk_size(file_size, Some(1)),
1024 * 1024 * 1024,
);
assert_eq!(
DiskInfo::get_disk_size(file_size, Some(1024 * 1024 * 1024 - 1)),
1024 * 1024 * 1024,
);
assert_eq!(
DiskInfo::get_disk_size(file_size, Some(1024 * 1024 * 1024)),
1024 * 1024 * 1024,
);
assert_eq!(
DiskInfo::get_disk_size(file_size, Some(2 * 1024 * 1024 * 1024)),
2 * 1024 * 1024 * 1024,
);
assert_eq!(
DiskInfo::get_disk_size(file_size, Some(2 * 1024 * 1024 * 1024 + 1)),
3 * 1024 * 1024 * 1024,
);
assert_eq!(
DiskInfo::get_disk_size(file_size, Some(3 * 1024 * 1024 * 1024 - 1)),
3 * 1024 * 1024 * 1024,
);
}
}
}