tower_sessions_dynamodb_store/
lib.rs

1use std::collections::hash_map::HashMap;
2
3use async_trait::async_trait;
4pub use aws_config;
5pub use aws_sdk_dynamodb;
6use aws_sdk_dynamodb::{
7    operation::{
8        batch_write_item::BatchWriteItemError, delete_item::DeleteItemError,
9        put_item::PutItemError, query::QueryError, scan::ScanError,
10    },
11    primitives::Blob,
12    types::{AttributeValue, DeleteRequest, WriteRequest},
13    Client,
14};
15use time::OffsetDateTime;
16use tower_sessions::{
17    session::{Id, Record},
18    session_store, ExpiredDeletion, SessionStore,
19};
20use tracing::info;
21
/// An error type for `DynamoDBStore`.
///
/// Every variant wraps exactly one underlying error. `#[error(transparent)]`
/// forwards `Display` and `source` to the wrapped error, and `#[from]`
/// generates the matching `From` conversion so `?` can be used directly.
#[derive(thiserror::Error, Debug)]
pub enum DynamoDBStoreError {
    /// Wraps `aws_sdk_dynamodb::error::BuildError`; produced in
    /// `delete_expired` when building a `DeleteRequest` fails.
    #[error(transparent)]
    DynamoDbBuild(#[from] aws_sdk_dynamodb::error::BuildError),

    /// Wraps `aws_sdk_dynamodb::error::SdkError<QueryError>`; produced when
    /// the query issued by `load` fails.
    #[error(transparent)]
    DynamoDbQuery(#[from] aws_sdk_dynamodb::error::SdkError<QueryError>),

    /// Wraps `aws_sdk_dynamodb::error::SdkError<PutItemError>`; produced when
    /// the put issued by `create` or `save` fails.
    #[error(transparent)]
    DynamoDbPutItem(#[from] aws_sdk_dynamodb::error::SdkError<PutItemError>),

    /// Wraps `aws_sdk_dynamodb::error::SdkError<DeleteItemError>`; produced
    /// when the delete issued by `delete` fails.
    #[error(transparent)]
    DynamoDbDeleteItem(#[from] aws_sdk_dynamodb::error::SdkError<DeleteItemError>),

    /// Wraps `aws_sdk_dynamodb::error::SdkError<BatchWriteItemError>`;
    /// produced when a batched delete in `delete_expired` fails.
    #[error(transparent)]
    DynamoDbBatchWriteItem(#[from] aws_sdk_dynamodb::error::SdkError<BatchWriteItemError>),

    /// Wraps `aws_sdk_dynamodb::error::SdkError<ScanError>`; produced when
    /// the scan in `delete_expired` fails.
    #[error(transparent)]
    DynamoDbScan(#[from] aws_sdk_dynamodb::error::SdkError<ScanError>),

    /// Wraps `rmp_serde` encode errors raised while serializing a session
    /// record.
    #[error(transparent)]
    Encode(#[from] rmp_serde::encode::Error),

    /// Wraps `rmp_serde` decode errors raised while deserializing a stored
    /// session record.
    #[error(transparent)]
    Decode(#[from] rmp_serde::decode::Error),
}
62
63impl From<DynamoDBStoreError> for session_store::Error {
64    fn from(err: DynamoDBStoreError) -> Self {
65        match err {
66            DynamoDBStoreError::DynamoDbBuild(inner) => {
67                session_store::Error::Backend(inner.to_string())
68            }
69            DynamoDBStoreError::DynamoDbQuery(inner) => {
70                session_store::Error::Backend(inner.to_string())
71            }
72            DynamoDBStoreError::DynamoDbPutItem(inner) => {
73                session_store::Error::Backend(inner.to_string())
74            }
75            DynamoDBStoreError::DynamoDbDeleteItem(inner) => {
76                session_store::Error::Backend(inner.to_string())
77            }
78            DynamoDBStoreError::DynamoDbBatchWriteItem(inner) => {
79                session_store::Error::Backend(inner.to_string())
80            }
81            DynamoDBStoreError::DynamoDbScan(inner) => {
82                session_store::Error::Backend(inner.to_string())
83            }
84            DynamoDBStoreError::Decode(inner) => session_store::Error::Decode(inner.to_string()),
85            DynamoDBStoreError::Encode(inner) => session_store::Error::Encode(inner.to_string()),
86        }
87    }
88}
89
/// Describes one DynamoDB key attribute used by the store: the attribute name
/// plus an optional prefix/suffix that is wrapped around the session id before
/// it is written to DynamoDB.
#[derive(Clone, Debug)]
pub struct DynamoDBStoreKey {
    /// The attribute (property) name of the key.
    pub name: String,
    /// Optional text placed before the session id (useful for single-table
    /// designs).
    pub prefix: Option<String>,
    /// Optional text placed after the session id (useful for single-table
    /// designs).
    pub suffix: Option<String>,
}

impl Default for DynamoDBStoreKey {
    /// A key named `session_id`, prefixed with `SESSIONS::TOWER::` and with no
    /// suffix.
    fn default() -> Self {
        Self {
            name: String::from("session_id"),
            prefix: Some(String::from("SESSIONS::TOWER::")),
            suffix: None,
        }
    }
}
113
114/// Properties for configuring the session store.
115#[derive(Clone, Debug)]
116pub struct DynamoDBStoreProps {
117    /// DynamoDB table name to store sessions in.
118    pub table_name: String,
119
120    /// The DynamoDB partition(hash) key to store the session_id at.
121    pub partition_key: DynamoDBStoreKey,
122
123    /// The DynamoDB sort(search) key to store the session_id under (useful with
124    /// singletable designs).
125    pub sort_key: Option<DynamoDBStoreKey>,
126
127    /// The property name to hold the expiration time of the session, a unix
128    /// timestamp in seconds.
129    pub expirey_name: String,
130
131    /// The property name to hold the session data blob.
132    pub data_name: String,
133
134    /// The number of retrys to attempt whencreating akey, before giving up.
135    pub create_key_max_retry_attempts: usize,
136}
137
138impl Default for DynamoDBStoreProps {
139    fn default() -> Self {
140        Self {
141            table_name: "TowerSessions".to_string(),
142            partition_key: DynamoDBStoreKey::default(),
143            sort_key: None,
144            expirey_name: "expire_at".to_string(),
145            data_name: "data".to_string(),
146            create_key_max_retry_attempts: 5,
147        }
148    }
149}
150
151/// A DynamoDB backed session store.
152#[derive(Clone, Debug)]
153pub struct DynamoDBStore {
154    /// the aws-sdk DynamoDB client to use when managing towser-sessions.
155    pub client: Client,
156    /// the DynamoDB backend configuration properties.
157    pub props: DynamoDBStoreProps,
158}
159
160impl DynamoDBStore {
161    /// Create a new DynamoDBStore store with the default store properties.
162    ///
163    /// # Examples
164    ///
165    /// ```rust,no_run
166    /// use tower_sessions_dynamodb_store::{
167    ///     aws_config, aws_sdk_dynamodb, DynamoDBStore, DynamoDBStoreProps,
168    /// };
169    ///
170    /// let store_props = DynamoDBStoreProps::default();
171    ///
172    /// # tokio_test::block_on(async {
173    /// let config = aws_config::load_from_env().await;
174    /// let client = aws_sdk_dynamodb::Client::new(&config);
175    /// let session_store = DynamoDBStore::new(client, store_props);
176    /// # })
177    /// ```
178    pub fn new(client: Client, props: DynamoDBStoreProps) -> Self {
179        Self { client, props }
180    }
181
182    fn pk<S: ToString>(&self, input: S) -> String {
183        format!(
184            "{}{}{}",
185            self.props
186                .partition_key
187                .prefix
188                .clone()
189                .unwrap_or("".to_string()),
190            input.to_string(),
191            self.props
192                .partition_key
193                .suffix
194                .clone()
195                .unwrap_or("".to_string())
196        )
197    }
198
199    fn sk<S: ToString>(&self, input: S) -> String {
200        if let Some(sk) = &self.props.sort_key {
201            format!(
202                "{}{}{}",
203                sk.prefix.clone().unwrap_or("".to_string()),
204                input.to_string(),
205                sk.suffix.clone().unwrap_or("".to_string())
206            )
207        } else {
208            "".to_string()
209        }
210    }
211}
212
213#[async_trait]
214impl ExpiredDeletion for DynamoDBStore {
215    // scans are typically to be avoided in dynamodb; the ExpiredDeletion trait
216    // assumes a scan is available on the Store; it recommended to use this in
217    // conjunction with a dynamo Time to Live setting on the DynamoDB table.
218    // A TTL setting will let dynamodb cull expired sessions inbetween
219    // delete_expired runs, preventing the scan from returning larger results,
220    // taking longer, costing more, and increasing the chance for a failure
221    // while batch processing.
222    // NOTE: a DynamoDB TTL does not offer an SLA on deleting the columns below 48
223    // hours. While typically items are removed within seconds of their TLL
224    // value, they can remain in the table for up to 2 days before AWS removes
225    // them.
226    //
227    // see: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html
228    async fn delete_expired(&self) -> session_store::Result<()> {
229        info!("Deleting expired sessions");
230        let now_sec = OffsetDateTime::now_utc().unix_timestamp();
231        let now_av = AttributeValue::N(now_sec.to_string());
232
233        // expression_attribute_names
234        let mut projection = "#pk";
235        let mut attribute_names = HashMap::new();
236        attribute_names.insert("#expire_at".to_string(), self.props.expirey_name.clone());
237        attribute_names.insert("#pk".to_string(), self.props.partition_key.name.clone());
238        if let Some(sk) = &self.props.sort_key {
239            attribute_names.insert("#sk".to_string(), sk.name.clone());
240            projection = "#pk, #sk";
241        }
242
243        let mut expired_sessions = self
244            .client
245            .scan()
246            .table_name(&self.props.table_name)
247            .set_expression_attribute_names(Some(attribute_names))
248            .expression_attribute_values(":now", now_av)
249            .filter_expression("#expire_at < :now")
250            .projection_expression(projection)
251            .into_paginator()
252            .page_size(25)
253            .items()
254            .send();
255
256        // batchwriteitem only processes 25 items at a time
257        let mut batches: Vec<Vec<WriteRequest>> = Vec::with_capacity(50);
258        let mut batch: Vec<WriteRequest> = Vec::with_capacity(25);
259        while let Some(session) = expired_sessions.next().await {
260            if batch.len() == 25 {
261                batches.push(batch);
262                batch = Vec::with_capacity(25);
263            }
264            let delete_keys = session.map_err(DynamoDBStoreError::DynamoDbScan)?.clone();
265            let delete_request = DeleteRequest::builder()
266                .set_key(Some(delete_keys))
267                .build()
268                .map_err(DynamoDBStoreError::DynamoDbBuild)?;
269            let write_request = WriteRequest::builder()
270                .delete_request(delete_request)
271                .build();
272            batch.push(write_request);
273        }
274        if !batch.is_empty() {
275            batches.push(batch);
276        }
277
278        // process each batch of 25 epired sessions
279        for delete_batch in batches {
280            let mut unprocessed_count = delete_batch.len();
281            let mut unprocessed = Some(HashMap::from([(
282                self.props.table_name.clone(),
283                delete_batch,
284            )]));
285            while unprocessed_count > 0 {
286                let new_unprocessed_items = self
287                    .client
288                    .batch_write_item()
289                    .set_request_items(unprocessed)
290                    .send()
291                    .await
292                    .map_err(DynamoDBStoreError::DynamoDbBatchWriteItem)?
293                    .unprocessed_items;
294                unprocessed_count = new_unprocessed_items
295                    .as_ref()
296                    .map(|m| {
297                        m.get(&self.props.table_name)
298                            .map(|v| v.len())
299                            .unwrap_or_default()
300                    })
301                    .unwrap_or_default();
302                unprocessed = new_unprocessed_items;
303            }
304        }
305
306        Ok(())
307    }
308}
309
310#[async_trait]
311impl SessionStore for DynamoDBStore {
312    async fn create(&self, record: &mut Record) -> session_store::Result<()> {
313        let mut retries_attempted = 0;
314        loop {
315            let exp_sec = record.expiry_date.unix_timestamp();
316            let data_bytes = rmp_serde::to_vec(record).map_err(DynamoDBStoreError::Encode)?;
317
318            let mut item = HashMap::new();
319            item.insert(
320                self.props.partition_key.name.clone(),
321                AttributeValue::S(self.pk(record.id)),
322            );
323            item.insert(
324                self.props.data_name.clone(),
325                AttributeValue::B(Blob::new(data_bytes)),
326            );
327            item.insert(
328                self.props.expirey_name.clone(),
329                AttributeValue::N(exp_sec.to_string()),
330            );
331
332            // introducing a ConditionExpression to test if the given key already exists,
333            // using suggested ConditionExpression from:
334            // ihttps://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Expressions.ConditionExpressions.html#Expressions.ConditionExpressions.PreventingOverwrites
335            let mut attribute_names = HashMap::new();
336            let mut condition = "attribute_not_exists(#pk)";
337
338            attribute_names.insert("#pk".to_string(), self.props.partition_key.name.clone());
339
340            if let Some(sk) = &self.props.sort_key {
341                item.insert(sk.name.clone(), AttributeValue::S(self.sk(record.id)));
342                attribute_names.insert("#sk".to_string(), sk.name.clone());
343                condition = "attribute_not_exists(#pk) AND attribute_not_exists(#sk)";
344            }
345
346            match self
347                .client
348                .put_item()
349                .table_name(&self.props.table_name)
350                .set_expression_attribute_names(Some(attribute_names))
351                .set_item(Some(item))
352                .condition_expression(condition)
353                .send()
354                .await
355            {
356                Ok(_) => return Ok(()),
357                Err(sdk_err) => match sdk_err.as_service_error() {
358                    Some(PutItemError::ConditionalCheckFailedException(_))
359                        if retries_attempted < self.props.create_key_max_retry_attempts =>
360                    {
361                        retries_attempted += 1;
362                        record.id = Id::default();
363                        continue;
364                    }
365                    _ => {
366                        return Err(session_store::Error::from(
367                            DynamoDBStoreError::DynamoDbPutItem(sdk_err),
368                        ))
369                    }
370                },
371            }
372        }
373    }
374
375    async fn save(&self, record: &Record) -> session_store::Result<()> {
376        let exp_sec = record.expiry_date.unix_timestamp();
377        let data_bytes = rmp_serde::to_vec(record).map_err(DynamoDBStoreError::Encode)?;
378
379        let mut item = HashMap::new();
380        item.insert(
381            self.props.partition_key.name.clone(),
382            AttributeValue::S(self.pk(record.id)),
383        );
384        item.insert(
385            self.props.data_name.clone(),
386            AttributeValue::B(Blob::new(data_bytes)),
387        );
388        item.insert(
389            self.props.expirey_name.clone(),
390            AttributeValue::N(exp_sec.to_string()),
391        );
392        if let Some(sk) = &self.props.sort_key {
393            item.insert(sk.name.clone(), AttributeValue::S(self.sk(record.id)));
394        }
395
396        self.client
397            .put_item()
398            .table_name(&self.props.table_name)
399            .set_item(Some(item))
400            .send()
401            .await
402            .map_err(DynamoDBStoreError::DynamoDbPutItem)?;
403
404        Ok(())
405    }
406
407    async fn load(&self, session_id: &Id) -> session_store::Result<Option<Record>> {
408        let now_sec = OffsetDateTime::now_utc().unix_timestamp();
409
410        let mut attribute_names = HashMap::new();
411        let mut attribute_values = HashMap::new();
412        let mut key_condition = "#pk = :pk";
413
414        let expire_av = AttributeValue::N(now_sec.to_string());
415        let pk_av = AttributeValue::S(self.pk(session_id));
416
417        attribute_names.insert("#expire_at".to_string(), self.props.expirey_name.clone());
418        attribute_values.insert(":expire_at".to_string(), expire_av);
419
420        attribute_names.insert("#pk".to_string(), self.props.partition_key.name.clone());
421        attribute_values.insert(":pk".to_string(), pk_av);
422
423        if let Some(sk) = &self.props.sort_key {
424            let sk_av = AttributeValue::S(self.sk(session_id));
425            attribute_names.insert("#sk".to_string(), sk.name.clone());
426            attribute_values.insert(":sk".to_string(), sk_av);
427            key_condition = "#pk = :pk AND #sk = :sk";
428        }
429
430        let item = self
431            .client
432            .query()
433            .table_name(&self.props.table_name)
434            .set_expression_attribute_names(Some(attribute_names))
435            .set_expression_attribute_values(Some(attribute_values))
436            .key_condition_expression(key_condition)
437            .filter_expression("#expire_at > :expire_at")
438            .send()
439            .await
440            .map_err(DynamoDBStoreError::DynamoDbQuery)?
441            .items
442            .and_then(|list| list.into_iter().next())
443            .and_then(|map| {
444                if let Some(AttributeValue::B(blob)) = map.get(&self.props.data_name) {
445                    Some(blob.clone().into_inner())
446                } else {
447                    None
448                }
449            });
450
451        if let Some(bytes) = item {
452            Ok(Some(
453                rmp_serde::from_slice(&bytes).map_err(DynamoDBStoreError::Decode)?,
454            ))
455        } else {
456            Ok(None)
457        }
458    }
459
460    async fn delete(&self, session_id: &Id) -> session_store::Result<()> {
461        let _ = if let Some(sk) = &self.props.sort_key {
462            self.client
463                .delete_item()
464                .table_name(&self.props.table_name)
465                .key(
466                    &self.props.partition_key.name,
467                    AttributeValue::S(self.pk(session_id)),
468                )
469                .key(&sk.name, AttributeValue::S(self.sk(session_id)))
470                .send()
471                .await
472                .map_err(DynamoDBStoreError::DynamoDbDeleteItem)?;
473        } else {
474            self.client
475                .delete_item()
476                .table_name(&self.props.table_name)
477                .key(
478                    &self.props.partition_key.name,
479                    AttributeValue::S(self.pk(session_id)),
480                )
481                .send()
482                .await
483                .map_err(DynamoDBStoreError::DynamoDbDeleteItem)?;
484        };
485        Ok(())
486    }
487}