//! `fakecloud_dynamodb/state.rs`: in-memory and snapshot state types for the
//! fakecloud DynamoDB service.

1use chrono::{DateTime, Utc};
2use parking_lot::RwLock;
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use std::collections::{BTreeMap, HashMap};
6use std::sync::Arc;
7
8fn empty_stream_records() -> Arc<RwLock<Vec<StreamRecord>>> {
9    Arc::new(RwLock::new(Vec::new()))
10}
11
12/// Serde for `Arc<RwLock<Vec<StreamRecord>>>`: persist the inner change records
13/// so a stream consumer's un-read records survive a snapshot restart
14/// (bug-audit 2026-05-28, 4.5). The field was `#[serde(skip)]`, so table data
15/// was preserved across restart but pending stream records silently vanished.
16mod stream_records_serde {
17    use super::{Arc, RwLock, StreamRecord};
18    use serde::{Deserialize, Deserializer, Serialize, Serializer};
19
20    pub fn serialize<S: Serializer>(
21        v: &Arc<RwLock<Vec<StreamRecord>>>,
22        s: S,
23    ) -> Result<S::Ok, S::Error> {
24        v.read().serialize(s)
25    }
26
27    pub fn deserialize<'de, D: Deserializer<'de>>(
28        d: D,
29    ) -> Result<Arc<RwLock<Vec<StreamRecord>>>, D::Error> {
30        let records = Vec::<StreamRecord>::deserialize(d)?;
31        // Raise the in-memory sequence-number floor above every persisted
32        // record so newly-minted numbers cannot collide with them after a
33        // restart, even if the wall-clock seed went backwards (4.4 / Cubic).
34        for r in &records {
35            crate::streams::observe_stream_sequence(&r.dynamodb.sequence_number);
36        }
37        Ok(Arc::new(RwLock::new(records)))
38    }
39}
40
41/// A single DynamoDB attribute value (tagged union matching the AWS wire format).
42/// AWS sends attribute values as `{"S": "hello"}`, `{"N": "42"}`, etc.
43pub type AttributeValue = Value;
44
45/// Extract the "typed" inner value for comparison purposes.
46/// Returns (type_tag, inner_value) e.g. ("S", "hello") or ("N", "42").
47pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
48    let obj = av.as_object()?;
49    if obj.len() != 1 {
50        return None;
51    }
52    let (k, v) = obj.iter().next()?;
53    Some((k.as_str(), v))
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct KeySchemaElement {
58    pub attribute_name: String,
59    pub key_type: String, // HASH or RANGE
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct AttributeDefinition {
64    pub attribute_name: String,
65    pub attribute_type: String, // S, N, B
66}
67
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct ProvisionedThroughput {
70    pub read_capacity_units: i64,
71    pub write_capacity_units: i64,
72}
73
74/// On-demand capacity caps for PAY_PER_REQUEST tables and GSIs. Real AWS
75/// accepts both fields independently; `-1` (the AWS sentinel for "no cap")
76/// is the default and is what `DescribeTable` returns when the caller never
77/// set a value — the Terraform provider asserts on that exact value.
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct OnDemandThroughput {
80    pub max_read_request_units: i64,
81    pub max_write_request_units: i64,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct GlobalSecondaryIndex {
86    pub index_name: String,
87    pub key_schema: Vec<KeySchemaElement>,
88    pub projection: Projection,
89    pub provisioned_throughput: Option<ProvisionedThroughput>,
90    pub on_demand_throughput: Option<OnDemandThroughput>,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct LocalSecondaryIndex {
95    pub index_name: String,
96    pub key_schema: Vec<KeySchemaElement>,
97    pub projection: Projection,
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct Projection {
102    pub projection_type: String, // ALL, KEYS_ONLY, INCLUDE
103    pub non_key_attributes: Vec<String>,
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct DynamoTable {
108    pub name: String,
109    pub arn: String,
110    pub table_id: String,
111    pub key_schema: Vec<KeySchemaElement>,
112    pub attribute_definitions: Vec<AttributeDefinition>,
113    pub provisioned_throughput: ProvisionedThroughput,
114    pub items: Vec<HashMap<String, AttributeValue>>,
115    pub gsi: Vec<GlobalSecondaryIndex>,
116    pub lsi: Vec<LocalSecondaryIndex>,
117    pub tags: BTreeMap<String, String>,
118    pub created_at: DateTime<Utc>,
119    pub status: String,
120    pub item_count: i64,
121    pub size_bytes: i64,
122    pub billing_mode: String, // PROVISIONED or PAY_PER_REQUEST
123    pub ttl_attribute: Option<String>,
124    pub ttl_enabled: bool,
125    pub resource_policy: Option<String>,
126    /// PITR enabled
127    pub pitr_enabled: bool,
128    /// Kinesis streaming destinations: stream_arn -> status
129    pub kinesis_destinations: Vec<KinesisDestination>,
130    /// Contributor insights status
131    pub contributor_insights_status: String,
132    /// Contributor insights: partition key access counters (key_value_string -> count)
133    pub contributor_insights_counters: BTreeMap<String, u64>,
134    /// DynamoDB Streams configuration
135    pub stream_enabled: bool,
136    pub stream_view_type: Option<String>, // KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES
137    pub stream_arn: Option<String>,
138    /// Stream records (retained for 24 hours). Not persisted: stream
139    /// records are ephemeral and would be garbage anyway across restarts.
140    #[serde(with = "stream_records_serde", default = "empty_stream_records")]
141    pub stream_records: Arc<RwLock<Vec<StreamRecord>>>,
142    /// Server-side encryption type: AES256 (owned) or KMS
143    pub sse_type: Option<String>,
144    /// KMS key ARN for SSE (only when sse_type is KMS)
145    pub sse_kms_key_arn: Option<String>,
146    /// Deletion protection: when true, DeleteTable is rejected with
147    /// `ResourceInUseException`. Defaults to false. Returned on every
148    /// `DescribeTable` and toggleable via `UpdateTable`.
149    pub deletion_protection_enabled: bool,
150    /// Table-level on-demand throughput caps. Only meaningful for
151    /// PAY_PER_REQUEST tables, but real AWS echoes the field on every
152    /// DescribeTable once set.
153    pub on_demand_throughput: Option<OnDemandThroughput>,
154    /// Storage class: STANDARD (default) or STANDARD_INFREQUENT_ACCESS.
155    /// Returned inside `TableClassSummary` on DescribeTable; set at
156    /// CreateTable and changed via UpdateTable.
157    #[serde(default = "default_table_class")]
158    pub table_class: String,
159}
160
/// Serde default for `DynamoTable::table_class`: the `"STANDARD"` storage
/// class, used when a snapshot has no `table_class` field.
pub(crate) fn default_table_class() -> String {
    String::from("STANDARD")
}
164
165#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct StreamRecord {
167    pub event_id: String,
168    pub event_name: String, // INSERT, MODIFY, REMOVE
169    pub event_version: String,
170    pub event_source: String,
171    pub aws_region: String,
172    pub dynamodb: DynamoDbStreamRecord,
173    pub event_source_arn: String,
174    pub timestamp: DateTime<Utc>,
175    /// Set only for system-generated changes. TTL deletions carry
176    /// `{principalId: "dynamodb.amazonaws.com", type: "Service"}` so consumers
177    /// can distinguish an expiry REMOVE from a user-driven DeleteItem. Absent
178    /// (and omitted from the wire) for ordinary writes.
179    #[serde(default)]
180    pub user_identity: Option<StreamUserIdentity>,
181}
182
183/// `userIdentity` block on a stream record. Present only for system-generated
184/// events such as TTL expirations.
185#[derive(Debug, Clone, Serialize, Deserialize)]
186pub struct StreamUserIdentity {
187    pub principal_id: String,
188    pub identity_type: String,
189}
190
191impl StreamUserIdentity {
192    /// The marker AWS attaches to a TTL-expiry REMOVE record.
193    pub fn ttl() -> Self {
194        Self {
195            principal_id: "dynamodb.amazonaws.com".to_string(),
196            identity_type: "Service".to_string(),
197        }
198    }
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize)]
202pub struct DynamoDbStreamRecord {
203    pub keys: HashMap<String, AttributeValue>,
204    pub new_image: Option<HashMap<String, AttributeValue>>,
205    pub old_image: Option<HashMap<String, AttributeValue>>,
206    pub sequence_number: String,
207    pub size_bytes: i64,
208    pub stream_view_type: String,
209}
210
211#[derive(Debug, Clone, Serialize, Deserialize)]
212pub struct KinesisDestination {
213    pub stream_arn: String,
214    pub destination_status: String,
215    pub approximate_creation_date_time_precision: String,
216}
217
218#[derive(Debug, Clone, Serialize, Deserialize)]
219pub struct BackupDescription {
220    pub backup_arn: String,
221    pub backup_name: String,
222    pub table_name: String,
223    pub table_arn: String,
224    pub backup_status: String,
225    pub backup_type: String,
226    pub backup_creation_date: DateTime<Utc>,
227    pub key_schema: Vec<KeySchemaElement>,
228    pub attribute_definitions: Vec<AttributeDefinition>,
229    pub provisioned_throughput: ProvisionedThroughput,
230    pub billing_mode: String,
231    pub item_count: i64,
232    pub size_bytes: i64,
233    /// Snapshot of the table items at backup creation time.
234    pub items: Vec<HashMap<String, AttributeValue>>,
235    /// Real DDB persists GSI/LSI/tags/TTL/SSE/Stream into the backup
236    /// payload so RestoreTableFromBackup brings the full table back
237    /// up. Older snapshots may not have these fields, so all default
238    /// to empty/false via serde.
239    #[serde(default)]
240    pub gsi: Vec<GlobalSecondaryIndex>,
241    #[serde(default)]
242    pub lsi: Vec<LocalSecondaryIndex>,
243    #[serde(default)]
244    pub tags: BTreeMap<String, String>,
245    #[serde(default)]
246    pub ttl_attribute: Option<String>,
247    #[serde(default)]
248    pub ttl_enabled: bool,
249    #[serde(default)]
250    pub sse_type: Option<String>,
251    #[serde(default)]
252    pub sse_kms_key_arn: Option<String>,
253    #[serde(default)]
254    pub stream_enabled: bool,
255    #[serde(default)]
256    pub stream_view_type: Option<String>,
257}
258
259#[derive(Debug, Clone, Serialize, Deserialize)]
260pub struct GlobalTableDescription {
261    pub global_table_name: String,
262    pub global_table_arn: String,
263    pub global_table_status: String,
264    pub creation_date: DateTime<Utc>,
265    pub replication_group: Vec<ReplicaDescription>,
266    /// Billing mode applied across all replicas via
267    /// `UpdateGlobalTableSettings` (`PROVISIONED` / `PAY_PER_REQUEST`).
268    /// Defaults to PROVISIONED to match real DynamoDB global-table v1.
269    #[serde(default = "default_global_billing_mode")]
270    pub billing_mode: String,
271    /// Global provisioned write capacity applied across all replicas via
272    /// `UpdateGlobalTableSettings`. `None` under PAY_PER_REQUEST.
273    #[serde(default)]
274    pub provisioned_write_capacity_units: Option<i64>,
275}
276
/// Serde default for `GlobalTableDescription::billing_mode`: `"PROVISIONED"`.
fn default_global_billing_mode() -> String {
    String::from("PROVISIONED")
}
280
281#[derive(Debug, Clone, Serialize, Deserialize)]
282pub struct ReplicaDescription {
283    pub region_name: String,
284    pub replica_status: String,
285    /// Per-replica provisioned-read-capacity autoscaling settings, as supplied
286    /// via `UpdateTableReplicaAutoScaling`. Round-tripped through
287    /// `DescribeTableReplicaAutoScaling` as `AutoScalingSettingsDescription`.
288    #[serde(default)]
289    pub read_capacity_auto_scaling: Option<serde_json::Value>,
290    /// Per-replica provisioned-write-capacity autoscaling settings.
291    #[serde(default)]
292    pub write_capacity_auto_scaling: Option<serde_json::Value>,
293    /// Per-replica provisioned read capacity supplied via
294    /// `UpdateGlobalTableSettings` ReplicaSettingsUpdate.
295    #[serde(default)]
296    pub read_capacity_units: Option<i64>,
297}
298
299#[derive(Debug, Clone, Serialize, Deserialize)]
300pub struct ExportDescription {
301    pub export_arn: String,
302    pub export_status: String,
303    pub table_arn: String,
304    pub s3_bucket: String,
305    pub s3_prefix: Option<String>,
306    pub export_format: String,
307    pub start_time: DateTime<Utc>,
308    pub end_time: DateTime<Utc>,
309    pub export_time: DateTime<Utc>,
310    pub item_count: i64,
311    pub billed_size_bytes: i64,
312}
313
314#[derive(Debug, Clone, Serialize, Deserialize)]
315pub struct ImportDescription {
316    pub import_arn: String,
317    pub import_status: String,
318    pub table_arn: String,
319    pub table_name: String,
320    pub s3_bucket_source: String,
321    pub input_format: String,
322    pub start_time: DateTime<Utc>,
323    pub end_time: DateTime<Utc>,
324    pub processed_item_count: i64,
325    pub processed_size_bytes: i64,
326}
327
328impl DynamoTable {
329    /// Get the hash key attribute name from the key schema.
330    pub fn hash_key_name(&self) -> &str {
331        self.key_schema
332            .iter()
333            .find(|k| k.key_type == "HASH")
334            .map(|k| k.attribute_name.as_str())
335            .unwrap_or("")
336    }
337
338    /// Get the range key attribute name from the key schema (if any).
339    pub fn range_key_name(&self) -> Option<&str> {
340        self.key_schema
341            .iter()
342            .find(|k| k.key_type == "RANGE")
343            .map(|k| k.attribute_name.as_str())
344    }
345
346    /// Find an item index by its primary key.
347    pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
348        let hash_key = self.hash_key_name();
349        let range_key = self.range_key_name();
350
351        // Compare keys with numeric-aware equality: a Number key stored as
352        // `{"N":"1"}` must match a lookup of `{"N":"1.0"}` (they are the same
353        // DynamoDB number). Raw JSON `==` treated them as distinct, so the
354        // write/lookup path could miss an existing item or store a duplicate
355        // even though the compare/filter path was already canonical
356        // (bug-hunt 2026-07-01, DynamoDB write-path number-canon).
357        use crate::service::helpers::partiql::values_equal;
358        self.items.iter().position(|item| {
359            let hash_match =
360                values_equal(item.get(hash_key), key.get(hash_key)) && item.get(hash_key).is_some();
361            if !hash_match {
362                return false;
363            }
364            match range_key {
365                Some(rk) => values_equal(item.get(rk), key.get(rk)),
366                None => true,
367            }
368        })
369    }
370
371    /// Estimate item size in bytes (rough approximation).
372    fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
373        let mut size: i64 = 0;
374        for (k, v) in item {
375            size += k.len() as i64;
376            size += Self::estimate_value_size(v);
377        }
378        size
379    }
380
381    fn estimate_value_size(v: &Value) -> i64 {
382        match v {
383            Value::Object(obj) => {
384                if let Some(s) = obj.get("S").and_then(|v| v.as_str()) {
385                    s.len() as i64
386                } else if let Some(n) = obj.get("N").and_then(|v| v.as_str()) {
387                    n.len() as i64
388                } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
389                    1
390                } else if let Some(l) = obj.get("L").and_then(|v| v.as_array()) {
391                    3 + l.iter().map(Self::estimate_value_size).sum::<i64>()
392                } else if let Some(m) = obj.get("M").and_then(|v| v.as_object()) {
393                    3 + m
394                        .iter()
395                        .map(|(k, v)| k.len() as i64 + Self::estimate_value_size(v))
396                        .sum::<i64>()
397                } else if let Some(ss) = obj.get("SS").and_then(|v| v.as_array()) {
398                    ss.iter()
399                        .filter_map(|v| v.as_str())
400                        .map(|s| s.len() as i64)
401                        .sum()
402                } else if let Some(ns) = obj.get("NS").and_then(|v| v.as_array()) {
403                    ns.iter()
404                        .filter_map(|v| v.as_str())
405                        .map(|s| s.len() as i64)
406                        .sum()
407                } else if let Some(b) = obj.get("B").and_then(|v| v.as_str()) {
408                    // Base64-encoded binary
409                    (b.len() as i64 * 3) / 4
410                } else {
411                    v.to_string().len() as i64
412                }
413            }
414            _ => v.to_string().len() as i64,
415        }
416    }
417
418    /// Record a partition key access for contributor insights.
419    /// Only records if contributor insights is enabled.
420    pub fn record_key_access(&mut self, key: &HashMap<String, AttributeValue>) {
421        if self.contributor_insights_status != "ENABLED" {
422            return;
423        }
424        let hash_key = self.hash_key_name().to_string();
425        if let Some(pk_value) = key.get(&hash_key) {
426            let key_str = pk_value.to_string();
427            *self
428                .contributor_insights_counters
429                .entry(key_str)
430                .or_insert(0) += 1;
431        }
432    }
433
434    /// Record a partition key access from a full item (extracts the key first).
435    pub fn record_item_access(&mut self, item: &HashMap<String, AttributeValue>) {
436        if self.contributor_insights_status != "ENABLED" {
437            return;
438        }
439        let hash_key = self.hash_key_name().to_string();
440        if let Some(pk_value) = item.get(&hash_key) {
441            let key_str = pk_value.to_string();
442            *self
443                .contributor_insights_counters
444                .entry(key_str)
445                .or_insert(0) += 1;
446        }
447    }
448
449    /// Get top N contributors sorted by access count (descending).
450    pub fn top_contributors(&self, n: usize) -> Vec<(&str, u64)> {
451        let mut entries: Vec<(&str, u64)> = self
452            .contributor_insights_counters
453            .iter()
454            .map(|(k, &v)| (k.as_str(), v))
455            .collect();
456        entries.sort_by_key(|e| std::cmp::Reverse(e.1));
457        entries.truncate(n);
458        entries
459    }
460
461    /// Recalculate item_count and size_bytes from the items vec.
462    pub fn recalculate_stats(&mut self) {
463        self.item_count = self.items.len() as i64;
464        self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
465    }
466}
467
468#[derive(Debug, Clone, Serialize, Deserialize)]
469pub struct DynamoDbState {
470    pub account_id: String,
471    pub region: String,
472    pub tables: BTreeMap<String, DynamoTable>,
473    pub backups: BTreeMap<String, BackupDescription>,
474    pub global_tables: BTreeMap<String, GlobalTableDescription>,
475    pub exports: BTreeMap<String, ExportDescription>,
476    pub imports: BTreeMap<String, ImportDescription>,
477    /// DynamoDB Streams -> Lambda event-source-mapping checkpoints: the
478    /// last stream sequence number delivered for each mapping
479    /// (keyed by ESM uuid). Persisted so the streams poller resumes from
480    /// where it left off after a restart instead of re-seeding TRIM_HORIZON
481    /// and re-invoking the target Lambda with the whole retained backlog
482    /// (duplicate side effects). Mirrors `KinesisState.lambda_checkpoints`.
483    /// `#[serde(default)]` keeps older snapshots loadable.
484    #[serde(default)]
485    pub lambda_stream_checkpoints: BTreeMap<String, String>,
486}
487
488/// On-disk snapshot envelope. The payload is the full [`DynamoDbState`];
489/// `schema_version` lets us evolve the format without accidentally loading
490/// an incompatible dump on upgrade.
491#[derive(Debug, Clone, Serialize, Deserialize)]
492pub struct DynamoDbSnapshot {
493    pub schema_version: u32,
494    /// v2+: multi-account state.
495    #[serde(default)]
496    pub accounts: Option<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>,
497    /// v1 compat: single-account state.
498    #[serde(default)]
499    pub state: Option<DynamoDbState>,
500}
501
/// Current [`DynamoDbSnapshot::schema_version`]; 2 introduced multi-account state.
pub const DYNAMODB_SNAPSHOT_SCHEMA_VERSION: u32 = 2;
503
504impl DynamoDbState {
505    pub fn new(account_id: &str, region: &str) -> Self {
506        Self {
507            account_id: account_id.to_string(),
508            region: region.to_string(),
509            tables: BTreeMap::new(),
510            backups: BTreeMap::new(),
511            global_tables: BTreeMap::new(),
512            exports: BTreeMap::new(),
513            imports: BTreeMap::new(),
514            lambda_stream_checkpoints: BTreeMap::new(),
515        }
516    }
517
518    pub fn reset(&mut self) {
519        self.tables.clear();
520        self.backups.clear();
521        self.global_tables.clear();
522        self.exports.clear();
523        self.imports.clear();
524        self.lambda_stream_checkpoints.clear();
525    }
526
527    /// Last stream sequence number delivered for the given DynamoDB
528    /// Streams -> Lambda event source mapping, or `None` if the mapping
529    /// has never delivered (so the poller seeds from StartingPosition).
530    pub fn lambda_stream_checkpoint(&self, mapping_uuid: &str) -> Option<String> {
531        self.lambda_stream_checkpoints.get(mapping_uuid).cloned()
532    }
533
534    /// Record the last stream sequence number delivered for a mapping. The
535    /// value rides along with the next DynamoDB snapshot save, exactly the
536    /// way Kinesis lambda checkpoints persist through their snapshot.
537    pub fn set_lambda_stream_checkpoint(&mut self, mapping_uuid: &str, sequence_number: String) {
538        self.lambda_stream_checkpoints
539            .insert(mapping_uuid.to_string(), sequence_number);
540    }
541}
542
543impl fakecloud_core::multi_account::AccountState for DynamoDbState {
544    fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
545        Self::new(account_id, region)
546    }
547}
548
549pub type SharedDynamoDbState =
550    Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>>;
551
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn attribute_type_and_value_valid() {
        let v = json!({"S": "hi"});
        let (ty, val) = attribute_type_and_value(&v).unwrap();
        assert_eq!(ty, "S");
        assert_eq!(val, &json!("hi"));
    }

    #[test]
    fn attribute_type_and_value_empty_returns_none() {
        let v = json!({});
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn attribute_type_and_value_multiple_entries_returns_none() {
        let v = json!({"S": "hi", "N": "1"});
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn attribute_type_and_value_non_object_returns_none() {
        let v = json!("not-object");
        assert!(attribute_type_and_value(&v).is_none());
    }

    #[test]
    fn account_state_trait_impl() {
        use fakecloud_core::multi_account::AccountState;
        let state = DynamoDbState::new_for_account("123", "us-east-1", "");
        assert_eq!(state.account_id, "123");
        assert_eq!(state.region, "us-east-1");
    }

    #[test]
    fn new_and_reset() {
        // The original version of this test never called `reset`; exercise
        // it for real here.
        let mut state = DynamoDbState::new("123", "us-east-1");
        assert!(state.tables.is_empty());
        assert!(state.lambda_stream_checkpoints.is_empty());

        state
            .tables
            .insert("t".to_string(), table_with_hash_key("pk"));
        state.set_lambda_stream_checkpoint("esm-1", "42".to_string());
        assert_eq!(
            state.lambda_stream_checkpoint("esm-1").as_deref(),
            Some("42")
        );

        state.reset();
        assert!(state.tables.is_empty());
        assert!(state.lambda_stream_checkpoint("esm-1").is_none());
        // Account identity survives a reset.
        assert_eq!(state.account_id, "123");
        assert_eq!(state.region, "us-east-1");
    }

    fn table_with_hash_key(hash: &str) -> DynamoTable {
        DynamoTable {
            name: "t".to_string(),
            arn: "arn:aws:dynamodb:us-east-1:123:table/t".to_string(),
            table_id: "id".to_string(),
            key_schema: vec![KeySchemaElement {
                attribute_name: hash.to_string(),
                key_type: "HASH".to_string(),
            }],
            attribute_definitions: vec![],
            provisioned_throughput: ProvisionedThroughput {
                read_capacity_units: 1,
                write_capacity_units: 1,
            },
            items: Vec::new(),
            gsi: Vec::new(),
            lsi: Vec::new(),
            tags: BTreeMap::new(),
            created_at: Utc::now(),
            status: "ACTIVE".to_string(),
            item_count: 0,
            size_bytes: 0,
            billing_mode: "PROVISIONED".to_string(),
            ttl_attribute: None,
            ttl_enabled: false,
            resource_policy: None,
            pitr_enabled: false,
            kinesis_destinations: Vec::new(),
            contributor_insights_status: "DISABLED".to_string(),
            contributor_insights_counters: BTreeMap::new(),
            stream_enabled: false,
            stream_view_type: None,
            stream_arn: None,
            stream_records: empty_stream_records(),
            sse_type: None,
            sse_kms_key_arn: None,
            deletion_protection_enabled: false,
            on_demand_throughput: None,
            table_class: default_table_class(),
        }
    }

    #[test]
    fn hash_key_name_extracts_from_schema() {
        let t = table_with_hash_key("pk");
        assert_eq!(t.hash_key_name(), "pk");
    }

    #[test]
    fn hash_key_name_empty_when_no_hash_schema() {
        let mut t = table_with_hash_key("pk");
        t.key_schema.clear();
        assert_eq!(t.hash_key_name(), "");
    }

    #[test]
    fn record_key_access_noop_when_disabled() {
        let mut t = table_with_hash_key("pk");
        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        t.record_key_access(&key);
        assert!(t.contributor_insights_counters.is_empty());
    }

    #[test]
    fn record_key_access_increments_when_enabled() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_status = "ENABLED".to_string();
        let mut key = HashMap::new();
        key.insert("pk".to_string(), json!({"S": "a"}));
        t.record_key_access(&key);
        t.record_key_access(&key);
        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 2);
    }

    #[test]
    fn record_item_access_uses_hash_key_from_item() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_status = "ENABLED".to_string();
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"S": "user-1"}));
        item.insert("other".to_string(), json!({"N": "42"}));
        t.record_item_access(&item);
        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 1);
    }

    #[test]
    fn find_item_index_canonicalizes_number_keys() {
        // Write path stored `{"N":"1.0"}`; a lookup of `{"N":"1"}` must find it
        // (same DynamoDB number) rather than miss/duplicate (bug-hunt 2026-07-01).
        let mut t = table_with_hash_key("pk");
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"N": "1.0"}));
        t.items.push(item);

        let mut lookup = HashMap::new();
        lookup.insert("pk".to_string(), json!({"N": "1"}));
        assert_eq!(t.find_item_index(&lookup), Some(0));

        // A different number still misses.
        let mut other = HashMap::new();
        other.insert("pk".to_string(), json!({"N": "2"}));
        assert_eq!(t.find_item_index(&other), None);
    }

    #[test]
    fn find_item_index_malformed_number_key_does_not_match_valid() {
        // A malformed Number operand must not compare equal to a valid stored
        // numeric key -- otherwise DeleteItem{"N":"abc"} could delete the wrong
        // row (Cubic P1, 2026-07-01).
        let mut t = table_with_hash_key("pk");
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"N": "5"}));
        t.items.push(item);

        let mut bad = HashMap::new();
        bad.insert("pk".to_string(), json!({"N": "abc"}));
        assert_eq!(t.find_item_index(&bad), None);
    }

    #[test]
    fn top_contributors_returns_sorted() {
        let mut t = table_with_hash_key("pk");
        t.contributor_insights_counters.insert("a".to_string(), 3);
        t.contributor_insights_counters.insert("b".to_string(), 10);
        t.contributor_insights_counters.insert("c".to_string(), 1);
        let top = t.top_contributors(2);
        assert_eq!(top.len(), 2);
        assert_eq!(top[0], ("b", 10));
        assert_eq!(top[1], ("a", 3));
    }

    #[test]
    fn recalculate_stats_matches_items() {
        let mut t = table_with_hash_key("pk");
        let mut item1 = HashMap::new();
        item1.insert("pk".to_string(), json!({"S": "hello"}));
        let mut item2 = HashMap::new();
        item2.insert("pk".to_string(), json!({"N": "42"}));
        item2.insert("flag".to_string(), json!({"BOOL": true}));
        t.items.push(item1);
        t.items.push(item2);
        t.recalculate_stats();
        assert_eq!(t.item_count, 2);
        assert!(t.size_bytes > 0);
    }

    #[test]
    fn estimate_value_size_covers_all_types() {
        let s = DynamoTable::estimate_value_size(&json!({"S": "abc"}));
        assert_eq!(s, 3);
        let n = DynamoTable::estimate_value_size(&json!({"N": "42"}));
        assert_eq!(n, 2);
        let b = DynamoTable::estimate_value_size(&json!({"BOOL": true}));
        assert_eq!(b, 1);
        let null = DynamoTable::estimate_value_size(&json!({"NULL": true}));
        assert_eq!(null, 1);
        let l = DynamoTable::estimate_value_size(&json!({"L": [{"S": "x"}, {"S": "yy"}]}));
        assert_eq!(l, 6);
        let m = DynamoTable::estimate_value_size(&json!({"M": {"key": {"S": "v"}}}));
        assert_eq!(m, 7);
        let ss = DynamoTable::estimate_value_size(&json!({"SS": ["ab", "cde"]}));
        assert_eq!(ss, 5);
        let ns = DynamoTable::estimate_value_size(&json!({"NS": ["12", "345"]}));
        assert_eq!(ns, 5);
        let bin = DynamoTable::estimate_value_size(&json!({"B": "AAAAAAAA"}));
        assert_eq!(bin, 6);
    }

    #[test]
    fn table_class_defaults_when_missing_from_snapshot() {
        // Snapshots written before `table_class` existed must still load,
        // picking up the STANDARD storage class.
        let t = table_with_hash_key("pk");
        let mut v = serde_json::to_value(&t).unwrap();
        v.as_object_mut().unwrap().remove("table_class");
        let restored: DynamoTable = serde_json::from_value(v).unwrap();
        assert_eq!(restored.table_class, "STANDARD");
        assert!(restored.stream_records.read().is_empty());
    }
}