Skip to main content

fakecloud_dynamodb/
state.rs

1use chrono::{DateTime, Utc};
2use parking_lot::RwLock;
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use std::collections::{BTreeMap, HashMap};
6use std::sync::Arc;
7
8fn empty_stream_records() -> Arc<RwLock<Vec<StreamRecord>>> {
9    Arc::new(RwLock::new(Vec::new()))
10}
11
12/// Serde for `Arc<RwLock<Vec<StreamRecord>>>`: persist the inner change records
13/// so a stream consumer's un-read records survive a snapshot restart
14/// (bug-audit 2026-05-28, 4.5). The field was `#[serde(skip)]`, so table data
15/// was preserved across restart but pending stream records silently vanished.
16mod stream_records_serde {
17    use super::{Arc, RwLock, StreamRecord};
18    use serde::{Deserialize, Deserializer, Serialize, Serializer};
19
20    pub fn serialize<S: Serializer>(
21        v: &Arc<RwLock<Vec<StreamRecord>>>,
22        s: S,
23    ) -> Result<S::Ok, S::Error> {
24        v.read().serialize(s)
25    }
26
27    pub fn deserialize<'de, D: Deserializer<'de>>(
28        d: D,
29    ) -> Result<Arc<RwLock<Vec<StreamRecord>>>, D::Error> {
30        let records = Vec::<StreamRecord>::deserialize(d)?;
31        // Raise the in-memory sequence-number floor above every persisted
32        // record so newly-minted numbers cannot collide with them after a
33        // restart, even if the wall-clock seed went backwards (4.4 / Cubic).
34        for r in &records {
35            crate::streams::observe_stream_sequence(&r.dynamodb.sequence_number);
36        }
37        Ok(Arc::new(RwLock::new(records)))
38    }
39}
40
/// A single DynamoDB attribute value (tagged union matching the AWS wire format).
/// AWS sends attribute values as `{"S": "hello"}`, `{"N": "42"}`, etc.
///
/// This is a plain alias for [`serde_json::Value`], so the type itself does not
/// enforce the single-tag shape; use [`attribute_type_and_value`] to split a
/// well-formed value into its tag and payload.
pub type AttributeValue = Value;
44
45/// Extract the "typed" inner value for comparison purposes.
46/// Returns (type_tag, inner_value) e.g. ("S", "hello") or ("N", "42").
47pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
48    let obj = av.as_object()?;
49    if obj.len() != 1 {
50        return None;
51    }
52    let (k, v) = obj.iter().next()?;
53    Some((k.as_str(), v))
54}
55
/// One entry of a table or index key schema: an attribute name plus the role
/// that attribute plays in the key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KeySchemaElement {
    /// Name of the attribute that forms this part of the key.
    pub attribute_name: String,
    /// Key role, stored as a plain string: `HASH` or `RANGE`.
    pub key_type: String, // HASH or RANGE
}
61
/// Declares the scalar type of a named attribute.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AttributeDefinition {
    /// Name of the attribute being described.
    pub attribute_name: String,
    /// Attribute type tag, stored as a plain string: `S`, `N` or `B`.
    pub attribute_type: String, // S, N, B
}
67
/// Provisioned read/write capacity settings for a table or index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProvisionedThroughput {
    /// Provisioned read capacity units.
    pub read_capacity_units: i64,
    /// Provisioned write capacity units.
    pub write_capacity_units: i64,
}
73
/// On-demand capacity caps for PAY_PER_REQUEST tables and GSIs. Real AWS
/// accepts both fields independently; `-1` (the AWS sentinel for "no cap")
/// is the default and is what `DescribeTable` returns when the caller never
/// set a value — the Terraform provider asserts on that exact value.
///
/// Note that this struct does not itself supply the `-1` default; whoever
/// constructs it is responsible for filling the sentinel in.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OnDemandThroughput {
    /// Cap on read request units (`-1` meaning "no cap").
    pub max_read_request_units: i64,
    /// Cap on write request units (`-1` meaning "no cap").
    pub max_write_request_units: i64,
}
83
/// Definition of a global secondary index attached to a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalSecondaryIndex {
    /// Name of the index.
    pub index_name: String,
    /// Key schema of the index.
    pub key_schema: Vec<KeySchemaElement>,
    /// Which attributes the index projects.
    pub projection: Projection,
    /// Provisioned capacity for the index, if any was recorded.
    pub provisioned_throughput: Option<ProvisionedThroughput>,
    /// On-demand capacity caps for the index, if any were recorded.
    pub on_demand_throughput: Option<OnDemandThroughput>,
}
92
/// Definition of a local secondary index attached to a table. Unlike
/// [`GlobalSecondaryIndex`], it carries no throughput settings of its own.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalSecondaryIndex {
    /// Name of the index.
    pub index_name: String,
    /// Key schema of the index.
    pub key_schema: Vec<KeySchemaElement>,
    /// Which attributes the index projects.
    pub projection: Projection,
}
99
/// Attribute projection of a secondary index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Projection {
    /// Projection mode, stored as a plain string: `ALL`, `KEYS_ONLY` or `INCLUDE`.
    pub projection_type: String, // ALL, KEYS_ONLY, INCLUDE
    /// Extra non-key attribute names carried by the projection
    /// (presumably only meaningful for `INCLUDE` — confirm against the handlers).
    pub non_key_attributes: Vec<String>,
}
105
/// Full in-memory state of one emulated DynamoDB table: its definition, its
/// stored items, and the per-table feature settings (TTL, streams, SSE, ...).
///
/// The whole struct is serde-serializable so it can be written into a
/// snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoTable {
    /// Table name.
    pub name: String,
    /// Table ARN.
    pub arn: String,
    /// Table identifier.
    pub table_id: String,
    /// Primary key schema; `hash_key_name` / `range_key_name` read from it.
    pub key_schema: Vec<KeySchemaElement>,
    /// Declared attribute types.
    pub attribute_definitions: Vec<AttributeDefinition>,
    /// Table-level provisioned capacity.
    pub provisioned_throughput: ProvisionedThroughput,
    /// Stored items, each a map from attribute name to attribute value.
    pub items: Vec<HashMap<String, AttributeValue>>,
    /// Global secondary indexes.
    pub gsi: Vec<GlobalSecondaryIndex>,
    /// Local secondary indexes.
    pub lsi: Vec<LocalSecondaryIndex>,
    /// Resource tags (key -> value).
    pub tags: BTreeMap<String, String>,
    /// Creation timestamp.
    pub created_at: DateTime<Utc>,
    /// Table status string (the in-file test fixture uses `ACTIVE`).
    pub status: String,
    /// Cached item count; refreshed by `recalculate_stats`.
    pub item_count: i64,
    /// Cached estimated table size in bytes; refreshed by `recalculate_stats`.
    pub size_bytes: i64,
    /// Billing mode, stored as a plain string.
    pub billing_mode: String, // PROVISIONED or PAY_PER_REQUEST
    /// Name of the TTL attribute, if one is configured.
    pub ttl_attribute: Option<String>,
    /// Whether TTL is enabled.
    pub ttl_enabled: bool,
    /// Resource policy document, if one is attached.
    pub resource_policy: Option<String>,
    /// PITR enabled
    pub pitr_enabled: bool,
    /// Kinesis streaming destinations (each entry carries its stream ARN and status).
    pub kinesis_destinations: Vec<KinesisDestination>,
    /// Contributor insights status; access counters are only recorded while
    /// this equals `ENABLED`.
    pub contributor_insights_status: String,
    /// Contributor insights: partition key access counters (key_value_string -> count)
    pub contributor_insights_counters: BTreeMap<String, u64>,
    /// DynamoDB Streams configuration
    pub stream_enabled: bool,
    /// Stream view type, if a stream is configured.
    pub stream_view_type: Option<String>, // KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES
    /// Stream ARN, if a stream is configured.
    pub stream_arn: Option<String>,
    /// Stream records (retained for 24 hours). Persisted through
    /// `stream_records_serde` so a consumer's un-read records survive a
    /// snapshot restart; snapshots that lack the field load with an empty
    /// buffer via `empty_stream_records`.
    #[serde(with = "stream_records_serde", default = "empty_stream_records")]
    pub stream_records: Arc<RwLock<Vec<StreamRecord>>>,
    /// Server-side encryption type: AES256 (owned) or KMS
    pub sse_type: Option<String>,
    /// KMS key ARN for SSE (only when sse_type is KMS)
    pub sse_kms_key_arn: Option<String>,
    /// Deletion protection: when true, DeleteTable is rejected with
    /// `ResourceInUseException`. Defaults to false. Returned on every
    /// `DescribeTable` and toggleable via `UpdateTable`.
    pub deletion_protection_enabled: bool,
    /// Table-level on-demand throughput caps. Only meaningful for
    /// PAY_PER_REQUEST tables, but real AWS echoes the field on every
    /// DescribeTable once set.
    pub on_demand_throughput: Option<OnDemandThroughput>,
    /// Storage class: STANDARD (default) or STANDARD_INFREQUENT_ACCESS.
    /// Returned inside `TableClassSummary` on DescribeTable; set at
    /// CreateTable and changed via UpdateTable.
    #[serde(default = "default_table_class")]
    pub table_class: String,
}
160
/// Serde default for `DynamoTable::table_class`: the `STANDARD` storage class.
pub(crate) fn default_table_class() -> String {
    String::from("STANDARD")
}
164
/// One change record in a table's DynamoDB stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamRecord {
    /// Identifier of this event.
    pub event_id: String,
    /// Kind of change, stored as a plain string.
    pub event_name: String, // INSERT, MODIFY, REMOVE
    /// Event format version string.
    pub event_version: String,
    /// Event source string.
    pub event_source: String,
    /// Region the event is attributed to.
    pub aws_region: String,
    /// The change payload (keys, images, sequence number, ...).
    pub dynamodb: DynamoDbStreamRecord,
    /// ARN of the event source.
    pub event_source_arn: String,
    /// Timestamp stored with the record.
    pub timestamp: DateTime<Utc>,
    /// Set only for system-generated changes. TTL deletions carry
    /// `{principalId: "dynamodb.amazonaws.com", type: "Service"}` so consumers
    /// can distinguish an expiry REMOVE from a user-driven DeleteItem. Absent
    /// (and omitted from the wire) for ordinary writes.
    ///
    /// `#[serde(default)]` lets snapshots without this field load as `None`.
    #[serde(default)]
    pub user_identity: Option<StreamUserIdentity>,
}
182
/// `userIdentity` block on a stream record. Present only for system-generated
/// events such as TTL expirations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamUserIdentity {
    /// Principal that caused the change (`dynamodb.amazonaws.com` for TTL).
    pub principal_id: String,
    /// Identity type (`Service` for TTL).
    pub identity_type: String,
}
190
191impl StreamUserIdentity {
192    /// The marker AWS attaches to a TTL-expiry REMOVE record.
193    pub fn ttl() -> Self {
194        Self {
195            principal_id: "dynamodb.amazonaws.com".to_string(),
196            identity_type: "Service".to_string(),
197        }
198    }
199}
200
/// The `dynamodb` payload of a [`StreamRecord`]: which item changed and,
/// where recorded, what it looked like before and after.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbStreamRecord {
    /// Key attributes of the changed item.
    pub keys: HashMap<String, AttributeValue>,
    /// Item image after the change, when recorded.
    pub new_image: Option<HashMap<String, AttributeValue>>,
    /// Item image before the change, when recorded.
    pub old_image: Option<HashMap<String, AttributeValue>>,
    /// Sequence number of the record, kept as a string. Restored values are
    /// reported to `crate::streams::observe_stream_sequence` on snapshot load.
    pub sequence_number: String,
    /// Size recorded for the change, in bytes.
    pub size_bytes: i64,
    /// Stream view type recorded with the change.
    pub stream_view_type: String,
}
210
/// A Kinesis streaming destination configured on a table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KinesisDestination {
    /// ARN of the destination Kinesis stream.
    pub stream_arn: String,
    /// Destination status string.
    pub destination_status: String,
    /// Precision setting for approximate creation timestamps, kept as a string.
    pub approximate_creation_date_time_precision: String,
}
217
/// A stored table backup: backup metadata plus a copy of the table definition
/// and its items.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackupDescription {
    /// ARN of the backup.
    pub backup_arn: String,
    /// Name of the backup.
    pub backup_name: String,
    /// Name of the table that was backed up.
    pub table_name: String,
    /// ARN of the table that was backed up.
    pub table_arn: String,
    /// Backup status string.
    pub backup_status: String,
    /// Backup type string.
    pub backup_type: String,
    /// When the backup was created.
    pub backup_creation_date: DateTime<Utc>,
    /// Key schema of the source table.
    pub key_schema: Vec<KeySchemaElement>,
    /// Attribute definitions of the source table.
    pub attribute_definitions: Vec<AttributeDefinition>,
    /// Provisioned capacity of the source table.
    pub provisioned_throughput: ProvisionedThroughput,
    /// Billing mode of the source table.
    pub billing_mode: String,
    /// Item count recorded with the backup.
    pub item_count: i64,
    /// Size in bytes recorded with the backup.
    pub size_bytes: i64,
    /// Snapshot of the table items at backup creation time.
    pub items: Vec<HashMap<String, AttributeValue>>,
    /// Real DDB persists GSI/LSI/tags/TTL/SSE/Stream into the backup
    /// payload so RestoreTableFromBackup brings the full table back
    /// up. Older snapshots may not have these fields, so all default
    /// to empty/false via serde.
    #[serde(default)]
    pub gsi: Vec<GlobalSecondaryIndex>,
    /// Local secondary indexes of the source table (empty if absent).
    #[serde(default)]
    pub lsi: Vec<LocalSecondaryIndex>,
    /// Tags of the source table (empty if absent).
    #[serde(default)]
    pub tags: BTreeMap<String, String>,
    /// TTL attribute name of the source table (`None` if absent).
    #[serde(default)]
    pub ttl_attribute: Option<String>,
    /// Whether TTL was enabled on the source table (`false` if absent).
    #[serde(default)]
    pub ttl_enabled: bool,
    /// SSE type of the source table (`None` if absent).
    #[serde(default)]
    pub sse_type: Option<String>,
    /// SSE KMS key ARN of the source table (`None` if absent).
    #[serde(default)]
    pub sse_kms_key_arn: Option<String>,
    /// Whether a stream was enabled on the source table (`false` if absent).
    #[serde(default)]
    pub stream_enabled: bool,
    /// Stream view type of the source table (`None` if absent).
    #[serde(default)]
    pub stream_view_type: Option<String>,
}
258
/// A global table and the replicas that belong to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GlobalTableDescription {
    /// Name of the global table.
    pub global_table_name: String,
    /// ARN of the global table.
    pub global_table_arn: String,
    /// Global table status string.
    pub global_table_status: String,
    /// When the global table was created.
    pub creation_date: DateTime<Utc>,
    /// Replicas that make up the global table.
    pub replication_group: Vec<ReplicaDescription>,
    /// Billing mode applied across all replicas via
    /// `UpdateGlobalTableSettings` (`PROVISIONED` / `PAY_PER_REQUEST`).
    /// Defaults to PROVISIONED to match real DynamoDB global-table v1.
    #[serde(default = "default_global_billing_mode")]
    pub billing_mode: String,
    /// Global provisioned write capacity applied across all replicas via
    /// `UpdateGlobalTableSettings`. `None` under PAY_PER_REQUEST.
    #[serde(default)]
    pub provisioned_write_capacity_units: Option<i64>,
}
276
/// Serde default for `GlobalTableDescription::billing_mode`: `PROVISIONED`.
fn default_global_billing_mode() -> String {
    String::from("PROVISIONED")
}
280
/// One replica of a global table. All optional settings default to `None`
/// when missing from a snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaDescription {
    /// Region the replica lives in.
    pub region_name: String,
    /// Replica status string.
    pub replica_status: String,
    /// Per-replica provisioned-read-capacity autoscaling settings, as supplied
    /// via `UpdateTableReplicaAutoScaling`. Round-tripped through
    /// `DescribeTableReplicaAutoScaling` as `AutoScalingSettingsDescription`.
    #[serde(default)]
    pub read_capacity_auto_scaling: Option<serde_json::Value>,
    /// Per-replica provisioned-write-capacity autoscaling settings.
    #[serde(default)]
    pub write_capacity_auto_scaling: Option<serde_json::Value>,
    /// Per-replica provisioned read capacity supplied via
    /// `UpdateGlobalTableSettings` ReplicaSettingsUpdate.
    #[serde(default)]
    pub read_capacity_units: Option<i64>,
}
298
/// Record of a table export to S3.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportDescription {
    /// ARN of the export.
    pub export_arn: String,
    /// Export status string.
    pub export_status: String,
    /// ARN of the exported table.
    pub table_arn: String,
    /// Destination S3 bucket.
    pub s3_bucket: String,
    /// Optional key prefix inside the destination bucket.
    pub s3_prefix: Option<String>,
    /// Export format string.
    pub export_format: String,
    /// When the export started.
    pub start_time: DateTime<Utc>,
    /// When the export ended.
    pub end_time: DateTime<Utc>,
    /// Export time recorded for the export.
    pub export_time: DateTime<Utc>,
    /// Number of items exported.
    pub item_count: i64,
    /// Billed size of the export, in bytes.
    pub billed_size_bytes: i64,
}
313
/// Record of a table import from S3.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportDescription {
    /// ARN of the import.
    pub import_arn: String,
    /// Import status string.
    pub import_status: String,
    /// ARN of the table the import targets.
    pub table_arn: String,
    /// Name of the table the import targets.
    pub table_name: String,
    /// Source S3 bucket.
    pub s3_bucket_source: String,
    /// Input format string.
    pub input_format: String,
    /// When the import started.
    pub start_time: DateTime<Utc>,
    /// When the import ended.
    pub end_time: DateTime<Utc>,
    /// Number of items processed.
    pub processed_item_count: i64,
    /// Number of bytes processed.
    pub processed_size_bytes: i64,
}
327
328impl DynamoTable {
329    /// Get the hash key attribute name from the key schema.
330    pub fn hash_key_name(&self) -> &str {
331        self.key_schema
332            .iter()
333            .find(|k| k.key_type == "HASH")
334            .map(|k| k.attribute_name.as_str())
335            .unwrap_or("")
336    }
337
338    /// Get the range key attribute name from the key schema (if any).
339    pub fn range_key_name(&self) -> Option<&str> {
340        self.key_schema
341            .iter()
342            .find(|k| k.key_type == "RANGE")
343            .map(|k| k.attribute_name.as_str())
344    }
345
346    /// Find an item index by its primary key.
347    pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
348        let hash_key = self.hash_key_name();
349        let range_key = self.range_key_name();
350
351        self.items.iter().position(|item| {
352            let hash_match = match (item.get(hash_key), key.get(hash_key)) {
353                (Some(a), Some(b)) => a == b,
354                _ => false,
355            };
356            if !hash_match {
357                return false;
358            }
359            match range_key {
360                Some(rk) => match (item.get(rk), key.get(rk)) {
361                    (Some(a), Some(b)) => a == b,
362                    (None, None) => true,
363                    _ => false,
364                },
365                None => true,
366            }
367        })
368    }
369
370    /// Estimate item size in bytes (rough approximation).
371    fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
372        let mut size: i64 = 0;
373        for (k, v) in item {
374            size += k.len() as i64;
375            size += Self::estimate_value_size(v);
376        }
377        size
378    }
379
380    fn estimate_value_size(v: &Value) -> i64 {
381        match v {
382            Value::Object(obj) => {
383                if let Some(s) = obj.get("S").and_then(|v| v.as_str()) {
384                    s.len() as i64
385                } else if let Some(n) = obj.get("N").and_then(|v| v.as_str()) {
386                    n.len() as i64
387                } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
388                    1
389                } else if let Some(l) = obj.get("L").and_then(|v| v.as_array()) {
390                    3 + l.iter().map(Self::estimate_value_size).sum::<i64>()
391                } else if let Some(m) = obj.get("M").and_then(|v| v.as_object()) {
392                    3 + m
393                        .iter()
394                        .map(|(k, v)| k.len() as i64 + Self::estimate_value_size(v))
395                        .sum::<i64>()
396                } else if let Some(ss) = obj.get("SS").and_then(|v| v.as_array()) {
397                    ss.iter()
398                        .filter_map(|v| v.as_str())
399                        .map(|s| s.len() as i64)
400                        .sum()
401                } else if let Some(ns) = obj.get("NS").and_then(|v| v.as_array()) {
402                    ns.iter()
403                        .filter_map(|v| v.as_str())
404                        .map(|s| s.len() as i64)
405                        .sum()
406                } else if let Some(b) = obj.get("B").and_then(|v| v.as_str()) {
407                    // Base64-encoded binary
408                    (b.len() as i64 * 3) / 4
409                } else {
410                    v.to_string().len() as i64
411                }
412            }
413            _ => v.to_string().len() as i64,
414        }
415    }
416
417    /// Record a partition key access for contributor insights.
418    /// Only records if contributor insights is enabled.
419    pub fn record_key_access(&mut self, key: &HashMap<String, AttributeValue>) {
420        if self.contributor_insights_status != "ENABLED" {
421            return;
422        }
423        let hash_key = self.hash_key_name().to_string();
424        if let Some(pk_value) = key.get(&hash_key) {
425            let key_str = pk_value.to_string();
426            *self
427                .contributor_insights_counters
428                .entry(key_str)
429                .or_insert(0) += 1;
430        }
431    }
432
433    /// Record a partition key access from a full item (extracts the key first).
434    pub fn record_item_access(&mut self, item: &HashMap<String, AttributeValue>) {
435        if self.contributor_insights_status != "ENABLED" {
436            return;
437        }
438        let hash_key = self.hash_key_name().to_string();
439        if let Some(pk_value) = item.get(&hash_key) {
440            let key_str = pk_value.to_string();
441            *self
442                .contributor_insights_counters
443                .entry(key_str)
444                .or_insert(0) += 1;
445        }
446    }
447
448    /// Get top N contributors sorted by access count (descending).
449    pub fn top_contributors(&self, n: usize) -> Vec<(&str, u64)> {
450        let mut entries: Vec<(&str, u64)> = self
451            .contributor_insights_counters
452            .iter()
453            .map(|(k, &v)| (k.as_str(), v))
454            .collect();
455        entries.sort_by_key(|e| std::cmp::Reverse(e.1));
456        entries.truncate(n);
457        entries
458    }
459
460    /// Recalculate item_count and size_bytes from the items vec.
461    pub fn recalculate_stats(&mut self) {
462        self.item_count = self.items.len() as i64;
463        self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
464    }
465}
466
/// All DynamoDB state held for one account/region: tables, backups, global
/// tables, exports, imports and stream-to-Lambda checkpoints.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbState {
    /// Account this state belongs to.
    pub account_id: String,
    /// Region this state belongs to.
    pub region: String,
    /// Tables held in this state (presumably keyed by table name — confirm against callers).
    pub tables: BTreeMap<String, DynamoTable>,
    /// Backups held in this state (key choice is decided by callers).
    pub backups: BTreeMap<String, BackupDescription>,
    /// Global tables held in this state (key choice is decided by callers).
    pub global_tables: BTreeMap<String, GlobalTableDescription>,
    /// Exports held in this state (key choice is decided by callers).
    pub exports: BTreeMap<String, ExportDescription>,
    /// Imports held in this state (key choice is decided by callers).
    pub imports: BTreeMap<String, ImportDescription>,
    /// DynamoDB Streams -> Lambda event-source-mapping checkpoints: the
    /// last stream sequence number delivered for each mapping
    /// (keyed by ESM uuid). Persisted so the streams poller resumes from
    /// where it left off after a restart instead of re-seeding TRIM_HORIZON
    /// and re-invoking the target Lambda with the whole retained backlog
    /// (duplicate side effects). Mirrors `KinesisState.lambda_checkpoints`.
    /// `#[serde(default)]` keeps older snapshots loadable.
    #[serde(default)]
    pub lambda_stream_checkpoints: BTreeMap<String, String>,
}
486
/// On-disk snapshot envelope. The payload is the full [`DynamoDbState`];
/// `schema_version` lets us evolve the format without accidentally loading
/// an incompatible dump on upgrade.
///
/// Both payload fields are optional and default to `None`, so a dump in
/// either layout (multi-account `accounts` or single-account `state`)
/// deserializes into this one type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamoDbSnapshot {
    /// Version of the snapshot layout stored in this envelope.
    pub schema_version: u32,
    /// v2+: multi-account state.
    #[serde(default)]
    pub accounts: Option<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>,
    /// v1 compat: single-account state.
    #[serde(default)]
    pub state: Option<DynamoDbState>,
}
500
/// Current snapshot schema version (2 = the multi-account layout).
/// Presumably the value written to `DynamoDbSnapshot::schema_version` on
/// save — the snapshot writer is not in this file; confirm there.
pub const DYNAMODB_SNAPSHOT_SCHEMA_VERSION: u32 = 2;
502
503impl DynamoDbState {
504    pub fn new(account_id: &str, region: &str) -> Self {
505        Self {
506            account_id: account_id.to_string(),
507            region: region.to_string(),
508            tables: BTreeMap::new(),
509            backups: BTreeMap::new(),
510            global_tables: BTreeMap::new(),
511            exports: BTreeMap::new(),
512            imports: BTreeMap::new(),
513            lambda_stream_checkpoints: BTreeMap::new(),
514        }
515    }
516
517    pub fn reset(&mut self) {
518        self.tables.clear();
519        self.backups.clear();
520        self.global_tables.clear();
521        self.exports.clear();
522        self.imports.clear();
523        self.lambda_stream_checkpoints.clear();
524    }
525
526    /// Last stream sequence number delivered for the given DynamoDB
527    /// Streams -> Lambda event source mapping, or `None` if the mapping
528    /// has never delivered (so the poller seeds from StartingPosition).
529    pub fn lambda_stream_checkpoint(&self, mapping_uuid: &str) -> Option<String> {
530        self.lambda_stream_checkpoints.get(mapping_uuid).cloned()
531    }
532
533    /// Record the last stream sequence number delivered for a mapping. The
534    /// value rides along with the next DynamoDB snapshot save, exactly the
535    /// way Kinesis lambda checkpoints persist through their snapshot.
536    pub fn set_lambda_stream_checkpoint(&mut self, mapping_uuid: &str, sequence_number: String) {
537        self.lambda_stream_checkpoints
538            .insert(mapping_uuid.to_string(), sequence_number);
539    }
540}
541
542impl fakecloud_core::multi_account::AccountState for DynamoDbState {
543    fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
544        Self::new(account_id, region)
545    }
546}
547
/// Shared handle to the DynamoDB state of every account: a reference-counted
/// `RwLock` around `MultiAccountState<DynamoDbState>`.
pub type SharedDynamoDbState =
    Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>>;
