use crate::state::{AttributeValue, DynamoDbStreamRecord, DynamoTable, StreamRecord};
use chrono::Utc;
use std::collections::HashMap;
use uuid::Uuid;
/// Builds the stream record describing one item-level change on `table`.
///
/// The table's stream view type decides which images are attached to the record:
///
/// * `"NEW_IMAGE"` keeps only `new_image`.
/// * `"OLD_IMAGE"` keeps only `old_image`.
/// * `"NEW_AND_OLD_IMAGES"` keeps both.
/// * `"KEYS_ONLY"` and any other value keep neither.
///
/// `size_bytes` is the sum of the JSON-encoded byte lengths of `keys` and of
/// whichever images were kept. An image that fails to serialize contributes 0.
///
/// Returns `None` when:
///
/// * streaming is disabled on the table,
/// * the table has no stream view type configured,
/// * `keys` cannot be serialized to JSON, or
/// * the current time cannot be expressed as a nanosecond timestamp.
pub fn generate_stream_record(
    table: &DynamoTable,
    event_name: &str,
    keys: HashMap<String, AttributeValue>,
    old_image: Option<HashMap<String, AttributeValue>>,
    new_image: Option<HashMap<String, AttributeValue>>,
) -> Option<StreamRecord> {
    // JSON-encoded length of an optional image; absent or unserializable
    // images count as zero bytes.
    fn encoded_len(image: Option<&HashMap<String, AttributeValue>>) -> i64 {
        image
            .and_then(|attrs| serde_json::to_vec(attrs).ok())
            .map_or(0, |bytes| bytes.len() as i64)
    }

    if !table.stream_enabled {
        return None;
    }
    let view_type = table.stream_view_type.as_ref()?;

    // Decide per image whether the configured view type exposes it.
    let view = view_type.as_str();
    let keep_old = matches!(view, "OLD_IMAGE" | "NEW_AND_OLD_IMAGES");
    let keep_new = matches!(view, "NEW_IMAGE" | "NEW_AND_OLD_IMAGES");
    let old_kept = old_image.filter(|_| keep_old);
    let new_kept = new_image.filter(|_| keep_new);

    // Unlike the images, a key map that cannot be serialized aborts the record.
    let keys_len = serde_json::to_vec(&keys).ok()?.len() as i64;
    let size_bytes = keys_len + encoded_len(old_kept.as_ref()) + encoded_len(new_kept.as_ref());

    let event_id = Uuid::new_v4().to_string();
    // The sequence number is the current UTC time in nanoseconds, as a string.
    let sequence_number = Utc::now().timestamp_nanos_opt()?.to_string();

    let dynamodb = DynamoDbStreamRecord {
        keys,
        new_image: new_kept,
        old_image: old_kept,
        sequence_number,
        size_bytes,
        stream_view_type: view_type.clone(),
    };

    Some(StreamRecord {
        event_id,
        event_name: event_name.to_owned(),
        event_version: String::from("1.1"),
        event_source: String::from("aws:dynamodb"),
        aws_region: String::from("us-east-1"),
        dynamodb,
        // Falls back to the default value when the table has no stream ARN.
        event_source_arn: table.stream_arn.clone().unwrap_or_default(),
        timestamp: Utc::now(),
    })
}
/// Appends `record` to the table's buffered stream records, then prunes the buffer.
///
/// After the push, every buffered record whose `timestamp` is not strictly newer
/// than 24 hours before the current UTC time is removed. The write guard on
/// `table.stream_records` is held for the whole operation.
pub fn add_stream_record(table: &mut DynamoTable, record: StreamRecord) {
    let mut buffer = table.stream_records.write();
    buffer.push(record);

    // Keep only records from within the last 24 hours.
    let oldest_allowed = Utc::now() - chrono::Duration::hours(24);
    buffer.retain(|entry| oldest_allowed < entry.timestamp);
}