dynamodb_book_ch18_sessionstore/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use aliri_braid::braid;
4use modyne::{
5    expr, keys, types::Expiry, Aggregate, Entity, EntityDef, EntityExt, Error, Projection,
6    ProjectionExt, QueryInput, QueryInputExt, Table,
7};
8
9#[derive(Clone, Debug)]
10pub struct App {
11    table_name: std::sync::Arc<str>,
12    client: aws_sdk_dynamodb::Client,
13}
14
15impl App {
16    pub fn new(client: aws_sdk_dynamodb::Client) -> Self {
17        Self::new_with_table(client, "SessionStore")
18    }
19
20    pub fn new_with_table(client: aws_sdk_dynamodb::Client, table_name: &str) -> Self {
21        Self {
22            table_name: std::sync::Arc::from(table_name),
23            client,
24        }
25    }
26}
27
28impl Table for App {
29    /// For demonstration, this example uses a non-standard entity type attribute name
30    const ENTITY_TYPE_ATTRIBUTE: &'static str = "et";
31
32    type PrimaryKey = SessionToken;
33    type IndexKeys = UsernameKey;
34
35    fn table_name(&self) -> &str {
36        &self.table_name
37    }
38
39    fn client(&self) -> &aws_sdk_dynamodb::Client {
40        &self.client
41    }
42}
43
44impl App {
45    pub async fn create_session(&self, session: Session) -> Result<(), Error> {
46        session.create().execute(self).await?;
47        Ok(())
48    }
49
50    pub async fn get_session(&self, session_token: uuid::Uuid) -> Result<Option<Session>, Error> {
51        let now = time::OffsetDateTime::now_utc();
52        self.get_session_with_now(session_token, now).await
53    }
54
55    pub async fn get_session_with_now(
56        &self,
57        session_token: uuid::Uuid,
58        now: time::OffsetDateTime,
59    ) -> Result<Option<Session>, Error> {
60        let result = Session::get(session_token).execute(self).await?;
61        if let Some(item) = result.item {
62            let session = Session::from_item(item)?;
63            if session.expires_at > now {
64                Ok(Some(session))
65            } else {
66                Ok(None)
67            }
68        } else {
69            Ok(None)
70        }
71    }
72
73    pub async fn delete_user_sessions(&self, user: &UsernameRef) -> Result<(), Error> {
74        let mut joiner = tokio::task::JoinSet::new();
75        loop {
76            let mut agg = Vec::<SessionTokenOnly>::new();
77
78            let result = user.query().execute(self).await?;
79
80            agg.reduce(result.items.unwrap_or_default())?;
81
82            for session in agg {
83                let this = self.clone();
84                joiner.spawn(
85                    async move { Session::delete(session.session_token).execute(&this).await },
86                );
87            }
88
89            if result.last_evaluated_key.is_none() {
90                break;
91            }
92        }
93
94        let mut last_result = Ok(());
95
96        while let Some(next) = joiner.join_next().await {
97            match next {
98                Ok(Ok(_)) => {}
99                Ok(Err(err)) => {
100                    tracing::error!(
101                        exception = &err as &dyn std::error::Error,
102                        "error while deleting session"
103                    );
104                    last_result = Err(err);
105                }
106                Err(err) => {
107                    tracing::error!(
108                        exception = &err as &dyn std::error::Error,
109                        "panic while deleting session"
110                    );
111                }
112            }
113        }
114
115        Ok(last_result?)
116    }
117}
118
119#[braid(serde)]
120pub struct Username;
121
122#[derive(Clone, Debug, serde::Serialize)]
123pub struct SessionToken {
124    pub session_token: uuid::Uuid,
125}
126
127impl keys::PrimaryKey for SessionToken {
128    const PRIMARY_KEY_DEFINITION: keys::PrimaryKeyDefinition = keys::PrimaryKeyDefinition {
129        hash_key: "session_token",
130        range_key: None,
131    };
132}
133
134impl keys::Key for SessionToken {
135    const DEFINITION: keys::KeyDefinition =
136        <Self as keys::PrimaryKey>::PRIMARY_KEY_DEFINITION.into_key_definition();
137}
138
139#[derive(Clone, Debug, serde::Serialize)]
140pub struct UsernameKey {
141    pub username: Username,
142}
143
144impl keys::IndexKey for UsernameKey {
145    const INDEX_DEFINITION: keys::SecondaryIndexDefinition = keys::GlobalSecondaryIndexDefinition {
146        index_name: "UserIndex",
147        hash_key: "username",
148        range_key: None,
149    }
150    .into_index();
151}
152
153#[derive(Clone, Debug, EntityDef, serde::Serialize, serde::Deserialize)]
154pub struct Session {
155    pub session_token: uuid::Uuid,
156    pub username: Username,
157    #[serde(with = "time::serde::rfc3339")]
158    pub created_at: time::OffsetDateTime,
159    #[serde(with = "time::serde::rfc3339")]
160    pub expires_at: time::OffsetDateTime,
161    pub ttl: Expiry,
162}
163
164impl Entity for Session {
165    type KeyInput<'a> = uuid::Uuid;
166    type Table = App;
167    type IndexKeys = UsernameKey;
168
169    fn primary_key(input: Self::KeyInput<'_>) -> SessionToken {
170        SessionToken {
171            session_token: input,
172        }
173    }
174
175    fn full_key(&self) -> keys::FullKey<SessionToken, Self::IndexKeys> {
176        keys::FullKey {
177            primary: Self::primary_key(self.session_token),
178            indexes: UsernameKey {
179                username: self.username.clone(),
180            },
181        }
182    }
183}
184
185#[derive(Clone, Debug, Projection, serde::Deserialize)]
186#[entity(Session)]
187pub struct SessionTokenOnly {
188    pub session_token: uuid::Uuid,
189}
190
191impl QueryInput for UsernameRef {
192    type Index = UsernameKey;
193    type Aggregate = Vec<SessionTokenOnly>;
194
195    fn key_condition(&self) -> expr::KeyCondition<Self::Index> {
196        expr::KeyCondition::in_partition(self)
197    }
198}