Skip to main content

fakecloud_dynamodb/
state.rs

1use chrono::{DateTime, Utc};
2use parking_lot::RwLock;
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use std::collections::{BTreeMap, HashMap};
6use std::sync::Arc;
7
8fn empty_stream_records() -> Arc<RwLock<Vec<StreamRecord>>> {
9    Arc::new(RwLock::new(Vec::new()))
10}
11
12/// Serde for `Arc<RwLock<Vec<StreamRecord>>>`: persist the inner change records
13/// so a stream consumer's un-read records survive a snapshot restart
14/// (bug-audit 2026-05-28, 4.5). The field was `#[serde(skip)]`, so table data
15/// was preserved across restart but pending stream records silently vanished.
16mod stream_records_serde {
17    use super::{Arc, RwLock, StreamRecord};
18    use serde::{Deserialize, Deserializer, Serialize, Serializer};
19
20    pub fn serialize<S: Serializer>(
21        v: &Arc<RwLock<Vec<StreamRecord>>>,
22        s: S,
23    ) -> Result<S::Ok, S::Error> {
24        v.read().serialize(s)
25    }
26
27    pub fn deserialize<'de, D: Deserializer<'de>>(
28        d: D,
29    ) -> Result<Arc<RwLock<Vec<StreamRecord>>>, D::Error> {
30        let records = Vec::<StreamRecord>::deserialize(d)?;
31        // Raise the in-memory sequence-number floor above every persisted
32        // record so newly-minted numbers cannot collide with them after a
33        // restart, even if the wall-clock seed went backwards (4.4 / Cubic).
34        for r in &records {
35            crate::streams::observe_stream_sequence(&r.dynamodb.sequence_number);
36        }
37        Ok(Arc::new(RwLock::new(records)))
38    }
39}
40
41/// A single DynamoDB attribute value (tagged union matching the AWS wire format).
42/// AWS sends attribute values as `{"S": "hello"}`, `{"N": "42"}`, etc.
43pub type AttributeValue = Value;
44
45/// Extract the "typed" inner value for comparison purposes.
46/// Returns (type_tag, inner_value) e.g. ("S", "hello") or ("N", "42").
47pub fn attribute_type_and_value(av: &Value) -> Option<(&str, &Value)> {
48    let obj = av.as_object()?;
49    if obj.len() != 1 {
50        return None;
51    }
52    let (k, v) = obj.iter().next()?;
53    Some((k.as_str(), v))
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct KeySchemaElement {
58    pub attribute_name: String,
59    pub key_type: String, // HASH or RANGE
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct AttributeDefinition {
64    pub attribute_name: String,
65    pub attribute_type: String, // S, N, B
66}
67
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct ProvisionedThroughput {
70    pub read_capacity_units: i64,
71    pub write_capacity_units: i64,
72}
73
74/// On-demand capacity caps for PAY_PER_REQUEST tables and GSIs. Real AWS
75/// accepts both fields independently; `-1` (the AWS sentinel for "no cap")
76/// is the default and is what `DescribeTable` returns when the caller never
77/// set a value — the Terraform provider asserts on that exact value.
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct OnDemandThroughput {
80    pub max_read_request_units: i64,
81    pub max_write_request_units: i64,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct GlobalSecondaryIndex {
86    pub index_name: String,
87    pub key_schema: Vec<KeySchemaElement>,
88    pub projection: Projection,
89    pub provisioned_throughput: Option<ProvisionedThroughput>,
90    pub on_demand_throughput: Option<OnDemandThroughput>,
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct LocalSecondaryIndex {
95    pub index_name: String,
96    pub key_schema: Vec<KeySchemaElement>,
97    pub projection: Projection,
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct Projection {
102    pub projection_type: String, // ALL, KEYS_ONLY, INCLUDE
103    pub non_key_attributes: Vec<String>,
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct DynamoTable {
108    pub name: String,
109    pub arn: String,
110    pub table_id: String,
111    pub key_schema: Vec<KeySchemaElement>,
112    pub attribute_definitions: Vec<AttributeDefinition>,
113    pub provisioned_throughput: ProvisionedThroughput,
114    pub items: Vec<HashMap<String, AttributeValue>>,
115    pub gsi: Vec<GlobalSecondaryIndex>,
116    pub lsi: Vec<LocalSecondaryIndex>,
117    pub tags: BTreeMap<String, String>,
118    pub created_at: DateTime<Utc>,
119    pub status: String,
120    pub item_count: i64,
121    pub size_bytes: i64,
122    pub billing_mode: String, // PROVISIONED or PAY_PER_REQUEST
123    pub ttl_attribute: Option<String>,
124    pub ttl_enabled: bool,
125    pub resource_policy: Option<String>,
126    /// PITR enabled
127    pub pitr_enabled: bool,
128    /// Kinesis streaming destinations: stream_arn -> status
129    pub kinesis_destinations: Vec<KinesisDestination>,
130    /// Contributor insights status
131    pub contributor_insights_status: String,
132    /// Contributor insights: partition key access counters (key_value_string -> count)
133    pub contributor_insights_counters: BTreeMap<String, u64>,
134    /// DynamoDB Streams configuration
135    pub stream_enabled: bool,
136    pub stream_view_type: Option<String>, // KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES
137    pub stream_arn: Option<String>,
138    /// Stream records (retained for 24 hours). Not persisted: stream
139    /// records are ephemeral and would be garbage anyway across restarts.
140    #[serde(with = "stream_records_serde", default = "empty_stream_records")]
141    pub stream_records: Arc<RwLock<Vec<StreamRecord>>>,
142    /// Server-side encryption type: AES256 (owned) or KMS
143    pub sse_type: Option<String>,
144    /// KMS key ARN for SSE (only when sse_type is KMS)
145    pub sse_kms_key_arn: Option<String>,
146    /// Deletion protection: when true, DeleteTable is rejected with
147    /// `ResourceInUseException`. Defaults to false. Returned on every
148    /// `DescribeTable` and toggleable via `UpdateTable`.
149    pub deletion_protection_enabled: bool,
150    /// Table-level on-demand throughput caps. Only meaningful for
151    /// PAY_PER_REQUEST tables, but real AWS echoes the field on every
152    /// DescribeTable once set.
153    pub on_demand_throughput: Option<OnDemandThroughput>,
154    /// Storage class: STANDARD (default) or STANDARD_INFREQUENT_ACCESS.
155    /// Returned inside `TableClassSummary` on DescribeTable; set at
156    /// CreateTable and changed via UpdateTable.
157    #[serde(default = "default_table_class")]
158    pub table_class: String,
159}
160
161pub(crate) fn default_table_class() -> String {
162    "STANDARD".to_string()
163}
164
165#[derive(Debug, Clone, Serialize, Deserialize)]
166pub struct StreamRecord {
167    pub event_id: String,
168    pub event_name: String, // INSERT, MODIFY, REMOVE
169    pub event_version: String,
170    pub event_source: String,
171    pub aws_region: String,
172    pub dynamodb: DynamoDbStreamRecord,
173    pub event_source_arn: String,
174    pub timestamp: DateTime<Utc>,
175}
176
177#[derive(Debug, Clone, Serialize, Deserialize)]
178pub struct DynamoDbStreamRecord {
179    pub keys: HashMap<String, AttributeValue>,
180    pub new_image: Option<HashMap<String, AttributeValue>>,
181    pub old_image: Option<HashMap<String, AttributeValue>>,
182    pub sequence_number: String,
183    pub size_bytes: i64,
184    pub stream_view_type: String,
185}
186
187#[derive(Debug, Clone, Serialize, Deserialize)]
188pub struct KinesisDestination {
189    pub stream_arn: String,
190    pub destination_status: String,
191    pub approximate_creation_date_time_precision: String,
192}
193
194#[derive(Debug, Clone, Serialize, Deserialize)]
195pub struct BackupDescription {
196    pub backup_arn: String,
197    pub backup_name: String,
198    pub table_name: String,
199    pub table_arn: String,
200    pub backup_status: String,
201    pub backup_type: String,
202    pub backup_creation_date: DateTime<Utc>,
203    pub key_schema: Vec<KeySchemaElement>,
204    pub attribute_definitions: Vec<AttributeDefinition>,
205    pub provisioned_throughput: ProvisionedThroughput,
206    pub billing_mode: String,
207    pub item_count: i64,
208    pub size_bytes: i64,
209    /// Snapshot of the table items at backup creation time.
210    pub items: Vec<HashMap<String, AttributeValue>>,
211    /// Real DDB persists GSI/LSI/tags/TTL/SSE/Stream into the backup
212    /// payload so RestoreTableFromBackup brings the full table back
213    /// up. Older snapshots may not have these fields, so all default
214    /// to empty/false via serde.
215    #[serde(default)]
216    pub gsi: Vec<GlobalSecondaryIndex>,
217    #[serde(default)]
218    pub lsi: Vec<LocalSecondaryIndex>,
219    #[serde(default)]
220    pub tags: BTreeMap<String, String>,
221    #[serde(default)]
222    pub ttl_attribute: Option<String>,
223    #[serde(default)]
224    pub ttl_enabled: bool,
225    #[serde(default)]
226    pub sse_type: Option<String>,
227    #[serde(default)]
228    pub sse_kms_key_arn: Option<String>,
229    #[serde(default)]
230    pub stream_enabled: bool,
231    #[serde(default)]
232    pub stream_view_type: Option<String>,
233}
234
235#[derive(Debug, Clone, Serialize, Deserialize)]
236pub struct GlobalTableDescription {
237    pub global_table_name: String,
238    pub global_table_arn: String,
239    pub global_table_status: String,
240    pub creation_date: DateTime<Utc>,
241    pub replication_group: Vec<ReplicaDescription>,
242    /// Billing mode applied across all replicas via
243    /// `UpdateGlobalTableSettings` (`PROVISIONED` / `PAY_PER_REQUEST`).
244    /// Defaults to PROVISIONED to match real DynamoDB global-table v1.
245    #[serde(default = "default_global_billing_mode")]
246    pub billing_mode: String,
247    /// Global provisioned write capacity applied across all replicas via
248    /// `UpdateGlobalTableSettings`. `None` under PAY_PER_REQUEST.
249    #[serde(default)]
250    pub provisioned_write_capacity_units: Option<i64>,
251}
252
253fn default_global_billing_mode() -> String {
254    "PROVISIONED".to_string()
255}
256
257#[derive(Debug, Clone, Serialize, Deserialize)]
258pub struct ReplicaDescription {
259    pub region_name: String,
260    pub replica_status: String,
261    /// Per-replica provisioned-read-capacity autoscaling settings, as supplied
262    /// via `UpdateTableReplicaAutoScaling`. Round-tripped through
263    /// `DescribeTableReplicaAutoScaling` as `AutoScalingSettingsDescription`.
264    #[serde(default)]
265    pub read_capacity_auto_scaling: Option<serde_json::Value>,
266    /// Per-replica provisioned-write-capacity autoscaling settings.
267    #[serde(default)]
268    pub write_capacity_auto_scaling: Option<serde_json::Value>,
269    /// Per-replica provisioned read capacity supplied via
270    /// `UpdateGlobalTableSettings` ReplicaSettingsUpdate.
271    #[serde(default)]
272    pub read_capacity_units: Option<i64>,
273}
274
275#[derive(Debug, Clone, Serialize, Deserialize)]
276pub struct ExportDescription {
277    pub export_arn: String,
278    pub export_status: String,
279    pub table_arn: String,
280    pub s3_bucket: String,
281    pub s3_prefix: Option<String>,
282    pub export_format: String,
283    pub start_time: DateTime<Utc>,
284    pub end_time: DateTime<Utc>,
285    pub export_time: DateTime<Utc>,
286    pub item_count: i64,
287    pub billed_size_bytes: i64,
288}
289
290#[derive(Debug, Clone, Serialize, Deserialize)]
291pub struct ImportDescription {
292    pub import_arn: String,
293    pub import_status: String,
294    pub table_arn: String,
295    pub table_name: String,
296    pub s3_bucket_source: String,
297    pub input_format: String,
298    pub start_time: DateTime<Utc>,
299    pub end_time: DateTime<Utc>,
300    pub processed_item_count: i64,
301    pub processed_size_bytes: i64,
302}
303
304impl DynamoTable {
305    /// Get the hash key attribute name from the key schema.
306    pub fn hash_key_name(&self) -> &str {
307        self.key_schema
308            .iter()
309            .find(|k| k.key_type == "HASH")
310            .map(|k| k.attribute_name.as_str())
311            .unwrap_or("")
312    }
313
314    /// Get the range key attribute name from the key schema (if any).
315    pub fn range_key_name(&self) -> Option<&str> {
316        self.key_schema
317            .iter()
318            .find(|k| k.key_type == "RANGE")
319            .map(|k| k.attribute_name.as_str())
320    }
321
322    /// Find an item index by its primary key.
323    pub fn find_item_index(&self, key: &HashMap<String, AttributeValue>) -> Option<usize> {
324        let hash_key = self.hash_key_name();
325        let range_key = self.range_key_name();
326
327        self.items.iter().position(|item| {
328            let hash_match = match (item.get(hash_key), key.get(hash_key)) {
329                (Some(a), Some(b)) => a == b,
330                _ => false,
331            };
332            if !hash_match {
333                return false;
334            }
335            match range_key {
336                Some(rk) => match (item.get(rk), key.get(rk)) {
337                    (Some(a), Some(b)) => a == b,
338                    (None, None) => true,
339                    _ => false,
340                },
341                None => true,
342            }
343        })
344    }
345
346    /// Estimate item size in bytes (rough approximation).
347    fn estimate_item_size(item: &HashMap<String, AttributeValue>) -> i64 {
348        let mut size: i64 = 0;
349        for (k, v) in item {
350            size += k.len() as i64;
351            size += Self::estimate_value_size(v);
352        }
353        size
354    }
355
356    fn estimate_value_size(v: &Value) -> i64 {
357        match v {
358            Value::Object(obj) => {
359                if let Some(s) = obj.get("S").and_then(|v| v.as_str()) {
360                    s.len() as i64
361                } else if let Some(n) = obj.get("N").and_then(|v| v.as_str()) {
362                    n.len() as i64
363                } else if obj.contains_key("BOOL") || obj.contains_key("NULL") {
364                    1
365                } else if let Some(l) = obj.get("L").and_then(|v| v.as_array()) {
366                    3 + l.iter().map(Self::estimate_value_size).sum::<i64>()
367                } else if let Some(m) = obj.get("M").and_then(|v| v.as_object()) {
368                    3 + m
369                        .iter()
370                        .map(|(k, v)| k.len() as i64 + Self::estimate_value_size(v))
371                        .sum::<i64>()
372                } else if let Some(ss) = obj.get("SS").and_then(|v| v.as_array()) {
373                    ss.iter()
374                        .filter_map(|v| v.as_str())
375                        .map(|s| s.len() as i64)
376                        .sum()
377                } else if let Some(ns) = obj.get("NS").and_then(|v| v.as_array()) {
378                    ns.iter()
379                        .filter_map(|v| v.as_str())
380                        .map(|s| s.len() as i64)
381                        .sum()
382                } else if let Some(b) = obj.get("B").and_then(|v| v.as_str()) {
383                    // Base64-encoded binary
384                    (b.len() as i64 * 3) / 4
385                } else {
386                    v.to_string().len() as i64
387                }
388            }
389            _ => v.to_string().len() as i64,
390        }
391    }
392
393    /// Record a partition key access for contributor insights.
394    /// Only records if contributor insights is enabled.
395    pub fn record_key_access(&mut self, key: &HashMap<String, AttributeValue>) {
396        if self.contributor_insights_status != "ENABLED" {
397            return;
398        }
399        let hash_key = self.hash_key_name().to_string();
400        if let Some(pk_value) = key.get(&hash_key) {
401            let key_str = pk_value.to_string();
402            *self
403                .contributor_insights_counters
404                .entry(key_str)
405                .or_insert(0) += 1;
406        }
407    }
408
409    /// Record a partition key access from a full item (extracts the key first).
410    pub fn record_item_access(&mut self, item: &HashMap<String, AttributeValue>) {
411        if self.contributor_insights_status != "ENABLED" {
412            return;
413        }
414        let hash_key = self.hash_key_name().to_string();
415        if let Some(pk_value) = item.get(&hash_key) {
416            let key_str = pk_value.to_string();
417            *self
418                .contributor_insights_counters
419                .entry(key_str)
420                .or_insert(0) += 1;
421        }
422    }
423
424    /// Get top N contributors sorted by access count (descending).
425    pub fn top_contributors(&self, n: usize) -> Vec<(&str, u64)> {
426        let mut entries: Vec<(&str, u64)> = self
427            .contributor_insights_counters
428            .iter()
429            .map(|(k, &v)| (k.as_str(), v))
430            .collect();
431        entries.sort_by_key(|e| std::cmp::Reverse(e.1));
432        entries.truncate(n);
433        entries
434    }
435
436    /// Recalculate item_count and size_bytes from the items vec.
437    pub fn recalculate_stats(&mut self) {
438        self.item_count = self.items.len() as i64;
439        self.size_bytes = self.items.iter().map(Self::estimate_item_size).sum::<i64>();
440    }
441}
442
443#[derive(Debug, Clone, Serialize, Deserialize)]
444pub struct DynamoDbState {
445    pub account_id: String,
446    pub region: String,
447    pub tables: BTreeMap<String, DynamoTable>,
448    pub backups: BTreeMap<String, BackupDescription>,
449    pub global_tables: BTreeMap<String, GlobalTableDescription>,
450    pub exports: BTreeMap<String, ExportDescription>,
451    pub imports: BTreeMap<String, ImportDescription>,
452}
453
454/// On-disk snapshot envelope. The payload is the full [`DynamoDbState`];
455/// `schema_version` lets us evolve the format without accidentally loading
456/// an incompatible dump on upgrade.
457#[derive(Debug, Clone, Serialize, Deserialize)]
458pub struct DynamoDbSnapshot {
459    pub schema_version: u32,
460    /// v2+: multi-account state.
461    #[serde(default)]
462    pub accounts: Option<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>,
463    /// v1 compat: single-account state.
464    #[serde(default)]
465    pub state: Option<DynamoDbState>,
466}
467
468pub const DYNAMODB_SNAPSHOT_SCHEMA_VERSION: u32 = 2;
469
470impl DynamoDbState {
471    pub fn new(account_id: &str, region: &str) -> Self {
472        Self {
473            account_id: account_id.to_string(),
474            region: region.to_string(),
475            tables: BTreeMap::new(),
476            backups: BTreeMap::new(),
477            global_tables: BTreeMap::new(),
478            exports: BTreeMap::new(),
479            imports: BTreeMap::new(),
480        }
481    }
482
483    pub fn reset(&mut self) {
484        self.tables.clear();
485        self.backups.clear();
486        self.global_tables.clear();
487        self.exports.clear();
488        self.imports.clear();
489    }
490}
491
492impl fakecloud_core::multi_account::AccountState for DynamoDbState {
493    fn new_for_account(account_id: &str, region: &str, _endpoint: &str) -> Self {
494        Self::new(account_id, region)
495    }
496}
497
498pub type SharedDynamoDbState =
499    Arc<RwLock<fakecloud_core::multi_account::MultiAccountState<DynamoDbState>>>;
500
501#[cfg(test)]
502mod tests {
503    use super::*;
504    use serde_json::json;
505
506    #[test]
507    fn attribute_type_and_value_valid() {
508        let v = json!({"S": "hi"});
509        let (ty, val) = attribute_type_and_value(&v).unwrap();
510        assert_eq!(ty, "S");
511        assert_eq!(val, &json!("hi"));
512    }
513
514    #[test]
515    fn attribute_type_and_value_empty_returns_none() {
516        let v = json!({});
517        assert!(attribute_type_and_value(&v).is_none());
518    }
519
520    #[test]
521    fn attribute_type_and_value_multiple_entries_returns_none() {
522        let v = json!({"S": "hi", "N": "1"});
523        assert!(attribute_type_and_value(&v).is_none());
524    }
525
526    #[test]
527    fn attribute_type_and_value_non_object_returns_none() {
528        let v = json!("not-object");
529        assert!(attribute_type_and_value(&v).is_none());
530    }
531
532    #[test]
533    fn account_state_trait_impl() {
534        use fakecloud_core::multi_account::AccountState;
535        let state = DynamoDbState::new_for_account("123", "us-east-1", "");
536        assert_eq!(state.account_id, "123");
537        assert_eq!(state.region, "us-east-1");
538    }
539
540    #[test]
541    fn new_and_reset() {
542        let state = DynamoDbState::new("123", "us-east-1");
543        assert!(state.tables.is_empty());
544    }
545
546    fn table_with_hash_key(hash: &str) -> DynamoTable {
547        DynamoTable {
548            name: "t".to_string(),
549            arn: "arn:aws:dynamodb:us-east-1:123:table/t".to_string(),
550            table_id: "id".to_string(),
551            key_schema: vec![KeySchemaElement {
552                attribute_name: hash.to_string(),
553                key_type: "HASH".to_string(),
554            }],
555            attribute_definitions: vec![],
556            provisioned_throughput: ProvisionedThroughput {
557                read_capacity_units: 1,
558                write_capacity_units: 1,
559            },
560            items: Vec::new(),
561            gsi: Vec::new(),
562            lsi: Vec::new(),
563            tags: BTreeMap::new(),
564            created_at: Utc::now(),
565            status: "ACTIVE".to_string(),
566            item_count: 0,
567            size_bytes: 0,
568            billing_mode: "PROVISIONED".to_string(),
569            ttl_attribute: None,
570            ttl_enabled: false,
571            resource_policy: None,
572            pitr_enabled: false,
573            kinesis_destinations: Vec::new(),
574            contributor_insights_status: "DISABLED".to_string(),
575            contributor_insights_counters: BTreeMap::new(),
576            stream_enabled: false,
577            stream_view_type: None,
578            stream_arn: None,
579            stream_records: empty_stream_records(),
580            sse_type: None,
581            sse_kms_key_arn: None,
582            deletion_protection_enabled: false,
583            on_demand_throughput: None,
584            table_class: default_table_class(),
585        }
586    }
587
588    #[test]
589    fn hash_key_name_extracts_from_schema() {
590        let t = table_with_hash_key("pk");
591        assert_eq!(t.hash_key_name(), "pk");
592    }
593
594    #[test]
595    fn hash_key_name_empty_when_no_hash_schema() {
596        let mut t = table_with_hash_key("pk");
597        t.key_schema.clear();
598        assert_eq!(t.hash_key_name(), "");
599    }
600
601    #[test]
602    fn record_key_access_noop_when_disabled() {
603        let mut t = table_with_hash_key("pk");
604        let mut key = HashMap::new();
605        key.insert("pk".to_string(), json!({"S": "a"}));
606        t.record_key_access(&key);
607        assert!(t.contributor_insights_counters.is_empty());
608    }
609
610    #[test]
611    fn record_key_access_increments_when_enabled() {
612        let mut t = table_with_hash_key("pk");
613        t.contributor_insights_status = "ENABLED".to_string();
614        let mut key = HashMap::new();
615        key.insert("pk".to_string(), json!({"S": "a"}));
616        t.record_key_access(&key);
617        t.record_key_access(&key);
618        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 2);
619    }
620
621    #[test]
622    fn record_item_access_uses_hash_key_from_item() {
623        let mut t = table_with_hash_key("pk");
624        t.contributor_insights_status = "ENABLED".to_string();
625        let mut item = HashMap::new();
626        item.insert("pk".to_string(), json!({"S": "user-1"}));
627        item.insert("other".to_string(), json!({"N": "42"}));
628        t.record_item_access(&item);
629        assert_eq!(t.contributor_insights_counters.values().sum::<u64>(), 1);
630    }
631
632    #[test]
633    fn top_contributors_returns_sorted() {
634        let mut t = table_with_hash_key("pk");
635        t.contributor_insights_counters.insert("a".to_string(), 3);
636        t.contributor_insights_counters.insert("b".to_string(), 10);
637        t.contributor_insights_counters.insert("c".to_string(), 1);
638        let top = t.top_contributors(2);
639        assert_eq!(top.len(), 2);
640        assert_eq!(top[0], ("b", 10));
641        assert_eq!(top[1], ("a", 3));
642    }
643
644    #[test]
645    fn recalculate_stats_matches_items() {
646        let mut t = table_with_hash_key("pk");
647        let mut item1 = HashMap::new();
648        item1.insert("pk".to_string(), json!({"S": "hello"}));
649        let mut item2 = HashMap::new();
650        item2.insert("pk".to_string(), json!({"N": "42"}));
651        item2.insert("flag".to_string(), json!({"BOOL": true}));
652        t.items.push(item1);
653        t.items.push(item2);
654        t.recalculate_stats();
655        assert_eq!(t.item_count, 2);
656        assert!(t.size_bytes > 0);
657    }
658
659    #[test]
660    fn estimate_value_size_covers_all_types() {
661        let s = DynamoTable::estimate_value_size(&json!({"S": "abc"}));
662        assert_eq!(s, 3);
663        let n = DynamoTable::estimate_value_size(&json!({"N": "42"}));
664        assert_eq!(n, 2);
665        let b = DynamoTable::estimate_value_size(&json!({"BOOL": true}));
666        assert_eq!(b, 1);
667        let null = DynamoTable::estimate_value_size(&json!({"NULL": true}));
668        assert_eq!(null, 1);
669        let l = DynamoTable::estimate_value_size(&json!({"L": [{"S": "x"}, {"S": "yy"}]}));
670        assert_eq!(l, 6);
671        let m = DynamoTable::estimate_value_size(&json!({"M": {"key": {"S": "v"}}}));
672        assert_eq!(m, 7);
673        let ss = DynamoTable::estimate_value_size(&json!({"SS": ["ab", "cde"]}));
674        assert_eq!(ss, 5);
675        let ns = DynamoTable::estimate_value_size(&json!({"NS": ["12", "345"]}));
676        assert_eq!(ns, 5);
677        let bin = DynamoTable::estimate_value_size(&json!({"B": "AAAAAAAA"}));
678        assert_eq!(bin, 6);
679    }
680}