1use crate::state::{
28 ContextEvent, StateError, StateStore, UserContext, UserPreferences,
29 VaultState, NotificationPrefs, init_user_context,
30};
31use crate::action::SessionKey;
32use aws_sdk_dynamodb::{
33 Client,
34 types::{AttributeValue, KeyType, ProjectionType, ScalarAttributeType},
35};
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38use std::time::{SystemTime, UNIX_EPOCH};
39use tracing::{info, warn, instrument};
40
/// Settings for the DynamoDB-backed state store.
#[derive(Debug, Clone)]
pub struct DynamoConfig {
    /// Name of the table targeted by every request in this module.
    pub table_name: String,
    /// Name of the global secondary index queried by card fingerprint.
    pub fingerprint_gsi: String,
    /// Name of the item attribute that carries the expiry timestamp.
    pub ttl_attribute: String,
    /// Maximum number of pending captures kept per user.
    pub max_pending_captures: usize,
    /// Seconds added to the current time to compute a session item's TTL.
    pub session_ttl_seconds: i64,
    /// Seconds added to the current time to compute a pending capture's TTL.
    pub pending_ttl_seconds: i64,
    /// Optional DAX endpoint. It is read from `DAX_ENDPOINT`; nothing in the
    /// visible code consumes it yet.
    pub dax_endpoint: Option<String>,
}

impl Default for DynamoConfig {
    /// Built-in settings used when nothing is overridden.
    fn default() -> Self {
        Self {
            table_name: String::from("loop-agent-state"),
            fingerprint_gsi: String::from("CardFingerprintIndex"),
            ttl_attribute: String::from("ttl"),
            max_pending_captures: 10,
            // 86400 s = 24 hours.
            session_ttl_seconds: 86400,
            // 1814400 s = 21 days.
            pending_ttl_seconds: 1814400,
            dax_endpoint: None,
        }
    }
}

