use crate::state::{
ContextEvent, StateError, StateStore, UserContext, UserPreferences,
VaultState, NotificationPrefs, init_user_context,
};
use crate::action::SessionKey;
use aws_sdk_dynamodb::{
Client,
types::{AttributeValue, KeyType, ProjectionType, ScalarAttributeType},
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{info, warn, instrument};
/// Settings for the DynamoDB-backed state store in this module.
#[derive(Debug, Clone)]
pub struct DynamoConfig {
/// Table that every request issued by the store targets.
pub table_name: String,
/// Name of the global secondary index queried when looking a card up by fingerprint.
pub fingerprint_gsi: String,
/// Item attribute that receives the expiry timestamp (epoch seconds) on TTL-bearing items.
pub ttl_attribute: String,
/// Upper bound on stored pending captures per user; older entries beyond it are deleted.
pub max_pending_captures: usize,
/// Offset, in seconds from "now", written as the TTL of session items.
pub session_ttl_seconds: i64,
/// Offset, in seconds from "now", written as the TTL of pending-capture items.
pub pending_ttl_seconds: i64,
/// Optional DAX endpoint. NOTE(review): populated from the environment but not
/// read by any code visible in this file — confirm whether it is used elsewhere.
pub dax_endpoint: Option<String>,
}
impl Default for DynamoConfig {
    /// Built-in configuration: the stock table/index names, a limit of ten
    /// pending captures, a one-day session TTL and a 21-day pending TTL.
    fn default() -> Self {
        const DAY_SECONDS: i64 = 24 * 60 * 60;

        let table_name = String::from("loop-agent-state");
        let fingerprint_gsi = String::from("CardFingerprintIndex");
        let ttl_attribute = String::from("ttl");

        Self {
            table_name,
            fingerprint_gsi,
            ttl_attribute,
            max_pending_captures: 10,
            session_ttl_seconds: DAY_SECONDS,
            pending_ttl_seconds: 21 * DAY_SECONDS,
            dax_endpoint: None,
        }
    }
}
impl DynamoConfig {
    /// Builds a configuration from environment variables.
    ///
    /// Reads `DYNAMO_TABLE`, `DYNAMO_FINGERPRINT_GSI`, `MAX_PENDING_CAPTURES`,
    /// `SESSION_TTL_SECONDS`, `PENDING_TTL_SECONDS` and `DAX_ENDPOINT`. A
    /// variable that is unset — or, for the numeric ones, fails to parse —
    /// falls back to the hard-coded value below. The TTL attribute name is
    /// always `"ttl"`.
    pub fn from_env() -> Self {
        /// Reads `name` and parses it, yielding `None` when unset or malformed.
        fn parsed_var<T: std::str::FromStr>(name: &str) -> Option<T> {
            match std::env::var(name) {
                Ok(raw) => raw.parse().ok(),
                Err(_) => None,
            }
        }

        let table_name = match std::env::var("DYNAMO_TABLE") {
            Ok(name) => name,
            Err(_) => "loop-agent-state".to_string(),
        };
        let fingerprint_gsi = match std::env::var("DYNAMO_FINGERPRINT_GSI") {
            Ok(name) => name,
            Err(_) => "CardFingerprintIndex".to_string(),
        };

        Self {
            table_name,
            fingerprint_gsi,
            ttl_attribute: "ttl".to_string(),
            max_pending_captures: parsed_var("MAX_PENDING_CAPTURES").unwrap_or(10),
            session_ttl_seconds: parsed_var("SESSION_TTL_SECONDS").unwrap_or(86400),
            pending_ttl_seconds: parsed_var("PENDING_TTL_SECONDS").unwrap_or(1814400),
            dax_endpoint: std::env::var("DAX_ENDPOINT").ok(),
        }
    }
}
/// State store that keeps user contexts, card links, sessions, pending
/// captures and processed-transaction markers in a single DynamoDB table.
pub struct DynamoStateStore {
// SDK client used for every request.
client: Client,
// Table/index names, TTL offsets and limits applied by the methods below.
config: DynamoConfig,
}
impl DynamoStateStore {
/// Creates a store using the default AWS configuration chain and the
/// settings returned by `DynamoConfig::from_env()`.
///
/// The visible body has no failing path; it always returns `Ok`.
pub async fn new() -> Result<Self, StateError> {
    let behavior = aws_config::BehaviorVersion::latest();
    let sdk_config = aws_config::load_defaults(behavior).await;
    let store = Self {
        client: Client::new(&sdk_config),
        config: DynamoConfig::from_env(),
    };
    Ok(store)
}
/// Creates a store using the default AWS configuration chain and the
/// caller-supplied `config`.
///
/// The visible body has no failing path; it always returns `Ok`.
pub async fn with_config(config: DynamoConfig) -> Result<Self, StateError> {
    let behavior = aws_config::BehaviorVersion::latest();
    let sdk_config = aws_config::load_defaults(behavior).await;
    Ok(Self {
        client: Client::new(&sdk_config),
        config,
    })
}
/// Current wall-clock time in whole seconds relative to the Unix epoch.
///
/// A system clock set before the epoch yields a negative value rather than
/// panicking, so callers never abort because of a misconfigured clock.
fn now() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs() as i64,
        // `duration()` is how far the clock is *behind* the epoch.
        Err(before_epoch) => -(before_epoch.duration().as_secs() as i64),
    }
}
/// Epoch-seconds timestamp `seconds_from_now` seconds after the current time.
///
/// The addition saturates at the `i64` bounds: the offset can come from
/// environment configuration, and an extreme value must not overflow.
fn ttl_at(&self, seconds_from_now: i64) -> i64 {
    Self::now().saturating_add(seconds_from_now)
}
/// Partition key for items belonging to the user identified by `pubkey`.
fn user_pk(pubkey: &str) -> String {
    ["USER#", pubkey].concat()
}
/// Partition key for the card item identified by `fingerprint`.
fn card_pk(fingerprint: &str) -> String {
    let mut key = String::with_capacity("CARD#".len() + fingerprint.len());
    key.push_str("CARD#");
    key.push_str(fingerprint);
    key
}
/// Partition key for the processed-transaction marker of `txn_id`.
fn txn_pk(txn_id: &str) -> String {
    let prefix = "TXN#";
    prefix.chars().chain(txn_id.chars()).collect()
}
/// Sort key for a pending capture created at `timestamp`.
///
/// The number is zero-padded to 20 characters so that, for non-negative
/// timestamps, lexicographic key order matches numeric order.
fn pending_sk(timestamp: i64) -> String {
    format!("PENDING#{timestamp:020}")
}
/// Resolves a card fingerprint to its stored mapping.
///
/// Queries the index named by `config.fingerprint_gsi` for at most one item
/// whose `fingerprint` equals the argument. Returns `Ok(None)` when the
/// query yields no item.
///
/// # Errors
/// `StateError::StorageError` when the query fails, or the error produced
/// by `CardMapping::from_dynamo` when the matched item cannot be decoded.
#[instrument(skip(self))]
pub async fn lookup_user_by_card(&self, fingerprint: &str) -> Result<Option<CardMapping>, StateError> {
    let request = self
        .client
        .query()
        .table_name(&self.config.table_name)
        .index_name(&self.config.fingerprint_gsi)
        .key_condition_expression("fingerprint = :fp")
        .expression_attribute_values(":fp", AttributeValue::S(fingerprint.to_string()))
        .limit(1);
    let response = request
        .send()
        .await
        .map_err(|err| StateError::StorageError(err.to_string()))?;

    response
        .items
        .as_deref()
        .and_then(|found| found.first())
        .map(CardMapping::from_dynamo)
        .transpose()
}
/// Writes (or replaces) the `CARD#<fingerprint>` / `META` item that ties a
/// card fingerprint to a user, recording the last four digits and the
/// current time as `linked_at`.
///
/// # Errors
/// `StateError::StorageError` when the write fails.
#[instrument(skip(self))]
pub async fn link_card(&self, fingerprint: &str, user_pubkey: &str, card_last4: &str) -> Result<(), StateError> {
    let linked_at = Self::now();
    let request = self
        .client
        .put_item()
        .table_name(&self.config.table_name)
        .item("pk", AttributeValue::S(Self::card_pk(fingerprint)))
        .item("sk", AttributeValue::S("META".to_string()))
        .item("fingerprint", AttributeValue::S(fingerprint.to_string()))
        .item("user_pubkey", AttributeValue::S(user_pubkey.to_string()))
        .item("card_last4", AttributeValue::S(card_last4.to_string()))
        .item("linked_at", AttributeValue::N(linked_at.to_string()));

    if let Err(err) = request.send().await {
        return Err(StateError::StorageError(err.to_string()));
    }

    info!(fingerprint = %fingerprint, user = %user_pubkey, "Card linked");
    Ok(())
}
/// Deletes the `CARD#<fingerprint>` / `META` item.
///
/// # Errors
/// `StateError::StorageError` when the delete request fails.
pub async fn unlink_card(&self, fingerprint: &str) -> Result<(), StateError> {
    let partition = AttributeValue::S(Self::card_pk(fingerprint));
    let sort = AttributeValue::S("META".to_string());

    let outcome = self
        .client
        .delete_item()
        .table_name(&self.config.table_name)
        .key("pk", partition)
        .key("sk", sort)
        .send()
        .await;

    match outcome {
        Ok(_) => Ok(()),
        Err(err) => Err(StateError::StorageError(err.to_string())),
    }
}
pub async fn get_user_cards(&self, user_pubkey: &str) -> Result<Vec<CardMapping>, StateError> {
let result = self.client
.scan()
.table_name(&self.config.table_name)
.filter_expression("user_pubkey = :user AND begins_with(pk, :prefix)")
.expression_attribute_values(":user", AttributeValue::S(user_pubkey.to_string()))
.expression_attribute_values(":prefix", AttributeValue::S("CARD#".to_string()))
.send()
.await
.map_err(|e| StateError::StorageError(e.to_string()))?;
let mut cards = Vec::new();
if let Some(items) = result.items {
for item in items {
if let Ok(card) = CardMapping::from_dynamo(&item) {
cards.push(card);
}
}
}
Ok(cards)
}
/// Reports whether a `TXN#<txn_id>` / `PROCESSED` marker exists.
///
/// The lookup is strongly consistent: this is an idempotency check, and an
/// eventually consistent read could miss a marker written moments earlier,
/// allowing the same transaction to be handled twice.
///
/// # Errors
/// `StateError::StorageError` when the read fails.
pub async fn is_transaction_processed(&self, txn_id: &str) -> Result<bool, StateError> {
    let result = self.client
        .get_item()
        .table_name(&self.config.table_name)
        .key("pk", AttributeValue::S(Self::txn_pk(txn_id)))
        .key("sk", AttributeValue::S("PROCESSED".to_string()))
        // Only existence matters, so fetch just the key attribute.
        .projection_expression("pk")
        .consistent_read(true)
        .send()
        .await
        .map_err(|e| StateError::StorageError(e.to_string()))?;
    Ok(result.item.is_some())
}
/// Writes the `TXN#<txn_id>` / `PROCESSED` marker, recording the user, the
/// credited amount and the processing time. The marker carries a TTL
/// thirty days in the future.
///
/// NOTE(review): the put is unconditional, so marking the same transaction
/// twice overwrites the earlier marker rather than failing.
///
/// # Errors
/// `StateError::StorageError` when the write fails.
pub async fn mark_transaction_processed(
    &self,
    txn_id: &str,
    user_pubkey: &str,
    cred_amount: u64,
) -> Result<(), StateError> {
    const MARKER_LIFETIME_SECONDS: i64 = 86400 * 30;

    let processed_at = Self::now();
    let expires = self.ttl_at(MARKER_LIFETIME_SECONDS);

    let request = self
        .client
        .put_item()
        .table_name(&self.config.table_name)
        .item("pk", AttributeValue::S(Self::txn_pk(txn_id)))
        .item("sk", AttributeValue::S("PROCESSED".to_string()))
        .item("user_pubkey", AttributeValue::S(user_pubkey.to_string()))
        .item("cred_amount", AttributeValue::N(cred_amount.to_string()))
        .item("processed_at", AttributeValue::N(processed_at.to_string()))
        .item(&self.config.ttl_attribute, AttributeValue::N(expires.to_string()));

    match request.send().await {
        Ok(_) => Ok(()),
        Err(err) => Err(StateError::StorageError(err.to_string())),
    }
}
/// Stores `session` as JSON in the user's `SESSION` item, replacing any
/// previous one.
///
/// The item's TTL is `config.session_ttl_seconds` from now; the session's
/// own `expires_at` is stored separately in the `expires_at` attribute.
///
/// # Errors
/// `StateError::SerializationError` when the session cannot be encoded,
/// `StateError::StorageError` when the write fails.
pub async fn store_session_key(&self, user_pubkey: &str, session: &SessionKeyData) -> Result<(), StateError> {
    let item_ttl = self.ttl_at(self.config.session_ttl_seconds);
    let encoded = match serde_json::to_string(session) {
        Ok(json) => json,
        Err(err) => return Err(StateError::SerializationError(err.to_string())),
    };

    let request = self
        .client
        .put_item()
        .table_name(&self.config.table_name)
        .item("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
        .item("sk", AttributeValue::S("SESSION".to_string()))
        .item("session_data", AttributeValue::S(encoded))
        .item("expires_at", AttributeValue::N(session.expires_at.to_string()))
        .item(&self.config.ttl_attribute, AttributeValue::N(item_ttl.to_string()));
    request
        .send()
        .await
        .map_err(|err| StateError::StorageError(err.to_string()))?;

    info!(user = %user_pubkey, expires_at = session.expires_at, "Session key stored");
    Ok(())
}
/// Loads the user's stored session, if there is one and it has not expired.
///
/// Returns `Ok(None)` when the `SESSION` item is missing, when it has no
/// string `session_data` attribute, or when the decoded session's
/// `expires_at` is not later than the current time.
///
/// # Errors
/// `StateError::StorageError` when the read fails,
/// `StateError::SerializationError` when the stored JSON cannot be decoded.
pub async fn get_session_key(&self, user_pubkey: &str) -> Result<Option<SessionKeyData>, StateError> {
    let response = self
        .client
        .get_item()
        .table_name(&self.config.table_name)
        .key("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
        .key("sk", AttributeValue::S("SESSION".to_string()))
        .send()
        .await
        .map_err(|err| StateError::StorageError(err.to_string()))?;

    let Some(stored) = response.item else {
        return Ok(None);
    };
    let Some(AttributeValue::S(raw)) = stored.get("session_data") else {
        return Ok(None);
    };

    let session: SessionKeyData = serde_json::from_str(raw)
        .map_err(|err| StateError::SerializationError(err.to_string()))?;

    // Expired sessions are treated exactly like missing ones.
    let still_valid = session.expires_at > Self::now();
    Ok(if still_valid { Some(session) } else { None })
}
/// Deletes the user's `SESSION` item.
///
/// # Errors
/// `StateError::StorageError` when the delete request fails.
pub async fn invalidate_session(&self, user_pubkey: &str) -> Result<(), StateError> {
    let partition = AttributeValue::S(Self::user_pk(user_pubkey));
    let sort = AttributeValue::S("SESSION".to_string());

    let outcome = self
        .client
        .delete_item()
        .table_name(&self.config.table_name)
        .key("pk", partition)
        .key("sk", sort)
        .send()
        .await;

    match outcome {
        Ok(_) => Ok(()),
        Err(err) => Err(StateError::StorageError(err.to_string())),
    }
}
/// Stores `capture` under the user's partition with a sort key derived from
/// the current time, then trims the user's pending captures down to the
/// configured maximum.
///
/// The item keeps the JSON-encoded capture plus `merchant_id`,
/// `amount_cents` and `created_at` as separate attributes, and expires
/// `config.pending_ttl_seconds` from now.
///
/// NOTE(review): the sort key has one-second resolution and the put is
/// unconditional, so two captures for the same user within the same second
/// would share a key and the later one would replace the earlier — confirm
/// whether callers can hit this.
///
/// # Errors
/// `StateError::SerializationError` when the capture cannot be encoded;
/// `StateError::StorageError` when the write or the subsequent trimming
/// fails (in the latter case the capture has already been written).
pub async fn add_pending_capture(&self, user_pubkey: &str, capture: &PendingCapture) -> Result<(), StateError> {
    let created_at = Self::now();
    let item_ttl = self.ttl_at(self.config.pending_ttl_seconds);
    let encoded = match serde_json::to_string(capture) {
        Ok(json) => json,
        Err(err) => return Err(StateError::SerializationError(err.to_string())),
    };

    let request = self
        .client
        .put_item()
        .table_name(&self.config.table_name)
        .item("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
        .item("sk", AttributeValue::S(Self::pending_sk(created_at)))
        .item("capture_data", AttributeValue::S(encoded))
        .item("merchant_id", AttributeValue::S(capture.merchant_id.clone()))
        .item("amount_cents", AttributeValue::N(capture.amount_cents.to_string()))
        .item("created_at", AttributeValue::N(created_at.to_string()))
        .item(&self.config.ttl_attribute, AttributeValue::N(item_ttl.to_string()));
    request
        .send()
        .await
        .map_err(|err| StateError::StorageError(err.to_string()))?;

    self.enforce_pending_limit(user_pubkey).await?;

    info!(user = %user_pubkey, merchant = %capture.merchant_id, "Pending capture added");
    Ok(())
}
/// Returns the user's pending captures in descending sort-key order
/// (most recent timestamp first).
///
/// Items lacking a string `capture_data` attribute, or whose JSON does not
/// decode into a `PendingCapture`, are silently left out.
///
/// # Errors
/// `StateError::StorageError` when the query fails.
pub async fn get_pending_captures(&self, user_pubkey: &str) -> Result<Vec<PendingCapture>, StateError> {
    let response = self
        .client
        .query()
        .table_name(&self.config.table_name)
        .key_condition_expression("pk = :pk AND begins_with(sk, :prefix)")
        .expression_attribute_values(":pk", AttributeValue::S(Self::user_pk(user_pubkey)))
        .expression_attribute_values(":prefix", AttributeValue::S("PENDING#".to_string()))
        .scan_index_forward(false)
        .send()
        .await
        .map_err(|err| StateError::StorageError(err.to_string()))?;

    let captures = response
        .items
        .unwrap_or_default()
        .into_iter()
        .filter_map(|item| match item.get("capture_data") {
            Some(AttributeValue::S(raw)) => serde_json::from_str::<PendingCapture>(raw).ok(),
            _ => None,
        })
        .collect();

    Ok(captures)
}
/// Deletes the user's pending capture whose sort key was built from
/// `timestamp`.
///
/// # Errors
/// `StateError::StorageError` when the delete request fails.
pub async fn remove_pending_capture(&self, user_pubkey: &str, timestamp: i64) -> Result<(), StateError> {
    let partition = AttributeValue::S(Self::user_pk(user_pubkey));
    let sort = AttributeValue::S(Self::pending_sk(timestamp));

    let outcome = self
        .client
        .delete_item()
        .table_name(&self.config.table_name)
        .key("pk", partition)
        .key("sk", sort)
        .send()
        .await;

    match outcome {
        Ok(_) => Ok(()),
        Err(err) => Err(StateError::StorageError(err.to_string())),
    }
}
/// Deletes the user's oldest pending captures until no more than
/// `config.max_pending_captures` remain.
///
/// Sort keys are fetched in ascending order, so the first entries returned
/// are the ones with the smallest timestamps; those are removed one by one.
///
/// # Errors
/// `StateError::StorageError` when the query or any delete fails.
async fn enforce_pending_limit(&self, user_pubkey: &str) -> Result<(), StateError> {
    let response = self
        .client
        .query()
        .table_name(&self.config.table_name)
        .key_condition_expression("pk = :pk AND begins_with(sk, :prefix)")
        .expression_attribute_values(":pk", AttributeValue::S(Self::user_pk(user_pubkey)))
        .expression_attribute_values(":prefix", AttributeValue::S("PENDING#".to_string()))
        .scan_index_forward(true)
        .projection_expression("sk")
        .send()
        .await
        .map_err(|err| StateError::StorageError(err.to_string()))?;

    let stored = response.items.unwrap_or_default();
    let limit = self.config.max_pending_captures;
    if stored.len() <= limit {
        return Ok(());
    }

    let to_delete = stored.len() - limit;
    // Take the oldest `to_delete` entries first, then ignore any without a string `sk`.
    let doomed_keys = stored.iter().take(to_delete).filter_map(|item| match item.get("sk") {
        Some(AttributeValue::S(sort_key)) => Some(sort_key),
        _ => None,
    });

    for sort_key in doomed_keys {
        self.client
            .delete_item()
            .table_name(&self.config.table_name)
            .key("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
            .key("sk", AttributeValue::S(sort_key.clone()))
            .send()
            .await
            .map_err(|err| StateError::StorageError(err.to_string()))?;
    }

    info!(user = %user_pubkey, deleted = to_delete, "Enforced pending capture limit");
    Ok(())
}
/// Records `merchant_id` in the user's `LOCATION` item together with the
/// current time as `entered_at`. The item expires one hour from now.
///
/// # Errors
/// `StateError::StorageError` when the write fails.
pub async fn cache_current_merchant(&self, user_pubkey: &str, merchant_id: &str) -> Result<(), StateError> {
    const LOCATION_LIFETIME_SECONDS: i64 = 3600;

    let entered_at = Self::now();
    let item_ttl = self.ttl_at(LOCATION_LIFETIME_SECONDS);

    let request = self
        .client
        .put_item()
        .table_name(&self.config.table_name)
        .item("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
        .item("sk", AttributeValue::S("LOCATION".to_string()))
        .item("merchant_id", AttributeValue::S(merchant_id.to_string()))
        .item("entered_at", AttributeValue::N(entered_at.to_string()))
        .item(&self.config.ttl_attribute, AttributeValue::N(item_ttl.to_string()));

    match request.send().await {
        Ok(_) => Ok(()),
        Err(err) => Err(StateError::StorageError(err.to_string())),
    }
}
/// Returns the merchant id stored in the user's `LOCATION` item.
///
/// Yields `Ok(None)` when the item is missing or has no string
/// `merchant_id` attribute.
///
/// # Errors
/// `StateError::StorageError` when the read fails.
pub async fn get_current_merchant(&self, user_pubkey: &str) -> Result<Option<String>, StateError> {
    let response = self
        .client
        .get_item()
        .table_name(&self.config.table_name)
        .key("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
        .key("sk", AttributeValue::S("LOCATION".to_string()))
        .send()
        .await
        .map_err(|err| StateError::StorageError(err.to_string()))?;

    let merchant = response
        .item
        .and_then(|mut stored| match stored.remove("merchant_id") {
            Some(AttributeValue::S(id)) => Some(id),
            _ => None,
        });
    Ok(merchant)
}
/// Deletes the user's `LOCATION` item.
///
/// # Errors
/// `StateError::StorageError` when the delete request fails.
pub async fn clear_current_merchant(&self, user_pubkey: &str) -> Result<(), StateError> {
    let partition = AttributeValue::S(Self::user_pk(user_pubkey));
    let sort = AttributeValue::S("LOCATION".to_string());

    let outcome = self
        .client
        .delete_item()
        .table_name(&self.config.table_name)
        .key("pk", partition)
        .key("sk", sort)
        .send()
        .await;

    match outcome {
        Ok(_) => Ok(()),
        Err(err) => Err(StateError::StorageError(err.to_string())),
    }
}
}
impl StateStore for DynamoStateStore {
fn context_fetch(&self, user_pubkey: &str) -> Result<UserContext, StateError> {
tokio::runtime::Handle::current().block_on(async {
self.context_fetch_async(user_pubkey).await
})
}
fn context_push(&self, context: UserContext) -> Result<(), StateError> {
tokio::runtime::Handle::current().block_on(async {
self.context_push_async(context).await
})
}
fn context_batch_fetch(&self, user_pubkeys: &[String]) -> Result<Vec<UserContext>, StateError> {
tokio::runtime::Handle::current().block_on(async {
let mut contexts = Vec::new();
for pubkey in user_pubkeys {
if let Ok(ctx) = self.context_fetch_async(pubkey).await {
contexts.push(ctx);
}
}
Ok(contexts)
})
}
fn context_patch(
&self,
user_pubkey: &str,
path: &str,
value: serde_json::Value,
) -> Result<(), StateError> {
tokio::runtime::Handle::current().block_on(async {
self.context_patch_async(user_pubkey, path, value).await
})
}
fn push_event(&self, user_pubkey: &str, event: ContextEvent) -> Result<(), StateError> {
tokio::runtime::Handle::current().block_on(async {
self.push_event_async(user_pubkey, event).await
})
}
fn get_or_init(&self, user_pubkey: &str) -> Result<UserContext, StateError> {
tokio::runtime::Handle::current().block_on(async {
match self.context_fetch_async(user_pubkey).await {
Ok(ctx) => Ok(ctx),
Err(StateError::NotFound) => {
let ctx = init_user_context(user_pubkey);
self.context_push_async(ctx.clone()).await?;
Ok(ctx)
}
Err(e) => Err(e),
}
})
}
}
impl DynamoStateStore {
/// Loads the user's `CONTEXT` item with a strongly consistent read and
/// decodes its `context_data` JSON.
///
/// # Errors
/// `StateError::NotFound` when the item is missing or has no string
/// `context_data` attribute, `StateError::StorageError` when the read
/// fails, `StateError::SerializationError` when the JSON cannot be decoded.
#[instrument(skip(self))]
pub async fn context_fetch_async(&self, user_pubkey: &str) -> Result<UserContext, StateError> {
    let response = self
        .client
        .get_item()
        .table_name(&self.config.table_name)
        .key("pk", AttributeValue::S(Self::user_pk(user_pubkey)))
        .key("sk", AttributeValue::S("CONTEXT".to_string()))
        .consistent_read(true)
        .send()
        .await
        .map_err(|err| StateError::StorageError(err.to_string()))?;

    let stored = response.item.ok_or(StateError::NotFound)?;
    match stored.get("context_data") {
        Some(AttributeValue::S(raw)) => {
            serde_json::from_str(raw).map_err(|err| StateError::SerializationError(err.to_string()))
        }
        _ => Err(StateError::NotFound),
    }
}
/// Persists `context` as the user's `CONTEXT` item.
///
/// For `context.version > 1` the write is conditional: it succeeds only if
/// the stored `version` equals `context.version - 1` or no `version`
/// attribute exists yet. Lower versions are written unconditionally.
///
/// # Errors
/// * `StateError::SerializationError` when the context cannot be encoded.
/// * `StateError::VersionConflict` when the conditional check fails. The
///   stored version is not known at this point, so `actual` carries the
///   version that was being written.
/// * `StateError::StorageError` for any other failure.
#[instrument(skip(self, context))]
pub async fn context_push_async(&self, context: UserContext) -> Result<(), StateError> {
    let context_json = serde_json::to_string(&context)
        .map_err(|e| StateError::SerializationError(e.to_string()))?;
    let mut put = self.client
        .put_item()
        .table_name(&self.config.table_name)
        .item("pk", AttributeValue::S(Self::user_pk(&context.pubkey)))
        .item("sk", AttributeValue::S("CONTEXT".to_string()))
        .item("context_data", AttributeValue::S(context_json))
        .item("version", AttributeValue::N(context.version.to_string()))
        .item("updated_at", AttributeValue::N(context.updated_at.to_string()));
    if context.version > 1 {
        put = put
            .condition_expression("version = :expected_version OR attribute_not_exists(version)")
            .expression_attribute_values(
                ":expected_version",
                AttributeValue::N((context.version - 1).to_string()),
            );
    }
    put.send()
        .await
        .map_err(|e| {
            // Ask the typed service error whether the condition failed. The
            // `Display` text of `SdkError` does not reliably contain the error
            // code, so a substring match alone would never report a conflict.
            // The substring test is kept only as a fallback.
            let condition_failed = e
                .as_service_error()
                .map_or(false, |service| service.is_conditional_check_failed_exception())
                || e.to_string().contains("ConditionalCheckFailed");
            if condition_failed {
                StateError::VersionConflict {
                    expected: context.version - 1,
                    actual: context.version,
                }
            } else {
                StateError::StorageError(e.to_string())
            }
        })?;
    Ok(())
}
/// Applies a single-field patch to the user's context and writes it back
/// with an incremented version.
///
/// Supported `path` values are `preferences.auto_stake` (bool),
/// `preferences.default_duration_days` (integer that fits in `u16`) and
/// `preferences.auto_compound` (bool). A value of the wrong type leaves the
/// field untouched; an unknown path or an out-of-range duration is logged
/// and likewise leaves the field untouched. In every case the context is
/// still re-saved with a bumped `version` and a fresh `updated_at`.
///
/// # Errors
/// Whatever `context_fetch_async` or `context_push_async` return.
pub async fn context_patch_async(
    &self,
    user_pubkey: &str,
    path: &str,
    value: serde_json::Value,
) -> Result<(), StateError> {
    let mut ctx = self.context_fetch_async(user_pubkey).await?;
    match path {
        "preferences.auto_stake" => {
            if let Some(v) = value.as_bool() {
                ctx.preferences.auto_stake = v;
            }
        }
        "preferences.default_duration_days" => match value.as_u64() {
            // Check the range first: a bare `as u16` cast would silently wrap
            // (e.g. 70000 -> 4464) and store a duration nobody asked for.
            Some(days) if days <= u64::from(u16::MAX) => {
                ctx.preferences.default_duration_days = days as u16;
            }
            Some(days) => {
                warn!(path = %path, value = days, "Patch value out of range; field left unchanged");
            }
            None => {}
        },
        "preferences.auto_compound" => {
            if let Some(v) = value.as_bool() {
                ctx.preferences.auto_compound = v;
            }
        }
        _ => {
            warn!(path = %path, "Unknown context path for patch");
        }
    }
    ctx.version += 1;
    ctx.updated_at = Self::now();
    self.context_push_async(ctx).await
}
/// Prepends `event` to the user's recent events, keeps only the ten newest
/// entries, and saves the context with an incremented version.
///
/// A user without a stored context starts from `init_user_context`.
///
/// # Errors
/// Any fetch error other than `StateError::NotFound`, and whatever
/// `context_push_async` returns.
pub async fn push_event_async(&self, user_pubkey: &str, event: ContextEvent) -> Result<(), StateError> {
    let mut ctx = self
        .context_fetch_async(user_pubkey)
        .await
        .or_else(|err| match err {
            StateError::NotFound => Ok(init_user_context(user_pubkey)),
            other => Err(other),
        })?;

    // Newest event goes first; anything past the tenth entry is dropped.
    ctx.recent_events.insert(0, event);
    ctx.recent_events.truncate(10);

    ctx.version += 1;
    ctx.updated_at = Self::now();

    self.context_push_async(ctx).await
}
}
/// Link between a card fingerprint and a user, as stored in `CARD#…` items.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CardMapping {
/// Card fingerprint; also the key of the fingerprint index.
pub fingerprint: String,
/// Public key of the user the card is linked to.
pub user_pubkey: String,
/// Last four characters of the card; empty when the stored item has none.
pub card_last4: String,
/// Time the link was written, in epoch seconds; `0` when missing or unparsable.
pub linked_at: i64,
}
impl CardMapping {
    /// Decodes a card mapping from a raw DynamoDB item.
    ///
    /// `fingerprint` and `user_pubkey` must be present as string attributes;
    /// `card_last4` defaults to an empty string and `linked_at` to `0` when
    /// absent or of an unexpected type.
    ///
    /// # Errors
    /// `StateError::SerializationError` when a required attribute is missing.
    fn from_dynamo(item: &HashMap<String, AttributeValue>) -> Result<Self, StateError> {
        // Clones the named attribute when it is present and string-typed.
        let text_attr = |name: &str| match item.get(name) {
            Some(AttributeValue::S(text)) => Some(text.clone()),
            _ => None,
        };

        let fingerprint = match text_attr("fingerprint") {
            Some(value) => value,
            None => return Err(StateError::SerializationError("Missing fingerprint".into())),
        };
        let user_pubkey = match text_attr("user_pubkey") {
            Some(value) => value,
            None => return Err(StateError::SerializationError("Missing user_pubkey".into())),
        };
        let card_last4 = text_attr("card_last4").unwrap_or_default();
        let linked_at: i64 = match item.get("linked_at") {
            Some(AttributeValue::N(raw)) => raw.parse().unwrap_or(0),
            _ => 0,
        };

        Ok(Self {
            fingerprint,
            user_pubkey,
            card_last4,
            linked_at,
        })
    }
}
/// A capture awaiting processing; stored as JSON in the `capture_data`
/// attribute of a `PENDING#…` item.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PendingCapture {
/// Merchant identifier; also copied into the item's `merchant_id` attribute.
pub merchant_id: String,
pub merchant_name: Option<String>,
/// Amount; also copied into the item's `amount_cents` attribute.
pub amount_cents: u64,
pub card_last4: Option<String>,
pub proof_data: Option<String>,
pub proof_type: Option<String>,
pub location: Option<PendingLocation>,
// NOTE(review): presumably epoch seconds, like the other timestamps in this
// file; neither field is read by the code visible here — confirm with callers.
pub created_at: i64,
pub expires_at: i64,
}
/// Optional position attached to a `PendingCapture`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PendingLocation {
pub latitude: f64,
pub longitude: f64,
// NOTE(review): presumably the accuracy radius in metres, judging by the
// `_m` suffix — confirm against the producer.
pub accuracy_m: f32,
}
/// Session payload stored as JSON in the `session_data` attribute of a
/// user's `SESSION` item.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionKeyData {
pub key_pubkey: String,
pub vault_pubkey: String,
pub scopes: Vec<String>,
/// Expiry in epoch seconds; `get_session_key` returns nothing once this is
/// no longer greater than the current time.
pub expires_at: i64,
pub created_at: i64,
// NOTE(review): these look like a usage quota and its counter; no code in
// this file reads or updates them — confirm where they are enforced.
pub max_actions: u32,
pub actions_used: u32,
}
pub async fn create_table(client: &Client, config: &DynamoConfig) -> Result<(), Box<dyn std::error::Error>> {
client
.create_table()
.table_name(&config.table_name)
.attribute_definitions(
aws_sdk_dynamodb::types::AttributeDefinition::builder()
.attribute_name("pk")
.attribute_type(ScalarAttributeType::S)
.build()?
)
.attribute_definitions(
aws_sdk_dynamodb::types::AttributeDefinition::builder()
.attribute_name("sk")
.attribute_type(ScalarAttributeType::S)
.build()?
)
.attribute_definitions(
aws_sdk_dynamodb::types::AttributeDefinition::builder()
.attribute_name("fingerprint")
.attribute_type(ScalarAttributeType::S)
.build()?
)
.key_schema(
aws_sdk_dynamodb::types::KeySchemaElement::builder()
.attribute_name("pk")
.key_type(KeyType::Hash)
.build()?
)
.key_schema(
aws_sdk_dynamodb::types::KeySchemaElement::builder()
.attribute_name("sk")
.key_type(KeyType::Range)
.build()?
)
.global_secondary_indexes(
aws_sdk_dynamodb::types::GlobalSecondaryIndex::builder()
.index_name(&config.fingerprint_gsi)
.key_schema(
aws_sdk_dynamodb::types::KeySchemaElement::builder()
.attribute_name("fingerprint")
.key_type(KeyType::Hash)
.build()?
)
.projection(
aws_sdk_dynamodb::types::Projection::builder()
.projection_type(ProjectionType::Include)
.non_key_attributes("user_pubkey")
.non_key_attributes("card_last4")
.non_key_attributes("linked_at")
.build()
)
.build()?
)
.billing_mode(aws_sdk_dynamodb::types::BillingMode::PayPerRequest)
.send()
.await?;
client
.update_time_to_live()
.table_name(&config.table_name)
.time_to_live_specification(
aws_sdk_dynamodb::types::TimeToLiveSpecification::builder()
.enabled(true)
.attribute_name(&config.ttl_attribute)
.build()?
)
.send()
.await?;
info!(table = %config.table_name, "DynamoDB table created with GSI and TTL");
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Key builders prepend the entity tag; the pending sort key pads the
    /// timestamp with zeros to a fixed width of 20 digits.
    #[test]
    fn test_key_formatting() {
        assert_eq!(DynamoStateStore::user_pk("abc123"), "USER#abc123");
        assert_eq!(DynamoStateStore::card_pk("fp_xyz"), "CARD#fp_xyz");
        assert_eq!(DynamoStateStore::txn_pk("tx_123"), "TXN#tx_123");
        // `{:020}` yields exactly 20 digits: ten zeros followed by the ten-digit value.
        assert_eq!(
            DynamoStateStore::pending_sk(1234567890),
            "PENDING#00000000001234567890"
        );
    }

    /// Zero padding makes lexicographic order agree with numeric order.
    #[test]
    fn pending_sk_sorts_correctly() {
        let sk1 = DynamoStateStore::pending_sk(1000);
        let sk2 = DynamoStateStore::pending_sk(2000);
        let sk3 = DynamoStateStore::pending_sk(999999999999);
        assert!(sk1 < sk2);
        assert!(sk2 < sk3);
    }
}