550
#[cfg(test)]
mod tests {
    //! Unit tests for the state helpers: attribute-value splitting, key-schema
    //! lookups, contributor-insights counters, size estimation and
    //! `DynamoDbState` construction/reset.

    use super::*;
    use serde_json::json;

    #[test]
    fn attribute_type_and_value_valid() {
        let v = json!({"S": "hi"});
        let (ty, val) = attribute_type_and_value(&v).unwrap();
        assert_eq!(ty, "S");
        assert_eq!(val, &json!("hi"));
    }

    #[test]
    fn attribute_type_and_value_empty_returns_none() {
        let v = json!({});
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn attribute_type_and_value_multiple_entries_returns_none() {
        let v = json!({"S": "hi", "N": "1"});
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn attribute_type_and_value_non_object_returns_none() {
        let v = json!("not-object");
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn account_state_trait_impl() {
        use fakecloud_core::multi_account::AccountState;
        let state = DynamoDbState::new_for_account("123", "us-east-1", "");
        assert_eq!(state.account_id, "123");
        assert_eq!(state.region, "us-east-1");
    }

    /// A new state starts empty, and `reset` empties a populated state while
    /// leaving the account/region identity untouched.
    #[test]
    fn new_and_reset() {
        let mut state = DynamoDbState::new("123", "us-east-1");
        assert!(state.tables.is_empty());
        assert!(state.backups.is_empty());
        assert!(state.global_tables.is_empty());
        assert!(state.exports.is_empty());
        assert!(state.imports.is_empty());
        assert!(state.lambda_stream_checkpoints.is_empty());

        state
            .tables
            .insert("t".to_string(), table_with_hash_key("pk"));
        state.set_lambda_stream_checkpoint("esm-1", "42".to_string());
        assert_eq!(state.tables.len(), 1);
        assert_eq!(
            state.lambda_stream_checkpoint("esm-1").as_deref(),
            Some("42")
        );

        state.reset();
        assert!(state.tables.is_empty());
        assert!(state.backups.is_empty());
        assert!(state.global_tables.is_empty());
        assert!(state.exports.is_empty());
        assert!(state.imports.is_empty());
        assert!(state.lambda_stream_checkpoint("esm-1").is_none());
        assert_eq!(state.account_id, "123");
        assert_eq!(state.region, "us-east-1");
    }

    /// Fixture: an empty PROVISIONED table whose key schema has a single
    /// HASH element named `hash`.
    fn table_with_hash_key(hash: &str) -> DynamoTable {
        DynamoTable {
            name: "t".to_string(),
            arn: "arn:aws:dynamodb:us-east-1:123:table/t".to_string(),
            table_id: "id".to_string(),
            key_schema: vec![KeySchemaElement {
                attribute_name: hash.to_string(),
                key_type: "HASH".to_string(),
            }],
            attribute_definitions: vec![],
            provisioned_throughput: ProvisionedThroughput {
                read_capacity_units: 1,
                write_capacity_units: 1,
            },
            items: Vec::new(),
            gsi: Vec::new(),
            lsi: Vec::new(),
            tags: BTreeMap::new(),
            created_at: Utc::now(),
            status: "ACTIVE".to_string(),
            item_count: 0,
            size_bytes: 0,
            billing_mode: "PROVISIONED".to_string(),
            ttl_attribute: None,
            ttl_enabled: false,
            resource_policy: None,
            pitr_enabled: false,
            kinesis_destinations: Vec::new(),
            contributor_insights_status: "DISABLED".to_string(),
            contributor_insights_counters: BTreeMap::new(),
            stream_enabled: false,
            stream_view_type: None,
            stream_arn: None,
            stream_records: empty_stream_records(),
            sse_type: None,
            sse_kms_key_arn: None,
            deletion_protection_enabled: false,
            on_demand_throughput: None,
            table_class: default_table_class(),
        }
    }

    /// Fixture: an item (or key) with string attributes `pk` and `sk`.
    fn keyed_item(pk: &str, sk: &str) -> HashMap<String, AttributeValue> {
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"S": pk}));
        item.insert("sk".to_string(), json!({"S": sk}));
        item
    }

