tower_sessions_dynamodb_store/
lib.rs1use std::collections::hash_map::HashMap;
2
3use async_trait::async_trait;
4pub use aws_config;
5pub use aws_sdk_dynamodb;
6use aws_sdk_dynamodb::{
7 operation::{
8 batch_write_item::BatchWriteItemError, delete_item::DeleteItemError,
9 put_item::PutItemError, query::QueryError, scan::ScanError,
10 },
11 primitives::Blob,
12 types::{AttributeValue, DeleteRequest, WriteRequest},
13 Client,
14};
15use time::OffsetDateTime;
16use tower_sessions::{
17 session::{Id, Record},
18 session_store, ExpiredDeletion, SessionStore,
19};
20use tracing::info;
21
22#[derive(thiserror::Error, Debug)]
24pub enum DynamoDBStoreError {
25 #[error(transparent)]
28 DynamoDbBuild(#[from] aws_sdk_dynamodb::error::BuildError),
29
30 #[error(transparent)]
33 DynamoDbQuery(#[from] aws_sdk_dynamodb::error::SdkError<QueryError>),
34
35 #[error(transparent)]
38 DynamoDbPutItem(#[from] aws_sdk_dynamodb::error::SdkError<PutItemError>),
39
40 #[error(transparent)]
43 DynamoDbDeleteItem(#[from] aws_sdk_dynamodb::error::SdkError<DeleteItemError>),
44
45 #[error(transparent)]
48 DynamoDbBatchWriteItem(#[from] aws_sdk_dynamodb::error::SdkError<BatchWriteItemError>),
49
50 #[error(transparent)]
52 DynamoDbScan(#[from] aws_sdk_dynamodb::error::SdkError<ScanError>),
53
54 #[error(transparent)]
56 Encode(#[from] rmp_serde::encode::Error),
57
58 #[error(transparent)]
60 Decode(#[from] rmp_serde::decode::Error),
61}
62
63impl From<DynamoDBStoreError> for session_store::Error {
64 fn from(err: DynamoDBStoreError) -> Self {
65 match err {
66 DynamoDBStoreError::DynamoDbBuild(inner) => {
67 session_store::Error::Backend(inner.to_string())
68 }
69 DynamoDBStoreError::DynamoDbQuery(inner) => {
70 session_store::Error::Backend(inner.to_string())
71 }
72 DynamoDBStoreError::DynamoDbPutItem(inner) => {
73 session_store::Error::Backend(inner.to_string())
74 }
75 DynamoDBStoreError::DynamoDbDeleteItem(inner) => {
76 session_store::Error::Backend(inner.to_string())
77 }
78 DynamoDBStoreError::DynamoDbBatchWriteItem(inner) => {
79 session_store::Error::Backend(inner.to_string())
80 }
81 DynamoDBStoreError::DynamoDbScan(inner) => {
82 session_store::Error::Backend(inner.to_string())
83 }
84 DynamoDBStoreError::Decode(inner) => session_store::Error::Decode(inner.to_string()),
85 DynamoDBStoreError::Encode(inner) => session_store::Error::Encode(inner.to_string()),
86 }
87 }
88}
89
90#[derive(Clone, Debug)]
93pub struct DynamoDBStoreKey {
94 pub name: String,
96 pub prefix: Option<String>,
99 pub suffix: Option<String>,
102}
103
104impl Default for DynamoDBStoreKey {
105 fn default() -> Self {
106 DynamoDBStoreKey {
107 name: "session_id".to_string(),
108 prefix: Some("SESSIONS::TOWER::".to_string()),
109 suffix: None,
110 }
111 }
112}
113
114#[derive(Clone, Debug)]
116pub struct DynamoDBStoreProps {
117 pub table_name: String,
119
120 pub partition_key: DynamoDBStoreKey,
122
123 pub sort_key: Option<DynamoDBStoreKey>,
126
127 pub expirey_name: String,
130
131 pub data_name: String,
133
134 pub create_key_max_retry_attempts: usize,
136}
137
138impl Default for DynamoDBStoreProps {
139 fn default() -> Self {
140 Self {
141 table_name: "TowerSessions".to_string(),
142 partition_key: DynamoDBStoreKey::default(),
143 sort_key: None,
144 expirey_name: "expire_at".to_string(),
145 data_name: "data".to_string(),
146 create_key_max_retry_attempts: 5,
147 }
148 }
149}
150
151#[derive(Clone, Debug)]
153pub struct DynamoDBStore {
154 pub client: Client,
156 pub props: DynamoDBStoreProps,
158}
159
160impl DynamoDBStore {
161 pub fn new(client: Client, props: DynamoDBStoreProps) -> Self {
179 Self { client, props }
180 }
181
182 fn pk<S: ToString>(&self, input: S) -> String {
183 format!(
184 "{}{}{}",
185 self.props
186 .partition_key
187 .prefix
188 .clone()
189 .unwrap_or("".to_string()),
190 input.to_string(),
191 self.props
192 .partition_key
193 .suffix
194 .clone()
195 .unwrap_or("".to_string())
196 )
197 }
198
199 fn sk<S: ToString>(&self, input: S) -> String {
200 if let Some(sk) = &self.props.sort_key {
201 format!(
202 "{}{}{}",
203 sk.prefix.clone().unwrap_or("".to_string()),
204 input.to_string(),
205 sk.suffix.clone().unwrap_or("".to_string())
206 )
207 } else {
208 "".to_string()
209 }
210 }
211}
212
213#[async_trait]
214impl ExpiredDeletion for DynamoDBStore {
215 async fn delete_expired(&self) -> session_store::Result<()> {
229 info!("Deleting expired sessions");
230 let now_sec = OffsetDateTime::now_utc().unix_timestamp();
231 let now_av = AttributeValue::N(now_sec.to_string());
232
233 let mut projection = "#pk";
235 let mut attribute_names = HashMap::new();
236 attribute_names.insert("#expire_at".to_string(), self.props.expirey_name.clone());
237 attribute_names.insert("#pk".to_string(), self.props.partition_key.name.clone());
238 if let Some(sk) = &self.props.sort_key {
239 attribute_names.insert("#sk".to_string(), sk.name.clone());
240 projection = "#pk, #sk";
241 }
242
243 let mut expired_sessions = self
244 .client
245 .scan()
246 .table_name(&self.props.table_name)
247 .set_expression_attribute_names(Some(attribute_names))
248 .expression_attribute_values(":now", now_av)
249 .filter_expression("#expire_at < :now")
250 .projection_expression(projection)
251 .into_paginator()
252 .page_size(25)
253 .items()
254 .send();
255
256 let mut batches: Vec<Vec<WriteRequest>> = Vec::with_capacity(50);
258 let mut batch: Vec<WriteRequest> = Vec::with_capacity(25);
259 while let Some(session) = expired_sessions.next().await {
260 if batch.len() == 25 {
261 batches.push(batch);
262 batch = Vec::with_capacity(25);
263 }
264 let delete_keys = session.map_err(DynamoDBStoreError::DynamoDbScan)?.clone();
265 let delete_request = DeleteRequest::builder()
266 .set_key(Some(delete_keys))
267 .build()
268 .map_err(DynamoDBStoreError::DynamoDbBuild)?;
269 let write_request = WriteRequest::builder()
270 .delete_request(delete_request)
271 .build();
272 batch.push(write_request);
273 }
274 if !batch.is_empty() {
275 batches.push(batch);
276 }
277
278 for delete_batch in batches {
280 let mut unprocessed_count = delete_batch.len();
281 let mut unprocessed = Some(HashMap::from([(
282 self.props.table_name.clone(),
283 delete_batch,
284 )]));
285 while unprocessed_count > 0 {
286 let new_unprocessed_items = self
287 .client
288 .batch_write_item()
289 .set_request_items(unprocessed)
290 .send()
291 .await
292 .map_err(DynamoDBStoreError::DynamoDbBatchWriteItem)?
293 .unprocessed_items;
294 unprocessed_count = new_unprocessed_items
295 .as_ref()
296 .map(|m| {
297 m.get(&self.props.table_name)
298 .map(|v| v.len())
299 .unwrap_or_default()
300 })
301 .unwrap_or_default();
302 unprocessed = new_unprocessed_items;
303 }
304 }
305
306 Ok(())
307 }
308}
309
310#[async_trait]
311impl SessionStore for DynamoDBStore {
312 async fn create(&self, record: &mut Record) -> session_store::Result<()> {
313 let mut retries_attempted = 0;
314 loop {
315 let exp_sec = record.expiry_date.unix_timestamp();
316 let data_bytes = rmp_serde::to_vec(record).map_err(DynamoDBStoreError::Encode)?;
317
318 let mut item = HashMap::new();
319 item.insert(
320 self.props.partition_key.name.clone(),
321 AttributeValue::S(self.pk(record.id)),
322 );
323 item.insert(
324 self.props.data_name.clone(),
325 AttributeValue::B(Blob::new(data_bytes)),
326 );
327 item.insert(
328 self.props.expirey_name.clone(),
329 AttributeValue::N(exp_sec.to_string()),
330 );
331
332 let mut attribute_names = HashMap::new();
336 let mut condition = "attribute_not_exists(#pk)";
337
338 attribute_names.insert("#pk".to_string(), self.props.partition_key.name.clone());
339
340 if let Some(sk) = &self.props.sort_key {
341 item.insert(sk.name.clone(), AttributeValue::S(self.sk(record.id)));
342 attribute_names.insert("#sk".to_string(), sk.name.clone());
343 condition = "attribute_not_exists(#pk) AND attribute_not_exists(#sk)";
344 }
345
346 match self
347 .client
348 .put_item()
349 .table_name(&self.props.table_name)
350 .set_expression_attribute_names(Some(attribute_names))
351 .set_item(Some(item))
352 .condition_expression(condition)
353 .send()
354 .await
355 {
356 Ok(_) => return Ok(()),
357 Err(sdk_err) => match sdk_err.as_service_error() {
358 Some(PutItemError::ConditionalCheckFailedException(_))
359 if retries_attempted < self.props.create_key_max_retry_attempts =>
360 {
361 retries_attempted += 1;
362 record.id = Id::default();
363 continue;
364 }
365 _ => {
366 return Err(session_store::Error::from(
367 DynamoDBStoreError::DynamoDbPutItem(sdk_err),
368 ))
369 }
370 },
371 }
372 }
373 }
374
375 async fn save(&self, record: &Record) -> session_store::Result<()> {
376 let exp_sec = record.expiry_date.unix_timestamp();
377 let data_bytes = rmp_serde::to_vec(record).map_err(DynamoDBStoreError::Encode)?;
378
379 let mut item = HashMap::new();
380 item.insert(
381 self.props.partition_key.name.clone(),
382 AttributeValue::S(self.pk(record.id)),
383 );
384 item.insert(
385 self.props.data_name.clone(),
386 AttributeValue::B(Blob::new(data_bytes)),
387 );
388 item.insert(
389 self.props.expirey_name.clone(),
390 AttributeValue::N(exp_sec.to_string()),
391 );
392 if let Some(sk) = &self.props.sort_key {
393 item.insert(sk.name.clone(), AttributeValue::S(self.sk(record.id)));
394 }
395
396 self.client
397 .put_item()
398 .table_name(&self.props.table_name)
399 .set_item(Some(item))
400 .send()
401 .await
402 .map_err(DynamoDBStoreError::DynamoDbPutItem)?;
403
404 Ok(())
405 }
406
407 async fn load(&self, session_id: &Id) -> session_store::Result<Option<Record>> {
408 let now_sec = OffsetDateTime::now_utc().unix_timestamp();
409
410 let mut attribute_names = HashMap::new();
411 let mut attribute_values = HashMap::new();
412 let mut key_condition = "#pk = :pk";
413
414 let expire_av = AttributeValue::N(now_sec.to_string());
415 let pk_av = AttributeValue::S(self.pk(session_id));
416
417 attribute_names.insert("#expire_at".to_string(), self.props.expirey_name.clone());
418 attribute_values.insert(":expire_at".to_string(), expire_av);
419
420 attribute_names.insert("#pk".to_string(), self.props.partition_key.name.clone());
421 attribute_values.insert(":pk".to_string(), pk_av);
422
423 if let Some(sk) = &self.props.sort_key {
424 let sk_av = AttributeValue::S(self.sk(session_id));
425 attribute_names.insert("#sk".to_string(), sk.name.clone());
426 attribute_values.insert(":sk".to_string(), sk_av);
427 key_condition = "#pk = :pk AND #sk = :sk";
428 }
429
430 let item = self
431 .client
432 .query()
433 .table_name(&self.props.table_name)
434 .set_expression_attribute_names(Some(attribute_names))
435 .set_expression_attribute_values(Some(attribute_values))
436 .key_condition_expression(key_condition)
437 .filter_expression("#expire_at > :expire_at")
438 .send()
439 .await
440 .map_err(DynamoDBStoreError::DynamoDbQuery)?
441 .items
442 .and_then(|list| list.into_iter().next())
443 .and_then(|map| {
444 if let Some(AttributeValue::B(blob)) = map.get(&self.props.data_name) {
445 Some(blob.clone().into_inner())
446 } else {
447 None
448 }
449 });
450
451 if let Some(bytes) = item {
452 Ok(Some(
453 rmp_serde::from_slice(&bytes).map_err(DynamoDBStoreError::Decode)?,
454 ))
455 } else {
456 Ok(None)
457 }
458 }
459
460 async fn delete(&self, session_id: &Id) -> session_store::Result<()> {
461 let _ = if let Some(sk) = &self.props.sort_key {
462 self.client
463 .delete_item()
464 .table_name(&self.props.table_name)
465 .key(
466 &self.props.partition_key.name,
467 AttributeValue::S(self.pk(session_id)),
468 )
469 .key(&sk.name, AttributeValue::S(self.sk(session_id)))
470 .send()
471 .await
472 .map_err(DynamoDBStoreError::DynamoDbDeleteItem)?;
473 } else {
474 self.client
475 .delete_item()
476 .table_name(&self.props.table_name)
477 .key(
478 &self.props.partition_key.name,
479 AttributeValue::S(self.pk(session_id)),
480 )
481 .send()
482 .await
483 .map_err(DynamoDBStoreError::DynamoDbDeleteItem)?;
484 };
485 Ok(())
486 }
487}