1#![doc = include_str!("../README.md")]
2
3use core::fmt;
4use std::collections::HashMap;
5
6use aliri_braid::braid;
7use modyne::{
8 expr, keys, model::TransactWrite, projections, read_projection, Aggregate, Entity, EntityExt,
9 Error, Item, Projection, QueryInput, QueryInputExt, Table,
10};
11use svix_ksuid::{Ksuid, KsuidLike};
12
13pub struct App {
14 table_name: std::sync::Arc<str>,
15 client: aws_sdk_dynamodb::Client,
16}
17
18impl App {
19 pub fn new(client: aws_sdk_dynamodb::Client) -> Self {
20 Self::new_with_table(client, "EcommerceTable")
21 }
22
23 pub fn new_with_table(client: aws_sdk_dynamodb::Client, table_name: &str) -> Self {
24 Self {
25 table_name: std::sync::Arc::from(table_name),
26 client,
27 }
28 }
29}
30
31impl Table for App {
32 type PrimaryKey = keys::Primary;
33 type IndexKeys = keys::Gsi1;
34
35 fn table_name(&self) -> &str {
36 &self.table_name
37 }
38
39 fn client(&self) -> &aws_sdk_dynamodb::Client {
40 &self.client
41 }
42}
43
44impl App {
45 pub async fn create_customer(&self, input: Customer) -> Result<(), Error> {
46 let email_entity = CustomerEmail {
47 email: input.email.clone(),
48 user_name: input.user_name.clone(),
49 };
50
51 let _result = TransactWrite::new()
52 .operation(input.create())
53 .operation(email_entity.create())
54 .execute(self)
55 .await?;
56
57 Ok(())
58 }
59
60 pub async fn upsert_address(
61 &self,
62 user_name: &UserNameRef,
63 address_type: &str,
64 input: Address,
65 ) -> Result<(), Error> {
66 let expression = expr::Update::new("SET #address.#address_type = :address")
67 .name("#address", "address")
68 .name("#address_type", address_type)
69 .value(":address", input);
70
71 Customer::update(user_name)
72 .expression(expression)
73 .execute(self)
74 .await?;
75
76 Ok(())
77 }
78
79 pub async fn get_customer_orders_page(
80 &self,
81 user_name: &UserNameRef,
82 next: Option<Item>,
83 limit: Option<u32>,
84 ) -> Result<(CustomerOrders, Option<Item>), Error> {
85 let query_input = CustomerOrdersQuery { user_name };
86
87 let mut customer_orders = CustomerOrders::default();
88
89 let result = query_input
90 .query()
91 .set_exclusive_start_key(next)
92 .set_limit(limit)
93 .execute(self)
94 .await?;
95
96 customer_orders.reduce(result.items.unwrap_or_default())?;
97
98 Ok((customer_orders, result.last_evaluated_key))
99 }
100
101 pub async fn save_order(&self, order: Order, items: Vec<OrderItem>) -> Result<(), Error> {
102 let mut builder = TransactWrite::new().operation(order.create());
103
104 for item in items {
105 builder = builder.operation(item.create());
106 }
107
108 let _result = builder.execute(self).await?;
109
110 Ok(())
111 }
112
113 pub async fn update_order_status(
114 &self,
115 user_name: &UserNameRef,
116 order_id: OrderId,
117 status: OrderStatus,
118 ) -> Result<(), Error> {
119 let key = OrderKeyInput {
120 user_name,
121 order_id,
122 };
123
124 let expression = expr::Update::new("SET #status = :status")
125 .name("#status", "status")
126 .value(":status", status);
127
128 Order::update(key)
129 .expression(expression)
130 .execute(self)
131 .await?;
132
133 Ok(())
134 }
135
136 pub async fn get_order(&self, order_id: OrderId) -> Result<OrderWithItems, Error> {
137 let query_input = OrderWithItemsQuery { order_id };
138
139 let mut order = OrderWithItems::default();
140 let mut next = None;
141
142 loop {
143 let result = query_input
144 .query()
145 .set_exclusive_start_key(next)
146 .execute(self)
147 .await?;
148
149 order.reduce(result.items.unwrap_or_default())?;
150
151 let Some(last_evaluated_key) = result.last_evaluated_key else {
152 break;
153 };
154
155 next = Some(last_evaluated_key);
156 }
157
158 Ok(order)
159 }
160}
161
162#[braid(serde)]
163pub struct UserName;
164
165#[braid(serde)]
166pub struct UserEmail;
167
168#[derive(Debug, modyne::EntityDef, serde::Serialize, serde::Deserialize)]
169pub struct Customer {
170 pub user_name: UserName,
171 pub name: String,
172 pub email: UserEmail,
173 pub addresses: HashMap<String, Address>,
174}
175
176#[derive(Debug, serde::Serialize, serde::Deserialize)]
177pub struct Address {
178 pub street: String,
179 pub city: String,
180 pub state: String,
181}
182
183impl Entity for Customer {
184 type KeyInput<'a> = &'a UserNameRef;
185 type Table = App;
186 type IndexKeys = ();
187
188 fn primary_key(input: Self::KeyInput<'_>) -> keys::Primary {
189 let common = format!("CUSTOMER#{}", input);
190 keys::Primary {
191 hash: common.clone(),
192 range: common,
193 }
194 }
195
196 fn full_key(&self) -> keys::FullKey<keys::Primary, Self::IndexKeys> {
197 Self::primary_key(&self.user_name).into()
198 }
199}
200
201#[derive(Debug, modyne::EntityDef, serde::Serialize, serde::Deserialize)]
202struct CustomerEmail {
203 user_name: UserName,
204 email: UserEmail,
205}
206
207impl Entity for CustomerEmail {
208 type KeyInput<'a> = &'a UserEmailRef;
209 type Table = App;
210 type IndexKeys = ();
211
212 fn primary_key(input: Self::KeyInput<'_>) -> keys::Primary {
213 let common = format!("CUSTOMEREMAIL#{}", input);
214 keys::Primary {
215 hash: common.clone(),
216 range: common,
217 }
218 }
219
220 fn full_key(&self) -> keys::FullKey<keys::Primary, Self::IndexKeys> {
221 Self::primary_key(&self.email).into()
222 }
223}
224
225#[derive(
226 Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize,
227)]
228#[serde(transparent)]
229pub struct OrderId(Ksuid);
230
231impl OrderId {
232 #[allow(clippy::new_without_default)]
233 pub fn new() -> Self {
234 Self(Ksuid::new(None, None))
235 }
236}
237
238impl fmt::Display for OrderId {
239 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
240 self.0.fmt(f)
241 }
242}
243
244impl std::str::FromStr for OrderId {
245 type Err = svix_ksuid::Error;
246
247 fn from_str(s: &str) -> Result<Self, Self::Err> {
248 Ksuid::from_str(s).map(Self)
249 }
250}
251
252#[derive(Debug, modyne::EntityDef, serde::Serialize, serde::Deserialize)]
253pub struct Order {
254 pub user_name: UserName,
255 pub order_id: OrderId,
256 #[serde(with = "time::serde::rfc3339")]
257 pub created_at: time::OffsetDateTime,
258 pub number_of_items: u32,
259 pub amount: f32,
260 pub status: OrderStatus,
261}
262
263#[derive(Debug, serde::Serialize, serde::Deserialize)]
264#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
265pub enum OrderStatus {
266 Accepted,
267 Canceled,
268 Shipped,
269 Delivered,
270}
271
272pub struct OrderKeyInput<'a> {
273 user_name: &'a UserNameRef,
274 order_id: OrderId,
275}
276
277impl Entity for Order {
278 type KeyInput<'a> = OrderKeyInput<'a>;
279 type Table = App;
280 type IndexKeys = keys::Gsi1;
281
282 fn primary_key(input: Self::KeyInput<'_>) -> keys::Primary {
283 keys::Primary {
284 hash: format!("CUSTOMER#{}", input.user_name),
285 range: format!("#ORDER#{}", input.order_id),
286 }
287 }
288
289 fn full_key(&self) -> keys::FullKey<keys::Primary, Self::IndexKeys> {
290 keys::FullKey {
291 primary: Self::primary_key(OrderKeyInput {
292 user_name: &self.user_name,
293 order_id: self.order_id,
294 }),
295 indexes: keys::Gsi1 {
296 hash: format!("ORDER#{}", self.order_id),
297 range: format!("ORDER#{}", self.order_id),
298 },
299 }
300 }
301}
302
303#[braid(serde)]
304pub struct ItemId;
305
306#[derive(Debug, modyne::EntityDef, serde::Serialize, serde::Deserialize)]
307pub struct OrderItem {
308 pub order_id: Ksuid,
309 pub item_id: ItemId,
310 pub description: String,
311 pub price: f32,
312}
313
314#[derive(Debug)]
315pub struct OrderItemKeyInput<'a> {
316 order_id: Ksuid,
317 item_id: &'a ItemIdRef,
318}
319
320impl Entity for OrderItem {
321 type KeyInput<'a> = OrderItemKeyInput<'a>;
322 type Table = App;
323 type IndexKeys = keys::Gsi1;
324
325 fn primary_key(input: Self::KeyInput<'_>) -> keys::Primary {
326 keys::Primary {
327 hash: format!("ORDER#{}", input.order_id),
328 range: format!("ORDER#{}#ITEM#{}", input.order_id, input.item_id),
329 }
330 }
331
332 fn full_key(&self) -> keys::FullKey<keys::Primary, Self::IndexKeys> {
333 keys::FullKey {
334 primary: Self::primary_key(OrderItemKeyInput {
335 order_id: self.order_id,
336 item_id: &self.item_id,
337 }),
338 indexes: keys::Gsi1 {
339 hash: format!("ORDER#{}", self.order_id),
340 range: format!("ITEM#{}", self.item_id),
341 },
342 }
343 }
344}
345
346#[derive(Debug, Projection, serde::Serialize, serde::Deserialize)]
348#[entity(Customer)]
349pub struct CustomerHeader {
350 pub user_name: UserName,
351 pub name: String,
352 pub email: UserEmail,
353}
354
355#[derive(Debug, Default)]
356pub struct CustomerOrders {
357 pub orders: Vec<Order>,
358 pub customer: Option<CustomerHeader>,
359}
360
361pub struct CustomerOrdersQuery<'a> {
362 user_name: &'a UserNameRef,
363}
364
365impl QueryInput for CustomerOrdersQuery<'_> {
366 type Index = keys::Primary;
367 type Aggregate = CustomerOrders;
368
369 fn key_condition(&self) -> expr::KeyCondition<Self::Index> {
370 expr::KeyCondition::in_partition(format!("CUSTOMER#{}", self.user_name))
371 }
372}
373
374projections! {
375 pub enum CustomerOrdersEntities {
376 Order,
377 CustomerHeader,
378 }
379}
380
381impl Aggregate for CustomerOrders {
382 type Projections = CustomerOrdersEntities;
383
384 fn merge(&mut self, item: Item) -> Result<(), Error> {
385 match read_projection!(item)? {
386 Self::Projections::Order(order) => self.orders.push(order),
387 Self::Projections::CustomerHeader(header) => self.customer = Some(header),
388 }
389
390 Ok(())
391 }
392}
393
394#[derive(Debug, Default)]
395pub struct OrderWithItems {
396 pub order: Option<Order>,
397 pub items: Vec<OrderItem>,
398}
399
400pub struct OrderWithItemsQuery {
401 pub order_id: OrderId,
402}
403
404impl QueryInput for OrderWithItemsQuery {
405 type Index = keys::Gsi1;
406 type Aggregate = OrderWithItems;
407
408 fn key_condition(&self) -> expr::KeyCondition<Self::Index> {
409 expr::KeyCondition::in_partition(format!("ORDER#{}", self.order_id))
410 }
411}
412
413projections! {
414 pub enum OrderWithItemsEntities {
415 Order,
416 OrderItem,
417 }
418}
419
420impl Aggregate for OrderWithItems {
421 type Projections = OrderWithItemsEntities;
422
423 fn merge(&mut self, item: Item) -> Result<(), Error> {
424 match read_projection!(item)? {
425 Self::Projections::Order(order) => self.order = Some(order),
426 Self::Projections::OrderItem(item) => self.items.push(item),
427 }
428
429 Ok(())
430 }
431}
432
#[cfg(test)]
mod tests {
    use aws_sdk_dynamodb::types::AttributeValue;

