feattle_sync/
aws_sdk_s3.rs1use async_trait::async_trait;
2use aws_sdk_s3::operation::get_object::GetObjectError;
3use aws_sdk_s3::primitives::ByteStream;
4use aws_sdk_s3::Client;
5use aws_types::SdkConfig;
6use feattle_core::persist::{CurrentValues, Persist, ValueHistory};
7use feattle_core::BoxError;
8use serde::de::DeserializeOwned;
9use serde::Serialize;
10use std::fmt;
11
12#[derive(Clone)]
44pub struct S3 {
45 client: Client,
46 bucket: String,
47 prefix: String,
48}
49
50impl fmt::Debug for S3 {
51 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
52 f.debug_struct("S3")
53 .field("client", &"S3Client")
54 .field("bucket", &self.bucket)
55 .field("prefix", &self.prefix)
56 .finish()
57 }
58}
59
60impl S3 {
61 pub fn new(config: &SdkConfig, bucket: String, prefix: String) -> Self {
62 S3 {
63 client: Client::new(config),
64 bucket,
65 prefix,
66 }
67 }
68
69 async fn save<T: Serialize>(&self, name: &str, value: T) -> Result<(), BoxError> {
70 let key = format!("{}{}", self.prefix, name);
71 let contents = serde_json::to_vec(&value)?;
72 self.client
73 .put_object()
74 .bucket(self.bucket.clone())
75 .key(key)
76 .body(ByteStream::from(contents))
77 .send()
78 .await?;
79
80 Ok(())
81 }
82
83 async fn load<T: DeserializeOwned>(&self, name: &str) -> Result<Option<T>, BoxError> {
84 let key = format!("{}{}", self.prefix, name);
85 let get_object = self
86 .client
87 .get_object()
88 .bucket(self.bucket.clone())
89 .key(key)
90 .send()
91 .await
92 .map_err(|x| x.into_service_error());
93 match get_object {
94 Err(GetObjectError::NoSuchKey(_)) => Ok(None),
95 Ok(response) => {
96 let contents = response.body.collect().await?.to_vec();
97 Ok(Some(serde_json::from_slice(&contents)?))
98 }
99 Err(error) => Err(error.into()),
100 }
101 }
102}
103
104#[async_trait]
105impl Persist for S3 {
106 async fn save_current(&self, value: &CurrentValues) -> Result<(), BoxError> {
107 self.save("current.json", value).await
108 }
109
110 async fn load_current(&self) -> Result<Option<CurrentValues>, BoxError> {
111 self.load("current.json").await
112 }
113
114 async fn save_history(&self, key: &str, value: &ValueHistory) -> Result<(), BoxError> {
115 self.save(&format!("history-{}.json", key), value).await
116 }
117
118 async fn load_history(&self, key: &str) -> Result<Option<ValueHistory>, BoxError> {
119 self.load(&format!("history-{}.json", key)).await
120 }
121}
122
123#[cfg(test)]
124mod tests {
125 use super::*;
126 use crate::tests::test_persistence;
127 use aws_sdk_s3::types::{Delete, ObjectIdentifier};
128
129 #[tokio::test]
130 async fn s3() {
131 use std::env;
132
133 dotenv::dotenv().ok();
134
135 let config = aws_config::load_from_env().await;
138 let bucket = env::var("S3_BUCKET").unwrap();
139 let prefix = format!("{}/aws-sdk-s3", env::var("S3_KEY_PREFIX").unwrap());
140 let client = Client::new(&config);
141
142 let objects_to_delete = client
144 .list_objects_v2()
145 .bucket(&bucket)
146 .prefix(&prefix)
147 .send()
148 .await
149 .unwrap()
150 .contents
151 .unwrap_or_default();
152 let keys_to_delete: Vec<_> = objects_to_delete
153 .into_iter()
154 .filter_map(|o| o.key)
155 .collect();
156
157 if !keys_to_delete.is_empty() {
158 println!(
159 "Will first clear previous objects in S3: {:?}",
160 keys_to_delete
161 );
162
163 let mut delete_builder = Delete::builder();
164 for key in keys_to_delete {
165 delete_builder =
166 delete_builder.objects(ObjectIdentifier::builder().key(key).build().unwrap());
167 }
168
169 client
170 .delete_objects()
171 .bucket(&bucket)
172 .delete(delete_builder.build().unwrap())
173 .send()
174 .await
175 .unwrap();
176 }
177
178 test_persistence(S3::new(&config, bucket, prefix)).await;
179 }
180}