Skip to main content

loop_agent_sdk/
dynamo.rs

1//! Loop Agent SDK - DynamoDB State Store Implementation
2//! 
3//! High-speed state persistence for Lambda agents.
4//! Target: <100ms context reload, <5ms fingerprint lookup.
5//! 
6//! ## Table Schema
7//! 
8//! ```text
9//! Table: loop-agent-state
10//! 
11//! Primary Key:
12//!   PK (Partition Key): String
13//!   SK (Sort Key): String
14//! 
15//! GSI1 (CardFingerprintIndex):
16//!   PK: fingerprint
17//!   Projects: user_pubkey, card_last4, linked_at
18//! 
19//! Item Types:
20//!   USER#{pubkey} | CONTEXT      → User context (preferences, cache)
21//!   USER#{pubkey} | SESSION      → Active session key
22//!   USER#{pubkey} | PENDING#{ts} → Pending capture (ring buffer)
23//!   CARD#{fp}     | META         → Card fingerprint mapping
24//!   TXN#{id}      | PROCESSED    → Processed transaction (dedup)
25//! ```
26
27use crate::state::{
28    ContextEvent, StateError, StateStore, UserContext, UserPreferences,
29    VaultState, NotificationPrefs, init_user_context,
30};
31use crate::action::SessionKey;
32use aws_sdk_dynamodb::{
33    Client,
34    types::{AttributeValue, KeyType, ProjectionType, ScalarAttributeType},
35};
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38use std::time::{SystemTime, UNIX_EPOCH};
39use tracing::{info, warn, instrument};
40
41// ============================================================================
42// CONFIGURATION
43// ============================================================================
44
45/// DynamoDB table configuration
46#[derive(Debug, Clone)]
47pub struct DynamoConfig {
48    /// Table name
49    pub table_name: String,
50    /// GSI name for card fingerprint lookups
51    pub fingerprint_gsi: String,
52    /// TTL attribute name
53    pub ttl_attribute: String,
54    /// Maximum pending captures per user (ring buffer)
55    pub max_pending_captures: usize,
56    /// Session key TTL in seconds
57    pub session_ttl_seconds: i64,
58    /// Pending capture TTL in seconds (3 weeks default)
59    pub pending_ttl_seconds: i64,
60    /// Use DAX endpoint if available
61    pub dax_endpoint: Option<String>,
62}
63
64impl Default for DynamoConfig {
65    fn default() -> Self {
66        Self {
67            table_name: "loop-agent-state".to_string(),
68            fingerprint_gsi: "CardFingerprintIndex".to_string(),
69            ttl_attribute: "ttl".to_string(),
70            max_pending_captures: 10,
71            session_ttl_seconds: 86400,      // 24 hours
72            pending_ttl_seconds: 1814400,    // 21 days
73            dax_endpoint: None,
74        }
75    }
76}
77
78impl DynamoConfig {
79    /// Create from environment variables
80    pub fn from_env() -> Self {
81        Self {
82            table_name: std::env::var("DYNAMO_TABLE")
83                .unwrap_or_else(|_| "loop-agent-state".to_string()),
84            fingerprint_gsi: std::env::var("DYNAMO_FINGERPRINT_GSI")
85                .unwrap_or_else(|_| "CardFingerprintIndex".to_string()),
86            ttl_attribute: "ttl".to_string(),
87            max_pending_captures: std::env::var("MAX_PENDING_CAPTURES")
88                .ok()
89                .and_then(|s| s.parse().ok())
90                .unwrap_or(10),
91            session_ttl_seconds: std::env::var("SESSION_TTL_SECONDS")
92                .ok()
93                .and_then(|s| s.parse().ok())
94                .unwrap_or(86400),
95            pending_ttl_seconds: std::env::var("PENDING_TTL_SECONDS")
96                .ok()
97                .and_then(|s| s.parse().ok())
98                .unwrap_or(1814400),
99            dax_endpoint: std::env::var("DAX_ENDPOINT").ok(),
100        }
101    }
102}
103
104// ============================================================================
105// DYNAMO STATE STORE
106// ============================================================================
107
108/// DynamoDB-backed state store
109pub struct DynamoStateStore {
110    client: Client,
111    config: DynamoConfig,
112}
113
114impl DynamoStateStore {
115    /// Create new state store with default config
116    pub async fn new() -> Result<Self, StateError> {
117        let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
118        let client = Client::new(&config);
119        
120        Ok(Self {
121            client,
122            config: DynamoConfig::from_env(),
123        })
124    }
125    
126    /// Create with custom config
127    pub async fn with_config(config: DynamoConfig) -> Result<Self, StateError> {
128        let aws_config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
129        let client = Client::new(&aws_config);
130        
131        Ok(Self { client, config })
132    }
133    
134    /// Get current Unix timestamp
135    fn now() -> i64 {
136        SystemTime::now()
137            .duration_since(UNIX_EPOCH)
138            .unwrap()
139            .as_secs() as i64
140    }
141    
142    /// Generate TTL timestamp
143    fn ttl_at(&self, seconds_from_now: i64) -> i64 {
144        Self::now() + seconds_from_now
145    }
146    
147    // ========================================================================
148    // KEY HELPERS
149    // ========================================================================
150    
151    fn user_pk(pubkey: &str) -> String {
152        format!("USER#{}", pubkey)
153    }
154    
155    fn card_pk(fingerprint: &str) -> String {
156        format!("CARD#{}", fingerprint)
157    }
158    
159    fn txn_pk(txn_id: &str) -> String {
160        format!("TXN#{}", txn_id)
161    }
162    
163    fn pending_sk(timestamp: i64) -> String {
164        format!("PENDING#{:020}", timestamp)
165    }
166    
167    // ========================================================================
168    // FINGERPRINT LOOKUP (<5ms target)
169    // ========================================================================
170    
171    /// Look up user by card fingerprint
172    /// This is the "glue" that connects POS webhooks to user vaults
173    #[instrument(skip(self))]
174    pub async fn lookup_user_by_card(&self, fingerprint: &str) -> Result<Option<CardMapping>, StateError> {
175        let result = self.client
176            .query()
177            .table_name(&self.config.table_name)
178            .index_name(&self.config.fingerprint_gsi)
179            .key_condition_expression("fingerprint = :fp")
180            .expression_attribute_values(":fp", AttributeValue::S(fingerprint.to_string()))
181            .limit(1)
182            .send()
183            .await
184            .map_err(|e| StateError::StorageError(e.to_string()))?;
185        
186        if let Some(items) = result.items {
187            if let Some(item) = items.first() {
188                return Ok(Some(CardMapping::from_dynamo(item)?));
189            }
190        }
191        
192        Ok(None)
193    }
194    
195    /// Link a card fingerprint to a user
196    #[instrument(skip(self))]
197    pub async fn link_card(&self, fingerprint: &str, user_pubkey: &str, card_last4: &str) -> Result<(), StateError> {
198        let now = Self::now();
199        
200        self.client
201            .put_item()
202            .table_name(&self.config.table_name)
203            .item("pk", AttributeValue::S(Self::card_pk(fingerprint)))
204            .item("sk", AttributeValue::S("META".to_string()))
205            .item("fingerprint", AttributeValue::S(fingerprint.to_string()))
206            .item("user_pubkey", AttributeValue::S(user_pubkey.to_string()))
207            .item("card_last4", AttributeValue::S(card_last4.to_string()))
208            .item("linked_at", AttributeValue::N(now.to_string()))
209            .send()
210            .await
211            .map_err(|e| StateError::StorageError(e.to_string()))?;
212        
213        info!(fingerprint = %fingerprint, user = %user_pubkey, "Card linked");
214        Ok(())
215    }
216    
217    /// Unlink a card fingerprint
218    pub async fn unlink_card(&self, fingerprint: &str) -> Result<(), StateError> {
219        self.client
220            .delete_item()
221            .table_name(&self.config.table_name)
222            .key("pk", AttributeValue::S(Self::card_pk(fingerprint)))
223            .key("sk", AttributeValue::S("META".to_string()))
224            .send()
225            .await
226            .map_err(|e| StateError::StorageError(e.to_string()))?;
227        
228        Ok(())
229    }
230    
231    /// Get all cards linked to a user
232    pub async fn get_user_cards(&self, user_pubkey: &str) -> Result<Vec<CardMapping>, StateError> {
233        // Query the GSI with a filter on user_pubkey
234        // Note: For production, consider a separate GSI for user→cards lookup
235        let result = self.client
236            .scan()
237            .table_name(&self.config.table_name)
238            .filter_expression("user_pubkey = :user AND begins_with(pk, :prefix)")
239            .expression_attribute_values(":user", AttributeValue::S(user_pubkey.to_string()))
240            .expression_attribute_values(":prefix", AttributeValue::S("CARD#".to_string()))
241            .send()
242            .await
243            .map_err(|e| StateError::StorageError(e.to_string()))?;
244        
245        let mut cards = Vec::new();
246        if let Some(items) = result.items {
247            for item in items {
248                if let Ok(card) = CardMapping::from_dynamo(&item) {
249                    cards.push(card);
250                }
251            }
252        }
253        
254        Ok(cards)
255    }
256    
257    // ========================================================================
258    // TRANSACTION DEDUPLICATION
259    // ========================================================================
260    
261    /// Check if a transaction has already been processed
262    pub async fn is_transaction_processed(&self, txn_id: &str) -> Result<bool, StateError> {
263        let result = self.client
264            .get_item()
265            .table_name(&self.config.table_name)
266            .key("pk", AttributeValue::S(Self::txn_pk(txn_id)))
267            .key("sk", AttributeValue::S("PROCESSED".to_string()))
268            .projection_expression("pk")
269            .send()
270            .await
271            .map_err(|e| StateError::StorageError(e.to_string()))?;
272        
273        Ok(result.item.is_some())
274    }
275    
276    /// Mark a transaction as processed
277    pub async fn mark_transaction_processed(
278        &self,
279        txn_id: &str,
280        user_pubkey: &str,
281        cred_amount: u64,
282    ) -> Result<(), StateError> {
283        let now = Self::now();
284        let ttl = self.ttl_at(86400 * 30); // Keep for 30 days
285        
286        self.client
287            .put_item()
288            .table_name(&self.config.table_name)
289            .item("pk", AttributeValue::S(Self::txn_pk(txn_id)))
290            .item("sk", AttributeValue::S("PROCESSED".to_string()))
291            .item("user_pubkey", AttributeValue::S(user_pubkey.to_string()))
292            .item("cred_amount", AttributeValue::N(cred_amount.to_string()))
293            .item("processed_at", AttributeValue::N(now.to_string()))
294            .item(&self.config.ttl_attribute, AttributeValue::N(ttl.to_string()))
295            .send()
296            .await
297            .map_err(|e| StateError::StorageError(e.to_string()))?;
298        
299        Ok(())
300    }
301    
302    // ========================================================================
303    // SESSION KEY MANAGEMENT
304    // ========================================================================
305    
306    /// Store a session key with TTL
307    pub async fn store_session_key(&self, user_pubkey: &str, session: &SessionKeyData) -> Result<(), StateError> {
308        let ttl = self.ttl_at(self.config.session_ttl_seconds);
309        let session_json = serde_json::to_string(session)
310            .map_err(|e| StateError::SerializationError(e.to_string()))?;
311        
312        self.client
313            .put_item()
314            .table_name(&self.config.table_name)
315            .item("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
316            .item("sk", AttributeValue::S("SESSION".to_string()))
317            .item("session_data", AttributeValue::S(session_json))
318            .item("expires_at", AttributeValue::N(session.expires_at.to_string()))
319            .item(&self.config.ttl_attribute, AttributeValue::N(ttl.to_string()))
320            .send()
321            .await
322            .map_err(|e| StateError::StorageError(e.to_string()))?;
323        
324        info!(user = %user_pubkey, expires_at = session.expires_at, "Session key stored");
325        Ok(())
326    }
327    
328    /// Get active session key for user
329    pub async fn get_session_key(&self, user_pubkey: &str) -> Result<Option<SessionKeyData>, StateError> {
330        let result = self.client
331            .get_item()
332            .table_name(&self.config.table_name)
333            .key("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
334            .key("sk", AttributeValue::S("SESSION".to_string()))
335            .send()
336            .await
337            .map_err(|e| StateError::StorageError(e.to_string()))?;
338        
339        if let Some(item) = result.item {
340            if let Some(AttributeValue::S(session_json)) = item.get("session_data") {
341                let session: SessionKeyData = serde_json::from_str(session_json)
342                    .map_err(|e| StateError::SerializationError(e.to_string()))?;
343                
344                // Check if expired
345                if session.expires_at > Self::now() {
346                    return Ok(Some(session));
347                } else {
348                    // Expired - will be cleaned up by TTL
349                    return Ok(None);
350                }
351            }
352        }
353        
354        Ok(None)
355    }
356    
357    /// Invalidate session key
358    pub async fn invalidate_session(&self, user_pubkey: &str) -> Result<(), StateError> {
359        self.client
360            .delete_item()
361            .table_name(&self.config.table_name)
362            .key("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
363            .key("sk", AttributeValue::S("SESSION".to_string()))
364            .send()
365            .await
366            .map_err(|e| StateError::StorageError(e.to_string()))?;
367        
368        Ok(())
369    }
370    
371    // ========================================================================
372    // PENDING CAPTURES (Ring Buffer)
373    // ========================================================================
374    
375    /// Add a pending capture (limited to max_pending_captures per user)
376    pub async fn add_pending_capture(&self, user_pubkey: &str, capture: &PendingCapture) -> Result<(), StateError> {
377        let now = Self::now();
378        let ttl = self.ttl_at(self.config.pending_ttl_seconds);
379        let capture_json = serde_json::to_string(capture)
380            .map_err(|e| StateError::SerializationError(e.to_string()))?;
381        
382        // Add the new pending capture
383        self.client
384            .put_item()
385            .table_name(&self.config.table_name)
386            .item("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
387            .item("sk", AttributeValue::S(Self::pending_sk(now)))
388            .item("capture_data", AttributeValue::S(capture_json))
389            .item("merchant_id", AttributeValue::S(capture.merchant_id.clone()))
390            .item("amount_cents", AttributeValue::N(capture.amount_cents.to_string()))
391            .item("created_at", AttributeValue::N(now.to_string()))
392            .item(&self.config.ttl_attribute, AttributeValue::N(ttl.to_string()))
393            .send()
394            .await
395            .map_err(|e| StateError::StorageError(e.to_string()))?;
396        
397        // Enforce ring buffer limit - delete oldest if over limit
398        self.enforce_pending_limit(user_pubkey).await?;
399        
400        info!(user = %user_pubkey, merchant = %capture.merchant_id, "Pending capture added");
401        Ok(())
402    }
403    
404    /// Get all pending captures for user
405    pub async fn get_pending_captures(&self, user_pubkey: &str) -> Result<Vec<PendingCapture>, StateError> {
406        let result = self.client
407            .query()
408            .table_name(&self.config.table_name)
409            .key_condition_expression("pk = :pk AND begins_with(sk, :prefix)")
410            .expression_attribute_values(":pk", AttributeValue::S(Self::user_pk(user_pubkey)))
411            .expression_attribute_values(":prefix", AttributeValue::S("PENDING#".to_string()))
412            .scan_index_forward(false) // Most recent first
413            .send()
414            .await
415            .map_err(|e| StateError::StorageError(e.to_string()))?;
416        
417        let mut captures = Vec::new();
418        if let Some(items) = result.items {
419            for item in items {
420                if let Some(AttributeValue::S(capture_json)) = item.get("capture_data") {
421                    if let Ok(capture) = serde_json::from_str::<PendingCapture>(capture_json) {
422                        captures.push(capture);
423                    }
424                }
425            }
426        }
427        
428        Ok(captures)
429    }
430    
431    /// Remove a pending capture (after it's been processed)
432    pub async fn remove_pending_capture(&self, user_pubkey: &str, timestamp: i64) -> Result<(), StateError> {
433        self.client
434            .delete_item()
435            .table_name(&self.config.table_name)
436            .key("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
437            .key("sk", AttributeValue::S(Self::pending_sk(timestamp)))
438            .send()
439            .await
440            .map_err(|e| StateError::StorageError(e.to_string()))?;
441        
442        Ok(())
443    }
444    
445    /// Enforce the ring buffer limit
446    async fn enforce_pending_limit(&self, user_pubkey: &str) -> Result<(), StateError> {
447        let result = self.client
448            .query()
449            .table_name(&self.config.table_name)
450            .key_condition_expression("pk = :pk AND begins_with(sk, :prefix)")
451            .expression_attribute_values(":pk", AttributeValue::S(Self::user_pk(user_pubkey)))
452            .expression_attribute_values(":prefix", AttributeValue::S("PENDING#".to_string()))
453            .scan_index_forward(true) // Oldest first
454            .projection_expression("sk")
455            .send()
456            .await
457            .map_err(|e| StateError::StorageError(e.to_string()))?;
458        
459        if let Some(items) = result.items {
460            if items.len() > self.config.max_pending_captures {
461                // Delete oldest items to get back to limit
462                let to_delete = items.len() - self.config.max_pending_captures;
463                for item in items.iter().take(to_delete) {
464                    if let Some(AttributeValue::S(sk)) = item.get("sk") {
465                        self.client
466                            .delete_item()
467                            .table_name(&self.config.table_name)
468                            .key("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
469                            .key("sk", AttributeValue::S(sk.clone()))
470                            .send()
471                            .await
472                            .map_err(|e| StateError::StorageError(e.to_string()))?;
473                    }
474                }
475                info!(user = %user_pubkey, deleted = to_delete, "Enforced pending capture limit");
476            }
477        }
478        
479        Ok(())
480    }
481    
482    // ========================================================================
483    // MERCHANT LOCATION CACHE
484    // ========================================================================
485    
486    /// Cache current merchant for user (geofence)
487    pub async fn cache_current_merchant(&self, user_pubkey: &str, merchant_id: &str) -> Result<(), StateError> {
488        let now = Self::now();
489        let ttl = self.ttl_at(3600); // 1 hour TTL
490        
491        self.client
492            .put_item()
493            .table_name(&self.config.table_name)
494            .item("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
495            .item("sk", AttributeValue::S("LOCATION".to_string()))
496            .item("merchant_id", AttributeValue::S(merchant_id.to_string()))
497            .item("entered_at", AttributeValue::N(now.to_string()))
498            .item(&self.config.ttl_attribute, AttributeValue::N(ttl.to_string()))
499            .send()
500            .await
501            .map_err(|e| StateError::StorageError(e.to_string()))?;
502        
503        Ok(())
504    }
505    
506    /// Get cached merchant location
507    pub async fn get_current_merchant(&self, user_pubkey: &str) -> Result<Option<String>, StateError> {
508        let result = self.client
509            .get_item()
510            .table_name(&self.config.table_name)
511            .key("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
512            .key("sk", AttributeValue::S("LOCATION".to_string()))
513            .send()
514            .await
515            .map_err(|e| StateError::StorageError(e.to_string()))?;
516        
517        if let Some(item) = result.item {
518            if let Some(AttributeValue::S(merchant_id)) = item.get("merchant_id") {
519                return Ok(Some(merchant_id.clone()));
520            }
521        }
522        
523        Ok(None)
524    }
525    
526    /// Clear cached merchant location
527    pub async fn clear_current_merchant(&self, user_pubkey: &str) -> Result<(), StateError> {
528        self.client
529            .delete_item()
530            .table_name(&self.config.table_name)
531            .key("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
532            .key("sk", AttributeValue::S("LOCATION".to_string()))
533            .send()
534            .await
535            .map_err(|e| StateError::StorageError(e.to_string()))?;
536        
537        Ok(())
538    }
539}
540
541// ============================================================================
542// STATE STORE TRAIT IMPLEMENTATION
543// ============================================================================
544
545impl StateStore for DynamoStateStore {
546    fn context_fetch(&self, user_pubkey: &str) -> Result<UserContext, StateError> {
547        // This is sync trait but we need async - use block_on for now
548        // In production, refactor trait to be async
549        tokio::runtime::Handle::current().block_on(async {
550            self.context_fetch_async(user_pubkey).await
551        })
552    }
553    
554    fn context_push(&self, context: UserContext) -> Result<(), StateError> {
555        tokio::runtime::Handle::current().block_on(async {
556            self.context_push_async(context).await
557        })
558    }
559    
560    fn context_batch_fetch(&self, user_pubkeys: &[String]) -> Result<Vec<UserContext>, StateError> {
561        tokio::runtime::Handle::current().block_on(async {
562            let mut contexts = Vec::new();
563            for pubkey in user_pubkeys {
564                if let Ok(ctx) = self.context_fetch_async(pubkey).await {
565                    contexts.push(ctx);
566                }
567            }
568            Ok(contexts)
569        })
570    }
571    
572    fn context_patch(
573        &self,
574        user_pubkey: &str,
575        path: &str,
576        value: serde_json::Value,
577    ) -> Result<(), StateError> {
578        tokio::runtime::Handle::current().block_on(async {
579            self.context_patch_async(user_pubkey, path, value).await
580        })
581    }
582    
583    fn push_event(&self, user_pubkey: &str, event: ContextEvent) -> Result<(), StateError> {
584        tokio::runtime::Handle::current().block_on(async {
585            self.push_event_async(user_pubkey, event).await
586        })
587    }
588    
589    fn get_or_init(&self, user_pubkey: &str) -> Result<UserContext, StateError> {
590        tokio::runtime::Handle::current().block_on(async {
591            match self.context_fetch_async(user_pubkey).await {
592                Ok(ctx) => Ok(ctx),
593                Err(StateError::NotFound) => {
594                    let ctx = init_user_context(user_pubkey);
595                    self.context_push_async(ctx.clone()).await?;
596                    Ok(ctx)
597                }
598                Err(e) => Err(e),
599            }
600        })
601    }
602}
603
604impl DynamoStateStore {
605    /// Async context fetch
606    #[instrument(skip(self))]
607    pub async fn context_fetch_async(&self, user_pubkey: &str) -> Result<UserContext, StateError> {
608        let result = self.client
609            .get_item()
610            .table_name(&self.config.table_name)
611            .key("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
612            .key("sk", AttributeValue::S("CONTEXT".to_string()))
613            .consistent_read(true) // Strongly consistent for accuracy
614            .send()
615            .await
616            .map_err(|e| StateError::StorageError(e.to_string()))?;
617        
618        if let Some(item) = result.item {
619            if let Some(AttributeValue::S(context_json)) = item.get("context_data") {
620                let context: UserContext = serde_json::from_str(context_json)
621                    .map_err(|e| StateError::SerializationError(e.to_string()))?;
622                return Ok(context);
623            }
624        }
625        
626        Err(StateError::NotFound)
627    }
628    
629    /// Async context push with optimistic locking
630    #[instrument(skip(self, context))]
631    pub async fn context_push_async(&self, context: UserContext) -> Result<(), StateError> {
632        let context_json = serde_json::to_string(&context)
633            .map_err(|e| StateError::SerializationError(e.to_string()))?;
634        
635        let mut put = self.client
636            .put_item()
637            .table_name(&self.config.table_name)
638            .item("pk", AttributeValue::S(Self::user_pk(&context.pubkey)))
639            .item("sk", AttributeValue::S("CONTEXT".to_string()))
640            .item("context_data", AttributeValue::S(context_json))
641            .item("version", AttributeValue::N(context.version.to_string()))
642            .item("updated_at", AttributeValue::N(context.updated_at.to_string()));
643        
644        // Optimistic locking - only update if version matches
645        if context.version > 1 {
646            put = put
647                .condition_expression("version = :expected_version OR attribute_not_exists(version)")
648                .expression_attribute_values(
649                    ":expected_version",
650                    AttributeValue::N((context.version - 1).to_string()),
651                );
652        }
653        
654        put.send()
655            .await
656            .map_err(|e| {
657                if e.to_string().contains("ConditionalCheckFailed") {
658                    StateError::VersionConflict {
659                        expected: context.version - 1,
660                        actual: context.version,
661                    }
662                } else {
663                    StateError::StorageError(e.to_string())
664                }
665            })?;
666        
667        Ok(())
668    }
669    
670    /// Async context patch
671    pub async fn context_patch_async(
672        &self,
673        user_pubkey: &str,
674        path: &str,
675        value: serde_json::Value,
676    ) -> Result<(), StateError> {
677        // Fetch, modify, push pattern
678        // For production, consider using UpdateItem with nested paths
679        let mut ctx = self.context_fetch_async(user_pubkey).await?;
680        
681        // Simple path handling - extend as needed
682        match path {
683            "preferences.auto_stake" => {
684                if let Some(v) = value.as_bool() {
685                    ctx.preferences.auto_stake = v;
686                }
687            }
688            "preferences.default_duration_days" => {
689                if let Some(v) = value.as_u64() {
690                    ctx.preferences.default_duration_days = v as u16;
691                }
692            }
693            "preferences.auto_compound" => {
694                if let Some(v) = value.as_bool() {
695                    ctx.preferences.auto_compound = v;
696                }
697            }
698            _ => {
699                warn!(path = %path, "Unknown context path for patch");
700            }
701        }
702        
703        ctx.version += 1;
704        ctx.updated_at = Self::now();
705        self.context_push_async(ctx).await
706    }
707    
708    /// Async push event
709    pub async fn push_event_async(&self, user_pubkey: &str, event: ContextEvent) -> Result<(), StateError> {
710        let mut ctx = match self.context_fetch_async(user_pubkey).await {
711            Ok(c) => c,
712            Err(StateError::NotFound) => init_user_context(user_pubkey),
713            Err(e) => return Err(e),
714        };
715        
716        // Add event to front, keep last 10
717        ctx.recent_events.insert(0, event);
718        ctx.recent_events.truncate(10);
719        ctx.version += 1;
720        ctx.updated_at = Self::now();
721        
722        self.context_push_async(ctx).await
723    }
724}
725
726// ============================================================================
727// DATA TYPES
728// ============================================================================
729
730/// Card fingerprint to user mapping
731#[derive(Debug, Clone, Serialize, Deserialize)]
732pub struct CardMapping {
733    pub fingerprint: String,
734    pub user_pubkey: String,
735    pub card_last4: String,
736    pub linked_at: i64,
737}
738
739impl CardMapping {
740    fn from_dynamo(item: &HashMap<String, AttributeValue>) -> Result<Self, StateError> {
741        Ok(Self {
742            fingerprint: item.get("fingerprint")
743                .and_then(|v| if let AttributeValue::S(s) = v { Some(s.clone()) } else { None })
744                .ok_or_else(|| StateError::SerializationError("Missing fingerprint".into()))?,
745            user_pubkey: item.get("user_pubkey")
746                .and_then(|v| if let AttributeValue::S(s) = v { Some(s.clone()) } else { None })
747                .ok_or_else(|| StateError::SerializationError("Missing user_pubkey".into()))?,
748            card_last4: item.get("card_last4")
749                .and_then(|v| if let AttributeValue::S(s) = v { Some(s.clone()) } else { None })
750                .unwrap_or_default(),
751            linked_at: item.get("linked_at")
752                .and_then(|v| if let AttributeValue::N(n) = v { n.parse().ok() } else { None })
753                .unwrap_or(0),
754        })
755    }
756}
757
758/// Pending capture data
759#[derive(Debug, Clone, Serialize, Deserialize)]
760pub struct PendingCapture {
761    pub merchant_id: String,
762    pub merchant_name: Option<String>,
763    pub amount_cents: u64,
764    pub card_last4: Option<String>,
765    pub proof_data: Option<String>,
766    pub proof_type: Option<String>,
767    pub location: Option<PendingLocation>,
768    pub created_at: i64,
769    pub expires_at: i64,
770}
771
772#[derive(Debug, Clone, Serialize, Deserialize)]
773pub struct PendingLocation {
774    pub latitude: f64,
775    pub longitude: f64,
776    pub accuracy_m: f32,
777}
778
779/// Session key data for storage
780#[derive(Debug, Clone, Serialize, Deserialize)]
781pub struct SessionKeyData {
782    pub key_pubkey: String,
783    pub vault_pubkey: String,
784    pub scopes: Vec<String>,
785    pub expires_at: i64,
786    pub created_at: i64,
787    pub max_actions: u32,
788    pub actions_used: u32,
789}
790
791// ============================================================================
792// TABLE CREATION (for setup scripts)
793// ============================================================================
794
795/// Create the DynamoDB table with GSI
796/// Run this once during infrastructure setup
797pub async fn create_table(client: &Client, config: &DynamoConfig) -> Result<(), Box<dyn std::error::Error>> {
798    client
799        .create_table()
800        .table_name(&config.table_name)
801        // Primary key
802        .attribute_definitions(
803            aws_sdk_dynamodb::types::AttributeDefinition::builder()
804                .attribute_name("pk")
805                .attribute_type(ScalarAttributeType::S)
806                .build()?
807        )
808        .attribute_definitions(
809            aws_sdk_dynamodb::types::AttributeDefinition::builder()
810                .attribute_name("sk")
811                .attribute_type(ScalarAttributeType::S)
812                .build()?
813        )
814        .attribute_definitions(
815            aws_sdk_dynamodb::types::AttributeDefinition::builder()
816                .attribute_name("fingerprint")
817                .attribute_type(ScalarAttributeType::S)
818                .build()?
819        )
820        .key_schema(
821            aws_sdk_dynamodb::types::KeySchemaElement::builder()
822                .attribute_name("pk")
823                .key_type(KeyType::Hash)
824                .build()?
825        )
826        .key_schema(
827            aws_sdk_dynamodb::types::KeySchemaElement::builder()
828                .attribute_name("sk")
829                .key_type(KeyType::Range)
830                .build()?
831        )
832        // GSI for card fingerprint lookups
833        .global_secondary_indexes(
834            aws_sdk_dynamodb::types::GlobalSecondaryIndex::builder()
835                .index_name(&config.fingerprint_gsi)
836                .key_schema(
837                    aws_sdk_dynamodb::types::KeySchemaElement::builder()
838                        .attribute_name("fingerprint")
839                        .key_type(KeyType::Hash)
840                        .build()?
841                )
842                .projection(
843                    aws_sdk_dynamodb::types::Projection::builder()
844                        .projection_type(ProjectionType::Include)
845                        .non_key_attributes("user_pubkey")
846                        .non_key_attributes("card_last4")
847                        .non_key_attributes("linked_at")
848                        .build()
849                )
850                .build()?
851        )
852        // On-demand capacity
853        .billing_mode(aws_sdk_dynamodb::types::BillingMode::PayPerRequest)
854        .send()
855        .await?;
856    
857    // Enable TTL
858    client
859        .update_time_to_live()
860        .table_name(&config.table_name)
861        .time_to_live_specification(
862            aws_sdk_dynamodb::types::TimeToLiveSpecification::builder()
863                .enabled(true)
864                .attribute_name(&config.ttl_attribute)
865                .build()?
866        )
867        .send()
868        .await?;
869    
870    info!(table = %config.table_name, "DynamoDB table created with GSI and TTL");
871    Ok(())
872}
873
874#[cfg(test)]
875mod tests {
876    use super::*;
877    
878    #[test]
879    fn test_key_formatting() {
880        assert_eq!(DynamoStateStore::user_pk("abc123"), "USER#abc123");
881        assert_eq!(DynamoStateStore::card_pk("fp_xyz"), "CARD#fp_xyz");
882        assert_eq!(DynamoStateStore::txn_pk("tx_123"), "TXN#tx_123");
883        assert_eq!(DynamoStateStore::pending_sk(1234567890), "PENDING#00000001234567890");
884    }
885    
886    #[test]
887    fn pending_sk_sorts_correctly() {
888        let sk1 = DynamoStateStore::pending_sk(1000);
889        let sk2 = DynamoStateStore::pending_sk(2000);
890        let sk3 = DynamoStateStore::pending_sk(999999999999);
891        
892        // Lexicographic sort should match numeric sort
893        assert!(sk1 < sk2);
894        assert!(sk2 < sk3);
895    }
896}