Skip to main content

awsim_qldb/
lib.rs

1//! Amazon QLDB emulator. Stores ledger metadata only — the journal/ION query
2//! data plane is not implemented.
3
4use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use async_trait::async_trait;
9use awsim_core::{
10    AccountRegionStore, AwsError, Protocol, RequestContext, RouteDefinition, ServiceHandler,
11    clamp_max_results_strict, paginate,
12};
13use dashmap::DashMap;
14use serde::{Deserialize, Serialize};
15use serde_json::{Value, json};
16use tracing::debug;
17
18#[derive(Debug, Default)]
19pub struct QldbState {
20    pub ledgers: DashMap<String, Ledger>,
21    /// JournalKinesisStream records keyed by `StreamId` (UUID).
22    pub kinesis_streams: DashMap<String, JournalKinesisStream>,
23    /// JournalS3Export records keyed by `ExportId` (UUID).
24    pub s3_exports: DashMap<String, JournalS3Export>,
25    /// Per-ARN tag store for `stream` and `export` resources. Ledger
26    /// tags continue to live on [`Ledger::tags`] for backwards
27    /// compatibility with the original tag map.
28    pub resource_tags: DashMap<String, HashMap<String, String>>,
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct JournalS3Export {
33    pub export_id: String,
34    pub ledger_name: String,
35    pub role_arn: String,
36    pub inclusive_start_time: f64,
37    pub exclusive_end_time: f64,
38    pub output_format: String,
39    pub bucket: String,
40    pub prefix: String,
41    pub object_encryption_type: String,
42    pub kms_key_arn: Option<String>,
43    pub status: String,
44    pub creation_time: f64,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct JournalKinesisStream {
49    pub stream_id: String,
50    pub ledger_name: String,
51    pub stream_name: String,
52    pub role_arn: String,
53    pub kinesis_stream_arn: String,
54    /// AWS accepts epoch seconds for the inclusive lower bound. The
55    /// emulator stores it verbatim and replays it on Describe.
56    pub inclusive_start_time: f64,
57    pub exclusive_end_time: Option<f64>,
58    pub aggregation_enabled: bool,
59    pub creation_time: f64,
60    pub status: String,
61    pub error_cause: Option<String>,
62    #[serde(default)]
63    pub tags: HashMap<String, String>,
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct Ledger {
68    pub name: String,
69    pub arn: String,
70    pub state: String,
71    pub creation_date_time: f64,
72    pub permissions_mode: String,
73    pub deletion_protection: bool,
74    pub kms_key_arn: Option<String>,
75    pub tags: HashMap<String, String>,
76    /// `EncryptionStatus` field of the documented
77    /// `EncryptionDescription` block. Persisted on the model so a
78    /// future tick driver can flip it to `KMS_KEY_INACCESSIBLE` /
79    /// `UPDATING` without rebuilding the structure on every read.
80    #[serde(default = "default_encryption_status")]
81    pub encryption_status: String,
82    /// Epoch seconds when the KMS key first became inaccessible.
83    /// `None` while the key is reachable; surfaced as JSON null on
84    /// the API response.
85    #[serde(default)]
86    pub inaccessible_kms_key_date_time: Option<f64>,
87}
88
89fn default_encryption_status() -> String {
90    "ENABLED".to_string()
91}
92
93/// AWS QLDB default quota: 5 ledgers per account per region. Above
94/// this, `CreateLedger` returns `LimitExceededException`. The
95/// emulator hardcodes the AWS default; an account-level config
96/// surface can override this later if needed.
97const LEDGER_QUOTA_PER_REGION: usize = 5;
98
99#[derive(Debug, Serialize, Deserialize)]
100pub struct QldbSnapshot {
101    pub ledgers: Vec<Ledger>,
102    #[serde(default)]
103    pub kinesis_streams: Vec<JournalKinesisStream>,
104    #[serde(default)]
105    pub s3_exports: Vec<JournalS3Export>,
106    #[serde(default)]
107    pub resource_tags: HashMap<String, HashMap<String, String>>,
108}
109
110impl QldbState {
111    pub fn to_snapshot(&self) -> QldbSnapshot {
112        QldbSnapshot {
113            ledgers: self.ledgers.iter().map(|e| e.value().clone()).collect(),
114            kinesis_streams: self
115                .kinesis_streams
116                .iter()
117                .map(|e| e.value().clone())
118                .collect(),
119            s3_exports: self.s3_exports.iter().map(|e| e.value().clone()).collect(),
120            resource_tags: self
121                .resource_tags
122                .iter()
123                .map(|e| (e.key().clone(), e.value().clone()))
124                .collect(),
125        }
126    }
127    pub fn restore_from_snapshot(&self, snap: QldbSnapshot) {
128        self.ledgers.clear();
129        for l in snap.ledgers {
130            self.ledgers.insert(l.name.clone(), l);
131        }
132        self.kinesis_streams.clear();
133        for s in snap.kinesis_streams {
134            self.kinesis_streams.insert(s.stream_id.clone(), s);
135        }
136        self.s3_exports.clear();
137        for ex in snap.s3_exports {
138            self.s3_exports.insert(ex.export_id.clone(), ex);
139        }
140        self.resource_tags.clear();
141        for (arn, tags) in snap.resource_tags {
142            self.resource_tags.insert(arn, tags);
143        }
144    }
145}
146
147fn now() -> f64 {
148    SystemTime::now()
149        .duration_since(UNIX_EPOCH)
150        .unwrap_or_default()
151        .as_secs_f64()
152}
153
154/// Constructs a QLDB `CapacityExceededException`. Reserved for future
155/// throttling enforcement; surfacing it now keeps the wire-level
156/// catalogue stable for clients that already retry on the code.
157pub fn capacity_exceeded(message: impl Into<String>) -> AwsError {
158    AwsError::too_many_requests("CapacityExceededException", message)
159}
160
161/// Constructs a QLDB `RateExceededException`. Paired with
162/// [`capacity_exceeded`] in the documented throttling catalogue.
163pub fn rate_exceeded(message: impl Into<String>) -> AwsError {
164    AwsError::too_many_requests("RateExceededException", message)
165}
166
167fn require_str<'a>(input: &'a Value, key: &str) -> Result<&'a str, AwsError> {
168    input
169        .get(key)
170        .and_then(|v| v.as_str())
171        .ok_or_else(|| AwsError::bad_request("BadRequestException", format!("{key} is required")))
172}
173
174fn ledger_arn(ctx: &RequestContext, name: &str) -> String {
175    format!(
176        "arn:aws:qldb:{}:{}:ledger/{}",
177        ctx.region, ctx.account_id, name
178    )
179}
180
181fn stream_arn(ctx: &RequestContext, ledger: &str, stream_id: &str) -> String {
182    format!(
183        "arn:aws:qldb:{}:{}:stream/{ledger}/{stream_id}",
184        ctx.region, ctx.account_id,
185    )
186}
187
188fn stream_to_value(s: &JournalKinesisStream, ctx: &RequestContext) -> Value {
189    let exclusive = match s.exclusive_end_time {
190        Some(t) => json!(t),
191        None => Value::Null,
192    };
193    json!({
194        "LedgerName": s.ledger_name,
195        "CreationTime": s.creation_time,
196        "InclusiveStartTime": s.inclusive_start_time,
197        "ExclusiveEndTime": exclusive,
198        "RoleArn": s.role_arn,
199        "StreamId": s.stream_id,
200        "Arn": stream_arn(ctx, &s.ledger_name, &s.stream_id),
201        "Status": s.status,
202        "KinesisConfiguration": {
203            "StreamArn": s.kinesis_stream_arn,
204            "AggregationEnabled": s.aggregation_enabled,
205        },
206        "ErrorCause": s.error_cause,
207        "StreamName": s.stream_name,
208    })
209}
210
211/// Identifies a QLDB resource referenced by ARN. Only the kind and
212/// the trailing identifier are needed to dispatch tag operations.
213enum ResourceRef {
214    Ledger(String),
215    Stream(String),
216    Export(String),
217}
218
219fn parse_resource_arn(arn: &str) -> Result<ResourceRef, AwsError> {
220    let resource = arn.splitn(6, ':').nth(5).ok_or_else(|| {
221        AwsError::bad_request(
222            "BadRequestException",
223            format!("ResourceArn `{arn}` is malformed."),
224        )
225    })?;
226    let (kind, tail) = resource.split_once('/').ok_or_else(|| {
227        AwsError::bad_request(
228            "BadRequestException",
229            format!("ResourceArn `{arn}` is malformed."),
230        )
231    })?;
232    match kind {
233        "ledger" => Ok(ResourceRef::Ledger(tail.to_string())),
234        "stream" => {
235            let stream_id = tail.rsplit('/').next().unwrap_or(tail);
236            Ok(ResourceRef::Stream(stream_id.to_string()))
237        }
238        "export" => {
239            let export_id = tail.rsplit('/').next().unwrap_or(tail);
240            Ok(ResourceRef::Export(export_id.to_string()))
241        }
242        _ => Err(AwsError::bad_request(
243            "BadRequestException",
244            format!("Resource kind `{kind}` is not a QLDB resource type."),
245        )),
246    }
247}
248
249fn apply_resource_tags(
250    state: &QldbState,
251    arn: &str,
252    new_tags: HashMap<String, String>,
253) -> Result<(), AwsError> {
254    match parse_resource_arn(arn)? {
255        ResourceRef::Ledger(name) => {
256            let mut l = state.ledgers.get_mut(&name).ok_or_else(|| {
257                AwsError::not_found(
258                    "ResourceNotFoundException",
259                    format!("Ledger {name} not found"),
260                )
261            })?;
262            for (k, v) in new_tags {
263                l.tags.insert(k, v);
264            }
265        }
266        ResourceRef::Stream(id) => {
267            if !state.kinesis_streams.contains_key(&id) {
268                return Err(AwsError::not_found(
269                    "ResourceNotFoundException",
270                    format!("Stream {id} not found"),
271                ));
272            }
273            let mut entry = state.resource_tags.entry(arn.to_string()).or_default();
274            for (k, v) in new_tags {
275                entry.insert(k, v);
276            }
277        }
278        ResourceRef::Export(id) => {
279            if !state.s3_exports.contains_key(&id) {
280                return Err(AwsError::not_found(
281                    "ResourceNotFoundException",
282                    format!("Export {id} not found"),
283                ));
284            }
285            let mut entry = state.resource_tags.entry(arn.to_string()).or_default();
286            for (k, v) in new_tags {
287                entry.insert(k, v);
288            }
289        }
290    }
291    Ok(())
292}
293
294fn remove_resource_tags(state: &QldbState, arn: &str, keys: &[String]) -> Result<(), AwsError> {
295    match parse_resource_arn(arn)? {
296        ResourceRef::Ledger(name) => {
297            let mut l = state.ledgers.get_mut(&name).ok_or_else(|| {
298                AwsError::not_found(
299                    "ResourceNotFoundException",
300                    format!("Ledger {name} not found"),
301                )
302            })?;
303            for k in keys {
304                l.tags.remove(k);
305            }
306        }
307        ResourceRef::Stream(id) => {
308            if !state.kinesis_streams.contains_key(&id) {
309                return Err(AwsError::not_found(
310                    "ResourceNotFoundException",
311                    format!("Stream {id} not found"),
312                ));
313            }
314            if let Some(mut entry) = state.resource_tags.get_mut(arn) {
315                for k in keys {
316                    entry.remove(k);
317                }
318            }
319        }
320        ResourceRef::Export(id) => {
321            if !state.s3_exports.contains_key(&id) {
322                return Err(AwsError::not_found(
323                    "ResourceNotFoundException",
324                    format!("Export {id} not found"),
325                ));
326            }
327            if let Some(mut entry) = state.resource_tags.get_mut(arn) {
328                for k in keys {
329                    entry.remove(k);
330                }
331            }
332        }
333    }
334    Ok(())
335}
336
337fn read_resource_tags(state: &QldbState, arn: &str) -> Result<HashMap<String, String>, AwsError> {
338    match parse_resource_arn(arn)? {
339        ResourceRef::Ledger(name) => {
340            state
341                .ledgers
342                .get(&name)
343                .map(|l| l.tags.clone())
344                .ok_or_else(|| {
345                    AwsError::not_found(
346                        "ResourceNotFoundException",
347                        format!("Ledger {name} not found"),
348                    )
349                })
350        }
351        ResourceRef::Stream(id) => {
352            if !state.kinesis_streams.contains_key(&id) {
353                return Err(AwsError::not_found(
354                    "ResourceNotFoundException",
355                    format!("Stream {id} not found"),
356                ));
357            }
358            Ok(state
359                .resource_tags
360                .get(arn)
361                .map(|e| e.value().clone())
362                .unwrap_or_default())
363        }
364        ResourceRef::Export(id) => {
365            if !state.s3_exports.contains_key(&id) {
366                return Err(AwsError::not_found(
367                    "ResourceNotFoundException",
368                    format!("Export {id} not found"),
369                ));
370            }
371            Ok(state
372                .resource_tags
373                .get(arn)
374                .map(|e| e.value().clone())
375                .unwrap_or_default())
376        }
377    }
378}
379
380fn export_to_value(ex: &JournalS3Export) -> Value {
381    let mut encryption = json!({
382        "ObjectEncryptionType": ex.object_encryption_type,
383    });
384    if let Some(arn) = &ex.kms_key_arn {
385        encryption["KmsKeyArn"] = json!(arn);
386    }
387    json!({
388        "LedgerName": ex.ledger_name,
389        "ExportId": ex.export_id,
390        "ExportCreationTime": ex.creation_time,
391        "Status": ex.status,
392        "InclusiveStartTime": ex.inclusive_start_time,
393        "ExclusiveEndTime": ex.exclusive_end_time,
394        "S3ExportConfiguration": {
395            "Bucket": ex.bucket,
396            "Prefix": ex.prefix,
397            "EncryptionConfiguration": encryption,
398        },
399        "RoleArn": ex.role_arn,
400        "OutputFormat": ex.output_format,
401    })
402}
403
404fn ledger_to_value(l: &Ledger) -> Value {
405    // `EncryptionDescription` documents three fields. The emulator
406    // never simulates KMS key inaccessibility on its own, but the
407    // status and inaccessible-date are persisted on the model so a
408    // future tick driver can mutate them without changing the wire
409    // shape.
410    let inaccessible = match l.inaccessible_kms_key_date_time {
411        Some(t) => json!(t),
412        None => Value::Null,
413    };
414    json!({
415        "Name": l.name,
416        "Arn": l.arn,
417        "State": l.state,
418        "CreationDateTime": l.creation_date_time,
419        "PermissionsMode": l.permissions_mode,
420        "DeletionProtection": l.deletion_protection,
421        "KmsKeyArn": l.kms_key_arn,
422        "EncryptionDescription": {
423            "KmsKeyArn": l.kms_key_arn,
424            "EncryptionStatus": l.encryption_status,
425            "InaccessibleKmsKeyDateTime": inaccessible,
426        },
427    })
428}
429
430pub struct QldbService {
431    store: AccountRegionStore<QldbState>,
432}
433
434impl QldbService {
435    pub fn new() -> Self {
436        Self {
437            store: AccountRegionStore::new(),
438        }
439    }
440
441    pub fn store(&self) -> AccountRegionStore<QldbState> {
442        self.store.clone()
443    }
444
445    fn get_state(&self, ctx: &RequestContext) -> Arc<QldbState> {
446        self.store.get(&ctx.account_id, &ctx.region)
447    }
448}
449
450impl Default for QldbService {
451    fn default() -> Self {
452        Self::new()
453    }
454}
455
456#[async_trait]
457impl ServiceHandler for QldbService {
458    fn service_name(&self) -> &str {
459        "qldb"
460    }
461
462    fn signing_name(&self) -> &str {
463        "qldb"
464    }
465
466    fn protocol(&self) -> Protocol {
467        Protocol::RestJson1
468    }
469
470    fn routes(&self) -> Vec<RouteDefinition> {
471        vec![
472            RouteDefinition {
473                method: "POST",
474                path_pattern: "/ledgers",
475                operation: "CreateLedger",
476                required_query_param: None,
477            },
478            RouteDefinition {
479                method: "GET",
480                path_pattern: "/ledgers",
481                operation: "ListLedgers",
482                required_query_param: None,
483            },
484            RouteDefinition {
485                method: "GET",
486                path_pattern: "/ledgers/{name}",
487                operation: "DescribeLedger",
488                required_query_param: None,
489            },
490            RouteDefinition {
491                method: "PATCH",
492                path_pattern: "/ledgers/{name}",
493                operation: "UpdateLedger",
494                required_query_param: None,
495            },
496            RouteDefinition {
497                method: "DELETE",
498                path_pattern: "/ledgers/{name}",
499                operation: "DeleteLedger",
500                required_query_param: None,
501            },
502            RouteDefinition {
503                method: "PATCH",
504                path_pattern: "/ledgers/{name}/permissions-mode",
505                operation: "UpdateLedgerPermissionsMode",
506                required_query_param: None,
507            },
508            RouteDefinition {
509                method: "POST",
510                path_pattern: "/tags/{resourceArn}",
511                operation: "TagResource",
512                required_query_param: None,
513            },
514            RouteDefinition {
515                method: "DELETE",
516                path_pattern: "/tags/{resourceArn}",
517                operation: "UntagResource",
518                required_query_param: None,
519            },
520            RouteDefinition {
521                method: "GET",
522                path_pattern: "/tags/{resourceArn}",
523                operation: "ListTagsForResource",
524                required_query_param: None,
525            },
526        ]
527    }
528
529    async fn handle(
530        &self,
531        operation: &str,
532        input: Value,
533        ctx: &RequestContext,
534    ) -> Result<Value, AwsError> {
535        debug!(operation, "QLDB request");
536        let state = self.get_state(ctx);
537        match operation {
538            "CreateLedger" => {
539                let name = require_str(&input, "Name")?.to_string();
540                if state.ledgers.contains_key(&name) {
541                    return Err(AwsError::conflict(
542                        "ResourceAlreadyExistsException",
543                        format!("Ledger {name} already exists"),
544                    ));
545                }
546                if state.ledgers.len() >= LEDGER_QUOTA_PER_REGION {
547                    return Err(AwsError::bad_request(
548                        "LimitExceededException",
549                        format!(
550                            "Account already has {LEDGER_QUOTA_PER_REGION} ledgers in this region.",
551                        ),
552                    ));
553                }
554                let tags: HashMap<String, String> = input
555                    .get("Tags")
556                    .and_then(|v| v.as_object())
557                    .map(|o| {
558                        o.iter()
559                            .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
560                            .collect()
561                    })
562                    .unwrap_or_default();
563                let permissions_mode = require_str(&input, "PermissionsMode")?.to_string();
564                if !matches!(permissions_mode.as_str(), "ALLOW_ALL" | "STANDARD") {
565                    return Err(AwsError::bad_request(
566                        "ValidationException",
567                        format!(
568                            "PermissionsMode `{permissions_mode}` must be ALLOW_ALL or STANDARD.",
569                        ),
570                    ));
571                }
572                let l = Ledger {
573                    name: name.clone(),
574                    arn: ledger_arn(ctx, &name),
575                    // AWS reports CREATING on the initial CreateLedger
576                    // response; the ledger settles to ACTIVE on the
577                    // next DescribeLedger / ListLedgers call.
578                    state: "CREATING".to_string(),
579                    creation_date_time: now(),
580                    permissions_mode,
581                    deletion_protection: input
582                        .get("DeletionProtection")
583                        .and_then(|v| v.as_bool())
584                        .unwrap_or(true),
585                    kms_key_arn: input
586                        .get("KmsKey")
587                        .and_then(|v| v.as_str())
588                        .map(String::from),
589                    tags,
590                    encryption_status: default_encryption_status(),
591                    inaccessible_kms_key_date_time: None,
592                };
593                let result = ledger_to_value(&l);
594                state.ledgers.insert(name, l);
595                Ok(result)
596            }
597            "DescribeLedger" => {
598                let name = require_str(&input, "name").or_else(|_| require_str(&input, "Name"))?;
599                // Settle any CREATING ledger to ACTIVE on first describe;
600                // mirrors the lifecycle a real QLDB control plane walks
601                // before the ledger is ready to accept transactions.
602                if let Some(mut l) = state.ledgers.get_mut(name)
603                    && l.state == "CREATING"
604                {
605                    l.state = "ACTIVE".to_string();
606                }
607                let l = state.ledgers.get(name).ok_or_else(|| {
608                    AwsError::not_found(
609                        "ResourceNotFoundException",
610                        format!("Ledger {name} not found"),
611                    )
612                })?;
613                Ok(ledger_to_value(&l))
614            }
615            "ListLedgers" => {
616                // AWS QLDB ListLedgers caps MaxResults at 100 and uses
617                // the ledger name as the NextToken cursor.
618                for mut e in state.ledgers.iter_mut() {
619                    if e.value().state == "CREATING" {
620                        e.value_mut().state = "ACTIVE".to_string();
621                    }
622                }
623                let max_results = clamp_max_results_strict(
624                    input.get("MaxResults").and_then(Value::as_i64),
625                    100,
626                    100,
627                )?;
628                let starting_token = input.get("NextToken").and_then(Value::as_str);
629                let mut summaries: Vec<(String, Value)> = state
630                    .ledgers
631                    .iter()
632                    .map(|e| {
633                        let l = e.value();
634                        (
635                            l.name.clone(),
636                            json!({
637                                "Name": l.name,
638                                "State": l.state,
639                                "CreationDateTime": l.creation_date_time,
640                            }),
641                        )
642                    })
643                    .collect();
644                summaries.sort_by(|a, b| a.0.cmp(&b.0));
645                let page = paginate(summaries, max_results, starting_token, |(k, _)| k.clone())?;
646                let mut body = json!({
647                    "Ledgers": page.items.into_iter().map(|(_, v)| v).collect::<Vec<_>>(),
648                });
649                if let Some(token) = page.next_token {
650                    body["NextToken"] = json!(token);
651                }
652                Ok(body)
653            }
654            "UpdateLedger" => {
655                let name = require_str(&input, "name").or_else(|_| require_str(&input, "Name"))?;
656                let mut l = state.ledgers.get_mut(name).ok_or_else(|| {
657                    AwsError::not_found(
658                        "ResourceNotFoundException",
659                        format!("Ledger {name} not found"),
660                    )
661                })?;
662                if let Some(d) = input.get("DeletionProtection").and_then(|v| v.as_bool()) {
663                    l.deletion_protection = d;
664                }
665                if let Some(k) = input.get("KmsKey").and_then(|v| v.as_str()) {
666                    l.kms_key_arn = Some(k.to_string());
667                }
668                Ok(ledger_to_value(&l))
669            }
670            "UpdateLedgerPermissionsMode" => {
671                let name = require_str(&input, "name").or_else(|_| require_str(&input, "Name"))?;
672                let mode = require_str(&input, "PermissionsMode")?.to_string();
673                if !matches!(mode.as_str(), "ALLOW_ALL" | "STANDARD") {
674                    return Err(AwsError::bad_request(
675                        "ValidationException",
676                        format!("PermissionsMode `{mode}` must be ALLOW_ALL or STANDARD."),
677                    ));
678                }
679                let mut l = state.ledgers.get_mut(name).ok_or_else(|| {
680                    AwsError::not_found(
681                        "ResourceNotFoundException",
682                        format!("Ledger {name} not found"),
683                    )
684                })?;
685                l.permissions_mode = mode;
686                Ok(json!({
687                    "Name": l.name,
688                    "Arn": l.arn,
689                    "PermissionsMode": l.permissions_mode,
690                }))
691            }
692            "ExportJournalToS3" => {
693                let ledger_name = require_str(&input, "name")
694                    .or_else(|_| require_str(&input, "LedgerName"))?
695                    .to_string();
696                if !state.ledgers.contains_key(&ledger_name) {
697                    return Err(AwsError::not_found(
698                        "ResourceNotFoundException",
699                        format!("Ledger {ledger_name} not found"),
700                    ));
701                }
702                let inclusive_start_time = input
703                    .get("InclusiveStartTime")
704                    .and_then(Value::as_f64)
705                    .ok_or_else(|| {
706                        AwsError::bad_request(
707                            "ValidationException",
708                            "InclusiveStartTime is required and must be a number",
709                        )
710                    })?;
711                let exclusive_end_time = input
712                    .get("ExclusiveEndTime")
713                    .and_then(Value::as_f64)
714                    .ok_or_else(|| {
715                        AwsError::bad_request(
716                            "ValidationException",
717                            "ExclusiveEndTime is required and must be a number",
718                        )
719                    })?;
720                if exclusive_end_time <= inclusive_start_time {
721                    return Err(AwsError::bad_request(
722                        "ValidationException",
723                        "ExclusiveEndTime must be strictly after InclusiveStartTime.",
724                    ));
725                }
726                let role_arn = require_str(&input, "RoleArn")?.to_string();
727                let output_format = input
728                    .get("OutputFormat")
729                    .and_then(Value::as_str)
730                    .unwrap_or("ION_BINARY")
731                    .to_string();
732                if !matches!(output_format.as_str(), "ION_BINARY" | "ION_TEXT" | "JSON") {
733                    return Err(AwsError::bad_request(
734                        "ValidationException",
735                        format!(
736                            "OutputFormat `{output_format}` must be ION_BINARY, ION_TEXT, or JSON.",
737                        ),
738                    ));
739                }
740                let s3_cfg = input.get("S3ExportConfiguration").ok_or_else(|| {
741                    AwsError::bad_request(
742                        "ValidationException",
743                        "S3ExportConfiguration is required",
744                    )
745                })?;
746                let bucket = s3_cfg
747                    .get("Bucket")
748                    .and_then(Value::as_str)
749                    .ok_or_else(|| {
750                        AwsError::bad_request(
751                            "ValidationException",
752                            "S3ExportConfiguration.Bucket is required",
753                        )
754                    })?
755                    .to_string();
756                let prefix = s3_cfg
757                    .get("Prefix")
758                    .and_then(Value::as_str)
759                    .unwrap_or("")
760                    .to_string();
761                let encryption = s3_cfg.get("EncryptionConfiguration").ok_or_else(|| {
762                    AwsError::bad_request(
763                        "ValidationException",
764                        "S3ExportConfiguration.EncryptionConfiguration is required",
765                    )
766                })?;
767                let object_encryption_type = encryption
768                    .get("ObjectEncryptionType")
769                    .and_then(Value::as_str)
770                    .ok_or_else(|| {
771                        AwsError::bad_request(
772                            "ValidationException",
773                            "EncryptionConfiguration.ObjectEncryptionType is required",
774                        )
775                    })?
776                    .to_string();
777                if !matches!(
778                    object_encryption_type.as_str(),
779                    "SSE_KMS" | "SSE_S3" | "NO_ENCRYPTION"
780                ) {
781                    return Err(AwsError::bad_request(
782                        "ValidationException",
783                        format!(
784                            "ObjectEncryptionType `{object_encryption_type}` must be SSE_KMS, SSE_S3, or NO_ENCRYPTION."
785                        ),
786                    ));
787                }
788                let kms_key_arn = encryption
789                    .get("KmsKeyArn")
790                    .and_then(Value::as_str)
791                    .map(String::from);
792                if object_encryption_type == "SSE_KMS" && kms_key_arn.is_none() {
793                    return Err(AwsError::bad_request(
794                        "ValidationException",
795                        "EncryptionConfiguration.KmsKeyArn is required when ObjectEncryptionType=SSE_KMS.",
796                    ));
797                }
798                let export_id = uuid::Uuid::new_v4().to_string();
799                let ex = JournalS3Export {
800                    export_id: export_id.clone(),
801                    ledger_name,
802                    role_arn,
803                    inclusive_start_time,
804                    exclusive_end_time,
805                    output_format,
806                    bucket,
807                    prefix,
808                    object_encryption_type,
809                    kms_key_arn,
810                    status: "IN_PROGRESS".to_string(),
811                    creation_time: now(),
812                };
813                state.s3_exports.insert(export_id.clone(), ex);
814                Ok(json!({ "ExportId": export_id }))
815            }
816            "DescribeJournalS3Export" => {
817                let _ledger =
818                    require_str(&input, "name").or_else(|_| require_str(&input, "LedgerName"))?;
819                let export_id =
820                    require_str(&input, "exportId").or_else(|_| require_str(&input, "ExportId"))?;
821                let ex = state.s3_exports.get(export_id).ok_or_else(|| {
822                    AwsError::not_found(
823                        "ResourceNotFoundException",
824                        format!("Export {export_id} not found"),
825                    )
826                })?;
827                Ok(json!({ "ExportDescription": export_to_value(&ex) }))
828            }
829            "ListJournalS3Exports" => {
830                let max_results = clamp_max_results_strict(
831                    input.get("MaxResults").and_then(Value::as_i64),
832                    100,
833                    100,
834                )?;
835                let starting_token = input.get("NextToken").and_then(Value::as_str);
836                let mut exports: Vec<(String, Value)> = state
837                    .s3_exports
838                    .iter()
839                    .map(|e| (e.value().export_id.clone(), export_to_value(e.value())))
840                    .collect();
841                exports.sort_by(|a, b| a.0.cmp(&b.0));
842                let page = paginate(exports, max_results, starting_token, |(k, _)| k.clone())?;
843                let mut body = json!({
844                    "JournalS3Exports": page.items.into_iter().map(|(_, v)| v).collect::<Vec<_>>(),
845                });
846                if let Some(token) = page.next_token {
847                    body["NextToken"] = json!(token);
848                }
849                Ok(body)
850            }
851            "ListJournalS3ExportsForLedger" => {
852                let ledger_name =
853                    require_str(&input, "name").or_else(|_| require_str(&input, "LedgerName"))?;
854                if !state.ledgers.contains_key(ledger_name) {
855                    return Err(AwsError::not_found(
856                        "ResourceNotFoundException",
857                        format!("Ledger {ledger_name} not found"),
858                    ));
859                }
860                let max_results = clamp_max_results_strict(
861                    input.get("MaxResults").and_then(Value::as_i64),
862                    100,
863                    100,
864                )?;
865                let starting_token = input.get("NextToken").and_then(Value::as_str);
866                let mut exports: Vec<(String, Value)> = state
867                    .s3_exports
868                    .iter()
869                    .filter(|e| e.value().ledger_name == ledger_name)
870                    .map(|e| (e.value().export_id.clone(), export_to_value(e.value())))
871                    .collect();
872                exports.sort_by(|a, b| a.0.cmp(&b.0));
873                let page = paginate(exports, max_results, starting_token, |(k, _)| k.clone())?;
874                let mut body = json!({
875                    "JournalS3Exports": page.items.into_iter().map(|(_, v)| v).collect::<Vec<_>>(),
876                });
877                if let Some(token) = page.next_token {
878                    body["NextToken"] = json!(token);
879                }
880                Ok(body)
881            }
882            "CancelJournalS3Export" => {
883                let _ledger =
884                    require_str(&input, "name").or_else(|_| require_str(&input, "LedgerName"))?;
885                let export_id =
886                    require_str(&input, "exportId").or_else(|_| require_str(&input, "ExportId"))?;
887                let mut ex = state.s3_exports.get_mut(export_id).ok_or_else(|| {
888                    AwsError::not_found(
889                        "ResourceNotFoundException",
890                        format!("Export {export_id} not found"),
891                    )
892                })?;
893                match ex.status.as_str() {
894                    "CANCELLED" => {}
895                    "COMPLETED" | "FAILED" => {
896                        return Err(AwsError::precondition_failed(
897                            "ResourcePreconditionNotMetException",
898                            format!(
899                                "Export {export_id} is in terminal state `{}` and cannot be canceled.",
900                                ex.status,
901                            ),
902                        ));
903                    }
904                    _ => {
905                        ex.status = "CANCELLED".to_string();
906                    }
907                }
908                Ok(json!({}))
909            }
910            "StreamJournalToKinesis" => {
911                let ledger_name = require_str(&input, "name")
912                    .or_else(|_| require_str(&input, "LedgerName"))?
913                    .to_string();
914                if !state.ledgers.contains_key(&ledger_name) {
915                    return Err(AwsError::not_found(
916                        "ResourceNotFoundException",
917                        format!("Ledger {ledger_name} not found"),
918                    ));
919                }
920                let stream_name = require_str(&input, "StreamName")?.to_string();
921                let role_arn = require_str(&input, "RoleArn")?.to_string();
922                let inclusive_start_time = input
923                    .get("InclusiveStartTime")
924                    .and_then(Value::as_f64)
925                    .ok_or_else(|| {
926                        AwsError::bad_request(
927                            "ValidationException",
928                            "InclusiveStartTime is required and must be a number",
929                        )
930                    })?;
931                let exclusive_end_time = input.get("ExclusiveEndTime").and_then(Value::as_f64);
932                let kinesis = input.get("KinesisConfiguration").ok_or_else(|| {
933                    AwsError::bad_request("ValidationException", "KinesisConfiguration is required")
934                })?;
935                let kinesis_stream_arn = kinesis
936                    .get("StreamArn")
937                    .and_then(Value::as_str)
938                    .ok_or_else(|| {
939                        AwsError::bad_request(
940                            "ValidationException",
941                            "KinesisConfiguration.StreamArn is required",
942                        )
943                    })?
944                    .to_string();
945                if !kinesis_stream_arn.starts_with("arn:") {
946                    return Err(AwsError::bad_request(
947                        "ValidationException",
948                        format!(
949                            "KinesisConfiguration.StreamArn `{kinesis_stream_arn}` is not a valid ARN.",
950                        ),
951                    ));
952                }
953                let aggregation_enabled = kinesis
954                    .get("AggregationEnabled")
955                    .and_then(Value::as_bool)
956                    .unwrap_or(false);
957                let tags: HashMap<String, String> = input
958                    .get("Tags")
959                    .and_then(|v| v.as_object())
960                    .map(|o| {
961                        o.iter()
962                            .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
963                            .collect()
964                    })
965                    .unwrap_or_default();
966                let stream_id = uuid::Uuid::new_v4().to_string();
967                let stream = JournalKinesisStream {
968                    stream_id: stream_id.clone(),
969                    ledger_name: ledger_name.clone(),
970                    stream_name,
971                    role_arn,
972                    kinesis_stream_arn,
973                    inclusive_start_time,
974                    exclusive_end_time,
975                    aggregation_enabled,
976                    creation_time: now(),
977                    status: "ACTIVE".to_string(),
978                    error_cause: None,
979                    tags,
980                };
981                state.kinesis_streams.insert(stream_id.clone(), stream);
982                Ok(json!({ "StreamId": stream_id }))
983            }
984            "ListJournalKinesisStreamsForLedger" => {
985                let ledger_name =
986                    require_str(&input, "name").or_else(|_| require_str(&input, "LedgerName"))?;
987                if !state.ledgers.contains_key(ledger_name) {
988                    return Err(AwsError::not_found(
989                        "ResourceNotFoundException",
990                        format!("Ledger {ledger_name} not found"),
991                    ));
992                }
993                let max_results = clamp_max_results_strict(
994                    input.get("MaxResults").and_then(Value::as_i64),
995                    100,
996                    100,
997                )?;
998                let starting_token = input.get("NextToken").and_then(Value::as_str);
999                let mut streams: Vec<(String, Value)> = state
1000                    .kinesis_streams
1001                    .iter()
1002                    .filter(|e| e.value().ledger_name == ledger_name)
1003                    .map(|e| (e.value().stream_id.clone(), stream_to_value(e.value(), ctx)))
1004                    .collect();
1005                streams.sort_by(|a, b| a.0.cmp(&b.0));
1006                let page = paginate(streams, max_results, starting_token, |(k, _)| k.clone())?;
1007                let mut body = json!({
1008                    "Streams": page.items.into_iter().map(|(_, v)| v).collect::<Vec<_>>(),
1009                });
1010                if let Some(token) = page.next_token {
1011                    body["NextToken"] = json!(token);
1012                }
1013                Ok(body)
1014            }
1015            "CancelJournalKinesisStream" => {
1016                let _ledger =
1017                    require_str(&input, "name").or_else(|_| require_str(&input, "LedgerName"))?;
1018                let stream_id =
1019                    require_str(&input, "streamId").or_else(|_| require_str(&input, "StreamId"))?;
1020                let mut s = state.kinesis_streams.get_mut(stream_id).ok_or_else(|| {
1021                    AwsError::not_found(
1022                        "ResourceNotFoundException",
1023                        format!("Stream {stream_id} not found"),
1024                    )
1025                })?;
1026                // AWS rejects cancels against terminal states with
1027                // ResourcePreconditionNotMetException (HTTP 412). The
1028                // CANCELED case is idempotent so callers can retry.
1029                match s.status.as_str() {
1030                    "CANCELED" => {}
1031                    "COMPLETED" | "FAILED" => {
1032                        return Err(AwsError::precondition_failed(
1033                            "ResourcePreconditionNotMetException",
1034                            format!(
1035                                "Stream {stream_id} is in terminal state `{}` and cannot be canceled.",
1036                                s.status,
1037                            ),
1038                        ));
1039                    }
1040                    _ => {
1041                        s.status = "CANCELED".to_string();
1042                    }
1043                }
1044                Ok(json!({ "StreamId": stream_id }))
1045            }
1046            "DescribeJournalKinesisStream" => {
1047                let _ledger =
1048                    require_str(&input, "name").or_else(|_| require_str(&input, "LedgerName"))?;
1049                let stream_id =
1050                    require_str(&input, "streamId").or_else(|_| require_str(&input, "StreamId"))?;
1051                let s = state.kinesis_streams.get(stream_id).ok_or_else(|| {
1052                    AwsError::not_found(
1053                        "ResourceNotFoundException",
1054                        format!("Stream {stream_id} not found"),
1055                    )
1056                })?;
1057                Ok(json!({ "Stream": stream_to_value(&s, ctx) }))
1058            }
1059            "DeleteLedger" => {
1060                let name = require_str(&input, "name").or_else(|_| require_str(&input, "Name"))?;
1061                let l = state.ledgers.get(name).ok_or_else(|| {
1062                    AwsError::not_found(
1063                        "ResourceNotFoundException",
1064                        format!("Ledger {name} not found"),
1065                    )
1066                })?;
1067                if l.deletion_protection {
1068                    return Err(AwsError::precondition_failed(
1069                        "ResourcePreconditionNotMetException",
1070                        "Disable DeletionProtection before deleting the ledger",
1071                    ));
1072                }
1073                drop(l);
1074                state.ledgers.remove(name);
1075                Ok(json!({}))
1076            }
1077            "TagResource" => {
1078                let arn = input
1079                    .get("resourceArn")
1080                    .or_else(|| input.get("ResourceArn"))
1081                    .and_then(|v| v.as_str())
1082                    .unwrap_or("");
1083                let new_tags: HashMap<String, String> = input
1084                    .get("Tags")
1085                    .or_else(|| input.get("tags"))
1086                    .and_then(|v| v.as_object())
1087                    .map(|o| {
1088                        o.iter()
1089                            .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
1090                            .collect()
1091                    })
1092                    .unwrap_or_default();
1093                apply_resource_tags(&state, arn, new_tags)?;
1094                Ok(json!({}))
1095            }
1096            "UntagResource" => {
1097                let arn = input
1098                    .get("resourceArn")
1099                    .or_else(|| input.get("ResourceArn"))
1100                    .and_then(|v| v.as_str())
1101                    .unwrap_or("");
1102                let keys: Vec<String> = input
1103                    .get("TagKeys")
1104                    .or_else(|| input.get("tagKeys"))
1105                    .and_then(|v| v.as_array())
1106                    .map(|a| {
1107                        a.iter()
1108                            .filter_map(|x| x.as_str().map(String::from))
1109                            .collect()
1110                    })
1111                    .unwrap_or_default();
1112                remove_resource_tags(&state, arn, &keys)?;
1113                Ok(json!({}))
1114            }
1115            "ListTagsForResource" => {
1116                let arn = input
1117                    .get("resourceArn")
1118                    .or_else(|| input.get("ResourceArn"))
1119                    .and_then(|v| v.as_str())
1120                    .unwrap_or("");
1121                let tags = read_resource_tags(&state, arn)?;
1122                Ok(json!({ "Tags": tags }))
1123            }
1124            _ => Err(AwsError::unknown_operation(operation)),
1125        }
1126    }
1127
1128    fn snapshot(&self) -> Option<Vec<u8>> {
1129        let mut all = QldbSnapshot {
1130            ledgers: vec![],
1131            kinesis_streams: vec![],
1132            s3_exports: vec![],
1133            resource_tags: Default::default(),
1134        };
1135        for (_, st) in self.store.iter_all() {
1136            let s = st.to_snapshot();
1137            all.ledgers.extend(s.ledgers);
1138            all.kinesis_streams.extend(s.kinesis_streams);
1139            all.s3_exports.extend(s.s3_exports);
1140            all.resource_tags.extend(s.resource_tags);
1141        }
1142        serde_json::to_vec(&all).ok()
1143    }
1144
1145    fn restore(&self, data: &[u8]) -> Result<(), String> {
1146        let snap: QldbSnapshot = serde_json::from_slice(data).map_err(|e| e.to_string())?;
1147        let st = self.store.get("000000000000", "us-east-1");
1148        st.restore_from_snapshot(snap);
1149        Ok(())
1150    }
1151}
1152
1153#[cfg(test)]
1154mod tests {
1155    use super::*;
1156
1157    fn ctx() -> RequestContext {
1158        RequestContext::new("qldb", "us-east-1")
1159    }
1160
1161    fn block_on<F: std::future::Future>(f: F) -> F::Output {
1162        use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
1163        fn noop_clone(_: *const ()) -> RawWaker {
1164            noop_raw_waker()
1165        }
1166        fn noop(_: *const ()) {}
1167        fn noop_raw_waker() -> RawWaker {
1168            static VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);
1169            RawWaker::new(std::ptr::null(), &VTABLE)
1170        }
1171        let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
1172        let mut cx = Context::from_waker(&waker);
1173        let mut fut = std::pin::pin!(f);
1174        loop {
1175            if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
1176                return v;
1177            }
1178        }
1179    }
1180
1181    #[test]
1182    fn deletion_protection_blocks_delete() {
1183        let svc = QldbService::new();
1184        let ctx = ctx();
1185        block_on(svc.handle(
1186            "CreateLedger",
1187            json!({ "Name": "audit", "PermissionsMode": "STANDARD" }),
1188            &ctx,
1189        ))
1190        .unwrap();
1191        let err =
1192            block_on(svc.handle("DeleteLedger", json!({ "name": "audit" }), &ctx)).unwrap_err();
1193        assert_eq!(err.code, "ResourcePreconditionNotMetException");
1194        block_on(svc.handle(
1195            "UpdateLedger",
1196            json!({ "name": "audit", "DeletionProtection": false }),
1197            &ctx,
1198        ))
1199        .unwrap();
1200        block_on(svc.handle("DeleteLedger", json!({ "name": "audit" }), &ctx)).unwrap();
1201    }
1202
1203    #[test]
1204    fn create_ledger_enforces_per_region_quota() {
1205        let svc = QldbService::new();
1206        let ctx = RequestContext::new("qldb", "us-east-1");
1207        for i in 0..LEDGER_QUOTA_PER_REGION {
1208            block_on(svc.handle(
1209                "CreateLedger",
1210                json!({
1211                    "Name": format!("led-{i}"),
1212                    "PermissionsMode": "STANDARD",
1213                    "DeletionProtection": false,
1214                }),
1215                &ctx,
1216            ))
1217            .unwrap();
1218        }
1219        let err = block_on(svc.handle(
1220            "CreateLedger",
1221            json!({
1222                "Name": "led-extra",
1223                "PermissionsMode": "STANDARD",
1224                "DeletionProtection": false,
1225            }),
1226            &ctx,
1227        ))
1228        .unwrap_err();
1229        assert_eq!(err.code, "LimitExceededException");
1230    }
1231
1232    #[test]
1233    fn deletion_protection_returns_412() {
1234        let svc = QldbService::new();
1235        let ctx = RequestContext::new("qldb", "us-east-1");
1236        block_on(svc.handle(
1237            "CreateLedger",
1238            json!({
1239                "Name": "p412",
1240                "PermissionsMode": "STANDARD",
1241                "DeletionProtection": true,
1242            }),
1243            &ctx,
1244        ))
1245        .unwrap();
1246        let err =
1247            block_on(svc.handle("DeleteLedger", json!({ "name": "p412" }), &ctx)).unwrap_err();
1248        assert_eq!(err.code, "ResourcePreconditionNotMetException");
1249        assert_eq!(err.status.as_u16(), 412);
1250    }
1251
1252    #[test]
1253    fn update_ledger_accepts_kms_key_and_surfaces_encryption_description() {
1254        let svc = QldbService::new();
1255        let ctx = RequestContext::new("qldb", "us-east-1");
1256        block_on(svc.handle(
1257            "CreateLedger",
1258            json!({ "Name": "kms-led", "PermissionsMode": "STANDARD", "DeletionProtection": false }),
1259            &ctx,
1260        ))
1261        .unwrap();
1262
1263        let kms_key = "arn:aws:kms:us-east-1:123456789012:key/abcdef01-2345-6789-abcd-ef0123456789";
1264        let resp = block_on(svc.handle(
1265            "UpdateLedger",
1266            json!({ "name": "kms-led", "KmsKey": kms_key }),
1267            &ctx,
1268        ))
1269        .unwrap();
1270        assert_eq!(resp["KmsKeyArn"], kms_key);
1271        let enc = &resp["EncryptionDescription"];
1272        assert_eq!(enc["KmsKeyArn"], kms_key);
1273        assert_eq!(enc["EncryptionStatus"], "ENABLED");
1274        assert!(
1275            enc.get("InaccessibleKmsKeyDateTime")
1276                .map(|v| v.is_null())
1277                .unwrap_or(false),
1278            "expected InaccessibleKmsKeyDateTime to be present as null, got {enc:?}",
1279        );
1280    }
1281
1282    #[test]
1283    fn update_ledger_permissions_mode_persists() {
1284        let svc = QldbService::new();
1285        let ctx = RequestContext::new("qldb", "us-east-1");
1286        block_on(svc.handle(
1287            "CreateLedger",
1288            json!({
1289                "Name": "audit-mode",
1290                "PermissionsMode": "ALLOW_ALL",
1291                "DeletionProtection": false,
1292            }),
1293            &ctx,
1294        ))
1295        .unwrap();
1296
1297        let resp = block_on(svc.handle(
1298            "UpdateLedgerPermissionsMode",
1299            json!({ "name": "audit-mode", "PermissionsMode": "STANDARD" }),
1300            &ctx,
1301        ))
1302        .unwrap();
1303        assert_eq!(resp["PermissionsMode"], "STANDARD");
1304
1305        let described =
1306            block_on(svc.handle("DescribeLedger", json!({ "name": "audit-mode" }), &ctx)).unwrap();
1307        assert_eq!(described["PermissionsMode"], "STANDARD");
1308
1309        // Round trip via UpdateLedgerPermissionsMode for the other variant.
1310        let resp = block_on(svc.handle(
1311            "UpdateLedgerPermissionsMode",
1312            json!({ "name": "audit-mode", "PermissionsMode": "ALLOW_ALL" }),
1313            &ctx,
1314        ))
1315        .unwrap();
1316        assert_eq!(resp["PermissionsMode"], "ALLOW_ALL");
1317
1318        // Bad value still rejected.
1319        let err = block_on(svc.handle(
1320            "UpdateLedgerPermissionsMode",
1321            json!({ "name": "audit-mode", "PermissionsMode": "ROOT" }),
1322            &ctx,
1323        ))
1324        .unwrap_err();
1325        assert_eq!(err.code, "ValidationException");
1326    }
1327
1328    #[test]
1329    fn create_ledger_rejects_unknown_permissions_mode() {
1330        let svc = QldbService::new();
1331        let ctx = RequestContext::new("qldb", "us-east-1");
1332        let err = block_on(svc.handle(
1333            "CreateLedger",
1334            json!({ "Name": "x", "PermissionsMode": "WIDE_OPEN" }),
1335            &ctx,
1336        ))
1337        .unwrap_err();
1338        assert_eq!(err.code, "ValidationException");
1339    }
1340
1341    #[test]
1342    fn create_ledger_starts_in_creating_and_settles_to_active() {
1343        let svc = QldbService::new();
1344        let ctx = ctx();
1345        let created = block_on(svc.handle(
1346            "CreateLedger",
1347            json!({
1348                "Name": "lifecycle",
1349                "PermissionsMode": "ALLOW_ALL",
1350                "DeletionProtection": false,
1351            }),
1352            &ctx,
1353        ))
1354        .unwrap();
1355        assert_eq!(created["State"], "CREATING");
1356        let described =
1357            block_on(svc.handle("DescribeLedger", json!({ "name": "lifecycle" }), &ctx)).unwrap();
1358        assert_eq!(described["State"], "ACTIVE");
1359    }
1360
1361    #[test]
1362    fn list_ledgers_paginates_with_max_results_and_next_token() {
1363        let svc = QldbService::new();
1364        let ctx = RequestContext::new("qldb", "us-east-1");
1365        for name in ["alpha", "bravo", "charlie", "delta", "echo"] {
1366            block_on(svc.handle(
1367                "CreateLedger",
1368                json!({
1369                    "Name": name,
1370                    "PermissionsMode": "ALLOW_ALL",
1371                    "DeletionProtection": false,
1372                }),
1373                &ctx,
1374            ))
1375            .unwrap();
1376        }
1377        let first = block_on(svc.handle("ListLedgers", json!({ "MaxResults": 2 }), &ctx)).unwrap();
1378        let names: Vec<String> = first["Ledgers"]
1379            .as_array()
1380            .unwrap()
1381            .iter()
1382            .map(|v| v["Name"].as_str().unwrap().to_string())
1383            .collect();
1384        assert_eq!(names, vec!["alpha", "bravo"]);
1385        let token = first["NextToken"].as_str().unwrap().to_string();
1386        let second = block_on(svc.handle(
1387            "ListLedgers",
1388            json!({ "MaxResults": 2, "NextToken": token }),
1389            &ctx,
1390        ))
1391        .unwrap();
1392        let names: Vec<String> = second["Ledgers"]
1393            .as_array()
1394            .unwrap()
1395            .iter()
1396            .map(|v| v["Name"].as_str().unwrap().to_string())
1397            .collect();
1398        assert_eq!(names, vec!["charlie", "delta"]);
1399    }
1400
1401    #[test]
1402    fn list_ledgers_rejects_max_results_out_of_range() {
1403        let svc = QldbService::new();
1404        let ctx = RequestContext::new("qldb", "us-east-1");
1405        for bad in [0i64, -1, 101, 1000] {
1406            let err = block_on(svc.handle("ListLedgers", json!({ "MaxResults": bad }), &ctx))
1407                .unwrap_err();
1408            assert_eq!(err.code, "ValidationException", "input {bad}");
1409        }
1410    }
1411
1412    #[test]
1413    fn create_ledger_accepts_documented_permissions_modes() {
1414        let svc = QldbService::new();
1415        let ctx = RequestContext::new("qldb", "us-east-1");
1416        for mode in ["ALLOW_ALL", "STANDARD"] {
1417            block_on(svc.handle(
1418                "CreateLedger",
1419                json!({
1420                    "Name": format!("ledger-{mode}"),
1421                    "PermissionsMode": mode,
1422                    "DeletionProtection": false,
1423                }),
1424                &ctx,
1425            ))
1426            .unwrap();
1427        }
1428    }
1429
1430    #[test]
1431    fn export_journal_to_s3_persists_and_describes_export() {
1432        let svc = QldbService::new();
1433        let ctx = ctx();
1434        block_on(svc.handle(
1435            "CreateLedger",
1436            json!({
1437                "Name": "exp-target",
1438                "PermissionsMode": "ALLOW_ALL",
1439                "DeletionProtection": false,
1440            }),
1441            &ctx,
1442        ))
1443        .unwrap();
1444        let resp = block_on(svc.handle(
1445            "ExportJournalToS3",
1446            json!({
1447                "name": "exp-target",
1448                "InclusiveStartTime": 1700000000.0,
1449                "ExclusiveEndTime": 1700003600.0,
1450                "S3ExportConfiguration": {
1451                    "Bucket": "audit-out",
1452                    "Prefix": "exp/",
1453                    "EncryptionConfiguration": {
1454                        "ObjectEncryptionType": "SSE_KMS",
1455                        "KmsKeyArn": "arn:aws:kms:us-east-1:000000000000:key/abc",
1456                    },
1457                },
1458                "RoleArn": "arn:aws:iam::000000000000:role/qldb-export",
1459                "OutputFormat": "JSON",
1460            }),
1461            &ctx,
1462        ))
1463        .unwrap();
1464        let export_id = resp["ExportId"].as_str().unwrap().to_string();
1465        let desc = block_on(svc.handle(
1466            "DescribeJournalS3Export",
1467            json!({ "name": "exp-target", "exportId": export_id }),
1468            &ctx,
1469        ))
1470        .unwrap();
1471        assert_eq!(desc["ExportDescription"]["Status"], "IN_PROGRESS");
1472        assert_eq!(desc["ExportDescription"]["OutputFormat"], "JSON");
1473        assert_eq!(
1474            desc["ExportDescription"]["S3ExportConfiguration"]["EncryptionConfiguration"]["ObjectEncryptionType"],
1475            "SSE_KMS"
1476        );
1477    }
1478
1479    #[test]
1480    fn export_journal_to_s3_rejects_sse_kms_without_key_arn() {
1481        let svc = QldbService::new();
1482        let ctx = ctx();
1483        block_on(svc.handle(
1484            "CreateLedger",
1485            json!({
1486                "Name": "exp-bad",
1487                "PermissionsMode": "ALLOW_ALL",
1488                "DeletionProtection": false,
1489            }),
1490            &ctx,
1491        ))
1492        .unwrap();
1493        let err = block_on(svc.handle(
1494            "ExportJournalToS3",
1495            json!({
1496                "name": "exp-bad",
1497                "InclusiveStartTime": 1700000000.0,
1498                "ExclusiveEndTime": 1700003600.0,
1499                "S3ExportConfiguration": {
1500                    "Bucket": "audit-out",
1501                    "EncryptionConfiguration": { "ObjectEncryptionType": "SSE_KMS" },
1502                },
1503                "RoleArn": "arn:aws:iam::000000000000:role/qldb-export",
1504            }),
1505            &ctx,
1506        ))
1507        .unwrap_err();
1508        assert_eq!(err.code, "ValidationException");
1509    }
1510
1511    #[test]
1512    fn cancel_journal_s3_export_marks_cancelled_idempotently() {
1513        let svc = QldbService::new();
1514        let ctx = ctx();
1515        block_on(svc.handle(
1516            "CreateLedger",
1517            json!({
1518                "Name": "exp-cancel",
1519                "PermissionsMode": "ALLOW_ALL",
1520                "DeletionProtection": false,
1521            }),
1522            &ctx,
1523        ))
1524        .unwrap();
1525        let resp = block_on(svc.handle(
1526            "ExportJournalToS3",
1527            json!({
1528                "name": "exp-cancel",
1529                "InclusiveStartTime": 1700000000.0,
1530                "ExclusiveEndTime": 1700003600.0,
1531                "S3ExportConfiguration": {
1532                    "Bucket": "audit-out",
1533                    "EncryptionConfiguration": { "ObjectEncryptionType": "NO_ENCRYPTION" },
1534                },
1535                "RoleArn": "arn:aws:iam::000000000000:role/qldb-export",
1536            }),
1537            &ctx,
1538        ))
1539        .unwrap();
1540        let export_id = resp["ExportId"].as_str().unwrap().to_string();
1541        block_on(svc.handle(
1542            "CancelJournalS3Export",
1543            json!({ "name": "exp-cancel", "exportId": export_id }),
1544            &ctx,
1545        ))
1546        .unwrap();
1547        // Idempotent
1548        block_on(svc.handle(
1549            "CancelJournalS3Export",
1550            json!({ "name": "exp-cancel", "exportId": export_id }),
1551            &ctx,
1552        ))
1553        .unwrap();
1554        let desc = block_on(svc.handle(
1555            "DescribeJournalS3Export",
1556            json!({ "name": "exp-cancel", "exportId": export_id }),
1557            &ctx,
1558        ))
1559        .unwrap();
1560        assert_eq!(desc["ExportDescription"]["Status"], "CANCELLED");
1561    }
1562
1563    #[test]
1564    fn list_journal_kinesis_streams_for_ledger_filters_by_ledger() {
1565        let svc = QldbService::new();
1566        let ctx = ctx();
1567        for name in ["one", "two"] {
1568            block_on(svc.handle(
1569                "CreateLedger",
1570                json!({
1571                    "Name": name,
1572                    "PermissionsMode": "ALLOW_ALL",
1573                    "DeletionProtection": false,
1574                }),
1575                &ctx,
1576            ))
1577            .unwrap();
1578        }
1579        for ledger in ["one", "one", "two"] {
1580            block_on(svc.handle(
1581                "StreamJournalToKinesis",
1582                json!({
1583                    "name": ledger,
1584                    "StreamName": format!("s-{ledger}"),
1585                    "RoleArn": "arn:aws:iam::000000000000:role/qldb",
1586                    "InclusiveStartTime": 1700000000.0,
1587                    "KinesisConfiguration": {
1588                        "StreamArn": "arn:aws:kinesis:us-east-1:000000000000:stream/k",
1589                    },
1590                }),
1591                &ctx,
1592            ))
1593            .unwrap();
1594        }
1595        let resp = block_on(svc.handle(
1596            "ListJournalKinesisStreamsForLedger",
1597            json!({ "name": "one" }),
1598            &ctx,
1599        ))
1600        .unwrap();
1601        let streams = resp["Streams"].as_array().unwrap();
1602        assert_eq!(streams.len(), 2);
1603        assert!(streams.iter().all(|s| s["LedgerName"] == "one"));
1604    }
1605
1606    #[test]
1607    fn cancel_journal_kinesis_stream_marks_canceled_idempotently() {
1608        let svc = QldbService::new();
1609        let ctx = ctx();
1610        block_on(svc.handle(
1611            "CreateLedger",
1612            json!({
1613                "Name": "cancel-target",
1614                "PermissionsMode": "ALLOW_ALL",
1615                "DeletionProtection": false,
1616            }),
1617            &ctx,
1618        ))
1619        .unwrap();
1620        let created = block_on(svc.handle(
1621            "StreamJournalToKinesis",
1622            json!({
1623                "name": "cancel-target",
1624                "StreamName": "cancel-stream",
1625                "RoleArn": "arn:aws:iam::000000000000:role/qldb",
1626                "InclusiveStartTime": 1700000000.0,
1627                "KinesisConfiguration": {
1628                    "StreamArn": "arn:aws:kinesis:us-east-1:000000000000:stream/k",
1629                },
1630            }),
1631            &ctx,
1632        ))
1633        .unwrap();
1634        let stream_id = created["StreamId"].as_str().unwrap().to_string();
1635        block_on(svc.handle(
1636            "CancelJournalKinesisStream",
1637            json!({ "name": "cancel-target", "streamId": stream_id }),
1638            &ctx,
1639        ))
1640        .unwrap();
1641        // Idempotent: a second cancel succeeds.
1642        block_on(svc.handle(
1643            "CancelJournalKinesisStream",
1644            json!({ "name": "cancel-target", "streamId": stream_id }),
1645            &ctx,
1646        ))
1647        .unwrap();
1648        let desc = block_on(svc.handle(
1649            "DescribeJournalKinesisStream",
1650            json!({ "name": "cancel-target", "streamId": stream_id }),
1651            &ctx,
1652        ))
1653        .unwrap();
1654        assert_eq!(desc["Stream"]["Status"], "CANCELED");
1655    }
1656
1657    #[test]
1658    fn cancel_journal_kinesis_stream_rejects_terminal_state() {
1659        let svc = QldbService::new();
1660        let ctx = ctx();
1661        block_on(svc.handle(
1662            "CreateLedger",
1663            json!({
1664                "Name": "completed-ledger",
1665                "PermissionsMode": "ALLOW_ALL",
1666                "DeletionProtection": false,
1667            }),
1668            &ctx,
1669        ))
1670        .unwrap();
1671        let created = block_on(svc.handle(
1672            "StreamJournalToKinesis",
1673            json!({
1674                "name": "completed-ledger",
1675                "StreamName": "done",
1676                "RoleArn": "arn:aws:iam::000000000000:role/qldb",
1677                "InclusiveStartTime": 1700000000.0,
1678                "KinesisConfiguration": {
1679                    "StreamArn": "arn:aws:kinesis:us-east-1:000000000000:stream/k",
1680                },
1681            }),
1682            &ctx,
1683        ))
1684        .unwrap();
1685        let stream_id = created["StreamId"].as_str().unwrap().to_string();
1686        // Force COMPLETED on the persisted record so the cancel hits a
1687        // terminal state.
1688        {
1689            let st = svc.store.get("000000000000", "us-east-1");
1690            let mut entry = st.kinesis_streams.get_mut(&stream_id).unwrap();
1691            entry.status = "COMPLETED".to_string();
1692        }
1693        let err = block_on(svc.handle(
1694            "CancelJournalKinesisStream",
1695            json!({ "name": "completed-ledger", "streamId": stream_id }),
1696            &ctx,
1697        ))
1698        .unwrap_err();
1699        assert_eq!(err.code, "ResourcePreconditionNotMetException");
1700        assert_eq!(err.status.as_u16(), 412);
1701    }
1702
1703    #[test]
1704    fn stream_journal_to_kinesis_persists_and_describes_stream() {
1705        let svc = QldbService::new();
1706        let ctx = ctx();
1707        block_on(svc.handle(
1708            "CreateLedger",
1709            json!({
1710                "Name": "audit",
1711                "PermissionsMode": "ALLOW_ALL",
1712                "DeletionProtection": false,
1713            }),
1714            &ctx,
1715        ))
1716        .unwrap();
1717        let resp = block_on(svc.handle(
1718            "StreamJournalToKinesis",
1719            json!({
1720                "name": "audit",
1721                "StreamName": "audit-stream",
1722                "RoleArn": "arn:aws:iam::000000000000:role/qldb-stream",
1723                "InclusiveStartTime": 1700000000.0,
1724                "KinesisConfiguration": {
1725                    "StreamArn": "arn:aws:kinesis:us-east-1:000000000000:stream/audit-out",
1726                    "AggregationEnabled": true,
1727                },
1728            }),
1729            &ctx,
1730        ))
1731        .unwrap();
1732        let stream_id = resp["StreamId"].as_str().unwrap().to_string();
1733        let desc = block_on(svc.handle(
1734            "DescribeJournalKinesisStream",
1735            json!({ "name": "audit", "streamId": stream_id }),
1736            &ctx,
1737        ))
1738        .unwrap();
1739        let s = &desc["Stream"];
1740        assert_eq!(s["LedgerName"], "audit");
1741        assert_eq!(s["StreamName"], "audit-stream");
1742        assert_eq!(s["Status"], "ACTIVE");
1743        assert_eq!(s["KinesisConfiguration"]["AggregationEnabled"], true);
1744    }
1745
1746    #[test]
1747    fn stream_journal_to_kinesis_rejects_unknown_ledger() {
1748        let svc = QldbService::new();
1749        let ctx = ctx();
1750        let err = block_on(svc.handle(
1751            "StreamJournalToKinesis",
1752            json!({
1753                "name": "ghost",
1754                "StreamName": "audit-stream",
1755                "RoleArn": "arn:aws:iam::000000000000:role/qldb-stream",
1756                "InclusiveStartTime": 1700000000.0,
1757                "KinesisConfiguration": {
1758                    "StreamArn": "arn:aws:kinesis:us-east-1:000000000000:stream/x",
1759                },
1760            }),
1761            &ctx,
1762        ))
1763        .unwrap_err();
1764        assert_eq!(err.code, "ResourceNotFoundException");
1765    }
1766
1767    #[test]
1768    fn stream_journal_to_kinesis_requires_kinesis_stream_arn() {
1769        let svc = QldbService::new();
1770        let ctx = ctx();
1771        block_on(svc.handle(
1772            "CreateLedger",
1773            json!({
1774                "Name": "audit2",
1775                "PermissionsMode": "ALLOW_ALL",
1776                "DeletionProtection": false,
1777            }),
1778            &ctx,
1779        ))
1780        .unwrap();
1781        let err = block_on(svc.handle(
1782            "StreamJournalToKinesis",
1783            json!({
1784                "name": "audit2",
1785                "StreamName": "audit-stream",
1786                "RoleArn": "arn:aws:iam::000000000000:role/qldb-stream",
1787                "InclusiveStartTime": 1700000000.0,
1788                "KinesisConfiguration": {},
1789            }),
1790            &ctx,
1791        ))
1792        .unwrap_err();
1793        assert_eq!(err.code, "ValidationException");
1794    }
1795
1796    #[test]
1797    fn tag_resource_returns_not_found_for_unknown_ledger() {
1798        let svc = QldbService::new();
1799        let ctx = ctx();
1800        let err = block_on(svc.handle(
1801            "TagResource",
1802            json!({
1803                "resourceArn": "arn:aws:qldb:us-east-1:000000000000:ledger/missing",
1804                "Tags": { "team": "qldb" },
1805            }),
1806            &ctx,
1807        ))
1808        .unwrap_err();
1809        assert_eq!(err.code, "ResourceNotFoundException");
1810    }
1811
1812    #[test]
1813    fn untag_resource_returns_not_found_for_unknown_ledger() {
1814        let svc = QldbService::new();
1815        let ctx = ctx();
1816        let err = block_on(svc.handle(
1817            "UntagResource",
1818            json!({
1819                "resourceArn": "arn:aws:qldb:us-east-1:000000000000:ledger/missing",
1820                "TagKeys": ["team"],
1821            }),
1822            &ctx,
1823        ))
1824        .unwrap_err();
1825        assert_eq!(err.code, "ResourceNotFoundException");
1826    }
1827
1828    #[test]
1829    fn list_tags_returns_not_found_for_unknown_ledger() {
1830        let svc = QldbService::new();
1831        let ctx = ctx();
1832        let err = block_on(svc.handle(
1833            "ListTagsForResource",
1834            json!({ "resourceArn": "arn:aws:qldb:us-east-1:000000000000:ledger/missing" }),
1835            &ctx,
1836        ))
1837        .unwrap_err();
1838        assert_eq!(err.code, "ResourceNotFoundException");
1839    }
1840
1841    #[test]
1842    fn tag_resource_round_trips_stream_tags() {
1843        let svc = QldbService::new();
1844        let ctx = ctx();
1845        block_on(svc.handle(
1846            "CreateLedger",
1847            json!({
1848                "Name": "tagged-stream-ledger",
1849                "PermissionsMode": "ALLOW_ALL",
1850                "DeletionProtection": false,
1851            }),
1852            &ctx,
1853        ))
1854        .unwrap();
1855        let created = block_on(svc.handle(
1856            "StreamJournalToKinesis",
1857            json!({
1858                "name": "tagged-stream-ledger",
1859                "StreamName": "stream-x",
1860                "RoleArn": "arn:aws:iam::000000000000:role/qldb",
1861                "InclusiveStartTime": 1700000000.0,
1862                "KinesisConfiguration": {
1863                    "StreamArn": "arn:aws:kinesis:us-east-1:000000000000:stream/x",
1864                },
1865            }),
1866            &ctx,
1867        ))
1868        .unwrap();
1869        let stream_id = created["StreamId"].as_str().unwrap().to_string();
1870        let stream_arn =
1871            format!("arn:aws:qldb:us-east-1:000000000000:stream/tagged-stream-ledger/{stream_id}",);
1872        block_on(svc.handle(
1873            "TagResource",
1874            json!({ "resourceArn": stream_arn, "Tags": { "team": "qldb" } }),
1875            &ctx,
1876        ))
1877        .unwrap();
1878        let listed = block_on(svc.handle(
1879            "ListTagsForResource",
1880            json!({ "resourceArn": stream_arn }),
1881            &ctx,
1882        ))
1883        .unwrap();
1884        assert_eq!(listed["Tags"]["team"], "qldb");
1885    }
1886
1887    #[test]
1888    fn tag_resource_returns_not_found_for_unknown_stream() {
1889        let svc = QldbService::new();
1890        let ctx = ctx();
1891        let err = block_on(svc.handle(
1892            "TagResource",
1893            json!({
1894                "resourceArn": "arn:aws:qldb:us-east-1:000000000000:stream/missing-ledger/missing-stream",
1895                "Tags": {},
1896            }),
1897            &ctx,
1898        ))
1899        .unwrap_err();
1900        assert_eq!(err.code, "ResourceNotFoundException");
1901    }
1902
1903    #[test]
1904    fn tag_resource_round_trips_tags() {
1905        let svc = QldbService::new();
1906        let ctx = ctx();
1907        block_on(svc.handle(
1908            "CreateLedger",
1909            json!({
1910                "Name": "tagged",
1911                "PermissionsMode": "ALLOW_ALL",
1912                "DeletionProtection": false,
1913            }),
1914            &ctx,
1915        ))
1916        .unwrap();
1917        let arn = "arn:aws:qldb:us-east-1:000000000000:ledger/tagged";
1918        block_on(svc.handle(
1919            "TagResource",
1920            json!({ "resourceArn": arn, "Tags": { "team": "qldb", "env": "test" } }),
1921            &ctx,
1922        ))
1923        .unwrap();
1924        let resp = block_on(svc.handle("ListTagsForResource", json!({ "resourceArn": arn }), &ctx))
1925            .unwrap();
1926        assert_eq!(resp["Tags"]["team"], "qldb");
1927        assert_eq!(resp["Tags"]["env"], "test");
1928        block_on(svc.handle(
1929            "UntagResource",
1930            json!({ "resourceArn": arn, "TagKeys": ["team"] }),
1931            &ctx,
1932        ))
1933        .unwrap();
1934        let resp = block_on(svc.handle("ListTagsForResource", json!({ "resourceArn": arn }), &ctx))
1935            .unwrap();
1936        assert!(resp["Tags"]["team"].is_null());
1937        assert_eq!(resp["Tags"]["env"], "test");
1938    }
1939
1940    #[test]
1941    fn capacity_and_rate_exceeded_map_to_http_429() {
1942        let cap = capacity_exceeded("table read units exhausted");
1943        assert_eq!(cap.status.as_u16(), 429);
1944        assert_eq!(cap.code, "CapacityExceededException");
1945        let rate = rate_exceeded("client retry budget burned");
1946        assert_eq!(rate.status.as_u16(), 429);
1947        assert_eq!(rate.code, "RateExceededException");
1948    }
1949}