feattle_sync/
rusoto_s3.rs1use async_trait::async_trait;
2use feattle_core::persist::{CurrentValues, Persist, ValueHistory};
3use feattle_core::BoxError;
4use rusoto_core::RusotoError;
5use rusoto_s3::{GetObjectError, GetObjectRequest, PutObjectRequest, S3Client, S3};
6use serde::de::DeserializeOwned;
7use serde::Serialize;
8use std::fmt;
9use std::time::Duration;
10use tokio::io::AsyncReadExt;
11use tokio::time;
12
13#[derive(Clone)]
45pub struct RusotoS3 {
46 client: S3Client,
47 bucket: String,
48 prefix: String,
49 timeout: Duration,
50}
51
52impl fmt::Debug for RusotoS3 {
53 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
54 f.debug_struct("S3")
55 .field("client", &"S3Client")
56 .field("bucket", &self.bucket)
57 .field("prefix", &self.prefix)
58 .finish()
59 }
60}
61
62impl RusotoS3 {
63 pub fn new(client: S3Client, bucket: String, prefix: String, timeout: Duration) -> Self {
64 RusotoS3 {
65 client,
66 bucket,
67 prefix,
68 timeout,
69 }
70 }
71
72 async fn save<T: Serialize>(&self, name: &str, value: T) -> Result<(), BoxError> {
73 let key = format!("{}{}", self.prefix, name);
74 let contents = serde_json::to_string(&value)?;
75 let put_future = self.client.put_object(PutObjectRequest {
76 body: Some(contents.into_bytes().into()),
77 bucket: self.bucket.clone(),
78 key,
79 ..Default::default()
80 });
81 time::timeout(self.timeout, put_future).await??;
82
83 Ok(())
84 }
85
86 async fn load<T: DeserializeOwned>(&self, name: &str) -> Result<Option<T>, BoxError> {
87 let key = format!("{}{}", self.prefix, name);
88 let get_future = self.client.get_object(GetObjectRequest {
89 bucket: self.bucket.clone(),
90 key,
91 ..Default::default()
92 });
93 match time::timeout(self.timeout, get_future).await? {
94 Err(RusotoError::Service(GetObjectError::NoSuchKey(_))) => Ok(None),
95 Ok(response) => match response.body {
96 None => Ok(None),
97 Some(body) => {
98 let mut contents = String::new();
99 body.into_async_read().read_to_string(&mut contents).await?;
100 Ok(Some(serde_json::from_str(&contents)?))
101 }
102 },
103 Err(error) => Err(error.into()),
104 }
105 }
106}
107
108#[async_trait]
109impl Persist for RusotoS3 {
110 async fn save_current(&self, value: &CurrentValues) -> Result<(), BoxError> {
111 self.save("current.json", value).await
112 }
113
114 async fn load_current(&self) -> Result<Option<CurrentValues>, BoxError> {
115 self.load("current.json").await
116 }
117
118 async fn save_history(&self, key: &str, value: &ValueHistory) -> Result<(), BoxError> {
119 self.save(&format!("history-{}.json", key), value).await
120 }
121
122 async fn load_history(&self, key: &str) -> Result<Option<ValueHistory>, BoxError> {
123 self.load(&format!("history-{}.json", key)).await
124 }
125}
126
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tests::test_persistence;

    /// Runs the shared persistence test against a real bucket.
    ///
    /// Reads `S3_BUCKET` and `S3_KEY_PREFIX` from the environment (optionally
    /// loaded from a `.env` file) and first deletes whatever objects a listing
    /// under the test prefix returns.
    #[tokio::test]
    async fn s3() {
        use rusoto_core::Region;
        use rusoto_s3::{
            Delete, DeleteObjectsRequest, ListObjectsV2Request, ObjectIdentifier, S3Client, S3,
        };
        use std::env;

        // A missing `.env` file is fine: the variables may already be set.
        dotenv::dotenv().ok();

        let client = S3Client::new(Region::default());
        let bucket = env::var("S3_BUCKET").unwrap();
        let prefix = format!("{}/rusoto-s3", env::var("S3_KEY_PREFIX").unwrap());

        // Collect the keys left behind by a previous run.
        let listing = client
            .list_objects_v2(ListObjectsV2Request {
                bucket: bucket.clone(),
                prefix: Some(prefix.clone()),
                ..Default::default()
            })
            .await
            .unwrap();
        let stale_keys: Vec<_> = listing
            .contents
            .unwrap_or_default()
            .into_iter()
            .filter_map(|object| object.key)
            .collect();

        if !stale_keys.is_empty() {
            println!(
                "Will first clear previous objects in S3: {:?}",
                stale_keys
            );
            let objects: Vec<_> = stale_keys
                .into_iter()
                .map(|key| ObjectIdentifier {
                    key,
                    version_id: None,
                })
                .collect();
            let request = DeleteObjectsRequest {
                bucket: bucket.clone(),
                delete: Delete {
                    objects,
                    ..Default::default()
                },
                ..Default::default()
            };
            client.delete_objects(request).await.unwrap();
        }

        let timeout = Duration::from_secs(10);
        let persistence = RusotoS3::new(client, bucket, prefix, timeout);
        test_persistence(persistence).await;
    }
}