    #[test]
    fn hash_key_name_extracts_from_schema() {
        let t = table_with_hash_key("pk");
        assert_eq!(t.hash_key_name(), "pk");
    }

    #[test]
    fn hash_key_name_empty_when_no_hash_schema() {
        let mut t = table_with_hash_key("pk");
        t.key_schema.clear();
        assert_eq!(t.hash_key_name(), "");
    }

    #[test]
    fn range_key_name_absent_then_present() {
        let mut t = table_with_hash_key("pk");
        assert_eq!(t.range_key_name(), None);
        t.key_schema.push(KeySchemaElement {
            attribute_name: "sk".to_string(),
            key_type: "RANGE".to_string(),
        });
        assert_eq!(t.range_key_name(), Some("sk"));
    }

    #[test]
    fn find_item_index_matches_hash_and_range_keys() {
        let mut t = table_with_hash_key("pk");
        t.key_schema.push(KeySchemaElement {
            attribute_name: "sk".to_string(),
            key_type: "RANGE".to_string(),
        });
        t.items.push(keyed_item("a", "1"));
        t.items.push(keyed_item("a", "2"));

        assert_eq!(t.find_item_index(&keyed_item("a", "1")), Some(0));
        assert_eq!(t.find_item_index(&keyed_item("a", "2")), Some(1));
        // Same hash key, unknown range key.
        assert_eq!(t.find_item_index(&keyed_item("a", "3")), None);
        // Unknown hash key.
        assert_eq!(t.find_item_index(&keyed_item("b", "1")), None);
        // A key without the hash attribute never matches.
        assert_eq!(t.find_item_index(&HashMap::new()), None);
    }