    use super::*;

    /// Wraps `value` in a string attribute.
    fn string_attr(value: &str) -> AttributeValue {
        AttributeValue::S(value.to_string())
    }

    /// Wraps `value` in a number attribute.
    fn number_attr(value: &str) -> AttributeValue {
        AttributeValue::N(value.to_string())
    }

    /// Collects `(name, value)` pairs into an [`Item`].
    fn item_from<const N: usize>(attributes: [(&str, AttributeValue); N]) -> Item {
        attributes
            .into_iter()
            .map(|(name, value)| (name.to_string(), value))
            .collect()
    }

    #[test]
    fn verify_user_orders_entities_projection_expression() {
        let expected = Some(expr::StaticProjection {
            expression: "user_name,order_id,created_at,number_of_items,amount,#prj_000,#prj_001,email,entity_type",
            names: &[
                ("#prj_000", "status"),
                ("#prj_001", "name"),
            ],
        });

        let actual = <CustomerOrdersEntities as modyne::ProjectionSet>::projection_expression();

        assert_eq!(actual, expected);
    }

    #[test]
    fn verify_order_with_items_entities_projection_expression() {
        let expected = Some(expr::StaticProjection {
            expression: "user_name,order_id,created_at,number_of_items,amount,#prj_000,item_id,description,price,entity_type",
            names: &[
                ("#prj_000", "status"),
            ],
        });

        let actual = <OrderWithItemsEntities as modyne::ProjectionSet>::projection_expression();

        assert_eq!(actual, expected);
    }

