use aws_sdk_lambda::{error::SdkError, operation::invoke::InvokeError, primitives::Blob};
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Duration};
use thiserror::Error;
use tokio::time::sleep;
#[derive(Debug, Clone)]
pub struct OfficeConvertLambda {
client: aws_sdk_lambda::Client,
options: Arc<OfficeConvertLambdaOptions>,
}
impl OfficeConvertLambda {
pub fn new(client: aws_sdk_lambda::Client, options: OfficeConvertLambdaOptions) -> Self {
Self {
client,
options: Arc::new(options),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct OfficeConvertLambdaOptions {
pub function_name: String,
pub qualifier: Option<String>,
pub tenant_id: Option<String>,
pub retry_attempts: usize,
pub retry_wait: Duration,
}
impl Default for OfficeConvertLambdaOptions {
fn default() -> Self {
Self {
function_name: Default::default(),
qualifier: None,
tenant_id: None,
retry_attempts: 3,
retry_wait: Duration::from_secs(1),
}
}
}
#[derive(Serialize)]
pub struct ConvertRequest {
pub source_bucket: String,
pub source_key: String,
pub dest_bucket: String,
pub dest_key: String,
}
#[derive(Debug, Error)]
#[allow(clippy::large_enum_variant)]
pub enum ConvertError {
#[error(transparent)]
Serde(#[from] serde_json::Error),
#[error(transparent)]
Invoke(#[from] SdkError<InvokeError>),
#[error(transparent)]
Lambda(#[from] OfficeLambdaError),
}
impl ConvertError {
pub fn is_retry(&self) -> bool {
match self {
ConvertError::Serde(_) => false,
ConvertError::Invoke(_) => true,
ConvertError::Lambda(error) => matches!(
error.reason.as_str(),
"SETUP_TEMP_DIR_FAILED"
| "INITIALIZE_OFFICE"
| "SETUP_TEMP_FAILED"
| "RUN_OFFICE"
| "RESPONSE_ERROR"
),
}
}
}
impl OfficeConvertLambda {
pub async fn convert(&self, request: ConvertRequest) -> Result<(), ConvertError> {
let mut err: ConvertError = match self.convert_inner(&request).await {
Ok(_) => return Ok(()),
Err(error) => error,
};
for _ in 0..self.options.retry_attempts {
match self.convert_inner(&request).await {
Ok(_) => return Ok(()),
Err(error) => {
if !error.is_retry() {
return Err(error);
}
err = error;
}
}
sleep(self.options.retry_wait).await;
}
Err(err)
}
async fn convert_inner(&self, request: &ConvertRequest) -> Result<(), ConvertError> {
let request_bytes = serde_json::to_vec(request)?;
let output = self
.client
.invoke()
.payload(Blob::new(request_bytes))
.function_name(&self.options.function_name)
.set_qualifier(self.options.qualifier.clone())
.set_tenant_id(self.options.tenant_id.clone())
.send()
.await?;
if let Some(function_error) = output.function_error {
let lambda_error: OfficeLambdaError = serde_json::from_str(&function_error)?;
return Err(ConvertError::Lambda(lambda_error));
}
Ok(())
}
}
#[derive(Error, Debug, Deserialize)]
#[error("{message} ({reason})")]
pub struct OfficeLambdaError {
pub reason: String,
pub message: String,
}