    #[test]
    fn record_key_access_noop_when_disabled() {
        let mut t = table_with_hash_key("pk");
        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        t.record_key_access(&key);
        assert!(t.contributor_insights_counters.is_empty());
    }

    #[test]
    fn record_key_access_increments_when_enabled() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_status = "ENABLED".to_string();
        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        t.record_key_access(&key);
        t.record_key_access(&key);
        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 2);
    }

    #[test]
    fn record_item_access_uses_hash_key_from_item() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_status = "ENABLED".to_string();
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"S": "user-1"}));
        item.insert("other".to_string(), json!({"N": "42"}));
        t.record_item_access(&item);
        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 1);
    }

    #[test]
    fn top_contributors_returns_sorted() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_counters.insert("a".to_string(), 3);
        t.contributor_insights_counters.insert("b".to_string(), 10);
        t.contributor_insights_counters.insert("c".to_string(), 1);
        let top = t.top_contributors(2);
        assert_eq!(top.len(), 2);
        assert_eq!(top[0], ("b", 10));
        assert_eq!(top[1], ("a", 3));
    }

    #[test]
    fn recalculate_stats_matches_items() {
        let mut t = table_with_hash_key("pk");
        let mut item1 = HashMap::new();
        item1.insert("pk".to_string(), json!({"S": "hello"}));
        let mut item2 = HashMap::new();
        item2.insert("pk".to_string(), json!({"N": "42"}));
        item2.insert("flag".to_string(), json!({"BOOL": true}));
        t.items.push(item1);
        t.items.push(item2);
        t.recalculate_stats();
        assert_eq!(t.item_count, 2);
        assert!(t.size_bytes > 0);
    }

    #[test]
    fn estimate_value_size_covers_all_types() {
        let s = DynamoTable::estimate_value_size(&json!({"S": "abc"}));
        assert_eq!(s, 3);
        let n = DynamoTable::estimate_value_size(&json!({"N": "42"}));
        assert_eq!(n, 2);
        let b = DynamoTable::estimate_value_size(&json!({"BOOL": true}));
        assert_eq!(b, 1);
        let null = DynamoTable::estimate_value_size(&json!({"NULL": true}));
        assert_eq!(null, 1);
        let l = DynamoTable::estimate_value_size(&json!({"L": [{"S": "x"}, {"S": "yy"}]}));
        assert_eq!(l, 6);
        let m = DynamoTable::estimate_value_size(&json!({"M": {"key": {"S": "v"}}}));
        assert_eq!(m, 7);
        let ss = DynamoTable::estimate_value_size(&json!({"SS": ["ab", "cde"]}));
        assert_eq!(ss, 5);
        let ns = DynamoTable::estimate_value_size(&json!({"NS": ["12", "345"]}));
        assert_eq!(ns, 5);
        let bin = DynamoTable::estimate_value_size(&json!({"B": "AAAAAAAA"}));
        assert_eq!(bin, 6);
    }
}