use crate::state::{AttributeValue, DynamoDbStreamRecord, DynamoTable, StreamRecord};
use chrono::Utc;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::OnceLock;
use uuid::Uuid;
static STREAM_SEQUENCE: OnceLock<AtomicU64> = OnceLock::new();
fn stream_sequence_counter() -> &'static AtomicU64 {
STREAM_SEQUENCE.get_or_init(|| {
let seed = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(1)
.max(1);
AtomicU64::new(seed)
})
}
pub fn next_stream_sequence() -> String {
format!(
"{:021}",
stream_sequence_counter().fetch_add(1, Ordering::Relaxed)
)
}
pub fn observe_stream_sequence(sequence_number: &str) {
if let Ok(seq) = sequence_number.parse::<u64>() {
stream_sequence_counter().fetch_max(seq.saturating_add(1), Ordering::Relaxed);
}
}
pub fn generate_stream_record(
table: &DynamoTable,
event_name: &str, keys: HashMap<String, AttributeValue>,
old_image: Option<HashMap<String, AttributeValue>>,
new_image: Option<HashMap<String, AttributeValue>>,
region: &str,
) -> Option<StreamRecord> {
if !table.stream_enabled {
return None;
}
let stream_view_type = table.stream_view_type.as_ref()?;
let (filtered_old, filtered_new) = match stream_view_type.as_str() {
"KEYS_ONLY" => (None, None),
"NEW_IMAGE" => (None, new_image),
"OLD_IMAGE" => (old_image, None),
"NEW_AND_OLD_IMAGES" => (old_image, new_image),
_ => (None, None),
};
let size_bytes = serde_json::to_vec(&keys).ok()?.len() as i64
+ filtered_old
.as_ref()
.and_then(|img| serde_json::to_vec(img).ok())
.map(|v| v.len() as i64)
.unwrap_or(0)
+ filtered_new
.as_ref()
.and_then(|img| serde_json::to_vec(img).ok())
.map(|v| v.len() as i64)
.unwrap_or(0);
let event_id = Uuid::new_v4().to_string();
let sequence_number = next_stream_sequence();
Some(StreamRecord {
event_id,
event_name: event_name.to_string(),
event_version: "1.1".to_string(),
event_source: "aws:dynamodb".to_string(),
aws_region: region.to_string(),
dynamodb: DynamoDbStreamRecord {
keys,
new_image: filtered_new,
old_image: filtered_old,
sequence_number,
size_bytes,
stream_view_type: stream_view_type.clone(),
},
event_source_arn: table.stream_arn.clone().unwrap_or_default(),
timestamp: Utc::now(),
})
}
pub fn add_stream_record(table: &mut DynamoTable, record: StreamRecord) {
let mut records = table.stream_records.write();
records.push(record);
let cutoff = Utc::now() - chrono::Duration::hours(24);
records.retain(|r| r.timestamp > cutoff);
}
#[cfg(test)]
mod tests {
use super::*;
use crate::state::{KeySchemaElement, ProvisionedThroughput};
use parking_lot::RwLock;
use serde_json::json;
use std::collections::BTreeMap;
use std::sync::Arc;
#[test]
fn stream_sequence_numbers_are_unique_and_monotonic() {
let seqs: Vec<String> = (0..10_000).map(|_| next_stream_sequence()).collect();
let unique: std::collections::HashSet<&String> = seqs.iter().collect();
assert_eq!(unique.len(), seqs.len(), "sequence numbers must be unique");
for w in seqs.windows(2) {
assert!(w[0] < w[1], "sequence numbers must strictly increase");
assert_eq!(w[0].len(), w[1].len(), "fixed-width padding");
}
}
#[test]
fn stream_sequence_numbers_seeded_above_low_persisted_values() {
let seq = next_stream_sequence();
let n: u128 = seq.parse().unwrap();
assert!(
n > 1_000_000_000_000_000_000,
"sequence not clock-seeded: {seq}"
);
}
#[test]
fn observe_stream_sequence_raises_floor_above_persisted() {
let high = format!("{:021}", 9_000_000_000_000_000_000u64);
observe_stream_sequence(&high);
let next: u64 = next_stream_sequence().parse().unwrap();
assert!(
next > 9_000_000_000_000_000_000,
"floor not strictly above observed: {next}"
);
}
#[test]
fn stream_sequence_numbers_unique_under_concurrency() {
use std::thread;
let handles: Vec<_> = (0..8)
.map(|_| {
thread::spawn(|| {
(0..1000)
.map(|_| next_stream_sequence())
.collect::<Vec<_>>()
})
})
.collect();
let mut all = std::collections::HashSet::new();
for h in handles {
for s in h.join().unwrap() {
assert!(all.insert(s), "duplicate sequence number across threads");
}
}
assert_eq!(all.len(), 8 * 1000);
}
fn make_stream_table() -> DynamoTable {
DynamoTable {
name: "test-table".to_string(),
arn: "arn:aws:dynamodb:eu-west-1:999999999999:table/test-table".to_string(),
table_id: "test-table-id".to_string(),
key_schema: vec![KeySchemaElement {
attribute_name: "pk".to_string(),
key_type: "HASH".to_string(),
}],
attribute_definitions: vec![],
provisioned_throughput: ProvisionedThroughput {
read_capacity_units: 5,
write_capacity_units: 5,
},
items: vec![],
gsi: vec![],
lsi: vec![],
tags: BTreeMap::new(),
created_at: Utc::now(),
status: "ACTIVE".to_string(),
item_count: 0,
size_bytes: 0,
billing_mode: "PROVISIONED".to_string(),
ttl_attribute: None,
ttl_enabled: false,
resource_policy: None,
pitr_enabled: false,
kinesis_destinations: vec![],
contributor_insights_status: "DISABLED".to_string(),
contributor_insights_counters: BTreeMap::new(),
stream_enabled: true,
stream_view_type: Some("NEW_AND_OLD_IMAGES".to_string()),
stream_arn: Some(
"arn:aws:dynamodb:eu-west-1:999999999999:table/test-table/stream/123".to_string(),
),
stream_records: Arc::new(RwLock::new(Vec::new())),
sse_type: None,
sse_kms_key_arn: None,
deletion_protection_enabled: false,
on_demand_throughput: None,
table_class: "STANDARD".to_string(),
}
}
#[test]
fn stream_record_uses_configured_region() {
let table = make_stream_table();
let mut keys = HashMap::new();
keys.insert("pk".to_string(), json!({"S": "user1"}));
let record = generate_stream_record(
&table,
"INSERT",
keys,
None,
Some(HashMap::from([("pk".to_string(), json!({"S": "user1"}))])),
"eu-west-1",
)
.expect("stream record should be generated");
assert_eq!(
record.aws_region, "eu-west-1",
"stream record must use the configured region, not a hardcoded value"
);
}
#[test]
fn stream_records_persist_across_serde() {
let table = make_stream_table();
let mut keys = HashMap::new();
keys.insert("pk".to_string(), json!({"S": "user1"}));
let record = generate_stream_record(
&table,
"INSERT",
keys,
None,
Some(HashMap::from([("pk".to_string(), json!({"S": "user1"}))])),
"eu-west-1",
)
.expect("stream record should be generated");
let want_event_id = record.event_id.clone();
table.stream_records.write().push(record);
let json = serde_json::to_string(&table).unwrap();
let restored: DynamoTable = serde_json::from_str(&json).unwrap();
let recs = restored.stream_records.read();
assert_eq!(recs.len(), 1, "pending stream record must survive restart");
assert_eq!(recs[0].event_id, want_event_id);
assert_eq!(recs[0].event_name, "INSERT");
}
}