impl DynamoConfig {
    /// Builds a configuration from environment variables.
    ///
    /// Recognised variables: `DYNAMO_TABLE`, `DYNAMO_FINGERPRINT_GSI`,
    /// `MAX_PENDING_CAPTURES`, `SESSION_TTL_SECONDS`, `PENDING_TTL_SECONDS`
    /// and `DAX_ENDPOINT`. A variable that is unset, or whose value does not
    /// parse as the expected number, falls back to the [`Default`] value.
    /// The TTL attribute name is not configurable from the environment.
    pub fn from_env() -> Self {
        /// Reads a string variable, falling back when it is unavailable.
        fn text_or(name: &str, fallback: String) -> String {
            match std::env::var(name) {
                Ok(value) => value,
                Err(_) => fallback,
            }
        }

        /// Reads and parses a numeric variable, falling back on any failure.
        fn parsed_or<T: std::str::FromStr>(name: &str, fallback: T) -> T {
            match std::env::var(name) {
                Ok(raw) => raw.parse().unwrap_or(fallback),
                Err(_) => fallback,
            }
        }

        let base = Self::default();
        Self {
            table_name: text_or("DYNAMO_TABLE", base.table_name),
            fingerprint_gsi: text_or("DYNAMO_FINGERPRINT_GSI", base.fingerprint_gsi),
            ttl_attribute: base.ttl_attribute,
            max_pending_captures: parsed_or("MAX_PENDING_CAPTURES", base.max_pending_captures),
            session_ttl_seconds: parsed_or("SESSION_TTL_SECONDS", base.session_ttl_seconds),
            pending_ttl_seconds: parsed_or("PENDING_TTL_SECONDS", base.pending_ttl_seconds),
            dax_endpoint: std::env::var("DAX_ENDPOINT").ok(),
        }
    }
}
103
104pub struct DynamoStateStore {
110 client: Client,
111 config: DynamoConfig,
112}
113
114impl DynamoStateStore {
115 pub async fn new() -> Result<Self, StateError> {
117 let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
118 let client = Client::new(&config);
119
120 Ok(Self {
121 client,
122 config: DynamoConfig::from_env(),
123 })
124 }
125
126 pub async fn with_config(config: DynamoConfig) -> Result<Self, StateError> {
128 let aws_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
129 let client = Client::new(&aws_config);
130
131 Ok(Self { client, config })
132 }
133
/// Returns the current time as whole seconds since the Unix epoch.
///
/// Never panics: a system clock set before the epoch yields `0`, and a
/// value that does not fit in `i64` is clamped to `i64::MAX`.
fn now() -> i64 {
    let secs = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0);

    // Clamp before the cast so it can never wrap to a negative value.
    secs.min(i64::MAX as u64) as i64
}
141
142 fn ttl_at(&self, seconds_from_now: i64) -> i64 {
144 Self::now() + seconds_from_now
145 }
146
/// Partition key for items that belong to a user: `USER#<pubkey>`.
fn user_pk(pubkey: &str) -> String {
    format!("USER#{pubkey}")
}
154
/// Partition key for a card mapping item: `CARD#<fingerprint>`.
fn card_pk(fingerprint: &str) -> String {
    format!("CARD#{fingerprint}")
}
158
/// Partition key for a processed-transaction marker: `TXN#<txn_id>`.
fn txn_pk(txn_id: &str) -> String {
    format!("TXN#{txn_id}")
}
162
/// Sort key for a pending capture: `PENDING#` followed by the timestamp
/// zero-padded to 20 characters, so non-negative timestamps sort
/// lexicographically in chronological order.
fn pending_sk(timestamp: i64) -> String {
    format!("PENDING#{timestamp:020}")
}
166
167 #[instrument(skip(self))]
174 pub async fn lookup_user_by_card(&self, fingerprint: &str) -> Result<Option<CardMapping>, StateError> {
175 let result = self.client
176 .query()
177 .table_name(&self.config.table_name)
178 .index_name(&self.config.fingerprint_gsi)
179 .key_condition_expression("fingerprint = :fp")
180 .expression_attribute_values(":fp", AttributeValue::S(fingerprint.to_string()))
181 .limit(1)
182 .send()
183 .await
184 .map_err(|e| StateError::StorageError(e.to_string()))?;
185
186 if let Some(items) = result.items {
187 if let Some(item) = items.first() {
188 return Ok(Some(CardMapping::from_dynamo(item)?));
189 }
190 }
191
192 Ok(None)
193 }
194
195 #[instrument(skip(self))]
197 pub async fn link_card(&self, fingerprint: &str, user_pubkey: &str, card_last4: &str) -> Result<(), StateError> {
198 let now = Self::now();
199
200 self.client
201 .put_item()
202 .table_name(&self.config.table_name)
203 .item("pk", AttributeValue::S(Self::card_pk(fingerprint)))
204 .item("sk", AttributeValue::S("META".to_string()))
205 .item("fingerprint", AttributeValue::S(fingerprint.to_string()))
206 .item("user_pubkey", AttributeValue::S(user_pubkey.to_string()))
207 .item("card_last4", AttributeValue::S(card_last4.to_string()))
208 .item("linked_at", AttributeValue::N(now.to_string()))
209 .send()
210 .await
211 .map_err(|e| StateError::StorageError(e.to_string()))?;
212
213 info!(fingerprint = %fingerprint, user = %user_pubkey, "Card linked");
214 Ok(())
215 }
216
217 pub async fn unlink_card(&self, fingerprint: &str) -> Result<(), StateError> {
219 self.client
220 .delete_item()
221 .table_name(&self.config.table_name)
222 .key("pk", AttributeValue::S(Self::card_pk(fingerprint)))
223 .key("sk", AttributeValue::S("META".to_string()))
224 .send()
225 .await
226 .map_err(|e| StateError::StorageError(e.to_string()))?;
227
228 Ok(())
229 }
230
231 pub async fn get_user_cards(&self, user_pubkey: &str) -> Result<Vec<CardMapping>, StateError> {
233 let result = self.client
236 .scan()
237 .table_name(&self.config.table_name)
238 .filter_expression("user_pubkey = :user AND begins_with(pk, :prefix)")
239 .expression_attribute_values(":user", AttributeValue::S(user_pubkey.to_string()))
240 .expression_attribute_values(":prefix", AttributeValue::S("CARD#".to_string()))
241 .send()
242 .await
243 .map_err(|e| StateError::StorageError(e.to_string()))?;
244
245 let mut cards = Vec::new();
246 if let Some(items) = result.items {
247 for item in items {
248 if let Ok(card) = CardMapping::from_dynamo(&item) {
249 cards.push(card);
250 }
251 }
252 }
253
254 Ok(cards)
255 }
256
257 pub async fn is_transaction_processed(&self, txn_id: &str) -> Result<bool, StateError> {
263 let result = self.client
264 .get_item()
265 .table_name(&self.config.table_name)
266 .key("pk", AttributeValue::S(Self::txn_pk(txn_id)))
267 .key("sk", AttributeValue::S("PROCESSED".to_string()))
268 .projection_expression("pk")
269 .send()
270 .await
271 .map_err(|e| StateError::StorageError(e.to_string()))?;
272
273 Ok(result.item.is_some())
274 }
275
276 pub async fn mark_transaction_processed(
278 &self,
279 txn_id: &str,
280 user_pubkey: &str,
281 cred_amount: u64,
282 ) -> Result<(), StateError> {
283 let now = Self::now();
284 let ttl = self.ttl_at(86400 * 30); self.client
287 .put_item()
288 .table_name(&self.config.table_name)
289 .item("pk", AttributeValue::S(Self::txn_pk(txn_id)))
290 .item("sk", AttributeValue::S("PROCESSED".to_string()))
291 .item("user_pubkey", AttributeValue::S(user_pubkey.to_string()))
292 .item("cred_amount", AttributeValue::N(cred_amount.to_string()))
293 .item("processed_at", AttributeValue::N(now.to_string()))
294 .item(&self.config.ttl_attribute, AttributeValue::N(ttl.to_string()))
295 .send()
296 .await
297 .map_err(|e| StateError::StorageError(e.to_string()))?;
298
299 Ok(())
300 }
301
302 pub async fn store_session_key(&self, user_pubkey: &str, session: &SessionKeyData) -> Result<(), StateError> {
308 let ttl = self.ttl_at(self.config.session_ttl_seconds);
309 let session_json = serde_json::to_string(session)
310 .map_err(|e| StateError::SerializationError(e.to_string()))?;
311
312 self.client
313 .put_item()
314 .table_name(&self.config.table_name)
315 .item("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
316 .item("sk", AttributeValue::S("SESSION".to_string()))
317 .item("session_data", AttributeValue::S(session_json))
318 .item("expires_at", AttributeValue::N(session.expires_at.to_string()))
319 .item(&self.config.ttl_attribute, AttributeValue::N(ttl.to_string()))
320 .send()
321 .await
322 .map_err(|e| StateError::StorageError(e.to_string()))?;
323
324 info!(user = %user_pubkey, expires_at = session.expires_at, "Session key stored");
325 Ok(())
326 }
327
328 pub async fn get_session_key(&self, user_pubkey: &str) -> Result<Option<SessionKeyData>, StateError> {
330 let result = self.client
331 .get_item()
332 .table_name(&self.config.table_name)
333 .key("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
334 .key("sk", AttributeValue::S("SESSION".to_string()))
335 .send()
336 .await
337 .map_err(|e| StateError::StorageError(e.to_string()))?;
338
339 if let Some(item) = result.item {
340 if let Some(AttributeValue::S(session_json)) = item.get("session_data") {
341 let session: SessionKeyData = serde_json::from_str(session_json)
342 .map_err(|e| StateError::SerializationError(e.to_string()))?;
343
344 if session.expires_at > Self::now() {
346 return Ok(Some(session));
347 } else {
348 return Ok(None);
350 }
351 }
352 }
353
354 Ok(None)
355 }
356
357 pub async fn invalidate_session(&self, user_pubkey: &str) -> Result<(), StateError> {
359 self.client
360 .delete_item()
361 .table_name(&self.config.table_name)
362 .key("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
363 .key("sk", AttributeValue::S("SESSION".to_string()))
364 .send()
365 .await
366 .map_err(|e| StateError::StorageError(e.to_string()))?;
367
368 Ok(())
369 }
370
371 pub async fn add_pending_capture(&self, user_pubkey: &str, capture: &PendingCapture) -> Result<(), StateError> {
377 let now = Self::now();
378 let ttl = self.ttl_at(self.config.pending_ttl_seconds);
379 let capture_json = serde_json::to_string(capture)
380 .map_err(|e| StateError::SerializationError(e.to_string()))?;
381
382 self.client
384 .put_item()
385 .table_name(&self.config.table_name)
386 .item("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
387 .item("sk", AttributeValue::S(Self::pending_sk(now)))
388 .item("capture_data", AttributeValue::S(capture_json))
389 .item("merchant_id", AttributeValue::S(capture.merchant_id.clone()))
390 .item("amount_cents", AttributeValue::N(capture.amount_cents.to_string()))
391 .item("created_at", AttributeValue::N(now.to_string()))
392 .item(&self.config.ttl_attribute, AttributeValue::N(ttl.to_string()))
393 .send()
394 .await
395 .map_err(|e| StateError::StorageError(e.to_string()))?;
396
397 self.enforce_pending_limit(user_pubkey).await?;
399
400 info!(user = %user_pubkey, merchant = %capture.merchant_id, "Pending capture added");
401 Ok(())
402 }
403
404 pub async fn get_pending_captures(&self, user_pubkey: &str) -> Result<Vec<PendingCapture>, StateError> {
406 let result = self.client
407 .query()
408 .table_name(&self.config.table_name)
409 .key_condition_expression("pk = :pk AND begins_with(sk, :prefix)")
410 .expression_attribute_values(":pk", AttributeValue::S(Self::user_pk(user_pubkey)))
411 .expression_attribute_values(":prefix", AttributeValue::S("PENDING#".to_string()))
412 .scan_index_forward(false) .send()
414 .await
415 .map_err(|e| StateError::StorageError(e.to_string()))?;
416
417 let mut captures = Vec::new();
418 if let Some(items) = result.items {
419 for item in items {
420 if let Some(AttributeValue::S(capture_json)) = item.get("capture_data") {
421 if let Ok(capture) = serde_json::from_str::<PendingCapture>(capture_json) {
422 captures.push(capture);
423 }
424 }
425 }
426 }
427
428 Ok(captures)
429 }
430
431 pub async fn remove_pending_capture(&self, user_pubkey: &str, timestamp: i64) -> Result<(), StateError> {
433 self.client
434 .delete_item()
435 .table_name(&self.config.table_name)
436 .key("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
437 .key("sk", AttributeValue::S(Self::pending_sk(timestamp)))
438 .send()
439 .await
440 .map_err(|e| StateError::StorageError(e.to_string()))?;
441
442 Ok(())
443 }
444
445 async fn enforce_pending_limit(&self, user_pubkey: &str) -> Result<(), StateError> {
447 let result = self.client
448 .query()
449 .table_name(&self.config.table_name)
450 .key_condition_expression("pk = :pk AND begins_with(sk, :prefix)")
451 .expression_attribute_values(":pk", AttributeValue::S(Self::user_pk(user_pubkey)))
452 .expression_attribute_values(":prefix", AttributeValue::S("PENDING#".to_string()))
453 .scan_index_forward(true) .projection_expression("sk")
455 .send()
456 .await
457 .map_err(|e| StateError::StorageError(e.to_string()))?;
458
459 if let Some(items) = result.items {
460 if items.len() > self.config.max_pending_captures {
461 let to_delete = items.len() - self.config.max_pending_captures;
463 for item in items.iter().take(to_delete) {
464 if let Some(AttributeValue::S(sk)) = item.get("sk") {
465 self.client
466 .delete_item()
467 .table_name(&self.config.table_name)
468 .key("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
469 .key("sk", AttributeValue::S(sk.clone()))
470 .send()
471 .await
472 .map_err(|e| StateError::StorageError(e.to_string()))?;
473 }
474 }
475 info!(user = %user_pubkey, deleted = to_delete, "Enforced pending capture limit");
476 }
477 }
478
479 Ok(())
480 }
481
482 pub async fn cache_current_merchant(&self, user_pubkey: &str, merchant_id: &str) -> Result<(), StateError> {
488 let now = Self::now();
489 let ttl = self.ttl_at(3600); self.client
492 .put_item()
493 .table_name(&self.config.table_name)
494 .item("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
495 .item("sk", AttributeValue::S("LOCATION".to_string()))
496 .item("merchant_id", AttributeValue::S(merchant_id.to_string()))
497 .item("entered_at", AttributeValue::N(now.to_string()))
498 .item(&self.config.ttl_attribute, AttributeValue::N(ttl.to_string()))
499 .send()
500 .await
501 .map_err(|e| StateError::StorageError(e.to_string()))?;
502
503 Ok(())
504 }
505
506 pub async fn get_current_merchant(&self, user_pubkey: &str) -> Result<Option<String>, StateError> {
508 let result = self.client
509 .get_item()
510 .table_name(&self.config.table_name)
511 .key("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
512 .key("sk", AttributeValue::S("LOCATION".to_string()))
513 .send()
514 .await
515 .map_err(|e| StateError::StorageError(e.to_string()))?;
516
517 if let Some(item) = result.item {
518 if let Some(AttributeValue::S(merchant_id)) = item.get("merchant_id") {
519 return Ok(Some(merchant_id.clone()));
520 }
521 }
522
523 Ok(None)
524 }
525
526 pub async fn clear_current_merchant(&self, user_pubkey: &str) -> Result<(), StateError> {
528 self.client
529 .delete_item()
530 .table_name(&self.config.table_name)
531 .key("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
532 .key("sk", AttributeValue::S("LOCATION".to_string()))
533 .send()
534 .await
535 .map_err(|e| StateError::StorageError(e.to_string()))?;
536
537 Ok(())
538 }
539}
540
541impl StateStore for DynamoStateStore {
546 fn context_fetch(&self, user_pubkey: &str) -> Result<UserContext, StateError> {
547 tokio::runtime::Handle::current().block_on(async {
550 self.context_fetch_async(user_pubkey).await
551 })
552 }
553
554 fn context_push(&self, context: UserContext) -> Result<(), StateError> {
555 tokio::runtime::Handle::current().block_on(async {
556 self.context_push_async(context).await
557 })
558 }
559
560 fn context_batch_fetch(&self, user_pubkeys: &[String]) -> Result<Vec<UserContext>, StateError> {
561 tokio::runtime::Handle::current().block_on(async {
562 let mut contexts = Vec::new();
563 for pubkey in user_pubkeys {
564 if let Ok(ctx) = self.context_fetch_async(pubkey).await {
565 contexts.push(ctx);
566 }
567 }
568 Ok(contexts)
569 })
570 }
571
572 fn context_patch(
573 &self,
574 user_pubkey: &str,
575 path: &str,
576 value: serde_json::Value,
577 ) -> Result<(), StateError> {
578 tokio::runtime::Handle::current().block_on(async {
579 self.context_patch_async(user_pubkey, path, value).await
580 })
581 }
582
583 fn push_event(&self, user_pubkey: &str, event: ContextEvent) -> Result<(), StateError> {
584 tokio::runtime::Handle::current().block_on(async {
585 self.push_event_async(user_pubkey, event).await
586 })
587 }
588
589 fn get_or_init(&self, user_pubkey: &str) -> Result<UserContext, StateError> {
590 tokio::runtime::Handle::current().block_on(async {
591 match self.context_fetch_async(user_pubkey).await {
592 Ok(ctx) => Ok(ctx),
593 Err(StateError::NotFound) => {
594 let ctx = init_user_context(user_pubkey);
595 self.context_push_async(ctx.clone()).await?;
596 Ok(ctx)
597 }
598 Err(e) => Err(e),
599 }
600 })
601 }
602}
603
604impl DynamoStateStore {
605 #[instrument(skip(self))]
607 pub async fn context_fetch_async(&self, user_pubkey: &str) -> Result<UserContext, StateError> {
608 let result = self.client
609 .get_item()
610 .table_name(&self.config.table_name)
611 .key("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
612 .key("sk", AttributeValue::S("CONTEXT".to_string()))
613 .consistent_read(true) .send()
615 .await
616 .map_err(|e| StateError::StorageError(e.to_string()))?;
617
618 if let Some(item) = result.item {
619 if let Some(AttributeValue::S(context_json)) = item.get("context_data") {
620 let context: UserContext = serde_json::from_str(context_json)
621 .map_err(|e| StateError::SerializationError(e.to_string()))?;
622 return Ok(context);
623 }
624 }
625
626 Err(StateError::NotFound)
627 }
628
629 #[instrument(skip(self, context))]
631 pub async fn context_push_async(&self, context: UserContext) -> Result<(), StateError> {
632 let context_json = serde_json::to_string(&context)
633 .map_err(|e| StateError::SerializationError(e.to_string()))?;
634
635 let mut put = self.client
636 .put_item()
637 .table_name(&self.config.table_name)
638 .item("pk", AttributeValue::S(Self::user_pk(&context.pubkey)))
639 .item("sk", AttributeValue::S("CONTEXT".to_string()))
640 .item("context_data", AttributeValue::S(context_json))
641 .item("version", AttributeValue::N(context.version.to_string()))
642 .item("updated_at", AttributeValue::N(context.updated_at.to_string()));
643
644 if context.version > 1 {
646 put = put
647 .condition_expression("version = :expected_version OR attribute_not_exists(version)")
648 .expression_attribute_values(
649 ":expected_version",
650 AttributeValue::N((context.version - 1).to_string()),
651 );
652 }
653
654 put.send()
655 .await
656 .map_err(|e| {
657 if e.to_string().contains("ConditionalCheckFailed") {
658 StateError::VersionConflict {
659 expected: context.version - 1,
660 actual: context.version,
661 }
662 } else {
663 StateError::StorageError(e.to_string())
664 }
665 })?;
666
667 Ok(())
668 }
669
670 pub async fn context_patch_async(
672 &self,
673 user_pubkey: &str,
674 path: &str,
675 value: serde_json::Value,
676 ) -> Result<(), StateError> {
677 let mut ctx = self.context_fetch_async(user_pubkey).await?;
680
681 match path {
683 "preferences.auto_stake" => {
684 if let Some(v) = value.as_bool() {
685 ctx.preferences.auto_stake = v;
686 }
687 }
688 "preferences.default_duration_days" => {
689 if let Some(v) = value.as_u64() {
690 ctx.preferences.default_duration_days = v as u16;
691 }
692 }
693 "preferences.auto_compound" => {
694 if let Some(v) = value.as_bool() {
695 ctx.preferences.auto_compound = v;
696 }
697 }
698 _ => {
699 warn!(path = %path, "Unknown context path for patch");
700 }
701 }
702
703 ctx.version += 1;
704 ctx.updated_at = Self::now();
705 self.context_push_async(ctx).await
706 }
707
708 pub async fn push_event_async(&self, user_pubkey: &str, event: ContextEvent) -> Result<(), StateError> {
710 let mut ctx = match self.context_fetch_async(user_pubkey).await {
711 Ok(c) => c,
712 Err(StateError::NotFound) => init_user_context(user_pubkey),
713 Err(e) => return Err(e),
714 };
715
716 ctx.recent_events.insert(0, event);
718 ctx.recent_events.truncate(10);
719 ctx.version += 1;
720 ctx.updated_at = Self::now();
721
722 self.context_push_async(ctx).await
723 }
724}
725
726#[derive(Debug, Clone, Serialize, Deserialize)]
732pub struct CardMapping {
733 pub fingerprint: String,
734 pub user_pubkey: String,
735 pub card_last4: String,
736 pub linked_at: i64,
737}
738
739impl CardMapping {
740 fn from_dynamo(item: &HashMap<String, AttributeValue>) -> Result<Self, StateError> {
741 Ok(Self {
742 fingerprint: item.get("fingerprint")
743 .and_then(|v| if let AttributeValue::S(s) = v { Some(s.clone()) } else { None })
744 .ok_or_else(|| StateError::SerializationError("Missing fingerprint".into()))?,
745 user_pubkey: item.get("user_pubkey")
746 .and_then(|v| if let AttributeValue::S(s) = v { Some(s.clone()) } else { None })
747 .ok_or_else(|| StateError::SerializationError("Missing user_pubkey".into()))?,
748 card_last4: item.get("card_last4")
749 .and_then(|v| if let AttributeValue::S(s) = v { Some(s.clone()) } else { None })
750 .unwrap_or_default(),
751 linked_at: item.get("linked_at")
752 .and_then(|v| if let AttributeValue::N(n) = v { n.parse().ok() } else { None })
753 .unwrap_or(0),
754 })
755 }
756}
757
758#[derive(Debug, Clone, Serialize, Deserialize)]
760pub struct PendingCapture {
761 pub merchant_id: String,
762 pub merchant_name: Option<String>,
763 pub amount_cents: u64,
764 pub card_last4: Option<String>,
765 pub proof_data: Option<String>,
766 pub proof_type: Option<String>,
767 pub location: Option<PendingLocation>,
768 pub created_at: i64,
769 pub expires_at: i64,
770}
771
772#[derive(Debug, Clone, Serialize, Deserialize)]
773pub struct PendingLocation {
774 pub latitude: f64,
775 pub longitude: f64,
776 pub accuracy_m: f32,
777}
778
779#[derive(Debug, Clone, Serialize, Deserialize)]
781pub struct SessionKeyData {
782 pub key_pubkey: String,
783 pub vault_pubkey: String,
784 pub scopes: Vec<String>,
785 pub expires_at: i64,
786 pub created_at: i64,
787 pub max_actions: u32,
788 pub actions_used: u32,
789}
790
791pub async fn create_table(client: &Client, config: &DynamoConfig) -> Result<(), Box<dyn std::error::Error>> {
798 client
799 .create_table()
800 .table_name(&config.table_name)
801 .attribute_definitions(
803 aws_sdk_dynamodb::types::AttributeDefinition::builder()
804 .attribute_name("pk")
805 .attribute_type(ScalarAttributeType::S)
806 .build()?
807 )
808 .attribute_definitions(
809 aws_sdk_dynamodb::types::AttributeDefinition::builder()
810 .attribute_name("sk")
811 .attribute_type(ScalarAttributeType::S)
812 .build()?
813 )
814 .attribute_definitions(
815 aws_sdk_dynamodb::types::AttributeDefinition::builder()
816 .attribute_name("fingerprint")
817 .attribute_type(ScalarAttributeType::S)
818 .build()?
819 )
820 .key_schema(
821 aws_sdk_dynamodb::types::KeySchemaElement::builder()
822 .attribute_name("pk")
823 .key_type(KeyType::Hash)
824 .build()?
825 )
826 .key_schema(
827 aws_sdk_dynamodb::types::KeySchemaElement::builder()
828 .attribute_name("sk")
829 .key_type(KeyType::Range)
830 .build()?
831 )
832 .global_secondary_indexes(
834 aws_sdk_dynamodb::types::GlobalSecondaryIndex::builder()
835 .index_name(&config.fingerprint_gsi)
836 .key_schema(
837 aws_sdk_dynamodb::types::KeySchemaElement::builder()
838 .attribute_name("fingerprint")
839 .key_type(KeyType::Hash)
840 .build()?
841 )
842 .projection(
843 aws_sdk_dynamodb::types::Projection::builder()
844 .projection_type(ProjectionType::Include)
845 .non_key_attributes("user_pubkey")
846 .non_key_attributes("card_last4")
847 .non_key_attributes("linked_at")
848 .build()
849 )
850 .build()?
851 )
852 .billing_mode(aws_sdk_dynamodb::types::BillingMode::PayPerRequest)
854 .send()
855 .await?;
856
857 client
859 .update_time_to_live()
860 .table_name(&config.table_name)
861 .time_to_live_specification(
862 aws_sdk_dynamodb::types::TimeToLiveSpecification::builder()
863 .enabled(true)
864 .attribute_name(&config.ttl_attribute)
865 .build()?
866 )
867 .send()
868 .await?;
869
870 info!(table = %config.table_name, "DynamoDB table created with GSI and TTL");
871 Ok(())
872}
873
#[cfg(test)]
mod tests {
    use super::*;

    /// Key builders produce the documented `PREFIX#value` shapes.
    #[test]
    fn test_key_formatting() {
        assert_eq!(DynamoStateStore::user_pk("abc123"), "USER#abc123");
        assert_eq!(DynamoStateStore::card_pk("fp_xyz"), "CARD#fp_xyz");
        assert_eq!(DynamoStateStore::txn_pk("tx_123"), "TXN#tx_123");
        // `{:020}` pads the timestamp to 20 digits: ten zeros plus the
        // ten digits of 1234567890.
        assert_eq!(
            DynamoStateStore::pending_sk(1234567890),
            "PENDING#00000000001234567890"
        );
    }

    /// Zero padding makes lexicographic order match numeric order.
    #[test]
    fn pending_sk_sorts_correctly() {
        let sk1 = DynamoStateStore::pending_sk(1000);
        let sk2 = DynamoStateStore::pending_sk(2000);
        let sk3 = DynamoStateStore::pending_sk(999999999999);

        assert!(sk1 < sk2);
        assert!(sk2 < sk3);
    }
}