office_convert_lambda_client/
lib.rs

1use aws_sdk_lambda::{error::SdkError, operation::invoke::InvokeError, primitives::Blob};
2use serde::{Deserialize, Serialize};
3use std::{sync::Arc, time::Duration};
4use thiserror::Error;
5use tokio::time::sleep;
6
7#[derive(Debug, Clone)]
8pub struct OfficeConvertLambda {
9    client: aws_sdk_lambda::Client,
10    options: Arc<OfficeConvertLambdaOptions>,
11}
12
13impl OfficeConvertLambda {
14    pub fn new(client: aws_sdk_lambda::Client, options: OfficeConvertLambdaOptions) -> Self {
15        Self {
16            client,
17            options: Arc::new(options),
18        }
19    }
20}
21
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
23pub struct OfficeConvertLambdaOptions {
24    /// The name or ARN of the Lambda function, version, or alias.
25    pub function_name: String,
26    /// Specify a version or alias to invoke a published version of the function.
27    pub qualifier: Option<String>,
28    /// The identifier of the tenant in a multi-tenant Lambda function.
29    pub tenant_id: Option<String>,
30    /// Number of retry attempts to perform
31    pub retry_attempts: usize,
32    /// Time to wait between retry attempts
33    pub retry_wait: Duration,
34}
35
36impl Default for OfficeConvertLambdaOptions {
37    fn default() -> Self {
38        Self {
39            function_name: Default::default(),
40            qualifier: None,
41            tenant_id: None,
42            retry_attempts: 3,
43            retry_wait: Duration::from_secs(1),
44        }
45    }
46}
47
48#[derive(Serialize)]
49pub struct ConvertRequest {
50    /// Bucket the input source file is within
51    pub source_bucket: String,
52    /// Key within the source bucket for the source file
53    pub source_key: String,
54    /// Bucket to store the output file
55    pub dest_bucket: String,
56    /// Key within the `dest_bucket` for the output file
57    pub dest_key: String,
58}
59
60/// Error that could occur when requesting a conversion
61#[derive(Debug, Error)]
62#[allow(clippy::large_enum_variant)]
63pub enum ConvertError {
64    /// Failed to serialize the request or deserialize the response
65    #[error(transparent)]
66    Serde(#[from] serde_json::Error),
67
68    /// Failed to convert the file
69    #[error(transparent)]
70    Invoke(#[from] SdkError<InvokeError>),
71
72    /// Error from the lambda itself
73    #[error(transparent)]
74    Lambda(#[from] OfficeLambdaError),
75}
76
77impl ConvertError {
78    pub fn is_retry(&self) -> bool {
79        match self {
80            // Cannot retry serde failures
81            ConvertError::Serde(_) => false,
82            // Should retry lambda invoke errors
83            ConvertError::Invoke(_) => true,
84            // Error reasons that can be retried
85            ConvertError::Lambda(error) => matches!(
86                error.reason.as_str(),
87                "SETUP_TEMP_DIR_FAILED"
88                    | "INITIALIZE_OFFICE"
89                    | "SETUP_TEMP_FAILED"
90                    | "RUN_OFFICE"
91                    | "RESPONSE_ERROR"
92            ),
93        }
94    }
95}
96
97impl OfficeConvertLambda {
98    /// Perform an invoke of the convert lambda using a [ConvertRequest] based
99    /// on files already present in S3
100    ///
101    /// Will attempt to retry on failure based on the current options
102    pub async fn convert(&self, request: ConvertRequest) -> Result<(), ConvertError> {
103        // Perform initial request
104        let mut err: ConvertError = match self.convert_inner(&request).await {
105            Ok(_) => return Ok(()),
106            Err(error) => error,
107        };
108
109        // Perform retry attempts
110        for _ in 0..self.options.retry_attempts {
111            match self.convert_inner(&request).await {
112                Ok(_) => return Ok(()),
113                Err(error) => {
114                    if !error.is_retry() {
115                        return Err(error);
116                    }
117
118                    err = error;
119                }
120            }
121
122            sleep(self.options.retry_wait).await;
123        }
124
125        Err(err)
126    }
127
128    /// Perform an invoke of the convert lambda using a [ConvertRequest] based
129    /// on files already present in S3
130    async fn convert_inner(&self, request: &ConvertRequest) -> Result<(), ConvertError> {
131        let request_bytes = serde_json::to_vec(request)?;
132
133        let output = self
134            .client
135            .invoke()
136            .payload(Blob::new(request_bytes))
137            .function_name(&self.options.function_name)
138            .set_qualifier(self.options.qualifier.clone())
139            .set_tenant_id(self.options.tenant_id.clone())
140            .send()
141            .await?;
142
143        if let Some(function_error) = output.function_error {
144            let lambda_error: OfficeLambdaError = serde_json::from_str(&function_error)?;
145            return Err(ConvertError::Lambda(lambda_error));
146        }
147
148        Ok(())
149    }
150}
151
152/// Error returned by the lambda
153#[derive(Error, Debug, Deserialize)]
154#[error("{message} ({reason})")]
155pub struct OfficeLambdaError {
156    pub reason: String,
157    pub message: String,
158}