dynamodb_book_ch18_sessionstore/
lib.rs1#![doc = include_str!("../README.md")]
2
3use aliri_braid::braid;
4use modyne::{
5 expr, keys, types::Expiry, Aggregate, Entity, EntityDef, EntityExt, Error, Projection,
6 ProjectionExt, QueryInput, QueryInputExt, Table,
7};
8
9#[derive(Clone, Debug)]
10pub struct App {
11 table_name: std::sync::Arc<str>,
12 client: aws_sdk_dynamodb::Client,
13}
14
15impl App {
16 pub fn new(client: aws_sdk_dynamodb::Client) -> Self {
17 Self::new_with_table(client, "SessionStore")
18 }
19
20 pub fn new_with_table(client: aws_sdk_dynamodb::Client, table_name: &str) -> Self {
21 Self {
22 table_name: std::sync::Arc::from(table_name),
23 client,
24 }
25 }
26}
27
28impl Table for App {
29 const ENTITY_TYPE_ATTRIBUTE: &'static str = "et";
31
32 type PrimaryKey = SessionToken;
33 type IndexKeys = UsernameKey;
34
35 fn table_name(&self) -> &str {
36 &self.table_name
37 }
38
39 fn client(&self) -> &aws_sdk_dynamodb::Client {
40 &self.client
41 }
42}
43
44impl App {
45 pub async fn create_session(&self, session: Session) -> Result<(), Error> {
46 session.create().execute(self).await?;
47 Ok(())
48 }
49
50 pub async fn get_session(&self, session_token: uuid::Uuid) -> Result<Option<Session>, Error> {
51 let now = time::OffsetDateTime::now_utc();
52 self.get_session_with_now(session_token, now).await
53 }
54
55 pub async fn get_session_with_now(
56 &self,
57 session_token: uuid::Uuid,
58 now: time::OffsetDateTime,
59 ) -> Result<Option<Session>, Error> {
60 let result = Session::get(session_token).execute(self).await?;
61 if let Some(item) = result.item {
62 let session = Session::from_item(item)?;
63 if session.expires_at > now {
64 Ok(Some(session))
65 } else {
66 Ok(None)
67 }
68 } else {
69 Ok(None)
70 }
71 }
72
73 pub async fn delete_user_sessions(&self, user: &UsernameRef) -> Result<(), Error> {
74 let mut joiner = tokio::task::JoinSet::new();
75 loop {
76 let mut agg = Vec::<SessionTokenOnly>::new();
77
78 let result = user.query().execute(self).await?;
79
80 agg.reduce(result.items.unwrap_or_default())?;
81
82 for session in agg {
83 let this = self.clone();
84 joiner.spawn(
85 async move { Session::delete(session.session_token).execute(&this).await },
86 );
87 }
88
89 if result.last_evaluated_key.is_none() {
90 break;
91 }
92 }
93
94 let mut last_result = Ok(());
95
96 while let Some(next) = joiner.join_next().await {
97 match next {
98 Ok(Ok(_)) => {}
99 Ok(Err(err)) => {
100 tracing::error!(
101 exception = &err as &dyn std::error::Error,
102 "error while deleting session"
103 );
104 last_result = Err(err);
105 }
106 Err(err) => {
107 tracing::error!(
108 exception = &err as &dyn std::error::Error,
109 "panic while deleting session"
110 );
111 }
112 }
113 }
114
115 Ok(last_result?)
116 }
117}
118
119#[braid(serde)]
120pub struct Username;
121
122#[derive(Clone, Debug, serde::Serialize)]
123pub struct SessionToken {
124 pub session_token: uuid::Uuid,
125}
126
127impl keys::PrimaryKey for SessionToken {
128 const PRIMARY_KEY_DEFINITION: keys::PrimaryKeyDefinition = keys::PrimaryKeyDefinition {
129 hash_key: "session_token",
130 range_key: None,
131 };
132}
133
134impl keys::Key for SessionToken {
135 const DEFINITION: keys::KeyDefinition =
136 <Self as keys::PrimaryKey>::PRIMARY_KEY_DEFINITION.into_key_definition();
137}
138
139#[derive(Clone, Debug, serde::Serialize)]
140pub struct UsernameKey {
141 pub username: Username,
142}
143
144impl keys::IndexKey for UsernameKey {
145 const INDEX_DEFINITION: keys::SecondaryIndexDefinition = keys::GlobalSecondaryIndexDefinition {
146 index_name: "UserIndex",
147 hash_key: "username",
148 range_key: None,
149 }
150 .into_index();
151}
152
153#[derive(Clone, Debug, EntityDef, serde::Serialize, serde::Deserialize)]
154pub struct Session {
155 pub session_token: uuid::Uuid,
156 pub username: Username,
157 #[serde(with = "time::serde::rfc3339")]
158 pub created_at: time::OffsetDateTime,
159 #[serde(with = "time::serde::rfc3339")]
160 pub expires_at: time::OffsetDateTime,
161 pub ttl: Expiry,
162}
163
164impl Entity for Session {
165 type KeyInput<'a> = uuid::Uuid;
166 type Table = App;
167 type IndexKeys = UsernameKey;
168
169 fn primary_key(input: Self::KeyInput<'_>) -> SessionToken {
170 SessionToken {
171 session_token: input,
172 }
173 }
174
175 fn full_key(&self) -> keys::FullKey<SessionToken, Self::IndexKeys> {
176 keys::FullKey {
177 primary: Self::primary_key(self.session_token),
178 indexes: UsernameKey {
179 username: self.username.clone(),
180 },
181 }
182 }
183}
184
185#[derive(Clone, Debug, Projection, serde::Deserialize)]
186#[entity(Session)]
187pub struct SessionTokenOnly {
188 pub session_token: uuid::Uuid,
189}
190
191impl QueryInput for UsernameRef {
192 type Index = UsernameKey;
193 type Aggregate = Vec<SessionTokenOnly>;
194
195 fn key_condition(&self) -> expr::KeyCondition<Self::Index> {
196 expr::KeyCondition::in_partition(self)
197 }
198}