feattle_sync/
rusoto_s3.rs

1use async_trait::async_trait;
2use feattle_core::persist::{CurrentValues, Persist, ValueHistory};
3use feattle_core::BoxError;
4use rusoto_core::RusotoError;
5use rusoto_s3::{GetObjectError, GetObjectRequest, PutObjectRequest, S3Client, S3};
6use serde::de::DeserializeOwned;
7use serde::Serialize;
8use std::fmt;
9use std::time::Duration;
10use tokio::io::AsyncReadExt;
11use tokio::time;
12
13/// Persist the data in an [AWS S3](https://aws.amazon.com/s3/) bucket.
14///
15/// To use it, make sure to activate the cargo feature `"rusoto_s3"` in your `Cargo.toml`.
16///
17/// # Example
18/// ```
19/// use std::sync::Arc;
20/// use std::time::Duration;
21/// use feattle_core::{feattles, Feattles};
22/// use feattle_sync::RusotoS3;
23/// use rusoto_s3::S3Client;
24/// use rusoto_core::Region;
25///
26/// feattles! {
27///     struct MyToggles {
28///         a: bool,
29///     }
30/// }
31///
32/// // Create a S3 client, read more at the official documentation https://www.rusoto.org
33/// let s3_client = S3Client::new(Region::default());
34///
35/// let timeout = Duration::from_secs(10);
36/// let persistence = Arc::new(RusotoS3::new(
37///     s3_client,
38///     "my-bucket".to_owned(),
39///     "some/s3/prefix/".to_owned(),
40///     timeout,
41/// ));
42/// let my_toggles = MyToggles::new(persistence);
43/// ```
44#[derive(Clone)]
45pub struct RusotoS3 {
46    client: S3Client,
47    bucket: String,
48    prefix: String,
49    timeout: Duration,
50}
51
52impl fmt::Debug for RusotoS3 {
53    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
54        f.debug_struct("S3")
55            .field("client", &"S3Client")
56            .field("bucket", &self.bucket)
57            .field("prefix", &self.prefix)
58            .finish()
59    }
60}
61
62impl RusotoS3 {
63    pub fn new(client: S3Client, bucket: String, prefix: String, timeout: Duration) -> Self {
64        RusotoS3 {
65            client,
66            bucket,
67            prefix,
68            timeout,
69        }
70    }
71
72    async fn save<T: Serialize>(&self, name: &str, value: T) -> Result<(), BoxError> {
73        let key = format!("{}{}", self.prefix, name);
74        let contents = serde_json::to_string(&value)?;
75        let put_future = self.client.put_object(PutObjectRequest {
76            body: Some(contents.into_bytes().into()),
77            bucket: self.bucket.clone(),
78            key,
79            ..Default::default()
80        });
81        time::timeout(self.timeout, put_future).await??;
82
83        Ok(())
84    }
85
86    async fn load<T: DeserializeOwned>(&self, name: &str) -> Result<Option<T>, BoxError> {
87        let key = format!("{}{}", self.prefix, name);
88        let get_future = self.client.get_object(GetObjectRequest {
89            bucket: self.bucket.clone(),
90            key,
91            ..Default::default()
92        });
93        match time::timeout(self.timeout, get_future).await? {
94            Err(RusotoError::Service(GetObjectError::NoSuchKey(_))) => Ok(None),
95            Ok(response) => match response.body {
96                None => Ok(None),
97                Some(body) => {
98                    let mut contents = String::new();
99                    body.into_async_read().read_to_string(&mut contents).await?;
100                    Ok(Some(serde_json::from_str(&contents)?))
101                }
102            },
103            Err(error) => Err(error.into()),
104        }
105    }
106}
107
108#[async_trait]
109impl Persist for RusotoS3 {
110    async fn save_current(&self, value: &CurrentValues) -> Result<(), BoxError> {
111        self.save("current.json", value).await
112    }
113
114    async fn load_current(&self) -> Result<Option<CurrentValues>, BoxError> {
115        self.load("current.json").await
116    }
117
118    async fn save_history(&self, key: &str, value: &ValueHistory) -> Result<(), BoxError> {
119        self.save(&format!("history-{}.json", key), value).await
120    }
121
122    async fn load_history(&self, key: &str) -> Result<Option<ValueHistory>, BoxError> {
123        self.load(&format!("history-{}.json", key)).await
124    }
125}
126
127#[cfg(test)]
128mod tests {
129    use super::*;
130    use crate::tests::test_persistence;
131
132    #[tokio::test]
133    async fn s3() {
134        use rusoto_core::Region;
135        use rusoto_s3::{
136            Delete, DeleteObjectsRequest, ListObjectsV2Request, ObjectIdentifier, S3Client, S3,
137        };
138        use std::env;
139
140        dotenv::dotenv().ok();
141
142        // Please set the environment variables AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
143        // AWS_REGION, S3_BUCKET and S3_KEY_PREFIX accordingly
144        let client = S3Client::new(Region::default());
145        let bucket = env::var("S3_BUCKET").unwrap();
146        let prefix = format!("{}/rusoto-s3", env::var("S3_KEY_PREFIX").unwrap());
147
148        // Clear all previous objects
149        let objects_to_delete = client
150            .list_objects_v2(ListObjectsV2Request {
151                bucket: bucket.clone(),
152                prefix: Some(prefix.clone()),
153                ..Default::default()
154            })
155            .await
156            .unwrap()
157            .contents
158            .unwrap_or_default();
159        let keys_to_delete: Vec<_> = objects_to_delete
160            .into_iter()
161            .filter_map(|o| o.key)
162            .collect();
163
164        if !keys_to_delete.is_empty() {
165            println!(
166                "Will first clear previous objects in S3: {:?}",
167                keys_to_delete
168            );
169            client
170                .delete_objects(DeleteObjectsRequest {
171                    bucket: bucket.clone(),
172                    delete: Delete {
173                        objects: keys_to_delete
174                            .into_iter()
175                            .map(|key| ObjectIdentifier {
176                                key,
177                                version_id: None,
178                            })
179                            .collect(),
180                        ..Default::default()
181                    },
182                    ..Default::default()
183                })
184                .await
185                .unwrap();
186        }
187
188        let timeout = Duration::from_secs(10);
189        test_persistence(RusotoS3::new(client, bucket, prefix, timeout)).await;
190    }
191}