    #[test]
    fn verify_order_entity_full_item_serializes_as_expected() {
        let order_id: OrderId = "1VrgXBQ0VCshuQUnh1HrDIHQNwY".parse().unwrap();
        let order = Order {
            user_name: UserName::from_static("alexdebrie"),
            order_id,
            created_at: time::OffsetDateTime::from_unix_timestamp(1578016664).unwrap(),
            number_of_items: 7,
            status: OrderStatus::Shipped,
            amount: 67.43,
        };

        let item = order.into_item();

        // Attributes expected to be stored as strings.
        let expected_strings = [
            ("PK", "CUSTOMER#alexdebrie"),
            ("SK", "#ORDER#1VrgXBQ0VCshuQUnh1HrDIHQNwY"),
            ("GSI1PK", "ORDER#1VrgXBQ0VCshuQUnh1HrDIHQNwY"),
            ("GSI1SK", "ORDER#1VrgXBQ0VCshuQUnh1HrDIHQNwY"),
            ("entity_type", "order"),
            ("user_name", "alexdebrie"),
            ("order_id", "1VrgXBQ0VCshuQUnh1HrDIHQNwY"),
            ("created_at", "2020-01-03T01:57:44Z"),
            ("status", "SHIPPED"),
        ];
        for (name, expected) in expected_strings {
            assert_eq!(item[name].as_s().unwrap(), expected);
        }

        // Attributes expected to be stored as numbers.
        let expected_numbers = [("number_of_items", "7"), ("amount", "67.43")];
        for (name, expected) in expected_numbers {
            assert_eq!(item[name].as_n().unwrap(), expected);
        }

        assert_eq!(item.len(), 11);
    }

