1use std::collections::HashMap;
5use std::sync::Arc;
6use std::time::{SystemTime, UNIX_EPOCH};
7
8use async_trait::async_trait;
9use awsim_core::{
10 AccountRegionStore, AwsError, Protocol, RequestContext, RouteDefinition, ServiceHandler,
11 clamp_max_results_strict, paginate,
12};
13use dashmap::DashMap;
14use serde::{Deserialize, Serialize};
15use serde_json::{Value, json};
16use tracing::debug;
17
/// In-memory QLDB state held by the service for one account/region pair.
///
/// Every collection is a `DashMap`, so handlers read and mutate the state
/// through a shared reference.
#[derive(Debug, Default)]
pub struct QldbState {
    /// Ledgers, keyed by ledger name.
    pub ledgers: DashMap<String, Ledger>,
    /// Journal Kinesis streams, keyed by stream id.
    pub kinesis_streams: DashMap<String, JournalKinesisStream>,
    /// Journal S3 export jobs, keyed by export id.
    pub s3_exports: DashMap<String, JournalS3Export>,
    /// Tags for stream and export resources, keyed by resource ARN.
    /// Ledger tags are kept on `Ledger::tags` instead.
    pub resource_tags: DashMap<String, HashMap<String, String>>,
}
30
/// A journal export job created by `ExportJournalToS3`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JournalS3Export {
    /// Identifier generated for the job; also the key in `QldbState::s3_exports`.
    pub export_id: String,
    /// Name of the ledger the export was requested for.
    pub ledger_name: String,
    /// `RoleArn` value from the request.
    pub role_arn: String,
    /// `InclusiveStartTime` from the request, stored as the raw number received.
    pub inclusive_start_time: f64,
    /// `ExclusiveEndTime` from the request; validated to be greater than the start.
    pub exclusive_end_time: f64,
    /// `ION_BINARY`, `ION_TEXT`, or `JSON`.
    pub output_format: String,
    /// Destination bucket from `S3ExportConfiguration.Bucket`.
    pub bucket: String,
    /// Destination prefix from `S3ExportConfiguration.Prefix` (empty when omitted).
    pub prefix: String,
    /// `SSE_KMS`, `SSE_S3`, or `NO_ENCRYPTION`.
    pub object_encryption_type: String,
    /// KMS key ARN; required by the handler when the encryption type is `SSE_KMS`.
    pub kms_key_arn: Option<String>,
    /// Job status: `IN_PROGRESS` at creation, `CANCELLED` after a cancel request.
    pub status: String,
    /// Creation timestamp taken from `now()` (seconds since the Unix epoch).
    pub creation_time: f64,
}
46
/// A journal stream created by `StreamJournalToKinesis`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JournalKinesisStream {
    /// Identifier generated for the stream; also the key in `QldbState::kinesis_streams`.
    pub stream_id: String,
    /// Name of the ledger being streamed.
    pub ledger_name: String,
    /// `StreamName` value from the request.
    pub stream_name: String,
    /// `RoleArn` value from the request.
    pub role_arn: String,
    /// Target stream ARN from `KinesisConfiguration.StreamArn`.
    pub kinesis_stream_arn: String,
    /// `InclusiveStartTime` from the request, stored as the raw number received.
    pub inclusive_start_time: f64,
    /// Optional `ExclusiveEndTime` from the request.
    pub exclusive_end_time: Option<f64>,
    /// `KinesisConfiguration.AggregationEnabled` (false when omitted).
    pub aggregation_enabled: bool,
    /// Creation timestamp taken from `now()` (seconds since the Unix epoch).
    pub creation_time: f64,
    /// Stream status: `ACTIVE` at creation, `CANCELED` after a cancel request.
    pub status: String,
    /// Error cause reported in stream descriptions; `None` at creation.
    pub error_cause: Option<String>,
    /// Tags supplied in the create request; defaults to empty when the field is
    /// absent from serialized data.
    #[serde(default)]
    pub tags: HashMap<String, String>,
}
65
/// A QLDB ledger as tracked by the emulator.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Ledger {
    /// Ledger name; also the key in `QldbState::ledgers`.
    pub name: String,
    /// Ledger ARN built by `ledger_arn` at creation.
    pub arn: String,
    /// Lifecycle state: `CREATING` at creation, flipped to `ACTIVE` the first
    /// time the ledger is described or listed.
    pub state: String,
    /// Creation timestamp taken from `now()` (seconds since the Unix epoch).
    pub creation_date_time: f64,
    /// `ALLOW_ALL` or `STANDARD`.
    pub permissions_mode: String,
    /// When true, `DeleteLedger` is rejected.
    pub deletion_protection: bool,
    /// `KmsKey` value supplied on create or update, if any.
    pub kms_key_arn: Option<String>,
    /// Ledger tags, edited by the tagging operations.
    pub tags: HashMap<String, String>,
    /// Encryption status; falls back to `ENABLED` when absent from serialized data.
    #[serde(default = "default_encryption_status")]
    pub encryption_status: String,
    /// Reported as `InaccessibleKmsKeyDateTime`; `None` at creation.
    #[serde(default)]
    pub inaccessible_kms_key_date_time: Option<f64>,
}
88
/// Encryption status used for new ledgers and as the serde fallback for
/// `Ledger::encryption_status`.
fn default_encryption_status() -> String {
    String::from("ENABLED")
}
92
/// Maximum number of ledgers `CreateLedger` allows in a single account/region
/// state before it answers with `LimitExceededException`.
const LEDGER_QUOTA_PER_REGION: usize = 5;
98
/// Serializable copy of a `QldbState`, used for snapshot and restore.
///
/// Every field except `ledgers` is `#[serde(default)]`, so serialized data
/// that lacks those fields still deserializes (they come back empty).
#[derive(Debug, Serialize, Deserialize)]
pub struct QldbSnapshot {
    /// All ledgers.
    pub ledgers: Vec<Ledger>,
    /// All journal Kinesis streams.
    #[serde(default)]
    pub kinesis_streams: Vec<JournalKinesisStream>,
    /// All journal S3 export jobs.
    #[serde(default)]
    pub s3_exports: Vec<JournalS3Export>,
    /// Stream/export tags, keyed by resource ARN.
    #[serde(default)]
    pub resource_tags: HashMap<String, HashMap<String, String>>,
}
109
110impl QldbState {
111 pub fn to_snapshot(&self) -> QldbSnapshot {
112 QldbSnapshot {
113 ledgers: self.ledgers.iter().map(|e| e.value().clone()).collect(),
114 kinesis_streams: self
115 .kinesis_streams
116 .iter()
117 .map(|e| e.value().clone())
118 .collect(),
119 s3_exports: self.s3_exports.iter().map(|e| e.value().clone()).collect(),
120 resource_tags: self
121 .resource_tags
122 .iter()
123 .map(|e| (e.key().clone(), e.value().clone()))
124 .collect(),
125 }
126 }
127 pub fn restore_from_snapshot(&self, snap: QldbSnapshot) {
128 self.ledgers.clear();
129 for l in snap.ledgers {
130 self.ledgers.insert(l.name.clone(), l);
131 }
132 self.kinesis_streams.clear();
133 for s in snap.kinesis_streams {
134 self.kinesis_streams.insert(s.stream_id.clone(), s);
135 }
136 self.s3_exports.clear();
137 for ex in snap.s3_exports {
138 self.s3_exports.insert(ex.export_id.clone(), ex);
139 }
140 self.resource_tags.clear();
141 for (arn, tags) in snap.resource_tags {
142 self.resource_tags.insert(arn, tags);
143 }
144 }
145}
146
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// Returns `0.0` if the system clock reports a time before the epoch.
fn now() -> f64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs_f64(),
        Err(_) => 0.0,
    }
}
153
154pub fn capacity_exceeded(message: impl Into<String>) -> AwsError {
158 AwsError::too_many_requests("CapacityExceededException", message)
159}
160
161pub fn rate_exceeded(message: impl Into<String>) -> AwsError {
164 AwsError::too_many_requests("RateExceededException", message)
165}
166
167fn require_str<'a>(input: &'a Value, key: &str) -> Result<&'a str, AwsError> {
168 input
169 .get(key)
170 .and_then(|v| v.as_str())
171 .ok_or_else(|| AwsError::bad_request("BadRequestException", format!("{key} is required")))
172}
173
174fn ledger_arn(ctx: &RequestContext, name: &str) -> String {
175 format!(
176 "arn:aws:qldb:{}:{}:ledger/{}",
177 ctx.region, ctx.account_id, name
178 )
179}
180
181fn stream_arn(ctx: &RequestContext, ledger: &str, stream_id: &str) -> String {
182 format!(
183 "arn:aws:qldb:{}:{}:stream/{ledger}/{stream_id}",
184 ctx.region, ctx.account_id,
185 )
186}
187
188fn stream_to_value(s: &JournalKinesisStream, ctx: &RequestContext) -> Value {
189 let exclusive = match s.exclusive_end_time {
190 Some(t) => json!(t),
191 None => Value::Null,
192 };
193 json!({
194 "LedgerName": s.ledger_name,
195 "CreationTime": s.creation_time,
196 "InclusiveStartTime": s.inclusive_start_time,
197 "ExclusiveEndTime": exclusive,
198 "RoleArn": s.role_arn,
199 "StreamId": s.stream_id,
200 "Arn": stream_arn(ctx, &s.ledger_name, &s.stream_id),
201 "Status": s.status,
202 "KinesisConfiguration": {
203 "StreamArn": s.kinesis_stream_arn,
204 "AggregationEnabled": s.aggregation_enabled,
205 },
206 "ErrorCause": s.error_cause,
207 "StreamName": s.stream_name,
208 })
209}
210
/// The resource a tagging ARN points at, as decoded by `parse_resource_arn`.
enum ResourceRef {
    /// A ledger, carrying everything after `ledger/` in the ARN.
    Ledger(String),
    /// A journal stream, carrying the last `/`-separated segment (the stream id).
    Stream(String),
    /// A journal export, carrying the last `/`-separated segment (the export id).
    Export(String),
}
218
219fn parse_resource_arn(arn: &str) -> Result<ResourceRef, AwsError> {
220 let resource = arn.splitn(6, ':').nth(5).ok_or_else(|| {
221 AwsError::bad_request(
222 "BadRequestException",
223 format!("ResourceArn `{arn}` is malformed."),
224 )
225 })?;
226 let (kind, tail) = resource.split_once('/').ok_or_else(|| {
227 AwsError::bad_request(
228 "BadRequestException",
229 format!("ResourceArn `{arn}` is malformed."),
230 )
231 })?;
232 match kind {
233 "ledger" => Ok(ResourceRef::Ledger(tail.to_string())),
234 "stream" => {
235 let stream_id = tail.rsplit('/').next().unwrap_or(tail);
236 Ok(ResourceRef::Stream(stream_id.to_string()))
237 }
238 "export" => {
239 let export_id = tail.rsplit('/').next().unwrap_or(tail);
240 Ok(ResourceRef::Export(export_id.to_string()))
241 }
242 _ => Err(AwsError::bad_request(
243 "BadRequestException",
244 format!("Resource kind `{kind}` is not a QLDB resource type."),
245 )),
246 }
247}
248
249fn apply_resource_tags(
250 state: &QldbState,
251 arn: &str,
252 new_tags: HashMap<String, String>,
253) -> Result<(), AwsError> {
254 match parse_resource_arn(arn)? {
255 ResourceRef::Ledger(name) => {
256 let mut l = state.ledgers.get_mut(&name).ok_or_else(|| {
257 AwsError::not_found(
258 "ResourceNotFoundException",
259 format!("Ledger {name} not found"),
260 )
261 })?;
262 for (k, v) in new_tags {
263 l.tags.insert(k, v);
264 }
265 }
266 ResourceRef::Stream(id) => {
267 if !state.kinesis_streams.contains_key(&id) {
268 return Err(AwsError::not_found(
269 "ResourceNotFoundException",
270 format!("Stream {id} not found"),
271 ));
272 }
273 let mut entry = state.resource_tags.entry(arn.to_string()).or_default();
274 for (k, v) in new_tags {
275 entry.insert(k, v);
276 }
277 }
278 ResourceRef::Export(id) => {
279 if !state.s3_exports.contains_key(&id) {
280 return Err(AwsError::not_found(
281 "ResourceNotFoundException",
282 format!("Export {id} not found"),
283 ));
284 }
285 let mut entry = state.resource_tags.entry(arn.to_string()).or_default();
286 for (k, v) in new_tags {
287 entry.insert(k, v);
288 }
289 }
290 }
291 Ok(())
292}
293
294fn remove_resource_tags(state: &QldbState, arn: &str, keys: &[String]) -> Result<(), AwsError> {
295 match parse_resource_arn(arn)? {
296 ResourceRef::Ledger(name) => {
297 let mut l = state.ledgers.get_mut(&name).ok_or_else(|| {
298 AwsError::not_found(
299 "ResourceNotFoundException",
300 format!("Ledger {name} not found"),
301 )
302 })?;
303 for k in keys {
304 l.tags.remove(k);
305 }
306 }
307 ResourceRef::Stream(id) => {
308 if !state.kinesis_streams.contains_key(&id) {
309 return Err(AwsError::not_found(
310 "ResourceNotFoundException",
311 format!("Stream {id} not found"),
312 ));
313 }
314 if let Some(mut entry) = state.resource_tags.get_mut(arn) {
315 for k in keys {
316 entry.remove(k);
317 }
318 }
319 }
320 ResourceRef::Export(id) => {
321 if !state.s3_exports.contains_key(&id) {
322 return Err(AwsError::not_found(
323 "ResourceNotFoundException",
324 format!("Export {id} not found"),
325 ));
326 }
327 if let Some(mut entry) = state.resource_tags.get_mut(arn) {
328 for k in keys {
329 entry.remove(k);
330 }
331 }
332 }
333 }
334 Ok(())
335}
336
337fn read_resource_tags(state: &QldbState, arn: &str) -> Result<HashMap<String, String>, AwsError> {
338 match parse_resource_arn(arn)? {
339 ResourceRef::Ledger(name) => {
340 state
341 .ledgers
342 .get(&name)
343 .map(|l| l.tags.clone())
344 .ok_or_else(|| {
345 AwsError::not_found(
346 "ResourceNotFoundException",
347 format!("Ledger {name} not found"),
348 )
349 })
350 }
351 ResourceRef::Stream(id) => {
352 if !state.kinesis_streams.contains_key(&id) {
353 return Err(AwsError::not_found(
354 "ResourceNotFoundException",
355 format!("Stream {id} not found"),
356 ));
357 }
358 Ok(state
359 .resource_tags
360 .get(arn)
361 .map(|e| e.value().clone())
362 .unwrap_or_default())
363 }
364 ResourceRef::Export(id) => {
365 if !state.s3_exports.contains_key(&id) {
366 return Err(AwsError::not_found(
367 "ResourceNotFoundException",
368 format!("Export {id} not found"),
369 ));
370 }
371 Ok(state
372 .resource_tags
373 .get(arn)
374 .map(|e| e.value().clone())
375 .unwrap_or_default())
376 }
377 }
378}
379
380fn export_to_value(ex: &JournalS3Export) -> Value {
381 let mut encryption = json!({
382 "ObjectEncryptionType": ex.object_encryption_type,
383 });
384 if let Some(arn) = &ex.kms_key_arn {
385 encryption["KmsKeyArn"] = json!(arn);
386 }
387 json!({
388 "LedgerName": ex.ledger_name,
389 "ExportId": ex.export_id,
390 "ExportCreationTime": ex.creation_time,
391 "Status": ex.status,
392 "InclusiveStartTime": ex.inclusive_start_time,
393 "ExclusiveEndTime": ex.exclusive_end_time,
394 "S3ExportConfiguration": {
395 "Bucket": ex.bucket,
396 "Prefix": ex.prefix,
397 "EncryptionConfiguration": encryption,
398 },
399 "RoleArn": ex.role_arn,
400 "OutputFormat": ex.output_format,
401 })
402}
403
404fn ledger_to_value(l: &Ledger) -> Value {
405 let inaccessible = match l.inaccessible_kms_key_date_time {
411 Some(t) => json!(t),
412 None => Value::Null,
413 };
414 json!({
415 "Name": l.name,
416 "Arn": l.arn,
417 "State": l.state,
418 "CreationDateTime": l.creation_date_time,
419 "PermissionsMode": l.permissions_mode,
420 "DeletionProtection": l.deletion_protection,
421 "KmsKeyArn": l.kms_key_arn,
422 "EncryptionDescription": {
423 "KmsKeyArn": l.kms_key_arn,
424 "EncryptionStatus": l.encryption_status,
425 "InaccessibleKmsKeyDateTime": inaccessible,
426 },
427 })
428}
429
/// QLDB service handler backed by in-memory state.
pub struct QldbService {
    /// One `QldbState` per account/region, looked up from the request context.
    store: AccountRegionStore<QldbState>,
}
433
434impl QldbService {
435 pub fn new() -> Self {
436 Self {
437 store: AccountRegionStore::new(),
438 }
439 }
440
441 pub fn store(&self) -> AccountRegionStore<QldbState> {
442 self.store.clone()
443 }
444
445 fn get_state(&self, ctx: &RequestContext) -> Arc<QldbState> {
446 self.store.get(&ctx.account_id, &ctx.region)
447 }
448}
449
450impl Default for QldbService {
451 fn default() -> Self {
452 Self::new()
453 }
454}
455
456#[async_trait]
457impl ServiceHandler for QldbService {
    /// Name this handler reports as its service: `qldb`.
    fn service_name(&self) -> &str {
        "qldb"
    }
461
    /// Name this handler reports as its signing name: `qldb`.
    fn signing_name(&self) -> &str {
        "qldb"
    }
465
    /// Wire protocol reported for this service: `Protocol::RestJson1`.
    fn protocol(&self) -> Protocol {
        Protocol::RestJson1
    }
469
470 fn routes(&self) -> Vec<RouteDefinition> {
471 vec![
472 RouteDefinition {
473 method: "POST",
474 path_pattern: "/ledgers",
475 operation: "CreateLedger",
476 required_query_param: None,
477 },
478 RouteDefinition {
479 method: "GET",
480 path_pattern: "/ledgers",
481 operation: "ListLedgers",
482 required_query_param: None,
483 },
484 RouteDefinition {
485 method: "GET",
486 path_pattern: "/ledgers/{name}",
487 operation: "DescribeLedger",
488 required_query_param: None,
489 },
490 RouteDefinition {
491 method: "PATCH",
492 path_pattern: "/ledgers/{name}",
493 operation: "UpdateLedger",
494 required_query_param: None,
495 },
496 RouteDefinition {
497 method: "DELETE",
498 path_pattern: "/ledgers/{name}",
499 operation: "DeleteLedger",
500 required_query_param: None,
501 },
502 RouteDefinition {
503 method: "PATCH",
504 path_pattern: "/ledgers/{name}/permissions-mode",
505 operation: "UpdateLedgerPermissionsMode",
506 required_query_param: None,
507 },
508 RouteDefinition {
509 method: "POST",
510 path_pattern: "/tags/{resourceArn}",
511 operation: "TagResource",
512 required_query_param: None,
513 },
514 RouteDefinition {
515 method: "DELETE",
516 path_pattern: "/tags/{resourceArn}",
517 operation: "UntagResource",
518 required_query_param: None,
519 },
520 RouteDefinition {
521 method: "GET",
522 path_pattern: "/tags/{resourceArn}",
523 operation: "ListTagsForResource",
524 required_query_param: None,
525 },
526 ]
527 }
528
529 async fn handle(
530 &self,
531 operation: &str,
532 input: Value,
533 ctx: &RequestContext,
534 ) -> Result<Value, AwsError> {
535 debug!(operation, "QLDB request");
536 let state = self.get_state(ctx);
537 match operation {
538 "CreateLedger" => {
539 let name = require_str(&input, "Name")?.to_string();
540 if state.ledgers.contains_key(&name) {
541 return Err(AwsError::conflict(
542 "ResourceAlreadyExistsException",
543 format!("Ledger {name} already exists"),
544 ));
545 }
546 if state.ledgers.len() >= LEDGER_QUOTA_PER_REGION {
547 return Err(AwsError::bad_request(
548 "LimitExceededException",
549 format!(
550 "Account already has {LEDGER_QUOTA_PER_REGION} ledgers in this region.",
551 ),
552 ));
553 }
554 let tags: HashMap<String, String> = input
555 .get("Tags")
556 .and_then(|v| v.as_object())
557 .map(|o| {
558 o.iter()
559 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
560 .collect()
561 })
562 .unwrap_or_default();
563 let permissions_mode = require_str(&input, "PermissionsMode")?.to_string();
564 if !matches!(permissions_mode.as_str(), "ALLOW_ALL" | "STANDARD") {
565 return Err(AwsError::bad_request(
566 "ValidationException",
567 format!(
568 "PermissionsMode `{permissions_mode}` must be ALLOW_ALL or STANDARD.",
569 ),
570 ));
571 }
572 let l = Ledger {
573 name: name.clone(),
574 arn: ledger_arn(ctx, &name),
575 state: "CREATING".to_string(),
579 creation_date_time: now(),
580 permissions_mode,
581 deletion_protection: input
582 .get("DeletionProtection")
583 .and_then(|v| v.as_bool())
584 .unwrap_or(true),
585 kms_key_arn: input
586 .get("KmsKey")
587 .and_then(|v| v.as_str())
588 .map(String::from),
589 tags,
590 encryption_status: default_encryption_status(),
591 inaccessible_kms_key_date_time: None,
592 };
593 let result = ledger_to_value(&l);
594 state.ledgers.insert(name, l);
595 Ok(result)
596 }
597 "DescribeLedger" => {
598 let name = require_str(&input, "name").or_else(|_| require_str(&input, "Name"))?;
599 if let Some(mut l) = state.ledgers.get_mut(name)
603 && l.state == "CREATING"
604 {
605 l.state = "ACTIVE".to_string();
606 }
607 let l = state.ledgers.get(name).ok_or_else(|| {
608 AwsError::not_found(
609 "ResourceNotFoundException",
610 format!("Ledger {name} not found"),
611 )
612 })?;
613 Ok(ledger_to_value(&l))
614 }
615 "ListLedgers" => {
616 for mut e in state.ledgers.iter_mut() {
619 if e.value().state == "CREATING" {
620 e.value_mut().state = "ACTIVE".to_string();
621 }
622 }
623 let max_results = clamp_max_results_strict(
624 input.get("MaxResults").and_then(Value::as_i64),
625 100,
626 100,
627 )?;
628 let starting_token = input.get("NextToken").and_then(Value::as_str);
629 let mut summaries: Vec<(String, Value)> = state
630 .ledgers
631 .iter()
632 .map(|e| {
633 let l = e.value();
634 (
635 l.name.clone(),
636 json!({
637 "Name": l.name,
638 "State": l.state,
639 "CreationDateTime": l.creation_date_time,
640 }),
641 )
642 })
643 .collect();
644 summaries.sort_by(|a, b| a.0.cmp(&b.0));
645 let page = paginate(summaries, max_results, starting_token, |(k, _)| k.clone())?;
646 let mut body = json!({
647 "Ledgers": page.items.into_iter().map(|(_, v)| v).collect::<Vec<_>>(),
648 });
649 if let Some(token) = page.next_token {
650 body["NextToken"] = json!(token);
651 }
652 Ok(body)
653 }
654 "UpdateLedger" => {
655 let name = require_str(&input, "name").or_else(|_| require_str(&input, "Name"))?;
656 let mut l = state.ledgers.get_mut(name).ok_or_else(|| {
657 AwsError::not_found(
658 "ResourceNotFoundException",
659 format!("Ledger {name} not found"),
660 )
661 })?;
662 if let Some(d) = input.get("DeletionProtection").and_then(|v| v.as_bool()) {
663 l.deletion_protection = d;
664 }
665 if let Some(k) = input.get("KmsKey").and_then(|v| v.as_str()) {
666 l.kms_key_arn = Some(k.to_string());
667 }
668 Ok(ledger_to_value(&l))
669 }
670 "UpdateLedgerPermissionsMode" => {
671 let name = require_str(&input, "name").or_else(|_| require_str(&input, "Name"))?;
672 let mode = require_str(&input, "PermissionsMode")?.to_string();
673 if !matches!(mode.as_str(), "ALLOW_ALL" | "STANDARD") {
674 return Err(AwsError::bad_request(
675 "ValidationException",
676 format!("PermissionsMode `{mode}` must be ALLOW_ALL or STANDARD."),
677 ));
678 }
679 let mut l = state.ledgers.get_mut(name).ok_or_else(|| {
680 AwsError::not_found(
681 "ResourceNotFoundException",
682 format!("Ledger {name} not found"),
683 )
684 })?;
685 l.permissions_mode = mode;
686 Ok(json!({
687 "Name": l.name,
688 "Arn": l.arn,
689 "PermissionsMode": l.permissions_mode,
690 }))
691 }
692 "ExportJournalToS3" => {
693 let ledger_name = require_str(&input, "name")
694 .or_else(|_| require_str(&input, "LedgerName"))?
695 .to_string();
696 if !state.ledgers.contains_key(&ledger_name) {
697 return Err(AwsError::not_found(
698 "ResourceNotFoundException",
699 format!("Ledger {ledger_name} not found"),
700 ));
701 }
702 let inclusive_start_time = input
703 .get("InclusiveStartTime")
704 .and_then(Value::as_f64)
705 .ok_or_else(|| {
706 AwsError::bad_request(
707 "ValidationException",
708 "InclusiveStartTime is required and must be a number",
709 )
710 })?;
711 let exclusive_end_time = input
712 .get("ExclusiveEndTime")
713 .and_then(Value::as_f64)
714 .ok_or_else(|| {
715 AwsError::bad_request(
716 "ValidationException",
717 "ExclusiveEndTime is required and must be a number",
718 )
719 })?;
720 if exclusive_end_time <= inclusive_start_time {
721 return Err(AwsError::bad_request(
722 "ValidationException",
723 "ExclusiveEndTime must be strictly after InclusiveStartTime.",
724 ));
725 }
726 let role_arn = require_str(&input, "RoleArn")?.to_string();
727 let output_format = input
728 .get("OutputFormat")
729 .and_then(Value::as_str)
730 .unwrap_or("ION_BINARY")
731 .to_string();
732 if !matches!(output_format.as_str(), "ION_BINARY" | "ION_TEXT" | "JSON") {
733 return Err(AwsError::bad_request(
734 "ValidationException",
735 format!(
736 "OutputFormat `{output_format}` must be ION_BINARY, ION_TEXT, or JSON.",
737 ),
738 ));
739 }
740 let s3_cfg = input.get("S3ExportConfiguration").ok_or_else(|| {
741 AwsError::bad_request(
742 "ValidationException",
743 "S3ExportConfiguration is required",
744 )
745 })?;
746 let bucket = s3_cfg
747 .get("Bucket")
748 .and_then(Value::as_str)
749 .ok_or_else(|| {
750 AwsError::bad_request(
751 "ValidationException",
752 "S3ExportConfiguration.Bucket is required",
753 )
754 })?
755 .to_string();
756 let prefix = s3_cfg
757 .get("Prefix")
758 .and_then(Value::as_str)
759 .unwrap_or("")
760 .to_string();
761 let encryption = s3_cfg.get("EncryptionConfiguration").ok_or_else(|| {
762 AwsError::bad_request(
763 "ValidationException",
764 "S3ExportConfiguration.EncryptionConfiguration is required",
765 )
766 })?;
767 let object_encryption_type = encryption
768 .get("ObjectEncryptionType")
769 .and_then(Value::as_str)
770 .ok_or_else(|| {
771 AwsError::bad_request(
772 "ValidationException",
773 "EncryptionConfiguration.ObjectEncryptionType is required",
774 )
775 })?
776 .to_string();
777 if !matches!(
778 object_encryption_type.as_str(),
779 "SSE_KMS" | "SSE_S3" | "NO_ENCRYPTION"
780 ) {
781 return Err(AwsError::bad_request(
782 "ValidationException",
783 format!(
784 "ObjectEncryptionType `{object_encryption_type}` must be SSE_KMS, SSE_S3, or NO_ENCRYPTION."
785 ),
786 ));
787 }
788 let kms_key_arn = encryption
789 .get("KmsKeyArn")
790 .and_then(Value::as_str)
791 .map(String::from);
792 if object_encryption_type == "SSE_KMS" && kms_key_arn.is_none() {
793 return Err(AwsError::bad_request(
794 "ValidationException",
795 "EncryptionConfiguration.KmsKeyArn is required when ObjectEncryptionType=SSE_KMS.",
796 ));
797 }
798 let export_id = uuid::Uuid::new_v4().to_string();
799 let ex = JournalS3Export {
800 export_id: export_id.clone(),
801 ledger_name,
802 role_arn,
803 inclusive_start_time,
804 exclusive_end_time,
805 output_format,
806 bucket,
807 prefix,
808 object_encryption_type,
809 kms_key_arn,
810 status: "IN_PROGRESS".to_string(),
811 creation_time: now(),
812 };
813 state.s3_exports.insert(export_id.clone(), ex);
814 Ok(json!({ "ExportId": export_id }))
815 }
816 "DescribeJournalS3Export" => {
817 let _ledger =
818 require_str(&input, "name").or_else(|_| require_str(&input, "LedgerName"))?;
819 let export_id =
820 require_str(&input, "exportId").or_else(|_| require_str(&input, "ExportId"))?;
821 let ex = state.s3_exports.get(export_id).ok_or_else(|| {
822 AwsError::not_found(
823 "ResourceNotFoundException",
824 format!("Export {export_id} not found"),
825 )
826 })?;
827 Ok(json!({ "ExportDescription": export_to_value(&ex) }))
828 }
829 "ListJournalS3Exports" => {
830 let max_results = clamp_max_results_strict(
831 input.get("MaxResults").and_then(Value::as_i64),
832 100,
833 100,
834 )?;
835 let starting_token = input.get("NextToken").and_then(Value::as_str);
836 let mut exports: Vec<(String, Value)> = state
837 .s3_exports
838 .iter()
839 .map(|e| (e.value().export_id.clone(), export_to_value(e.value())))
840 .collect();
841 exports.sort_by(|a, b| a.0.cmp(&b.0));
842 let page = paginate(exports, max_results, starting_token, |(k, _)| k.clone())?;
843 let mut body = json!({
844 "JournalS3Exports": page.items.into_iter().map(|(_, v)| v).collect::<Vec<_>>(),
845 });
846 if let Some(token) = page.next_token {
847 body["NextToken"] = json!(token);
848 }
849 Ok(body)
850 }
851 "ListJournalS3ExportsForLedger" => {
852 let ledger_name =
853 require_str(&input, "name").or_else(|_| require_str(&input, "LedgerName"))?;
854 if !state.ledgers.contains_key(ledger_name) {
855 return Err(AwsError::not_found(
856 "ResourceNotFoundException",
857 format!("Ledger {ledger_name} not found"),
858 ));
859 }
860 let max_results = clamp_max_results_strict(
861 input.get("MaxResults").and_then(Value::as_i64),
862 100,
863 100,
864 )?;
865 let starting_token = input.get("NextToken").and_then(Value::as_str);
866 let mut exports: Vec<(String, Value)> = state
867 .s3_exports
868 .iter()
869 .filter(|e| e.value().ledger_name == ledger_name)
870 .map(|e| (e.value().export_id.clone(), export_to_value(e.value())))
871 .collect();
872 exports.sort_by(|a, b| a.0.cmp(&b.0));
873 let page = paginate(exports, max_results, starting_token, |(k, _)| k.clone())?;
874 let mut body = json!({
875 "JournalS3Exports": page.items.into_iter().map(|(_, v)| v).collect::<Vec<_>>(),
876 });
877 if let Some(token) = page.next_token {
878 body["NextToken"] = json!(token);
879 }
880 Ok(body)
881 }
882 "CancelJournalS3Export" => {
883 let _ledger =
884 require_str(&input, "name").or_else(|_| require_str(&input, "LedgerName"))?;
885 let export_id =
886 require_str(&input, "exportId").or_else(|_| require_str(&input, "ExportId"))?;
887 let mut ex = state.s3_exports.get_mut(export_id).ok_or_else(|| {
888 AwsError::not_found(
889 "ResourceNotFoundException",
890 format!("Export {export_id} not found"),
891 )
892 })?;
893 match ex.status.as_str() {
894 "CANCELLED" => {}
895 "COMPLETED" | "FAILED" => {
896 return Err(AwsError::precondition_failed(
897 "ResourcePreconditionNotMetException",
898 format!(
899 "Export {export_id} is in terminal state `{}` and cannot be canceled.",
900 ex.status,
901 ),
902 ));
903 }
904 _ => {
905 ex.status = "CANCELLED".to_string();
906 }
907 }
908 Ok(json!({}))
909 }
910 "StreamJournalToKinesis" => {
911 let ledger_name = require_str(&input, "name")
912 .or_else(|_| require_str(&input, "LedgerName"))?
913 .to_string();
914 if !state.ledgers.contains_key(&ledger_name) {
915 return Err(AwsError::not_found(
916 "ResourceNotFoundException",
917 format!("Ledger {ledger_name} not found"),
918 ));
919 }
920 let stream_name = require_str(&input, "StreamName")?.to_string();
921 let role_arn = require_str(&input, "RoleArn")?.to_string();
922 let inclusive_start_time = input
923 .get("InclusiveStartTime")
924 .and_then(Value::as_f64)
925 .ok_or_else(|| {
926 AwsError::bad_request(
927 "ValidationException",
928 "InclusiveStartTime is required and must be a number",
929 )
930 })?;
931 let exclusive_end_time = input.get("ExclusiveEndTime").and_then(Value::as_f64);
932 let kinesis = input.get("KinesisConfiguration").ok_or_else(|| {
933 AwsError::bad_request("ValidationException", "KinesisConfiguration is required")
934 })?;
935 let kinesis_stream_arn = kinesis
936 .get("StreamArn")
937 .and_then(Value::as_str)
938 .ok_or_else(|| {
939 AwsError::bad_request(
940 "ValidationException",
941 "KinesisConfiguration.StreamArn is required",
942 )
943 })?
944 .to_string();
945 if !kinesis_stream_arn.starts_with("arn:") {
946 return Err(AwsError::bad_request(
947 "ValidationException",
948 format!(
949 "KinesisConfiguration.StreamArn `{kinesis_stream_arn}` is not a valid ARN.",
950 ),
951 ));
952 }
953 let aggregation_enabled = kinesis
954 .get("AggregationEnabled")
955 .and_then(Value::as_bool)
956 .unwrap_or(false);
957 let tags: HashMap<String, String> = input
958 .get("Tags")
959 .and_then(|v| v.as_object())
960 .map(|o| {
961 o.iter()
962 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
963 .collect()
964 })
965 .unwrap_or_default();
966 let stream_id = uuid::Uuid::new_v4().to_string();
967 let stream = JournalKinesisStream {
968 stream_id: stream_id.clone(),
969 ledger_name: ledger_name.clone(),
970 stream_name,
971 role_arn,
972 kinesis_stream_arn,
973 inclusive_start_time,
974 exclusive_end_time,
975 aggregation_enabled,
976 creation_time: now(),
977 status: "ACTIVE".to_string(),
978 error_cause: None,
979 tags,
980 };
981 state.kinesis_streams.insert(stream_id.clone(), stream);
982 Ok(json!({ "StreamId": stream_id }))
983 }
984 "ListJournalKinesisStreamsForLedger" => {
985 let ledger_name =
986 require_str(&input, "name").or_else(|_| require_str(&input, "LedgerName"))?;
987 if !state.ledgers.contains_key(ledger_name) {
988 return Err(AwsError::not_found(
989 "ResourceNotFoundException",
990 format!("Ledger {ledger_name} not found"),
991 ));
992 }
993 let max_results = clamp_max_results_strict(
994 input.get("MaxResults").and_then(Value::as_i64),
995 100,
996 100,
997 )?;
998 let starting_token = input.get("NextToken").and_then(Value::as_str);
999 let mut streams: Vec<(String, Value)> = state
1000 .kinesis_streams
1001 .iter()
1002 .filter(|e| e.value().ledger_name == ledger_name)
1003 .map(|e| (e.value().stream_id.clone(), stream_to_value(e.value(), ctx)))
1004 .collect();
1005 streams.sort_by(|a, b| a.0.cmp(&b.0));
1006 let page = paginate(streams, max_results, starting_token, |(k, _)| k.clone())?;
1007 let mut body = json!({
1008 "Streams": page.items.into_iter().map(|(_, v)| v).collect::<Vec<_>>(),
1009 });
1010 if let Some(token) = page.next_token {
1011 body["NextToken"] = json!(token);
1012 }
1013 Ok(body)
1014 }
1015 "CancelJournalKinesisStream" => {
1016 let _ledger =
1017 require_str(&input, "name").or_else(|_| require_str(&input, "LedgerName"))?;
1018 let stream_id =
1019 require_str(&input, "streamId").or_else(|_| require_str(&input, "StreamId"))?;
1020 let mut s = state.kinesis_streams.get_mut(stream_id).ok_or_else(|| {
1021 AwsError::not_found(
1022 "ResourceNotFoundException",
1023 format!("Stream {stream_id} not found"),
1024 )
1025 })?;
1026 match s.status.as_str() {
1030 "CANCELED" => {}
1031 "COMPLETED" | "FAILED" => {
1032 return Err(AwsError::precondition_failed(
1033 "ResourcePreconditionNotMetException",
1034 format!(
1035 "Stream {stream_id} is in terminal state `{}` and cannot be canceled.",
1036 s.status,
1037 ),
1038 ));
1039 }
1040 _ => {
1041 s.status = "CANCELED".to_string();
1042 }
1043 }
1044 Ok(json!({ "StreamId": stream_id }))
1045 }
1046 "DescribeJournalKinesisStream" => {
1047 let _ledger =
1048 require_str(&input, "name").or_else(|_| require_str(&input, "LedgerName"))?;
1049 let stream_id =
1050 require_str(&input, "streamId").or_else(|_| require_str(&input, "StreamId"))?;
1051 let s = state.kinesis_streams.get(stream_id).ok_or_else(|| {
1052 AwsError::not_found(
1053 "ResourceNotFoundException",
1054 format!("Stream {stream_id} not found"),
1055 )
1056 })?;
1057 Ok(json!({ "Stream": stream_to_value(&s, ctx) }))
1058 }
1059 "DeleteLedger" => {
1060 let name = require_str(&input, "name").or_else(|_| require_str(&input, "Name"))?;
1061 let l = state.ledgers.get(name).ok_or_else(|| {
1062 AwsError::not_found(
1063 "ResourceNotFoundException",
1064 format!("Ledger {name} not found"),
1065 )
1066 })?;
1067 if l.deletion_protection {
1068 return Err(AwsError::precondition_failed(
1069 "ResourcePreconditionNotMetException",
1070 "Disable DeletionProtection before deleting the ledger",
1071 ));
1072 }
1073 drop(l);
1074 state.ledgers.remove(name);
1075 Ok(json!({}))
1076 }
1077 "TagResource" => {
1078 let arn = input
1079 .get("resourceArn")
1080 .or_else(|| input.get("ResourceArn"))
1081 .and_then(|v| v.as_str())
1082 .unwrap_or("");
1083 let new_tags: HashMap<String, String> = input
1084 .get("Tags")
1085 .or_else(|| input.get("tags"))
1086 .and_then(|v| v.as_object())
1087 .map(|o| {
1088 o.iter()
1089 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
1090 .collect()
1091 })
1092 .unwrap_or_default();
1093 apply_resource_tags(&state, arn, new_tags)?;
1094 Ok(json!({}))
1095 }
1096 "UntagResource" => {
1097 let arn = input
1098 .get("resourceArn")
1099 .or_else(|| input.get("ResourceArn"))
1100 .and_then(|v| v.as_str())
1101 .unwrap_or("");
1102 let keys: Vec<String> = input
1103 .get("TagKeys")
1104 .or_else(|| input.get("tagKeys"))
1105 .and_then(|v| v.as_array())
1106 .map(|a| {
1107 a.iter()
1108 .filter_map(|x| x.as_str().map(String::from))
1109 .collect()
1110 })
1111 .unwrap_or_default();
1112 remove_resource_tags(&state, arn, &keys)?;
1113 Ok(json!({}))
1114 }
1115 "ListTagsForResource" => {
1116 let arn = input
1117 .get("resourceArn")
1118 .or_else(|| input.get("ResourceArn"))
1119 .and_then(|v| v.as_str())
1120 .unwrap_or("");
1121 let tags = read_resource_tags(&state, arn)?;
1122 Ok(json!({ "Tags": tags }))
1123 }
1124 _ => Err(AwsError::unknown_operation(operation)),
1125 }
1126 }
1127
1128 fn snapshot(&self) -> Option<Vec<u8>> {
1129 let mut all = QldbSnapshot {
1130 ledgers: vec![],
1131 kinesis_streams: vec![],
1132 s3_exports: vec![],
1133 resource_tags: Default::default(),
1134 };
1135 for (_, st) in self.store.iter_all() {
1136 let s = st.to_snapshot();
1137 all.ledgers.extend(s.ledgers);
1138 all.kinesis_streams.extend(s.kinesis_streams);
1139 all.s3_exports.extend(s.s3_exports);
1140 all.resource_tags.extend(s.resource_tags);
1141 }
1142 serde_json::to_vec(&all).ok()
1143 }
1144
1145 fn restore(&self, data: &[u8]) -> Result<(), String> {
1146 let snap: QldbSnapshot = serde_json::from_slice(data).map_err(|e| e.to_string())?;
1147 let st = self.store.get("000000000000", "us-east-1");
1148 st.restore_from_snapshot(snap);
1149 Ok(())
1150 }
1151}
1152
1153#[cfg(test)]
1154mod tests {
1155 use super::*;
1156
    /// Request context shared by the tests, built from `"qldb"` and `"us-east-1"`.
    fn ctx() -> RequestContext {
        RequestContext::new("qldb", "us-east-1")
    }
1160
1161 fn block_on<F: std::future::Future>(f: F) -> F::Output {
1162 use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
1163 fn noop_clone(_: *const ()) -> RawWaker {
1164 noop_raw_waker()
1165 }
1166 fn noop(_: *const ()) {}
1167 fn noop_raw_waker() -> RawWaker {
1168 static VTABLE: RawWakerVTable = RawWakerVTable::new(noop_clone, noop, noop, noop);
1169 RawWaker::new(std::ptr::null(), &VTABLE)
1170 }
1171 let waker = unsafe { Waker::from_raw(noop_raw_waker()) };
1172 let mut cx = Context::from_waker(&waker);
1173 let mut fut = std::pin::pin!(f);
1174 loop {
1175 if let Poll::Ready(v) = fut.as_mut().poll(&mut cx) {
1176 return v;
1177 }
1178 }
1179 }
1180
1181 #[test]
1182 fn deletion_protection_blocks_delete() {
1183 let svc = QldbService::new();
1184 let ctx = ctx();
1185 block_on(svc.handle(
1186 "CreateLedger",
1187 json!({ "Name": "audit", "PermissionsMode": "STANDARD" }),
1188 &ctx,
1189 ))
1190 .unwrap();
1191 let err =
1192 block_on(svc.handle("DeleteLedger", json!({ "name": "audit" }), &ctx)).unwrap_err();
1193 assert_eq!(err.code, "ResourcePreconditionNotMetException");
1194 block_on(svc.handle(
1195 "UpdateLedger",
1196 json!({ "name": "audit", "DeletionProtection": false }),
1197 &ctx,
1198 ))
1199 .unwrap();
1200 block_on(svc.handle("DeleteLedger", json!({ "name": "audit" }), &ctx)).unwrap();
1201 }
1202
1203 #[test]
1204 fn create_ledger_enforces_per_region_quota() {
1205 let svc = QldbService::new();
1206 let ctx = RequestContext::new("qldb", "us-east-1");
1207 for i in 0..LEDGER_QUOTA_PER_REGION {
1208 block_on(svc.handle(
1209 "CreateLedger",
1210 json!({
1211 "Name": format!("led-{i}"),
1212 "PermissionsMode": "STANDARD",
1213 "DeletionProtection": false,
1214 }),
1215 &ctx,
1216 ))
1217 .unwrap();
1218 }
1219 let err = block_on(svc.handle(
1220 "CreateLedger",
1221 json!({
1222 "Name": "led-extra",
1223 "PermissionsMode": "STANDARD",
1224 "DeletionProtection": false,
1225 }),
1226 &ctx,
1227 ))
1228 .unwrap_err();
1229 assert_eq!(err.code, "LimitExceededException");
1230 }
1231
1232 #[test]
1233 fn deletion_protection_returns_412() {
1234 let svc = QldbService::new();
1235 let ctx = RequestContext::new("qldb", "us-east-1");
1236 block_on(svc.handle(
1237 "CreateLedger",
1238 json!({
1239 "Name": "p412",
1240 "PermissionsMode": "STANDARD",
1241 "DeletionProtection": true,
1242 }),
1243 &ctx,
1244 ))
1245 .unwrap();
1246 let err =
1247 block_on(svc.handle("DeleteLedger", json!({ "name": "p412" }), &ctx)).unwrap_err();
1248 assert_eq!(err.code, "ResourcePreconditionNotMetException");
1249 assert_eq!(err.status.as_u16(), 412);
1250 }
1251
1252 #[test]
1253 fn update_ledger_accepts_kms_key_and_surfaces_encryption_description() {
1254 let svc = QldbService::new();
1255 let ctx = RequestContext::new("qldb", "us-east-1");
1256 block_on(svc.handle(
1257 "CreateLedger",
1258 json!({ "Name": "kms-led", "PermissionsMode": "STANDARD", "DeletionProtection": false }),
1259 &ctx,
1260 ))
1261 .unwrap();
1262
1263 let kms_key = "arn:aws:kms:us-east-1:123456789012:key/abcdef01-2345-6789-abcd-ef0123456789";
1264 let resp = block_on(svc.handle(
1265 "UpdateLedger",
1266 json!({ "name": "kms-led", "KmsKey": kms_key }),
1267 &ctx,
1268 ))
1269 .unwrap();
1270 assert_eq!(resp["KmsKeyArn"], kms_key);
1271 let enc = &resp["EncryptionDescription"];
1272 assert_eq!(enc["KmsKeyArn"], kms_key);
1273 assert_eq!(enc["EncryptionStatus"], "ENABLED");
1274 assert!(
1275 enc.get("InaccessibleKmsKeyDateTime")
1276 .map(|v| v.is_null())
1277 .unwrap_or(false),
1278 "expected InaccessibleKmsKeyDateTime to be present as null, got {enc:?}",
1279 );
1280 }
1281
1282 #[test]
1283 fn update_ledger_permissions_mode_persists() {
1284 let svc = QldbService::new();
1285 let ctx = RequestContext::new("qldb", "us-east-1");
1286 block_on(svc.handle(
1287 "CreateLedger",
1288 json!({
1289 "Name": "audit-mode",
1290 "PermissionsMode": "ALLOW_ALL",
1291 "DeletionProtection": false,
1292 }),
1293 &ctx,
1294 ))
1295 .unwrap();
1296
1297 let resp = block_on(svc.handle(
1298 "UpdateLedgerPermissionsMode",
1299 json!({ "name": "audit-mode", "PermissionsMode": "STANDARD" }),
1300 &ctx,
1301 ))
1302 .unwrap();
1303 assert_eq!(resp["PermissionsMode"], "STANDARD");
1304
1305 let described =
1306 block_on(svc.handle("DescribeLedger", json!({ "name": "audit-mode" }), &ctx)).unwrap();
1307 assert_eq!(described["PermissionsMode"], "STANDARD");
1308
1309 let resp = block_on(svc.handle(
1311 "UpdateLedgerPermissionsMode",
1312 json!({ "name": "audit-mode", "PermissionsMode": "ALLOW_ALL" }),
1313 &ctx,
1314 ))
1315 .unwrap();
1316 assert_eq!(resp["PermissionsMode"], "ALLOW_ALL");
1317
1318 let err = block_on(svc.handle(
1320 "UpdateLedgerPermissionsMode",
1321 json!({ "name": "audit-mode", "PermissionsMode": "ROOT" }),
1322 &ctx,
1323 ))
1324 .unwrap_err();
1325 assert_eq!(err.code, "ValidationException");
1326 }
1327
1328 #[test]
1329 fn create_ledger_rejects_unknown_permissions_mode() {
1330 let svc = QldbService::new();
1331 let ctx = RequestContext::new("qldb", "us-east-1");
1332 let err = block_on(svc.handle(
1333 "CreateLedger",
1334 json!({ "Name": "x", "PermissionsMode": "WIDE_OPEN" }),
1335 &ctx,
1336 ))
1337 .unwrap_err();
1338 assert_eq!(err.code, "ValidationException");
1339 }
1340
1341 #[test]
1342 fn create_ledger_starts_in_creating_and_settles_to_active() {
1343 let svc = QldbService::new();
1344 let ctx = ctx();
1345 let created = block_on(svc.handle(
1346 "CreateLedger",
1347 json!({
1348 "Name": "lifecycle",
1349 "PermissionsMode": "ALLOW_ALL",
1350 "DeletionProtection": false,
1351 }),
1352 &ctx,
1353 ))
1354 .unwrap();
1355 assert_eq!(created["State"], "CREATING");
1356 let described =
1357 block_on(svc.handle("DescribeLedger", json!({ "name": "lifecycle" }), &ctx)).unwrap();
1358 assert_eq!(described["State"], "ACTIVE");
1359 }
1360
1361 #[test]
1362 fn list_ledgers_paginates_with_max_results_and_next_token() {
1363 let svc = QldbService::new();
1364 let ctx = RequestContext::new("qldb", "us-east-1");
1365 for name in ["alpha", "bravo", "charlie", "delta", "echo"] {
1366 block_on(svc.handle(
1367 "CreateLedger",
1368 json!({
1369 "Name": name,
1370 "PermissionsMode": "ALLOW_ALL",
1371 "DeletionProtection": false,
1372 }),
1373 &ctx,
1374 ))
1375 .unwrap();
1376 }
1377 let first = block_on(svc.handle("ListLedgers", json!({ "MaxResults": 2 }), &ctx)).unwrap();
1378 let names: Vec<String> = first["Ledgers"]
1379 .as_array()
1380 .unwrap()
1381 .iter()
1382 .map(|v| v["Name"].as_str().unwrap().to_string())
1383 .collect();
1384 assert_eq!(names, vec!["alpha", "bravo"]);
1385 let token = first["NextToken"].as_str().unwrap().to_string();
1386 let second = block_on(svc.handle(
1387 "ListLedgers",
1388 json!({ "MaxResults": 2, "NextToken": token }),
1389 &ctx,
1390 ))
1391 .unwrap();
1392 let names: Vec<String> = second["Ledgers"]
1393 .as_array()
1394 .unwrap()
1395 .iter()
1396 .map(|v| v["Name"].as_str().unwrap().to_string())
1397 .collect();
1398 assert_eq!(names, vec!["charlie", "delta"]);
1399 }
1400
1401 #[test]
1402 fn list_ledgers_rejects_max_results_out_of_range() {
1403 let svc = QldbService::new();
1404 let ctx = RequestContext::new("qldb", "us-east-1");
1405 for bad in [0i64, -1, 101, 1000] {
1406 let err = block_on(svc.handle("ListLedgers", json!({ "MaxResults": bad }), &ctx))
1407 .unwrap_err();
1408 assert_eq!(err.code, "ValidationException", "input {bad}");
1409 }
1410 }
1411
1412 #[test]
1413 fn create_ledger_accepts_documented_permissions_modes() {
1414 let svc = QldbService::new();
1415 let ctx = RequestContext::new("qldb", "us-east-1");
1416 for mode in ["ALLOW_ALL", "STANDARD"] {
1417 block_on(svc.handle(
1418 "CreateLedger",
1419 json!({
1420 "Name": format!("ledger-{mode}"),
1421 "PermissionsMode": mode,
1422 "DeletionProtection": false,
1423 }),
1424 &ctx,
1425 ))
1426 .unwrap();
1427 }
1428 }
1429
1430 #[test]
1431 fn export_journal_to_s3_persists_and_describes_export() {
1432 let svc = QldbService::new();
1433 let ctx = ctx();
1434 block_on(svc.handle(
1435 "CreateLedger",
1436 json!({
1437 "Name": "exp-target",
1438 "PermissionsMode": "ALLOW_ALL",
1439 "DeletionProtection": false,
1440 }),
1441 &ctx,
1442 ))
1443 .unwrap();
1444 let resp = block_on(svc.handle(
1445 "ExportJournalToS3",
1446 json!({
1447 "name": "exp-target",
1448 "InclusiveStartTime": 1700000000.0,
1449 "ExclusiveEndTime": 1700003600.0,
1450 "S3ExportConfiguration": {
1451 "Bucket": "audit-out",
1452 "Prefix": "exp/",
1453 "EncryptionConfiguration": {
1454 "ObjectEncryptionType": "SSE_KMS",
1455 "KmsKeyArn": "arn:aws:kms:us-east-1:000000000000:key/abc",
1456 },
1457 },
1458 "RoleArn": "arn:aws:iam::000000000000:role/qldb-export",
1459 "OutputFormat": "JSON",
1460 }),
1461 &ctx,
1462 ))
1463 .unwrap();
1464 let export_id = resp["ExportId"].as_str().unwrap().to_string();
1465 let desc = block_on(svc.handle(
1466 "DescribeJournalS3Export",
1467 json!({ "name": "exp-target", "exportId": export_id }),
1468 &ctx,
1469 ))
1470 .unwrap();
1471 assert_eq!(desc["ExportDescription"]["Status"], "IN_PROGRESS");
1472 assert_eq!(desc["ExportDescription"]["OutputFormat"], "JSON");
1473 assert_eq!(
1474 desc["ExportDescription"]["S3ExportConfiguration"]["EncryptionConfiguration"]["ObjectEncryptionType"],
1475 "SSE_KMS"
1476 );
1477 }
1478
1479 #[test]
1480 fn export_journal_to_s3_rejects_sse_kms_without_key_arn() {
1481 let svc = QldbService::new();
1482 let ctx = ctx();
1483 block_on(svc.handle(
1484 "CreateLedger",
1485 json!({
1486 "Name": "exp-bad",
1487 "PermissionsMode": "ALLOW_ALL",
1488 "DeletionProtection": false,
1489 }),
1490 &ctx,
1491 ))
1492 .unwrap();
1493 let err = block_on(svc.handle(
1494 "ExportJournalToS3",
1495 json!({
1496 "name": "exp-bad",
1497 "InclusiveStartTime": 1700000000.0,
1498 "ExclusiveEndTime": 1700003600.0,
1499 "S3ExportConfiguration": {
1500 "Bucket": "audit-out",
1501 "EncryptionConfiguration": { "ObjectEncryptionType": "SSE_KMS" },
1502 },
1503 "RoleArn": "arn:aws:iam::000000000000:role/qldb-export",
1504 }),
1505 &ctx,
1506 ))
1507 .unwrap_err();
1508 assert_eq!(err.code, "ValidationException");
1509 }
1510
1511 #[test]
1512 fn cancel_journal_s3_export_marks_cancelled_idempotently() {
1513 let svc = QldbService::new();
1514 let ctx = ctx();
1515 block_on(svc.handle(
1516 "CreateLedger",
1517 json!({
1518 "Name": "exp-cancel",
1519 "PermissionsMode": "ALLOW_ALL",
1520 "DeletionProtection": false,
1521 }),
1522 &ctx,
1523 ))
1524 .unwrap();
1525 let resp = block_on(svc.handle(
1526 "ExportJournalToS3",
1527 json!({
1528 "name": "exp-cancel",
1529 "InclusiveStartTime": 1700000000.0,
1530 "ExclusiveEndTime": 1700003600.0,
1531 "S3ExportConfiguration": {
1532 "Bucket": "audit-out",
1533 "EncryptionConfiguration": { "ObjectEncryptionType": "NO_ENCRYPTION" },
1534 },
1535 "RoleArn": "arn:aws:iam::000000000000:role/qldb-export",
1536 }),
1537 &ctx,
1538 ))
1539 .unwrap();
1540 let export_id = resp["ExportId"].as_str().unwrap().to_string();
1541 block_on(svc.handle(
1542 "CancelJournalS3Export",
1543 json!({ "name": "exp-cancel", "exportId": export_id }),
1544 &ctx,
1545 ))
1546 .unwrap();
1547 block_on(svc.handle(
1549 "CancelJournalS3Export",
1550 json!({ "name": "exp-cancel", "exportId": export_id }),
1551 &ctx,
1552 ))
1553 .unwrap();
1554 let desc = block_on(svc.handle(
1555 "DescribeJournalS3Export",
1556 json!({ "name": "exp-cancel", "exportId": export_id }),
1557 &ctx,
1558 ))
1559 .unwrap();
1560 assert_eq!(desc["ExportDescription"]["Status"], "CANCELLED");
1561 }
1562
1563 #[test]
1564 fn list_journal_kinesis_streams_for_ledger_filters_by_ledger() {
1565 let svc = QldbService::new();
1566 let ctx = ctx();
1567 for name in ["one", "two"] {
1568 block_on(svc.handle(
1569 "CreateLedger",
1570 json!({
1571 "Name": name,
1572 "PermissionsMode": "ALLOW_ALL",
1573 "DeletionProtection": false,
1574 }),
1575 &ctx,
1576 ))
1577 .unwrap();
1578 }
1579 for ledger in ["one", "one", "two"] {
1580 block_on(svc.handle(
1581 "StreamJournalToKinesis",
1582 json!({
1583 "name": ledger,
1584 "StreamName": format!("s-{ledger}"),
1585 "RoleArn": "arn:aws:iam::000000000000:role/qldb",
1586 "InclusiveStartTime": 1700000000.0,
1587 "KinesisConfiguration": {
1588 "StreamArn": "arn:aws:kinesis:us-east-1:000000000000:stream/k",
1589 },
1590 }),
1591 &ctx,
1592 ))
1593 .unwrap();
1594 }
1595 let resp = block_on(svc.handle(
1596 "ListJournalKinesisStreamsForLedger",
1597 json!({ "name": "one" }),
1598 &ctx,
1599 ))
1600 .unwrap();
1601 let streams = resp["Streams"].as_array().unwrap();
1602 assert_eq!(streams.len(), 2);
1603 assert!(streams.iter().all(|s| s["LedgerName"] == "one"));
1604 }
1605
1606 #[test]
1607 fn cancel_journal_kinesis_stream_marks_canceled_idempotently() {
1608 let svc = QldbService::new();
1609 let ctx = ctx();
1610 block_on(svc.handle(
1611 "CreateLedger",
1612 json!({
1613 "Name": "cancel-target",
1614 "PermissionsMode": "ALLOW_ALL",
1615 "DeletionProtection": false,
1616 }),
1617 &ctx,
1618 ))
1619 .unwrap();
1620 let created = block_on(svc.handle(
1621 "StreamJournalToKinesis",
1622 json!({
1623 "name": "cancel-target",
1624 "StreamName": "cancel-stream",
1625 "RoleArn": "arn:aws:iam::000000000000:role/qldb",
1626 "InclusiveStartTime": 1700000000.0,
1627 "KinesisConfiguration": {
1628 "StreamArn": "arn:aws:kinesis:us-east-1:000000000000:stream/k",
1629 },
1630 }),
1631 &ctx,
1632 ))
1633 .unwrap();
1634 let stream_id = created["StreamId"].as_str().unwrap().to_string();
1635 block_on(svc.handle(
1636 "CancelJournalKinesisStream",
1637 json!({ "name": "cancel-target", "streamId": stream_id }),
1638 &ctx,
1639 ))
1640 .unwrap();
1641 block_on(svc.handle(
1643 "CancelJournalKinesisStream",
1644 json!({ "name": "cancel-target", "streamId": stream_id }),
1645 &ctx,
1646 ))
1647 .unwrap();
1648 let desc = block_on(svc.handle(
1649 "DescribeJournalKinesisStream",
1650 json!({ "name": "cancel-target", "streamId": stream_id }),
1651 &ctx,
1652 ))
1653 .unwrap();
1654 assert_eq!(desc["Stream"]["Status"], "CANCELED");
1655 }
1656
1657 #[test]
1658 fn cancel_journal_kinesis_stream_rejects_terminal_state() {
1659 let svc = QldbService::new();
1660 let ctx = ctx();
1661 block_on(svc.handle(
1662 "CreateLedger",
1663 json!({
1664 "Name": "completed-ledger",
1665 "PermissionsMode": "ALLOW_ALL",
1666 "DeletionProtection": false,
1667 }),
1668 &ctx,
1669 ))
1670 .unwrap();
1671 let created = block_on(svc.handle(
1672 "StreamJournalToKinesis",
1673 json!({
1674 "name": "completed-ledger",
1675 "StreamName": "done",
1676 "RoleArn": "arn:aws:iam::000000000000:role/qldb",
1677 "InclusiveStartTime": 1700000000.0,
1678 "KinesisConfiguration": {
1679 "StreamArn": "arn:aws:kinesis:us-east-1:000000000000:stream/k",
1680 },
1681 }),
1682 &ctx,
1683 ))
1684 .unwrap();
1685 let stream_id = created["StreamId"].as_str().unwrap().to_string();
1686 {
1689 let st = svc.store.get("000000000000", "us-east-1");
1690 let mut entry = st.kinesis_streams.get_mut(&stream_id).unwrap();
1691 entry.status = "COMPLETED".to_string();
1692 }
1693 let err = block_on(svc.handle(
1694 "CancelJournalKinesisStream",
1695 json!({ "name": "completed-ledger", "streamId": stream_id }),
1696 &ctx,
1697 ))
1698 .unwrap_err();
1699 assert_eq!(err.code, "ResourcePreconditionNotMetException");
1700 assert_eq!(err.status.as_u16(), 412);
1701 }
1702
1703 #[test]
1704 fn stream_journal_to_kinesis_persists_and_describes_stream() {
1705 let svc = QldbService::new();
1706 let ctx = ctx();
1707 block_on(svc.handle(
1708 "CreateLedger",
1709 json!({
1710 "Name": "audit",
1711 "PermissionsMode": "ALLOW_ALL",
1712 "DeletionProtection": false,
1713 }),
1714 &ctx,
1715 ))
1716 .unwrap();
1717 let resp = block_on(svc.handle(
1718 "StreamJournalToKinesis",
1719 json!({
1720 "name": "audit",
1721 "StreamName": "audit-stream",
1722 "RoleArn": "arn:aws:iam::000000000000:role/qldb-stream",
1723 "InclusiveStartTime": 1700000000.0,
1724 "KinesisConfiguration": {
1725 "StreamArn": "arn:aws:kinesis:us-east-1:000000000000:stream/audit-out",
1726 "AggregationEnabled": true,
1727 },
1728 }),
1729 &ctx,
1730 ))
1731 .unwrap();
1732 let stream_id = resp["StreamId"].as_str().unwrap().to_string();
1733 let desc = block_on(svc.handle(
1734 "DescribeJournalKinesisStream",
1735 json!({ "name": "audit", "streamId": stream_id }),
1736 &ctx,
1737 ))
1738 .unwrap();
1739 let s = &desc["Stream"];
1740 assert_eq!(s["LedgerName"], "audit");
1741 assert_eq!(s["StreamName"], "audit-stream");
1742 assert_eq!(s["Status"], "ACTIVE");
1743 assert_eq!(s["KinesisConfiguration"]["AggregationEnabled"], true);
1744 }
1745
1746 #[test]
1747 fn stream_journal_to_kinesis_rejects_unknown_ledger() {
1748 let svc = QldbService::new();
1749 let ctx = ctx();
1750 let err = block_on(svc.handle(
1751 "StreamJournalToKinesis",
1752 json!({
1753 "name": "ghost",
1754 "StreamName": "audit-stream",
1755 "RoleArn": "arn:aws:iam::000000000000:role/qldb-stream",
1756 "InclusiveStartTime": 1700000000.0,
1757 "KinesisConfiguration": {
1758 "StreamArn": "arn:aws:kinesis:us-east-1:000000000000:stream/x",
1759 },
1760 }),
1761 &ctx,
1762 ))
1763 .unwrap_err();
1764 assert_eq!(err.code, "ResourceNotFoundException");
1765 }
1766
1767 #[test]
1768 fn stream_journal_to_kinesis_requires_kinesis_stream_arn() {
1769 let svc = QldbService::new();
1770 let ctx = ctx();
1771 block_on(svc.handle(
1772 "CreateLedger",
1773 json!({
1774 "Name": "audit2",
1775 "PermissionsMode": "ALLOW_ALL",
1776 "DeletionProtection": false,
1777 }),
1778 &ctx,
1779 ))
1780 .unwrap();
1781 let err = block_on(svc.handle(
1782 "StreamJournalToKinesis",
1783 json!({
1784 "name": "audit2",
1785 "StreamName": "audit-stream",
1786 "RoleArn": "arn:aws:iam::000000000000:role/qldb-stream",
1787 "InclusiveStartTime": 1700000000.0,
1788 "KinesisConfiguration": {},
1789 }),
1790 &ctx,
1791 ))
1792 .unwrap_err();
1793 assert_eq!(err.code, "ValidationException");
1794 }
1795
1796 #[test]
1797 fn tag_resource_returns_not_found_for_unknown_ledger() {
1798 let svc = QldbService::new();
1799 let ctx = ctx();
1800 let err = block_on(svc.handle(
1801 "TagResource",
1802 json!({
1803 "resourceArn": "arn:aws:qldb:us-east-1:000000000000:ledger/missing",
1804 "Tags": { "team": "qldb" },
1805 }),
1806 &ctx,
1807 ))
1808 .unwrap_err();
1809 assert_eq!(err.code, "ResourceNotFoundException");
1810 }
1811
1812 #[test]
1813 fn untag_resource_returns_not_found_for_unknown_ledger() {
1814 let svc = QldbService::new();
1815 let ctx = ctx();
1816 let err = block_on(svc.handle(
1817 "UntagResource",
1818 json!({
1819 "resourceArn": "arn:aws:qldb:us-east-1:000000000000:ledger/missing",
1820 "TagKeys": ["team"],
1821 }),
1822 &ctx,
1823 ))
1824 .unwrap_err();
1825 assert_eq!(err.code, "ResourceNotFoundException");
1826 }
1827
1828 #[test]
1829 fn list_tags_returns_not_found_for_unknown_ledger() {
1830 let svc = QldbService::new();
1831 let ctx = ctx();
1832 let err = block_on(svc.handle(
1833 "ListTagsForResource",
1834 json!({ "resourceArn": "arn:aws:qldb:us-east-1:000000000000:ledger/missing" }),
1835 &ctx,
1836 ))
1837 .unwrap_err();
1838 assert_eq!(err.code, "ResourceNotFoundException");
1839 }
1840
1841 #[test]
1842 fn tag_resource_round_trips_stream_tags() {
1843 let svc = QldbService::new();
1844 let ctx = ctx();
1845 block_on(svc.handle(
1846 "CreateLedger",
1847 json!({
1848 "Name": "tagged-stream-ledger",
1849 "PermissionsMode": "ALLOW_ALL",
1850 "DeletionProtection": false,
1851 }),
1852 &ctx,
1853 ))
1854 .unwrap();
1855 let created = block_on(svc.handle(
1856 "StreamJournalToKinesis",
1857 json!({
1858 "name": "tagged-stream-ledger",
1859 "StreamName": "stream-x",
1860 "RoleArn": "arn:aws:iam::000000000000:role/qldb",
1861 "InclusiveStartTime": 1700000000.0,
1862 "KinesisConfiguration": {
1863 "StreamArn": "arn:aws:kinesis:us-east-1:000000000000:stream/x",
1864 },
1865 }),
1866 &ctx,
1867 ))
1868 .unwrap();
1869 let stream_id = created["StreamId"].as_str().unwrap().to_string();
1870 let stream_arn =
1871 format!("arn:aws:qldb:us-east-1:000000000000:stream/tagged-stream-ledger/{stream_id}",);
1872 block_on(svc.handle(
1873 "TagResource",
1874 json!({ "resourceArn": stream_arn, "Tags": { "team": "qldb" } }),
1875 &ctx,
1876 ))
1877 .unwrap();
1878 let listed = block_on(svc.handle(
1879 "ListTagsForResource",
1880 json!({ "resourceArn": stream_arn }),
1881 &ctx,
1882 ))
1883 .unwrap();
1884 assert_eq!(listed["Tags"]["team"], "qldb");
1885 }
1886
1887 #[test]
1888 fn tag_resource_returns_not_found_for_unknown_stream() {
1889 let svc = QldbService::new();
1890 let ctx = ctx();
1891 let err = block_on(svc.handle(
1892 "TagResource",
1893 json!({
1894 "resourceArn": "arn:aws:qldb:us-east-1:000000000000:stream/missing-ledger/missing-stream",
1895 "Tags": {},
1896 }),
1897 &ctx,
1898 ))
1899 .unwrap_err();
1900 assert_eq!(err.code, "ResourceNotFoundException");
1901 }
1902
1903 #[test]
1904 fn tag_resource_round_trips_tags() {
1905 let svc = QldbService::new();
1906 let ctx = ctx();
1907 block_on(svc.handle(
1908 "CreateLedger",
1909 json!({
1910 "Name": "tagged",
1911 "PermissionsMode": "ALLOW_ALL",
1912 "DeletionProtection": false,
1913 }),
1914 &ctx,
1915 ))
1916 .unwrap();
1917 let arn = "arn:aws:qldb:us-east-1:000000000000:ledger/tagged";
1918 block_on(svc.handle(
1919 "TagResource",
1920 json!({ "resourceArn": arn, "Tags": { "team": "qldb", "env": "test" } }),
1921 &ctx,
1922 ))
1923 .unwrap();
1924 let resp = block_on(svc.handle("ListTagsForResource", json!({ "resourceArn": arn }), &ctx))
1925 .unwrap();
1926 assert_eq!(resp["Tags"]["team"], "qldb");
1927 assert_eq!(resp["Tags"]["env"], "test");
1928 block_on(svc.handle(
1929 "UntagResource",
1930 json!({ "resourceArn": arn, "TagKeys": ["team"] }),
1931 &ctx,
1932 ))
1933 .unwrap();
1934 let resp = block_on(svc.handle("ListTagsForResource", json!({ "resourceArn": arn }), &ctx))
1935 .unwrap();
1936 assert!(resp["Tags"]["team"].is_null());
1937 assert_eq!(resp["Tags"]["env"], "test");
1938 }
1939
1940 #[test]
1941 fn capacity_and_rate_exceeded_map_to_http_429() {
1942 let cap = capacity_exceeded("table read units exhausted");
1943 assert_eq!(cap.status.as_u16(), 429);
1944 assert_eq!(cap.code, "CapacityExceededException");
1945 let rate = rate_exceeded("client retry budget burned");
1946 assert_eq!(rate.status.as_u16(), 429);
1947 assert_eq!(rate.code, "RateExceededException");
1948 }
1949}