use crate::errors::Result;
use crate::storage::TableMetadata;
use crate::storage_backend::StorageBackend;
use crate::storage_backend::clock::Clock;
use crate::types::{AttributeValue, Item};
use std::collections::HashMap;
pub const LOCAL_REGION: &str = "dynoxide";
pub const LOCAL_ACCOUNT: &str = "000000000000";
pub fn table_arn(table_name: &str) -> String {
format!("arn:aws:dynamodb:{LOCAL_REGION}:{LOCAL_ACCOUNT}:table/{table_name}")
}
pub fn index_arn(table_name: &str, index_name: &str) -> String {
format!("arn:aws:dynamodb:{LOCAL_REGION}:{LOCAL_ACCOUNT}:table/{table_name}/index/{index_name}")
}
pub fn stream_arn(table_name: &str, label: &str) -> String {
format!("arn:aws:dynamodb:{LOCAL_REGION}:{LOCAL_ACCOUNT}:table/{table_name}/stream/{label}")
}
pub fn kms_key_arn(key_id: &str) -> String {
if key_id.starts_with("arn:") {
key_id.to_string()
} else {
format!("arn:aws:kms:{LOCAL_REGION}:{LOCAL_ACCOUNT}:key/{key_id}")
}
}
pub fn shard_id(table_name: &str) -> String {
format!("shardId-00000001-{table_name}")
}
pub fn generate_stream_label(clock: &dyn Clock) -> String {
let now_f64 = clock.now_unix_secs_f64();
let secs = now_f64.trunc() as u64;
let nanos = ((now_f64 - secs as f64) * 1_000_000_000.0) as u32;
format!("{secs}.{nanos:09}")
}
pub fn extract_keys(item: &Item, key_schema_json: &str) -> HashMap<String, AttributeValue> {
let key_schema: Vec<crate::types::KeySchemaElement> =
serde_json::from_str(key_schema_json).unwrap_or_default();
let mut keys = HashMap::new();
for ks in &key_schema {
if let Some(val) = item.get(&ks.attribute_name) {
keys.insert(ks.attribute_name.clone(), val.clone());
}
}
keys
}
#[allow(clippy::too_many_arguments)]
pub async fn record_stream_event<S: StorageBackend>(
storage: &S,
meta: &TableMetadata,
old_item: Option<&Item>,
new_item: Option<&Item>,
) -> Result<()> {
if !meta.stream_enabled {
return Ok(());
}
let view_type = meta
.stream_view_type
.as_deref()
.unwrap_or("NEW_AND_OLD_IMAGES");
let event_name = match (old_item, new_item) {
(None, Some(_)) => "INSERT",
(Some(_), Some(_)) => "MODIFY",
(Some(_), None) => "REMOVE",
(None, None) => return Ok(()),
};
let ref_item = new_item.or(old_item).unwrap();
let keys = extract_keys(ref_item, &meta.key_schema);
let keys_json = serde_json::to_string(&keys).unwrap_or_default();
let new_image_json = match view_type {
"NEW_IMAGE" | "NEW_AND_OLD_IMAGES" => {
new_item.map(|i| serde_json::to_string(i).unwrap_or_default())
}
_ => None,
};
let old_image_json = match view_type {
"OLD_IMAGE" | "NEW_AND_OLD_IMAGES" => {
old_item.map(|i| serde_json::to_string(i).unwrap_or_default())
}
_ => None,
};
let seq_num = storage
.next_stream_sequence_number(&meta.table_name)
.await?;
let sid = shard_id(&meta.table_name);
let now = storage.clock().now_unix_secs() as i64;
storage
.insert_stream_record(
&meta.table_name,
event_name,
&keys_json,
new_image_json.as_deref(),
old_image_json.as_deref(),
&seq_num.to_string(),
&sid,
now,
)
.await?;
Ok(())
}