    #[test]
    fn verify_customer_orders_entity_hydrates_as_expected() {
        let customer: Item = item_from([
            ("entity_type", string_attr("customer")),
            ("user_name", string_attr("alexdebrie")),
            ("name", string_attr("Alex DeBrie")),
            ("email", string_attr("alexdebrie1@gmail.com")),
        ]);
        let canceled_order: Item = item_from([
            ("entity_type", string_attr("order")),
            ("user_name", string_attr("alexdebrie")),
            ("order_id", string_attr("1VwVAvJk1GvBFfpTAjm0KG7Cg9d")),
            ("created_at", string_attr("2020-01-04T18:53:24Z")),
            ("number_of_items", number_attr("2")),
            ("status", string_attr("CANCELED")),
            ("amount", number_attr("12.43")),
        ]);
        let shipped_order: Item = item_from([
            ("entity_type", string_attr("order")),
            ("user_name", string_attr("alexdebrie")),
            ("order_id", string_attr("1VrgXBQ0VCshuQUnh1HrDIHQNwY")),
            ("created_at", string_attr("2020-01-03T01:57:44Z")),
            ("number_of_items", number_attr("7")),
            ("status", string_attr("SHIPPED")),
            ("amount", number_attr("67.43")),
        ]);

        let mut customer_orders = CustomerOrders::default();

        for item in [customer, canceled_order, shipped_order] {
            customer_orders.merge(item).unwrap();
        }

        assert!(customer_orders.customer.is_some());
        assert_eq!(customer_orders.orders.len(), 2);
    }
}