feattle_sync/
aws_sdk_s3.rs

1use async_trait::async_trait;
2use aws_sdk_s3::operation::get_object::GetObjectError;
3use aws_sdk_s3::primitives::ByteStream;
4use aws_sdk_s3::Client;
5use aws_types::SdkConfig;
6use feattle_core::persist::{CurrentValues, Persist, ValueHistory};
7use feattle_core::BoxError;
8use serde::de::DeserializeOwned;
9use serde::Serialize;
10use std::fmt;
11
/// Persist the data in an [AWS S3](https://aws.amazon.com/s3/) bucket.
///
/// Every value is stored as a JSON object. The object key is the configured prefix immediately
/// followed by the file name (`current.json` or `history-{key}.json`); no separator is inserted
/// between them, so include a trailing `/` in the prefix if you want a "folder"-like layout.
///
/// To use it, make sure to activate the cargo feature `"aws_sdk_s3"` in your `Cargo.toml`.
///
/// # Example
/// ```
/// use std::sync::Arc;
/// use feattle_core::{feattles, Feattles};
/// use feattle_sync::S3;
///
/// feattles! {
///     struct MyToggles {
///         a: bool,
///     }
/// }
///
/// #[tokio::main]
/// async fn main() {
///     // Create an AWS config, read more at the official documentation <https://docs.aws.amazon.com/sdk-for-rust/latest/dg/welcome.html>
///     let config = aws_config::load_from_env().await;
///
///     let persistence = Arc::new(S3::new(
///         &config,
///         "my-bucket".to_owned(),
///         "some/s3/prefix/".to_owned(),
///     ));
///     let my_toggles = MyToggles::new(persistence);
/// }
/// ```
#[derive(Clone)]
pub struct S3 {
    /// SDK client used for every request issued by this persistence layer.
    client: Client,
    /// Name of the bucket that holds the persisted objects.
    bucket: String,
    /// String prepended verbatim to every object key.
    prefix: String,
}
49
50impl fmt::Debug for S3 {
51    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
52        f.debug_struct("S3")
53            .field("client", &"S3Client")
54            .field("bucket", &self.bucket)
55            .field("prefix", &self.prefix)
56            .finish()
57    }
58}
59
60impl S3 {
61    pub fn new(config: &SdkConfig, bucket: String, prefix: String) -> Self {
62        S3 {
63            client: Client::new(config),
64            bucket,
65            prefix,
66        }
67    }
68
69    async fn save<T: Serialize>(&self, name: &str, value: T) -> Result<(), BoxError> {
70        let key = format!("{}{}", self.prefix, name);
71        let contents = serde_json::to_vec(&value)?;
72        self.client
73            .put_object()
74            .bucket(self.bucket.clone())
75            .key(key)
76            .body(ByteStream::from(contents))
77            .send()
78            .await?;
79
80        Ok(())
81    }
82
83    async fn load<T: DeserializeOwned>(&self, name: &str) -> Result<Option<T>, BoxError> {
84        let key = format!("{}{}", self.prefix, name);
85        let get_object = self
86            .client
87            .get_object()
88            .bucket(self.bucket.clone())
89            .key(key)
90            .send()
91            .await
92            .map_err(|x| x.into_service_error());
93        match get_object {
94            Err(GetObjectError::NoSuchKey(_)) => Ok(None),
95            Ok(response) => {
96                let contents = response.body.collect().await?.to_vec();
97                Ok(Some(serde_json::from_slice(&contents)?))
98            }
99            Err(error) => Err(error.into()),
100        }
101    }
102}
103
104#[async_trait]
105impl Persist for S3 {
106    async fn save_current(&self, value: &CurrentValues) -> Result<(), BoxError> {
107        self.save("current.json", value).await
108    }
109
110    async fn load_current(&self) -> Result<Option<CurrentValues>, BoxError> {
111        self.load("current.json").await
112    }
113
114    async fn save_history(&self, key: &str, value: &ValueHistory) -> Result<(), BoxError> {
115        self.save(&format!("history-{}.json", key), value).await
116    }
117
118    async fn load_history(&self, key: &str) -> Result<Option<ValueHistory>, BoxError> {
119        self.load(&format!("history-{}.json", key)).await
120    }
121}
122
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tests::test_persistence;
    use aws_sdk_s3::types::{Delete, ObjectIdentifier};

    /// Runs the shared persistence test-suite against a real bucket.
    ///
    /// Please set the environment variables AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
    /// AWS_REGION, S3_BUCKET and S3_KEY_PREFIX accordingly (a `.env` file is also honored).
    #[tokio::test]
    async fn s3() {
        use std::env;

        dotenv::dotenv().ok();

        let config = aws_config::load_from_env().await;
        let bucket = env::var("S3_BUCKET").unwrap();
        let prefix = format!("{}/aws-sdk-s3", env::var("S3_KEY_PREFIX").unwrap());
        let client = Client::new(&config);

        // Find whatever a previous run left behind under the prefix
        let listing = client
            .list_objects_v2()
            .bucket(&bucket)
            .prefix(&prefix)
            .send()
            .await
            .unwrap();
        let stale_keys: Vec<_> = listing
            .contents
            .unwrap_or_default()
            .into_iter()
            .filter_map(|object| object.key)
            .collect();

        // Start from a clean slate: remove the leftovers in a single request
        if !stale_keys.is_empty() {
            println!(
                "Will first clear previous objects in S3: {:?}",
                stale_keys
            );

            let delete_request = stale_keys
                .into_iter()
                .map(|key| ObjectIdentifier::builder().key(key).build().unwrap())
                .fold(Delete::builder(), |builder, object| builder.objects(object))
                .build()
                .unwrap();

            client
                .delete_objects()
                .bucket(&bucket)
                .delete(delete_request)
                .send()
                .await
                .unwrap();
        }

        test_persistence(S3::new(&config, bucket, prefix)).await;
    }
}
180}