dynamodb_book_ch19_ecomm/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use core::fmt;
4use std::collections::HashMap;
5
6use aliri_braid::braid;
7use modyne::{
8    expr, keys, model::TransactWrite, projections, read_projection, Aggregate, Entity, EntityExt,
9    Error, Item, Projection, QueryInput, QueryInputExt, Table,
10};
11use svix_ksuid::{Ksuid, KsuidLike};
12
13pub struct App {
14    table_name: std::sync::Arc<str>,
15    client: aws_sdk_dynamodb::Client,
16}
17
18impl App {
19    pub fn new(client: aws_sdk_dynamodb::Client) -> Self {
20        Self::new_with_table(client, "EcommerceTable")
21    }
22
23    pub fn new_with_table(client: aws_sdk_dynamodb::Client, table_name: &str) -> Self {
24        Self {
25            table_name: std::sync::Arc::from(table_name),
26            client,
27        }
28    }
29}
30
31impl Table for App {
32    type PrimaryKey = keys::Primary;
33    type IndexKeys = keys::Gsi1;
34
35    fn table_name(&self) -> &str {
36        &self.table_name
37    }
38
39    fn client(&self) -> &aws_sdk_dynamodb::Client {
40        &self.client
41    }
42}
43
44impl App {
45    pub async fn create_customer(&self, input: Customer) -> Result<(), Error> {
46        let email_entity = CustomerEmail {
47            email: input.email.clone(),
48            user_name: input.user_name.clone(),
49        };
50
51        let _result = TransactWrite::new()
52            .operation(input.create())
53            .operation(email_entity.create())
54            .execute(self)
55            .await?;
56
57        Ok(())
58    }
59
60    pub async fn upsert_address(
61        &self,
62        user_name: &UserNameRef,
63        address_type: &str,
64        input: Address,
65    ) -> Result<(), Error> {
66        let expression = expr::Update::new("SET #address.#address_type = :address")
67            .name("#address", "address")
68            .name("#address_type", address_type)
69            .value(":address", input);
70
71        Customer::update(user_name)
72            .expression(expression)
73            .execute(self)
74            .await?;
75
76        Ok(())
77    }
78
79    pub async fn get_customer_orders_page(
80        &self,
81        user_name: &UserNameRef,
82        next: Option<Item>,
83        limit: Option<u32>,
84    ) -> Result<(CustomerOrders, Option<Item>), Error> {
85        let query_input = CustomerOrdersQuery { user_name };
86
87        let mut customer_orders = CustomerOrders::default();
88
89        let result = query_input
90            .query()
91            .set_exclusive_start_key(next)
92            .set_limit(limit)
93            .execute(self)
94            .await?;
95
96        customer_orders.reduce(result.items.unwrap_or_default())?;
97
98        Ok((customer_orders, result.last_evaluated_key))
99    }
100
101    pub async fn save_order(&self, order: Order, items: Vec<OrderItem>) -> Result<(), Error> {
102        let mut builder = TransactWrite::new().operation(order.create());
103
104        for item in items {
105            builder = builder.operation(item.create());
106        }
107
108        let _result = builder.execute(self).await?;
109
110        Ok(())
111    }
112
113    pub async fn update_order_status(
114        &self,
115        user_name: &UserNameRef,
116        order_id: OrderId,
117        status: OrderStatus,
118    ) -> Result<(), Error> {
119        let key = OrderKeyInput {
120            user_name,
121            order_id,
122        };
123
124        let expression = expr::Update::new("SET #status = :status")
125            .name("#status", "status")
126            .value(":status", status);
127
128        Order::update(key)
129            .expression(expression)
130            .execute(self)
131            .await?;
132
133        Ok(())
134    }
135
136    pub async fn get_order(&self, order_id: OrderId) -> Result<OrderWithItems, Error> {
137        let query_input = OrderWithItemsQuery { order_id };
138
139        let mut order = OrderWithItems::default();
140        let mut next = None;
141
142        loop {
143            let result = query_input
144                .query()
145                .set_exclusive_start_key(next)
146                .execute(self)
147                .await?;
148
149            order.reduce(result.items.unwrap_or_default())?;
150
151            let Some(last_evaluated_key) = result.last_evaluated_key else {
152                break;
153            };
154
155            next = Some(last_evaluated_key);
156        }
157
158        Ok(order)
159    }
160}
161
162#[braid(serde)]
163pub struct UserName;
164
165#[braid(serde)]
166pub struct UserEmail;
167
168#[derive(Debug, modyne::EntityDef, serde::Serialize, serde::Deserialize)]
169pub struct Customer {
170    pub user_name: UserName,
171    pub name: String,
172    pub email: UserEmail,
173    pub addresses: HashMap<String, Address>,
174}
175
176#[derive(Debug, serde::Serialize, serde::Deserialize)]
177pub struct Address {
178    pub street: String,
179    pub city: String,
180    pub state: String,
181}
182
183impl Entity for Customer {
184    type KeyInput<'a> = &'a UserNameRef;
185    type Table = App;
186    type IndexKeys = ();
187
188    fn primary_key(input: Self::KeyInput<'_>) -> keys::Primary {
189        let common = format!("CUSTOMER#{}", input);
190        keys::Primary {
191            hash: common.clone(),
192            range: common,
193        }
194    }
195
196    fn full_key(&self) -> keys::FullKey<keys::Primary, Self::IndexKeys> {
197        Self::primary_key(&self.user_name).into()
198    }
199}
200
201#[derive(Debug, modyne::EntityDef, serde::Serialize, serde::Deserialize)]
202struct CustomerEmail {
203    user_name: UserName,
204    email: UserEmail,
205}
206
207impl Entity for CustomerEmail {
208    type KeyInput<'a> = &'a UserEmailRef;
209    type Table = App;
210    type IndexKeys = ();
211
212    fn primary_key(input: Self::KeyInput<'_>) -> keys::Primary {
213        let common = format!("CUSTOMEREMAIL#{}", input);
214        keys::Primary {
215            hash: common.clone(),
216            range: common,
217        }
218    }
219
220    fn full_key(&self) -> keys::FullKey<keys::Primary, Self::IndexKeys> {
221        Self::primary_key(&self.email).into()
222    }
223}
224
225#[derive(
226    Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize,
227)]
228#[serde(transparent)]
229pub struct OrderId(Ksuid);
230
231impl OrderId {
232    #[allow(clippy::new_without_default)]
233    pub fn new() -> Self {
234        Self(Ksuid::new(None, None))
235    }
236}
237
238impl fmt::Display for OrderId {
239    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
240        self.0.fmt(f)
241    }
242}
243
244impl std::str::FromStr for OrderId {
245    type Err = svix_ksuid::Error;
246
247    fn from_str(s: &str) -> Result<Self, Self::Err> {
248        Ksuid::from_str(s).map(Self)
249    }
250}
251
252#[derive(Debug, modyne::EntityDef, serde::Serialize, serde::Deserialize)]
253pub struct Order {
254    pub user_name: UserName,
255    pub order_id: OrderId,
256    #[serde(with = "time::serde::rfc3339")]
257    pub created_at: time::OffsetDateTime,
258    pub number_of_items: u32,
259    pub amount: f32,
260    pub status: OrderStatus,
261}
262
263#[derive(Debug, serde::Serialize, serde::Deserialize)]
264#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
265pub enum OrderStatus {
266    Accepted,
267    Canceled,
268    Shipped,
269    Delivered,
270}
271
272pub struct OrderKeyInput<'a> {
273    user_name: &'a UserNameRef,
274    order_id: OrderId,
275}
276
277impl Entity for Order {
278    type KeyInput<'a> = OrderKeyInput<'a>;
279    type Table = App;
280    type IndexKeys = keys::Gsi1;
281
282    fn primary_key(input: Self::KeyInput<'_>) -> keys::Primary {
283        keys::Primary {
284            hash: format!("CUSTOMER#{}", input.user_name),
285            range: format!("#ORDER#{}", input.order_id),
286        }
287    }
288
289    fn full_key(&self) -> keys::FullKey<keys::Primary, Self::IndexKeys> {
290        keys::FullKey {
291            primary: Self::primary_key(OrderKeyInput {
292                user_name: &self.user_name,
293                order_id: self.order_id,
294            }),
295            indexes: keys::Gsi1 {
296                hash: format!("ORDER#{}", self.order_id),
297                range: format!("ORDER#{}", self.order_id),
298            },
299        }
300    }
301}
302
303#[braid(serde)]
304pub struct ItemId;
305
306#[derive(Debug, modyne::EntityDef, serde::Serialize, serde::Deserialize)]
307pub struct OrderItem {
308    pub order_id: Ksuid,
309    pub item_id: ItemId,
310    pub description: String,
311    pub price: f32,
312}
313
314#[derive(Debug)]
315pub struct OrderItemKeyInput<'a> {
316    order_id: Ksuid,
317    item_id: &'a ItemIdRef,
318}
319
320impl Entity for OrderItem {
321    type KeyInput<'a> = OrderItemKeyInput<'a>;
322    type Table = App;
323    type IndexKeys = keys::Gsi1;
324
325    fn primary_key(input: Self::KeyInput<'_>) -> keys::Primary {
326        keys::Primary {
327            hash: format!("ORDER#{}", input.order_id),
328            range: format!("ORDER#{}#ITEM#{}", input.order_id, input.item_id),
329        }
330    }
331
332    fn full_key(&self) -> keys::FullKey<keys::Primary, Self::IndexKeys> {
333        keys::FullKey {
334            primary: Self::primary_key(OrderItemKeyInput {
335                order_id: self.order_id,
336                item_id: &self.item_id,
337            }),
338            indexes: keys::Gsi1 {
339                hash: format!("ORDER#{}", self.order_id),
340                range: format!("ITEM#{}", self.item_id),
341            },
342        }
343    }
344}
345
346/// A projection of customer data that does not include address information.
347#[derive(Debug, Projection, serde::Serialize, serde::Deserialize)]
348#[entity(Customer)]
349pub struct CustomerHeader {
350    pub user_name: UserName,
351    pub name: String,
352    pub email: UserEmail,
353}
354
355#[derive(Debug, Default)]
356pub struct CustomerOrders {
357    pub orders: Vec<Order>,
358    pub customer: Option<CustomerHeader>,
359}
360
361pub struct CustomerOrdersQuery<'a> {
362    user_name: &'a UserNameRef,
363}
364
365impl QueryInput for CustomerOrdersQuery<'_> {
366    type Index = keys::Primary;
367    type Aggregate = CustomerOrders;
368
369    fn key_condition(&self) -> expr::KeyCondition<Self::Index> {
370        expr::KeyCondition::in_partition(format!("CUSTOMER#{}", self.user_name))
371    }
372}
373
374projections! {
375    pub enum CustomerOrdersEntities {
376        Order,
377        CustomerHeader,
378    }
379}
380
381impl Aggregate for CustomerOrders {
382    type Projections = CustomerOrdersEntities;
383
384    fn merge(&mut self, item: Item) -> Result<(), Error> {
385        match read_projection!(item)? {
386            Self::Projections::Order(order) => self.orders.push(order),
387            Self::Projections::CustomerHeader(header) => self.customer = Some(header),
388        }
389
390        Ok(())
391    }
392}
393
394#[derive(Debug, Default)]
395pub struct OrderWithItems {
396    pub order: Option<Order>,
397    pub items: Vec<OrderItem>,
398}
399
400pub struct OrderWithItemsQuery {
401    pub order_id: OrderId,
402}
403
404impl QueryInput for OrderWithItemsQuery {
405    type Index = keys::Gsi1;
406    type Aggregate = OrderWithItems;
407
408    fn key_condition(&self) -> expr::KeyCondition<Self::Index> {
409        expr::KeyCondition::in_partition(format!("ORDER#{}", self.order_id))
410    }
411}
412
413projections! {
414    pub enum OrderWithItemsEntities {
415        Order,
416        OrderItem,
417    }
418}
419
420impl Aggregate for OrderWithItems {
421    type Projections = OrderWithItemsEntities;
422
423    fn merge(&mut self, item: Item) -> Result<(), Error> {
424        match read_projection!(item)? {
425            Self::Projections::Order(order) => self.order = Some(order),
426            Self::Projections::OrderItem(item) => self.items.push(item),
427        }
428
429        Ok(())
430    }
431}
432
433#[cfg(test)]
434mod tests {
435    use aws_sdk_dynamodb::types::AttributeValue;
436
437    use super::*;
438
439    #[test]
440    fn verify_user_orders_entities_projection_expression() {
441        assert_eq!(
442        <CustomerOrdersEntities as modyne::ProjectionSet>::projection_expression(),
443        Some(expr::StaticProjection {
444            expression: "user_name,order_id,created_at,number_of_items,amount,#prj_000,#prj_001,email,entity_type",
445            names: &[
446                ("#prj_000", "status"),
447                ("#prj_001", "name"),
448            ],
449        })
450    );
451    }
452
453    #[test]
454    fn verify_order_with_items_entities_projection_expression() {
455        assert_eq!(
456        <OrderWithItemsEntities as modyne::ProjectionSet>::projection_expression(),
457        Some(expr::StaticProjection {
458            expression: "user_name,order_id,created_at,number_of_items,amount,#prj_000,item_id,description,price,entity_type",
459            names: &[
460                ("#prj_000", "status"),
461            ],
462        }),
463    );
464    }
465
466    #[test]
467    fn verify_order_entity_full_item_serializes_as_expected() {
468        let order_id = "1VrgXBQ0VCshuQUnh1HrDIHQNwY".parse().unwrap();
469        let order = Order {
470            user_name: UserName::from_static("alexdebrie"),
471            order_id,
472            created_at: time::OffsetDateTime::from_unix_timestamp(1578016664).unwrap(),
473            number_of_items: 7,
474            status: OrderStatus::Shipped,
475            amount: 67.43,
476        };
477
478        let item = order.into_item();
479
480        assert_eq!(item["PK"].as_s().unwrap(), "CUSTOMER#alexdebrie");
481        assert_eq!(
482            item["SK"].as_s().unwrap(),
483            "#ORDER#1VrgXBQ0VCshuQUnh1HrDIHQNwY"
484        );
485        assert_eq!(
486            item["GSI1PK"].as_s().unwrap(),
487            "ORDER#1VrgXBQ0VCshuQUnh1HrDIHQNwY"
488        );
489        assert_eq!(
490            item["GSI1SK"].as_s().unwrap(),
491            "ORDER#1VrgXBQ0VCshuQUnh1HrDIHQNwY"
492        );
493        assert_eq!(item["entity_type"].as_s().unwrap(), "order");
494        assert_eq!(item["user_name"].as_s().unwrap(), "alexdebrie");
495        assert_eq!(
496            item["order_id"].as_s().unwrap(),
497            "1VrgXBQ0VCshuQUnh1HrDIHQNwY"
498        );
499        assert_eq!(item["created_at"].as_s().unwrap(), "2020-01-03T01:57:44Z");
500        assert_eq!(item["number_of_items"].as_n().unwrap(), "7");
501        assert_eq!(item["status"].as_s().unwrap(), "SHIPPED");
502        assert_eq!(item["amount"].as_n().unwrap(), "67.43");
503        assert_eq!(item.len(), 11);
504    }
505
506    #[test]
507    fn verify_customer_orders_entity_hydrates_as_expected() {
508        #[allow(non_snake_case)]
509        fn Str(s: &str) -> AttributeValue {
510            AttributeValue::S(s.to_string())
511        }
512
513        #[allow(non_snake_case)]
514        fn Num(s: &str) -> AttributeValue {
515            AttributeValue::N(s.to_string())
516        }
517
518        let items = [
519            [
520                ("entity_type", Str("customer")),
521                ("user_name", Str("alexdebrie")),
522                ("name", Str("Alex DeBrie")),
523                ("email", Str("alexdebrie1@gmail.com")),
524            ]
525            .into_iter()
526            .map(|(k, v)| (k.to_string(), v))
527            .collect::<Item>(),
528            [
529                ("entity_type", Str("order")),
530                ("user_name", Str("alexdebrie")),
531                ("order_id", Str("1VwVAvJk1GvBFfpTAjm0KG7Cg9d")),
532                ("created_at", Str("2020-01-04T18:53:24Z")),
533                ("number_of_items", Num("2")),
534                ("status", Str("CANCELED")),
535                ("amount", Num("12.43")),
536            ]
537            .into_iter()
538            .map(|(k, v)| (k.to_string(), v))
539            .collect(),
540            [
541                ("entity_type", Str("order")),
542                ("user_name", Str("alexdebrie")),
543                ("order_id", Str("1VrgXBQ0VCshuQUnh1HrDIHQNwY")),
544                ("created_at", Str("2020-01-03T01:57:44Z")),
545                ("number_of_items", Num("7")),
546                ("status", Str("SHIPPED")),
547                ("amount", Num("67.43")),
548            ]
549            .into_iter()
550            .map(|(k, v)| (k.to_string(), v))
551            .collect(),
552        ];
553
554        let mut customer_orders = CustomerOrders::default();
555
556        for item in items {
557            customer_orders.merge(item).unwrap();
558        }
559
560        assert!(customer_orders.customer.is_some());
561        assert_eq!(customer_orders.orders.len(), 2);
562    }
563}